Creating an Application
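In our first Akka application we use the MapReduce approach to build a simple word counter: it receives a sentence as a plain string and counts the words in it.
Each sentence goes through the following processing steps: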
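- Map task: maps a sentence to its words, filters out stop words such as "is" and "a", and records a count of 1 for every occurrence of each remaining word; the whole list is then passed to the Reduce task;
- Reduce task: reduces the word list of each sentence by checking every word and, whenever a word repeats, incrementing that word's count by 1; the result is then passed to the Aggregate task;
- Aggregate task: merges all the reduced lists into a single overall list.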
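The three tasks can be written as ordinary functions before any actors are involved, which makes the data flow easy to check. This is a minimal sketch assuming whitespace-separated words and the stop-word list used by the MapActor later in this section; `WordCountPipeline` and its method names are hypothetical.

```scala
// Plain-Scala sketch of the Map, Reduce and Aggregate tasks (no actors).
object WordCountPipeline {
  // Stop words copied from the MapActor in this article
  val stopWords: Set[String] = Set("a", "am", "an", "and", "are", "as",
    "at", "be", "do", "go", "if", "in", "is", "it", "of", "on", "the", "to")

  // Map task: sentence -> list of (word, 1), with stop words removed
  def mapTask(sentence: String): List[(String, Int)] =
    sentence.trim.split("""\s+""").toList
      .map(_.toLowerCase)
      .filterNot(stopWords.contains)
      .map(word => (word, 1))

  // Reduce task: sum the 1s of repeated words within one sentence
  def reduceTask(pairs: List[(String, Int)]): Map[String, Int] =
    pairs.foldLeft(Map.empty[String, Int]) { case (acc, (word, n)) =>
      acc + (word -> (acc.getOrElse(word, 0) + n))
    }

  // Aggregate task: merge the per-sentence maps into one overall map
  def aggregateTask(partials: List[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int]) { (total, partial) =>
      partial.foldLeft(total) { case (acc, (word, n)) =>
        acc + (word -> (acc.getOrElse(word, 0) + n))
      }
    }

  def run(sentences: List[String]): Map[String, Int] =
    aggregateTask(sentences.map(s => reduceTask(mapTask(s))))
}

println(WordCountPipeline.run(List("the dog and the fox", "Dog is a friend")))
```

Each stage depends only on the output of the previous one, which is what lets the actor version process different sentences' Map and Reduce steps independently.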
Application Design
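The application creates three different actors: a Map actor, a Reduce actor and an Aggregate actor. To manage the lifecycle of these actors we also create a Master actor. Messages flow between the actors as follows:
- The Master actor sends a sentence to the Map actor;
- The Map actor maps the words in the sentence and returns MapData to the Master actor, which forwards the MapData to the Reduce actor;
- The Reduce actor processes the MapData and reduces the words; the reduced word list is sent as ReduceData, via the Master actor, to the Aggregate actor;
- When the Aggregate actor receives ReduceData, it updates its internal state holding the completed data;
- The Master actor sends a Result message to the Aggregate actor, which replies with the aggregated list.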
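The message flow above can be traced without Akka in a single-threaded simulation: a FIFO queue of (recipient, message) pairs stands in for the actors' mailboxes, and each case of the match plays one actor's role. This is a sketch under simplifying assumptions (one instance per actor, deterministic ordering, a shortened stop-word list); `MessageFlowSimulation`, the `Sentence` wrapper and the recipient names are hypothetical and not part of the application.

```scala
// Single-threaded simulation of the message flow (no Akka involved).
// A FIFO queue of (recipient, message) pairs stands in for the actors' mailboxes.
import scala.collection.mutable

object MessageFlowSimulation {
  sealed trait Msg
  final case class Sentence(text: String) extends Msg // the real app sends plain Strings
  final case class MapData(words: List[String]) extends Msg
  final case class ReduceData(counts: Map[String, Int]) extends Msg
  case object Result extends Msg

  type Envelope = (String, Msg) // (recipient name, message); names are hypothetical

  val stopWords: Set[String] = Set("a", "and", "is", "the", "to") // shortened list

  def run(sentences: List[String]): Map[String, Int] = {
    val queue = mutable.Queue.empty[Envelope]
    var state = Map.empty[String, Int]          // the Aggregate actor's internal state
    var reply: Option[Map[String, Int]] = None // what the asker finally receives

    def drain(): Unit = while (queue.nonEmpty) {
      queue.dequeue() match {
        // Master: route each message type to the next stage
        case ("master", s: Sentence)   => queue.enqueue(("map", s))
        case ("master", m: MapData)    => queue.enqueue(("reduce", m))
        case ("master", r: ReduceData) => queue.enqueue(("aggregate", r))
        case ("master", Result)        => queue.enqueue(("aggregate", Result))
        // Map: lowercase, split, drop stop words, reply to the master
        case ("map", Sentence(text)) =>
          val words = text.trim.toLowerCase.split("""\s+""").toList.filterNot(stopWords)
          queue.enqueue(("master", MapData(words)))
        // Reduce: count repeated words within one sentence, reply to the master
        case ("reduce", MapData(words)) =>
          val counts = words.groupBy(identity).map { case (w, ws) => w -> ws.size }
          queue.enqueue(("master", ReduceData(counts)))
        // Aggregate: merge partial counts into the state, or answer a Result request
        case ("aggregate", ReduceData(counts)) =>
          counts.foreach { case (w, n) => state += w -> (state.getOrElse(w, 0) + n) }
        case ("aggregate", Result) => reply = Some(state)
        case other => sys.error(s"unexpected message: $other")
      }
    }

    sentences.foreach(s => queue.enqueue(("master", Sentence(s))))
    drain() // let all work finish first (the role Thread.sleep plays in the application)
    queue.enqueue(("master", Result))
    drain()
    reply.getOrElse(Map.empty)
  }
}

println(MessageFlowSimulation.run(List("the dog and the fox", "Dog is a friend")))
```

Asking for the Result only after every sentence has been fully processed is what makes the answer complete; with real actors running concurrently, that ordering is not guaranteed automatically.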
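The overall development process:
- Define the message data structures
- Define the four actors and the computation logic of each
- Define an actor system, create the actors and feed them some data to process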
Example
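// Messages.scala: the message types exchanged between the actors
import scala.collection.mutable.ArrayBuffer

sealed trait MapReduceMessage
case class WordCount(word: String, count: Int) extends MapReduceMessage
case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage
case class ReduceData(reduceDataMap: Map[String, Int]) extends MapReduceMessage
// A message without fields is a case object (case classes without a parameter list are rejected since Scala 2.11)
case object Result extends MapReduceMessage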
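Declaring the messages under a sealed trait keeps the protocol closed: all subtypes must live in the same source file, so the compiler can warn when a match over `MapReduceMessage` misses a case. The hypothetical `describe` function below shows such a match, with the message types redeclared inside `MessageCheck` so the snippet stands alone; `Result` is a case object because it carries no data.

```scala
import scala.collection.mutable.ArrayBuffer

object MessageCheck {
  sealed trait MapReduceMessage
  final case class WordCount(word: String, count: Int) extends MapReduceMessage
  final case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage
  final case class ReduceData(reduceDataMap: Map[String, Int]) extends MapReduceMessage
  case object Result extends MapReduceMessage

  // Removing any case below would trigger a non-exhaustive match warning
  def describe(msg: MapReduceMessage): String = msg match {
    case WordCount(word, count) => s"$word: $count"
    case MapData(list)          => s"${list.size} mapped words"
    case ReduceData(counts)     => s"${counts.size} distinct words"
    case Result                 => "result request"
  }
}

println(MessageCheck.describe(MessageCheck.Result))
```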
// MapActor.scala
import akka.actor.Actor
import scala.collection.mutable.ArrayBuffer

class MapActor extends Actor {
  val STOP_WORDS_LIST = List("a", "am", "an", "and", "are", "as",
    "at", "be", "do", "go", "if", "in", "is", "it", "of", "on", "the", "to")
  val defaultCount: Int = 1

  def receive: Receive = {
    // Reply to the sender (the Master actor) with the mapped words
    case message: String =>
      sender() ! evaluateExpression(message)
  }

  // Split the sentence into words, drop stop words and emit WordCount(word, 1) for each remaining word
  def evaluateExpression(line: String): MapData = MapData {
    line.trim.split("""\s+""").foldLeft(ArrayBuffer.empty[WordCount]) {
      (index, word) =>
        if (!STOP_WORDS_LIST.contains(word.toLowerCase))
          index += WordCount(word.toLowerCase, defaultCount)
        else
          index
    }
  }
}
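One detail worth checking in `evaluateExpression`: Java's `String.split` drops trailing empty strings but keeps a leading one, so a line that starts with whitespace yields an empty first "word" that is not a stop word and would be counted. Trimming the line first avoids this. `SplitCheck` is a hypothetical helper for illustration; note also that punctuation is not stripped, so "man's" stays a single token.

```scala
// How String.split behaves on the inputs MapActor receives
object SplitCheck {
  // Same regex as evaluateExpression
  def tokens(line: String): List[String] = line.split("""\s+""").toList
  // Trimming removes the leading empty token
  def trimmedTokens(line: String): List[String] = line.trim.split("""\s+""").toList
}

println(SplitCheck.tokens("  Dog is man's best friend"))
println(SplitCheck.trimmedTokens("  Dog is man's best friend"))
```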
// ReduceActor.scala
import akka.actor.Actor

class ReduceActor extends Actor {
  def receive: Receive = {
    // Reply to the sender (the Master actor) with the per-sentence counts
    case MapData(dataList) => sender() ! reduce(dataList)
  }

  // Sum the counts of identical words within one sentence
  def reduce(words: Iterable[WordCount]): ReduceData = ReduceData {
    words.foldLeft(Map.empty[String, Int]) {
      (index, wc) =>
        index + (wc.word -> (index.getOrElse(wc.word, 0) + wc.count))
    }
  }
}
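`ReduceActor.reduce` is a fold that sums the counts per word. The same result can be obtained with `groupBy`; this sketch checks that both formulations agree on a sample word list (`ReduceCheck` and the locally redeclared `WordCount` are illustrative, so the snippet is self-contained).

```scala
object ReduceCheck {
  final case class WordCount(word: String, count: Int)

  // The foldLeft approach used by ReduceActor.reduce
  def reduceFold(words: Iterable[WordCount]): Map[String, Int] =
    words.foldLeft(Map.empty[String, Int]) { (acc, wc) =>
      acc + (wc.word -> (acc.getOrElse(wc.word, 0) + wc.count))
    }

  // Equivalent formulation: group by word, then sum each group's counts
  def reduceGroupBy(words: Iterable[WordCount]): Map[String, Int] =
    words.groupBy(_.word).map { case (w, wcs) => w -> wcs.map(_.count).sum }
}

val sample = List("dog", "fox", "dog", "lazy", "dog").map(ReduceCheck.WordCount(_, 1))
println(ReduceCheck.reduceFold(sample))
```

The fold builds the map in a single pass, while `groupBy` first materializes the groups; for sentence-sized inputs either is fine.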
// AggregateActor.scala
import akka.actor.Actor
import scala.collection.mutable.HashMap

class AggregateActor extends Actor {
  // Running totals across all sentences (mutable state private to this actor)
  val finalReducedMap = new HashMap[String, Int]

  def receive: Receive = {
    case ReduceData(reduceDataMap) =>
      aggregateInMemoryReduce(reduceDataMap)
    case Result =>
      sender() ! finalReducedMap.toString()
  }

  // Add each word's per-sentence count to its running total
  def aggregateInMemoryReduce(reducedList: Map[String, Int]): Unit = {
    for ((key, value) <- reducedList)
      finalReducedMap(key) = finalReducedMap.getOrElse(key, 0) + value
  }
}
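With five Reduce routees, ReduceData messages reach the AggregateActor in no fixed order, so aggregation must give the same totals whatever the arrival order. Adding counts is commutative and associative, so it does; this sketch checks every arrival order of three sample partial maps (the sample data and `AggregateCheck` are illustrative assumptions).

```scala
object AggregateCheck {
  // Merge one partial count map into a running total (same rule as aggregateInMemoryReduce)
  def merge(total: Map[String, Int], partial: Map[String, Int]): Map[String, Int] =
    partial.foldLeft(total) { case (acc, (word, n)) =>
      acc + (word -> (acc.getOrElse(word, 0) + n))
    }

  def aggregate(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int])(merge)
}

val partials = Seq(
  Map("dog" -> 2, "fox" -> 1),
  Map("dog" -> 1, "friend" -> 1),
  Map("dog" -> 1, "fox" -> 1, "family" -> 1)
)
// Collect the result of every possible arrival order; a single distinct result means order does not matter
val results = partials.permutations.map(AggregateCheck.aggregate).toSet
println(results)
```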
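// MasterActor.scala
import akka.actor.{Actor, Props}
import akka.routing.RoundRobinPool

class MasterActor extends Actor {
  // Pools of 5 routees each, fed in round-robin order (RoundRobinPool supersedes the deprecated RoundRobinRouter)
  val mapActor = context.actorOf(RoundRobinPool(5).props(Props[MapActor]()), name = "map")
  val reduceActor = context.actorOf(RoundRobinPool(5).props(Props[ReduceActor]()), name = "reduce")
  val aggregateActor = context.actorOf(Props[AggregateActor](), name = "aggregate")

  def receive: Receive = {
    case line: String => mapActor ! line
    case mapData: MapData => reduceActor ! mapData
    case reduceData: ReduceData => aggregateActor ! reduceData
    // forward keeps the original asker as sender, so AggregateActor replies directly to it
    case Result => aggregateActor forward Result
  }
}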
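Round-robin routing hands message i to routee i mod n, spreading sentences evenly over the five Map (and Reduce) instances. The sketch below models only that selection policy; it is a simplified illustration, not Akka's RoundRobinPool implementation, and the routee names are hypothetical.

```scala
// Simplified model of round-robin selection over a fixed set of routees
object RoundRobinSketch {
  final class RoundRobin[A](routees: Vector[A]) {
    require(routees.nonEmpty, "need at least one routee")
    private var next = 0
    // Return the current routee and advance the cursor, wrapping around at the end
    def select(): A = {
      val chosen = routees(next)
      next = (next + 1) % routees.size
      chosen
    }
  }
}

val router = new RoundRobinSketch.RoundRobin(Vector("map-1", "map-2", "map-3", "map-4", "map-5"))
val assigned = (1 to 7).map(_ => router.select())
println(assigned)
```

Spreading work this way is safe for MapActor and ReduceActor because they keep no state between messages. AggregateActor holds the running totals, so it stays a single instance; a pool of aggregators would each hold only part of the counts.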
// MapReduceApplication.scala
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object MapReduceApplication extends App {
  val system = ActorSystem("MapReduceApp")
  val master = system.actorOf(Props[MasterActor](), name = "master")
  implicit val timeout: Timeout = Timeout(5.seconds)

  master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"
  master ! "Dog is man's best friend"
  master ! "Dog and Fox belong to the same family"

  // Crude synchronization: give the pipeline time to finish before asking for the result
  Thread.sleep(500)
  val future = (master ? Result).mapTo[String]
  val result = Await.result(future, timeout.duration)
  println(result)
  system.terminate()
}