spark 源码剖析之DAGScheduler

基本概念

我们先举一个小例子

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

这个例子从hdfs上读取文件, 然后每行按照空格分割成单词, 然后reduce出每个单词的数目, 然后把结果保存到hdfs, 我们来看下这段代码经过 DAG 划分究竟变成什么样

代码中每一个对spark框架函数的调用都会生成一个RDD实例, 然后上一个rdd作为生成 rdd的 parents,
最终生成的 rdd 链接图如下

上面根据宽依赖的窄依赖, 最终整个job,会划分为不同的stage, 像是用篱笆隔开了一样, 如果中间有宽依赖,就用刀切一刀, 最终生成了stage1 和stage2,

map 和flatmap 都是窄依赖, 不会产生分界, reduceBykey 是宽依赖, 会产生分界,后面就会产生一个新的stage,

先提前说下, 最终运行的时候, 同一个stage 不同的分区,会产生一个task, 调度到executor上运行,

大家注意这个结论, 如果每一个map就产生一个task到executor运行,调度单位就多了几倍, 数据传输也会多了几倍,性能上做不到, 如果多个stage产生一个 task, 由于宽依赖下一个rdd,会依赖父rdd多个分区里面的数据,所以要产生shuffle, 要到不同的executor上面去拉取数据, 所以也不行,

最终只能一个stage 的每一个分区产生一个task,

只有上一个stage不同分区的多个task全部运行完, 才会运行下一个stage的不同分区的多个task, 所以说多个stage像是用篱笆隔开一样,是这样一个意思

stage 分为两种, ResultStageShuffleMapStage 最终产生结果的stage是ResultStage , 中间过程都是 ShuffleMapStage

属于 ResultStage 的Task都是 ResultTask , 属于 ShuffleMapStage 的Task都是 ShuffleMapTask

最后一个 saveAsTextFile 是一个action 动作,

saveAsTextFile -> saveAsHadoopFile -> saveAsHadoopDataset -> self.context.runJob -> dagScheduler.runJob -> dagScheduler.submitJob

最终触发了job的实际运行, 流程走到了DAGScheduler 模块对整个作业进行 stage的划分,

eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))

DAGScheduler中维护了一个 eventProcessLoop 队列, 所有需要执行的Job 都放入队列,然后依次执行,

从队列中取出消息, 最终 执行handleMapStageSubmitted

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
...
submitStage(finalStage)

我们忽略一些无用的代码, 只保留了关键的一些代码, createResultStage 这一步里面做的事情,主要是把一个job的一个rdd链条, 根据宽依赖还是窄依赖, 划分为多个 stage, 并且每个 stage 都有一个List[stage] 作为父stage集合,

举个例子, 如果一个依赖关系是上图中所示,

这里就会生成三个 stage,

stage0 的List[Stage] 是stage1 和stage2

submitStage 这个调用就会进行 stage的提交运行, 就会先运行 stage1 和stage2, 等这两个stage都运行完了, 然后运行 stage3

这里的 stage0 就是 ResultStage, stage0 和stage1 都是ShuffleMapStage

创建DAG链

stage 的划分是在 createResultStage 函数中进行的, 其实这里是递归调用,

val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)

这里的parents就就是一个 List[Stage] 然后作为 resultStage 的父Stage集合,new 出来最终的ResultStage,

整个stage划分是一个递归过程, 我们看下这个递归过程的每次递归是什么样的

加入现在起始点是一个 最终 发起action的rdd 或者一个 stage的rdd, 下面就会在下面图示中不断的调来调去

假设我们 0 代表孙子辈的节点, 1代表父亲辈的节点, 2代表爷爷辈的节点

加入我们这里是从 createResultStage 开始, 孙子节点0 然后就到了 getOrCreateParentStages 获取 获取所有父亲辈节点的依赖,List[1] ,遍历调用 父亲辈的节点,调用 getOrCreateShuffleMapStage 函数中 getMissingAncestorShuffleDependencies 会获取所有父亲辈的所有宽依赖, 也就是爷爷辈的节点, 放入 shuffleIdToMapStage 集合中, 在这里爷爷又变成了孙子,递归调用,
这里下一步是创建父亲辈的节点, 这里如果再调用 getOrCreateParentStages的时候, 因为父亲的父亲在刚才递归调用的时候已经放入 shuffleIdToMapStage 中, 所以直接返回,stage, 孙子节点第一次调用返回的时候就获取到了 List[1]

这里的递归终结点就是当前节点没有宽依赖了, 也就没有 父节点了, List[] 为空, 创建了一个 List[Parents] 为空的 ShuffleMapStage

这里有些人会有疑惑, 是 父亲辈的 stageId大, 还是 孙子辈的StageId 大,

其实很简单, 因为在递归调用过程中,

val id = nextStageId.getAndIncrement()

是父亲辈先实际运行这个原子递增的代码, 所有 孙子辈的stageId大, 在DAG 图中,越靠近右边, stageId越大,

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}

private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}

获取一个节点所有的宽依赖, 若遇到ShuffleDependency(即宽依赖),则调用getOrCreateShuffleMapStage(shufDep, jobId)返回一个ShuffleMapStage类型对象,添加到父stage列表中。若为NarrowDenpendency,则将该NarrowDenpendency包含的RDD加入到待visit队列中,之后继续遍历待visit队列中的RDD,直到遇到ShuffleDependency或无依赖的RDD。

函数getParentStages的职责说白了就是:以参数rdd为起点,一级一级遍历依赖,碰到窄依赖就继续往前遍历,碰到宽依赖就调用getShuffleMapStage(shufDep, jobId)。这里需要特别注意的是,getParentStages以rdd为起点遍历RDD依赖并不会遍历整个RDD依赖图,而是一级一级遍历直到所有“遍历路线”都碰到了宽依赖就停止。剩下的事,在遍历的过程中交给getOrCreateShuffleMapStage。

提交运行 stage

等划分为stage, 就产生了一个血缘关系图

下面就要进行提交运行了, 比如 图示中的 stage 血缘关系图

stage1 是没有依赖的stage, stage2 也是没有依赖的stage, stage3 的父亲是stage 和stage2,

stage3 作为一个resultStage 执行 submitStage(finalStage)

submitStage(stage3)

private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}

这里也是一个递归调用, 如果是没有依赖的stage, 直接调用 submitMissingTasks

如果是有依赖的stage 获取依赖的List[stage], 先提交依赖的stage集合,

我们的例子中, 肯定是先执行, stage1, 和stage2,

调用 submitMissingTasks(stage1, jobId.get)

至此, 整个job 就变成了,stage, 然后 从左往右一次提交运行 stage, 后面的代码太长,而且跟 stage的划分相关性不大, 我就不贴了,主要干了以下一些事

  • 根据数据的位置, 匹配 数据所在位置 和 executor 运行槽所在位置, 做到计算追着数据走
  • 每个 分区, 每个 stage, 产生一个task, 然后序列化。序列化后通过广播机制发送到所有节点上去。
  • 序列化的其实是一个rdd, 调度到executor上运行的时候其实是运行的rdd的 compute 方法,
  • resultStage 产生 resultTasks, shuffleMapStage 产生shuffleTask,
  • 包装成一个 TaskSet 调用 taskScheduler.submitTasks
  • 如果 taskScheduler 运行完一个 task, 产生一个消息, DAGScheduler, 就会根据task类型, 然后判断整个stage的task 是否全部运行完, 从而触发下一个stage,
欢迎大家关注:sunbiaobiao's微信公众号