線程安全
線程安全是多線程或多進程編程中的一個概念,在擁有共享數據的多條線程并行執行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執行,不會出現數據污染等意外情況。
線程安全的問題最主要還是由線程切換導致的,比如一個房間(進程)中有10顆糖(資源),除此之外還有3個小人(1個主線程、2個子線程),當小人A吃了3顆糖后被系統強制進行休息時他認為還剩下7顆糖,而當小人B工作后又吃掉了3顆糖,那么當小人A重新上崗時會認為糖還剩下7顆,但是實際上只有4顆了。
上述例子中線程A和線程B的數據不同步,這就是線程安全問題,它可能導致非常嚴重的意外情況發生,我們按下面這個示例來進行說明。
下面有一個數值num初始值為0,我們開啟2條線程:
線程1對num進行一千萬次+1的操作
線程2對num進行一千萬次-1的操作
結果可能會令人咋舌,num最后并不是我們所想象的結果0:
?
?
import threading num = 0 def add(): global num for i in range(10_000_000): num += 1 def sub(): global num for i in range(10_000_000): num -= 1 if __name__ == "__main__": subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結果三次采集 # num result : 669214 # num result : -1849179 # num result : -525674
?
?
上面這就是一個非常好的案例,想要解決這個問題就必須通過鎖來保障線程切換的時機。
需要我們值得留意的是,在Python基本數據類型中list、tuple、dict本身就是屬于線程安全的,
所以如果有多個線程對這3種容器做操作時,我們不必考慮線程安全問題。
鎖的作用
鎖是Python提供給我們能夠自行操控線程切換的一種手段,使用鎖可以讓線程的切換變的有序。
一旦線程的切換變的有序后,各個線程之間對數據的訪問、修改就變的可控,所以若要保證線程安全,就必須使用鎖。
threading模塊中提供了5種最常見的鎖,下面是按照功能進行劃分:
同步鎖:lock(一次只能放行一個)
遞歸鎖:rlock(一次只能放行一個)
條件鎖:condition(一次可以放行任意個)
事件鎖:event(一次全部放行)
信號量鎖:semaphore(一次可以放行特定個)
1、Lock() 同步鎖
基本介紹
Lock鎖的稱呼有很多,如:
同步鎖
互斥鎖
它們是什么意思呢?如下所示:
互斥指的是某一資源同一時刻僅能有一個訪問者對其進行訪問,具有唯一性和排他性,但是互斥無法限制訪問者對資源的訪問順序,即訪問是無序的
同步是指在互斥的基礎上(大多數情況),通過其他機制實現訪問者對資源的有序訪問
同步其實已經實現了互斥,是互斥的一種更為復雜的實現,因為它在互斥的基礎上實現了有序訪問的特點
下面是threading模塊與同步鎖提供的相關方法:
使用方式
同步鎖一次只能放行一個線程,一個被加鎖的線程在運行時不會將執行權交出去,只有當該線程被解鎖時才會將執行權通過系統調度交由其他線程。
如下所示,使用同步鎖解決最上面的問題:
?
?
import threading num = 0 def add(): lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() def sub(): lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結果三次采集 # num result : 0 # num result : 0 # num result : 0
?
?
這樣這個代碼就完全變成了串行的狀態,對于這種計算密集型I/O業務來說,還不如直接使用串行化單線程執行來得快,所以這個例子僅作為一個示例,不能概述鎖真正的用途。
死鎖現象
對于同步鎖來說,一次acquire()必須對應一次release(),不能出現連續重復使用多次acquire()后再重復使用多次release()的操作,這樣會引起死鎖造成程序的阻塞,完全不動了,如下所示:
?
?
import threading num = 0 def add(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 # 不執行 global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 # 不執行 global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num)
?
?
with語句
由于threading.Lock()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
?
?
import threading num = 0 def add(): with lock: # 自動加鎖 global num for i in range(10_000_000): num += 1 # 自動解鎖 def sub(): with lock: # 自動加鎖 global num for i in range(10_000_000): num -= 1 # 自動解鎖 if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結果三次采集 # num result : 0 # num result : 0 # num result : 0
?
?
2、RLock() 遞歸鎖
基本介紹
遞歸鎖是同步鎖的一個升級版本,在同步鎖的基礎上可以做到連續重復使用多次acquire()后再重復使用多次release()的操作,但是一定要注意加鎖次數和解鎖次數必須一致,否則也將引發死鎖現象。
下面是threading模塊與遞歸鎖提供的相關方法:
使用方式
以下是遞歸鎖的簡單使用,下面這段操作如果使用同步鎖則會發生死鎖現象,但是遞歸鎖不會:
?
?
import threading num = 0 def add(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結果三次采集 # num result : 0 # num result : 0 # num result : 0
?
?
with語句
由于threading.RLock()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
?
?
import threading num = 0 def add(): with lock: # 自動加鎖 global num for i in range(10_000_000): num += 1 # 自動解鎖 def sub(): with lock: # 自動加鎖 global num for i in range(10_000_000): num -= 1 # 自動解鎖 if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結果三次采集 # num result : 0 # num result : 0 # num result : 0
?
?
3、Condition() 條件鎖
基本介紹
條件鎖是在遞歸鎖的基礎上增加了能夠暫停線程運行的功能。并且我們可以使用wait()與notify()來控制線程執行的個數。
注意:條件鎖可以自由設定一次放行幾個線程。
下面是threading模塊與條件鎖提供的相關方法:
使用方式
下面這個案例會啟動10個子線程,并且會立即將10個子線程設置為等待狀態。
然后我們可以發送一個或者多個通知,來恢復被等待的子線程繼續運行:
?
?
import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name condLock.acquire() # 上鎖 print("start and wait run thread : %s" % thName) condLock.wait() # 暫停線程運行、等待喚醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) condLock.release() # 解鎖 if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber < maxSubThreadNumber: notifyNumber = int( input("Please enter the number of threads that need to be notified to run:")) condLock.acquire() condLock.notify(notifyNumber) # 放行 condLock.release() print("main thread run end") # 先啟動10個子線程,然后這些子線程會全部變為等待狀態 # start and wait run thread : Thread-1 # start and wait run thread : Thread-2 # start and wait run thread : Thread-3 # start and wait run thread : Thread-4 # start and wait run thread : Thread-5 # start and wait run thread : Thread-6 # start and wait run thread : Thread-7 # start and wait run thread : Thread-8 # start and wait run thread : Thread-9 # start and wait run thread : Thread-10 # 批量發送通知,放行特定數量的子線程繼續運行 # Please enter the number of threads that need to be notified to run:5 # 放行5個 # carry on run thread : Thread-4 # carry on run thread : Thread-3 # carry on run thread : Thread-1 # carry on run thread : Thread-2 # carry on run thread : Thread-5 #學習中遇到問題沒人解答?小編創建了一個Python學習交流群:725638078 # Please enter the number of threads that need to be notified to run:5 # 放行5個 # carry on run thread : Thread-8 # carry on run thread : Thread-10 # carry on run thread : Thread-6 # carry on run thread : Thread-9 # carry on run thread : Thread-7 # Please enter the number of threads that need to be notified to run:1 # main thread run end
?
?
with語句
由于threading.Condition()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
?
?
import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name with condLock: print("start and wait run thread : %s" % thName) condLock.wait() # 暫停線程運行、等待喚醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber < maxSubThreadNumber: notifyNumber = int( input("Please enter the number of threads that need to be notified to run:")) with condLock: condLock.notify(notifyNumber) # 放行 print("main thread run end")
?
?
4、Event() 事件鎖
基本介紹
事件鎖是基于條件鎖來做的,它與條件鎖的區別在于一次只能放行全部,不能放行任意個數量的子線程繼續運行。
我們可以將事件鎖看為紅綠燈,當紅燈時所有子線程都暫停運行,并進入“等待”狀態,當綠燈時所有子線程都恢復“運行”。
下面是threading模塊與事件鎖提供的相關方法:
使用方式
事件鎖不能利用with語句來進行使用,只能按照常規方式。
如下所示,我們來模擬線程和紅綠燈的操作,紅燈停,綠燈行:
?
?
# 生成一個事件鎖對象 eve = threading.Event() # 將事件鎖設置為紅燈狀態 eve.clear() # 判斷事件鎖的狀態 eve.is_set() # 將當前線程設置’等待‘狀態 eve.wait() # 將事件鎖設置為綠燈狀態 eve.set() import time import threading def light(eve): print(f'當前時間:{time.ctime()}, 紅燈還有 5s 結束!') time.sleep(5) print(f'當前時間:{time.ctime()}, 綠燈亮!') eve.set() # 設置事件鎖標志為 True def car(eve, name): print(f'當前時間:{time.ctime()}, 車 {name} 正在等紅燈') eve.wait() # 將當前線程設置為等待狀態,等待事件鎖標志為 True 再執行 print(f'當前時間:{time.ctime()}, 車 {name} 開始通行') if __name__ == '__main__': eve = threading.Event() # 事件鎖默認標志為 False t1 = threading.Thread(target=light, args=(eve,)) t1.start() for each in 'ABCDE': t2 = threading.Thread(target=car, args=(eve, each)) t2.start() #學習中遇到問題沒人解答?小編創建了一個Python學習交流群:725638078 # 執行看一下打印結果 當前時間:Fri Jul 29 11:32:58 2022, 紅燈還有 5s 結束! 當前時間:Fri Jul 29 11:32:58 2022, 車 A 正在等紅燈 當前時間:Fri Jul 29 11:32:58 2022, 車 B 正在等紅燈 當前時間:Fri Jul 29 11:32:58 2022, 車 C 正在等紅燈 當前時間:Fri Jul 29 11:32:58 2022, 車 D 正在等紅燈 當前時間:Fri Jul 29 11:32:58 2022, 車 E 正在等紅燈 當前時間:Fri Jul 29 11:33:03 2022, 綠燈亮! 當前時間:Fri Jul 29 11:33:03 2022, 車 C 開始通行 當前時間:Fri Jul 29 11:33:03 2022, 車 B 開始通行 當前時間:Fri Jul 29 11:33:03 2022, 車 E 開始通行 當前時間:Fri Jul 29 11:33:03 2022, 車 A 開始通行 當前時間:Fri Jul 29 11:33:03 2022, 車 D 開始通行
?
?
5、Semaphore() 信號量鎖
基本介紹
信號量鎖也是根據條件鎖來做的,它與條件鎖和事件鎖的區別如下:
條件鎖:一次可以放行任意個處于“等待”狀態的線程
事件鎖:一次可以放行全部的處于“等待”狀態的線程
信號量鎖:通過規定,成批的放行特定個處于“上鎖”狀態的線程
下面是threading模塊與信號量鎖提供的相關方法:
使用方式
以下是使用示例,你可以將它當做一段限寬的路段,每次只能放行相同數量的線程:
?
?
import threading import time maxSubThreadNumber = 6 def task(): thName = threading.currentThread().name semaLock.acquire() print("run sub thread %s" % thName) time.sleep(3) semaLock.release() if __name__ == "__main__": # 每次只能放行2個 semaLock = threading.Semaphore(2) for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() # run sub thread Thread-1 # run sub thread Thread-2 # run sub thread Thread-3 # run sub thread Thread-4 # run sub thread Thread-6 # run sub thread Thread-5
?
?
with語句
由于threading.Semaphore()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
?
?
import threading import time maxSubThreadNumber = 6 def task(): thName = threading.currentThread().name with semaLock: print("run sub thread %s" % thName) time.sleep(3) if __name__ == "__main__": semaLock = threading.Semaphore(2) for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start()
?
?
鎖關系淺析
上面5種鎖可以說都是基于同步鎖來做的,這些你都可以從源碼中找到答案。
首先來看RLock遞歸鎖,遞歸鎖的實現非常簡單,它的內部會維護著一個計數器,當計數器不為0的時候該線程不能被I/O操作和時間輪詢機制切換。但是當計數器為0的時候便不會如此了:
?
?
def __init__(self): self._block = _allocate_lock() self._owner = None self._count = 0 # 計數器
?
?
而Condition條件鎖的內部其實是有兩把鎖的,一把底層鎖(同步鎖)一把高級鎖(遞歸鎖)。
低層鎖的解鎖方式有兩種,使用wait()方法會暫時解開底層鎖同時加上一把高級鎖,只有當接收到別的線程里的notfiy()后才會解開高級鎖和重新上鎖低層鎖,也就是說條件鎖底層是根據同步鎖和遞歸鎖的不斷切換來進行實現的:
?
?
def __init__(self, lock=None): if lock is None: lock = RLock() # 可以看到條件鎖的內部是基于遞歸鎖,而遞歸鎖又是基于同步鎖來做的 self._lock = lock self.acquire = lock.acquire self.release = lock.release try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque()
?
?
基本練習題
條件鎖的應用
需求:一個空列表,兩個線程輪番往里面加值(一個加偶數,一個加奇數),最終讓該列表中的值為 1 - 100 ,且是有序排列的。
?
?
import threading lst = [] def even(): """加偶數""" with condLock: for i in range(2, 101, 2): # 判斷當前列表的長度處于2是否能處盡 # 如果能處盡則代表需要添加奇數 # 否則就添加偶數 if len(lst) % 2 != 0: # 添偶數 lst.append(i) # 先添加值 condLock.notify() # 告訴另一個線程,你可以加奇數了,但是這里不會立即交出執行權 condLock.wait() # 交出執行權,并等待另一個線程通知加偶數 else: # 添奇數 condLock.wait() # 交出執行權,等待另一個線程通知加偶數 lst.append(i) condLock.notify() condLock.notify() def odd(): """加奇數""" with condLock: for i in range(1, 101, 2): if len(lst) % 2 == 0: lst.append(i) condLock.notify() condLock.wait() condLock.notify() #學習中遇到問題沒人解答?小編創建了一個Python學習交流群:725638078 if __name__ == "__main__": condLock = threading.Condition() addEvenTask = threading.Thread(target=even) addOddTask = threading.Thread(target=odd) addEvenTask.start() addOddTask.start() addEvenTask.join() addOddTask.join() print(lst)
?
?
事件鎖的應用
有2個任務線程來扮演李白和杜甫,如何讓他們一人一句進行對答?文本如下:
杜甫:老李啊,來喝酒!
李白:老杜啊,不喝了我喝不下了!
杜甫:老李啊,再來一壺?
杜甫:…老李?
李白:呼呼呼…睡著了..
代碼如下:
?
?
import threading def libai(): event.wait() print("李白:老杜啊,不喝了我喝不下了!") event.set() event.clear() event.wait() print("李白:呼呼呼...睡著了..") def dufu(): print("杜甫:老李啊,來喝酒!") event.set() event.clear() event.wait() print("杜甫:老李啊,再來一壺?") print("杜甫:...老李?") event.set() if __name__ == '__main__': event = threading.Event() t1 = threading.Thread(target=libai) t2 = threading.Thread(target=dufu) t1.start() t2.start() t1.join() t2.join()
?
?
審核編輯:黃飛
?
評論
查看更多