简介
这个介绍基于2.4.2版本,这个版本的API与其他版本有轻微的不同,可以通过sbt添加相关的依赖:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"
Stream的API主要包含三种类型,对比基于Java编写的 reactive-streams-jvm,这些API强大的多,同时也更加复杂.
下面是一些在所有的例子中需要用到的依赖,现在进行一次性导入,以便后边使用:
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
import导入语句部分大多用于类型的声明,变量system表示一个actor system,变量materializer表示stream的计算上下文(evaluation context).在我们的例子中我们使用了ActorMaterializer,表示基于actor的stream求值.这两个变量都被标记为implicit,以告诉编译器在需要的时候进行自动的注入.system.dispatcher的导入用于Future的执行上下文.
A New API
Akka Stream有如下关键性质:
- 详细实现了 Reactive Streams,包含三个主要的规则,背压(backpressure),异步(async),无阻塞边界(non-blocking boundaries).
- 为stream的计算引擎提供了一个抽象,称为Materializer.
- 程序均作为可重用的构件块,分别表示三种主要的类型:Source,Sink,Flow.这些构件块的求值均基于Materializer并且需要被显式触发.
下面详细介绍这三种主要类型的用法.
Source
Source是一个数据生成器,对于stream来说作为一个输入源.每个Source拥有一个单独的输出channel,但没有输入channel.所有的数据流通过已连接到Source的输出channel进行输出.
可以通过多种方式创建一个Source:
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...
scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
上面的代码中我们用有限的数据填充source,这表示他们最终将被关闭.同时需要注意的是,Reactive Streams默认的lazy和异步,这表示需要明确的对stream的求值进行请求.在Akka Streams中这些可以通过 run*
方法完成. runForeach 和普通的 foreach作用一样,加上一个前缀run表示我们请求stream进行求值.
介绍完有限数据,下面是一个无限数据的介绍:
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
使用 take 方法可以创建一个假定的停止点以免进行无限的计算.同时actor以内建的方式提供支持,我们可以通过向actor发送消息来对stream进行填充:
def run(actor: ActorRef) = {
Future { Thread.sleep(300); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
.actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(run)
scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1
可以发现 Futures在不同的线程以异步的方式进行计算,以求得最后的结果.上面例子中的进入元素缓冲buffer并不是必须的,同时可以使用 OverflowStrategy.fail 来配置stream,当buffer溢出时stream 执行fail.
通过这个actor接口,我们可以使用任何数据源来填充stream,无论是被当前线程创建的数据还是其他线程,或是其他进程,甚至是通过远程系统.
Sink
Sink基本上与Source是一个对立事物,它是stream的终点,因此用于消费数据.一个Sink拥有一个单独的输入channel而没有输出channel.
当我们希望在不对stream进行求值而又对数据集合的指定处理行为进行重用时,通常需要Sink,上面的介绍中已经了解到 run*
方法并不能达到这些目的,因此我们可以使用sink来进行处理.
下面是一个sink的实例:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3
使用 to 方法可以将一个source连接到一个sink,返回一个叫做RunnableFlow的实例,我们稍后会看到详细的Flow介绍,表示一个stream可以通过 run() 方法进行执行.
同样可以将到达sink的值发送给actor,即,将actor作为一个sink使用:
val actor = system.actorOf(Props(new Actor {
override def receive = {
case msg => println(s"actor received: $msg")
}
}))
scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...
scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
Flow
如果想要连接Akka stream和现有的系统,使用数据源和sink进行组合是很有效的方式,但是并不能做什么实际的操作.Flow是Akka stream中另一个抽象,它可以作为不同stream之间的连接器同时又能对stream中的元素进行转换.
如果一个Flow连接到一个Source上,结果会得到一个新的Source,同样,一个Flow连接到一个Sink后会成为一个新的Sink,而如果以个Flow同时连接一个Source和Sink时则会生成一个RunnableFlow.因此,一个Flow处于Source和Sink之间,但是再它连接到一个Source或Sink之前它不属于任何其中一种.
为了更好的理解Flow,我们可以看一些实例:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
通过 via 方法可以对Source和Flow进行连接.需要制定输入的类型,因为编译器不能帮我们自动推断.并且这些叫做invert,doubler的Flow完全独立于任何数据生产者和消费者.他们只是对数据进行转换让后发送到输出channel,这表示我们可以在多个stream中对这些Flow进行重用:
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3
scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1