简介
Spark中的RPC是基于Akka实现的,Akka的设计目标就是分布式,Actor之间的交互都是基于消息,并且所有动作都是异步的.
在Spark中的应用中会有需要实现RPC的功能,比如:从一个一直运行的SparkStreaming应用程序中实时获取的数据,或者更新一些变量等等,这时候,可以使用Akka的Remote Actor,将SparkStreaming应用程序作为一个Remote Actor(Server),当需要获取数据时,可以向Server发送请求消息,Server收到请求之后,需要等待Server将数据响应回来之后,才能处理并退出,这里的客户端,对于Server来说其实也是一个Remote Actor.
Server端
Server端作为一个Remote Actor,在本机的2555端口对外提供服务.
在接收到消息类型为AkkaMessage之后,将消息做一个简单的处理(在前面加上response_标识),并封装成Response消息类型,响应给请求者:
import akka.actor.{ Actor, Props }
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
object ServerActor {
case class AkkaMessage(message:String)
case class Response(response:String)
def props: Props = Props(new ServerActor)
def main(args:Array[String]):Unit = {
val serverSystem("server-system", ConfigFactory.parseString("""
akka{
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
"""
))
serverSystem.actorOf(ServerActor.props,"server-actor")
}
}
class ServerActor extends Actor{
override def receive:Receive = {
case msg:AkkaMessage => {
println(s"Server got message: ${msg.message}")
sender ! Response(s"response_${msg.message}")
}
case _ => println("Unsupport Message!")
}
}
一般akka配置文件写在application.conf中,本例便于展示.
Client端
Client端启动之后同样作为一个Remote Actor.
接收到请求者的消息类型为AkkaMessage之后,将其转发至Server端.
接收到Server端返回的Response之后,将其响应给消息的请求者.
请求者在向Client端的Actor发送一个AkkaMessage消息之后,等待响应之后,再继续发送下一个消息:
import akka.actor.{ Actor, Props }
import akka.actor.{ ActorSystem, ActorSelection }
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ Await, Future }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent._
import ServerActor._
object ClientActor {
def props:Props = Props(new ClientActor)
def main(args: Array[String]) : Unit = {
val clientSystem = ActorSystem("ClientSystem", ConfigFactory.parseString("""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
}
"""))
var client = clientSystem.actorOf(ClientActor.props)
var msgs = Array[AkkaMessage](AkkaMessage("message1"),AkkaMessage("message2"),AkkaMessage("message3"),AkkaMessage("message4"))
implicit val timeout = Timeout(3 seconds)
msgs.foreach { x =>
val future = client ? x
val result = Await.result(future,timeout.duration).asInstanceOf[Response]
println("收到的反馈: " + result)
}
// 异步发送模式
// msgs.foreach { x =>
// client ! x
// }
clientSystem.shutdown()
}
}
class ClientActor extends Actor {
//远程Actor
var remoteActor : ActorSelection = null
//当前Actor
var localActor : akka.actor.ActorRef = null
@throws[Exception](classOf[Exception])
override def preStart(): Unit = {
remoteActor = context.actorSelection("akka.tcp://lxw1234@127.0.0.1:2555/user/server")
println("远程服务端地址 : " + remoteActor)
}
override def receive: Receive = {
//接收到消息类型为AkkaMessage后,将消息转发至远程Actor
case msg: AkkaMessage => {
println("客户端发送消息 : " + msg)
this.localActor = sender()
remoteActor ! msg
}
//接收到远程Actor发送的消息类型为Response,响应
case res: Response => {
localActor ! res
}
case _ => println("客户端不支持的消息类型 .. ")
}
}
本代码实例中的实现不够优雅,或者说是不符合Scala和Akka中的使用惯例,仅用作RPC思想的模式学习.
RPC解释
RPC,Remote Procedure Call Protoco,即远程过程调用协议,它是一种通过网络从远程计算机服务上请求服务,而不需要了解底层网络技术的协议.
知乎用户 @用心阁 关于RPC的解释:
指,有两台服务器A和B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络技术来表达调用的语义和传达调用的数据.
比如一个方法的定义如下: Employee getEmployeeByName(String fullName)
- 首先要解决通讯问题,主要是通过客户端和服务端之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输.连接可以是按需连接,调用结束后就断开,也可以是长连接,多个远程过程调用公用一个连接.
- 要解决寻址的问题,也就是说,A服务器上应用怎么告诉底层的RPC框架,如何连接到B服务器(如IP和端口),方法的名称是什么,这样才能完成调用.比如基于WEB服务栈的RPC,就需要提供一个endpoint URI,或者使用UDDI(发现和集成服务)上查找.如果是RMI(一种基于Java的远程方法调用)调用的话,还需要一个RMI Registry来注册服务的地址.
- 第三,当A服务器上的应用发起远程过程调用时,方法参数需要通过底层的网络协议如TCP传递到B服务器,由于网络协议是基于二进制的,内存中的参数的值需要序列化成二进制格式,也就是序列化(Serialize)和编组(marshal),通过寻址和传输将序列化的二进制数据传递给B服务器.
- B服务器收到请求后,需要对参数进行反序列化,恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值.
- 返回值还要发送给A服务器上的应用,也要经过序列化的方式发送,服务器A接收到以后,再反序列化,恢复为内存中的表达式,交给A服务器上的应用.
为什么要使用RPC呢,就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成需求,比如在不同的系统间通讯,甚至不同的组织间的通讯.
RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的RPC风格,Hessian,Thrift,甚至Rest API.协议是指调用信息和答复信息,即对调用过程的处理方式的约定.
关于Netty
Netty框架不仅限于RPC,更多的是作为一种网络协议的实现框架,比如Http,TCP,由于RPC需要高效的网络通信,就可能选择Netty作为基础,最新的Spark版本已由Akka迁移到Netty.
除了网络通信,RPC还需要有比较高效的序列化框架,以及寻址方式.如果是带会话状态的RPC,还需要有会话和状态的保持功能.
大体上说,Netty就是提供一种时间驱动的,责任链式的网络协议实现方式,更多详细介绍参考<