精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

AQS如何解決線程同步與通信問題

科技綠洲 ? 來源:Java技術指北 ? 作者:Java技術指北 ? 2023-10-13 11:23 ? 次閱讀

我們在第一篇中說到AQS使用的是管程模型,而管程模型是使用條件變量來解決同步通信問題的。條件變量會有兩個方法,喚醒和等待。當條件滿足時,我們會通過喚醒方法將條件隊列中的線程放入第二篇所說的同步隊列中;如果不滿足條件,則會通過等待方法將線程阻塞放入條件隊列中。而AQS中通過ConditionObject類實現了條件變量,所以接下來我們就具體看看ConditionObject類吧。

一 屬性

我們先看下ConditionObject中的屬性

/** 鏈表頭節點 */
private transient Node firstWaiter;
/** 鏈表尾節點 */
private transient Node lastWaiter;

開頭說了,條件變量中會有一個條件隊列,ConditionObject中的條件隊列使用的是單向鏈表,firstWaiter和lastWaiter為頭尾節點,節點也是使用AQS的內部類Node,但同步隊列是個雙向鏈表,條件隊列是單向鏈表,所以條件隊列使用的是Node類中的nextWaiter屬性作為下一個節點的鏈接指針。

volatile Node prev;
volatile Node next;
Node nextWaiter;

我們可以注意到nextWaiter是沒用volatile修飾的,這是因為線程在調用await方法進入條件隊列時,是已經擁有了鎖的。還有一點需要注意是,條件隊列里面的Node只會存在CANCELLED和CONDITION的狀態,有別于同步隊列。

二 喚醒方法

2.1 signalAll

此方法是喚醒所有條件隊列中的節點,即將條件隊列中的所有節點都移動到我們第二篇所說的同步隊列中,然后再去競爭鎖,具體源碼如下:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

上面我們說了,要調用喚醒和等待方法,都需要此線程獲取鎖,首先我們會通過子類復寫的方法isHeldExclusively來看此時的線程是否已經獲得了鎖。如果獲得了鎖,我們會判斷條件隊列的頭節點是否為null,為null則說明條件隊列中沒有阻塞的Node;如果不為null,則會通過doSignalAll方法來將條件隊列中的所有Node移動到同步隊列中

2.1.1 doSignalAll

doSignalAll方法主要功能就是遍歷條件隊列里面的節點Node,然后通過transferForSignal方法將Node移動到同步隊列中,源碼如下:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
       // 將next指向first的后繼Node
        Node next = first.nextWaiter;
       // 切斷first與后繼Node的聯系
        first.nextWaiter = null;
       // 將此node轉移到同步隊列中
        transferForSignal(first);
        // 將first指向first的后繼Node
        first = next;
    // 在判斷此時的first是否為null,不是則繼續循環
    } while (first != null);
}
2.1.2 transferForSignal

transferForSignal主要功能就是將條件隊列中的節點Node轉移到同步隊列中,源碼如下:

final boolean transferForSignal(Node node) {
    // 說明此節點狀態為CANCELLED,所以跳過該節點(GC會回收)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 入隊方法(獨占鎖獲取中詳細闡述過)
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); 
    return true;
}

首先通過CAS來將Node的狀態置為0,如果失敗,則說明此時Node狀態是CANCELLED,則直接返回false;如果Node狀態成功置為了0,我們就通過enq方法將此節點入隊到同步隊列中,enq方法已經在第二篇文章中講過,這里就不再復述了。enq方法執行完成后,說明node已經成功進入同步隊列了,然后其返回的是入隊的前驅節點,如果前驅節點是CANCELLED狀態,或者我們將前驅節點的狀態變為SIGNAL失敗,則我們就需要喚醒此節點去搶鎖。這個如果你看了第二篇文章,你肯定是能夠想到的。

2.2 signal

看名字也能大概猜到,因為signalAll是將條件隊列中所有的Node轉移到同步隊列中,所以signal肯定是轉移單個Node。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

我們可以對比上面的signalAll方法,其唯一不同點就是signalAll內部調用的是doSignalAll方法,而signal內部調用的是doSignal方法,我們接著來看doSignal:

private void doSignal(Node first) {
    do {
        // 將firstWaiter指向傳入的first的后繼節點,
        // 然后判斷firstWaiter是否為null,
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

我們可以看到方法里面是個do-While的循環,我們首先將firstWaiter指向first的后繼節點并判斷是否為null,如果為空,則說明條件隊列中只有first這一個節點,所以我們將整個隊列清空。然后我們再將first的的nextWaiter指向null斷開連接,進入while條件語句中。while條件語句中,會先調transferForSignal來轉移Node,如果返回為false,即轉移失敗,我們會判斷此節點下一個節點是否為null,不為null則又進入循環。

三 等待方法

喚醒方法wait,就是將線程阻塞包裝成節點放入條件隊列中,等到其他線程喚醒(signal)或者自身中斷后再重新去獲取鎖。所以其又可以大致分為兩個階段,線程阻塞前和阻塞后。

3.1 await—阻塞前

我們先來看下await的源碼:

public final void await() throws InterruptedException {
    // 如果此線程被中斷過,直接拋中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前線程包裝成節點放入條件隊列
    Node node = addConditionWaiter();
    // 釋放當前線程持有的鎖
    long savedState = fullyRelease(node);
    // 初始化中斷模式參數
    int interruptMode = 0;
    // 檢查節點是否在同步隊列中
    while (!isOnSyncQueue(node)) {
       // 不在同步隊列中則阻塞此線程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒后再去獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 當線程是被中斷喚醒,node和后繼節點是沒有斷開的
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 根據異常標志位對異常進行處理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
3.1.1 addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter的大致邏輯為:lastWaiter不是null且它的等待狀態不是CONDITION,說明lastWaiter的狀態是CANCELLED,所以我們會通過unlinkCancelledWaiters方法來移除條件隊列中所有CANCELLED的節點。然后我們會將當前線程包裝成一個節點,我們再會判斷尾節點是否為null,為null說明條件隊列為空,所以我們就將firstWaiter指向新的節點;如果不為null,就將尾節點的后繼節點指向新節點,然后再重置lastWaiter。最后將新節點返回。

3.1.2 fullyRelease

此時入隊成功后,我們就會調用fullyRelease方法來釋放當前線程所持有的鎖了,我們具體看下源碼:

final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        long savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

其中釋放鎖成功調用的是release方法,這個方法在第二篇文章中詳述過。如果釋放鎖成功,則將failed狀態置為false,然后返回savedState狀態,否則我們就會拋出異常。其中savedState是重入鎖的數量,release方法會一起釋放掉。

再看下finally,如果釋放鎖失敗,我們此線程會拋異常終止,然后在finally將waitStatus置為CANCELLED,然后等待后面被移出條件隊列。

3.1.3 isOnSyncQueue

isOnSyncQueue方法是檢查此節點是否在同步隊列中,具體源碼如下:

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;        
    return findNodeFromTail(node);
}

先看第一個if語句,如果狀態是CONDITION或者prev參數是null,說明此節點是在條件隊列中,返回為false。再來看第二個if,我們知道,prev和next都是同步隊列中的節點連接是用的prev和next,所以如果兩個屬性不為null,說明此節點是在同步隊列中,所以node.next不為null則需要返回true。如果兩個if都不成立,說明這個節點狀態是0且prev不為null,即屬于我們中CAS進入同步隊列的情況,則我們會通過findNodeFromTail方法來確認是不是這種情況

3.1.3.1 findNodeFromTail
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

如果此時tail就是node的話,說明node在同步隊列中,如果不是就像前遍歷。我們再回到await方法:

// 省略
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
// 省略

如果不在同步隊列中,則此線程就被park方法阻塞了,只有當線程被喚醒才會在這里開始繼續執行下面代碼。

3.2 wait—喚醒后

我們再來看看await喚醒后的情形:

public final void await() throws InterruptedException {
    // 省略。。。。
    while (!isOnSyncQueue(node)) {
       // 不在同步隊列中則阻塞此線程
        LockSupport.park(this); // < ----- 被喚醒后從下面開始
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒后再去獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 當線程是被中斷喚醒時,node和后繼節點是沒有斷開的
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 根據異常標志位對異常進行處理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我們需要注意的是,線程在這里被喚醒有兩種情況:

  1. 其他線程調用了doSignal或doSignalAll,
  2. 線程被中斷。

我們需要確定我們被喚醒的情況是哪種,這里是通過checkInterruptWhileWaiting方法來判斷。但在講這個方法前,我們需先了解這個interruptMode有幾種狀態:

/** wait方法退出時,會重新再中斷一次 */
private static final int REINTERRUPT =  1;
/** wait方法退出時,會拋出InterruptedException異常 */
private static final int THROW_IE    = -1;

除了上面兩種,還有一種初始態0,它代表線程沒有被中斷過,不做任何處理。

3.2.1 checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
   return Thread.interrupted() ?
       (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
       0;
}

我們看下代碼,首先我們會檢查中斷標志位,如果interrupted方法返回false,說明沒發生中斷,方法最終返回0;如果返回了true,則說明中斷了,則我們需要通過transferAfterCancelledWait方法進一步檢查其他線程是否執行了喚醒操作。

3.2.1.1 transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
        enq(node);
        return true;
    }

    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;

}

我們先看第一個if條件,如果條件中的CAS操作成功,說明此時的節點肯定是在條件隊列中,則我們調動 enq 方法將此節點放入到同步隊列中,然后返回true。但是這里需要特別注意,這個節點的nextWaiter還沒置為null;如果CAS失敗,說明這個節點可能已經在同步隊列中或者在入隊的過程中,所以我們通過while循環等待此節點入隊后返回false。

我們再回到調用transferAfterCncelled 的 checkInterruptWhileWaiting方法中,根據transferAfterCancelledWait方法返回值我們最終會返回REINTERRUPT或THROW_IE。

然后我們返回到調用checkInterruptWhileWaiting方法的await方法中。

public final void await() throws InterruptedException {
    // 代碼省略
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我們現在在這里!!!
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我們可以看到,如果返回值不為0,則直接break跳出循環,如果為0,則再次回到while條件檢查是否在同步隊列中。最后我們看最后剩下的三個if語句:

  1. 通過acquireQueued方法來獲取鎖,這個方法在第二篇中詳細講過,acquireQueued返回true(即獲取鎖的的過程中被中斷了),我們再將interruptMode為0置為REINTERRUPT。
  2. 如果node的nextWaiter不是null。我們會通過unlinkCancelledWaiters方法將條件隊列中所有不為CONDITION的節點移除。
  3. 最后一個if,線程拿到鎖了,且節點沒在同步隊列和條件隊列中,await方法其實算完成了,我們這時候只需要對中斷進行善后處理。如果interruptMode不為0,說明線程是被中斷過的,需要通過reportInterruptAfterWait對中斷進行處理。
3.2.1.2 reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

如果是THROW_IE,就是拋異常,如果是REINTERRUPT,就再自我中斷一次。

四 總結

好了,AQS如何解決線程同步與通信問題,就分析完了,這里我再總結一下:

AQS通過ConditionObject類來實現條件變量,并通過其喚醒方法、阻塞方法來進行線程的通信。當線程獲取鎖之后,可以通過signal、signalAll等喚醒方法將條件隊列中被阻塞的線程節點轉移到同步隊列中,然后喚醒去競爭鎖;也可以通過wait方法將自己包裝成節點并放入條件隊列中,然后等待被其他線程喚醒或中斷。

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 通信
    +關注

    關注

    18

    文章

    5971

    瀏覽量

    135857
  • 模型
    +關注

    關注

    1

    文章

    3172

    瀏覽量

    48714
  • 線程
    +關注

    關注

    0

    文章

    504

    瀏覽量

    19651
收藏 人收藏

    評論

    相關推薦

    一文詳解Linux線程同步

    我們在工作中會經常遇到線程同步,那么到底什么是線程同步呢,線程同步的本質是什么,
    發表于 08-25 11:49 ?606次閱讀

    基于TCP/IP協議和多線程通信軟件的設計與實現

    ,解析后以數據表格形式顯示。重點探討了客戶端/服務器模式下基于TCP/IP協議通信的多線程實現過程,并利用時序圖和活動圖進行具體描述。討論了在軟件安裝調試過程中如何解決客戶端死機問題和客戶端數據與服務器
    發表于 05-06 09:02

    Linux多線程線程同步

    。同一進程內的線程共享進程的地址空間。通信:進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程
    發表于 12-08 14:14

    IOT-OS之RT-Thread--- 線程同步線程通信

    rt_thread,下面要介紹線程間的同步通信線程同步對象rt_sem / rt_mutex / rt_event和
    發表于 07-02 06:15

    線程同步機制在應用程序與驅動程序通信中的應用

    本文對Windows NT 操作系統的多線程同步機制和同步對象進行了分析,以其在檢測儀和經緯儀同步通信程序開發中的應用為例,論述了如何通過共
    發表于 08-24 10:02 ?16次下載

    Linux多線程同步方法

    線程對共享相同內存操作時,就會出現多個線程對同一資源的使用,為此,需要對這些線程進行同步,以確保它們在訪問共享內存的時候不會訪問到無效的數值。
    發表于 08-08 14:17 ?2048次閱讀

    了解Linux多線程線程同步

    進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。
    發表于 04-23 14:23 ?712次閱讀
    了解Linux多<b class='flag-5'>線程</b>及<b class='flag-5'>線程</b>間<b class='flag-5'>同步</b>

    HMC241AQS16革資料

    HMC241AQS16 Gerber Files
    發表于 03-24 10:12 ?0次下載
    HMC241<b class='flag-5'>AQS</b>16革資料

    HMC253AQS24 S參數

    HMC253AQS24 S參數
    發表于 04-09 14:22 ?2次下載
    HMC253<b class='flag-5'>AQS</b>24 S參數

    RT-Thread文檔_線程同步

    RT-Thread文檔_線程同步
    發表于 02-22 18:29 ?1次下載
    RT-Thread文檔_<b class='flag-5'>線程</b>間<b class='flag-5'>同步</b>

    AQS同步組件有哪些呢?

    AQS 的全稱為 Abstract Queued Synchronizer,是在 JUC(java.util.concurrent)下子包中的類。
    的頭像 發表于 03-16 09:42 ?462次閱讀

    基于AQS共享模式的同步計數器——CountDownLatch

    await(): 調用該方法的線程會被掛起,直到 CountDownLatch 計數器的值為 0 才繼續執行,底層使用的是 AQS 的 tryAcquireShared()
    發表于 04-24 15:02 ?672次閱讀
    基于<b class='flag-5'>AQS</b>共享模式的<b class='flag-5'>同步</b>計數器——CountDownLatch

    AQS獨占鎖的獲取

    AQS提供了兩種鎖,獨占鎖和共享鎖。獨占鎖只有一把鎖,同一時間只允許一個線程獲得鎖;而共享鎖則有多把鎖,同一時間允許多個線程獲得鎖。我們本文主要講獨占鎖。 一. 獨占鎖的獲取 AQS
    的頭像 發表于 10-13 14:51 ?440次閱讀
    <b class='flag-5'>AQS</b>獨占鎖的獲取

    AQS是什么

    的也是這種MESA模型(其模型圖如下圖所示): 可能這個圖大家現在還看不太明白,沒關系,暫時留個印象,當看完指北君AQS系列文章以后,你再回過頭來看這個圖,肯定秒懂! Java中的synchronized關鍵字就是其管程的具體實現,當然,今天所要聊的AQS同樣也是。
    的頭像 發表于 10-13 14:54 ?480次閱讀
    <b class='flag-5'>AQS</b>是什么

    線程如何保證數據的同步

    線程編程是一種并發編程的方法,意味著程序中同時運行多個線程,每個線程可獨立執行不同的任務,共享同一份數據。由于多線程并發執行的特點,會引發數據同步
    的頭像 發表于 11-17 14:22 ?1156次閱讀