程序员的自我修养
Home » 文章归档 » 2017年二月

spark streaming一些坑爹事

4条评论1,591次浏览

kafka无限重连

现象

spark任务/kafka集群会隔一段时间出错,日志基本都会显示too many open files。查看socket连接会发现会有几千到几十万的连接。

仔细的查看spark的运行日志,会发现有如下错误:

我们的kafka中每个topic都是12个分区,对于连接1个topic的spark streaming任务,每次刷出上面的日志后,通过命令netstat -antup |grep 9092或者ss -atp |grep 9092会发现对应的spark任务的连接数又多12个。

排查

基本可以确定是连接没有正确关闭导致的,gitlab上看了下最近的改动:由于每个batch的处理时间接近batch的间隔时间,所以把处理逻辑全部放到了线程池中去处理。大概的代码如下:

回滚了这个改动后,日志不再刷出上面的错误日志,查看连接数也一直是固定的12,正好对应12个kafka partition。于是将代码改成:

然后一切都正常了。

从结果来看,真正去kafka读取数据是发生在读取partition变量内的数据时。若直接将partition变量放入线程池,则导致无法使用已有的连接,然后就会重新建立连接,而且在读取完数据后也没有正确的关闭连接,从而导致了上面的错误。

stopGracefullyOnShutdown

为了能优雅的关闭spark streaming,可以SparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

然而坑爹的事情是,需要等到下一个batch才能优雅的关闭,不论当前的batch是否已经处理完。比如batch间隔是10分钟,每个batch处理时间是3分钟。若在10:00调用关闭任务的命令,spark不会在10:03执行完当前batch后关闭,而是直到下一个batch执行完才关闭,也就是10:13。对于batch间隔小于30s的倒是也能接受这样的设定,但对于10min这样的间隔,简直是无法忍受。

此外,还可以加上:

把stream当成集合

这里将永远也进入不了partition.foreach,因为partition.size会遍历一遍partition,而遍历一遍的stream就代表消费完毕了。╮(╯_╰)╭

分类:Apache Spark
标签:,
11
profile
  • 文章总数:79篇
  • 评论总数:254条
  • 分类总数:31个
  • 标签总数:44个
  • 运行时间:1193天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • Anonymous: :arrow: :neutral: :cry:
  • Anonymous: java.io.NotSerializableExcepti on: DStream checkpointing has been enabled but the DStreams with their...
  • wick: HI,请问一下,U,S,V得到后,怎么得到近似矩阵呢(用sp ark java),谢谢。
  • Michael Whitaker: Thank you for this blog, it was very helpful in troubleshooting my own issues. It seems that no...
  • Anonymous: :mad:
  • Anonymous: :???:
  • Anonymous: :mad: :mad: :mad:
  • 洋流: 哥们,我问个问题,你把testOnborrow去掉了。。如果 得到的jedis资源是个不可用的,服务从来都不出问题么?
  • 洋流: 哥们,我问个问题,你把testOnborrow去掉了。。如果 得到的jedis资源是个不可用的,服务从来都不出问题么?
  • Anonymous: :razz: :evil: :grin:
  • 张瑞昌: 有很多,比较常见的是Jacob迭代法,一次迭代O(n^3), 迭代次数不清楚。 如果是手动算的话按照定义求就可以了
  • Anonymous: :mrgreen:
  • lc277: 你好 我想问下一般删除节点要多久,要删除的datanode大概用了 1t,解除授权已经30多小时还没完成,请问是出现什么问题了吗 麻烦告诉下谢谢 qq1844554123
  • Anonymous: 你好 我想问下一般删除节点要多久,要删除的datanode大概用了 1t,解除授权已经30多小时还没完成,请问是出现什么问题了吗
  • Anonymous: :smile: :grin: :eek:
  • 李雪璇: 想要完整代码,可以帮忙发给我吗
  • Anonymous: 请问一下,那个 user的推荐结果楼主查看了么? 为什么输入数据 最高是五分,输出结果都是7分8分啥的?怎么设置输出的分数的最 大值?
  • Anonymous: 那个 user的推荐结果楼主查看了么? 为什么输入数据 最高是五分,输出结果都是7分8分啥的?
  • Anonymous: stopGracefullyOnShutdown在yarn- client模式下我测试的无效,你的呢
  • Anonymous: 另外,import的lib包能否发个列表.