Simple Akka: Futures

介绍

在Scala标准库中,Future是一种从并发操作中获取处理结果的数据结构.这些结果可以以异步(无阻塞)或同步(阻塞)的方式被访问.

执行上下文(Execution Contexts)

为了执行回调和操作,Futures需要一个叫做ExecutionContext的东西,非常类似于java.util.concurrent.Executor.如果你在作用域中有一个ActorSystem,Future会使用默认的调度器(dispatcher)作为ExecutionContext,或者你可以使用ExecutionContext伴生对象中提供的工厂方法来提供Executors和ExecutorServices,或者自己创建一个.

import scala.concurrent.{ ExecutionContext, Promise }

implicit val ec = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)

// Do stuff with your brand new shiny ExecutionContext
val f = Promise.successful("foo")

// Then shut your ExecutionContext down at some
// appropriate place in your program/application
ec.shutdown()

在Actors内部使用

每个actor都会被配置基于一个MessageDispatcher运行,这个调度器(dispatcher)同时是一个ExecutionContext.如果Future和actor属性匹配或者执行的活动跟actor兼容,比如所有CPU的约束或无延迟的要求,那么最简单的方式就是通过导入context.dispatcher来重用调度器(dispatcher)以运行Futures.

class A extends Actor {
  import context.dispatcher
  val f = Future("hello")
  def receive = {
    // receive omitted ...
  }
}

与Actor一起使用

通常有两种方法从actor中获取回复.第一种情况,如果原始的sender(original sender,即消息发送方)是一个actor,可以使用(actor ! msg).第二种情况就是通过Future.

使用Actor的 “?” 方法(ask)会返回一个Future:

import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]

这种用法会导致线程阻塞以等待Actor对Futur完成填充从而进行回复.阻塞会带来性能问题因此并不被鼓励使用.这些位于Await.result和Await.ready的阻塞操作使其更容易发现阻塞发生位置,避免阻塞的方案会在本文档的其他位置进行讨论.

同时需要注意的是,由于Actor是动态的,因此从Actor重返回的Future类型是一个Future[Any],这也是上例中使用asInstanceOf的原因.在无阻塞处理中使用maoTo方法可以更好的把Future作为预期的类型进行计算.

import scala.concurrent.Future
import akka.pattern.ask

val future: Future[String] = ask(actor, msg).mapTo[String]

如果计算成功的话,mapTo方法会返回一个包含结果的新Future,失败的话返回一个ClassCastException,异常处理会在稍后讨论.

可以通过pipe结果把Future类型的结果发送给Actor:

import akka.pattern.pipe
future pipeTo actor

更直接的使用

Akka的另一种常见用例是执行一些并行计算,同时又不需要一些额外的actor.如果你需要创建一个actor池只是为了进行一些并行计算,这有另一种更加简单快速的方式:

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._

val future = Future {
  "Hello" + "World"
}
future foreach println

上例传送给Future的代码块会被默认的Dispatcher执行,然后使用代码块返回的计算结果(“HelloWorld”)来填充Future.不像从Actor中返回Future,这个Future是被预定义类型的,同时有避免了另外管理一个actor的开销.

同样你也可以使用Future伴生对象创建一个已完成的Future,并且可以是成功的:

val future = Future.successful("Yay!")

或失败的(失败的Future中包含一个异常):

val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!"))

或者创建一个空得Promise,稍后进行填充并获得对应的Future:

val promise = Promise[String]()
val theFuture = promise.future
promise.success("hello")

函数式的Future

Scala中的Future有一些monadic方法,和collection集合中的方法非常类似.这允许你创建pipelines或streams以使结果进行来往.

Future是Monad

Future第一个函数式的方法时map.这个方法接收一个方法用于在Future的结果上执行操作,然后返回一个新的结果.map方法的返回值是一个包含结果的新的Future.

val f1 = Future {
  "Hello" + "World"
}
val f2 = f1 map { x =>
  x.length
}
f2 foreach println

