3.4 解析Spark中的DAG逻辑视图
本节讲解DAG生成的机制,通过DAG,Spark可以对计算的流程进行优化;通过WordCounts的示例对DAG逻辑视图进行解析。
3.4.1 DAG生成的机制
在图论中,如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。而在Spark中,由于计算过程很多时候会有先后顺序,受制于某些任务必须比另一些任务较早执行的限制,我们必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是DAG图,每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系)。
通过DAG,Spark可以对计算的流程进行优化,对于数据处理,可以将在单一节点上进行的计算操作进行合并,并且计算中间数据通过内存进行高效读写,对于数据处理,需要涉及Shuffle操作的步骤划分Stage,从而使计算资源的利用更加高效和合理,减少计算资源的等待过程,减少计算中间数据读写产生的时间浪费(基于内存的高效读写)。
Spark中DAG生成过程的重点是对Stage的划分,其划分的依据是RDD的依赖关系,对于不同的依赖关系,高层调度器会进行不同的处理。对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage;对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分。
在Spark中,DAG生成的流程关键在于回溯,在程序提交后,高层调度器将所有的RDD看成是一个Stage,然后对此Stage进行从后往前的回溯,遇到Shuffle就断开,遇到窄依赖,则归并到同一个Stage。等到所有的步骤回溯完成,便生成一个DAG图。
DAG生成的相关源码位于Spark的DAGScheduler.scala。getOrCreateParentStages获取或创建一个给定RDD的父Stages列表,getOrCreateParentStages调用了getShuffleDependencies (rdd),getShuffleDependencies返回给定RDD的父节点中直接的Shuffle依赖。
DAGScheduler.scala的getOrCreateParentStages的源码如下。
1. private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { 2. getShuffleDependencies(rdd).map { shuffleDep => 3. getOrCreateShuffleMapStage(shuffleDep, firstJobId) 4. }.toList 5. } 6. 7. ...... 8. private[scheduler] def getShuffleDependencies( 9. rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { 10. val parents = new HashSet[ShuffleDependency[_, _, _]] 11. val visited = new HashSet[RDD[_]] 12. val waitingForVisit = new Stack[RDD[_]] 13. waitingForVisit.push(rdd) 14. while (waitingForVisit.nonEmpty) { 15. val toVisit = waitingForVisit.pop() 16. if (!visited(toVisit)) { 17. visited += toVisit 18. toVisit.dependencies.foreach { 19. case shuffleDep: ShuffleDependency[_, _, _] => 20. parents += shuffleDep 21. case dependency => 22. waitingForVisit.push(dependency.rdd) 23. } 24. } 25. } 26. parents 27. }
3.4.2 DAG逻辑视图解析
本节通过一个简单计数案例讲解DAG具体的生成流程和关系。示例代码如下。
1. val conf = new SparkConf()//创建SparkConf 2. conf.setAppName("Wow,My First Spark App")//设置应用名称 3. conf.setMaster("local")//在本地运行 4. val sc =new SparkContext(conf) 5. val lines = sc.textFile ("C://Users//feng//IdeaProjects//WordCount//src //SparkText.txt",1) 6. //操作1,flatMap由lines通过flatMap操作形成新的MapPartitionRDD 7. val words = lines.flatMap{ lines => lines.split(" ") } 8. //操作2,map 由word通过Map操作形成新的MapPartitionRDD 9. val pairs =words.map { word => (word,1) } 10. //操作3,reduceByKey(包含2步reduce) 11. //此步骤生成MapPartitionRDD和ShuffleRDD 12. val WordCounts =pairs.reduceByKey(_+_) 13. WordCounts.collect.foreach(println) 14. sc.stop()
在程序正式运行前,Spark的DAG调度器会将整个流程设定为一个Stage,此Stage包含3个操作,5个RDD,分别为MapPartitionRDD(读取文件数据时)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)、ShuffleRDD(reduceByKeyshuffle操作)。
(1)回溯整个流程,在shuffleRDD与MapPartitionRDD(reduceByKey的local段的操作)中存在shuffle操作,整个RDD先在此切开,形成两个Stage。
(2)继续向前回溯,MapPartitionRDD(reduceByKey的local段的操作)与MapPartitionRDD (map操作)中间不存在Shuffle(即两个RDD的依赖关系为窄依赖),归为同一个Stage。
(3)继续回溯,发现往前的所有的RDD之间都不存在Shuffle,应归为同一个Stage。
(4)回溯完成,形成DAG,由两个Stage构成:
第一个Stage由MapPartitionRDD(读取文件数据时)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)构成,如图3-4所示。
图3-4 Stage 0的构成
第二个Stage由ShuffleRDD(reduceByKey Shuffle操作)构成,如图3-5所示。
图3-5 Stage 1的构成