Simple Akka: Cluster

简介

Akka提供一个高容错,基于点对点的权利分散的集群成员关系,不会出现单点错误或单点瓶颈.基于gossip协议和一个自动的错误检查器实现.

Gossip协议

Gossip算法又被称为反熵(Anti-Entropy),熵物理学上的一种概念,代表杂乱无章,而反熵就是在杂乱无章中寻求一致,这充分说明了Gossip的特点: 在一个有界网络中,每个节点随机的与其他节点通信,经过一番杂乱无章的通信,最终所有的节点都会达成一致.每个节点都可能知道其他节点,也可能只知道一个邻居节点,只要这些节点可以通过网络连通,最终他们的状态都是一致的,当然这也是疫情传播的特点.

要注意的一点是,即使所有的节点因宕机而重启,有新节点加入,但经过一段时间后,这些节点的状态也会与其他节点达成一致,也就是说,Gossip天然具有分布式容错的特点.

Gossip是一个带有冗余的容错算法,更进一步,Gossip是一个最终一致算法.虽然无法保证在某个时刻所有节点状态一致,但可以保证最终所有节点一致,最终是一个现实中存在,但理论上无法证明的时间点.

因为Gossip不要求节点知道所有其他节点,因此又具有去中心化的特点,节点之间完全对等,不需要任何的中心节点.实际上Gossip可以用于众多能接受最终一致性的领域: 失败检测,路由同步,Pub/Sub,动态负载均衡等.

但Gossip的缺点也很明显,冗余通信会对网路带宽和CPU资源造成很大的负载,而这些负载有受限于通信频率,该频率有影响着算法收敛的速度.

术语

  1. node: 集群的一个逻辑成员,在一个物理机上可以有多个节点.使用一个hostname:port:uid的元组进行定义.
  2. cluster: 一组节点通过成员关系(membership)服务组合在一起构成一个集群.
  3. leader: 集群中有一个单独的节点作为一个领导者leader,管理集群集合和成员关系状态变更.

成员关系

一个集群由多个成员节点组成.每个节点的表示是一个hostname:port:uid的元组.一个Akka应用可以在每个节点host应用的一部分来将整个应用分布的整个集群.成员关系和应用中运行在各节点上的actor是解耦的.一个节点不运行任何actor也可以成为集群的成员.通过向集群中的一个节点发起Join命令来加入集群.

节点标识内部会包含一个UID来独特的标识出在一个hostname:port上的actor系统实例.Akka通过这个UID来触发可靠的远程死亡监控(death watch).这表示一个actor系统在被移出集群之前不能再次被加入到集群中.如果想以相同的hostname:port将一个actor系统加入到集群首先需要关闭该actor系统,然后重新以同样的hostname:port进行加入,这时会受到一个不同的UID.

集群关系状态是一个专业的CRDT(一种最终一致性理论),他有一个无变化的合并功能.当在多个节点上同时发生多个改变,更新总是会合并并归一到一个相同的最终结果.

Gossip

Akka中使用的成员关系是基于Amazon的Dynamo系统,特别接近于Basho的Riak分布式数据库.成员关系公国Gossip协议进行通信.

向量时钟(Vector Clocks)

Vector Clocks是一个数据结构和算法,用于在分布式系统中生成局部排序的时间和因果错误的探测.

在Akka中用于促使不同集群状态的一致并进行合并.一个Vector Clocks是一个(node, counter)对,每次更新集群状态都会同时更新向量时钟.

Gossip Convergence

Failure Detector

Leader

Seed Nodes

Gossip Protocol

Membership Lifecycle

成员状态

  1. joining: 加入集群时一个短暂的状态
  2. weakly up: 当网络被隔开时一个短暂的状态(akka.cluster.allow-weakly-up-members=on)
  3. up: 正常的运行状态
  4. leaving / exiting: 被优雅的移除时的状态
  5. down: 被标记为down,不再是集群的一部分
  6. removed: 死亡状态,不再是一个成员

用户动作

  1. join: 将一个单节点加入到集群,如果在配置文件中指定的话会在开启时自动加入
  2. leave: 使一个节点优雅的离开集群
  3. down: 将一个节点标记为down

Leader动作

  1. 改变一个节点加入或离开集群:

    1. joining -> up
    2. exiting -> removed
  2. fd*

  3. unreachable*

添加依赖

Akka集群是一个单独的jar文件:

"com.typesafe.akka" %% "akka-cluster" % "2.4.2"

一个简单的集群例子

下面的配置文件用于激活集群扩展:

// application.conf

akka {
  actor {                                           // 设置actor引用类型
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {                                          // 激活remote配置
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"                        // 本地IP
      port = 0
    }
  }

  cluster {                                         // 集群节点信息,这里有两个节点
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }
}

# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

然后创建一个actor来使用集群扩展:

package sample.cluster.simple

import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorLogging
import akka.actor.Actor

class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart 
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
    //#subscribe
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}",
        member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}

