程序员的自我修养
Home » Apache Kafka, Apache Spark, Scala语言 » KafkaUtils.createDirectStream

KafkaUtils.createDirectStream

8条评论13,032次浏览

官网上对这个新接口的介绍很多,大致就是不与zookeeper交互,直接去kafka中读取数据,自己维护offset,于是速度比KafkaUtils.createStream要快上很多。但有利就有弊:无法进行offset的监控。

项目中需要尝试使用这个接口,同时还要进行offset的监控,于是只能按照官网所说的,自己将offset写入zookeeper。

方法1

这个方法只有3个参数,使用起来最为方便,但是每次启动的时候默认从Latest offset开始读取,或者设置参数auto.offset.reset="smallest"后将会从Earliest offset开始读取。

显然这2种读取位置都不适合生产环境。

方法2

这个方法可以在启动的时候可以设置offset,但参数设置起来复杂很多,首先是fromOffsets: Map[TopicAndPartition, Long]的设置,参考下方代码。

其中getMaxOffset方法是用来获取最大的offset。当第一次启动spark任务或者zookeeper上的数据被删除或设置出错时,将选取最大的offset开始消费。代码如下:

然后是参数messageHandler的设置,为了后续处理中能获取到topic,这里形成(topic, message)的tuple:

接着将从获取rdd的offset并写入到zookeeper中:

最后附上batchSave的示例:

(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)
标签:, ,
8条评论
  1. 匿名说道:

    :!: 网站做的挺漂亮的

  2. 匿名说道:

    很不错,试问有java版的吗?

  3. 匿名说道:

    ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic) 你确定 kafka 里面有这个类 ? 个人在kafka 最新 稳定版 几个版本中都没有找到对应的接口 请指教

    • 匿名说道:

      ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic)
      那个方法是在 spark-streaming_2.10 中 kafka 中没有对应的方法
      个人建议 把对应的 import 带上

  4. 匿名说道:

    Config.kafkaConfig.kafkaGroupId 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config.Config 这个类
    如果是import org.apache.kafka.common.config.Config 这里面没有对应的API

发表评论


profile
  • 文章总数:79篇
  • 评论总数:331条
  • 分类总数:31个
  • 标签总数:44个
  • 运行时间:1072天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • 增达网: 受教了!呵呵!
  • Anonymous: :!: :smile: :oops: :grin: :eek: :shock:
  • 27: :razz: dsa会报错,rsa不会
  • Anonymous: 看错了 忽略我
  • Anonymous: UserSideCF这个类在哪里
  • 晴子: 在主节点初始化CM5数据库的时候报错误:Verifying that we can write to /opt/cm-5.9.0/etc/cloudera-scm -server log4j:ERROR Could not...
  • zhangnew: 就4题 :?:
  • linxh: “ 但要是遇到预先并不知道数组的长度而又需要获取正确的(或者称之 为原始的)split长度时,该如何处理呢。。? ” 印象中可以split函数参数传-1?
  • linxh: 班门弄斧一下: ssh host cmd 和直接ssh上后cmd结果不一样是因为ssh直接运行远程命令 是非交互非登录模式与ssh上去得到一个登录交互式Shell二 者加载的环境变量不一样。
  • 匿名: 其实文本分类和数字分类是一样的,只是文本分类需要多一个步骤, 就是计算它的tf-idf值将其转换为double类型
  • yurnom: 可能苹果最近又改变了返回值吧,最近没做测试了。 BadDeviceToken一般测试环境和正式环境弄错的情况 下会出现。
  • Anonymous: :razz: 博主,良心贴啊, 最近也在弄apns推送。 有个问题想请教你一下啊。 你博客中写的 Unregistered 错误,有准确的说明吗, 我看你博客中写的:...
  • 一波清泉: 回复邮箱: 1004161699@qq.com 多谢
  • Anonymous: 17/02/09 01:15:02 WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port...
  • pacificLee: :twisted:
  • 小码: 为什么没有后面的呢,只有前10个
  • Anonymous: :lol:
  • Anonymous: :razz: 楼主是属于会聊天的。 我想问,sqoop发了几个版本了,应该没这些问题了吧。
  • Anonymous: Config.kafkaConfig.kafkaGroupI d 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config .Config 这个类...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 那个方法是在 spark-streaming_2.10 中 kafka...