
6.5 Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等
Spark 1.6推出了以RpcEnv、RPCEndpoint、RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka。Akka是基于Actor的分布式消息通信系统,而在Spark 1.6中封装了Akka,提供更高层的Rpc实现,目的是移除对Akka的依赖,为扩展和自定义Rpc打下基础。
Spark 2.0版本中Rpc的变化情况如下。
SPARK-6280:从Spark中删除Akka systemName。
SPARK-7995:删除AkkaRpcEnv,并从Core的依赖中删除Akka。
SPARK-7997:删除开发人员api SparkEnv.actorSystem和AkkaUtils。
RpcEnv是一个抽象类abstract class,传入SparkConf。RPC环境中[RpcEndpoint]需要注册自己的名字[RpcEnv]来接收消息。[RpcEnv]将处理消息发送到[RpcEndpointRef]或远程节点,并提供给相应的[RpcEndpoint]。[RpcEnv]未被捕获的异常,[RpcEnv]将使用[RpcCallContext.sendFailure]发送异常给发送者,如果没有这样的发送者,则记录日志NotSerializableException。
RpcEnv.scala的源码如下:

RpcCallContext.scala处理异常的方法包括reply、sendFailure、senderAddress,其中reply是给发送者发送一个信息。如果发送者是[RpcEndpoint],它的[RpcEndpoint.receive]将被调用。
其中,RpcCallContext的地址RpcAddress是一个case class,包括hostPort、toSparkURL等成员。
RpcAddress.scala的源码如下:

RpcAddress伴生对象object RpcAddress属于包org.apache.spark.rpc,fromURIString方法从String中提取出RpcAddress;fromSparkURL方法也是从String中提取出RpcAddress。说明:case class RpcAddress通过伴生对象object RpcAddress的方法调用,case class RpcAddress也有自己的方法fromURIString、fromSparkURL,而且方法fromURIString、fromSparkURL的返回值也是RpcAddress。
伴生对象RpcAddress的源码如下:

RpcEnv解析如下。
(1)RpcEnv是RPC的环境(相当于Akka中的ActorSystem),所有的RPCEndpoint都需要注册到RpcEnv实例对象中(注册的时候会指定注册的名称,这样客户端就可以通过名称查询到RpcEndpoint的RpcEndpointRef引用,从而进行通信),在RpcEndpoint接收到消息后会调用receive方法进行处理。
(2)RpcEndpoint如果接收到需要reply的消息,就会交给自己的receiveAndReply来处理(回复时是通过RpcCallContext中的reply方法来回复发送者的),如果不需要reply,就交给receive方法来处理。
(3)RpcEnvFactory是负责创建RpcEnv的,通过create方法创建RpcEnv实例对象,默认用Netty。
RpcEnv示意图如图6-4所示。

图6-4 RPCEnv示意图
回到RpcEnv.scala的源码,首先调用RpcUtils.lookupRpcTimeout(conf),返回RPC远程端点查找时默认Spark的超时时间。方法lookupRpcTimeout中构建了一个RpcTimeout,定义spark.rpc.lookupTimeout。spark.network.timeout的超时时间是120s。
RpcUtils.scala的lookupRpcTimeout方法的源码如下:

进入RpcTimeout,进行RpcTimeout关联超时的原因描述,当TimeoutException发生的时候,关于超时的额外的上下文将包含在异常消息中。
RpcTimeout.scala的源码如下:

其中的RpcTimeoutException继承自TimeoutException。

其中的TimeoutException继承自Exception。

回到RpcTimeout.scala,其中的addMessageIfTimeout方法,如果出现超时,将加入这些信息。
RpcTimeout.scala的addMessageIfTimeout的源码如下:

RpcTimeout.scala中的awaitResult方法比较关键:awaitResult一直等结果完成并获得结果,如果在指定的时间没有返回结果,就抛出异常[RpcTimeoutException]。
RpcTimeout.scala的源码如下:

