精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

FutureTask是如何通過阻塞來獲取到異步線程執行結果的呢?

OSC開源社區 ? 來源:OSCHINA 社區 ? 2023-08-12 14:37 ? 次閱讀

1、FutureTask 對象介紹

Future 對象大家都不陌生,是 JDK1.5 提供的接口,是用來以阻塞的方式獲取線程異步執行完的結果。 在 Java 中想要通過線程執行一個任務,離不開 Runnable 與 Callable 這兩個接口。 Runnable 與 Callable 的區別在于,Runnable 接口只有一個 run 方法,該方法用來執行邏輯,但是并沒有返回值;而 Callable 的 call 方法,同樣用來執行業務邏輯,但是是有一個返回值的。

Callable 執行任務過程中可以通過 FutureTask 獲得任務的執行狀態,并且可以在執行完成后通過 Future.get () 方式獲取執行結果。 Future 是一個接口,而 FutureTask 就是 Future 的實現類。并且 FutureTask 實現了 RunnableFuture(Runnable + Future),說明我們可以創建一個 FutureTask 并直接把它放到線程池執行,然后獲取 FutureTask 的執行結果。

2、FutureTask 源碼解析

2.1 主要方法和屬性

那么 FutureTask 是如何通過阻塞的方式來獲取到異步線程執行的結果的呢?我們看下 FutureTask 中的屬性。

// FutureTask的狀態及其常量
privatevolatileint state;
    privatestaticfinalint NEW          =0;
    privatestaticfinalint COMPLETING   =1;
    privatestaticfinalint NORMAL       =2;
    privatestaticfinalint EXCEPTIONAL  =3;
    privatestaticfinalint CANCELLED    =4;
    privatestaticfinalint INTERRUPTING =5;
    privatestaticfinalint INTERRUPTED  =6;
    
    // callable對象,執行完后置空
    privateCallable callable;
    // 要返回的結果或要引發的異常來自 get() 方法
    privateObject outcome;// non-volatile, protected by state reads/writes
    // 執行Callable的線程
    privatevolatileThread runner;
    // 等待線程的一個鏈表結構
    privatevolatileWaitNode waiters;

?FutureTask 中幾個比較重要的方法。

// 取消任務的執行
booleancancel(boolean mayInterruptIfRunning);
// 返回任務是否已經被取消
booleanisCancelled();
// 返回任務是否已經完成,任務狀態不為NEW即為完成
booleanisDone();
// 通過get方法獲取任務的執行結果
Vget()throwsInterruptedException,ExecutionException;
// 通過get方法獲取任務的執行結果,帶有超時,如果超過給定時間則拋出異常
Vget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException;

?2.2 FutureTask 執行

當我們在線程池中執行一個 Callable 方法時,其實是將 Callable 任務封裝成一個 RunnableFuture 對象去執行,同時將這個 RunnableFuture 對象返回,這樣我們就拿到了 FutureTask 的引用,可以隨時獲取到任務執行的狀態,并且可以在任務執行完成后通過該對象獲取執行結果。 以下為 ThreadPoolExecutor 線程池提交一個 callable 方法的源碼。

public Future submit(Callable task){
        if(task ==null)thrownewNullPointerException();
        RunnableFuture ftask =newTaskFor(task);
        execute(ftask);
        return ftask;
    }

protected RunnableFuture newTaskFor(Callable callable){
        returnnewFutureTask(callable);
    }

?2.3 run 方法介紹

RunnableFuture 其實也是一個可以執行的 runnable,我們看下他的 run 方法。其主要流程就是執行 call 方法,正常執行完畢后將 result 結果賦值到 outcome 屬性上。

