分布式SQL查询引擎Presto原理介绍

前言

我们实时引擎组新引入了一款分布式SQL查询引擎,名字叫Presto,目前已经调研和测试了2个月了,并且期间某平台也从impala平台迁入到了Presto平台,查询性能有了2-3倍的提升(各种原因导致),所以本文将结合作者这段时间的测试和调研研究,来揭开Presto的神秘面纱。

Presto是神马

Presto是由Facebook开发的一个分布式SQL查询引擎, 它被设计为用来专门进行高速、实时的数据分析。它的产生是为了解决Hive的MapReduce模型太慢以及不能通过BI或Dashboards直接展现HDFS数据等问题。Presto是一个纯粹的计算引擎,它不存储数据,其通过Connector获取第三方Storage服务的数据。

历史

  • 2012年秋季,Facebook启动Presto项目
  • 2013年冬季,Presto开源
  • 2017年11月,11888 commits,203 releases,198 contributors

功能和优点

  • Ad-hoc,期望查询时间秒级或几分钟
  • 比Hive快10倍
  • 支持多数据源,如Hive、Kafka、MySQL、MonogoDB、Redis、JMX等,也可自己实现Connector
  • Client Protocol: HTTP+JSON, support various languages(Python, Ruby, PHP, Node.js Java)
  • 支持JDBC/ODBC连接
  • ANSI SQL,支持窗口函数,join,聚合,复杂查询等

架构

  • Master-Slave架构
  • 三个模块
    • Coordinator、Discovery Service、Worker
  • Connector

Presto沿用了通用的Master-Slave架构,Coordinator即Presto的Master,Worker即其Slave,Discovery Service就是用来保存Worker结点信息的,通过HTTP协议通信,而Connector用于获取第三方存储的Metadata及原始数据等。

Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行;Worker节点负责实际执行查询任务。Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。假如配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点与HDFS交互读取数据。

部署方式

Presto常见的部署方式如下图所示:

Coordinator与Discovery Server耦合在一起混合部署,然后部署多台Worker。然而这个有个问题,就是Coordinator存在单点问题,我们目前线上使用ip漂移的方法(网卡绑定多ip)。如下图所示:

查询流程

整体查询流程为:

  • Client使用HTTP协议发送一个query请求。
  • 通过Discovery Server发现可用的Server。
  • Coordinator构建查询计划(Connector插件提供Metadata)
  • Coordinator向workers发送任务
  • Worker通过Connector插件读取数据
  • Worker在内存里执行任务(Worker是纯内存型计算引擎)
  • Worker将数据返回给Coordinator,之后再Response Client

SQL执行流程


当Coordinator收到一个Query,其SQL执行流程如上图所示。SQL通过Anltr3解析为AST(抽象语法树),然后通过Connector获取原始数据的Metadata信息,这里会有一些优化,比如缓存Metadata信息等,根据Metadata信息生成逻辑计划,然后会依次生成分发计划和执行计划,在执行计划里需要去Discovery里获取可用的node列表,然后根据一定的策略,将这些计划分发到指定的Worker机器上,Worker机器再分别执行。

与Hive比较


上图显示了MapReduce与Presto的执行过程的不同点,MR每个操作要么需要写磁盘,要么需要等待前一个stage全部完成才开始执行,而Presto将SQL转换为多个stage,每个stage又由多个tasks执行,每个tasks又将分为多个split。所有的task是并行的方式进行允许,stage之间数据是以pipeline形式流式的执行,数据之间的传输也是通过网络以Memory-to-Memory的形式进行,没有磁盘io操作。这也是Presto性能比Hive快很多倍的决定性原因。

实现低延时的原理

  • 完全基于内存的并行计算
  • 流水线
  • 本地化计算
  • 动态编译执行计划
  • 小心使用内存和数据结构
  • 类BlinkDB的近似查询
  • GC控制

当然其优化方法也包括了一些传统的SQL优化原理,关于这些优化细节等后续文章详细介绍。

缺点

前面介绍了Presto的各种优点,其实其也有一些缺点,主要缺点为以下三条:

  • No fault tolerance;当一个Query分发到多个Worker去执行时,当有一个Worker因为各种原因查询失败,那么Master会感知到,整个Query也就查询失败了,而Presto并没有重试机制,所以需要用户方实现重试机制。
  • Memory Limitations for aggregations, huge joins;比如多表join需要很大的内存,由于Presto是纯内存计算,所以当内存不够时,Presto并不会将结果dump到磁盘上,所以查询也就失败了,但最新版本的Presto已支持写磁盘操作,这个待后续测试和调研。
  • MPP(Massively Parallel Processing )架构;这个并不能说其是一个缺点,因为MPP架构就是解决大量数据分析而产生的,但是其缺点也很明显,假如我们访问的是Hive数据源,如果其中一台Worke由于load问题,数据处理很慢,那么整个查询都会受到影响,因为上游需要等待上游结果。
    这篇文章就先介绍这里吧,后续会陆续更新一系列Presto相关的文章,欢迎关注。

参考链接

https://tech.meituan.com/presto.html

说说常见搜索引擎的分布式解决方法

随着索引数据的增大以及请求的增多,分布式搜索是最好的一种解决方案,主要解决两个问题,其一是能让单台机器load所有索引数据到内存中,其二是请求延时大,解决请求latency问题。我之前为团队写了篇专利,内容是关于分布式搜索解决方案的,所以粗略的看了下大部分开源的搜索引擎是怎么实现分布式的,后面的文章我会简单说下常见的搜索引擎的分布式解决方案。