这个actor把自己注册为集群事件的订阅者.当订阅开始时它会收到当前集群中状态改变的事件.

向集群中添加一个节点

当一个节点启动时,它会发送一个消息到所有节点,然后向返回消息的节点发送一个加入命令.如果其中一个节点回复后(或者其他节点还没有启动),它会一直尝试这个流程,知道成功或者自己被关闭.

在配置文件application.conf中定义所有集群中的节点信息:

akka.cluster.seed-nodes = [
  "akka.tcp://ClusterSystem@host1:2552",
  "akka.tcp://ClusterSystem@host2:2552"]

这些节点可以以然和顺序启动并且不用全部都启动.但是当最初启动一个集群时,列表中的第一个节点必须被首先启动,否则其他的节点不会被初始化,也不会被加入到集群中.当启动两个节点以上时第一个节点再被关闭就没有问题了.如果第一个节点被重启,它会首先尝试加入到集群中其他已有的节点.

如果不配置节点信息的话则需要以编程的方式或者手动的方式提供节点信息.

自动或手动关闭

默认情况下必须以手动的方式(命令行)关闭集群,或者可以在配置文件中设置为自动方式:

akka.cluster.auto-down-unreachable-after = 120s

这表示集群管理员会自定把连接超时的节点从unreachable状态设置为down状态.

Leaving

有两种方式从集群中移除一个节点.

首先可以关闭actor系统.

另一种优雅的方式是告诉集群一个节点需要离开,可以通过命令行工具,或者使用编程的方式:

val cluster = Cluster(system)
cluster.leave(cluster.selfAddress)

WeaklyUp Members(实验性,仅支持2.4以上的版本)

当一个节点变为unreachable时,我们仍然希望它能够再次激活,这个特性默认是关闭的,可以通过配置文件激活:

akka.cluster.allow-weakly-up-members = on

激活这个特性后,如果当时没有gossip convergence,Joining状态的成员会变成WeaklyUp同时成为集群的一部分,一旦gossip convergence到达,管理员就会把WeaklyUp的成员改为Up状态.

订阅集群事件

使用Cluster(system).subscribe可以订阅集群改变的提醒.

cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])

akka.cluster.ClusterEvent.CurrentClusterState,集群完整状态的快照,会作为第一条消息发送到订阅者,后续的时间会发送新的更新.

如果你在join流程完成之前开启订阅,还没有任何成员,这时会受到一个空的CurrentClusterState.

如果不想接收CurrentClusterState,可以使用ClusterEvent.InitialStateAsEvents参数进行订阅,这会发送整个集群中真正作用到集群的过往消息.

cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
  classOf[MemberEvent], classOf[UnreachableMember])

整个生命周期的轨迹事件包括:

  1. ClusterEvent.MemberJoined
  2. ClusterEvent.MemberUp
  3. ClusterEvent.MemberExited
  4. ClusterEvent.MemberRemoved
  5. ClusterEvent.UnreachableMember
  6. ClusterEvent.ReachableMember

还有更多别的事件,详细参考API.如果只想获取成员关系状态可以使用Cluster(system).state.

Worker Dial-in Example

这个样例应用用于转换一个文本.当一个文本发送到前端服务时,它会被分发到一个后端工作节点来完成转换工作,然后在把结果返回给原始的客户端.一个新的前端或者后端节点可以动态的向集群添加或移除.

用到的消息:

final case class TransformationJob(text: String)
final case class TransformationResult(text: String)
final case class JobFailed(reason: String, job: TransformationJob)
case object BackendRegistration

完成转换工作的后端工作节点:

class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))     // 检查节点角色
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
        BackendRegistration
}

注意这个TransformationBackendactor订阅了集群事件用于检测新的前端节点,然后给他发送一个注册消息以告诉他们可以使用后端的工作节点了.

前端节点接收转换工作并委派到以注册的后端工作节点:

class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

注意前端节点使用watch监控了已注册的节点,以便他们关闭时会发来Terminated消息从而将该关闭的节点从可用节点列表中移除.

节点角色

并不是所有的节点都需要提供所有的功能: 一些用于运行web前端,一些运行数据库访问层,一些用于计数.部署actor时提供角色信息可以将节点的角色归档以进行职责分配.

节点角色在配置文件的akka.cluster.roles属性中定义,通常定义在系统启动脚本或环境变量中.

节点角色同样是MemberEvent成员关系信息的一部分,可以进行订阅.

How To Startup when Cluster Size Reached

一个普遍的用例是当集群初始化后,成员已经加入,并且集群已经到达一个值时启动actor.

可以通过配置文件设置一个值,来限制管理员将Joining状态的节点改变为Up时需要的最小成员数量:

akka.cluster.min-nr-of-members = 3

或者根据不同角色的数量来进行限制:

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

可以使用registerOnMemberUp回调来创建actor,当当前的成员状态变为Up时会自动被调用,集群中至少会拥和有上面定义的数量一致的成员个数:

