Simple Akka: Akka Essentials - Dispatchers and Routers

Dispatchers(调度器)

真实世界中,调度器是负责消息收发的沟通协调员,在紧急服务中,调度员负责在调度中将消息传递给其他部门(医疗/警察/消费/其他)的人,调度员负责协调所有这些部门的路线和活动,以确保帮助能够尽快正确的到达目的地.

调度器作为一种模式

调度程序是java程序中公认和使用的的模式.调度程序用于控制执行流.调度器基于调度策略将传入的消息或请求路由给业务流程.作为一种模式的调度器有如下优点:

  • Centralized control(集中控制):调度器提供一个消息/请求集中处理的中心.这个”集中”是指代码的重复使用,提高可维护性并减少代码重复.
  • Application partitioning(应用程序分区):使业务逻辑和显式逻辑清晰分离.
  • Reduced inter-dependencies(减少依赖关系):显式逻辑与业务逻辑分离意味着两者的依赖关系减少,这意味着更少的争用相同资源,从而构建一种可扩展模型.

调度员作为一种概念在程序中提供集中的控制机制,减少相互间的依赖关系从而将不同的处理逻辑分离.

Java中的Executor

在Akka中,dispatchers基于java的执行框架(一部分是java.util.concurrent),Executor提供了用于执行异步任务的框架,它基于生产者-消费者模型,这意味着被分为任务提交行为和任务执行行为.提交和执行任务是不同的线程.

两种非常重要的执行器框架:

  • ThreadPoolExecutor: 使用根据配置预定义的线程池中的线程执行每个提交的任务.
  • ForkJoinPool: 使用同样的线程池模型但是辅以work-stealing模式.所有在池中的线程尝试去执行其他线程创建的子任务,这样就很少有线程处于空闲状态,非常高效。

Executor被构造成可以自定义和控制任务的执行方式.使用Executor的控制结构:

  • 线程池大小
  • 如何在处理该任务前将任务列队
  • 多少个任务可以同时执行
  • 如果系统过载,被选中的任务被拒绝怎么办
  • 什么样的任务执行顺序(LIFO/FIFO/other)
  • 可以运行什么样的pre-和post-任务

Akka中的Dispatchers

在Akka中,调度器负责消息与映射到底层线程的Actor之间的调度.他们要确保资源进行优化,并尽可能快地处理消息.阿卡提供多个可以根据底层硬件资源(核心或可用内存的大小)和应用程序工作负载类型定制的调度策略.

如果我们把机场的例子映射到Akka,则机场跑道被映射为线程,不同组的航线被映射为包含消息的各个邮箱,控制塔则被映射为调度器.

airport

akka-dis1

调度器运行于线程中,将actor和来自附加邮箱的邮件和向执行者线程堆上分配。执行器线程配置和调到底层的处理器内核,用于处理消息。

Dispatchers类型

Akka中提供四种调度器框架:

  • Dispatcher
  • Pinned dispatcher
  • Balancing dispatcher
  • Calling thread dispatcher

同时提供四种邮箱:

  • Unbounded mailbox
  • Bounded mailbox
  • Unbounded priority mailbox
  • Bounded priority mailbox

Akka支持用户自己实现不同的调度器和邮箱类型.

Dispatcher

Dispatcher

如果没有任何配置选项时,akka将使用Dispatcher作为默认的调度器.这是一种基于事件的调度器,通过BlockingQueue方法将一组actor绑定到线程池.
Dispatcher提供的特性:

  • 每个actor都有自己的邮箱
  • 调度器可以被任意数量的actor共享
  • 同时支持thread-pool/fork-join-pool
  • 针对非阻塞代码进行了优化

Pinned dispatcher

Pinned-dispatcher

