The official documentation describes this new API at length. In short, it does not interact with ZooKeeper: it reads data from Kafka directly and maintains the offsets itself, which makes it faster than the receiver-based approach.
Our project needed to use this API while also monitoring offsets, so, as the official documentation suggests, we had to write the offsets into ZooKeeper ourselves.
Method 1
```scala
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {...}
```
This method takes only three parameters and is by far the easiest to use, but by default each start reads from the latest offset, or, if auto.offset.reset is set to smallest, from the earliest offset. Obviously neither starting position is suitable for production.
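For reference, a minimal sketch of Method 1 (the broker list, topic name, and application name are placeholders, not values from this project):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Minimal sketch, assuming a topic named "events" and a local broker.
val conf = new SparkConf().setAppName("direct-stream-demo")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",   // placeholder broker list
  "auto.offset.reset"    -> "smallest"          // start from the earliest offset on first run
)

// Method 1: three parameters; offsets are tracked internally and never written to ZooKeeper.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))
```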
Method 2
```scala
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {...}
```
This method lets you specify the starting offsets at launch, but its parameters are considerably more involved to set up. The first is fromOffsets:
```scala
// Read the last committed offset of every partition from ZooKeeper;
// fall back to the latest offset in Kafka when nothing has been stored yet.
val topic2Partitions = ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic)
var fromOffsets: Map[TopicAndPartition, Long] = Map()

topic2Partitions.foreach { case (topic: String, partitions: Seq[Int]) =>
  val topicDirs = new ZKGroupTopicDirs(Config.kafkaConfig.kafkaGroupId, topic)

  partitions.foreach { partition =>
    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
    ZkUtils.makeSurePersistentPathExists(zkClient, zkPath)
    val untilOffset = zkClient.readData[String](zkPath)
    val tp = TopicAndPartition(topic, partition)
    val offset = try {
      if (untilOffset == null || untilOffset.trim == "") getMaxOffset(tp)
      else untilOffset.toLong
    } catch {
      case e: Exception => getMaxOffset(tp)
    }
    fromOffsets += (tp -> offset)
    logger.info(s"Offset init: set offset of $topic/$partition as $offset")
  }
}
```
where getMaxOffset is defined as follows:
```scala
// Ask the partition leader for its latest offset via the SimpleConsumer API,
// after looking up the leader's host and port in ZooKeeper.
private def getMaxOffset(tp: TopicAndPartition): Long = {
  val request = OffsetRequest(
    immutable.Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))

  ZkUtils.getLeaderForPartition(zkClient, tp.topic, tp.partition) match {
    case Some(brokerId) =>
      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
        case Some(brokerInfoString) =>
          Json.parseFull(brokerInfoString) match {
            case Some(m) =>
              val brokerInfo = m.asInstanceOf[Map[String, Any]]
              val host = brokerInfo.get("host").get.asInstanceOf[String]
              val port = brokerInfo.get("port").get.asInstanceOf[Int]
              new SimpleConsumer(host, port, 10000, 100000, "getMaxOffset")
                .getOffsetsBefore(request)
                .partitionErrorAndOffsets(tp)
                .offsets
                .head
            case None =>
              throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
          }
        case None =>
          throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
      }
    case None =>
      throw new Exception("No broker for partition %s - %s".format(tp.topic, tp.partition))
  }
}
```
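Both snippets above assume a zkClient created on the driver; it is not shown in the original post, so here is a minimal sketch (the quorum address is a placeholder):

```scala
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Sketch only: a ZkClient built on the driver. The quorum string is a placeholder;
// ZKStringSerializer stores the offsets as plain strings in ZooKeeper.
val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
```

Note that the client is only used in driver-side code (building fromOffsets and committing offsets inside foreachRDD), so it never needs to be serialized to the executors.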
Next comes the messageHandler parameter. So that the topic name is still available in later processing, it produces a (topic, message) tuple:
```scala
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
```
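With fromOffsets and messageHandler prepared, the stream itself can be created. The post does not show this call explicitly, so the following is a sketch of how the messages DStream used below would plausibly be built:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: Method 2, with explicit starting offsets and a custom message handler.
// Type parameters: key type, value type, their decoders, and the handler's result type.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)
```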
Next, obtain each RDD's offset ranges and write them back to ZooKeeper. Since the offsets are committed only after the batch has been processed, a failure in between simply replays that batch on restart (at-least-once semantics):
```scala
// Capture the offset ranges on the driver, process each batch,
// then commit the batch's end offsets back to ZooKeeper.
var offsetRanges = Array[OffsetRange]()

messages.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD(rdd => {
  rdd.foreachPartition(HBasePuter.batchSave)
  offsetRanges.foreach(o => {
    val topicDirs = new ZKGroupTopicDirs(Config.kafkaConfig.kafkaGroupId, o.topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
    logger.info(s"Offset update: set offset of ${o.topic}/${o.partition} as ${o.untilOffset.toString}")
  })
})
```
Finally, an example of batchSave:
```scala
def batchSave(iter: Iterator[(String, String)]): Unit = {
  iter.foreach(item => {
    val topic = item._1
    val message = item._2
    ...
  })
}
```
(If you reprint articles from this site, please credit the author and source, 程序员的自我修养 – SelfUp.cn, and do not use them for any commercial purpose.)
Very nice. Is there a Java version by any chance?
ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic) — are you sure this class exists in Kafka? I couldn't find a corresponding interface in several recent stable Kafka releases. Please advise.
ZkUtils.getPartitionsForTopics(zkClient, Config.kafkaConfig.topic)
That method seems to be in spark-streaming_2.10; there is no corresponding method in Kafka itself.
I'd suggest including the relevant imports in the post.
Is Config.kafkaConfig.kafkaGroupId the group id you configured yourself, or does it come from the org.apache.kafka.common.config.Config class?
If it is import org.apache.kafka.common.config.Config, there is no corresponding API in it.
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.I0Itec.zkclient.ZkClient
val zkClient = new ZkClient(KafkaProperties.ZookeeperQuorums, 5000, 1000, ZKStringSerializer) — how should this piece of code be handled? ... Any guidance would be much appreciated.
Error:(36, 29) wrong number of type parameters for overloaded method value createDirectStream with alternatives:
[K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String], topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V]
[K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], kafkaParams: java.util.Map[String,String], fromOffsets: java.util.Map[kafka.common.TopicAndPartition,Long], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[K,V],R])org.apache.spark.streaming.api.java.JavaInputDStream[R]
[K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], topics: Set[String])(implicit evidence$19: scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V], implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22: scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
[K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], fromOffsets: Map[kafka.common.TopicAndPartition,Long], messageHandler: kafka.message.MessageAndMetadata[K,V] => R)(implicit evidence$14: scala.reflect.ClassTag[K], implicit evidence$15: scala.reflect.ClassTag[V], implicit evidence$16: scala.reflect.ClassTag[KD], implicit evidence$17: scala.reflect.ClassTag[VD], implicit evidence$18: scala.reflect.ClassTag[R])org.apache.spark.streaming.dstream.InputDStream[R]
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParam, topicSet);
I get this error, but as far as I can tell there is nothing wrong with my parameters.