Simple Scala: Future

简介

相较于大多数编程语言低级的并发API,Scala提供的方法可以让人们更好的理解并发以及编写良好架构的并发程序.

Future就是两大基石之一,另一个是Actor.

顺序代码为什么会变坏

假如你想冲一杯卡布奇诺,下面是制作步骤:

1. 研磨所需的咖啡豆
2. 加热一些水
3. 用研磨好的咖啡豆和热水做一杯咖啡
4. 打奶泡
5. 结合咖啡和奶泡做成卡布奇诺

转换成Scala代码后可能会是这样:

import scala.util.Try
// 基本类型别名,以便构造更有意义的方法签名:
type CoffeeBeans = String
type GroundCoffee = String
case class Water(temperature: Int)
type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String

// 各个步骤的假设实现:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water ` water.copy(temperature `  85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"

// 各个步骤可能出现的异常:
// (后面的代码实验中会用到):
case class GrindingException(msg: String) extends Exception(msg)
case class FrothingException(msg: String) extends Exception(msg)
case class WaterBoilingException(msg: String) extends Exception(msg)
case class BrewingException(msg: String) extends Exception(msg)

// 按顺序执行这些步骤:
def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

这样做有几个优点,可以很轻易的弄清事情的步骤,一目了然,而且不会混淆(毕竟没有上下文切换).

不好的一面是,大部分时间,你的大脑和身体都在等待状态: 在等待研磨咖啡豆时,你完全不能做任何事情,只有当这一步完成后,才能开始烧水.这显然是在浪费时间,所以你可能想一次开始多个步骤,让他们同时执行,一旦水烧开,咖啡豆也研磨好了,你可以制作咖啡了,这期间打奶泡也可以开始了.

这和编写软件么有什么不同,一个web服务器可以用来处理和响应请求的线程只有那么多,不能因为要等待数据库查询或其他HTTP服务调用的结果而阻塞了这些可贵的线程.相反,一个异步编程模型和非阻塞IO会更合适,这样的话,当一个请求处理在等待数据库查询结果时,处理这个请求的线程也能为其他请求服务.

"I heard you like callbacks, so I put a callback in your callback!"

在并发家族里,你应该知道nodejs这个很酷的家伙,nodejs完全通过回调来通信,不幸的是,这很容易导致回调中包含回调的回调,这简直一团糟,代码难于阅读和调试.

Scala的Future也允许回调,但它提供了更好的选择,所以你就不怎么需要使用回调了.

"I know Futures, and they are completely useless!"

也许你知道其他Future的实现,最引人瞩目的Java实现的那个.但是对于Java的Future,你只能去查看它是否已经完成,或者阻塞线程直到其结束.简而言之,Java的Future几乎没用,而且用起来绝对不会让人开心.

当然Scala的Future绝对不是这样.

Future语义

scala.concurrent包里的Future[T]是一个容器类型,代表了一种返回值类型为T的计算,计算可能会出错,也可能会超时,它可能会包含异常,而不是你期望的那个值.

Future只能写一次:当一个Future完成后,它就不能再被改变了.同时,Future只提供了读取计算值的接口,写入计算值的接口交给了Promise.这样,API层面上会有一个清晰的界限.

使用Future

Future有很多种使用方式,我将通过重写”卡布奇诺”这个例子进行证明.

首先,所有可以并行执行的函数,应该返回一个Future:

import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
  println("start grinding...")
  Thread.sleep(Random.nextInt(2000))
  if (beans == "baked beans") throw GrindingException("are you joking?")
  println("finished grinding...")
  s"ground coffee of $beans"
}

def heatWater(water: Water): Future[Water] = Future {
  println("heating the water now")
  Thread.sleep(Random.nextInt(2000))
  println("hot, it's hot!")
  water.copy(temperature = 85)
}

def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
  println("milk frothing system engaged!")
  Thread.sleep(Random.nextInt(2000))
  println("shutting down milk frothing system")
  s"frothed $milk"
}

def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
  println("happy brewing :)")
  Thread.sleep(Random.nextInt(2000))
  println("it's brewed!")
  "espresso"
}

上面的代码有几处需要解释.

首先Future伴生对象里的apply方法需要两个参数:

object Future {
  def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}

