tinyhttpd源码剖析

喊了几天学习Web开发,为了毕业论文,今晚上计划也是看看CSS呢,但是实在是没忍住,读了下经典的tinyhttp的源代码,这款代码还是颇有名气的,网上这么评论的:

tinyhttpd是一个超轻量型Http Server,使用C语言开发,全部代码只有500、600行,附带一个简单的Client,可以通过阅读这段代码理解一个Http Server的本质。

其实,代码颇简单,适合刚学习Web Server的童鞋学习,因为我之前写过CGI Server,所以,我还是认为此代码写的一般,并且我在Ubuntu下编译遇到了不少错误,我都懒得写详细分析了,所以随便写下吧,后面的Github地址里有详细的分析。

源码下载地址:http://sourceforge.net/projects/tinyhttpd/

make编译后会有不少错误和警告,我这里说下怎么改正错误:

  1. Makefile文件里的:gcc -W -Wall -lsocket -lpthread -o httpd httpd.c ,修改为:
    gcc -W -Wall -o httpd httpd.c -lpthread

  2. 481行的 int client_name_len 改为 socklen_t client_name_len

  3. 436行 改动与上面相似,改为socklen_t类型即可。

  4. 34行改为void accept_request(void ); 所以下面的实现也要修改下:

1
2
3
4
5
6
7
8
9
void *accept_request(void* client1)
{
int client = *(int *)client1;
char buf[1024];
// 省略
// 同时注意此函数77 和129行改为return NULL;
// 497行改为
if (pthread_create(&newthread , NULL, accept_request, (void*)client_sock) != 0)

之后再make,程序就OK了。

简单说下程序的逻辑吧:

一个无限循环,一个请求,创建一个线程,之后线程处理函数处理每个请求,然后解析HTTP请求,然后做一些判断处理,之后判断文件是否可执行,不可执行,打开文件,输出给客户端(浏览器)呗,可执行就创建管道,父子进程通信。

整个流程就这么简单,程序主要处理2种HTTP请求方式:GET和POST,懒得说了,上传两张图片,自己分析吧,图片原始出处不清楚,电脑里存了好久了,。

GET:

POST:

其实这个源码里有一个地方比较难懂,就是那个解析HTTP每一行的那个get_line函数里的recv的MSG_PEEK参数,详细解释可以参考此链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 读取socket,判断换行,CRLF标志,之后以"\n"换行,并在字符串后添加'\0'
int get_line(int sock, char *buf, int size)
{
int i = 0;
char c = '\0';
int n;
while ((i < size - 1) && (c != '\n'))
{
// 一个字符一个字符的读取
n = recv(sock, &c, 1, 0);
/* DEBUG printf("%02X\n", c); */
if (n > 0)
{
if (c == '\r')
{
/*
* 注意MSG_PEEK参数,表示TCP Buffer不删除之前队列数据,从队列里接收数据
*/
n = recv(sock, &c, 1, MSG_PEEK);
/* DEBUG printf("%02X\n", c); */
if ((n > 0) && (c == '\n'))
recv(sock, &c, 1, 0);
else
c = '\n';
}
buf[i] = c;
i++;
}
else
c = '\n';
}
buf[i] = '\0';
return(i);
}

这代码,没啥写头,很多功能都没实现,请求只实现了GET和POST,Header里只用了第一行,CGI里全局变量只定义了几个,并且我验证程序,发现CGI功能好像是有些问题的,但是因为我CGI水平比较水,懒得检测原因了,总体来说,程序比我之前的那个CGI Server要简单些,功能要稍微弱些吧。

下面放出我Github里的详细中文注释,欢迎指正,谢谢:

https://github.com/armsword/Source/tree/master/tinyhttpd

Webbench源码剖析

我们知道知名的Web网站压力测试工具有Webbench、ab、http_load、siege等等,这种工具的源码都不是太长,所以,我用了一下午和晚上时间仔细了分析了Webbench的源码,并且写下这篇博客记录下。

我们先看下一般Webbench是怎么做压力测试的吧,方法很简单,如下所示:

1
2
3
4
5
6
7
8
9
10
#模拟200次请求,持续时间5秒的压力测试 -c 后为并发数, -t 后为持续时间
[imlinuxer@imlinuxer webbench-1.5]# webbench -c 200 -t 5 http://localhost/index.php
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Benchmarking: GET http://localhost/index.php
200 clients, running 5 sec.
Speed=156804 pages/min, 128496336 bytes/sec.
Requests: 13067 susceed, 0 failed.

那Webbench的原理是怎么样的?其实也是很简单的,就是根据提供的参数构造HTTP请求Header,然后使用fork,创建指定大小(webbench提供的参数-c 后的数字,上文为200)个子进程,每个子进程利用socket创建TCP连接到URL,然后通过管道向父进程发送数据,父进程通过管道读取子进程的数据,并作累计,输出即可。其简单流程图如下图所示:


下面简单说下源码吧,源码很短,不到600行,两个文件,分别为socket.c和webbench.c。先说下socket.c,代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int Socket(const char *host, int clientPort)
{
int sock;
unsigned long inaddr;
struct sockaddr_in ad;
struct hostent *hp;
memset(&ad, 0, sizeof(ad));
ad.sin_family = AF_INET;
// 将字符串转换为32位二进制网络字节序的IPv4地址
inaddr = inet_addr(host);
if (inaddr != INADDR_NONE)
memcpy(&ad.sin_addr, &inaddr, sizeof(inaddr));
else
{
// 使用域名或主机名获取ip地址
hp = gethostbyname(host);
if (hp == NULL)
return -1;
memcpy(&ad.sin_addr, hp->h_addr, hp->h_length);
}
ad.sin_port = htons(clientPort);
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
return sock;
if (connect(sock, (struct sockaddr *)&ad, sizeof(ad)) < 0)
return -1;
return sock;
}

自己封装了个socket模块,只需要注意的就是URL可能不是域名,也可能是IP地址。

然后就是webbench.c文件,咱们从main函数说起,因为需要对命令行做处理,所以使用了getopt_long函数,没使用这个函数的同学可以man getopt_long或者查看在线文件man 。注意下此结构体为getopt_long的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 长选项,getopt_long的参数
static const struct option long_options[]=
{
{"force",no_argument,&force,1},
{"reload",no_argument,&force_reload,1},
{"time",required_argument,NULL,'t'},
{"help",no_argument,NULL,'?'},
{"http09",no_argument,NULL,'9'},
{"http10",no_argument,NULL,'1'},
{"http11",no_argument,NULL,'2'},
{"get",no_argument,&method,METHOD_GET},
{"head",no_argument,&method,METHOD_HEAD},
{"options",no_argument,&method,METHOD_OPTIONS},
{"trace",no_argument,&method,METHOD_TRACE},
{"version",no_argument,NULL,'V'},
{"proxy",required_argument,NULL,'p'},
{"clients",required_argument,NULL,'c'},
{NULL,0,NULL,0}
};

之后是buildr_equest()函数,主要功能是构造HTTP头请求,构造成下面类似情况,具体可以参考代码:

1
2
3
4
GET / HTTP/1.1
Host: www.baidu.com
Cache-Control: max-age=0
Pragma: no-cache

之后便是打印压力测试的一些信息,没啥可说的,很容易读懂,之后到了return bench()处,bench函数是压力测试的核心代码。
bench里根据并发数,使用fork()创建子进程,子进程调用benchcore()函数,此函数里使用alarm和sigaction信号控制定时时间,alarm函数设置了多少时间之后产生SIGALRM信号,一旦产生此信号,运行alarm_handler函数,使得timerexpired等于1,这样可以通过判断timerexpired值来退出程序。然后子进程将数据写入管道。同时父进程读取管道数据,将数据进行累加,当全部读取完子进程后,父进程输出信息退出。

总体来说,Webbench代码还是很好读懂的,当然此代码也存在一些问题,比如不支持POST请求方式,只能模拟单个IP测试等等。
宿舍马上要熄灯了,写的比较着急,可能逻辑混乱些。我Github里对此源码做了非常详细的源码剖析。感兴趣的同学请查看详细的源码剖析吧。

链接地址:
https://github.com/armsword/Source/tree/master/webbench-1.5

Redis AE事件库源码剖析

很久之前就看过AE库了,不过这东西好久不看,也忘得差不多了,正好要找工作了,简历上写了之前的项目用过这个AE事件库,索性我就又把AE库看了一遍,本来表达能力就差,复习下吧。

先上张图,下图左侧是ae.c文件里所有的函数,一目了然。

AE事件库包含文件事件和时间事件,其包含2个源文件ae.c和ae.h,当然还有四个不同的多用复用封装的文件:ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c。一般常用的是ae_select.c(跨平台用),linux下多用ae_epoll.c。我们就以ae_epoll.c为例吧。

我们先说说文中用到的几个结构体:

aeEventLoop 核心数据结构,每个aeEventLoop对应一个event loop。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* 现在注册的最大文件描述符 */
int setsize; /* 跟踪文件描述符的最大数量 */
long long timeEventNextId; //下一个timer的id
time_t lastTime; /* 用来诊断系统时间偏差 */
aeFileEvent *events; /* 用于保存epoll需要关注的文件事件的fd、触发条件、注册函数 */
aeFiredEvent *fired; /* poll_wait之后获得可读或者可写的fd数组,通过aeFiredEvent->fd再定位到events*/
aeTimeEvent *timeEventHead; //以链表形式保存多个时间事件,每隔一段时间机会触发注册的函数
int stop; //停止运行的标志
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep; //阻塞地等待下一个事件发生之前要执行的函数
} aeEventLoop;

