dlog:高性能线程安全的C++日志库

打算闲暇时光写点靠谱的东西比如名字服务、KV存储等基础服务类软件,所以周末就先从封装一些基础库开始。上周末写了个C++日志库,支持多线程,性能也还不错,今天就做了下测试和修改了下bug。这里说下该库的使用方法,以及实现思路以及一些性能调优方法。欢迎交流和指教。

源码地址

https://github.com/armsword/dlog

编译

执行build.sh,在上层目录里会生成build文件,测试的可执行文件在release/bin目录下。

使用方法

将etc文件下的dlog.json扔到可执行文件的当前目录(当然只要能让可执行文件找到dlog.json即可),每个函数包含logger文件夹下的Log.h头文件,在主函数里调用DLOG_INIT初始化一次,之后在每个需要打印log的文件里调用DLOG_LOG即可。
如实例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <dlog/logger/Log.h>
#include <unistd.h>
using namespace dlog::logger;
int main() {
DLOG_INIT();
DLOG_LOG(WARN, "test the log level using log lib!");
DLOG_LOG(DEBUG, "Hello World, %d",2016);
DLOG_LOG(INFO, "test Log C/C++ lib");
DLOG_LOG(ERROR, "Hello everyone, this is my blog: http://armsword.com/; Welcome to visit it,Thank you!");
return 0;
}

输出样例

目录名:dlog.log.20160911.0
日志内容:

1
2
3
4
[2016-09-11 15:05:30.510] [WARN] [4246443808] [/home/Github/dlog/example/LogTest.cpp:8] [main] test the log level using log lib!
[2016-09-11 15:05:30.510] [DEBUG] [4246443808] [/home/Github/dlog/example/LogTest.cpp:9] [main] Hello World, 2016
[2016-09-11 15:05:30.510] [INFO] [4246443808] [/home/Github/dlog/example/LogTest.cpp:10] [main] test Log C/C++ lib
[2016-09-11 15:05:30.510] [ERROR] [4246443808] [/home/Github/dlog/example/LogTest.cpp:11] [main] Hello everyone, this is my blog: http://armsword.com/; Welcome to visit it,Thank you!

该日志库功能

  • 包含四种日志级别,分别为WARN、DEBUG、INFO、ERROR,日志级别大小依次递增
  • 可配置输出日志路径
  • 可配置输出日志前缀
  • 可定义输出的日志级别,默认DEBUG
  • 可定义日志文件切分大小
  • 支持多线程程序
  • 可定义日志往磁盘刷新的方式
  • 支持每天切换新的日志文件
  • 支持log文件被删除时,从新建立日志文件

dlog.json配置

1
2
3
4
5
6
7
{
"log_path": "./log", // 日志路径
"log_prefix": "dlog", // 日志前缀
"log_level": "DEBUG", // 输出日志级别,DEBUG表示大于等于DEBUG级别的日志都打印
"max_file_size": 200, // 日志切分大小,单位m
"async_flush": true // 日志往磁盘刷新方式,true表示异步,false表示同步,建议选择true
}

性能

在测试机上测试了下机器的硬盘真实io写速度

1
2
3
4
5
6
time dd if=/dev/zero of=test.dbf bs=8k count=300000 oflag=direct
记录了300000+0 的读入
记录了300000+0 的写出
2457600000字节(2.5 GB)已复制,38.0385 秒,64.6 MB/秒
注:oflag=direct 表示使用DirectIO方式,不使用文件系统的buffer等

而我用dlog写入3.5G(1000W条数据),用时大约51、52s,计算下来,写速度大约69M/s,与上面测试的磁盘io数据64.6 MB/s比起来,性能还算不错(因为cache问题,要比裸写磁盘性能要好),当然由于时间问题,我测试还不够充分,以后有机会继续优化下再测试看下。

一些性能调优技巧

  • 为了避免锁竞争,使用了一种更为高效的线程局部存储方法,就是使用关键字__thread来定义变量,__thread是GCC内置的线程局部存储设施(Thread-Local Storage),凡是带有__thread的变量,每个线程都拥有该变量的一份拷贝,且互不干扰。
  • 使用likely,unlikely来提高CPU分支预测正确率来提高性能。我们定义likely、unlikely如下:
    1
    2
    #define likely(x) __builtin_expect((x), 1)
    #define unlikely(x) __builtin_expect((x), 0)

其中__builtin_expect是gcc提供的函数。顾名思义,likely表示这件事很大概率会发生 :)

  • 使用loop线程来判断文件是否需要切分(打开新的fd),并且open、close这种费时操作,只在loop线程里完成,不阻塞log输出线程(一些小技巧保证线程安全)。

本日志库主要是为了满足个人需求,当然即使公司业务,没特殊需求的话也足够用了,所以并没有写的像log4j或者log4cxx那么复杂,但即便如此该日志库还有一些提高空间,等我有空再继续优化下,欢迎交流和指教,谢谢。

致谢:

因为很久没用Cmake了(之前在阿里用ascons),基本上都忘光了。并且阿里的一些经验使我比较在意代码目录的组织方式,搜索后发现一篇文章讲Cmake非常不错,作者很用心,非常感谢。

Cmake入门实战

Earlybird:Twitter实时搜索引擎

简单说下,2011年左右的Paper,Paper里说Twitter的实时索引和检索系统叫做Earlybird,Paper中主要讲了2件事,第一个就是支持Twitter的实时索引的倒排索引结构是怎么样的,第二个就是利用Java并发模型,处理并发读写。

该引擎功能:

  • 低延时、高吞吐能力;
  • 能处理突发峰值(Weibo的特性);
  • 突出时效性,时间越近,排名应该越靠前;
  • 该实时引擎支持AND、OR、NOT以及短语查询,实时性大约10S,查询latency 50ms;

该实时引擎,基于Lucene,使用Java开发,理由是:

  • 利用已存在的Lucene代码,并用来做全量索引;
  • 适用于Twitter以JVM为中心的开发环境;
  • 利用Java和JVM提供容易理解的并发模型;

其实上面几条理由与阿里很多开发项目类似,但是阿里的搜索引擎是C++编写的,质量也是非常不错的,叫问天(HA3)。

索引构建过程:

