精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

無鎖CAS如何實現各種無鎖的數據結構

科技綠洲 ? 來源:Linux開發架構之路 ? 作者:Linux開發架構之路 ? 2023-11-13 15:38 ? 次閱讀

一、引言

鎖是解決并發問題的萬能鑰匙,可是并發問題只有鎖能解決嗎?當然不是,CAS也可以解決并發問題

二、什么是CAS

比較并交換(compare and swap,CAS),是原子操作的一種,可用于在多線程編程中實現不被打斷的數據交換操作,從而避免多線程同時改寫某?數據時由于執行順序不確定性以及中斷的不可預知性產?的數據不一致問題

有了CAS,我們就可以用它來實現各種無鎖(lock free)的數據結構

實現原理

該操作通過將內存中的值與指定數據進行比較,當數值?樣時將內存中的數據替換為新的值

下面是兩種int類型操作的CAS偽代碼形式:

//輸入reg的地址,判斷reg的值與oldval是否相等//如果相等,那么就將newval賦值給reg;否則reg保持不變//最終將reg原先的值返回回去
 int compare_and_swap(int *reg, int oldval, int newval){    int old_ref_val = *reg;    if(old_reg_val == oldval)
        *reg = newval;    return old_reg_val;
}
//輸入一個pAddr的地址,在函數內部判斷其的值是否與期望值nExpected相等//如果相等那么就將pAddr的值改為nNew并同時返回true;否則就返回false,什么都不做
 bool compare_and_swap(int *pAddr, int nExpected, int nNew){    if(*pAddr == nExpected)
    {
        *pAddr = nNew;        return true;
    }    else
        return false;
}

在上面的兩種實現中第二種形式更好,因為它返回bool值讓調用者知道是否更新成功

三、CAS的應用層實現

因為CAS是原子操作,所以在各種庫的原子庫中都有對應的CAS實現方式

gcc/g++中的CAS

對于gcc、g++編譯器來講,其原子操作中包含下面兩個函數,是專門用來做CAS的

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

Windows的CAS

在Windows下,你可以使用下面的Windows API來完成CAS:

InterlockedCompareExchange ( __inout LONG volatile  *Target,
                                __in LONG Exchange,
                                __in LONG Comperand);

C++中的CAS

C++11標準庫引入了原子操作,包含在頭文件中,下面是專門用于CAS操作的接口

template< class T >bool atomic_compare_exchange_weak( std::atomic< T >* obj,
                                   T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic< T >* obj,
                                   T* expected, T desired );

四、無鎖隊列的實現

此處我們只考慮隊列出隊列和進隊列的并發問題:

出隊列: 出隊列時,要保證只有一個線程在對頭結點進行出隊列的操作,否則就會發生錯亂

入隊列: 入隊列時,也一樣,保證只有一個線程在對尾節點進行入隊列的操作,否則就會發生錯亂

無鎖隊列代碼實現

