程序员的自我修养
Home » Apache Storm » Apache Storm初探

Apache Storm初探

0条评论2,200次浏览

Storm简介

Storm,一个由Twitter发起的开源项目。Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流,常被称为“实时的Hadoop”。

若把Hadoop理解成将一大桶水(大量的数据)倒入锅中煮(计算),煮完了就收工;那么Storm则可以理解成为一根水管连接到锅中,水持续的进入锅中,而锅持续的煮水,直到人为的把锅端掉。Storm的官网上的一张图很好的诠释了这个含义:
topology

上图中的水龙头就是数据的输入源Spout,而闪电图标的节点则是Blot节点,用来对数据进行加工处理。

Storm的特性

Storm最大的特性毫无疑问就是实时的数据流处理。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。其主要特性如下:

  • 低延迟,实时响应。Hadoop在批处理上无人能及,Storm在实时流数据处理上无人可比。
  • 高性能。用到了集群要是性能还说不过去都不好意思说自己是集群处理。其底层消息队列采用了ZeroMQ, 保证消息能快速被处理(0.9版本后不在采用)。
  • 可轻松扩展。和Hadoop一样,可以横向扩展来提高集群的处理能力;同样的,还能方便的扩展每台机器上的工作线程。
  • 高可靠性。Storm是记录级容错,保证每条消息都被处理。
  • 方便开发。Storm同MapReduce一样也有一些编程模型,且Storm除了Java以外还支持大量的编程语言。

Storm的基本组件

类比Hadoop中的组件:

Hadoop Storm
系统角色 JobTracker Nimbus
TaskTracker Supervisor
应用名称 Job Topology
组件接口 Mapper/Reducer Spout/Bolt
  • Nimbus:负责资源分配和任务调度。
  • Supervisor:负责接受nimbus分配的任务。
  • Topology:等价于Hadoop中的Job,之所以叫拓扑,大概因为数据在各个组件中流转形成了一个逻辑上的拓扑结构。可以理解成将一个Job分散成几个部分,每部分在不同的节点上处理,不同的节点通过Tuple来传递数据。
  • Spout:在一个Topology中产生源数据流的组件。
  • Bolt:在一个Topology中接受数据然后执行处理的组件,其接受的数据可以来自Spout也可以来自其它Bolt。

Storm的Nimbus与Supervisor的交互通过Zookeeper,所以Zookeeper为Storm中不可缺少的角色。如下图所示:
storm-cluster