这种调度器为每个actor创建一个单一专用的线程.这种调度器适用于actor执行IO操作或长时间的运行计算.当线程处于一段时间的非活动状态后将解除与actor的绑定.
特性:

  • 每个actor都有自己的邮箱
  • 每个actor都有单独的线程意味着该调度器不能与其他actor共享
  • 支持thread pool executor
  • 调度程序为阻塞操作进行额优化,例如.如果代码使用I/O调用或数据库调用,此类行为者将等待直到完成任务.对于这种阻塞的操作,固定的调度性能优于默认调度。

Balancing dispatcher

Balancing-dispatcher

负载均衡调度器,基于事件的将消息在繁忙和空闲的actor之间平衡调度.只能用于所有的actor都是相同类型的.这种任务的重新分配类似于ForkJoinPool中的work-stealing技术,这种调度器寻找空闲的actor然后把任务分配给他进行处理.
特性:

  • 所有actor公用一个邮箱
  • 这种调度器只能被同一种actor共享
  • 同时支持thread-pool/fork-join-pool

Calling thread dispatcher

这种调度器主要用于测试.调度器只在当前线程上执行任务.它不会创建任何新的线程并且提供确定性的执行顺序.
特性:

  • 每个actor都有各自的邮箱
  • 可以被任意数量的actor共享
  • 支持calling thread

邮箱类型

邮箱是根据java并发包种的queue实现的.队列特征有如下两种:

  • Blocking queue:阻塞队列意味着在向队列中填充元素时等待空间可用,同时从队列检索元素时会等待队列变成非空.
  • Bounded queue:界限队列意味着队列有一个最小值,添加的元素数量不能超过预期的设置值.

Akka中的队列实现都基于下面的blocking/bounded代理:
MailBoxType

Dispatcher用法

  • Thread pool executor:首先创建一个工作线程池,任务通过队列分配到线程池.如果任务数量超过线程数量,任务将会被排队直到线程池中出现可用线程,工作线程最小化分配/解锁线程的开销.
  • Fork join executor:这种思路基于分治的思想,把一项大任务分解成更小的任务,让后将结果组合成最终的答案,所有的任务需要能够独立并行运行.

对于每种执行唤醒,akka都提供自定义配置以定义和构造底层资源:

  • 可分配的最小线程数
  • 可分配的最大线程数
  • 将要使用的乘数因子(取决于CPU核心数)

比如最小线程数是3因子是2,则调度器启动6个线程,最大线程数定义了线程数上限,如果最大线程数是8,则最多可创建8 * 2 = 16个线程.

Thread pool executor

下面是Thread pool需要配置的参数:

# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads
core-pool-size-min = 2
# available processors * factor
core-pool-size-factor = 2.0
# maximum number of threads
core-pool-size-max = 10
}

Fork join executor

下面是Fork join需要配置的参数:

# Configuration for the fork join pool
fork-join-executor {
# Min number of threads
parallelism-min = 2
# available processors * factor
parallelism-factor = 2.0
# Max number of threads
parallelism-max = 10
}

根据调度器和支持的执行器的不同,还有一些其他的参数与前面的参数同时进行配置:








































Parameter name Description Potential values
type 标识事件类型调度程序正在使用的名称 Dispatcher/PinnedDispatcher/BalancingDispatcher/FQCN of a class extending MessageDispatcherConfigurato
executor 使用何种executor服务 fork-join-executor/thread-pool-executor/FQCN of a class extending ExecutorServiceConfigurator
fork-joinexecutor fork-join-executor parameters
thread-poolexecutor Rthread-pool-executor parameters
throughput 线程在跳到下个线程之前当前actor能够处理的消息的最大数量 1
Rmailboxcapacity(可选) actor队列的邮箱容量 Negative (or zero) implies usage of an unbounded mailbox (default). A positive number implies bounded mailbox and with the specified size.
mailbox-type 要使用的邮箱类型 Bounded or unbounded mailbox used if nothing is specified (dependent on mailbox capacity) or FQCN of the mailbox implementation

application.conf文件中关于Dispatcher的配置样例:

my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""
}

或者PinnedDispatcher的样例:

