Storm简介
Storm,一个由Twitter发起的开源项目。Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流,常被称为“实时的Hadoop”。
若把Hadoop理解成将一大桶水(大量的数据)倒入锅中煮(计算),煮完了就收工;那么Storm则可以理解成为一根水管连接到锅中,水持续的进入锅中,而锅持续的煮水,直到人为的把锅端掉。Storm的官网上的一张图很好的诠释了这个含义:
上图中的水龙头就是数据的输入源Spout,而闪电图标的节点则是Blot节点,用来对数据进行加工处理。
Storm的特性
Storm最大的特性毫无疑问就是实时的数据流处理。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。其主要特性如下:
- 低延迟,实时响应。Hadoop在批处理上无人能及,Storm在实时流数据处理上无人可比。
- 高性能。用到了集群要是性能还说不过去都不好意思说自己是集群处理。其底层消息队列采用了ZeroMQ, 保证消息能快速被处理(0.9版本后不在采用)。
- 可轻松扩展。和Hadoop一样,可以横向扩展来提高集群的处理能力;同样的,还能方便的扩展每台机器上的工作线程。
- 高可靠性。Storm是记录级容错,保证每条消息都被处理。
- 方便开发。Storm同MapReduce一样也有一些编程模型,且Storm除了Java以外还支持大量的编程语言。
Storm的基本组件
类比Hadoop中的组件:
Hadoop | Storm | |
---|---|---|
系统角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
应用名称 | Job | Topology |
组件接口 | Mapper/Reducer | Spout/Bolt |
- Nimbus:负责资源分配和任务调度。
- Supervisor:负责接受nimbus分配的任务。
- Topology:等价于Hadoop中的Job,之所以叫拓扑,大概因为数据在各个组件中流转形成了一个逻辑上的拓扑结构。可以理解成将一个Job分散成几个部分,每部分在不同的节点上处理,不同的节点通过Tuple来传递数据。
- Spout:在一个Topology中产生源数据流的组件。
- Bolt:在一个Topology中接受数据然后执行处理的组件,其接受的数据可以来自Spout也可以来自其它Bolt。
Storm的Nimbus与Supervisor的交互通过Zookeeper,所以Zookeeper为Storm中不可缺少的角色。如下图所示:
Storm集群部署
- 搭建Zookeeper集群。Storm使用Zookeeper协调集群,不会用Zookeeper来传递消息,所以Storm给Zookeeper带来的压力相当低,虽然1个Zookeeper节点可以完全胜任,但最好还是保持3个或以上的Zookeeper节点
- 安装Storm依赖库。Java 6和Python 2.6.6
- 安装Storm。下载Storm,解压即可
- 配置Storm。编辑conf/storm.yaml文件。
- storm.zookeeper.servers: - "192.168.1.100" - "192.168.1.101" - "192.168.1.102"
(有多少台Zookeeper就写几个,若不是默认接口,则需在storm.zookeeper.port参数中配置) - storm.local.dir: "/home/user/storm/"
(用于存放Nimbus和Supervisor运行所需的文件,如状态信息文件、Jar等) - nimbus.host: "192.168.1.110"
(Nimbus的IP地址) - supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
(每台Supervisor启动几个Worker,这些Worker的端口号)
- storm.zookeeper.servers: - "192.168.1.100" - "192.168.1.101" - "192.168.1.102"
- 运行Storm。将Storm的bin配置至PATH中去。
- 启动Nimbus: storm nimbus(nimbus.host配置的机器上启动)
- 启动Supervisor: storm supervisor
- 启动web界面:storm ui(必须在启动nimbus的机器上启动,访问http://{nimbus host}:8080可查看)
Storm编程初探
如下图所示,一个Topology往往由一个Spout和多个Bolt组成,而每个Spout或Bolt又可以启动多个实例(线程、Worker)。下图中每个方框代表一台机器,每个圆圈代表该机器上启动的实例(线程、Worker)。
接下来实现一个简单的Topology:一个Spout进行生成1-10之间的随机数,一个Bolt接收数据并打印出来。
Spout实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class HelloWorldSpout extends BaseRichSpout { private SpoutOutputCollector collector; private static final int MAX_RANDOM = 10; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); //声明Field } @Override public void open(Map map, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Utils.sleep(1000); //一秒发射一次数据 final Random rand = new Random(); int instanceRandom = rand.nextInt(MAX_RANDOM); collector.emit(new Values(Thread.currentThread().getId() + ":" + instanceRandom)); } } |
Bolt实现
1 2 3 4 5 6 7 8 9 10 11 |
public class HelloWorldBolt extends BaseRichBolt { @Override public void prepare(Map map, TopologyContext context, OutputCollector collector) {} @Override public void execute(Tuple tuple) { String r = tuple.getStringByField("sentence"); System.out.println("Readed("+Thread.currentThread().getId()+"): " + r); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} } |
Topology实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class HelloWorldTopology { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("helloworld-spout", new HelloWorldSpout()); builder.setBolt("helloworld-bolt", new HelloWorldBolt()) .shuffleGrouping("helloworld-spout"); Config conf = new Config(); conf.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("hello-world", conf, builder.createTopology()); Utils.sleep(10000); //Topology运行10秒 cluster.killTopology("hello-world"); cluster.shutdown(); } } |
运行结果
- 以上代码启动了1个Spout和1个Bolt,运行结果如下,可以看到Spout线程id为80,Bolt线程id为78
Readed(78): 80:2
Readed(78): 80:8
Readed(78): 80:2
Readed(78): 80:0
Readed(78): 80:0
Readed(78): 80:7
Readed(78): 80:2
Readed(78): 80:9 - 启动2个Spout线程、1个Bolt线程
1builder.setSpout("helloworld-spout", new HelloWorldSpout(), 2);
运行结果如下,可以看到Spout线程id为80和82,Bolt线程id为78Readed(78): 80:1
Readed(78): 82:4
Readed(78): 80:4
Readed(78): 82:6
Readed(78): 80:2
Readed(78): 82:0
Readed(78): 80:8
Readed(78): 82:6
Readed(78): 80:4
Readed(78): 82:1
Readed(78): 80:9
Readed(78): 82:2
Readed(78): 80:8
Readed(78): 82:6
Readed(78): 80:6
Readed(78): 82:8 - 启动1个Spout线程、2个Bolt线程
12builder.setBolt("helloworld-bolt", new HelloWorldBolt(), 2).shuffleGrouping("helloworld-spout");
运行结果如下,可以看到Spout线程id为82,Bolt线程id为78和80Readed(80): 82:1
Readed(78): 82:1
Readed(80): 82:3
Readed(78): 82:0
Readed(78): 82:5
Readed(80): 82:5
Readed(80): 82:2
Readed(78): 82:1 - 启动1个Spout线程、2个Bolt线程,采用fieldsGrouping
12builder.setBolt("helloworld-bolt", new HelloWorldBolt(), 2).fieldsGrouping("helloworld-spout", new Fields("sentence"));
并修改Spout代码,根据随机数的奇偶只发射2种数据
1234if(instanceRandom % 2 == 0)collector.emit(new Values(Thread.currentThread().getId() + ":a"));elsecollector.emit(new Values(Thread.currentThread().getId() + ":b"));
运行结果如下,可以看到Spout线程id为82,Bolt线程id为78和80,且同样的数据只会发给同一个Bolt线程Readed(80): 82:b
Readed(80): 82:b
Readed(78): 82:a
Readed(80): 82:b
Readed(80): 82:b
Readed(80): 82:b
Readed(78): 82:a
最后
进过Storm编程的初步实验,可以看到Storm在多线程方面的扩展十分方便,更多探索和使用以及原理将随后进行。
参考资料