Cluster(system) registerOnMemberUp {
  system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
    name = "factorialFrontend")
}

这个回调不仅仅只可以用于创建actor.

How To Cleanup when Member is Removed

可以在一个registerOnMemberRemoved回调中处理一些清理工作,当当前的成员状态被变为Removed时或者集群关闭时,会自动被调用.比如在退出JVM时关闭actor系统.

Cluster(system).registerOnMemberRemoved {
  // exit JVM when ActorSystem has been terminated
  system.registerOnTermination(System.exit(0))
  // shut down ActorSystem
  system.terminate()

  // In case ActorSystem shutdown takes longer than 10 seconds,
  // exit the JVM forcefully anyway.
  // We must spawn a separate thread to not block current thread,
  // since that would have blocked the shutdown of the ActorSystem.
  new Thread {
    override def run(): Unit = {
      if (Try(Await.ready(system.whenTerminated, 10.seconds)).isFailure)
        System.exit(-1)
    }
  }.start()
}

Cluster Aware Routers

所有种类的路由器都可以用于集群的成员,比如部署新的路由或者查找集群节点的路由.当一个节点称为unreachable或离开了集群,该节点的路径会自动从路由器中注销.当新节点加入集群时,一个额外的路由会根据配置添加到路由器.当节点从unreachable变为reachable状态时也会添加路由.

当开启WeaklyUp特性时,该状态的节点也会被应用路由.

主要有两种不同的路由:

  1. Group - router通过actor selection将消息发送到指定的路径.一个路由可以个集群中的不同节点共享.这类路由器的一个用例就是服务运行于集群的后台节点,路由器运行于集群的前端节点.
  2. Pool - router将路由创建为子节点,并在远程节点上部署.每个路由器都会有自己的路由实例.比如,如果在一个拥有10个节点的集群中创建一个3节点的路由,如果路由器配置为每个节点一个实例的话总共会又有30个路由.被不同路由器创建的路由不会再路由器之间共享.这类路由的实例是一个单独的管理者将真正的工作路由到集群的不同节点中.

Router with Group of Routees

当使用Group时必须在集群成员节点上启动路由actor.这并不是由路由器完成的,下面是配置示例:

akka.actor.deployment {
  /statsService/workerRouter {
      router = consistent-hashing-group
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        use-role = compute
      }
    }
}

注意,当启动actor系统时路由actor需要尽可能早的启动,因为当这些成员状态编程Up时路由器会立即尝试使用它们.

actor的路径被定义在routees.paths中,路由器用来选择消息将要发送的actor.消息会通过ActorSelection发送到路由,因此应该是符合预期的同样的传递语义.通过使用use-role可以限制路由对相关角色节点的查询.

max-total-nr-of-instances定义了集群中路由的总数,默认设置为一个很高的值(10000),因此当节点加入到集群是路由能够加入到路由器,可以自定义进行限制.

同时可以在代码中进行定义:

import akka.cluster.routing.ClusterRouterGroup
import akka.cluster.routing.ClusterRouterGroupSettings
import akka.routing.ConsistentHashingGroup

val workerRouter = context.actorOf(
  ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings(
    totalInstances = 100, routeesPaths = List("/user/statsWorker"),
    allowLocalRoutees = true, useRole = Some("compute"))).props(),
  name = "workerRouter2")

Router Example with Group of Routees

这个样例提供了一个文本统计的服务,当文本发送到服务时会被拆分成单词,然后将统计每个单词长度的任务分发给工作者,就是路由器的各个路由.每个单词的计数会被返回给一个聚合器,当集齐所有结果时计算每个单词的平均数.

消息:

final case class StatsJob(text: String)
final case class StatsResult(meanWordLength: Double)
final case class JobFailed(reason: String)

用于对每个单词计数的worker:

class StatsWorker extends Actor {
  var cache = Map.empty[String, Int]
  def receive = {
    case word: String =>
      val length = cache.get(word) match {
        case Some(x) => x
        case None =>
          val x = word.length
          cache += (word -> x)
          x
      }

      sender() ! length
  }
}

接收用户文本并拆分成单词的服务,委派worker并聚合:

class StatsService extends Actor {
  // This router is used both with lookup and deploy of routees. If you
  // have a router with only lookup of routees you can use Props.empty
  // instead of Props[StatsWorker.class].
  val workerRouter = context.actorOf(FromConfig.props(Props[StatsWorker]),
    name = "workerRouter")

  def receive = {
    case StatsJob(text) if text != "" =>
      val words = text.split(" ")
      val replyTo = sender() // important to not close over sender()
      // create actor that collects replies from workers
      val aggregator = context.actorOf(Props(
        classOf[StatsAggregator], words.size, replyTo))
      words foreach { word =>
        workerRouter.tell(
          ConsistentHashableEnvelope(word, word), aggregator)
      }
  }
}

