spark streaming 将乱序消息有序存储 离线保证exact once 语义

流式计算的困境

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;
  • Processing time :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。

上图中 time1,time2, time3等是我们spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在time3的batch中包含event time 为2的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的,

在实时处理过程中也就产生了两个问题

  • spark streaming 从kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据,
  • 同一个时间的数据可能出现在多个 batch 中,

针对第一个问题, 一个 batch 中包含多个时间区间的数据, 加入我们的区间粒度是5分钟, 那么一个batch钟有可能包含 0~5 时间区间中的部分数据, 也有可能包含 5~10 时间区间中的部分数据, 这个很好处理,我们先对时间进行向下5分钟取整,然后使用取整后的时间分为多组, 然后计算出来指标,select time, count(*) group by 取整(time)

就算出来了这个batch中每个时间区间中的数据

但是对于第二个问题,就很麻烦, 图中举例, 时间区间中 2 出现在了 time2 和time3, 我们需要在两个batch中计算出2 的指标, 然后进行累计, 这个累计的过程, 你可以在内存中保存状态, 使用 spark streaming 中的 UpdateStateByKey等算子, 但是不推荐这样使用, 这样就在你的应用中引入了状态和 checkpoint机制, 还有一个方法, 就是把这个状态放在持久化存储中, 比如每次都在 redis, 或者 hbase 中进行累计,

spark streaming 从kafka中拉取处理日志可以做到不丢失日志,
参考我的文章

但是这种模式 很难保证 exact once

假如有下面一种情形,

就会存在这种情况, 我们对 job1 执行 checkpoint 操作, 然后 job1 被调度执行, 从kafak 拉取数据处理, 然后结果保存在 hbase 中, 保存了一半, 机器挂了, 如果重启,recover, 这时候 job1 就会被重复执行, kafka 中的数据就会被重复消费, hbase中的部分指标也就多加了一份,

虽然我们可以使用 spark 或者 flink 中提供的 watermark 功能

也就是维护一个窗口, 然后设置一个最大等待时间, T1 ~T4 中的数据到了最大等待时间后就会触发计算,
但是这样也会有问题, 如果部分数据的延迟超过了最大等待时间, 这部分数据也就永远的丢失了,

当然如果业务可以容忍, 那么使用这个功能也是可以的,每次都使用 全量覆盖操作

解决方案

以上我们面临的问题是 sparkstreaming + kaka 组合可以保证at lease once ,但是很难保证 exact once, 也就是会重复消费, 我们得想办法做到去重,

计算结果 落地存储会有两种模式,

  • append 增量的模式, 也就是每次我都做累加
  • complete的模式, 也即是我保证幂等性, 每次都是覆盖, 保证没有副作用,

因为同一个时间的数据可能出现在多个 batch 中,所以我们在准实时计算中, 只能是append 模式, 上文我们已经论证过了,这种模式会出现重复消费的问题,

由于机器挂了的现象是偶发的, 所以我们可以在挂掉后, 对数据进行离线修复, 也就是我们要保证有一份全量的离线数据,

这份数据我们要保证是不漏不多, 而且是按照event time 时间区间分开的, 这样我们就可以针对出问题的时间区间, 加载这个时间区间的离线数据, 算出结果, 然后进行覆盖。这样就保证了数据的准确性。

我们落地的数据的特点是

  • 全量的,不漏不多
  • 按照定义的时间区间分片

因为从kafka 中拉取存储能保证不丢, 这里我们考虑如何去重, 首先我们要对消息能有一个唯一 ID, 我们使用Kafka的partition加offset作为这个消息的唯一ID, 如果存储到hbase, 这样的话在生成一个消息的时候,我们的ID就不会重复,即使你重跑很多次,HBase会自动把它去重。

如果存储到 hdfs, 我们可以每行数据前面都用 ID 作为头字段, 离线处理的时候根据这个字段先进行去重处理,这样也能保证了 exact once 语义。

输出流程

我们看下 spark streaming 存储到 hdfs 或者 hbase 都会调用 saveAsHadoopDataset

val writer = new SparkHadoopWriter(hadoopConf)
writer.open()
Utils.tryWithSafeFinallyAndFailureCallbacks {
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
}(finallyBlock = writer.close())
writer.commit()

这里根据你传入的 OutFormat 调用 getwriter,

然后再 writer上调用 open write close commit 方法,

这里如果是 hbase 就是调用 hbase client 的写入方法

  • 用户提交put请求后,HBase客户端会将put请求添加到本地buffer中,符合一定条件就会通过AsyncProcess异步批量提交。HBase默认设置autoflush=true,表示put请求直接会提交给服务器进行处理;用户可以设置autoflush=false,这样的话put请求会首先放到本地buffer,等到本地buffer大小超过一定阈值(默认为2M,可以通过配置文件配置)之后才会提交。很显然,后者采用group commit机制提交请求,可以极大地提升写入性能,但是因为没有保护机制,如果客户端崩溃的话会导致提交的请求丢失。

  • 在提交之前,HBase会在元数据表.meta.中根据rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的。如果是批量请求的话还会把这些rowkey按照HRegionLocation分组,每个分组可以对应一次RPC请求。

  • HBase会为每个HRegionLocation构造一个远程RPC请求MultiServerCallable,然后通过rpcCallerFactory. newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。

这里如果是 hdfs 文件写入

  • 首先根据 TaskAttemptID构造出来一个临时写入路径,构造一个文件流
  • 写入临时写入路径,
  • commit 的时候调用 commitTask 根据目标路径是否存在, 如果已经存在就删除临时文件,报错, 如果不存在就直接 rename, 把临时文件名, 改为目标文件名, 这里主要是防止多个分区写入同一个目标文件,导致的冲突。

多文件分组输出

如果我有一个需求,需要把数据根据不同的key输出到不同的文件中, 上文中,我们先根据 batch 进行分组, 然后不同分组的文件输出到不同的文件,这时候就需要用到 MultipleOutputFormat

TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
if (rw == null) {
rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
this.recordWriters.put(finalPath, rw);
}
rw.write(actualKey, actualValue);

这里就是维护了一个TreeMap, 里面每个不同的key, 构造一个 writer, 这个writer 是
getBaseRecordWriter -> theTextOutputFormat.getRecordWriter
根据临时路径构造出一个输出流, 包装为一个 LineRecordWriter 最终的 writer就是在这个 DataOutputStream 上进行输出,

上层多文件输出根据不同的key, 从treeMap上获取到不同的文件输出流, 然后进行多文件输出
这里会存在一个问题, 同一个时间的数据可能出现在多个 batch 中, 就是会产生很多小文件,hdfs 对小文件支持很差,我们需要合并小文件,但是我们也可以直接在输出的时候进行 append 操作,就直接避免了产生小文件,

这里就需要改源码了,

上面的类图可以清楚的显示类图的关系, MultipleOutputFormat 的writer 会调用子类的 getBaseRecordWriter, 我们可以在这里改写一下, 使用我们自己的 TextOutputFormatNew 的 getRecordWriterNew 方法, 在方法里面构造输出流的时候, 如果文件已经存在,就进行 append 操作,

val fileOut: FSDataOutputStream = if (HDFSFileService.existsPath(file)) {
println("appendfile")
fs.append(file)
} else {
println("createfile")
fs.create(file, progress)
}
def getTaskOutputPath(job: JobConf, iname: String): Path = {
val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)
val completePath = name + "/" + iname
val path = new Path(completePath)
path
}

把构造临时路径的方法也修改了, 强制不产生临时路径, 每次都往同一个文件中进行 append, 这样就达到了目的

欢迎大家关注:sunbiaobiao's微信公众号