Simple PlayFramework: Calling WebServices

The Play WS API

有时候需要从Play应用中请求其他的HTTP服务,Play提供了WS库来支持,提供了一种创建异步HTTP请求的方式.

WS API有两个重要的部分: 发起请求和处理响应.

创建请求

首先在build.sbt中添加依赖:

libraryDependencies ++= Seq(
  ws
)

现在所有需要使用WS的控制器或组件需要声明一个WS上的依赖:

import javax.inject.Inject
import scala.concurrent.Future
import scala.concurrent.duration._

import play.api.mvc._
import play.api.libs.ws._
import play.api.http.HttpEntity

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.ExecutionContext

class Application @Inject() (ws: WSClient) extends Controller {

}

我们已经将WSClient的实例命名为ws,下面的例子都会使用这个名字.

创建HTTP请求时使用ws.url()来指定URL:

val request: WSRequest = ws.url(url)

这会返回一个WSRequest,可以用于指定多种HTTP选项,比如设置请求头,并可以链接起来构造一个复杂的请求:

val complexRequest: WSRequest =
  request.withHeaders("Accept" -> "application/json")
    .withRequestTimeout(10000.millis)
    .withQueryString("search" -> "play")

然后以一个你需要的并且与HTTP方法对应的请求方法来结束,这会结束整个链接,然后使用所有已配置的选项发起请求:

val futureResponse: Future[WSResponse] = complexRequest.get()

返回结果是一个Future[WSResponse],Response中包含了从服务端返回的数据.

Request with authentication

如果需要使用HTTP用户验证,可以在创建器中指定,使用用户名,密码,和一个AuthScheme,AuthScheme有效的方式包括BASIC, DIGEST, KERBEROS, NONE, NTLM, SPNEGO.

ws.url(url).withAuth(user, password, WSAuthScheme.BASIC).get()

Request with follow redirects

如果一个HTTP调用结果是一个302或301重定向,你可以自动更随重定向而不用重新创建一个请求:

ws.url(url).withFollowRedirects(true).get()

Request with query parameters

参数可以指定为一个连续的key/value元组:

ws.url(url).withQueryString("paramKey" -> "paramValue").get()

Request with additional headers

请求头可以指定为一个连续的key/value元组:

ws.url(url).withHeaders("headerKey" -> "headerValue").get()

如果需要使用特殊的方式发送普通文本,则需要显式的指定文本类型:

ws.url(url).withHeaders("Content-Type" -> "application/xml").post(xmlString)

Request with virtual host

一个实际host可以指定为一个String:

ws.url(url).withVirtualHost("192.168.1.1").get()

Request with timeout

如果想要指定请求超时时间,可以使用withRequestTimeout设置一个值,一个无限的时间可以通过传入一个Duration.Inf设置:

ws.url(url).withRequestTimeout(5000.millis).get()

Submitting form data

使用POST提交一个Map[String, Seq[String]]类型的url-form-encoded数据:

ws.url(url).post(Map("key" -> Seq("value")))

Submitting JSON data

提交JSON数据最简单的方式就是使用JSON库:

import play.api.libs.json._
val data = Json.obj(
  "key1" -> "value1",
  "key2" -> "value2"
)
val futureResponse: Future[WSResponse] = ws.url(url).post(data)

Submitting XML data

val data = <person>
  <name>Steve</name>
  <age>23</age>
</person>
val futureResponse: Future[WSResponse] = ws.url(url).post(data)

Streaming data

比如执行了一个数据库查询然后返回个大的图片,然后你想将数据发送到不同的端点进行更多的处理.在理念上,如果如果你能在接收到数据的同时发送数据,将会减少很多潜在问题,并且不用把大量的数据加载到内存.如果你的数据库访问模块支持Reactive Streams(Slick),下面是一个实现的例子:

val wsResponse: Future[WSResponse] = ws.url(url)
  .withBody(StreamedBody(largeImageFromDB)).execute("PUT")

largeImageFromDB部分是一个Source[ByteString, _]类型的Akka-stream.

Request Filters

可以通过添加一个请求过滤器来做额外的处理.通过继承play.api.libs.ws.WSRequestFilter的方式实现,然后以request.withRequestFilter(filter)的方式添加到请求.

一个将请求日志格式化到SLF4J的例子已经在play.api.libs.ws.ahc.AhcCurlRequestLogger提供:

ws.url(s"http://localhost:$testServerPort")
  .withRequestFilter(AhcCurlRequestLogger())
  .withBody(Map("param1" -> Seq("value1")))
  .put(Map("key" -> Seq("value")))

输出:

curl \
  --verbose \
  --request PUT \
 --header 'Content-Type: application/x-www-form-urlencoded; charset=utf-8' \
 --data 'key=value' \
 'http://localhost:19001/