需要执行的异步计算通过传名参数body传入,第二个参数是一个隐式参数.隐式参数是指函数调用时,如果作用域中存在一个匹配的隐式值,就无需显式指定这个参数.ExecutionContext可以执行一个Future,可以把它看成是一个线程池,是绝大部分Future API的隐式参数.

import scala.concurrent.ExecutionContext.Implicits.global 引入了一个全局执行的上下文,确保了隐式值的存在.这时候只需要一个单元素列表,可以用大括号代替小括号.调用Future方法时,经常使用这种形式,使得它看起来像是一种语言特性,而不是一个普通的方法调用.

这个例子中没有大量计算,所以用随机时间休眠来模拟以便说明问题,而且,为了清晰说明并发代码的执行顺序,还在”计算”之前和之后打印了一些信息.

Future计算会在创建后的某个不确定时间点上由ExecutionContext给其分配的某个线程中执行.

回调

对于一些简单的问题,使用回调就能很好解决,Future的回调是偏函数,你可以把回调传递给Future的onSucces方法,如果这个Future成功完成,这个回调就会执行,并把Future的返回值作为参数输入:

grind("arabica beans").onSuccess { 
  case ground => println("okay, got my ground coffee")
}

类似也可以在onFailure上注册回调,只不过实在Future失败时调用,其输入是一个Throwable.

通常的做法是将两个方法结合在一起更好的处理Future:在onComplete方法上注册回调,回调的输入是一个Try:

import scala.util.{Success, Failure}
grind("baked beans").onComplete {
  case Success(ground) => println(s"got my $ground")
  case Failure(ex) => println("This grinder needs a replacement, seriously!")
}

传递给grind的参数是”baked beans”,因此grind会返回一个异常,进而导致Future中的计算失败.

Future组合

当嵌套使用Future时,回调就变得烦人.不过没有必要这么做,因为Future是可组合的,这是它真正发挥威力的时候.

你一定注意到,之前讨论过的所有容器类型都可以使用map/flatMap操作,也可以用在for语句中.同样作为一种容器类型,Future支持这些操作也不足为奇.

Map操作

Scala让”时间旅行”成为可能,假设想在水加热后检查它的温度,可以通过将Future[Water]映射到Future[Boolean]来完成:

val tempreatureOkay: Future[Boolean] = heatWater(Water(25)) map { water =>
   println("we're in the future!")
   (80 to 85) contains (water.temperature)
}

tempreatureOkay最终会包含水温的结果,你可以改变heatWater的实现让它抛出异常(比如加热器爆炸了),然后等待 “we’re in the future!” 出现在显示屏上,不过你永远等不到.

写传递给map的函数时,你就处在未来(或者说可能的未来),一旦Future[Water]实例成功完成,这个函数就会执行,只不过,该函数的时间线可能不是你现在所处的这个.如果Future[Water]失败,传给map的函数中的事情永远也不会发生,调用map的结果将是一个失败的Future[Boolean].

FlatMap操作

如果一个Future计算依赖另一个Future的结果,那需要求救于flatMap以避免Future的嵌套.

假设,测量水温的线程需要一些时间,那你可能想异步的检查水温是否OK,比如有一个函数,接受一个Water,并返回Future[Boolean]:

def temperatureOkay(water: Water): Future[Boolean] = future {
  (80 to 85) contains (water.temperature)
}

使用flatMap而不是map,将会得到一个Future[Boolean],而不是Future[Future[Boolean]]:

val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)) map {
  water => temperatureOkay(water)
}

val flatFuture: Future[Boolean] = heatWater(Water(25)) flatMap {
  water => temperatureOkay(water)
}

同样,映射只会在Future[Water]成功完成的情况下.

for语句

除了调用flatMap,也可以写成for语句,上面的例子可以重写成:

val acceptable: Future[Boolean] = for {
  heatedWater <- heatWater(Water(25))
  okay <- temperatureOkay(heatedWater)
} yield okay

如果有多个可以并行执行的计算,则需要特别注意,要现在for语句外面创建好对应的Futures.

def prepareCappuccinoSequentially(): Future[Cappuccino] =
  for {
    ground <- grind("arabica beans")
    water <- heatWater(Water(25))
    foam <- frothMilk("milk")
    espresso <- brew(ground, water)
  } yield combine(espresso, foam)