aeFileEvent是文件事件结构体,mask是要捕获的事件的掩码(读|写),rfileProc和wfileProc分别指处理文件读和写事件的函数,clientData是函数用的参数。

1
2
3
4
5
6
7
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

aeFiredEvent指的已准备好的文件事件的结构体,包括fd(文件描述符)和mask(掩码):

1
2
3
4
5
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

aeTimeEvent 是时间事件用到的结构体,id是指的这个定时器的id,when_sec、when_ms分别指的秒和毫秒,timeproc是时间处理函数,finalizerProc是定时器被删除的时候的回调函数,next指的下一个结点,定时器事件是一个链表形式存储的各个时间事件,无序的。

1
2
3
4
5
6
7
8
9
10
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;

下面这个结构体用在epoll创建,aeCreateApi()里使用:

1
2
3
4
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;

下面来说说程序流程吧,再说aeMain()函数之前,我先说下一些准备动作吧,分别为aeCreateEventLoop()、aeCreateTimeEvent()、aeCreateFileEvent()和aeSetBeforeSleepProc()。 aeCreateEventLoop()创建一个aeEventLoop结构体,做些初始化,之后调用函数aeApiCreate(),如上述结构体,epfd由epoll_create创建,*events 保存epoll_wait返回的事件组。 aeCreateFileEvent()为fd注册一个文件事件,使用epoll_ctl加入到全局的epoll fd 进行监控,之后再指定事件可读写处理函数。 aeCreateTimeEvent()注册一个时间事件,使用aeAddMillisecondsToNow()加入时间事件。 beforesleep 就是个回调函数,sleep之前调用的函数,有些事情是每次循环必须做的,并非文件和时间事件。

1
2
3
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}

下面说说核心循环函数aeMain()吧:

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

由stop来决定是否停止,beforesleep可以决定它是否停止,但redis里beforesleep就是一个实现,做的工作只是保证aeProcessEvents要用到的fd都准备好。aeProcessEvents一般会阻塞的(有例外)等待事件(时间和文件事件)的发生,然后做一些处理,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

注意参数flags,当为AE_DONT_WAIT时就表示是不阻塞的检查关注的事件是否发生了,如果没发生则直接返回0。
aeProcessEvents处理过程如下:
首先通过调用aeSearchNearestTimer找到离现在最近的定时器是什么时候,这个地方其实做的不好,不如libevent的定时器,需要O(N)得到最短时间,可以改为最小堆保存时间,然后将得到的时间作为参数传给aeApiPoll,作为其timeout参数,来返回命中的文件事件数目,查看eventLoop->fired,可以取出命中事件的详细信息,然后调用rfileProc或wfileProc做事件读写处理,最后再调用processTimeEvents处理已经触发了的定时器事件,然后返回所有已处理的事件(文件和时间)总和。
最后,参考链接里的那个Redis执行流程图还是挺有借鉴意义的,图片太大,我就不转过来了,参考链接在最下方。

PS:边看边写,写到此处竟然又晚上两点多了,现在作息时间越来越乱,等拿到offer后,需要调整下生物钟了。

参考链接:http://my.oschina.net/qichang/blog/32469

一致性哈希算法

一致性hash算法在分布式系统里面使用比较广,我也只是听说过这个概念,今天看到一个一致性hash开源库,随手写篇文章,记录下吧。

假如我们使用普通的哈希方式来处理负载均衡,到目标机器的映射使用的算法为:

hash(o) mod n (n表示机器的总数)

如果需要增加或减少一台机器的时候,算法就变为 hash(o) mod (n + 1) / hash(o) mod (n - 1)

不仅需要重新编写代码,当服务器做了大量变更,大量的o会重定向不同的服务器而导致了缓冲不被命中,所以此时,就提出了一致性hash算法。

一致性哈希算法:

简单的说,哈希函数将o映射到 0 ~ 2^32 - 1的值区间,将这些数字头尾相连,然后将它们组织成一个闭合的圆环。如图一所示:

要实现一致性哈希算法,思路就是下面2个步骤:

将数据通过一定的hash算法映射到环上:


hash(object1) = key1;


hash(object4) = key4;

将机器通过哈希算法映射到环上


假如有三台cache机器,A,B,C,一般情况下hash计算是采用机器的IP或者机器唯一的别名作为输入值。

hash(cache A) = key A;


hash(cache B) = key C;

然后顺时针方向计算,将所有object(数据)存储到离自己最近的机器上。
如上图所示:object1将存储到 cache A,object2和object3将会存储到 cache C,object4将会存储到 cache B。

添加或删除机器:

假如B 机器被移走或者坏掉了,根据顺时针存储到最近的机器的特性,之前存储到B机器的objects(数据)将会转到C机器(因为顺时针离C最近),如下所示:

同样添加了新机器也是同理。

虚拟节点:

一致性哈希算法在服务节点太少或不均匀时,容易造成数据倾斜问题,导致平衡性问题。hash算法不能保证平衡的,在一致性哈希算法中,为了尽可能的满足平衡性,引入了虚拟节点。