处理响应数据

当一个Future中的操作完成时,必须有一个隐式的执行上下文可见,着声明了Future的回调执行的线程.可以在类的构造器为ExecutionContext声明一个额外的依赖来注入Play默认的执行上下文.

class PersonService @Inject()(implicit context: ExecutionContext) {
  // ...
}

如果不适用DI,仍然可以适用Play默认的执行上下文:

implicit val context = play.api.libs.concurrent.Execution.Implicits.defaultContext

下面的例子会适用到这个case类来进行序列化和反序列化:

case class Person(name: String, age: Int)

Processing a response as JSON

通过调用resonpse.json将相应作为一个JSON处理:

val futureResult: Future[String] = ws.url(url).get().map {
  response =>
    (response.json \ "person" \ "name").as[String]
}

JSON库提供了一些有用的特性可以将隐式的Reads[T]准确的映射到类:

import play.api.libs.json._

implicit val personReads = Json.reads[Person]

val futureResult: Future[JsResult[Person]] = ws.url(url).get().map {
  response => (response.json \ "person").validate[Person]
}

Processing a response as XML

val futureResult: Future[scala.xml.NodeSeq] = ws.url(url).get().map {
  response =>
    response.xml \ "message"
}

Processing large responses

调用get(), post(), execute()会在响应可用之前将响应体的数据加载到内存,当下载一个大的多字节的文件,这会引起一些不受欢迎的垃圾集合或引起内存溢出.

WS支持使用Akka-streamSink来以增量的方式消费响应体重的数据,WSRequeststream()方法返回一个Future[StreamedResponse],StreamedResponse是一个保存响应体和响应头的简单容器.

下面的例子使用Sink的fold来对响应体中的字节数进行计数:

// Make the request
val futureResponse: Future[StreamedResponse] =
  ws.url(url).withMethod("GET").stream()

val bytesReturned: Future[Long] = futureResponse.flatMap {
  res =>
    // Count the number of bytes returned
    res.body.runWith(Sink.fold[Long, ByteString](0L){ (total, bytes) =>
      total + bytes.length
    })
}

或者将数据stream到其他位置:

// Make the request
val futureResponse: Future[StreamedResponse] =
  ws.url(url).withMethod("GET").stream()

val downloadedFile: Future[File] = futureResponse.flatMap {
  res =>
    val outputStream = new FileOutputStream(file)

    // The sink that writes to the output stream
    val sink = Sink.foreach[ByteString] { bytes =>
      outputStream.write(bytes.toArray)
    }

    // materialize and run the stream
    res.body.runWith(sink).andThen {
      case result =>
        // Close the output stream whether there was an error or not
        outputStream.close()
        // Get the result or rethrow the error
        result.get
    }.map(_ => file)
}

另一种用途是将响应体stream到一个控制器的Action中进行返回:

def downloadFile = Action.async {

  // Make the request
  ws.url(url).withMethod("GET").stream().map {
    case StreamedResponse(response, body) =>

      // Check that the response was successful
      if (response.status == 200) {

        // Get the content type
        val contentType = response.headers.get("Content-Type").flatMap(_.headOption)
          .getOrElse("application/octet-stream")

        // If there's a content length, send that, otherwise return the body chunked
        response.headers.get("Content-Length") match {
          case Some(Seq(length)) =>
            Ok.sendEntity(HttpEntity.Streamed(body, Some(length.toLong), Some(contentType)))
          case _ =>
            Ok.chunked(body).as(contentType)
        }
      } else {
        BadGateway
      }
  }
}

你会注意到在调用stream()之前需要使用withMethod来设置请求方法.下面的例子中使用PUT:

val futureResponse: Future[StreamedResponse] =
  ws.url(url).withMethod("PUT").withBody("some body").stream()

Common Patterns and Use Cases

链接WS调用

使用for表达式将多个WS请求链接起来,然后使用Future.recover处理可能出现的错误:

val futureResponse: Future[WSResponse] = for {
  responseOne <- ws.url(urlOne).get()
  responseTwo <- ws.url(responseOne.body).get()
  responseThree <- ws.url(responseTwo.body).get()
} yield responseThree

futureResponse.recover {
  case e: Exception =>
    val exceptionData = Map("error" -> Seq(e.getMessage))
    ws.url(exceptionUrl).post(exceptionData)
}

在控制器中使用

在控制器中使用时可以将响应映射为一个Future[Result],可以和Play的action创建器Action.async组合使用:

def wsAction = Action.async {
  ws.url(url).get().map { response =>
    Ok(response.body)
  }
}
status(wsAction(FakeRequest())) must_== OK