Simple Scala: Slick with PostgreSQL

简介

Slick意思是“Scala Language-Integrated Connection Kit”,支持与多个数据库的交互:

DB2 (via slick-extensions)
Derby/JavaDB
H2
HSQLDB/HyperSQL
Microsoft SQL Server (via slick-extensions)
MySQL
Oracle (via slick-extensions)
PostgreSQL
SQLite

配置

要在SBT项目中使用Slick,首先需要添加包依赖,这里主要介绍PostgreSQL的使用:

"com.typesafe.slick" %% "slick" % 3.1.0

然后在配置文件src/main/resources/application.conf中配置数据库的连接属性:

postgres = {
  url = "jdbc:postgresql://localhost/akka-http-rest"
  url = ${?PSQL_URL}
  user = "postgres"
  user = ${?PSQL_USER}
  password = "password"
  password = ${?PSQL_PASSWORD}
  driver = org.postgresql.Driver
  connectionPool = disabled
  keepAliveConnection = true
}

并将对应的选项修改为自己的配置.

开始使用

添加完依赖和基本配置后,编写特质提供数据库连接的创建:

// utils/PostgresConfig.scala

trait PostgresConfig {
  val driver = slick.driver.PostgresDriver              // 确认数据库驱动

  import driver.api._                                   // 引入该驱动所有API

  def db = Database.forConfig("postgres")               // 加载数据库配置文件
  implicit val session:Session = db.createSession()     // 创建数据库连接的隐式变量
}

在需要交互数据库的类中,只需要混入该特质,即可使用隐式变量session的数据库连接.

数据结构

业务数据结构定义

然后定义我们的业务数据结构:

// models/UserEntity.scala
import org.mindrot.jbcrypt.BCrypt

case class UserEntity(id: Option[Long] = None, username: String, password: String) {
  require(!username.isEmpty, "username.empty")
  require(!password.isEmpty, "password.empty")

  def withHashedPassword(): UserEntity = this.copy(password = BCrypt.hashpw(password, BCrypt.gensalt()))
}

case class UserEntityUpdate(username: Option[String] = None, password: Option[String] = None) {
  def merge(user: UserEntity): UserEntity = {
    UserEntity(user.id, username.getOrElse(user.username), password.map(ps => BCrypt.hashpw(ps, BCrypt.gensalt())).getOrElse(user.password))
  }
}

// models/TokenEntity.scala
import java.util.UUID

case class TokenEntity(id: Option[Long] = None, userId: Option[Long], token: String = UUID.randomUUID().toString.replaceAll("-", ""))

UserEntity类包含一个Option[Long]类型的的id参数并默认为None,String类型的username和password在初始化时会进行非空检查,同时提供withHashedPassword方法对密码进行加密.

TokenEntity类同样包含一个Option[Long]类型的的id参数并默认为None,userId类型为Option[Long],token为String类型,由java.util.UUID库自动生成.

数据库表结构定义

最后定义业务case类与数据库表的交互特质:

// models/db/UserEntityTable.scala
import models.UserEntity                            // 引入之前定义的业务case类和数据库连接特质
import utils.DatabaseConfig

trait UserEntityTable extends DatabaseConfig {

  import driver.api._

  class Users(tag: Tag) extends Table[UserEntity](tag, "users") {
    def id = column[Option[Long]]("id", O.PrimaryKey, O.AutoInc)
    def username = column[String]("username")
    def password = column[String]("password")

    def * = (id, username, password) <> ((UserEntity.apply _).tupled, UserEntity.unapply)
  }

  protected val users = TableQuery[Users]

}

import models.TokenEntity
import utils.DatabaseConfig

trait TokenEntityTable extends UserEntityTable with DatabaseConfig {

  import driver.api._

  class Tokens(tag: Tag) extends Table[TokenEntity](tag, "tokens") {
    def id = column[Option[Long]]("id", O.PrimaryKey, O.AutoInc)
    def userId = column[Option[Long]]("user_id")
    def token = column[String]("token")

