Simple PlayFramework: Integrating with Akka

The application actor system

一个Play应用定义了一个特殊的actor system用于使用,这个actor system跟随应用的声明周期,当应用重启时会自动重启.

Writing actors

开始使用Akka时需要编写一个actor,下面是一个简单的例子:

import akka.actor._

object HelloActor {
  def props = Props[HelloActor]

  case class SayHello(name: String)
}

class HelloActor extends Actor {
  import HelloActor._

  def receive = {
    case SayHello(name: String) =>
      sender() ! "Hello, " + name
  }
}

这个actor遵循了一些Akka的协定:

  1. 它发送或接受的消息,或者成为协议,在它的伴生对象中进行定义
  2. 在半生对象中同时定义了一个props方法用于返回该actor的引用

Creating and using actors

创建或使用actor时需要一个ActorSystem,可以通过声明一个ActorSystem依赖获得:

import play.api.mvc._
import akka.actor._
import javax.inject._

import actors.HelloActor

@Singleton
class Application @Inject() (system: ActorSystem) extends Controller {

  val helloActor = system.actorOf(HelloActor.props, "hello-actor")

  //...
}

actorOf方法用于创建一个新的actor,注意我们声明这个控制器为单例.当创建一个actor或保存它的引用时时非常必要的.如果没有声明为单例,则控制器每次被创建都会创建一个新的actor,最终会抛出一个异常,因为一个ActorSystem中不能存在两个名字一样的actor.

Asking things of actors

一个actor最基本的功能就是向他发送一个消息,就像HTTP有一个请求和响应的协议一样.这种场景下,比较相似的用法是使用ask模式,该模式返回一个Future,然后你就可以将它映射为你自己的结果类型.

下面的例子使用ask模式:

import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.duration._
import akka.pattern.ask
implicit val timeout = 5.seconds

def sayHello(name: String) = Action.async {
  (helloActor ? SayHello(name)).mapTo[String].map { message =>
    Ok(message)
  }
}

需要注意的地方:

  1. ask模式需要被引入(import),然会会为actor提供一个?操作符
  2. ask返回的结果类型为Future[Any],然后就可以使用mapTo方法映射为自己想要的类型
  3. 在作用域中需要提供一个隐式的超时时间设置,超时后会返回一个超时错误

Dependency injecting actors

如果需要,可以将Guice实例到actor并绑定到控制器或依赖它的组件上.

比如,如果你需要一个基于Play配置的actor:

import akka.actor._
import javax.inject._
import play.api.Configuration

object ConfiguredActor {
  case object GetConfig
}

class ConfiguredActor @Inject() (configuration: Configuration) extends Actor {
  import ConfiguredActor._

  val config = configuration.getString("my.config").getOrElse("none")

  def receive = {
    case GetConfig =>
      sender() ! config
  }
}

Play提供了一些辅助方法用于actor绑定.这允许actor本身是依赖注入的,并允许actor的引用被注入到其他组件.

import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport

import actors.ConfiguredActor

class MyModule extends AbstractModule with AkkaGuiceSupport {
  def configure = {
    bindActor[ConfiguredActor]("configured-actor")
  }
}

这个actor会被命名为configured-actor,同时又能使用configured-actor来进行注入.现在就可以在控制器基于actor或者其他的组件中.

import play.api.mvc._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import javax.inject._
import actors.ConfiguredActor._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

@Singleton
class Application @Inject() (@Named("configured-actor") configuredActor: ActorRef)
                            (implicit ec: ExecutionContext) extends Controller {

  implicit val timeout: Timeout = 5.seconds

  def getConfig = Action.async {
    (configuredActor ? GetConfig).mapTo[String].map { message =>
      Ok(message)
    }
  }
}

Dependency injecting child actors

上面的例子是注入root级actor,但是很多其他的actor都是子actor并且不会绑定到Play应用的声明周期,并且可能会有额外的状态传送给他们.

为了能够实现子actor的依赖注入,Play利用了Guice的AssistedInject支持.

