Dispatchers(调度器)
真实世界中,调度器是负责消息收发的沟通协调员,在紧急服务中,调度员负责在调度中将消息传递给其他部门(医疗/警察/消费/其他)的人,调度员负责协调所有这些部门的路线和活动,以确保帮助能够尽快正确的到达目的地.
调度器作为一种模式
调度程序是java程序中公认和使用的的模式.调度程序用于控制执行流.调度器基于调度策略将传入的消息或请求路由给业务流程.作为一种模式的调度器有如下优点:
- Centralized control(集中控制):调度器提供一个消息/请求集中处理的中心.这个”集中”是指代码的重复使用,提高可维护性并减少代码重复.
- Application partitioning(应用程序分区):使业务逻辑和显式逻辑清晰分离.
- Reduced inter-dependencies(减少依赖关系):显式逻辑与业务逻辑分离意味着两者的依赖关系减少,这意味着更少的争用相同资源,从而构建一种可扩展模型.
调度员作为一种概念在程序中提供集中的控制机制,减少相互间的依赖关系从而将不同的处理逻辑分离.
Java中的Executor
在Akka中,dispatchers基于java的执行框架(一部分是java.util.concurrent),Executor提供了用于执行异步任务的框架,它基于生产者-消费者模型,这意味着被分为任务提交行为和任务执行行为.提交和执行任务是不同的线程.
两种非常重要的执行器框架:
- ThreadPoolExecutor: 使用根据配置预定义的线程池中的线程执行每个提交的任务.
- ForkJoinPool: 使用同样的线程池模型但是辅以work-stealing模式.所有在池中的线程尝试去执行其他线程创建的子任务,这样就很少有线程处于空闲状态,非常高效。
Executor被构造成可以自定义和控制任务的执行方式.使用Executor的控制结构:
- 线程池大小
- 如何在处理该任务前将任务列队
- 多少个任务可以同时执行
- 如果系统过载,被选中的任务被拒绝怎么办
- 什么样的任务执行顺序(LIFO/FIFO/other)
- 可以运行什么样的pre-和post-任务
Akka中的Dispatchers
在Akka中,调度器负责消息与映射到底层线程的Actor之间的调度.他们要确保资源进行优化,并尽可能快地处理消息.阿卡提供多个可以根据底层硬件资源(核心或可用内存的大小)和应用程序工作负载类型定制的调度策略.
如果我们把机场的例子映射到Akka,则机场跑道被映射为线程,不同组的航线被映射为包含消息的各个邮箱,控制塔则被映射为调度器.
调度器运行于线程中,将actor和来自附加邮箱的邮件和向执行者线程堆上分配。执行器线程配置和调到底层的处理器内核,用于处理消息。
Dispatchers类型
Akka中提供四种调度器框架:
- Dispatcher
- Pinned dispatcher
- Balancing dispatcher
- Calling thread dispatcher
同时提供四种邮箱:
- Unbounded mailbox
- Bounded mailbox
- Unbounded priority mailbox
- Bounded priority mailbox
Akka支持用户自己实现不同的调度器和邮箱类型.
Dispatcher
如果没有任何配置选项时,akka将使用Dispatcher作为默认的调度器.这是一种基于事件的调度器,通过BlockingQueue方法将一组actor绑定到线程池.
Dispatcher提供的特性:
- 每个actor都有自己的邮箱
- 调度器可以被任意数量的actor共享
- 同时支持thread-pool/fork-join-pool
- 针对非阻塞代码进行了优化
Pinned dispatcher
这种调度器为每个actor创建一个单一专用的线程.这种调度器适用于actor执行IO操作或长时间的运行计算.当线程处于一段时间的非活动状态后将解除与actor的绑定.
特性:
- 每个actor都有自己的邮箱
- 每个actor都有单独的线程意味着该调度器不能与其他actor共享
- 支持thread pool executor
- 调度程序为阻塞操作进行额优化,例如.如果代码使用I/O调用或数据库调用,此类行为者将等待直到完成任务.对于这种阻塞的操作,固定的调度性能优于默认调度。
Balancing dispatcher
负载均衡调度器,基于事件的将消息在繁忙和空闲的actor之间平衡调度.只能用于所有的actor都是相同类型的.这种任务的重新分配类似于ForkJoinPool中的work-stealing技术,这种调度器寻找空闲的actor然后把任务分配给他进行处理.
特性:
- 所有actor公用一个邮箱
- 这种调度器只能被同一种actor共享
- 同时支持thread-pool/fork-join-pool
Calling thread dispatcher
这种调度器主要用于测试.调度器只在当前线程上执行任务.它不会创建任何新的线程并且提供确定性的执行顺序.
特性:
- 每个actor都有各自的邮箱
- 可以被任意数量的actor共享
- 支持calling thread
邮箱类型
邮箱是根据java并发包种的queue实现的.队列特征有如下两种:
- Blocking queue:阻塞队列意味着在向队列中填充元素时等待空间可用,同时从队列检索元素时会等待队列变成非空.
- Bounded queue:界限队列意味着队列有一个最小值,添加的元素数量不能超过预期的设置值.
Akka中的队列实现都基于下面的blocking/bounded代理:
Dispatcher用法
- Thread pool executor:首先创建一个工作线程池,任务通过队列分配到线程池.如果任务数量超过线程数量,任务将会被排队直到线程池中出现可用线程,工作线程最小化分配/解锁线程的开销.
- Fork join executor:这种思路基于分治的思想,把一项大任务分解成更小的任务,让后将结果组合成最终的答案,所有的任务需要能够独立并行运行.
对于每种执行唤醒,akka都提供自定义配置以定义和构造底层资源:
- 可分配的最小线程数
- 可分配的最大线程数
- 将要使用的乘数因子(取决于CPU核心数)
比如最小线程数是3因子是2,则调度器启动6个线程,最大线程数定义了线程数上限,如果最大线程数是8,则最多可创建8 * 2 = 16个线程.
Thread pool executor
下面是Thread pool需要配置的参数:
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads
core-pool-size-min = 2
# available processors * factor
core-pool-size-factor = 2.0
# maximum number of threads
core-pool-size-max = 10
}
Fork join executor
下面是Fork join需要配置的参数:
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads
parallelism-min = 2
# available processors * factor
parallelism-factor = 2.0
# Max number of threads
parallelism-max = 10
}
根据调度器和支持的执行器的不同,还有一些其他的参数与前面的参数同时进行配置:
Parameter name | Description | Potential values |
type | 标识事件类型调度程序正在使用的名称 | Dispatcher/PinnedDispatcher/BalancingDispatcher/FQCN of a class extending MessageDispatcherConfigurato |
executor | 使用何种executor服务 | fork-join-executor/thread-pool-executor/FQCN of a class extending ExecutorServiceConfigurator |
fork-joinexecutor | fork-join-executor parameters | |
thread-poolexecutor | Rthread-pool-executor parameters | |
throughput | 线程在跳到下个线程之前当前actor能够处理的消息的最大数量 | 1 |
Rmailboxcapacity(可选) | actor队列的邮箱容量 | Negative (or zero) implies usage of an unbounded mailbox (default). A positive number implies bounded mailbox and with the specified size. |
mailbox-type | 要使用的邮箱类型 | Bounded or unbounded mailbox used if nothing is specified (dependent on mailbox capacity) or FQCN of the mailbox implementation |
application.conf文件中关于Dispatcher的配置样例:
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""
}
或者PinnedDispatcher的样例:
my-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
throughput = 100
mailbox-capacity = -1
mailbox-type =""
}
下面是应用性能的关键参数:
- Choice of dispatcher:基于actor执行的活动类型选择正确的调度器.根据阻塞或非阻塞操作,同质或不同质的actor类型,选择合适的调度器.
- Choice of executor:thread pool和fork join的选择取决于应用程序的逻辑特点,通常是伴随着默认调度器或者负载均衡调度器的选择.通常来说fork join是比较常用的,因为可以有大量的任务被同时执行.
- Number of threads (min/max) factored to the CPU cores:调度器的最大最小线程数映射到底层CPU核心数以确定应用的处理能力,定义太高的话线程会做大量的上下文切换,太少的话处理能力不能充分优化.
- Throughput factor:用于设置一个actor一次处理邮件的数量.比如是50,actor则会处理50的消息之后返回线程池.同时其他actor将会等待线程可用以开始处理他们各自的消息.这个选项的设置取决于每种消息的处理时长.
一旦完成 application.conf中的调度器配置,应用程序需要只能哪个actor用哪种调度策略.使用正确的调度器和执行器组合构建调度策略,可以定义不同组的actor然后给不同的actor分配不同的调度策略.
val _system = ActorSystem("dispatcher", ConfigFactory.load().getConfig ("MyDispatcherExample"))
val actor = _system.actorOf(Props[MsgEchoActor].withDispatcher ("my-dispatcher"))
当定义actor的时候,Props提供了withDispatcher()方法,通过提供一个application.conf中定义的调度器名进行调用.
可以通过一个调度器配置创建一个actor的多个实例.调度器通常使用akka提供的路由功能进行连接.
Routers
当巨大数量的actor并行运行以处理进入的消息流,需要一种实体将消息从源导向目标actor,这种实体称为路由器.
在Akka中,router是一种将消息路由到其他actor的的actor,对于router来说,这些外部的actor被称为routees,router使用不同的算法集将消息路由到routees.
Akka默认支持一下集中路由机制:
- Round robin router:将传入消息按循环顺序路由到所有路径
- Random router:将传入消息按随机选择路由到路径
- Smallest mailbox router:将消息路由到邮箱中消息数量最少的路径
- Broadcast router:将同一个消息发给所有actor
- Scatter gather first completed router:将消息forward给所有路径成一个Future,无论哪个路径响应,立即将结果返回给调用者
Router用法
val router = system.actorOf(Props[MyActor].withRouter
(RoundRobinRouter
(nrOfInstances = 5)) , name = "myRouterActor")
在定义actor的时候提供一个router实例,此处是RoundRobinRouter,结构中包含了一个实例数量用于创建路由的routees.其他router类型也会以这样的方式创建.
多种不同router的创建方式:
Router type | Usage |
RoundRobinRouter | val router = system.actorOf(Props[MyActor].withRouter(RoundRobinRouter(nrOfInstances = 5))) |
RandomRouter | val router = system.actorOf(Props[MyActor].withRouter(RandomRouter(nrOfInstances = 5))) |
SmallestMailbox Router | val router = system.actorOf(Props[MyActor].withRouter(SmallestMailboxRouter(nrOfInstances = 5))) |
BroadcastRouter | val router = system.actorOf(Props[MyActor].withRouter(BroadcastRouter(nrOfInstances = 5))) |
ScatterGatherFirst CompletedRouter | val router = system.actorOf(Props[MyActor].withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 5, within = 5 seconds))) |
Router的application.conf配置
首先在application.conf进行配置:
MyRouterExample{
akka.actor.deployment {
/myRandomRouterActor {
router = random
nr-of-instances = 5
}
}
}
然后再ActorSystem中加载:
val _system = ActorSystem.create("RandomRouterExample",ConfigFactory.load()
.getConfig("MyRouterExample"))
val randomRouter = _system.actorOf(Props[MsgEchoActor].
withRouter(FromConfig()), name = "myRandomRouterActor")