这个例子中,在一个Future中将两个字符串进行jion处理.与其等待这个计算完成,我们通过map方法把计算字符串长度的函数作用到该Future上.现在我们得到第二个Future,最后会包含一个Int.当最初的第一个Future完成时即会执行我们的函数,然后将该函数的结果填充以完成第二个Future.我们最终得到的结果会包含一个数字10,而第一个Future仍然包含一个”HelloWorld”字符串而没有被map方法改变.

当我们处理单个Future时map方法能很好的工作,但是如果有两个或更多的Future时,map方法则不会允许你将他们结合在一起:

val f1 = Future {
  "Hello" + "World"
}
val f2 = Future.successful(3)
val f3 = f1 map { x =>
  f2 map { y =>
    x.length * y
  }
}
f3 foreach println

f3会是一个Future[Future[Int]]而不是Future[Int].这种情况就需要使用flatMap方法:

val f1 = Future {
  "Hello" + "World"
}
val f2 = Future.successful(3)
val f3 = f1 flatMap { x =>
  f2 map { y =>
    x.length * y
  }
}
f3 foreach println

使用嵌套组合子有时会变得非常复杂并难以阅读.这时候,使用for表达式会使代码更加清晰易读,或者需要添加判断语法时可以使用filter方法:

val future1 = Future.successful(4)
val future2 = future1.filter(_ % 2 == 0)

future2 foreach println

val failedFilter = future1.filter(_ % 2 == 1).recover {
  // When filter fails, it will have a java.util.NoSuchElementException
  case m: NoSuchElementException => 0
}

failedFilter foreach println

For表达式

Future除了可以使用map/flatMap/filter方法,还可以简单的使用for表达式:

val f = for {
  a <- Future(10 / 2) // 10 / 2 = 5
  b <- Future(a + 1) //  5 + 1 = 6
  c <- Future(a - 1) //  5 - 1 = 4
  if c > 3 // Future.filter
} yield b * c //  6 * 4 = 24

// 这种写法写a/b/c并不是并行执行

f foreach println

上面这种处理方式虽然看起来是可以并行处理的,但是要理解的是for表达式中的语句都是顺序执行的.正确的做法是先单独的创建每个Future,然后再将他们结合再一起.

组合做个Future

上面的例子中演示了如何组合多个future进行计算.这种情况的常见用例是将多个actor回复的结果组合成一个单独的计算,而不需要调用Await.result或Await.ready阻塞来得到每个结果.下面是一个使用Await.result的例子:

val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)

val a = Await.result(f1, 3 seconds).asInstanceOf[Int]
val b = Await.result(f2, 3 seconds).asInstanceOf[Int]

val f3 = ask(actor3, (a + b))

val result = Await.result(f3, 3 seconds).asInstanceOf[Int]

这个例子中,我们在将结果发到第三个actor前首先要等待两个actor返回结果.一共使用了三次Await.result,这会导致我们的小程序在得到结果前需要阻塞三次,现在对比下面这个例子:

val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)

val f3 = for {
  a <- f1.mapTo[Int]
  b <- f2.mapTo[Int]
  c <- ask(actor3, (a + b)).mapTo[Int]
} yield c

f3 foreach println

这里两个actor各自处理一条消息,一旦两个结果可用,即完成(这里并没有阻塞来获取结果),他们就会被组合到一起然后发送给第三个actor,最终回复一个字符串,即我们要的结果.

这种处理方式对于处理已知数量的actor很合适,但是数量一多会变得难以控制.这时sequence和traverse方法可以使处理更复杂用例时变得简单.两个方法都可以用于转换,比如一个Traversable的子类T,然后将T[Future[A]]转换为Future[T[A]],实例:

// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int])

// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)

// Find the sum of the odd numbers
val oddSum = futureList.map(_.sum)
oddSum foreach println

详细解释这个例子中发生了什么,Future.sequence接收一个List[Future[Int]]然后将它转换成一个Future[List[Int]],然后就可以直接使用map方法处理List[Int],求得这个List得sum值.

traverse方法和sequence类似,但是它是接收一个T[A]和一个函数A => Future[B],然后返回一个Future[T[B]],其中T是Traversable得一个子类,例如求前100个偶数的和:

