spark 分布式的基础-通信系统 rpc

spark rpc 的设计

spark 历史上内部使用过两种 rpc 实现,

  • 基于 akka actor 的rpc
  • 基于netty 自己封装的rpc

akka 是一个支持高并发的异步消息通信系统, 基于 scala 构建, 它是基于协程的,性能不容置疑;基于scala的偏函数,易用性也没有话说,但是它毕竟只是RPC通信,无法适用大的package/stream的数据传输,这也是Spark早期引入Netty的原因。

也就是说在早期 spark rpc调用使用的是 akka, 大文件数据流传输使用的 netty, 这样不统一的网络编程方式对于 spark设计者们来讲是不完美的。再加上很多Spark用户饱受Akka复杂依赖关系的困扰,所以后来干脆就直接用Netty代替了Akka, 当然借鉴了很多 akka中的一些设计思路。

我们本文主要介绍一下 spark 使用 netty 自己构建 rpc系统的一些设计和实际。

我们来看下一个经典的学生发送消息给教师的系统图, 里面的几个角色和概念, 已经在这个系统中起到的作用,

  • 学生小红
  • 老师老王
  • 老师的引用
  • 消息分发器
  • 邮箱
  • 整个系统

通俗语言来描述一下过程

  1. 学生小红可以根据 老师老王的名字从 系统中获取到一个引用, 然后发送消息到这个引用上,
  2. 所有引用的消息都发送的分发器
  3. 分发器根据消息的目标投递到对应的邮箱
  4. 然后老师从邮箱中取出邮件获得消息

翻译成计算机语言就是

  1. 客户端根据 url 地址获取到 对端的endpoint的一个引用 endpointRef, 然后再这个对象上调用 send, 发生了一次网络请求,传输消息到服务端
  2. 服务端监听端口,获取到所有的消息,然后调用内部的分发器,分发器根据先前注册的信息获取对应的 endpoint
  3. dispatcher 把消息和对应的 endpoint 包装在一起放进队列中
  4. 然后一个无限循环线程取队列中的item,然后解构出item中的消息和endpoint, 然后调用这个 endpoint 处理逻辑处理消息。

这就是整个 异步rpc 系统运行的原理, akka actor 和 spark rpc 的实现方式都比较类似, 不过 spark rpc 实现方式相对简化了好多。

spark rpc 的内部实现

这里是 spark rpc 的实现的类图,

这里面的一些类和上面我们提到的一些的角色的对应关系

  • 整个系统 -> RpcEnv
  • 老师的引用 -> RpcEndpointRef
  • 老师 -> RpcEndpoint
  • 消息分发器 -> Disatcher
  • 邮箱 -> Inbox
  • 每个老师都有自己的邮箱 -> 每个 RpcEndpoint 都有自己的 Inbox
  • 老师到分发器上声明注册自己 -> RpcEndpoint 注册到 RpcEnv 系统里面
  • 系统包含以上角色 -> RpcEnv 包含自己的 Dispatcher

netty 是作为 rpc的一种实现方式

  • RpcEnv 的实现 NettyRpcEnv
  • RpcEndpointRef 的实现 NettyRpcEndpointRef

下面是一些内部实现代码片段

private[spark] abstract class RpcEnv(conf: SparkConf) {
def address: RpcAddress
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
}
  • 每个 RpcEnv 拥有自己的 地址
  • RpcEndpoint 可以调用 RpcEnv 的 setupEndpoint 方法注册自己
  • 学生实例 调用 RpcEnv 的 setupEndpointRef方法获取老师的 引用
private[spark] abstract class RpcEndpointRef(conf: SparkConf) {
def send(message: Any): Unit
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
}

获得一个引用, 可以给它发送消息, 这里有两种模式,

  • send and forget 就是发送完消息就不管了
  • send and wait 发送消息后阻塞等待回应,并且可以设置超时时间
private[spark] trait RpcEndpoint {
def receive: PartialFunction[Any, Unit]
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
}

RpcEndpoint 的拿到一条消息后,处理, 或者处理后应答。

下面我们来看下 netty 的实现方式

上图是内部的流程, nettyPrcEnv 启动的时候会设置一个 handler 为 NettyRpcHandler

这个 handler 的 receive 方法就是调用 dispatcher.postOneWayMessage

val data = endpoints.get(endpointName)
data.inbox.post(message)
receivers.offer(data)