虚拟节点是实际节点(机器)在hash空间的复制品,一个实际节点对应了若干了虚拟节点,当我们增加一个机器时,我们在环里增加几个其虚拟节点,当我们移掉它时,我们把其虚拟节点从环中一块移除掉。实际上就是对一个机器节点计算多次哈希,每个计算结果位置都放置一个此服务节点,成为虚拟节点,具体做法可以在机器IP或主机名的后面添加编号来实现,如Node1-1,Node1-2。

例如,有两台机器A和C,引入了虚拟节点后,就会有4个节点,分布更均匀,如下图所示所示:

写了一个简单的CGI Server

之前看过一些开源程序的源码,也略微知道些Apache的CGI处理程序架构,于是用了一段时间,用C写了一个简单的CGI Server,代码算上头文件,一共1200行左右,难度中等偏上。麻雀虽小,五脏俱全。如果把程序里所涉及的HTTP协议,Linux下POSIX编程等等搞清楚,我想找工作中肯定是有足够的竞争力的,当然我也只是皮毛而已,不再班门弄斧了,下面简单的说下程序流程吧。

再说程序流程之前,我先简单说下CGI吧,CGI这个东西比较老了,N年之前,没有PHP JSP等等动态脚本之前,这个是非常火的。只要能支持输入输出的程序,都可以编写CGI脚本,比如Apache就集成了CGI服务器功能,你可以使用Python编写简单的CGI脚本,可以参照前几天我发的文章,点击此处,现在CGI 脚本基本上使用的比较少了,但是在一些对效率要求比较高的设计里还是可以看到一些身影,比如腾讯的QQ相册,QQ空间里就可以看到(你可以打开QQ空间,查看源码-查找CGI),他们的脚本应该是C/C++编写的,效率,你懂的,如果你想更多的了解CGI脚本相关,可以点击此处,推荐了解一些,但是不值得深入学习,那个精力不如学习PHP/Python了。。。

其实说白了,CGI 扮演的就是在服务器和特定解释器之间输入输出协议的角色,每个来自用户的请求,Web服务器都会唤起特定语言解释器的命令行,执行CGI脚本。

现在来介绍下此CGI Server,我不想长篇大论了,一幅图来描述其执行过程(字比较丑,凑合看吧),具体看代码吧:

如果你比较理解HTTP协议,大部分都没有难点,中文URL编码这个可以看上一篇文章,可能唯一的难点是管道部分,这个使用了经典的fork-exece模型,关于管道,一张图能很好的解释:

不过程序中使用了双管道,读写模型,如果你对此不理解,可以翻阅《Unix环境高级编程》。
如果你对HTTP不太了解,参考链接里的两篇文章,能帮助你更好的了解HTTP协议,其实HTTP1.1里面的难点,理解以下几句话就足够了:

http协议规定头必然有2个连续的”\r\n”,就像Cookie后面就跟了2个\r\n,所以读取请求头的时候只要读到\r\n\r\n,那么前面就是头,后面就是实体。实体大小在上面有一个Content-Length标记。所以从\r\n\r\n后面读Content-Length大小后就结束了。所以服务端一般是1、判断状态码,http协议规定1xx 204 304肯定不包括实体,所以读到\r\n\r\n就不用再读;2、判断没有Content-Length;3、判断有没有chunked。有Content-Length直接读,而chunked的最后7个一定是\r\n0\r\n\r\n。

chunked编码,这种编码一般是gzip压缩的

HTTP/1.1 200 OK 
Cache-Control: private 
Content-Type: text/html; charset=utf-8 
Server: Microsoft-IIS/7.5
X-AspNetMvc-Version: 2.0
X-AspNet-Version: 4.0.30319 
Set-Cookie: Set-Cookie: X-Powered-By: ASP.NET 
ntCoent-Length: 166137
Content-Encoding: gzip 
Transfer-Encoding: chunked 



2D23 
...........}.s.G.......j....*u......y....%...;QO.M.[....3..,...
.!..O....H-."v..>.............=YY 

 
在处理chunked时候,像上面chunked\r\n\r\n后面是实体,第一行2D23就是一chunk的大小所以在2D23\r\n后面开始读2D23个然后会紧跟着\r\n,然后后面就是下一chunk的大小,直到最后一chunk是0大小。实体结束,最后再来一个\r\n。也就是说chunked的最后7个一定是\r\n0\r\n\r\n,本来判断读到\r\n0\r\n\r\n就结束应该没问题,但是为了保险起见,还是一次一次的读大小值再读取指定的大小数据。

另外,此代码里有部分缺陷,一是我急于完成,某些结构设计不合理,所以可扩展性比较差。第二,有少部分内存泄漏,懒得修改了。毕竟,我也得忙着找工作,时间比较紧张,但是总的来说,如果你想找后台开发相关的职位,如果你没有更好的项目经验,此代码如果你能读懂,写到简历上,面试官肯定会问你的。

能力和精力有限,希望能对你们有所帮助 ^-^

注:学生时代的作品,此代码已废弃,等我有空做个更好的 ^_^

参考链接:

1.http://www.cnblogs.com/TankXiao/archive/2012/02/13/2342672.html

2.http://www.cnblogs.com/li0803/archive/2008/11/03/1324746.html

3.http://blog.csdn.net/laotse/article/details/5903651

Redis里dict源码剖析

Redis全名叫做Remote Dictionary Server,从全称就可以看出dict是Redis里重要的数据结构,看了下dict.c里的源码,简单分析下。

dict.c主要是实现了哈希功能,实际上实现哈希(字典)常见的方法有几种:

<1>比如我们数据结构课上学的数组和链表法,但是这种方法适用于元素个数不多的情况下

<2>使用复杂的平衡树(B-等等),性能比较不错,比如MySQL里的索引就使用了这种方法

<3>哈希表,兼顾了高效和简单性,Redis选择了这种方法。

我们就解读下dict.c里源码里的数据结构吧,我们知道hash表的性能由hash表的大小和解决冲突的方法决定。Redis里使用了两个哈希表(ht[0])和哈希表(ht[1]),hash[0]是字典主要使用的hash表,而hash[1]主要对0号哈希表进行rehash时才使用。读者可能不明白这个地方啥意思?其实就是,程序每次申请几个字节(默认为4字节),当key/value数量达到规定时,程序申请的4字节就翻一倍(这里使用了慢迁移),那什么时候rehash呢?我们知道size == used(根据下面的dictht结构体)时,达到最理想状态,所以当used/size > 1时,就进行rehash。我们先看看dict.h里定义的几个结构体吧:

dict:

1
2
3
4
5
6
7
typedef struct dict {
dictType *type; //哈希表节点指针数组(俗称桶,bucket)hash表的类型,可以是string,list等
void *privdata; //该hash表的一些private数据
dictht ht[2];
int rehashidx; /* rehashidx记录的实际上是rehash进行到的索引,比如如果rehash进行到第10个元素,那么rehashidx的值就为9。如果没有在进行rehash,rehashidex的值就为-1.*/
int iterators; /* number of iterators currently running */
} dict;

dictType:

1
2
3
4
5
6
7
8
9
//每种hash table的类型,里面既有成员函数,又有成员变量,模拟的C++类,每个函数带有的privdata均为预留参数
typedef struct dictType {
unsigned int (*hashFunction)(const void *key); //要采用的hash函数
void *(*keyDup)(void *privdata, const void *key); //对key进行拷贝
void *(*valDup)(void *privdata, const void *obj); //对value进行拷贝
int (*keyCompare)(void *privdata, const void *key1, const void *key2); //key比较器
void (*keyDestructor)(void *privdata, void *key); //销毁key,一般为释放空间
void (*valDestructor)(void *privdata, void *obj); //销毁value,一般为释放空间
} dictType;

dictht:

1
2
3
4
5
6
7
8
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table; //hash表中的数据,以key/value形式,通过单链表保存
unsigned long size; //桶个数
unsigned long sizemask; //size - 1,方便定位
unsigned long used; //实际保存的元素数
} dictht;

