spark bulkload 写入hbase

为什么

  • BulkLoad不会写WAL,也不会产生flush以及split。
  • 如果我们大量调用PUT接口插入数据,可能会导致大量的GC操作。除了影响性能之外,严重时甚至可能会对HBase节点的稳定性造成影响。但是采用BulkLoad就不会有这个顾虑。
  • 过程中没有大量的接口调用消耗性能
  • 可以利用spark 强大的计算能力

原理

上面是一个总的执行流程图, 数据生成,HFile转换以及HFile加载, 下面是HFile 的格式, 就是个key value 存储结构,
key 是由行健column family 和限定符指定, 然后再加上key的索引,

数据生成

我的源数据是个域名数据库,表示域名的归属, 数据json举例 {“domain”:”www.a.com”, “uid”:”12345678”} ,
这些数据在线上一个服务的接口可以获取
创建了两个actor, 一个actor 使用scalaj库发http请求获取数据, 发送给另外一个actor输出到一个文本里面,
由于数据里面只有250w条数据, 我给每个域名后面添加一个uuid字符串, 把数据放大40倍,最终生成一个1亿条数据的文件
大约10个G, 每行类似 www.a.com,12345678上传hdfs备用,
case class DomainId(domain:String, id:String)
processMessage ! DomainId(domain, uid)

HFile转换

val domainUid = sc.textFile("/fusionlog/Test/Hbase/domainUid.txt").map { x =>
val a = x.split(",")
val domain = a(0)
val uid = a(1)
(domain, uid)
}
val result = domainUid.distinct().sortByKey(numPartitions = 1).map{ x =>
val domain = x._1
val uid = x._2
val kv : KeyValue = new KeyValue(Bytes.toBytes(domain), "traffic".getBytes(), "uid".getBytes(), uid.getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(domain)), kv)
}
result.saveAsNewAPIHadoopFile("/log/Test/Hbase/result", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

主要流程就是加载数据, 生成域名和用户keyvalue, 然后转换为hbase 的keyvalue 对象, 这个是hbase的一个基础类型
hadoop 处理流程一般分为 输入, combine, map, reduce 和输出, 这里就是套用了 hadoop的输出api,

saveAsNewAPIHadoopFile -> saveAsNewAPIHadoopDataset , 里面会初始化输出类, 然后调用 输出类的 getRecordWriter
获取一个writer, 然后在各个分区上面迭代输出
这里的输出类是 HFileOutputFormat -> getRecordWriter -> HFileOutputFormat2.createRecordWriter 里面的writer
做的事基本上就是构造上图中keyvalue和索引, 而且key是有序的, 按照字典序,否则会报错,

这里跟 mapreduce 处理过程不一样的是, Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。
目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作,
不然会报错,

Added a key not lexically larger than previous.

我在处理的过程中发现key有重复, 会报错, 就先distinct 了一下, 注意者是一个shuffle操作, 会消耗性能

HFile加载

一旦HFile转换完成,我们就可以将其加载至HBase集群中。加载的方式其实十分简单,就是将HFile移动至HBase对应的RegionServer的存储目录下,所以往往该操作执行地十分快。我们可以通过命令行调用completebulkload工具加载,也可以通过代码执行。
我这里使用 completebulkload 工具加载, 一亿条数据只用了不到3秒钟,看了下代码里面是一个队列,是个异步的过程
hbase在后台慢慢执行

这里我碰到了个麻烦, 我给了 hdfs的一个相对路径 /log/Test/Hbase/result

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "")
conf.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
val table = new HTable(conf, outputTable)
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/log/Test/Hbase/result"), table)

我开始的时候是使用代码load,没有任何输出, hbase shell 里面看也没有数据, 然后我使用 completebulkload 工具加载
看错误信息一致在retry, 看着像连接 regionserver 连接不上, 我找到 这个hbase table 对应的region的机器, 登录节点
看了下 regionserver的日志, 原因是 hbase 从hdfs那里拿到的是一个 hdfs://ourhadoop/log/Test/Hbase/result 路径,然后
报解析不了ourhadoop, 估计跟hdfs-site里面的配置有关系,

<property>
<name>dfs.nameservices</name>
<value>ourhadoop</value>
</property>

我使用 hdfs://node:8020/log/Test/Hbase/result 就解决掉了,

总结

BulkLoad将原来位于HBase集群的写入消耗挪到了spark集群上,可以有效利用spark强大的计算力, 和线性扩展能力, 并且通过直接拷贝HFile的方式,极大地提高了数据的写入速度。BulkLoad通常用在HBase表的预初始化,增量数据定时导入以及数据迁移等涉及到大规模数据批量导入的场景。

欢迎大家关注:sunbiaobiao's微信公众号