twitter storm 源码走读之5 -- worker进程内部消息传递处理和数据结构分析

2019-03-02 23:48|来源: 网路

欢迎转载,转载请注明出处,徽沪一郎。

本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。

Topology到Worker的映射关系

Topology由Spout,Bolt组成,其逻辑关系大体如下图所示。

无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。

  1. worker是进程,executor对应于线程,spout或bolt是一个个的task
  2. 同一个worker只会执行同一个topology相关的task
  3. 在同一个executor中可以执行多个同类型的task, 即在同一个executor中,要么全部是bolt类的task,要么全部是spout类的task
  4. 运行的时候,spout和bolt需要被包装成一个又一个task

worker,executor, task三者之间的关系可以用下图表示

小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.

每一个executor使用的是actor pattern,high level的处理逻辑如下图所示

外部消息的接收和处理

在源码走读之四一文中总结了worker进程内的各种类型的thread,也即executor,这个等同于定义了进程内部和进程间的接口类型。那么这些接口上的消息在具体流传和处理过程中需要定义哪些数据结构,针对这些数据结构,又要做哪些必要的处理呢?

换句话说,就是为什么在worker.clj中有哪些数据和函数存在,不这样做,可以不?

先图示一下,外部消息处理的大概流程。

注:圈起来的数字表示消息转换和处理的序列。

步骤一

监听端口准备就绪,接收线程在收到外部的消息后,面临的问题就是如何确定由哪个task来处理该消息。接收到的tuple中含有task-id,根据task-id可以知道运行该task的executor,executor中有receive-message-queue即(incoming queue)来存放外部的tuple. 定义的数据结构需要反映这个转换过程task-id->executor->receive-queue-map.

那么在worker-data中哪些数据项与这个过程相关呢

  1. :port
  2. :executor-receive-queue-map
  3. :short-executor-receive-queue-map
  4. :task->short-executor
  5. :transfer-local-fn

transfer-local-fn将数据从接收线程发送到spout或bolt所在的executor线程。

步骤二

接下来数据会被传递到executor,于是又牵涉到executor的数据结构问题。executor-data由函数mk-executor-data创建,其内容与worker-data比较起来相对较少。

executor收到tuple之后,第一步需要进行反序列化,storm中使用kyro来进行序列化和反序列化,这也是为什么在executor中有该数据项的原因。

executor中与步骤2相关的数据项

  1. :type executor-type
  2. :receive-queue
  3. :deserializer (executor-data中的数据项)

步骤三:

步骤2处理结束,会产生相应的tuple发送到外部。这个过程需要多解释一下,首先tuple不是直接发送给worker的transfer-thread(负责向其它进程发送消息),而是发送给send-handler线程,每一个executor在创建的时候最起码会有两个线程被创建,一个用于运行bolt或spout的处理逻辑,另一个用以负责缓存bolt或spout产生的对外发送的tuple。

一旦snd-hander中的tuple数量达到阀值,这些被缓存的tuple会一次性发送给worker级别的transfer-thread.

executor中与步骤3相关的数据项

  1. :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
  2. :batch-transfer-queue

在步骤3中生成outgoing的tuple,tuple生成的时候需要回答两个基本问题

  1. tuple中含有哪些字段 --   该问题的解答由spout或bolt中的declareOutFields来解决
  2. 由哪个node+port来接收该tuple -- 由grouping来解决,这个时候就可以看出为什么需要task这一层的逻辑抽象了,有关grouping的详细解释,请参考fxjwind撰写的Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)

步骤四:

处理逻辑很简单,先将数据缓存,然后在达到阀值之后,一起传送给transfer-thread.

start-batch-transfer->worker-handler

(defn start-batch-transfer->worker-handler! [worker executor-data]
  (let [worker-transfer-fn (:transfer-fn worker)
        cached-emit (MutableObject. (ArrayList.))
        storm-conf (:storm-conf executor-data)
        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data)) 
        ]
    (disruptor/consume-loop*
      (:batch-transfer-queue executor-data)
      (disruptor/handler [o seq-id batch-end?]
        (let [^ArrayList alist (.getObject cached-emit)]
          (.add alist o)
          (when batch-end?
            (worker-transfer-fn serializer alist)
            (.setObject cached-emit (ArrayList.))
            )))
      :kill-fn (:report-error-and-die executor-data))))

worker-transfer-fn是worker中的transfer-fn,由mk-transfer-fn生成。

(defn mk-transfer-fn [worker]
  (let [local-tasks (-> worker :task-ids set)
        local-transfer (:transfer-local-fn worker)
        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    (fn [^KryoTupleSerializer serializer tuple-batch]
      (let [local (ArrayList.)
            remote (ArrayList.)]
        (fast-list-iter [[task tuple :as pair] tuple-batch]
          (if (local-tasks task)
            (.add local pair)
            (.add remote pair)
            ))
        (local-transfer local)
        ;; not using map because the lazy seq shows up in perf profiles
        (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
          (disruptor/publish transfer-queue serialized-pairs)
          )))))

步骤五:

处理函数mk-transfer-tuples-handler,主要进行序列化,将序列化后的数据发送给目的地址。

(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (ArrayList.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)
        ]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.addAll drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
              ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
            
              (fast-list-iter [[task ser-tuple] drainer]
                ;; TODO: consider write a batch of tuples here to every target worker  
                ;; group by node+port, do multipart send              
                (let [node-port (get task->node+port task)]
                  (when node-port
                   (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                    ))))
          (.clear drainer))))))

tuple发送的时候需要用到connection,但目前只知道task-id,所以在worker中需要保存task-id到node+port的映射,node+port与outgoing connections之间的映射。

