前言
ScheduledThreadPoolExecutor
可以用來很方便實現我們的調度任務,具體使用可以參考調度線程池ScheduledThreadPoolExecutor的正確使用姿勢這篇文章,那大家知道它是怎么實現的嗎,本文就帶大家來揭曉謎底。
實現機制分析
我們先思考下,如果讓大家去實現ScheduledThreadPoolExecutor
可以周期性執行任務的功能,需要考慮哪些方面呢?
ScheduledThreadPoolExecutor
的整體實現思路是什么呢?
答:我們是不是可以繼承線程池類,按照線程池的思路,將任務先丟到阻塞隊列中,等到時間到了,工作線程就從阻塞隊列獲取任務執行。
- 如何實現等到了未來的時間點就開始執行呢?
答:我們可以根據參數獲取這個任務還要多少時間執行,那么我們是不是可以從阻塞隊列中獲取任務的時候,通過條件隊列的的awaitNanos(delay)
方法,阻塞一定時間。
- 如何實現 任務的重復性執行呢?
答:這就更加簡單了,任務執行完成后,把它再次加入到隊列不就行了嗎。
源碼解析
類結構圖
ScheduledThreadPoolExecutor
的類結構圖如上圖所示,很明顯它是在我們的線程池ThreadPoolExecutor
框架基礎上擴展的。
ScheduledExecutorService
:實現了該接口,封裝了調度相關的APIThreadPoolExecutor
:繼承了該類,保留了線程池的能力和整個實現的框架DelayedWorkQueue
:內部類,延遲阻塞隊列。ScheduledFutureTask
:延遲任務對象,包含了任務、任務狀態、剩余的時間、結果等信息。
重要屬性
通過ScheduledThreadPoolExecutor
類的成員屬性,我們可以了解它的數據結構。
shutdown
后是否繼續執行周期任務(重復執行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
shutdown
后是否繼續執行延遲任務(只執行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
- 調用
cancel()
方法后,是否將該任務從隊列中移除,默認false
private volatile boolean removeOnCancel = false;
- 任務的序列號,保證FIFO隊列的順序,用來比較優先級
private static final AtomicLong sequencer = new AtomicLong()
ScheduledFutureTask
延遲任務類
ScheduledFutureTask
繼承FutureTask
,實現RunnableScheduledFuture
接口,無論是runnable
還是callable
,無論是否需要延遲和定時,所有的任務都會被封裝成ScheduledFutureTask
。- 該類具有延遲執行的特點, 覆蓋
FutureTask
的run
方法來實現對延時執行、周期執行的支持。 - 對于延時任務調用
FutureTask#run
,而對于周期性任務則調用FutureTask#runAndReset
并且在成功之后根據fixed-delay/fixed-rate
模式來設置下次執行時間并重新將任務塞到工作隊列。 - 成員屬性如下:
// 任務序列號
private final long sequenceNumber;
// 任務可以被執行的時間,交付時間,以納秒表示
private long time;
// 0 表示非周期任務
// 正數表示 fixed-rate(兩次開始啟動的間隔)模式的周期,
// 負數表示 fixed-delay(一次執行結束到下一次開始啟動) 模式
private final long period;
// 執行的任務對象
RunnableScheduledFuture
DelayedWorkQueue
延遲隊列
DelayedWorkQueue
是支持延時獲取元素的阻塞隊列, 內部采用優先隊列 PriorityQueue(小根堆、滿二叉樹)存儲元素。- 內部數據結構是數組,所以延遲隊列出隊頭元素后需要讓其他元素(尾)替換到頭節點,防止空指針異常。
- 成員屬性如下:
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 節點數量
private int size = 0;
// 存放任務的數組
private RunnableScheduledFuture?[] queue =
new RunnableScheduledFuture?[INITIAL_CAPACITY];
// 控制并發用的鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件隊列
private final Condition available = lock.newCondition();
//指定用于等待隊列頭節點任務的線程
private Thread leader = null;
提交延遲任務schedule()
原理
延遲執行方法,并指定延遲執行的時間,只會執行一次。
schedule()
方法是延遲任務方法的入口。
public ScheduledFuture? schedule(Runnable command,
long delay,
TimeUnit unit) {
// 判空處理
if (command == null || unit == null)
throw new NullPointerException();
// 將外部傳入的任務封裝成延遲任務對象ScheduledFutureTask
RunnableScheduledFuture? t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 執行延遲任務
delayedExecute(t);
return t;
}
decorateTask(...)
該方法是封裝延遲任務
- 調用
triggerTime(delay, unit)
方法計算延遲的時間。
// 返回【當前時間 + 延遲時間】,就是觸發當前任務執行的時間
private long triggerTime(long delay, TimeUnit unit) {
// 設置觸發的時間
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 如果 delay < Long.Max_VALUE/2,則下次執行時間為當前時間 +delay
// 否則為了避免隊列中出現由于溢出導致的排序紊亂,需要調用overflowFree來修正一下delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 下面這種情況很少,大家看不懂可以不用強行理解
// 如果某個任務的 delay 為負數,說明當前可以執行(其實早該執行了)。
// 阻塞隊列中維護任務順序是基于 compareTo 比較的,比較兩個任務的順序會用 time 相減。
// 那么可能出現一個 delay 為正數減去另一個為負數的 delay,結果上溢為負數,則會導致 compareTo 產生錯誤的結果
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 判斷一下隊首的delay是不是負數,如果是正數就不用管,怎么減都不會溢出
// 否則拿當前 delay 減去隊首的 delay 來比較看,如果不出現上溢,排序不會亂
// 不然就把當前 delay 值給調整為 Long.MAX_VALUE + 隊首 delay
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
- 調用
RunnableScheduledFuture
的構造方法封裝為延遲任務
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// 任務的觸發時間
this.time = ns;
// 任務的周期, 延遲任務的為0,因為不需要重復執行
this.period = 0;
// 任務的序號 + 1
this.sequenceNumber = sequencer.getAndIncrement();
}
- 調用
decorateTask()
方法裝飾延遲任務
// 沒有做任何操作,直接將 task 返回,該方法主要目的是用于子類擴展
protected
提交周期任務scheduleAtFixedRate()
原理
按照固定的頻率周期性的執行任務,捕手renwu,一次任務的啟動到下一次任務的啟動的間隔
public ScheduledFuture? scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任務封裝,【指定初始的延遲時間和周期時間】
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(period));
// 默認返回本身
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開始執行這個任務
delayedExecute(t);
return t;
}
提交周期任務scheduleWithFixedDelay()
原理
按照指定的延時周期性執行任務,上一個任務執行完畢后,延時一定時間,再次執行任務。
public ScheduledFuture? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 任務封裝,【指定初始的延遲時間和周期時間】,周期時間為 - 表示是 fixed-delay 模式
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開始執行這個任務
delayedExecute(t);
return t;
}
執行任務delayedExecute(t)
原理
上面多種提交任務的方式,殊途同歸,最終都會調用delayedExecute()
方法執行延遲或者周期任務。
delayedExecute()
方法是執行延遲任務的入口
private void delayedExecute(RunnableScheduledFuture? task) {
// 線程池是 SHUTDOWN 狀態,執行拒絕策略
if (isShutdown())
// 調用拒絕策略的方法
reject(task);
else {
// 把當前任務放入阻塞隊列
super.getQueue().add(task);
// 線程池狀態為 SHUTDOWN 并且不允許執行任務了,就從隊列刪除該任務,并設置任務的狀態為取消狀態
// 非主流程,可以跳過,不重點看了
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
// 開始執行了哈
ensurePrestart();
}
}
ensurePrestart()
方法開啟線程執行
// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker數目小于corePoolSize,則添加一個worker。
if (wc < corePoolSize)
// 第二個參數 true 表示采用核心線程數量限制,false 表示采用 maximumPoolSize
addWorker(null, true);
// corePoolSize = 0的情況,至少開啟一個線程,【擔保機制】
else if (wc == 0)
addWorker(null, false);
}
addWorker()
方法實際上父類ThreadPoolExecutor
的方法,這個方法在該文章 Java線程池源碼深度解析中詳細介紹過,這邊做個總結:
- 如果線程池中工作線程數量小于最大線程數,創建工作線程,執行任務。
- 如果線程池中工作線程數量大于最大線程數,直接返回。
獲取延遲任務take()原理
目前工作線程已經創建好了,工作線程開始工作了,它會從阻塞隊列中獲取延遲任務執行,這部分也是線程池里面的原理,不做展開,那我們看下它是如何實現延遲執行的? 主要關注如何從阻塞隊列中獲取任務。
DelayedWorkQueue#take()
方法獲取延遲任務
- 該方法會在上面的
addWoker()
方法創建工作線程后,工作線程中循環持續調用workQueue.take()
方法獲取延遲任務。 - 該方法主要獲取延遲隊列中任務延遲時間小于等于0 的任務。
- 如果延遲時間不小于0,那么調用條件隊列的
awaitNanos(delay)
阻塞方法等待一段時間,等時間到了,延遲時間自然小于等于0了。 - 獲取到任務后,工作線程就可以開始執行調度任務了。
// DelayedWorkQueue#take()
public RunnableScheduledFuture? take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加可中斷鎖
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 獲取阻塞隊列中的頭結點
RunnableScheduledFuture? first = queue[0];
// 如果阻塞隊列沒有數據,為空
if (first == null)
// 等待隊列不空,直至有任務通過 offer 入隊并喚醒
available.await();
else {
// 獲取頭節點的的任務還剩余多少時間才執行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到達觸發時間,獲取頭節點并調整堆,重新選擇延遲時間最小的節點放入頭部
return finishPoll(first);
// 邏輯到這說明頭節點的延遲時間還沒到
first = null;
// 說明有 leader 線程在等待獲取頭節點,當前線程直接去阻塞等待
if (leader != null)
// 當前線程阻塞
available.await();
else {
// 沒有 leader 線程,【當前線程作為leader線程,并設置頭結點的延遲時間作為阻塞時間】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當前線程通過awaitNanos方法等待delay時間后,會自動喚醒,往后面繼續執行
available.awaitNanos(delay);
// 到達阻塞時間時,當前線程會從這里醒來,進入下一輪循環,就有可能執行了
} finally {
// t堆頂更新,leader 置為 null,offer 方法釋放鎖后,
// 有其它線程通過 take/poll 拿到鎖,讀到 leader == null,然后將自身更新為leader。
if (leader == thisThread)
// leader 置為 null 用以接下來判斷是否需要喚醒后繼線程
leader = null;
}
}
}
}
} finally {
// 沒有 leader 線程并且頭結點不為 null,喚醒阻塞獲取頭節點的線程,
// 【如果沒有這一步,就會出現有了需要執行的任務,但是沒有線程去執行】
if (leader == null && queue[0] != null)
available.signal();
// 解鎖
lock.unlock();
}
}
finishPoll()
方法獲取到任務后執行
該方法主要做兩個事情, 獲取頭節點并調整堆,重新選擇延遲時間最小的節點放入頭部。
private RunnableScheduledFuture?</span> finishPoll(RunnableScheduledFuture?span> f) {
// 獲取尾索引
int s = --size;
// 獲取尾節點
RunnableScheduledFuture? x = queue[s];
// 將堆結構最后一個節點占用的 slot 設置為 null,因為該節點要嘗試升級成堆頂,會根據特性下調
queue[s] = null;
// s == 0 說明 當前堆結構只有堆頂一個節點,此時不需要做任何的事情
if (s != 0)
// 從索引處 0 開始向下調整
siftDown(0, x);
// 出隊的元素索引設置為 -1
setIndex(f, -1);
return f;
}
延遲任務運行的原理
從延遲隊列中獲取任務后,工作線程會調用延遲任務的run()方法執行任務。
ScheduledFutureTask#run()
方法運行任務
- 調用
isPeriodic()
方法判斷任務是否是周期性任務還是非周期性任務 - 如果任務是非周期任務,就調用父類的
FutureTask#run()
執行一次 - 如果任務是非周期任務,就調用父類的
FutureTask#runAndReset()
, 返回true會設置下一次的執行時間,重新放入線程池的阻塞隊列中,等待下次獲取執行
public void run() {
// 是否周期性,就是判斷 period 是否為 0
boolean periodic = isPeriodic();
// 根據是否是周期任務檢查當前狀態能否執行任務,不能執行就取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任務,直接調用 FutureTask#run 執行一次
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任務的執行,返回 true 表示執行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 設置周期任務的下一次執行時間
setNextRunTime();
// 任務的下一次執行安排,如果當前線程池狀態可以執行周期任務,加入隊列,并開啟新線程
reExecutePeriodic(outerTask);
}
}
FutureTask#runAndReset()
執行周期性任務
- 周期任務正常完成后任務的狀態不會變化,依舊是 NEW,不會設置 outcome 屬性。
- 但是如果本次任務執行出現異常,會進入 setException 方法將任務狀態置為異常,把異常保存在 outcome 中。
- 方法返回 false,后續的該任務將不會再周期的執行
protected boolean runAndReset() {
// 任務不是新建的狀態了,或者被別的線程執行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable
ScheduledFutureTask#setNextRunTime()
設置下次執行時間
- 如果屬性period大于0,表示
fixed-rate
模式,直接加上period時間即可。 - 如果屬性period小于等于0, 表示是
fixed-delay
模式, 調用triggerTime重新計算下次時間。
// 任務下一次的觸發時間
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【時間設置為上一次執行任務的時間 + p】,兩次任務執行的時間差
time += p;
else
// fixed-delay 模式,下一次執行時間是【當前這次任務結束的時間(就是現在) + delay 值】
time = triggerTime(-p);
}
ScheduledFutureTask#reExecutePeriodic()
,重新放入阻塞任務隊列,等待獲取,進行下一輪執行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture? task) {
if (canRunInCurrentRunState(true)) {
// 【放入任務隊列】
super.getQueue().add(task);
// 如果提交完任務之后,線程池狀態變為了 shutdown 狀態,需要再次檢查是否可以執行,
// 如果不能執行且任務還在隊列中未被取走,則取消任務
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 當前線程池狀態可以執行周期任務,加入隊列,并【根據線程數量是否大于核心線程數確定是否開啟新線程】
ensurePrestart();
}
}
-
線程池
+關注
關注
0文章
57瀏覽量
6836
發布評論請先 登錄
相關推薦
評論