dictEntry:

1
2
3
4
5
6
7
8
9
10
//hash表中每一项key/value,若key的映射值,以单链表的形式保存
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
} v;
struct dictEntry *next;
} dictEntry;

dict.h一共有这四种结构,每个结构体里的每个变量的用途,我都已经注释出来。

根据程序分析出整个字典结构如图所示(此时的状态为dictadd,且未出现rehash和rehash也未在进行中):

我们之后再说下程序的执行流程吧:

首先是dict初始化,init_hash_dict –> dictCreate –> _dictInit –> _dictRest

上面流程代码都很简单,就是把dict结构体和dictht里面的数据都初始化。

然后是添加键值到字典,这个地方比较难懂,也是重点,因为它包括的操作比较多,分为三种情况:

<1>如果字典为未初始化(也即字典的0号哈希表的table属性为空),那么程序需要对0号哈希表进行初始化。

<2>如果在插入时发生了键碰撞,处理碰撞,链表法

<3>如果插入新元素使得字典满足了rehash条件,那么启动相应的rehash程序(used / size > 1)。

这里有个需要注意的地方,如果rehash正在进行中(ht[0]数据–>ht[1]),那么选择ht[1]作为新键值对的添加目标,否则ht[0]作为新键值对的添加目标。

本来想长篇大论呢,但是学校网速真的是太卡了,不写了,看看其他的吧。

参考资料:

《Redis设计与实现》

Letter Combinations of a Phone Number

由于这个月还没写博客,找道题应付下吧,主要原因是自己现在太忙了,得准备找工作啊,唉,弱的惨不忍睹。好吧,不抱怨了,进入正题,LeetCode上面的一道题,题目描述为:

Given a digit string, return all possible letter combinations that the number could represent.

A mapping of digit to letters (just like on the telephone buttons) is given below.

Input:Digit string “23”
Output: [“ad”, “ae”, “af”, “bd”, “be”, “bf”, “cd”, “ce”, “cf”].

Note:

Although the above answer is in lexicographical order, your answer could be in any order you want.

其实就是组合问题,如下图所示:

如图所示,遍历完 1,2,3,就把a,d (g,h,i)遍历完了,之后回溯到e,就是个DFS搞定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <iostream>;
#include <vector>;
#include <string>;
using namespace std;
class Solution
{
public:
vector&lt;string&gt; letterCombinations(string digits)
{
int len = digits.size();
result.clear();
num[2] = &quot;abc&quot;;
num[3] = &quot;def&quot;;
num[4] = &quot;ghi&quot;;
num[5] = &quot;jkl&quot;;
num[6] = &quot;mno&quot;;
num[7] = &quot;pqrs&quot;;
num[8] = &quot;tuv&quot;;
num[9] = &quot;wxyz&quot;;
solve(digits,0,len);
return result;
}
void solve(string &amp;digits,int i,int len)
{
if(i == len)
{
str[len] = '&#92;&#48;';
string temp = str;
result.push_back(temp);
return;
}
unsigned int j;
int index = digits[i] - '0';
for(j = 0; j &lt; num[index].size(); j++)
{
str[i] = num[index][j];
solve(digits,i+1,len);
}
}
private:
vector&lt;string&gt; result;
string num[10];
char str[1000];
};
int main()
{
Solution solution;
string p;
p = &quot;23&quot;;
vector&lt;string&gt; p1;
p1 = solution.letterCombinations(p);
vector&lt;string&gt;::iterator iter;
for(iter=p1.begin();iter != p1.end();iter++)
{
cout&lt;&lt;*iter&lt;&lt;endl;
}
return 0;
}

最近开始集中精力做OJ了,唯一的感觉就是自己好弱,算法准备的有点晚了,有点找不上工作的节奏。算法也不是一朝一夕就能提高的,自己好好努力吧,即使实习拿不到dream offer,到正式找工作(9月份)还有时间努力,集中精力弥补自己弱点就可以了。

使用python编写简单网络爬虫技巧总结

使用python写过几个小玩具,基本上都与爬虫、Web相关的,自己也积累了下经验,也看过很多文章,一直想总结下,可惜现在要忙于找工作,实验室活也比较多,也就落下了。感觉如果时间长不记录,也就懒散了,并且许多东西我写完过个两天,我自己都记不住当时怎么想的了。

0、HTTP协议

基本上常见的Web开发里,Web内容都是通过HTTP协议进行传输的(虽然咱不懂Web开发,但是基本的计算机网络知识还是了解的),通过TCP连接服务器的80端口,爬虫其实质就是通过模拟浏览器发送HTTP请求,至于HTTP请求相关知识,点击这里

1
2
3
4
HTTP通常通过创建到服务器80端口的TCP连接进行通信
HTTP协议的内容包括请求方式(method), urlheaderbody,通常以纯文本方式发送
HTTP返回内容包括状态码,headerbody,通常以纯文本方式返回
header以及body间以CRLF(\r\n)分割

1、最基础的抓取网站内容

使用python编写一个网络爬虫是非常简单的,如下例所示:

1
2
import urllib2
content = urllib2.urlopen('http://armsword.com').read()

但是实际工作中,这样肯定是不行的,你会遇到各种各样的问题,如下:

  • 网络出现错误,任何错误都可能。例如机器宕了,网线断了,域名出错了,网络超时了,页面没有了,网站跳转了,服务被禁了,主机负载不够了…
  • 服务器加上了限制,只让常见浏览器访问
  • 需要登录才能抓取数据
  • IP被封掉或者IP访问次数受到限制
  • 服务器加上了防盗链的限制
  • 某些网站不管你HTTP请求里有没有Accept-Encoding头部,也不管你头部具体内容是什么,反正总给你发gzip后的内容
  • URL链接千奇百怪,带汉字的也罢了,有的甚至还有回车换行
  • 某些网站HTTP头部里有一个Content-Type,网页里有好几个Content-Type,更过分的是,各个Content-Type还不一样,最过分的是,这些Content-Type可能都不是正文里使用的Content-Type,从而导致乱码
  • 需要抓取的数据很多,怎么提高抓取效率
  • 网站内容通过AJAX请求加载的
  • 需要验证码识别
    至于怎么解决上述问题,我们一一道来。

2、网络或网站出现错误等处理

之前我搞过百度音乐的爬虫,因为当时百度音乐有个bug,就是可以任何人都下载百度音乐的无损、高品质音乐,我就写了个爬虫,开了几十个进程抓取下载地址(抓包解析JSON串数据)存储到数据库里,虽然百度无ip访问限制这些麻烦事,但是程序无任何错误处理的情况下,依然出现各自异常错误,所以我们只需要捕捉异常,处理异常就可以了。我们可以根据urlopen的返回值,读取它的HTTP状态码。除此之外,我们在urlopen的时候,也需要设置timeout参数,以保证处理好超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import urllib2
import socket
try:
f = urllib2.urlopen('http://armsword', timeout = 10)
code = f.getcode()
if code < 200 or code >= 300:
#你自己的HTTP错误处理
except Exception, e:
if isinstance(e, urllib2.HTTPError):
print 'http error: {0}'.format(e.code)
elif isinstance(e, urllib2.URLError) and isinstance(e.reason, socket.timeout):
print 'url error: socket timeout {0}'.format(e.__str__())
else:
print 'error: ' + e.__str__()