这看起来很漂亮,但要知道,for语句只不过是flatMap嵌套调用的语法糖.这意味着,只有当Future[GroundCoffee]成功完成后,heatWater才会创建Future[Water],你可以查看函数运行时打印出来的东西验证这个说法.

因此,要确保在for语句之前实例化所有相互独立的Future:

def prepareCappuccino(): Future[Cappuccino] = {
  val groundCoffee = grind("arabica beans")
  val heatedWater = heatWater(Water(20))
  val frothedMilk = frothMilk("milk")
  for {
    ground <- groundCoffee
    water <- heatedWater
    foam <- frothedMilk
    espresso <- brew(ground, water)
  } yield combine(espresso, foam)
}

在for语句之前,三个Future在创建之后就各自独立的运行,显示屏输出的顺序是不确定的,唯一确定的是”happy brewing”总是出现在后面.因为该输出所在的函数brew是在其他两个函数执行完毕后才开始执行的,也因为这样,可以在for语句里直接调用它,当让,前提是前面的Future都成功完成.

失败偏向的Future

你可能发现Future[T]是偏向成功的,允许你使用map/flatMap/for等.

但是有可能需要处理出错的情况.调用Future[T]上的failed方法,会得到一个失败偏向的Future,类型是Future[Throwable].之后可以映射这个Future[Throwable],在失败的情况下执行mapping操作.

Filter

Filter可以用于创建一个新的Future对象,该对象只有在满足特定条件的前提下才会得到原始Future的计算值,否则会抛出一个NoSuchElementException的异常而失败.调用了filter的future,其效果与直接调用withFilter完全一样.

而Collect与Filter的关系跟在集合操作时的API含义类似.

Foreach

调用foreach组合器并不会在计算值可用的时候阻塞当前的线程去获取计算值.恰恰相反,只有当future对象成功计算完成了,foreach所迭代的函数才能够被异步的执行,这表示foreach与onSucess回调意义完全相同.

Recover

由于Future特质从概念上看包含两种类型的返回值,即计算结果和异常,所以组合器会有一个处理异常的需求.

Recover能够创建一个新的future对象,该对象当计算完成时持有和原future对象一样的值.如果执行不成功则偏函数的参数会被传递给是Future失败的那个Throwable异常.若果把Throwable映射到了某个值,那么新的Future就会成功完成并返回该值.如果偏函数没有定义在Throwable中,那么最终产生结果的future也会失败并返回同样的Throwable.

而recoverWith与recover的关系类似于flatMap与map的关系.

FallbackTo

FallbackTo组合器生成的Future对象可以在该原future成功计算时返回结果,如果原future失败或者异常则返回该future参数对象的成功值.在原future和参数future都失败的情况下,新future对象会完成并返回原future抛出的异常,正如下面的例子中,本想打印美元汇率,但是在获取美元汇率失败的情况下会打印出瑞士法郎的汇率:

val usdQuote = future {
  connection.getCurrentValue(USD)
} map {
  usd => "Value: " + usd + "$"
}
val chfQuote = future {
  connection.getCurrentValue(CHF)
} map {
  chf => "Value: " + chf + "CHF"
}
al anyQuote = usdQuote fallbackTo chfQuote
anyQuote onSuccess { println(_) }

上面的例子中,若果usdQuote失败了,会返回chfQuote,如果两者同时失败,则返回usdQuote的异常.

AndThen

组合器andThen的用法是处于纯粹的side-effecting(副作用)目的.经andThen返回的新Future,无论原future成功或者失败都会返回与原future一样的结果.一旦原future成功并返回结果,andThen后跟的代码块就会被调用,且新Future将返回与future一样的结果,这确保了多个andThen调用的顺序执行.

在下面的例子中,把近期发出的帖子收集到一个可变集合中,然后把他们打印到屏幕上:

val allposts = mutable.Set[String]()
future {
  session.getRecentPosts
} andThen {
  posts => allposts ++= posts
} andThen {
  posts =>
  clearAll()
  for (post <- allposts) render(post)
}

总结: Future的组合器是纯函数式的,每个组合器都会返回一个跟原future相关的新future.

参考

参考链接