One actor is no Actor
当系统中只有一个actor时,所能完成的工作非常有限,并不能解决并发并行的问题.
Actors需要的是多个actor互相组合完成工作:
- 一个actor只负责一项工作并且把它做好
- 然后告诉其他的actor来未它完成其他的工作
子actor通常用于工作或任务的分配
- 避免使用
actorSelection
,而是通过actor互相引用
一般只在这两种情况下才会去使用 actorSelection
, 而其他的时候应该尽量避免:
- 因为某些原因需要使用actor path中的通配符优势,比如在path中使用通配符同时选择多个actor以便于同时发送消息
- 在拥有一个远程actor的引用之前,想要与之通信
同时使用 actorSelection
的两个最佳实践:
- 尽量使用ActorRef,并通过这个ActorRef使用tell发送消息,而不是
actorSelection
,因为他类似一种UDP,发送消息时并不知道对方的状态 - 不要硬编码path,而是在一个公用类中进行维护
Structure your Actors
不同的actor之间在设计时进行层级划分以制定合理的结构,并同时设置合理的监管策略,同时,同一种类型的actor可以创建一个Pool使用合适的消息分发策略.
Name your Actors
为actor制定合理清晰的命名,而不是由系统生成base64格式的编码.
比如:
// better: but not very informative...
context.actorOf(childProps, nextFetchWorkerName) // "fetch-worker-1", "fetch-worker-2"
private var _fetchWorkers: Int = 0
private def nextFetchWorkerName: String = {
_fetchWorkers += 1
"fetch-worker-${_fetchWorkers}"
}
这会生成一个有序的actor名,便于管理或错误排查.
或者更加优雅的方式:
abstract class SeqActorName {
def next(): String
def copy(name:String):SeqActorName
}
object SeqActorName {
def apply(prefix:String) = new SeqActorNameImpl(prefix, new AtomicLong(0))
}
final class SeqActorNameImpl(val prefix:String, counter: AtomicLong) extends SeqActorName{
def next():String = prefix + '-' + counter.getAndIncremet()
def copy(newPrefix:String):SeqActorName = new SeqActorNameImpl(newPrefix, counter)
}
当然,最后的方式是根据actor的真正用途定义有意义的名字:
context.actorOf(childProps, fetcherName(videoUrl)) // "fetch-yt-MRCWy2E_Ts", ...
def fetcherName(link: Link) = link match {
case YoutubeLink(id, metadata) => s"fetch-yt-$id"
case DailyMotionLink(id, metadata) => s"fetch-dm-$id"
case VimeoLink(id, metadata) => s"fetch-vim-$id"
}
然后在监管策略中,打印出现错误的actor的名字:
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
// ... extends Actor with ActorLogging {
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
case ex: Exception =>
log.warning("Child {} failed with {}, attempting restart...", sender().path.name, // actor的名字
ex.getMessage)
Restart
}
同时注意,在使用logger时应该总是使用 {}
进行格式化,而不是 s""
的方式:
log.debug("Something heavy {} from {}", generateId, physicalAddress)
Matrix of mutalility(Pain)
各种可变性的使用效果:
Item | Imutable | Mutable |
---|---|---|
Val | 最好 | 尽量不用 |
var | 可以使用 | 绝对不能 |
比如:
val people:List[Person]
val people: mutable.ListBuffer[Person]
var people: List[Persion]
var people:mutable.ListBuffer[Person] // 不能这样使用
Blocking needs careful management
阻塞本身来说是没有好处的,因为相比于做别的事情,阻塞时什么也不做就只是在等待,浪费CPU时间.
下面的例子:
// BAD! (不要在Future中进行阻塞操作):
implicit val defaultDispatcher = system.dispatcher
val routes: Route = post {
complete {
Future { // 使用了默认的Dispatcher
Thread.sleep(5000) // will block on the default dispatcher,
System.currentTimeMillis().toString // starving the routing infra
}
}
}
我们可以单独在application.conf
中配置一个dispatcher设置供阻塞的Future使用:
// application.conf
my-blocking-dispatcher {
type = Dispatcher
executor = “thread-pool-executor"
thread-pool-executor {
// in Akka previous to 2.4.2:
core-pool-size-min = 16
core-pool-size-max = 16
max-pool-size-min = 16
max-pool-size-max = 16
// or in Akka 2.4.2+
fixed-pool-size = 16
}
throughput = 100
}
然后在导入dispatcher时进行指定:
// GOOD (due to the blocking on a dedicated dispatcher):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
val routes: Route = post {
complete {
Future { // uses the good "blocking dispatcher" that we configured,
// instead of the default dispatcher – the blocking is isolated.
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
在Future中使用阻塞代码时,并不意味着这段代码就不阻塞了,而是表示阻塞不会发生在当前线程,这时候要确认Future使用的线程池是否真的够用.
Never Await, for/flatMap instead
不使用 Await
,而是使用 for/flatMap
来获取Future的结果:
// ... extends Actor {
import context.dispatcher
import scala.concurrent.duration._
import scala.concurrent.Await // bad sign!
// BAD!!!
val fThings: Future[Things] = computeThings()
val t: Things = Await.result(fThings, atMost = 3.seconds)
val d: Details = Await.result(moreDetailsFor(t), atMost = 3.seconds)
// Good:
val fThingsWithDetails = for {
t <- computeThings()
d <- moreDetailsFor(t)
} yield t -> d
fThingsWithDetails foreach {
case (things, details) => // case (things: Things, details: Details) =>
println(s"$things with $details")
}
或者为Future的计算添加超时:
// adding timeout:
val timeoutFuture = akka.pattern.after(3.seconds, context.system.scheduler) {
Future.failed(new TimeoutException("My timeout details..."))
}
Future.firstCompletedOf(fThingsWithDetails :: timeoutFuture :: Nil) foreach {
case (things, details) => // case (things: Things, details: Details) =>
println(s"$things with $details")
}
Avoid Java Serialization
Akka中默认的序列化模块由Java提供,通常默认就会使用它来处理消息的序列化,因为不需要任何配置.
如果需要更高的性能或者运行一个拥有大规模节点的集群,这时就需要对序列化进行配置.
比较流行的是ProtoBuf和Kryo.ProtoBuf不容易维护但是模式比较利于扩展,Kryo则相反.
配置序列化支持:
// dependencies
"com.github.romix.akka" %% "akka-kryo-serialization" % "0.4.0"
// application.conf
extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
serializers {
java = "akka.serialization.JavaSerializer"
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
akka.actor.serialization-bindings {
"com.mycompany.Example": kryo
...
}
消息通常会在服务中保持一段时间,如果消息很多则会占用大量内存,特别是默认的java序列器,如果需要易于维护的扩展方式并且兼容,使用JSON或者ProtocolBuffers都是比较推荐的,甚至是Thrift.
Trust no-one, benchmark ererything!
性能测试通常难于实行,需要选择合适的工具:
- JMH (for Scala via: ktoso/sbt-jmh)
- YourKit / JProfiler / …
- Linux perf_events
Let it Crash
Error: 通常是预期的,并且是通过编码实现的条件.比如在对输入进行验证的时候发现一个错误,这是这个错误就会同时到客户端.
Failure: 通常是一个不可预期的事件,在服务中阻止提供正常的功能.一个Failure通常会阻止当前访问,甚至是后来的所有客户端请求.
Backoff Superivision
原则是尽管让他崩溃,然后优雅的进行恢复.合理的设置监管策略,覆写父actor的supervisorStrategy来针对各种异常做出正确的处理.
在策略中支持的4中不同的处理方式:
- Resume the subordinate, keeping its accumulated internal state
- Restart the subordinate, clearing out its accumulated internal state
- Stop the subordinate permanently
- Escalate the failure, thereby failing itself
可以查看Akka文档中详细的处理方式.
Design using State Machines
针对一个actor中多种不同的处理逻辑,可以进行分类并切分,然后在不同的状态之间切换:
def receive = { case Thingy() =>
// ...
case AnotherThingy() => // ...
case DoOtherThings() => // ...
case PleaseGoAway() => // ...
case CarryOn() => // ...
case MakeSomething() => // ...
// ...
}
def receive = awaitingInstructions
def awaitingInstructions: Receive = terminationHandling orElse {
case CarryOn() => // ...
case MakeSomething(metadata) =>
// ...
context become makeThings(meta)
}
def makeThings(metadata: Metadata): Receive = terminationHandling orElse {
case Thingy() =>
// make a thingy ...
case AnotherThingy() =>
// make another thingy ...
case DoOtherThings(meta) => // ...
context become awaitingInstructions
}
def terminationHandling: Receive = {
case PleaseGoAway() =>
// ...
context stop self
}
或者在有些合适的场合使用FSM:
class Buncher extends FSM[State, Data] {
startWith(Idle, Uninitialized)
when(Idle) {
case Event(SetTarget(ref), Uninitialized) =>
stay using Todo(ref, Vector.empty)
}
// transition elided ...
when(Active, stateTimeout = 1 second) {
case Event(Flush | StateTimeout, t: Todo) =>
goto(Idle) using t.copy(queue = Vector.empty)
}
// unhandled elided ...
initialize()
}
Cluster Convergence and Joining
When a node can prove that the cluster state it is observing has been observed by all other nodes in the cluster.
如果一个节点能够观察集群的状态,它的状态同时也被其他节点观察到了.
Cluster Partitions and Down
Convergence is required for “Leader actions”, which include Join-ing and Remove-ing a node.
Down-ing can happen without convergence.
In order to guarantee consistency via “single writer principle”.
Akka Distributed Data has no need for “single writer”, it’s CRDT based. But it’s harder to model things as CRDT, so it’s a trade off.
Akka is a Tookkit
Future: Single value, no streaming by definition. Local abstraction. Execution contexts.
Strems: Mostly static processing layouts. Well typed and Back-pressured!
Akka Typed: Plain Actor’s younger brother, experimental. Location transparent, well typed.Technically unconstrained in actions performed.
Akka Actor: Location transparent.Various resilience mechanisms. (watching, persistent recovering, migration, pools)Untyped and unconstrained in actions performed.
Akka Persistence is specifically geared towards Event Sourcing.
If you know you you want a raw Key-Value Store and do not need Event Sourcing’s capabilities, don’t use Akka Persistence – it would be the wrong tool for the job.
If you use it for Event Sourcing though… it’ll work very well for you. There it is the right tool for the job.