官网上对这个新接口的介绍很多,大致就是不与zookeeper交互,直接去kafka中读取数据,自己维护offset,于是速度比
项目中需要尝试使用这个接口,同时还要进行offset的监控,于是只能按照官网所说的,自己将offset写入zookeeper。
方法1
1 2 3 4 5 6 7 8 9 |
def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] ): InputDStream[(K, V)] {...} |
这个方法只有3个参数,使用起来最为方便,但是每次启动的时候默认从Latest offset开始读取,或者设置参数
显然这2种读取位置都不适合生产环境。
(更多…)