Linux IO复用—select poll 和 epoll

在Socket编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和端口复用,select、poll和epoll是Linux API提供的I/O复用方式。其实在*nix下的网络并发方法向来不缺,比如典型的Apache模型(Process Per Connection,简称PPC),TPC(Thread Per Connection)模型,这两种模型思想类似,就是利用了多进程、多线程概念,让进来的每一个连接去干别的事情去。但是连接多了以后,首先需要较大的内存,且进程/线程切换开销会非常大,因此这类模型能接受的最大连接数都不会太高。

Linux 2.6中加入了epoll之后(据说Windows下使用的是IOCP,但是我没使用过),在高性能服务器领域中得到广泛的应用,主要原因就是高效。在讲epoll之前,我们先总结下select、poll,因为epoll其实也就是他们的增强版本,比如select是一个系统调用,而epoll是个模块,由三个系统调用组成,内核中由文件系统实现。

select

select的第一个参数为fdset集合中最大描述符值加1,select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回,当轮询一遍后没有任何事件发生,如果指定了超时时间,则select会睡眠到超时,睡眠结束后再进行下一次轮询,并将临时结果写到用户空间,然后返回。select返回后,需要逐一检查关注的描述符是否被SET(事件是否发生)。

int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
1
2
3
4
5
6
参数                  描述
nfds             sets的文件描述符的最大值
readfds          fd_set type 类型,只读的描述符集
writefds         fd_set type 类型,只写的描述符集
errorfds         fd_set type 类型,错误的描述符集
timeout          超时等待时间

为了维护fd_set类型的参数,会使用下面四个宏:FD_SET(), FD_CLR(), FD_ZERO() 和 FD_ISSET()。

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct fd_set
{
u_int fd_count;
int fd_array[FD_SETSIZE];
}
//fd_array可SIZE*8个socket
int FD_ISSET(int fd,fd_set *fdset);
//返回值:若fd在描述符集中则返回非0值,否则返回0
void FD_CLR(int fd,fd_set *fdset); //fd指描述符
void FD_SET(int fd,fd_set *fdset);
void FD_ZERO(fd_set *fdset);

select 函数监视的文件描述符分3类,分别是writefds,readfds、和exceptfds。调用后 select 函数会阻塞,直到有描述符就绪(有数据可读、可写、或者except)、或者超时(timeout指定等待的时间,timeout== NULL表示永远等待,timeout == 0表示不等待、立即返回,其他表示等待的时间)。当 select 函数返回后,可以通过遍历 fdset ,来找到就绪的描述符。

select 的一个优点就是跨平台,缺点就是单个进程能够监视的文件描述符的数量存在最大限制,linux下一般为1024,Windows下好像无此限制,虽然可以修改这一限制,但是这样也会造成效率低下。

运行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int sock;
FILE *fp;
struct fd_set fds;
struct timeval timeout={3,0}; //select等待3秒,3秒轮询,要非阻塞就置0
char buffer[256]={0}; //256字节的接收缓冲区
//假定已经建立UDP连接,具体过程不写,简单,当然TCP也同理,主机ip和port都已经给定,要写的文件已经打开
sock=socket(...);
bind(...);
fp=fopen(...);
while(1)
{
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(sock,&fds); //添加描述符
FD_SET(fp,&fds); //同上
maxfdp=sock>fp?sock+1:fp+1; //描述符最大值加1
// for(int i =0 ;i < maxfds; i++) if(FD_ISSET()) { }
switch(select(maxfdp,&fds,&fds,NULL,&timeout)) //select使用
{
case -1: exit(-1);break; //select错误,退出程序
case 0:break; //再次轮询
default:
if(FD_ISSET(sock,&fds)) //测试sock是否可读,即是否网络上有数据
{
recvfrom(sock,buffer,256,.....);//接受网络数据
if(FD_ISSET(fp,&fds)) //测试文件是否可写
fwrite(fp,buffer...);//写入文件
......
}
}
}

注意:每次select 有数据要遍历全部socket,每次select之前要重置fds的值。

poll

poll函数类似于 select,可用于任何类型的文件描述符,与 select 不同,poll不是为每个状态(可读性、可写性和异常状态)构造一个描述符集,而是构造一个pollfd 结构数组向内核传递需要关注的事件,故没有描述符个数的限制,每个数组元素指定一个描述符编号以及对其所关心的状态,pollfd中的events字段和revents字段分别用于标示关注的事件和发生的事件。

poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,poll返回后,需要对pollfd中的每个元素检查其revents值,来判断事件是否发生。

返回值:

-1:有错误产生
0:超时时间到,而且没有描述符有状态变化
>0:有状态变化的描述符个数

1
int poll(struct pollfd fdarray[],nfds_t nfds,int timeout);
1
2
3
4
5
struct pollfd{
int fd; //需要检查的文件描述符
short events; //等待的需要测试事件
short revents; //实际发生了的事件,也就是返回结果
};

应将每个数组元素的events成员设置为下图所示的值。通过这些值告诉内核我们对该描述符关心的是什么。返回时,内核设置revents成员,以说明对于该描述符已经发生了什么事件。(注意,poll没有更改events成员,这与select不同,select修改其参数以指示哪一个描述符已准备好了。)

poll的events和revents标志:


timeout == -1 永远等待。当所指定的描述符中的一个已准备好,或捕捉到一个信号时则返回。如果捕捉到一个信号,则poll返回-1,errno设置为EINTR timeout == 0 不等待 timeout > 0 等待timeout毫秒,如果已超时但是还没有一个描述符准备好,则返回值是0。
运行过程(与select相似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
struct pollfd fds[IN_FILES];
char buf[MAX_BUFFER_SIZE];
int i,res,real_read, maxfd;
fds[0].fd = 0;
if((fds[1].fd=open("data1",O_RDONLY|O_NONBLOCK)) < 0)
{
fprintf(stderr,"open data1 error:%s",strerror(errno));
return 1;
}
if((fds[2].fd=open("data2",O_RDONLY|O_NONBLOCK)) < 0)
{
fprintf(stderr,"open data2 error:%s",strerror(errno));
return 1;
}
for (i = 0; i < IN_FILES; i++)
{
fds[i].events = POLLIN;
}
while(fds[0].events || fds[1].events || fds[2].events)
{
if (poll(fds, IN_FILES, TIME_DELAY) <= 0)
{
printf("Poll error\n");
return 1;
}
for (i = 0; i< IN_FILES; i++)
{
if (fds[i].revents)
{
memset(buf, 0, MAX_BUFFER_SIZE);
real_read = read(fds[i].fd, buf, MAX_BUFFER_SIZE);
if (real_read < 0)
{
if (errno != EAGAIN)
{
return 1;
}
}
else if (!real_read)
{
close(fds[i].fd);
fds[i].events = 0;
......
};

epoll

epoll通过epoll_create创建一个用于epoll轮询的描述符,通过epoll_ctl添加/修改/删除事件,通过epoll_wait 检查事件,epoll_wait 的第二个参数用于存放结果。

epoll与select、poll不同,首先,其不用每次调用都向内核拷贝事件描述信息,在第一次调用后,事件信息就会与对应的epoll描述符关联起来。另外epoll不是通过轮询,而是通过在等待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。

epoll返回后,该参数指向的缓冲区中即为发生的事件,对缓冲区中每个元素进行处理即可,而不需要像poll、select那样进行轮询检查。

系统调用:

1
2
//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
int epoll_create(int size);
1
2
3
4
5
6
7
8
/*
epoll的事件注册函数,它与select()是在监听事件时告诉内核要监听什么类型的事件不同,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事。
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

struct epoll_event结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event
{
__uint32_t events; //Epoll events
epoll_data_t data; //User data variable
};

`
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

1
2
3
4
/*
等待事件的产生,类似于select()调用,参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法是永久阻塞),该函数返回需要处理的事件数目
*/
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

运行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listener) { // 如果是主socket的事件的话,则表示
// 有新连接进入了,进行新连接的处理。
client = accept(listener, (struct sockaddr *) &local, &addrlen);
if (client < 0){
perror("accept");
continue;
}
setnonblocking(client); // 将新连接置于非阻塞模式
ev.events = EPOLLIN | EPOLLET; // 并且将新连接也加入EPOLL的监听队列。
// 注意,这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,
// 如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作
// 也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET
ev.data.fd = client;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
// 设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,
// 这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个
// epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。
fprintf(stderr, "epoll set insertion error: fd=%d0, client);
return -1;
}
} else if (event[n].events & EPOLLIN) { // 如果是已经连接的用户,并且收到数据,
// 那么进行读入
int sockfd_r;
if ((sockfd_r = event[n].data.fd) < 0)
continue;
read(sockfd_r, buffer, MAXSIZE);
// 修改sockfd_r上要处理的事件为EPOLLOUT
ev.data.fd = sockfd_r;
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
} else if (event[n].events & EPOLLOUT) { // 如果有数据发送
int sockfd_w = events[n].data.fd;
write(sockfd_w, buffer, sizeof(buffer));
// 修改sockfd_w上要处理的事件为EPOLLIN
ev.data.fd = sockfd_w;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
}
do_use_fd(events[n].data.fd);
}

可简单归结为:

1
2
3
4
5
6
7
8
int fd = epoll_create(xxA); //xxA可监听的socket
struct epoll_event events[xxxB];//可返回的事件数
while(1){
int nfds = epoll_wait( ); //wait event occur
for(int i=0; i<nfds; i++){
…. }//end for
}//end while

epoll_wait返回的都是有效数据,可直接从struct epoll_events[]中获取事件,效率高。每次取事件后,要重新注册此socket的事件epoll(epoll_ctl)。

参考资料:

《UNIX环境高级编程》

http://zh.wikipedia.org/wiki/Select_(Unix)

http://zh.wikipedia.org/wiki/Epoll

http://www.cnblogs.com/xuxm2007/archive/2011/08/15/2139809.html

http://www.cnblogs.com/bigwangdi/p/3182958.html