Storm数据流模型的分析及讨论

2019-03-02 23:56|来源: 网路

转自:http://www.cnblogs.com/panfeng412/archive/2012/07/29/storm-stream-model-analysis-and-discussion.html

Storm数据流模型的分析及讨论

本文首先介绍了Storm的基本概念和数据流模型,然后结合一个典型应用场景来说明Storm支持Topology之间数据流订阅的必要性,最后对比了Storm与另一个流处理系统在数据流模型上的区别之处。

Storm基本概念

Storm是一个开源的实时计算系统,它提供了一系列的基本元素用于进行计算:Topology、Stream、Spout、Bolt等等。

在Storm中,一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似。但是有一点不同的是:在Hadoop中,MapReduce任务最终会执行完成后结束;而在Storm中,Topology任务一旦提交后永远不会结束,除非你显示去停止任务。

计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图。下面是一个Topology的结构示意图:

其中包含有:

Spout:Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。

Bolt:Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤, 聚合, 查询数据库等操作,而且可以一级一级的进行处理。

最终,Topology会被提交到storm集群中运行;也可以通过命令停止Topology的运行,将Topology占用的计算资源归还给Storm集群。

Storm数据流模型

数据流(Stream)是Storm中对数据进行的抽象,它是时间上无界的tuple元组序列。在Topology中,Spout是Stream的源头,负责为Topology从特定数据源发射Stream;Bolt可以接收任意多个Stream作为输入,然后进行数据的加工处理过程,如果需要,Bolt还可以发射出新的Stream给下级Bolt进行处理。

下面是一个Topology内部Spout和Bolt之间的数据流关系:

Topology中每一个计算组件(Spout和Bolt)都有一个并行执行度,在创建Topology时可以进行指定,Storm会在集群内分配对应并行度个数的线程来同时执行这一组件。

那么,有一个问题:既然对于一个Spout或Bolt,都会有多个task线程来运行,那么如何在两个组件(Spout和Bolt)之间发送tuple元组呢?

Storm提供了若干种数据流分发(Stream Grouping)策略用来解决这一问题。在Topology定义时,需要为每个Bolt指定接收什么样的Stream作为其输入(注:Spout并不需要接收Stream,只会发射Stream)。

目前Storm中提供了以下7种Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping,具体策略可以参考这里

一种Storm不能支持的场景

以上介绍了一些Storm中的基本概念,可以看出,Storm中Stream的概念是Topology内唯一的,只能在Topology内按照“发布-订阅”方式在不同的计算组件(Spout和Bolt)之间进行数据的流动,而Stream在Topology之间是无法流动的

这一点限制了Storm在一些场景下的应用,下面通过一个简单的实例来说明。

假设现在有一个Topology1的结构如下:通过Spout产生数据流后,依次需要经过Filter Bolt,Join Bolt,Business1 Bolt。其中,Filter Bolt用于对数据进行过滤,Join Bolt用于数据流的聚合,Business1 Bolt用于进行一个实际业务的计算逻辑。

目前这个Topology1已经被提交到Storm集群运行,而现在我们又有了新的需求,需要计算一个新的业务逻辑,而这个Topology的特点是和Topology1公用同样的数据源,而且前期的预处理过程完全一样(依次经历Filter Bolt和Join Bolt),那么这时候Storm怎么来满足这一需求?据个人了解,有以下几种“曲折”的实现方式:

1)  第一种方式:首先kill掉已经在集群中运行的Topology1计算任务,然后实现Business2 Bolt的计算逻辑,并重新打包形成一个新的Topology计算任务jar包后,提交到Storm集群中重新运行,这时候Storm内的整体Topology结构如下:

这种方式的缺点在于:由于要重启Topology,所以如果Spout或Bolt有状态则会丢失掉;同时由于Topology结构发生了变化,因此重新运行Topology前需要对程序的稳定性、正确性进行验证;另外Topology结构的变化也会带来额外的运维开销。

