请你停下了花几分钟读一读 spark 的代码

现在网上充斥中各种程度不一的混乱的信息, 害人不浅啊, 当然有的是因为讲的是 spark 早期的版本, spark 已经发展了那么长时间了, 早已经不是当年的阿蒙了, 有些人分不清,拿来各种 spark 剖析 和原理之类的博客来看, 看的是越来越糊涂,

前段时间有个人来问我,

spark 的 broadcast变量分发不是通过 http的么, driver 应该是一个 http server, server 怎么能向 executor 里面的 client 发送数据呢,

我说你是怎么有这个判断的, 怎么碰到这个问题的,

他说,他看了一篇博客,里面讲

Broadcast彻底解析,Broadcast就是将数据通过http的协议从一个节点发送到其他的节点上。例如Driver上有一张表,而Executor中的每个并行执行的Task(100万个)都要查询这张表,那我们通过Broadcast的方式就只需要往每个Executor把这张表发送一次就行了。Executor中的每个运行的Task查询这张唯一的表,而不是每次执行的时候都从Driver获得这张表!

不知道大家看到这样一段描述, 是怎么理解的, 反正我是觉得讲的很是似是而非,

我问你使用的是 spark 那个版本

他说使用的是最新的版本 spark 2.1.0

我说你等一下,让我翻下源码, 我在我的机器上找到 spark2.1.0的源码,

我们都知道 spark 使用 方式很简单,

val array: Array[Int] = ???
val broadcasted = sc.broadcast(array)
val rdd: RDD[Int] = ???
rdd.map(i => array.contains(i)) // 这种没有使用 broadcast, 每次 task 都要传一下 数组, 浪费内网带宽
rdd.map(i => broadcasted.value.contains(i))

上面的一个小的 demo 就是把一个 数组通过 broadcast 的方式,广播出去, 然后就可以在 task 里面使用数组变量了, 这个数组变量是缓存在 executor上的, 不用每次调度 task运行的时候都得传输一次 数组

我们可以看到这个 broadcast 的使用点, 无非就是 sc.broadcast 定义了一个 广播变量 和 broadcasted.value 使用广播变量的 value 方法,找到真正的数组数据

找到 sparkcontext 的 broadcast 方法, 这个方法返回了一个 Broadcast[T] 类型的变量, 里面是通过 broadcastManager new 出来的。

val bc = env.broadcastManager.newBroadcast[T](value, isLocal)

这个 broadcastManager 也是放在 env 里面的, 我们到 sparkEnv 里面找找, 打开 sparkEnv.scala 文件 ,关键字搜索 BroadcastManager, 还真找到了,

到里面看了一眼, 初始化方法里面, 默认现在 就使用的 TorrentBroadcastFactory , 根本就没有 使用 http 了, 更没有哪里有 http server, 官方最新都不使用 http 来传输广播变量了, 我们还操心那个干嘛,

网上一大堆的教程都是讲 什么 httpBoradcast 原理剖析的, 看这个有什么卵用, 除非你是想看下 spark是怎么一步步走过来的。

我这里科普一下

为了解决 HttpBroadast 中 driver 单点网络瓶颈的问题,Spark 又设计了一种 broadcast 的方法称为 TorrentBroadcast,这个类似于大家常用的 BitTorrent 技术。基本思想就是将 data 分块成 data blocks,然后假设有 executor fetch 到了一些 data blocks,那么这个 executor 就可以被当作 data server 了,随着 fetch 的 executor 越来越多,有更多的 data server 加入,data 就很快能传播到全部的 executor 那里去了。

如果还不明白, 你想想 你怎么用迅雷下电影下得那么快的, 就是把一个大电影分成一块块的, 这些小块分布在一些离你比较近的同时也在下载同一部电影的电脑上, 这样你就可以直接从你隔壁的张三电脑上取数据了, 这样是不是就快了,

如果还不明白, 那你就别想了,

我们能看到整个调用过程

