知识点
相关文章
更多最近更新
更多storm实战入门一
2019-03-02 23:46|来源: 网路
本节探讨一下storm具体怎么使用,明白怎么在windows下开发storm程序。
功能描述:实时随机输出一字符串。
在开发前记得导入storm需要的jar包。
1、SimpleSpout类继承BaseRichSpout类,用来产生数据并且向topology里面发出消息:tuple。
package com.ljq.helloword; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * Spout起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务 * * @author Administrator * */ @SuppressWarnings("serial") public class SimpleSpout extends BaseRichSpout{ //用来发射数据的工具类 private SpoutOutputCollector collector; private static String[] info = new String[]{ "comaple\t,12424,44w46,654,12424,44w46,654,", "lisi\t,435435,6537,12424,44w46,654,", "lipeng\t,45735,6757,12424,44w46,654,", "hujintao\t,45735,6757,12424,44w46,654,", "jiangmin\t,23545,6457,2455,7576,qr44453", "beijing\t,435435,6537,12424,44w46,654,", "xiaoming\t,46654,8579,w3675,85877,077998,", "xiaozhang\t,9789,788,97978,656,345235,09889,", "ceo\t,46654,8579,w3675,85877,077998,", "cto\t,46654,8579,w3675,85877,077998,", "zhansan\t,46654,8579,w3675,85877,077998,"}; Random random=new Random(); /** * 初始化collector */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 */ @Override public void nextTuple() { try { String msg = info[random.nextInt(11)]; // 调用发射方法 collector.emit(new Values(msg)); // 模拟等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 } }
2、SimpleBolt类继承BaseBasicBolt类,处理一个输入tuple。
package com.ljq.helloword; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。 * * @author Administrator * */ @SuppressWarnings("serial") public class SimpleBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null){ //System.out.println("msg="+msg); collector.emit(new Values(msg + "msg is processed!")); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("info")); } }
3、SimpleTopology类包含一个main函数,是Storm程序执行的入口点,包括一个数据喷发节点spout和一个数据处理节点bolt。
package com.ljq.helloword; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; /** * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。 * * @author Administrator * */ public class SimpleTopology { public static void main(String[] args) { try { // 实例化TopologyBuilder类。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); Config config = new Config(); config.setDebug(true); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 这里是本地模式下运行的启动代码。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, topologyBuilder.createTopology()); } } catch (Exception e) { e.printStackTrace(); } } }
运行结果效果如下:
转自:http://www.cnblogs.com/linjiqin/archive/2013/05/28/3104016
相关问答
更多-
求《经典Java EE企业应用实战》、《EJB3.0入门经典》电子书全本[2023-06-15]
如果是一点java的基础都没有的话, 那这两本书都不适合, 你最好先把java基础的学了再看这两本书。 -
redis入门指南 redis实战 哪个好[2022-03-19]
应用Redis实现数据的读写,同时利用队列处理器定时将数据写入mysql。 同时要注意避免冲突,在redis启动时去mysql读取所有表键值存入redis中,往redis写数据时,对redis主键自增并进行读取,若mysql更新失败,则需要及时清除缓存及同步redis主键。 这样处理,主要是实时读写redis,而mysql数据则通过队列异步处理,缓解mysql压力,不过这种方法应用场景主要基于高并发,而且redis的高可用集群架构相对更复杂,一般不是很推荐。 -
redis入门指南 redis实战 哪个好[2022-01-24]
应用Redis实现数据的读写,同时利用队列处理器定时将数据写入mysql。 同时要注意避免冲突,在redis启动时去mysql读取所有表键值存入redis中,往redis写数据时,对redis主键自增并进行读取,若mysql更新失败,则需要及时清除缓存及同步redis主键。 这样处理,主要是实时读写redis,而mysql数据则通过队列异步处理,缓解mysql压力,不过这种方法应用场景主要基于高并发,而且redis的高可用集群架构相对更复杂,一般不是很推荐。 -
求学习mahout从入门到实战的教程,相关的也可以。[2023-07-04]
去apache mahout的主页看就行,目前市面上就三本书,写的都很不负责任,模棱两可,错误百出 -
求Go语言从入门到进阶实战课程,急,在线等!!![2022-06-30]
Go语言是谷歌推出的一种全新的编程语言,旨在不损失应用程序性能的情况下降低代码的复杂性,具有“部署简单、并发性好、语言设计良好、执行性能好”等优势,目前国内诸多IT公司均已采用Go语言开发项目。 GO语言从入门到进阶实战:下载地址 -
Hadoop实战入门培训哪个最好[2022-02-22]
其它的不清楚,可以到魔据,条件不错,很注重基础教育,真正做到为学生负责到底,其它的,说实在的真的不敢保证。刚开始有些枯燥,入门就好了,现在缺大数据人才,好好学会有前途。 -
《Android 开发入门与实战》(第二版)修订版何时发售?[2022-05-21]
接到出版社消息,第二印已经上市开卖啦,现在从网店买的应该就是最新的了~(里面有惊喜) -
linux入门与实战经典好不好[2022-04-14]
unix是针对一些大型机小型机设计出来的系统,比如aix,solaris,hp-ux linux是针对台式机设计的,解决了环境限制不能应用unix的问题,典型的操作系统有redhat,suse,ununtu linux与unix的指令是相通的 推荐韩顺平的linux视频,不推荐一开始进接... -
求PIC单片机入门与实战~视频教程[2023-08-15]
找了半天都没,还是直接买书吧 推荐个便宜点的26块 http://mall.ednchina.com/detail/54.aspx 没必要同步的了,狗狗上其他的视频也都差不多 http://movie.gougou.com/search?search=%E5%8D%95%E7%89%87%E6%9C%BA&restype=4&sortby=3&suffix=&lrc=false&page=1&id=2 电驴上推荐http://www.verycd.com/topics/2744448/ 《郭天祥十天学会PI ... -
android开发入门与实战这本书怎么样[2021-08-28]
书是不错,如果你是新手还是选别的,有java经验这本还可以