Twitter Storm 入门指南

2019-03-02 23:38|来源: 网路

通过这个入门指南,你将学会如何创建storm拓扑(topology)和部署拓扑到storm集群。主要使用Java语言,少许例子使用Python说明Storm的多语言特性。

准备工作
入门指南使用了 storm-start项目中的例子。建议您克隆此项目并跟随这些例子。阅读 安装开发环境创建新Storm项目,在你的机器上安装好开发环境和项目。
Storm集群组件
Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它)。
Storm集群有两种节点:控制(master)节点和工作者(worker)节点。控制节点运行一个称之为”nimbus”的后台程序,它类似于Haddop的”JobTracker”。Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测。
每个工作者节点运行一个称之”Supervisor”的后台程序。Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集;一个运行中的topology由许多跨多个机器的工作者进程组成。

 

一个 Zookeeper集群负责Nimbus和多个Supervisor之间的所有协调工作。此外,Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的;所有状态维持在Zookeeper或本地磁盘。这意味着你可以kill -9杀掉nimbus进程和supervisor进程,然后重启,它们将恢复状态并继续工作,就像什么也没发生。这种设计使storm极其稳定。
拓扑(topologies)
在Storm上做实时计算,你需要创建拓扑(topologies)。一个拓扑( topology)是一个计算图。一个拓扑中的每个节点都包含处理逻辑,节点之间的连接表示节点间数据流向。
运行一个拓扑相当简单。首先,打包你的全部代码和依赖文件到一个jar文件。然后,你运行一条命令,如下所示:
          
  
  1. storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2 
这是运行带 arg1 和  arg2参数的  backtype.storm.MyTopology 类,这个类的主要功能是定义拓扑,并提交拓扑到nimbus,  storm jar 负责连接nimbus和上传jar包。
由于拓扑定义只是一些Thrift结构体,并且nimbus是一个Thrift服务,所以你可以使用任意语言创建和提交拓扑。使用一种基于JVM的语言创建和提交拓扑,上述例子是最容易的方法。
关于启动/停止拓扑方面的更多信息,请参见 生产集群环境运行拓扑
 

Storm核心的抽象概念是“流”。流是一个无界的连续元组(tuple)。Storm提供用分布式和可靠的方式转换一个流到一个新流的原语primitive)。例如,你可以转换一个tweets流到一个trending topics流。

Storm为完成流转换提供的基本原语是“spouts”和“bolts“, sputs和bolts提供一些接口供你实现,用来运行你特定应用的业务逻辑。

spout是流的源头。例如,一个spout可能从Kestrel队列读取元组并以流的方式发射它们,一个spout可能连接Twitter API并发射一个tweets流。

bolts消费任意数量输入流,做一些处理,也可能发送新流。复杂的流转换,像从一个tweets流计算trending topics流,需要多个步骤和多个bolts。bolts可以做以下任何事情,包括运行功能,过滤元组,聚合流,连接流,访问数据库,以及更多的事情。
spouts和bolts组成的网被打包到一个 拓扑(“topology”),拓扑是storm中最高层次的一个抽象概念,拓扑可以被提交到storm集群执行。一个拓扑是一个流转换图,图中的每个节点是一个spout或bolt。图中的边表示bolt订阅了哪个流。当spout或bolt发送元组到流,它发送元组到每一个订阅了该流的bolt。