sc.boradcast -> env.broadcastManager.newBroadcast -> broadcastFactory.newBroadcast -> TorrentBroadcastFactory.newBroadcast -> new TorrentBroadcast

一路调下去, 是new了一个 TorrentBroadcast,

我们看下 这个类的注释

A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].

The mechanism is as follows:

The driver divides the serialized object into small chunks and
stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If
it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
other executors if available. Once it gets the chunks, it puts the chunks in its own
BlockManager, ready for other executors to fetch from.

This prevents the driver from being the bottleneck in sending out multiple copies of the
broadcast data (one per executor).

我就不翻译了, 大意就是把一个大的对象, 分成多个小块,放在多个 executor 上, 让相互离得近的互相下载, 免得 driver 成为下载的瓶颈, 想想是不是和迅雷下电影一样一样的。

` private val numBlocks: Int = writeBlocks(obj)

我们可以看到,这个 广播变量, 包裹了真正的变量, 在new 这个 TorrentBroadcast 的时候, 就调用了 writeBlocks 方法,

跟进去看下, 大体实现方式 , 就是把一个大的变量, 切分为多个小块, 每个 pieceId 代表一个小块, 这些小块的 broadcastId 相同, 所有 broadcastId 相同的多个小块 组合起来 就是这个大的变量本身,

写的时候, 加上了给每块加上了 校验码, 读取的时候, 比较一下, 看看块有没有损坏

差点忘了说了, 这里的底层存储使用的是 blockmanager, 我们都知道这是一个 分布式的存储系统, 读取的时候, 是就近拿数据的,

如果不了解的, 建议看下我之前发的文章,

blocakmanager 完全解析

抱歉目前不支持加链接,下面贴出 url地址

http://coolplayer.net/2017/03/30/spark-%E8%87%AA%E5%B7%B1%E7%9A%84%E5%88%86%E5%B8%83%E5%BC%8F%E5%AD%98%E5%82%A8%E7%B3%BB%E7%BB%9F-BlockManager/

以上是写, 使用广播变量的时候, 调用 broadcasted.value 这个 value 实际上是一个方法, 里面就是从 blockmanager 中根据 BroadcastBlockId 取出多个 块, 然后拼起来,

好了, 我这篇不是源码剖析篇, 只是用来讲一个道理的, 回到那个提的问题,

spark 的 broadcast分发不是通过 http的么, driver 应该是一个 http server, server 怎么能向 executor 里面的 client 发送数据呢 ?

我的答案, 就是

spark 最新的官方已经不使用 http了, 都是历史垃圾了, 也没有必要了解了, 数据 不是 server 发送到 client, 而是通过 blockmanager 分布式存储, 把大的文件分成多个小块, task 里面带的 broadcast变量只是一个 闭包, 实际数据不是存在里面的, 当在 executor 反序列化 task 的时候, 调用广播变量的 value 方法, 这个时候才实际去获取数组数据, 这个方法先在 本地 blockManager 里面去看看有没有 BroadcastBlockId, 如果有, 直接返回, 如果没有去 remote fetch, 然后将数据放在 本地的 blockManager 里面, 这样后面运行的 task 如果需要广播变量, 本地就能取到了,

看了源码是不是对 广播变量的 使用和机制 就一目了然了, 哪里有那么多疑问啊, 看下源码不就什么都清楚了,

与其在 网上看些乱七八糟的,(当然不包括我的剖析, 我保证我还是很负责的, 都是基于最新的代码,不懂的不说,过时的不说), 还不如自己去翻下代码, 来的快, 我带着这个问题, 看代码, 到搞明白花了不超过 10分钟,

如果看代码的时候, 代码太多看不过来, 就看下我的剖析指引一下, 但最终还是那句话,

请你停下了花几分钟读一读 spark 的代码,

原创精品,首发个人公众号 spark技术分享 , 同步个人网站 coolplayer.net ,未经本人同意,禁止一切转载

欢迎关注公众号 spark技术分享

欢迎大家关注:sunbiaobiao's微信公众号