简介
我已经注意到,每当提起”reactive”时,并不倾向于很实际的代码相关联.”reactive”的一方面是指”非阻塞”代码,无阻塞是指你在程序中执行了一个调用后继续往下执行其他的动作,直到收到一个之前调用产生的通知.
有很多框架使用不同的方式处理类似这样响应不能立即返回的通知,Scala中可以选择几种不同的非阻塞机制.
Futures
Scala中使用 scala.concurrent.Future 作为无阻塞访问的基础单元.
一个比较好的理解方式是,把Future作为一个box,在其中包含你想要的东西,关键的一点是你从来不会打开这个box,试图强制打开box会引起阻塞或错误,取而代之的是把Future放在一个更大的盒子,然后使用map方法.
下面这个例子中Future包含一个String,当Future完成的时候,Console.println会被调用.
object Main {
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
def main(args:Array[String]) : Unit = {
val stringFuture: Future[String] = Future.successful("hello world!")
stringFuture.map { someString =>
// if you use .foreach you avoid creating an extra Future, but we are proving
// the concept here...
Console.println(someString)
}
}
}
这个例子中Future[String]使用global提供的ExecutionContext,完成时调用Console.println,比较有趣的一点是:当someString出现时我们放弃控制权,同时Console.println会被调用,这一切都是系统本身进行管理.
下面做一个对比,当我们强制打开box时会发生什么:
val stringFuture: Future[String] = Future.successful("hello world!")
val someString = Future.await(future)
在这种情况下,我们不得不进行等待,保持线程在缓存中运行,直到someString完成.我们虽然已经打开了box,但不得不占用系统资源等待其完成.
Event Based Systems with Akka
当我们讨论scala中的响应式系统时实质上市指事件驱动的系统,Akka.当我们想从akka系统得到结果时,有两种方式可以实现.
使用tell发送消息到actor:
fooActor ! GetFoo(1)
然后依赖fooActor回复给我们一条消息:
def receive = {
case GetFoo(id) =>
sender() ! Foo(id)
}
或者使用ask,生成一个Future:
val fooFuture: Future[Foo] = fooActor ? GetFoo(1)
当actor的receive方法回复一个Foo(id)时fooFuture就会完成.或者你想用其他方式,从Future到Actor,可以使用pipeTo方法:
Future.successful {
Foo(1)
} pipeTo actorRef
通常推荐的做法是tell而不是ask.
但是一个重要的警告是:并不是所有的系统都是基于Akka的.如果要操作一个NoSQL比如Redis或Cassandra,确保你使用的是一个直接基于Future的驱动.或者你使用Play!的时候,会允许你通过Action.async传递一个Future:
def index(): Future[Result] = Action.async {
Future.successful {
Ok("hello world!") // 200 HTTP Result
}
}
这些意味着什么呢?在实践中,如果你使用的系统不是基于Akka,或者,你没有使用一个基于stream的API,比如 Iteratees/Reactive Streams,然后你又听到”响应式”或”无阻塞”,那么这时候你就应该看到Future了.
Dependent Futures
假设一个service用于传递数据:
trait FooService {
def find(fooId:FooID): Option[Foo]
}
这个服务返回一个Option[T],Option是另一种容器类型,只有两种值会返回:如果Foo(1)存在时返回Some(Foo(1)),否则返回None.使用Option则不需要像下面这样进行null值检查:
val foo:Foo = fooService.find(fooId)
if (foo != null) { // WITHOUT OPTION
Console.println("Foo is " + foo)
}
调用Option.map可以在值存在是安全的获得该值:
val maybeFoo:Option[Foo] = fooService.find(fooId)
maybeFoo.map { foo => // WITH OPTION
Console.println("Foo is " + foo)
}
可以发现Future和Option都是基于同样的工作原则:一个类型中包含另一个只有某些条件下才能得到的类型.
但是FooService并非无阻塞的,假设服务后是无阻塞的数据源,我们可以这样处理:
def find(fooId:FooID): Future[Option[Foo]]
然后这样调用:
fooService.find(fooId).map { maybeFoo =>
maybeFoo.map { foo =>
Console.println("Foo is " + foo)
}
}
但是现在有趣的是不能使用Option来组合Future.如果是两层嵌套的Future可以使用flatMap解包成一个Future,或者一个嵌套的Option也可以解包成一个单独的Option,但是不能解包一个Future[Option[T]],像这样:
fooService.find(fooId).flatMap { foo =>
Console.println("Foo is " + foo)
}
现在在FooService之外再添加两个服务:
trait BarService {
def find(barId:BarID) : Future[Option[Bar]]
}
trait QuuxService {
def find(quuxId:QuuxID) : Future[Option[Quux]]
}
假设所有的这次服务都返回单独的Future:
val fooFuture = fooService.find(FooID(1))
val barFuture = barService.find(BarID(2))
val quuxFuture = quuxService.find(QuuxID(3))
这会得到三个不同的Future,可能三个不同的线程在并行的运行他们.这种模式在各种Future中会见到.
或者更多情况下,你会需要这样的处理方式:
val fooFuture = fooService.find(1)
val barFuture = barService.find(foo.barId) // where is foo?
val quuxFuture = quuxService.find(bar.quuxId) // where is bar?
一个Future计算依赖于其他Future计算.
The Obvious Solution
比较直接的处理方式可能是这样:
fooService.get(1).map { maybeFoo =>
maybeFoo.map { foo =>
barService.get(foo.barId).map { maybeBar =>
maybeBar.map { bar =>
quuxService.get(bar.quuxId).map { maybeQuux =>
maybeQuux.map { quux =>
Console.println("Quux = " + quux)
}
}
}
}
}
}
但很明显,这种方式并不是很优雅.
For Comprehensions
有多种不同的方式可以更好的处理这种问题,我们可以使用for语句.
一种情况是嵌套的for语句:
for (maybeFoo <- fooService.find(1)) yield {
for (foo <- maybeFoo) yield ...
}
或者:
def : Future[Option[Bar]] = {
for {
Some(foo) <- fooService.find(1)
maybeBar <- barService.find(foo.barId)
} yield maybeBar
}