拓扑中节点间的连接表示元组在拓扑范围内如何传递。例如,如果从SpoutA到BoltB有一条连接,从SpoutA到BoltC也有一条连接,BoltB到BoltC有一条连接,那么,每当SpoutA发射一个元组,元组会发送到BoltB和BoltC,BoltB发射的所有元组也都会发送到BoltC。
Storm拓扑的每个节点并行运行。你可以指定每个节点的并行数,Storm将通过集群产生与并行数相等数量的线程。
拓扑将一直运行,除非你杀掉它。Storm将重新分配失败任务。另外,即使机器down机并且消息被放弃的情况下,Storm都将保证数据不会丢失。
数据模型
Storm使用元组(tuple)作为它的数据模型。一个元组就是一个值列表(List),列表中的值都有一个名字,并且一个元组的字段(field)可以是任意类型的对象。开箱即用,Storm支持所有基本类型、字符串类型、字节数组作为元组字段的值。为了使用一个其它类型的对象,只需该类型实现序列化。
拓扑的每个节点必须为它发射的元组声明输出字段。例如,这个Bolt声明它发射字段为“double”和“triple”的元组,代码如下所示:
           
   
  1. public class DoubleAndTripleBolt implements IRichBolt { 
  2.     private OutputCollectorBase _collector; 
  3.  
  4.     @Override 
  5.     public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { 
  6.         _collector = collector; 
  7.     } 
  8.  
  9.     @Override 
  10.     public void execute(Tuple input) { 
  11.         int val = input.getInteger(0);         
  12.         _collector.emit(input, new Values(val*2, val*3)); 
  13.         _collector.ack(input); 
  14.     } 
  15.  
  16.     @Override 
  17.     public void cleanup() { 
  18.     } 
  19.  
  20.     @Override 
  21.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  22.         declarer.declare(new Fields("double""triple")); 
  23.     }     
