Reactive Streams
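Akka Streams is an experimental extension of the Akka project. It implements the Reactive Streams specification on top of Akka, using the actor concurrency model. The Reactive Streams specification was created by a group of companies interested in asynchronous, non-blocking processing of event data across system boundaries and technology stacks.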
Asynchronous processing with non-blocking back pressure
The Reactive Streams initiative is notable because it aims to address one of the most critical, yet problematic, challenges in asynchronous processing: matching the processing speeds of message producers and consumers while still using system resources efficiently. Letting a fast producer overwhelm a slower consumer with incoming messages can be catastrophic, because if the data source is not properly back-pressured, resources are usually exhausted somewhere along the data's path.
In the past, back pressure was commonly achieved by blocking the producer while it waited for the consumer to process messages at its own pace. This approach relies on synchronous processing of messages between systems, which is very inefficient and negates the benefits of asynchronous processing (much greater scalability and better resource utilisation). A non-blocking way of implementing back pressure is therefore required. In Reactive Streams, back pressure is an integral part of the asynchronous processing model and is implemented via asynchronous message passing: the consumer signals how many more messages it is ready to accept, and the producer sends no more than that.
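As a rough illustration of demand travelling as asynchronous messages, the sketch below implements a Subscriber that requests one element at a time. It assumes the akka-stream-experimental 1.0 dependency (which brings in the org.reactivestreams interfaces) running on Akka 2.3, where ActorSystem.shutdown() stops the system. The publisher behind the Source may send only as many elements as have been requested, so the source is held back without any thread blocking. OneAtATimeSubscriber and BackPressureDemo are hypothetical names, and the CountDownLatch exists only so the demo can wait for the result.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Subscriber, Subscription}

// Hypothetical subscriber that never has more than one element outstanding.
class OneAtATimeSubscriber extends Subscriber[Int] {
  val done = new CountDownLatch(1) // lets the demo wait for completion
  @volatile private var subscription: Subscription = _
  @volatile private var elements = Vector.empty[Int]

  def received: Vector[Int] = elements

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1) // signal demand for exactly one element
  }

  override def onNext(element: Int): Unit = {
    elements :+= element    // "process" the element
    subscription.request(1) // only now ask for the next one
  }

  override def onError(t: Throwable): Unit = done.countDown()
  override def onComplete(): Unit = done.countDown()
}

object BackPressureDemo {
  def run(): Vector[Int] = {
    implicit val system = ActorSystem("BackPressureDemo")
    implicit val materializer = ActorMaterializer()
    val subscriber = new OneAtATimeSubscriber
    try {
      // Sink(subscriber) wraps a plain Reactive Streams Subscriber (Akka Streams 1.0 API)
      Source(1 to 5).runWith(Sink(subscriber))
      subscriber.done.await(5, TimeUnit.SECONDS)
      subscriber.received
    } finally system.shutdown() // Akka 2.3 API
  }
}
```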
Getting started
Add the project dependency to build.sbt:
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" )
To run the stream processing we need, create an ActorSystem and an ActorMaterializer:
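import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future
object MyFirstStream {
  def main(args: Array[String]): Unit = {
    // hosts the actors that will run the streams
    implicit val system = ActorSystem("MyActorSystem")
    // turns stream definitions into running actors; picks up the implicit system
    implicit val materializer = ActorMaterializer()
    // ExecutionContext for Future callbacks such as foreach
    import system.dispatcher
    // stream definition and execution code goes here
  }
}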
ActorMaterializer is responsible for creating the actors that ultimately run the processing pipeline; it relies on an ActorSystem being available implicitly.
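The skeleton above never stops its ActorSystem, so the JVM keeps running after the stream has finished. One way to handle this, sketched here assuming Akka 2.3.x (where the call is shutdown() rather than the later terminate()), is to shut the system down from the completion callback of the stream's materialised Future. StreamLifecycle is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StreamLifecycle {
  // Runs a tiny stream and stops the ActorSystem once the stream completes.
  def run(): Future[Int] = {
    implicit val system = ActorSystem("StreamLifecycle")
    implicit val materializer = ActorMaterializer() // resolves the implicit ActorSystem
    import system.dispatcher // ExecutionContext for the onComplete callback
    val sum: Future[Int] = Source(1 to 3).runWith(Sink.fold[Int, Int](0)(_ + _))
    sum.onComplete(_ => system.shutdown()) // Akka 2.3 API
    sum
  }

  def main(args: Array[String]): Unit =
    println(Await.result(run(), 3.seconds)) // prints 6
}
```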
Basic stream building blocks
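Before we put together and run a basic stream, let's review the basic building blocks that Akka Streams provides. More advanced processing stage types will be covered later.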
Source
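A Source is the starting point of a stream: it is where the data flowing through the stream originates. A Source can be anything that produces messages, such as a collection, a database query or an HTTP request. Akka Streams can create a Source from a wide variety of data-producing entities: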
val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1,2,3))
val sourceFromFuture = Source(Future.successful("hello")) // emits the value once the Future completes
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again") // infinite: never completes on its own
val emptySource = Source.empty[Int] // completes immediately without emitting anything
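These sources are only descriptions until they are run. A minimal sketch of what a few of them emit, assuming the same setup and Akka 2.3's shutdown(); because Source.repeat never completes, it is limited with take before running. SourceExamples is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourceExamples {
  def run(): (Seq[String], String, Int) = {
    implicit val system = ActorSystem("SourceExamples")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the for-comprehension
    try {
      // an infinite source must be bounded (here with take) before collecting it
      val repeated: Future[Seq[String]] =
        Source.repeat("again and again").take(3).runFold(Vector.empty[String])(_ :+ _)
      // a Future-based source emits the future's value as its single element
      val fromFuture: Future[String] =
        Source(Future.successful("hello")).runWith(Sink.head[String])
      // an empty source completes at once, so the fold returns its zero value
      val emptyCount: Future[Int] =
        Source.empty[Int].runFold(0)((count, _) => count + 1)
      val all = for { r <- repeated; f <- fromFuture; e <- emptyCount } yield (r, f, e)
      Await.result(all, 3.seconds)
    } finally system.shutdown()
  }
}
```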
Sink
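A Sink is the final destination of all messages flowing through a stream. The library provides a number of Sink implementations out of the box: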
val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head[Int] // element type must be given explicitly
val sinkNoop = Sink.ignore // consumes and discards every element
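Each sink also determines what running the stream returns. A short sketch, under the same assumptions, showing that Sink.head yields a Future of the first element and Sink.fold a Future of the accumulated result. It reuses one Source for two runs, since a Source is a reusable description. SinkExamples is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkExamples {
  def run(): (Int, Int) = {
    implicit val system = ActorSystem("SinkExamples")
    implicit val materializer = ActorMaterializer()
    try {
      val source = Source(List(1, 2, 3)) // reusable: each runWith starts a separate stream
      // Sink.head materialises a Future of the first element
      val first: Future[Int] = source.runWith(Sink.head[Int])
      // Sink.fold materialises a Future of the folded result
      val sum: Future[Int] = source.runWith(Sink.fold[Int, Int](0)(_ + _))
      Await.result(first.zip(sum), 3.seconds)
    } finally system.shutdown() // Akka 2.3 API
  }
}
```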
Flow
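A Flow is a processing step within a stream. It combines one incoming channel and one outgoing channel with some transformation of the data passing through it. Akka Streams provides a rich DSL for building different kinds of transformations: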
import akka.stream.OverflowStrategy
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10) // emits batches of up to 10 elements
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // back-pressures the source if the buffer is full
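Flows can be chained with via, both onto a Source and onto another Flow. A sketch under the same assumptions; FlowExamples is a hypothetical name. Doubling makes every element even, so the filter lets everything through, and grouped(10) emits batches of at most ten elements, with a smaller final batch.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlowExamples {
  def run(): (Seq[Int], Seq[Seq[Int]]) = {
    implicit val system = ActorSystem("FlowExamples")
    implicit val materializer = ActorMaterializer()
    try {
      val doubling = Flow[Int].map(_ * 2)
      val keepEven = Flow[Int].filter(_ % 2 == 0)
      // two flows composed with via form a new Flow
      val doubledEvens: Future[Seq[Int]] =
        Source(1 to 5).via(doubling.via(keepEven)).runFold(Vector.empty[Int])(_ :+ _)
      // 25 elements in batches of 10: sizes 10, 10 and 5
      val batches: Future[Seq[Seq[Int]]] =
        Source(1 to 25).via(Flow[Int].grouped(10)).runFold(Vector.empty[Seq[Int]])(_ :+ _)
      Await.result(doubledEvens.zip(batches), 3.seconds)
    } finally system.shutdown() // Akka 2.3 API
  }
}
```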
Defining a stream
Streams can represent arbitrary processing pipelines or networks, and Akka Streams provides a powerful DSL that makes building them easy.
Every stream definition must include at least two stages, a Source and a Sink:
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
Unfortunately, putting the stream together like this forces us to deal with the way the materialised value of one processing stage is combined with that of the subsequent stage (the sink in the example above), hence the toMat method, which lets us specify which materialised value should be exposed when the stream is materialised; Keep.right selects the sink's value, here the Future[Int] holding the sum. Using runWith or runFold instead collapses attaching the sink and running the entire stream into a single method call. Stream materialisation and materialised values are a fairly involved topic that deserves a separate post.
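To make the choice of materialised value concrete, here is a sketch (same assumptions; KeepExamples is a hypothetical name) of Keep.left, Keep.right and Keep.both applied to a Source, whose materialised value in Akka Streams 1.0 is Unit, and to two sinks whose materialised values are futures. In this light, runWith(sink) behaves like toMat(sink)(Keep.right).run().

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object KeepExamples {
  def run(): (Int, Int) = {
    implicit val system = ActorSystem("KeepExamples")
    implicit val materializer = ActorMaterializer()
    try {
      val source = Source(List(1, 2, 3))          // materialised value: Unit
      val sumSink = Sink.fold[Int, Int](0)(_ + _) // materialised value: Future[Int]
      val headSink = Sink.head[Int]               // materialised value: Future[Int]

      // Keep.left exposes the source's Unit; the sum cannot be observed
      val keepLeft: RunnableGraph[Unit] = source.toMat(sumSink)(Keep.left)
      // Keep.right exposes the sink's Future[Int]
      val keepRight: RunnableGraph[Future[Int]] = source.toMat(sumSink)(Keep.right)
      // Keep.both exposes a tuple of both values
      val keepBoth: RunnableGraph[(Unit, Future[Int])] = source.toMat(headSink)(Keep.both)

      keepLeft.run() // the stream runs, but run() only returns ()
      val sum = keepRight.run()
      val (_, first) = keepBoth.run()
      Await.result(sum.zip(first), 3.seconds)
    } finally system.shutdown() // Akka 2.3 API
  }
}
```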
To include a transformation in the stream, add a Flow between the Source and the Sink:
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
Running a stream
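Once a stream has been defined, it can be run. This process is called stream materialisation: it creates the actors that form the infrastructure needed to process whatever data the Source produces, and ultimately yields the stream's materialised value.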
val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // we expect to see 6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // we expect to see 12
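A RunnableGraph is an immutable blueprint, so it can be materialised more than once; each run() creates its own actors and returns a fresh materialised value. A sketch under the same assumptions, with RerunExample as a hypothetical name:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RerunExample {
  def run(): (Int, Int) = {
    implicit val system = ActorSystem("RerunExample")
    implicit val materializer = ActorMaterializer()
    try {
      // only a description; nothing runs until run() is called
      val graph: RunnableGraph[Future[Int]] =
        Source(List(1, 2, 3)).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)
      // each run() materialises the blueprint independently
      val firstRun: Future[Int] = graph.run()
      val secondRun: Future[Int] = graph.run()
      Await.result(firstRun.zip(secondRun), 3.seconds)
    } finally system.shutdown() // Akka 2.3 API
  }
}
```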
Alternatively, the same can be written more concisely:
// runs the stream by attaching specified sink
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// runs the stream by attaching sink that folds over elements on a stream
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)