3、服务器做了限制

3.1、防盗连限制

某些站点有所谓的反盗链设置,其实说穿了很简单,就是检查你发送请求的header里面,referer站点是不是他自己,我们只要把headers的referer改成该网站即可,以V2EX的领取每日奖励地址为例:

1
2
3
4
header = {'Referer':'http://www.v2ex.com/signin'}
req = urllib2.Request('http://www.v2ex.com',headers = header)
response = urllib2.urlopen(url = req,timeout = 10).read
#response = urllib2.urlopen(req)).read()

3.2、限制浏览器登录,需伪装成浏览器

某些网站做了限制,进制爬虫的访问,此时我们可以更改HTTP的header,与3.1相似:

1
2
3
header = {&quot;User-Agent&quot;:&quot;Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36
(KHTML, like Gecko) Ubuntu Chromium/31.0.1650.63 Chrome/31.0.1650.63 Safari/537.36&quot;}
req = urllib2.Request(url,headers=header)

3.3、IP访问次数受到限制

如果IP被封掉、或者访问次数受到限制,使用代理服务器比较有用,代理IP可以自己抓取一些代理网站的数据:

1
2
3
4
5
import urllib2
proxy_ip = urllib2.ProxyHandler({'http':'http://XX.XX.XX.XX:XXXX'}) #XX为代理IP和端口
opener = urllib2.build_opener(proxy_ip, urllib2.HTTPHandler)
urllib2.install_opener(opener)
content = urllib2.urlopen('http://XXXX.com').read()

4、需要登录才能抓取数据

4.1、表单的处理

抓包,Chrome或者Firefox+Httpfox浏览器插件就能办到,查看POST数据,模拟表单就可以了,这里不再细说,这里主要注意,在GET/POST一些数据的时候,需要对GET/POST的数据做些编码,即使用urlencode(),我们以北邮校园网关登录为例:

1
2
3
4
5
6
7
8
9
10
postdata = {
'DDDDD': uname, #用户名
'upass': u_pass, #密码
'R1': 0, #其他参数
'R2': 1,
'para': 00,
'0MKKey': 123456
}
en_url = urllib.urlencode(postdata)

4.2、Cookie处理

1
2
3
4
5
6
7
8
9
#获取一个cookie对象
cookie = cookielib.CookieJar()
#构建cookie处理器
cookie_p = urllib2.HTTPCookieProcessor(cookie)
#装载cookie
opener = urllib2.build_opener(cookie_p)
#注意此处后面的install_opener(opener),安装不同的opener对象作为urlopen() 使用的全局URL opener
urllib2.install_opener(opener)
content = urllib2.urlopen('http://armsword.com').read()

如果同时需要代理和Cookie,这样就可以:

1
opener = urllib2.build_opener(proxy_ip, cookie_p, urllib2.HTTPHandler)

如果上面未使用install_opener(opener),则使用:

1
response = opener.open(url).read()

以V2EX每日签到为例,把上面的流程走一遍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#全局变量装载Cookie省略
def login():
'''
once值每次登录都不一样,在页面上可以看到
也必需有http header
'''
req = urllib2.Request(url_login)
once = get_info(req,'name','once')['value'] #省略
postdata = {
'u':username,
'p':password,
'once':once,
'next':"/"
}
header = {'Host':'www.v2ex.com','Origin':'http://www.v2ex.com',
'Referer':'http://www.v2ex.com/signin',
'User-Agent':&quot;Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36
(KHTML, like Gecko) Ubuntu Chromium/32.0.1700.107 Chrome/32.0.1700.107 Safari/537.36&quot;}
data = urllib.urlencode(postdata)
req = urllib2.Request(url_login,data,header)
response = opener.open(req)

5、gzip/deflate支持

某些网站不管你请求头带不带Accept-Encoding:gzip,它返回的都是gzip压缩的内容,urlopen不会处理gzip压缩的内容,所以得到的就是乱码,以下为解决方法:

1
2
3
4
5
6
7
8
9
10
11
f = urllib2.urlopen(url)
headers = f.info()
rawdata = f.read()
if ('Content-Encoding' in headers and headers['Content-Encoding']) or \
('content-encoding' in headers and headers['content-encoding']):
import gzip
import StringIO
data = StringIO.StringIO(rawdata)
gz = gzip.GzipFile(fileobj=data)
rawdata = gz.read()
gz.close()

6、URL编码和网页编码处理
如果URL里含有中文,要对相应的中文进行编码,可以使用urllib.quote(‘要编码的字符串’),如下所示:

1
2
3
4
http://www.google.com/#newwindow=1&amp;q=你好
query = urllib.quote('你好')
url = 'http://www.google.com/#newwindow=1&amp;q='+query
response = urllib.urlopen(url).read()

至于处理网页编码,按照一般浏览器的处理流程,判断网页的编码首先是根据HTTP服务器发过来的HTTP响应头部中Content-Type字段,例如text/html; charset=utf-8就表示这是一个HTML网页,使用的是utf8编码。如果HTTP头部中没有,或者网页内容的head区中有charset属性或者其http-equiv属性为Content-Type的meta元素,如果有charset属性,则直接读取这个属性,如果是后者,则读取这个元素的content属性。但是实际情况是许多网页不按照规矩出牌,首先,HTTP响应里不一定有Content-Type头部,其次某些网页里可能没有Content-Type或者有多个Content-Type(例如百度搜索的缓存页面)。所以,我的经验基本是自己多猜测几次吧。其实GBK、UTF-8占了绝大部分,使用xx.decode(‘gbk’,’ignore’).encode(‘utf-8’) (或相反)试试,其中ignore是指忽略非法字符。

7、AJAX

AJAX 是一种用于创建快速动态网页的技术。
通过在后台与服务器进行少量数据交换,AJAX 可以使网页实现异步更新。这意味着可以在不重新加载整个网页的情况下,对网页的某部分进行更新。
传统的网页(不使用 AJAX)如果需要更新内容,必需重载整个网页面。

AJAX依然是HTTP请求,只是来自页面发起的另一个HTTP请求。我之前做百度音乐的时候,在chrome下的network选项分析每一个选项,找到对应的ajax数据,分析json,在正则、查找之类的。

当然,也可以通过selenium、phantomjs、mechanize等模拟浏览器引擎构建一个真实的浏览器执行JS、渲染页面(在写这篇文章时,我还没用到过这些),这应该算是终极大招了吧。

8、效率

目前使用过多线程、多进程,多进程注意下僵尸进程。

异步:用twisted进行异步I/O抓取,tornado(如果拿到实习offer后,我可能会抽出几天时间学习这个,我感觉我应该学习下web编程,纠结了下flask还是tornado,其实无论哪个一周完全可以入门了,python你懂的。)

9、验证码

图像识别方面的知识,好吧,等我学会后再写吧。。。

根据我的经验,网络爬虫里网络带宽是主要瓶颈,上文只说了抓取,其实网页抽取依然有很多知识要做的,我现在会的都是些皮毛,要学的还很多,等拿到dream 公司的实习offer后,有空继续折腾下,现在主要精力是弥补自己的一些知识缺点和面试长考的问题。

