Storm【技术文档】-Worker Executor Task的关系

2019-03-02 23:58|来源: 网路

1   simple introduction

Storm 在集群上运行一个 Topology的时刻,主要通过以下3个实体来完成Topology的执行工作

1  Worker

2 Executor

3 Task


一个Worker 进程执行的是一个topology的子集,这里我们必须强调:不会存在一个worker 为多个topology服务,

一个worker进程会启动一个或则多个executor 线程来执行一个topology的compotent-》也就是Spout或者bolt,

一个topology就是由于集群中间的多台物理机上的Worker构成的


一个executor是一个被Worker进程启动的单独线程,每一个Executor都只会运行一个topology的一个component,

在默认的情况之下,一个spout,或则一个bolt都只会生成一个task,Executor线程里会在每次循环的时候顺序的去调用所有的task的实例子


task是最终运行spout或bolt中代码的单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task


更加细化的来说:

一个storm topology运行起来之后, 会在supervisor 机器上启动一些进程来运行spout和bolt实例. 

如果一个topology里面一共有一个spout, 一个bolt。 其中spout的parallelism是2, bolt的parallelism是4, 那么我们可以把这个topology的总工作量看成是6, 那么一共有6个task,那么/tasks/{topology-id}下面一共会有6个以task-id命名的文件,其中两个文件的内容是spout的id, 其它四个文件的内容是bolt的id。 

task->node+port, 它其实就是从task-id到supervisor-id+port的映射, 也就是把这个task分配给某台机器的某个端口来做。 

topology里面的组件(spout/bolt)都根据parallelism被分成多个task, 而这些task被分配给supervisor的多个worker来执行。 

task都会跟一个componment-id关联, componment是spout和bolt的一个统称. 

对于每一个component在部署的时候都会指定使用的数量, 在storm-user中有一个
讨论说明了这个问题: 
里面的大意是说, 通过设置parallelism来指定执行spout/bolt的线程数量. 而在配置中还有另外一个地方(backtype.storm.Config.setNumWorkers(int))来指定一个storm集群中执行topolgy的进程数量, 所有的线程将在这些指定的worker进程中运行. 比如说一个topology中要启动300个线程来运行spout/bolt, 而指定的worker进程数量是60个, 那么storm将会给每个worker分配5个线程来跑spout/bolt, 如果要对一个topology进行调优, 可以调整worker数量和spout/bolt的parallelism数量(调整参数之后要记得重新部署topology. 后续会为该操作提供一个swapping的功能来减小重新部署的时间). 

对于worker和task之间的比例, nathan也给出了
参考, 即1个worker包含10~15个左右, 当然这个参考, 实际情况还是要根据配置和测试情况 


3: work 进程内部消息传递处理和数据结构分析


本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。


3.1 Topology 到worker的映射关系

Topology 由Spout,Bolt组成,其中的逻辑关系大体如下


请注意 Acker的行为,是在tuple,以及tuple所产生的其他tuple被确认消费掉以后,才会

有你的ACK行为

无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。

  1. 1  worker是进程,executor对应于线程,spout或bolt是一个个的task

  2. 2 同一个worker只会执行同一个topology相关的task

  3. 3 在同一个executor中可以执行多个同类型的task, 即在同一个executor中,要么全部是bolt类的task,要么全部是 spout类的task

  4. 4 运行的时候,spout和bolt需要被包装成一个又一个task


期间的三个组件的关系为:

小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.

每一个executor使用的是actor pattern,high level的处理逻辑如下图所示









转自:http://my.oschina.net/u/1791874/blog/282652

相关问答