2)  第二种方式:完全开发部署一套新的Topology,其中前面的公共部分的Spout和Bolt可以直接复用,只需要重新开发新的计算逻辑Business2 Bolt来替换原有的Business1 Bolt即可。然后重新提交新的Topology运行。这时候Storm内的整体Topology结构如下:

这种方式的缺点在于:由于两个Topology都会从External Data Source读取同一份数据,无疑增加了External Data Source的负载压力;而且会导致同样的数据在Storm集群内被传输相同的两份,被同样的计算单元Bolt进行处理,浪费了Storm的计算资源和网络传输带宽。假设现在不止有两个这样的Topology计算任务,而是有N个,那么对Storm的计算Slot的浪费很严重。

注意:上述两种方式还有一个公共的缺点——系统可扩展性不好,这意味着不管哪种方式,只要以后有这种新增业务逻辑的需求,都需要进行复杂的人工操作或线性的资源浪费现象。

3) 第三种方式:OK,看了以上两种方式后,也许你会提出下面的解决方案:通过Kafka这样的消息中间件,实现不同Topology的Spout共享数据源,而且这样可以做到消息可靠传输、消息rewind回传等,好处是对于Storm来说,已经有了storm-kafka插件的支持。这时候Storm内的整体Topology结构如下:

这种实现方式可以通过引入一层消息中间件减少对External Data Source的重复访问的压力,而且可以通过消息中间件层,屏蔽掉External Data Source的细节,如果需要扩展新的业务逻辑,只需要重新部署运行新的Topology,应该说是现有Storm版本下很好的实现方式了。不过消息中间件的引入,无疑将给系统带来了一定的复杂性,这对于Storm上的应用开发来说提高了门槛。

值得注意的是,方案三中仍遗留有一点问题没有解决:对于Storm集群来说,这种方式还是没有能够从根本上避免数据在Storm不同Topology内的重复发送与处理。这是由于Storm的数据流模型上的限制所导致的,如果Storm实现了不同Topology之间Stream的共享,那么这一问题也就迎刃而解了。

一个流处理系统的数据流模型

个人工作中有幸参与过一个流处理框架的开发与应用。下面我们来简单看看其中所采用的数据流模型:

其中:

1)数据流(data stream):时间分布和数量上无限的一系列数据记录的集合体;

2)数据记录(data record):数据流的最小组成单元,每条数据记录包括 3 类数据:所属数据流名称(stream name)、用于路由的数据(keys)和具体数据处理逻辑所需的数据(value);

3)数据处理任务定义(task definition):定义一个数据处理任务的基本属性,无法直接被执行,必须特化为具体的任务实例。其基本属性包括:

  • (可选)输入流(input stream):描述该任务依赖哪些数据流作为输入,是一个数据流名称列表;数据流产生源不会依赖其他数据流,可忽略该配置;
  • 数据处理逻辑(process logic):描述该任务具体的处理逻辑,例如由独立进程进行的外部处理逻辑;
  • (可选)输出流(output stream):描述该任务产生哪个数据流,是一个数据流名称;数据流处理链末级任务不会产生新的数据流,可忽略该配置;

4)数据处理任务实例(task instance):对一个数据处理任务定义进行具体约束后,可推送到某个处理结点上运行的逻辑实体。附加下列属性:

  • 数据处理任务定义:指向该任务实例对应的数据处理任务定义实体;
  • 输入流过滤条件(input filting condition):一个 boolean 表达式列表,描述每个输入流中符合什么条件的数据记录可以作为有效数据交给处理逻辑;若某个输入流中所有数据记录都是有效数据,则可直接用 true 表示;
  • (可选)强制输出周期(output interval):描述以什么频率强制该任务实例产生输出流记录,可以用输入流记录个数或间隔时间作为周期;忽略该配置时,输出流记录产生周期完全由处理逻辑自身决定,不受框架约束;

5)数据处理结点(node):可容纳多个数据处理任务实例运行的实体机器,每个数据处理结点的IPv4地址必须保证唯一。

