处理异步结果
异步化Controller
本质上,Play从本质上就是异步的,Play以异步,无阻塞的方式处理所有的请求.
默认的配置文件已经对异步化Controller做了调谐.或者说,应用的代码需要在控制层避免阻塞,比如控制器的代码等待一个操作.比较常见的操作有JDBC调用,流式API,HTTP请求,或者花费时间比较长的计算.
尽管在默认的执行上下文(execution context)中增加线程能够允许更过的并发请求在控制器中处理,遵循推荐的方法能够使控制器异步并以更容易的方式扩展或控制系统的负载.
创建无阻塞Action
由于Play的工作方式,action的代码必须尽可能的快,比如无阻塞.因此在我们不能及时生成结果时应该返回一个什么呢.响应将会是一个Future类型的结果.
一个Future[Result]
最终会被一个Result
类型的值填充.通过返回一个Future[Result]
而不是普通的Result
,我们可以以无阻塞的方式快速生成结果,Play会在Promise可用的时候立即返回结果.
客户端请求会在等待响应的时候被阻塞,但是服务端不会发生任何阻塞,并且服务端资源同时可以用于服务其他客户端请求.
如何创建Future[Result]
在创建Future[Result]
时首先需要另一个Future: 这个Future会给我们真正需要用于计算的值.
import play.api.libs.concurrent.Execution.Implicits.defaultContext
val futurePIValue: Future[Double] = computePIAsynchronously()
val futureResult: Future[Result] = futurePIValue.map { pi =>
Ok("PI value computed: " + pi)
}
Play所有的异步API都会给你一个Future
,无论是你通过play.api.libs.WS
API来调用外部的服务,或是使用Akka调度异步的任务,或者是通过play.api.libs.Akka
与actor通信.
下面是一个以异步的方式执行一个代码块并得到一个Future:
import play.api.libs.concurrent.Execution.Implicits.defaultContext
val futureInt: Future[Int] = scala.concurrent.Future {
intensiveComputation()
}
注意,非常重要的是能够理解那个线程正在运行Future.在上面的两个代码块中,通过import
导入了Play的默认执行上下文(execution context).这是一个隐式的参数,用于传给所有Future的API方法然后接受回调.这个执行上下文相当于一个线程池,尽管并不是必须的.
你不能通过魔法般的方式将一个同步IO使用Future
包装成一个异步IO.如果你不能改变应用的价格以避免阻塞操作,从而这些操作不得不被执行,最终线程就会被阻塞.因此除了Future
内部的操作,配置一个单独的执行上下文并且配置能够满足并发预期的线程数量也是非常必要的.查看Understanding Play thread pool
获取更多信息.
另外使用actor来处理阻塞操作也是非常有帮助的.Actor为处理超时和失败提供了非常清晰的模型.
返回Future
到现在为止我们都是使用Action.apply
的创建器方法来创建action,而发送异步的结果需要使用Action.async
创建器方法.
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def index = Action.async {
val futureInt = scala.concurrent.Future { intensiveComputation() }
futureInt.map(i => Ok("Got result: " + i))
}
Action默认都是异步的
Play的action默认都是异步的,在下面的控制器代码中,{ Ok(...) }
部分的代码并不属于控制器部分的方法体.它是传递给Action
对象的apply
方法的异步函数,它会创建一个Action
类型的对象.在内部,你编写的内部方法会被调用并且结构会包含在一个Future
当中.
def echo = Action { request =>
Ok("Got request [" + request + "]")
}
注意,Action.apply
和Action.async
创建的action对象在内部都会以同样的方式进行处理.仅仅只有一种类型的action,并且是异步的,而不是两种(一个异步一个同步),.async
创建器只是一个用于创建能够返回Future结果的工厂方法,仅仅是为了更方便的编写异步代码.
处理超时
合理的处理超时是很有用的,能够避免web浏览器阻塞后者等待发生错误.可以很方便的创建一个带有超时时间的Promise
:
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.duration._
def index = Action.async {
val futureInt = scala.concurrent.Future { intensiveComputation() }
val timeoutFuture = play.api.libs.concurrent.Promise.timeout("Oops", 1.second)
Future.firstCompletedOf(Seq(futureInt, timeoutFuture)).map {
case i: Int => Ok("Got result: " + i)
case t: String => InternalServerError(t)
}
}
Streaming HTTP responses
标准响应和Content-Length
头
从HTTP 1.1
开始,一个Http请求和一个响应只会开启一个连接,服务端必须在响应中携带适当的Content-Length
头.
默认情况下,当你返回一个简单的结果时并不会指定一个Content-Length
头,比如:
def index = Action {
Ok("Hello World")
}
当然,因为你发送的内容是比较熟悉的,Play能够计算内容的长度并且自动为你设置正确的头信息.
事实上,我们前面曾见过一个响应体被一个play.api.libs.iteratee.Enumerator
指定:
def index = Action {
Result(
header = ResponseHeader(200),
body = Enumerator("Hello World")
)
}
这表示需要合适的计算Content-Length
,Play必须加载整个Enumerator到内存.
发送大规模的数据
如果把简单的Enumerator加载到内存是没有问题的,那大的数据集合有如何呢? 然我们看一下当向客户端返回一个大文件时会怎样.
val file = new java.io.File("/tmp/fileToServe.pdf")
val fileContent: Enumerator[Array[Byte]] = Enumerator.fromFile(file)
然后把这个Enumerator指定为响应体:
def index = Action {
val file = new java.io.File("/tmp/fileToServe.pdf")
val fileContent: Enumerator[Array[Byte]] = Enumerator.fromFile(file)
Result(
header = ResponseHeader(200),
body = fileContent
)
}
事实上这里会出现一个问题,因为我们没有指定合适的Content-Length
头,Play会自己对他进行计算,唯一的方式把他整个加载到内存,然后进行计算.
这里的问题就是我们并不想把整个大文件加载到内存,所有为了避免,我们需要自己指定正确的头信息:
def index = Action {
val file = new java.io.File("/tmp/fileToServe.pdf")
val fileContent: Enumerator[Array[Byte]] = Enumerator.fromFile(file)
Result(
header = ResponseHeader(200, Map(CONTENT_LENGTH -> file.length.toString)),
body = fileContent
)
}
文件服务
Play提供了很多方便使用的方式来提供文件服务:
def index = Action {
Ok.sendFile(new java.io.File("/tmp/fileToServe.pdf"))
}
辅助方法会根据文件名计算Content-Length
,然后添加Content-Disposition
来指定浏览器接收响应的方式.默认会在响应体重添加Content-Disposition: attachment;filename=fileToServe.pdf
头来告诉浏览器下载文件.
或者提供自己的文件名:
def index = Action {
Ok.sendFile(
content = new java.io.File("/tmp/fileToServe.pdf"),
fileName = _ => "termsOfService.pdf"
)
}
或者想要inline
的方式:
def index = Action {
Ok.sendFile(
content = new java.io.File("/tmp/fileToServe.pdf"),
inline = true
)
}
这时就不需要指定文件名了,浏览器也不会尝试去下载它,只是会在浏览器窗口对他进行展示.这在支持文本,HTML,图片时比较有用.
Chunked responses
到现在为止,我们能够很好的处理流式文件内容,因为我们在streaming一个文件之前能够计算它的长度,但是如果是动态的计算内容呢,并没有真正的内容长度可用呢?
这种类型的响应需要使用Chunked transfer encoding
.由HTTP 1.1
提供支持.
它的好处是我们能够实时的处理数据,表示我们能够在数据块可用时将他们发送.缺点是由于浏览器并不知道内容大小,他不能够正常的进行展示.
比如我们有一个动态InputStream
来计算数据,首先要为这个stream创建一个Enumerator
:
val data = getDataStream
val dataContent: Enumerator[Array[Byte]] = Enumerator.fromStream(data)
现在可以使用Ok.chunked
来stream这些数据:
def index = Action {
val data = getDataStream
val dataContent: Enumerator[Array[Byte]] = Enumerator.fromStream(data)
Ok.chunked(dataContent)
}
当然,我们可以使用任何Enumerator
来指定块数据:
def index = Action {
Ok.chunked(
Enumerator("kiki", "foo", "bar").andThen(Enumerator.eof)
)
}
然后检查服务端发送过来的响应:
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Transfer-Encoding: chunked
4
kiki
3
foo
3
bar
0
会得到三个块数据然后跟随一个空的块最后关闭了响应.
Comet
Using chunked responses with Comet
一个块数据常用的方法创建一个Comet套接字.
一个Comet套接字是一个仅包含<script>
元素的块式text/html
响应.每一个块中编写一个包含Javascript的<script>
标签,然后被浏览器立即执行.通过这种方式,我们可以从服务端向浏览器发送实时的事件:每一个消息中在<script>
标签内包含一个Javascript回调函数,然后写入块响应.
因为Ok.chunked
促使Akka-stream
携带一个Flow[ByteString]
,我们可以发送一个元素的Flow
然后对他进行转换,然后每个元素都会被包装为Javascript方法.Comet的辅助方法会使Comet套接字自动化,向浏览器推送一个初始的空缓冲数据,同时支持String或JSON消息.
Comet模块导入
一些Comet辅助方法的导入:
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import play.api.http.ContentTypes
import play.api.inject.guice.GuiceApplicationBuilder
import play.api.libs.Comet
import play.api.libs.iteratee.Enumerator
import play.api.libs.json._
import play.api.libs.streams.Streams
import play.api.mvc._
你可能同时需要一个物化器(materializer),最好从你的DI system
中获取akka.stream.Materializer
.
Using Comet with String Flow
通过Flow
推送字符串消息:
def cometString = Action {
implicit val m = materializer
def stringSource: Source[String, _] = Source(List("kiki", "foo", "bar"))
Ok.chunked(stringSource via Comet.string("parent.cometMessage")).as(ContentTypes.HTML)
}
Using Comet with JSON Flow
通过Flow
发送JSON消息:
def cometJson = Action {
implicit val m = materializer
def jsonSource: Source[JsValue, _] = Source(List(JsString("jsonString")))
Ok.chunked(jsonSource via Comet.json("parent.cometMessage")).as(ContentTypes.HTML)
}
Using Comet with iframe
Comet辅助方法比较典型的是和forever-iframe
技术一起使用,在一个HTML页面中:
<script type="text/javascript">
var cometMessage = function(event) {
console.log('Received event: ' + event)
}
</script>
<iframe src="/comet"></iframe>
WebSockets
Handling WebSockets
到目前为止,我们都是使用Action
实例来处理标准的HTTP请求然后返回标准的HTTP响应.WebSockets是一个完全不同的概念,不能用标准的Action
来处理.
Play提供了两种不同的内建机制来处理WebSockets,一种是使用actor,一种是使用iteratees(迭代器),这两种机制都可以使用WebSocket
创建器进行访问.
使用Actor处理WebSockets
为了使用actor处理WebSocket,需要为Play提供一个akka.actor.Props
对象,当Play接收到一个WebSocket连接时进行创建.Play会提供一个akka.actor.ActorRef
用以向上游发送消息,因此我们可以用它来创建Props
对象:
import play.api.mvc._
import play.api.Play.current
import play.api.Play.materializer
def socket = WebSocket.acceptWithActor[String, String] { request => out => // 此处的out是一个actor引用
MyWebSocketActor.props(out)
}
这里我们用以发送的actor是这样:
import akka.actor._
object MyWebSocketActor {
def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}
class MyWebSocketActor(out: ActorRef) extends Actor { // 自定义的actor需要接收Play提供的actor引用作为参数,以想Play回复消息
def receive = {
case msg: String =>
out ! ("I received your message: " + msg)
}
}
任何客户端发送过来的消息都会被发送给actor,然后发送给Play提供的actor的消息都会被发送给客户端.
检查WebSocket何时被关闭
当一个WebSocket被关闭时,Play会自动关闭actor,这表示你可以通过实现actor的postStop
方法来处理这种情况,用于清理WebSocket消耗的资源.比如:
override def postStop() = {
someResource.close()
}
关闭一个WebSocket
当你的actor处理了WebSocket的terminate时,Play会自动关闭WebSocket.比如关闭WebSocket,向你的actor发送一个PoisonPill
消息:
import akka.actor.PoisonPill
self ! PoisonPill
拒绝一个WebSocket
有时你会需要拒绝一个Websocket请求,比如用户必须经过验证才能连接到WebSocket,或者WebSocket关联了一些ID通过path传入的资源,但是该ID的资源并不存在.Play提供了一个tryAcceptWithActor
来进行处理,运行你要么返回一个结果(比如forbidden,notFount),要么使用actor来处理这个WebSocket:
import scala.concurrent.Future
import play.api.mvc._
import play.api.Play.current
import play.api.Play.materializer
def socket = WebSocket.tryAcceptWithActor[String, String] { request =>
Future.successful(request.session.get("user") match {
case None => Left(Forbidden)
case Some(_) => Right(MyWebSocketActor.props)
})
}
处理不同类型的消息
到现在只看到了处理String类型的消息.Play同样为Array[Byte]
提供了内建的处理器,或者通过String解析的JsValue,可以把这些作为一个参数发送给WebSocket的创建方法:
import play.api.mvc._
import play.api.libs.json._
import play.api.Play.current
import play.api.Play.materializer
def socket = WebSocket.acceptWithActor[JsValue, JsValue] { request => out =>
MyWebSocketActor.props(out)
}
你会注意到有两个类型参数,这允许我们使用两种不同的类型来处理消息的传入和传出,这对底层类型没有什么用,但有助于将消息解析为高级的类型.
比如,我们想要接收JSON类型的消息,然后需要把传入的消息解析为InEvent
,把传出的消息解析为OutEvent
.首先要做的就是为这两个消息创建JSON格式化方法.
import play.api.libs.json._
implicit val inEventFormat = Json.format[InEvent]
implicit val outEventFormat = Json.format[OutEvent]
然后就可以为这种类型创建WebSocket的FrameFormatter
:
import play.api.mvc.WebSocket.FrameFormatter
implicit val messageFlowTransformer = MessageFlowTransformer.jsonMessageFlowTransformer[InEvent, OutEvent]
最后就可以在WebSocket中使用:
import play.api.mvc._
import play.api.Play.current
import play.api.Play.materializer
def socket = WebSocket.acceptWithActor[InEvent, OutEvent] { request => out =>
MyWebSocketActor.props(out)
}
现在在actor中就可以接收InEvent
类型的消息然后发送OutEvent
类型的消息.
Handling WebSockets with iteratees
Actor是一个适合处理互不关联的消息的很好的抽象,iteratees是一个适合处理stream的很好的抽象.
使用WebSocket
而不是Action
来处理Websocket请求:
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def socket = WebSocket.using[String] { request =>
// Log events to the console
val in = Iteratee.foreach[String](println).map { _ =>
println("Disconnected")
}
// Send a single 'Hello!' message
val out = Enumerator("Hello!")
(in, out)
}
一个WebSocket
可以访问请求头,运行你检索标准的请求头信息和session数据.但是不能访问请求体或者响应体.
当使用这种凡事构造WebSocket时,必须同时返回in
和out
两个channel:
in
是一个Iteratee[A,Unit]
(A是消息类型,这里是String),它会被每条消息通知,当客户端的socket关闭时会受到一个EOF
,out
是一个Enumerator[A]
,会生成消息然后发送到客户端,通过发送EOF
可以关闭服务端的套接字
我们的例子中只是创建了一个简单的iteratees发消息打印出来.为了发送消息,我们创建了一个模拟的Enumerator
然后仅发送一个Hello!
消息.
创建另一个例子,丢弃传入的数据,发送一个Hello!
消息然后关闭套接字:
import play.api.mvc._
import play.api.libs.iteratee._
def socket = WebSocket.using[String] { request =>
// Just ignore the input
val in = Iteratee.ignore[String]
// Send a single 'Hello!' message and close
val out = Enumerator("Hello!").andThen(Enumerator.eof)
(in, out)
}
另一个例子是将传入的数据记录到标准输出然后利用Concurrent.broadcast
想客户端广播:
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def socket = WebSocket.using[String] { request =>
// Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
val (out, channel) = Concurrent.broadcast[String]
// log the message to stdout and send response back to client
val in = Iteratee.foreach[String] {
msg => println(msg)
// the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
// receive the pushed messages
channel push("I received your message: " + msg)
}
(in,out)
}