IO 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

select

1
2
int select (int maxfdp1, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout)
;

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。但是select有一下三个缺点:

  1. 当个进程能够监控的文件描述符的数量受到限制(最大为1024,由FD_SETSIZE宏定义决定,修改需要重新编译内核)
  2. 每一次调用select,都需要将文件描述符set复制到内核里面,增加了系统开销
  3. select采用轮询的方式,每一次内核都要遍历 [0,maxfdp1)这么多个文件描述符,即使文件描述符set里面只有一
    个值为maxfdp的文件描述符,而且返回的时候,程序需要遍历所有监控的文件描述符来获得可读写的文件描述符

poll

1
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

1
2
3
4
5
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events to watch */
short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll操作过程

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

epoll操作过程需要三个接口,分别如下:

1
2
3
int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1: int epoll_create(int size):

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):

1
2
3
4
5
6
函数是对指定描述符fd执行op操作。
- epfd:是epoll_create()的返回值。
- op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD
分别添加、删除和修改对fd的监听事件。

- fd:是需要监听的fd(文件描述符)
- epoll_event:是告诉内核需要监听什么事

struct epoll_event结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个
socket加入到EPOLL队列里

3: int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

工作模式

epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。

ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

ET模式

ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

epoll的优点主要是一下几个方面:

  1. 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。

  2. IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。

  3. 文件描述符只需要复制一次到内核,不需要每一次调用函数都进行文件描述符的内核复制

如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

下面是在这里复制的一份epoll的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100

//函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);

//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd);
//读处理
static void do_read(int epollfd,int fd,char *buf);
//写处理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//删除事件
static void delete_event(int epollfd,int fd,int state);

int main(int argc,char *argv[])
{

int listenfd;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
}

static int socket_bind(const char* ip,int port)
{

int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
}

static void do_epoll(int listenfd)
{

int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//创建一个描述符
epollfd = epoll_create(FDSIZE);
//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);
for ( ; ; )
{
//获取已经准备好的描述符事件
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,listenfd,buf);
}
close(epollfd);
}

static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{

int i;
int fd;
//进行选好遍历
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
static void handle_accpet(int epollfd,int listenfd)
{

int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//添加一个客户描述符和事件
add_event(epollfd,clifd,EPOLLIN);
}
}

static void do_read(int epollfd,int fd,char *buf)
{

int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else if (nread == 0)
{
fprintf(stderr,"client close.\n");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else
{
printf("read message is : %s",buf);
//修改描述符对应的事件,由读改为写
modify_event(epollfd,fd,EPOLLOUT);
}
}

static void do_write(int epollfd,int fd,char *buf)
{

int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
delete_event(epollfd,fd,EPOLLOUT);
}
else
modify_event(epollfd,fd,EPOLLIN);
memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state)
{

struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state)
{

struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state)
{

struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}