//queue_cas.h#include < iostream >

 template< typename ElemType >class Queue{public:
    Queue();                  //構造函數
    ~Queue();                 //析構函數public:    void push(ElemType elem); //入隊列
    bool pop();               //出隊列
    void show();              //打印隊列的內容private:    struct _qNode             //隊列節點
    {
        _qNode(): _next(nullptr) { } 
        _qNode(ElemType elem): _elem(elem), _next(nullptr) { } 
        ElemType       _elem;        struct _qNode *_next;
    };private:     struct _qNode *_head;    //頭結點
     struct _qNode *_tail;    //尾節點}; 
template< typename ElemType >
Queue< ElemType >::Queue()
{
    _head = _tail =new _qNode();
} 
template< typename ElemType >
Queue< ElemType >::~Queue()
{    while(_head != nullptr)
    {        struct _qNode *tempNode = _head;
        _head = _head- >_next;        delete tempNode;
    }
} 

template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{   //創建一個新的節點
   struct _qNode *newNode = new struct _qNode(elem);

   struct _qNode *p = _tail;
   struct _qNode *oldp = _tail;

   do{        while(p- >_next != nullptr)
            p = p- >_next;
   } while(__sync_bool_compare_and_swap(&_tail- >_next, nullptr, newNode) != true);
   __sync_bool_compare_and_swap(&_tail, oldp, newNode);
} 
template< typename ElemType >bool Queue< ElemType >::pop()
{   struct _qNode *p;
   do {
        p = _head;        if(p- >_next == nullptr)            return false;
   } while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true);   delete p;   return true;
} 
template< typename ElemType >void Queue< ElemType >::show()
{    struct _qNode* tempNode = _head- >_next;

    if(tempNode == nullptr){        std::cout < < "Empty" <

上面為無鎖隊列的實現代碼,我們假定此隊列中頭結點不存儲數據(當做哨兵),尾節點存儲數據

圖片

其使用到CAS的核心函數就是push()和pop()函數,在下面我們將

_sync_bool_compare_and_swap()函數調用稱之為CAS操作

push()如下:

假設線程T1和T2都執行push()函數,當線程T1先執行do-while中的CAS操作然后發現其尾節點后為空,于是就執行do-while中的CAS操作將尾節點_tail的_next指針賦值為newNode,然后退出do-while循環,調用第二個CAS操作將尾節點指針向后移動一位

由于CAS是一個原子操作,所以即使同時T2線程了也調用了do-while中的CAS操作,但是其判斷p->_next不為空,因為T1線程已經將尾節點向后移動了,所以其只能繼續執行do,將p向后移動,重新移動到尾節點繼續重新判斷,直到成功為止....

為什么push()函數的最后一個CAS操作不需要判斷是否執行成功,因為:

1.如果有一個線程T1,它的while中的CAS如果成功的話,那么其它所有的隨后線程的CAS都會失敗,然后就會再循環

2.此時,如果T1線程還沒有更新tail指針,其它的線程繼續失敗,因為tail->next不是NULL了

3.直到T1線程更新完tail指針,于是其它的線程中的某個線程就可以得到新的tail指針,繼續往下走了

do作用域中為什么要使用while將p指針向后移動:

  • 假設T1線程在調用第二個CAS操作更新_tail指針之前,T1線程停掉或者掛掉了,那么其它線程就會進入死循環
template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{   //創建一個新的節點
   struct _qNode *newNode = new struct _qNode(elem);

   struct _qNode *p = _tail;
   struct _qNode *oldp = _tail;

   do{        //不斷的向后指,直到直到尾節點為止
        while(p- >_next != nullptr)
            p = p- >_next;
   } while(__sync_bool_compare_and_swap(&p- >_next, nullptr, newNode) != true); //如果p沒有移動到真正的尾節點上,那么繼續執行do

    //當CAS函數執行成功之后,那么執行這個CAS函數,將尾節點指針向后移動1位
    __sync_bool_compare_and_swap(&_tail, oldp, newNode);
}

pop()如下:

  • 原理與push()同理,假設線程T1和線程T2都執行pop()操作,假設T1先執行CAS操作將_head向后移動了一位,并且刪除了原先的頭指針
  • 那么當T2再執行時發現T1更新過后的_head指針(移動了)與一開始獲取的頭指針p不相等了,那么就繼續執行do作用域重新獲取頭指針,然后重新進行CAS操作
template< typename ElemType >bool Queue< ElemType >::pop()
{   struct _qNode *p;
   do {        //獲取_head指針
        p = _head;        if(p- >_next == nullptr)            return false;
   } while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true); //判斷頭結點是否被移動過,如果移動過那么就執行do內部重新獲取_head指針

   //刪除頭指針
   delete p;   return true;

}

測試代碼

//queue_cas_test.cpp#include "queue_cas.h"
 int main(){
    Queue< int > queue;    queue.push(1);    queue.push(2);    queue.push(3);    queue.show(); 
    queue.pop();    queue.show(); 
    queue.pop();    queue.show(); 
    queue.pop();    queue.show(); 
    queue.push(1);    queue.show(); 
    queue.push(2);    queue.show();
}

我們編寫下面的程序 測試一下無鎖隊列的各種操作是否有誤, 結果顯示無誤

圖片

五、無鎖隊列性能測試

下面我們將上面的無鎖隊列與C++ STL庫中的queue進行對比,查看一下性能

queue_stl.cpp

#include < stdio.h >#include < time.h >#include < pthread.h >#include < queue >#include < mutex >
 using namespace std; 
#define FOR_LOOP_NUM 10000000  //隊列push和pop操作函數中for循環的次數
 static std::queue< int > _queue; //隊列static std::mutex      _mutex; //隊列操作要用到的互斥鎖
 static int push_count;         //隊列總共push的次數static int pop_count;          //隊列總共pop的次數
 typedef void *(*thread_func_t)(void *arg); 

void *queue_push(void *arg){    for(int i = 0; i < FOR_LOOP_NUM; ++i)
    {
        _mutex.lock();
        _queue.push(i);
        push_count++;
        _mutex.unlock();
    }    return NULL;
} 
void *queue_pop(void *arg){    while(true)
    {
        _mutex.lock();        if(_queue.size() > 0)
        {
            _queue.pop();
            pop_count++;
        }
        _mutex.unlock();        
        if(pop_count >= FOR_LOOP_NUM)            break;
    }    return NULL;
} 
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){    clock_t start = clock();    
    pthread_t push_tid;    if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
    {
        perror("pthread_create");
    } 
    pthread_t pop_tid;    if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
    {
        perror("pthread_create");
    }

    pthread_join(push_tid, NULL);
    pthread_join(pop_tid, NULL); 
    clock_t end = clock(); 
    printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
} 
int main(){
    push_count = 0;
    pop_count = 0;

    test_queue(queue_push, queue_pop, NULL); 
    printf("push_count:%d, pop_count:%dn", push_count, pop_count);    return 0;
}