首先我们先说下几个简单概念,分布式搜索都是M*N(横向切分数据,纵向切分流量)这个维度去解决问题的,虽然不同产品或场景概念不完全相同,读者可以简单认为一份完整的数据,被均分为M份,每一份被称为一个分配(Shard或者Partition),然后提供对每个Shard提供N份副本(Replica)。那么分布式的设计就围绕着以下问题:

  • 如何选择合适的分片(Shard),副本(Replica)的数量
  • 如何做路由,即怎么在所有Shard里找到一份完整的数据(找到对应的机器列表)
  • 如何做负载均衡
  • 如果提高服务的可扩展性
  • 如何提高服务的服务能力(QPS),当索引和搜索并发量增大时,如何平滑解决
  • 如何更新索引,全量和增量索引的更新解决方法
  • 如果提高服务的稳定性,单台服务挂掉怎么不影响整体服务等等

下面就说下常见的搜索引擎的分布式解决方案,因为开源的搜索产品基本上都没有在工作中用过,对代码细节并不是太了解,只是研究了下其原理,所以理解的会有些偏差,看官们如果发现错误直接指出即可。

Sphinx/Coreseek

Sphinx的流程还是很简单的,可以看下其流程图:

需要支持分布式的话,需要改下配置,大致是这样子的:

1
2
3
4
5
6
7
8
index dist
{
type = distributed
local = chunk1
agent = localhost:9312:chunk2 本地
agent = 192.168.100.2:9312:chunk3 远程
agent = 192.168.100.3:9312:chunk4 远程
}

从图中也可以看出,需要在配置列表里配置好其他shard的地址。查询过程为:

  • 连接到远程代理
  • 执行查询
    • 对本地索引进行查询
    • 接收来自远程代理的搜索结果
  • 将所有结果合并,删除重复项
  • 将合并后的结果返回给客户端

索引数据复制同步的方法也是常用的两种:

  • 主从同步
  • 增量更新索引

方法也是设置crontab,添加2个选项,一个是重建主索引,一个是增量索引更新。

当然为了避免单点以及增加服务能力,肯定有多个Replica,解决方法应该也是配置或者haproxy相关的方法解决,从上面可以看出,Sphinx很难用,自动化能力太弱,所以很多大厂要么不再使用Sphinx要么基于其二次开发。

Solr

Solr提供了两种方案来应对访问压力,其一是Replication,另一个是SolrCloud。我们此处只说Replication原理。
Replication采用了Master/Slave模式,也就是说由一个主索引和多个从索引构成,从索引从主索引复制索引,主索引负责更新索引,从索引负责同步索引和查询。本质上是读写分离的思想,MySQL/Redis等数据库也多是这种方式部署的。有两种部署方式:

  • 第一种

  • 第二种

与第一种相比多了一层Repeater,Repeater既扮演了Master角色,又扮演了Slave功能,主要解决单个Master下Slave太多,Master压力太大的问题。

Master与Slave之间的通信是无状态的http连接,Slave端发送不同的Command从Master端获得数据。原理就是Master那边有个标志位和版本号,用于获取正确的数据版本,然后数据扔到Slave临时目录下,数据完整后,再覆盖原有数据。多个副本的方法应该与Sphinx相似,一般也是通过通过上游负载均衡模块如Nginx,HaProxy来分流。

SolrCloud

因为Solr Replication不好用,本质上还不算真正分布式的,所以Solr从4.0开始支持SolrCloud模式。特性不少,主要说两个吧:

  • 配置文件统一管理,扔到Zookeeper上
  • 自动做负载均衡和故障恢复,不再需要Nginx或HaProxy的支持

逻辑图

  • Collection:逻辑意义上一份完整的索引
  • Shard:上文已说,就是索引的1/N分片
  • Replica:Shard的一个拷贝

每个Shard,即相同的Replica下都会有一个leader,leader选举由Zookeeper完成。虽然有leader的概念,但是其实SolrCloud分布式是去中心化的,意思就是说,leader和非leader都能提供查询功能(也有修改和删除功能,搜索场景应用不多吧?),而更新索引,创建Collection/Shard/Replica(即扩容)只能由leader完成,避免产生并发修改问题,当非leader节点收到修改操作请求时,将信息存储在zookeeper中相应节点上,leader节点会一直对该节点进行watch,发现变化就实时做处理。

Zookeeper信息

图片出处见参考链接

创建索引

  • 任意节点收到创建索引请求后,转换成json格式存储到zk的/overseer/collection-queue-work的children节点上
  • leader线程一直监控collection-queue-work节点,检查到变化后,取出json数据,根据信息计算出需要创建的shard、replica,将创建具体replica的请求转向各对应节点
  • 各节点创建完具体的replica后,将该节点的状态(创建成功与否等)更新到/overseer/queue的children节点上
  • leader线程监控/overseer/queue节点,将overseer/queue的children节点的状态更新至/clusterstate.json
  • 各节点同步/clusterstate.json,整个集群状态得到更新,新索引创建成功

更新索引

  • 根据路由规则计算出该doc所属shard,并找出该doc所属的shard对应的leader
  • 如果当前Replica是对应Shard且是leader,首先更新本地索引,然后再将doc转向该Shard的其余Replica

扩容/缩容

  • 停掉某台Solr,更新集群状态到/clusterstate.json
  • 增加一台Solr,从leader出复制相同的数据,然后配置写到/clusterstate.json

查询

  • 去中心的,leader和非leader一样功能,Replica接收搜索请求时,从Zookeeper中获取该Replica对应的Collection及所有的Shard和Replica
  • 将请求发送到该Collection下对应的Shard,然后负载均衡到对应的Replica

SolrCloud也有其他功能,比如Optimization,就是一个运行在leader机器的进程,复杂压缩索引和归并Segment;近实时搜索等。总体看SolrCloud解决了Solr Replication遇到的一些问题,比Sphinx更好用,更自动化。

一号店

很多大一点的厂商如果不自研搜索引擎的话,并没有使用SolrCloud,而多基于Solr/Lucence。比如一号店的分布式搜索解决方案,如下所示:
http://www.infoq.com/cn/articles/yhd-11-11-distributed-search-engine-architecture

