Twitter Storm 概念

2019-03-02 23:38|来源: 网路

这个页面列出了storm的主要概念和查找更多信息的链接。讨论的概述有:
1. 拓扑( Topologies
2. 流( Streams
3. 喷嘴(Spouts)
4. 螺栓(Bolts)
5. 流分组(Stream groupings)
6. 可靠性(Reliability)
7. 任务(Tasks)
8. 工作者(Workers)
9. 配置(Configuration)
 
Topologies
实时应用的逻辑被打包在storm拓扑里。storm拓扑类似于MapReduce job。一个关键的区别是MapReduce job运行一段时间后最终会完成,而storm拓扑一直运行(直到你kill它)。一个拓扑是由spouts和bolts组成的图,spouts和bolts之间通过流分组连接起来。下面描述了这些概念。
 
TopologyBuilder:用java语言,使用这个类来构造拓扑。
本地模式:学习在本地模式如何开发和测试拓扑。
 
Streams
Storm核心的抽象概念是“流”。流是一个分布式并行创建和处理的无界的连续元组(tuple)。流通过一种模式来定义,该模式是给流元组中字段命名。默认情况下,元组可以包含整型、长整型、短整型、字节、字符串、双精度浮点数、单精度浮点数、布尔型和字节数组。您还可以自定义序列化,在元组中使用自定义类型。
 
在声明流时,每个流被分配一个ID,由于由spouts和bolts组成的单流如此普遍, OutputFieldDeclarer提供便利的方法声明一个不需要指定ID的单流,此时,流被分配一个默认ID为“default”。
 
元组:流由元组组成
OutputFieldDeclarer:用于声明流及其模式
序列化:关于storm元组的动态定型和自定义序列化方面的信息
ISerialization:自定义序列化必须实现此接口
CONFIG.TOPOLOGY_SERIALIZATIONS:使用该配置注册自定义序列化
 
Spouts
Spout是拓扑中流的源泉。通常spouts从外部资源读取元组,然后发射元组到拓扑中(例如,Kestrel队列或Twitter API)。Spouts即可以是可靠的,也可是不可靠的。可靠的spout可以重新执行一个失败元组,但一个不可靠的spout一发射元组就会忘记它。
 
Spouts 可以发射多个流。要发射多个流,使用 OutputFieldDeclarer的declareStream方法声明多个流,并在使用 SpoutOutputCollector的emit方法时指定流ID。
 
Spouts的重要方法是nextTuple方法。nextTuple方法发射一个新的元组到拓扑,或如果没有新的元组发射,简单的返回。注意任务spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。
 
Spout的另外两个重要方法是ack和fail方法。当spout发射的元组被拓扑成功处理时,调用ack方法;当处理失败时,调用fail方法。Ack和fail方法仅被可靠spouts调用。更多信息参见 the Javadoc
 
资料:
IRichSpout:spout必须实现的接口
 
拓扑中的所有处理都在bolts中完成。Bolts什么都可以做,如过滤、业务功能、聚合、连接(合并)、访问数据库等等。
 
Bolts可以做简单的流转换。复杂的流转换经常需要多步完成,因此也需要多个bolts。例如,转换tweets数据流到流行图片数据流至少需要两步:一个bolt 对retweets的图片进行滚动计数,另外的bolt找出Top X(前几位)的图片(你可以用更具伸缩性的方式处理这部分流)。
 
Bolts可以发射多个流。要发射多个流,使用 OutputFieldDeclarer的declareStream方法声明多个流,并在使用 SpoutOutputCollector的emit方法时指定流ID。
 
当你声明一个bolt的输入流时,你总是以另一个组件的指定流作为输入。如果你想订阅另一个组件的所有流,你必须分别订阅每一个流。InputDeclarer提供了使用默认流ID订阅流的语法糖,调用 declarer.shuffleGrouping(“1”)订阅组件“1”上的默认流,作用等同于declarer.shuffleGrouping(“1”, DEFAULT_STREAM_ID)。
 
Bolts的主要方法是execute方法,任务在一个新的元组输入时执行该方法。Bolts使用 OutputCollector对象发射新的元组。Bolts必须对每个处理的元组调用 OutputCollector的ack方法,因此storm知道这个元组完成处理(并且能最终确定ack原始元组是安全的)。一般情况,处理一个输入元组,基于此元组再发射0-N个元组,然后ack输入元组。Strom提供了一个IBasicBolt接口自动调用ack方法。
 
在Bolts中载入新的线程进行异步处理。 OutputCollector是线程安全的,并随时都可调用它。
 
IRichBolt:bolt的接口
IBasicBolt:为定义用于过滤和简单功能的bolt 提供了方便接口
OutputCollector:bolts使用此类的实例发射元组到输出流。
 
Stream groupings
指定每个bolt应接收的输入流是定义bolts的一部分工作。流分组定义流应该如何分割到各个任务。
 
Storm包括六种流分组类型:
1.  随机分组Shuffle grouping):随机分发元组到bolt的任务,保证每个任务获得相等数量的元组。
2.  字段分组Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
3.  全部分组All grouping):元组被复制到bolt的所有任务。小心使用该分组。
4.  全局分组Global grouping):全部流都分配到bolt的同一个任务,明确地说,是分配给ID最小的那个task。
5.  无分组None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,storm将把无分组的bolts放到bolts或spouts订阅它们的同一线程去执行(如果可能)。
6.  直接分组Direct grouping):这是一个特别的分组类型。元组生产者决定元组由哪个元组消费者任务接收。该分组仅能被声明为direct stream的流使用。元组必须通过emitDirect方法直接发射。Bolt获取消费者任务id,可以使用已提供的TopologyContext或保持引用 OutputCollector的emit方法的输出(元组发送后返回的任务ID列表)。
 
