知识点
相关文章
更多最近更新
更多storm定时器timer源码分析-timer.clj
2019-03-02 23:47|来源: 网路
storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid,很好理解,时间表示该定时任务什么时候执行,函数表示要执行的函数,uuid用于标识该"定时任务"。"定时任务"被存放到定时器的PriorityQueue队列中(和PriorityBlockingQueue区别,在于没有阻塞机制,不是线程安全的)。优先级队列是堆数据结构的典型应用,如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列(参阅 Comparable),也可以根据 Comparator 来指定,这取决于使用哪种构造方法。优先级队列不允许null元素。依靠自然排序的优先级队列还不允许插入不可比较的对象(这样做可能导致 ClassCastException)。当然也可以自己重新实现Comparator接口, 比如storm定时器就用reify重新实现了Comparator接口。storm定时器的执行过程比较简单,通过timer-thread,不断检查PriorityQueue里面时间最小的"定时任务"是否已经可以触发了, 如果可以(当前时间>=执行时间),就poll出来,调用callback,并sleep。storm定时器相关的函数均定义在timer.clj文件中,storm定时器是由mk-timer函数创建的,mk-timer函数定义如下:
mk-timer函数
( defnk mk-timer [ :kill-fn ( fn [ & _ ] ) :timer-name nil ]
;; queue绑定PriorityQueue队列,创建PriorityQueue队列时指定队列初始容量为10,并指定一个Comparator比较器,Comparator比较器比较"定时任务"执行时间的大小,这样每次poll出执行时间最小的"定 时任务", PriorityQueue队列是一个依赖执行时间的小顶堆
( let [ queue ( PriorityQueue. 10 ( reify Comparator
( compare
[ this o1 o2 ]
( - ( first o1) ( first o2)))
( equals
[ this obj ]
true)))
;; active标识timer-thread是"active"的
active ( atom true)
;; 创建一个锁,因为PriorityQueue并不是线程安全的,所以通过这个锁,可以使多线程互斥访问PriorityQueue
lock ( Object.)
;; notifier是一个java信号量,初始值为0,notifier信号量的主要功能就是当我们调用cancel-timer函数中断一个timer-thread时,等待timer-thread结束,当timer-thread结束前会release ;; notifier信号量
notifier ( Semaphore. 0)
;; thread-name绑定timer-thread线程名,没有指定时默认为"timer"
thread-name ( if timer-name timer-name "timer")
;; timer-thread线程
timer-thread ( Thread.
( fn []
;; 当timer-thread为"active"即active=true时,进入while循环
( while @ active
( try
;; peek函数从PriorityQueue获取执行时间最小的"定时任务",但并不出队列。time-millis绑定执行时间,elem绑定"定时任务"数据
( let [[ time-millis _ _ :as elem ] ( locking lock ( .peek queue ))]
;; 如果elem不为nil且当前时间>=执行时间,那么先加锁,然后poll出该"定时任务",并将"定时任务"的callback函数绑定到afn,最后调用该函数;否则判断time-millis ;; 是否为nil
;; 我们可以发现该定时器是软时间执行"定时任务"的,也就是说"定时任务"有可能被延迟执行,同时如果afn函数执行时间比较长,那么会影响下一个"定时任务"的执行
( if ( and elem ( >= ( current-time-millis) time-millis))
;; It is imperative to not run the function
;; inside the timer lock. Otherwise, it is
;; possible to deadlock if the fn deals with
;; other locks, like the submit lock.
( let [ afn ( locking lock ( second ( .poll queue )))]
;; 执行"定时任务"的callback函数
( afn))
;; 该if语句是上面if语句的else分支,判断time-millis是否为nil,如果time-millis不为nil,则timer-thread线程sleep(执行时间-当前时间);否则 ;; sleep(1000),表明PriorityQueue中没有"定时任务"
( if time-millis
;; If any events are scheduled, sleep until
;; event generation. If any recurring events
;; are scheduled then we will always go
;; through this branch, sleeping only the
;; exact necessary amount of time.
( Time/sleep ( - time-millis ( current-time-millis)))
;; Otherwise poll to see if any new event
;; was scheduled. This is, in essence, the
;; response time for detecting any new event
;; schedulings when there are no scheduled
;; events.
( Time/sleep 1000))))
( catch Throwable t
;; Because the interrupted exception can be
;; wrapped in a RuntimeException.
;; 检查是否是InterruptedException,如果是InterruptedException,说明线程是由于接收interrupt信号而中断的,不做异常处理,否则调用kill-fn函数、修改线程状 ;; 态并抛出该异常
( when-not ( exception-cause? InterruptedException t)
( kill-fn t)
( reset! active false)
( throw t)))))
;; release notifier信号量,标识timer—thread运行结束
( .release notifier)) thread-name )]
;; 设置timer-thread为守护线程
( .setDaemon timer-thread true)
;; 设置timer-thread为最高优先级
( .setPriority timer-thread Thread/MAX_PRIORITY)
;; 启动timer-thread线程
( .start timer-thread)
;; 返回该定时器的"属性"
{ :timer-thread timer-thread
:queue queue
:active active
:lock lock
:cancel-notifier notifier }))
我们可以通过调用cancel-timer函数中断一个timer-thread线程,cancel-timer函数定义如下:
cancel-timer函数
[ timer ]
;; 检查timer状态是否是"active",如果不是则抛出异常
( check-active! timer)
;; 加锁
( locking ( :lock timer)
;; 将timer的状态active设置成false,即"dead"
( reset! ( :active timer) false)
;; 调用interrupt方法,中断线程,通过mk-timer函数我们可以知道在线程的run方法内调用了sleep方法,当接收到中断新号后会抛出InterruptedException异常使线程退出
( .interrupt ( :timer-thread timer)))
;; acquire timer中的notifier信号量,因为只有当线程结束前才会release notifier信号量,所以此处是等待线程结束
( .acquire ( :cancel-notifier timer)))
check-active!函数定义如下:
check-active!函数
[ timer ]
( when-not @( :active timer)
( throw ( IllegalStateException. "Timer is not active"))))
通过调用schedule函数和schedule-recurring函数我们可以向storm定时器中添加"定时任务"。schedule函数定义如下:
schedule函数
;; timer绑定定时器,delay-secs绑定"定时任务"相对当前时间的延迟时间,afn绑定callback函数,check-active是否需要检查定时器
[ timer delay-secs afn :check-active true ]
;; 检查定时器状态
( when check-active ( check-active! timer))
( let [ id ( uuid)
^ PriorityQueue queue ( :queue timer )]
;; 加锁,执行时间=当前时间+延迟时间,将"定时任务"的vector类型数据添加到PriorityQueue队列中
( locking ( :lock timer)
( .add queue [( + ( current-time-millis) ( secs-to-millis-long delay-secs)) afn id ]))))
schedule-recurring函数定义如下:schedule-recurring函数也很简单,与schedule函数的区别就是在"定时任务"的callback函数中又添加了一个相同的"定时任务"。schedule函数的语义可以理解成向定时器添加
一个"一次性任务",schedule-recurring函数的语义可以理解成向定时器添加"一个周期执行的定时任务"(开始执行时间=当前时间+延迟时间,然后每隔recur-secs执行一次),
schedule-recurring函数
[ timer delay-secs recur-secs afn ]
( schedule timer
delay-secs
( fn this []
( afn)
; This avoids a race condition with cancel-timer.
( schedule timer recur-secs this :check-active false))))
nimbus检查心跳和重分配任务的实现就是通过schedule-recurring函数向storm定时器添加了一个"周期任务"实现的。
0
( conf NIMBUS-MONITOR-FREQ-SECS)
( fn []
( when ( conf NIMBUS-REASSIGN)
( locking ( :submit-lock nimbus)
( mk-assignments nimbus)))
( do-cleanup nimbus)
))
转自:http://www.cnblogs.com/ierbar0604/p/3948558
相关问答
更多-
java 定时器 timer[2022-09-04]
Timer t1 = CSSingletonFactory.createNewTimer(); //频度 long runfreq; try { t1.purge(); // 実行频度(时) String runfrequency = **** ; // 基准时间:时 String basetimehh = **** ; // 基准时间:分 String basetimemm = **** ; Calendar now = Calendar.getInstance(); int nowhh = now.ge ... -
更改Timer定时器的间隔时间[2022-12-12]
需要将之前的timer invalid之后,创建新的timer -
Timer定时器过段时间后自动停止运行[2022-04-27]
有可能是由于timer已经被回收掉了 你是在什么环境下面winform? webform? 经过你的补充我看明白了 是这样的 由于你的应用程序在特定空闲时间之后相应的w3wp辅助进程会被回收掉 所以你的timer自然没有作用了 你可以查看iis 应用程序池的属性 切换到性能选项卡空闲超时 默认的时间是20分钟 也就是说如果20分钟内你的网站没有任何请求那么就会造成回收 -
只使用一次定时器(Using Timer only once)[2022-04-15]
替换MessageBox.Show("Hello World"); t.Stop(); MessageBox.Show("Hello World"); t.Stop(); 与t.Stop();MessageBox.Show("Hello World"); 。 由于您没有及时按下OK,计时器已经再次打勾,并且您从未达到停止代码。 Replace MessageBox.Show("Hello World"); t.Stop(); with t.Stop();MessageBox.Show("Hello Worl ... -
WatchDog定时器在Linux中(WatchDog Timer In Linux)[2023-10-21]
如果你想使用定时器中断,使用信号,特别是SIGALRM 。 您可以使用函数alarm()来请求超时。 如果你想使用粒度,你可以使用ualarm() 。 一旦达到超时,它将调用你之前定义的回调函数。 这是一个示例代码: #includevoid watchdog(int sig) { printf("Pet the dog\r\n"); /* reset the timer so we get called again in 5 seconds */ alarm(5); ... -
Android定时器?(Android timer? How-to?)[2023-04-12]
好的,因为这不清除,但有3种简单的方法来处理这个。 下面是一个示例,显示所有3,底部是一个示例,仅显示我认为更可取的方法。 还要记住在onPause中清理你的任务,如有必要,保存状态。 import java.util.Timer; import java.util.TimerTask; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Message; i ... -
immediate使用false 定时器将立即触发(默认情况下可以不设置),然后立即触发立即触发定时器。 布尔值,默认为true "pageTimer": { "on": "timer", "timerSpec": { "interval": 10, "maxTimerLength": 9.99, "immediate":false }, "request": "event", "va ...
-
定时器只运行一次(Timer runs only once)[2022-05-09]
在我回答这个问题之前。 您是否愿意回到上一个问题,并在那里标记您收到的答案,点击白色复选标记将其变为绿色。 我曾经尝试过减去值并且对结果不满意,所以我决定总是关闭计时器而不是使用负值。 #Persistent #SingleInstance SetTimer, CheckApp, 4000 Return CheckApp: IfWinActive, ahk_class QWidget { SetTimer, CheckApp, Off Click 486, 15 Click ... -
Swift定时器应用程序(Swift Timer App)[2022-12-20]
源代码中的一个错误。 首先存在计时器然后启动新计时器。 代码如下。 @IBAction func start(_ sender: AnyObject) { timer.invalidate() timer = Timer.scheduledTimer(timeInterval: 1, target: self, selector: #selector(ViewController.action), userInfo: nil, repeats: true) } 并且对于停止计时器使用无效的 ... -
定时器不会失效(Timer does not invalidate)[2021-11-29]
您可能正在启动多个计时器。 如果您在当前的计时器失效之前点击了创建计时器的代码行,那么您将有两个活动计时器,其中只有一个可以使其无效。 要防止出现这种情况,请在创建新计时器之前在scoreTimer上调用invalidate : scoreTimer.invalidate() scoreTimer = Timer.scheduledTimer(timeInterval: 0.05, target: self, selector: #selector(updatePassengerScore), userIn ...