關于epoll
驚群問題,什么是驚群呢?
比如我們在寫代碼過程中,使用兩個線程的epoll
監聽socket
,當socket
上有事件發生時,兩個epoll
都會被喚醒,導致會操作同一個socket
,這就是驚群,那如何解決呢?
(1)使用EPOLLEXCLUSIVE
:EPOLLEXCLUSIVE
是epoll
的擴展選項,它允許一個線程獨占一個epoll
實例,從而避免了epoll
的驚群問題;
(2)使用EPOLLONESHOT
:對于注冊了EPOLLONESHOT
事件的文件描述符,操作系統最多觸發一個可讀,可寫或者異常事件,且觸發一次,這樣就能確保一個線程獲取事件并處理,但是需要注意的是對于監聽類型(如accept
)不能使用EPOLLONESHOT
,否則就不能持續監聽連接,對于處理完了的非監聽事件,需要重置EPOLLONESHOT
;
第一部分:信號
1、發送信號給進程
#include < sys/types.h >
#include < signal.h >
int kill(pid_t pid, int sig);
2、信號回調函數
#include < signal.h >
typedef void (&__sighandler_t) (int);
__sighandler_t signal(int sig, __sighandler_t _handler);
int sigaction(int sig, const struct sigaction *act, struct sigaction *oact);
(1)__sighandler_t
信號處理的函數指針,其中處理參數為觸發信號當前值,其中有兩個默認宏(SIG_DFL:使用信號默認處理,SIG_IGN:忽略目標信號);
(2)signal
注冊信號回調處理函數,返回值為一個函數指針,含義是這個信號上一次處理的回調函數或者是系統默認的處理函數,這里目的是讓用戶可以自己恢復信號處理方式,比如系統對于一些信號是殺掉進程的,這里就應該處理完自己的回調邏輯后再調用系統默認行為;
(3)sigaction
函數的功能是檢查或修改與指定信號相關聯的處理動作,使用樣例如下:
#include < stdio.h >
#include < unistd.h >
#include < stdlib.h >
#include < signal.h >
int main()
{
struct sigaction newact, oldact;
newact.sa_handler = SIG_IGN; // 設置信號忽略,也可以設置為處理函數
sigemptyset(&newact.sa_mask);
newact.sa_flags = 0;
int count = 0;
pid_t pid = 0;
sigaction(SIGINT, &newact, &oldact); // 原始的備份到oldact,為后續的處理恢復
pid = fork();
if (pid == 0)
{
while(1)
{
printf("child exec ...n");
sleep(1);
}
return 0;
}
while (1)
{
if (count++ > 3)
{
sigaction(SIGINT, &oldact, NULL); // 恢復父進程信號處理方式
kill(pid, SIGKILL); // 父進程發信號給子進程
}
printf("father exec ...n");
sleep(1);
}
return 0;
}
第二部分:定時器
在Linux網絡編程中,定時器的作用主要是管理定時任務,處理過期連接,檢測超時隊列等,那我們可以通過哪些方式實現定時器呢?
1、利用系統API
...
setsockopt(socketfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
setsockopt(socketfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, len);
int number = epoll_wait(fd, events, MAX_EVENT_NUMBER, timeout);
...
通過使用socket的參數,設置連接句柄的發送和接收數據超時時間,可以實現定時處理:
(1)SO_SNDTIMEO
發送數據超時時間,根據timeout
設置;
(2)SO_RCVTIMEO
接收數據超時時間,根據timeout
設置;
IO復用的參數中都帶了一個timeout
參數,可以設置來達到定時觸發分支邏輯,比如epoll_wait
;
2、簡單的定時器
(1)啟動一個線程實現定時器,具體實現如下圖:
- 主線程啟動,開始執行任務,這里可以是網絡收發或者其他;
- 啟動一個線程,做定時任務處理使用;
- 主線程需要增加定時任務,可以將任務封裝為task,添加到任務隊列中;
- 同時通知定時線程,隊列中有任務了,這里通知機制可以是信號量或者廣播方式;
- 定時線程取出隊列中任務,判斷當前任務是否過期,如果過期就執行,沒有過期就繼續放入任務隊列中,同時這里需要讓線程等待隊列中距離下一個周期最短的時間,繼續取隊列任務;
(2)使用epoll_wait
設置timeout,是在網絡事件觸發的定時器中最方便的方式,具體邏輯如下:
...
start_timer = ... // 開始執行時間
while (true) {
int number = epoll_wait(epfd, events, MAX_EVENT_NUMBER, timeout);
for (...) {
...
// 處理連接任務
...
}
end_timer = ... // epoll_wait返回并處理任務時間
// 處理定時任務,判斷當前時間是否在一個timeout
if (end_timer - start_timer > timeout) { // 這里是偽代碼,具體時間判斷可以參考linux結構體
...
// 啟動線程執行定時任務邏輯
...
}
}
3、時間輪
時間輪是一種高效定時器,通過類似圓盤的形式定義每個tick,定時轉動圓盤,假設每次tick時間為si
,一個時間輪有N個tick,那么執行轉動一圈時間為N*si
;
現在插入一個任務,需要to1
時間周期后執行,這里就分情況處理:
(1)如果to1
< N*si
,則需要分配到(當前時間輪的位置 + to1
/ si
)的位置上,等待自然tick到達執行當前to1
的定時任務;
(2)如果to1
> N*si
,則需要分配到(當前時間輪的位置 + (to1
% N) / si
+ N)的位置上,由于to1
執行時間超過一輪的周期,所以需要等待多輪轉動后才能執行,那如何處理呢?因此我們將每個輪的tick上掛一個鏈表,這個鏈表的節點表示到達這個tick需要執行的任務to1
,這里的節點有可能是大于一個輪轉動的事件周期,也可能就是當前輪時間周期內執行,我們只需要當事件到達tick時,取出鏈表遍歷鏈表節點to1
,判斷是否是當前事件周期內執行,如果是摘除鏈表節點然后執行任務,如果不是則重新計算to1
需要多久后執行,計算方法就和上面的一樣(當前時間輪位置 + ((to1
- 鏈表最小的周期時間) % N) / si
+ N),然后將當前鏈表節點重新放回;
事件輪
4、時間堆
堆的數據結構應該大家都比較熟悉了,堆是一種滿足以下條件的樹:
- 堆中某個節點的值總是不大于或不小于其父節點的值;
- 堆總是一棵完全二叉樹;
- 添加堆節點的時間復雜度O(lgn),刪除節點是O(lgn),獲取節點是O(1);
時間堆
(1)循環線程讀取最小時間堆的堆頂元素;
(2)取出最小節點,判斷當前事件是否過期,如果過期則繼續執行,否則不處理;
(3)將最小節點對應的事件丟給執行線程執行;
這里最小時間堆節點在代碼實現中可以用一個數組表示,使用完全二叉樹的排列。
#include< iostream >
void heapify(int arr[], int n, int i) {
if (i >= n) return;
int min_node = i;
int lson = i * 2 + 1;
int rson = i * 2 + 2;
if (lson < n && arr[min_node] > arr[lson]) { // 和左孩子比較,找到最小節點
min_node = lson;
}
if (rson < n && arr[min_node] > arr[rson]) { // 和右孩子比較,找到最小節點
min_node = rson;
}
if (min_node != i) {
swap(arr[min_node], arr[i]);
heapify(arr, n, min_node); // 遞歸處理
}
}
void heapSort(int arr[], int n) {
// 反向取出最后一個節點
int lastNode = n - 1;
int parent = (lastNode - 1) / 2;
for (int i = parent; i >= 0; i--) {
heapify(arr, n, i);
}
for (int i = n - 1; i >= 0; i--) {
swap(arr[i], arr[0]);
heapify(arr, i, 0); // 調整堆節點
}
}
int main() {
int arr[5] = { 70, 41, 10, 90, 18, 26 };
heap_sort(arr, sizeof(arr) / sizeof(arr[0]));
for (int i = 0; i < sizeof(arr) / sizeof(arr[0]); i++) {
cout < < arr[i] < < endl;
}
return 0;
}
評論
查看更多