Simple Akka: Akka Essentials - Starting with Akka

Creating an Application

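Our first Akka application uses a MapReduce approach to build a simple word counter: it accepts a sentence as a plain string and counts the words in it.
Each sentence goes through the following processing steps:

  • Map task: maps a sentence to its words and filters out stop words such as "is" and "a"; each remaining occurrence of a word is emitted with a count of 1, and the whole list is passed to the Reduce task;
  • Reduce task: reduces the word list of each sentence; it examines every word and, whenever a word is repeated, increments that word's count by 1, then passes the result to the Aggregate task;
  • Aggregate task: merges the reduced lists of all sentences into a single overall list.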
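
The three tasks above can be sketched with plain Scala collections before any actors are involved. This is only a minimal illustration of the data flow, not the actor implementation: the object name WordCountSketch, the function names mapTask, reduceTask and aggregateTask, and the shortened stop-word set are all assumptions made for this sketch.

```scala
object WordCountSketch {
  // Assumed, shortened stop-word set for this sketch only
  val stopWords: Set[String] = Set("a", "is", "the", "to", "and")

  // Map task: split into words, drop stop words, emit (word, 1) per occurrence
  def mapTask(sentence: String): List[(String, Int)] =
    sentence.split("\\s+").toList
      .map(_.toLowerCase)
      .filter(w => w.nonEmpty && !stopWords.contains(w))
      .map(w => (w, 1))

  // Reduce task: sum the counts of repeated words within one sentence
  def reduceTask(pairs: List[(String, Int)]): Map[String, Int] =
    pairs.foldLeft(Map.empty[String, Int]) { case (acc, (word, n)) =>
      acc + (word -> (acc.getOrElse(word, 0) + n))
    }

  // Aggregate task: merge the per-sentence maps into one overall map
  def aggregateTask(maps: List[Map[String, Int]]): Map[String, Int] =
    maps.foldLeft(Map.empty[String, Int]) { (total, m) =>
      m.foldLeft(total) { case (acc, (word, n)) =>
        acc + (word -> (acc.getOrElse(word, 0) + n))
      }
    }

  def main(args: Array[String]): Unit = {
    val sentences = List("The dog is a dog", "A fox and the dog")
    // Each sentence is mapped and reduced on its own, then everything is aggregated
    val total = aggregateTask(sentences.map(mapTask).map(reduceTask))
    println(total)
  }
}
```

For the two sample sentences, "dog" ends up with a count of 3 and "fox" with a count of 1. In the actor version, each of these functions becomes the body of one actor, and the intermediate values travel between actors as messages.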

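Application Design

The application creates three different actors: a Map actor, a Reduce actor, and an Aggregate actor. To manage the lifecycle of these actors, we create an additional Master actor. Messages flow between the actors as follows:

  • The Master actor sends a sentence to the Map actor;
  • The Map actor maps the words in the sentence and returns MapData to the Master actor, which sends the MapData on to the Reduce actor;
  • The Reduce actor processes the MapData and reduces the words; the reduced word list is sent as ReduceData, via the Master actor, to the Aggregate actor;
  • On receiving ReduceData, the Aggregate actor updates its internal state, which holds the data completed so far;
  • The Master actor sends a Result message to the Aggregate actor, which replies with the aggregated list.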

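The overall development process:

  • Define the data structures for the messages
  • Define the four actors and the computation logic of each
  • Define an actor system, create the actors, and feed them some data to process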

Example

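// Imports assume Akka 2.1/2.2 on Scala 2.10, the generation that provides RoundRobinRouter
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinRouter
import akka.util.Timeout
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
import scala.concurrent.duration._

// Define the message classes
sealed trait MapReduceMessage
case class WordCount(word: String, count: Int) extends MapReduceMessage
case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage
case class ReduceData(reduceDataMap: Map[String, Int]) extends MapReduceMessage
// A message without fields is a case object, not a parameterless case class
case object Result extends MapReduceMessage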

// MapActor.scala
class MapActor extends Actor {

  val STOP_WORDS_LIST = List("a", "am", "an", "and", "are", "as",
    "at", "be", "do", "go", "if", "in", "is", "it", "of", "on", "the", "to")

  val defaultCount: Int = 1

  def receive: Receive = {
    case message: String =>
      // Reply to the sender (the Master), which passes MapData on to the Reduce actor
      sender ! evaluateExpression(message)
  }

  def evaluateExpression(line: String): MapData = MapData {
    // Split the sentence on whitespace and keep every non-stop word with a count of 1
    line.split("""\s+""").foldLeft(ArrayBuffer.empty[WordCount]) {
      (index, word) =>
        if (!STOP_WORDS_LIST.contains(word.toLowerCase))
          index += WordCount(word.toLowerCase, defaultCount)
        else
          index
    }
  }
}

// ReduceActor.scala
class ReduceActor extends Actor {

  def receive: Receive = {
    // Reply to the sender (the Master), which passes ReduceData on to the Aggregate actor
    case MapData(dataList) => sender ! reduce(dataList)
  }

  def reduce(words: IndexedSeq[WordCount]): ReduceData = ReduceData {
    // Sum the counts of repeated words within one sentence
    words.foldLeft(Map.empty[String, Int]) {
      (index, wc) =>
        index + (wc.word -> (index.getOrElse(wc.word, 0) + wc.count))
    }
  }
}

// AggregateActor.scala
class AggregateActor extends Actor {
  // Mutable state is safe here because only this actor ever touches it
  val finalReducedMap = new HashMap[String, Int]

  def receive: Receive = {
    case ReduceData(reduceDataMap) =>
      aggregateInMemoryReduce(reduceDataMap)
    case Result =>
      sender ! finalReducedMap.toString()
  }

  // Merge one sentence's counts into the running totals
  def aggregateInMemoryReduce(reducedList: Map[String, Int]): Unit = {
    for ((key, value) <- reducedList) {
      finalReducedMap(key) = finalReducedMap.getOrElse(key, 0) + value
    }
  }
}

// MasterActor.scala
class MasterActor extends Actor {
  val mapActor = context.actorOf(Props[MapActor].withRouter(
    RoundRobinRouter(nrOfInstances = 5)), name = "map")
  val reduceActor = context.actorOf(Props[ReduceActor].withRouter(
    RoundRobinRouter(nrOfInstances = 5)), name = "reduce")
  val aggregateActor = context.actorOf(Props[AggregateActor], name = "aggregate")

  def receive: Receive = {
    case line: String => mapActor ! line
    case mapData: MapData => reduceActor ! mapData
    case reduceData: ReduceData => aggregateActor ! reduceData
    // forward keeps the original sender, so the Aggregate actor replies to the asker
    case Result => aggregateActor forward Result
  }
}

// MapReduceApplication.scala
object MapReduceApplication extends App {
  val _system = ActorSystem("MapReduceApp")
  val master = _system.actorOf(Props[MasterActor], name = "master")
  implicit val timeout = Timeout(5.seconds)

  master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"
  master ! "Dog is man's best friend"
  master ! "Dog and Fox belong to the same family"

  // Crude wait so the sentences pass through all actors before the result is requested
  Thread.sleep(500)
  val future = (master ? Result).mapTo[String]
  val result = Await.result(future, timeout.duration)
  println(result)
  _system.shutdown()
}