Build RPC with Scala and Akka

简介

Spark中的RPC是基于Akka实现的,Akka的设计目标就是分布式,Actor之间的交互都是基于消息,并且所有动作都是异步的.

在Spark中的应用中会有需要实现RPC的功能,比如:从一个一直运行的SparkStreaming应用程序中实时获取的数据,或者更新一些变量等等,这时候,可以使用Akka的Remote Actor,将SparkStreaming应用程序作为一个Remote Actor(Server),当需要获取数据时,可以向Server发送请求消息,Server收到请求之后,需要等待Server将数据响应回来之后,才能处理并退出,这里的客户端,对于Server来说其实也是一个Remote Actor.

Server端

Server端作为一个Remote Actor,在本机的2555端口对外提供服务.

在接收到消息类型为AkkaMessage之后,将消息做一个简单的处理(在前面加上response_标识),并封装成Response消息类型,响应给请求者:

import akka.actor.{ Actor, Props }
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object ServerActor {
    case class AkkaMessage(message:String)
    case class Response(response:String)

    def props: Props = Props(new ServerActor) 

    def main(args:Array[String]):Unit = {
        val serverSystem("server-system", ConfigFactory.parseString("""
            akka{
                actor {
                    provider = "akka.remote.RemoteActorRefProvider"
                }
                remote {
                    enabled-transports = ["akka.remote.netty.tcp"]
                    netty.tcp {
                        hostname = "127.0.0.1"
                        port = 2552
                    }
                }
            }
        """
        ))

        serverSystem.actorOf(ServerActor.props,"server-actor")
    }
}

class ServerActor extends Actor{
    override def receive:Receive = {
        case msg:AkkaMessage => {
            println(s"Server got message: ${msg.message}")
            sender ! Response(s"response_${msg.message}")
        }
        case _ => println("Unsupport Message!")
    }
}

一般akka配置文件写在application.conf中,本例便于展示.

Client端

Client端启动之后同样作为一个Remote Actor.

接收到请求者的消息类型为AkkaMessage之后,将其转发至Server端.

接收到Server端返回的Response之后,将其响应给消息的请求者.

请求者在向Client端的Actor发送一个AkkaMessage消息之后,等待响应之后,再继续发送下一个消息:

import akka.actor.{ Actor, Props }
import akka.actor.{ ActorSystem, ActorSelection }
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ Await, Future }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent._
import ServerActor._

object ClientActor {

    def props:Props = Props(new ClientActor)

    def main(args: Array[String]) : Unit = {
        val clientSystem = ActorSystem("ClientSystem", ConfigFactory.parseString("""
            akka {
                actor {
                    provider = "akka.remote.RemoteActorRefProvider"
                }
            }
        """))
        var client = clientSystem.actorOf(ClientActor.props)
        var msgs = Array[AkkaMessage](AkkaMessage("message1"),AkkaMessage("message2"),AkkaMessage("message3"),AkkaMessage("message4"))

        implicit val timeout = Timeout(3 seconds)

        msgs.foreach { x => 
            val future = client ? x
            val result = Await.result(future,timeout.duration).asInstanceOf[Response]
            println("收到的反馈: " + result)
        }

        // 异步发送模式
        //     msgs.foreach { x => 
        //       client ! x
        //     }

        clientSystem.shutdown()
    }    
}

class ClientActor extends Actor {
    //远程Actor
    var remoteActor : ActorSelection = null
    //当前Actor
    var localActor : akka.actor.ActorRef = null

    @throws[Exception](classOf[Exception])
    override def preStart(): Unit = {
        remoteActor = context.actorSelection("akka.tcp://lxw1234@127.0.0.1:2555/user/server")
        println("远程服务端地址 : " + remoteActor)
    }

    override def receive: Receive = {
        //接收到消息类型为AkkaMessage后,将消息转发至远程Actor
        case msg: AkkaMessage => {
            println("客户端发送消息 : " + msg)
            this.localActor = sender()
            remoteActor ! msg
        }
        //接收到远程Actor发送的消息类型为Response,响应
        case res: Response => {
            localActor ! res
        }
        case _ => println("客户端不支持的消息类型 .. ")
      }
}

本代码实例中的实现不够优雅,或者说是不符合Scala和Akka中的使用惯例,仅用作RPC思想的模式学习.

参考: lxw的大数据学习天地

RPC解释

RPC,Remote Procedure Call Protoco,即远程过程调用协议,它是一种通过网络从远程计算机服务上请求服务,而不需要了解底层网络技术的协议.

知乎用户 @用心阁 关于RPC的解释:

指,有两台服务器A和B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络技术来表达调用的语义和传达调用的数据.

比如一个方法的定义如下: Employee getEmployeeByName(String fullName)

  1. 首先要解决通讯问题,主要是通过客户端和服务端之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输.连接可以是按需连接,调用结束后就断开,也可以是长连接,多个远程过程调用公用一个连接.
  2. 要解决寻址的问题,也就是说,A服务器上应用怎么告诉底层的RPC框架,如何连接到B服务器(如IP和端口),方法的名称是什么,这样才能完成调用.比如基于WEB服务栈的RPC,就需要提供一个endpoint URI,或者使用UDDI(发现和集成服务)上查找.如果是RMI(一种基于Java的远程方法调用)调用的话,还需要一个RMI Registry来注册服务的地址.
  3. 第三,当A服务器上的应用发起远程过程调用时,方法参数需要通过底层的网络协议如TCP传递到B服务器,由于网络协议是基于二进制的,内存中的参数的值需要序列化成二进制格式,也就是序列化(Serialize)和编组(marshal),通过寻址和传输将序列化的二进制数据传递给B服务器.
  4. B服务器收到请求后,需要对参数进行反序列化,恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值.
  5. 返回值还要发送给A服务器上的应用,也要经过序列化的方式发送,服务器A接收到以后,再反序列化,恢复为内存中的表达式,交给A服务器上的应用.

RPC

为什么要使用RPC呢,就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成需求,比如在不同的系统间通讯,甚至不同的组织间的通讯.

RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的RPC风格,Hessian,Thrift,甚至Rest API.协议是指调用信息和答复信息,即对调用过程的处理方式的约定.

关于Netty

Netty框架不仅限于RPC,更多的是作为一种网络协议的实现框架,比如Http,TCP,由于RPC需要高效的网络通信,就可能选择Netty作为基础,最新的Spark版本已由Akka迁移到Netty.

除了网络通信,RPC还需要有比较高效的序列化框架,以及寻址方式.如果是带会话状态的RPC,还需要有会话和状态的保持功能.

大体上说,Netty就是提供一种时间驱动的,责任链式的网络协议实现方式,更多详细介绍参考<>.