作為在日常開發生產中非常實用的語言,有必要掌握一些python用法,比如爬蟲、網絡請求等場景,很是實用。但python是單線程的,如何提高python的處理速度,是一個很重要的問題,這個問題的一個關鍵技術,叫協程。本篇文章,講講python協程的理解與使用,主要是針對網絡請求這個模塊做一個梳理,希望能幫到有需要的同學。
概念篇
在理解協程這個概念及其作用場景前,先要了解幾個基本的關于操作系統的概念,主要是進程、線程、同步、異步、阻塞、非阻塞,了解這幾個概念,不僅是對協程這個場景,諸如消息隊列、緩存等,都有一定的幫助。接下來,編者就自己的理解和網上查詢的材料,做一個總結。
進程
在面試的時候,我們都會記住一個概念,進程是系統資源分配的最小單位。是的,系統由一個個程序,也就是進程組成的,一般情況下,分為文本區域、數據區域和堆棧區域。
文本區域存儲處理器執行的代碼(機器碼),通常來說,這是一個只讀區域,防止運行的程序被意外修改。
數據區域存儲所有的變量和動態分配的內存,又細分為初始化的數據區(所有初始化的全局、靜態、常量,以及外部變量)和為初始化的數據區(初始化為0的全局變量和靜態變量),初始化的變量最初保存在文本區,程序啟動后被拷貝到初始化的數據區。
堆棧區域存儲著活動過程調用的指令和本地變量,在地址空間里,棧區緊連著堆區,他們的增長方向相反,內存是線性的,所以我們代碼放在低地址的地方,由低向高增長,棧區大小不可預測,隨開隨用,因此放在高地址的地方,由高向低增長。當堆和棧指針重合的時候,意味著內存耗盡,造成內存溢出。
進程的創建和銷毀都是相對于系統資源,非常消耗資源,是一種比較昂貴的操作。進程為了自身能得到運行,必須要搶占式的爭奪CPU。對于單核CPU來說,在同一時間只能執行一個進程的代碼,所以在單核CPU上實現多進程,是通過CPU快速的切換不同進程,看上去就像是多個進程在同時進行。
由于進程間是隔離的,各自擁有自己的內存內存資源,相比于線程的共同共享內存來說,相對安全,不同進程之間的數據只能通過 IPC(Inter-Process Communication) 進行通信共享。
線程
線程是CPU調度的最小單位。如果進程是一個容器,線程就是運行在容器里面的程序,線程是屬于進程的,同個進程的多個線程共享進程的內存地址空間。
線程間的通信可以直接通過全局變量進行通信,所以相對來說,線程間通信是不太安全的,因此引入了各種鎖的場景,不在這里闡述。
當一個線程崩潰了,會導致整個進程也崩潰了,即其他線程也掛了, 但多進程而不會,一個進程掛了,另一個進程依然照樣運行。
在多核操作系統中,默認進程內只有一個線程,所以對多進程的處理就像是一個進程一個核心。
同步和異步
同步和異步關注的是消息通信機制,所謂同步,就是在發出一個函數調用時,在沒有得到結果之前,該調用不會返回。一旦調用返回,就立即得到執行的返回值,即調用者主動等待調用結果。所謂異步,就是在請求發出去后,這個調用就立即返回,沒有返回結果,通過回調等方式告知該調用的實際結果。
同步的請求,需要主動讀寫數據,并且等待結果;異步的請求,調用者不會立刻得到結果。而是在調用發出后,被調用者通過狀態、通知來通知調用者,或通過回調函數處理這個調用。
阻塞和非阻塞
關注的是程序在等待調用結果(消息,返回值)時的狀態。
阻塞調用是指調用結果返回之前,當前線程會被掛起。調用線程只有在得到結果之后才會返回。非阻塞調用指在不能立刻得到結果之前,該調用不會阻塞當前線程。所以,區分的條件在于,進程/線程要訪問的數據是否就緒,進程/線程是否需要等待。
非阻塞一般通過多路復用實現,多路復用有 select、poll、epoll幾種實現方式。
協程
在了解前面的幾個概念后,我們再來看協程的概念。
協程是屬于線程的,又稱微線程,纖程,英文名Coroutine。舉個例子,在執行函數A時,我希望隨時中斷去執行函數B,然后中斷B的執行,切換回來執行A。這就是協程的作用,由調用者自由切換。這個切換過程并不是等同于函數調用,因為它沒有調用語句。執行方式與多線程類似,但是協程只有一個線程執行。
協程的優點是執行效率非常高,因為協程的切換由程序自身控制,不需要切換線程,即沒有切換線程的開銷。同時,由于只有一個線程,不存在沖突問題,不需要依賴鎖(加鎖與釋放鎖存在很多資源消耗)。
協程主要的使用場景在于處理IO密集型程序,解決效率問題,不適用于CPU密集型程序的處理。然而實際場景中這兩種場景非常多,如果要充分發揮CPU利用率,可以結合多進程+協程的方式。后續我們會講到結合點。
原理篇
根據wikipedia的定義,協程是一個無優先級的子程序調度組件,允許子程序在特點的地方掛起恢復。所以理論上,只要內存足夠,一個線程中可以有任意多個協程,但同一時刻只能有一個協程在運行,多個協程分享該線程分配到的計算機資源。協程是為了充分發揮異步調用的優勢,異步操作則是為了避免IO操作阻塞線程。
知識準備
在了解原理前,我們先做一個知識的準備工作。
1)現代主流的操作系統幾乎都是分時操作系統,即一臺計算機采用時間片輪轉的方式為多個用戶服務,系統資源分配的基本單位是進程,CPU調度的基本單位是線程。
2)運行時內存空間分為變量區,棧區,堆區。內存地址分配上,堆區從低地到高,棧區從高往低。
3)計算機執行時一條條指令讀取執行,執行到當前指令時,下一條指令的地址在指令寄存器的IP中,ESP寄存值指向當前棧頂地址,EBP指向當前活動棧幀的基地址。
4)系統發生函數調用時操作為:先將入參從右往左依次壓棧,然后把返回地址壓棧,最后將當前EBP寄存器的值壓棧,修改ESP寄存器的值,在棧區分配當前函數局部變量所需的空間。
5)協程的上下文包含屬于當前協程的棧區和寄存器里面存放的值。
事件循環
在python3.3中,通過關鍵字yield from使用協程,在3.5中,引入了關于協程的語法糖async和await,我們主要看async/await的原理解析。其中,事件循環是一個核心所在,編寫過 js的同學,會對事件循環Eventloop更加了解, 事件循環是一種等待程序分配事件或消息的編程架構(維基百科)。在python中,asyncio.coroutine 修飾器用來標記作為協程的函數, 這里的協程是和asyncio及其事件循環一起使用的,而在后續的發展中,async/await被使用的越來越廣泛。
async/await
async/await是使用python協程的關鍵,從結構上來看,asyncio 實質上是一個異步框架,async/await 是為異步框架提供的 API已方便使用者調用,所以使用者要想使用async/await 編寫協程代碼,目前必須機遇 asyncio 或其他異步庫。
Future
在實際開發編寫異步代碼時,為了避免太多的回調方法導致的回調地獄,但又需要獲取異步調用的返回結果結果,聰明的語言設計者設計了一個 叫Future的對象,封裝了與loop 的交互行為。其大致執行過程為:程序啟動后,通過add_done_callback 方法向 epoll 注冊回調函數,當 result 屬性得到返回值后,主動運行之前注冊的回調函數,向上傳遞給 coroutine。這個Future對象為asyncio.Future。
但是,要想取得返回值,程序必須恢復恢復工作狀態,而由于Future 對象本身的生存周期比較短,每一次注冊回調、產生事件、觸發回調過程后工作可能已經完成,所以用 Future 向生成器 send result 并不合適。所以這里又引入一個新的對象 Task,保存在Future 對象中,對生成器協程進行狀態管理。
Python 里另一個 Future 對象是 concurrent.futures.Future,與 asyncio.Future 互不兼容,容易產生混淆。區別點在于,concurrent.futures 是線程級的 Future 對象,當使用 concurrent.futures.Executor 進行多線程編程時,該對象用于在不同的 thread 之間傳遞結果。
Task
上文中提到,Task是維護生成器協程狀態處理執行邏輯的的任務對象,Task 中有一個_step 方法,負責生成器協程與 EventLoop 交互過程的狀態遷移,整個過程可以理解為:Task向協程 send 一個值,恢復其工作狀態。當協程運行到斷點后,得到新的Future對象,再處理 future 與 loop 的回調注冊過程。
Loop
在日常開發中,會有一個誤區,認為每個線程都可以有一個獨立的 loop。實際運行時,主線程才能通過 asyncio.get_event_loop() 創建一個新的 loop,而在其他線程時,使用 get_event_loop() 卻會拋錯。正確的做法為通過 asyncio.set_event_loop() ,將當前線程與 主線程的loop 顯式綁定。
Loop有一個很大的缺陷,就是 loop 的運行狀態不受 Python 代碼控制,所以在業務處理中,無法穩定的將協程拓展到多線程中運行。
總結
實戰篇
介紹完概念和原理,我來看看如何使用,這里,舉一個實際場景的例子,來看看如何使用python的協程。
場景
外部接收一些文件,每個文件里有一組數據,其中,這組數據需要通過http的方式,發向第三方平臺,并獲得結果。
分析
由于同一個文件的每一組數據沒有前后的處理邏輯,在之前通過Requests庫發送的網絡請求,串行執行,下一組數據的發送需要等待上一組數據的返回,顯得整個文件的處理時間長,這種請求方式,完全可以由協程來實現。
為了更方便的配合協程發請求,我們使用aiohttp庫來代替requests庫,關于aiohttp,這里不做過多剖析,僅做下簡單介紹。
aiohttp
aiohttp是asyncio和Python的異步HTTP客戶端/服務器,由于是異步的,經常用在服務區端接收請求,和客戶端爬蟲應用,發起異步請求,這里我們主要用來發請求。
aiohttp支持客戶端和HTTP服務器,可以實現單線程并發IO操作,無需使用Callback Hell即可支持Server WebSockets和Client WebSockets,且具有中間件。
代碼實現
直接上代碼了,talk is cheap, show me the code~
import aiohttp
import asyncio
from inspect import isfunction
import time
import logger
@logging_utils.exception(logger)
def request(pool, data_list):
loop = asyncio.get_event_loop()
loop.run_until_complete(exec(pool, data_list))
async def exec(pool, data_list):
tasks = []
sem = asyncio.Semaphore(pool)
tasks.append(
control_sem(sem,
item.get(“method”, “GET”),
item.get(“url”),
item.get(“data”),
item.get(“headers”),
item.get(“callback”)))
await asyncio.wait(tasks)
async def control_sem(sem, method, url, data, headers, callback):
async with sem:
count = 0
flag = False
while not flag and count 《 4:
flag = await fetch(method, url, data, headers, callback)
count = count + 1
print(“flag:{},count:{}”.format(flag, count))
if count == 4 and not flag:
raise Exception(‘EAS service not responding after 4 times of retry.’)
async def fetch(method, url, data, headers, callback):
async with aiohttp.request(method, url=url, data=data, headers=headers) as resp:
try:
json = await resp.read()
print(json)
if resp.status != 200:
return False
if isfunction(callback):
callback(json)
return True
except Exception as e:
print(e)
這里,我們封裝了對外發送批量請求的request方法,接收一次性發送的數據多少,和數據綜合,在外部使用時,只需要構建好網絡請求對象的數據,設定好請求池大小即可,同時,設置了重試功能,進行了4次重試,防止在網絡抖動的時候,單個數據的網絡請求發送失敗。
最終效果
在使用協程重構網絡請求模塊之后,當數據量在1000的時候,由之前的816s,提升到424s,快了一倍,且請求池大小加大的時候,效果更明顯,由于第三方平臺同時建立連接的數據限制,我們設定了40的閥值。可以看到,優化的程度很顯著。
責任編輯:ct
評論
查看更多