class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor {
  var results = IndexedSeq.empty[Int]
  context.setReceiveTimeout(3.seconds)

  def receive = {
    case wordCount: Int =>
      results = results :+ wordCount
      if (results.size == expectedResults) {
        val meanWordLength = results.sum.toDouble / results.size
        replyTo ! StatsResult(meanWordLength)
        context.stop(self)
      }
    case ReceiveTimeout =>
      replyTo ! JobFailed("Service unavailable, try again later")
      context.stop(self)
  }
}

注意没有任何集群的特性,只是简单的actor.

所有的节点都会启动StatsServiceStatsWorkeractor,这个场景中worker就是路由,路由器通过routees.paths配置:

akka.actor.deployment {
  /statsService/workerRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/statsWorker"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-role = compute
    }
  }
}

这表示用户的请求会被发送到任何节点的StatsService并会在所有节点上使用StatsWorker.

即,每个路由会在每个节点上部署,该路由会在每个节点上创建多个路由,请求会被发送到任何一个节点的路由器,然后路由器会将任务委派到任何节点上对应路由的路由.

Router with Pool of Remote Deployed Routees

配置示例:

akka.actor.deployment {
  /statsService/singleton/workerRouter {
      router = consistent-hashing-pool
      cluster {
        enabled = on
        max-nr-of-instances-per-node = 3
        allow-local-routees = on
        use-role = compute
      }
    }
}

可以通过use-role指定路由部署的节点类型.

max-total-nr-of-instances定义了集群中路由实例的总数.max-nr-of-instances-per-node定义的每个节点上的路由实例数会比总的数量要多.

同样可以使用代码定义配置:

import akka.cluster.routing.ClusterRouterPool
import akka.cluster.routing.ClusterRouterPoolSettings
import akka.routing.ConsistentHashingPool

val workerRouter = context.actorOf(
  ClusterRouterPool(ConsistentHashingPool(0), ClusterRouterPoolSettings(
    totalInstances = 100, maxInstancesPerNode = 3,
    allowLocalRoutees = false, useRole = None)).props(Props[StatsWorker]),
  name = "workerRouter3")

Router Example with Pool of Remote Deployed Routees

实例中,一个单独的,master节点来创建和部署worker.使用集群单例来实现master,ClusterSingletonManager会在每个节点中启动:

system.actorOf(ClusterSingletonManager.props(
  singletonProps = Props[StatsService],
  terminationMessage = PoisonPill,
  settings = ClusterSingletonManagerSettings(system).withRole("compute")),
  name = "statsService")

同时需要在每个节点上有一个actor来跟踪当前master并将任务委派给StatsService,通过ClusterSingletonProxy提供:

system.actorOf(ClusterSingletonProxy.props(singletonManagerPath = "/user/statsService",
  settings = ClusterSingletonProxySettings(system).withRole("compute")),
  name = "statsServiceProxy")

ClusterSingletonProxy会接收用户的文本并委派给当前的StatsService,这个单独的actor.它通过监听集群时间来在最老的节点上查找StatsService.

所有的节点都会启动ClusterSingletonProxyClusterSingletonManager,路由会这样配置:

akka.actor.deployment {
  /statsService/singleton/workerRouter {
    router = consistent-hashing-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      allow-local-routees = on
      use-role = compute
    }
  }
}

Cluster Singleton

对于有些场景或者需求,需要在集群中运行一个单独的actor.比如:

  1. 一些集群中为了一致性结果的单点任务,或者集群中的协调动作
  2. 一个外部系统的单点入口
  3. 一个单独的主服务,多个工作服务
  4. 中央命名服务,或者路由逻辑

使用单例不应该作为第一个设计选择.它有很多缺点,比如单点瓶颈,单点错误也是需要格外注意的,不过有些特性会用于处理这些问题以保证最终在其他地方会重新启动一个单例.

集群的单例模式通过akka.cluster.singleton.ClusterSingletonManager来实现.通过一个特殊的逻辑在整个集群节点或一组节点中维持一个单例actor实例.ClusterSingletonManager是一个可能会在所有节点上启动的actor,或者在进群中通过规则特殊指定的那些节点启动.事实上,单例actor会被ClusterSingletonManager在最老的节点上通过提供一个Props创建子actor的方式启动.ClusterSingletonManager会确保整个集群中在任何时间只会有一个单例运行.

单例actor总是会根据一个特殊的规则运行在最老的节点上.最老节点的选择通过akka.cluster.Member#isOlderThan类决定.这可以通过将成员移出集群来改变.注意在交接过程中会有一个很小的时间间隔没有激活的单例可用.

集群的错误检测器会注意到最老节点的状态改变为unreachable,比如JVM崩溃,硬退出或者网络错误.这是一个新的最老节点会接管并创建一个新的单例actor.对于这些错误情况并不会有一个优雅的接管方式,但是多个激活的单例actor会妨碍更多正常的任务.一些比较偏僻的问题可以通过配置超时时间来解决.