比如下面的actor,哪个依赖的配置文件被注入了,添加一个key参数:

import akka.actor._
import javax.inject._
import com.google.inject.assistedinject.Assisted
import play.api.Configuration

object ConfiguredChildActor {
  case object GetConfig

  trait Factory {
    def apply(key: String): Actor
  }
}

class ConfiguredChildActor @Inject() (configuration: Configuration,
    @Assisted key: String) extends Actor {
  import ConfiguredChildActor._

  val config = configuration.getString(key).getOrElse("none")

  def receive = {
    case GetConfig =>
      sender() ! config
  }
}

注意key参数被声明为@Assisted,这表示它将要收到提供.

同时定义了一个Factory特质,接收一个key参数,然后返回一个actor.我们并不会去实现它,Guice会实现它,不但给我们传入一个key参数,同时找出Configuration依赖并进行注入.由于这个特质仅返回一个actor,测试的时候我们可以注入一个factory然后返回任何actor,比如他允许我们注入一个mock的子actor,而不是一个实际的actor.

现在,基于这样的actor我们可以扩展InjectedActorSupport,可以基于我们创建的factory:

import akka.actor._
import javax.inject._
import play.api.libs.concurrent.InjectedActorSupport

object ParentActor {
  case class GetChild(key: String)
}

class ParentActor @Inject() (
    childFactory: ConfiguredChildActor.Factory
) extends Actor with InjectedActorSupport {
  import ParentActor._

  def receive = {
    case GetChild(key: String) =>
      val child: ActorRef = injectedChild(childFactory(key), key)
      sender() ! child
  }
}

它使用injectedChild创建并获取一个子actor的引用,通过传入一个key.

最后我们需要绑定我们的actor,我们使用bindActorFactory方法来绑定父actor,同时将子factory绑定到子actor的实现:

import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport

import actors._

class MyModule extends AbstractModule with AkkaGuiceSupport {
  def configure = {
    bindActor[ParentActor]("parent-actor")
    bindActorFactory[ConfiguredChildActor, ConfiguredChildActor.Factory]
  }
}

这会使Guice自动绑定一个ConfiguredChildActor.Factory实例,当ConfiguredChildActor初始化时会为其提供一个Configuration实例.

Configuration

默认的actor system的配置会去读取Play应用的配置文件,比如,为应用的actor system配置默认的dispatcher,在conf/application.conf文件中添加:

akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 64
akka.actor.debug.receive = on

Changing configuration prefix

如果需要使用akka.*配置来设置另一个actor system,可以告诉Play从另一个位置加载Akka配置:

play.akka.config = "my-akka"

现在配置会从my-akka读取而不再是akka:

my-akka.actor.default-dispatcher.fork-join-executor.pool-size-max = 64
my-akka.actor.debug.receive = on

Built-in actor system name

Play中默认的actor system名为application,可以在conf/application.conf中进行修改:

play.akka.actor-system = "custom-name"

Scheduling asynchronous tasks

可以安排(schedule)发送消息到actor并执行任务(函数或者Runnable).会返回一个Cancellable,可以调用cancel来取消安排过的操作执行.

比如,每300毫秒像testActor发送一个消息:

import scala.concurrent.duration._

val cancellable = system.scheduler.schedule(0.microseconds, 300.microseconds, testActor, "tick")

类似的,每10秒运行一次代码块:

import play.api.libs.concurrent.Execution.Implicits.defaultContext
system.scheduler.scheduleOnce(10.milliseconds) {
  file.delete()
}

Using your own Actor system

在使用自己的actor system时需要注意的地方:

  1. 注册一个stop hook,当Play关闭时关闭actor system
  2. 从Play环境变量传入一个正确的类加载器,除非Akka并不需要查找你应用中的类.
  3. 确定要么改变Play使用play.akka.config读取配置文件的位置,要么你不要从默认的akka配置中读取你的配置文件,因为当多个actor system试图绑定到同一个地址是会出现错误