    def userFk = foreignKey("USER_FK", userId, users)(_.id, onUpdate = ForeignKeyAction.Restrict, onDelete = ForeignKeyAction.Cascade)

    def * = (id, userId, token) <> ((TokenEntity.apply _).tupled, TokenEntity.unapply)
  }

  protected val tokens = TableQuery[Tokens]

}

字段类型

“class Users(tag: Tag) extends Table[UserEntity](tag, “users”)”,本行中定义该数据表所映射的case类和数据库中的表名.

所有的字段都使用column方法定义,同时每个字段都包含一个Scala类型和一个数据库中的字段名.基于JDBC的数据库在JdbcProfile中支持以下的原始类型:

Numeric types: Byte, Short, Int, Long, BigDecimal, Float, Double
LOB types: java.sql.Blob, java.sql.Clob, Array[Byte]
Date types: java.sql.Date, java.sql.Time, java.sql.Timestamp
Boolean
String
Unit
java.util.UUID

同时在字段名后面可以跟一个可选配置用于字段的定义,这些选项通过”O”对象实现:

PrimaryKey:定义该字段为主键
Default[T](defaultValue: T):为该列提供一个默认值
DBType(dbType: String):使用基于数据库的非标准类型,比如使用String时添加DBType("VARCHAR(20)")
AutoInc:定义该字段为自增
NotNull, Nullable:定义该字段为非空或可为空

所有的表定义都需要提供一个”*”方法,用于描述进行数据库查询时每行返回的数据结构.该定义不需要与数据库中的表完全一致,可以根据需要增加或省略一些字段.

除了表字段定义,还需要一个TableQuery值,用于表示真正的数据表:

protected val tokens = TableQuery[Tokens]

同时在”*”方法中,可以使用”<>”操作符对数据结构进行映射:

def * = (id.?, first, last) <> (User.tupled, User.unapply)

约束

通过Table的foreignKey方法定义数据表的外键约束,同时定义外键的关联动作:

class Suppliers(tag: Tag) extends Table[(Int, String, String, String, String, String)](tag, "SUPPLIERS") {
  def id = column[Int]("SUP_ID", O.PrimaryKey)
  //...
}
val suppliers = TableQuery[Suppliers]

class Coffees(tag: Tag) extends Table[(String, Int, Double, Int, Int)](tag, "COFFEES") {
  def supID = column[Int]("SUP_ID")
  //...
  def supplier = foreignKey("SUP_FK", supID, suppliers)(_.id, onUpdate=ForeignKeyAction.Restrict, onDelete=ForeignKeyAction.Cascade)
  // compiles to SQL:
  //   alter table "COFFEES" add constraint "SUP_FK" foreign key("SUP_ID")
  //     references "SUPPLIERS"("SUP_ID")
  //     on update RESTRICT on delete CASCADE
}
val coffees = TableQuery[Coffees]

如上所示:在foreignKey方法中,定义字段”supID”引用表”suppliers”中的”id”字段,同时,当被引用表的”id”字段改变,更新时不进行处理,删除时进行同步.

查询

在使用Slick进行查询时,并不是对标准的Scala类型操作,而是将各种数据类型包装称为”Rep”类型的结构,简单的对比:

// 使用Scala基本类型
case class Coffee(name: String, price: Double)
val coffees: List[Coffee] = //...

val l = coffees.filter(_.price > 8.0).map(_.name)
//                       ^       ^          ^
//                       Double  Double     String

// Slick查询中的类型
class Coffees(tag: Tag) extends Table[(String, Double)](tag, "COFFEES") {
  def name = column[String]("COF_NAME")
  def price = column[Double]("PRICE")
  def * = (name, price)
}
val coffees = TableQuery[Coffees]

val q = coffees.filter(_.price > 8.0).map(_.name)
//                       ^       ^          ^
//               Rep[Double]  Rep[Double]  Rep[String]

