什么是select?
有的朋友可能對select也不是很了解啊,我這里稍微科普一下:網絡連接,服務器也是通過文件描述符來管理這些連接上來的客戶端,既然是供連接的服務器,那就免不了要接收來自客戶端的消息。那么多臺客戶端,消息那么的多,要是漏了一條兩條重要消息,那也不要用TCP了,那怎么辦?
前輩們就是有辦法,輪詢,輪詢每個客戶端文件描述符,查看他們是否帶著消息,如果帶著,那就處理一下;如果沒帶著,那就一邊等著去。這就是select,輪詢,頗有點領導下基層的那種感覺哈。
但是這個select的輪詢吶,會有個問題,明眼人一下就能想到,那即是耗費資源啊,耗費什么資源,時間吶,慢吶(其實也挺快了,不過相對epoll來說就是慢)。 再認真想一下,還浪費什么資源,系統資源。有的客戶端吶,占著那啥玩意兒不干那啥事兒,這種客戶端吶,還不少。這也怪不得人家,哪兒有客戶端時時刻刻在發消息,要是有,那就要小心是不是惡意攻擊了。那把這么一堆偶爾動一下的客戶端的文件描述符一直攥手里,累不累?能一次攥多少個?就像一個老板,一直想著下去巡視,那他可以去當車間組長了哈哈哈。
所以,select的默認上限一般是1024(FD_SETSIZE),當然我們可以手動去改,但是人家給個1024自然有人家的道理,改太大的話系統在這一塊的負載就大了。 那句話怎么說的來著,你每次對系統的索取,其實都早已明碼標價!哈哈哈。。。
所以,我們選用epoll模型。
什么是epoll?
epoll接口是為解決Linux內核處理大量文件描述符而提出的方案。該接口屬于Linux下多路I/O復用接口中select/poll的增強。其經常應用于Linux下高并發服務型程序,特別是在大量并發連接中只有少部分連接處于活躍下的情況 (通常是這種情況),在該情況下能顯著的提高程序的CPU利用率。
前面說,select就像親自下基層視察的老板,那么epoll這個老板就要顯得精明的多了。他可不親自下基層,他找了個美女秘書,他只要盯著他的秘書看就行了,呸,他只需要聽取他的秘書的匯報就行了。匯報啥呢?基層有任何消息,跟秘書說,秘書匯總之后一次性交給老板來處理。這樣老板的時間不就大大的提高了嘛。
如果你學過設計模式,這就是典型的“命令模式”,非常符合“依賴倒置原則”,這是一個非常美妙的模式,這個原則也是我最喜歡的一個原則,將高層實現與低層實現解耦合,從而可以各自開發,只要接口一致便可,這個接口,就是秘書。
扯遠了,如果對“設計模式”有興趣,可以找我的專欄。
好,言歸正傳哈哈哈。
epoll的設計思路
- (1)epoll在Linux內核中構建了一個文件系統,該文件系統采用紅黑樹來構建,紅黑樹在增加和刪除上面的效率極高,因此是epoll高效的原因之一。有興趣可以百度紅黑樹了解,但在這里你只需知道其算法效率超高即可。
- (2)epoll提供了兩種觸發模式,水平觸發(LT)和邊沿觸發(ET)。當然,涉及到I/O操作也必然會有阻塞和非阻塞兩種方案。目前效率相對較高的是 epoll+ET+非阻塞I/O 模型,在具體情況下應該合理選用當前情形中最優的搭配方案。
- (3)epoll所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大于1024,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目可以下面語句查看,一般來說這個數目和系統內存關系很大。
阻塞I/O與非阻塞I/O
為了方便理解后面的內容,我們先看幾張圖,關于阻塞與非阻塞I/O的。
阻塞式文件I/O
非阻塞式文件I/O
多路復用I/O
好,有了上面這幾張圖墊著,咱來看看邊緣觸發和水平觸發。
邊緣觸發 VS 水平觸發
EPOLL 事件有兩種模型: Edge Triggered (ET) 邊緣觸發 只有新數據到來,才觸發,不管緩存區中是否還有數據。 Level Triggered (LT) 水平觸發 只要有數據都會觸發,不管數據是哪里的。 (這樣表述會不會好理解一些)
LT(level triggered) 是 缺省 的工作方式 ,并且同時支持 block 和 no-block socket. 在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的 fd 進行 IO 操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯誤可能性要小一點。傳統的 select/poll 都是這種模型的代表.
ET(edge-triggered) 是高速工作方式 ,只支持 no-block socket 。在這種模式下,當描述符從未就緒變為就緒時,內核通過 epoll 告訴你。然后它會假設你知道文件描述符已經就緒,并且不會再為那個文件描述符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了 ( 比如,你在發送,接收或者接收請求,或者發送接收的數據少于一定量時導致了一個 EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個 fd 作 IO 操作 ( 從而導致它再次變成未就緒 ) ,內核不會發送更多的通知 (only once), 不過在 TCP 協議中, ET 模式的加速效用仍需要更多的 benchmark 確認。
要設置ET:在epoll_ctl函數中配置上EPOLLET即可
epoll 工作在 ET 模式的時候,必須使用非阻塞套接口,以避免由于一個文件句柄的阻塞讀 / 阻塞寫操作把處理多個文件描述符的任務餓死。最好以下面的方式調用 ET 模式的 epoll 接口,在后面會介紹避免可能的缺陷。
- 基于非阻塞文件句柄
- 只有當 read(2) 或者 write(2) 返回 EAGAIN 時才需要掛起,等待。但這并不是說每次 read() 時都需要循環讀,直到讀到產生一個 EAGAIN 才認為此次事件處理完成,當 read() 返回的讀到的數據長度小于請求的數據長度時,就可以確定此時緩沖中已沒有數據了,也就可以認為此事讀事件已處理完成。
epoll API
epoll提供的API,我所用過的其實不多,無非就那么幾個。 所以我就只能聊聊我說用過的。
頭文件
#include< sys/epoll.h >
創建句柄
int epoll_create(int size);
創建一個epoll句柄,參數size用于告訴內核監聽的文件描述符個數,跟內存大小有關。 返回epoll 文件描述符
控制某個epoll監控的文件描述符上的事件:注冊,修改,刪除
參數釋義: epfd:為epoll的句柄 op:表示動作,用3個宏來表示 ··· EPOLL_CTL_ADD(注冊新的 fd 到epfd) ··· EPOLL_CTL_DEL(從 epfd 中刪除一個 fd) ··· EPOLL_CTL_MOD(修改已經注冊的 fd 監聽事件)
event:告訴內核需要監聽的事件
typedef union epoll_data
{
void* ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t; /* 保存觸發事件的某個文件描述符相關的數據 */
struct epoll_event
{
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
/* epoll_event.events:
EPOLLIN 表示對應的文件描述符可以讀
EPOLLOUT 表示對應的文件描述符可以寫
EPOLLPRI 表示對應的文件描述符有緊急的數據可讀
EPOLLERR 表示對應的文件描述符發生錯誤
EPOLLHUP 表示對應的文件描述符被掛斷
EPOLLET 設置ET模式
*/
epoll消息讀取
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待所監控文件描述符上有事件的產生
參數釋義: events:用來從內核得到事件的集合 maxevent:用于告訴內核這個event有多大,這個maxevent不能大于創建句柄時的size timeout:超時時間 ··· -1:阻塞 ··· 0:立即返回 ···>0:指定微秒
成功返回有多少個文件描述符準備就緒,時間到返回0,出錯返回-1.
代碼示例
/* 實現功能:通過epoll, 處理多個socket
* 監聽一個端口,監聽到有鏈接時,添加到epoll_event
* xs
*/
#include < stdio.h >
#include < stdlib.h >
#include < string.h >
#include < sys/socket.h >
#include < poll.h >
#include < sys/epoll.h >
#include < sys/time.h >
#include < netinet/in.h >
#include < unistd.h >
#define MYPORT 12345
//最多處理的connect
#define MAX_EVENTS 500
//當前的連接數
int currentClient = 0;
//數據接受 buf
#define REVLEN 10
char recvBuf[REVLEN];
//epoll描述符
int epollfd;
//事件數組
struct epoll_event eventList[MAX_EVENTS];
void AcceptConn(int srvfd);
void RecvData(int fd);
int main()
{
int i, ret, sinSize;
int recvLen = 0;
fd_set readfds, writefds;
int sockListen, sockSvr, sockMax;
int timeout;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;
//socket
if((sockListen=socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("socket errorn");
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(MYPORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
//bind
if(bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
{
printf("bind errorn");
return -1;
}
//listen
if(listen(sockListen, 5) < 0)
{
printf("listen errorn");
return -1;
}
// epoll 初始化
epollfd = epoll_create(MAX_EVENTS);
struct epoll_event event;
event.events = EPOLLIN|EPOLLET;
event.data.fd = sockListen;
//add Event
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0)
{
printf("epoll add fail : fd = %dn", sockListen);
return -1;
}
//epoll
while(1)
{
timeout=3000;
//epoll_wait
int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);
if(ret < 0)
{
printf("epoll errorn");
break;
}
else if(ret == 0)
{
printf("timeout ...n");
continue;
}
//直接獲取了事件數量,給出了活動的流,這里是和poll區別的關鍵
int i = 0;
for(i=0; i< ret; i++)
{
//錯誤退出
if ((eventList[i].events & EPOLLERR) ||
(eventList[i].events & EPOLLHUP) ||
!(eventList[i].events & EPOLLIN))
{
printf ( "epoll errorn");
close (eventList[i].data.fd);
return -1;
}
if (eventList[i].data.fd == sockListen)
{
AcceptConn(sockListen);
}else{
RecvData(eventList[i].data.fd);
}
}
}
close(epollfd);
close(sockListen);
return 0;
}
/**************************************************
函數名:AcceptConn
功能:接受客戶端的鏈接
參數:srvfd:監聽SOCKET
***************************************************/
void AcceptConn(int srvfd)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
bzero(&sin, len);
int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
if (confd < 0)
{
printf("bad acceptn");
return;
}else
{
printf("Accept Connection: %d", confd);
}
//將新建立的連接添加到EPOLL的監聽中
struct epoll_event event;
event.data.fd = confd;
event.events = EPOLLIN|EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event);
}
//讀取數據
void RecvData(int fd)
{
int ret;
int recvLen = 0;
memset(recvBuf, 0, REVLEN);
printf("RecvData functionn");
if(recvLen != REVLEN)
{
while(1)
{
//recv數據
ret = recv(fd, (char *)recvBuf+recvLen, REVLEN-recvLen, 0);
if(ret == 0)
{
recvLen = 0;
break;
}
else if(ret < 0)
{
recvLen = 0;
break;
}
//數據接受正常
recvLen = recvLen+ret;
if(recvLen< REVLEN)
{
continue;
}
else
{
//數據接受完畢
printf("buf = %sn", recvBuf);
recvLen = 0;
break;
}
}
}
printf("data is %s", recvBuf);
}
整體拔高:高效的并發方式
并發編程的目的是讓程序”同時”執行多個任務。如果程序是計算密集型的,并發編程并沒有什么優勢,反而由于任務的切換使效率降低。但如果程序是I/O密集型的,那就不同了。
并發模式是指I/O處理單元和多個邏輯單元之間協調完成任務的方法,服務器主要有兩種并發編程模式:半同步/半異步(half-sync/half-async)模式和領導者/追隨者(Leader/Followers)模式。
這里講一個“半同步/半異步”。
下面的內容需要有一定的基礎了,小白可以收藏一下以后變強了再看。
半同步/半異步模式
在半同步/半異步模式中,同步線程用于處理客戶邏輯,異步線程用于處理I/O事件。異步線程監聽到客戶請求之后就將其封裝成請求對象并插入到請求隊列中。請求隊列將通知某個工作在同步模式的工作線程來讀取并處理該請求對象。
半同步/半反應堆模式(half-sync/half-reactive模式)
半同步/半反應堆模式是半同步/半異步模式的一種變體。 其結構如下圖:
在上圖中,異步線程只有一個,由主線程充當,負責監聽socket上的事件。如果監聽socket上有新的連接請求到來,主線程就接受新的連接socket,然后往epoll內核事件表中注冊該socket上的讀寫事件。如果連接socket上有讀寫事件發生,即有新的客戶請求到來或有數據要發送至客戶端,主線程就將該連接socket插入到請求隊列中,所有工作線程都睡眠在請求隊列上,當有任務到來時,他們通過競爭來獲取任務的接管權。 由于主線程插入請求隊列中的任務是就緒的連接socket,所以該半同步/半反應堆模式所采用的事件處理模式是Reactor模式,即工作線程要自己從socket上讀寫數據。當然,半同步/半反應堆模式也可以用模擬的Proactor事件處理模式,即由主線程來完成數據的讀寫操作,此時主線程將應用程序數據、任務類型等信息封裝為一個任務對象,然后將其插入到請求隊列。
半同步/半反應堆模式的缺點: 主線程和工作線程共享請求隊列,因而請求隊列是臨界資源,所以對請求隊列操作的時候需要加鎖保護。 每個工作線程在同一時間只能處理一個客戶請求。如果客戶數量增多,則請求隊列中堆積任務太多,客戶端的響應會越來越慢。如果增多工作線程的話,則線程的切花也將消耗大量的CPU時間。
高效的半同步/半異步模式
在半同步/半反應堆模式中,每個工作線程同時只能處理一個客戶請求,如果并發量大的話,客戶端響應會很慢。如果每個工作線程都能同時處理多個客戶鏈接,則就能改善這種情況,所以就有了高效的半同步/半異步模式。 其結構如圖:
主線程只管監聽socket,當有新的連接socket到來時,主線程就接受連接并返回新的連接socket給某個工作線程。此后該新連接socket上的任何I/O操作都由被選中的工作線程來處理,直到客戶端關閉連接。當工作線程檢測到有新的連接socket到來時,就把該新的連接socket的讀寫事件注冊到自己的epoll內核事件表中。 主線程和工作線程都維持自己的事件循環,他們各自獨立的監聽不同事件。因此在這種高效的半同步/半異步模式中,每個線程都工作在異步模式中,所以它并非嚴格意義上的半同步/半異步模式。
epoll源碼學習
數據結構
eventpoll
// epoll的核心實現對應于一個epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在這里
// f_op- >poll() 使用的, 被其他事件通知機制利用的wait_address
wait_queue_head_t poll_wait;
//已就緒的需要檢查的epitem 列表
struct list_head rdllist;
//保存所有加入到當前epoll的文件對應的epitem
struct rb_root rbr;
// 當正在向用戶空間復制數據時, 產生的可用文件
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
//優化循環檢查,避免循環檢查中重復的遍歷
int visited;
struct list_head visited_list_link;
}
epitem
// 對應于一個加入到epoll的文件
struct epitem {
// 掛載到eventpoll 的紅黑樹節點
struct rb_node rbn;
// 掛載到eventpoll.rdllist 的節點
struct list_head rdllink;
// 連接到ovflist 的指針
struct epitem *next;
/* 文件描述符信息fd + file, 紅黑樹的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 當前文件的等待隊列(eppoll_entry)列表
// 同一個文件上可能會監視多種事件,
// 這些事件可能屬于不同的wait_queue中
// (取決于對應文件類型的實現),
// 所以需要使用鏈表
struct list_head pwqlist;
// 當前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 傳入的用戶數據 */
struct epoll_event event;
};
eppoll_entry
// 與一個文件上的一個wait_queue_head 相關聯,因為同一文件可能有多個等待的事件,
//這些事件可能使用不同的等待隊列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 添加到wait_queue 中的節點
wait_queue_t wait;
// 文件wait_queue 頭
wait_queue_head_t *whead;
};
函數接口
epoll_create()
//先進行判斷size是否 >=0,若是則直接調用epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
SYSCALL_DEFINE1是一個宏,用于定義有一個參數的系統調用函數,上述宏展開后即成為: int sys_epoll_create(int size),這就是epoll_create系統調用的入口。至于為何要用宏而不是直接聲明,主要是因為系統調用的參數個數、傳參方式都有嚴格限制,最多六個參數。
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;//主描述符
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/* 對于epoll來講, 目前唯一有效的flag就是CLOEXEC */
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/* 分配一個struct eventpoll */
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
/* 這里是創建一個匿名fd。
epollfd本身并不存在一個真正的文件與之對應, 所以內核需要創建一個
"虛擬"的文件, 并為之分配真正的struct file結構, 而且有真正的fd.
這里2個參數比較關鍵:
eventpoll_fops, fops就是file operations, 就是當你對這個文件(這里是虛擬的)進行操作(比如讀)時,
fops里面的函數指針指向真正的操作實現, 類似C++里面虛函數和子類的概念.
epoll只實現了poll和release(就是close)操作, 其它文件系統操作都有VFS全權處理了.
ep, ep就是struct epollevent, 它會作為一個私有數據保存在struct file的private指針里面.
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
// epoll 文件系統的相關實現
// epoll 文件系統初始化, 在系統啟動時會調用
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可添加到epoll的最多的描述符數量
max_user_watches = (((si.totalram - si.totalhigh) / 25) < < PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化遞歸檢查隊列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分別用來分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
epoll_ctl()
//創建好epollfd后, 接下來添加fd
//epoll_ctl的參數:epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要監聽的描述符,event 我們感興趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
//錯誤處理以及從用戶空間將epoll_event結構copy到內核空間.
if (ep_op_has_event(op) &&
// 復制用戶空間數據到內核
copy_from_user(&epds, event, sizeof(struct epoll_event))) {
goto error_return;
}
// 取得 epfd 對應的文件
error = -EBADF;
file = fget(epfd);
if (!file) {
goto error_return;
}
// 取得目標文件
tfile = fget(fd);
if (!tfile) {
goto error_fput;
}
// 目標文件必須提供 poll 操作
error = -EPERM;
if (!tfile- >f_op || !tfile- >f_op- >poll) {
goto error_tgt_fput;
}
// 添加自身或epfd 不是epoll 句柄
error = -EINVAL;
if (file == tfile || !is_file_epoll(file)) {
goto error_tgt_fput;
}
// 取得內部結構eventpoll
ep = file- >private_data;
// EPOLL_CTL_MOD 不需要加全局鎖 epmutex
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
// 目標文件也是epoll 檢測是否有循環包含的問題
if (ep_loop_check(ep, tfile) != 0) {
goto error_tgt_fput;
}
} else
{
// 將目標文件添加到 epoll 全局的tfile_check_list 中
list_add(&tfile- >f_tfile_llink, &tfile_check_list);
}
}
mutex_lock_nested(&ep- >mtx, 0);
// 以tfile 和fd 為key 在rbtree 中查找文件對應的epitem
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 沒找到, 添加額外添加ERR HUP 事件
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else {
error = -EEXIST;
}
// 清空文件檢查列表
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi) {
error = ep_remove(ep, epi);
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else {
error = -ENOENT;
}
break;
}
mutex_unlock(&ep- >mtx);
error_tgt_fput:
if (did_lock_epmutex) {
mutex_unlock(&epmutex);
}
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
//ep_insert()在epoll_ctl()中被調用, 完成往epollfd里面添加一個監聽fd的工作
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/*
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
*/
// 增加監視文件數
user_watches = atomic_long_read(&ep- >user- >epoll_watches);
if (unlikely(user_watches >= max_user_watches)) {
return -ENOSPC;
}
// 分配初始化 epi
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {
return -ENOMEM;
}
INIT_LIST_HEAD(&epi- >rdllink);
INIT_LIST_HEAD(&epi- >fllink);
INIT_LIST_HEAD(&epi- >pwqlist);
epi- >ep = ep;
// 初始化紅黑樹中的key
ep_set_ffd(&epi- >ffd, tfile, fd);
// 直接復制用戶結構
epi- >event = *event;
epi- >nwait = 0;
epi- >next = EP_UNACTIVE_PTR;
// 初始化臨時的 epq
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 設置事件掩碼
epq.pt._key = event- >events;
// 內部會調用ep_ptable_queue_proc, 在文件對應的wait queue head 上
// 注冊回調函數, 并返回當前文件的狀態
revents = tfile- >f_op- >poll(tfile, &epq.pt);
// 檢查錯誤
error = -ENOMEM;
if (epi- >nwait < 0) { // f_op- >poll 過程出錯
goto error_unregister;
}
// 添加當前的epitem 到文件的f_ep_links 鏈表
spin_lock(&tfile- >f_lock);
list_add_tail(&epi- >fllink, &tfile- >f_ep_links);
spin_unlock(&tfile- >f_lock);
// 插入epi 到rbtree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check()) {
goto error_remove_epi;
}
spin_lock_irqsave(&ep- >lock, flags);
/* 文件已經就緒插入到就緒鏈表rdllist */
if ((revents & event- >events) && !ep_is_linked(&epi- >rdllink)) {
list_add_tail(&epi- >rdllink, &ep- >rdllist);
if (waitqueue_active(&ep- >wq))
// 通知sys_epoll_wait , 調用回調函數喚醒sys_epoll_wait 進程
{
wake_up_locked(&ep- >wq);
}
// 先不通知調用eventpoll_poll 的進程
if (waitqueue_active(&ep- >poll_wait)) {
pwake++;
}
}
spin_unlock_irqrestore(&ep- >lock, flags);
atomic_long_inc(&ep- >user- >epoll_watches);
if (pwake)
// 安全通知調用eventpoll_poll 的進程
{
ep_poll_safewake(&ep- >poll_wait);
}
return 0;
error_remove_epi:
spin_lock(&tfile- >f_lock);
// 刪除文件上的 epi
if (ep_is_linked(&epi- >fllink)) {
list_del_init(&epi- >fllink);
}
spin_unlock(&tfile- >f_lock);
// 從紅黑樹中刪除
rb_erase(&epi- >rbn, &ep- >rbr);
error_unregister:
// 從文件的wait_queue 中刪除, 釋放epitem 關聯的所有eppoll_entry
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep- >lock, flags);
if (ep_is_linked(&epi- >rdllink)) {
list_del_init(&epi- >rdllink);
}
spin_unlock_irqrestore(&ep- >lock, flags);
// 釋放epi
kmem_cache_free(epi_cache, epi);
return error;
}
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{
int pollflags;
struct eventpoll *ep = file- >private_data;
// 插入到wait_queue
poll_wait(file, &ep- >poll_wait, wait);
// 掃描就緒的文件列表, 調用每個文件上的poll 檢測是否真的就緒,
// 然后復制到用戶空間
// 文件列表中有可能有epoll文件, 調用poll的時候有可能會產生遞歸,
// 調用所以用ep_call_nested 包裝一下, 防止死循環和過深的調用
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current);
// static struct nested_calls poll_readywalk_ncalls;
return pollflags != -1 ? pollflags : 0;
}
// 通用的poll_wait 函數, 文件的f_ops- >poll 通常會調用此函數
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p- >_qproc && wait_address) {
// 調用_qproc 在wait_address 上添加節點和回調函數
// 調用 poll_table_struct 上的函數指針向wait_address添加節點, 并設置節點的func
// (如果是select或poll 則是 __pollwait, 如果是 epoll 則是 ep_ptable_queue_proc),
p- >_qproc(filp, wait_address, p);
}
}
/*
* 該函數在調用f_op- >poll()時會被調用.
* 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關聯起來的.
* 關聯的辦法就是使用等待隊列(waitqueue)
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi- >nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/* 初始化等待隊列, 指定ep_poll_callback為喚醒時的回調函數,
* 當我們監聽的fd發生狀態改變時, 也就是隊列頭被喚醒時,
* 指定的回調函數將會被調用. */
init_waitqueue_func_entry(&pwq- >wait, ep_poll_callback);
pwq- >whead = whead;
pwq- >base = epi;
/* 將剛分配的等待隊列成員加入到頭中, 頭是由fd持有的 */
add_wait_queue(whead, &pwq- >wait);
list_add_tail(&pwq- >llink, &epi- >pwqlist);
/* nwait記錄了當前epitem加入到了多少個等待隊列中,
* 我認為這個值最大也只會是1... */
epi- >nwait++;
} else {
/* We have to signal that an error occurred */
epi- >nwait = -1;
}
}
//回調函數, 當我們監聽的fd發生狀態改變時, 它會被調用.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
//從等待隊列獲取epitem.需要知道哪個進程掛載到這個設備
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi- >ep;//獲取
spin_lock_irqsave(&ep- >lock, flags);
if (!(epi- >event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/* 沒有我們關心的event... */
if (key && !((unsigned long) key & epi- >event.events))
goto out_unlock;
/*
* 這里看起來可能有點費解, 其實干的事情比較簡單:
* 如果該callback被調用的同時, epoll_wait()已經返回了,
* 也就是說, 此刻應用程序有可能已經在循環獲取events,
* 這種情況下, 內核將此刻發生event的epitem用一個單獨的鏈表
* 鏈起來, 不發給應用程序, 也不丟棄, 而是在下一次epoll_wait
* 時返回給用戶.
*/
if (unlikely(ep- >ovflist != EP_UNACTIVE_PTR)) {
if (epi- >next == EP_UNACTIVE_PTR) {
epi- >next = ep- >ovflist;
ep- >ovflist = epi;
}
goto out_unlock;
}
/* 將當前的epitem放入ready list */
if (!ep_is_linked(&epi- >rdllink))
list_add_tail(&epi- >rdllink, &ep- >rdllist);
/* 喚醒epoll_wait... */
if (waitqueue_active(&ep- >wq))
wake_up_locked(&ep- >wq);
/* 如果epollfd也在被poll, 那就喚醒隊列里面的所有成員. */
if (waitqueue_active(&ep- >poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep- >lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep- >poll_wait);
return 1;
}
epoll_wait()
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/* 這個地方有必要說明一下:
* 內核對應用程序采取的策略是"絕對不信任",
* 所以內核跟應用程序之間的數據交互大都是copy, 不允許(也時候也是不能...)指針引用.
* epoll_wait()需要內核返回數據給用戶空間, 內存由用戶程序提供,
* 所以內核會用一些手段來驗證這一段內存空間是不是有效的.
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
/* 獲取epollfd的struct file, epollfd也是文件嘛 */
file = fget(epfd);
if (!file)
goto error_return;
error = -EINVAL;
/* 檢查一下它是不是一個真正的epollfd... */
if (!is_file_epoll(file))
goto error_fput;
/* 獲取eventpoll結構 */
ep = file- >private_data;
/* 等待事件到來~~ */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
/* 這個函數真正將執行epoll_wait的進程帶入睡眠狀態... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;//等待隊列
/* 計算睡覺時間, 毫秒要轉換為HZ */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep- >lock, flags);
res = 0;
/* 如果ready list不為空, 就不睡了, 直接干活... */
if (list_empty(&ep- >rdllist)) {
/* OK, 初始化一個等待隊列, 準備直接把自己掛起,
* 注意current是一個宏, 代表當前進程 */
init_waitqueue_entry(&wait, current);//初始化等待隊列,wait表示當前進程
__add_wait_queue_exclusive(&ep- >wq, &wait);//掛載到ep結構的等待隊列
for (;;) {
/* 將當前進程設置位睡眠, 但是可以被信號喚醒的狀態,
* 注意這個設置是"將來時", 我們此刻還沒睡! */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果這個時候, ready list里面有成員了,
* 或者睡眠時間已經過了, 就直接不睡了... */
if (!list_empty(&ep- >rdllist) || !jtimeout)
break;
/* 如果有信號產生, 也起床... */
if (signal_pending(current)) {
res = -EINTR;
break;
}
/* 啥事都沒有,解鎖, 睡覺... */
spin_unlock_irqrestore(&ep- >lock, flags);
/* jtimeout這個時間后, 會被喚醒,
* ep_poll_callback()如果此時被調用,
* 那么我們就會直接被喚醒, 不用等時間了...
* 再次強調一下ep_poll_callback()的調用時機是由被監聽的fd
* 的具體實現, 比如socket或者某個設備驅動來決定的,
* 因為等待隊列頭是他們持有的, epoll和當前進程
* 只是單純的等待...
**/
jtimeout = schedule_timeout(jtimeout);//睡覺
spin_lock_irqsave(&ep- >lock, flags);
}
__remove_wait_queue(&ep- >wq, &wait);
/* OK 我們醒來了... */
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep- >rdllist) || ep- >ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep- >lock, flags);
/* 如果一切正常, 有event發生, 就開始準備數據copy給用戶空間了... */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
//調用p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
//由ep_send_events()調用本函數
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
mutex_lock(&ep- >mtx);
spin_lock_irqsave(&ep- >lock, flags);
/* 這一步要注意, 首先, 所有監聽到events的epitem都鏈到rdllist上了,
* 但是這一步之后, 所有的epitem都轉移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經被清空了! */
list_splice_init(&ep- >rdllist, &txlist);
/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 保存后下次再處理... */
ep- >ovflist = NULL;
spin_unlock_irqrestore(&ep- >lock, flags);
/* 在這個回調函數里面處理每個epitem
* sproc 就是 ep_send_events_proc, 下面會注釋到. */
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep- >lock, flags);
/* 現在我們來處理ovflist, 這些epitem都是我們在傳遞數據給用戶空間時
* 監聽到了事件. */
for (nepi = ep- >ovflist; (epi = nepi) != NULL;
nepi = epi- >next, epi- >next = EP_UNACTIVE_PTR) {
/* 將這些直接放入readylist */
if (!ep_is_linked(&epi- >rdllink))
list_add_tail(&epi- >rdllink, &ep- >rdllist);
}
ep- >ovflist = EP_UNACTIVE_PTR;
/* 上一次沒有處理完的epitem, 重新插入到ready list */
list_splice(&txlist, &ep- >rdllist);
/* ready list不為空, 直接喚醒... */
if (!list_empty(&ep- >rdllist)) {
if (waitqueue_active(&ep- >wq))
wake_up_locked(&ep- >wq);
if (waitqueue_active(&ep- >poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep- >lock, flags);
mutex_unlock(&ep- >mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep- >poll_wait);
return error;
}
-
服務器
+關注
關注
12文章
9021瀏覽量
85184 -
程序
+關注
關注
116文章
3777瀏覽量
80851 -
模型
+關注
關注
1文章
3172瀏覽量
48714 -
epoll
+關注
關注
0文章
28瀏覽量
2947
發布評論請先 登錄
相關推薦
評論