The application actor system
一个Play应用定义了一个特殊的actor system用于使用,这个actor system跟随应用的声明周期,当应用重启时会自动重启.
Writing actors
开始使用Akka时需要编写一个actor,下面是一个简单的例子:
import akka.actor._
object HelloActor {
def props = Props[HelloActor]
case class SayHello(name: String)
}
class HelloActor extends Actor {
import HelloActor._
def receive = {
case SayHello(name: String) =>
sender() ! "Hello, " + name
}
}
这个actor遵循了一些Akka的协定:
- 它发送或接受的消息,或者成为协议,在它的伴生对象中进行定义
- 在半生对象中同时定义了一个
props
方法用于返回该actor的引用
Creating and using actors
创建或使用actor时需要一个ActorSystem
,可以通过声明一个ActorSystem依赖获得:
import play.api.mvc._
import akka.actor._
import javax.inject._
import actors.HelloActor
@Singleton
class Application @Inject() (system: ActorSystem) extends Controller {
val helloActor = system.actorOf(HelloActor.props, "hello-actor")
//...
}
actorOf
方法用于创建一个新的actor,注意我们声明这个控制器为单例.当创建一个actor或保存它的引用时时非常必要的.如果没有声明为单例,则控制器每次被创建都会创建一个新的actor,最终会抛出一个异常,因为一个ActorSystem中不能存在两个名字一样的actor.
Asking things of actors
一个actor最基本的功能就是向他发送一个消息,就像HTTP有一个请求和响应的协议一样.这种场景下,比较相似的用法是使用ask
模式,该模式返回一个Future,然后你就可以将它映射为你自己的结果类型.
下面的例子使用ask
模式:
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.duration._
import akka.pattern.ask
implicit val timeout = 5.seconds
def sayHello(name: String) = Action.async {
(helloActor ? SayHello(name)).mapTo[String].map { message =>
Ok(message)
}
}
需要注意的地方:
ask
模式需要被引入(import),然会会为actor提供一个?
操作符- ask返回的结果类型为
Future[Any]
,然后就可以使用mapTo
方法映射为自己想要的类型 - 在作用域中需要提供一个隐式的超时时间设置,超时后会返回一个超时错误
Dependency injecting actors
如果需要,可以将Guice实例到actor并绑定到控制器或依赖它的组件上.
比如,如果你需要一个基于Play配置的actor:
import akka.actor._
import javax.inject._
import play.api.Configuration
object ConfiguredActor {
case object GetConfig
}
class ConfiguredActor @Inject() (configuration: Configuration) extends Actor {
import ConfiguredActor._
val config = configuration.getString("my.config").getOrElse("none")
def receive = {
case GetConfig =>
sender() ! config
}
}
Play提供了一些辅助方法用于actor绑定.这允许actor本身是依赖注入的,并允许actor的引用被注入到其他组件.
import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport
import actors.ConfiguredActor
class MyModule extends AbstractModule with AkkaGuiceSupport {
def configure = {
bindActor[ConfiguredActor]("configured-actor")
}
}
这个actor会被命名为configured-actor
,同时又能使用configured-actor
来进行注入.现在就可以在控制器基于actor或者其他的组件中.
import play.api.mvc._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import javax.inject._
import actors.ConfiguredActor._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
@Singleton
class Application @Inject() (@Named("configured-actor") configuredActor: ActorRef)
(implicit ec: ExecutionContext) extends Controller {
implicit val timeout: Timeout = 5.seconds
def getConfig = Action.async {
(configuredActor ? GetConfig).mapTo[String].map { message =>
Ok(message)
}
}
}
Dependency injecting child actors
上面的例子是注入root级actor,但是很多其他的actor都是子actor并且不会绑定到Play应用的声明周期,并且可能会有额外的状态传送给他们.
为了能够实现子actor的依赖注入,Play利用了Guice的AssistedInject
支持.
比如下面的actor,哪个依赖的配置文件被注入了,添加一个key参数:
import akka.actor._
import javax.inject._
import com.google.inject.assistedinject.Assisted
import play.api.Configuration
object ConfiguredChildActor {
case object GetConfig
trait Factory {
def apply(key: String): Actor
}
}
class ConfiguredChildActor @Inject() (configuration: Configuration,
@Assisted key: String) extends Actor {
import ConfiguredChildActor._
val config = configuration.getString(key).getOrElse("none")
def receive = {
case GetConfig =>
sender() ! config
}
}
注意key参数被声明为@Assisted
,这表示它将要收到提供.
同时定义了一个Factory
特质,接收一个key
参数,然后返回一个actor.我们并不会去实现它,Guice会实现它,不但给我们传入一个key参数,同时找出Configuration
依赖并进行注入.由于这个特质仅返回一个actor,测试的时候我们可以注入一个factory然后返回任何actor,比如他允许我们注入一个mock的子actor,而不是一个实际的actor.
现在,基于这样的actor我们可以扩展InjectedActorSupport
,可以基于我们创建的factory:
import akka.actor._
import javax.inject._
import play.api.libs.concurrent.InjectedActorSupport
object ParentActor {
case class GetChild(key: String)
}
class ParentActor @Inject() (
childFactory: ConfiguredChildActor.Factory
) extends Actor with InjectedActorSupport {
import ParentActor._
def receive = {
case GetChild(key: String) =>
val child: ActorRef = injectedChild(childFactory(key), key)
sender() ! child
}
}
它使用injectedChild
创建并获取一个子actor的引用,通过传入一个key.
最后我们需要绑定我们的actor,我们使用bindActorFactory
方法来绑定父actor,同时将子factory绑定到子actor的实现:
import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport
import actors._
class MyModule extends AbstractModule with AkkaGuiceSupport {
def configure = {
bindActor[ParentActor]("parent-actor")
bindActorFactory[ConfiguredChildActor, ConfiguredChildActor.Factory]
}
}
这会使Guice自动绑定一个ConfiguredChildActor.Factory
实例,当ConfiguredChildActor
初始化时会为其提供一个Configuration
实例.
Configuration
默认的actor system的配置会去读取Play应用的配置文件,比如,为应用的actor system配置默认的dispatcher,在conf/application.conf
文件中添加:
akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 64
akka.actor.debug.receive = on
Changing configuration prefix
如果需要使用akka.*
配置来设置另一个actor system,可以告诉Play从另一个位置加载Akka配置:
play.akka.config = "my-akka"
现在配置会从my-akka
读取而不再是akka
:
my-akka.actor.default-dispatcher.fork-join-executor.pool-size-max = 64
my-akka.actor.debug.receive = on
Built-in actor system name
Play中默认的actor system名为application
,可以在conf/application.conf
中进行修改:
play.akka.actor-system = "custom-name"
Scheduling asynchronous tasks
可以安排(schedule)发送消息到actor并执行任务(函数或者Runnable).会返回一个Cancellable
,可以调用cancel
来取消安排过的操作执行.
比如,每300毫秒像testActor
发送一个消息:
import scala.concurrent.duration._
val cancellable = system.scheduler.schedule(0.microseconds, 300.microseconds, testActor, "tick")
类似的,每10秒运行一次代码块:
import play.api.libs.concurrent.Execution.Implicits.defaultContext
system.scheduler.scheduleOnce(10.milliseconds) {
file.delete()
}
Using your own Actor system
在使用自己的actor system时需要注意的地方:
- 注册一个
stop hook
,当Play关闭时关闭actor system - 从Play环境变量传入一个正确的类加载器,除非Akka并不需要查找你应用中的类.
- 确定要么改变Play使用
play.akka.config
读取配置文件的位置,要么你不要从默认的akka
配置中读取你的配置文件,因为当多个actor system试图绑定到同一个地址是会出现错误