其結果顯示,執行10000000萬次push和 10000000萬次pop操作大概要1秒多的時間

圖片

queue_cas.cpp

#include < stdio.h >#include < time.h >#include < pthread.h >#include "queue_cas.h"
 using namespace std; 
#define FOR_LOOP_NUM 10000000  //隊列push和pop操作函數中for循環的次數
 static int push_count;         //隊列總共push的次數static int pop_count;          //隊列總共pop的次數
 static Queue< int > _queue; 
typedef void *(*thread_func_t)(void *arg); 
void *queue_push(void *arg){    for(int i = 0; i < FOR_LOOP_NUM; ++i)
    {
        _queue.push(i);
        push_count++;
    }    return NULL;
} 
void *queue_pop(void *arg){    while(true)
    {
        _queue.pop();
        pop_count++;        
        if(pop_count >= FOR_LOOP_NUM)            break;
    }    return NULL;
} 
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){    clock_t start = clock();    
    pthread_t push_tid;    if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
    {
        perror("pthread_create");
    } 
    pthread_t pop_tid;    if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
    {
        perror("pthread_create");
    }

    pthread_join(push_tid, NULL);
    pthread_join(pop_tid, NULL); 
    clock_t end = clock(); 
    printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
} 
int main(){
    push_count = 0;
    pop_count = 0;

    test_queue(queue_push, queue_pop, NULL); 
    printf("push_count:%d, pop_count:%dn", push_count, pop_count);    return 0;
}

其結果顯示,執行10000000萬次push和 10000000萬次pop操作大概在1秒之內,沒有超過1秒中

圖片

因此,無鎖隊列比使用mutex的效率要高一些

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 編程
    +關注

    關注

    88

    文章

    3595

    瀏覽量

    93604
  • 多線程
    +關注

    關注

    0

    文章

    277

    瀏覽量

    19923
  • 數據結構
    +關注

    關注

    3

    文章

    573

    瀏覽量

    40093
  • CAS
    CAS
    +關注

    關注

    0

    文章

    34

    瀏覽量

    15183
