Simple Scala: Promise

简介

Future是一个只读类型,允许你使用它计算得到的值,或者处理计算中出现的错误.但在这之前,必须有一个方法把这个值放进去.而Promise可以达到这种目的.

类型Promise

之前,我们把顺序执行的代码块传递给了scala.concurrent中的future方法,并且在作用域中给出了一个ExecutionContext,它神奇的异步调用代码块,返回一个Future类型的结果.

虽然这种获得Future的方式很简单,但还有其他的方法来创建Future实例,并填充它,这就是Promise.Promise允许你在Future中放入一个值,不过只能做一次,Future一旦完成,就不能更改了.

一个Future实例总是(只能)和一个Promise实例关联在一起,如果你再REPL里调用future方法,你会发现返回的是一个Promise:

import concurrent.Future
import concurrent.Future

scala> import concurrent.future
import concurrent.future

scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global

scala> val f: Future[String] = future { "Hello World!" }
f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249

你得到的是一个DefaultPromise,它实现了Future和Promise接口,不过这就是具体的实现细节了,使用者只需要知道代码实现把Future和Promise之间的联系分的很清晰.

这个例子说明,除了Promise,没有其他方法可以完成一个Future,future方法也只是一个辅助函数,隐藏了具体的实现机制.

给出承诺

当我们谈论起承诺能否被兑现时,一个很熟知的例子就是那些政客的竞选诺言.

假设被推选的政客给他得投票者一个减税的承诺,这可以用Promise[TaxCut]表示:

import concurrent.Promise
case class TaxCut(reduction: Int)
// either give the type as a type parameter to the factory method:
val taxcut = Promise[TaxCut]()
// or give the compiler a hint by specifying the type of your val:
val taxcut2: Promise[TaxCut] = Promise()
// taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
// taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6

一旦创建了一个Promise,就可以在它上面调用future方法来获取承诺的未来:

val taxCutF: Future[TaxCut] = taxcut.future
// `> scala.concurrent.Future[TaxCut] `  scala.concurrent.impl.Promise$DefaultPromise@66ae2a84

返回的Future可能和Promise并不一样,但在同一个Promise上调用future方法总是返回同一个对象,以确保Future和Promise之间一对一的关系.

结束承诺

一旦给出了承诺,并告诉世界在不远的将来兑现它,那最好尽力去实现.在Scala中,可以结束一个Promise,无论成功还是失败.

对象承诺

为了成功兑现一个Promise,可以调用他的succes方法,并传递一个大家期许的结果:

taxcut.success(TaxCut(20))

这样做之后,Promise就无法在写入其他值了,如果偏要再写会产生异常.

此时,Promise关联的Future也成功完成,注册的回调也开始执行,或者说多这个Future进行了映射,那这个时候,映射函数也该执行了.

一般来说,Promise的完成和对返回的Future的处理发生在不同的线程,很可能你创建了Promise,并立即返回和他关联的Future给调用者,而实际上,另外一个线程还在计算它:

object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
    val p = Promise[TaxCut]()
    Future {
      println("Starting the new legislative period.")
      Thread.sleep(2000)
      p.success(TaxCut(20))
      println("We reduced the taxes! You must reelect us!!!!1111")
    }
    p.future
  }
}

这个例子中使用了Future半生对象,不过不要被他搞混淆了,这个例子的重点是:Promise并是在调用者的线程里完成的.

现在我们来兑现当初的竞选宣言,在Future上添加一个onComplete回调:

import scala.util.{Success, Failure}
val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge()
println("Now that they're elected, let's see if they remember their promises...")
taxCutF.onComplete {
  case Success(TaxCut(reduction)) =>
    println(s"A miracle! They really cut our taxes by $reduction percentage points!")
  case Failure(ex) =>
    println(s"They broke their promises! Again! Because of a ${ex.getMessage}")
}

多次运行这个例子,会发现显示屏输出的结果顺序是不确定的,而且,最终回调函数会执行,进入成功的那个 case.

违背诺言

政客习惯违背诺言,Scala程序员有时候也只能这样做,调用failure方法,传递一个异常,结束Promise:

case class LameExcuse(msg: String) extends Exception(msg)
object Government {
  def redeemCampaignPledge(): Future[TaxCut] = {
     val p = Promise[TaxCut]()
     Future {
       println("Starting the new legislative period.")
       Thread.sleep(2000)
       p.failure(LameExcuse("global economy crisis"))
       println("We didn't fulfill our promises, but surely they'll understand.")
     }
     p.future
   }
}

这个redeemCampaignPledge实现最终会违背承诺.一旦用failure结束这个Promise,也就无法再次写入了,正如succes方法一样.相关联的Future也会以Failure收场.

如果已经有了一个Try,那可以直接把他传递给Promise的complete方法,以此来结束它.如果这是一个Success,关联Future会成功完成,否则,就失败.

基于Future的编程实践

如果想要使用基于Future的编程范式以增加应用的扩展性,那应用从下都上都必须设计成非阻塞模式.这意味着,基本上应用层所有的函数都应该是异步的,并且返回Future.

当下,一个可能的使用场景是开发 Web 应用。 流行的 Scala Web 框架,允许你将响应作为 Future[Response] 返回,而不是等到你完成响应再返回。 这个非常重要,因为它允许 Web 服务器用少量的线程处理更多的连接。 通过赋予服务器 Future[Response] 的能力,你可以最大化服务器线程池的利用率。

而且,应用的服务可能需要多次调用数据库层以及(或者)某些外部服务, 这时候可以获取多个 Future,用 for 语句将它们组合成新的 Future,简单可读! 最终,Web 层再将这样的一个 Future 变成 Future[Response]。

但是该怎样在实践中实现这些呢?需要考虑三种不同的场景:

非阻塞IO

应用很可能涉及到大量的 IO 操作。比如,可能需要和数据库交互,还可能作为客户端去调用其他的 Web 服务。

如果是这样,可以使用一些基于 Java 非阻塞 IO 实现的库,也可以直接或通过 Netty 这样的库来使用 Java 的 NIO API。 这样的库可以用定量的线程池处理大量的连接。

但如果是想开发这样的一个库,直接和 Promise 打交道更为合适。

阻塞 IO

有时候,并没有基于 NIO 的库可用。比如,Java 世界里大多数的数据库驱动都是使用阻塞 IO。 在 Web 应用中,如果用这样的驱动发起大量访问数据库的调用,要记得这些调用是发生在服务器线程里的。 为了避免这个问题,可以将所有需要和数据库交互的代码都放入 future 代码块里,就像这样:

// get back a Future[ResultSet] or something similar:
Future {
  queryDB(query)
}

到现在为止,我们都是使用隐式可用的全局 ExecutionContext 来执行这些代码块。 通常,更好的方式是创建一个专用的 ExecutionContext 放在数据库层里。 可以从 Java的 ExecutorService 来它,这也意味着,可以异步的调整线程池来执行数据库调用,应用的其他部分不受影响。

import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

长时间运行的计算

取决于应用的本质特点,一个应用偶尔还会调用一些长时间运行的任务,它们完全不涉及 IO(CPU 密集的任务)。 这些任务也不应该在服务器线程中执行,因此需要将它们变成 Future:

Future {
  longRunningComputation(data, moreData)
}

同样,最好有一些专属的 ExecutionContext 来处理这些 CPU 密集的计算。 怎样调整这些线程池大小取决于应用的特征,这些已经超过了本文的范围。

参考

参考链接