spark 资源动态调整设计和实现剖析

总体描述

这里有executor 个数动态变化的图

可以看到, 初始状态的会积压等待 schedulerBacklogTimeout 时间, 然后以越来越快的速度递增,斜率随sustainedSchedulerBacklogTimeout 2倍速度增加, 一个stage 结束后,恢复初始状态,executor 数目根据配置有一个上限和下限, 当 job 结束, executor 空闲一定时间, 然后会一个个 被 remove 掉。

ExecutorAllocationManager 是 一个根据工作负载动态分配和删除 executors 的管家, ExecutorAllocationManager 维持一个动态调整的目标executors数目, 并且定期同步到资源管理者,也就是 yarn 或者 mesos。
启动的时候根据配置设置一个目标executors数目, spark 运行过程中会根据等待(pending)和正在运行(running)的tasks数目动态调整目标executors数目。

当前的executors 个数超过实际负载所需的个数时,会触发减少目标executors数目。

目标executors数目一般会调整为等待(pending)和正在运行(running)的tasks的数目和,
当有过多的task 积压, 等待调度的时候, 会触发增加目标executors数目。
如果调度队列没有在 N 秒内消费完, 就增加新的 executors。 如果积压的情况还会持续 M 秒, 就会触发增加更多的executors, 每一轮增加executors的幅度指数上升, 直到到达上限, 需求的数目根据配置的属性 和当前的 正在运行(running)的tasks的数目和 确定。

增长的幅度受到两个因素控制,

  • 开始的时候, 应该缓慢增加,防止激增超出实际需要, 我们还得一个个删除
  • 目标executors数目很高的情况持续了一段时间后, 就应该快速递增,否则会让少量 executors 吃重太长时间。

删除的策略倒是很简单:

如果一个 executor 已经空闲了 K秒, 意外着它短时间内都不会被调度到, 删除之。

在任何一种情况下, 都没有重试逻辑, 因为我们假定, 资源管理器收到异步请求后, 可以很好的完成。

以下是相关的配置参数

  • spark.dynamicAllocation.enabled 是否开启动态资源调整
  • spark.dynamicAllocation.minExecutors executors 数目的下限
  • spark.dynamicAllocation.maxExecutors executors 数目的上限
  • spark.dynamicAllocation.initialExecutors 初始 executors 数目
  • spark.dynamicAllocation.schedulerBacklogTimeout(M) 第一次积压的情况持续多少秒会,会触发调整
  • spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(N) 第一次初始积压超时触发调整后, 如果积压的情况持续多少秒会,会触发调整

  • spark.dynamicAllocation.executorIdleTimeout(K) 空闲多长时间, 会触发删除 executor

监听获取压力信息

在 new ExecutorAllocationManager 的时候会new 一个ExecutorAllocationListener放在spark 的 listenBus 中, 这里使用了监听者模式, 就是发生以下事件的时候, 我们注册的钩子函数都会被调用到, 下面表格中列出了, 监听那些事件, 以及事件发生的时候做了什么

这个 监听器 给动态资源调整提供了调整的依据, 因为只有 资源调整器知道了积压情况 才能进行调整。

消息 处理
onStageSubmitted 一个stage 开始的时候,更新 stageid 和 task个数关系, 启动第一次积压定时,更新资源管理器的task 偏向 host的信息
onStageCompleted 恢复到没有积压过task的状态,递增的幅度设置为 1
onTaskStart numRunningTasks加一,如果当前task是最后一个pending task, 恢复到没有积压过task的状态,递增的幅度设置为 1 , 运行task的executor如果是空闲状态,标记为不空闲
onTaskEnd numRunningTasks减一,判断如果运行task的executor里面没有运行一个task,标记为空闲 idle 状态
onExecutorAdded executorIds中加入 Executor
onExecutorRemoved 从各种集合中删除该 Executor

我们可以看到, 当stage 开始的时候, 启动第一次积压定时, 定时的时长可以通过 spark.dynamicAllocation.schedulerBacklogTimeout 参数控制, 如果超过了这个时间, 就会触发增加更多的executors, 如果一个 stage 运行完了, 就会恢复到没有积压过task的状态, 递增的幅度设置为 1 , 因为下一个stage 开始的时候应该缓慢增加。不能在上一个 stage的递增速度的基础下递增的更快,防止激增超出实际需要, 我们还得一个个删除。

如果一个executor里面运行了task, 就会把这个 executor 从待删除的executor集合中拿走,

动态调整的执行者

如果动态资源管理器根据当前压力确定了目前task所需目标executors数目, 这个决策最终要下达到最终执行者, 这里我们先简单了解, 以后我会深入写一下这个过程,

在这里 我们把 资源管理决策的最终执行者抽象为一个 ExecutorAllocationClient 的 trait, 也就是这个 最终执行者 要具有以下的能力

下面是协议图, 和协议

