Presto ORC及其性能优化

简介

ORC的全称是(Optimized Row Columnar),其是为了加速Hive查询以及节省Hadoop磁盘空间而生的,其使用列式存储,支持多种文件压缩方式。由于其被广泛应用在Hadoop系统中,Presto 0.77版本在Hive Connector里实现了ORC Reader。

ORC文件结构

上图(图1)来自网络,有很多文章通过这张图片介绍了ORC文件结构,我这里就不多啰嗦了,我们直接通过数据来看文件格式吧。

创建表:

插入数据:

1
insert into orc(id,name) values(1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e'),(6,'f'),(7,null);

注:我们想只生成一个文件,所以一次插入了7条数据,否则会生成多个文件,不利于我们后续分析。

Dump数据:

使用Hive自带的ORC DUMP工具,命令:

1
./bin/hive --orcfiledump hdfs://localhost:9000/user/hive/warehouse/orc/000000_0

数据格式:

上图由回车划分了5个模块,第五个模块表示该ORC文件数据大小,我们不需要关心。

  • 模块1
    对应图1里面的PostScript,包含压缩类型及表结构信息
  • 模块2
    Stripe Statistics对应图1里面的Stripe Footer,Stripe粒度的索引,可以看到其包含3列,Column 1 对应id字段,包含其最大值和最小值以及包含其sum总和,其中也有个hasNull标记,用于标记是否含有NULL,此字段可用于SQL里带有 is null 的优化,Column 2对应name字段,与Column 1相似,但是sum表示非NULL值的行数。注意此处只含有1个Stripe,默认为10000,参数hive.exec.orc.default.row.index.stride可控制其大小,简单测试了下,发现此值最小为1000,否则生成MR出错,具体需要代码里再确认下。Column 0只统计其count值,可以忽略。
  • 模块3
    对应图1里面的File Footer,与Stripe Footer相似,但是是文件级别的索引。
  • 模块4
    stripe详细信息,就是真实列数据块,其中又分为Index data(记录每列的索引信息),Raw Data(记录原始数据),Index data可以根据自身业务特点做些性能调优,比如实现布隆过滤器索引(Hive 1.2实现)。Raw Data是通过row group保存的,其实可以简单的认为就是默认按照10000将原始数据划分更小的块,每一个row group由多个stream保存数据和索引信息。每一个stream的数据会根据该列的类型使用特定的压缩算法保存。在ORC中存在如下几种stream类型:
    • PRESENT:每一个成员值在这个stream中保持一位(bit)用于标识该值是否为NULL
    • DATA:当前stripe的成员值,真实数据
    • LENGTH:每一个成员的长度,string类型特有,否则你不知道每个string的长度
    • DICTIONARY_DATA:对string类型数据编码之后字典的内容
    • SECONDARY:存储Decimal、timestamp类型的小数或者纳秒数等

通过模块4,我们可以看到id含列有DATA Stream,而name含有PRESENT、DATA、LENGTH Stream,因为存在空值,所以多了个PRESENT Stream。

Presto ORC及优化

Presto在实现ORC时,Hive-based ORC reader维护的数据是行式的,Presto想使用官方提供的客户端时还需要将行数据转换为列数据,且当时不支持Predicate pushdown,所以索性Presto自己实现ORC Reader,不过Hive 0.13也实现了VectorizedOrcRecordReader提供列格式。

Predicate pushdown

由ORC文件格式分析,ORC在每个文件中提供三级索引:

  • 文件级别,整个文件级别的统计信息
  • stripe级别,每个stripe每列中的值的统计信息
  • 行级别 (行组),stripe中每组10000行(默认值)的每列值的统计信息

假如查询过滤条件为WHERE id = 1,首先从文件的统计信息(一级索引)中看看id字段的min/max值,如果1不包含在内,那么跳过该文件;如果在这个文件中,那么继续查看二级索引,每个stripe中id字段的min/max值,如果1不包含在内,那么跳过此stripe;如果在该stripe中,则继续匹配row group中的min/max值(三级索引),如果1不包含在内,那么跳过该row group。如果1包含在内min和max范围内,则利用布隆过滤器再次判断是否一定不在内,不在内则继续跳过该行组。其原理就是通过三级索引,将查询范围缩小到10000行的集合,而原始数据是列式存储,更加适合CPU pipeline的编码方式,有效利用这种局部性,缓存可以达到极高的命中率,所以ORC有非常高效的性能。

Lazy reads

以SQL为例,SELECT a, b FROM ... WHERE a = ...,如果a不匹配,那么将不会读取b的列。

Bulk reads

Presto老版本ORC Reader代码可以简化为以下逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (dataStream == null) {
presentStream.skip(nextBatchSize);
return RunLengthEncodedBlock.create(type, null, nextBatchSize);
}

BlockBuilder builder = type.createBlockBuilder(null, nextBatchSize);
if (presentStream == null) {
for (int i = 0; i < nextBatchSize; i++) {
type.writeLong(builder, dataStream.next());
}
}
else {
for (int i = 0; i < nextBatchSize; i++) {
if (presentStream.nextBit()) {
type.writeLong(builder, dataStream.next());
}
else {
builder.appendNull();
}
}
}
return builder.build();

比如float及double的datatStream.next()实现为:

1
2
3
4
5
6
public float next()
throws IOException
{
input.readFully(buffer, 0, SIZE_OF_FLOAT);
return slice.getFloat(0);
}

一次只读取一个值,将其改为按照Bulk loading(比如8*SIZE_OF_FLOAT),读取性能有明显提升。
而对于Boolean reader,之前一次处理 1 bit数据(但是读取按照Byte),优化点是将其改为一次处理 8 bit(1 Byte)。

Improve null reading

从上面的代码可以看到,当有PRESENT Stream时(就是存在null值时),还要每次处理PRESENT Stream,读取DATA Stream及 PRESENT Stream,CPU Cache利用率很低,所以上面代码改为了下面形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// bulk read and count null values
boolean[] isNull = new boolean[nextBatchSize];
int nullCount = presentStream.getUnsetBits(nextBatchSize, isNull);

// bulk read non-values into a temporary array
dataStream.next(tempBuffer, nextBatchSize - nullCount);

// copy values into result
long[] result = new long[isNull.length];
int position = 0;
for (int i = 0; i < isNull.length; i++) {
result[i] = tempBuffer[position];
if (!isNull[i]) {
position++;
}
}

先将数据读取临时文件里,然后依次处理。

Avoid dynamic dispatch in loops

1
2
3
for (int i = 0; i < nextBatchSize; i++) {
type.writeLong(builder, dataStream.next());
}

很多Stream Reader只包含一种type,但是LongStreamReader会包含BIGINT、INTEGER、SMALLINT、TINYINT 及 DATE Types。这会让JVM的一些优化失效,比如inline,改动为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (type instanceof BigintType) {
BlockBuilder builder = type.createBlockBuilder(null, nextBatchSize);
for (int i = 0; i < nextBatchSize; i++) {
type.writeLong(builder, dataStream.next());
}
return builder.build();
}
if (type instanceof IntegerType) {
BlockBuilder builder = type.createBlockBuilder(null, nextBatchSize);
for (int i = 0; i < nextBatchSize; i++) {
type.writeLong(builder, dataStream.next());
}
return builder.build();
}

...

因为早期Hive ORC Reader的一些特性,导致Presto自己实现了ORC Reader,但是现在来看,直接调用社区的ORC Reader效果会更好,因为Presto基本上每2、3个小版本就会修复ORC Bug或者做些简单的性能提升,但是代码里很多都是来源于社区ORC的代码,Presto社区整体进展缓慢,直接调用社区ORC接口,省下了优化和修复Bug的时间,剩下的时间做些Presto引擎更核心的事情应该才是正确的做法。

参考链接

Presto System load过高问题调研

背景:

我们Presto有个集群,每6.5天会出现System load过高问题,这个集群有个特点,只服务于一个业务方,且SQL基本相似。如图所示:Sys load很高(20-40%),严重影响查询性能。

业务SQL查询时间表现为:

ScanFilterAndProjectOperator(Source Stage)阶段有机器有明显的长尾现象,比如20台机器,正常这个Operator执行时间只需要1S,但是有几台机器会耗时几分钟。而重启服务后,查询恢复正常。

先jstack看下Presto在干啥:

如图所示,Presto通过ScanFilterAndProjectOperator类执行filter过滤,其中这个filter方法是Codegen生成。

pstack查看下线程栈:

看到程序主要CPU浪费在Deoptimization::uncommon_trap里了。为什么呢?再说这个问题之前,我们先说下JIT。

JIT:

为了提高热点代码(Hot Spot Code)的执行效率,在运行时,虚拟机将会把这些代码编译成与本地平台相关的机器码,并进行各种层次的优化,完成这个任务的编译器称为即时编译器(JIT)。

“热点代码”两类:

  • 被多次调用的方法
  • 被多次执行的循环体 – 尽管编译动作是由循环体所触发的,但编译器依然会以整个方法(而不是单独的循环体)作为编译对象。
    编译器根据概率选择一些大多数时候都能提升运行速度的优化手段,当优化的假设不成立,出现“罕见陷阱”(Uncommon Trap)时可以通过逆优化(Deoptimization)退回到解释状态继续执行。

如:

1
2
3
4
5
6
static void test(Object input) {
if (input == null) {
return;
}
// do something
}

如果input一直不为空,执行1W次时,那么上述代码将优化为(类似GCC的 -O2优化算法):

1
2
3
static void test(Object input) {
// do something
}

但是如果之后出现input为空,那么将会触发Uncommon Trap,通过逆优化(Deoptimization)退回到解释状态继续执行。所以触发逆优化是正常的,但是我们的问题是程序一直在做Uncommon Trap,做了很多无用功,这是不正常的。

查阅JVM源码 compile::too_many_recompiles,发现有参数可以控制可编译次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
bool Compile::too_many_recompiles(ciMethod* method,
int bci,
Deoptimization::DeoptReason reason) {

// Pick a cutoff point well within PerBytecodeRecompilationCutoff.
uint bc_cutoff = (uint) PerBytecodeRecompilationCutoff / 8;
uint m_cutoff = (uint) PerMethodRecompilationCutoff / 2 + 1; // not zero
Deoptimization::DeoptReason per_bc_reason
= Deoptimization::reason_recorded_per_bytecode_if_any(reason);

if ((per_bc_reason == Deoptimization::Reason_none
|| md->has_trap_at(bci, reason) != 0)
// The trap frequency measure we care about is the recompile count:
&& md->trap_recompiled_at(bci)
&& md->overflow_recompile_count() >= bc_cutoff) {
// Do not emit a trap here if it has already caused recompilations.
// Also, if there are multiple reasons, or if there is no per-BCI record,
// assume the worst.
if (log())
log()->elem("observe trap='%s recompiled' count='%d' recompiles2='%d'",
Deoptimization::trap_reason_name(reason),
md->trap_count(reason),
md->overflow_recompile_count());
return true;
} else if (trap_count(reason) != 0
&& decompile_count() >= m_cutoff) {
// Too many recompiles globally, and we have seen this sort of trap.
// Use cumulative decompile_count, not just md->decompile_count.

if (log())
log()->elem("observe trap='%s' count='%d' mcount='%d' decompiles='%d' mdecompiles='%d'",
Deoptimization::trap_reason_name(reason),
md->trap_count(reason), trap_count(reason),
md->decompile_count(), decompile_count());

return true;
} else {
// The coast is clear.
return false;
}
}

PerMethodRecompilationCutoff 可以控制recompiles次数,触发uncommon_trap时,且too_many_recompiles为true时,其行为为Deoptimization::Action_none。
https://github.com/MuniyappanV/jdk-source-code/blob/master/jdk6u21_src/hotspot/src/share/vm/opto/graphKit.cpp#L1807

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void GraphKit::uncommon_trap(int trap_request,
ciKlass* klass, const char* comment,
bool must_throw,
bool keep_exact_action) {
Deoptimization::DeoptReason reason = Deoptimization::trap_request_reason(trap_request);
Deoptimization::DeoptAction action = Deoptimization::trap_request_action(trap_request);

switch (action) {
case Deoptimization::Action_maybe_recompile:
case Deoptimization::Action_reinterpret:
// Temporary fix for 6529811 to allow virtual calls to be sure they
// get the chance to go from mono->bi->mega
if (!keep_exact_action &&
Deoptimization::trap_request_index(trap_request) < 0 &&
too_many_recompiles(reason)) {
// This BCI is causing too many recompilations.
action = Deoptimization::Action_none;
trap_request = Deoptimization::make_trap_request(reason, action);
} else {
C->set_trap_can_recompile(true);
}
break;

case Deoptimization::Action_make_not_entrant:
C->set_trap_can_recompile(true);
break;

我们写个测试程序,测试PerMethodRecompilationCutoff 参数,参数超过recompile次数时性能表现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class DeoptimizationTest
{
private DeoptimizationTest() {}
static int someComplicatedFunction(double x)
{
return (int) (Math.pow(x, x) + Math.log(x) + Math.sqrt(x));
}
static void hotMethod(int iteration)
{
if (iteration < 20) {
someComplicatedFunction(1.23);
}
else if (iteration < 40) {
someComplicatedFunction(1.23);
}
else if (iteration < 60) {
someComplicatedFunction(1.23);
}
else if (iteration < 80) {
someComplicatedFunction(1.23);
}
else {
someComplicatedFunction(1.23);
}
}
static void hotMethodWrapper(int iteration)
{
int count = 100_000;
for (int i = 0; i < count; i++) {
hotMethod(iteration);
}
}
public static void main(String[] args)
{
for (int k = 0; k < 100; k++) {
long start = System.nanoTime();
hotMethodWrapper(k + 1);
System.out.println("iteration " + k + ": " + (System.nanoTime() - start) / 1_000_000 + "ms");
}
}
}

执行代码:javac DeoptimizationTest.java && java -XX:PerMethodRecompilationCutoff=3 DeoptimizationTest

指定PerMethodRecompilationCutoff为3,意思是PerMethodRecompilationCutoff/2 + 1 = 2,那么第三个分支将触发大量Deoptimization::Action_none操作,如上图所示,发现第59次迭代时,查询变慢了接近600倍。

火焰图见下图,可以看到耗时与我们Presto遇到的问题非常相似,耗时主要在Deoptimization::uncommon_trap:

解决方案:

既然当达到阈值会触发Deoptimization::Action_none,那么最简单的解决方法是不让他达到阈值就好了。修改JVM参数:

-XX:PerMethodRecompilationCutoff
-XX:PerBytecodeRecompilationCutoff

这2个值尽量调大一些,比如2000和500以上。

线上我找了一台机器测试了一周,CPU表现为:

因为这个问题,应该是个JVM Bug,社区找了下关键词,发现JDK 11.0.6 好像解了这个问题,但是这个版本还没有release,无法验证,等版本发布后再验证下。

https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8227523

参考链接

Presto兼容Hive SQL的一些改造工作

前言

Presto是一款优秀的分布式SQL查询引擎,适用于即席查询和报表分析等业务,其使用了ANSI SQL语法和语义,使用标准是SQL-92和SQL:2016。但是因为很多业务方一直使用Hive离线引擎来做SQL分析,而Hive使用类似SQL的语法(HQL)。为了使用户能平滑的将业务迁移到Presto上或者能让SQL同时跑到Presto及Hive引擎上,我们对Presto语法及一些算子等做了二次兼容开发,来最大限度降低用户迁移成本。接下来我们介绍下我们的主要兼容工作。

一、权限认证

Presto默认如果使用Password,那么必须使用Kerberos协议。因为公司离线数据没有Kerberos协议,所以我们改进了权限认证机制,与离线复用一套权限机制,用户只需要去离线申请权限,指定用户名和密码,即可查询Presto引擎和访问离线数据,并且提供了以下几种方式访问公司Presto,且与公司离线权限体系打通,分别为JDBC、Cli、Go、Python、R、NodeJS。

二、行为修改

整数相除返回浮点数

Presto整数相除沿用了Java整数相除的特性,我们需要修改函数算子行为,如IntegerOperators,将divide相关的函数修改为返回浮点数结果。比如:

1
2
3
4
5
presto:default> select 5/2;
_col0
-------
2.5
(1 row)

类型隐式转换

Presto类型比较与Java一样,使用的是强类型,即int与varchar是无法比较的,但是Hive是支持隐式转换的,所以这种隐式类型转换的问题,用户遇到的比较多。我们做了修改,在对应的算子里,添加了新的语义行为,事例如下:

1
2
3
4
5
presto:default> select '1' > 2;
_col0
-------
false
(1 row)

过滤表达式隐式转换

大量的SQL,where条件里会带有比较语句,可能会出现int与varchar比较的场景,所以我们做了过滤表达式隐式转换,方法是在ExpressionAnalyzer里通过visitComparisonExpression来cast左右表达式为同一类型。

Array Functions兼容

主要与Hive保持兼容,做了如下修改:

  • array index从0开始

    1
    2
    3
    4
    5
    presto:default> select array[1,2,3][0];
    _col0
    -------
    1
    (1 row)
  • 超过下标最大值时返回NULL

1
2
3
4
5
presto:default> select array[1,2,3][4];
_col0
-------
NULL
(1 row)

substr兼容

  • 下标从0开始
1
2
3
4
5
presto:default> select substr('123',0,2);
_col0
-------
12
(1 row)

in 操作符可包含字符串

1
2
3
4
5
presto:default> select id from test where id in ('1001',1002) limit 1;
_col0
-------
1001
(1 row)

sum 支持字符串相加

sum的参数包含字符串时,可进行计算, 即 select sum(id) from test; 如果id为varchar也可以进行计算了。

JOIN条件隐式转换

实现了StatementRewrite子类,通过visit JOIN来获得左右表达式进行cast,因为一些财务业务涉及到大精度问题,所以将精度范围改为DECIMAL(38,10)。

cast

因为Presto字符串类型为varchar,Hive为String,所以支持了cast as string语法,事例:

1
2
3
4
5
presto:default> select cast (1 as string);
_col0
-------
1
(1 row)

三、新的语法支持

& 操作符

SqlBase.g4文件里添加新的 & 语法,然后实现对应的Operator。事例为:

1
2
3
4
5
presto:default> select 1 & 3;
_col0
-------
1
(1 row)

rlike

与like相似,实现了rlike,比如查询select id from test rlike ‘xxx’; 目前还有部分功能需要完善。

四、UDF

UDF除了添加了一些特殊业务的UDF外,比如isInPolygon多边形计算,安全函数等,还添加了一些常用的Hive UDF,分别为:

字符串相关

  • md5
  • concat_ws
  • nvl
  • json

日期相关

  • unix_timestamp
  • fromUtcTimestamp
  • toUtcTimestamp
  • dateSub
  • diffDate

int/bigint函数

1
2
3
4
5
presto:default> select int(2.1);
_col0
-------
2
(1 row)

等等,具体想查看udf是否支持,可以直接操作连接Presto,如查询md5函数:

1
2
3
4
5
presto:default> select md5('1');
_col0
----------------------------------
c4ca4238a0b923820dcc509a6f75849b
(1 row)

有结果即表示支持该函数。

五、其他兼容

Hive视图

在visitTable里获得table信息,判断是否是视图,如果是视图,那么获取原始SQL查询,递归调用Analyze。

插入数据临时文件目录

由于涉及到权限问题,insert数据时,用户临时数据插入到临时目录会没有权限,所以修改了可配置临时文件插入目录参数。

支持可读取Hive递归子分区

正常表存储格式为 /table/partition/file,这种数据是可以正常访问的,但是如果数据存储格式为 /table/partition/directory/file 时,Presto访问时返回数据为空。而Hive里面的解决方法为请求时添加以下参数:

1
2
3
set hive.mapred.supports.subdirectories=true;

set mapreduce.input.fileinputformat.input.dir.recursive=true;

我们设置了参数,让默认即可递归子目录,参数为:

1
hive.recursive-directories=true

Key not present in map

Presto在处理MAP时,当KEY 不存在时,会报错,而Hive返回NULL,通过Presto内部参数解决:

1
deprecated.legacy-map-subscript=true

当然整体兼容Hive并不止这些,比如函数兼容类似Java里面的多态,需要找到合适的函数;还有least/greast等函数支持,还有一些函数null处理问题,太细节了,这里就不一一介绍了。由于Hive兼容是一个任重而道远的事情,这里还是推荐用户最好使用ANSI SQL以及管理好自己数据,这以后无论迁移还是使用,都会方便不少,减少不必要的麻烦。

Presto的一些基本概念

模型

Presto 是 Facebook 开源的 MPP (Massive Parallel Processing) SQL 引擎,其理念来源于一个叫 Volcano 的并行数据库,该数据库提出了一个并行执行 SQL 的模型,核心思想就是 Operator Model 和 Iterator Model。

  • Operator Model
    即通过各种 Operator 组成一棵树,树的根节点负责结果输出,树的叶子节点是各种 TableScan。这棵树被称作 Plan(执行计划),数据库里又被细分为逻辑执行计划和物理执行计划。这棵树是由 SQL 经过词法、语法分析及语义分析后,生成一个 AST(Abstract Syntax Tree),一般经过 Visitor 模式遍历后生成。原始数据通过叶子节点 TableScan 读取数据,然后经过各个 Operator的计算,包括(TableScan、Project、Filter、Exchange、Agg、Join、TaskOutput等)产出结果。
  • Iterator Model
    顾名思义就是一个递归迭代过程,Plan 树的各节点有三个状态,Open、GetNext及Close。从根节点 Open 开始递归调用 GetNext 获取数据,即父节点递归调用子节点接口直到没有结果为止,然后Close。

概念

Stage

MPP的理念就是能尽量细粒度的将 SQL 并行执行,以一个 SQL 2个表 JOIN 后 Agg 为例,那么每个表都可以单独并行执行去 Scan 数据(互不影响),然后进行 Join 和 Agg。所以执行计划(Plan)将执行 PlanFragment,即将一个树分块变为各个子树,每个子树可以并行的在多台机器上执行,这个 Fragment 被称为 Stage。

Presto根据 Stage 的用途,分为四种stage:

  • Coordinator_Only:一般表示 DDL,DML 的 Stage
  • Single:用于聚合子 stages 数据,并最终将数据输出给终端用户。比如每个查询中的根节点(Root Stage)
  • Fixed:用于接收子 Stage 产生的数据,并在集群中对这些数据进行聚合或分组计算
  • Source:连接数据源,从数据源读取数据

我们以简单的SQL查询为例,SQL为select id from table limit 1; 这个SQL简单来说,就干了2件事,一是Scan数据,另外是Limit,而这2件事,可以并行执行,所以如图所示,其分为2个Stage:

Stage 1 为Scan数据和Limit,这里Limit是下推优化。Stage 0为最终结果输出。

同时Presto UI里可以看到每个Stage详细信息,以及每个Stage需要的Task数(可以认为Worker数),如图所示:

Exchange

连接不同的 Stage,用于不同 Stage 之间的数据交互。数据的交互有一些Operator实现,比如数据是Hash分发还是完全Replicate等。从上图可以看到Stage 1 和Stage 0 需要交互,通过Exchange实现。

Task

Stage 有多个 Task 组成。Stage 并不会运行,其实个抽象的概念,其只是负责管理 Task 和封装建模。Stage 实际运行的是Task。每个Task处理一个或者多个Split,每个Task被分配到每台机器上执行。每个Task都有对应的输入和输出。同一个Stage下的Task是个并行的概念,做的事情是相同的。
如下图所示,我们可以看到每个Tasks的相信信息,其中0.x表示Stage 0,1.x表示Stage 1,同时我们也可以看到每个Task执行花费的时间,读取的数据大小以及每个Task处理的Split数目。一个Stage包含一个或多个Task,每个Task做的事情是一样的,所以每个Stage的花费时间由最慢的Task决定,比如Scan HDFS数据,可能会因为某些Data Node阻塞导致Task阻塞。

Driver

Task 被分解成一个或者多个 Driver,并行执行多个 Driver 的方式来实现 Task 的并行执行。Driver 是作用于一个 Split 的一系列 Operator 的集合。一个 Driver 处理一个 Split,产生输出由 Task 收集并传递给下游的 Stage 中的一个 Task。一个 Driver 拥有一个输入和输出。

Operator

Operator 表示对一个 Split 的一种操作。比如过滤、转换等。 一个 Operator 一次读取一个 Split 的数据,将 Operator 所表示的计算、操作作用于 Split 的数据上,产生输出。每个 Operator 会以 Page 为最小处理单位分别读取输入数据和产生输出数据。Operator 每次只读取一个 Page,输出产生一个 Page。

Split

一个分片表示大的数据集合中的一个小子集,与 MapReduce 中的 Split 概念类似。对于Hive中的表,一个Split就是HDFS文件的一个分片。根据文件格式是否分片(如ORC,Parquet),该Split可能是一个Block的大小,也可能是整个文件。

Page

Presto 中处理的最小数据单元。一个 Page 对象包括多个 Block 对象,而每个 Block 对象是一个字节数组,存储一个字段的若干行。多个 Block 的横切的一行表示真实的一行数据。一个 Page 最大 1MB,最多 16 * 1024 行数据

Pipeline

Stage 里有很多 Operator,这些 Operator 可能并行度是不一样的,比如 Scan 数据并行就很大,但是最后聚合数据,并行一般为1。所以 PlanFragment 又会被切分为若干 Pipeline,每个 Pipeline 由一组 Operator 组成,这些 Operator 被设置同样的并行度。Pipeline 之间会通过 LocalExchangeOperator 来传递数据。

在 Presto UI 上我们可以看到 Pipeline信息,如下图所示,Stage 0 主要是将 Exchange 的数据,做最后的 limit,所以其可以细分为 2 个步骤,LocalExchangeOperator 及 LimitOperator,这2个动作的并行度是不一样的,Exchange 可以多个线程去做,而 Limit 只需要一个线程。从图中我们可以看到 Driver 和 Operator 信息,其中 Driver 的数目就是这个 Pipeline 的并行度。

因为后续会陆续介绍 Presto 的一些执行流程,为了防止被一些概念绕晕,所以本文主要是对 Presto 的一些概念和专有名词做了一些科普和解释。

参考资料

  • 《Presto技术内幕》
  • 《Presto基本概念》

Presto内存管理原理和调优

内存池

Presto有三种内存池,分别为GENERAL_POOL、RESERVED_POOL、SYSTEM_POOL。这三个内存池占用的内存大小是由下面算法进行分配的:

1
2
3
4
5
6
7
8
builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryMemoryPerNode()));

builder.put(SYSTEM_POOL, new MemoryPool(SYSTEM_POOL, systemMemoryConfig.getReservedSystemMemory()));

long maxHeap = Runtime.getRuntime().maxMemory();
maxMemory = new DataSize(maxHeap - systemMemoryConfig.getReservedSystemMemory().toBytes(), BYTE);
DataSize generalPoolSize = new DataSize(Math.max(0, maxMemory.toBytes() - config.getMaxQueryMemoryPerNode().toBytes()), BYTE);
builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, generalPoolSize));