Earlybird处理三种信号:

  • 静态信号,初次建立索引时被添加;
  • 共鸣信号,动态更新的,比如tweet转发次数;
  • 搜索用户信息,用来个性化排序;

  1. 用户发布tweet,之后被发送到一个队列,叫做Ingestion Pipeline里。这些tweet会被分词,并被加上一些meta信息,比如语言等;
  2. 为了处理大容量,tweet按照哈希算法,分发到各个Earlybird服务上,之后将tweet实时地建立索引;
  3. 如上图所示,有一个Updater服务,它推送共鸣信号到Earlybird服务,用于动态地更新索引,比如tweet转发信息等。

查询过程如下:

  1. 当查询请求过来,先到达前端“Blender”,Blender解析请求,并将搜索用户的本地社交图谱信息也下发到Earlybird服务;
  2. Earlybird服务执行相关性计算并排序,并将排序好的tweet(排序分数高的,最近发生的tweets)返回给Blender;
  3. Blender合并各个Earlybird返回的列表,并重排序后返回给用户;

Term字典

Twitter实时索引现在不支持一些高级查询,比如通配符。所以term词典使用hash表实现。没有选择Java默认的HashMap,因为其不是GC友好的,Earlybird实现了个开链法的哈希表(个人猜测是不是与Redis的渐进式哈希相似呢?)每个Term被分配了一个唯一且单调递增的id作为key,value包含以下两部分信息:

  • Term对应的倒排索引数据长度;
  • 指向倒排索引数据末尾的指针;

动态(active)索引

Paper里首先介绍了现在存在的索引组织方式不适合实时索引,如将新发生的tweet放到postings链最后面,但是读取时候需要倒序读取,这个是不支持现在流行的压缩算法的,而如果将新的数据放到倒排索引前面,这带来的问题就是内存分配的问题。Earlybird使用了一个更简单的方式,分离索引读和写的方式。

每个实例维护了多个索引分段(目前是12个instance),每个分段保存相对较少量的tweet(目前是2^23~840万 tweets)。新增加的tweets被分到同一个segment(分段)中,满了之后再放下一个,这样,在任何时候只有一个分段被更改,其他都是只读状态,当一个segment满了,停止接收新的tweets时,会做只读优化。

postings列表在优化之前直接使用整数数组,按照docid升序存储,并且不做压缩,因此查找可以直接二分查找。postings增长时分配空间以pool为单位,分配空间按指数预留空间,分为4个,大小分别为应2^1、2^4、2^7、2^11 。如果一个pool满了,另一个pool将会分配。每个pool里面有若干个slice,每个slice保存一个term的postings列表,每个postings列表里面存储着一个term的postings信息,每个postings是一个32-bit的整数,24-bit用作文档id;8-bit用作位置id,保存term在tweet中出现的位置。因为tweet有140字符限制,所以8-bit足够用。在建立索引时,先尝试填满2^1pool中的slice,如果填满,就转到2^4的pool的slice,以此类推。term 字典中保存term对应的最后一个pool的中postings list尾部指针。如下图所示:

索引优化

一旦动态索引停止接收新的tweets时,即此segment的tweets超过840W时,后台执行只读索引优化。优化过程中,根据老的索引数据,新的索引数据会被创建,原始索引数据不做变化,一旦完成构建,原始索引数据将被新的索引替换。postings链分为两种,长的和短的,以长度1000为界限。短的,即postings链长度小于1000,postings保持原样(24-bit文档id加上8-bit的位置信息),但是postings会按照时间逆序排列。对于长的postings链,采用了基于block的压缩算法,PForDelta和Simple9。

并发管理

对于实时索引来说,一个重要的需求就是在多线程环境下处理好并发读和写操作。当然这个只用于动态索引当中,静态索引(优化后的)只有读操作。因为不懂Java,这块简单说下吧,Paper里说Java和JVM提供了一个非常容易理解的并发内存模型。11个只读段并发读不需要锁,唯一的可读可写段使用volatile关键字实现高效同步,然后也使用了 jvm memory barrier。懂Java得同学,通过以下新tweet添加处理过程,可能就明白一二了。

tweet里的每个term,首先去查询对应的词典入口,在字典中,term被映射到term id。它包括2个信息,Term对应的倒排索引数据长度和指向倒排索引数据末尾的指针。通过末尾的指针,信息被添加到新的postings链中。如果没有足够的空间插入新的postings信息,新的slice被分配。如果term是第一次遇到,它被添加到词典并且分配下一个term id。postings被添加到slice时,term并发的增加计数,指向postings链尾巴的指针也被并发更新。当tweet里的所有term都被处理后,增加maxDoc变量,此变量表示目前遇到的最大文档id。

简单就写这么多吧,没看明白的同学可以自己仔细阅读下twitter的这篇实时索引Paper吧,地址:

http://www.umiacs.umd.edu/~jimmylin/publications/Busch_etal_ICDE2012.pdf

记一次线上Bug的查找过程和思路

前言

简单说下问题情况,Proxy即作为客户端又有服务端功能,其接受QS(Query Server)的请求,之后向BS(索引服务)发送请求,然后根据BS的返回结果Merge后返回给QS。不能泄露太多东西,所以本文主要是整理一些知识点和问题查找思路。

问题和现象:

  • Proxy机器上出现大量的CLOSE_WAIT。
  • Proxy失去服务能力,内存使用不为0,CPU使用率为0。
  • 机器报警,TcpListenOverFlows。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
命令查看:
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
结果:
TIME_WAIT 31927
CLOSE_WAIT 1570
ESTABLISHED 40
TIME_WAIT:表示主动关闭,通过优化系统内核参数可容易解决。
CLOSE_WAIT:表示被动关闭,需要从程序本身出发。
ESTABLISHED:表示正在通信
注:以上数据非真实线上情况,只为举例

一些知识点

TCP握手图



TIME_WAIT:

Linux系统下,TCP/IP连接断开后,会以TIME_WAIT状态保留一定的时间(2MSL:max segment lifetime),默认为4分钟,然后才会关闭回收资源。当并发请求过多的时候,就会产生大量的 TIME_WAIT状态的连接,无法及时断开的话,会占用大量的端口资源和服务器资源。
状态保持2MSL的原因:
1.防止上一次连接中的包,迷路后重新出现,影响新连接(经过2MSL,上一次连接中所有的重复包都会消失)
2.可靠的关闭TCP连接。在主动关闭方发送的最后一个 ack(fin) ,有可能丢失,这时被动方会重新发fin, 如果这时主动方处于 CLOSED 状态 ,就会响应 rst 而不是 ack。所以主动方要处于 TIME_WAIT 状态,而不能是 CLOSED 。另外这么设计TIME_WAIT 会定时的回收资源,并不会占用很大资源的,除非短时间内接受大量请求或者受到攻击。
一些解决方法:
通过修改/etc/sysctl.conf文件,服务器能够快速回收和重用那些TIME_WAIT的资源 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#表示开启SYN Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭
net.ipv4.tcp_syncookies = 1
#表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭
net.ipv4.tcp_tw_reuse = 1
#表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭
net.ipv4.tcp_tw_recycle = 1
#表示如果套接字由本端要求关闭,这个参数决定了它保持在FIN-WAIT-2状态的时间
net.ipv4.tcp_fin_timeout=30
生效命令:
/sbin/sysctl -p

