storm starter学习(一)

2019-03-02 23:57|来源: 网路

    官方提供的storm starter示例中,有很多应用的例子,对storm的应用场景理解很有帮助。本文结合源码来进行功能分解,记录一下,作为记忆索引吧。

    先来看一个比较简单的示例:WordCountTopology,原版代码该示例是为了说明多语言适配而做的应用场景,主要功能是随机生成一些String,将这些String划分分组,统计各单词出现数量。后来修改了一下,去掉了py调用的地方。使用java来进行词组划分。

    先来看一下Topology:

public static void main(String[] args) throws Exception {
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("spout", new RandomSentenceSpout(), 5); // 数据源
	builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 单词划分
	builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); // 统计单词出现个数
	Config conf = new Config();
	conf.setDebug(true);

	if (args != null && args.length > 0) { 
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
					builder.createTopology());
	} else {
	    conf.setMaxTaskParallelism(3);
	    LocalCluster cluster = new LocalCluster();
	    cluster.submitTopology("word-count", conf, builder.createTopology());

	    Thread.sleep(10000);
	    cluster.shutdown();
	}
}

    RandomSentenceSpout功能很简单,随机生成字符串到tuple中,key为word。

    再看一下bolt,该示例中使用了两类分组方式shuffleGrouping(随机分组),fieldsGrouping(按字段分组)。使用shuffleGrouping来进行单词划分,为了保证单词统计时都在一个bolt中进行,使用fieldsGrouping来进行word划分统计,运行时会看到相同key值的tuple会分配到同一线程上。

public static class SplitSentence implements IBasicBolt {
	public void prepare(Map conf, TopologyContext context) {
	}

	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String sentence = tuple.getString(0);
		for (String word : sentence.split(" ")) { // 将Spout接收到的tuple按空格进行分解,产生单词数据流
			collector.emit(new Values(word));
		}

	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word")); // 定义key值
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
    单词统计bolt:
public static class WordCount extends BaseBasicBolt {
	Map<String, Integer> counts = new HashMap<String, Integer>();

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String word = tuple.getString(0);
		Integer count = counts.get(word);
		if (count == null)
			count = 0;
		count++;
		counts.put(word, count);
		collector.emit(new Values(word, count)); // 输出结果word + word number
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count")); 
	}
}

    总结:

    通常情况下,为了保证数据可靠性与完整性,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,相当于自动处理了prepare方法和collector.emit.ack(inputTuple);


转自:http://my.oschina.net/u/262605/blog/299915

相关问答

更多
  • 既然要学习,就都看看,都看了之后才了解他们的区别啊。 如果按顺序,还是先学apark ,它最简单。
  • 你再把它和hadoop比较快慢。 两个框架都用于处理大量数据的并行计算。 所以这是把过程传递给数据,metaQ、hadoop:Hadoop使用磁盘作为中间交换的介质.容错性,再小的话hdfs上会一堆小文件),而是比较的吞吐了,在于reduce任务通过网络拖过去运算: 1,只需实现一个简单的Storm通信协议即可,数据直接通过网络导入内存,产生一行就通过一个传输系统发给流式计算系统。Storm保证每个消息至少能得到一次完整处理、多份复制等。二者在延时和吞吐上没太大区别。但是吞吐也低于mapreduce,可以在 ...
  • 在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ...
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 我遇到了同样的问题,并从storm-user邮件列表档案中找到了答案,请参阅 http://mail-archives.apache.org/mod_mbox/storm-user/201404.mbox/%3C72091F29-8109-4A1C-A79D-DDD2D871737B%40petrolink.com%3E I encountered the same problem, and found the answer from storm-user mailing list archives, re ...
  • 确保将Storm bin目录放在路径上,如此处所述。 是的,您可以使用Maven( 此处描述)或使用storm命令( 此处描述)运行拓扑。 Make sure you placed the Storm bin directory on your path as described here. And yes, you can run a topology with Maven (described here) or with the storm command (described here).
  • 基本理解 手册是必读的,特别是概念部分将帮助您获得基本的理解。 您不仅应该为字数安装示例拓扑,还应该阅读附录教程 。 输入输出 你的最后一个螺栓应该将输出传输到目的地。 那可能是数据库,服务...... 您可以在任何其他Java应用程序中使用日志记录。 如果您使用的是Trident API ,则会有一个Debug过滤器,您可以将其集成到拓扑中。 Basic Understanding The manual is a must-read, especially the Concepts section wil ...
  • 这通常是由风暴客户端和风暴启动器版本不一致造成的。 请尝试按照以下步骤操作该示例。 从http://storm.apache.org/downloads.html下载最新版本 在这个例子中,我们将使用版本1.1.1 将其解压缩到一个文件夹,我们称之为${STORM_HOME} cd到${STORM_HOME}/examples/storm-starter 执行mvn package -DskipTests=true 这应该在目标文件夹中构建风暴启动器jar ${STORM_HOME}/examples/st ...
  • 它之所以用于在线学习,是因为它是逐个处理数据的解决方案,您可以通过设置一些并行化(更多工作流程和节点)来扩展。 您通常使用批量学习来构建或训练具有历史数据的模型,但对于该处理,逐个数据不是您的最佳选择。 由于您可能需要处理数月的数据,因此您需要对其进行批处理以优化流程。 这就是像火花这样的其他解决方案到位的地方。 The reason why it is for online learning is because is a solution to process data one by one, that ...
  • 通过maven运行暴风雨拓扑不是要走的路。 您应该在命令行上使用bin/storm jar myJarFile.jar向集群提交拓扑(这也适用于本地模式)。 文件./target/original-storm-starter-0.11.0-SNAPSHOT.jar和./target/storm-starter-0.11.0-SNAPSHOT.jar是标准的maven工件,不能用于向集群提交拓扑。 您可以使用maven-jar-plugin (我建议您开始使用 - 您可能还需要使用maven-dependen ...