Storm集群部署

  1. 搭建Zookeeper集群。Storm使用Zookeeper协调集群,不会用Zookeeper来传递消息,所以Storm给Zookeeper带来的压力相当低,虽然1个Zookeeper节点可以完全胜任,但最好还是保持3个或以上的Zookeeper节点
  2. 安装Storm依赖库。Java 6和Python 2.6.6
  3. 安装Storm。下载Storm,解压即可
  4. 配置Storm。编辑conf/storm.yaml文件。
    • storm.zookeeper.servers: - "192.168.1.100" - "192.168.1.101" - "192.168.1.102"
      (有多少台Zookeeper就写几个,若不是默认接口,则需在storm.zookeeper.port参数中配置)
    • storm.local.dir: "/home/user/storm/"
      (用于存放Nimbus和Supervisor运行所需的文件,如状态信息文件、Jar等)
    • nimbus.host: "192.168.1.110"
      (Nimbus的IP地址)
    • supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
      (每台Supervisor启动几个Worker,这些Worker的端口号)
  5. 运行Storm。将Storm的bin配置至PATH中去。
    • 启动Nimbus: storm nimbus(nimbus.host配置的机器上启动)
    • 启动Supervisor: storm supervisor
    • 启动web界面:storm ui(必须在启动nimbus的机器上启动,访问http://{nimbus host}:8080可查看)

Storm编程初探

如下图所示,一个Topology往往由一个Spout和多个Bolt组成,而每个Spout或Bolt又可以启动多个实例(线程、Worker)。下图中每个方框代表一台机器,每个圆圈代表该机器上启动的实例(线程、Worker)。
topology-tasks

接下来实现一个简单的Topology:一个Spout进行生成1-10之间的随机数,一个Bolt接收数据并打印出来。

Spout实现

Bolt实现

Topology实现

运行结果

  • 以上代码启动了1个Spout和1个Bolt,运行结果如下,可以看到Spout线程id为80,Bolt线程id为78

    Readed(78): 80:2
    Readed(78): 80:8
    Readed(78): 80:2
    Readed(78): 80:0
    Readed(78): 80:0
    Readed(78): 80:7
    Readed(78): 80:2
    Readed(78): 80:9

  • 启动2个Spout线程、1个Bolt线程

    运行结果如下,可以看到Spout线程id为80和82,Bolt线程id为78

    Readed(78): 80:1
    Readed(78): 82:4
    Readed(78): 80:4
    Readed(78): 82:6
    Readed(78): 80:2
    Readed(78): 82:0
    Readed(78): 80:8
    Readed(78): 82:6
    Readed(78): 80:4
    Readed(78): 82:1
    Readed(78): 80:9
    Readed(78): 82:2
    Readed(78): 80:8
    Readed(78): 82:6
    Readed(78): 80:6
    Readed(78): 82:8

  • 启动1个Spout线程、2个Bolt线程

    运行结果如下,可以看到Spout线程id为82,Bolt线程id为78和80

    Readed(80): 82:1
    Readed(78): 82:1
    Readed(80): 82:3
    Readed(78): 82:0
    Readed(78): 82:5
    Readed(80): 82:5
    Readed(80): 82:2
    Readed(78): 82:1

  • 启动1个Spout线程、2个Bolt线程,采用fieldsGrouping

    并修改Spout代码,根据随机数的奇偶只发射2种数据

    运行结果如下,可以看到Spout线程id为82,Bolt线程id为78和80,且同样的数据只会发给同一个Bolt线程

    Readed(80): 82:b
    Readed(80): 82:b
    Readed(78): 82:a
    Readed(80): 82:b
    Readed(80): 82:b
    Readed(80): 82:b
    Readed(78): 82:a

最后

进过Storm编程的初步实验,可以看到Storm在多线程方面的扩展十分方便,更多探索和使用以及原理将随后进行。

参考资料

(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)
分类:Apache Storm
标签:
发表评论


profile
  • 文章总数:78篇
  • 评论总数:252条
  • 分类总数:31个
  • 标签总数:43个
  • 运行时间:946天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • pacificLee: :twisted:
  • 小码: 为什么没有后面的呢,只有前10个
  • Anonymous: :lol:
  • Anonymous: :razz: 楼主是属于会聊天的。 我想问,sqoop发了几个版本了,应该没这些问题了吧。
  • Anonymous: Config.kafkaConfig.kafkaGroupI d 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config .Config 这个类...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 那个方法是在 spark-streaming_2.10 中 kafka...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 你确定 kafka 里面有这个类 ? 个人在kafka 最新 稳定版...
  • Anonymous: :roll:
  • Anonymous: 很不错,试问有java版的吗?
  • Anonymous: 赞
  • Anonymous: 哈哈 看楼主的吐槽乐死了 where子句是可以写的 同样找不到资料 一点点试出来的 select id from xxxx where ${CONDITIONS} and 1=1 and 2=2 limit 4
  • EVIL: 我在运行完C4.5的代码后,显示 defined object DecisionTreeTest 是什么意思?这是有错误吗?运行结果在哪里看?
  • sf: 楼主的问题,我都遇到。。。没办法项目已经定型了,最后都硬着头 皮一个一个的改了源码
  • zz: 我去,楼主你真及时,我们今天上了新的HTTP2 push之后也发现速度曲线很奇怪,开始有200k/min,跟 另一台老的推送协议速度差不多,但是过了一会,立马降到只有几k /min,百思不得其解,我们还用了一个海外代理,在...
  • qi365: :mad: 很可恶,百度助纣为虐~
  • qi365: :? :shock: haha~ very good~
  • 张是大: 《深入浅出Spark机器学习实战(用户行为分析)》 课程网盘下载:http://pan.baidu.com/s/ 1mixvUli 密码:1pfn
  • Anonymous: :???:
  • Anonymous: 我用着sqoop感觉还可以,select 几十个字段也没事,估计是版本低。。
  • Anonymous: :grin: