程序员的自我修养
Home » Apache Flume, Apache Spark » Flume+Spark Steaming初探

Flume+Spark Steaming初探

1条评论8,608次浏览

公司业务准备上流数据处理了。由于之前基础平台选用了CDH,而CDH自带Spark,且由于数据源是每隔几分钟发一组数据文件的形式来传送数据,所以最终选取用Spark Steaming来做流数据处理。

下面记录初步使用Spark Steaming和Flume的一些过程。

第一个测试:Flume(spooldir to hdfs)

原始数据通过ftp每隔几分钟拉取一批数据到本地某文件夹。于是测试了下flume监控文件夹并将新加入的文件写入hdfs的功能。

配置文件如下:

根据官方文档hdfs.fileType默认是SequenceFile,这里选用DataStream将不压缩输出文件。

若不设置hdfs.useLocalTimeStamptrue则会报下面的错误,暂时不知为何。

期间还遇到了下面这个错误,原因很简单,当时想把路径设置成为yyyy-MM-dd hh:mm:ss这样的形式,但HDFS不允许创建带有“:”符号的文件夹,所以报错。

启动命令:

第二个测试:执行Spark Steaming任务

测试程序:

打jar包后放在集群上跑,结果第一次报错VERSION版本号不对,然后看了看本机编译的时候采用的是JAVA 8,而集群都是JAVA 7,于是重新用JAVA 7打包。第二次报错,大概意思是找不到org.apache.spark.streaming下面的方法,这个就无语了,尝试了多次后突然想到可能是所下载的spark lib包是java 8编译的,于是尝试了用CDH集群的spark lib包,好使,果然是lib包的问题。

出现各种找不到方法的错误,如下:

解决方法1:启动命令里面添加缺少的各种jar包

解决方法2:程序中添加所需的各种jar包

终于在启动时加上各种各样的jar包后可以正常的运行且不在报错,此时ctrl+c无法取消任务,且用mapred job -kill jobid也无法杀死任务,只能用yarn application -kill jobid来干掉任务。

第三个测试:Flume发送数据给Spark Streaming程序

Spark Streaming接收Flume以avro形式发送的数据。Flume配置文件如下:

先启动Spark Steaming任务,在启动Flume-ng,否则会报以下错误:

尝试将一个文件复制到Flume监控的/root/data文件夹下,Spark任务就会在log日志中输出一行:

但看了下放进去的文件行数是74,这里不知道为啥会多一行,不知道是不是文件头信息还是时间戳信息什么的,待后续分析。

参考资料

后续

开始看Spark Streaming的官方文档了,发现可以直接使用Spark Streaming中的Basic Sources的File Streams即可,这个玩意就能监控文件夹。看来用Flume有点多此一举了,具体如何取舍看实际业务需求。

(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)
标签:,
1条评论
  1. ggy说道:

    spark streaming 的File Streams监视只能监视文件名变动或整个文件增减,不能对文件行数进行监视,子文件夹也不行。所以flume还是得用啊~

发表评论


profile
  • 文章总数:79篇
  • 评论总数:402条
  • 分类总数:31个
  • 标签总数:44个
  • 运行时间:1013天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • 晴子: 在主节点初始化CM5数据库的时候报错误:Verifying that we can write to /opt/cm-5.9.0/etc/cloudera-scm -server log4j:ERROR Could not...
  • zhangnew: 就4题 :?:
  • linxh: “ 但要是遇到预先并不知道数组的长度而又需要获取正确的(或者称之 为原始的)split长度时,该如何处理呢。。? ” 印象中可以split函数参数传-1?
  • linxh: 班门弄斧一下: ssh host cmd 和直接ssh上后cmd结果不一样是因为ssh直接运行远程命令 是非交互非登录模式与ssh上去得到一个登录交互式Shell二 者加载的环境变量不一样。
  • 匿名: 其实文本分类和数字分类是一样的,只是文本分类需要多一个步骤, 就是计算它的tf-idf值将其转换为double类型
  • yurnom: 可能苹果最近又改变了返回值吧,最近没做测试了。 BadDeviceToken一般测试环境和正式环境弄错的情况 下会出现。
  • Anonymous: :razz: 博主,良心贴啊, 最近也在弄apns推送。 有个问题想请教你一下啊。 你博客中写的 Unregistered 错误,有准确的说明吗, 我看你博客中写的:...
  • 一波清泉: 回复邮箱: 1004161699@qq.com 多谢
  • Anonymous: 17/02/09 01:15:02 WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port...
  • pacificLee: :twisted:
  • 小码: 为什么没有后面的呢,只有前10个
  • Anonymous: :lol:
  • Anonymous: :razz: 楼主是属于会聊天的。 我想问,sqoop发了几个版本了,应该没这些问题了吧。
  • Anonymous: Config.kafkaConfig.kafkaGroupI d 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config .Config 这个类...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 那个方法是在 spark-streaming_2.10 中 kafka...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 你确定 kafka 里面有这个类 ? 个人在kafka 最新 稳定版...
  • Anonymous: :roll:
  • Anonymous: 很不错,试问有java版的吗?
  • Anonymous: 赞
  • Anonymous: 哈哈 看楼主的吐槽乐死了 where子句是可以写的 同样找不到资料 一点点试出来的 select id from xxxx where ${CONDITIONS} and 1=1 and 2=2 limit 4