Redis AE事件库源码剖析

很久之前就看过AE库了,不过这东西好久不看,也忘得差不多了,正好要找工作了,简历上写了之前的项目用过这个AE事件库,索性我就又把AE库看了一遍,本来表达能力就差,复习下吧。

先上张图,下图左侧是ae.c文件里所有的函数,一目了然。

AE事件库包含文件事件和时间事件,其包含2个源文件ae.c和ae.h,当然还有四个不同的多用复用封装的文件:ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c。一般常用的是ae_select.c(跨平台用),linux下多用ae_epoll.c。我们就以ae_epoll.c为例吧。

我们先说说文中用到的几个结构体:

aeEventLoop 核心数据结构,每个aeEventLoop对应一个event loop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* 现在注册的最大文件描述符 */
int setsize; /* 跟踪文件描述符的最大数量 */
long long timeEventNextId; //下一个timer的id
time_t lastTime; /* 用来诊断系统时间偏差 */
aeFileEvent *events; /* 用于保存epoll需要关注的文件事件的fd、触发条件、注册函数 */
aeFiredEvent *fired; /* poll_wait之后获得可读或者可写的fd数组,通过aeFiredEvent->fd再定位到events*/
aeTimeEvent *timeEventHead; //以链表形式保存多个时间事件,每隔一段时间机会触发注册的函数
int stop; //停止运行的标志
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep; //阻塞地等待下一个事件发生之前要执行的函数
} aeEventLoop;

aeFileEvent是文件事件结构体,mask是要捕获的事件的掩码(读|写),rfileProc和wfileProc分别指处理文件读和写事件的函数,clientData是函数用的参数。

1
2
3
4
5
6
7
8
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

aeFiredEvent指的已准备好的文件事件的结构体,包括fd(文件描述符)和mask(掩码):

1
2
3
4
5
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

aeTimeEvent 是时间事件用到的结构体,id是指的这个定时器的id,when_sec、when_ms分别指的秒和毫秒,timeproc是时间处理函数,finalizerProc是定时器被删除的时候的回调函数,next指的下一个结点,定时器事件是一个链表形式存储的各个时间事件,无序的。

1
2
3
4
5
6
7
8
9
10
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;

下面这个结构体用在epoll创建,aeCreateApi()里使用:

1
2
3
4
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;

下面来说说程序流程吧,再说aeMain()函数之前,我先说下一些准备动作吧,分别为aeCreateEventLoop()、aeCreateTimeEvent()、aeCreateFileEvent()和aeSetBeforeSleepProc()。 aeCreateEventLoop()创建一个aeEventLoop结构体,做些初始化,之后调用函数aeApiCreate(),如上述结构体,epfd由epoll_create创建,*events 保存epoll_wait返回的事件组。 aeCreateFileEvent()为fd注册一个文件事件,使用epoll_ctl加入到全局的epoll fd 进行监控,之后再指定事件可读写处理函数。 aeCreateTimeEvent()注册一个时间事件,使用aeAddMillisecondsToNow()加入时间事件。 beforesleep 就是个回调函数,sleep之前调用的函数,有些事情是每次循环必须做的,并非文件和时间事件。

1
2
3
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}

下面说说核心循环函数aeMain()吧:

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

由stop来决定是否停止,beforesleep可以决定它是否停止,但redis里beforesleep就是一个实现,做的工作只是保证aeProcessEvents要用到的fd都准备好。aeProcessEvents一般会阻塞的(有例外)等待事件(时间和文件事件)的发生,然后做一些处理,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;

/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}

numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

注意参数flags,当为AE_DONT_WAIT时就表示是不阻塞的检查关注的事件是否发生了,如果没发生则直接返回0。
aeProcessEvents处理过程如下:
首先通过调用aeSearchNearestTimer找到离现在最近的定时器是什么时候,这个地方其实做的不好,不如libevent的定时器,需要O(N)得到最短时间,可以改为最小堆保存时间,然后将得到的时间作为参数传给aeApiPoll,作为其timeout参数,来返回命中的文件事件数目,查看eventLoop->fired,可以取出命中事件的详细信息,然后调用rfileProc或wfileProc做事件读写处理,最后再调用processTimeEvents处理已经触发了的定时器事件,然后返回所有已处理的事件(文件和时间)总和。
最后,参考链接里的那个Redis执行流程图还是挺有借鉴意义的,图片太大,我就不转过来了,参考链接在最下方。

PS:边看边写,写到此处竟然又晚上两点多了,现在作息时间越来越乱,等拿到offer后,需要调整下生物钟了。

参考链接:http://my.oschina.net/qichang/blog/32469