可以通过提供的akka.cluster.singleton.ClusterSingletonProxy来访问单例actor,可以将所有的消息路由到当前的单例actor实例.这个代理会保留集群中最老节点的轨迹,通过单例的ActorRef隐式的向单例的actorSelection发送akka.actor.Identify消息,然后等待回复.如果单例没有在一定的时间内(可配置)回复的话这个过程会被周期性的执行.如果在一个周期时间内ActorRef不可用则会放弃执行,比如节点已经离开集群了.这时代理会会对发送到单例的消息进行缓冲并在单例可用时进行重发.当这个缓冲满的时候又有新消息进入,ClusterSingletonProxy会丢弃掉最老的消息,这个缓冲可以通过配置修改并且可以设置为0来关闭缓冲.

由于actor的分布式性质,消息总有可能会丢失.通常会在单例中实现一些额外的逻辑或者在客户端actor中确保消息能够最少一次被提交.

需要注意的潜在问题

这种模式看起来很能吸引人去使用,但是它也有很多缺点,下面是一个常见问题列表:

  1. 集群单例可能会很快成为一个性能瓶颈
  2. 不能信赖集群单例永不停止的可能,当运行单例的节点宕机后需要几秒钟的时间才能转移,然后单例会被移交到另一个节点
  3. 在设置了Automatic Downing模式时可能出现的网络分割情况下,会发生每个单独的就群都会去启动一个单独的单例实例

特别需要注意最后一点,在使用了集群单例模式时需要自己关注宕机的节点而不能单单依赖于auto-down的特性.Automatic Downing特性允许集群分离成多个单独的集群,这时多个集群会分别启动各自的单例.

实例

加入我们的外部系统需要一个单点实例,一个actor接收来自JMS队列的消息,并且严格要求同时只能有一个JMS消费者存在以保证消息的处理顺序.这种事情可能没人愿意设计,但这是在于外部系统集成时一个真实的情况.

进群中的每个节点都需要启动一个ClusterSingletonManager并且提供一个单例actor的Props,这里是JMS的消费者.

system.actorOf(ClusterSingletonManager.props(
  singletonProps = Props(classOf[Consumer], queue, testActor),
  terminationMessage = End,
  settings = ClusterSingletonManagerSettings(system).withRole("worker")),
  name = "consumer")

这里限制了单例只能在worker角色的节点上运行,如果不指定,则可以在所有角色的节点运行.

这里使用了一个特殊用途的terminationMessage来确保在实际关闭单例之前关闭对应的资源.但如果仅需要关闭actor的话使用PoisonPill会比terminationMessage要好.

这里展示了单例actor如何处理terminationMessage消息:

case End =>                     // 当自身收到关闭消息的时候通知队列消费者未注册
  queue ! UnregisterConsumer
case UnregistrationOk =>        // 当队列返回消息表示注销成功时才进行关闭,并设置对应的标志
  stoppedBeforeUnregistration = false
  context stop self
case Ping =>
  sender() ! Pong

通过上面给出的name,可以使用正确配置的代理通过集群中任何一个节点访问单例.

system.actorOf(ClusterSingletonProxy.props(
  singletonManagerPath = "/user/consumer",
  settings = ClusterSingletonProxySettings(system).withRole("worker")),
  name = "consumerProxy")

依赖配置

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

配置选项

下面的配置属性,在使用ActorSystem参数创建单例时通过ClusterSingletonManagerSettings读取.ClusterSingletonManagerSettingsClusterSingletonManager.props工厂方法的一个参数,如果需要,可以为每个单例配置不同的属性.

akka.cluster.singleton {
  # 单例actor的name.
  singleton-name = "singleton"

  # 为单例能够运行的节点指定一个角色,如果不提供角色,单例运行在所有节点上运行.
  role = ""

  # 当一个actor变成最老时会想上一个准备离开集群的最老的actor发送交接请求, 这个值用于设置请求的频率,
  # 直到该actor确认交接开始或者从集群中移除(+ akka.cluster.down-removal-margin).
  hand-over-retry-interval = 1s

  # The number of retries are derived from hand-over-retry-interval and
  # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
  # but it will never be less than this property.
  min-number-of-hand-over-retries = 10
}

下面的配置属性,在使用ActorSystem参数创建代理时通过ClusterSingletonProxySettings读取.ClusterSingletonProxySettingsClusterSingletonProxy.props工厂方法的参数,每个单例代理可以配置不同的属性.

akka.cluster.singleton-proxy {
  # 被ClusterSingletonManager启动的单例actor的name
  singleton-name = ${akka.cluster.singleton.singleton-name}

  # 单例可以部署的集群节点角色,如果不指定则所有节点都可以部署. 
  role = ""

  # 代理用来查找单例的频率
  singleton-identification-interval = 1s

  # 单例位置未知时代理缓冲消息的数量,设置为0则关闭缓冲,当单例未知则消息直接被丢弃,最大为10000. 
  buffer-size = 1000 
}