Broker就相当于Proxy,扮演了路由功能,很多厂商做的与一号店有相似之处。因为没有leader选举,所以索引的更新就由其他模块来做了。

ElasticSearch

ElasticSearch的倒排索引也是基于Lucence实现的。功能强大,不仅提供了实时搜索功能,还有分析功能,DB-Engines上面的搜索引擎排名,目前已经超越Solr排名第一位了。因为太强大了,功能也特别多,我研究还不够深,简单说下吧。
es会将集群名字相同的机器归为一个集群(业务),所以先说下启动过程。

启动过程

当ElasticSearch的节点启动后,它会利用多播(multicast)(或者单播,如果用户更改了配置)寻找集群中的其它节点,并与之建立连接。

leader选举

与SolrCloud相似,也是去中心化的,但是没有使用Zookeeper,而是自己实现了分布式锁,选主的流程叫做 ZenDiscovery(详情见第三个参考链接):

  • 节点启动后先ping(这里的ping是 Elasticsearch 的一个RPC命令。如果 discovery.zen.ping.unicast.hosts 有设置,则ping设置中的host,否则尝试ping localhost 的几个端口, Elasticsearch 支持同一个主机启动多个节点)
  • Ping的response会包含该节点的基本信息以及该节点认为的master节点
  • 选举开始,先从各节点认为的master中选,规则很简单,按照id的字典序排序,取第一个
  • 如果各节点都没有认为的master,则从所有节点中选择,规则同上。这里有个限制条件就是 discovery.zen.minimum_master_nodes,如果节点数达不到最小值的限制,则循环上述过程,直到节点数足够可以开始选举
  • 最后选举结果是肯定能选举出一个master,如果只有一个local节点那就选出的是自己
  • 如果当前节点是master,则开始等待节点数达到 minimum_master_nodes,然后提供服务
  • 如果当前节点不是master,则尝试加入master

选举完leader后,主节点leader会去读取集群状态信息;因为主节点会监控其他节点,当其他节点出现故障时,会进行恢复工作。在这个阶段,主节点会去检查哪些分片可用,决定哪些分片作为主分片。

分片

es在创建索引时,自己设置好分片个数,默认5个,整个过程类似于分裂的概念,如下图所示:

至于读写、写操作等于SolrCloud相似,等我细研究后后续再说吧,也可以说下实时索引怎么做的,细节很多,下次再说吧。至于文中为什么不说Lucence,因为Lucence其实就是个index lib,只是解决倒排、正排索引怎么存放的,并不是一个完整的搜索引擎解决方案。而ES为什么能脱颖而出的主要原因是配套设施完善,工具,Web UI都是非常赞的,对于很多开源产品,它能后来居上的主要原因就是它真实的能解决用户遇到的问题或者比其他产品更好用。搜索引擎发展这么多年了,架构这块能做的基本上大家都差不太多,最后能脱颖而出的肯定是第三方工具做的更完善,更好用的了。

PS:
至于阿里搜索怎么做的,可以参考下这个文档,包括了阿里搜索里用到的很多基础模块了:
https://share.weiyun.com/f66e79d9f6897d0aac683361531cf00d

参考链接:
http://blog.haohtml.com/archives/13724
http://www.voidcn.com/blog/u011026968/article/p-4922079.html
http://jolestar.com/elasticsearch-architecture/

Minder:一个分布式启停进程服务

关于

打算造个轮子,先备份下简单的设计手册吧。Minder是一个分布式启停进程服务,主要用于以下目的。

  • 命令或远程RPC调用启动和停止进程
  • 当进程挂掉,可以拉起来进程
  • 获得机器对应的系统信息,如CPU,内存,硬盘等等

设计

方案一(不采用)

分为Server和Client端

Server

Server在每台机器上都需要安装,Server是个常驻进程,负责真正的启停进程,同时需要一个Check线程,用于判断进程状态,当程序挂掉以便重启进程,重启进程可以在配置里有个次数限制。

Client

Client就是一个工具,用于向指定的Server发送请求,Server收到请求,启动进程。因为协议使用RPC,为了方便,使用脚本语言编写更好。

程序流程

Client发送请求,根据Client命令,判断是启动还是停止进程,假设是start进程,那么通过RPC的stub_client发送到Server的stub_server,之后调用对应的startProcess函数,通过fork+exec启动进程,同时Server需要有个Loop线程,判断进程的状态,以便重启进程。因为一个Server需要启动不同的服务,同时Server也可能会挂掉,所以Server需要持久化进程的信息,以便Server挂掉后恢复这些进程信息。

细节

  • 想象下我们启动进程都需要哪些参数?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"processInfos": [
{
"isDaemon": true, // 是否是守护进程
"processName": "app", // 进程名字
"restartInterval": 10, // 重启时间间隔
"restartCountLimit": 10, // 重启次数
"signal": 9, // 停止进程的信号值,kill函数对应的值
"parameters": "-m cluster -n 1", // 进程需要的参数
"environmentVariable" : { // 需要的环境变量
"key" : "key",
"value" : "value"
}
}
]
}

因为我们请求是Client工具向Server发送请求的,通信方式为RPC,所以上述参数都为PB格式。

  • Server端启动进程方式为fork+exec,停止进程为kill发送信号。

  • Client只是个工具,发送请求的方式可以为以下命令: minder_client -m start -s \”10.10.10.10:8888\” -p /bin/sleep 大意就是向Server发送请求,请求启动sleep进程。

架构图

方案二

分为Manager、Slave和Tools,第一版本只有一个Manager,Slave在每台机器都有。如果看懂方案一,方案二许多细节就可以不用说了,只见到说下每个模块干啥的吧。

Manager

只有Manager给用户打交道,用户向Manager提交需要启动的机器ip,进程信息,Manager向Slave发送消息。同时如果用户需要获取Slave机器的机器状态,如CPU,内存以及磁盘信息等,也是通过Manager获取的。

Slave

用户Slave是不与用户打交道的,Slave用于接受Manager的请求,启停进程,获取Slave机器状态,然后将对应的信息返回给Manager。

Tools

封装了对应的RPC请求,用户向Manager发送请求。

程序流程

Manager接受用户的请求,根据请求,Manager向Slave发送请求,Slave接受到请求后执行对应的操作。同时Manager持久化对应的信息。因为Manger必须知道Slave的机器ip,如果这个需要Slave主动汇报给Manager,四层心跳既可以。

架构图

当然,这个设计也是有很多改进的方法,比如Manager挂了怎么办,Manager存在单点问题,既然启动进程,怎么可以改善下搞成灰度发布,等等。因为时间问题,我工作也非常忙,这个我就不细说了。轮子有空造,其实写出来方案,思路基本上就清晰了。

另外,其实写这篇文字时候,与微信朋友圈里与朋友们交流了下想法,发现了2个系统是可以参考的,一个是Hadoop Ambari,另一个是Cloudera Manager。

一些常见的搜索查询树优化方法

搜索引擎服务收到一个Query后,一般引擎这边是这么搞得,解析语法,生成后缀表达式,即查询搜索树(Search Tree)。搜索查询树负责求交、求并和过滤。所以这个地方也是性能关键点。所以在解析语法后,一般要做查询搜索树优化,减少求交,求并和过滤操作的次数,以此来提高搜索服务的QPS和查询Latency。

因为同事在负责新实时索引的性能优化,我正好负责将全量索引格式迁移到新实时索引上面,即实时索引的代码即支持全量索引也支持实时索引,减少运维和代码维护成本。我在查阅代码的同事,顺便与同事沟通了一些优化方法,进入全量索引代码里发现老的全量索引里已经有一些优化方法了,这些方法应该是市面上常见的方法,多数人也是应该知道的,这里记录下吧。主流优化操作主要包括以下四种:

全AND操作

如果父子节点都是AND,则可以合并,如(A & B)& (C & D),优化后为 A & B & C & D ,能提早发现交为空,并退出,如下图所示。

优化后为:

全OR操作

如果父子节点都是OR,则可以合并。如 (A | B)| (C | D),优化后为 A | B | C | D,能提早发现为满并退出。如下图所示:

优化后为:

将子节点的OR上提

如 (A | B) & (C &D),优化后为 A |(B & C | D),查找次数减少。

优化后为:

按照Doc数量排序

如 A & B & C,可以按照Doc数量排序后查询,Sum(A) < Sum(B) < Sum(C),则A & B & C可以尽早发现为空并退出。

当然查询搜索树优化不止这么些方法,比如之前就知道可以使用WAND提升长Query的检索速度,很久没看了,细节记不清了,就不说了。以后需要的时候再研究下。

dlog:高性能线程安全的C++日志库

打算闲暇时光写点靠谱的东西比如名字服务、KV存储等基础服务类软件,所以周末就先从封装一些基础库开始。上周末写了个C++日志库,支持多线程,性能也还不错,今天就做了下测试和修改了下bug。这里说下该库的使用方法,以及实现思路以及一些性能调优方法。欢迎交流和指教。

源码地址

https://github.com/armsword/dlog

编译

执行build.sh,在上层目录里会生成build文件,测试的可执行文件在release/bin目录下。

使用方法

将etc文件下的dlog.json扔到可执行文件的当前目录(当然只要能让可执行文件找到dlog.json即可),每个函数包含logger文件夹下的Log.h头文件,在主函数里调用DLOG_INIT初始化一次,之后在每个需要打印log的文件里调用DLOG_LOG即可。
如实例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <dlog/logger/Log.h>
#include <unistd.h>
using namespace dlog::logger;

int main() {
DLOG_INIT();

DLOG_LOG(WARN, "test the log level using log lib!");
DLOG_LOG(DEBUG, "Hello World, %d",2016);
DLOG_LOG(INFO, "test Log C/C++ lib");
DLOG_LOG(ERROR, "Hello everyone, this is my blog: http://armsword.com/; Welcome to visit it,Thank you!");

return 0;
}

输出样例

目录名:dlog.log.20160911.0
日志内容:

1
2
3
4
[2016-09-11 15:05:30.510] [WARN] [4246443808] [/home/Github/dlog/example/LogTest.cpp:8] [main] test the log level using log lib!
[2016-09-11 15:05:30.510] [DEBUG] [4246443808] [/home/Github/dlog/example/LogTest.cpp:9] [main] Hello World, 2016
[2016-09-11 15:05:30.510] [INFO] [4246443808] [/home/Github/dlog/example/LogTest.cpp:10] [main] test Log C/C++ lib
[2016-09-11 15:05:30.510] [ERROR] [4246443808] [/home/Github/dlog/example/LogTest.cpp:11] [main] Hello everyone, this is my blog: http://armsword.com/; Welcome to visit it,Thank you!

该日志库功能

  • 包含四种日志级别,分别为WARN、DEBUG、INFO、ERROR,日志级别大小依次递增
  • 可配置输出日志路径
  • 可配置输出日志前缀
  • 可定义输出的日志级别,默认DEBUG
  • 可定义日志文件切分大小
  • 支持多线程程序
  • 可定义日志往磁盘刷新的方式
  • 支持每天切换新的日志文件
  • 支持log文件被删除时,从新建立日志文件

dlog.json配置

1
2
3
4
5
6
7
{
"log_path": "./log", // 日志路径
"log_prefix": "dlog", // 日志前缀
"log_level": "DEBUG", // 输出日志级别,DEBUG表示大于等于DEBUG级别的日志都打印
"max_file_size": 200, // 日志切分大小,单位m
"async_flush": true // 日志往磁盘刷新方式,true表示异步,false表示同步,建议选择true
}

性能

在测试机上测试了下机器的硬盘真实io写速度

1
2
3
4
5
6
time dd if=/dev/zero of=test.dbf bs=8k count=300000 oflag=direct
记录了300000+0 的读入
记录了300000+0 的写出
2457600000字节(2.5 GB)已复制,38.0385 秒,64.6 MB/秒

注:oflag=direct 表示使用DirectIO方式,不使用文件系统的buffer等

而我用dlog写入3.5G(1000W条数据),用时大约51、52s,计算下来,写速度大约69M/s,与上面测试的磁盘io数据64.6 MB/s比起来,性能还算不错(因为cache问题,要比裸写磁盘性能要好),当然由于时间问题,我测试还不够充分,以后有机会继续优化下再测试看下。

一些性能调优技巧

  • 为了避免锁竞争,使用了一种更为高效的线程局部存储方法,就是使用关键字__thread来定义变量,__thread是GCC内置的线程局部存储设施(Thread-Local Storage),凡是带有__thread的变量,每个线程都拥有该变量的一份拷贝,且互不干扰。
  • 使用likely,unlikely来提高CPU分支预测正确率来提高性能。我们定义likely、unlikely如下:
    1
    2
    #define likely(x) __builtin_expect((x), 1)
    #define unlikely(x) __builtin_expect((x), 0)

其中__builtin_expect是gcc提供的函数。顾名思义,likely表示这件事很大概率会发生 :)

  • 使用loop线程来判断文件是否需要切分(打开新的fd),并且open、close这种费时操作,只在loop线程里完成,不阻塞log输出线程(一些小技巧保证线程安全)。

本日志库主要是为了满足个人需求,当然即使公司业务,没特殊需求的话也足够用了,所以并没有写的像log4j或者log4cxx那么复杂,但即便如此该日志库还有一些提高空间,等我有空再继续优化下,欢迎交流和指教,谢谢。

致谢:

因为很久没用Cmake了(之前在阿里用ascons),基本上都忘光了。并且阿里的一些经验使我比较在意代码目录的组织方式,搜索后发现一篇文章讲Cmake非常不错,作者很用心,非常感谢。

Cmake入门实战

Earlybird:Twitter实时搜索引擎

简单说下,2011年左右的Paper,Paper里说Twitter的实时索引和检索系统叫做Earlybird,Paper中主要讲了2件事,第一个就是支持Twitter的实时索引的倒排索引结构是怎么样的,第二个就是利用Java并发模型,处理并发读写。

该引擎功能:

  • 低延时、高吞吐能力;
  • 能处理突发峰值(Weibo的特性);
  • 突出时效性,时间越近,排名应该越靠前;
  • 该实时引擎支持AND、OR、NOT以及短语查询,实时性大约10S,查询latency 50ms;

该实时引擎,基于Lucene,使用Java开发,理由是:

  • 利用已存在的Lucene代码,并用来做全量索引;
  • 适用于Twitter以JVM为中心的开发环境;
  • 利用Java和JVM提供容易理解的并发模型;

其实上面几条理由与阿里很多开发项目类似,但是阿里的搜索引擎是C++编写的,质量也是非常不错的,叫问天(HA3)。

索引构建过程:

Earlybird处理三种信号:

  • 静态信号,初次建立索引时被添加;
  • 共鸣信号,动态更新的,比如tweet转发次数;
  • 搜索用户信息,用来个性化排序;

  1. 用户发布tweet,之后被发送到一个队列,叫做Ingestion Pipeline里。这些tweet会被分词,并被加上一些meta信息,比如语言等;
  2. 为了处理大容量,tweet按照哈希算法,分发到各个Earlybird服务上,之后将tweet实时地建立索引;
  3. 如上图所示,有一个Updater服务,它推送共鸣信号到Earlybird服务,用于动态地更新索引,比如tweet转发信息等。

查询过程如下:

  1. 当查询请求过来,先到达前端“Blender”,Blender解析请求,并将搜索用户的本地社交图谱信息也下发到Earlybird服务;
  2. Earlybird服务执行相关性计算并排序,并将排序好的tweet(排序分数高的,最近发生的tweets)返回给Blender;
  3. Blender合并各个Earlybird返回的列表,并重排序后返回给用户;

Term字典

Twitter实时索引现在不支持一些高级查询,比如通配符。所以term词典使用hash表实现。没有选择Java默认的HashMap,因为其不是GC友好的,Earlybird实现了个开链法的哈希表(个人猜测是不是与Redis的渐进式哈希相似呢?)每个Term被分配了一个唯一且单调递增的id作为key,value包含以下两部分信息:

  • Term对应的倒排索引数据长度;
  • 指向倒排索引数据末尾的指针;

动态(active)索引

Paper里首先介绍了现在存在的索引组织方式不适合实时索引,如将新发生的tweet放到postings链最后面,但是读取时候需要倒序读取,这个是不支持现在流行的压缩算法的,而如果将新的数据放到倒排索引前面,这带来的问题就是内存分配的问题。Earlybird使用了一个更简单的方式,分离索引读和写的方式。

每个实例维护了多个索引分段(目前是12个instance),每个分段保存相对较少量的tweet(目前是2^23~840万 tweets)。新增加的tweets被分到同一个segment(分段)中,满了之后再放下一个,这样,在任何时候只有一个分段被更改,其他都是只读状态,当一个segment满了,停止接收新的tweets时,会做只读优化。

postings列表在优化之前直接使用整数数组,按照docid升序存储,并且不做压缩,因此查找可以直接二分查找。postings增长时分配空间以pool为单位,分配空间按指数预留空间,分为4个,大小分别为应2^1、2^4、2^7、2^11 。如果一个pool满了,另一个pool将会分配。每个pool里面有若干个slice,每个slice保存一个term的postings列表,每个postings列表里面存储着一个term的postings信息,每个postings是一个32-bit的整数,24-bit用作文档id;8-bit用作位置id,保存term在tweet中出现的位置。因为tweet有140字符限制,所以8-bit足够用。在建立索引时,先尝试填满2^1pool中的slice,如果填满,就转到2^4的pool的slice,以此类推。term 字典中保存term对应的最后一个pool的中postings list尾部指针。如下图所示:

索引优化

一旦动态索引停止接收新的tweets时,即此segment的tweets超过840W时,后台执行只读索引优化。优化过程中,根据老的索引数据,新的索引数据会被创建,原始索引数据不做变化,一旦完成构建,原始索引数据将被新的索引替换。postings链分为两种,长的和短的,以长度1000为界限。短的,即postings链长度小于1000,postings保持原样(24-bit文档id加上8-bit的位置信息),但是postings会按照时间逆序排列。对于长的postings链,采用了基于block的压缩算法,PForDelta和Simple9。

并发管理

对于实时索引来说,一个重要的需求就是在多线程环境下处理好并发读和写操作。当然这个只用于动态索引当中,静态索引(优化后的)只有读操作。因为不懂Java,这块简单说下吧,Paper里说Java和JVM提供了一个非常容易理解的并发内存模型。11个只读段并发读不需要锁,唯一的可读可写段使用volatile关键字实现高效同步,然后也使用了 jvm memory barrier。懂Java得同学,通过以下新tweet添加处理过程,可能就明白一二了。

tweet里的每个term,首先去查询对应的词典入口,在字典中,term被映射到term id。它包括2个信息,Term对应的倒排索引数据长度和指向倒排索引数据末尾的指针。通过末尾的指针,信息被添加到新的postings链中。如果没有足够的空间插入新的postings信息,新的slice被分配。如果term是第一次遇到,它被添加到词典并且分配下一个term id。postings被添加到slice时,term并发的增加计数,指向postings链尾巴的指针也被并发更新。当tweet里的所有term都被处理后,增加maxDoc变量,此变量表示目前遇到的最大文档id。

简单就写这么多吧,没看明白的同学可以自己仔细阅读下twitter的这篇实时索引Paper吧,地址:

http://www.umiacs.umd.edu/~jimmylin/publications/Busch_etal_ICDE2012.pdf

记一次线上Bug的查找过程和思路

前言

简单说下问题情况,Proxy即作为客户端又有服务端功能,其接受QS(Query Server)的请求,之后向BS(索引服务)发送请求,然后根据BS的返回结果Merge后返回给QS。不能泄露太多东西,所以本文主要是整理一些知识点和问题查找思路。

问题和现象:

  • Proxy机器上出现大量的CLOSE_WAIT。
  • Proxy失去服务能力,内存使用不为0,CPU使用率为0。
  • 机器报警,TcpListenOverFlows。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
命令查看:
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'


结果:
TIME_WAIT 31927
CLOSE_WAIT 1570
ESTABLISHED 40

TIME_WAIT:表示主动关闭,通过优化系统内核参数可容易解决。
CLOSE_WAIT:表示被动关闭,需要从程序本身出发。
ESTABLISHED:表示正在通信


注:以上数据非真实线上情况,只为举例

一些知识点

TCP握手图



TIME_WAIT:

Linux系统下,TCP/IP连接断开后,会以TIME_WAIT状态保留一定的时间(2MSL:max segment lifetime),默认为4分钟,然后才会关闭回收资源。当并发请求过多的时候,就会产生大量的 TIME_WAIT状态的连接,无法及时断开的话,会占用大量的端口资源和服务器资源。
状态保持2MSL的原因:
1.防止上一次连接中的包,迷路后重新出现,影响新连接(经过2MSL,上一次连接中所有的重复包都会消失)
2.可靠的关闭TCP连接。在主动关闭方发送的最后一个 ack(fin) ,有可能丢失,这时被动方会重新发fin, 如果这时主动方处于 CLOSED 状态 ,就会响应 rst 而不是 ack。所以主动方要处于 TIME_WAIT 状态,而不能是 CLOSED 。另外这么设计TIME_WAIT 会定时的回收资源,并不会占用很大资源的,除非短时间内接受大量请求或者受到攻击。
一些解决方法:
通过修改/etc/sysctl.conf文件,服务器能够快速回收和重用那些TIME_WAIT的资源 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#表示开启SYN Cookies。当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭
net.ipv4.tcp_syncookies = 1

#表示开启重用。允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭
net.ipv4.tcp_tw_reuse = 1

#表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭
net.ipv4.tcp_tw_recycle = 1

#表示如果套接字由本端要求关闭,这个参数决定了它保持在FIN-WAIT-2状态的时间
net.ipv4.tcp_fin_timeout=30

生效命令:
/sbin/sysctl -p

CLOSE_WAIT:

CLOSE_WAIT表示被动关闭,出现这种问题,基本是就是客户端连接异常或者自己没有迅速回收资源。
什么情况下,连接处于CLOSE_WAIT状态呢?

  • 在被动关闭连接情况下,在已经接收到FIN,但是还没有发送自己的FIN的时刻,连接处于CLOSE_WAIT状态。通常来讲,CLOSE_WAIT状态的持续时间应该很短,正如SYN_RCVD状态。但是在一些特殊情况下,就会出现连接长时间处于CLOSE_WAIT状态的情况。
  • 出现大量close_wait的现象,主要原因是某种情况下对方关闭了socket链接,但是我方忙与读或者写,没有关闭连接(CLOSE_WAIT应该是默认2小时才会关闭)。代码需要判断socket,一旦读到0,断开连接,read返回负,检查一下errno,如果不是AGAIN,就断开连接。
  • scoket阻塞,无超时或非阻塞处理。

EPOLL:

  • EPOLLIN:连接,对端发送普通数据,对端socket正常关闭。
  • EPOLLPRI:带外数据,文件描述符有紧急的数据可读。
  • EPOLLOUT:数据可写。
  • 对端正常关闭(close(),shell下kill或ctr+c),触发EPOLLIN和EPOLLRDHUP,但是不触发EPOLLERR和EPOLLHUP。//重要
  • 对端异常断开连接(如网线),不会触发任何事件。判断方式是向已经断开的socket写或者读,会发生EPOLLERR错误。

socket接口的几个知识点:

  • read总是在接收缓冲区有数据时立即返回,而不是等到给定的read buffer填满时返回。
    只有当receive buffer为空时,blocking模式才会等待,而nonblock模式下会立即返回-1(errno = EAGAIN或EWOULDBLOCK)
  • blocking的write只有在缓冲区足以放下整个buffer时才返回(与blocking read并不相同)
    nonblock write则是返回能够放下的字节数,之后调用则返回-1(errno = EAGAIN或EWOULDBLOCK)
    对于blocking的write有个特例:当write正阻塞等待时对面关闭了socket,则write则会立即将剩余缓冲区填满并返回所写的字节数,再次调用则write失败(connection reset by peer)

Bug查找

通过以上的知识点,我们就可以很好的定位问题原因了,因为Proxy即作为客户端,又有服务端功能,其转发QS的结果到BS之后,再将BS的结果merge后回复给QS。仔细排查代码,发现2处引起Proxy大量出现CLOSE_TIMEWAIT。

<1>、Proxy将BS的结果merge后,发送QS时,socket是阻塞的,也未做超时处理。

<2>、QS超时(3S),会主动关闭socket,而Proxy这层没有关闭fd,可能原作者认为会触发EPOLLERR和EPOLLHUP(这里面做了处理)。

参考链接:

1、http://www.cnblogs.com/promise6522/archive/2012/03/03/2377935.html
2、http://originlee.com/2015/04/22/nonblocking-tcp-socket/

浅谈Linux ulimit以及max memory locked

刚入职,我在索引这个组,需要熟悉下之前的搜索索引模块的代码,以便后续开发。我在新申请的测试机器上编译代码部署索引这个模块就报错了。定位出现问题的地方,代码大致流程是这样子的:

1
2
3
4
5
6
7
8
9
10
if (conf->use_memlock_for_mmap) {
// add MAP_LOCKED to lock the buffer

attr_hash = (AttrIndexInfo*)mmap(NULL, fsize, PROT_WRITE | PROT_READ, MAP_SHARED | MAP_LOCKED, fd, 0);

} else {

attr_hash = (AttrIndexInfo*)mmap(NULL, fsize, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);

}

而参数里配置了use_memlock_for_mmap 选项,于是代码就走到第一个mmap逻辑处。

使用ulimit -a查看下系统属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[armsword@search-querybs1 ~]$ ulimit -a
core file size (blocks, -c) 4194304
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 256726
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 1000000
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 10240
cpu time (seconds, -t) unlimited
max user processes (-u) 102400
virtual memory (kbytes, -v) unlimited (注:相当于limits.conf中的as和cgroup中的memory.limit_in_bytes.)
file locks (-x) unlimited

我们知道索引程序在启动时候,需要加载倒排/正排等索引原文件,这些文件是比较大的,几十G或者过百G也是可能的,但是系统下的max locked memory对应值是64k,而mmap选项也设置了MAP_LOCKED选项,此选项的意思就是lock一段地址范围内已map的内存,相应的数据在unlock之前一直存在于物理内存中,不会被回收。对于应用程序来说,可以将内存中一些对程序性能影响较大的数据lock起来,避免非预期的页面回收或者换入/换出引起性能波动。所以将max locked memory设置为unlimited就解决问题。

linux系统提供了以下几个系统调用用于内存的lock和unlock。

1
2
3
4
5
mlock/munlock:lock/unlock一段地址范围内已map的内存

mlockall/munlockall:lock进程虚拟地址空间内已map的内存,还可以选择对于此后新map的空间是否自动lock

mmap使用MAP_LOCKED选项时表示在mmap的同时,对相应地址范围进行mlock

一般情况下,我们使用ulimit多用于提高性能,最常用的地方就是设置打开文件描述符的数量,web服务器等需要大量的文件句柄,一旦开太小,比如默认1024,在句柄使用完毕的时候,系统就频繁出现emfile错误,这时候系统很容易陷入不可用。但是如果设定太大了,又会有这样的副作用。很多服务器程序是事件派遣的,比如说用epoll,程序在启动的时候通常会根据最大的文件句柄数来预留内部的slot,一个slot貌似要占用几K的资源,如果你设定文件句柄数目太大,就可能无端的浪费了几百M内存。所以要设置一个合适的值有利于提高程序性能。

ulimit 用于限制 shell 启动进程所占用的资源,支持以下各种类型的限制,具体楼上已列出,注意下H/S选项的意义,

-H 设置硬资源限制,硬资源限制用于控制软限制。限定一旦设置只有root用户可以增加硬限制,普通用户只能减少自己的硬限制大小。
-S 设置弹性资源限制,弹性限制用于限制具体的用户或者进程。设置后普通用户可以增加,但是不能超过硬限制大小。
如果不指定-S或者-H,那么弹性资源限制和硬限制将同时设置。

作为临时限制,ulimit 可以作用于通过使用其命令登录的 shell 会话,在会话终止时便结束限制,并不影响于其他 shell 会话。而对于长期的固定限制,修改 /etc/security/limits.conf 文件即可。

几种Hash算法的实现

哈希被广泛使用在很多领域,如数据存储,加密,计算机视觉(几何哈希),此处就简单整理下几个常见的Hash函数的实现,有空陆续补充吧。

BKDR Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 本算法由于在Brian Kernighan与Dennis Ritchie的《The C Programming Language》一书被展示而得名
// 是一种简单快捷的hash算法。
// 也是Java目前采用的字符串的Hash算法(累乘因子为31)。
// 此哈希函数用的最多