CLOSE_WAIT:

CLOSE_WAIT表示被动关闭,出现这种问题,基本是就是客户端连接异常或者自己没有迅速回收资源。
什么情况下,连接处于CLOSE_WAIT状态呢?

  • 在被动关闭连接情况下,在已经接收到FIN,但是还没有发送自己的FIN的时刻,连接处于CLOSE_WAIT状态。通常来讲,CLOSE_WAIT状态的持续时间应该很短,正如SYN_RCVD状态。但是在一些特殊情况下,就会出现连接长时间处于CLOSE_WAIT状态的情况。
  • 出现大量close_wait的现象,主要原因是某种情况下对方关闭了socket链接,但是我方忙与读或者写,没有关闭连接(CLOSE_WAIT应该是默认2小时才会关闭)。代码需要判断socket,一旦读到0,断开连接,read返回负,检查一下errno,如果不是AGAIN,就断开连接。
  • scoket阻塞,无超时或非阻塞处理。

EPOLL:

  • EPOLLIN:连接,对端发送普通数据,对端socket正常关闭。
  • EPOLLPRI:带外数据,文件描述符有紧急的数据可读。
  • EPOLLOUT:数据可写。
  • 对端正常关闭(close(),shell下kill或ctr+c),触发EPOLLIN和EPOLLRDHUP,但是不触发EPOLLERR和EPOLLHUP。//重要
  • 对端异常断开连接(如网线),不会触发任何事件。判断方式是向已经断开的socket写或者读,会发生EPOLLERR错误。

socket接口的几个知识点:

  • read总是在接收缓冲区有数据时立即返回,而不是等到给定的read buffer填满时返回。
    只有当receive buffer为空时,blocking模式才会等待,而nonblock模式下会立即返回-1(errno = EAGAIN或EWOULDBLOCK)
  • blocking的write只有在缓冲区足以放下整个buffer时才返回(与blocking read并不相同)
    nonblock write则是返回能够放下的字节数,之后调用则返回-1(errno = EAGAIN或EWOULDBLOCK)
    对于blocking的write有个特例:当write正阻塞等待时对面关闭了socket,则write则会立即将剩余缓冲区填满并返回所写的字节数,再次调用则write失败(connection reset by peer)

Bug查找

通过以上的知识点,我们就可以很好的定位问题原因了,因为Proxy即作为客户端,又有服务端功能,其转发QS的结果到BS之后,再将BS的结果merge后回复给QS。仔细排查代码,发现2处引起Proxy大量出现CLOSE_TIMEWAIT。

<1>、Proxy将BS的结果merge后,发送QS时,socket是阻塞的,也未做超时处理。

<2>、QS超时(3S),会主动关闭socket,而Proxy这层没有关闭fd,可能原作者认为会触发EPOLLERR和EPOLLHUP(这里面做了处理)。

参考链接:

1、http://www.cnblogs.com/promise6522/archive/2012/03/03/2377935.html
2、http://originlee.com/2015/04/22/nonblocking-tcp-socket/

浅谈Linux ulimit以及max memory locked

刚入职,我在索引这个组,需要熟悉下之前的搜索索引模块的代码,以便后续开发。我在新申请的测试机器上编译代码部署索引这个模块就报错了。定位出现问题的地方,代码大致流程是这样子的:

1
2
3
4
5
6
7
8
9
10
if (conf->use_memlock_for_mmap) {
// add MAP_LOCKED to lock the buffer
attr_hash = (AttrIndexInfo*)mmap(NULL, fsize, PROT_WRITE | PROT_READ, MAP_SHARED | MAP_LOCKED, fd, 0);
} else {
attr_hash = (AttrIndexInfo*)mmap(NULL, fsize, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
}

而参数里配置了use_memlock_for_mmap 选项,于是代码就走到第一个mmap逻辑处。

使用ulimit -a查看下系统属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[armsword@search-querybs1 ~]$ ulimit -a
core file size (blocks, -c) 4194304
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 256726
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 1000000
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 10240
cpu time (seconds, -t) unlimited
max user processes (-u) 102400
virtual memory (kbytes, -v) unlimited (注:相当于limits.conf中的as和cgroup中的memory.limit_in_bytes.)
file locks (-x) unlimited

我们知道索引程序在启动时候,需要加载倒排/正排等索引原文件,这些文件是比较大的,几十G或者过百G也是可能的,但是系统下的max locked memory对应值是64k,而mmap选项也设置了MAP_LOCKED选项,此选项的意思就是lock一段地址范围内已map的内存,相应的数据在unlock之前一直存在于物理内存中,不会被回收。对于应用程序来说,可以将内存中一些对程序性能影响较大的数据lock起来,避免非预期的页面回收或者换入/换出引起性能波动。所以将max locked memory设置为unlimited就解决问题。

linux系统提供了以下几个系统调用用于内存的lock和unlock。

1
2
3
4
5
mlock/munlock:lock/unlock一段地址范围内已map的内存
mlockall/munlockall:lock进程虚拟地址空间内已map的内存,还可以选择对于此后新map的空间是否自动lock
mmap使用MAP_LOCKED选项时表示在mmap的同时,对相应地址范围进行mlock

一般情况下,我们使用ulimit多用于提高性能,最常用的地方就是设置打开文件描述符的数量,web服务器等需要大量的文件句柄,一旦开太小,比如默认1024,在句柄使用完毕的时候,系统就频繁出现emfile错误,这时候系统很容易陷入不可用。但是如果设定太大了,又会有这样的副作用。很多服务器程序是事件派遣的,比如说用epoll,程序在启动的时候通常会根据最大的文件句柄数来预留内部的slot,一个slot貌似要占用几K的资源,如果你设定文件句柄数目太大,就可能无端的浪费了几百M内存。所以要设置一个合适的值有利于提高程序性能。

ulimit 用于限制 shell 启动进程所占用的资源,支持以下各种类型的限制,具体楼上已列出,注意下H/S选项的意义,

-H 设置硬资源限制,硬资源限制用于控制软限制。限定一旦设置只有root用户可以增加硬限制,普通用户只能减少自己的硬限制大小。
-S 设置弹性资源限制,弹性限制用于限制具体的用户或者进程。设置后普通用户可以增加,但是不能超过硬限制大小。
如果不指定-S或者-H,那么弹性资源限制和硬限制将同时设置。

作为临时限制,ulimit 可以作用于通过使用其命令登录的 shell 会话,在会话终止时便结束限制,并不影响于其他 shell 会话。而对于长期的固定限制,修改 /etc/security/limits.conf 文件即可。

几种Hash算法的实现

哈希被广泛使用在很多领域,如数据存储,加密,计算机视觉(几何哈希),此处就简单整理下几个常见的Hash函数的实现,有空陆续补充吧。

BKDR Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 本算法由于在Brian Kernighan与Dennis Ritchie的《The C Programming Language》一书被展示而得名
// 是一种简单快捷的hash算法。
// 也是Java目前采用的字符串的Hash算法(累乘因子为31)。
// 此哈希函数用的最多
template<class T>
size_t BKDRHash(const T *str)
{
register size_t hash = 0;
while (size_t ch = (size_t)*str++)
{
hash = hash * 131 + ch; // 也可以乘以31、131、1313、13131、131313..
// 可将乘法分解为位运算及加减法可以提高效率
// 如将上式表达为:hash = hash << 7 + hash << 1 + hash + ch;
}
return hash;
}

其中累乘因子也可以为131、1313、13131,比如下述代码就使用了33。

1
2
3
4
5
6
7
unsigned int times33(char *str)
{
unsigned int val = 0;
while (*str)
val = (val << 5) + val + (*str++);
return val;
}

此算法也会有如下变种,如:

1
2
3
4
5
6
7
unsigned int timesnum(char *str, int num)
{
unsigned int val = 0;
while (*str)
val = val * num + (*str++);
return val;
}

SDBM Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 本算法是由于在开源项目SDBM(一种简单的数据库引擎)中被应用而得名
// 它与BKDRHash思想一致,只是种子不同而已。
template<class T>
size_t SDBMHash(const T *str)
{
register size_t hash = 0;
while (size_t ch = (size_t)*str++)
{
hash = 65599 * hash + ch;
//hash = (size_t)ch + (hash << 6) + (hash << 16) - hash;
}
return hash;
}

AP Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Arash Partow发明的一种hash算法
// 比较优秀的一种哈希算法
unsigned int APhash(char *str)
{
unsigned int val = 0;
int i = 0;
for (i = 0; *str; i++)
if ((i & 1) == 0)
val ^= ((val << 7)^(*str++)^(val>>3));
else
val ^= (~((val << 11)^(*str++)^(val>>5)));
return (val & 0x7FFFFFFF);
}

FNV Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Unix system系统中使用的一种著名hash算法,后来微软也在其hash_map中实现。
template<class T>
size_t FNVHash(const T* str)
{
if(!*str)
return 0;
register size_t hash = 2166136261;
while (size_t ch = (size_t)*str++)
{
hash *= 16777619;
hash ^= ch;
}
return hash;
}

其实关于为什么要用异或,我搜索了下原因,见Why is XOR the default way to combine hashes?

MySQL中出现的字符串哈希函数

1
2
3
4
5
6
7
8
9
unsigned int MySQLhash(char *str)
{
register unsigned int nr = 1, nr2 = 4;
while(*str) {
nr ^= (((nr & 63) + nr2)*((unsigned int)*str++)) + (nr << 8);
nr2 += 3;
}
return (unsigned int)nr;
}

参考链接:
spiderq
各种哈希函数冲突率分析

写了一个Chrome插件 - 百度无损音乐下载插件

1、之前是从http://music.baidu.com/ 入手的,现在发现从此处入手已经找不到方法了。由于我又不是太懂JS语法,于是用了几十分钟,没分析到地址放弃。

2、灵光一闪,从http://fm.baidu.com/ 入手,在Chrome下抓包,细心分析后,果然发现蜘丝马迹,如下图所示:

见左图那个灰色的链接,我们打开链接看下,嘿,发现了许多歌曲信息哦,如下图所示:

由于是FM,所以songIDs有一大串,咱们试试构造下,以黑蝙蝠中队为例,其有三种音乐格式,第一种是可以直接下载的,第二三种是收费的:

我们构造下不同格式的音乐试试看,以无损音乐flac格式和921kbps为例:

构造的链接为:http://music.baidu.com/data/music/fmlink?songIds=966991&type=flac&rate=921


将songLink后的地址格式化为正常网页地址,如下所示:
http://yinyueshiting.baidu.com/data2/music/33809115/966991111600921.flac?xcode=a6ed00d30421e9bebbbef0f52f4938299d0fba1db032ff5f
下载下,发现可以正常下载,28.8M,flac格式的无损音乐。

所以,超高/无损音乐下载方法就是:
将此链接的:
http://music.baidu.com/data/music/fmlink?songIds=966991&type=flac&rate=921
songIds、type、rate改完你想下载的音乐就可以了。

如果只想下载无损音乐flac格式的(前提条件是百度下载里包含无损格式),这样就可以了,只需要更改歌曲songIds。
http://music.baidu.com/data/music/fmlink?songIds=966991&type=flac

用了2-3天时间,学了下前端开发方面的基础知识,没怎么看教程,就是想实现某些功能或者遇到问题就去搜索解决,遇到了一些坑,但是收获也挺大的,于是随手写了个Chrome插件,可用来下载百度无损音乐的,比较简单,原理如上。

程序地址

点击下载插件

使用方法

本插件支持Chrome浏览器和UC桌面浏览器,其他Webkit内核的浏览器应该也支持,但是我没做测试。

Linux系统下安装方法

Linux下直接下载此插件,之后解压后,在BDMusicDownloader文件夹下发现一个src.crx文件,将其拖入到Chrome浏览器-设置-扩展程序界面即可,当打开如以 http://music.baidu.com/song/ 开头的链接时,即下载音乐地址,会自动弹出插件,点击下载即可。如图所示:

Windows下安装方法

从程序地址里下载安装插件,解压文件夹,注意,因为Chrome在Win下的安全限制,所以此文件夹不能删除,之后同上所示,但不是拖入src.crx文件,而是选中开发者模式-加载正在开发的插件-BDMusicDownloader,之后选中src文件,打开即可。如图以UC桌面浏览器为例:

说句题外话,UC桌面浏览器真的还蛮好用的,支持Chrome插件(废话,使用的都是同一个内核),但是自带科学上网,打开Github速度就挺快的,推荐使用。

需要改进的地方

因为我前端经验太少,所以一些小细节没处理好,比如当音乐名字太长时,样式就比较难看了。因我忙着在回家前要把孙钟秀的《操作系统教程》读完,并且之前部门给我的入职前作业我还没完成,所以不打算再做太多修改了,感兴趣的东西可以帮忙修改下。

源码地址

https://github.com/armsword/BDMusicDownloader

代码已注释,有时间和感兴趣的同学,可以帮忙优化下界面。

jwSMTP源码剖析

前段时间事情太多了,忙着写毕业论文,考试,然后又被抽到了盲审,不过好在有惊无险,最后也在学院提交三月中旬申请答辩成功,如果盲审顺利的话,4月份就可以毕业了。不过这段时间总算可以看代码、看书了,感觉自己操作系统方面有些不扎实,索性买了本孙钟秀的《操作系统教程》看,之后顺便阅读和分析了jwSMTP源码,这里写篇文章记录下。本文不想对代码细节作太多分析,因为代码比较好读,并且文章末尾我会放出我注释过的源码链接,所以此文多介绍下原理吧。

jwSMTP

jwSMTP是一个由C++编写的发送邮件的库,支持Linux、Windows等平台。可使用HTML或纯文本方式发送邮件。也可添加附件,支持多个收件人。并且支持LOGIN和PLAIN两种服务器验证方式。

两种调用方式

第一种方式

1
2
3
4
5
6
7
mailer mail(“myfriend@friend.com”, // who the mail is too
“someone@somewhere.net”, // who the mail is from
“There is always room for FooBar”, // subject for the email
“Foo\nBar”, // content of the message
“ns.somewhere.net”); // the nameserver to contact
// to query for an MX record
mail.send( );

第二种方式

1
2
3
4
5
6
7
8
mailer mail(“myfriend@friend.com”, // who the mail is too
“someone@somewhere.net”, // who the mail is from
“There is always room for FooBar”, // subject for the email
vec, // content of the message
“mail.somewhere.net”, // the smtp server to mail to
mailer::SMTP_PORT, // default smtp port (25)
false); // do not query MX records,
// mail directly to mail.somewhere.net

主要区别是一个查询MX record,一个不查询MX record,直接发送给SMTP Server。

base64编码

Base64是网络上最常见的用于传输8Bit字节代码的编码方式之一,设计此种编码是为了使二进制数据可以通过非纯8bit的传输层传输,Base64编码可用于在HTTP环境下传递较长的标识信息,另一方面,采用Base64编码具有不可读性,即所编码的数据不会被人用肉眼所直接看到。
电子邮件的主题,MIME都会用到base64编码。我们现在说下其原理:

Base64编码方法:

  • base64的编码都是按字符串长度,以每3个8bit的字符为一组,然后针对每组,首先获取每个字符的ASCII编码,然后将ASCII编码转换成8bit的二进制,得到一组3*8=24bit的字节,然后再将这24bit划分为4个6bit的字节,并在每个6bit的字节前面都填两个高位0,得到4个8bit的字节,然后将这4个8bit的字节转换成10进制,对照Base64编码表,得到对应编码后的字符。
  • 不是3的整数倍的,需要补齐而出现的0,转化成十进制的时候就不能按常规用base64编码表来对应,可以理解成为一种特殊的“异常”,编码应该对应“=”。
    代码base64.cpp/base64.h是对Base64编码的实现,更多原理请参考下参考链接关于base64编码的原理与实现一文。打开一封Email,查看其原始信息,一般为如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    Date: Thu, 25 Dec 2014 06:33:07 +0800
    From: A<A@yeah.net>
    To: "B"<b@126.com>
    Subject:
    X-mailer: Foxmail 5.0 beta2 [cn]
    Mime-Version: 1.0
    Content-Type: text/plain;
    charset="gb2312"
    Content-Transfer-Encoding: base64
    xOO6w6OsU25haVgNCg0KoaGhodXiysfSu7j2QmFzZTY0tcSy4srU08q8/qOhDQoNCkJlc3QgV2lz
    aGVzIQ0KIAkJCQkNCqGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaEgICAgICAgICAgICAgICBl
    U1g/IQ0KoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoSAgICAgICAgICAgICAgIHNuYWl4QHll
    YWgubmV0DQqhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhoaGhICAgICAgICAgMjAwMy0x
    Mi0yNQ0K

程序流程

程序一般为先设置发件人信息,之后设置收件人信息,对应的函数为setsender()和addrecipient()函数,此处没什么可说的。之后是setmessage/setmessageHTML函数,两者的主要区别是不是需要base64编码,方法前面已说,此处主要说下checkRFCcompat()函数,此函数主要功能是:将消息结尾改为CRLF(\r\n)形式,之后注意此处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if(message.size() == 1) {
if(*(message.begin()) == '.')
message.push_back('.');
}
else if(message.size() == 2) {
if(*(message.begin()) == '.') {
it = message.begin();
it = message.insert(it, '.');
}
}
else {
if(*(message.begin()) == '.') {
it = message.begin();
it = message.insert(it, '.');
}
for(it = message.begin()+2; it != message.end(); ++it) {
// follow the rfc. Add '.' if the first character on a line is '.'
if(*it == '\n') {
if( ((it + 1) != message.end()) && (*(it +1) == '.') ) {
it = message.insert(it + 1, '.');
++it; // step past
}
}
}
}

此处是根据RFC2821(SMTP协议)的4.5.2 Transparency编写的,内容为下:

  1. Before sending a line of mail text, the SMTP client checks the first character of the line. If it is a period, one additional period is inserted at the beginning of the line.

  2. When a line of mail text is received by the SMTP server, it checks the line. If the line is composed of a single period, it is treated as the end of mail indicator. If the first character is a period and there are other characters on the line, the first character is deleted.

然后就是每一行消息不能超过1000个字符,见RFC2821的text line小节。

之后的一些setsubject、setserver、addrecipent等等函数,都不做解释了,都是用来添加/删除主机、设置服务器、增加/删除收件人列表相关的,很好明白。我们重点说下邮件发送函数send()里的operator()()函数,如果lookupMXRecord为真,就调用gethostaddresses()函数,用来查询MX记录,这涉及到DNS协议相关知识,请查DNS小节。如果为fasle,则直接连接SMTP server,operator()()和makesmtpmessage()函数主要是完成了如下流程(并不完全一致,仅参考):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
C: telnet smtp.163.com 25 (连接到163的SMTP服务器,协议规定SMTP服务器的端口号为25
S: Trying 202.108.5.83...
Connected to smtp.163.split.netease.com.
Escape character is '^]'.
220 163.com Anti-spam GT for Coremail System (163com[071018]) (220 表示连接成功)
C: HELO smtp.163.com (协议规定的握手过程,格式为HELO + 服务器名称)
S: 250 OK (250 表示握手成功)
C: AUTH LOGIN (AUTH LOGIN 是用户登录命令)
S: 334 dXNlcm5hbWU6 (334表示服务器接受)
C: tommy_mail (输入明文用户名)
S: 535 Error: authentication failed (服务器拒绝,因为SMTP要求用户名和密码都通过64位编码后再发送)
C: AUTH LOGIN (重新要求SMTP登录)
S: 334 dXNlcm5hbWU6
C: dG9tb*****FpbA== (用编码后的内容发送)
S: 334 UGFzc3dvcmQ6 (334表示接受)
C: ********aXZldXA= (编码后的密码)
S: 235 Authentication successful (235 登录成功)
C: MAIL FROM:<A@163.com> (MAIL FROM:<>格式,用来记录发送者)
S: 250 Mail OK (250 系统常用确认信息)
C: RCPT TO:<B@126.com> (接收者邮箱,可多次使用以实现发送给多个人)
S: 250 Mail OK
C: DATA (DATA明令表示以下为邮件正文)
S: 354 End data with <CR><LF>.<CR><LF>
C: TO:11@11 (发送给谁,这里可自由撰写,也是伪造邮件的一个入口,欺骗一般人可以,但会读源码的人欺骗不了)
FROM:22@22 (发送者是谁,可串改)
SUBJECT:TEST MAIL SMTP (邮件主题)
hello world (空一行写邮件正文)
. (正文以.结束)
S: 250 Mail OK queued as smtp3,DdGowLBLAjqD6_JIg1hfBA==.63235S2 1223879684 (服务器接受)
C: noop (空操作,延迟退出时间)
S: 250 OK
C: quit (退出SMTP服务器连接)
S: 221 Bye

DNS协议

调用gethostaddresses()函数,用来查询MX记录,这涉及到DNS协议相关知识,本函数可以使用nslookup命令模拟,我本地模拟如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
imlinuxer@imlinuxer:~$ nslookup
> set type=mx
> mail.qq.com
Server: 127.0.1.1
Address: 127.0.1.1#53
Non-authoritative answer:
*** Can't find mail.qq.com: No answer
Authoritative answers can be found from:
mail.qq.com
origin = qq.com
mail addr = webmaster.qq.com
serial = 1186990741
refresh = 300
retry = 600
expire = 86400
minimum = 86400

DNS查询的过程一般是:客户向DNS服务器的53端口发送UDP报文,DNS服务器收到后进行处理,并把结果记录仍以UDP报文的形式返回过来。除了报文头是固定的12字节外,其他每一部分的长度均为不定字节数。我们只关心的是报文头、问题、回答这三个部分
DNS的协议为rfc1035,但是枯燥难懂,可以查看参考链接的DNS消息格式,比较容易理解。

1
unsigned char dns[512] = {1,1, 1,0, 0,1, 0,0, 0,0, 0,0};

比如此处即为DNS Header消息头部信息。
之后几段代码是请求部分格式,代码里我已详细注释,之后发送请求,解析应答即可。

SMTP验证方式

比较简单,原理见此:SMTP验证方式种类(LOGIN、PLAIN、CRAM-MD5)
jwSMTP代码只实现了两种验证方式:LOGIN和PLAIN。

说的有点多了,感觉很多原理都解释了,逻辑稍微有一点混乱,主要是自己不是那么擅长组织语言,如果读者有兴趣,可以多了解下原理,知道SMTP和DNS原理,基本上代码就不需要多看了。

最后扔出我的中文源码剖析代码,在Github上:

https://github.com/armsword/Source/tree/master/jwSMTP

参考链接

关于base64编码的原理与实现
POP3 SMTP协议分析学习笔记
DNS消息格式

tinyhttpd源码剖析

喊了几天学习Web开发,为了毕业论文,今晚上计划也是看看CSS呢,但是实在是没忍住,读了下经典的tinyhttp的源代码,这款代码还是颇有名气的,网上这么评论的:

tinyhttpd是一个超轻量型Http Server,使用C语言开发,全部代码只有500、600行,附带一个简单的Client,可以通过阅读这段代码理解一个Http Server的本质。

其实,代码颇简单,适合刚学习Web Server的童鞋学习,因为我之前写过CGI Server,所以,我还是认为此代码写的一般,并且我在Ubuntu下编译遇到了不少错误,我都懒得写详细分析了,所以随便写下吧,后面的Github地址里有详细的分析。

源码下载地址:http://sourceforge.net/projects/tinyhttpd/

make编译后会有不少错误和警告,我这里说下怎么改正错误:

  1. Makefile文件里的:gcc -W -Wall -lsocket -lpthread -o httpd httpd.c ,修改为:
    gcc -W -Wall -o httpd httpd.c -lpthread

  2. 481行的 int client_name_len 改为 socklen_t client_name_len

  3. 436行 改动与上面相似,改为socklen_t类型即可。

  4. 34行改为void accept_request(void ); 所以下面的实现也要修改下:

1
2
3
4
5
6
7
8
9
void *accept_request(void* client1)
{
int client = *(int *)client1;
char buf[1024];
// 省略
// 同时注意此函数77 和129行改为return NULL;
// 497行改为
if (pthread_create(&amp;newthread , NULL, accept_request, (void*)client_sock) != 0)

之后再make,程序就OK了。

简单说下程序的逻辑吧:

一个无限循环,一个请求,创建一个线程,之后线程处理函数处理每个请求,然后解析HTTP请求,然后做一些判断处理,之后判断文件是否可执行,不可执行,打开文件,输出给客户端(浏览器)呗,可执行就创建管道,父子进程通信。

整个流程就这么简单,程序主要处理2种HTTP请求方式:GET和POST,懒得说了,上传两张图片,自己分析吧,图片原始出处不清楚,电脑里存了好久了,。

GET:

POST:

其实这个源码里有一个地方比较难懂,就是那个解析HTTP每一行的那个get_line函数里的recv的MSG_PEEK参数,详细解释可以参考此链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 读取socket,判断换行,CRLF标志,之后以"\n"换行,并在字符串后添加'\0'
int get_line(int sock, char *buf, int size)
{
int i = 0;
char c = '&#92;&#48;';
int n;
while ((i &lt; size - 1) &amp;&amp; (c != '\n'))
{
// 一个字符一个字符的读取
n = recv(sock, &c, 1, 0);
/* DEBUG printf("%02X\n", c); */
if (n &gt; 0)
{
if (c == '\r')
{
/*
* 注意MSG_PEEK参数,表示TCP Buffer不删除之前队列数据,从队列里接收数据
*/
n = recv(sock, &amp;c, 1, MSG_PEEK);
/* DEBUG printf(&quot;%02X\n&quot;, c); */
if ((n &gt; 0) &amp;&amp; (c == '\n'))
recv(sock, &amp;c, 1, 0);
else
c = '\n';
}
buf[i] = c;
i++;
}
else
c = '\n';
}
buf[i] = '&#92;&#48;';
return(i);
}

这代码,没啥写头,很多功能都没实现,请求只实现了GET和POST,Header里只用了第一行,CGI里全局变量只定义了几个,并且我验证程序,发现CGI功能好像是有些问题的,但是因为我CGI水平比较水,懒得检测原因了,总体来说,程序比我之前的那个CGI Server要简单些,功能要稍微弱些吧。

下面放出我Github里的详细中文注释,欢迎指正,谢谢:

https://github.com/armsword/Source/tree/master/tinyhttpd

Webbench源码剖析

我们知道知名的Web网站压力测试工具有Webbench、ab、http_load、siege等等,这种工具的源码都不是太长,所以,我用了一下午和晚上时间仔细了分析了Webbench的源码,并且写下这篇博客记录下。

我们先看下一般Webbench是怎么做压力测试的吧,方法很简单,如下所示:

1
2
3
4
5
6
7
8
9
10
#模拟200次请求,持续时间5秒的压力测试 -c 后为并发数, -t 后为持续时间
[imlinuxer@imlinuxer webbench-1.5]# webbench -c 200 -t 5 http://localhost/index.php
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.
Benchmarking: GET http://localhost/index.php
200 clients, running 5 sec.
Speed=156804 pages/min, 128496336 bytes/sec.
Requests: 13067 susceed, 0 failed.

那Webbench的原理是怎么样的?其实也是很简单的,就是根据提供的参数构造HTTP请求Header,然后使用fork,创建指定大小(webbench提供的参数-c 后的数字,上文为200)个子进程,每个子进程利用socket创建TCP连接到URL,然后通过管道向父进程发送数据,父进程通过管道读取子进程的数据,并作累计,输出即可。其简单流程图如下图所示:


下面简单说下源码吧,源码很短,不到600行,两个文件,分别为socket.c和webbench.c。先说下socket.c,代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int Socket(const char *host, int clientPort)
{
int sock;
unsigned long inaddr;
struct sockaddr_in ad;
struct hostent *hp;
memset(&amp;ad, 0, sizeof(ad));
ad.sin_family = AF_INET;
// 将字符串转换为32位二进制网络字节序的IPv4地址
inaddr = inet_addr(host);
if (inaddr != INADDR_NONE)
memcpy(&amp;ad.sin_addr, &amp;inaddr, sizeof(inaddr));
else
{
// 使用域名或主机名获取ip地址
hp = gethostbyname(host);
if (hp == NULL)
return -1;
memcpy(&amp;ad.sin_addr, hp-&gt;h_addr, hp-&gt;h_length);
}
ad.sin_port = htons(clientPort);
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock &lt; 0)
return sock;
if (connect(sock, (struct sockaddr *)&amp;ad, sizeof(ad)) &lt; 0)
return -1;
return sock;
}

自己封装了个socket模块,只需要注意的就是URL可能不是域名,也可能是IP地址。

然后就是webbench.c文件,咱们从main函数说起,因为需要对命令行做处理,所以使用了getopt_long函数,没使用这个函数的同学可以man getopt_long或者查看在线文件man 。注意下此结构体为getopt_long的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 长选项,getopt_long的参数
static const struct option long_options[]=
{
{&quot;force&quot;,no_argument,&amp;force,1},
{&quot;reload&quot;,no_argument,&amp;force_reload,1},
{&quot;time&quot;,required_argument,NULL,'t'},
{&quot;help&quot;,no_argument,NULL,'?'},
{&quot;http09&quot;,no_argument,NULL,'9'},
{&quot;http10&quot;,no_argument,NULL,'1'},
{&quot;http11&quot;,no_argument,NULL,'2'},
{&quot;get&quot;,no_argument,&amp;method,METHOD_GET},
{&quot;head&quot;,no_argument,&amp;method,METHOD_HEAD},
{&quot;options&quot;,no_argument,&amp;method,METHOD_OPTIONS},
{&quot;trace&quot;,no_argument,&amp;method,METHOD_TRACE},
{&quot;version&quot;,no_argument,NULL,'V'},
{&quot;proxy&quot;,required_argument,NULL,'p'},
{&quot;clients&quot;,required_argument,NULL,'c'},
{NULL,0,NULL,0}
};

之后是buildr_equest()函数,主要功能是构造HTTP头请求,构造成下面类似情况,具体可以参考代码:

1
2
3
4
GET / HTTP/1.1
Host: www.baidu.com
Cache-Control: max-age=0
Pragma: no-cache

之后便是打印压力测试的一些信息,没啥可说的,很容易读懂,之后到了return bench()处,bench函数是压力测试的核心代码。
bench里根据并发数,使用fork()创建子进程,子进程调用benchcore()函数,此函数里使用alarm和sigaction信号控制定时时间,alarm函数设置了多少时间之后产生SIGALRM信号,一旦产生此信号,运行alarm_handler函数,使得timerexpired等于1,这样可以通过判断timerexpired值来退出程序。然后子进程将数据写入管道。同时父进程读取管道数据,将数据进行累加,当全部读取完子进程后,父进程输出信息退出。

总体来说,Webbench代码还是很好读懂的,当然此代码也存在一些问题,比如不支持POST请求方式,只能模拟单个IP测试等等。
宿舍马上要熄灯了,写的比较着急,可能逻辑混乱些。我Github里对此源码做了非常详细的源码剖析。感兴趣的同学请查看详细的源码剖析吧。

链接地址:
https://github.com/armsword/Source/tree/master/webbench-1.5

Redis AE事件库源码剖析

很久之前就看过AE库了,不过这东西好久不看,也忘得差不多了,正好要找工作了,简历上写了之前的项目用过这个AE事件库,索性我就又把AE库看了一遍,本来表达能力就差,复习下吧。

先上张图,下图左侧是ae.c文件里所有的函数,一目了然。

AE事件库包含文件事件和时间事件,其包含2个源文件ae.c和ae.h,当然还有四个不同的多用复用封装的文件:ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c。一般常用的是ae_select.c(跨平台用),linux下多用ae_epoll.c。我们就以ae_epoll.c为例吧。

我们先说说文中用到的几个结构体:

aeEventLoop 核心数据结构,每个aeEventLoop对应一个event loop。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* 现在注册的最大文件描述符 */
int setsize; /* 跟踪文件描述符的最大数量 */
long long timeEventNextId; //下一个timer的id
time_t lastTime; /* 用来诊断系统时间偏差 */
aeFileEvent *events; /* 用于保存epoll需要关注的文件事件的fd、触发条件、注册函数 */
aeFiredEvent *fired; /* poll_wait之后获得可读或者可写的fd数组,通过aeFiredEvent-&gt;fd再定位到events*/
aeTimeEvent *timeEventHead; //以链表形式保存多个时间事件,每隔一段时间机会触发注册的函数
int stop; //停止运行的标志
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep; //阻塞地等待下一个事件发生之前要执行的函数
} aeEventLoop;

aeFileEvent是文件事件结构体,mask是要捕获的事件的掩码(读|写),rfileProc和wfileProc分别指处理文件读和写事件的函数,clientData是函数用的参数。

1
2
3
4
5
6
7
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

aeFiredEvent指的已准备好的文件事件的结构体,包括fd(文件描述符)和mask(掩码):

1
2
3
4
5
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

aeTimeEvent 是时间事件用到的结构体,id是指的这个定时器的id,when_sec、when_ms分别指的秒和毫秒,timeproc是时间处理函数,finalizerProc是定时器被删除的时候的回调函数,next指的下一个结点,定时器事件是一个链表形式存储的各个时间事件,无序的。

1
2
3
4
5
6
7
8
9
10
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;

下面这个结构体用在epoll创建,aeCreateApi()里使用:

1
2
3
4
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;

下面来说说程序流程吧,再说aeMain()函数之前,我先说下一些准备动作吧,分别为aeCreateEventLoop()、aeCreateTimeEvent()、aeCreateFileEvent()和aeSetBeforeSleepProc()。 aeCreateEventLoop()创建一个aeEventLoop结构体,做些初始化,之后调用函数aeApiCreate(),如上述结构体,epfd由epoll_create创建,*events 保存epoll_wait返回的事件组。 aeCreateFileEvent()为fd注册一个文件事件,使用epoll_ctl加入到全局的epoll fd 进行监控,之后再指定事件可读写处理函数。 aeCreateTimeEvent()注册一个时间事件,使用aeAddMillisecondsToNow()加入时间事件。 beforesleep 就是个回调函数,sleep之前调用的函数,有些事情是每次循环必须做的,并非文件和时间事件。

1
2
3
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}