能力 解释
requestTotalExecutors 根据我们的需求, 总共目标executors数目,进行调整,直到调整一致为止, 如果这里对应的是 yarn, 就通过 applicaiton master 来调整,当然如果 我们目前正在用的 executor 数目 大于 需求的 executor 数目, 这个调用时不会主动删除 executor的
requestExecutor 我们进一步额外需要的executor 数目
killExecutors 删除 集合中指定的 executor
killExecutor 根据 executorId删除 executor

触发调整的机制

这里的机制其实类似于操作系统对进程的调整, 属于一种抢占式调整, 而不是协作式调整, 是定时触发调整,而不是一个task 让出executor的时候触发的调整。

积压时增加 executor 过程

  • 根据 totalPendingTasks 和 totalRunningTasks 的和(这个信息也是从 监听器中获取的)除以 每个 executor 的 core 数目来决定总共需要多少个 executor,
  • 如果需要的 executor数 跟上次 如果需要的 executor 一样,就什么也不通知 决策执行者, 如果比上次少, 需要 通过 requestTotalExecutors, 更新 我们需要的 Executors数目, 然后传递给决策执行者去调整,
  • 如果本轮目标executors数目比上轮多,如果首次积压时间没有超时, 什么也不干, 如果首次积压时间超时,进行实际的增加, 并且把下次的递增时间设置为当前时间 加上 sustainedSchedulerBacklogTimeout 时间
  • 如果本轮目标executors数目 大于 上限, 两者取其大
  • 如果在调整过程中, 本轮目标executors数目和上一轮一样, 把调整幅度降低 为 1
  • 把 本地递增幅度 * 2 作为 下次的递增幅度
  • 把 我们需要的 Executors数目, 然后传递给决策执行者去调整,

可以看到整个调整过程过程中,调整幅度以2倍递增, 如果到达上限, 立马将为1,

删除 executor 过程

  • 监听器在一个 task 运行结束的时候, 因为一个executor 可以配置多个core, 就会看下 当前 task 使用的 executor 上面还有没有运行其他的 task, 如果没有,就把这个 executor 标记为 idle 空闲状态, 并且放在 removeTimes中, 里面保存 executorid 和 空闲超时时间, 这个空闲超时时间设置为 now + executorIdleTimeoutS * 1000
  • 定时触发调整的时候,removeTimes 中保留没有 空闲超时的 executor, 把已经 空闲超时的 executor 放到 executorIdsToBeRemoved 中, 遍历删除
  • 如果删除的过程中发现到了下限, 就停止删除,

我们可以看到, 删除过程很简单, 就是根据 executor 空闲了一定的时间, 没有 task 运行, 就依次删除

怎么解决要删除 executor 中的状态

碰到状态一般都很麻烦, 做服务我们也要尽可能内存中没有状态, 状态意味着内存中有数据, 当前进程不能杀掉,

这里删除 executor 的时候会碰到什么状态问题呢

  • shuffle 数据我们碰到的问题就是 一个 executor 已经 shuffle write 完了之后, 我们如果要依赖这个 executor 读取数据, 那么这个 executor就是不能删除的, 即使这个executor 很长时间都是空闲的

  • cache 数据

这个问题在动态资源调整机制中是这样解决的

  • 对于 shuffle write 数据, 我们可以启动一个辅助进程, 负责对 shuffle 数据的读取进行服务, 也就是由外部的ShuffleService来充当读取Server的功能,并由专门的ExternalShuffleClient来与其进行交互,从而获取到相应Block数据。 其实外部的ShuffleService最终是来自Hadoop的AuxiliaryService概念,AuxiliaryService为计算节点NodeManager常驻的服务线程,早期的MapReduce是进程级别的调度,ShuffleMap完成shuffle文件的输出以后,即立即退出,在ShuffleReduce过程中由谁来提供文件的读取服务呢?即AuxiliaryService,每一个ShuffleMap都会将自己在本地的输出,注册到AuxiliaryService,由AuxiliaryService提供本地数据的清理以及外部读取的功能。在目前Spark中,也提供了这样的一个AuxiliaryService, YarnShuffleService, 这个服务注册了一个 netty的 ExternalShuffleBlockHandler, 通过 ExternalShuffleBlockResolver 来解析识别 shuffle write的数据。
  • cahche 问题解决方案就相对简单很多, 在把executor 标记为 空闲状态的时候, 会去blockManager 中去看下当前 executor 中是否cache了数据, 如果有cache, 会根据 cachedExecutorIdleTimeoutS 来设置 空闲超时时间, 如果也超时了, 就会把 executor 干掉, cache 数据会根据依赖链 重算。

原创精品,首发个人公众号 spark技术分享 , 同步个人网站 coolplayer.net ,未经本人同意,禁止一切转载

欢迎关注公众号 spark技术分享

欢迎大家关注:sunbiaobiao's微信公众号