这里的逻辑根据不同的 处理者名称 获取到对应的EndpointData(EndpointData是endpoint 和 自己的inBox一个 包装), 然后把消息放入这个 RpcEndpoint 的消息队列中,,一般情况下,简单业务可以在RpcHandler直接完成请求的处理,但是考虑一个RpcEnv的Server上会挂载了很多个RpcEndpoint,每个RpcEndpoint的RPC请求频率不可控,因此需要对一定的分发机制和队列来维护这些请求,其中Dispatcher为分发器,InBox即为请求队列;

这里每个 endpoint 和 自己的inBox 包装为一个 EndpointData, dispatcher 根据名字获取的对应的 EndpointData, 也就是这个 endpoint 有消息了,把消息放入这个endpoint 的inBox, 然后作为一个整体,往 receivers 队列里面插入一次,等待处理

spark 内部就会有多个 RpcEndpoint, 所以也需要设置多个 InBox,上面的receivers 里面就是存储每个
RpcEndpoint 和 自己的inBox,

ThreadPoolExecutor线程池中的多个线程并发的从 receivers 拿出消息,走后面的流程

val data = receivers.take()
data.inbox.process(Dispatcher.this)

从receivers 里面take出内容, 然后解出来每个 消息 和 endpoint 进行处理。

每个 Inbox 内有对应的 RpcEndpoint, 其实处理里面就是调用 这个 RpcEndpoint 偏函数上的处理方法, 会根据消息的类型进行模式匹配, 然后进入匹配到的处理分支

spark rpc 在 spark 内部的使用

总管家 sparkEnv

sparkEnv 像一个胶水一样把各个模块粘在一起, 方便各个模块之间进行交互,

在new sparkEnv 的时候, 也就会对这些模块就行初始化,

这个 sparkEnv 会分别在 drive 和 executor上都会初始化, 从而构建出一个分布式的系统, 启动的时候会根据环境变量知道自己是在 drive 上 还是在 executor上, 从而启动不同的组件, slave实例中会保存 master 服务的 RpcEndpointRef, 从而可以远程调用 master上面的服务。

我们随便找几个子模块, 来看看这些子模块是怎么构建在 spark rpc 上来达到相互通信, 从而 master 和 slave 上一起完成一个分布式子功能模块。

这里我分为两种,

  • 单向沟通
  • 双向沟通

单向沟通 - MapOutputTracker服务

单向沟通的组件一般是slave 跟 master 汇报自己的状态,slave 在启动的时候会获取到 master的 RpcEndpointRef 调用方向是 slave 远程调用 master的方法,

sparkEnv 里面有这样一个方法

def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}

比较关键, 如果 sparkEnv 是在 drive上启动的, 就注册一个 Endpoint, 如果是在executor上启动的时候, 只获取一个 EndpointRef, 有了这个引用后, executor就可以远程调用 drive上的方法了

MapOutputTracker 是一个 记录 shuffer 中间结果信息的一个服务, 这里 driver rpcEnv 上有一个
MapOutputTrackerMasterEndpoint rpc 服务 , executor 启动的时候根据 地址, 获取到 引用, 然后就可以远程 rpc 调用, 调用driver上的GetMapOutputStatuses 方法, 获取存储在driver上的 上个stage shuffer writer 数据的数据位置,

双向沟通- BlockaManager 分布式存储系统

双向沟通是slave和 master 之间会互相沟通, master 向 slave 传达指令, slave 也会向 master 发生调用,

上图是 sparkEnv 在 master上启动的时候, 构造了一个 BlockManagerMasterEndpoint

上图是 sparkEnv 在executor上启动的时候, 通过 rpcEnv.setupEndpointRef 获取到了 BlockManagerMaster的引用 BlockManagerMasterRef,

而且 executor new 自己的 BlockManager的时候, 会通过 rpcEnv.setupEndpoint 注册到自己的rpcEnv, 当如也会把自己的 setupEndpointRef 通过 master.registerBlockManager 汇报给 drive,

这里的executor 的逻辑不知道大家看明白没有,

启动 executor 的时候, 只有 driverHostdriverPort 组成的driverUrl

  • 根据 driverUrl 和 BlockManagerMaster 名字搞到, BlockManagerMaster的引用BlockManagerMasterRef
  • executor new 自己的 EndPoint 的时候, 通过 BlockManagerMasterRef 向 drive 注册自己。
  • drive 收到 executor 的注册, 保存在对应的 BlockManagerInfo中,

上图中是 启动完之后, driver 和 executor 之前分别有对方的 RpcEndpoint 的引用, 从而就可以远程交互调用,从而为 spark 提供分布式存储功能。

当然 driver 上会用多个 BlockManagerInfo 保存多个 executor BlockManager 的引用, 这里只是简化了一下。

参考

欢迎大家关注:sunbiaobiao's微信公众号