程序员的自我修养
Home » Apache Kafka, Apache Spark, Scala语言 » KafkaUtils.createDirectStream

KafkaUtils.createDirectStream

9条评论16,475次浏览

官网上对这个新接口的介绍很多,大致就是不与zookeeper交互,直接去kafka中读取数据,自己维护offset,于是速度比KafkaUtils.createStream要快上很多。但有利就有弊:无法进行offset的监控。

项目中需要尝试使用这个接口,同时还要进行offset的监控,于是只能按照官网所说的,自己将offset写入zookeeper。

方法1

这个方法只有3个参数,使用起来最为方便,但是每次启动的时候默认从Latest offset开始读取,或者设置参数auto.offset.reset="smallest"后将会从Earliest offset开始读取。

显然这2种读取位置都不适合生产环境。

方法2

这个方法可以在启动的时候可以设置offset,但参数设置起来复杂很多,首先是fromOffsets: Map[TopicAndPartition, Long]的设置,参考下方代码。

其中getMaxOffset方法是用来获取最大的offset。当第一次启动spark任务或者zookeeper上的数据被删除或设置出错时,将选取最大的offset开始消费。代码如下:

然后是参数messageHandler的设置,为了后续处理中能获取到topic,这里形成(topic, message)的tuple:

接着将从获取rdd的offset并写入到zookeeper中:

最后附上batchSave的示例:

(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)
标签:, ,
9条评论
  1. 匿名说道:

    :!: 网站做的挺漂亮的

  2. 匿名说道:

    很不错,试问有java版的吗?

  3. 匿名说道:

    ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic) 你确定 kafka 里面有这个类 ? 个人在kafka 最新 稳定版 几个版本中都没有找到对应的接口 请指教

    • 匿名说道:

      ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic)
      那个方法是在 spark-streaming_2.10 中 kafka 中没有对应的方法
      个人建议 把对应的 import 带上

  4. 匿名说道:

    Config.kafkaConfig.kafkaGroupId 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config.Config 这个类
    如果是import org.apache.kafka.common.config.Config 这里面没有对应的API

  5. 匿名说道:

    java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
    org.I0Itec.zkclient.ZkClient
    val zkClient = new ZkClient(KafkaProperties.ZookeeperQuorums,5000,1000,ZKStringSerializer)这段代码该如何处理?。。。求大哥大姐指导。。。

发表评论给匿名


profile
  • 文章总数:79篇
  • 评论总数:254条
  • 分类总数:31个
  • 标签总数:44个
  • 运行时间:1192天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • Anonymous: :arrow: :neutral: :cry:
  • Anonymous: java.io.NotSerializableExcepti on: DStream checkpointing has been enabled but the DStreams with their...
  • wick: HI,请问一下,U,S,V得到后,怎么得到近似矩阵呢(用sp ark java),谢谢。
  • Michael Whitaker: Thank you for this blog, it was very helpful in troubleshooting my own issues. It seems that no...
  • Anonymous: :mad:
  • Anonymous: :???:
  • Anonymous: :mad: :mad: :mad:
  • 洋流: 哥们,我问个问题,你把testOnborrow去掉了。。如果 得到的jedis资源是个不可用的,服务从来都不出问题么?
  • 洋流: 哥们,我问个问题,你把testOnborrow去掉了。。如果 得到的jedis资源是个不可用的,服务从来都不出问题么?
  • Anonymous: :razz: :evil: :grin:
  • 张瑞昌: 有很多,比较常见的是Jacob迭代法,一次迭代O(n^3), 迭代次数不清楚。 如果是手动算的话按照定义求就可以了
  • Anonymous: :mrgreen:
  • lc277: 你好 我想问下一般删除节点要多久,要删除的datanode大概用了 1t,解除授权已经30多小时还没完成,请问是出现什么问题了吗 麻烦告诉下谢谢 qq1844554123
  • Anonymous: 你好 我想问下一般删除节点要多久,要删除的datanode大概用了 1t,解除授权已经30多小时还没完成,请问是出现什么问题了吗
  • Anonymous: :smile: :grin: :eek:
  • 李雪璇: 想要完整代码,可以帮忙发给我吗
  • Anonymous: 请问一下,那个 user的推荐结果楼主查看了么? 为什么输入数据 最高是五分,输出结果都是7分8分啥的?怎么设置输出的分数的最 大值?
  • Anonymous: 那个 user的推荐结果楼主查看了么? 为什么输入数据 最高是五分,输出结果都是7分8分啥的?
  • Anonymous: stopGracefullyOnShutdown在yarn- client模式下我测试的无效,你的呢
  • Anonymous: 另外,import的lib包能否发个列表.