该分布式流处理系统由多个数据处理结点(node)组成;每个数据处理结点(node)上运行有多个数据任务实例(task instance);每个数据任务实例(task instance)属于一个数据任务定义(task definition),任务实例是在任务定义的基础上,添加了输入流过滤条件和强制输出周期属性后,可实际推送到数据处理结点(node)上运行的逻辑实体;数据任务定义(task definition)包含输入数据流、数据处理逻辑以及输出数据流属性。

该系统中,通过分布式应用程序协调服务ZooKeeper集群存储以上数据流模型中的所有配置信息;不同的数据处理节点统一通过ZooKeeper集群获取数据流的配置信息后进行任务实例的运行与停止、数据流的流入和流出。

同时,每个数据处理任务可以接受流系统中已存在的任意数据流(data stream)作为输入,并产出新的任意名称的数据流(data stream),被其他结点上运行的任务实例订阅。不同结点之间对于各个数据流(data stream)的订阅关系,通过ZooKeeper集群来动态感知并负责通知流系统做出变化。

二者在数据流模型上的不同之处

至于两个系统的实现细节,我们先不去做具体比较,下面仅列出二者在数据流模型上的一些不同之处(这里并不是为了全面对比二者的不同之处,只是列出其中的关键部分):

1)  在Storm中,数据流Stream是在Topology内进行定义,并在Topology内进行传输的;而在上面提到的流处理系统中,数据流Stream是在整个系统内全局唯一的,可以在整个集群内被订阅。

2)  在Storm中,数据流Stream的发布和订阅都是静态的,所谓静态是指数据流的发布与订阅关系在向Storm集群提交Topology计算任务时,被一次性生成的,这一关系在Topology的运行过程中是不能被改变的;而在上面提到的流处理系统中,数据流Stream的发布和订阅都是动态的,即数据处理任务task可以动态的发布Stream,也可以动态的订阅系统内已经生成的任意Stream,数据流的订阅关于通过分布式应用程序协调服务ZooKeeper集群的动态节点来维护管理。

有了以上的对比,我们不难发现,对于本文所举的应用场景实例,Storm的数据流模式尚不能很方便的支持,而在这里提到的这个流处理系统的全局数据流模型下,这一应用场景的需求可以很方便的满足。

总结的话

个人觉得,Storm有必要实现不同Topology之间Stream的共享,这个至少可以在不损失Storm现有功能的前提下,使得Storm在处理实际生产环境下的一些应用场景时更加从容应对。

至于如何在现有Storm的基础上实现这一需求,可能的方式很多。一种简单的方式是通过Zookeeper来集中存储、动态感知Topology之间Stream的“发布-订阅”关系,同时在Storm的消息分发过程中对这种情况加以处理。

以上观点,如果不对之处,欢迎大家指出。



转自:http://blog.csdn.net/blue_jjw/article/details/9252523

相关问答