梳理这块代码对应的逻辑和配置文件,得出RESERVED_POOL大小由config.properties里的query.max-memory-per-node指定;SYSTEM_POOL由config.properties里的resources.reserved-system-memory指定,如果不指定,默认值为Runtime.getRuntime().maxMemory() * 0.4,即0.4 * Xmx值;而GENERAL_POOL值为 总内存(Xmx值)- 预留的(max-memory-per-node)- 系统的(0.4 * Xmx)。

而这三种内存池分别用于不同的地方,分析代码和阅读Presto开发手册,大体可以定位出:

  • GENERAL_POOL is the memory pool used by the physical operators in a query.
  • SYSTEM_POOL is mostly used by the exchange buffers and readers/writers.
  • RESERVED_POOL is for running a large query when the general pool becomes full.

简单说GENERAL_POOL用于普通查询的physical operators;SYSTEM_POOL用于读写buffer;而RESERVED_POOL比较特殊,大部分时间里是不参与计算的,只有当同时满足如下情形下,才会被使用,然后从所有查询里获取占用内存最大的那个查询,然后将该查询放到 RESERVED_POOL 里执行,同时注意RESERVED_POOL只能用于一个Query。

1、GENERAL_POOL有节点出现阻塞节点(block node)情况,即该node内存不足
2、RESERVED_POOL没有被使用

GENERAL_POOL、RESERVED_POOL、SYSTEM_POOL应配合合理的值,如果并发比较大时,建议SYSTEM_POOL保持默认或者稍微再大一点。目前我的经验配置是SYSTEM_POOL为1/3 * Xmx(虽然我们并发较多,但是依然调低了此值);RESERVED_POOL 为 1/9 * XMX。

当然你可以通过HTTP请求查看每台Worker的/v1/status,来预估具体需要配置多大的内存,如图所示,显示了各内存池的使用量。

内存限制和管理

单机维度

  • GENERAL_POOL每次内存申请时,都会判断内存使用量是否超过了最大内存,如果超过了就报错,错误为“Query exceeded local memory limit of x”,这保护了Presto会无限申请内存,只会导致当前查询出错。同时,如果该节点的GENERAL_POOL可使用内存以及可回收内存为0,那么认为该node为Block node。

  • RESERVED_POOL可以认为是查询最大的SQL,其能满足GENERAL_POOL的内存限制策略,那么肯定会满足RESERVED_POOL的策略(复用了GENERAL_POOL策略)。

  • RESERVED_POOL目前版本未发现可以限制内存,所以当并发非常高,且scan的数据非常大时,有低概率会引起OOM问题。但是配合Resource Group,内存设置合理,也基本会避免OOM问题。