收藏 人收藏

    評論

    相關推薦

    MCU上的原子讀操作

    變量的操作(必須單核),大家可以仔細想想。單讀單寫隊列是MCU上經常用的,對中斷通信接口的緩沖非常方便可靠。以此為基礎,可跨平臺實現。
    發表于 03-06 09:39

    OpenHarmony——內核IPC機制數據結構解析

    可以調用LOS_EventClear來清除事件。四、數據結構--互斥互斥又稱互斥型信號量,是一種特殊的二值性信號量,用于實現對共享資源的獨占式處理。任意時刻互斥
    發表于 09-05 11:02

    OpenHarmony——內核IPC機制數據結構解析

    、數據結構--互斥互斥又稱互斥型信號量,是一種特殊的二值性信號量,用于實現對共享資源的獨占式處理。任意時刻互斥的狀態只有開鎖或閉鎖,當
    發表于 09-08 11:44

    《有》/《》/《簽約》/《解鎖》/《越獄》/《激活》專

    《有》/《》/《簽約》/《解鎖》/《越獄》/《激活》專業技術詞解析 在討論區里,大家看到:《有版》,《
    發表于 02-03 11:05 ?945次閱讀

    智能按鍵出現反應或禁止操作的原因坤坤智能告訴你

    智能按鍵出現反應或禁止操作的原因坤坤智能告訴你在日常生活中使用智能時,多多少少會遇到智能熱鍵
    發表于 12-14 14:47 ?1.1w次閱讀

    利用CAS技術實現隊列

    【 導讀 】:本文 主要講解利用CAS技術實現隊列。 關于隊列的
    的頭像 發表于 01-11 10:52 ?2255次閱讀
    利用<b class='flag-5'>CAS</b>技術<b class='flag-5'>實現</b><b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列

    關于CAS等原子操作介紹 隊列的鏈表實現方法

    ,X86下對應的是 CMPXCHG 匯編指令。有了這個原子操作,我們就可以用其來實現各種(lock free)的數據結構。
    的頭像 發表于 05-18 09:12 ?3374次閱讀
    關于<b class='flag-5'>CAS</b>等原子操作介紹 <b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列的鏈表<b class='flag-5'>實現</b>方法

    源智能的應用前景

    ,但應用前景廣闊。源智能的發展優勢:1.政策支持:近年來,國家大力推進物聯網、大數據、新能源的發展,并且陸續出臺各項產業政策,引導智能行業有序化、高端化發展,
    的頭像 發表于 09-22 10:18 ?1470次閱讀
    <b class='flag-5'>無</b>源智能<b class='flag-5'>鎖</b>的應用前景

    新品上架——源智能把手

    為了迎合市場需求,2022年我司開始著手開發源智能把手。經過幾個月的努力,2022年11月我司正式上架源智能把手。源智能把手
    的頭像 發表于 11-11 17:56 ?580次閱讀
    新品上架——<b class='flag-5'>無</b>源智能把手<b class='flag-5'>鎖</b>

    源智能系統之水務消防

    源智能系統之水務消防
    的頭像 發表于 05-22 09:48 ?450次閱讀
    <b class='flag-5'>無</b>源智能<b class='flag-5'>鎖</b>系統之水務消防

    如何實現一個多讀多寫的線程安全的隊列

    在ZMQ隊列的原理與實現一文中,我們已經知道了ypipe可以實現一線程寫一線程讀的隊列,
    的頭像 發表于 11-08 15:25 ?1211次閱讀
    如何<b class='flag-5'>實現</b>一個多讀多寫的線程安全的<b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列

    隊列的潛在優勢

    隊列 先大致介紹一下隊列。隊列的根本是CAS
    的頭像 發表于 11-09 09:23 ?553次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列的潛在優勢

    CAS如何實現各種數據結構

    匯編指令。有了這個原子操作,我們就可以用其來實現各種(lock free)的數據結構。 這個操作用C語言來描述就是下面這個樣子:意思就
    的頭像 發表于 11-10 11:00 ?524次閱讀
    <b class='flag-5'>CAS</b>如何<b class='flag-5'>實現</b><b class='flag-5'>各種</b><b class='flag-5'>無</b><b class='flag-5'>鎖</b>的<b class='flag-5'>數據結構</b>

    隊列解決的問題

    為什么需要隊列 隊列解決了什么問題?隊列解決了
    的頭像 發表于 11-10 15:33 ?906次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列解決的問題

    互斥和自旋實現原理

    互斥和自旋是操作系統中常用的同步機制,用于控制對共享資源的訪問,以避免多個線程或進程同時訪問同一資源,從而引發數據不一致或競爭條件等問題。 互斥(Mutex) 互斥
    的頭像 發表于 07-10 10:07 ?414次閱讀