排序和过滤

// 过滤
val q1 = coffees.filter(_.supID === 101)
// compiles to SQL (simplified):
//   select "COF_NAME", "SUP_ID", "PRICE", "SALES", "TOTAL"
//     from "COFFEES"
//     where "SUP_ID" = 101

// 分页
val q2 = coffees.drop(10).take(5)
// compiles to SQL (simplified):
//   select "COF_NAME", "SUP_ID", "PRICE", "SALES", "TOTAL"
//     from "COFFEES"
//     limit 5 offset 10    // 忽略掉前10行,返回5行数据

// 排序
val q3 = coffees.sortBy(_.name.desc.nullsFirst)
// compiles to SQL (simplified):
//   select "COF_NAME", "SUP_ID", "PRICE", "SALES", "TOTAL"
//     from "COFFEES"
//     order by "COF_NAME" desc nulls first

// 类似根据一个web表单创建一个动态过滤条件
val criteriaColombian = Option("Colombian")
val criteriaEspresso = Option("Espresso")
val criteriaRoast:Option[String] = None

val q4 = coffees.filter { coffee =>
  List(
      criteriaColombian.map(coffee.name === _),
      criteriaEspresso.map(coffee.name === _),
      criteriaRoast.map(coffee.name === _) // not a condition as `criteriaRoast` evaluates to `None`
  ).collect({case Some(criteria)  => criteria}).reduceLeftOption(_ || _).getOrElse(true: Rep[Boolean])
}
// compiles to SQL (simplified):
//   select "COF_NAME", "SUP_ID", "PRICE", "SALES", "TOTAL"
//     from "COFFEES"
//     where ("COF_NAME" = 'Colombian' or "COF_NAME" = 'Espresso')

Joining and Zipping

Join用于将两个不同的表或查询组合成一个单独的查询,有两种不同的方式编写Join:Applicative或monadic.

Applicative joins

Applicative(应用型) join的执行是通过调用方法,将两个单独的查询联合成一个单独的查询,返回一个tuple类型的结果.跟原生SQL中的限制一样,即右侧可以不依赖于左侧.

val crossJoin = for {
  (c, s) <- coffees join suppliers
} yield (c.name, s.name)
// compiles to SQL (simplified):
//   select x2."COF_NAME", x3."SUP_NAME" from "COFFEES" x2
//     inner join "SUPPLIERS" x3

val innerJoin = for {
  (c, s) <- coffees join suppliers on (_.supID === _.id)
} yield (c.name, s.name)
// compiles to SQL (simplified):
//   select x2."COF_NAME", x3."SUP_NAME" from "COFFEES" x2
//     inner join "SUPPLIERS" x3
//     on x2."SUP_ID" = x3."SUP_ID"

val leftOuterJoin = for {
  (c, s) <- coffees joinLeft suppliers on (_.supID === _.id)
} yield (c.name, s.map(_.name))
// compiles to SQL (simplified):
//   select x2."COF_NAME", x3."SUP_NAME" from "COFFEES" x2
//     left outer join "SUPPLIERS" x3
//     on x2."SUP_ID" = x3."SUP_ID"

val rightOuterJoin = for {
  (c, s) <- coffees joinRight suppliers on (_.supID === _.id)
} yield (c.map(_.name), s.name)
// compiles to SQL (simplified):
//   select x2."COF_NAME", x3."SUP_NAME" from "COFFEES" x2
//     right outer join "SUPPLIERS" x3
//     on x2."SUP_ID" = x3."SUP_ID"

val fullOuterJoin = for {
  (c, s) <- coffees joinFull suppliers on (_.supID === _.id)
} yield (c.map(_.name), s.map(_.name))
// compiles to SQL (simplified):
//   select x2."COF_NAME", x3."SUP_NAME" from "COFFEES" x2
//     full outer join "SUPPLIERS" x3
//     on x2."SUP_ID" = x3."SUP_ID"