更多
  • 这听起来像你有两个不同的问题: 1)你的工作队列过多。 你不能只将新任务填入队列中,而不考虑任务执行者的消耗速率。 您需要找出一些逻辑来了解何时阻止向工作队列添加新增内容。 2)任务线程中的任何未捕获的异常都可以完全终止该线程。 当发生这种情况时,ExecutorService将创建一个新线程来替换它。 但这并不意味着你可以忽略任何导致线程死亡的问题! 找到那些未捕获的异常并抓住它们! 这只是一个预感(因为没有足够的信息知道你的帖子),但我认为你的问题不在于任务执行者停止处理任务。 我的猜测是,它不会像创建 ...
  • 您创建了3个Worker实例:第一个是main。 它的列表永远不会被触及,最终是您决定打印的列表。 在决策方法内,您创建了两个新的Workers,每个都有自己的列表。 每个工作人员只更改其列表 - 但这些更改从未反映在主工作人员的列表中。 简而言之,您需要在所有工作人员中“共享”这些列表:使这些列表保持静态和最终。 这应该解决它。 You create 3 instances of Worker: The first one is in the main. Its lists are never touch ...
  • 从文档中检查spark默认值中的spark.executor.cores The number of cores to use on each executor. For YARN and standalone mode only. In standalone mode, setting this parameter allows an application to run multiple executors on the same worker, provided that there are eno ...
  • 我不确定你是使用双引号而不是单引号,这可能是问题所在。 尝试使用单引号,如: citus.task_executor_type = 'task-tracker' I am not sure but you are using double quote instead of single quote and this can be the problem. Try using single quote like: citus.task_executor_type = 'task-tracker'
  • 有几个想法可能会出错: 您可能只打开一个连接(Sybase客户端不是多线程保存,几年前我使用它时) 存储过程锁定一些表,让第二个调用等待和阻塞。 SP在某个开头的某个桌面上进行某种插入,更新,删除吗? 也许在交易中? 那么,第二个SP将等待第一个完成。 我记得Sybase在很长一段时间内的问题是创建一些对象(表,列,视图,临时表等),这些对象专门锁定一些系统表,让第二个SP等待第一个完成创建。 也许有人可以建议,如果仍然可以发生。 Several ideas abbout what might be wro ...
  • 在后台串行执行的AsyncTasks是否也会在同一个线程上运行? 实施相当奇怪。 AsyncTask不是使用单线程线程池,而是维护自己的队列,并将任务一次一个地提供给常规多线程线程池(可见为THREAD_POOL_EXECUTOR)。 是否有可能AsyncTask.SERIAL_EXECUTOR将在线程1上执行任务A,然后在线程2上执行任务B,然后在线程3上执行任务C? 在此方案中,所有任务都是串行执行的,但它们是在不同的后台线程上运行的。 这是非常可能的,假设如果池当前未使用,则ThreadPoolExe ...
  • 在创建任务的循环中,您只使用一个LoginCredentials实例。 所以所有任务都共享实例。 在循环中,您将在每次迭代中覆盖主机名。 所以最后LoginCredentials引用了最后一个任务的主机名。 所以所有任务都连接到该主机。 只是不通过task.getFirewall().getAddress()传递主机名,并在创建JSch会话时直接使用task.getFirewall().getAddress() : SSH ssh = new SSH(lc, t.getSSHCommand(enablepa ...
  • ICallable和IFuture C#示例只返回您提供的相同字符串... 实现基于ICallable的类: 在Call()方法中,您实现计算并返回结果, ExecutorService负责调用此方法并存储结果。 public class BoomerangeCallable : Java.Lang.Object, ICallable { T value; public BoomerangeCallable(T value) { this.value = val ...
  • 我认为https://stackoverflow.com/a/47714449/8845188是一个很好的答案,但我会尝试将其作为示例进行重新说明: 提交拓扑时,组件(例如,喷口或螺栓)的任务数量将一目了然,而执行器的数量可以在不重新部署拓扑的情况下进行更改。 执行者的数量总是小于或等于组件的任务数量。 问题1 您通常没有理由在1个执行程序中选择运行例如2个任务,但如果您当前负载较低但后期预期负载较高,则可以选择提交具有大量任务但数量较少的拓扑执行人 当然,您可以使用尽可能多的执行程序提交拓扑,但是由于上下 ...
  • 相当有趣的技术挑战,谢谢你的提问。 这是使用CompletableFuture for Java8的解决方案。 在Java7中,您可以以完全相同的方式使用io.netty.util.concurrent.Promise 。 最简单的部分是处理正常情况: 创造一个可以完善的未来 安排任务 回归未来 第一个完成未来,其他人被忽略(如果没有被杀死,那么原子布尔控制他们不会覆盖值) 在未来的下一阶段关闭执行器服务 更棘手的部分是在每个单独抛出保持相同的逻辑流时异常完成。 这可以通过累积所有异常并在计数达到最后一个失 ...