Simple Akka: Akka Essentials - Actors

Actors

Actors是为构建Akka应用提供的最基本结构.Actors提供了构建位置透明,分布式,高容错应用的高级抽象.具体提供了以下资料:

  • 为并发和并行提供了简单高级的的抽象工具
  • 异步/无阻塞/事件驱动的编程模型(使用消息)
  • 轻量级事件驱动进程

Actors是封装状态和行为的对象.一个actor可以通过发送消息来改变其他actor的状态,从actor模型看,actor是一个响应接收到的消息的计算单元,然后回复消息或发送消息给其他actor.Actors之间通过各自邮箱的地址进行通信,并且通信是异步无阻塞的,并且只响应发送给他们的消息.
一个actor的生命周期大致可以由以下几部分组成:

  • Actor初始化并启动
  • Actor接收并使用特定的行为处理消息
  • Actor接收到一个终止消息时关闭自己

Akka提供额外的hook来对生命周期进行管理:

  • preStart()和postStop()用于执行初始化或清理actor处理消息时所使用的资源
  • preRestart()和postRestart()用于管理异常发生时actor的状态或者作为actor的守护进程对actor进行监控

定义一个actor

定义一个actor只需要继承自Actor然后实现用于处理消息的必要抽象方法:

class MyActor extends Actor { 
  def receive = {
    // behaviors
  }
}

创建actor

一旦一个actor定义完成,就可以创建并启动它.所有的actor都创建于一个actor系统的上下文或其他actor中,创建后会立即启动,在接收和处理消息之前,actor会执行一个preStart()方法用于actor所有必要的初始化动作.
Akka提供了一个Props结构用于创建actor,Props是一个配置类,以应用于各种他所需要的配置和actor.

actor默认结构

val system = ActorSystem("MyActorSystem")
val myActor = system.actorOf(Props[MyActor], name = "myActor")

为了创建一个actor,我们创建了一个actor系统然后调用actorOf()方法.actorOf()方法接收两个参数,第一是Props对象,第二和是一个字符串类型的actor名称,这个名称需要全局唯一,Props对象接收了一个用于初始化和启动的Actor类对象,actor被实例化后会立即启动.
实例化的Actor使用ActorRef保持,ActorRef提供一个不可变且可串行的底层actor句柄.本质上说,ActorRef是对actor的封装,只能支持到actor的消息传递.每个actor可以通过self字段访问自身实例的引用.

actor非默认结构

首先定义一个简单的actor:

class MyActor(initialise:Int) extends Actor { 
  def receive = {
  }
}

然后使用非默认结构的方式创建actor:

val system = ActorSystem("MyActorSystem")
val myActor = system.actorOf(Props(new MyActor(10)), name = "myActor")

使用分层结构创建actor

在一个actor对象内部使用parent context创建一个子actor.

class SupervisorActor extends Actor {
    val myWorkerActor = context.actorOf(Props[MyWorkerActor], "myWorkerActor" )
}

消息模型

actor模型的前提是关于消息通信,actor的状态响应或回复基于传递给他的消息.所有的消息传递都是不可变的,如果你发送可变的消息给actor,actor可能行为怪异,因为他们共享了可变的消息.

发送消息

一旦actor的引用可用,消息就可以按如下方式在两个actor之间传递:

  • Fire and forget:这是一个单向的消息模型,发送者不会等待接收者的回复,消息以异步方式发送,发送方法调用后会立即返回,使用tell()方法实现这种消息模式
  • Send and receive:这种模式下发送者会等待接收者的回复,消息被异步的发送然后返回一个Future,一个可能的回复.使用ask()方法实现.

tell

actor ! msg
actor.tell(msg)
actor tell msg

ask

接收消息

class DemoActor extends Actor with ActorLogging { 
  def receive = {
    case message:String =>
      log. info("Message Received by Actor -> {}",message)
    case _ => log.info("unknown message") 
  }
}

回复消息

def receive = {
  case message:String =>
    sender ! (message + "world")
}

将来的消息

有时,actor会被当做一个forwarding agent,如果一个actor被实现为提供路由/负载均衡/分片时,传入的actor会被转发给目标actor.这种情况下非常重要的是原始sender的引用维持和对目标actor的消息发送,以确保消息回复给原始sender引用而不是转发者.

actor.forward(message)

停止Actor

Actor的停止过程:

  • 停止处理邮箱中的消息
  • 向所哟子actor发送STOP信号
  • 等待所有子actor的终止消息
  • 调用自身终止进程
    • 调用postStop()方法
    • dump附属的邮箱
    • 向DeathWatch发布终止消息
    • 向监管者通报终止信息

杀死Actor

actor ! kill

热插拔

become()和unbecome()