集群维度

同时满足以下两点时,Presto便认为集群超出要求的内存了:

  • GENERAL_POOL出现阻塞节点(Block node)
  • RESERVED_POOL已经被使用

当判断出集群超出CLuster Memory时,有两种方式管理内存:
1、挨个遍历每个查询,判断当前查询占用的总内存是否超过了query.max-memory(config.properties里配置),如果超过了,那么该查询就被failed。
2、如果query.max-memory配置的不合理,值非常大,那么可能过了5秒(默认时间)依然不满足第一种情形,那么将会使用第二种方法管理查询。第二种管理方法又分为两种小的管理,根据LowMemoryKillerPolicy来决定Kill查询策略,其分为total-reservation和total-reservation-on-blocked-nodes。配置total-reservation的作用是kill掉所有查询里最费内存的查询;而total-reservation-on-blocked-nodes杀死在内存不足(阻塞)的节点上使用最多内存的查询。

Resource Groups

Resource Groups 可以认为是Presto实现了一个弱资源限制和隔离功能。其可以为每个group指定队列大小、并发大小、内存使用大小。为每个group设置合理的hardConcurrencyLimit(最大并发数)、softMemoryLimit(内存最大使用值)及maxQueued(队列大小)一方面可以使不同业务影响降低,另一方面也大概率避免OOM问题,当然善于运用user及做下二次开发,就可以让Presto支持多用户共用同一分组和权限认证功能。

