消息可以通过路由发送到指定的actor,称为routees.一个Router可以在actor内部或外部使用,可以自己管理路由,或者使用一个可配置的路由actor.
可以根据应用的需求使用不同的路由策略,Akka提供了多种不同的路由策略,或者可以创建自己的路由策略.
A Simple Router
下面的例子说明了如何在一个actor中使用Router管理路由:
import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router }
class Master extends Actor {
var router = {
val routees = Vector.fill(5) {
val r = context.actorOf(Props[Worker])
context watch r
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(), routees)
}
def receive = {
case w: Work =>
router.route(w, sender())
case Terminated(a) =>
router = router.removeRoutee(a)
val r = context.actorOf(Props[Worker])
context watch r
router = router.addRoutee(r)
}
}
创建一个Router并制定使用RoundRobinRoutingLogic来对接收到的消息进行路由.
Akka中的路由规则:
- akka.routing.RoundRobinRoutingLogic
- akka.routing.RandomRoutingLogic
- akka.routing.SmallestMailboxRoutingLogic
- akka.routing.BroadcastRoutingLogic
- akka.routing.ScatterGatherFirstCompletedRoutingLogic
- akka.routing.TailChoppingRoutingLogic
- akka.routing.ConsistentHashingRoutingLogic
使用ActorRefRoutee封装的普通actor创建路由路径,同时watch这些actor以便他们终止的时候进行替换.
通过路由的route方法发送消息,在上面的例子中,当Mater接收到Work消息是,通过已创建的router调用route方法对该消息进行转发.
Router是不可变的,RoutingLogic是线程安全的,这意味着他们可以在actor之外使用.
注意,通常情况下,任何发送给router的消息都会发送到指定路径(routee),但是有一个例外,特殊的 Broadcast Messages 会被发送到Router的所有路径.
A Router Actor
路由同样可以作为一个自容器的actor创建,自身对路径进行管理,同时通过设置类加载路由逻辑.
这种类型的路由actor有两种不同的风格:
- Pool: 路由以子actor的形式创建路径,当子actor终止时从路由中移除
- Group: 路径actor在路由外部创建,同过selection将消息发送到指定的actor,不会去监控actor的终止信号
路由actor的设置可以在配置中或编码中进行设置.尽管路由actor可以在配置文件中进行定义,但仍然需要以编码的形式创建,而不能仅适用外部配置文件创建路由.同时,如果在配置文件中定义了路由,这些设置会替代所有以编码形式提供的参数.
给路径发送消息的方式在路由actor和外部actor的方式是一样的,由于他们都是ActorRef.路由actor在向路径发送消息时不会改变原始的sender.当路径返回消息时,回复同样会发送给原始sender,而不是路由的sender.
Pool
下面的代码片段展示了如何创建一个 round-robin 策略的路由并向5个Worker路径分发消息,这些路径会以路由子actor的形式创建:
akka.actor.deployment {
/parent/router1 {
router = round-robin-pool
nr-of-instances = 5
}
}
然后在代码中使用:
val router1: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router1")
下面是以编程的方式代替配置文件创建路由:
val router2: ActorRef =
context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")
Remote Deployed Routees
除了能够在本地创建actor作为路径,还可以自定路由将已创建的子actor部署在远程服务器上.路径会以round-robin的形式部署.为了能将路径部署在远程服务器,需要将路由配置使用RemoteRouterConfig进行包装,附加上需要部署的节点地址.远程部署需要在依赖中添加akka-remote模块:
import akka.actor.{ Address, AddressFromURIString }
import akka.remote.routing.RemoteRouterConfig
val addresses = Seq(
Address("akka.tcp", "remotesys", "otherhost", 1234),
AddressFromURIString("akka.tcp://othersys@anotherhost:1234"))
val routerRemote = system.actorOf(
RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo]))
Senders
默认情况下,当路径发送消息时,它会隐式的将自身设置为一个sender:
sender() ! x // replies will go to this actor
通常情况下是讲router设置为一个sender,比如,当你想隐藏路由的详细路径时就需要把router设置为一个sender.下面的代码片段展示了如何建一个父router当做一个sender:
sender().tell("reply", context.parent) // replies will go back to parent
sender().!("reply")(context.parent) // alternative syntax (beware of the parens!)
Supervision
pool路由创建的路径会作为路由的子actor进行创建,因此路由同时是这些子actor的监管者.
router的监管策略可以通过Pool的supervisorStrategy属性进行设置,如果没有提供设置,路由的默认策略是”向上升级”,意思是讲错误传递给路由的监管者来处理,这是路由的监管者来决定如何处理错误.
注意,路由的监管者会把这些错误当成是router本省的错误,因此,停止或重启的指令会引起router自身停止或重启.然后router进一步引起子actor的停止或重启.
需要提醒的是,router的重启行为已经被重写了,这会重新创建所有的子actor,同时会在Pool中保持同样数量的子actor.
这意味着,如果你没有指定router的supervisorStrategy属性,或者路径的错误会升级到router的监管者,默认会对router进行重启,同时从其所有的路径,但是重启过程中并不会关闭原有的路径.
设置策略的方式很简单:
val escalator = OneForOneStrategy() {
case e ⇒ testActor ! e; SupervisorStrategy.Escalate
}
val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props(
routeeProps = Props[TestActor]))
注意,如果pool router的子路径终止了,它并不会自动创建添加一个新的子actor,当所有的路径都终止的话router自身也会终止,除非这是一个动态router.
Group
有些情况下,比起让router自己创建路径,更合适的方式是分别创建路径然后提供给router使用,可以将路径的的path传递给router的配置.消息会以ActorSelection的方式发送给这些路径.
下面的例子展示了如果通过传递路径actor地址的方式创建一个router:
akka.actor.deployment {
/parent/router3 {
router = round-robin-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
}
}
val router3: ActorRef =
context.actorOf(FromConfig.props(), "router3")
然后以编码方式替换配置文件的方式:
val router4: ActorRef =
context.actorOf(RoundRobinGroup(paths).props(), "router4")
system.actorOf(Props[Workers], "workers")
class Workers extends Actor {
context.actorOf(Props[Worker], name = "w1")
context.actorOf(Props[Worker], name = "w2")
context.actorOf(Props[Worker], name = "w3")
// ...
运行在远程服务上的actor需要提供响应的协议和地址.同样需要akka-remote模块:
akka.actor.deployment {
/parent/remoteGroup {
router = round-robin-group
routees.paths = [
"akka.tcp://app@10.0.0.1:2552/user/workers/w1",
"akka.tcp://app@10.0.0.2:2552/user/workers/w1",
"akka.tcp://app@10.0.0.3:2552/user/workers/w1"]
}
}
Router usage
这一节中的router actor在一个root actor中创建,名字为parent,配置文件中的部署地址为 /parent/后跟router actor的名字:
system.actorOf(Props[Parent], "parent")
RoundRobinPool and RoundRobinGroup
配置文件中定义RoundRobinPool:
akka.actor.deployment {
/parent/router1 {
router = round-robin-pool
nr-of-instances = 5
}
}
val router1: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router1")
代码形式:
val router2: ActorRef =
context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")
配置文件中定义RoundRobinGroup:
akka.actor.deployment {
/parent/router3 {
router = round-robin-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
}
}
val router3: ActorRef =
context.actorOf(FromConfig.props(), "router3")
代码形式:
val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
val router4: ActorRef =
context.actorOf(RoundRobinGroup(paths).props(), "router4")
RandomPool and RandomGroup
BalancingPool
SmallestMailboxPool
BroadcastPool and BroadcastGroup
ScatterGatherFirstCompletedPool and ScatterGatherFirstCompletedGroup
TailChoppingPool and TailChoppingGroup
ConsistentHashingPool and ConsistentHashingGroup
Specially Handled Messages
大多数消息会根据路由规则发送到指定的路径,但是有个别类型的消息用于特殊的行为.
Broadcast Messages
Broadcast消息用于向router的所有路径发送消息,当router收到Broadcast消息时会忽略一般消息的路由逻辑而将消息广播给所有的路径.
下面展示如何使用Broadcast发送一个重要的消息:
import akka.routing.Broadcast
router ! Broadcast("Watch out for Davy Jones' locker")
PoisonPill Messages
PoisonPill消息对于所有的actor都有一个特殊的处理方式,包括router.当actor收到一个PoisonPill消息,该actor将会关闭:
import akka.actor.PoisonPill
router ! PoisonPill
当PoisonPill消息发送给router时,router不会将其发给routees,router会在关闭自己的同时关闭所有子actor.
如果想要在所有的路径处理完消息之后再关闭router,这时就不能在使用PoisonPill了,这时需要使用Broadcast对PoisonPill进行包装,这样所有的子actor就都能收到PoisonPill消息了.这会关闭所有的路径,包括哪些不属于router子actor的路径在内.
import akka.actor.PoisonPill
import akka.routing.Broadcast
router ! Broadcast(PoisonPill)
Kill Messages
收到该消息后router会抛出ActorKilledException异常然后失败,随后由监管者决定是重启还是关闭.
router的子Actor会收到router的影响,行为有router的监管策略决定,哪些不属于router子actor的路径将不会收到影响.
import akka.actor.Kill
router ! Kill
而下面的方式将关闭所有路径,包括非router的子actor路径:
import akka.actor.Kill
import akka.routing.Broadcast
router ! Broadcast(Kill)
Management Messages
以下消息对router的路径进行管理:
- akka.routing.GetRoutees: 返回一个akka.routing.Routees,router当前使用的所有路径
- akka.routing.RemoveRoutee: 向当前的路径集合添加该路径
- akka.routing.RemoveRoutee: 从router当前的路径集合中删除该路径
- akka.routing.AdjustPoolSize: 向router的路径集合中添加或删除对应数量的路径
Dynamically Resizable Pool
How Routing is Designed within Akka
Configuring Dispatchers
akka.actor.deployment {
/poolWithDispatcher {
router = random-pool
nr-of-instances = 5
pool-dispatcher {
fork-join-executor.parallelism-min = 5
fork-join-executor.parallelism-max = 5
}
}
}