
4.4 SchedulerBackend解析
本节讲解SchedulerBackend原理剖析、SchedulerBackend源码解析、Spark程序的注册机制、Spark程序对计算资源Executor的管理等内容。
4.4.1 SchedulerBackend原理剖析
以Spark Standalone部署方式为例,StandaloneSchedulerBackend在启动的时候构造了StandaloneAppClient实例,并在该实例start的时候启动了ClientEndpoint消息循环体,ClientEndpoint在启动的时候会向Master注册当前程序。而StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndPoint(这就是程序运行时的经典的对象Driver)的消息循环体,StandaloneSchedulerBackend专门负责收集Worker上的资源信息,当ExecutorBackend启动的时候,会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task的。
4.4.2 SchedulerBackend源码解析
StandaloneSchedulerBackend收集和分配资源给调度的Task使用。
StandaloneSchedulerBackend.scala的源码如下:

在StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint,将在ClientEndpoint中向Master注册。
StandaloneAppClient.scala的源码如下:

4.4.3 Spark程序的注册机制
在上面的源码分析中,StandaloneAppClient在启动的时候创建了StandaloneAppClient内部类ClientEndpoint的实例对象作为消息循环体,以便向Master注册当前的Application。既然ClientEndpoint是RpcEndpoint的子类,那么就会有这样的生命周期:constructor -> onStart-> receive ->onStop。根据这个原理,我们来看ClientEndpoint的onStart方法代码。
StandaloneAppClient.scala的源码如下:

ClientEndpoint在启动时就立即调用registerWithMaster来注册Application,继续查看registerWithMaster方法代码。
StandaloneAppClient.scala的源码如下:

ClientEndpoint在tryRegiesterAllMasters方法中会向所有的Master尝试注册Application。向Master发送RegisterApplication消息。
StandaloneAppClient.scala的源码如下:

Master也是RpcEndpoint的子类,所以可以通过receive方法接收DeployMessage类型的消息RegisterApplication。
Master.scala的源码如下:

ClientEndpoint最后在receive方法中得到来自Master注册好Application的确认消息RegisteredApplication。
StandaloneAppClient.scala的源码如下:

至此,Application向Master注册完毕。在上面的RegisterApplication中,调用了schedule方法,这个方法将完成Application的调度,并在Worker节点上启动分配好的Executor给Application使用。
4.4.4 Spark程序对计算资源Executor的管理
从TaskSchedulerImpl的submitTasks的方法中我们知道,Spark Standalone部署模式调用StandaloneSchedulerBackend的reviveOffers方法进行TaskSet所需要资源的分配,得到足够的资源后,将TaskSet中的Task逐个发送到Executor去执行。下面来看这里的资源,即Executor是如何得到和分配的。
StandaloneSchedulerBackend的reviveOffers方法很简单,就是发送一个ReviveOffers消息给内部类DriverEndpoint,代码如下所示。
CoarseGrainedSchedulerBackend.scala的源码如下:

DriverEndpoint的receive方法处理ReviveOffers消息也很简单,就是调用makeOffers方法。receive方法部分关键代码如下所示。
CoarseGrainedSchedulerBackend.scala的源码如下:

DriverEndpoint的makeOffers方法首先过滤出Alive状态的Executor放到activeExecutorsHahMap变量中,然后使用id、ExecutorData.ExecutorHost、ExecutorData.freeCores构建代表Executor可用资源的WorkerOffer。然后是最重要的两个方法调用。先是调用TaskSchedulerImpl的resourceOffers得到TaskDescription的二维数组,包含Task ID、Executor ID、Task Index等Task执行需要的信息。然后回调DriverEndpoint的launchTask给每个Task对应的Executor发执行Task的LaunchTask消息(其实是由CourseGrainedExecutorBackend转发LauchTask消息)。
TaskSchedulerImpl的resourceOffers方法返回二维数组TaskDescription后作为DriverEndpoint的launchTasks方法的参数,DriverEndpoint的launchTasks方法中首先对传入的tasks进行扁平化操作(例如,将多维数组降维成一维数组),得到所有的Task,然后遍历所有的Task。在遍历过程中,调用serialize()方法对task进行序列化,得到serializedTask。判断如果serializedTask大于等于Akka帧减去Akka预留空间大小,则调用TaskSetManager的abort方法终止该任务的执行,否则将LaunchTask(new SerializableBuffer(serializedTask))消息发送到CoarseGrainedExecutorBackend。
CoarseGrainedExecutorBackend匹配到LaunchTask(data)消息后,首先调用deserialized方法,反序列化出task,然后调用Executor的lauchTask方法执行Task的处理。