集群中的分布式发布订阅

在不知道actor运行在哪个节点时如何向它发送消息呢? 或者如何向集群中订阅主题的actor进行广播消息呢?

该模式提供了一个中介actor,akka.cluster.pubsub.DistributedPubSubMediator,它会管理一个actor引用的登记薄,并复制整个集群中一个或一组通过指定规则标记的actor.

这个DistributedPubSubMediatoractor允许运行在集群的所有节点,或者所有通过规则指定的节点.中介者可以通过DistributedPubSub扩展或者作为一个额外的actor启动.

登记薄是最终一致的,比如有时候改变不能作用到其他节点,但是在几秒后会被完整的复制到其他所有节点.

如果集群成员带有WeaklyUp状态,并且该特性激活,则会参与到集群的发布订阅.如果发布者节点和订阅者节点在网络分割的同一边则WeaklyUp状态的节点就能收到发布者的消息.

任何一个节点都可以通过中介者发布消息到任何其他订阅的节点.

消息提交有两种模式.

Publish

这是真正的发布/订阅模式,这个模式的典型用例就是即时通信应用中的聊天室.

Actor通过命名主题进行订阅.可以作用于每个节点的多个订阅者.消息会被发送到该主题的所有订阅者.

为了效率消息会被发送到每个节点一次(通过匹配的主题),然后提交到该节点本地代理人的所有订阅者.

使用DistributedPubSubMediator.Subscribe将actor注册到本地中介者,成功的Subscribe(订阅)Unsubscribe(退订)通过DistributedPubSubMediator.SubscribeAck进行确认,通过DistributedPubSubMediator.UnsubscribeAck进行回复.确认的意思是该订阅已经注册,但是复制到其他节点仍然会花费一些时间.

通过向本地的中介者发送DistributedPubSubMediator.Publish消息来进行消息的发送.

当actor终止时会自动从订阅中移除,或者可以使用DistributedPubSubMediator.Unsubscribe进行显式的移除.

一个订阅者actor的例子:

class Subscriber extends Actor with ActorLogging {
  import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
  val mediator = DistributedPubSub(context.system).mediator     // 获取本地中介者引用
  mediator ! Subscribe("content", self)                         // 订阅"content"主题

  def receive = {
    case s: String => log.info("Got {}", s)
    case SubscribeAck(Subscribe("content", None, `self`)) => log.info("subscribing");
  }
}

订阅者actor可以在集群中的多个节点启动,都会受到发布到content主题的消息:

runOn(first) {
  system.actorOf(Props[Subscriber], "subscriber1")
}
runOn(second) {
  system.actorOf(Props[Subscriber], "subscriber2")
  system.actorOf(Props[Subscriber], "subscriber3")
}

主题content的发布者actor例子:

class Publisher extends Actor {
  import DistributedPubSubMediator.Publish
  // activate the extension
  val mediator = DistributedPubSub(context.system).mediator

  def receive = {
    case in: String =>
      val out = in.toUpperCase
      mediator ! Publish("content", out)
  }
}

可以从集群中的任何节点想主题发布消息:

runOn(third) {
  val publisher = system.actorOf(Props[Publisher], "publisher")
  later()
  // after a while the subscriptions are replicated
  publisher ! "hello"
}

Actor同时可以使用一个可选的属性(group)对命名主题进行订阅.如果通过组名(group name)订阅,任何发布到主题中并且标记为sendOneMessageToEachGroup的消息会通过提供的RoutingLogic(默认为random)会被发送到该订阅组中的其中一个actor,如果所有的订阅者都拥有相同的组名,这是工作方式就类似于Send并且所有的消息都会被提交给一个actor.同样,如果每个actor都拥有不同的组名,则工作方式就跟普通的Publish一样,所有的消息都会被广播的所有的订阅者.

Send

这是一个点对点(point-to-point)的模式,每个消息都被提交到一个目的地,但是同样不需要知道目的地的位置.这个模式的经典用例是及时通信应用中的用户私聊.同样可以用于分布任务到已注册的工作者,类似一个集群路由器并且路由可以对自己进行动态注册.

如果没有任何注册的话,消息会通过一个路径匹配提交到授权者.如果由于已经在多个节点上注册从何多个实体匹配到了这个路径,则消息会通过提供的RoutingLogic(默认Random)提交到其中一个目的地.消息的sender()可以指定本地关系优先.如果可能的话,消息会被发送到本地的同一个actor系统中,就像之前用过的中介者,否则会路由到其他匹配的实体.

通过DistributedPubSubMediator.Put将actor注册到本地中介者.Put方法中的ActorRef必须和中介者在同一个本地的actor系统.不带地址信息的路径是用来发送消息的键(key).一个节点中只能存在一个这个路径的actor,因为同一个本地actor系统中路径是唯一的.

通过向本地中介者发送带有目的actor路径(不带地址信息)的DistributedPubSubMediator.Send消息来进行消息的发送.