OOM

即使按照上述内存管理做了调优,但Presto依然会遇到OOM问题,此时会显示”INTERNAL_ERROR”,仔细查看错误原因为报类似的错误:

1
Encountered too many errors talking to a worker node. The node may have crashed or be under too much load. This is probably a transient issue, so please retry your query in a few minutes.

如果是这种情形,大概率是JVM OOM了。如果确定了确实是JVM OOM的原因,那么可以参考我们的JVM(G1 GC)调参经验,以下配置添加到jvm.config里会明显避免OOM问题。

1
2
3
-XX:G1ReservePercent=15
-XX:InitiatingHeapOccupancyPercent=40
-XX:ConcGCThreads=8

好了,本文简单介绍了下Presto内存管理方面的原理和一些经验,因为Presto相关的技术文章比较少,且关注的用户越来越多,所以我会不定期更新一些Presto相关的技术文章,如果有错误的地方也请及时提醒,有关注Presto的同行也欢迎与我讨论交流。

分布式SQL查询引擎Presto原理介绍

前言

我们实时引擎组新引入了一款分布式SQL查询引擎,名字叫Presto,目前已经调研和测试了2个月了,并且期间某平台也从impala平台迁入到了Presto平台,查询性能有了2-3倍的提升(各种原因导致),所以本文将结合作者这段时间的测试和调研研究,来揭开Presto的神秘面纱。

Presto是神马

Presto是由Facebook开发的一个分布式SQL查询引擎, 它被设计为用来专门进行高速、实时的数据分析。它的产生是为了解决Hive的MapReduce模型太慢以及不能通过BI或Dashboards直接展现HDFS数据等问题。Presto是一个纯粹的计算引擎,它不存储数据,其通过Connector获取第三方Storage服务的数据。

历史

  • 2012年秋季,Facebook启动Presto项目
  • 2013年冬季,Presto开源
  • 2017年11月,11888 commits,203 releases,198 contributors

功能和优点

  • Ad-hoc,期望查询时间秒级或几分钟
  • 比Hive快10倍
  • 支持多数据源,如Hive、Kafka、MySQL、MonogoDB、Redis、JMX等,也可自己实现Connector
  • Client Protocol: HTTP+JSON, support various languages(Python, Ruby, PHP, Node.js Java)
  • 支持JDBC/ODBC连接
  • ANSI SQL,支持窗口函数,join,聚合,复杂查询等

架构

  • Master-Slave架构
  • 三个模块
    • Coordinator、Discovery Service、Worker
  • Connector

Presto沿用了通用的Master-Slave架构,Coordinator即Presto的Master,Worker即其Slave,Discovery Service就是用来保存Worker结点信息的,通过HTTP协议通信,而Connector用于获取第三方存储的Metadata及原始数据等。

Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行;Worker节点负责实际执行查询任务。Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。假如配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点与HDFS交互读取数据。

部署方式

Presto常见的部署方式如下图所示:

Coordinator与Discovery Server耦合在一起混合部署,然后部署多台Worker。然而这个有个问题,就是Coordinator存在单点问题,我们目前线上使用ip漂移的方法(网卡绑定多ip)。如下图所示:

查询流程

整体查询流程为:

  • Client使用HTTP协议发送一个query请求。
  • 通过Discovery Server发现可用的Server。
  • Coordinator构建查询计划(Connector插件提供Metadata)
  • Coordinator向workers发送任务
  • Worker通过Connector插件读取数据
  • Worker在内存里执行任务(Worker是纯内存型计算引擎)
  • Worker将数据返回给Coordinator,之后再Response Client

SQL执行流程


当Coordinator收到一个Query,其SQL执行流程如上图所示。SQL通过Anltr3解析为AST(抽象语法树),然后通过Connector获取原始数据的Metadata信息,这里会有一些优化,比如缓存Metadata信息等,根据Metadata信息生成逻辑计划,然后会依次生成分发计划和执行计划,在执行计划里需要去Discovery里获取可用的node列表,然后根据一定的策略,将这些计划分发到指定的Worker机器上,Worker机器再分别执行。

与Hive比较


上图显示了MapReduce与Presto的执行过程的不同点,MR每个操作要么需要写磁盘,要么需要等待前一个stage全部完成才开始执行,而Presto将SQL转换为多个stage,每个stage又由多个tasks执行,每个tasks又将分为多个split。所有的task是并行的方式进行允许,stage之间数据是以pipeline形式流式的执行,数据之间的传输也是通过网络以Memory-to-Memory的形式进行,没有磁盘io操作。这也是Presto性能比Hive快很多倍的决定性原因。

实现低延时的原理

  • 完全基于内存的并行计算
  • 流水线
  • 本地化计算
  • 动态编译执行计划
  • 小心使用内存和数据结构
  • 类BlinkDB的近似查询
  • GC控制

当然其优化方法也包括了一些传统的SQL优化原理,关于这些优化细节等后续文章详细介绍。

缺点

前面介绍了Presto的各种优点,其实其也有一些缺点,主要缺点为以下三条:

  • No fault tolerance;当一个Query分发到多个Worker去执行时,当有一个Worker因为各种原因查询失败,那么Master会感知到,整个Query也就查询失败了,而Presto并没有重试机制,所以需要用户方实现重试机制。
  • Memory Limitations for aggregations, huge joins;比如多表join需要很大的内存,由于Presto是纯内存计算,所以当内存不够时,Presto并不会将结果dump到磁盘上,所以查询也就失败了,但最新版本的Presto已支持写磁盘操作,这个待后续测试和调研。
  • MPP(Massively Parallel Processing )架构;这个并不能说其是一个缺点,因为MPP架构就是解决大量数据分析而产生的,但是其缺点也很明显,假如我们访问的是Hive数据源,如果其中一台Worke由于load问题,数据处理很慢,那么整个查询都会受到影响,因为上游需要等待上游结果。
    这篇文章就先介绍这里吧,后续会陆续更新一系列Presto相关的文章,欢迎关注。

参考链接

https://tech.meituan.com/presto.html

说说常见搜索引擎的分布式解决方法

随着索引数据的增大以及请求的增多,分布式搜索是最好的一种解决方案,主要解决两个问题,其一是能让单台机器load所有索引数据到内存中,其二是请求延时大,解决请求latency问题。我之前为团队写了篇专利,内容是关于分布式搜索解决方案的,所以粗略的看了下大部分开源的搜索引擎是怎么实现分布式的,后面的文章我会简单说下常见的搜索引擎的分布式解决方案。

首先我们先说下几个简单概念,分布式搜索都是M*N(横向切分数据,纵向切分流量)这个维度去解决问题的,虽然不同产品或场景概念不完全相同,读者可以简单认为一份完整的数据,被均分为M份,每一份被称为一个分配(Shard或者Partition),然后提供对每个Shard提供N份副本(Replica)。那么分布式的设计就围绕着以下问题:

  • 如何选择合适的分片(Shard),副本(Replica)的数量
  • 如何做路由,即怎么在所有Shard里找到一份完整的数据(找到对应的机器列表)
  • 如何做负载均衡
  • 如果提高服务的可扩展性
  • 如何提高服务的服务能力(QPS),当索引和搜索并发量增大时,如何平滑解决
  • 如何更新索引,全量和增量索引的更新解决方法
  • 如果提高服务的稳定性,单台服务挂掉怎么不影响整体服务等等

下面就说下常见的搜索引擎的分布式解决方案,因为开源的搜索产品基本上都没有在工作中用过,对代码细节并不是太了解,只是研究了下其原理,所以理解的会有些偏差,看官们如果发现错误直接指出即可。

Sphinx/Coreseek

Sphinx的流程还是很简单的,可以看下其流程图:

需要支持分布式的话,需要改下配置,大致是这样子的:

1
2
3
4
5
6
7
8
index dist
{
type = distributed
local = chunk1
agent = localhost:9312:chunk2 本地
agent = 192.168.100.2:9312:chunk3 远程
agent = 192.168.100.3:9312:chunk4 远程
}

从图中也可以看出,需要在配置列表里配置好其他shard的地址。查询过程为:

  • 连接到远程代理
  • 执行查询
    • 对本地索引进行查询
    • 接收来自远程代理的搜索结果
  • 将所有结果合并,删除重复项
  • 将合并后的结果返回给客户端

索引数据复制同步的方法也是常用的两种:

  • 主从同步
  • 增量更新索引

方法也是设置crontab,添加2个选项,一个是重建主索引,一个是增量索引更新。

当然为了避免单点以及增加服务能力,肯定有多个Replica,解决方法应该也是配置或者haproxy相关的方法解决,从上面可以看出,Sphinx很难用,自动化能力太弱,所以很多大厂要么不再使用Sphinx要么基于其二次开发。

Solr

Solr提供了两种方案来应对访问压力,其一是Replication,另一个是SolrCloud。我们此处只说Replication原理。
Replication采用了Master/Slave模式,也就是说由一个主索引和多个从索引构成,从索引从主索引复制索引,主索引负责更新索引,从索引负责同步索引和查询。本质上是读写分离的思想,MySQL/Redis等数据库也多是这种方式部署的。有两种部署方式:

  • 第一种

  • 第二种

与第一种相比多了一层Repeater,Repeater既扮演了Master角色,又扮演了Slave功能,主要解决单个Master下Slave太多,Master压力太大的问题。

Master与Slave之间的通信是无状态的http连接,Slave端发送不同的Command从Master端获得数据。原理就是Master那边有个标志位和版本号,用于获取正确的数据版本,然后数据扔到Slave临时目录下,数据完整后,再覆盖原有数据。多个副本的方法应该与Sphinx相似,一般也是通过通过上游负载均衡模块如Nginx,HaProxy来分流。

SolrCloud

因为Solr Replication不好用,本质上还不算真正分布式的,所以Solr从4.0开始支持SolrCloud模式。特性不少,主要说两个吧:

  • 配置文件统一管理,扔到Zookeeper上
  • 自动做负载均衡和故障恢复,不再需要Nginx或HaProxy的支持

逻辑图

  • Collection:逻辑意义上一份完整的索引
  • Shard:上文已说,就是索引的1/N分片
  • Replica:Shard的一个拷贝

每个Shard,即相同的Replica下都会有一个leader,leader选举由Zookeeper完成。虽然有leader的概念,但是其实SolrCloud分布式是去中心化的,意思就是说,leader和非leader都能提供查询功能(也有修改和删除功能,搜索场景应用不多吧?),而更新索引,创建Collection/Shard/Replica(即扩容)只能由leader完成,避免产生并发修改问题,当非leader节点收到修改操作请求时,将信息存储在zookeeper中相应节点上,leader节点会一直对该节点进行watch,发现变化就实时做处理。

Zookeeper信息

图片出处见参考链接

创建索引

  • 任意节点收到创建索引请求后,转换成json格式存储到zk的/overseer/collection-queue-work的children节点上
  • leader线程一直监控collection-queue-work节点,检查到变化后,取出json数据,根据信息计算出需要创建的shard、replica,将创建具体replica的请求转向各对应节点
  • 各节点创建完具体的replica后,将该节点的状态(创建成功与否等)更新到/overseer/queue的children节点上
  • leader线程监控/overseer/queue节点,将overseer/queue的children节点的状态更新至/clusterstate.json
  • 各节点同步/clusterstate.json,整个集群状态得到更新,新索引创建成功

更新索引

  • 根据路由规则计算出该doc所属shard,并找出该doc所属的shard对应的leader
  • 如果当前Replica是对应Shard且是leader,首先更新本地索引,然后再将doc转向该Shard的其余Replica

扩容/缩容

  • 停掉某台Solr,更新集群状态到/clusterstate.json
  • 增加一台Solr,从leader出复制相同的数据,然后配置写到/clusterstate.json

查询

  • 去中心的,leader和非leader一样功能,Replica接收搜索请求时,从Zookeeper中获取该Replica对应的Collection及所有的Shard和Replica
  • 将请求发送到该Collection下对应的Shard,然后负载均衡到对应的Replica

SolrCloud也有其他功能,比如Optimization,就是一个运行在leader机器的进程,复杂压缩索引和归并Segment;近实时搜索等。总体看SolrCloud解决了Solr Replication遇到的一些问题,比Sphinx更好用,更自动化。

一号店

很多大一点的厂商如果不自研搜索引擎的话,并没有使用SolrCloud,而多基于Solr/Lucence。比如一号店的分布式搜索解决方案,如下所示:
http://www.infoq.com/cn/articles/yhd-11-11-distributed-search-engine-architecture

Broker就相当于Proxy,扮演了路由功能,很多厂商做的与一号店有相似之处。因为没有leader选举,所以索引的更新就由其他模块来做了。

ElasticSearch

ElasticSearch的倒排索引也是基于Lucence实现的。功能强大,不仅提供了实时搜索功能,还有分析功能,DB-Engines上面的搜索引擎排名,目前已经超越Solr排名第一位了。因为太强大了,功能也特别多,我研究还不够深,简单说下吧。
es会将集群名字相同的机器归为一个集群(业务),所以先说下启动过程。

启动过程

当ElasticSearch的节点启动后,它会利用多播(multicast)(或者单播,如果用户更改了配置)寻找集群中的其它节点,并与之建立连接。

leader选举

与SolrCloud相似,也是去中心化的,但是没有使用Zookeeper,而是自己实现了分布式锁,选主的流程叫做 ZenDiscovery(详情见第三个参考链接):

  • 节点启动后先ping(这里的ping是 Elasticsearch 的一个RPC命令。如果 discovery.zen.ping.unicast.hosts 有设置,则ping设置中的host,否则尝试ping localhost 的几个端口, Elasticsearch 支持同一个主机启动多个节点)
  • Ping的response会包含该节点的基本信息以及该节点认为的master节点
  • 选举开始,先从各节点认为的master中选,规则很简单,按照id的字典序排序,取第一个
  • 如果各节点都没有认为的master,则从所有节点中选择,规则同上。这里有个限制条件就是 discovery.zen.minimum_master_nodes,如果节点数达不到最小值的限制,则循环上述过程,直到节点数足够可以开始选举
  • 最后选举结果是肯定能选举出一个master,如果只有一个local节点那就选出的是自己
  • 如果当前节点是master,则开始等待节点数达到 minimum_master_nodes,然后提供服务
  • 如果当前节点不是master,则尝试加入master

选举完leader后,主节点leader会去读取集群状态信息;因为主节点会监控其他节点,当其他节点出现故障时,会进行恢复工作。在这个阶段,主节点会去检查哪些分片可用,决定哪些分片作为主分片。

分片

es在创建索引时,自己设置好分片个数,默认5个,整个过程类似于分裂的概念,如下图所示:

至于读写、写操作等于SolrCloud相似,等我细研究后后续再说吧,也可以说下实时索引怎么做的,细节很多,下次再说吧。至于文中为什么不说Lucence,因为Lucence其实就是个index lib,只是解决倒排、正排索引怎么存放的,并不是一个完整的搜索引擎解决方案。而ES为什么能脱颖而出的主要原因是配套设施完善,工具,Web UI都是非常赞的,对于很多开源产品,它能后来居上的主要原因就是它真实的能解决用户遇到的问题或者比其他产品更好用。搜索引擎发展这么多年了,架构这块能做的基本上大家都差不太多,最后能脱颖而出的肯定是第三方工具做的更完善,更好用的了。

PS:
至于阿里搜索怎么做的,可以参考下这个文档,包括了阿里搜索里用到的很多基础模块了:
https://share.weiyun.com/f66e79d9f6897d0aac683361531cf00d

参考链接:
http://blog.haohtml.com/archives/13724
http://www.voidcn.com/blog/u011026968/article/p-4922079.html
http://jolestar.com/elasticsearch-architecture/

Minder:一个分布式启停进程服务

关于

打算造个轮子,先备份下简单的设计手册吧。Minder是一个分布式启停进程服务,主要用于以下目的。

  • 命令或远程RPC调用启动和停止进程
  • 当进程挂掉,可以拉起来进程
  • 获得机器对应的系统信息,如CPU,内存,硬盘等等

设计

方案一(不采用)

分为Server和Client端

Server

Server在每台机器上都需要安装,Server是个常驻进程,负责真正的启停进程,同时需要一个Check线程,用于判断进程状态,当程序挂掉以便重启进程,重启进程可以在配置里有个次数限制。

Client

Client就是一个工具,用于向指定的Server发送请求,Server收到请求,启动进程。因为协议使用RPC,为了方便,使用脚本语言编写更好。

程序流程

Client发送请求,根据Client命令,判断是启动还是停止进程,假设是start进程,那么通过RPC的stub_client发送到Server的stub_server,之后调用对应的startProcess函数,通过fork+exec启动进程,同时Server需要有个Loop线程,判断进程的状态,以便重启进程。因为一个Server需要启动不同的服务,同时Server也可能会挂掉,所以Server需要持久化进程的信息,以便Server挂掉后恢复这些进程信息。

细节

  • 想象下我们启动进程都需要哪些参数?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"processInfos": [
{
"isDaemon": true, // 是否是守护进程
"processName": "app", // 进程名字
"restartInterval": 10, // 重启时间间隔
"restartCountLimit": 10, // 重启次数
"signal": 9, // 停止进程的信号值,kill函数对应的值
"parameters": "-m cluster -n 1", // 进程需要的参数
"environmentVariable" : { // 需要的环境变量
"key" : "key",
"value" : "value"
}
}
]
}

因为我们请求是Client工具向Server发送请求的,通信方式为RPC,所以上述参数都为PB格式。

  • Server端启动进程方式为fork+exec,停止进程为kill发送信号。

  • Client只是个工具,发送请求的方式可以为以下命令: minder_client -m start -s \”10.10.10.10:8888\” -p /bin/sleep 大意就是向Server发送请求,请求启动sleep进程。

架构图

方案二

分为Manager、Slave和Tools,第一版本只有一个Manager,Slave在每台机器都有。如果看懂方案一,方案二许多细节就可以不用说了,只见到说下每个模块干啥的吧。

Manager

只有Manager给用户打交道,用户向Manager提交需要启动的机器ip,进程信息,Manager向Slave发送消息。同时如果用户需要获取Slave机器的机器状态,如CPU,内存以及磁盘信息等,也是通过Manager获取的。

Slave

用户Slave是不与用户打交道的,Slave用于接受Manager的请求,启停进程,获取Slave机器状态,然后将对应的信息返回给Manager。

Tools

封装了对应的RPC请求,用户向Manager发送请求。

程序流程

Manager接受用户的请求,根据请求,Manager向Slave发送请求,Slave接受到请求后执行对应的操作。同时Manager持久化对应的信息。因为Manger必须知道Slave的机器ip,如果这个需要Slave主动汇报给Manager,四层心跳既可以。

架构图