参考链接:

http://blog.binux.me/2013/09/howto-crawl-web/

http://blog.raphaelzhang.com/2012/03/issues-in-python-crawler/

http://yxmhero1989.blog.163.com/blog/static/112157956201311821444664/

Linux IO复用—select poll 和 epoll

在Socket编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和端口复用,select、poll和epoll是Linux API提供的I/O复用方式。其实在*nix下的网络并发方法向来不缺,比如典型的Apache模型(Process Per Connection,简称PPC),TPC(Thread Per Connection)模型,这两种模型思想类似,就是利用了多进程、多线程概念,让进来的每一个连接去干别的事情去。但是连接多了以后,首先需要较大的内存,且进程/线程切换开销会非常大,因此这类模型能接受的最大连接数都不会太高。

Linux 2.6中加入了epoll之后(据说Windows下使用的是IOCP,但是我没使用过),在高性能服务器领域中得到广泛的应用,主要原因就是高效。在讲epoll之前,我们先总结下select、poll,因为epoll其实也就是他们的增强版本,比如select是一个系统调用,而epoll是个模块,由三个系统调用组成,内核中由文件系统实现。

select

select的第一个参数为fdset集合中最大描述符值加1,select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回,当轮询一遍后没有任何事件发生,如果指定了超时时间,则select会睡眠到超时,睡眠结束后再进行下一次轮询,并将临时结果写到用户空间,然后返回。select返回后,需要逐一检查关注的描述符是否被SET(事件是否发生)。

int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
1
2
3
4
5
6
参数                  描述
nfds             sets的文件描述符的最大值
readfds          fd_set type 类型,只读的描述符集
writefds         fd_set type 类型,只写的描述符集
errorfds         fd_set type 类型,错误的描述符集
timeout          超时等待时间

为了维护fd_set类型的参数,会使用下面四个宏:FD_SET(), FD_CLR(), FD_ZERO() 和 FD_ISSET()。

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct fd_set
{
u_int fd_count;
int fd_array[FD_SETSIZE];
}
//fd_array可SIZE*8个socket
int FD_ISSET(int fd,fd_set *fdset);
//返回值:若fd在描述符集中则返回非0值,否则返回0
void FD_CLR(int fd,fd_set *fdset); //fd指描述符
void FD_SET(int fd,fd_set *fdset);
void FD_ZERO(fd_set *fdset);

select 函数监视的文件描述符分3类,分别是writefds,readfds、和exceptfds。调用后 select 函数会阻塞,直到有描述符就绪(有数据可读、可写、或者except)、或者超时(timeout指定等待的时间,timeout== NULL表示永远等待,timeout == 0表示不等待、立即返回,其他表示等待的时间)。当 select 函数返回后,可以通过遍历 fdset ,来找到就绪的描述符。

select 的一个优点就是跨平台,缺点就是单个进程能够监视的文件描述符的数量存在最大限制,linux下一般为1024,Windows下好像无此限制,虽然可以修改这一限制,但是这样也会造成效率低下。

