一、引言
鎖是解決并發問題的萬能鑰匙,可是并發問題只有鎖能解決嗎?當然不是,CAS也可以解決并發問題
二、什么是CAS
比較并交換(compare and swap,CAS),是原子操作的一種,可用于在多線程編程中實現不被打斷的數據交換操作,從而避免多線程同時改寫某?數據時由于執行順序不確定性以及中斷的不可預知性產?的數據不一致問題
有了CAS,我們就可以用它來實現各種無鎖(lock free)的數據結構
實現原理
該操作通過將內存中的值與指定數據進行比較,當數值?樣時將內存中的數據替換為新的值
下面是兩種int類型操作的CAS偽代碼形式:
//輸入reg的地址,判斷reg的值與oldval是否相等//如果相等,那么就將newval賦值給reg;否則reg保持不變//最終將reg原先的值返回回去
int compare_and_swap(int *reg, int oldval, int newval){ int old_ref_val = *reg; if(old_reg_val == oldval)
*reg = newval; return old_reg_val;
}
//輸入一個pAddr的地址,在函數內部判斷其的值是否與期望值nExpected相等//如果相等那么就將pAddr的值改為nNew并同時返回true;否則就返回false,什么都不做
bool compare_and_swap(int *pAddr, int nExpected, int nNew){ if(*pAddr == nExpected)
{
*pAddr = nNew; return true;
} else
return false;
}
在上面的兩種實現中第二種形式更好,因為它返回bool值讓調用者知道是否更新成功
三、CAS的應用層實現
因為CAS是原子操作,所以在各種庫的原子庫中都有對應的CAS實現方式
gcc/g++中的CAS
對于gcc、g++編譯器來講,其原子操作中包含下面兩個函數,是專門用來做CAS的
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
Windows的CAS
在Windows下,你可以使用下面的Windows API來完成CAS:
InterlockedCompareExchange ( __inout LONG volatile *Target,
__in LONG Exchange,
__in LONG Comperand);
C++中的CAS
C++11標準庫引入了原子操作,包含在頭文件中,下面是專門用于CAS操作的接口
template< class T >bool atomic_compare_exchange_weak( std::atomic< T >* obj,
T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic< T >* obj,
T* expected, T desired );
四、無鎖隊列的實現
此處我們只考慮隊列出隊列和進隊列的并發問題:
出隊列: 出隊列時,要保證只有一個線程在對頭結點進行出隊列的操作,否則就會發生錯亂
入隊列: 入隊列時,也一樣,保證只有一個線程在對尾節點進行入隊列的操作,否則就會發生錯亂
無鎖隊列代碼實現
//queue_cas.h#include < iostream >
template< typename ElemType >class Queue{public:
Queue(); //構造函數
~Queue(); //析構函數public: void push(ElemType elem); //入隊列
bool pop(); //出隊列
void show(); //打印隊列的內容private: struct _qNode //隊列節點
{
_qNode(): _next(nullptr) { }
_qNode(ElemType elem): _elem(elem), _next(nullptr) { }
ElemType _elem; struct _qNode *_next;
};private: struct _qNode *_head; //頭結點
struct _qNode *_tail; //尾節點};
template< typename ElemType >
Queue< ElemType >::Queue()
{
_head = _tail =new _qNode();
}
template< typename ElemType >
Queue< ElemType >::~Queue()
{ while(_head != nullptr)
{ struct _qNode *tempNode = _head;
_head = _head- >_next; delete tempNode;
}
}
template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{ //創建一個新的節點
struct _qNode *newNode = new struct _qNode(elem);
struct _qNode *p = _tail;
struct _qNode *oldp = _tail;
do{ while(p- >_next != nullptr)
p = p- >_next;
} while(__sync_bool_compare_and_swap(&_tail- >_next, nullptr, newNode) != true);
__sync_bool_compare_and_swap(&_tail, oldp, newNode);
}
template< typename ElemType >bool Queue< ElemType >::pop()
{ struct _qNode *p;
do {
p = _head; if(p- >_next == nullptr) return false;
} while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true); delete p; return true;
}
template< typename ElemType >void Queue< ElemType >::show()
{ struct _qNode* tempNode = _head- >_next;
if(tempNode == nullptr){ std::cout < < "Empty" <
上面為無鎖隊列的實現代碼,我們假定此隊列中頭結點不存儲數據(當做哨兵),尾節點存儲數據
其使用到CAS的核心函數就是push()和pop()函數,在下面我們將
_sync_bool_compare_and_swap()函數調用稱之為CAS操作
push()如下:
假設線程T1和T2都執行push()函數,當線程T1先執行do-while中的CAS操作然后發現其尾節點后為空,于是就執行do-while中的CAS操作將尾節點_tail的_next指針賦值為newNode,然后退出do-while循環,調用第二個CAS操作將尾節點指針向后移動一位
由于CAS是一個原子操作,所以即使同時T2線程了也調用了do-while中的CAS操作,但是其判斷p->_next不為空,因為T1線程已經將尾節點向后移動了,所以其只能繼續執行do,將p向后移動,重新移動到尾節點繼續重新判斷,直到成功為止....
為什么push()函數的最后一個CAS操作不需要判斷是否執行成功,因為:
1.如果有一個線程T1,它的while中的CAS如果成功的話,那么其它所有的隨后線程的CAS都會失敗,然后就會再循環
2.此時,如果T1線程還沒有更新tail指針,其它的線程繼續失敗,因為tail->next不是NULL了
3.直到T1線程更新完tail指針,于是其它的線程中的某個線程就可以得到新的tail指針,繼續往下走了
do作用域中為什么要使用while將p指針向后移動:
- 假設T1線程在調用第二個CAS操作更新_tail指針之前,T1線程停掉或者掛掉了,那么其它線程就會進入死循環
template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{ //創建一個新的節點
struct _qNode *newNode = new struct _qNode(elem);
struct _qNode *p = _tail;
struct _qNode *oldp = _tail;
do{ //不斷的向后指,直到直到尾節點為止
while(p- >_next != nullptr)
p = p- >_next;
} while(__sync_bool_compare_and_swap(&p- >_next, nullptr, newNode) != true); //如果p沒有移動到真正的尾節點上,那么繼續執行do
//當CAS函數執行成功之后,那么執行這個CAS函數,將尾節點指針向后移動1位
__sync_bool_compare_and_swap(&_tail, oldp, newNode);
}
pop()如下:
- 原理與push()同理,假設線程T1和線程T2都執行pop()操作,假設T1先執行CAS操作將_head向后移動了一位,并且刪除了原先的頭指針
- 那么當T2再執行時發現T1更新過后的_head指針(移動了)與一開始獲取的頭指針p不相等了,那么就繼續執行do作用域重新獲取頭指針,然后重新進行CAS操作
template< typename ElemType >bool Queue< ElemType >::pop()
{ struct _qNode *p;
do { //獲取_head指針
p = _head; if(p- >_next == nullptr) return false;
} while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true); //判斷頭結點是否被移動過,如果移動過那么就執行do內部重新獲取_head指針
//刪除頭指針
delete p; return true;
}
測試代碼
//queue_cas_test.cpp#include "queue_cas.h"
int main(){
Queue< int > queue; queue.push(1); queue.push(2); queue.push(3); queue.show();
queue.pop(); queue.show();
queue.pop(); queue.show();
queue.pop(); queue.show();
queue.push(1); queue.show();
queue.push(2); queue.show();
}
我們編寫下面的程序 測試一下無鎖隊列的各種操作是否有誤, 結果顯示無誤
五、無鎖隊列性能測試
下面我們將上面的無鎖隊列與C++ STL庫中的queue進行對比,查看一下性能
queue_stl.cpp
#include < stdio.h >#include < time.h >#include < pthread.h >#include < queue >#include < mutex >
using namespace std;
#define FOR_LOOP_NUM 10000000 //隊列push和pop操作函數中for循環的次數
static std::queue< int > _queue; //隊列static std::mutex _mutex; //隊列操作要用到的互斥鎖
static int push_count; //隊列總共push的次數static int pop_count; //隊列總共pop的次數
typedef void *(*thread_func_t)(void *arg);
void *queue_push(void *arg){ for(int i = 0; i < FOR_LOOP_NUM; ++i)
{
_mutex.lock();
_queue.push(i);
push_count++;
_mutex.unlock();
} return NULL;
}
void *queue_pop(void *arg){ while(true)
{
_mutex.lock(); if(_queue.size() > 0)
{
_queue.pop();
pop_count++;
}
_mutex.unlock();
if(pop_count >= FOR_LOOP_NUM) break;
} return NULL;
}
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){ clock_t start = clock();
pthread_t push_tid; if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
{
perror("pthread_create");
}
pthread_t pop_tid; if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
{
perror("pthread_create");
}
pthread_join(push_tid, NULL);
pthread_join(pop_tid, NULL);
clock_t end = clock();
printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
}
int main(){
push_count = 0;
pop_count = 0;
test_queue(queue_push, queue_pop, NULL);
printf("push_count:%d, pop_count:%dn", push_count, pop_count); return 0;
}
其結果顯示,執行10000000萬次push和 10000000萬次pop操作大概要1秒多的時間
queue_cas.cpp
#include < stdio.h >#include < time.h >#include < pthread.h >#include "queue_cas.h"
using namespace std;
#define FOR_LOOP_NUM 10000000 //隊列push和pop操作函數中for循環的次數
static int push_count; //隊列總共push的次數static int pop_count; //隊列總共pop的次數
static Queue< int > _queue;
typedef void *(*thread_func_t)(void *arg);
void *queue_push(void *arg){ for(int i = 0; i < FOR_LOOP_NUM; ++i)
{
_queue.push(i);
push_count++;
} return NULL;
}
void *queue_pop(void *arg){ while(true)
{
_queue.pop();
pop_count++;
if(pop_count >= FOR_LOOP_NUM) break;
} return NULL;
}
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){ clock_t start = clock();
pthread_t push_tid; if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
{
perror("pthread_create");
}
pthread_t pop_tid; if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
{
perror("pthread_create");
}
pthread_join(push_tid, NULL);
pthread_join(pop_tid, NULL);
clock_t end = clock();
printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
}
int main(){
push_count = 0;
pop_count = 0;
test_queue(queue_push, queue_pop, NULL);
printf("push_count:%d, pop_count:%dn", push_count, pop_count); return 0;
}
其結果顯示,執行10000000萬次push和 10000000萬次pop操作大概在1秒之內,沒有超過1秒中
因此,無鎖隊列比使用mutex的效率要高一些
-
編程
+關注
關注
88文章
3595瀏覽量
93604 -
多線程
+關注
關注
0文章
277瀏覽量
19923 -
數據結構
+關注
關注
3文章
573瀏覽量
40093 -
CAS
+關注
關注
0文章
34瀏覽量
15183
發布評論請先 登錄
相關推薦
評論