成员方法  declareOutputFields 为这个组件声明输出字段  ["double", "triple"] ,剩下的bolt将在后面的章节解释。
一个简单拓扑
为了探索更多的概念,并明白代码是如何表现的,让我们一起看一个简单拓扑。我们先看一下storm-starter项目中类 ExclamationTopology的 定义,代码如下所示:
            
    
  1. TopologyBuilder builder = new TopologyBuilder();         
  2. builder.setSpout("words"new TestWordSpout(), 10);         
  3. builder.setBolt("exclaim1"new ExclamationBolt(), 3
  4.         .shuffleGrouping("words"); 
  5. builder.setBolt("exclaim2"new ExclamationBolt(), 2
  6.         .shuffleGrouping("exclaim1"); 
这个拓扑包括一个spout和两个bolt。Spout发送单词。每个bolt在输入数据的尾部追加字符串“!!!”。三个节点排成一条线:spout发射给首个bolt,然后,这个bolt再发射给第二个bolt。如果spout发射元组“bob”和“john”,然后,第二个bolt将发射元组“bob!!! !!!”和“john!!!!!!”。
 
上述代码使用方法  setSpout 和方法  setBolt 来定义节点。这些方法需要三个入参:用户指定节点ID,业务逻辑处理对象,节点的并行数。这个例子,我们给spout指定的id是“words”,给bolt指定的id分别是“exclaim1”和“exclaim2”。
 
业务逻辑处理对象如果是spout,则需要实现 IRichSpout接口;如果是bolt,则需要实现 IRichBolt接口。
 
最后的参数是你要设置的节点并行数,这是个可选参数。它表示组件由多少线程执行,这些线程很可能不在同一个JVM,如果你不设置节点并行数,那么storm仅为那个节点分配一个线程。
 
setBolt 方法返回一个 InputDeclarer对象,此对象用于定义该bolt的输入流。组件“ excliam1”声明它要 读取所有由组件“ words”发射出来的元组,并使用随机分组策略( shuffle grouping)分发到各线程,组件 excliam2”声明它要读取所有由组件“excliam1”发射出来的元组,并使用随机分组策略分发到各线程。“shuffle grouping”是指元组从输入任务被随机的分发到bolt任务。组件间的数据分发策略还有很多,我将用少量章节解释它们。
 
如果你想组件“ excliam2”读取所有由组件“words”和组件“excliam1”发射的元组,那你可以这样声明组件“excliam2”,代码如下所示:
            
    
  1. builder.setBolt("exclaim2"new ExclamationBolt(), 5
  2.             .shuffleGrouping("words"
  3.             .shuffleGrouping("exclaim1"); 
像你看到的一样,这个bolt的输入流被声明为链接指定的多个数据源。

让我们深入探究拓扑中spout和bolt的实现。Spout负责发送新消息到拓扑。拓扑中的TestWordSpout每隔0.1秒就从["nathan", "mike", "jackson", "golda", "bertels"]列表中随机选择一个单词,并把该单词作为一元元组发射出去。TestWordSpout类的nextTuple()方法的实现如下所示:

             
     
  1. public void nextTuple() { 
  2.     Utils.sleep(100); 
  3.     final String[] words = new String[] {"nathan""mike""jackson""golda""bertels"}; 
  4.     final Random rand = new Random(); 
  5.     final String word = words[rand.nextInt(words.length)]; 
  6.     _collector.emit(new Values(word)); 
正如你看到的,nextTuple()方法的实现非常简单。
ExclamationBolt在单词的尾部追加“!!!”。让我们看下ExclamationBolt的完整实现。代码如下所示:
             
     
  1. public static class ExclamationBolt implements IRichBolt { 
  2.     OutputCollector _collector; 
  3.  
  4.     public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 
  5.         _collector = collector; 
  6.     } 
  7.  
  8.     public void execute(Tuple tuple) { 
  9.         _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 
  10.         _collector.ack(tuple); 
  11.     } 
  12.  
  13.     public void cleanup() { 
  14.     } 
  15.  
  16.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  17.         declarer.declare(new Fields("word")); 
  18.     } 

prepare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任意时候发射tuple – 可以在prepare、execute、cleanup方法中发射, 或者甚至在另一个线程中异步发射。prepare方法只是简单地把OutputCollector作为一个类成员变量保存,以供execute方法以后使用。

execute方法从bolt的一个输入流接收tuple。 ExclamationBolt获取tuple的第一个字段(field),然后在值的尾部追加“!!!”作为一个新元组发射出去。如果一个bolt有多个输入源,你可以通过调用Tuple类的getSourceComponent方法找出tuple来自哪个输入源。
 
在execute方法中还有其它一些事情,即输入的tuple作为emit方法的第一个参数,在最后一行,输入的tuple被ack。这些是 用于保证数据不会丢失的Storm可靠性API的一部分,本文稍后将解释这些。
 
当bolt关闭时,cleanup方法将被调用,它将清理所有已打开的资源。在集群中并不保证cleanup方法一定被调用。例如,如果正在运行task的机器突然down机,那么就没办法调用cleanup方法。Cleanup方法当初是为了在本地模式运行拓扑而设计(本地模式:在一个进程内模拟storm集群),你可以运行和杀掉一些topology,且不会有资源泄漏方面的问题。
 
最后,declareOutputFields方法声明ExclamationBolt发射只有一个“word”字段的一元元组。
 
本地模式运行ExclamationTopology
让我们来看一下如何在本地模式运行 ExclamationTopology,并且它是工作正常。
Storm有两种运行模式:本地模式和分布式模式。在本地模式,Storm完全执行在一个进程,通过线程模拟工作节点。本地模式对开发和测试拓扑非常有用。当你运行storm-starter项目中的拓扑时,它们将运行在本地模式,你能看到每个组件正在发射的消息。本地模式运行拓扑的更多信息请参见 本地模式
 
在分布式模式,storm运行在一个机器集群上。为了运行这个拓扑,当你提交一个拓扑到master时,你也需要提交这个拓扑的所有必要代码,master将负责分发代码和分配工作进程,如果工作进程down掉,master将重新分配它们到其它进程。在一个集群中运行拓扑的更多信息请参见 在一个生产集群运行拓扑
 
以下是本地模式运行ExclamationTopology的代码:
            
    
  1. Config conf = new Config(); 
  2. conf.setDebug(true); 
  3. conf.setNumWorkers(2); 
  4.  
  5. LocalCluster cluster = new LocalCluster(); 
  6. cluster.submitTopology("test", conf, builder.createTopology()); 
  7. Utils.sleep(10000); 
  8. cluster.killTopology("test"); 
  9. cluster.shutdown(); 
首先,代码通过创建一个LocalCluster对象定义一个进程内集群(在进程内模拟集群)。提交拓扑到虚拟集群和提交拓扑到真正的分布式集群相同。通过调用submitTopology方法提交拓扑到LocalCluster,该方法需要三个参数:拓扑名称、拓扑的配置、拓扑自身。
拓扑名称用于标识拓扑,以便你以后你能kill它。拓扑将一直运行,直到你kill它。
拓扑配置用于调整拓扑。这有两个很常见的配置,如下所示:
1. TOPOLOGY_WORKERS(调用setNumWorkers方法设置它)规定在集群内分配多少进程来执行拓扑。拓扑内的组件由许多线程执行,分配给组件的线程数通过setBolt方法和setSpout方法配置。这些线程分散在worker进程内,每个worker进程包含分配给一些组件的一些线程。例如,你配置所有组件的线程数为300,工作进程数为50,每个工作进程将执行6个线程,每个线程可以属于不同的组件。你可以通过调整组件并行度和worker进程数来调整Storm拓扑的性能。
2. TOPOLOGY_DEBUG(调用setDebug方法设置它)设置为true,组件每发射一个消息就会记录日志。这在本地模式测试拓扑非常有用,但在集群中运行拓扑时,你也许要设置为false,因为大量日志输出对性能影响非常大。
拓扑的其它配置还有许多。配置明细参见 java文档
学习如何设置开发环境,以便你能在本地模式运行拓扑(例如eclipse), 参见新建storm项目
流分组
流分组( stream grouping)告诉topology两个组件之间如何发送tuple。记住,spouts和bolts以许多跨集群任务(task)的形式并行执行。如果你在任务层次看拓扑如何执行,它看起来像这样,如下所示:

当任务boltA发射元组到boltB,它会发送元组到哪个任务?

通过告诉storm在任务集合之间如何发送元组,“stream grouping”回答了这个问题。在深入研究不同种类的流分组之前,我们先看一下storm-starter项目中的另一个拓扑,WordCountTopology从spout读取句子,WordCountBolt输出单词的出现次数,代码如下所示:

         
 
  1. TopologyBuilder builder = new TopologyBuilder(); 
  2.          
  3. builder.setSpout("sentences"new RandomSentenceSpout(), 5);         
  4. builder.setBolt("split"new SplitSentence(), 8
  5.         .shuffleGrouping("sentences"); 
  6. builder.setBolt("count"new WordCount(), 12
  7.         .fieldsGrouping("split"new Fields("word")); 
SplitSentence对句子中每个单词都发射一个元组,WordCount在内存中以map形式保持单词计数。WordCount每收到一个单词,就更新单词的计数并发射新的计数。
下面有少数几个不同种类的流分组。
最简单的流分组是随机分组(“shuffle grouping”),它发送元组到一个随机任务。 WordCountTopology拓扑使用随机分组从RandomSentenceSpout发送元组到 SplitSentence。它均等分配元组处理工作到跨集群的所有SplitSentence任务(task)。

一个更有趣的流分组种类是“fields grouping“。SplitSentenceWordCount之间使用的是字段分组(fields grouping)。对于WordCount功能,同一单词流向同一任务是关键。否则,如果多个任务都能获取同一单词,那么由于他们获得的信息不完整,他们将发射不正确的计数结果。字段分组让我们可以按字段的子集对流分组,这使得子段子集的相等值会流向同一任务。由于WordCount使用基于字段“word”的字段分组方式订阅SplitSentence的输出流,所以相同的单词总是流向同一任务,最终bolt输出正确的结果。

字段分组是流合并、流聚合、众多其它用例的基础。其实,字段分组是使用取模散列法(mod hashing)实现。
还有少许其它类型的流分组,请参见 概念
用其它语言定义Bolts
可以用任何语言定义Bolt。用其它语言定义的bolt当作子进程(subprocess)执行,storm用JSON消息通过标准输入或输出(stdin/stdout)与子进程通讯。通信协议仅需要一个约100行的适配库,并且storm已为Ruby、Python和Fancy提供了适配库。
WordCountTopology拓扑中SplitSentence的定义,代码如下所示:
          
  
  1. public static class SplitSentence extends ShellBolt implements IRichBolt { 
  2.     public SplitSentence() { 
  3.         super("python""splitsentence.py"); 
  4.     } 
  5.  
  6.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  7.         declarer.declare(new Fields("word")); 
  8.     } 
SplitSentence继承ShellBolt,并且声明它用带参数splitsentence.py 的python命令来运行。splitsentence.py的代码如下所示:
          
  
  1. import storm 
  2.  
  3. class SplitSentenceBolt(storm.BasicBolt): 
  4.     def process(self, tup): 
  5.         words = tup.values[0].split(" "
  6.         for word in words: 
  7.           storm.emit([word]) 
  8.  
  9. SplitSentenceBolt().run() 
关于用其它语言定义spout和bolt、创建拓扑的更多信息,请参见: Using non-JVM languages with Storm。 
 
可靠消息处理
在这个教程之前,我们跳过了关于元组如何发射方面的一些内容。这些方面是storm可靠性API的一部份:storm如何保证从spout出来的每个消息都被完整处理。可靠消息处理是如何工作的,用户需要做哪些工作来才能获得storm可靠性方面的好处,关于这些方面的信息,请参见 可靠消息处理
 
结尾
入门指南全面介绍了拓扑的开发、测试、部署。文档的其余部分会深入介绍storm使用的各个方面。
 

 


转自:http://chenlx.blog.51cto.com/4096635/739531

相关问答

更多
  • 回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 可能有多个问题。 while循环中没有中断 - 无限循环。 你可以调用f.readline()两次。 您可能打算在每次select后仅调用一次。 为避免阻塞,请在select后使用data = os.read(f.fileno(), 1024) 。 我不知道是否可以阻止nextTuple()直到子进程退出。 如果你所做的只是读取子过程中的行,那么你不需要select : def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')): p = Popen( ...
  • 如果您在本地运行并使用LocalCluter ,则无需安装Zookeeper。 如果以伪分布式模式本地运行(即,在本地启动Nimubs和Supervisor)并使用StormSubmitter ,则需要在本地安装Zookeeper。 If you run locally and use LocalCluter you do not need to install Zookeeper. If you run locally in pseudo-distributed mode (ie, start up Ni ...
  • 这里有一些有用且方便的教程链接 1 - iOS 7 - Twitter集成 2 - Ray wenderlich教程 3 - 视频教程 希望这可以帮助 Here are some useful and handy tutorials links 1 - iOS 7 - Twitter Integration 2 - Ray wenderlich tutorial 3 - Video Tutorial Hope this helps
  • 我不知道Storm如何管理这个过程,但你当然可以用sqlalchemy反映数据库中的表。 例如,下面是使用我目前可以访问的SQL Server实例的基本示例。 一个完整的数据库 >>> from sqlalchemy import create_engine, MetaData >>> engine = create_engine('mssql+pyodbc://:@/') # replace with user ...
  • 您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ...
  • 我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 关于消息ID是通用的:在内部它可能是一个64位值,但是这个64位值是作为Spout内emit()提供的msgID对象的散列来计算的。 因此,您可以将任何对象作为消息标识(两个对象散列到相同值的概率接近于零)。 关于使用str :我认为在这个例子中, str包含一行(而不是一个单词),并且文档包含两次完全相同的行(如果没有空行可能很多)是不太可能的。 关于计数器作为消息ID:对于你的观察你是绝对正确的 - 如果多个喷嘴并行运行,这会给消息ID冲突并破坏容错。 如果你想“修复”计数器方法,每个计数器应该以不同的 ...