Monadic joins

Monadic join使用flatMap创建.它在理论上比Applicative join更有威力,因为右侧可以不用依赖左侧.

val monadicCrossJoin = for {
  c <- coffees
  s <- suppliers
} yield (c.name, s.name)
// compiles to SQL:
//   select x2."COF_NAME", x3."SUP_NAME"
//     from "COFFEES" x2, "SUPPLIERS" x3

val monadicInnerJoin = for {
  c <- coffees
  s <- suppliers if c.supID === s.id
} yield (c.name, s.name)
// compiles to SQL:
//   select x2."COF_NAME", x3."SUP_NAME"
//     from "COFFEES" x2, "SUPPLIERS" x3
//     where x2."SUP_ID" = x3."SUP_ID"

Zip joins

val zipJoinQuery = for {
  (c, s) <- coffees zip suppliers
} yield (c.name, s.name)

val zipWithJoin = for {
  res <- coffees.zipWith(suppliers, (c: Coffees, s: Suppliers) => (c.name, s.name))
} yield res

Unions

两个查询拥有兼容的类型,可以通过++ (or unionAll) 或 union进行连接:

val q1 = coffees.filter(_.price < 8.0)
val q2 = coffees.filter(_.price > 9.0)

val unionQuery = q1 union q2
// compiles to SQL (simplified):
//   select x8."COF_NAME", x8."SUP_ID", x8."PRICE", x8."SALES", x8."TOTAL"
//     from "COFFEES" x8
//     where x8."PRICE" < 8.0
//   union select x9."COF_NAME", x9."SUP_ID", x9."PRICE", x9."SALES", x9."TOTAL"
//     from "COFFEES" x9
//     where x9."PRICE" > 9.0

val unionAllQuery = q1 ++ q2
// compiles to SQL (simplified):
//   select x8."COF_NAME", x8."SUP_ID", x8."PRICE", x8."SALES", x8."TOTAL"
//     from "COFFEES" x8
//     where x8."PRICE" < 8.0
//   union all select x9."COF_NAME", x9."SUP_ID", x9."PRICE", x9."SALES", x9."TOTAL"
//     from "COFFEES" x9
//     where x9."PRICE" > 9.0

union会过滤出重复的值,”++”会将单个结果进行连接,会更有效率.

Aggregation

聚集的最简单形式包括计算从查询返回一列,一般用数字类型,如原始值:

val q = coffees.map(_.price)

val q1 = q.min
// compiles to SQL (simplified):
//   select min(x4."PRICE") from "COFFEES" x4

val q2 = q.max
// compiles to SQL (simplified):
//   select max(x4."PRICE") from "COFFEES" x4

val q3 = q.sum
// compiles to SQL (simplified):
//   select sum(x4."PRICE") from "COFFEES" x4

val q4 = q.avg
// compiles to SQL (simplified):
//   select avg(x4."PRICE") from "COFFEES" x4

这些聚合操作返回一个scalar结果而不是一个集合.有些聚合操作用于任意查询而不仅是一列:

val q1 = coffees.length
// compiles to SQL (simplified):
//   select count(1) from "COFFEES"

val q2 = coffees.exists
// compiles to SQL (simplified):
//   select exists(select * from "COFFEES")

Grouping使用groupBy方法:

val q = (for {
  c <- coffees
  s <- c.supplier
} yield (c, s)).groupBy(_._1.supID)

val q2 = q.map { case (supID, css) =>
  (supID, css.length, css.map(_._1.price).avg)
}
// compiles to SQL:
//   select x2."SUP_ID", count(1), avg(x2."PRICE")
//     from "COFFEES" x2, "SUPPLIERS" x3
//     where x3."SUP_ID" = x2."SUP_ID"
//     group by x2."SUP_ID"

Querying