worker中与步骤5相关的数据项:

  1. :cached-node+port->socket
  2. :cached-task->node+port
  3. :component->stream->fields
  4. :component->sorted-tasks
  5. :endpoint-socket-lock
  6. :transfer-queue (线程内部的消息队列)
  7. :task->component

其它的数据项

上述五个步骤并没有涵盖worker-data所有的数据项,那么其它的数据项归一归类,大体如下

timer相关,timer相关的数据项包括timer及其对应的处理句柄

  1. :heartbeat-timer
  2. :refresh-connection-timer
  3. :refresh-active-timer
  4. :executor-heartbeat-timer
  5. :user-timer

zk相关

  1. :storm-cluster-state
  2. :storm-active-atom
  3. :cluster-state

配置相关

  1. :conf
  2. :mq-context 在transport layer是使用zmq还是netty

Assignment相关

  1. :storm-id
  2. :assigment-id
  3. :worker-id
  4. :executors
  5. :task-ids
  6. :storm-conf
  7. :topology
  8. :system-topology

进程关闭相关

  1. :suicide-fn

其它的其它

  1. :uptime 运行时间,统计用
  2. :default-shared-resources 线程池
  3. :user-shared-resources 未启用

 小结

设计的时候,一定是先画出一个大概的蓝图,然后逐步的细化并加以实现。具体来说,步骤如下

  1. manifest 定义主要的功能
  2. draw skeleton 画出实现草图,定义主要的接口
  3. discussion 与团队讨论
  4. data structures 数据结构
  5. function 函数实现
  6. testing 测试

转自:http://www.cnblogs.com/hseagle/p/3477863

相关问答

更多
  • 你好湮魂,从多个角度全面讲解Storm实时数据处理技术和最佳实践,为快速掌握并灵活应用Storm提供实用指南   从实际问题出发,系统介绍Storm的基本应用、多语言特性、完整业务系统实现和产品交付的最佳实践方法;从产品持续交付角度,分析并实践集成、测试和交付的所有步骤   《大数据技术丛书:Storm实时数据处理》涵盖搭建基于Storm的开发环境和测试实时系统的许多实用方法与实战用例,以及如何应用交付最佳实践来将系统部署至云端。
  • 回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
  • 你的理解是正确的。 bolt的每个任务都只会打开自己与数据库服务器的连接。 Your understanding is correct. Each task of a bolt will just open its own connection to the database server.
  • 免责声明:我写了您在上述问题中引用的文章 。 但是,我对“任务”的概念有些困惑。 任务是组件的运行实例(spout还是螺栓)? 执行者有多个任务实际上是说执行者多次执行相同的组件,我是否正确? 是的,是的。 此外,在一般的并行性意义上,Storm将为一个喷口或螺栓生成一个专用的线程(执行器),但是由具有多个任务的执行器(线程)对并行性的贡献是什么? 每个执行器运行多个任务不会增加并行级别 - 执行程序总是有一个线程用于其所有任务,这意味着任务在执行程序上连续运行。 正如我在文章中写道,请注意: 在拓扑开始后 ...
  • 可能有多个问题。 while循环中没有中断 - 无限循环。 你可以调用f.readline()两次。 您可能打算在每次select后仅调用一次。 为避免阻塞,请在select后使用data = os.read(f.fileno(), 1024) 。 我不知道是否可以阻止nextTuple()直到子进程退出。 如果你所做的只是读取子过程中的行,那么你不需要select : def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')): p = Popen( ...
  • 您需要将json属性从json对象中拉出,并将两个值(json对象和String groupId)作为双值元组传递。 当您将流声明为拓扑规范逻辑的一部分时,您将为第二个字段指定名称“groupId”,并且事情应该正常工作。 如果您不想修改Kafka喷口,则需要有一个中间螺栓,其唯一目的是将groupId从json对象中分离出来。 中间螺栓还可以使用定向流(emitDirect()方法),将目标放在json对象中的groupId上。 这就是为什么我不重复使用Kafka喷口的一个原因 - 除了盲目地将数据写入流 ...
  • 您的问题的根本原因是您正在向您的风暴配置添加ProducerTemplate,并且它正在抛出异常,因为它不可序列化。 如果那是你自己的类,你可以改变代码使其工作,但由于这是一个Camel类,我会推荐一种不同的方法。 WebSocketBolt:将您的producerTemplate私有成员更改为transient: private transient ProducerTemplate producerTemplate; 这样就不会尝试序列化(将其置于conf中也会遇到同样的问题)。 WebSocketBol ...
  • 我最终通过使用maven repo中预编译的storm-kafka版本并在拓扑中添加过滤器螺栓而不是在spout本身中进行过滤来解决这个问题。 从而消除了对storm-core和storm-kafka本地编译的jar文件的需求。 这不是一个“解决方案”,但它是解决问题的一种方法。 I eventually worked around this problem by using a pre-compiled version of storm-kafka from a maven repo and adding ...
  • 我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...
  • 我不知道你正在使用的平台,但在C ++ 10ms是永恒的 。 我认为你正在使用错误的工具来完成工作。 使用C ++,提供一些本地查询应该不到一微秒。 触摸多个内存位置和/或必须等待磁盘或网络I / O的非本地查询别无选择,只能花费更多时间。 在这种情况下,并行性是你最好的朋友。 你必须找到瓶颈。 是I / O吗? 是CPU吗? 是内存带宽吗? 是内存访问时间吗? 在找到瓶颈之后,您可以改进它,异步它和/或乘以(=并行化)它。 I don't know the platform you're using, b ...