![Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/174/40375174/b_40375174.jpg)
8.4 Driver如何管理ShuffleMapTask和ResultTask的处理结果
Spark Job中,根据Task所处Stage的位置,我们将Task分为两类:第一类叫shuffleMapTask,指Task所处的Stage不是最后一个Stage,也就是Stage的计算结果还没有输出,而是通过Shuffle交给下一个Stage使用;第二类叫resultTask,指Task所处Stage是DAG中最后的一个Stage,也就是Stage计算结果需要进行输出等操作,计算到此为止已经结束。简单地说,Spark Job中除了最后一个Stage的Task叫resultTask,其他所有Task都叫ShuffleMapTask。
8.4.1 ShuffleMapTask执行结果和Driver的交互原理及源码详解
Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutorBackend发送launchTasks消息,CoarseGrainedExecutorBackend收到launchTasks消息以后会调用executor.launchTask。通过launchTask执行Task,launchTask方法中根据传入的参数:taskId、尝试次数、任务名称、序列化后的任务创建一个TaskRunner,在threadPool中执行TaskRunner。TaskRunner内部会先做一些准备工作,如反序列化Task的依赖,通过网络获取需要的文件、Jar等;然后调用反序列化后的Task.run方法来执行任务并获得执行结果。
其中,Task的run方法调用的时候会导致Task的抽象方法runTask的调用,Task.scala的runTask方法是一个抽象方法。Task包括ResultTask、ShuffleMapTask两种Task,抽象runTask方法具体的实现由子类的runTask实现。ShuffleMapTask的runTask实际运行的时候会调用RDD的iterator,然后针对Partition进行计算。
ShuffleMapTask.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P400_255444.jpg?sign=1738951703-plvDe7SOk3fy1u7tQU52jebiLhMMMs6i-0-692c7e4ed6a9d30fffe0a134544051a5)
ShuffleMapTask方法中调用ShuffleManager写入器writer方法,在write时最终计算会调用RDD的compute方法。通过writer.stop(success = true).get,如果写入成功,就返回MapStatus结果值。
SortShuffleWriter.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P401_255446.jpg?sign=1738951703-BTsSO3j7VcVz5xTwSiP4BsGQwemf2DrS-0-09c659475c60ad8dcb77f34d3f5bd06f)
回到TaskRunner的run方法,把task.run执行结果通过resultSer.serialize(value)序列化,生成一个directResult。然后根据大小判断不同的结果赋值给serializedResult,传回给Driver。
(1)如果任务执行结果特别大,超过1GB,日志就提示超出任务大小限制,返回元数据ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))。
Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P401_255447.jpg?sign=1738951703-n3Ti9vTRrUYlxHxYWQTjmHpeEcOReRvO-0-9b88c61552c1b9530a71b074d104b0c0)
(2)如果任务执行结果小于1GB,大于maxDirectResultSize(128MB),就放入blockManager,返回元数据ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))。
Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P401_255448.jpg?sign=1738951703-P1wmhPQSmCLFtFADi76n6S8ams7Ur4uA-0-4c1d35f0fc16aad64c7360d9d37fe037)
(3)如果任务执行结果小于128MB,就直接返回serializedDirectResult。
Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P402_255449.jpg?sign=1738951703-ak1hklAL7nqfxqHyFUGlJ8hHpeA8Vefw-0-e801c4efc206663555d7250dfc4ae82f)
接下来,TaskRunner的run方法中调用execBackend.statusUpdate(taskId,TaskState.FINISHED,serializedResult)给Driver发送一个消息,消息中将taskId、TaskState.FINISHED、serializedResult传进去。这里,execBackend是CoarseGrainedExecutorBackend。
Executor.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P402_255450.jpg?sign=1738951703-HRvtLN56I9cRegXmi4g0vqAsTDGwqmli-0-77992f6ade4e8a2b4c436199c7df682a)
CoarseGrainedExecutorBackend的statusUpdate方法的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P402_255451.jpg?sign=1738951703-58zcRYcGhZ9e5RXW0UtkxzVzeDlZmd5d-0-c2303b374c4acb9dcca9116c82c93790)
CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果。DriverEndpoint是一个ThreadSafeRpcEndpoint消息循环体,模式匹配收到StatusUpdate消息,调用scheduler.statusUpdate(taskId, state, data.value)方法执行。这里的scheduler是TaskSchedulerImpl。
CoarseGrainedSchedulerBackend.scala的DriverEndpoint的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P402_255452.jpg?sign=1738951703-VKTTadf0q4cgOAfODnLIr0KPCjw5TLNv-0-88b09133cc6d789e8e4c3c973e32cfa7)
DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,交给TaskResultGetter内部,通过线程去分别处理Task执行成功和失败时的不同情况,然后告诉DAGScheduler任务处理结束的状况。
TaskSchedulerImpl.scala的statusUpdate的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P402_255453.jpg?sign=1738951703-vEYcw6qCtP3wZnoO9EbqKUXQOnQKvvWe-0-e8ac79d196bee12119a2375b5ae0d919)
TaskResultGetter.scala的enqueueSuccessfulTask方法中,开辟一条新线程处理成功任务,对结果进行相应的处理后调用scheduler.handleSuccessfulTask。
TaskSchedulerImpl的handleSuccessfulTask的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P403_255454.jpg?sign=1738951703-NChcl7B9ybABXoY3EfLb5yWQ6U8cXpDf-0-1dcb7cae323558b6e9391891dbcc016e)
TaskSchedulerImpl的handleSuccessfulTask交给TaskSetManager调用handleSuccessfulTask。
TaskSetManager的handleSuccessfulTask的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P403_255456.jpg?sign=1738951703-oU3xioU5Vxp00vdxg0jeUvh8EalLNIA3-0-5e1b6339d4ac448680d435b3016e5487)
handleSuccessfulTask方法中调用sched.dagScheduler.taskEnded,taskEnded由TaskSetManager调用,汇报任务完成或者失败。将任务完成的事件CompletionEvent放入eventProcessLoop事件处理循环中。
DAGScheduler.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P403_255457.jpg?sign=1738951703-nI7VepGtsuPiUSiH36xTxnSmeo9uMUvK-0-301ad4c23c5662ca901c2959db3ba8eb)
由事件循环线程读取消息,并调用DAGSchedulerEventProcessLoop.onReceive方法进行消息处理。
DAGScheduler.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P403_255458.jpg?sign=1738951703-qEQouNob22NGSAJjyWahJMJB0BJIN6EV-0-e1446278597886469ee378cf91cc2df3)
onReceive中调用doOnReceive(event)方法,模式匹配到CompletionEvent,调用dagScheduler.handleTaskCompletion方法。
DAGScheduler.scala的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P403_255459.jpg?sign=1738951703-ofOCgMd9IIexcS0o7Y30SnHKIBfMBNbB-0-df679c2b6c1854db282f1b24a2637276)
DAGScheduler.handleTaskCompletion中task执行成功的情况,根据ShuffleMapTask和ResultTask两种情况分别处理。其中,ShuffleMapTask将MapStatus汇报给MapOutTracker。
Spark 2.2.1版本的DAGScheduler的handleTaskCompletion的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P404_255461.jpg?sign=1738951703-dgRxu7wazAFbdR5xaDCMVv2WfFF99Uxh-0-bc24d6952805e2862005830adbc2295b)
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P405_255462.jpg?sign=1738951703-9UrtaSTb6VaDbx757RH9F4JRu8X3Kmr6-0-6f2483bc8a33f6bc8929aed62128f38b)
Spark 2.4.3版本的DAGScheduler的handleTaskCompletion的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第11行之后新增代码,从shuffleStage.pendingPartitions中去掉分区ID。
上段代码中删掉第15~21行。
上段代码中删掉第25~32行,新增mapOutputTracker.registerMapOutput进行注册的代码。
上段代码中删掉第46~49行,新增mapOutputTracker.incrementEpoch的代码。
上段代码中删掉第61~67行,将其封装为一个方法,新增markMapStageJobsAs-Finished(shuffleStage)的代码。
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P405_255463.jpg?sign=1738951703-PbA6iInqF0IT6bcDkTVIdZoTYG8tRQUN-0-2de1af7bfbf443ea4223ac0cffdd58fc)
8.4.2 ResultTask执行结果与Driver的交互原理及源码详解
Task的run方法调用的时候会导致Task的抽象方法runTask的调用,Task.scala的runTask方法是一个抽象方法。Task包括ResultTask、ShuffleMapTask两种Task,抽象runTask方法具体的实现由子类的runTask实现。ResultTask的runTask具体实现的源码如下。
ResultTask.scala的runTask的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P406_255466.jpg?sign=1738951703-04nFQXHcplO8Xnmr6GdDYfWImUep3yz2-0-8df4b57866e4ee2929d90ac0bddd2b7a)
而ResultTask的runTask方法中反序列化生成func函数,最后通过func函数计算出最终的结果。
ResultTask执行结果与Driver的交互过程同ShuffleMapTask类似,最终,DAGScheduler.handleTaskCompletion中Task执行结果,根据ShuffleMapTask和ResultTask两种情况分别处理。其中,ResultTask的处理结果如下所示。
DAGScheduler的handleTaskCompletion的源码如下:
![](https://epubservercos.yuewen.com/A9A703/20964119708003506/epubprivate/OEBPS/Images/Figure-P406_255467.jpg?sign=1738951703-ctUhP2PWFXLB3Ge1VqHvGJ1NkDSqhX6v-0-03ab093316357e56a8b9188d78b19bc0)
Driver端的DAGScheduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行shuffle后产生整个Job最后的结果。