一个Query可以通过调用result方法转换成一个Action,该行动就可以直接在流执行或完全物化的方式,或与其他动作还组成:

val q = coffees.map(_.price)
val action = q.result
val result: Future[Seq[Double]] = db.run(action)
val sql = action.statements.head

Deleting

Deleting的执行方式和Querying非常相似,通过调用delete方法执行:

val q = coffees.filter(_.supID === 15)
val action = q.delete
val affectedRowsCount: Future[Int] = db.run(action)
val sql = action.statements.head

Inserting

Insert操作通过表与字段的映射完成:

val insertActions = DBIO.seq(
  coffees += ("Colombian", 101, 7.99, 0, 0),

  coffees ++= Seq(
    ("French_Roast", 49, 8.99, 0, 0),
    ("Espresso",    150, 9.99, 0, 0)
  ),

  // "sales" and "total" will use the default value 0:
  coffees.map(c => (c.name, c.supID, c.price)) += ("Colombian_Decaf", 101, 8.99)
)

// Get the statement without having to specify a value to insert:
val sql = coffees.insertStatement

// compiles to SQL:
//   INSERT INTO "COFFEES" ("COF_NAME","SUP_ID","PRICE","SALES","TOTAL") VALUES (?,?,?,?,?)

// 插入后返回指定的字段
val userId =
  (users returning users.map(_.id)) += User(None, "Stefan", "Zeiger")

// 可以在returning方法后面跟随into方法,可以把已经插入的数据和插入时生成的值生成一个新的值:
val userWithId =
  (users returning users.map(_.id)
         into ((user,id) => user.copy(id=Some(id)))
  ) += User(None, "Stefan", "Zeiger")

// 使用Query插入
class Users2(tag: Tag) extends Table[(Int, String)](tag, "users2") {
  def id = column[Int]("id", O.PrimaryKey)
  def name = column[String]("name")
  def * = (id, name)
}
val users2 = TableQuery[Users2]

val actions = DBIO.seq(
  users2.schema.create,
  users2 forceInsertQuery (users.map { u => (u.id, u.first ++ " " ++ u.last) }),
  users2 forceInsertExpr (users.length + 1, "admin")
)

Updating

val q = for { c <- coffees if c.name === "Espresso" } yield c.price
val updateAction = q.update(10.49)

// Get the statement without having to specify an updated value:
val sql = q.updateStatement

// compiles to SQL:
//   update "COFFEES" set "PRICE" = ? where "COFFEES"."COF_NAME" = 'Espresso'

Upserting

val updated = users.insertOrUpdate(User(Some(1), "Admin", "Zeiger"))
// returns: number of rows updated

val updatedAdmin = (users returning users).insertOrUpdate(User(Some(1), "Slick Admin", "Zeiger"))
// returns: None if updated, Some((Int, String)) if row inserted

Compiled Queries

编译查询语句:

def userNameByIDRange(min: Rep[Int], max: Rep[Int]) =
  for {
    u <- users if u.id >= min && u.id < max
  } yield u.first

val userNameByIDRangeCompiled = Compiled(userNameByIDRange _)

// The query will be compiled only once:
val namesAction1 = userNameByIDRangeCompiled(2, 5).result
val namesAction2 = userNameByIDRangeCompiled(1, 3).result
// Also works for .insert, .update and .delete

用于分页:

val userPaged = Compiled((d: ConstColumn[Long], t: ConstColumn[Long]) => users.drop(d).take(t))

val usersAction1 = userPaged(2, 1).result
val usersAction2 = userPaged(1, 3).result

使用flatMap实现:

val userNameByID = for {
  id <- Parameters[Int]
  u <- users if u.id === id
} yield u.first

val nameAction = userNameByID(2).result.head

val userNameByIDRange = for {
  (min, max) <- Parameters[(Int, Int)]
  u <- users if u.id >= min && u.id < max
} yield u.first

val namesAction = userNameByIDRange(2, 5).result