其中的future是Future[T]类型,继承自Awaitable。
1. trait Future[+T] extends Awaitable[T]
Awaitable是一个trait,其中的ready方法是指Duration时间片内,Awaitable的状态变成completed状态,就是ready。在Await.result中,result本身是阻塞的。
Awaitable.scala的源码如下:

回到RpcEnv.scala中,其中endpointRef方法返回我们注册的RpcEndpoint的引用,是代理的模式。我们要使用RpcEndpoint,是通过RpcEndpointRef来使用的。Address方法是RpcEnv监听的地址;setupEndpoint方法注册时根据RpcEndpoint名称返回RpcEndpointRef。fileServer返回用于服务文件的文件服务器实例。如果RpcEnv不以服务器模式运行,可能是null值。
RpcEnv.scala的源码如下:

RpcEnv.scala中的RpcEnvFileServer方法中的RpcEnvConfig是一个case class。Spark 2.2.1版本的RpcEnvFileServer的源码如下:

Spark 2.4.3版本的RpcEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第10行新增加一个参数numUsableCores。

RpcEnv是一个抽象类,其具体的子类是NettyRpcEnv。Spark 1.6版本中包括AkkaRpcEnv和NettyRpcEnv两种方式。Spark 2.0版本中只有NettyRpcEnv。
下面看一下RpcEnvFactory。RpcEnvFactory是一个工厂类,创建[RpcEnv],必须有一个空构造函数,以便可以使用反射创建。create根据具体的配置,反射出具体的实例对象。RpcEndpoint方法中定义了receiveAndReply方法和receive方法。
RpcEndpoint.scala的源码如下:

Master继承自ThreadSafeRpcEndpoint,接收消息使用receive方法和receiveAndReply方法。
其中,ThreadSafeRpcEndpoint继承自RpcEndpoint:ThreadSafeRpcEndpoint是一个trait,需要RpcEnv线程安全地发送消息给它。线程安全是指在处理下一个消息之前通过同样的[ThreadSafeRpcEndpoint]处理一条消息。换句话说,改变[ThreadSafeRpcEndpoint]的内部字段在处理下一个消息是可见的,[ThreadSafeRpcEndpoint]的字段不需要volatile或equivalent,不能保证对于不同的消息在相同的[ThreadSafeRpcEndpoint]线程中来处理。
1. private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
回到RpcEndpoint.scala,重点看一下receiveAndReply方法和receive方法。receive方法处理从[RpcEndpointRef.send]或者[RpcCallContext.reply]发过来的消息,如果收到一个不匹配的消息,[SparkException]会抛出一个异常onError。receiveAndReply方法处理从[RpcEndpointRef.ask]发过来的消息,如果收到一个不匹配的消息,[SparkException]会抛出一个异常onError。receiveAndReply方法返回PartialFunction对象。
RpcEndpoint.scala的源码如下:

在Master中,Receive方法中收到消息以后,不需要回复对方。
Master.scala的Receive方法的源码如下:

在Master中,receiveAndReply方法中收到消息以后,都要通过context.reply回复对方。
在Master中,RpcEndpoint如果接收到需要reply的消息,就会交给自己的receiveAndReply来处理(回复时是通过RpcCallContext中的reply方法来回复发送者的),如果不需要reply,就交给receive方法来处理。
RpcCallContext的源码如下:

回到RpcEndpoint.scala,RpcEnvFactory是一个trait,负责创建RpcEnv,通过create方法创建RpcEnv实例对象,默认用Netty。
RpcEndpoint.scala的源码如下:

RpcEnvFactory的create方法没有具体的实现。下面看一下RpcEnvFactory子类NettyRpcEnvFactory中create的具体实现,使用的方式为nettyEnv。
Spark 2.2.1版本的NettyRpcEnv.scala的create方法的源码如下:

Spark 2.4.3版本的NettyRpcEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第9行新增加一个参数config.numUsableCores。

在Spark 2.0版本中回溯一下NettyRpcEnv的实例化过程。在SparkContext实例化时调用createSparkEnv方法。
Spark 2.2.1版本的SparkContext.scala的源码如下:

Spark 2.4.3版本的SparkContext.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第10行SparkContext.numDriverCores新增加一个参数conf。

SparkContext的createSparkEnv方法中调用了SparkEnv.createDriverEnv方法。下面看一下createDriverEnv方法的实现,其调用了create方法。
Spark 2.2.1版本的SparkEnv.scala的createDriverEnv的源码如下:

Spark 2.4.3版本的SparkEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行port参数调整为Option(port)。

在RpcEnv.scala中,creat方法直接调用new()函数创建一个NettyRpcEnvFactory,调用NettyRpcEnvFactory().create方法,NettyRpcEnvFactory继承自RpcEnvFactory。在Spark 2.0中,RpcEnvFactory直接使用NettyRpcEnvFactory的方式。
Spark 2.2.1版本的RpcEnv.scala的源码如下:

Spark 2.4.3版本的RpcEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第10行之后新增一个参数numUsableCores。
上段代码中第13行新增一个参数numUsableCores。

NettyRpcEnvFactory().create的方法如下。
Spark 2.2.1版本的NettyRpcEnv.scala的源码如下:

Spark 2.4.3版本的NettyRpcEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7行新增加一个参数config.numUsableCores。

在NettyRpcEnvFactory().create中调用new()函数创建一个NettyRpcEnv。NettyRpcEnv传入SparkConf参数,包括fileServer、startServer等方法。
Spark 2.2.1版本的NettyRpcEnv.scala的源码如下:

Spark 2.4.3版本的NettyRpcEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行新增加一个参数numUsableCores。

NettyRpcEnv.scala的startServer中,通过transportContext.createServer创建Server,使用dispatcher.registerRpcEndpoint方法dispatcher注册RpcEndpoint。在createServer方法中调用new()函数创建一个TransportServer。
TransportContext的createServer方法的源码如下:

Spark 2.2.1版本的TransportServer.java的源码如下:

Spark 2.4.3版本的TransportServer.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第10行之后新增一行代码,增加一个布尔值变量shouldClose。
上段代码中第13行之后新增一行代码,shouldClose = false。
上段代码中第14~16行调整为finally的异常处理代码。

TransportServer.java中的关键方法是init,这是Netty本身的实现内容。
TransportServer.java中的init的源码如下:

接下来,我们看一下RpcEndpointRef。RpcEndpointRef是一个抽象类,是代理模式。
RpcEndpointRef.scala的源码如下:

NettyRpcEndpointRef是RpcEndpointRef的具体实现子类。ask方法通过调用nettyEnv.ask传递消息。RequestMessage是一个case class。
NettyRpcEnv.scala的NettyRpcEndpointRef的源码如下:

下面从实例的角度来看RPC的应用。
RpcEndpoint的生命周期:构造(constructor)–>启动(onStart)、消息接收(receive、receiveAndReply)、停止(onStop)。
Master中接收消息的方式有两种:① receive接收消息不回复;② receiveAndReply通过context.reply的方式回复消息。例如,Worker发送Master的RegisterWorker消息,当Master注册成功,Master就返回Worker RegisteredWorker消息。
Worker启动时,从生命周期的角度,Worker实例化的时候提交Master进行注册。
Worker的onStart的源码如下:

进入registerWithMaster方法。
Worker的registerWithMaster的源码如下:

进入tryRegisterAllMasters方法:在rpcEnv.setupEndpointRef中根据masterAddress、ENDPOINT_NAME名称获取RpcEndpointRef。
Worker.scala的tryRegisterAllMasters的源码如下:

基于masterEndpoint,使用sendRegisterMessageToMaster方法注册。
Worker.scala的sendRegisterMessageToMaster的源码如下:

sendRegisterMessageToMaster方法中的Worker发送RegisterWorker消息给Master以后,就完成此次注册。Master节点收到RegisterWorker消息另行处理,如果注册成功,Master就发送Worker节点成功的RegisteredWorker消息;如果注册失败,Master就发送Worker节点失败的RegisterWorkerFailed消息。
Worker.scala的handleRegisterResponse源码如下:
