Simple Scala: Composing Dependent Futures

简介

我已经注意到,每当提起”reactive”时,并不倾向于很实际的代码相关联.”reactive”的一方面是指”非阻塞”代码,无阻塞是指你在程序中执行了一个调用后继续往下执行其他的动作,直到收到一个之前调用产生的通知.

有很多框架使用不同的方式处理类似这样响应不能立即返回的通知,Scala中可以选择几种不同的非阻塞机制.

Futures

Scala中使用 scala.concurrent.Future 作为无阻塞访问的基础单元.

一个比较好的理解方式是,把Future作为一个box,在其中包含你想要的东西,关键的一点是你从来不会打开这个box,试图强制打开box会引起阻塞或错误,取而代之的是把Future放在一个更大的盒子,然后使用map方法.

下面这个例子中Future包含一个String,当Future完成的时候,Console.println会被调用.

object Main {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  def main(args:Array[String]) : Unit = {
    val stringFuture: Future[String] = Future.successful("hello world!")
    stringFuture.map { someString =>
      // if you use .foreach you avoid creating an extra Future, but we are proving
      // the concept here...
      Console.println(someString)
    }
  }
}

这个例子中Future[String]使用global提供的ExecutionContext,完成时调用Console.println,比较有趣的一点是:当someString出现时我们放弃控制权,同时Console.println会被调用,这一切都是系统本身进行管理.

下面做一个对比,当我们强制打开box时会发生什么:

val stringFuture: Future[String] = Future.successful("hello world!")
val someString = Future.await(future)

在这种情况下,我们不得不进行等待,保持线程在缓存中运行,直到someString完成.我们虽然已经打开了box,但不得不占用系统资源等待其完成.

Event Based Systems with Akka

当我们讨论scala中的响应式系统时实质上市指事件驱动的系统,Akka.当我们想从akka系统得到结果时,有两种方式可以实现.

使用tell发送消息到actor:

fooActor ! GetFoo(1)

然后依赖fooActor回复给我们一条消息:

def receive = {
  case GetFoo(id) =>
    sender() ! Foo(id)
}

或者使用ask,生成一个Future:

val fooFuture: Future[Foo] = fooActor ? GetFoo(1)

当actor的receive方法回复一个Foo(id)时fooFuture就会完成.或者你想用其他方式,从Future到Actor,可以使用pipeTo方法:

Future.successful {
  Foo(1)
} pipeTo actorRef

通常推荐的做法是tell而不是ask.

但是一个重要的警告是:并不是所有的系统都是基于Akka的.如果要操作一个NoSQL比如Redis或Cassandra,确保你使用的是一个直接基于Future的驱动.或者你使用Play!的时候,会允许你通过Action.async传递一个Future:

def index(): Future[Result] = Action.async {
  Future.successful {
    Ok("hello world!") // 200 HTTP Result
  }
}

这些意味着什么呢?在实践中,如果你使用的系统不是基于Akka,或者,你没有使用一个基于stream的API,比如 Iteratees/Reactive Streams,然后你又听到”响应式”或”无阻塞”,那么这时候你就应该看到Future了.

Dependent Futures

假设一个service用于传递数据:

trait FooService {
  def find(fooId:FooID): Option[Foo]
}

这个服务返回一个Option[T],Option是另一种容器类型,只有两种值会返回:如果Foo(1)存在时返回Some(Foo(1)),否则返回None.使用Option则不需要像下面这样进行null值检查:

val foo:Foo = fooService.find(fooId)
if (foo != null) { // WITHOUT OPTION
  Console.println("Foo is " + foo)
}

调用Option.map可以在值存在是安全的获得该值:

val maybeFoo:Option[Foo] = fooService.find(fooId)
maybeFoo.map { foo => // WITH OPTION
  Console.println("Foo is " + foo)
}

可以发现Future和Option都是基于同样的工作原则:一个类型中包含另一个只有某些条件下才能得到的类型.

但是FooService并非无阻塞的,假设服务后是无阻塞的数据源,我们可以这样处理:

def find(fooId:FooID): Future[Option[Foo]]

然后这样调用:

fooService.find(fooId).map { maybeFoo =>
  maybeFoo.map { foo =>
    Console.println("Foo is " + foo)
  }
}

但是现在有趣的是不能使用Option来组合Future.如果是两层嵌套的Future可以使用flatMap解包成一个Future,或者一个嵌套的Option也可以解包成一个单独的Option,但是不能解包一个Future[Option[T]],像这样:

fooService.find(fooId).flatMap { foo =>
  Console.println("Foo is " + foo)
}

现在在FooService之外再添加两个服务:

trait BarService {
  def find(barId:BarID) : Future[Option[Bar]]
}
trait QuuxService {
  def find(quuxId:QuuxID) : Future[Option[Quux]]
}

假设所有的这次服务都返回单独的Future:

val fooFuture = fooService.find(FooID(1))
val barFuture = barService.find(BarID(2))
val quuxFuture = quuxService.find(QuuxID(3))

这会得到三个不同的Future,可能三个不同的线程在并行的运行他们.这种模式在各种Future中会见到.

或者更多情况下,你会需要这样的处理方式:

val fooFuture = fooService.find(1)
val barFuture = barService.find(foo.barId) // where is foo?
val quuxFuture = quuxService.find(bar.quuxId) // where is bar?

一个Future计算依赖于其他Future计算.

The Obvious Solution

比较直接的处理方式可能是这样:

fooService.get(1).map { maybeFoo =>
  maybeFoo.map { foo =>
    barService.get(foo.barId).map { maybeBar =>
      maybeBar.map { bar =>
        quuxService.get(bar.quuxId).map { maybeQuux =>
          maybeQuux.map { quux =>
            Console.println("Quux = " + quux)
          }
        }
      }
    }
  }
}

但很明显,这种方式并不是很优雅.

For Comprehensions

有多种不同的方式可以更好的处理这种问题,我们可以使用for语句.

一种情况是嵌套的for语句:

for (maybeFoo <- fooService.find(1)) yield {
  for (foo <- maybeFoo) yield ...
}

或者:

def : Future[Option[Bar]] = {
  for {
    Some(foo) <- fooService.find(1)
    maybeBar <- barService.find(foo.barId)
  } yield maybeBar
}