Storm-源码分析-Topology Submit-Executor
2019-03-02 23:51 | Source: web
In the worker, each executor is created via executor/mk-executor worker e:
(defn mk-executor [worker executor-id]
  (let [executor-data (mk-executor-data worker executor-id) ;; 1. mk-executor-data
        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
        task-datas (->> executor-data
                        :task-ids
                        (map (fn [t] [t (task/mk-task executor-data t)])) ;; 2. mk-task
                        (into {})
                        (HashMap.))
        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
        report-error-and-die (:report-error-and-die executor-data)
        component-id (:component-id executor-data)
        ;; 3. create threads
        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
        ;; trick isn't thread-safe)
        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
        handlers (with-error-reaction report-error-and-die
                   (mk-threads executor-data task-datas))
        threads (concat handlers system-threads)]
    ;; use schedule-recurring to periodically emit SYSTEM_TICK (which triggers the spout pending rotate)
    (setup-ticks! worker executor-data)
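The recurring-tick idea behind setup-ticks! can be sketched in Python. This is a minimal stand-in, not Storm's scheduler: `schedule_recurring` is a hypothetical name, and Storm actually publishes a SYSTEM_TICK tuple into the executor's receive queue rather than calling a function directly.

```python
import threading

def schedule_recurring(interval_s, fn):
    """Call fn every interval_s seconds on a background thread until the
    returned event is set (a toy analogue of Storm's schedule-recurring)."""
    cancel = threading.Event()

    def loop():
        # Event.wait doubles as the sleep; returns True once cancel is set.
        while not cancel.wait(interval_s):
            fn()

    threading.Thread(target=loop, daemon=True).start()
    return cancel
```

Setting the returned event stops the tick loop, mirroring how the executor's ticks stop when it shuts down.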
1. mk-executor-data
(defn mk-executor-data [worker executor-id]
  (let [worker-context (worker-context worker)
        task-ids (executor-id->tasks executor-id)                      ;; the tasks this executor contains
        component-id (.getComponentId worker-context (first task-ids)) ;; the component it belongs to
        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
        executor-type (executor-type worker-context component-id)      ;; executor type, bolt or spout
        batch-transfer->worker (disruptor/disruptor-queue              ;; the executor's send buffer queue
                                 (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                 :claim-strategy :single-threaded
                                 :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]
    (recursive-map
      :worker worker
      :worker-context worker-context
      :executor-id executor-id
      :task-ids task-ids
      :component-id component-id
      :open-or-prepare-was-called? (atom false)
      :storm-conf storm-conf
      :receive-queue ((:executor-receive-queue-map worker) executor-id) ;; the disruptor queue belonging to this executor
      :storm-id (:storm-id worker)
      :conf (:conf worker)
      :shared-executor-data (HashMap.)
      :storm-active-atom (:storm-active-atom worker)
      :batch-transfer-queue batch-transfer->worker
      :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)    ;; (1.1)
      :suicide-fn (:suicide-fn worker)
      :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
      :type executor-type
      ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
      :stats (mk-executor-stats <> (sampling-rate storm-conf))         ;; (1.2)
      :interval->task->metric-registry (HashMap.)
      :task->component (:task->component worker)
      :stream->component->grouper (outbound-components worker-context component-id)
      :report-error (throttled-report-error-fn <>)
      :report-error-and-die (fn [error]
                              ;; write the error under the error path in ZK so other daemons can learn of it
                              ((:report-error <>) error)
                              ((:suicide-fn <>)))
      :deserializer (KryoTupleDeserializer. storm-conf worker-context)
      :sampler (mk-stats-sampler storm-conf)                           ;; 1.3 mk-stats-sampler
      ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
      )))
1.1 mk-executor-transfer-fn
The executor caches tuples waiting to be sent in the batch-transfer->worker queue.
As the comments below explain, to keep a component from blocking (a large backlog of tuples not handled in time), an extra overflow buffer is created; only when that buffer also fills up does the spout stop calling nextTuple (the overflow buffer matters mostly for spout executors).
;; the overflow buffer is used to ensure that spouts never block when emitting
;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
;; prevents deadlock from occurring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
;; buffers filled up)
;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
;; preventing memory issues
overflow-buffer (LinkedList.)]
It returns a fn that puts [task, tuple] either into the overflow-buffer or into the batch-transfer->worker queue.
Note that this is the executor's transfer-fn, which is different from the worker's transfer-fn; the naming is unfortunate and easy to confuse.
The executor's transfer-fn caches tuples into the executor's batch-transfer->worker queue, while the worker's transfer-fn sends tuples to the worker's transfer queue.
;; in its own function so that it can be mocked out by tracked topologies
(defn mk-executor-transfer-fn [batch-transfer->worker]
  (fn this
    ([task tuple block? ^List overflow-buffer]
      (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
        ;; the overflow buffer exists and is non-empty, which means the queue is already
        ;; full, so append straight to the overflow buffer to preserve ordering
        (.add overflow-buffer [task tuple])
        (try-cause
          (disruptor/publish batch-transfer->worker [task tuple] block?)
          (catch InsufficientCapacityException e
            (if overflow-buffer
              (.add overflow-buffer [task tuple])
              (throw e))))))
    ([task tuple overflow-buffer]
      (this task tuple (nil? overflow-buffer) overflow-buffer))
    ([task tuple]
      (this task tuple nil))))
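The branching in the transfer-fn can be sketched in Python. This is a standalone sketch, not Storm's API: `BoundedQueue` and `QueueFullError` are toy stand-ins for the disruptor queue and InsufficientCapacityException.

```python
from collections import deque

class QueueFullError(Exception):
    pass

class BoundedQueue:
    """Toy stand-in for the disruptor queue: publish fails when full."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()

    def publish(self, item):
        if len(self.items) >= self.capacity:
            raise QueueFullError()
        self.items.append(item)

def make_transfer_fn(queue):
    def transfer(task, tup, overflow_buffer=None):
        # If the overflow buffer already holds tuples, the queue filled up
        # earlier; keep appending there so tuple order is preserved.
        if overflow_buffer:
            overflow_buffer.append((task, tup))
            return
        try:
            queue.publish((task, tup))
        except QueueFullError:
            if overflow_buffer is not None:
                overflow_buffer.append((task, tup))
            else:
                raise
    return transfer
```

With a capacity-1 queue, the first tuple is published and later ones land in the overflow buffer, which is exactly the spout's non-blocking emit path.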
1.2 mk-executor-stats <> (sampling-rate storm-conf)
Storm-源码分析-Stats (backtype.storm.stats)
1.3 mk-stats-sampler
Creates a sampler based on the sampling-rate in the conf:
(defn mk-stats-sampler [conf] (even-sampler (sampling-rate conf)))
What gets created here is an even-sampler:
(defn even-sampler [freq]
  (let [freq (int freq)
        start (int 0)
        r (java.util.Random.)
        curr (MutableInt. -1)
        target (MutableInt. (.nextInt r freq))] ;; random value in [0, freq)
    (with-meta
      (fn []
        (let [i (.increment curr)]
          (when (>= i freq)
            (.set curr start)
            (.set target (.nextInt r freq))))
        (= (.get curr) (.get target))) ;; FP has no direct assignment, so == reduces to =
      {:rate freq})))
(defn sampler-rate [sampler] (:rate (meta sampler)))
even-sampler returns a fn, with metadata ({:rate freq}) attached via with-meta.
So (:rate (meta sampler)) retrieves the rate from the sampler's metadata.
The sampler is just that fn; each call returns (= curr target).
curr counts up from start; until it reaches target, calls return false.
When curr equals target, the call returns true.
When curr reaches freq, a new random target is generated and curr is reset to zero.
So the effect of repeatedly calling the sampler is a random scatter of falses and exactly one true within each window of freq calls.
That produces the sampling behavior: a sample is taken only when the call returns true.
For a simple sampler, say a 20% rate, you could just take one call out of every five, but such strong regularity is a problem if the data happens to line up with it, e.g. every fifth record being alike.
To add randomness, this implementation is used instead.
The use of closures and metadata here is also worth borrowing.
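The even-sampler logic translates naturally into a Python closure. This is a standalone sketch of the algorithm, not Storm code; the `rate` attribute stands in for the Clojure metadata.

```python
import random

def even_sampler(freq):
    """Return a closure that yields True exactly once, at a random offset,
    within every window of `freq` calls (a sketch of Storm's even-sampler)."""
    state = {"curr": -1, "target": random.randrange(freq)}

    def sampler():
        state["curr"] += 1
        if state["curr"] >= freq:          # window exhausted:
            state["curr"] = 0              # reset the counter
            state["target"] = random.randrange(freq)  # pick a new random offset
        return state["curr"] == state["target"]

    sampler.rate = freq  # stands in for the {:rate freq} metadata
    return sampler
```

Because each window of freq calls contains exactly one hit, 100 calls at freq 5 yield exactly 20 trues, just at unpredictable positions.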
2. mk-task, creating the tasks
(task/mk-task executor-data t)
Storm-源码分析-Topology Submit-Task
3. Creating the threads
3.1 The batch-transfer-queue handler thread, i.e. the send thread
It takes messages off the batch-transfer-queue; until batch-end is reached, they are accumulated into the ArrayList held in cached-emit.
When batch-end is reached, transfer-fn sends the accumulated messages to the transfer-queue (a spout presumably never sends tuples to itself).
(defn start-batch-transfer->worker-handler! [worker executor-data]
  (let [worker-transfer-fn (:transfer-fn worker)
        cached-emit (MutableObject. (ArrayList.)) ;; caches all messages until batch-end
        storm-conf (:storm-conf executor-data)
        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))]
    (disruptor/consume-loop*
      (:batch-transfer-queue executor-data)
      (disruptor/handler [o seq-id batch-end?]
        (let [^ArrayList alist (.getObject cached-emit)]
          (.add alist o)
          (when batch-end?
            (worker-transfer-fn serializer alist)
            (.setObject cached-emit (ArrayList.)))))
      :kill-fn (:report-error-and-die executor-data))))
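The accumulate-until-batch-end pattern above can be sketched in Python. This is a hypothetical stand-in: `worker_transfer_fn` here is just any callable that receives the flushed batch, without the serializer argument the real code passes.

```python
def make_batch_handler(worker_transfer_fn):
    """Accumulate messages until a batch-end marker arrives, then flush the
    whole batch through the worker transfer function (mirrors cached-emit)."""
    cached = []

    def handle(message, batch_end):
        cached.append(message)
        if batch_end:
            worker_transfer_fn(list(cached))  # hand off a snapshot of the batch
            cached.clear()                    # start a fresh batch

    return handle
```

Flushing only at batch-end is what amortizes the cost of the downstream transfer over many tuples.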
Worker transfer-fn
It splits the tasks into local and remote.
For local tasks, local-transfer delivers the messages to the corresponding receive-queue.
For remote tasks, disruptor/publish sends them to the transfer-queue. Storm uses Kryo as its Java serialization framework (http://code.google.com/p/kryo/).
(defn mk-transfer-fn [worker]
  (let [local-tasks (-> worker :task-ids set)
        local-transfer (:transfer-local-fn worker)
        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    (fn [^KryoTupleSerializer serializer tuple-batch]
      (let [local (ArrayList.)
            remote (ArrayList.)]
        (fast-list-iter [[task tuple :as pair] tuple-batch]
          (if (local-tasks task)
            (.add local pair)
            (.add remote pair)))
        (local-transfer local)
        ;; not using map because the lazy seq shows up in perf profiles
        (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote]
                                 [task (.serialize serializer tuple)])]
          (disruptor/publish transfer-queue serialized-pairs))))))
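The local/remote split can be sketched in Python. All four parameters are hypothetical stand-ins for Storm's internals: the local task-id set, the local delivery function, the remote publish function, and the serializer.

```python
def make_transfer_fn(local_tasks, local_transfer, publish_remote, serialize):
    """Split a batch of (task, tuple) pairs by destination: local pairs are
    delivered directly, remote ones are serialized before being published."""
    def transfer(tuple_batch):
        local, remote = [], []
        for task, tup in tuple_batch:
            (local if task in local_tasks else remote).append((task, tup))
        local_transfer(local)
        # only remote tuples pay the serialization cost
        publish_remote([(task, serialize(tup)) for task, tup in remote])
    return transfer
```

Note the design point the split encodes: tuples bound for tasks in the same worker skip serialization entirely.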
3.2 The executor's execution threads
mk-threads is wrapped in with-error-reaction so that if an exception occurs, the error is written to ZK, letting the other daemons find out promptly:
handlers (with-error-reaction report-error-and-die
(mk-threads executor-data task-datas))
Storm-源码分析-Topology Submit-Executor-mk-threads
Source: http://www.cnblogs.com/fxjwind/p/3238673