my-dispatcher {
type = PinnedDispatcher 

executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""
}

下面是应用性能的关键参数:

  • Choice of dispatcher:基于actor执行的活动类型选择正确的调度器.根据阻塞或非阻塞操作,同质或不同质的actor类型,选择合适的调度器.
  • Choice of executor:thread pool和fork join的选择取决于应用程序的逻辑特点,通常是伴随着默认调度器或者负载均衡调度器的选择.通常来说fork join是比较常用的,因为可以有大量的任务被同时执行.
  • Number of threads (min/max) factored to the CPU cores:调度器的最大最小线程数映射到底层CPU核心数以确定应用的处理能力,定义太高的话线程会做大量的上下文切换,太少的话处理能力不能充分优化.
  • Throughput factor:用于设置一个actor一次处理邮件的数量.比如是50,actor则会处理50的消息之后返回线程池.同时其他actor将会等待线程可用以开始处理他们各自的消息.这个选项的设置取决于每种消息的处理时长.

一旦完成 application.conf中的调度器配置,应用程序需要只能哪个actor用哪种调度策略.使用正确的调度器和执行器组合构建调度策略,可以定义不同组的actor然后给不同的actor分配不同的调度策略.

val _system = ActorSystem("dispatcher", ConfigFactory.load().getConfig ("MyDispatcherExample"))
val actor = _system.actorOf(Props[MsgEchoActor].withDispatcher ("my-dispatcher"))

当定义actor的时候,Props提供了withDispatcher()方法,通过提供一个application.conf中定义的调度器名进行调用.
可以通过一个调度器配置创建一个actor的多个实例.调度器通常使用akka提供的路由功能进行连接.

Routers

当巨大数量的actor并行运行以处理进入的消息流,需要一种实体将消息从源导向目标actor,这种实体称为路由器.
在Akka中,router是一种将消息路由到其他actor的的actor,对于router来说,这些外部的actor被称为routees,router使用不同的算法集将消息路由到routees.

router-actor

Akka默认支持一下集中路由机制:

  • Round robin router:将传入消息按循环顺序路由到所有路径
  • Random router:将传入消息按随机选择路由到路径
  • Smallest mailbox router:将消息路由到邮箱中消息数量最少的路径
  • Broadcast router:将同一个消息发给所有actor
  • Scatter gather first completed router:将消息forward给所有路径成一个Future,无论哪个路径响应,立即将结果返回给调用者

Router用法

val router = system.actorOf(Props[MyActor].withRouter
  (RoundRobinRouter
  (nrOfInstances = 5)) , name = "myRouterActor")

在定义actor的时候提供一个router实例,此处是RoundRobinRouter,结构中包含了一个实例数量用于创建路由的routees.其他router类型也会以这样的方式创建.

多种不同router的创建方式:






















Router type Usage
RoundRobinRouter val router = system.actorOf(Props[MyActor].withRouter(RoundRobinRouter(nrOfInstances = 5)))
RandomRouter val router = system.actorOf(Props[MyActor].withRouter(RandomRouter(nrOfInstances = 5)))
SmallestMailbox Router val router = system.actorOf(Props[MyActor].withRouter(SmallestMailboxRouter(nrOfInstances = 5)))
BroadcastRouter val router = system.actorOf(Props[MyActor].withRouter(BroadcastRouter(nrOfInstances = 5)))
ScatterGatherFirst CompletedRouter val router = system.actorOf(Props[MyActor].withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 5, within = 5 seconds)))

Router的application.conf配置

首先在application.conf进行配置:

MyRouterExample{
akka.actor.deployment {
/myRandomRouterActor {
router = random
nr-of-instances = 5
}
}
}

然后再ActorSystem中加载:

val _system = ActorSystem.create("RandomRouterExample",ConfigFactory.load()
  .getConfig("MyRouterExample"))

val randomRouter = _system.actorOf(Props[MsgEchoActor].
  withRouter(FromConfig()), name = "myRandomRouterActor")