val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
val oddSum = futureList.map(_.sum)
oddSum foreach println

下面这个例子也会得到同样的结果:

val futureList = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1)))
val oddSum = futureList.map(_.sum)
oddSum foreach println

但是就速度来说traverse会更快,因为它不需要创建一个中间的List[Future[Int]].

fold方法接收一个起始值,一个Future序列,一个函数,该函数 根据初始值和Future的类型返回一个跟初始值类型一样的值,然后将该函数作用到Future序列的所有元素上,并且是以异步的方式:

// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.fold(futures)(0)(_ + _)
futureSum foreach println

如果传给fold的序列是空的,则会返回初始值,上面的用例中将会返回0,如果有些用例中你不提供初始值,则会使用Future序列中第一个完成的Future中的值作为初始值,这样可以使用reduce方法:

// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.reduce(futures)(_ + _)
futureSum foreach println

跟fold一样,整个执行过程会在最后一个Future完成时异步的结束,当还还可以嵌套的处理多个序列,然后在调用reduce处理多个序列经过reduce处理的结果.

回调

有时候你只是需要监听一个Future完成,但并不是通过的返回一个新的Future进行响应,而是通过它的副作用.这种场景下Scala提供了onComplete, onSuccess和onFailure,后面两种情况是第一个分割.

future onSuccess {
  case "bar"     => println("Got my bar alright!")
  case x: String => println("Got some random string: " + x)
}

future onFailure {
  case ise: IllegalStateException if ise.getMessage == "OHNOES" =>
  //OHNOES! We are in deep trouble, do something!
  case e: Exception =>
  //Do something else
}

future onComplete {
  case Success(result)  => doSomethingOnSuccess(result)
  case Failure(failure) => doSomethingOnFailure(failure)
}

定义顺序

由于回调以任何顺序和潜在的并行执行,这在你需要一些顺序执行时会比较棘手.andThen用于解决这种场景的问题,它在预期的回调中创建一个Future:

val result = Future { loadPage(url) } andThen {
  case Failure(exception) => log(exception)
} andThen {
  case _ => watchSomeTV()
}
result foreach println

辅助方法

Future的fallbackTo方法将两个Future组合成一个新的Future,如果第一个Future失败的话则会包含第二个Future成功的结果.

val future4 = future1 fallbackTo future2 fallbackTo future3
future4 foreach println

同样可以将两个Future组合成一个新的Future同时包含两个Future成功结果的元祖,使用zip操作:

val future3 = future1 zip future2 map { case (a, b) => a + " " + b }
future3 foreach println

异常处理

由于Future的结果是以并行的方式进行处理,异常必须以不同的方式进行处理.

如果实在Actor或dispatcher处理Future则没有关系,如果捕捉到一个异常,则该Future会包含该异常而不是一个正确值.如果一个Future包含一个异常,调用Await.result方法会重新抛出该异常,因此能够进行正确的处理.

同样也可以返回一个不同的值来处理异常,使用recover实现:

val future = akka.pattern.ask(actor, msg1) recover {
  case e: ArithmeticException => 0
}
future foreach println

这个例子中,如果actor回复一个包含ArithmeticException的akka.actor.Status.Failure,Future中这会包含一个0,recover方法的工作的工作方式和try/catch非常相似,这种风格可以同时处理多个异常.

同时可以使用recoverWith方法,和recover关系类似于flatMap之于map:

val future = akka.pattern.ask(actor, msg1) recoverWith {
  case e: ArithmeticException => Future.successful(0)
  case foo: IllegalArgumentException =>
    Future.failed[Int](new IllegalStateException("All br0ken!"))
}
future foreach println

After

akka.pattern.after可以简单的在一段固定的超时时间后完成一个包含值或异常的Future:

// TODO after is unfortunately shadowed by ScalaTest, fix as part of #3759
// import akka.pattern.after

val delayed = akka.pattern.after(200 millis, using = system.scheduler)(Future.failed(
  new IllegalStateException("OHNOES")))
val future = Future { Thread.sleep(1000); "foo" }
val result = Future firstCompletedOf Seq(future, delayed)