今兒我們就從源碼入手,來幫助大家簡單理解一下 epoll 的實現原理,并在后邊分析一下,大家都說 epoll 性能好,那到底是好在哪里。
epoll 簡介
1、epoll 的簡單使用
我們先來看下 epoll 的簡單使用。
首先來看下不用 epoll 的時候,我們可能會怎樣去創建一個 socket 鏈接的偽代碼:
socket_fd = socket(AF_INET,SOCK_STREAM,0);
// 給 socket 綁定本地端口和地址
local_addr.sin_port = htons(PORT);
local_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(socket_fd,(struct sockaddr*)&local_addr,sizeof(struct sockaddr_in));
// 監聽客戶端發來的鏈接
ret = listen(socket_fd,backlog);
// 死循環
for(;;){
// 當用戶調用了 connect 后服務端會觸發 accept
accept_fd = accept( socket_fd, (struct sockaddr *)&remote_addr, &addr_len );
for(;;){
// 從線程池里撈一條線程然后把這個 accept 交給這條線程
// 然后線程中去做 recv()
get_thread_from_pool(accept_fd)
}
}
不同語言可能寫法都不太一樣,但是大概流程都是先創建個 socket,然后給 socket 綁定上本地端口和 ip,以便客戶端能通過這倆信息找到自己,之后監聽這個 socket,再然后死循環中用 accept 來接受用戶的 connect,接收到之后,把鏈接的 fd 扔給一條新的線程中去做 read 之類的操作。
我們再來簡單看下用 epoll 的時候大概會怎么寫:
// 創建 socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 給 socket 綁定地址和 port 并監聽
myaddr.sin_port = htons(PORT);
myaddr.sin_addr.s_addr = INADDR_ANY;
bind(sockfd, (const struct sockaddr *)&myaddr, sizeof(myaddr))
listen(sockfd)
// 創建 epoll
int efd = epoll_create(1);
// 創建 epoll 的事件
struct epoll_event evt = {
.events = EPOLLIN,
.data.fd = sockfd,
};
// 把 socket 交給 epoll 做托管
epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &evt)
struct epoll_event events[MAX];
while (1) {
// 觸發 epoll 的等待, 等用戶的 connect 以及 send
int num = epoll_wait(efd, events);
for (i = 0; i < num; i++) {
if (events[i].events & EPOLLIN) {
// 如果是 socketfd 收到了 connect
if (sockfd == events[i].data.fd) {
// 就把這條鏈接的 fd 也放到 epoll 中
int cn_fd = accept(sockfd, NULL, NULL);
struct epoll_event ac_evt = {
.events = EPOLLIN,
.data.fd = cn_fd,
};
epoll_ctl(efd, EPOLL_CTL_ADD, cn_fd, &ac_evt);
} else {
// 如果是收到了用戶的 send, 那就從線程池里撈出一條線程
// 然后里頭再去做 read 之類的操作
get_thread_from_pool(events[i].data.fd);
}
}
}
}
}
上邊的代碼簡單來講也是先創建 socket,然后創建 epoll,之后將 socket 交給 epoll 管理,隨后啟動死循環,當用戶 connect 了之后再把這個 accept 的 fd 同樣托管給 epoll,這樣當用戶發消息過來之后就會從線程池中撈一條線程,然后用這條線程去做 read 之類的操作。
用以及不用 epoll 大概就是上邊這兩種情況,這里都是偽代碼,具體一點的代碼可以很容易搜到,大家如果想自己試的話可以去搜一搜,這里就簡單帶過了。
2、epoll 的系統調用
epoll 主要有仨系統調用:
- epoll_create: 創建一個 epoll 對象
- epoll_ctl: 把要管理的對象添加到 epoll 中
- epoll_wait: hang 住當前線程等待被托管的東西里有 IO 發生
epoll 實現原理
epoll 的實現原理可能會有點繞,如果不想看中間那大坨源代碼的話,大家可以直接跳到后邊 “幾個系統調用總結” 這部分來看最后的總結。
1、epoll 是文件系統
首先 epoll 深得 unix 設計哲學的精髓,他也和 socket 一樣,是個文件系統,它的主要系統調用實現在內核源碼的 “fs/eventpoll.c” 文件中。
在之前的文章中介紹過 Linux 的文件系統以及 sockfs,并且當時提到文件系統有基于磁盤的,也有基于內存的。當時介紹的 sockfs 就是基于內存的文件系統。很明顯,這里的 epoll 文件系統也是基于內存的一種文件系統。
我們在之前的文章中提到,對于基于磁盤的文件系統比如 ext4 等他們都在內存中有自己的 inode 數據結構,這個 inode 數據結構上保存了很多對當前文件系統的操作方法以及屬性。然后用戶態在使用的時候,大概就是在線程的 task_struct 結構體上找到 files 屬性中的 fd_array 或者 fd_table,然后通過 fd 找到對應的 file 結構體,之后通過 file 結構體,就能找到對應的 inode 然后做一些文件相關的操作。
而對于類似 sockfs 或者 epoll 這種基于內存的文件系統來講,他們雖然也有 inode 屬性,但對他們來講,這個 inode 是一種 “假的” inode,也就是說對于 epoll 來講,它的 inode 作用不大,而真正有用的,是掛載在 file 結構體上的 private_data 屬性,這點它和 socket 一樣。
到這兒為止,如果感覺不是很清晰的話,可以去看下之前介紹 sockfs 的文章,或者也可以簡單地記,就是:
- epoll 和 socket 一樣也是一種文件系統
- 當用戶調用了 epoll_create 之后會返回 epoll 的 fd
- 通過這個 fd,可以在 task_struct 的 files 上找到對應的 epoll 的 file 結構體
- 在這個 file 結構體上可以拿到一個 private_data 屬性,這個 private_data 屬性的值,就是 epoll 內核中的數據結構。至于這個結構是什么東西,咱們后邊再說。
2、epoll_create
首先我們來分析一下想使用 epoll 的話,一定要走的第一個系統調用 “epoll_create”。
上圖是源碼中的實現,我們來簡單看下:
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
// 創建了一個 eventpoll 結構體
error = ep_alloc(&ep);
// 生成文件描述符
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
// 創建 epoll 對應的 file 結構體
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
ep->file = file;
// 給它綁上 fd
fd_install(fd, file);
return fd;
}
上邊是把這個系統調用最外層的精華摘了出來,主要做的事兒,就是創建了一個 eventpoll 結構體,咱們上一小節說的通過 fd 找到 file 然后找到的那個 private_data 屬性,其實就是這個 eventpoll 結構體。
通過上邊源碼可以看到,這個 eventpoll 結構體作為 priv 參數交給了 “file->private_data” 方法。另外在源代碼中也可以看到,epoll 對應的這個 file 結構體,是用一個叫 “alloc_file_pseudo” 的方法創建的,其中這個 “pseudo” 是 “假的” 的意思,這也表明了對于 epoll 這種基于內存的文件系統,它的 file 結構體相比基于磁盤的文件系統沒有那么 “沉”。
接下來我們回到上邊創建完了 eventpoll 結構體之后,epoll_create 系統調用中會獲取一個未使用的文件描述符,然后給 epoll 創建一個 file 結構體,并把這個 file 結構體和 fd 做一個 “fd_install”,也就是給綁定一下子,這樣通過這個 fd 就能在當前線程的 task_struct 上找到對應的這個 eventpoll 數據結構了。
上邊我們反復提到 eventpoll 這個結構體,從 epoll_create 系統調用的源碼也能看出,這個系統調用主要就是創建出了這個一個結構體,并且能讓我們通過 fd 找到他,那他到底是個啥呢?我們來下源碼:
這個 eventpoll 結構體上有很多屬性,其中最重要的,我們只需要記住三個就好:
- wq: 一個存放等待事件的隊列
- rdllist: 一個存放就緒事件的隊列
- rbr: 一顆紅黑樹
至于這仨分別是干啥的,一會兒在后邊的文章中就能看到了。
這里簡單總結一下,使用 epoll 的第一步!調用 epoll_create 方法,該方法做的事情就是創建了一個 eventpoll 結構體,并且能讓用戶態通過 fd 找到這個 eventpoll 結構體。這個結構體上重點有仨屬性,一個用來存放等待事件的隊列,一個用來存放就緒事件的隊列,以及一顆紅黑樹。
2、epoll_ctl
接下來我們來看使用 epoll 的第二步,使用 epoll_ctl 系統調用,將要托管的 socket fd 交給 epoll 托管。代碼大概長這樣:
這一步就是將 socket 交給 epoll 管理,我們來簡單介紹下它里頭做了什么事兒,這里可能有些邏輯會比較繞,大家可以自己再去看看源碼加深一下理解:
struct eventpoll *ep;
// 根據 epoll 的 fd 找到對應的 eventpoll 的 file 結構體
f = fdget(epfd);
// 根據 socket 的 fd 找到對應的 socket 的 file 結構體
tf = fdget(fd);
// 檢查是否支持 poll
if (!file_can_poll(tf.file))
goto error_tgt_fput;
// 找到對應的 eventpoll 結構體
ep = f.file->private_data;
switch (op) {
// 添加一個 socket 到 epoll 中
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
}
break;
case EPOLL_CTL_DEL:
case EPOLL_CTL_MOD:
}
}
上邊是 epoll_ctl 這個系統調用的主要代碼,里頭做的事情乍一看也很簡單:
- 根據 epoll 的 fd 找到對應的 file 結構體,這個結構體上能找到 eventpoll 結構體
- 根據 socket 的 fd 找到對應的 file 結構體,這個結構體上能找到 socket 結構體
- 調用了 ep_insert 方法,將 socket 插入到 eventpoll 結構體中
下面我們來看看 “ep_insert” 這個方法做了啥:
{
// 初始化一個 epitem 數據結構
struct epitem *epi;
// 初始化一個等待隊列,但它其實是個 struct 結構體
// 上邊只有一個 poll_table 結構體和 epitem 結構體
struct ep_pqueue epq;
// 初始化 epitem 結構上的 pwqlist 屬性
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// 這里只做了 ffd->file = file 以及 ffd->fd = fd
ep_set_ffd(&epi->ffd, tfile, fd);
// 給等待隊列的 epitem 賦值
epq.epi = epi;
// 給等待隊列的 poll_table 賦值
// 賦的值可以簡單地認為就是后邊這個叫做 “ep_ptable_queue_proc” 的函數
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 這里會調用上邊那行的 “ep_ptable_queue_proc” 方法
// 作用可以簡單理解成給 epi 的 pwqlist 這條鏈表上
// 添加了一個叫做 “ep_poll_callback” 的回調函數
revents = ep_item_poll(epi, &epq.pt, 1);
// 把這個 epitem 插入到 eventpoll 的紅黑樹里
ep_rbtree_insert(ep, epi);
}
這個 ep_insert 方法中,從宏觀上來看,主要做的事情就是創建了一個紅黑樹的節點,這個節點上保存了用戶傳進來的 socket 相關的信息,然后主要還調用了一個 ep_item_poll 用來初始化等待隊列。
這里最最最主要的操作其實就是這個 “ep_item_poll” 方法了,這個方法主要是往 epi 的 pwqlist 這條鏈表上掛了個回調函數名字叫 “ep_poll_callback”,那么這個 epi 的 “pwqlist” 又是誰,是干嘛的呢?這里我們直接揭秘,其實這個 “pwqlist” 就是用戶傳進來的那個 socket 身上的 “等待隊列”。我們來詳細看下源碼,這里會比較繞,我盡量說得簡單點:
pt->_key = epi->event.events;
if (!is_file_epoll(epi->ffd.file))
return vfs_poll(epi->ffd.file, pt) & epi->event.events;
}
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) {
return file->f_op->poll(file, pt);
}
雖然我們上邊說這塊兒會比較繞,但實際上這個 “ep_item_poll” 的源碼還算是比較短的,我們把最重要的摘出來其實就這么幾行,可以看到它里頭調用了 vfs_poll 方法,vfs_poll 方法中又去調用了 file -> f_op -> poll 方法。
從這里我們就可以看出,如果你的文件系統實現了 poll 方法的話,其實理論上是都可以被 epoll 來托管的。那么這里這個 poll 方法是誰呢?這里不賣關子直接說,其實這個 file -> f_op -> poll 方法就是 tcp 協議自己實現的 poll 方法,也就是 “tcp_poll” 方法。
這里簡單解釋一下這個 tcp_poll 方法是怎么來的:首先大家都知道 socket 這個東西,但其實 socket 之下還有更重要的一個叫做 “sock” 的結構。對于這個 socket 和 sock 應該怎么理解呢?其實可以把 socket 理解成 “協議簇”,把 sock 理解為真正的 “協議”,socket 是用戶層的概念,而 sock 則是真的要和一種底層的協議做綁定的,比如 tcp 協議或者 udp 協議。然后不同的協議實現的什么 read 方法,send 方法,poll 方法等,就會被掛載到這個 sock 結構體上,也就是說,當用戶在用戶側調用了一個什么 send 方法或者 recv 方法啥的,真正的調用邏輯是 “socket -> sock -> ops -> tcp_recv(或者 udp_recv)”。所以上邊的 ep_item_poll 方法里頭調用的 poll 方法,就是 socket -> sock -> ops -> tcp_poll 方法。
也就是說,這里可以簡單地理解一下,當用戶態調用了 “epoll_ctl” 并把一個 socket 傳進來的時候,這個系統調用會調用 socket 下層的 poll 接口,而實現了這個 poll 接口的,就是下層真正的協議,比如 tcp 協議,此時就會調用 tcp 協議自己實現的 tcp_poll 方法。
好了回過頭繼續看這個 tcp_poll 方法,注意 “ep_item_poll” 在調用這個 tcp_poll 方法的時候,把一個 “poll_table” 類型的屬性作為參數傳給了 tcp_poll,這個 poll_table 是誰呢,我們暫時回頭去看下 “ep_insert” 方法中的那個 “init_poll_funcptr” 方法:
可以看到 init_poll_funcptr 接收了一個 “ep_ptable_queue_proc” 方法,并把這個方法放到 “poll_table” 這個結構體的 “_qproc” 屬性上,這里大家先強行記住這個 “_qproc”,記住這個 poll_table 結構體上有個 _qproc 屬性,并且指向了一個叫 “ep_ptable_queue_proc” 的函數。
然后我們往下看,上邊說到 “ep_item_poll” 會調用 tcp 協議實現的 poll 方法并把這個 poll_table 作為參數傳進去,那我們來看看 tcp_poll 中實現了啥:
我們順著 tcp_poll 的這條調用鏈看下去,最終在 poll_wait 中看到了一個眼熟的東西,誒!就是上邊讓大家記住的那個 “_qproc” 屬性,它指向了 “ep_ptable_queue_proc” 方法。
嗷!到這兒我們能反應過來,在 epoll_ctl 這個系統調用中,調用了底層協議的 poll 方法,并且把 epoll 那層的一個函數作為參數傳給了底層協議的 poll 方法,然后底層協議的 poll 方法又會調用這個函數。
是不是覺得有點繞了,還有更繞的。
我們來看上圖那個 poll_wait 函數,你看它調用 _qproc 方法時候傳的參數是個誰?其中是不是有個叫做 “wait_address” 的東西,然后您再往上看上一張圖的 “sock_poll_wait” 方法,在調用這個 “poll_wait” 方法時,傳進來的這個 “wait_address” 是誰呢?沒錯,正是 socket 的 wq.wait,也就是 socket 上的一個等待隊列。
好,到這兒我們在梳理一下流程,當用戶調用 “epoll_ctl” 并傳進來一個 socketfd 的時候,epoll ctl 內部會調用這個 socket 底層的協議實現的 poll 方法,并把自己的一個 poll_table 屬性傳進去,然后在底層協議比如 tcp 協議實現的 poll 方法中,又會調用上層的 epoll_ctl 傳進來的這個 poll_table 上的 _qproc 方法,并把自己這個 socket 身上的等待隊列作為參數傳給這個 _qproc 方法,而這個 _qproc 方法指向的是 “ep_ptable_queue_proc” 這個函數。所以接下來我們來看 “ep_ptable_queue_proc” 方法:
struct file *file,
wait_queue_head_t *whead,
poll_table *pt
) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
add_wait_queue(whead, &pwq->wait);
}
init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
wq_entry->flags = 0;
wq_entry->private = NULL;
wq_entry->func = func;
}
我們撿主要的看,這個函數中,最重要的兩個步驟就是首先是調用 “init_waitqueue_func_entry” 方法,這個方法很簡單,直接貼在上邊了,就是把 “ep_poll_callback” 這個方法給掛到 pwp->wait 上邊。接下來調用 “add_wait_queue”,把這個掛載了 “ep_poll_callback” 方法的 pwp->wait 結構給掛載到 whead 這個隊列上,那這個 whead 是誰呢,你一定能想到,就是上邊 tcp_poll 在調用這個 “ep_ptable_queue_proc” 方法時傳進來的 socket 自己身上的 wq 等待隊列。
到這兒,我們總結一下 epoll_ctl 都做了啥:
- 在 epoll_ctl 中調用了傳進來的那個 socket 底層協議的 poll 方法,比如底層協議如果是 tcp 的話,那這個方法就是 tcp_poll
- epoll_ctl 在調用 tcp_poll 時,把自己這邊的一個回調函數傳給了 tcp_poll
- tcp_poll 中又會調用上層 epoll_ctl 傳給他的這個回調函數,并且 tcp_poll 把自己的 socket 身上的等待隊列作為參數傳給這個 epoll_ctl 傳下來的回調函數
- 這個 epoll_ctl 中的會調用拿到了底層協議自己的 wq 等待隊列后,往這個等待隊列中推入了一個數據結構,這個數據結構中只有一個回調函數,叫 “eo_poll_callback”
- 最后把這個 socket 插到 epoll 內部的紅黑樹上
好了到這兒我們就把 epoll_ctl 主要做的事兒都說完了。可以發現這套流程如果要是自己一點點看的話,確實會比較繞,因為它里邊相當于是上層的 epoll 和下層的協議都是可以替換的,只要下層協議實現了 poll 方法,然后上層能把自己的回調注入進入,之后下層的 poll 方法再把自己的等待隊列注入給上層的回調函數,這就 ok 了,有一種雙向依賴注入的感覺。還挺(má)妙(fán)的是吧。
3、epoll_wait
說完了 epoll_create 和 epoll_ctl 我們來看是用 epoll 的最后一個重要的系統調用 “epoll_wait”。
epoll_wait 主要調用的是 “do_epoll_wait” 中的 “ep_poll” 方法,我們來看一下:
int maxevents, long timeout)
{
// 先判斷 eventpoll 上的就緒隊列是不是有東西
// 有的話直接吐給用戶
if (!ep_events_available(ep))
ep_busy_loop(ep, timed_out);
eavail = ep_events_available(ep);
if (eavail)
goto send_events;
// 使用 current 初始化一個等待項
init_waitqueue_entry(&wait, current);
// 把等待項給干到 eventpoll 結構體的 wq 隊列上
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
// hang 住當前線程
set_current_state(TASK_INTERRUPTIBLE);
}
}
對于這個 “ep_poll” 整體上來看,做的事情比較直觀,主要就是:
- 先看看 epoll 里頭的就緒隊列是不是已經有東西了。還記得最開始我們介紹 eventpoll 時候說它里頭有三個重要的東西,一個是 “就緒隊列”,一個是 “等待隊列”,還有一顆 “紅黑樹”,此時看的就是這個 eventpoll 中的 “就緒隊列”。
- 就緒隊列里沒東西的話會創建一個所謂的 “等待項”,這個是啥呢,后邊再說。
- 創建好等待項之后把這個等待項給掛載到 eventpoll 的 “等待隊列” 也就是那個 wq 上。
- 將當前線程從操作系統的調度隊列中拿出來,hang 住當前線程。(current 總是指向當前正在運行的線程,內部是通過匯編+寄存器實現的,這里可以當成一個全局的環境變量)
所以簡單來說 epoll_wait 做的最主要的事兒就是往內部的 “等待隊列” 中插入了一個 “等待項” 并且讓當前線程睡覺。接下來我們來看上邊比較重要的一個 “等待項” 是啥
簡單理解,所謂等待項就是一個結構體,上邊會放一個 private 屬性,該屬性指向 current 也就是當前線程的 task_struct 結構體,還有個 func 屬性指向一個名叫 “default_wake_function” 的回調函數。
然后這個等待項,就會被插入到 eventpoll 的 wq “等待隊列” 上
到這兒為止,我們就把 epoll_wait 主要做的事情也說完了。
4、幾個系統調用總結
接下來我們簡單總結一下,epoll_create 和 epoll_ctl 以及 epoll_wait 都大概做了哪些事情:
首先是 epoll_create,它是使用 epoll 的第一步,它里邊主要是創建了三個數據結構,一個 “等待隊列”,一個 “就緒隊列”,以及一顆 “紅黑樹”。
如果用偽代碼表示的話,那么當你調用了 epoll_create 之后,此時通過這個系統調用返回的 fd,你能拿到這么一個結構體:
等待隊列 = [],
就緒隊列 = [],
紅黑樹 = [],
}
然后是 epoll_ctl,它允許你將實現了 poll 方法的文件系統作為參數交給 epoll 管理,epoll_ctl 內部會調用真實的底層協議實現的 poll 方法,并把 epoll 這一層的一個回調函數作為參數傳給 poll 方法,然后底層協議的 poll 方法中會調用 epoll 傳進來的那個回調函數,并且協議會把自己身上的等待隊列作為參數交給 epoll 的那個回調函數來處理。而這個回調函數中則會創建一個等待項,這個等待項上有個回調函數叫 “ep_poll_callback”,并且把這個等待項給塞到底層協議傳過來的等待隊列上。
如果用偽代碼表示的話,那么當你調用了 epoll_ctl 并把一個 socket 交給它管理之后,此時 fd 對應的結構體就變成了這樣,它的紅黑樹中會多一個節點:
等待隊列 = [],
就緒隊列 = [],
紅黑樹 = [
socket1 = {
等待隊列 = [{ callback: ep_poll_callback }]
}
],
}
最后是 epoll_wait,它會 hang 住當前線程,以等待被托管的 fd 身上有 IO 事件發生。它內部會創建一個等待項,注意這個等待項和上邊 epoll_ctl 中的那個等待項不是一個東西,上邊 epoll_ctl 的等待項是塞給了 socket 的等待隊列,而且里頭只有一個叫 “ep_poll_callback” 的回調函數,而這里的 epoll_wait 的等待項是真的塞給了 epoll 自己的 eventpoll 上的等待隊列,并且它上邊除了有個一個叫做 “default_wake_function” 的回調函數,同時還保存了 current 也就是當前線程對應的 task_struct 結構體。都弄完了之后就會出讓 cpu 讓當前線程睡覺覺。
如果用偽代碼表示的話,那么當你調用了 epoll_wait 之后,此時的 fd 能找到的結構體就變成了這樣,這個 epoll 自己的等待隊列上會多一個等待項:
等待隊列 = [{ callback: default_wake_function, private: current }],
就緒隊列 = [],
紅黑樹 = [
socket1 = {
等待隊列 = [{ callback: ep_poll_callback }]
}
],
}
5、 當來消息了
當用戶態執行完了上邊仨系統調用之后,這條線程就 hang 在這兒了,知道有客戶端發消息過來。那么接下來我們看看當用戶發消息過來之后會發生什么。
太具體的網卡收包的過程咱們就不說了先,大概過程總之就是網卡收到數據之后觸發硬中斷以及軟中斷,軟中斷從緩沖區中把收到的數據處理成 sk_buffer 這個數據結構,然后從網卡驅動也就是鏈路層這一層開始往上送到網絡層再送到傳輸層,在網絡層將 sk_buffer 送到傳輸層之前,它要有一步是根據 sk_buffer 中的協議,來找到要使用哪個傳輸層協議:
在 tcp 對應的 tcp_v4_rc 方法中,就會根據 ip 以及 port 去查找對應的 socket:
struct sock *sk;
sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,
th->dest, sdif, &refcounted);
ret = tcp_v4_do_rcv(sk, skb);
}
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb) {
tcp_rcv_established(sk, skb);
}
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb) {
tcp_data_ready(sk);
}
void tcp_data_ready(struct sock *sk) {
sk->sk_data_ready(sk);
}
我們順著 tcp_v4_rcv 這條調用鏈看下去,最后會發現最終會調用到一個叫 “sk->sk_data_ready” 的方法,這個方法從名字上看就能看出來,它的作用是當 “數據準備好了” 時候調用的,那么這個 sk_data_ready 是誰呢?其實這個方法是在當用戶創建 socket 以及內部的 sock 結構時候被掛到上邊去的,由于創建 socket 的過程也比較繁瑣,這里我們就不再細說了,在之前的文章中我們有過介紹。我們記住一個結論,就是這個 sk_data_ready 屬性,指向的就是一個叫做 “sock_def_readable” 的方法:
{
struct socket_wq *wq;
wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
}
該方法中通過 “wake_up_interruptible_sync_poll” 方法執行等待隊列上的回調函數。不過這里有個迷惑人的地方,就是它的名字雖然帶有 “wake_up” 喚醒字樣,但實際上這里其實并一定會喚醒當前線程。如果你在上層是對 socket 做了類似 recv 之類的操作的話,那確實這里是會做喚醒,但是在 epoll 的情況下,這里并不會直接喚醒線程,為啥呢?還記得上邊我們介紹 epoll 的三個相關系統調用,當你把 epoll_create、epoll_ctl、epoll_wait 仨東西都調用完之后,通過 fd 能拿到的是啥玩意兒么,來看下邊回顧下:
等待隊列 = [{ callback: default_wake_function, private: current }],
就緒隊列 = [],
紅黑樹 = [
socket1 = {
等待隊列 = [{ callback: ep_poll_callback }]
}
],
}
就是這么個玩意兒,它里頭有兩個 “等待隊列”,其中一個在 socket 上,另外一個在 epoll 上。這里這個 “sock_def_readable” 方法中的 “wake_up_interruptible_sync_poll” 其實是會去 socket 上的等待隊列中去拿那個等待項,這個等待項里只有一個 callback 指向了 ep_poll_callback 回調函數。其實對于非 epoll 的情況下,如果上層調用的 recv 的話, 這個 socket 的等待項中,確實是會還有個 private 指向 current 的,不過這里我們是 epoll 的場景,對于其他場景大家可以自行研究,如果把 epoll 這個場景整明白了,其他場景其實也大同小異。
總之呢,這里會調用這個 socket 的等待隊列中的 ep_poll_callback 方法:
// 先找到這個 socket 對應的紅黑樹上的那個節點
struct epitem *epi = ep_item_from_wait(wait);
// 再找到管理著這個 socket 的那個 eventpoll 結構體
struct eventpoll *ep = epi->ep;
// 把這個 socket 對應的紅黑樹的那個節點給添加到 eventpoll 的就緒隊列中
list_add_tail_lockless(&epi->rdllink, &ep->rdllist)
// 看 eventpoll 的等待隊列中是否有等待項, 然后嘗試喚醒
if (waitqueue_active(&ep->wq)) {
wake_up(&ep->wq);
}
}
也就是說,當托管給 epoll 的某個 socket 上接收到了消息之后,tcp 的協議棧那層會主動觸發一個喚醒用的 callback,這個 callback 是 “ep_poll_callback”,然后這個 “ep_poll_callback” 中又會找到紅黑樹上對應的節點,并把這個節點放到 epoll 內部的 “就緒隊列中”,此時的偽代碼可表示為:
等待隊列 = [{ callback: default_wake_function, private: current }],
就緒隊列 = [ socket1 ],
紅黑樹 = [],
}
簡單來講就是當某個 socket 收到消息后,這個 socket 就不在紅黑樹里呆著了,會被放到 epoll 的就緒隊列中。之后觸發 “wake_up” 方法,該方法就會去 epoll 自己的等待隊列上去看是否有等待項,有的話觸發它的 callback,這里如上偽代碼表示,就是觸發了 “default_wake_function” 方法:
里邊觸發了一個 try_to_wake_up,我們注意看這個函數的參數是誰,是一個叫 “curr->private” 的東西,這個是誰呢?誒!就是上邊偽代碼中 epoll 的等待隊列中的等待項里的 private 對應的那個 current,也就是之前調用了 epoll_wait 的那條線程對應的 task_struct。
換句話說,當調用了 try_to_wake_up(curr -> private) 之后,這條被 hang 住的線程,就會被重新加入到可運行的任務隊列中,操作系統會在適當的時機繼續執行它。
那么重新回到哪兒執行呢?還記得我們是在哪里 hang 住當前線程的么?是在調用了 epoll_wait 時,內部執行了一個叫做 “ep_poll” 的方法里邊 hang 住的,忘了的話可以往上翻一番看一看那個 “ep_poll” 方法。所以繼續執行的話,就可以執行到 “ep_send_events”,也就是會把當前就緒隊列中的東西返回給用戶態,最后就是用戶態拿到咔咔用就行了~
到這兒,我們總結一下當數據包來了之后會發生了:
- 網卡收到包后一路往上送,送到 tcp 那層后
- tcp 那層會根據 ip 和 port 找到對應的 socket
- 觸發 socket 上的喚醒函數
- 該函數主要是從 socket 的等待隊列中獲取等待項,并觸發其中的回調函數
- 這個回調函數中會找到這個 socket 對應的紅黑樹節點,并把這個節點加入到 epoll 自己的 “就緒隊列” 中
- 最后查看 epoll 自己的 “等待隊列” 中,是否有等待項,有的話觸發其中的回調函數
- 這個回調函數會拿到之前保存的 private 屬性,也就是 task_struct 進行線程喚醒
- 喚醒后的線程從之前 hang 住的地方重新開始執行,會把 epoll “就緒隊列” 中的都吐給用戶態去使用
epoll 的性能高在哪兒?
到這里我們終于說完了 epoll 的基本實現原理,現在我們可以回過頭來看一看,都說 epoll 性能高,那到底高在哪兒呢?
我們首先來看當不使用 epoll 的時候,我們可能會這么用 socket:
for {
conn = accpet(listenfd)
// 開個新線程或者從線程池里撈一條線程去處理 conn
// 這條線程里去 read,write
start_new_process(conn)
}
我們會先死循環中等待客戶端的鏈接,每來一個鏈接,就開啟一條新線程或者從池子里撈,用這條線程去處理 conn。
當我們使用 epoll 的時候,我們可能會這么用:
epoll_ctl(listenfd)
for {
nums = epoll_wait(&events)
for (i = 0; i < nums; i++) {
if (events[i].data.fd == listenfd) {
connfd = accpet(listenfd)
epoll_ctl(connfd)
} else {
connfd = ep[i].data.fd
// 開個新線程或者從線程池里撈一條線程去處理 conn
// 這條線程里去 read, write
start_new_process(connfd)
}
}
}
可以看到里頭其實也是會頻繁的創建新線程或者從池子里撈一條線程出來用。乍一看之下,感覺用不用 epoll 好像沒啥差別。但是實際上,我們可以細想一下,如果用第一種方式,我們將 accept 的 fd 交給一條新的線程之后,在其內部我們一般會怎么做呢?一般可能就是:
while(true) {
res = recv(acceptfd);
}
}
我們在新的線程中處理每個鏈接時,大概率還是會用個死循環然后里頭不停地去 hang 住線程知道有用戶發請求過來。那么此時這條線程就卡死在這兒了。那么如果這條線程是從線程池中撈出來的話,這條線程就暫時回不去池子里了,相當于我們可用的線程資源就少一個。
但是對于 epoll 的場景來講,epoll 是一定能保證當前用戶拿到的這個 fd 中,確定一定以及肯定是有事件發生了,所以我們即使會創建新的線程或者從池子里撈,也可以馬上就讓這條新的線程去對我們拿到的 fd 做處理,就不用再 hang 住這條線程了。也就是說我們可以高效地利用每一條線程。這就是 epoll 高性能的原因。
如果用 epoll 托管 epoll 會怎么樣?
回到我們的標題,我們在上邊的文章中說過,當你的文件系統實現了 poll 方法之后,就可以使用 epoll 來托管,我們也說過 epoll 自己就是一種文件系統,那么我們來看看 epoll 這個文件系統它能做哪些操作:
能看到它里頭其實也實現了 poll 方法,所以理論上來說我們就可以用 epoll 去托管 epoll。對于這個 “ep_eventpoll_poll” 方法,里面主要調用了一個 “poll_wait” 方法:
而對于 “poll_wait” 方法,它主要是調用了一個 “_qproc” 方法。怎么樣這個方法是不是眼熟,這個就和我們在上邊介紹用 epoll 管理 socket 時一樣,epoll_ctl 會調用 socket 的 poll 方法,然后這個 poll 方法中又會調用上層 epoll 傳過來的那個回調函數。
后邊的事情大家就可以嘗試自己去分析分析了,這里因為過程和 socket 是差不多的,我就不再一點點分析了,我們可以直接用偽代碼來表示,如果用 epoll 托管 epoll,最后的數據結構體的樣子,大概如下:
等待隊列 = [{ private: current, callback: default_wake_function }],
就緒隊列 = [],
紅黑樹 = [
socket2 = {
等待隊列 = [{ private: null, callback: ep_poll_callback }]
},
ep1 = {
等待隊列 = [{ callback: ep_poll_callback }],
就緒隊列 = [],
紅黑樹 = [
socket1 = {
等待隊列 = [{ private: null, callback: ep_poll_callback }]
}
],
}
],
}
簡單來講,就是內部的 epoll 的等待隊列中的等待項,其實回調函數和 socket 的等待項中一樣,也是 “ep_poll_callback” 方法,只有外層的 epoll 的等待項中才會保存當前線程的 current。
也就是說!如果我們用 epoll 去管理一個 epoll 會發生什么呢!
答案是其實啥也不會發生,和正常一樣,當外層的 epoll 有了就緒事件之后,用戶側拿到的 fd 除了是 socket 的 fd,還有可能是個內部 epoll 的 fd,這個 epoll 如果想從它上邊獲取到內部 socket 的消息,我們還是需要對內部的這個 epoll 做正常的 epoll_wait 等操作。我這里有個簡單的小 demo,大家感興趣的話可以自己嘗試一下玩一玩:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define PORT1 13190
#define PORT2 13191
#define MAX 1023
int set_fcntl(int rws)
{
int flags = fcntl(rws, F_GETFD);
if (flags < 0)
{
perror("get fcntl errnor");
return -1;
}
flags |= O_NONBLOCK;
if (fcntl(rws, F_SETFD, flags) < 0)
{
perror("set fcntl errnor");
return -1;
}
return 0;
}
int main() {
pid_t pid = getppid();
printf("本條進程的 pid 是: %dn", pid);
// 創建 socket1
int sockfd1, sockfd2;
struct sockaddr_in myaddr1, myaddr2;
sockfd1 = socket(AF_INET, SOCK_STREAM, 0);
sockfd2 = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd1 < 0 || sockfd2 < 0) {
perror("creat sockfd1 failed");
return -1;
}
int on = 1;
if (
setsockopt(sockfd1, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0 ||
setsockopt(sockfd2, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0
) {
perror("setsockopt");
return -1;
}
myaddr1.sin_family = AF_INET;
myaddr1.sin_port = htons(PORT1);
myaddr1.sin_addr.s_addr = INADDR_ANY;
myaddr2.sin_family = AF_INET;
myaddr2.sin_port = htons(PORT2);
myaddr2.sin_addr.s_addr = INADDR_ANY;
if (
bind(sockfd1, (const struct sockaddr *)&myaddr1, sizeof(myaddr1)) < 0 ||
bind(sockfd2, (const struct sockaddr *)&myaddr2, sizeof(myaddr2)) < 0
) {
perror("bind failed");
return -1;
}
if (
listen(sockfd1, 10) < 0 ||
listen(sockfd2, 10) < 0
) {
perror("listen failed");
return -1;
}
int efd = epoll_create(2);
int efd_internal = epoll_create(1);
if (efd < 0 || efd_internal < 0) {
printf("efd errnon");
return -1;
}
int cn_fd1 = accept(sockfd1, NULL, NULL);
int cn_fd2 = accept(sockfd2, NULL, NULL);
set_fcntl(cn_fd1);
set_fcntl(cn_fd2);
if (cn_fd1 < 0 || cn_fd2 < 0) {
printf("accept fd errnorn");
return -1;
}
struct epoll_event evt1 = {
.events = EPOLLIN,
.data.fd = cn_fd1,
};
struct epoll_event evt2 = {
.events = EPOLLIN,
.data.fd = cn_fd2,
};
struct epoll_event evt_internal = {
.events = EPOLLIN,
.data.fd = efd_internal,
};
// 把 fd1 添加到外部的 epoll 中
if (epoll_ctl(efd, EPOLL_CTL_ADD, cn_fd1, &evt1) < 0) {
printf("put listen_fd epoll errnon");
return -1;
}
// 把 fd2 添加到內部的 epoll
if (epoll_ctl(efd_internal, EPOLL_CTL_ADD, cn_fd2, &evt2) < 0) {
printf("put listen_fd epoll errnon");
return -1;
}
// 把內部的 epoll 添加到外部的 epoll 中
if (epoll_ctl(efd, EPOLL_CTL_ADD, efd_internal, &evt_internal) < 0) {
printf("put listen_fd epoll errnon");
return -1;
}
char buf[1024] = {0};
struct epoll_event events[MAX];
while (1) {
int i = 0;
// 這里再 wait 時, 要么是 fd1 收到數據, 要么是內部的 epoll 的 fd2 收到數據
int num = epoll_wait(efd, events, MAX, ~0);
if (num < 0) {
printf("epoll_wait events start errnon");
return -1;
}
for (i = 0; i < num; i++) {
if (events[i].events & EPOLLIN) {
if (events[i].data.fd == cn_fd1) {
printf("外部的 fd1 接收到數據n");
int len = read(cn_fd1, buf, sizeof(buf));
if (len <= 0) {
struct epoll_event ac_evt1;
if (epoll_ctl(efd, EPOLL_CTL_DEL, cn_fd1, &ac_evt1) < 0) {
printf("put accept_fd epoll errnon");
return -1;
}
close(cn_fd1);
} else {
printf("%sn", buf);
write(events[i].data.fd, buf, len);
}
} else if (events[i].data.fd == cn_fd2) {
printf("外部的 fd2 接收到數據n");
int len = read(cn_fd1, buf, sizeof(buf));
if (len <= 0) {
struct epoll_event ac_evt1;
if (epoll_ctl(efd, EPOLL_CTL_DEL, cn_fd1, &ac_evt1) < 0) {
printf("put accept_fd epoll errnon");
return -1;
}
close(cn_fd1);
} else {
printf("%sn", buf);
write(events[i].data.fd, buf, len);
}
} else if (events[i].data.fd == efd_internal) {
printf("內部的 epoll 接收到數據n");
char buf_internal[1024] = {0};
struct epoll_event events_internal[MAX];
int num_internal = epoll_wait(efd_internal, events_internal, MAX, ~0);
if (num_internal < 0) {
printf("internal epoll_wait events start errnon");
return -1;
}
int i_internal = 0;
for (i_internal = 0; i_internal < num_internal; i_internal++) {
if (events_internal[i].events & EPOLLIN) {
if (events_internal[i].data.fd == cn_fd2) {
printf("內部的 fd2 接收到數據n");
int len = read(cn_fd2, buf_internal, sizeof(buf_internal));
if (len <= 0) {
struct epoll_event ac_evt2;
if (epoll_ctl(efd_internal, EPOLL_CTL_DEL, cn_fd2, &ac_evt2) < 0) {
printf("put internal accept_fd epoll errnon");
return -1;
}
close(cn_fd2);
} else {
printf("%sn", buf_internal);
write(cn_fd2, buf_internal, len);
}
}
}
}
}
}
}
}
}
到這里,我們就把 epoll 的實現原理,以及為啥性能好,還有一個不常見的小場景都介紹了一下。
-
Socket
+關注
關注
0文章
211瀏覽量
34637 -
源碼
+關注
關注
8文章
633瀏覽量
29140 -
代碼
+關注
關注
30文章
4748瀏覽量
68356 -
epoll
+關注
關注
0文章
28瀏覽量
2947
發布評論請先 登錄
相關推薦
評論