kafka无限重连
现象
spark任务/kafka集群会隔一段时间出错,日志基本都会显示too many open files。查看socket连接会发现会有几千到几十万的连接。
仔细的查看spark的运行日志,会发现有如下错误:
1 |
17/01/23 12:26:15 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException |
我们的kafka中每个topic都是12个分区,对于连接1个topic的spark streaming任务,每次刷出上面的日志后,通过命令
排查
基本可以确定是连接没有正确关闭导致的,gitlab上看了下最近的改动:由于每个batch的处理时间接近batch的间隔时间,所以把处理逻辑全部放到了线程池中去处理。大概的代码如下:
1 2 3 4 5 |
stream.foreachRDD { rdd => rdd.foreachPartition { partition => pool { // do something with partiton} } } |
回滚了这个改动后,日志不再刷出上面的错误日志,查看连接数也一直是固定的12,正好对应12个kafka partition。于是将代码改成:
1 2 3 4 5 6 7 8 |
stream.foreachRDD { rdd => rdd.foreachPartition { partition => val list = partition.toList pool { // do something with list } } } |
然后一切都正常了。
从结果来看,真正去kafka读取数据是发生在读取partition变量内的数据时。若直接将partition变量放入线程池,则导致无法使用已有的连接,然后就会重新建立连接,而且在读取完数据后也没有正确的关闭连接,从而导致了上面的错误。
stopGracefullyOnShutdown
为了能优雅的关闭spark streaming,可以
然而坑爹的事情是,需要等到下一个batch才能优雅的关闭,不论当前的batch是否已经处理完。比如batch间隔是10分钟,每个batch处理时间是3分钟。若在10:00调用关闭任务的命令,spark不会在10:03执行完当前batch后关闭,而是直到下一个batch执行完才关闭,也就是10:13。对于batch间隔小于30s的倒是也能接受这样的设定,但对于10min这样的间隔,简直是无法忍受。
此外,还可以加上:
1 2 3 4 5 6 |
sys.addShutdownHook { info.info("Gracefully stopping Application...") ssc.stop(stopSparkContext = true, stopGracefully = true) // wait something info.info("Application stopped gracefully") } |
把stream当成集合
1 2 3 4 5 6 7 8 9 |
stream.foreachRDD { rdd => rdd.foreachPartition { partition => println(s"debug: partition size: ${partition.size}") partition.foreach { item => // do something } } } |
这里将永远也进入不了
受教了!呵呵!
stopGracefullyOnShutdown在yarn-client模式下我测试的无效,你的呢
sdfs
楼主你好,我偶尔也会遇到Reconnect due to socket error: java.nio.channels.ClosedChannelException这类问题,但是我没有直接用steam,而是对流进行了赋值给另外一个变量(而且还有了稍微的操作后赋值),而且查看9092也没有几万几十万的连接,也是那么几十个而已,郁闷了 查了N多资料还是...