程序员的自我修养
Home » Apache Storm » Apache Storm初探

Apache Storm初探

0条评论2,514次浏览

Storm简介

Storm,一个由Twitter发起的开源项目。Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流,常被称为“实时的Hadoop”。

若把Hadoop理解成将一大桶水(大量的数据)倒入锅中煮(计算),煮完了就收工;那么Storm则可以理解成为一根水管连接到锅中,水持续的进入锅中,而锅持续的煮水,直到人为的把锅端掉。Storm的官网上的一张图很好的诠释了这个含义:
topology

上图中的水龙头就是数据的输入源Spout,而闪电图标的节点则是Blot节点,用来对数据进行加工处理。

Storm的特性

Storm最大的特性毫无疑问就是实时的数据流处理。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。其主要特性如下:

  • 低延迟,实时响应。Hadoop在批处理上无人能及,Storm在实时流数据处理上无人可比。
  • 高性能。用到了集群要是性能还说不过去都不好意思说自己是集群处理。其底层消息队列采用了ZeroMQ, 保证消息能快速被处理(0.9版本后不在采用)。
  • 可轻松扩展。和Hadoop一样,可以横向扩展来提高集群的处理能力;同样的,还能方便的扩展每台机器上的工作线程。
  • 高可靠性。Storm是记录级容错,保证每条消息都被处理。
  • 方便开发。Storm同MapReduce一样也有一些编程模型,且Storm除了Java以外还支持大量的编程语言。

Storm的基本组件

类比Hadoop中的组件:

Hadoop Storm
系统角色 JobTracker Nimbus
TaskTracker Supervisor
应用名称 Job Topology
组件接口 Mapper/Reducer Spout/Bolt
  • Nimbus:负责资源分配和任务调度。
  • Supervisor:负责接受nimbus分配的任务。
  • Topology:等价于Hadoop中的Job,之所以叫拓扑,大概因为数据在各个组件中流转形成了一个逻辑上的拓扑结构。可以理解成将一个Job分散成几个部分,每部分在不同的节点上处理,不同的节点通过Tuple来传递数据。
  • Spout:在一个Topology中产生源数据流的组件。
  • Bolt:在一个Topology中接受数据然后执行处理的组件,其接受的数据可以来自Spout也可以来自其它Bolt。

Storm的Nimbus与Supervisor的交互通过Zookeeper,所以Zookeeper为Storm中不可缺少的角色。如下图所示:
storm-cluster