publicvoidrun(){
        if(state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null,Thread.currentThread()))
            return;
        try{
            // 將callable賦值到本地變量
            Callable c = callable;
            // 判斷callable不為空并且FutureTask的狀態必須為新創建
            if(c !=null&& state == NEW){
                V result;
                boolean ran;
                try{
                    // 執行call方法(用戶自己實現的call邏輯),并獲取到result結果
                    result = c.call();
                    ran =true;
                }catch(Throwable ex){
                    result =null;
                    ran =false;
                    // 如果執行過程出現異常,則將異常對象賦值到outcome上
                    setException(ex);
                }
                // 如果正常執行完畢,則將result賦值到outcome屬性上
                if(ran)
                    set(result);
            }
        }finally{
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner =null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if(s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

?以下邏輯為正常執行完成后賦值的邏輯。

// 如果任務沒有被取消,將future執行完的返回值賦值給result結果
// FutureTask任務的執行狀態是通過CAS的方式進行賦值的,并且由此可知,COMPLETING其實是一個瞬時狀態
// 當將線程執行結果賦值給outcome后,狀態會修改為對應的NORMAL,即正常結束
protectedvoidset(V v){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state
            finishCompletion();
        }
    }

?以下為執行異常時賦值邏輯,直接將 Throwable 對象賦值到 outcome 屬性上。

protectedvoidsetException(Throwable t){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
            finishCompletion();
        }
    }

?無論是正常執行還是異常執行,最終都會調用一個 finishCompletion 方法,用來做工作的收尾工作。

2.4 get 方法介紹

Future 的 get 方法有兩個重載的方法,一個是 get () 獲取結果,一個是 get (long, TimeUnit) 帶有超時時間的獲取結果,我們看下 FutureTask 中的這兩個方法是如何實現的。

// 不帶有超時時間,一直阻塞直到獲取結果
publicVget()throwsInterruptedException,ExecutionException{
        int s = state;
        if(s <= COMPLETING)
            // 等待結果完成,帶有超時的get方法也是調用的awaitDone方法
            s =awaitDone(false,0L);
        // 返回結果
        returnreport(s);
    }

// 帶有超時時間的獲取結果,如果超過時間還沒有獲取到結果則拋出異常
publicVget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException{
        if(unit ==null)
            thrownewNullPointerException();
        int s = state;
        // 如果任務未中斷,調用awaitDone方法等待任務結果
        if(s <= COMPLETING &&
            (s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
            thrownewTimeoutException();
        // 返回結果
        returnreport(s);
    }

?我們主要看下 awaitDone 方法的執行邏輯。此方法會通過 for 循環的方式一直阻塞等待任務執行完成。如果帶有超時時間,則超過截止時間后會直接返回。

// timed:是否需要超時獲取
// nanos:超時時間單位納秒
privateintawaitDone(boolean timed,long nanos)
        throwsInterruptedException{
        finallong deadline = timed ?System.nanoTime()+ nanos :0L;
        WaitNode q =null;
        boolean queued =false;
        // 此方法會一直for循環判斷任務狀態是否已經完成,是Future.get阻塞的原因
        for(;;){
            if(Thread.interrupted()){
                removeWaiter(q);
                thrownewInterruptedException();
            }

            int s = state;
            // 任務狀態大于COMPLETING,則表明任務結束,直接返回
            if(s > COMPLETING){
                if(q !=null)
                    q.thread =null;
                return s;
            }
            elseif(s == COMPLETING)// cannot time out yet
                // Thread.yield() 方法,使當前線程由執行狀態,變成為就緒狀態,讓出cpu時間,在下一個線程執行時候,此線程有可能被執行,也有可能沒有被執行。
                // COMPLETING狀態為瞬時狀態,任務執行完成,要么是正常結束,要么異常結束,后續會被置為NORMAL或者EXCEPTIONAL
                Thread.yield();
            elseif(q ==null)
                // 每調用一次get方法,都會創建一個WaitNode等待節點
                q =newWaitNode();
            elseif(!queued)
                // 將該等待節點添加到鏈表結構waiters中,q.next = waiters 即在waiters的頭部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法帶有超時判斷,則判斷當前時間是否已經超過了截止時間,如果超過了及截止日期,則退出循環直接返回當前狀態,此時任務狀態一定是NEW
            elseif(timed){
                nanos = deadline -System.nanoTime();
                if(nanos <=0L){
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

?我們在看下 report 方法,在調用 get 方法時是如何返回結果的。

這里首先獲取 outcome 的值,并判斷任務是否已經執行完成,如果執行完成,則將 outcome 對象強轉成泛型指定的類型;如果任務被取消了,則拋出一個 CancellationException 異常;如果都不是,則說明任務在執行過程中發生了異常,此時任務狀態位 EXCEPTIONAL,此時的 outcome 即為 Throwable 對象,所以將 outcome 強轉為 Throwable 并拋出異常。

由此可以知道,我們將一個 FutureTask 任務 submit 到線程池中執行的時候,如果發生了異常,是會在調用 get 方法的時候拋出的。
privateVreport(int s)throwsExecutionException{
        Object x = outcome;
        if(s == NORMAL)
            return(V)x;
        if(s >= CANCELLED)
            thrownewCancellationException();
        thrownewExecutionException((Throwable)x);
    }

?2.5 cancel 方法介紹

cancel 方法用于取消正在運行的任務,如果任務取消成功,則返回 TRUE,如果取消失敗則返回 FALSE。

// mayInterruptIfRunning:允許中斷正在運行的任務
publicbooleancancel(boolean mayInterruptIfRunning){
        // mayInterruptIfRunning如果為true則將狀態置為INTERRUPTING,如果未false則將狀態置為CANCELLED
        if(!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            returnfalse;
        // 如果狀態修改成功后,判斷是否允許中斷線程,如果允許,則調用Thread的interrupt方法中斷
        try{// in case call to interrupt throws exception
            if(mayInterruptIfRunning){
                try{
                    Thread t = runner;
                    if(t !=null)
                        t.interrupt();
                }finally{// final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        }finally{
            // 取消后的收尾工作
            finishCompletion();
        }
        returntrue;
    }

?2.6 isDone/isCancelled 方法介紹

isDone 方法用于判斷 FutureTask 是否已經完成;isCancelled 方法用來判斷 FutureTask 是否已經取消,這兩個方法都是通過狀態位來判斷的。

publicbooleanisCancelled(){
        return state >= CANCELLED;
    }

    publicbooleanisDone(){
        return state != NEW;
    }

?2.7 finishCompletion 方法介紹

我們看下 finishCompletion 方法都做了哪些工作。

// 刪除所有等待線程并發出信號,最后執行done方法
privatevoidfinishCompletion(){
        // assert state > COMPLETING;
        for(WaitNode q;(q = waiters)!=null;){
            if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){
                for(;;){
                    Thread t = q.thread;
                    if(t !=null){
                        q.thread =null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if(next ==null)
                        break;
                    q.next =null;// unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable =null;// to reduce footprint
    }

?我們看到 done 方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據自己的業務去實現相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}

3、總結

通過源碼解讀可以了解到 Future 的原理:

第一步:主線程將任務封裝成一個 Callable 對象,通過 submit 方法提交到線程池去執行。

第二步:線程池執行任務的 run 方法,主線程則可以繼續執行其他邏輯。

第三步:線程池中方法執行完成后將結果賦值到 outcome 屬性上,并修改任務狀態。

第四步:主線程在需要拿到異步任務結果的時候,主動調用 fugure.get () 方法來獲取結果。

第五步:如果異步線程在執行過程中發生異常,則會在調用 future.get () 方法的時候拋出來。 以上就是對于 FutureTask 的分析,我們可以了解 FutureTask 任務執行的方式以及 Future.get 已阻塞的方式獲取線程執行的結果原理,并且從代碼中可以了解 FutureTask 的任務執行狀態以及狀態的變化過程。





審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 狀態機
    +關注

    關注

    2

    文章

    492

    瀏覽量

    27486
  • 線程池
    +關注

    關注

    0

    文章

    57

    瀏覽量

    6836
  • for循環
    +關注

    關注

    0

    文章

    61

    瀏覽量

    2495

原文標題:并發編程 - FutureTask 解析

文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    鴻蒙原生應用開發-ArkTS語言基礎類庫多線程I/O密集型任務開發

    使用異步并發可以解決單次I/O任務阻塞的問題,但是如果遇到I/O密集型任務,同樣會阻塞線程中其它任務的執行,這時需要使用多
    發表于 03-21 14:57

    Java線程阻塞方法大全

    如果線程是因為調用了wait()、sleep()或者join()方法而導致的阻塞,可以中斷線程,并且通過拋出InterruptedException
    發表于 04-02 15:42

    Java的線程喚醒與阻塞規則

    如果線程是因為調用了wait()、sleep()或者join()方法而導致的阻塞,可以中斷線程,并且通過拋出InterruptedException
    發表于 07-06 15:11

    同步與異步阻塞與非阻塞的區別是什么

    同步與異步阻塞與非阻塞的區別
    發表于 01-26 06:12

    labview怎么終止‘執行系統命令’并獲取所有結果

    現在有一個應用要一邊運動一邊通過執行系統命令’獲取一系列數據,獲取數據的指令是一直以0.1秒的速率讀取數據,我可以在運動開始的時候開啟‘
    發表于 04-07 10:31

    如何使用多線程異步操作等并發設計方法最大化程序的性能

    很多朋友往往會使用線程執行耗時較長的I/O操作。這樣在只有少數幾個并發操作的時候還無傷大雅,如果需要處理大量的并發操作時就不合適了。  異步調用與多
    發表于 08-23 16:31

    A線程如何在線程本身識別變量是否改變

    阻塞獲取可以解決但是這個B線程是別人代碼寫的。不好修改不想再增加一個線程去循環讀取變量X是否改變,再釋放信號量需求A線程如何在
    發表于 11-02 11:02

    異步調用子vi問題

    我試了異步調用子vi,現在的問題是子vi是一個循環,但是我在主程序獲取子vi的結果時,只有子vi結束了才能獲取且只能獲取到循環最后一次的
    發表于 11-11 10:34

    是否有函數或者功能可以實現A線程阻塞變量的值

    阻塞獲取可以解決但是這個B線程是別人代碼寫的。不好修改不想再增加一個線程去循環讀取變量X是否改變,再釋放信號量需求A線程如何在
    發表于 02-01 16:25

    Runloop是怎樣進行線程保活

    ,它會向一個 C語言程序那樣在運行完所有代碼后退出線程。 而網絡請求是異步的,這導致獲取到請求數據時,線程已經退出,代理方法沒有機會執行
    發表于 09-26 10:34 ?0次下載

    沒有互聯網,如何本地獲取到LoRaWAN的終端數據?

    一般情況下,我們可以通過連接TTN,獲取到LoRaWAN的終端數據。 但是,如果沒有互聯網,那么,我們也就無法通過連接TTN
    發表于 04-10 16:38 ?946次閱讀

    詳解同步異步阻塞阻塞

    同步、異步分別指的是一種通訊方式,當 cpu 不需要執行線程上下文切換就能完成任務,此時便認為這種通訊方式是同步的,相對的如果存在cpu 上下文切換,這種方式便是異步
    的頭像 發表于 05-03 17:53 ?4819次閱讀
    詳解同步<b class='flag-5'>異步</b>和<b class='flag-5'>阻塞</b>非<b class='flag-5'>阻塞</b>

    使用匿名管道技術獲取CMD命令的執行結果

    遠程 CMD 是指惡意程序接收到控制端發送的 CMD 指令后,在本地執行 CMD 命令,并將執行結果回傳至控制端。本文將演示使用匿名管道技術獲取 CMD 命令的
    的頭像 發表于 04-03 18:04 ?3810次閱讀

    CompletableFuture異步線程是真的優雅

    雖然 Future 以及相關使用方法提供了異步執行任務的能力,但是對于結果獲取卻是很不方便,我們必須使用Future.get()的方式阻塞
    的頭像 發表于 08-07 15:40 ?633次閱讀
    CompletableFuture<b class='flag-5'>異步</b>多<b class='flag-5'>線程</b>是真的優雅

    verilog同步和異步的區別 verilog阻塞賦值和非阻塞賦值的區別

    Verilog中同步和異步的區別,以及阻塞賦值和非阻塞賦值的區別。 一、Verilog中同步和異步的區別 同步傳輸和異步傳輸是指數據在電路中
    的頭像 發表于 02-22 15:33 ?1562次閱讀