知识点
相关文章
更多最近更新
更多Twitter Storm 入门指南
2019-03-02 23:38|来源: 网路
通过这个入门指南,你将学会如何创建storm拓扑(topology)和部署拓扑到storm集群。主要使用Java语言,少许例子使用Python说明Storm的多语言特性。
- storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
arg1
和
arg2
参数的
backtype.storm.MyTopology
类,这个类的主要功能是定义拓扑,并提交拓扑到nimbus,
storm jar
负责连接nimbus和上传jar包。
Storm核心的抽象概念是“流”。流是一个无界的连续元组(tuple)。Storm提供用分布式和可靠的方式转换一个流到一个新流的原语(primitive)。例如,你可以转换一个tweets流到一个trending topics流。
spout是流的源头。例如,一个spout可能从Kestrel队列读取元组并以流的方式发射它们,一个spout可能连接Twitter API并发射一个tweets流。
- public class DoubleAndTripleBolt implements IRichBolt {
- private OutputCollectorBase _collector;
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
- _collector = collector;
- }
- @Override
- public void execute(Tuple input) {
- int val = input.getInteger(0);
- _collector.emit(input, new Values(val*2, val*3));
- _collector.ack(input);
- }
- @Override
- public void cleanup() {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("double", "triple"));
- }
- }
declareOutputFields
为这个组件声明输出字段
["double", "triple"]
,剩下的bolt将在后面的章节解释。
ExclamationTopology
的 定义,代码如下所示:
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("words", new TestWordSpout(), 10);
- builder.setBolt("exclaim1", new ExclamationBolt(), 3)
- .shuffleGrouping("words");
- builder.setBolt("exclaim2", new ExclamationBolt(), 2)
- .shuffleGrouping("exclaim1");
setSpout
和方法
setBolt
来定义节点。这些方法需要三个入参:用户指定节点ID,业务逻辑处理对象,节点的并行数。这个例子,我们给spout指定的id是“words”,给bolt指定的id分别是“exclaim1”和“exclaim2”。
setBolt
方法返回一个
InputDeclarer对象,此对象用于定义该bolt的输入流。组件“
excliam1”声明它要
读取所有由组件“
words”发射出来的元组,并使用随机分组策略(
shuffle grouping)分发到各线程,组件
“excliam2”声明它要读取所有由组件“excliam1”发射出来的元组,并使用随机分组策略分发到各线程。“shuffle grouping”是指元组从输入任务被随机的分发到bolt任务。组件间的数据分发策略还有很多,我将用少量章节解释它们。
- builder.setBolt("exclaim2", new ExclamationBolt(), 5)
- .shuffleGrouping("words")
- .shuffleGrouping("exclaim1");
让我们深入探究拓扑中spout和bolt的实现。Spout负责发送新消息到拓扑。拓扑中的TestWordSpout每隔0.1秒就从["nathan", "mike", "jackson", "golda", "bertels"]列表中随机选择一个单词,并把该单词作为一元元组发射出去。TestWordSpout类的nextTuple()方法的实现如下所示:
- public void nextTuple() {
- Utils.sleep(100);
- final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
- final Random rand = new Random();
- final String word = words[rand.nextInt(words.length)];
- _collector.emit(new Values(word));
- }
- public static class ExclamationBolt implements IRichBolt {
- OutputCollector _collector;
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
- public void cleanup() {
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
prepare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任意时候发射tuple – 可以在prepare、execute、cleanup方法中发射, 或者甚至在另一个线程中异步发射。prepare方法只是简单地把OutputCollector作为一个类成员变量保存,以供execute方法以后使用。
- Config conf = new Config();
- conf.setDebug(true);
- conf.setNumWorkers(2);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
通过告诉storm在任务集合之间如何发送元组,“stream grouping”回答了这个问题。在深入研究不同种类的流分组之前,我们先看一下storm-starter项目中的另一个拓扑,WordCountTopology从spout读取句子,WordCountBolt输出单词的出现次数,代码如下所示:
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("sentences", new RandomSentenceSpout(), 5);
- builder.setBolt("split", new SplitSentence(), 8)
- .shuffleGrouping("sentences");
- builder.setBolt("count", new WordCount(), 12)
- .fieldsGrouping("split", new Fields("word"));
一个更有趣的流分组种类是“fields grouping“。SplitSentence和WordCount之间使用的是字段分组(fields grouping)。对于WordCount功能,同一单词流向同一任务是关键。否则,如果多个任务都能获取同一单词,那么由于他们获得的信息不完整,他们将发射不正确的计数结果。字段分组让我们可以按字段的子集对流分组,这使得子段子集的相等值会流向同一任务。由于WordCount使用基于字段“word”的字段分组方式订阅SplitSentence的输出流,所以相同的单词总是流向同一任务,最终bolt输出正确的结果。
- public static class SplitSentence extends ShellBolt implements IRichBolt {
- public SplitSentence() {
- super("python", "splitsentence.py");
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
- import storm
- class SplitSentenceBolt(storm.BasicBolt):
- def process(self, tup):
- words = tup.values[0].split(" ")
- for word in words:
- storm.emit([word])
- SplitSentenceBolt().run()
转自:http://chenlx.blog.51cto.com/4096635/739531
相关问答
更多-
回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
-
免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
-
可能有多个问题。 while循环中没有中断 - 无限循环。 你可以调用f.readline()两次。 您可能打算在每次select后仅调用一次。 为避免阻塞,请在select后使用data = os.read(f.fileno(), 1024) 。 我不知道是否可以阻止nextTuple()直到子进程退出。 如果你所做的只是读取子过程中的行,那么你不需要select : def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')): p = Popen( ...
-
如果您在本地运行并使用LocalCluter ,则无需安装Zookeeper。 如果以伪分布式模式本地运行(即,在本地启动Nimubs和Supervisor)并使用StormSubmitter ,则需要在本地安装Zookeeper。 If you run locally and use LocalCluter you do not need to install Zookeeper. If you run locally in pseudo-distributed mode (ie, start up Ni ...
-
使用Twitter登录(Login using Twitter)[2022-01-18]
这里有一些有用且方便的教程链接 1 - iOS 7 - Twitter集成 2 - Ray wenderlich教程 3 - 视频教程 希望这可以帮助 Here are some useful and handy tutorials links 1 - iOS 7 - Twitter Integration 2 - Ray wenderlich tutorial 3 - Video Tutorial Hope this helps -
我不知道Storm如何管理这个过程,但你当然可以用sqlalchemy反映数据库中的表。 例如,下面是使用我目前可以访问的SQL Server实例的基本示例。 一个完整的数据库 >>> from sqlalchemy import create_engine, MetaData >>> engine = create_engine('mssql+pyodbc://
: @ / ') # replace with user ... -
在Apache Storm bolt中使用Apache Camel ProducerTemplate(Using Apache Camel ProducerTemplate in Apache Storm bolt)[2022-09-09]
您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ... -
无法使用kafka-storm向apache storm提交拓扑(Unable to submit topology to apache storm using kafka-storm)[2022-04-07]
我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ... -
我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
-
关于消息ID是通用的:在内部它可能是一个64位值,但是这个64位值是作为Spout内emit()提供的msgID对象的散列来计算的。 因此,您可以将任何对象作为消息标识(两个对象散列到相同值的概率接近于零)。 关于使用str :我认为在这个例子中, str包含一行(而不是一个单词),并且文档包含两次完全相同的行(如果没有空行可能很多)是不太可能的。 关于计数器作为消息ID:对于你的观察你是绝对正确的 - 如果多个喷嘴并行运行,这会给消息ID冲突并破坏容错。 如果你想“修复”计数器方法,每个计数器应该以不同的 ...