05 调度系统:如何把握分布式计算的精髓?
你好,我是吴磊。
在上一讲,我们通过“包工头与施工工人”的例子,初步认识了Spark进程模型中的Driver和Executors、以及它们之间的交互关系。Driver负责解析用户代码、构建计算流图,然后将计算流图转化为分布式任务,并把任务分发给集群中的Executors交付运行。
不过,你可能会好奇:“对于给定的用户代码和相应的计算流图,Driver是怎么把计算图拆解为分布式任务,又是按照什么规则分发给Executors的呢?还有,Executors具体又是如何执行分布式任务的呢?”
我们之前一再强调,分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。深入理解分布式计算,是我们做好大数据开发的关键和前提,它能有效避免我们掉入“单机思维”的陷阱,同时也能为性能导向的开发奠定坚实基础。
而上面的这一系列问题,恰恰是我们吃透分布式计算的关键所在。因此,今天这一讲,我们就顺着这些问题,一起去深入探究Spark调度系统,进而弄清楚分布式计算的来龙去脉。
角色划分与斯巴克建筑集团
在上一讲,我们通过“包工头与施工工人”的类比、以及Word Count的示例,其实已经大致厘清了Spark分布式任务调度的核心环节与关键步骤。今天这一讲的核心任务,就是带你去深入其中的每一个环节,做到“既见森林、也见树木”。这里咱们不妨先把这些环节和涉及的组件梳理出来,从而让你在整体上有一个清晰的把握。
不难发现,表中的步骤与组件众多,要是照本宣科地去讲调度系统,先别说你可能看着看着就开始犯困了,就连我自己,也可能写着写着就睡着了。因此,咱们不妨把这些环节与组件融入到一个故事中去,让你像读小说一样,在捧腹之余弄懂Spark调度系统。
话说很久以前,美国有一家名扬海内外的建筑集团,名为“斯巴克(Spark)”。这家建筑集团规模庞大,设有一个总公司(Driver),和多个分公司(Executors)。斯巴克公司的主要服务对象是建筑设计师(开发者),建筑设计师负责提供设计图纸(用户代码、计算图),而斯巴克公司的主营业务是将图纸落地、建造起一栋栋高楼大厦。
要完成主营业务,集团公司需要招聘能够看懂图纸、并将其转化为建筑项目的架构师,因此斯巴克公司挖角了行业知名架构师“戴格”(DAGScheduler)。集团公司给戴格安排的职位是总公司的一把手,同时要求两位创始元老“塔斯克”和“拜肯德”全力配合戴格的工作。
听到这里,你肯定会问“塔斯克”和“拜肯德”是谁呢?
塔斯克(TaskScheduler)一毕业就加入了斯巴克公司,现任总公司施工经理,成功指挥完成了多个大大小小的工程项目,业绩非常突出,深得公司赏识。拜肯德(SchedulerBackend)和塔斯克在上大学的时候就是上下铺,关系好得穿一条裤子,现任总公司人力资源总监,负责与分公司协调、安排人力资源。从公司的安排来看,三位主管的分工还是比较明确的。
之所以说塔斯克(TaskScheduler)和拜肯德(SchedulerBackend)是公司元老,原因在于,在SparkContext / SparkSession的初始化中,TaskScheduler和SchedulerBackend是最早、且同时被创建的调度系统组件。这二者的关系非常微妙:SchedulerBackend在构造方法中引用TaskScheduler,而TaskScheduler在初始化时会引用SchedulerBackend。
值得一提的是,SchedulerBackend组件的实例化,取决于开发者指定的Spark MasterURL,也就是我们使用spark-shell(或是spark-submit)时指定的–master 参数,如“–master spark://ip:host”就代表Standalone 部署模式,“–master yarn”就代表YARN 模式等等。
不难发现,SchedulerBackend 与资源管理器(Standalone、YARN、Mesos等)强绑定,是资源管理器在 Spark 中的代理。其实硬件资源与人力资源一样,都是“干活儿的”。所以,如果我们用集团公司的人力资源来类比Spark集群的硬件资源,那么“拜肯德”就是名副其实的人力资源总监。
从全局视角来看,DAGScheduler是任务调度的发起者,DAGScheduler以TaskSet为粒度,向TaskScheduler提交任务调度请求。TaskScheduler在初始化的过程中,会创建任务调度队列,任务调度队列用于缓存 DAGScheduler提交的TaskSets。TaskScheduler结合SchedulerBackend提供的 WorkerOffer,按照预先设置的调度策略依次对队列中的任务进行调度。
简而言之,DAGScheduler手里有“活儿”,SchedulerBackend手里有“人力”,TaskScheduler的核心职能,就是把合适的“活儿”派发到合适的“人”的手里。由此可见,TaskScheduler承担的是承上启下、上通下达的关键角色,这也正是我们将“塔斯克”视为斯巴克建筑公司元老之一的重要原因。
那么,了解了这三个主管的角色职责,我们接下来就来详细说说,他们是怎么各自完成自己的工作的。
总架戴格:DAGScheduler
回到我们的故事里,戴格在两位元老的协助下,工作开展得还算顺利,然而,冰层之下,暗流涌动,作为一名空降的领导,戴老板还需亲自“露两手”,才能赢得平级的认可与信任。
作为集团公司的“总架”(总架构师),戴格的核心职责,是把计算图DAG拆分为执行阶段Stages,Stages指的是不同的运行阶段,同时还要负责把Stages转化为任务集合TaskSets,也就是把“建筑图纸”转化成可执行、可操作的“建筑项目”。
用一句话来概括从 DAG 到 Stages 的拆分过程,那就是:以 Actions 算子为起点,从后向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages。
在第2讲介绍编程模型的时候,我们以Word Count为例,提到Spark作业的运行分为两个环节,第一个是以惰性的方式构建计算图,第二个则是通过Actions算子触发作业的从头计算:
对于图中的第二个环节,Spark在实际运行的过程中,会把它再细化为两个步骤。第一个步骤,就是以Shuffle为边界,从后向前以递归的方式,把逻辑上的计算图DAG,转化成一个又一个Stages。
我们还是以Word Count为例,Spark以take算子为起点,依次把DAG中的RDD划入到第一个Stage,直到遇到reduceByKey算子。由于reduceByKey算子会引入Shuffle,因此第一个Stage创建完毕,且只包含wordCounts这一个RDD。接下来,Spark继续向前回溯,由于未曾碰到会引入Shuffle的算子,因此它把“沿途”所有的RDD都划入了第二个Stage。
在Stages创建完毕之后,就到了触发计算的第二个步骤:Spark从后向前,以递归的方式,依次提请执行所有的Stages。
具体来说,在Word Count的例子中,DAGScheduler最先提请执行的是Stage1。在提交的时候,DAGScheduler发现Stage1依赖的父Stage,也就是Stage0,还没有执行过,那么这个时候它会把Stage1的提交动作压栈,转而去提请执行Stage0。当Stage0执行完毕的时候,DAGScheduler通过出栈的动作,再次提请执行Stage 1。
对于提请执行的每一个Stage,DAGScheduler根据Stage内RDD的partitions属性创建分布式任务集合TaskSet。TaskSet包含一个又一个分布式任务Task,RDD有多少数据分区,TaskSet就包含多少个Task。换句话说,Task与RDD的分区,是一一对应的。
你可能会问:“Task代表的是分布式任务,不过它具体是什么呢?”要更好地认识Task,我们不妨来看看它的关键属性。
在上表中,stageId、stageAttemptId标记了Task与执行阶段Stage的所属关系;taskBinary则封装了隶属于这个执行阶段的用户代码;partition就是我们刚刚说的RDD数据分区;locs属性以字符串的形式记录了该任务倾向的计算节点或是Executor ID。
不难发现,taskBinary、partition和locs这三个属性,一起描述了这样一件事情:Task应该在哪里(locs)为谁(partition)执行什么任务(taskBinary)。
到这里,我们讲完了戴格的职责,让我们来一起简单汇总一下,戴格指代的是DAGScheduler,DAGScheduler的主要职责有三个:
- 根据用户代码构建DAG;
- 以Shuffle为边界切割Stages;
- 基于Stages创建TaskSets,并将TaskSets提交给TaskScheduler请求调度。
现在,戴格不辱使命,完成了“建筑图纸”到“建筑项目”的转化,接下来,他需要把这些“活儿”下派给塔斯克,由塔斯克进一步完成任务的委派。
不过,对于塔斯克来说,要想把这些“活儿”委派出去,他得先摸清楚集团内有多少“适龄劳动力”才行。要做到这一点,他必须仰仗死党:拜肯德的帮忙。
拜肯德:SchedulerBackend
作为集团公司的人力资源总监,拜肯德的核心职责,就是实时汇总并掌握全公司的人力资源状况。前面我们讲了,全公司的人力资源对应的就是Spark的计算资源。对于集群中可用的计算资源,SchedulerBackend用一个叫做ExecutorDataMap的数据结构,来记录每一个计算节点中Executors的资源状态。
这里的ExecutorDataMap是一种HashMap,它的Key是标记 Executor 的字符串,Value是一种叫做ExecutorData的数据结构。ExecutorData用于封装Executor的资源状态,如RPC地址、主机地址、可用CPU核数和满配CPU核数等等,它相当于是对Executor做的“资源画像”。
有了ExecutorDataMap这本“人力资源小册子”,对内,SchedulerBackend可以就Executor做“资源画像”;对外,SchedulerBackend以WorkerOffer为粒度提供计算资源。其中,WorkerOffer封装了Executor ID、主机地址和CPU核数,它用来表示一份可用于调度任务的空闲资源。
显然,基于Executor资源画像,SchedulerBackend可以同时提供多个WorkerOffer用于分布式任务调度。WorkerOffer这个名字起得很传神,Offer的字面意思是公司给你提供的工作机会,到了Spark调度系统的上下文,它就变成了使用硬件资源的机会。
你可能会好奇,坐镇总公司的拜肯德,对于整个集团的人力资源,他是怎么做到足不出户就如数家珍的?一个篱笆三个桩,一个好汉三个帮。仅凭拜肯德一己之力,自然是力不从心,幕后功臣实际上是驻扎在分公司的一众小弟们:ExecutorBackend。
SchedulerBackend与集群内所有Executors中的ExecutorBackend保持周期性通信,双方通过LaunchedExecutor、RemoveExecutor、StatusUpdate等消息来互通有无、变更可用计算资源。拜肯德正是通过这些小弟发送的“信件”,来不停地更新自己手中的那本小册子,从而对集团人力资源了如指掌。
塔斯克:TaskScheduler
一把手戴格有“活儿”,三把手拜肯德出“人力”,接下来,终于轮到牵线搭桥的塔斯克出马了。作为施工经理,塔斯克的核心职责是,给定拜肯德提供的“人力”,遴选出最合适的“活儿”并派发出去。而这个遴选的过程,就是任务调度的核心所在,如下图步骤3所示:
那么问题来了,对于SchedulerBackend提供的一个个WorkerOffer,TaskScheduler是依据什么规则来挑选Tasks的呢?
用一句话来回答,对于给定的WorkerOffer,TaskScheduler是按照任务的本地倾向性,来遴选出TaskSet中适合调度的Tasks。这是什么意思呢?听上去比较抽象,我们还是从DAGScheduler在Stage内创建任务集TaskSet说起。
我们刚刚说过,Task与RDD的partitions是一一对应的,在创建Task的过程中,DAGScheduler会根据数据分区的物理地址,来为Task设置locs属性。locs属性记录了数据分区所在的计算节点、甚至是Executor进程ID。
举例来说,当我们调用textFile API从HDFS文件系统中读取源文件时,Spark会根据HDFS NameNode当中记录的元数据,获取数据分区的存储地址,例如node0:/rootPath/partition0-replica0,node1:/rootPath/partition0-replica1和node2:/rootPath/partition0-replica2。
那么,DAGScheduler在为该数据分区创建Task0的时候,会把这些地址中的计算节点记录到Task0的locs属性。
如此一来,当TaskScheduler需要调度Task0这个分布式任务的时候,根据Task0的locs属性,它就知道:“Task0所需处理的数据分区,在节点node0、node1、node2上存有副本,因此,如果WorkOffer是来自这3个节点的计算资源,那对Task0来说就是投其所好”。
从这个例子我们就能更好地理解,每个任务都是自带本地倾向性的,换句话说,每个任务都有自己的“调度意愿”。
回到斯巴克建筑集团的类比,就好比是某个“活儿”,并不是所有人都能干,而是只倾向于让某些人来做,因为他们更专业。比如砌墙这件事,更倾向于给工龄3年以上的瓦工来做;而吊顶,则更倾向于给经验超过5年的木工来做,诸如此类。
像上面这种定向到计算节点粒度的本地性倾向,Spark中的术语叫做NODE_LOCAL。除了定向到节点,Task还可以定向到进程(Executor)、机架、任意地址,它们对应的术语分别是PROCESS_LOCAL、RACK_LOCAL和ANY。
对于倾向PROCESS_LOCAL的Task来说,它要求对应的数据分区在某个进程(Executor)中存有副本;而对于倾向RACK_LOCAL的Task来说,它仅要求相应的数据分区存在于同一机架即可。ANY则等同于无定向,也就是Task对于分发的目的地没有倾向性,被调度到哪里都可以。
下图展示的是,TaskScheduler依据本地性倾向,依次进行任务调度的运行逻辑:
不难发现,从PROCESS_LOCAL、NODE_LOCAL、到RACK_LOCAL、再到ANY,Task的本地性倾向逐渐从严苛变得宽松。TaskScheduler接收到WorkerOffer之后,也正是按照这个顺序来遍历TaskSet中的Tasks,优先调度本地性倾向为PROCESS_LOCAL的Task,而NODE_LOCAL次之,RACK_LOCAL为再次,最后是ANY。
你可能会问:“Spark区分对待不同的本地倾向性,它的初衷和意图是什么呢?”实际上,不同的本地性倾向,本质上是用来区分计算(代码)与数据之间的关系。
Spark调度系统的核心思想,是“数据不动、代码动”。也就是说,在任务调度的过程中,为了完成分布式计算,Spark倾向于让数据待在原地、保持不动,而把计算任务(代码)调度、分发到数据所在的地方,从而消除数据分发引入的性能隐患。毕竟,相比分发数据,分发代码要轻量得多。
本地性倾向则意味着代码和数据应该在哪里“相会”,PROCESS_LOCAL是在JVM进程中,NODE_LOCAL是在节点内,RACK_LOCAL是不超出物理机架的范围,而ANY则代表“无所谓、不重要”。
好啦,到此为止,结合WorkerOffer与任务的本地性倾向,塔斯克TaskScheduler挑选出了适合调度的“活儿”:Tasks。接下来,TaskScheduler就把这些Tasks通过LaunchTask消息,发送给好基友SchedulerBackend。人力资源总监SchedulerBackend拿到这些活儿之后,同样使用LaunchTask消息,把活儿进一步下发给分公司的小弟:ExecutorBackend。
那么小弟ExecutorBackend拿到活之后,是怎么工作的呢?我们接着往下看吧!
付诸执行:ExecutorBackend
作为分公司的人力资源主管,ExecutorBackend拿到“活儿”之后,随即把活儿派发给分公司的建筑工人。这些工人,就是Executors线程池中一个又一个的CPU线程,每个线程负责处理一个Task。
每当Task处理完毕,这些线程便会通过ExecutorBackend,向Driver端的SchedulerBackend发送StatusUpdate事件,告知Task执行状态。接下来,TaskScheduler与SchedulerBackend通过接力的方式,最终把状态汇报给DAGScheduler,如图中步骤7、8、9所示:
对于同一个TaskSet当中的Tasks来说,当它们分别完成了任务调度与任务执行这两个环节时,也就是上图中步骤1到步骤9的计算过程,Spark调度系统就完成了DAG中某一个Stage的任务调度。
不过,故事到这里并未结束。我们知道,一个DAG会包含多个Stages,一个Stage的结束即宣告下一个Stage的开始,而这也是戴格起初将DAG划分为Stages的意义所在。只有当所有的Stages全部调度、执行完毕,才表示一个完整的Spark作业宣告结束。
路遥知马力,在一起合作了一个又一个建筑项目之后,空降老大戴格终于赢得了元老塔斯克和拜肯德的信任与认可,坐稳了斯巴克建筑集团的头把交椅。来日可期,戴格的前景一片光明。
重点回顾
今天这一讲,我们用斯巴克建筑集团的故事,介绍了Spark调度系统的工作原理。对于调度系统的工作流程,你需要掌握表格中的5个关键环节:
具体说来,任务调度分为如下5个步骤:
1.DAGScheduler以Shuffle为边界,将开发者设计的计算图DAG拆分为多个执行阶段Stages,然后为每个Stage创建任务集TaskSet。
2.SchedulerBackend通过与Executors中的ExecutorBackend的交互来实时地获取集群中可用的计算资源,并将这些信息记录到ExecutorDataMap数据结构。
3.与此同时,SchedulerBackend根据ExecutorDataMap中可用资源创建WorkerOffer,以WorkerOffer为粒度提供计算资源。
4.对于给定WorkerOffer,TaskScheduler结合TaskSet中任务的本地性倾向,按照PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL和ANY的顺序,依次对TaskSet中的任务进行遍历,优先调度本地性倾向要求苛刻的Task。
5.被选中的Task由TaskScheduler传递给SchedulerBackend,再由SchedulerBackend分发到Executors中的ExecutorBackend。Executors接收到Task之后,即调用本地线程池来执行分布式任务。
今天的内容就是这些,调度系统是分布式计算系统的核心,掌握了Spark任务调度的来龙去脉,你也就把握住了Spark分布式计算引擎的精髓,这会为你开发出高性能的Spark分布式应用打下坚实基础。
每课一练
课程的最后,我来给你留一道练习题。请你想一想,DAGScheduler如何得知一个Stage中所有的Tasks都已调度、执行完毕,然后才决定开始调度DAG中的下一个Stage?
欢迎你在评论区回答这个问题。如果你觉得这一讲对你有所帮助,也欢迎你把它分享给更多的朋友和同事。我在评论区等你,咱们下一讲见!
- Geek_2dfa9a 👍(42) 💬(4)
回答这个流程比较长哈,没有点开源源码阅读经验还真不好答哈,我这里都是自己的理解,如有异议欢迎讨论。 以老师的WordCount为例,首先stage分为两种:ResultStage,ShuffleMapStage。 ResultStage是啥呢就是处理Action动作的,一般也就是最后一个Stage(当然一个driver里也可能有多个Action,所以ResultStage也可能有多个, 这里简单点,WordCount例子里就是Stage1)。 ShuffleMapStage是啥呢,就是产生一个shuffle文件的stage,对应WordCount的Stage0。 为啥要分这么两类呢,因为你DAGScheduler就是根据是否shuffle倒推出来的stage嘛。这里多提一句,多个Job会共享Stage,这样就可以避免重复计算提升效率。 再根据老师讲的每个Stage里会根据RDD的partitions创建Task这段结合源码发现Task也有两种:ResultTask和ShuffleMapTask,对应的是Stage的两种类型。 接下来分析下DAGScheduler怎么感知Task的执行状态,DAGScheduler内部有一个线程eventProcessLoop,线程使用了生产者消费者模式, 里面有一个LinkedBlockingDeque队列,生产者TaskSetManager(可以忽略,理解为一个线程即可)发送给DAGScheduler的各种Task的event, 消费者eventProcessLoop把event取出来然后委托给DAGScheduler处理,其中对应Task完成的逻辑在方法handleTaskCompletion(event: CompletionEvent)中。既然问的是怎么知道当前Stage已经运行完成,可以运行下一个Stage,那当前Stage肯定不是ResultStage, 因此在handleTaskCompletion找到处理ShuffleMapTask的event的逻辑,具体逻辑为:先找到executor的ID标志execId, 校验execId是否为下发的Executor(以防伪造的event)然后标记ShuffleMapTask的结果可用,然后检查当前ShuffleMapStage是否还有没处理的Task, 如果没有的话说明该Stage完成,最后submitWaitingChildStages提交等待中的后续Stage。 这里使用的eventProcessLoop生产者消费者模式比较巧妙,生产者可能有多个线程(没进一步确认,有可能是1/n个),但是消费者是单线程的, 生产者也不直接修改DAGScheduler内部的成员,只通过丢event给线程安全的LinkedBlockingDeque,这样就保证了没有数据竞争。 最后,老师讲的非常清楚,看了这门课后我买了老师的另一门Spark性能调优,在这里感谢老师。最后提一句,您配图里的字能调大点嘛,字太小了。
2021-09-20 - welldo 👍(5) 💬(2)
老师, 根据“当 TaskScheduler 需要调度 Task0 这个任务时,根据 Task0 的 locs 属性,它就知道。。。”这段话, 和你回复unknown同学的“仅仅知道数据在某个机架内(一个机架包含多台机器)”这段话, 结合起来,意思是不是: task对于它要处理的数据在哪里, 有时候精确知道(在某个进程内或某个节点内); 有时候模糊知道(在某个机架内);有时候不知道(any), 并且标注在它的locs属性里, “精确知道的”task,就只挑选符合要求的offer; “模糊知道的”task,就挑选大致符合要求的offer; “不知道的”task,就随便选一个offer。 老师,请问我的理解对吗?
2021-10-27 - 花生耿 👍(4) 💬(3)
老师,我是spark零基础。这个例子中的一堆中文名字反而增加了理解的难度,我建议直接把里面的角色直接替换成真实的组件更容易理解,要不然还得一边看中文名字,一边跟那个组件对应。
2022-02-28 - pythonbug 👍(4) 💬(1)
老师好,TaskScheduler调度优先级那里是不是可以这样理解: for 优先级 <- PROCESS_LOCAL to ANY { for task <- task0 to taskn{ if task0.locs == workoffer 分配 else 跳过 } }
2021-11-13 - bian 👍(4) 💬(1)
这章真的是将执行流程讲的棒极了
2021-10-22 - Jordan·李威 👍(4) 💬(1)
总公司和分公司的工作任务和人力资源分配调度的例子太匹配了。
2021-09-20 - 艾利特-G 👍(3) 💬(1)
> 2.SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。 这一步中,假如数据是存放在HDFS中,那么SchedulerBackend通过读取HDFS的元数据就知道了某个task所需要的数据分片在哪个DataNode(YARN/Spark worker node)。 这个时候为了知道该DataNode的计算资源,不需要实际地启动一个executor,只需要向YARN NodeManager(或者别的资源调度框架中的worker daemon)询问一下就知道了吧?
2022-02-07 - 赌神很低调 👍(3) 💬(1)
老师好,文中说task任务分发通过数据本地性找到合适的executor,想了解executor进程是根据数据所在的节点创建的吗?否则不是很大几率都找不到合适的executor?
2022-01-22 - D.C 👍(3) 💬(1)
一个taskSet下的不同task,可以分配到同一个executors么?
2021-11-05 - 加乘 👍(2) 💬(3)
老师好,有个基础的问题,没太明白,就是文中说到“当我们调用 textFile API 从 HDFS 文件系统中读取源文件时,Spark 会根据 HDFS NameNode 当中记录的元数据......”时,数据和Executors应该不在一个物理机器上吧,那么Executors执行的时候,是到数据节点的机器上读取数据再进行处理吗? 后面又提到“数据不动,代码动”,感觉应该是把代码发到数据节点,然后在数据节点上进行计算处理。然后再返回给Executors执行结果,这样理解对吗?
2021-09-28 - Unknown element 👍(1) 💬(2)
机架就是物理上的“铁架”吗 那 rack_local 和 node_local 有什么区别呢
2021-09-29 - 虹桥念苏 👍(1) 💬(5)
虽说很形象,但有点啰嗦,学习还是喜欢干货满满,可能比较适合完全零基础的同学吧
2021-09-22 - 康 👍(0) 💬(1)
老师讲的太好了
2022-01-25 - liugddx 👍(0) 💬(2)
我想问下,DAGScheduler划分stage为啥是按照shuffle倒推呢?
2021-12-10 - 哇塞 👍(0) 💬(1)
牛逼 例子很形象啊!!!
2021-12-08