当然,这个设计也是有很多改进的方法,比如Manager挂了怎么办,Manager存在单点问题,既然启动进程,怎么可以改善下搞成灰度发布,等等。因为时间问题,我工作也非常忙,这个我就不细说了。轮子有空造,其实写出来方案,思路基本上就清晰了。

另外,其实写这篇文字时候,与微信朋友圈里与朋友们交流了下想法,发现了2个系统是可以参考的,一个是Hadoop Ambari,另一个是Cloudera Manager。

一些常见的搜索查询树优化方法

搜索引擎服务收到一个Query后,一般引擎这边是这么搞得,解析语法,生成后缀表达式,即查询搜索树(Search Tree)。搜索查询树负责求交、求并和过滤。所以这个地方也是性能关键点。所以在解析语法后,一般要做查询搜索树优化,减少求交,求并和过滤操作的次数,以此来提高搜索服务的QPS和查询Latency。

因为同事在负责新实时索引的性能优化,我正好负责将全量索引格式迁移到新实时索引上面,即实时索引的代码即支持全量索引也支持实时索引,减少运维和代码维护成本。我在查阅代码的同事,顺便与同事沟通了一些优化方法,进入全量索引代码里发现老的全量索引里已经有一些优化方法了,这些方法应该是市面上常见的方法,多数人也是应该知道的,这里记录下吧。主流优化操作主要包括以下四种:

全AND操作

如果父子节点都是AND,则可以合并,如(A & B)& (C & D),优化后为 A & B & C & D ,能提早发现交为空,并退出,如下图所示。

优化后为:

全OR操作

如果父子节点都是OR,则可以合并。如 (A | B)| (C | D),优化后为 A | B | C | D,能提早发现为满并退出。如下图所示:

优化后为:

将子节点的OR上提

如 (A | B) & (C &D),优化后为 A |(B & C | D),查找次数减少。

优化后为:

按照Doc数量排序

如 A & B & C,可以按照Doc数量排序后查询,Sum(A) < Sum(B) < Sum(C),则A & B & C可以尽早发现为空并退出。

当然查询搜索树优化不止这么些方法,比如之前就知道可以使用WAND提升长Query的检索速度,很久没看了,细节记不清了,就不说了。以后需要的时候再研究下。

dlog:高性能线程安全的C++日志库

打算闲暇时光写点靠谱的东西比如名字服务、KV存储等基础服务类软件,所以周末就先从封装一些基础库开始。上周末写了个C++日志库,支持多线程,性能也还不错,今天就做了下测试和修改了下bug。这里说下该库的使用方法,以及实现思路以及一些性能调优方法。欢迎交流和指教。

源码地址

https://github.com/armsword/dlog

编译

执行build.sh,在上层目录里会生成build文件,测试的可执行文件在release/bin目录下。

使用方法

将etc文件下的dlog.json扔到可执行文件的当前目录(当然只要能让可执行文件找到dlog.json即可),每个函数包含logger文件夹下的Log.h头文件,在主函数里调用DLOG_INIT初始化一次,之后在每个需要打印log的文件里调用DLOG_LOG即可。
如实例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <dlog/logger/Log.h>
#include <unistd.h>
using namespace dlog::logger;

int main() {
DLOG_INIT();

DLOG_LOG(WARN, "test the log level using log lib!");
DLOG_LOG(DEBUG, "Hello World, %d",2016);
DLOG_LOG(INFO, "test Log C/C++ lib");
DLOG_LOG(ERROR, "Hello everyone, this is my blog: http://armsword.com/; Welcome to visit it,Thank you!");

return 0;
}

输出样例

目录名:dlog.log.20160911.0
日志内容:

1
2
3
4
[2016-09-11 15:05:30.510] [WARN] [4246443808] [/home/Github/dlog/example/LogTest.cpp:8] [main] test the log level using log lib!
[2016-09-11 15:05:30.510] [DEBUG] [4246443808] [/home/Github/dlog/example/LogTest.cpp:9] [main] Hello World, 2016
[2016-09-11 15:05:30.510] [INFO] [4246443808] [/home/Github/dlog/example/LogTest.cpp:10] [main] test Log C/C++ lib
[2016-09-11 15:05:30.510] [ERROR] [4246443808] [/home/Github/dlog/example/LogTest.cpp:11] [main] Hello everyone, this is my blog: http://armsword.com/; Welcome to visit it,Thank you!

该日志库功能

  • 包含四种日志级别,分别为WARN、DEBUG、INFO、ERROR,日志级别大小依次递增
  • 可配置输出日志路径
  • 可配置输出日志前缀
  • 可定义输出的日志级别,默认DEBUG
  • 可定义日志文件切分大小
  • 支持多线程程序
  • 可定义日志往磁盘刷新的方式
  • 支持每天切换新的日志文件
  • 支持log文件被删除时,从新建立日志文件

dlog.json配置

1
2
3
4
5
6
7
{
"log_path": "./log", // 日志路径
"log_prefix": "dlog", // 日志前缀
"log_level": "DEBUG", // 输出日志级别,DEBUG表示大于等于DEBUG级别的日志都打印
"max_file_size": 200, // 日志切分大小,单位m
"async_flush": true // 日志往磁盘刷新方式,true表示异步,false表示同步,建议选择true
}

性能

在测试机上测试了下机器的硬盘真实io写速度

1
2
3
4
5
6
time dd if=/dev/zero of=test.dbf bs=8k count=300000 oflag=direct
记录了300000+0 的读入
记录了300000+0 的写出
2457600000字节(2.5 GB)已复制,38.0385 秒,64.6 MB/秒

注:oflag=direct 表示使用DirectIO方式,不使用文件系统的buffer等

而我用dlog写入3.5G(1000W条数据),用时大约51、52s,计算下来,写速度大约69M/s,与上面测试的磁盘io数据64.6 MB/s比起来,性能还算不错(因为cache问题,要比裸写磁盘性能要好),当然由于时间问题,我测试还不够充分,以后有机会继续优化下再测试看下。

一些性能调优技巧

  • 为了避免锁竞争,使用了一种更为高效的线程局部存储方法,就是使用关键字__thread来定义变量,__thread是GCC内置的线程局部存储设施(Thread-Local Storage),凡是带有__thread的变量,每个线程都拥有该变量的一份拷贝,且互不干扰。
  • 使用likely,unlikely来提高CPU分支预测正确率来提高性能。我们定义likely、unlikely如下:
    1
    2
    #define likely(x) __builtin_expect((x), 1)
    #define unlikely(x) __builtin_expect((x), 0)

其中__builtin_expect是gcc提供的函数。顾名思义,likely表示这件事很大概率会发生 :)

  • 使用loop线程来判断文件是否需要切分(打开新的fd),并且open、close这种费时操作,只在loop线程里完成,不阻塞log输出线程(一些小技巧保证线程安全)。

本日志库主要是为了满足个人需求,当然即使公司业务,没特殊需求的话也足够用了,所以并没有写的像log4j或者log4cxx那么复杂,但即便如此该日志库还有一些提高空间,等我有空再继续优化下,欢迎交流和指教,谢谢。

致谢:

因为很久没用Cmake了(之前在阿里用ascons),基本上都忘光了。并且阿里的一些经验使我比较在意代码目录的组织方式,搜索后发现一篇文章讲Cmake非常不错,作者很用心,非常感谢。

Cmake入门实战