资料:
TopologyBuilder:使用这个类定义拓扑
InputDeclarer:每当调用TopologyBuilder的setBolt方法时,返回这个对象,它用于声明一个拓扑的输入流和声明输入流如何分组。
CoordinatedBolt:该bolt用于分布式RPC拓扑,并大量使用direct stream和直接分组。
 
Reliability
Storm保证每个元组都被拓扑完全执行。它会追踪每个spout元组引发的元组树并检测元组树何时成功完成处理。每个拓扑都关联一个“message timeout”,如果storm检测到未在超时时间之内完成spout元组的处理,那么storm记录元组失败,并稍后重做该元组。
 
为了充分利用storm的可靠性特性,你必须通知storm何时发射了一个新元组,何时完成了一个元组,这些都可使用发射元组的bolt的OutputCollector对象完成,用emit方法通知storm发射了新元组,用ack方法知道storm元组已完成。
 
更多详情请参见 可靠消息处理
 
Tasks
每个spout或bolt都是作为集群中的许多任务来执行。每个任务相当于一个执行线程,流分组定义从一组任务到另一组任务如何发送元组。你可以使用 TopologyBuilder的setSpout方法和setBolt方法为每个spout或bolt设置并行数。
 
Workers
拓扑执行时可能跨一个或多个工作者进程。每个工作者进程是一个物理JAVA虚拟机,并执行拓扑的一部分任务。例如,假如分配给拓扑的并行数为300,工作者进程为50,那么每个工作者进程执行6个任务(工作者进程中的线程)。Storm试图在所有工作者进程间均匀分布任务。
 
资料:
Config.TOPOLOGY_WORKERS 该配置设置执行拓扑的工作者进程数量
 
Configuration
Storm有各种配置调整nimbus、supervisor、运行拓扑的行为。一些配置是系统配置,不能在拓扑上进行修改,而其它配置每个拓扑都能修改。
 
在storm代码库的default.yaml文件中,每个配置都定义了一个默认值。通过在nimbus和 supervisor的CLASSPATH中定义一个storm.yaml文件,你能 覆写这些配置。最后,当使用 StormSubmitter提交你的拓扑时,你可以定义一个特定的拓扑配置。
 
配置的优先顺序是defaults.yaml < storm.yaml < 拓扑特有配置。不过,拓扑特有配置仅能覆写前缀为“TOPOLOGY”的配置。
 
资料:
Config:所有配置的清单,也是一个用于创建拓扑特有配置的帮助类
default.yaml:所有配置的默认值
安装storm集群:说明如何创建和配置集群
在生产集群运行拓扑:在集群运行拓扑时有用的配置
本地模式:本地模式时有用的配置
 
 

转自:http://chenlx.blog.51cto.com/4096635/743630

相关问答

更多
  • 当下雨的时候,风暴都到了
  • 在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ...
  • 回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 免责声明:我是Apache Flink提交者和PMC成员,只熟悉Storm的高级设计,而不是其内部。 Apache Flink是统一流和批处理的框架。 Flink的运行时原生支持这两个域,这是由于并行任务之间的流水线数据传输,包括流水线洗牌。 记录将立即从生产任务发送到接收任务(在收集缓冲区进行网络传输之后)。 可以使用阻止数据传输来选择执行批处理作业。 Apache Spark是一个支持批处理和流处理的框架。 Flink的批处理API看起来很相似,并且解决了与Spark类似的用例,但在内部方面却有所不同。 ...
  • 可能有多个问题。 while循环中没有中断 - 无限循环。 你可以调用f.readline()两次。 您可能打算在每次select后仅调用一次。 为避免阻塞,请在select后使用data = os.read(f.fileno(), 1024) 。 我不知道是否可以阻止nextTuple()直到子进程退出。 如果你所做的只是读取子过程中的行,那么你不需要select : def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')): p = Popen( ...
  • 您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ...
  • 我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 关于消息ID是通用的:在内部它可能是一个64位值,但是这个64位值是作为Spout内emit()提供的msgID对象的散列来计算的。 因此,您可以将任何对象作为消息标识(两个对象散列到相同值的概率接近于零)。 关于使用str :我认为在这个例子中, str包含一行(而不是一个单词),并且文档包含两次完全相同的行(如果没有空行可能很多)是不太可能的。 关于计数器作为消息ID:对于你的观察你是绝对正确的 - 如果多个喷嘴并行运行,这会给消息ID冲突并破坏容错。 如果你想“修复”计数器方法,每个计数器应该以不同的 ...