spark 记录一次 jar exists and does not match contents of 错误定位

一个偶发的错误,

org.apache.spark.SparkException: File ./fusionlogv50-2016-10-12T15:11:32.779+08:00.jar exists and does not match contents of http://10.34.41.37:22726/jars/fusionlogv50-2016-10-12T15:11:32.779+08:00.jar

我们的运行的模式是 spark on yarn , 而且是 yarn-client 运行的

看了下spark的源码,整体机制是 如果jar包不在 hdfs上面,在 drive的本地, spark 就会创建一个https服务器, 在分发 task的时候,excutor 服务进程 CoarseGrainedExecutorBackend 发现 jar不在excutor本地缓存目录里面就会 去下载然后拷贝到指定位置, 猜测这个过程中哪一步出了问题。

定位过程

使用 ansible 分布去 md5 找了下不一致jar所在位置

ansible all -m shell -a "/bin/su - yarn -c ' md5sum /disk*/yarn/usercache/spark/appcache/application*/container*/fusionlogv50*'"

xs331 | success | rc=0 >>
6641c10bc315eec71d22b7ded3679f41 /disk11/yarn/usercache/spark/appcache/application_1475588526370_0146/container_1475588526370_0146_01_000280/fusionlogv50-2016-10-12T16:14:43.713+08:00.jar

发现了其中几个jar包的md5 明显和其他大部分不一样, 登上机器,

pasted image

里面有 下载后的jar包, jar -tf 了一下, 果然是个损坏的,

看了下 CoarseGrainedExecutorBackend的启动脚本 launch_container.sh

pasted image

里面的信息很多,你能找到错误日志文件, drive的host和端口, 对应的 executor-id

/disk3/yarn/logs/application_1475588526370_0146/container_1475588526370_0146_01_000082/stderr"

我找到错误日志文件,打开看看

里面的信息挺全的, 可以看到excutor的整个执行过程,

Connecting to driver: spark://CoarseGrainedScheduler@10.34.41.37:32458

启动后连接drive

org.apache.spark.network.netty.NettyBlockTransferService

创建 netty服务,这个是传大文件用的

BlockManagerMaster: Trying to register BlockManager

向drive 注册 blockManager ,这个是为shuffle做准备,因为shuffle write和 shuffle read的时候会通过 dirve中转文件位置信息

CoarseGrainedExecutorBackend: Got assigned task 58

获取 drive分配给本 excutor的序列化的 task, 下面触发下载执行 task用的 jar, 到了关键位置了

Utils: Fetching http://10.34.41.37:22726/jars/fusionlogv50-2016-10-12T15:11:32.779+08:00.jar to /disk1/yarn/usercache/spark/appcache/application_1475588526370_0141/spark-ba87c1e4-d6ea-4bfa-be77-dbcd145804c4/fetchFileTemp2883980223515665229.tmp

开始http下载文件,先把文件 下载到了 /disk1上面,

Utils: Copying /disk1/yarn/usercache/spark/appcache/application_1475588526370_0141/spark-ba87c1e4-d6ea-4bfa-be77-dbcd145804c4/19164301101476256334761_cache to /disk10/yarn/usercache/spark/appcache/application_1475588526370_0141/container_1475588526370_0141_01_000231/./spark-job-server.jar

然后拷贝文件到指定位置, 按道理下面就会加载jar包里面的类,然后执行自己的 input split的数据,

org.apache.spark.SparkException: File ./fusionlogv50-2016-10-12T15:11:32.779+08:00.jar exists and does not match contents of http://10.34.41.37:22726/jars/fusionlogv50-2016-10-12T15:11:32.779+08:00.jar

解开jar包的时候 先 校验一下, 发现内容不匹配,然后报错。

有点诡异,前面 下载拷贝过程中竟然没有报错,

没办法,到这里就没有线索了, 只能再扣代码,
源文件位置 apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala

里面 doFetchFile -> downloadFile -> copyStream

过程就是 建立连接, 然后 拷贝网络流

uc.setReadTimeout(timeoutMs)

里面设置了网络流读取超时时间,

conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000

这个 超时时间默认是 60s,我默默的估计了一下, 我们30台机器, 每台上面 9个 excutor, 我的jar包 50M, 如果一个 job 的task比较多, 这些点同时下载, 我的内网带宽估计撑不住,

spark.files.fetchTimeout

Communication timeout to use when fetching files added through SparkContext.addFile() from the driver.

spark.files.overwrite

Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.

官方给了两个控制参数, 一个是控制超时时间的,一个是控制如果jar内容不一致是报错还是重新下载, 默认都是false

下面就是枯燥的是控制变量测试, 测试下来 时间设的很小的时候就会出现内容不一致报错, 大了就不会,

我设置了一个比较保守的 5m,

这个傻逼的问题到此打住。

欢迎大家关注:sunbiaobiao's微信公众号