Simple Akka: Routing

消息可以通过路由发送到指定的actor,称为routees.一个Router可以在actor内部或外部使用,可以自己管理路由,或者使用一个可配置的路由actor.

可以根据应用的需求使用不同的路由策略,Akka提供了多种不同的路由策略,或者可以创建自己的路由策略.

A Simple Router

下面的例子说明了如何在一个actor中使用Router管理路由:

import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router }

class Master extends Actor {
  var router = {
    val routees = Vector.fill(5) {
      val r = context.actorOf(Props[Worker])
      context watch r
      ActorRefRoutee(r)
    }
    Router(RoundRobinRoutingLogic(), routees)
  }

  def receive = {
    case w: Work =>
      router.route(w, sender())
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val r = context.actorOf(Props[Worker])
      context watch r
      router = router.addRoutee(r)
  }
}

创建一个Router并制定使用RoundRobinRoutingLogic来对接收到的消息进行路由.

Akka中的路由规则:

  • akka.routing.RoundRobinRoutingLogic
  • akka.routing.RandomRoutingLogic
  • akka.routing.SmallestMailboxRoutingLogic
  • akka.routing.BroadcastRoutingLogic
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic
  • akka.routing.TailChoppingRoutingLogic
  • akka.routing.ConsistentHashingRoutingLogic

使用ActorRefRoutee封装的普通actor创建路由路径,同时watch这些actor以便他们终止的时候进行替换.

通过路由的route方法发送消息,在上面的例子中,当Mater接收到Work消息是,通过已创建的router调用route方法对该消息进行转发.

Router是不可变的,RoutingLogic是线程安全的,这意味着他们可以在actor之外使用.

注意,通常情况下,任何发送给router的消息都会发送到指定路径(routee),但是有一个例外,特殊的 Broadcast Messages 会被发送到Router的所有路径.

A Router Actor

路由同样可以作为一个自容器的actor创建,自身对路径进行管理,同时通过设置类加载路由逻辑.

这种类型的路由actor有两种不同的风格:

  1. Pool: 路由以子actor的形式创建路径,当子actor终止时从路由中移除
  2. Group: 路径actor在路由外部创建,同过selection将消息发送到指定的actor,不会去监控actor的终止信号

路由actor的设置可以在配置中或编码中进行设置.尽管路由actor可以在配置文件中进行定义,但仍然需要以编码的形式创建,而不能仅适用外部配置文件创建路由.同时,如果在配置文件中定义了路由,这些设置会替代所有以编码形式提供的参数.

给路径发送消息的方式在路由actor和外部actor的方式是一样的,由于他们都是ActorRef.路由actor在向路径发送消息时不会改变原始的sender.当路径返回消息时,回复同样会发送给原始sender,而不是路由的sender.

Pool

下面的代码片段展示了如何创建一个 round-robin 策略的路由并向5个Worker路径分发消息,这些路径会以路由子actor的形式创建:

akka.actor.deployment {
  /parent/router1 {
    router = round-robin-pool
    nr-of-instances = 5
  }
}

然后在代码中使用:

val router1: ActorRef =
  context.actorOf(FromConfig.props(Props[Worker]), "router1")

下面是以编程的方式代替配置文件创建路由:

val router2: ActorRef =
  context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")

Remote Deployed Routees

除了能够在本地创建actor作为路径,还可以自定路由将已创建的子actor部署在远程服务器上.路径会以round-robin的形式部署.为了能将路径部署在远程服务器,需要将路由配置使用RemoteRouterConfig进行包装,附加上需要部署的节点地址.远程部署需要在依赖中添加akka-remote模块:

import akka.actor.{ Address, AddressFromURIString }
import akka.remote.routing.RemoteRouterConfig
val addresses = Seq(
  Address("akka.tcp", "remotesys", "otherhost", 1234),
  AddressFromURIString("akka.tcp://othersys@anotherhost:1234"))
val routerRemote = system.actorOf(
  RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo]))

Senders

默认情况下,当路径发送消息时,它会隐式的将自身设置为一个sender:

sender() ! x // replies will go to this actor

通常情况下是讲router设置为一个sender,比如,当你想隐藏路由的详细路径时就需要把router设置为一个sender.下面的代码片段展示了如何建一个父router当做一个sender:

sender().tell("reply", context.parent) // replies will go back to parent
sender().!("reply")(context.parent) // alternative syntax (beware of the parens!)

Supervision

pool路由创建的路径会作为路由的子actor进行创建,因此路由同时是这些子actor的监管者.

router的监管策略可以通过Pool的supervisorStrategy属性进行设置,如果没有提供设置,路由的默认策略是”向上升级”,意思是讲错误传递给路由的监管者来处理,这是路由的监管者来决定如何处理错误.

注意,路由的监管者会把这些错误当成是router本省的错误,因此,停止或重启的指令会引起router自身停止或重启.然后router进一步引起子actor的停止或重启.

需要提醒的是,router的重启行为已经被重写了,这会重新创建所有的子actor,同时会在Pool中保持同样数量的子actor.

这意味着,如果你没有指定router的supervisorStrategy属性,或者路径的错误会升级到router的监管者,默认会对router进行重启,同时从其所有的路径,但是重启过程中并不会关闭原有的路径.

设置策略的方式很简单:

val escalator = OneForOneStrategy() {
  case e ⇒ testActor ! e; SupervisorStrategy.Escalate
}
val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props(
  routeeProps = Props[TestActor]))

注意,如果pool router的子路径终止了,它并不会自动创建添加一个新的子actor,当所有的路径都终止的话router自身也会终止,除非这是一个动态router.

Group

有些情况下,比起让router自己创建路径,更合适的方式是分别创建路径然后提供给router使用,可以将路径的的path传递给router的配置.消息会以ActorSelection的方式发送给这些路径.

下面的例子展示了如果通过传递路径actor地址的方式创建一个router:

akka.actor.deployment {
  /parent/router3 {
    router = round-robin-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  }
}

val router3: ActorRef =
  context.actorOf(FromConfig.props(), "router3")

然后以编码方式替换配置文件的方式:

val router4: ActorRef =
  context.actorOf(RoundRobinGroup(paths).props(), "router4")

system.actorOf(Props[Workers], "workers")

class Workers extends Actor {
  context.actorOf(Props[Worker], name = "w1")
  context.actorOf(Props[Worker], name = "w2")
  context.actorOf(Props[Worker], name = "w3")
  // ...

运行在远程服务上的actor需要提供响应的协议和地址.同样需要akka-remote模块:

akka.actor.deployment {
  /parent/remoteGroup {
    router = round-robin-group
    routees.paths = [
      "akka.tcp://app@10.0.0.1:2552/user/workers/w1", 
      "akka.tcp://app@10.0.0.2:2552/user/workers/w1",
      "akka.tcp://app@10.0.0.3:2552/user/workers/w1"]
  }
}

Router usage

这一节中的router actor在一个root actor中创建,名字为parent,配置文件中的部署地址为 /parent/后跟router actor的名字:

system.actorOf(Props[Parent], "parent")

RoundRobinPool and RoundRobinGroup

配置文件中定义RoundRobinPool:

akka.actor.deployment {
  /parent/router1 {
    router = round-robin-pool
    nr-of-instances = 5
  }
}
val router1: ActorRef =
  context.actorOf(FromConfig.props(Props[Worker]), "router1")

代码形式:

val router2: ActorRef =
  context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")

配置文件中定义RoundRobinGroup:

akka.actor.deployment {
  /parent/router3 {
    router = round-robin-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  }
}
val router3: ActorRef =
  context.actorOf(FromConfig.props(), "router3")

代码形式:

val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
val router4: ActorRef =
  context.actorOf(RoundRobinGroup(paths).props(), "router4")

RandomPool and RandomGroup

BalancingPool

SmallestMailboxPool

BroadcastPool and BroadcastGroup

ScatterGatherFirstCompletedPool and ScatterGatherFirstCompletedGroup

TailChoppingPool and TailChoppingGroup

ConsistentHashingPool and ConsistentHashingGroup

Specially Handled Messages

大多数消息会根据路由规则发送到指定的路径,但是有个别类型的消息用于特殊的行为.

Broadcast Messages

Broadcast消息用于向router的所有路径发送消息,当router收到Broadcast消息时会忽略一般消息的路由逻辑而将消息广播给所有的路径.

下面展示如何使用Broadcast发送一个重要的消息:

import akka.routing.Broadcast
router ! Broadcast("Watch out for Davy Jones' locker")

PoisonPill Messages

PoisonPill消息对于所有的actor都有一个特殊的处理方式,包括router.当actor收到一个PoisonPill消息,该actor将会关闭:

import akka.actor.PoisonPill
router ! PoisonPill

当PoisonPill消息发送给router时,router不会将其发给routees,router会在关闭自己的同时关闭所有子actor.

如果想要在所有的路径处理完消息之后再关闭router,这时就不能在使用PoisonPill了,这时需要使用Broadcast对PoisonPill进行包装,这样所有的子actor就都能收到PoisonPill消息了.这会关闭所有的路径,包括哪些不属于router子actor的路径在内.

import akka.actor.PoisonPill
import akka.routing.Broadcast
router ! Broadcast(PoisonPill)

Kill Messages

收到该消息后router会抛出ActorKilledException异常然后失败,随后由监管者决定是重启还是关闭.

router的子Actor会收到router的影响,行为有router的监管策略决定,哪些不属于router子actor的路径将不会收到影响.

import akka.actor.Kill
router ! Kill

而下面的方式将关闭所有路径,包括非router的子actor路径:

import akka.actor.Kill
import akka.routing.Broadcast
router ! Broadcast(Kill)

Management Messages

以下消息对router的路径进行管理:

  1. akka.routing.GetRoutees: 返回一个akka.routing.Routees,router当前使用的所有路径
  2. akka.routing.RemoveRoutee: 向当前的路径集合添加该路径
  3. akka.routing.RemoveRoutee: 从router当前的路径集合中删除该路径
  4. akka.routing.AdjustPoolSize: 向router的路径集合中添加或删除对应数量的路径

Dynamically Resizable Pool

How Routing is Designed within Akka

Configuring Dispatchers

akka.actor.deployment {
  /poolWithDispatcher {
    router = random-pool
    nr-of-instances = 5
    pool-dispatcher {
      fork-join-executor.parallelism-min = 5
      fork-join-executor.parallelism-max = 5
    }
  }
}