Actor终止时会自动从订阅中移除,或者通过DistributedPubSubMediator.Remove显式的移除.

一个目的地actor的例子:

class Destination extends Actor with ActorLogging {
  import DistributedPubSubMediator.Put
  val mediator = DistributedPubSub(context.system).mediator

  mediator ! Put(self)      // 通过本地中介者注册自己的路径

  def receive = {
    case s: String => log.info("Got {}", s)
  }
}

目的地actor可以在集群中的多个节点启动,都会受到发送到该路径的消息:

runOn(first) {
  system.actorOf(Props[Destination], "destination")
}
runOn(second) {
  system.actorOf(Props[Destination], "destination")
}

向该路径发送消息的actor例子:

class Sender extends Actor {
  import DistributedPubSubMediator.Send
  // activate the extension
  val mediator = DistributedPubSub(context.system).mediator

  def receive = {
    case in: String =>
      val out = in.toUpperCase
      mediator ! Send(path = "/user/destination", msg = out, localAffinity = true)
  }
}

可以从集群的任何节点发送消息:

runOn(third) {
  val sender = system.actorOf(Props[Sender], "sender")
  later()
  // after a while the destinations are replicated
  sender ! "hello"
}

同时可以通过向本地中介者发送Put.SendDistributedPubSubMediator.SendToAll消息,将消息发送到所有注册到匹配路径的目的地actor.带有相同路径不带地址信息的actor可以注册到不同的节点,每个节点上只能存在一个这样的节点,因为本地actor系统中路径是唯一的.

这个模式的经典用例是讲消息广播到所有拥有相同路径的actor.比如不同的节点都执行相同的动作.同时可以指定一个特殊的属性(allButSelf)来决定是否将消息发送到自身节点中匹配的路径.

DistributedPubSub Extension

上面的例子中,中介者使用akka.cluster.pubsub.DistributedPubSub扩展来创建和访问.在很多场景中这是方便理想的方式,同时需要了解的是可以创建一个单独的actor来作为中介者,或者同时创建多个不同的中介者来大量的actor或主题分割到不同的中介者.比如可以为不同的中介者使用不同的集群节点角色.

DistributedPubSub扩展可以通过下面的配置文件来设置属性:

# Settings for the DistributedPubSub extension
akka.cluster.pub-sub {
  # 中介者的actor名字name, /system/distributedPubSubMediator
  name = distributedPubSubMediator

  # 在那种角色的节点上穿件这个中介者actor,不设置则在所有节点创建
  role = ""

  # `Send`的路由方法: random, round-robin, broadcast
  routing-logic = random

  # DistributedPubSubMediator发送gossip信息的频率
  gossip-interval = 1s

  # Removed entries are pruned after this duration
  removed-time-to-live = 120s

  # Maximum number of elements to transfer in one message when synchronizing the registries.
  # Next chunk will be transferred in next round of gossip.
  max-delta-elements = 3000

  # The id of the dispatcher to use for DistributedPubSubMediator actors. 
  # If not specified default dispatcher is used.
  # If specified you need to define the settings of the actual dispatcher.
  use-dispatcher = ""
}

推荐的做法是通过定义akka.extensions属性以在actor系统启动是加载扩展.否则会在第一次启动时加载,这通常会需要一点时间.

akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"]

依赖

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

Cluster Client

集群之外的actor系统可以通过ClusterClient与集群进行通信,这个客户端可以属于另外一个集群.它只需要知道一个或其中几个节点作为最初的联系点.它会创建一个与集群中ClusterReceptionist的连接,并且监控连接,如果连接丢失则会重新进行创建.当寻找新的连接点时会刷新上一个连接,或者周期性的刷新连接,最初的连接点并不是必须的.

注意: ClusterClient不能用于向运行于集群本身的actor发送消息.如果需要,可以使用分布式的订阅发布来向当前集群的actor发送消息.

注意在使用ClusterClient时同样需要将Akka的akka.actor.providerakka.actor.LocalActorRefProvider改变为akka.remote.RemoteActorRefProvider or akka.cluster.ClusterActorRefProvider.

连接点可以在所有节点上启动,或者通过指定一个规则在部分节点上启动.通过ClusterClientReceptionist扩展启动,或者作为一个单独的actor启动.

可以通过ClusterClient向集群中使用ClusterReceptionist注册到DistributedPubSubMediator的actor发送消息.ClusterClientReceptionist为已注册的actor提供了一个方法用于和客户端联系.消息通过ClusterClient.Send,ClusterClient.SendToAllClusterClient.Publish封装.

  1. ClusterClient.Send: 消息会通过匹配的路径提交给一个存在的接收者,如果同时有多个匹配路径的实体会通过随机的方式提交到目的地.消息的sender()可以指定本地优先.

  2. ClusterClient.SendToAll: 消息会被发送到所有匹配路径的目的地.

  3. ClusterClient.Publish: 消息会被发送到所有通过主题名注册为订阅者的目的地.