Storm集群部署

  1. 搭建Zookeeper集群。Storm使用Zookeeper协调集群,不会用Zookeeper来传递消息,所以Storm给Zookeeper带来的压力相当低,虽然1个Zookeeper节点可以完全胜任,但最好还是保持3个或以上的Zookeeper节点
  2. 安装Storm依赖库。Java 6和Python 2.6.6
  3. 安装Storm。下载Storm,解压即可
  4. 配置Storm。编辑conf/storm.yaml文件。
    • storm.zookeeper.servers: - "192.168.1.100" - "192.168.1.101" - "192.168.1.102"
      (有多少台Zookeeper就写几个,若不是默认接口,则需在storm.zookeeper.port参数中配置)
    • storm.local.dir: "/home/user/storm/"
      (用于存放Nimbus和Supervisor运行所需的文件,如状态信息文件、Jar等)
    • nimbus.host: "192.168.1.110"
      (Nimbus的IP地址)
    • supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
      (每台Supervisor启动几个Worker,这些Worker的端口号)
  5. 运行Storm。将Storm的bin配置至PATH中去。
    • 启动Nimbus: storm nimbus(nimbus.host配置的机器上启动)
    • 启动Supervisor: storm supervisor
    • 启动web界面:storm ui(必须在启动nimbus的机器上启动,访问http://{nimbus host}:8080可查看)

Storm编程初探

如下图所示,一个Topology往往由一个Spout和多个Bolt组成,而每个Spout或Bolt又可以启动多个实例(线程、Worker)。下图中每个方框代表一台机器,每个圆圈代表该机器上启动的实例(线程、Worker)。
topology-tasks

接下来实现一个简单的Topology:一个Spout进行生成1-10之间的随机数,一个Bolt接收数据并打印出来。

Spout实现

Bolt实现

Topology实现

运行结果

  • 以上代码启动了1个Spout和1个Bolt,运行结果如下,可以看到Spout线程id为80,Bolt线程id为78

    Readed(78): 80:2
    Readed(78): 80:8
    Readed(78): 80:2
    Readed(78): 80:0
    Readed(78): 80:0
    Readed(78): 80:7
    Readed(78): 80:2
    Readed(78): 80:9

  • 启动2个Spout线程、1个Bolt线程

    运行结果如下,可以看到Spout线程id为80和82,Bolt线程id为78

    Readed(78): 80:1
    Readed(78): 82:4
    Readed(78): 80:4
    Readed(78): 82:6
    Readed(78): 80:2
    Readed(78): 82:0
    Readed(78): 80:8
    Readed(78): 82:6
    Readed(78): 80:4
    Readed(78): 82:1
    Readed(78): 80:9
    Readed(78): 82:2
    Readed(78): 80:8
    Readed(78): 82:6
    Readed(78): 80:6
    Readed(78): 82:8

  • 启动1个Spout线程、2个Bolt线程

    运行结果如下,可以看到Spout线程id为82,Bolt线程id为78和80

    Readed(80): 82:1
    Readed(78): 82:1
    Readed(80): 82:3
    Readed(78): 82:0
    Readed(78): 82:5
    Readed(80): 82:5
    Readed(80): 82:2
    Readed(78): 82:1

  • 启动1个Spout线程、2个Bolt线程,采用fieldsGrouping

    并修改Spout代码,根据随机数的奇偶只发射2种数据

    运行结果如下,可以看到Spout线程id为82,Bolt线程id为78和80,且同样的数据只会发给同一个Bolt线程

    Readed(80): 82:b
    Readed(80): 82:b
    Readed(78): 82:a
    Readed(80): 82:b
    Readed(80): 82:b
    Readed(80): 82:b
    Readed(78): 82:a

最后

进过Storm编程的初步实验,可以看到Storm在多线程方面的扩展十分方便,更多探索和使用以及原理将随后进行。

参考资料

(转载本站文章请注明作者和出处 程序员的自我修养 – SelfUp.cn ,请勿用于任何商业用途)
分类:Apache Storm
标签:
发表评论


profile
  • 文章总数:79篇
  • 评论总数:378条
  • 分类总数:31个
  • 标签总数:44个
  • 运行时间:1041天

大家好,欢迎来到selfup.cn。

这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!

最新评论
  • Anonymous: 看错了 忽略我
  • Anonymous: UserSideCF这个类在哪里
  • 晴子: 在主节点初始化CM5数据库的时候报错误:Verifying that we can write to /opt/cm-5.9.0/etc/cloudera-scm -server log4j:ERROR Could not...
  • zhangnew: 就4题 :?:
  • linxh: “ 但要是遇到预先并不知道数组的长度而又需要获取正确的(或者称之 为原始的)split长度时,该如何处理呢。。? ” 印象中可以split函数参数传-1?
  • linxh: 班门弄斧一下: ssh host cmd 和直接ssh上后cmd结果不一样是因为ssh直接运行远程命令 是非交互非登录模式与ssh上去得到一个登录交互式Shell二 者加载的环境变量不一样。
  • 匿名: 其实文本分类和数字分类是一样的,只是文本分类需要多一个步骤, 就是计算它的tf-idf值将其转换为double类型
  • yurnom: 可能苹果最近又改变了返回值吧,最近没做测试了。 BadDeviceToken一般测试环境和正式环境弄错的情况 下会出现。
  • Anonymous: :razz: 博主,良心贴啊, 最近也在弄apns推送。 有个问题想请教你一下啊。 你博客中写的 Unregistered 错误,有准确的说明吗, 我看你博客中写的:...
  • 一波清泉: 回复邮箱: 1004161699@qq.com 多谢
  • Anonymous: 17/02/09 01:15:02 WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port...
  • pacificLee: :twisted:
  • 小码: 为什么没有后面的呢,只有前10个
  • Anonymous: :lol:
  • Anonymous: :razz: 楼主是属于会聊天的。 我想问,sqoop发了几个版本了,应该没这些问题了吧。
  • Anonymous: Config.kafkaConfig.kafkaGroupI d 这个是指自己配置的group id 还是从 import org.apache.kafka.common.config .Config 这个类...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 那个方法是在 spark-streaming_2.10 中 kafka...
  • Anonymous: ZkUtils.getPartitionsForTopics (zkClient, Config.kafkaConfig.topic) 你确定 kafka 里面有这个类 ? 个人在kafka 最新 稳定版...
  • Anonymous: :roll:
  • Anonymous: 很不错,试问有java版的吗?