Simple Akka: Persistence

简介

Akka的persistence(以下使用”持久”)支持有状态的actor来保持他们的内部状态,因此他们可以在一个actor启动,在JVM崩溃后重启,或被监管者重启,或者合并进一个集群时进行恢复.Akka persistence背后的核心概念是,只会改变一个actor中已被留存的内部状态而不是当前的状态(除了可选的快照).这些改变从来只会添加到存储中,没有东西是可以改变的,这些特性支持高级别的事务和高效的复制集.

有状态的actor通过替换已存储的改变进行恢复,可以重建他们的内部状态.可以是一个完整的改变历史,或者是从其中一个快照以减少恢复时间.同时提供了一个点到点的,带有最少一次提交的通信方式.

Akka persistence的灵感来自于eventsourced库,保留了同样的架构个概念,只是在API层和实现层有所不同.

依赖

Akka persistence是一个单独的jar文件.确认在配置文件中添加了如下依赖:

"com.typesafe.akka" %% "akka-persistence" % "2.4.2"

Akka persistence扩展自带了一些内建的持久(persistence)插件,包括基于内存堆的日志,基于本地文件系统的快照保存,基于LevelDB的日志.

基于LevelDB的插件需要以下额外的依赖声明:

"org.iq80.leveldb"            % "leveldb"          % "0.7"
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

架构

  1. PersistentActor: 是一个持久的,有状态的actor,它可以将事件持久到日志,并且以线程安全的方式进行操作.它可以用于实现基于命令(command)或事件驱动(event sourced)的actor.当一个持久actor启动或重启时,被日志记录的消息会对actor进行重播以根据这些消息回复内部状态.

  2. PersistentView: 一个试图是一个持久有状态的actor,用以接收那些被其他持久actor写入的消息.一个试图本身并不会日志新的消息,它仅仅通过其他持久actor的复制消息流来更新内部状态.

  3. AtLeastOnceDelivery: 发送消息到目的地时带有一个最少一次到达的语义,即最少发送到一次.包括发送端和接收端的JVM崩溃.

  4. AsyncWriteJournal: 一个journal(日志器)将发送到持久actor的消息进行存储.应用可以对需要日志的消息进行控制.日志器会保持highestSequenceNr随着每个消息进行递增.日志器的存储后台是一插件化的.持久 扩展了一个基于LevelDB的插件,会将日志写入到文件系统.复制集日志器在社区插件中可以找到.

  5. Snapshot store: 一个快照存储会保存一个持久actor的快照,或者一个试图的内部状态.这些快照用于优化恢复时间.存储的后台同样是插件化的.

事件驱动

事件驱动背后的想法十分简单.当一个持久actor接收到一个(非持久的)命令时首先会验证它是否能作用到当前状态上.这里说的验证可以是任何方式,比如,通过检查一个命令消息的字段与服务端的外部服务进行沟通.如果验证成功,会从命令生成一个时间,用于描述命令的作用.这些时间会被持久存储,成功持久后用于改变actor的状态.当这个持久actor需要被恢复的时候,只有这些被持久的事件会进行重播,他们会重新被成功应用.另外,相对于命令来说,这些被重播到持久actor的消息不会失败.事件驱动的actor同样可以处理那些不会改变actor状态的命令,比如查询命令.

Akka的持久使用PersistentActor特质支持事件驱动,一个actor继承这个特质后使用persist方法来持久和处理事件.PersistentActor的行为通过实现receiveRecoverreceiveCommand来定义.下面是一个例子:

import akka.actor._
import akka.persistence._

case class Cmd(data: String)
case class Evt(data: String)

case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}

class ExamplePersistentActor extends PersistentActor {
  override def persistenceId = "sample-id-1"

  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Evt                                 => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case "print" => println(state)
  }

}

这个例子定义了两种数据类型,CmdEvt分别用于表示命令和事件,ExamplePersistentActor的状态是一个被持久事件数据的列表保存在ExampleState中.

这个持久actor的receiveRecover方法定义了状态是如何通过处理EvtSnapshotOffer消息进行恢复的.receiveCommand方法是一个命令处理器.在这个例子中,一个命令会被处理为生成两个事件,然后一个持久,一个被处理.事件的存储通过调用persist方法进行,第一参数为事件或事件的序列,第二个参数为事件处理器.

persist方法会异步的持久事件,事件处理器会执行那些被持久成功的事件.成功持久的事件会会以一个特殊消息的方式重新发送给持久actor以触发事件处理器对事件进行处理.一个事件处理器可以覆盖持久actor的状态并改变它.被持久事件的发送者与命令的发送者是一致的.这允许事件处理器可以向发送者回复一个命令.

事件处理器的职责是使用事件数据改变持久actor的状态,并通过发布事件来通知其他成员状态的成功改变.

当通过persist持久实现的时候,会保证持久actor不会在持久调用和相关的处理器执行期间接受将来的其他命令.这同时包括同一个上下文中对一个命令的多次持久调用.传入的消息在persist完成之前会被储存.

如果一个事件的存储失败了,onPersistFailure会被唤起(默认是打印错误日志),这个actor会被无条件的终止.如果一个事件的持久在存储是被拒绝,比如序列化错误,onPersistRejected会被唤起(默认是打印错误日志),然后actor继续处理下一条消息.

标识

一个持久actor必须拥有一个标识,并且不能在actor转化之间改变.必须通过persistenceId方法定义:

override def persistenceId = "my-stable-persistence-id"

恢复

默认情况下,一个持久actor会在启动和重启时通过记录的日志自动恢复.新的消息发送给持久actor不会影响到恢复过程中重播的消息.它们会被缓存,然后在恢复阶段完成时会被持久actor收到.

自定义恢复

应用可以通过在PersistentActor的recovery方法中返回一个自定义的Recovery对象来定义恢复的执行方式.

override def recovery = Recovery(toSequenceNr = 457L)

通过将recovery方法设置为Recovery.none()可以将恢复关闭.

恢复状态

一个持久actor可以通过方法查询自身的恢复状态:

def recoveryRunning: Boolean
def recoveryFinished: Boolean

有时候需要在恢复完成后并在处理其他消息之前执行一些额外的初始化工作.持久化actor会在完成恢复并在接收其他消息之前收到一个特殊的RecoveryCompleted消息:

override def receiveRecover: Receive = {
  case RecoveryCompleted =>
  // perform init after recovery, before any other messages
  //...这里可以定义一些恢复后的初始化工作
  case evt               => //...
}

override def receiveCommand: Receive = {
  case msg => //...
}

如果从日志恢复状态的过程中出现错误,onRecoveryFailure会被调用(默认打印日志),然后actor会被停止.

内部存储

持久actor有一个内部的存储,用于在恢复过程或者persist\persistAll方法持久事件期间缓存传入的消息.当然可以使用继承的存储或者根据需要创建一个或多个别的存储.