下面说说核心循环函数aeMain()吧:

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

由stop来决定是否停止,beforesleep可以决定它是否停止,但redis里beforesleep就是一个实现,做的工作只是保证aeProcessEvents要用到的fd都准备好。aeProcessEvents一般会阻塞的(有例外)等待事件(时间和文件事件)的发生,然后做一些处理,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags &amp; AE_TIME_EVENTS) &amp;&amp; !(flags &amp; AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop-&gt;maxfd != -1 ||
((flags &amp; AE_TIME_EVENTS) &amp;&amp; !(flags &amp; AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags &amp; AE_TIME_EVENTS &amp;&amp; !(flags &amp; AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&amp;now_sec, &amp;now_ms);
tvp = &amp;tv;
tvp-&gt;tv_sec = shortest-&gt;when_sec - now_sec;
if (shortest-&gt;when_ms &lt; now_ms) {
tvp-&gt;tv_usec = ((shortest-&gt;when_ms+1000) - now_ms)*1000;
tvp-&gt;tv_sec --;
} else {
tvp-&gt;tv_usec = (shortest-&gt;when_ms - now_ms)*1000;
}
if (tvp-&gt;tv_sec &lt; 0) tvp-&gt;tv_sec = 0;
if (tvp-&gt;tv_usec &lt; 0) tvp-&gt;tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags &amp; AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &amp;tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j &lt; numevents; j++) {
aeFileEvent *fe = &amp;eventLoop-&gt;events[eventLoop-&gt;fired[j].fd];
int mask = eventLoop-&gt;fired[j].mask;
int fd = eventLoop-&gt;fired[j].fd;
int rfired = 0;
/* note the fe-&gt;mask &amp; mask &amp; ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe-&gt;mask &amp; mask &amp; AE_READABLE) {
rfired = 1;
fe-&gt;rfileProc(eventLoop,fd,fe-&gt;clientData,mask);
}
if (fe-&gt;mask &amp; mask &amp; AE_WRITABLE) {
if (!rfired || fe-&gt;wfileProc != fe-&gt;rfileProc)
fe-&gt;wfileProc(eventLoop,fd,fe-&gt;clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags &amp; AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

注意参数flags,当为AE_DONT_WAIT时就表示是不阻塞的检查关注的事件是否发生了,如果没发生则直接返回0。
aeProcessEvents处理过程如下:
首先通过调用aeSearchNearestTimer找到离现在最近的定时器是什么时候,这个地方其实做的不好,不如libevent的定时器,需要O(N)得到最短时间,可以改为最小堆保存时间,然后将得到的时间作为参数传给aeApiPoll,作为其timeout参数,来返回命中的文件事件数目,查看eventLoop->fired,可以取出命中事件的详细信息,然后调用rfileProc或wfileProc做事件读写处理,最后再调用processTimeEvents处理已经触发了的定时器事件,然后返回所有已处理的事件(文件和时间)总和。
最后,参考链接里的那个Redis执行流程图还是挺有借鉴意义的,图片太大,我就不转过来了,参考链接在最下方。

PS:边看边写,写到此处竟然又晚上两点多了,现在作息时间越来越乱,等拿到offer后,需要调整下生物钟了。

参考链接:http://my.oschina.net/qichang/blog/32469