从目的地actor回复的消息会被接待者以隧道的方式转接,以避免其他集群的节点对连接的绑定.比如,被目的地actor看到的sender()并不是这个客户端本身.被客户端看到的发送响应消息的sneder()会收到deadLetters,因为客户端通常会通过ClusterClient发送后续的消息.如果客户端需要立即与集群中的一个actor通信时,同样可以在回复消息中传入原始的sender.

当与目的地之间建立联系时,ClusterClient会对消息进行缓冲然后当连接建立完成后将消息发送.缓冲满的时候ClusterClient再收到新的消息则会将最老的消息丢弃.或者将缓冲设置为0来关闭缓冲功能.

实例

首先在集群中启动接待员.推荐的做法是通过在配置文件中定义akka.extensions属性在actor系统启动时加载扩展.

akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

然后将客户端需要连接的actor进行注册:

runOn(host1) {
  val serviceA = system.actorOf(Props[Service], "serviceA")
  ClusterClientReceptionist(system).registerService(serviceA)
}

runOn(host2, host3) {
  val serviceB = system.actorOf(Props[Service], "serviceB")
  ClusterClientReceptionist(system).registerService(serviceB)
}

在客户端可以创建一个ClusterClientactor作为网关来向集群中被路径标识的actor发送消息.

runOn(client) {
  val c = system.actorOf(ClusterClient.props(
    ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
  c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
  c ! ClusterClient.SendToAll("/user/serviceB", "hi")
}

上面的initialContacts参数是一个Set[ActorPath],可以按如下的方式创建:

val initialContacts = Set(
  ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
  ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
val settings = ClusterClientSettings(system)
  .withInitialContacts(initialContacts)

或者在配置文件中或系统变量中定义.

ClusterClientReceptionist Extension

上面的例子中接待者使用akka.cluster.client.ClusterClientReceptionist扩展创建和访问.这种方式很方便,通知支持启动一个单独的akka.cluster.client.ClusterReceptionistactor,并且支持同时存在多个接待者以服务不同类型的客户端.

注意ClusterClientReceptionist同时使用了DistributedPubSub扩展,已经在分布式发布订阅集群的部分进行了介绍.

依赖

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.2"

配置

ClusterClientReceptionist扩展可以通过如下属性进行配置:

# Settings for the ClusterClientReceptionist extension
akka.cluster.client.receptionist {
  # Actor name of the ClusterReceptionist actor, /system/receptionist
  name = receptionist

  # 用以启动接待者的节点角色,不设置则所有节点都可以启动
  role = ""

  # The receptionist will send this number of contact points to the client
  number-of-contacts = 3

  # The actor that tunnel response messages to the client will be stopped
  # after this time of inactivity.
  response-tunnel-receive-timeout = 30s

  # The id of the dispatcher to use for ClusterReceptionist actors. 
  # If not specified default dispatcher is used.
  # If specified you need to define the settings of the actual dispatcher.
  use-dispatcher = ""

}

下面的配置属性在使用参数创建ActorSystem时由ClusterClientSettings读取,或者可以通过ClusterClientSettings在读取时按上面的格式设置其他的配置文件.

# Settings for the ClusterClient
akka.cluster.client {
  # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
  # that the client will try to contact initially. It is mandatory to specify
  # at least one initial contact. 
  # Comma separated full actor paths defined by a string on the form of
  # "akka.tcp://system@hostname:port/system/receptionist"
  initial-contacts = []

  # Interval at which the client retries to establish contact with one of 
  # ClusterReceptionist on the servers (cluster nodes)
  establishing-get-contacts-interval = 3s

  # Interval at which the client will ask the ClusterReceptionist for
  # new contact points to be used for next reconnect.
  refresh-contacts-interval = 60s

  # How often failure detection heartbeat messages should be sent
  heartbeat-interval = 2s

  # Number of potentially lost/delayed heartbeats that will be
  # accepted before considering it to be an anomaly.
  # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
  # will trigger if there are no heartbeats within the duration 
  # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
  # the default settings.
  acceptable-heartbeat-pause = 13s

  # If connection to the receptionist is not established the client will buffer
  # this number of messages and deliver them the connection is established.
  # When the buffer is full old messages will be dropped when new messages are sent
  # via the client. Use 0 to disable buffering, i.e. messages will be dropped
  # immediately if the location of the singleton is unknown.
  # Maximum allowed buffer size is 10000.
  buffer-size = 1000

  # If connection to the receiptionist is lost and the client has not been
  # able to acquire a new connection for this long the client will stop itself.
  # This duration makes it possible to watch the cluster client and react on a more permanent
  # loss of connection with the cluster, for example by accessing some kind of
  # service registry for an updated set of initial contacts to start a new cluster client with.
  # If this is not wanted it can be set to "off" to disable the timeout and retry
  # forever.
  reconnect-timeout = off
}