Presto
olap
-
Druid :
是一个实时处理时序数据的OLAP数据库,因为它的索引按照时间分片,查询的时候也是按照时间线去路由索引。 -
Kylin
核心是Cube,Cube是一种预计算技术,基本思路是预先对数据作多维索引,查询时只扫描索引而不访问原始数据从而提速。 -
Presto:
它没有使用MapReduce,大部分场景下比hive快一个数量级,其中的关键是 所有的处理都在内存中完成 。 -
Impala:
基于内存运算,速度快,支持的数据源没有Presto多。 -
Spark SQL:
基于Spark平台上的一个OLAP框架,基本思路是增加机器来并行运算,从而提高查询速度。 -
ES:
最大的特点是使用了倒排索引解决问题。ES在数据获取和聚集用的资源比在Druid高。
转自:
https://www.
jianshu.com/p/bbfdce443
3b1
Presto是一个开源的分布式SQL查询引擎,数据量支持GB到PB字节,主要用来处理秒级查询的场景。
虽presto可以解析SQL,但它不是一个标准的数据库,不是MySQL、Oracle的代替品,也不能用来处理在线事务(OLTP);
Presto 整体架构
1.Client :包括 prosto-cli 客户端以及 JDBC 驱动、ODBC 或 其他语言实现的Driver; 2.Discovery Service :是将 coordinator 和 worker 结合到一起的服务。Worker 节点启动后向Discovery Server 服务注册,Coordinator 从 Discovery Server 获得可以正常工作的 Worker 节点; 3.Coordinator :主要用于接收客户端提交的查询,解析查询语句,执行词法分析生成查询执行计划,并生成Stage 和 Task 进行调度;然后合并结果,把结果返回给客户端(Client); 4.Worker :主要负责与数据的读写交互以及执行查询计划;Coordinator 和 Worker 可一起启动,这样小规模的集群或伪分布式可以节省一些资源。
Presto SQL执行步骤
1. 客户端通过 HTTP 发送一个查询语句给Presto集群的Coordinator;
2. Coordinator 接收到客户端传来的查询语句,对该语句进行解析、生成查询执行计划,并根据查询执行计划依次生成 SqlQueryExecution -> SqlStageExecution -> HttpRemoteTask;
3. Coordinator 将每个Task分发到所需要处理的数据所在的Worker上进行分析;
4. 执行Source Stage 的 Task,这些Task通过Connector从数据源中读取所需要的数据;
5. 处于下游的Stage中用的Task会读取上游的Stage产生的输出结果,并在该Stage的每个Task所在的Worker内存中进行后续的计算和处理;
6. Coordinator 从分发的Task之后,一直持续不断的从Single Stage 中的Task获得计算结果,并将结果写入到缓存中,直到所所有的计算结束;
7. Client 从提交查询后,就一直监听 Coordinator 中的本次查询结果集,立即输出。直到轮训到所有的结果都返回,本次查询结束;
部署方式
Presto常见的部署方式如下图所示:
Coordinator与Discovery Server耦合在一起混合部署,然后部署多台Worker。然而这个有个问题,就是Coordinator存在单点问题,我们目前线上使用ip漂移的方法(网卡绑定多ip)。如下图所示:
查询流程
整体查询流程为:
- Client使用HTTP协议发送一个query请求。
- 通过Discovery Server发现可用的Server。
- Coordinator构建查询计划(Connector插件提供Metadata)
- Coordinator向workers发送任务
- Worker通过Connector插件读取数据
- Worker在内存里执行任务(Worker是纯内存型计算引擎)
- Worker将数据返回给Coordinator,之后再Response Client
SQL执行流程
当Coordinator收到一个Query,其SQL执行流程如上图所示。SQL通过Anltr3解析为AST(抽象语法树),然后通过Connector获取原始数据的Metadata信息,这里会有一些优化,比如缓存Metadata信息等,根据Metadata信息生成逻辑计划,然后会依次生成分发计划和执行计划,在执行计划里需要去Discovery里获取可用的node列表,然后根据一定的策略,将这些计划分发到指定的Worker机器上,Worker机器再分别执行。
Presto、Impala性能比较
测试结论:Impala性能稍领先于Presto,但是Presto在数据源支持上非常丰富,包括Hive、图数据库、传统关系型数据库、Redis等。
转自: OLAP分析工具之Presto - kris12 - 博客园
Facebook的数据仓库存储在少量大型Hadoop/HDFS集群。Hive是Facebook在几年前专为Hadoop打造的一款数据仓库工具。在以前,Facebook的科学家和分析师一直依靠Hive来做数据分析。但Hive使用MapReduce作为底层计算框架,是专为批处理设计的。但随着数据越来越多,使用Hive进行一个简单的数据查询可能要花费几分到几小时,显然不能满足交互式查询的需求。Facebook也调研了其他比Hive更快的工具,但它们要么在功能有所限制要么就太简单,以至于无法操作Facebook庞大的数据仓库。
2012年开始试用的一些外部项目都不合适,他们决定自己开发,这就是Presto。2012年秋季开始开发,目前该项目已经在超过 1000名Facebook雇员中使用,运行超过30000个查询,每日数据在1PB级别。Facebook称Presto的性能比Hive要好上10倍多。2013年Facebook正式宣布开源Presto。
本文首先介绍Presto从用户提交SQL到执行的这一个过程,然后尝试对Presto实现实时查询的原理进行分析和总结,最后介绍Presto在美团的使用情况。
Presto架构
presto架构图
Presto查询引擎是一个Master-Slave的架构,由一个Coordinator节点,一个Discovery Server节点,多个Worker节点组成,Discovery Server通常内嵌于Coordinator节点中。Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行。Worker节点负责实际执行查询任务。Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。如果配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点与HDFS交互读取数据。
Presto执行查询过程简介
既然Presto是一个交互式的查询引擎,我们最关心的就是Presto实现低延时查询的原理,我认为主要是下面几个关键点,当然还有一些传统的SQL优化原理,这里不介绍了。
- 完全基于内存的并行计算
- 流水线
- 本地化计算
- 动态编译执行计划
- 小心使用内存和数据结构
- 类BlinkDB的近似查询
- GC控制
为了介绍上述几个要点,这里先介绍一下Presto执行查询的过程
提交查询
用户使用Presto Cli提交一个查询语句后,Cli使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。
提交查询
SQL编译过程
Presto与Hive一样,使用Antlr编写SQL语法,语法规则定义在Statement.g和StatementBuilder.g两个文件中。 如下图中所示从SQL编译为最终的物理执行计划大概分为5部,最终生成在每个Worker节点上运行的LocalExecutionPlan,这里不详细介绍SQL解析为逻辑执行计划的过程,通过一个SQL语句来理解查询计划生成之后的计算过程。
SQL解析过程
样例SQL:
select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10;
逻辑执行计划
上面的SQL语句生成的逻辑执行计划Plan如上图所示。那么Presto是如何对上面的逻辑执行计划进行拆分以较高的并行度去执行完这个计划呢,我们来看看物理执行计划。
物理执行计划
逻辑执行计划图中的虚线就是Presto对逻辑执行计划的切分点,逻辑计划Plan生成的SubPlan分为四个部分,每一个SubPlan都会提交到一个或者多个Worker节点上执行。
SubPlan有几个重要的属性planDistribution、outputPartitioning、partitionBy属性。
- PlanDistribution表示一个查询Stage的分发方式,逻辑执行计划图中的4个SubPlan共有3种不同的PlanDistribution方式:Source表示这个SubPlan是数据源,Source类型的任务会按照数据源大小确定分配多少个节点进行执行;Fixed表示这个SubPlan会分配固定的节点数进行执行(Config配置中的query.initial-hash-partitions参数配置,默认是8);None表示这个SubPlan只分配到一个节点进行执行。在下面的执行计划中,SubPlan1和SubPlan0 PlanDistribution=Source,这两个SubPlan都是提供数据源的节点,SubPlan1所有节点的读取数据都会发向SubPlan0的每一个节点;SubPlan2分配8个节点执行最终的聚合操作;SubPlan3只负责输出最后计算完成的数据。
- OutputPartitioning属性只有两个值HASH和NONE,表示这个SubPlan的输出是否按照partitionBy的key值对数据进行Shuffle。在下面的执行计划中只有SubPlan0的OutputPartitioning=HASH,所以SubPlan2接收到的数据是按照rank字段Partition后的数据。
物理执行计划
完全基于内存的并行计算
查询的并行执行流程
Presto SQL的执行流程如下图所示
- Cli通过HTTP协议提交SQL查询之后,查询请求封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行
- 每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行
- 每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行
- Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行
查询执行流程
上面的执行计划实际执行效果如下图所示。
- Coordinator通过HTTP协议调用Worker节点的 /v1/task 接口将执行计划分配给所有Worker节点(图中蓝色箭头)
- SubPlan1的每个节点读取一个Split的数据并过滤后将数据分发给每个SubPlan0节点进行Join操作和Partial Aggr操作
- SubPlan1的每个节点计算完成后按GroupBy Key的Hash值将数据分发到不同的SubPlan2节点
- 所有SubPlan2节点计算完成后将数据分发到SubPlan3节点
- SubPlan3节点计算完成后通知Coordinator结束查询,并将数据发送给Coordinator
执行计划计算流程
源数据的并行读取
在上面的执行计划中SubPlan1和SubPlan0都是Source节点,其实它们读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行,每个Worker节点分配的InputSplit数目上限是参数可配置的,Config中的query.max-pending-splits-per-node参数配置,默认是100。
分布式的Hash聚合
上面的执行计划在SubPlan0中会进行一次Partial的聚合计算,计算每个Worker节点读取的部分数据的部分聚合结果,然后SubPlan0的输出会按照group by字段的Hash值分配不同的计算节点,最后SubPlan3合并所有结果并输出
流水线
数据模型
Presto中处理的最小数据单元是一个Page对象,Page对象的数据结构如下图所示。一个Page对象包含多个Block对象,每个Block对象是一个字节数组,存储一个字段的若干行。多个Block横切的一行是真实的一行数据。一个Page最大1MB,最多16*1024行数据。
数据模型
节点内部流水线计算
下图是一个Worker节点内部的计算流程图,左侧是任务的执行流程图。
Worker节点将最细粒度的任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中。每个
Worker节点启动一定数目的线程进行计算,线程数task.shard.max-threads=availableProcessors() * 4,在config中配置。
每个空闲的线程从队列中取出一个PrioritizedSplitRunner对象执行,如果执行完成一个周期,超过最大执行时间1秒钟,判断任务是否执行完成,如果完成,从allSplits队列中删除,如果没有,则放回pendingSplits队列中。
每个任务的执行流程如下图右侧,依次遍历所有Operator,尝试从上一个Operator取一个Page对象,如果取得的Page不为空,交给下一个Operator执行。
节点内部流水线计算
节点间流水线计算
下图是ExchangeOperator的执行流程图,ExchangeOperator为每一个Split启动一个HttpPageBufferClient对象,主动向上一个Stage的Worker节点拉数据,数据的最小单位也是一个Page对象,取到数据后放入Pages队列中
节点间流水线计算
本地化计算
Presto在选择Source任务计算节点的时候,对于每一个Split,按下面的策略选择一些minCandidates
- 优先选择与Split同一个Host的Worker节点
- 如果节点不够优先选择与Split同一个Rack的Worker节点
- 如果节点还不够随机选择其他Rack的节点
对于所有Candidate节点,选择assignedSplits最少的节点。
动态编译执行计划
Presto会将执行计划中的ScanFilterAndProjectOperator和FilterAndProjectOperator动态编译为Byte Code,并交给JIT去编译为native代码。Presto也使用了Google Guava提供的LoadingCache缓存生成的Byte Code。
动态编译执行计划
动态编译执行计划
上面的两段代码片段中,第一段为没有动态编译前的代码,第二段代码为动态编译生成的Byte Code反编译之后还原的优化代 码,我们看到这里采用了循环展开的优化方法。
循环展开最常用来降低循环开销,为具有多个功能单元的处理器提供指令级并行。也有利于指令流水线的调度。
小心使用内存和数据结构
使用Slice进行内存操作,Slice使用Unsafe#copyMemory实现了高效的内存拷贝,Slice仓库参考: https:// github.com/airlift/slic e
Facebook工程师在另一篇介绍ORCFile优化的文章中也提到使用Slice将ORCFile的写性能提高了20%~30%,参考: https:// code.facebook.com/posts /229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/
类BlinkDB的近似查询
为了加快avg、count distinct、percentile等聚合函数的查询速度,Presto团队与BlinkDB作者之一Sameer Agarwal合作引入了一些近似查询函数approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法实现。
GC控制
Presto团队在使用hotspot java7时发现了一个JIT的BUG,当代码缓存快要达到上限时,JIT可能会停止工作,从而无法将使用频率高的代码动态编译为native代码。
Presto团队使用了一个比较Hack的方法去解决这个问题,增加一个线程在代码缓存达到70%以上时进行显式GC,使得已经加载的Class从perm中移除,避免JIT无法正常工作的BUG。
Presto TPCH benchmark测试
介绍了上述这么多点,我们最关心的还是Presto性能测试,Presto中实现了TPCH的标准测试,下面的表格给出了Presto 0.60 TPCH的测试结果。直接运行presto-main/src/test/java/com/facebook/presto/benchmark/BenchmarkSuite.java。
benchmarkName cpuNanos(MILLISECONDS) inputRows inputBytes inputRows/s inputBytes/s outputRows outputBytes outputRows/s outputBytes/s
count_agg 2.055ms 1.5M 12.9MB 730M/s 6.12GB/s 1 9B 486/s 4.28KB/s
double_sum_agg 14.792ms 1.5M 12.9MB 101M/s 870MB/s 1 9B 67/s 608B/s
hash_agg 174.576ms 1.5M 21.5MB 8.59M/s 123MB/s 3 45B 17/s 257B/s
predicate_filter 68.387ms 1.5M 12.9MB 21.9M/s 188MB/s 1.29M 11.1MB 18.8M/s 162MB/s
raw_stream 1.899ms 1.5M 12.9MB 790M/s 6.62GB/s 1.5M 12.9MB 790M/s 6.62GB/s
top100 58.735ms 1.5M 12.9MB 25.5M/s 219MB/s 100 900B 1.7K/s 15KB/s
in_memory_orderby_1.5M 1909.524ms 1.5M 41.5MB 786K/s 21.7MB/s 1.5M 28.6MB 786K/s 15MB/s
hash_build 588.471ms 1.5M 25.7MB 2.55M/s 43.8MB/s 1.5M 25.7MB 2.55M/s 43.8MB/s
hash_join 2400.006ms 6M 103MB 2.5M/s 42.9MB/s 6M 206MB 2.5M/s 85.8MB/s
hash_build_and_join 2996.489ms 7.5M 129MB 2.5M/s 43MB/s 6M 206MB 2M/s 68.8MB/s
hand_tpch_query_1 3146.931ms 6M 361MB 1.91M/s 115MB/s 4 300B 1/s 95B/s
hand_tpch_query_6 345.960ms 6M 240MB 17.3M/s 695MB/s 1 9B 2/s 26B/s
sql_groupby_agg_with_arithmetic 1211.444ms 6M 137MB 4.95M/s 113MB/s 2 30B 1/s 24B/s
sql_count_agg 3.635ms 1.5M 12.9MB 413M/s 3.46GB/s 1 9B 275/s 2.42KB/s
sql_double_sum_agg 16.960ms 1.5M 12.9MB 88.4M/s 759MB/s 1 9B 58/s 530B/s
sql_count_with_filter 81.641ms 1.5M 8.58MB 18.4M/s 105MB/s 1 9B 12/s 110B/s
sql_groupby_agg 169.748ms 1.5M 21.5MB 8.84M/s 126MB/s 3 45B 17/s 265B/s
sql_predicate_filter 46.540ms 1.5M 12.9MB 32.2M/s 277MB/s 1.29M 11.1MB 27.7M/s 238MB/s
sql_raw_stream 3.374ms 1.5M 12.9MB 445M/s 3.73GB/s 1.5M 12.9MB 445M/s 3.73GB/s
sql_top_100 60.663ms 1.5M 12.9MB 24.7M/s 212MB/s 100 900B 1.65K/s 14.5KB/s
sql_hash_join 4421.159ms 7.5M 129MB 1.7M/s 29.1MB/s 6M 206MB 1.36M/s 46.6MB/s
sql_join_with_predicate 1008.909ms 7.5M 116MB 7.43M/s 115MB/s 1 9B 0/s 8B/s
sql_varbinary_max 224.510ms 6M 97.3MB 26.7M/s 433MB/s 1 21B 4/s 93B/s
sql_distinct_multi 257.958ms 1.5M 32MB 5.81M/s 124MB/s 5 112B 19/s 434B/s
sql_distinct_single 112.849ms 1.5M 12.9MB 13.3M/s 114MB/s 1 9B 8/s 79B/s
sql_tpch_query_1 3168.782ms 6M 361MB 1.89M/s 114MB/s 4 336B 1/s 106B/s
sql_tpch_query_6 286.281ms 6M 240MB 21M/s 840MB/s 1 9B 3/s 31B/s
sql_like 3497.154ms 6M 232MB 1.72M/s 66.3MB/s 1.15M 9.84MB 328K/s 2.81MB/s
sql_in 80.267ms 6M 51.5MB 74.8M/s 642MB/s 25 225B 311/s 2.74KB/s
sql_semijoin_in 1945.074ms 7.5M 64.4MB 3.86M/s 33.1MB/s 3M 25.8MB 1.54M/s 13.2MB/s
sql_regexp_like 2233.004ms 1.5M 76.6MB 672K/s 34.3MB/s 1 9B 0/s 4B/s
sql_approx_percentile_long 587.748ms 1.5M 12.9MB 2.55M/s 21.9MB/s 1 9B 1/s 15B/s
sql_between_long 53.433ms 1.5M 12.9MB 28.1M/s 241MB/s 1 9B 18/s 168B/s
sampled_sql_groupby_agg_with_arithmetic 1369.485ms 6M 189MB 4.38M/s 138MB/s 2 30B 1/s 21B/s
sampled_sql_count_agg 11.367ms 1.5M 12.9MB 132M/s 1.11GB/s 1 9B 87/s 791B/s
sampled_sql_join_with_predicate 1338.238ms 7.5M 180MB 5.61M/s 135MB/s 1 9B 0/s 6B/s
sampled_sql_double_sum_agg 24.638ms 1.5M 25.7MB 60.9M/s 1.02GB/s 1 9B 40/s 365B/s
stat_long_variance 26.390ms 1.5M 12.9MB 56.8M/s 488MB/s 1 9B 37/s 341B/s
stat_long_variance_pop 26.583ms 1.5M 12.9MB 56.4M/s 484MB/s 1 9B 37/s 338B/s
stat_double_variance 26.601ms 1.5M 12.9MB 56.4M/s 484MB/s 1 9B 37/s 338B/s
stat_double_variance_pop 26.371ms 1.5M 12.9MB 56.9M/s 488MB/s 1 9B 37/s 341B/s