template<class T>
size_t BKDRHash(const T *str)
{
register size_t hash = 0;
while (size_t ch = (size_t)*str++)
{
hash = hash * 131 + ch; // 也可以乘以31、131、1313、13131、131313..
// 可将乘法分解为位运算及加减法可以提高效率
// 如将上式表达为:hash = hash << 7 + hash << 1 + hash + ch;
}
return hash;
}

其中累乘因子也可以为131、1313、13131,比如下述代码就使用了33。

1
2
3
4
5
6
7
unsigned int times33(char *str)
{
unsigned int val = 0;
while (*str)
val = (val << 5) + val + (*str++);
return val;
}

此算法也会有如下变种,如:

1
2
3
4
5
6
7
unsigned int timesnum(char *str, int num)
{
unsigned int val = 0;
while (*str)
val = val * num + (*str++);
return val;
}

SDBM Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 本算法是由于在开源项目SDBM(一种简单的数据库引擎)中被应用而得名
// 它与BKDRHash思想一致,只是种子不同而已。

template<class T>
size_t SDBMHash(const T *str)
{
register size_t hash = 0;
while (size_t ch = (size_t)*str++)
{
hash = 65599 * hash + ch;
//hash = (size_t)ch + (hash << 6) + (hash << 16) - hash;
}
return hash;
}

AP Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Arash Partow发明的一种hash算法 
// 比较优秀的一种哈希算法
unsigned int APhash(char *str)
{
unsigned int val = 0;
int i = 0;
for (i = 0; *str; i++)
if ((i & 1) == 0)
val ^= ((val << 7)^(*str++)^(val>>3));
else
val ^= (~((val << 11)^(*str++)^(val>>5)));

return (val & 0x7FFFFFFF);
}

FNV Hash Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Unix system系统中使用的一种著名hash算法,后来微软也在其hash_map中实现。

template<class T>
size_t FNVHash(const T* str)
{
if(!*str)
return 0;
register size_t hash = 2166136261;
while (size_t ch = (size_t)*str++)
{
hash *= 16777619;
hash ^= ch;
}
return hash;
}

其实关于为什么要用异或,我搜索了下原因,见Why is XOR the default way to combine hashes?

MySQL中出现的字符串哈希函数

1
2
3
4
5
6
7
8
9
unsigned int MySQLhash(char *str)
{
register unsigned int nr = 1, nr2 = 4;
while(*str) {
nr ^= (((nr & 63) + nr2)*((unsigned int)*str++)) + (nr << 8);
nr2 += 3;
}
return (unsigned int)nr;
}

参考链接:
spiderq
各种哈希函数冲突率分析

写了一个Chrome插件 - 百度无损音乐下载插件

1、之前是从http://music.baidu.com/ 入手的,现在发现从此处入手已经找不到方法了。由于我又不是太懂JS语法,于是用了几十分钟,没分析到地址放弃。

2、灵光一闪,从http://fm.baidu.com/ 入手,在Chrome下抓包,细心分析后,果然发现蜘丝马迹,如下图所示:

见左图那个灰色的链接,我们打开链接看下,嘿,发现了许多歌曲信息哦,如下图所示:

由于是FM,所以songIDs有一大串,咱们试试构造下,以黑蝙蝠中队为例,其有三种音乐格式,第一种是可以直接下载的,第二三种是收费的:

我们构造下不同格式的音乐试试看,以无损音乐flac格式和921kbps为例:

构造的链接为:http://music.baidu.com/data/music/fmlink?songIds=966991&type=flac&rate=921


将songLink后的地址格式化为正常网页地址,如下所示:
http://yinyueshiting.baidu.com/data2/music/33809115/966991111600921.flac?xcode=a6ed00d30421e9bebbbef0f52f4938299d0fba1db032ff5f
下载下,发现可以正常下载,28.8M,flac格式的无损音乐。

所以,超高/无损音乐下载方法就是:
将此链接的:
http://music.baidu.com/data/music/fmlink?songIds=966991&type=flac&rate=921
songIds、type、rate改完你想下载的音乐就可以了。

如果只想下载无损音乐flac格式的(前提条件是百度下载里包含无损格式),这样就可以了,只需要更改歌曲songIds。
http://music.baidu.com/data/music/fmlink?songIds=966991&type=flac

用了2-3天时间,学了下前端开发方面的基础知识,没怎么看教程,就是想实现某些功能或者遇到问题就去搜索解决,遇到了一些坑,但是收获也挺大的,于是随手写了个Chrome插件,可用来下载百度无损音乐的,比较简单,原理如上。

程序地址

点击下载插件

使用方法

本插件支持Chrome浏览器和UC桌面浏览器,其他Webkit内核的浏览器应该也支持,但是我没做测试。

Linux系统下安装方法

Linux下直接下载此插件,之后解压后,在BDMusicDownloader文件夹下发现一个src.crx文件,将其拖入到Chrome浏览器-设置-扩展程序界面即可,当打开如以 http://music.baidu.com/song/ 开头的链接时,即下载音乐地址,会自动弹出插件,点击下载即可。如图所示:

Windows下安装方法

从程序地址里下载安装插件,解压文件夹,注意,因为Chrome在Win下的安全限制,所以此文件夹不能删除,之后同上所示,但不是拖入src.crx文件,而是选中开发者模式-加载正在开发的插件-BDMusicDownloader,之后选中src文件,打开即可。如图以UC桌面浏览器为例:

说句题外话,UC桌面浏览器真的还蛮好用的,支持Chrome插件(废话,使用的都是同一个内核),但是自带科学上网,打开Github速度就挺快的,推荐使用。

需要改进的地方

因为我前端经验太少,所以一些小细节没处理好,比如当音乐名字太长时,样式就比较难看了。因我忙着在回家前要把孙钟秀的《操作系统教程》读完,并且之前部门给我的入职前作业我还没完成,所以不打算再做太多修改了,感兴趣的东西可以帮忙修改下。

源码地址

https://github.com/armsword/BDMusicDownloader

代码已注释,有时间和感兴趣的同学,可以帮忙优化下界面。