The company is about to bring streaming data processing online. Since the underlying platform we chose earlier is CDH, which ships with Spark, and since the data source delivers a batch of data files every few minutes, we ultimately settled on Spark Streaming for the stream processing.
Below are some notes from my initial experiments with Spark Streaming and Flume.
Test 1: Flume (spooldir to HDFS)
The raw data is pulled via FTP into a local directory every few minutes, so I first tested Flume's ability to watch a directory and write newly added files to HDFS.
The configuration file is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/%Y-%m-%d/%H%M%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = file
According to the official documentation, the HDFS path here uses time escape sequences, so each event must carry a timestamp header. If hdfs.useLocalTimeStamp = true is not set (and the events carry no timestamp), the following error is thrown:
15/06/05 17:16:45 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:224)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:420)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:388)
        ... 3 more
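As an alternative to useLocalTimeStamp, the Flume documentation also describes a timestamp interceptor that stamps every event at the source, which equally satisfies the sink's time escapes; a minimal sketch (the interceptor name i1 is arbitrary):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp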
Along the way I also hit the error below. The cause is simple: I had tried to format the path as yyyy-MM-dd hh:mm:ss, but HDFS does not allow directory names containing ":", hence the failure.
15/06/08 11:21:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.IllegalArgumentException: Pathname /user/flume/2015-06-08/1121:04/FlumeData.1433733664360.tmp from /user/flume/2015-06-08/1121:04/FlumeData.1433733664360.tmp is not a valid DFS filename.
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:396)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:392)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:392)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:336)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
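Swapping the colons for another separator keeps the time information while still producing a valid HDFS path; for example, a pattern along these lines works:

a1.sinks.k1.hdfs.path = /user/flume/%Y-%m-%d/%H-%M-%S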
Launch command:
bin/flume-ng agent --conf conf --conf-file dir2hdfs.conf --name a1 -Dflume.root.logger=INFO,console |
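After dropping a file into /root/data/, the result can be checked on HDFS; the date/time directory below is just an example and will differ with the current timestamp:

hdfs dfs -ls /user/flume/2015-06-08/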
Test 2: Running a Spark Streaming job
Test program:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class FlumeTest {

    private FlumeTest() {
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: FlumeTest <host> <port>");
            System.exit(1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);

        Duration batchInterval = new Duration(2000);
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

        // Flume-style receiver listening on <host>:<port> for avro events pushed by Flume
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, host, port);

        flumeStream.count();

        // Print the number of events received in each batch
        flumeStream.count().map(new Function<Long, String>() {
            @Override
            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
I built the jar and ran it on the cluster. The first attempt failed with a class file version mismatch: the jar had been compiled locally with Java 8 while the cluster runs Java 7, so I rebuilt it with Java 7. The second attempt failed with errors roughly saying that methods under org.apache.spark.streaming could not be found, which was baffling. After many tries it occurred to me that the Spark lib jars I had downloaded might have been compiled with Java 8, so I switched to the Spark lib jars shipped with the CDH cluster, and everything worked; the lib jars were indeed the problem.
Before that, I kept running into various "class/method not found" errors, such as:
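A quick way to confirm this kind of mismatch is to check the class file version inside the jar (major version 51 is Java 7, 52 is Java 8). The commands below are only a sketch, assuming a Maven build and reusing the class and jar names from the submit command later in this post:

javap -verbose -cp wc.jar com.mobicloud.test.FlumeTest | grep "major version"
mvn clean package -Dmaven.compiler.source=1.7 -Dmaven.compiler.target=1.7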
Exception in thread "Driver" scala.MatchError: java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/FlumeUtils (of class java.lang.NoClassDefFoundError) ????at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:432) |
Exception in thread "Thread-48" java.lang.NoClassDefFoundError: org/apache/avro/ipc/Responder at org.apache.spark.streaming.flume.FlumeInputDStream.getReceiver(FlumeInputDStream.scala:55) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$4.apply(ReceiverTracker.scala:248) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$4.apply(ReceiverTracker.scala:247) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) |
15/06/08 13:22:20 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-14] shutting down ActorSystem [sparkDriver]
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;
        at java.lang.Class.getDeclaredFields0(Native Method)
        at java.lang.Class.privateGetDeclaredFields(Class.java:2499)
        at java.lang.Class.getDeclaredField(Class.java:1951)
        at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
Fix 1: add the missing jars in the launch command
spark-submit --class com.mobicloud.test.FlumeTest --jars /root/mrwork/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,/root/mrwork/spark-streaming-flume-sink_2.10-1.3.1.jar,/root/mrwork/flume-ng-sdk-1.5.0-cdh5.3.0.jar,/root/mrwork/avro-ipc.jar --deploy-mode cluster --master yarn wc.jar master 33333 |
Fix 2: add the required jars in the program itself
SparkConf sparkConf = new SparkConf()
        .setAppName("FileStream")
        .setJars(new String[]{
                "/root/mrwork/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar",
                "/root/mrwork/spark-streaming-flume-sink_2.10-1.3.1.jar",
                "/root/mrwork/flume-ng-sdk-1.5.0-cdh5.3.0.jar",
                "/root/mrwork/avro-ipc.jar"
        });
Finally, with all of these jars added at launch time, the job ran normally with no more errors. At that point, however, the job could not be cancelled with Ctrl+C, and using
Test 3: Flume sending data to the Spark Streaming program
Spark Streaming receives the data that Flume sends in avro form. The Flume configuration file is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 33333

a1.channels.c1.type = file
Start the Spark Streaming job first and only then start flume-ng; otherwise the following error is reported:
15/06/05 17:35:12 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: master, port: 33333
org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave07, port: 44443 }: RPC connection error
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:182)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:121)
        at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
        at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:89)
        at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
Copying a file into the /root/data directory that Flume is watching makes the Spark job print the following in its log:
-------------------------------------------
Time: 1433729594000 ms
-------------------------------------------
Received 75 flume events.
However, the file I dropped in has only 74 lines, and I do not yet know where the extra event comes from; perhaps it is some kind of file header or timestamp record. To be investigated later.
References
- Flume 1.6.0 User Guide
- Spark Overview
- Spark Streaming Programming Guide
- Spark Streaming + Flume Integration Guide
Follow-up
After starting on the official Spark Streaming documentation, I found that the File Streams variant of the Basic Sources can be used directly; it monitors a directory on its own. So using Flume here looks somewhat redundant, and the choice between the two depends on the actual business requirements.
That said, Spark Streaming's File Streams only detects files being added to or removed from the directory by name; it cannot watch for new lines appended to a file, and it does not cover subdirectories. So Flume is still needed after all.
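For reference, a minimal sketch of the File Streams approach; the class name FileStreamTest and the directory hdfs://master:8020/user/data are hypothetical placeholders, not values from the tests above:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class FileStreamTest {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaFileStreamTest");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // textFileStream only picks up files newly created in the directory;
        // it does not see appends to existing files or files in subdirectories.
        JavaDStream<String> lines = ssc.textFileStream("hdfs://master:8020/user/data");

        // Print the number of lines read in each batch
        lines.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}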