运行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int sock;
FILE *fp;
struct fd_set fds;
struct timeval timeout={3,0}; //select等待3秒,3秒轮询,要非阻塞就置0
char buffer[256]={0}; //256字节的接收缓冲区
//假定已经建立UDP连接,具体过程不写,简单,当然TCP也同理,主机ip和port都已经给定,要写的文件已经打开
sock=socket(...);
bind(...);
fp=fopen(...);
while(1)
{
FD_ZERO(&amp;fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(sock,&amp;fds); //添加描述符
FD_SET(fp,&amp;fds); //同上
maxfdp=sock&gt;fp?sock+1:fp+1; //描述符最大值加1
// for(int i =0 ;i &lt; maxfds; i++) if(FD_ISSET()) { }
switch(select(maxfdp,&amp;fds,&amp;fds,NULL,&amp;timeout)) //select使用
{
case -1: exit(-1);break; //select错误,退出程序
case 0:break; //再次轮询
default:
if(FD_ISSET(sock,&amp;fds)) //测试sock是否可读,即是否网络上有数据
{
recvfrom(sock,buffer,256,.....);//接受网络数据
if(FD_ISSET(fp,&amp;fds)) //测试文件是否可写
fwrite(fp,buffer...);//写入文件
......
}
}
}

注意:每次select 有数据要遍历全部socket,每次select之前要重置fds的值。

poll

poll函数类似于 select,可用于任何类型的文件描述符,与 select 不同,poll不是为每个状态(可读性、可写性和异常状态)构造一个描述符集,而是构造一个pollfd 结构数组向内核传递需要关注的事件,故没有描述符个数的限制,每个数组元素指定一个描述符编号以及对其所关心的状态,pollfd中的events字段和revents字段分别用于标示关注的事件和发生的事件。

poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,poll返回后,需要对pollfd中的每个元素检查其revents值,来判断事件是否发生。

返回值:

-1:有错误产生
0:超时时间到,而且没有描述符有状态变化
>0:有状态变化的描述符个数

1
int poll(struct pollfd fdarray[],nfds_t nfds,int timeout);
1
2
3
4
5
struct pollfd{
int fd; //需要检查的文件描述符
short events; //等待的需要测试事件
short revents; //实际发生了的事件,也就是返回结果
};

应将每个数组元素的events成员设置为下图所示的值。通过这些值告诉内核我们对该描述符关心的是什么。返回时,内核设置revents成员,以说明对于该描述符已经发生了什么事件。(注意,poll没有更改events成员,这与select不同,select修改其参数以指示哪一个描述符已准备好了。)

poll的events和revents标志:


timeout == -1 永远等待。当所指定的描述符中的一个已准备好,或捕捉到一个信号时则返回。如果捕捉到一个信号,则poll返回-1,errno设置为EINTR timeout == 0 不等待 timeout > 0 等待timeout毫秒,如果已超时但是还没有一个描述符准备好,则返回值是0。
运行过程(与select相似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
struct pollfd fds[IN_FILES];
char buf[MAX_BUFFER_SIZE];
int i,res,real_read, maxfd;
fds[0].fd = 0;
if((fds[1].fd=open(&quot;data1&quot;,O_RDONLY|O_NONBLOCK)) &lt; 0)
{
fprintf(stderr,&quot;open data1 error:%s&quot;,strerror(errno));
return 1;
}
if((fds[2].fd=open(&quot;data2&quot;,O_RDONLY|O_NONBLOCK)) &lt; 0)
{
fprintf(stderr,&quot;open data2 error:%s&quot;,strerror(errno));
return 1;
}
for (i = 0; i &lt; IN_FILES; i++)
{
fds[i].events = POLLIN;
}
while(fds[0].events || fds[1].events || fds[2].events)
{
if (poll(fds, IN_FILES, TIME_DELAY) &lt;= 0)
{
printf(&quot;Poll error\n&quot;);
return 1;
}
for (i = 0; i&lt; IN_FILES; i++)
{
if (fds[i].revents)
{
memset(buf, 0, MAX_BUFFER_SIZE);
real_read = read(fds[i].fd, buf, MAX_BUFFER_SIZE);
if (real_read &lt; 0)
{
if (errno != EAGAIN)
{
return 1;
}
}
else if (!real_read)
{
close(fds[i].fd);
fds[i].events = 0;
......
};

epoll

epoll通过epoll_create创建一个用于epoll轮询的描述符,通过epoll_ctl添加/修改/删除事件,通过epoll_wait 检查事件,epoll_wait 的第二个参数用于存放结果。

epoll与select、poll不同,首先,其不用每次调用都向内核拷贝事件描述信息,在第一次调用后,事件信息就会与对应的epoll描述符关联起来。另外epoll不是通过轮询,而是通过在等待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。

epoll返回后,该参数指向的缓冲区中即为发生的事件,对缓冲区中每个元素进行处理即可,而不需要像poll、select那样进行轮询检查。

系统调用:

1
2
//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
int epoll_create(int size);
1
2
3
4
5
6
7
8
/*
epoll的事件注册函数,它与select()是在监听事件时告诉内核要监听什么类型的事件不同,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事。
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

struct epoll_event结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event
{
__uint32_t events; //Epoll events
epoll_data_t data; //User data variable
};

`
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

1
2
3
4
/*
等待事件的产生,类似于select()调用,参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法是永久阻塞),该函数返回需要处理的事件数目
*/
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

运行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
for (n = 0; n &lt; nfds; ++n) {
if (events[n].data.fd == listener) { // 如果是主socket的事件的话,则表示
// 有新连接进入了,进行新连接的处理。
client = accept(listener, (struct sockaddr *) &amp;local, &amp;addrlen);
if (client &lt; 0){
perror(&quot;accept&quot;);
continue;
}
setnonblocking(client); // 将新连接置于非阻塞模式
ev.events = EPOLLIN | EPOLLET; // 并且将新连接也加入EPOLL的监听队列。
// 注意,这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,
// 如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作
// 也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET
ev.data.fd = client;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &amp;ev) &lt; 0) {
// 设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,
// 这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个
// epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。
fprintf(stderr, &quot;epoll set insertion error: fd=%d0, client);
return -1;
}
} else if (event[n].events &amp; EPOLLIN) { // 如果是已经连接的用户,并且收到数据,
// 那么进行读入
int sockfd_r;
if ((sockfd_r = event[n].data.fd) &lt; 0)
continue;
read(sockfd_r, buffer, MAXSIZE);
// 修改sockfd_r上要处理的事件为EPOLLOUT
ev.data.fd = sockfd_r;
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &amp;ev)
} else if (event[n].events &amp; EPOLLOUT) { // 如果有数据发送
int sockfd_w = events[n].data.fd;
write(sockfd_w, buffer, sizeof(buffer));
// 修改sockfd_w上要处理的事件为EPOLLIN
ev.data.fd = sockfd_w;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &amp;ev)
}
do_use_fd(events[n].data.fd);
}

可简单归结为:

1
2
3
4
5
6
7
8
int fd = epoll_create(xxA); //xxA可监听的socket
struct epoll_event events[xxxB];//可返回的事件数
while(1){
int nfds = epoll_wait( ); //wait event occur
for(int i=0; i&lt;nfds; i++){
…. }//end for
}//end while

epoll_wait返回的都是有效数据,可直接从struct epoll_events[]中获取事件,效率高。每次取事件后,要重新注册此socket的事件epoll(epoll_ctl)。

参考资料:

《UNIX环境高级编程》

http://zh.wikipedia.org/wiki/Select_(Unix)

http://zh.wikipedia.org/wiki/Epoll

http://www.cnblogs.com/xuxm2007/archive/2011/08/15/2139809.html

http://www.cnblogs.com/bigwangdi/p/3182958.html

记录下传输层协议TCP和UDP的一些特性

在TCP/IP中能够实现传输层功能的、具有代表性的协议是TCP和UDP。

TCP

TCP是面向连接的、可靠的流协议。流就是指不间断的数据结构,你可以把它想象成排水道中的水流。当应用程序采用TCP发送消息时,虽然可以保证发送的顺序,但还是犹如没有任何间隔的数据流发送给接收端。TCP充分地实现了数据传输时各种控制功能,可以进行丢包时的重发控制,还可以对次序乱掉的分包进行顺序控制。而这些在UDP中都没有。此外,TCP作为一种面向有连接的协议,只有在确认通信对端存在时才会发送数据,从而可以控制通信流量的浪费。根据TCP的这些机制,在IP这种无连接的网络上也能够实现高可靠性的通信。

TCP为提供可靠性传输,实行“顺序控制”或“重发控制”机制。此外还具备“流控制(流量控制)”、“拥塞控制”、提供网络利用率等众多功能。

TCP通过检验和、序列号、确认应答、重发控制、连接管理以及窗口控制等机制实现可靠性传输。

通过序列号与确认应答提高可靠性

在TCP中,当发送端的数据到达接收主机时,接收端主机会返回一个已收到消息的通知。这个消息叫做确认应答(ACK)。

TCP通过肯定的确认应答(ACK)实现可靠的数据传输。当发送端将数据发出之后会等待对端的确认应答。如果有确认应答,说明数据已经成功到达对端。反之,则数据丢失的可能性很大。在一定时间内没有等到确认应答,发送端就可以认为数据已经丢失,并进行重发。由此,即使产生了丢包,仍然能够保证数据能够到达对端,实现可靠传输。未收到确认应答并不意味着数据一定丢失。也有可能是数据对方已经收到,只是返回的确认应答在途中丢失。这种情况也会导致发送端因没有收到确认应答,而认为数据没有到达目的地,从而进行重新发发送。

上述这些确认应答处理、重发控制以及重复控制等功能都可以通过序列号实现。序列号是按顺序给发送数据的每一个字节(8位字节)都标上号码的编号。接收端查询接收数据TCP首部中的序列号和数据的长度,将自己下一步应该接收的序号作为确认应答返送回去。就这样,通过序列号和确认应答号,TCP可以实现可靠传输。

重发超时如何确实

重发超时是指在重发数据之前,等待确认应答到来的那个特定时间间隔。如果超过了这个时间仍未收到确认应答,发送端将进行数据重发。那么这个重发超时的具体时间长度又是如何确定的呢?

最理想的是,找到一个最小时间,它能保证“确认应答一定能在这个时间内返回”。然而这个时间长短随着数据包途径的网络环境的不同而有所变化。例如在高速的LAN中时间相对较短,而在长距离的通信当中应该比LAN要长一些。即使是在同一个网络中,根据不同时段的网络堵塞程度时间的长短也会发生变化。TCP要求不论处在何种网络环境下都要提供高性能通信,并且无论网络拥堵情况发生何种变化,都必须保持这一特性。为此,它在每次发包时都会计算往返时间及其偏差。将这个往返时间和偏差相加重发超时的时间,就是比这个总和要稍大一点的值。

在BSD的Unix以及Windows系统中,超时都以0.5秒为单位进行控制,因此重发超时都是0.5秒的整数倍。不过,由于最初的数据包还不知道往返时间,所以其重发超时一般设置为6秒左右。数据被重发之后若还是收不到确认应答,则进行再次发送。此时,等待确认应答的时间将会以2倍、4倍的指数函数延长。此外,数据也不会被无限、反复地重发。达到一定重发次数之后,如果仍没有任何确认应答返回,就会判断为网络或对端主机发生了异常,强制关闭连接。并且通知应用通信异常强行终止。

连接管理

UDP是一种面向无连接的通信协议,因此不检查对端是否可以通信,直接将UDP包发送出去。TCP与此相反,它会在数据通信之前,通过TCP首部发送一个SYN包作为建立连接的请求等待确认应答。如果对端发来确认应答,则认为可以进行数据通信。如果对端的确认应答未能到达,就不会进行数据通信。此外,在通信结束时会进行断开连接的处理(FIN包)。

可以使用TCP首部用于控制的字段来管理TCP连接。一个连接的建立与断开,正常过程至少需要来回发送7个包才能完成。

TCP以段为单位发送数据

在建立TCP连接的同时,也可以确认发送数据包的单位,我们也可以称其为“最大消息长度”(MSS:Maximum Segment Size)。最理想的情况是,最大消息长度正好是IP中不会被分片处理的最大数据长度。

TCP在传送大量数据时,是以MSS的大小将数据进行分割发送。进行重发时也是以MSS为单位。

MSS是在三次握手的时候,在两端主机之间被计算得出。两端的主机在发出建立连接的请求时,会在TCP首部中写入MSS选项,告诉对方自己的接口能够适应MSS的大小。然后会在两者之间选择一个较小的值投入使用。

利用窗口控制提高速度

TCP以1个段为单位,每发一个段进行一次确认应答的处理,这样的传输方式有一个缺点,那就是,包的往返时间越长通信性能就越低。

为解决这个问题,TCP引入了窗口这个概念。即使在往返时间较长的情况下,它也能控制网络性能的下降。确认应答不再是以每个分段,而是以更大的单位进行确认时,转发时间将会大幅度的缩短。也就是说,发送端主机,在发送了一个段以后不必要一直等待确认应答,而是继续发送。

窗口大小就是无需等待确认应答而可以继续发送数据的最大值,如上,窗口大小为4个段。这个机制实现了使用大量的缓冲区,通过对多个段同时进行确认应答的功能。
收到确认应答的情况下,将窗口滑动到确认应答中的序列号的位置。这样可以顺序地将多个段同时发送提高通信性能。这种机制也被成为滑动窗口控制。

流控制

发送端根据自己的实际情况发送数据。但是,接收端可能收到的是一个毫无关系的数据包又可能会在处理其他问题上花费一些时间。因此在为这个数据包做其他处理时会耗费一些时间,甚至在高负荷的情况下无法接收任何数据。如此一来,如果接收端将本应该接收的数据丢弃的话,就又会触发重发机制,从而导致网络流量的无端浪费。

为了防止这种现象的发生,TCP提供一种机制可以让发送端根据接收端的实际接收能力控制发送的数据量。这就是所谓的流控制。它的具体操作是,接收端主机向发送端主机通知自己可以接收数据的大小,于是发送端会发送不超过这个限度的数据。该大小限度就被称作窗口大小。

TCP首部中,专门有一个字段用来通知窗口大小。接收主机将自己可以接收的缓冲区大小放入这个字段中通知给发送端。这个字段的值越大,说明网络的吞吐量越高。

不过,接收端的这个缓冲区一旦面临数据溢出时,窗口大小的值也会随之被设置为一个更小的值通知给发送端,从而控制数据发送量。也就是说,发送端主机会根据接收端主机的指示,对发送数据的量进行控制。这也就形成了一个完整的TCP流控制(流量控制)。

拥塞控制

有了TCP的窗口控制,收发主机之间即使不再以一个数据段为单位发送确认应答,也能够连续发送大量数据包。然而,如果在通信刚开始时就发送大量数据,也可能会引发其他问题。在网络出现堵塞时,如果突然发送一个较大量的数据,极有可能导致整个网络的瘫痪。TCP为了防止该问题的出现,在通信一开始时就会通过一个叫做慢启动的算法得出的数值,对发送数据量进行控制。

首先,为了在发送端调节所要发送数据的量,定义了一个叫做“拥塞窗口”的概念。于是在慢启动的时候,将这个拥塞窗口的大小设置为1个数据段(1MSS)发送数据,之后每收到一次确认应答(ACK),拥塞窗口的值就加1.在发送数据包时,将拥塞窗口的大小与接收端主机通知的窗口大小做比较,然后按照它们当中较小的那个值,发送比其还要小的数据量。

提高网络利用率的规范

Nagle算法:

TCP中为了提高网络的利用率,经常使用一个叫做Nagle的算法。该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送的一种处理机制。具体来说,就是仅在下列任意一种条件下才能发送数据。如果两个条件都不满足,那么暂时等待一段时间以后再进行数据发送。

1>已发送的数据都已经收到确认应答时

2>可以发送最大段长度(MSS)的数据时

延迟确认应答:

当某个接收端收到这个小窗口的通知以后,会以它为上限发送数据,从而又降低了网络的利用率。为此,引入了一个方法,那就是收到数据以后并不立即返回确认应答,而是延迟一段时间的机制。

捎带应答:

捎带应答是指在同一个TCP包中既发送数据又发送确认应答的一种机制。由此,网络的利用率会提高,计算机的负荷也会减轻。不过,确认应答必须得等到应用处理完数据并将作为回执的数据返回为止,才能进行捎带应答。

UDP:

UDP是User Datagram Protocol的缩写。UDP是不具有可靠性的数据报协议。细微的处理它会交给上层的应用去完成。在UDP的情况下,虽然可以确保发送消息的大小,却不能保证消息一定会到达。因此,应用有时会根据自己的需要进行重发处理。

UDP不提供复杂的控制机制,利用IP提供面向无连接的通信服务。并且它是将应用程序发来的数据在收到的那一刻,立即按照原样发送到网络上的一种机制。

即使是出现网络堵塞的情况下,UDP也无法进行流量控制等避免网络拥塞的行为。此外,传输途中即使出现丢包,UDP也不负责重发。甚至当出现包的到达顺序乱掉时也没有纠正的功能。如果需要这些细节控制,那么不得不交由采用UDP的应用程序去处理。UDP有点类似于用户说什么听什么的机制,但是需要用户充分考虑好上层协议类型并制作相应的应用程序。因此,也可以说,UDP按照“制造程序的那些用户的指示行事”。

由于UDP面向无连接,它可以随时发送数据。再加上UDP本身的处理既简单又高效,因此经常用于以下几个方面:

1>包总量较少的通信(DNS、SNMP等)

2>视频、音频等多媒体通信(即时通信)

3>限定于LAN等特定网络中的应用通信

4>广播通信(广播、多播)

可能有人会认为,鉴于TCP是可靠的传输协议,那么它一定优于UDP。其实不然。TCP与UDP的优缺点无法简单地、绝对地去做比较。那么,对这两种协议应该如何加以区分使用呢?下面,我就对此问题做一简单说明。

TCP用于在传输层有必要实现可靠的情况。由于它是面向有连接并具备顺序控制、重发控制等机制的,所以它可以为应用提供可靠传输。

而UDP主要用于那些对高速传输和实时性有较高要求的通信或广播通信。我们拿通过IP电话进行通话作为例子。如果使用TCP,数据在传送途中如果丢失会重发,但这样无法刘畅地传输通话人的声音,会导致无法进行正常交流。而采用UDP,它不会进行重发处理。从而也就不会有声音大幅度延迟到达的问题。即使有部分数据丢失,也只是会影响某一小部分的通话。此外,在多播与广播通信中也使用UDP而不是TCP。RIP、DHCP等基于广播的协议也要依赖于UDP。因此,TCP和UDP应该根据应用的目的按需使用。