Actors
Actors是为构建Akka应用提供的最基本结构.Actors提供了构建位置透明,分布式,高容错应用的高级抽象.具体提供了以下资料:
- 为并发和并行提供了简单高级的的抽象工具
- 异步/无阻塞/事件驱动的编程模型(使用消息)
- 轻量级事件驱动进程
Actors是封装状态和行为的对象.一个actor可以通过发送消息来改变其他actor的状态,从actor模型看,actor是一个响应接收到的消息的计算单元,然后回复消息或发送消息给其他actor.Actors之间通过各自邮箱的地址进行通信,并且通信是异步无阻塞的,并且只响应发送给他们的消息.
一个actor的生命周期大致可以由以下几部分组成:
- Actor初始化并启动
- Actor接收并使用特定的行为处理消息
- Actor接收到一个终止消息时关闭自己
Akka提供额外的hook来对生命周期进行管理:
- preStart()和postStop()用于执行初始化或清理actor处理消息时所使用的资源
- preRestart()和postRestart()用于管理异常发生时actor的状态或者作为actor的守护进程对actor进行监控
定义一个actor
定义一个actor只需要继承自Actor然后实现用于处理消息的必要抽象方法:
class MyActor extends Actor {
def receive = {
// behaviors
}
}
创建actor
一旦一个actor定义完成,就可以创建并启动它.所有的actor都创建于一个actor系统的上下文或其他actor中,创建后会立即启动,在接收和处理消息之前,actor会执行一个preStart()方法用于actor所有必要的初始化动作.
Akka提供了一个Props结构用于创建actor,Props是一个配置类,以应用于各种他所需要的配置和actor.
actor默认结构
val system = ActorSystem("MyActorSystem")
val myActor = system.actorOf(Props[MyActor], name = "myActor")
为了创建一个actor,我们创建了一个actor系统然后调用actorOf()方法.actorOf()方法接收两个参数,第一是Props对象,第二和是一个字符串类型的actor名称,这个名称需要全局唯一,Props对象接收了一个用于初始化和启动的Actor类对象,actor被实例化后会立即启动.
实例化的Actor使用ActorRef保持,ActorRef提供一个不可变且可串行的底层actor句柄.本质上说,ActorRef是对actor的封装,只能支持到actor的消息传递.每个actor可以通过self字段访问自身实例的引用.
actor非默认结构
首先定义一个简单的actor:
class MyActor(initialise:Int) extends Actor {
def receive = {
}
}
然后使用非默认结构的方式创建actor:
val system = ActorSystem("MyActorSystem")
val myActor = system.actorOf(Props(new MyActor(10)), name = "myActor")
使用分层结构创建actor
在一个actor对象内部使用parent context创建一个子actor.
class SupervisorActor extends Actor {
val myWorkerActor = context.actorOf(Props[MyWorkerActor], "myWorkerActor" )
}
消息模型
actor模型的前提是关于消息通信,actor的状态响应或回复基于传递给他的消息.所有的消息传递都是不可变的,如果你发送可变的消息给actor,actor可能行为怪异,因为他们共享了可变的消息.
发送消息
一旦actor的引用可用,消息就可以按如下方式在两个actor之间传递:
- Fire and forget:这是一个单向的消息模型,发送者不会等待接收者的回复,消息以异步方式发送,发送方法调用后会立即返回,使用tell()方法实现这种消息模式
- Send and receive:这种模式下发送者会等待接收者的回复,消息被异步的发送然后返回一个Future,一个可能的回复.使用ask()方法实现.
tell
actor ! msg
actor.tell(msg)
actor tell msg
ask
接收消息
class DemoActor extends Actor with ActorLogging {
def receive = {
case message:String =>
log. info("Message Received by Actor -> {}",message)
case _ => log.info("unknown message")
}
}
回复消息
def receive = {
case message:String =>
sender ! (message + "world")
}
将来的消息
有时,actor会被当做一个forwarding agent,如果一个actor被实现为提供路由/负载均衡/分片时,传入的actor会被转发给目标actor.这种情况下非常重要的是原始sender的引用维持和对目标actor的消息发送,以确保消息回复给原始sender引用而不是转发者.
actor.forward(message)
停止Actor
Actor的停止过程:
- 停止处理邮箱中的消息
- 向所哟子actor发送STOP信号
- 等待所有子actor的终止消息
- 调用自身终止进程
- 调用postStop()方法
- dump附属的邮箱
- 向DeathWatch发布终止消息
- 向监管者通报终止信息
杀死Actor
actor ! kill
热插拔
become()和unbecome()