1、FutureTask 對象介紹
Future 對象大家都不陌生,是 JDK1.5 提供的接口,是用來以阻塞的方式獲取線程異步執行完的結果。 在 Java 中想要通過線程執行一個任務,離不開 Runnable 與 Callable 這兩個接口。 Runnable 與 Callable 的區別在于,Runnable 接口只有一個 run 方法,該方法用來執行邏輯,但是并沒有返回值;而 Callable 的 call 方法,同樣用來執行業務邏輯,但是是有一個返回值的。
Callable 執行任務過程中可以通過 FutureTask 獲得任務的執行狀態,并且可以在執行完成后通過 Future.get () 方式獲取執行結果。 Future 是一個接口,而 FutureTask 就是 Future 的實現類。并且 FutureTask 實現了 RunnableFuture(Runnable + Future),說明我們可以創建一個 FutureTask 并直接把它放到線程池執行,然后獲取 FutureTask 的執行結果。
2、FutureTask 源碼解析
2.1 主要方法和屬性
那么 FutureTask 是如何通過阻塞的方式來獲取到異步線程執行的結果的呢?我們看下 FutureTask 中的屬性。
// FutureTask的狀態及其常量 privatevolatileint state; privatestaticfinalint NEW =0; privatestaticfinalint COMPLETING =1; privatestaticfinalint NORMAL =2; privatestaticfinalint EXCEPTIONAL =3; privatestaticfinalint CANCELLED =4; privatestaticfinalint INTERRUPTING =5; privatestaticfinalint INTERRUPTED =6; // callable對象,執行完后置空 privateCallablecallable; // 要返回的結果或要引發的異常來自 get() 方法 privateObject outcome;// non-volatile, protected by state reads/writes // 執行Callable的線程 privatevolatileThread runner; // 等待線程的一個鏈表結構 privatevolatileWaitNode waiters;
?FutureTask 中幾個比較重要的方法。
// 取消任務的執行 booleancancel(boolean mayInterruptIfRunning); // 返回任務是否已經被取消 booleanisCancelled(); // 返回任務是否已經完成,任務狀態不為NEW即為完成 booleanisDone(); // 通過get方法獲取任務的執行結果 Vget()throwsInterruptedException,ExecutionException; // 通過get方法獲取任務的執行結果,帶有超時,如果超過給定時間則拋出異常 Vget(long timeout,TimeUnit unit) throwsInterruptedException,ExecutionException,TimeoutException;
?2.2 FutureTask 執行
當我們在線程池中執行一個 Callable 方法時,其實是將 Callable 任務封裝成一個 RunnableFuture 對象去執行,同時將這個 RunnableFuture 對象返回,這樣我們就拿到了 FutureTask 的引用,可以隨時獲取到任務執行的狀態,并且可以在任務執行完成后通過該對象獲取執行結果。 以下為 ThreadPoolExecutor 線程池提交一個 callable 方法的源碼。
publicFuture submit(Callable task){ if(task ==null)thrownewNullPointerException(); RunnableFuture ftask =newTaskFor(task); execute(ftask); return ftask; } protected RunnableFuture newTaskFor(Callable callable){ returnnewFutureTask (callable); }
?2.3 run 方法介紹
RunnableFuture 其實也是一個可以執行的 runnable,我們看下他的 run 方法。其主要流程就是執行 call 方法,正常執行完畢后將 result 結果賦值到 outcome 屬性上。
publicvoidrun(){ if(state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null,Thread.currentThread())) return; try{ // 將callable賦值到本地變量 Callablec = callable; // 判斷callable不為空并且FutureTask的狀態必須為新創建 if(c !=null&& state == NEW){ V result; boolean ran; try{ // 執行call方法(用戶自己實現的call邏輯),并獲取到result結果 result = c.call(); ran =true; }catch(Throwable ex){ result =null; ran =false; // 如果執行過程出現異常,則將異常對象賦值到outcome上 setException(ex); } // 如果正常執行完畢,則將result賦值到outcome屬性上 if(ran) set(result); } }finally{ // runner must be non-null until state is settled to // prevent concurrent calls to run() runner =null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
?以下邏輯為正常執行完成后賦值的邏輯。
// 如果任務沒有被取消,將future執行完的返回值賦值給result結果 // FutureTask任務的執行狀態是通過CAS的方式進行賦值的,并且由此可知,COMPLETING其實是一個瞬時狀態 // 當將線程執行結果賦值給outcome后,狀態會修改為對應的NORMAL,即正常結束 protectedvoidset(V v){ if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){ outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state finishCompletion(); } }
?以下為執行異常時賦值邏輯,直接將 Throwable 對象賦值到 outcome 屬性上。
protectedvoidsetException(Throwable t){ if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){ outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state finishCompletion(); } }?無論是正常執行還是異常執行,最終都會調用一個 finishCompletion 方法,用來做工作的收尾工作。
2.4 get 方法介紹
Future 的 get 方法有兩個重載的方法,一個是 get () 獲取結果,一個是 get (long, TimeUnit) 帶有超時時間的獲取結果,我們看下 FutureTask 中的這兩個方法是如何實現的。
// 不帶有超時時間,一直阻塞直到獲取結果 publicVget()throwsInterruptedException,ExecutionException{ int s = state; if(s <= COMPLETING) // 等待結果完成,帶有超時的get方法也是調用的awaitDone方法 s =awaitDone(false,0L); // 返回結果 returnreport(s); } // 帶有超時時間的獲取結果,如果超過時間還沒有獲取到結果則拋出異常 publicVget(long timeout,TimeUnit unit) throwsInterruptedException,ExecutionException,TimeoutException{ if(unit ==null) thrownewNullPointerException(); int s = state; // 如果任務未中斷,調用awaitDone方法等待任務結果 if(s <= COMPLETING && (s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING) thrownewTimeoutException(); // 返回結果 returnreport(s); }
?我們主要看下 awaitDone 方法的執行邏輯。此方法會通過 for 循環的方式一直阻塞等待任務執行完成。如果帶有超時時間,則超過截止時間后會直接返回。
// timed:是否需要超時獲取 // nanos:超時時間單位納秒 privateintawaitDone(boolean timed,long nanos) throwsInterruptedException{ finallong deadline = timed ?System.nanoTime()+ nanos :0L; WaitNode q =null; boolean queued =false; // 此方法會一直for循環判斷任務狀態是否已經完成,是Future.get阻塞的原因 for(;;){ if(Thread.interrupted()){ removeWaiter(q); thrownewInterruptedException(); } int s = state; // 任務狀態大于COMPLETING,則表明任務結束,直接返回 if(s > COMPLETING){ if(q !=null) q.thread =null; return s; } elseif(s == COMPLETING)// cannot time out yet // Thread.yield() 方法,使當前線程由執行狀態,變成為就緒狀態,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。 // COMPLETING狀態為瞬時狀態,任務執行完成,要么是正常結束,要么異常結束,后續會被置為NORMAL或者EXCEPTIONAL Thread.yield(); elseif(q ==null) // 每調用一次get方法,都會創建一個WaitNode等待節點 q =newWaitNode(); elseif(!queued) // 將該等待節點添加到鏈表結構waiters中,q.next = waiters 即在waiters的頭部插入 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果方法帶有超時判斷,則判斷當前時間是否已經超過了截止時間,如果超過了及截止日期,則退出循環直接返回當前狀態,此時任務狀態一定是NEW elseif(timed){ nanos = deadline -System.nanoTime(); if(nanos <=0L){ removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }?我們在看下 report 方法,在調用 get 方法時是如何返回結果的。
這里首先獲取 outcome 的值,并判斷任務是否已經執行完成,如果執行完成,則將 outcome 對象強轉成泛型指定的類型;如果任務被取消了,則拋出一個 CancellationException 異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀態位 EXCEPTIONAL,此時的 outcome 即為 Throwable 對象,所以將 outcome 強轉為 Throwable 并拋出異常。
由此可以知道,我們將一個 FutureTask 任務 submit 到線程池中執行的時候,如果發生了異常,是會在調用 get 方法的時候拋出的。
privateVreport(int s)throwsExecutionException{ Object x = outcome; if(s == NORMAL) return(V)x; if(s >= CANCELLED) thrownewCancellationException(); thrownewExecutionException((Throwable)x); }
?2.5 cancel 方法介紹
cancel 方法用于取消正在運行的任務,如果任務取消成功,則返回 TRUE,如果取消失敗則返回 FALSE。
// mayInterruptIfRunning:允許中斷正在運行的任務 publicbooleancancel(boolean mayInterruptIfRunning){ // mayInterruptIfRunning如果為true則將狀態置為INTERRUPTING,如果未false則將狀態置為CANCELLED if(!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) returnfalse; // 如果狀態修改成功后,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷 try{// in case call to interrupt throws exception if(mayInterruptIfRunning){ try{ Thread t = runner; if(t !=null) t.interrupt(); }finally{// final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } }finally{ // 取消后的收尾工作 finishCompletion(); } returntrue; }
?2.6 isDone/isCancelled 方法介紹
isDone 方法用于判斷 FutureTask 是否已經完成;isCancelled 方法用來判斷 FutureTask 是否已經取消,這兩個方法都是通過狀態位來判斷的。
publicbooleanisCancelled(){ return state >= CANCELLED; } publicbooleanisDone(){ return state != NEW; }
?2.7 finishCompletion 方法介紹
我們看下 finishCompletion 方法都做了哪些工作。
// 刪除所有等待線程并發出信號,最后執行done方法 privatevoidfinishCompletion(){ // assert state > COMPLETING; for(WaitNode q;(q = waiters)!=null;){ if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){ for(;;){ Thread t = q.thread; if(t !=null){ q.thread =null; LockSupport.unpark(t); } WaitNode next = q.next; if(next ==null) break; q.next =null;// unlink to help gc q = next; } break; } } done(); callable =null;// to reduce footprint }?我們看到 done 方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實現相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}
3、總結
通過源碼解讀可以了解到 Future 的原理:
第一步:主線程將任務封裝成一個 Callable 對象,通過 submit 方法提交到線程池去執行。
第二步:線程池執行任務的 run 方法,主線程則可以繼續執行其他邏輯。
第三步:線程池中方法執行完成后將結果賦值到 outcome 屬性上,并修改任務狀態。
第四步:主線程在需要拿到異步任務結果的時候,主動調用 fugure.get () 方法來獲取結果。
第五步:如果異步線程在執行過程中發生異常,則會在調用 future.get () 方法的時候拋出來。 以上就是對于 FutureTask 的分析,我們可以了解 FutureTask 任務執行的方式以及 Future.get 已阻塞的方式獲取線程執行的結果原理,并且從代碼中可以了解 FutureTask 的任務執行狀態以及狀態的變化過程。
審核編輯:劉清
-
狀態機
+關注
關注
2文章
492瀏覽量
27486 -
線程池
+關注
關注
0文章
57瀏覽量
6836 -
for循環
+關注
關注
0文章
61瀏覽量
2495
原文標題:并發編程 - FutureTask 解析
文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論