一、前言
由于曾經在Linux2.6.23上工作了多年,我對這個版本還是非常有感情的(拋開感情因素,本來應該選擇longterm的2.6.32版本來分析的,^_^),本文主要就是描述Linux2.6.23內核版本中對RCU有哪些修正。所謂修正主要包括兩個部分,一部分是bug fixed,一部分是新增的特性。
?
二、issue修復
1、synchronize_kernel是什么鬼?
僅僅從符號命名上就能看出來synchronize_kernel有點格格不入,其他的rcu API都有rcu這個字符,但是synchronize_kernel沒有。該函數的功能其實很多,如下:
(1)等待RCU reader離開臨界區(這是大家都熟悉的功能)
(2)等待NMI的handler調用完成
(3)等待所有的interrupt handler調用完成
(4)其他
因此,該函數用途太多,最終被兩個函數代替:synchronize_rcu和synchronize_sched。其中synchronize_rcu用于RCU的同步。而synchronize_sched負責其他方面的功能(本質是等待系統中所有CPU退出不可搶占區)。順便一提的是這兩個函數目前的實現代碼是一樣的,不過由于語義不同,后續應該會有所修改。
2、RCU callback的處理機制
為了實時性,在2.6.11內核中,如果RCU callback數目太多,那么我們會把RCU callback分在若干次的tasklet context中執行,而不是一次性的處理完畢。這樣大大降低了調度延遲,不過,又帶來了另外一個問題:在負荷比較重的場景,由于每次處理的callback缺省是10個,實際上更多的callback請求會掛入從而導致RCU的鏈表不斷的增大,不斷的增大……
因此,在23內核上,批量處理RCU請求的算法進行了調整,增加了三個控制變量:
static int blimit = 10;?
static int qhimark = 10000;?
static int qlowmark = 100;
如果說RCU是黑盒子,那么這三個變量就是控制黑盒子工作參數的旋鈕,如果你對目前系統中的RCU模塊工作狀態不滿意,可以轉動這些旋鈕,調整一下該模塊的工作參數。blimit用來控制一次tasklet上下文中處理的RCU callback個數,類似2.6.11內核中的maxbatch。在各個CPU初始化的時候會進行下面的初始化動作:
rdp->blimit = blimit;
rdp->blimit 是真正控制算法的變量,初始化的時候等于blimit,在運行過程中,該值是動態變化的,具體如何變是根據兩個watermark來處理的:qhimark是上限水位,qlowmark 是下限水位。此外,在struct rcu_data數據結構中也增加了一個qlen成員來跟蹤目前RCU callback的數目。每次提交一個RCU callback,qlen就加一。當渡過GP之后,調用RCU callback函數的時候qlen減一。
在了解了上述基礎信息之后,我們一起看看call_rcu的代碼:
if (unlikely(++rdp->qlen > qhimark)) {?
??? rdp->blimit = INT_MAX;-----------------(1)?
??? force_quiescent_state(rdp, &rcu_ctrlblk);---------(2)?
}
如果qlen太大,超過了qhimark水位,說明提交的RCU callback太多,tasklet已經忙不過來了,這時候,必須采取兩個措施:
(1)不再限制每次tasklet context中處理的請求個數。
(2)加快GP,讓各個CPU快點通過QS。如何做呢?其實至于強迫每個CPU上都進行一個進程切換就OK了。對于本CPU可以直接調用set_need_resched,對于其他CPU,只能是調用send_ipi_message函數發送ipi message,以便讓其他CPU自己進行進程調度。
看完上限水位的處理,我們再一起看看下限水位如何處理,在rcu_do_batch中:
if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)?
??? rdp->blimit = blimit;
當我們采用了上面所說的方法雙管齊下,qlen應該會不斷的減少,當觸及下限水位的時候,將rdp->blimit的值恢復正常。
3、rcu_start_batch函數中的race issue
2.6.11中rcu_start_batch函數的部分代碼如下:
if (rcp->next_pending && rcp->completed == rcp->cur) {?
??? cpus_andnot(rsp->cpumask, cpu_online_map, nohz_cpu_mask); -------A??? rcp->next_pending = 0;?
??? smp_wmb();?
??? rcp->cur++;------------------------------B?
}
當重新啟動一個批次的RCU callback的Grace Period探測的時候,需要reset cpumask,設置next_pending以及給當前的批次號加一。這里訪問了nohz_cpu_mask這個全局變量,主要是為了減輕檢測各個CPU通過Quiescent state的工作量,畢竟那些進入idle狀態的CPU其實是沒有進行QS的檢查(注意:這里僅僅限于dynamic tick的情況,對于周期性tick而言,nohz_cpu_mask總是等于0)。不過,如果是上面的代碼邏輯,A點和B點之間,如果CPU進入了IDLE,那么這會導致已經進入idle的CPU也進入cpumask,從而延長的GP的時長。如何修正呢?很簡單,將A處的代碼放到B之后。
rcu_start_batch函數還有一個小改動,去掉了next_pending參數,改由調用者設定。
4、合并了struct rcu_ctrlblk和struct rcu_state
除了讓參數傳遞變得繁瑣,rcu控制塊分成兩個數據結構是沒有什么意義的。
?
三、新增的功能
1、增加rcu_barrier
有些特殊的場合(例如卸載模塊或者umount文件系統)需要當前的所有的RCU callback(也包括nxtlist鏈表中的剛剛提交請求的那些)都執行完畢。注意:是callback執行完畢而不是僅僅渡過Grace Period。我們可以舉一個實際的例子:比如文件系統的unmount函數中一般會釋放該文件系統特定的super block數據結構實例,但是,如果RCU callback中還需要操作這個文件系統特定的super block數據結構實例的時候(比如在callback中將該數據結構實例從鏈表中摘除),在這樣的場景中,unmount函數必須要要等到RCU callback執行完畢之后才能free該文件系統特定的super block數據結構實例。
具體如何實現倒是比較簡單。每個CPU都定義一個特別用于rcu barrier的callback請求,具體在struct rcu_data數據結構中的barrier成員:
struct rcu_head barrier;
一旦用戶調用rcu_barrier函數,那么就在各個CPU上提交這個barrier的請求。如果每一個CPU上的barrier這個RCU callback已經執行完畢,那么就說明系統中所有的(在調用rcu_barrier那一點)callback都已經執行完畢。為了跟蹤每一個CPU上的barrier執行情況,需要一個counter:
static atomic_t rcu_barrier_cpu_count;
該counter初始值是0,提交barrier請求的時候該count加一,渡過Grace Period之后,在callback函數中減一,當該counter減到0值的時候,說明所有的CPU的barrier callback函數都執行完畢,也就意味著當前的所有的RCU callback都執行完畢。
2、增加rcu_needs_cpu
在RCU模塊發展的同時,其他的內核子系統也不斷在演進,例如時間子系統。當一個CPU由于無事可做而進入idle的時候,關閉周期性的tick可以節省功耗,這也就是傳說中的tickless(或者dynamic tick)特性。我們首先假設CPU A處于這樣的狀態:
(1)沒有新的請求,即nxtlist鏈表為空
(2)curlist鏈表有待處理的批次,雖然分配了批次號,但是還沒有啟動該批次,也就是說該批次是pending的
(3)當前批次在本cpu的QS狀態已經檢測通過
(4)沒有處理中的callback請求,即donelist鏈表為空
在這種狀態下,周期性tick到來的時候,其實沒有什么相關的RUC事情要處理,這時候,__rcu_pending返回0。在這種情況下,似乎停掉tick應該是OK的,但是假設我們停掉了CPU A的tick,讓該CPU進入idle狀態。如果CPU B是最后一個pass QS的CPU,這時候,該CPU會調用rcu_start_batch啟動pending的那個批次(CPU A的curlist上的請求就是該批次的),由于要啟動一個新的批次進行GP的檢測,因此在該函數中會reset cpumask,代碼如下:
cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
如果CPU A進入了idle state,并停掉了tick,那么cpumask將不處理CPU A的QS狀態,但是,curlist上的請求其實就是該批次的。怎么辦?應該在curlist仍然有請求的時候,禁止該CPU進入idle state并停掉tick,因此時間子系統需要RCU歐酷提供一個接口函數,用來收集RCU是否還需要該CPU的信息,這個接口就是rcu_needs_cpu。
3、增加srcu
SRCU其實就是sleepable RCU的縮寫,而我們常說的RCU實際上是classic RCU,也就是在reader critical section中不能睡眠的,其在臨界區內的代碼要求是spin lock一樣的。也正因為如此,我們可以在進程調度的時候可以判斷該CPU的QS已經通過。SRCU是一個RCU的變種,從名字上也可以看出來,其reader critical section中可以block。一旦放開了這個口子,classic RCU所搭建的一切轟然倒塌,因此,直覺上SRCU是不可能實現的:
(1)一旦在reader critical section中sleep,那么GP就變得非常長了,一直要等到該進程被喚醒并調度執行,這么長的GP系統怎么受得了?畢竟系統需要在GP渡過之后,在callback中釋放資源
(2)進程切換的時候判斷通過QS的機制失效
不過,realtime linux kernel要求不可搶占的臨界區要盡量的短,在這樣的需求背景下,spin lock的臨界區都因此而修改成為preemptible(只有raw spin lock保持了不可搶占的特性),RCU的臨界區也不能豁免,必須作出相應的改動,這也就是srcu的源由。
既然sleepable RCU勢在必行,那么我們必須要面對的問題就是如何減少RCU callback請求的數量,要知道SRCU的GP可能非常的長。解決方法如下:
(1)不再提供GP的異步接口(也就是call_rcu API),僅僅保留同步接口。如果提供了call_srcu這樣的接口,那么每一個使用rcu的線程可以提交任意多的RCU callback請求。而同步接口synchronize_srcu(類似RCU的synchronize_rcu接口)會阻塞當前的thread,因此可以確保一個線程只會提交一個請求,從而大大降低請求的數目。
(2)細分GP。classic RCU的GP是一個批次一個批次的處理,一個批次的GP是for整個系統的,換句話說,一個RCU reader side臨界區如果delay了,那么整個系統的RCU callback都會delay。對于SRCU而言,雖然GP比較長,但是如果能夠將使用SRCU的各個內核子系統隔離開來,每個子系統都有自己GP,也就是說,一個RCU reader side臨界區如果delay了,那么只是影響該子系統的RCU callback請求處理。
根據上面的思路,在linux2.6.23內核中提供了SRCU機制,提供如下的API:
int init_srcu_struct(struct srcu_struct *sp);?
void cleanup_srcu_struct(struct srcu_struct *sp);?
int srcu_read_lock(struct srcu_struct *sp) __acquires(sp);?
void srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp);?
void synchronize_srcu(struct srcu_struct *sp);
由于分隔了各個子系統的GP,因此各個子系統需要一個屬于自己的struct srcu_struct數據結構,可以靜態定義也可以動態分配,但是都需要調用init_srcu_struct來初始化。如果struct srcu_struct數據結構是動態分配,那么在free該數據結構之前需要調用cleanup_srcu_struct來釋放占用的資源。srcu_read_lock和srcu_read_unlock用來界定SRCU的臨界區范圍,struct srcu_struct數據結構做為該子系統的SRCU句柄傳遞給srcu_read_lock和srcu_read_unlock是可以理解的,但是idx是什么鬼?srcu_read_lock返回了idx,并做為參數傳遞給srcu_read_unlock函數,告知GP相關信息,具體后面會進行描述。synchronize_srcu和synchronize_rcu行為類似,都是阻塞當前進程,直到渡過GP之后才會繼續執行,不同的是,synchronize_srcu需要struct srcu_struct參數來指明是哪一個子系統的SRCU。
OK,了解了原理和API之后,我們來看看內部實現。對于一個具體的某個子系統中的SRCU而言,三個控制數據就可以完成SRCU的邏輯:
(1)用一個全局變量來跟蹤系統中的GP。為了方便,我們可以給GP編號,從0開始,每渡過一個GP,該ID就會加1。如果當前線程阻塞在synchronize_srcu,等到ID=a的GP過去,那么a+1就是pending的GP(也就是下一個要處理的GP ID)。struct srcu_struct中的completed成員就是起這個作用的。
(2)記錄各個GP中的位于reader critical section中的數目。當然了,隨著系統的運行,各個GP不斷的渡過,ID不斷的增加,但是在某個具體的時間點上,實際上不需要記錄每一個GP的reader臨界區的counter,只需要記錄current和next pending兩個reader臨界區的counter就OK了。為了性能,在2.6.23內核中,這個counter是per cpu的,也就是struct srcu_struct中的per_cpu_ref成員,具體的counter定義如下:
struct srcu_struct_array {?
??? int c[2];?
};
c[0]和c[1]的counter是不斷的toggle的,如果c[0]是current,那么c[1]就是next pending,如果c[1]是current,那么c[0]就是next pending,具體如何選擇是根據struct srcu_struct中的completed成員的LSB的那個bit決定的。
根據上面的描述,我們來進行邏輯解析。首先看srcu_read_lock的,該函數的邏輯很簡單,就是根據next pending ID(保存在completed成員)的LSB bit確定counter的位置,給這個counter加一。當然srcu_read_unlock執行相反的動作,略過。由于srcu_read_lock和srcu_read_unlock之間有可能會調用synchronize_srcu導致鎖定當前pending的狀態并將GP ID(也就是completed成員)加一,因此,srcu_read_unlock需要一個額外的index參數,用來告知應該選擇哪一個counter。
synchronize_srcu的邏輯也很簡單,首先要確定當前GP ID。也就是說,之前next pending的那個就變成current(說的很玄,本質就是選擇哪一個counter,c[0]還是c[1]),completed++讓隨后的srcu_read_lock調用更換到另外一個counter中,成為next pending。然后等待current的counter在各個CPU上的計數變成0。一旦counter計數等于0則返回,說明GP已經過去。
?
四、參考文獻
1、2.6.23 source code
2、https://lwn.net/Articles/202847/
評論
查看更多