介绍
在Scala标准库中,Future是一种从并发操作中获取处理结果的数据结构.这些结果可以以异步(无阻塞)或同步(阻塞)的方式被访问.
执行上下文(Execution Contexts)
为了执行回调和操作,Futures需要一个叫做ExecutionContext的东西,非常类似于java.util.concurrent.Executor.如果你在作用域中有一个ActorSystem,Future会使用默认的调度器(dispatcher)作为ExecutionContext,或者你可以使用ExecutionContext伴生对象中提供的工厂方法来提供Executors和ExecutorServices,或者自己创建一个.
import scala.concurrent.{ ExecutionContext, Promise }
implicit val ec = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)
// Do stuff with your brand new shiny ExecutionContext
val f = Promise.successful("foo")
// Then shut your ExecutionContext down at some
// appropriate place in your program/application
ec.shutdown()
在Actors内部使用
每个actor都会被配置基于一个MessageDispatcher运行,这个调度器(dispatcher)同时是一个ExecutionContext.如果Future和actor属性匹配或者执行的活动跟actor兼容,比如所有CPU的约束或无延迟的要求,那么最简单的方式就是通过导入context.dispatcher来重用调度器(dispatcher)以运行Futures.
class A extends Actor {
import context.dispatcher
val f = Future("hello")
def receive = {
// receive omitted ...
}
}
与Actor一起使用
通常有两种方法从actor中获取回复.第一种情况,如果原始的sender(original sender,即消息发送方)是一个actor,可以使用(actor ! msg).第二种情况就是通过Future.
使用Actor的 “?” 方法(ask)会返回一个Future:
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]
这种用法会导致线程阻塞以等待Actor对Futur完成填充从而进行回复.阻塞会带来性能问题因此并不被鼓励使用.这些位于Await.result和Await.ready的阻塞操作使其更容易发现阻塞发生位置,避免阻塞的方案会在本文档的其他位置进行讨论.
同时需要注意的是,由于Actor是动态的,因此从Actor重返回的Future类型是一个Future[Any],这也是上例中使用asInstanceOf的原因.在无阻塞处理中使用maoTo方法可以更好的把Future作为预期的类型进行计算.
import scala.concurrent.Future
import akka.pattern.ask
val future: Future[String] = ask(actor, msg).mapTo[String]
如果计算成功的话,mapTo方法会返回一个包含结果的新Future,失败的话返回一个ClassCastException,异常处理会在稍后讨论.
可以通过pipe结果把Future类型的结果发送给Actor:
import akka.pattern.pipe
future pipeTo actor
更直接的使用
Akka的另一种常见用例是执行一些并行计算,同时又不需要一些额外的actor.如果你需要创建一个actor池只是为了进行一些并行计算,这有另一种更加简单快速的方式:
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
val future = Future {
"Hello" + "World"
}
future foreach println
上例传送给Future的代码块会被默认的Dispatcher执行,然后使用代码块返回的计算结果(“HelloWorld”)来填充Future.不像从Actor中返回Future,这个Future是被预定义类型的,同时有避免了另外管理一个actor的开销.
同样你也可以使用Future伴生对象创建一个已完成的Future,并且可以是成功的:
val future = Future.successful("Yay!")
或失败的(失败的Future中包含一个异常):
val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!"))
或者创建一个空得Promise,稍后进行填充并获得对应的Future:
val promise = Promise[String]()
val theFuture = promise.future
promise.success("hello")
函数式的Future
Scala中的Future有一些monadic方法,和collection集合中的方法非常类似.这允许你创建pipelines或streams以使结果进行来往.
Future是Monad
Future第一个函数式的方法时map.这个方法接收一个方法用于在Future的结果上执行操作,然后返回一个新的结果.map方法的返回值是一个包含结果的新的Future.
val f1 = Future {
"Hello" + "World"
}
val f2 = f1 map { x =>
x.length
}
f2 foreach println
这个例子中,在一个Future中将两个字符串进行jion处理.与其等待这个计算完成,我们通过map方法把计算字符串长度的函数作用到该Future上.现在我们得到第二个Future,最后会包含一个Int.当最初的第一个Future完成时即会执行我们的函数,然后将该函数的结果填充以完成第二个Future.我们最终得到的结果会包含一个数字10,而第一个Future仍然包含一个”HelloWorld”字符串而没有被map方法改变.
当我们处理单个Future时map方法能很好的工作,但是如果有两个或更多的Future时,map方法则不会允许你将他们结合在一起:
val f1 = Future {
"Hello" + "World"
}
val f2 = Future.successful(3)
val f3 = f1 map { x =>
f2 map { y =>
x.length * y
}
}
f3 foreach println
f3会是一个Future[Future[Int]]而不是Future[Int].这种情况就需要使用flatMap方法:
val f1 = Future {
"Hello" + "World"
}
val f2 = Future.successful(3)
val f3 = f1 flatMap { x =>
f2 map { y =>
x.length * y
}
}
f3 foreach println
使用嵌套组合子有时会变得非常复杂并难以阅读.这时候,使用for表达式会使代码更加清晰易读,或者需要添加判断语法时可以使用filter方法:
val future1 = Future.successful(4)
val future2 = future1.filter(_ % 2 == 0)
future2 foreach println
val failedFilter = future1.filter(_ % 2 == 1).recover {
// When filter fails, it will have a java.util.NoSuchElementException
case m: NoSuchElementException => 0
}
failedFilter foreach println
For表达式
Future除了可以使用map/flatMap/filter方法,还可以简单的使用for表达式:
val f = for {
a <- Future(10 / 2) // 10 / 2 = 5
b <- Future(a + 1) // 5 + 1 = 6
c <- Future(a - 1) // 5 - 1 = 4
if c > 3 // Future.filter
} yield b * c // 6 * 4 = 24
// 这种写法写a/b/c并不是并行执行
f foreach println
上面这种处理方式虽然看起来是可以并行处理的,但是要理解的是for表达式中的语句都是顺序执行的.正确的做法是先单独的创建每个Future,然后再将他们结合再一起.
组合做个Future
上面的例子中演示了如何组合多个future进行计算.这种情况的常见用例是将多个actor回复的结果组合成一个单独的计算,而不需要调用Await.result或Await.ready阻塞来得到每个结果.下面是一个使用Await.result的例子:
val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)
val a = Await.result(f1, 3 seconds).asInstanceOf[Int]
val b = Await.result(f2, 3 seconds).asInstanceOf[Int]
val f3 = ask(actor3, (a + b))
val result = Await.result(f3, 3 seconds).asInstanceOf[Int]
这个例子中,我们在将结果发到第三个actor前首先要等待两个actor返回结果.一共使用了三次Await.result,这会导致我们的小程序在得到结果前需要阻塞三次,现在对比下面这个例子:
val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)
val f3 = for {
a <- f1.mapTo[Int]
b <- f2.mapTo[Int]
c <- ask(actor3, (a + b)).mapTo[Int]
} yield c
f3 foreach println
这里两个actor各自处理一条消息,一旦两个结果可用,即完成(这里并没有阻塞来获取结果),他们就会被组合到一起然后发送给第三个actor,最终回复一个字符串,即我们要的结果.
这种处理方式对于处理已知数量的actor很合适,但是数量一多会变得难以控制.这时sequence和traverse方法可以使处理更复杂用例时变得简单.两个方法都可以用于转换,比如一个Traversable的子类T,然后将T[Future[A]]转换为Future[T[A]],实例:
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int])
// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)
// Find the sum of the odd numbers
val oddSum = futureList.map(_.sum)
oddSum foreach println
详细解释这个例子中发生了什么,Future.sequence接收一个List[Future[Int]]然后将它转换成一个Future[List[Int]],然后就可以直接使用map方法处理List[Int],求得这个List得sum值.
traverse方法和sequence类似,但是它是接收一个T[A]和一个函数A => Future[B],然后返回一个Future[T[B]],其中T是Traversable得一个子类,例如求前100个偶数的和:
val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
val oddSum = futureList.map(_.sum)
oddSum foreach println
下面这个例子也会得到同样的结果:
val futureList = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1)))
val oddSum = futureList.map(_.sum)
oddSum foreach println
但是就速度来说traverse会更快,因为它不需要创建一个中间的List[Future[Int]].
fold方法接收一个起始值,一个Future序列,一个函数,该函数 根据初始值和Future的类型返回一个跟初始值类型一样的值,然后将该函数作用到Future序列的所有元素上,并且是以异步的方式:
// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.fold(futures)(0)(_ + _)
futureSum foreach println
如果传给fold的序列是空的,则会返回初始值,上面的用例中将会返回0,如果有些用例中你不提供初始值,则会使用Future序列中第一个完成的Future中的值作为初始值,这样可以使用reduce方法:
// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.reduce(futures)(_ + _)
futureSum foreach println
跟fold一样,整个执行过程会在最后一个Future完成时异步的结束,当还还可以嵌套的处理多个序列,然后在调用reduce处理多个序列经过reduce处理的结果.
回调
有时候你只是需要监听一个Future完成,但并不是通过的返回一个新的Future进行响应,而是通过它的副作用.这种场景下Scala提供了onComplete, onSuccess和onFailure,后面两种情况是第一个分割.
future onSuccess {
case "bar" => println("Got my bar alright!")
case x: String => println("Got some random string: " + x)
}
future onFailure {
case ise: IllegalStateException if ise.getMessage == "OHNOES" =>
//OHNOES! We are in deep trouble, do something!
case e: Exception =>
//Do something else
}
future onComplete {
case Success(result) => doSomethingOnSuccess(result)
case Failure(failure) => doSomethingOnFailure(failure)
}
定义顺序
由于回调以任何顺序和潜在的并行执行,这在你需要一些顺序执行时会比较棘手.andThen用于解决这种场景的问题,它在预期的回调中创建一个Future:
val result = Future { loadPage(url) } andThen {
case Failure(exception) => log(exception)
} andThen {
case _ => watchSomeTV()
}
result foreach println
辅助方法
Future的fallbackTo方法将两个Future组合成一个新的Future,如果第一个Future失败的话则会包含第二个Future成功的结果.
val future4 = future1 fallbackTo future2 fallbackTo future3
future4 foreach println
同样可以将两个Future组合成一个新的Future同时包含两个Future成功结果的元祖,使用zip操作:
val future3 = future1 zip future2 map { case (a, b) => a + " " + b }
future3 foreach println
异常处理
由于Future的结果是以并行的方式进行处理,异常必须以不同的方式进行处理.
如果实在Actor或dispatcher处理Future则没有关系,如果捕捉到一个异常,则该Future会包含该异常而不是一个正确值.如果一个Future包含一个异常,调用Await.result方法会重新抛出该异常,因此能够进行正确的处理.
同样也可以返回一个不同的值来处理异常,使用recover实现:
val future = akka.pattern.ask(actor, msg1) recover {
case e: ArithmeticException => 0
}
future foreach println
这个例子中,如果actor回复一个包含ArithmeticException的akka.actor.Status.Failure,Future中这会包含一个0,recover方法的工作的工作方式和try/catch非常相似,这种风格可以同时处理多个异常.
同时可以使用recoverWith方法,和recover关系类似于flatMap之于map:
val future = akka.pattern.ask(actor, msg1) recoverWith {
case e: ArithmeticException => Future.successful(0)
case foo: IllegalArgumentException =>
Future.failed[Int](new IllegalStateException("All br0ken!"))
}
future foreach println
After
akka.pattern.after可以简单的在一段固定的超时时间后完成一个包含值或异常的Future:
// TODO after is unfortunately shadowed by ScalaTest, fix as part of #3759
// import akka.pattern.after
val delayed = akka.pattern.after(200 millis, using = system.scheduler)(Future.failed(
new IllegalStateException("OHNOES")))
val future = Future { Thread.sleep(1000); "foo" }
val result = Future firstCompletedOf Seq(future, delayed)