更多
  • VAS5054A是德国大众和奥迪公司为其特约服务站指定的必备汽车检测仪,在世界范围内不仅德国大众、奥迪、捷克大众斯柯达、西班牙大众西雅特,还是国内的上海大众,一汽大众——奥迪等服务站均采用的设备。其功能是其它任何诊断仪器所不能替代的。 VAS5054A是一款通用的诊断接口线,主要用于大众集团的车辆和来自其它工厂的汽车OBD系统。 集成的蓝牙接口可以把车辆连接到笔记本电脑,台式电脑或者测试系统上。
  • 不是的. pcap文件中的,或者现场抓包得到的,都是以太网中的网络数据包.是以数据包为单位的.你在wireshark中上面列表中看到的每一行,都是一个以太网数据包.而机器之间的通信,都是需要经过多个数据包的发送与接收,才能完成的. 你可以在上面的filter那里设置过滤的协议,将列表中杂乱的数据包过滤一下,仅显示某个协议的数据包,列表中,就都是这个协议及这个协议承载的协议的数据包了. 至于双向传输,也需要多个数据包的传输交互理解后,才可以完成,所以每个数据包肯定是单向的. 数据流的含义比较模糊,不是很确定, ...
  • 在控制流程语言中,您可以使用一些指令来处理外部数据。 条件执行,跳转和过程调用更改要执行的指令流。 这可以被视为流经数据的指令(例如,指令对通过指令加载数据的寄存器进行操作 - 数据是静态的,除非指令流移动它)。 控制流“if”语句跳转到指令流中的正确分支,但数据不会被移动。 在数据流语言中,您有一个从指令传递到要处理的指令的数据流。 条件执行,跳转和过程调用将数据路由到不同的指令。 这可以被看作是数据流过静态指令,如电信号如何流过电路或水流过管道。 数据流“if”语句将数据路由到正确的分支。 数据流功能和 ...
  • 是的,你可以在Storm中构建这样的事情,这是一个不错的选择,因为你正在处理Storm擅长的事情(连续的数据流,容易并行化的计算。 另外,由于您提到了PHP和JS,即使它不是JVM语言,也可以将您的Storm拓扑的组件编写为任何您喜欢的组件,尽管Clojure DSL for Storm非常好用且易于使用。 顺便说一句,如果您对JavaScript感到满意,并且不想处理学习Storm,那么您可能需要考虑使用Node.js来构建它。 Yes, you can build something like this ...
  • 关于加缪,是的,启动工作的调度员应该工作。 他们在LinkedIn上使用的是Azkaban,你也可以看一下。 如果在另一个完成之前启动,则会读取一些数据量两次。 由于第二个作业将从第一个作业使用的相同偏移开始读取。 关于加缪与S3,目前我不认为这已经到位。 Kafka actually retains events for a configurable period of time -- events are not purged immediately upon consumption like othe ...
  • 这是否意味着可以使用像Spark中构建的Random Forest模型这样的复杂学习模型来测试Spark Streaming程序中的流数据? 是的,您可以在批处理模式下训练像随机森林这样的模型,并在以后存储模型以进行预测。 如果您想将其与流式应用程序集成,其中值连续进行预测,您只需要在内存中加载模型(实际上读取特征向量及其权重)并进行预测直到结束。 它是否像引用已构建的“模型”一样简单,并在Spark Streaming程序中调用“predictOnValues()”? 是。 在这种情况下,现有的火花流机器 ...
  • 不要寻找这些术语的数学严格描述; 它们比这更模糊,松散的分类可以重叠。 我认为“数据流”在这里相当清楚; 它描述了数据流,并根据并发语句对其进行了描述。 但我想补充说,每个并发语句都会被其输入的变化唤醒并提供其输出; 因此(重要的一点:)在发生的事情的顺序和源代码中的元素的顺序之间没有对应关系。 在这方面,它与函数式编程有很多共同之处。 前两个模型都是数据流; 在(I)中,元素按逻辑顺序排列,而(II)不是。 “行为”也应该相当清楚 - 它只是根据行为来描述一个电路。 但它通常并不反对数据流 - 虽然你的圣 ...
  • 数据流是确定性的,允许并行性。 参与者是非确定性的,允许并发。 Dataflow is deterministic which allows parallelism. Actors are non-deterministic which allows concurrency.
  • 与所有可能的输入相比,实现100%代码覆盖更容易,因为所有可能输入的集合可能非常大或几乎无限制。 测试它们需要花费太多时间。 让我们看一个简单的示例函数: double invert(double x) { return 1.0/x; } 单元测试可能如下所示: double y = invert(5); double expected = 1.0/5.0; EXPECT_EQ( expected, y ); 此测试实现了100%的代码覆盖率。 但是,在1.8446744e + 19个可能的输入 ...
  • 目前Scala不支持,您可以投票支持此请求: SCL-8555 “分析数据流”在Scala中不起作用 It's not supported for Scala at the moment, you can vote for this request: SCL-8555 "Analyze Dataflow" Doesn't work In Scala