“Better code, better life. ”
poll函数源码解析
poll()
是Linux系统调用,跟select
和epoll
组成网络编程中IO多路复用处理函数三兄弟。
基本参数
其声明在头文件poll.h中,函数原型为:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数fds
是pollfd
结构体指针,可以指向一个结构体数组
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
nfds
是调用者指定fds
数组中的项数。
timeout
参数指定poll()在等待文件描述符准备就绪时应该阻塞的毫秒数。当超时为-1时,轮询调用将一直阻塞,直到某个事件发生;当超时为0时,轮询调用立即返回。
pollfd
pollfd
的结构如上所示。
fd
, 字段fd
是打开文件的文件描述符。如果fd<0
,则忽略相应的事件字段,并且revents字段返回零(不能用于忽略文件描述符为0的情况)。events
, 输入参数,一个位掩码(bit mask
),用来指定应用程序对文件描述符fd
感兴趣的事件。events
可以为0,在这种情况下revents
只能返回POLLHUP
,POLLERR
,POLLNVAL
。revents
, 输入参数,返回events
所指定的事件,由内核填充。如果发生错误则返回POLLHUP
,POLLERR
,POLLNVAL
中的一个。
函数描述
poll
并不是一个阻塞函数,因为有timeout
参数。如果对于任何文件描述符都没有发生所请求的事件(也没有错误),则poll
会阻塞直到其中一个事件发生或者超时。
timeout
将四舍五入为系统时钟的粒度,内核调度延迟意味着阻塞间隔可能会少量溢出。在超时中指定负值表示无限超时。将超时指定为零会导致poll()立即返回,即使没有准备好文件描述符也是如此。
events
和revents
的定义在poll.h
中,常用的有:
POLLIN
,存在数据可读POLLPRI
,一些特殊情况POLLOUT
,套接字可写,但是如果写操作的大小大于可写空间,写操作依然会被阻塞。POLLERR
,出现异常,此时·poll·函数只返回revents
。POLLHUP
,套接字关闭。注意,当从管道(例如管道或流套接字)读取时,此事件仅表明对等方关闭了对端通道。仅在使用完该通道中的所有未完成数据之后,从该通道进行的后续读取将返回0。
数据结构
poll
在内核中除了struct pollfd
还有另外几个重要的数据结构:
poll_wqueues:
struct poll_wqueues {
poll_table pt;
struct poll_table_page * table;
int error;
};
poll_table_page:
/*实际为struct page结构,也即是内存的一页*/
struct poll_table_page {
struct poll_table_page * next;
struct poll_table_entry * entry;
struct poll_table_entry entries[0];
};
poll_table_entry:
struct poll_table_entry {
struct file * filp;
wait_queue_t wait;/* 等待队列*/
wait_queue_head_t * wait_address;
};
poll_list:
struct poll_list {
struct poll_list *next;
int len;
struct pollfd entries[0];
};
上面四种结构从上到下存在依赖关系
用户调用poll
后,内核会先初始化struct poll_wqueues
,将用户传入的struct pollfd
复制到poll_list
中。
然后轮询所有FD,在设备收到一条消息或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上的进程,此时poll
函数返回相应的fd set
。
poll_list
是突破select(2)
fd数量限制的关键。它一个pollfd
组成的链表,每个poll_list
都占用一个内存页,其next
指针指向下一个内存页的poll_list
源码分析
linux
源码有一个常识,就是用户态的函数调用加上sys_
前缀基本就是对应的内核调用,poll
函数对应内核的sys_poll
函数,位于select.c
asmlinkage long sys_poll(struct pollfd __user * ufds, unsigned int nfds, long timeout)
{
struct poll_wqueues table;
int fdcount, err;
unsigned int i;
struct poll_list *head;
struct poll_list *walk;
/* Do a sanity check on nfds ... OPEN_MAX=256*/
if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
return -EINVAL;
if (timeout) {
/* Careful about overflow in the intermediate values */
if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
timeout = (unsigned long)(timeout*HZ+999)/1000+1;
else /* Negative or overflow */
timeout = MAX_SCHEDULE_TIMEOUT;
}
poll_initwait(&table);
head = NULL;
walk = NULL;
i = nfds;
err = -ENOMEM;
while(i!=0) {
/*拷贝用户输入的fds到poll_list,poll_list属于一个个内存页,即poll_table_page*/
struct poll_list *pp;
pp = kmalloc(sizeof(struct poll_list)+
sizeof(struct pollfd)*
(i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i),
GFP_KERNEL);
if(pp==NULL)
goto out_fds;
pp->next=NULL;
pp->len = (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i);
if (head == NULL)
head = pp;
else
walk->next = pp;
walk = pp;
/*将用户输入的fds拷贝到内核*/
if (copy_from_user(pp->entries, ufds + nfds-i,
sizeof(struct pollfd)*pp->len)) {
err = -EFAULT;
goto out_fds;
}
i -= pp->len;
}
/*循环遍历所有poll_page上的所有poll_fd,返回有事件发生的fd数目,详见下面的源码*/
fdcount = do_poll(nfds, head, &table, timeout);
/* OK, now copy the revents fields back to user space. */
walk = head;
err = -EFAULT;
while(walk != NULL) {
struct pollfd *fds = walk->entries;
int j;
/*填充返回事件*/
for (j=0; j < walk->len; j++, ufds++) {
if(__put_user(fds[j].revents, &ufds->revents))
goto out_fds;
}
walk = walk->next;
}
err = fdcount;
if (!fdcount && signal_pending(current))
err = -EINTR;
out_fds:
walk = head;
/*释放资源*/
while(walk!=NULL) {
struct poll_list *pp = walk->next;
kfree(walk);
walk = pp;
}
poll_freewait(&table);
return err;
}
其实sys_poll
做的事情很简单:
- 对
timeout
参数做处理 - poll_initwait注册超时回调函数,这个比较重要,后面继续讲
- 初始化
poll_wqueues
,包括分配空间,拷贝用户传递的事件 - 调用do_poll
- 异常处理,资源回收
poll_initwait:
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->error = 0;
pwq->table = NULL;
}
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
}
poll_initwait
函数非常简单,它初始化一个poll_wqueues
变量table:
然后注册了一个回调函数__pollwait
,它将在驱动的poll函数里用到。
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
struct poll_table_page *table = p->table;
if (!table || POLL_TABLE_FULL(table)) {
/*add new poll table entry*/
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
if (!new_table) {
p->error = -ENOMEM;
__set_current_state(TASK_RUNNING);
return;
}
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
}
/* Add a new entry */
struct poll_table_entry * entry = table->entry;
table->entry = entry+1;
get_file(filp);
entry->filp = filp;
entry->wait_address = wait_address;
init_waitqueue_entry(&entry->wait, current);
add_wait_queue(wait_address, &entry->wait);
}
do_poll
static int do_poll(unsigned int nfds, struct poll_list *list,
struct poll_wqueues *wait, long timeout)
{
int count = 0;
poll_table* pt = &wait->pt;
if (!timeout)
pt = NULL;
for (;;) {
struct poll_list *walk;
set_current_state(TASK_INTERRUPTIBLE);
walk = list;
while(walk != NULL) {
/*对所有poll_list做遍历*/
do_pollfd( walk->len, walk->entries, &pt, &count);
walk = walk->next;
}
pt = NULL;
/*判断超时参数*/
if (count || !timeout || signal_pending(current))
break;
count = wait->error;
/*如果某个fd上发生了错误,依然退出循环*/
if (count)
break;
timeout = schedule_timeout(timeout);
}
__set_current_state(TASK_RUNNING);
return count;
}
这两个函数是针对设备驱动调用各自的实现,比如:如果fd对应的是某个socket,则do_pollfd
执行的就是网络设备驱动所实现的poll
方法。
此外poll
的超时机制也是在do_poll
函数中实现。do_poll
的代码主体是一个死循环,只有当存在就绪的fd数大于0,或者timeout
设置为0是才会跳出。
注意这一行代码:
timeout = schedule_timeout(timeout);
它让本进程休眠一段时间。
应用程序执行poll调用后,如果退出循环的条件不满足,进程就会进入休眠。那么,谁唤醒呢?除了休眠到指定时间被系统唤醒外,还可以被驱动程序唤醒──记住这点,这就是为什么驱动的poll里要调用poll_wait的原因。
do_pollfd
static void do_pollfd(unsigned int num, struct pollfd * fdpage,
poll_table ** pwait, int *count)
{
int i;
for (i = 0; i < num; i++) {
int fd;
unsigned int mask;
struct pollfd *fdp;
mask = 0;
fdp = fdpage+i;
fd = fdp->fd;
if (fd >= 0) {
struct file * file = fget(fd);
mask = POLLNVAL;
if (file != NULL) {
mask = DEFAULT_POLLMASK;
/*对每个文件描述符,执行其特有的poll函数*/
if (file->f_op && file->f_op->poll)
mask = file->f_op->poll(file, *pwait);
mask &= fdp->events | POLLERR | POLLHUP;
fput(file);
}
if (mask) {
*pwait = NULL;
(*count)++;
}
}
fdp->revents = mask;
}
}
do_pollfd
是执行驱动的poll函数,其中两行代码:
if (file->f_op && file->f_op->poll)
mask = file->f_op->poll(file, *pwait);
就是调用我们的驱动程序里注册的poll函数。
驱动程序
驱动程序里与poll相关的地方有两处:
一是构造file_operation结构时,要定义自己的poll函数。
二是通过poll_wait来调用上面说到的__pollwait函数,pollwait的代码如下:
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}
p->qproc就是__pollwait函数,从它的代码可知,它只是把当前进程挂入我们驱动程序里定义的一个队列里而已。它的代码如下:
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
struct poll_table_entry *entry = poll_get_entry(p);
if (!entry)
return;
get_file(filp);
entry->filp = filp;
entry->wait_address = wait_address;
init_waitqueue_entry(&entry->wait, current);
add_wait_queue(wait_address, &entry->wait);
}
执行到驱动程序的poll_wait函数时,进程并没有休眠,我们的驱动程序里实现的poll函数是不会引起休眠的。让进程进入休眠,是前面分析的do_sys_poll函数
__timeout = schedule_timeout(__timeout)
这一行。poll_wait只是把本进程挂入某个队列。
至此,应用程序调用顺序为:
poll -> sys_poll -> do_sys_poll -> poll_initwait
do_poll -> do_pollfd -> 自定义poll函数
再调用schedule_timeout进入休眠。
如果我们的驱动程序发现情况就绪,可以把这个队列上挂着的进程唤醒。可见,poll_wait的作用,只是为了让驱动程序能找到要唤醒的进程。即使不用poll_wait,我们的程序也有机会被唤醒:chedule_timeout(__timeout),只是休眠__time_out这段时间。
总结
- poll函数统一处理所有事件类型,只需一个事件集参数。用户通过pollfd.events传入感兴趣事件,内核通过修改pollfd.revents成员反馈其中就绪的事件,而events成员保持不变,下此调用poll时无须重置pollfd类型的事件集参数。
- poll和select类似,每次调用都返回整个用户注册的事件集合(包括就绪的和未就绪的),应用程序索引就绪文件描述符的时间复杂度为O(n)。而epoll是在内核中维护一个事件表,epoll_wait的events参数返回就绪的事件,时间复杂度为O(1).
- poll和epoll_wait分别用nfds和maxevents参数指定最多监听多少个文件描述符和事件个数,即65535(cat/proc/sys/fs/file-max)。而select允许监听的最大文件描述符个数为1024.
- poll只能工作在相对低效的LT模式(电平触发),而epoll可工作在ET高效模式(边沿触发)。
- poll采用轮询方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回个用户,因此检测就绪事件的时间复杂度是O(n)。epoll则采用回调方式。内核检测到就绪的文件描述符,将触发回调函数,回调函数将该文件描述符上对应的事件插入内核就绪事件队列。内核最后将该就绪事件队列的内容拷贝到用户空间。时间复杂度为O(1).
- 在sys_poll中的循环中,建立链表,然后调用copy_from_user将文件描述符从用户空间拷贝到内核空间。也就是把用户态的struct pollfd拷进进这些条目中。通常用户程序的poll调用就监控几个fd,所以上面这个链表通常也就只需要一个节点,即操作系统的一页。但是,当用户传入的fd很多时,由于poll系统调用每次要把所有的struct pollfd拷进进内核,所以参数传递和页分配此时就成了poll系统调用的性能瓶颈。
- 在函数do_poll()中的循环语句中,直到计数大于0才跳出循环。而count 主要是依靠do_pollfd函数处理。当用户传入的FD很多时(比如1000个),对do_pollfd就会调用很多次,调查效率出现瓶颈。