1 SparkSQL 的发展历程

1.1 Hive and Shark

SparkSQL 的前身是 Shark ,给熟悉 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具, Hive 应运而生,它是当时唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具。但是 MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O ,降低的运行效率,为了提高 SQL-on-Hadoop 的效率,大量的 SQL-on-Hadoop 工具开始产生,其中表现较为突出的是:

l MapR Drill

l Cloudera Impala

l Shark

其中 Shark 是伯克利实验室 Spark 生态环境的组件之一,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在 Spark 引擎上,从而使得 SQL 查询的速度得到 10-100 倍的提升。

1.2 Shark SparkSQL

但是,随着 Spark 的发展,对于野心勃勃的 Spark 团队来说, Shark 对于 Hive 的太多依赖(如采用 Hive 的语法解析器、查询优化器等等),制约了 Spark One Stack Rule Them All 的既定方针,制约了 Spark 各个组件的相互集成,所以提出了 SparkSQL 项目。 SparkSQL 抛弃原有 Shark 的代码,汲取了 Shark 的一些优点,如内存列存储( In-Memory Columnar Storage )、 Hive 兼容性等,重新开发了 SparkSQL 代码;由于摆脱了对 Hive 的依赖性, SparkSQL 无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天空”。

l 数据兼容方面 不但兼容 Hive ,还可以从 RDD parquet 文件、 JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra NOSQL 数据;

l 性能优化方面 除了采取 In-Memory Columnar Storage byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等;

l 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展。

2014 6 1 Shark 项目和 SparkSQL 项目的主持人 Reynold Xin 宣布:停止对 Shark 的开发,团队将所有资源放 SparkSQL 项目上,至此, Shark 的发展画上了句话,但也因此发展出两个直线: SparkSQL Hive on Spark

其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive ,只是兼容 Hive ;而 Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,也就是说, Hive 将不再受限于一个引擎,可以采用 Map-Reduce Tez Spark 等引擎。

1.3 SparkSQL 的性能

Shark 的出现,使得 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高:

那么,摆脱了 Hive 的限制, SparkSQL 的性能又有怎么样的表现呢?虽然没有 Shark 相对于 Hive 那样瞩目地性能提升,但也表现得非常优异:

为什么 SparkSQL 的性能会得到怎么大的提升呢?主要 SparkSQL 在下面几点做了优化:

A :内存列存储( In-Memory Columnar Storage

SparkSQL 的表数据在内存中存储不是采用原生态的 JVM 对象存储方式,而是采用内存列存储,如下图所示。

该存储方式无论在空间占用量和读取吞吐率上都占有很大优势。

对于原生态的 JVM 对象存储方式,每个对象通常要增加 12-16 字节的额外开销,对于一个 270MB TPC-H lineitem table 数据,使用这种方式读入内存,要使用 970MB 左右的内存空间(通常是 2 5 倍于原生数据空间);另外,使用这种方式,每个数据记录产生一个 JVM 对象,如果是大小为 200B 的数据记录, 32G 的堆栈将产生 1.6 亿个对象,这么多的对象,对于 GC 来说,可能要消耗几分钟的时间来处理( JVM 的垃圾收集时间与堆栈中的对象数量呈线性相关)。显然这种内存存储方式对于基于内存计算的 Spark 来说,很昂贵也负担不起。

对于内存列存储来说,将所有原生数据类型的列采用原生数组来存储,将 Hive 支持的复杂数据类型(如 array map 等)先序化后并接成一个字节数组来存储。这样,每个列创建一个 JVM 对象,从而导致可以快速的 GC 和紧凑的数据存储;额外的,还可以使用低廉 CPU 开销的高效压缩方法(如字典编码、行长度编码等压缩方法)降低内存开销;更有趣的是,对于分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。

B :字节码生成技术( bytecode generation ,即 CG

在数据库查询中有一个昂贵的操作是查询语句中的表达式,主要是由于 JVM 的内存模型引起的。比如如下一个查询:

SELECT a + b FROM table

在这个查询里,如果采用通用的 SQL 语法途径去处理,会先生成一个表达式树(有两个节点的 Add 树,参考后面章节),在物理处理这个表达式树的时候,将会如图所示的 7 个步骤:

1. 调用虚函数 Add.eval() ,需要确认 Add 两边的数据类型

2. 调用虚函数 a.eval() ,需要确认 a 的数据类型

3. 确定 a 的数据类型是 Int ,装箱

4. 调用虚函数 b.eval() ,需要确认 b 的数据类型

5. 确定 b 的数据类型是 Int ,装箱

6. 调用 Int 类型的 Add

7. 返回装箱后的计算结果

其中多次涉及到虚函数的调用,虚函数的调用会打断 CPU 的正常流水线处理,减缓执行。

Spark1.1.0 catalyst 模块的 expressions 增加了 codegen 模块,如果使用动态字节码生成技术(配置 spark.sql.codegen 参数), SparkSQL 在执行物理计划的时候,对匹配的表达式采用特定的代码,动态编译,然后运行。如上例子,匹配到 Add 方法:

然后,通过调用,最终调用:

最终实现效果类似如下伪代码:

val a: Int = inputRow.getInt(0)

val b: Int = inputRow.getInt(1)

val result: Int = a + b

resultRow.setInt(0, result)

对于 Spark1.1.0 ,对 SQL 表达式都作了 CG 优化,具体可以参看 codegen 模块。 CG 优化的实现主要还是依靠 scala2.10 的运行时放射机制( runtime reflection )。对于 SQL 查询的 CG 优化,可以简单地用下图来表示:

C Scala 代码优化

另外, SparkSQL 在使用 Scala 编写代码的时候,尽量避免低效的、容易 GC 的代码;尽管增加了编写代码的难度,但对于用户来说,还是使用统一的接口,没受到使用上的困难。下图是一个 Scala 代码优化的示意图:

2 SparkSQL 运行架构

类似于关系型数据库, SparkSQL 也是语句也是由 Projection a1 a2 a3 )、 Data Source tableA )、 Filter condition )组成,分别对应 sql 查询过程中的 Result Data Source Operation ,也就是说 SQL 语句按 Result-->Data Source-->Operation 的次序来描述的。

当执行 SparkSQL 语句的顺序为:

1. 对读入的 SQL 语句进行解析( Parse ),分辨出 SQL 语句中哪些词是关键词(如 SELECT FROM WHERE ),哪些是表达式、哪些是 Projection 、哪些是 Data Source 等,从而判断 SQL 语句是否规范;

2. SQL 语句和数据库的数据字典(列、表、视图等等)进行绑定( Bind ),如果相关的 Projection Data Source 等都是存在的话,就表示这个 SQL 语句是可以执行的;

3. 一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划( Optimize );

4. 计划执行( Execute ),按 Operation-->Data Source-->Result 的次序来进行的,在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的 SQL 语句,可能直接从数据库的缓冲池中获取返回结果。

2.1 Tree Rule

SparkSQL SQL 语句的处理和关系型数据库对 SQL 语句的处理采用了类似的方法,首先会将 SQL 语句进行解析( Parse ),然后形成一个 Tree ,在后续的如绑定、优化等处理过程都是对 Tree 的操作,而操作的方法是采用 Rule ,通过模式匹配,对不同类型的节点采用不同的操作。在整个 sql 语句的处理过程中, Tree Rule 相互配合,完成了解析、绑定(在 SparkSQL 中称为 Analysis )、优化、物理计划等过程,最终生成可以执行的物理计划。

2.1.1 Tree

l Tree 的相关代码定义在 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees

l Logical Plans Expressions Physical Operators 都可以使用 Tree 表示

l Tree 的具体操作是通过 TreeNode 来实现的

Ø SparkSQL 定义了 catalyst.trees 的日志,通过这个日志可以形象的表示出树的结构

Ø TreeNode 可以使用 scala 的集合操作方法(如 foreach, map, flatMap, collect 等)进行操作

Ø 有了 TreeNode ,通过 Tree 中各个 TreeNode 之间的关系,可以对 Tree 进行遍历操作,如使用 transformDown transformUp Rule 应用到给定的树段,然后用结果替代旧的树段;也可以使用 transformChildrenDown transformChildrenUp 对一个给定的节点进行操作,通过迭代将 Rule 应用到该节点以及子节点。

l TreeNode 可以细分成三种类型的 Node

Ø UnaryNode 一元节点,即只有一个子节点。如 Limit Filter 操作

Ø BinaryNode 二元节点,即有左右子节点的二叉节点。如 Jion Union 操作

Ø LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如 SetCommand

2.1.2 Rule

l Rule 的相关代码定义在 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules

l Rule SparkSQL Analyzer Optimizer SparkPlan 等各个组件中都有应用到

l Rule 是一个抽象类,具体的 Rule 实现是通过 RuleExecutor 完成

l Rule 通过定义 batch batchs ,可以简便的、模块化地对 Tree 进行 transform 操作

l Rule 通过定义 Once FixedPoint ,可以对 Tree 进行一次操作或多次操作(如对某些 Tree 进行多次迭代操作的时候,达到 FixedPoint 次数迭代或达到前后两次的树结构没变化才停止操作,具体参看 RuleExecutor.apply

2.2 sqlContext hiveContext 的运行过程

SparkSQL 有两个分支, sqlContext hiveContext sqlContext 现在只支持 SQL 语法解析器( SQL-92 语法); hiveContext 现在支持 SQL 语法解析器和 hivesql 语法解析器,默认为 hiveSQL 语法解析器,用户可以通过配置切换成 SQL 语法解析器,来运行 hiveSQL 不支持的语法,

2.2.1 sqlContext 的运行过程

sqlContext 总的一个过程如下图所示:

1. SQL 语句经过 SqlParse 解析成 UnresolvedLogicalPlan

2. 使用 analyzer 结合数据数据字典( catalog )进行绑定,生成 resolvedLogicalPlan

3. 使用 optimizer resolvedLogicalPlan 进行优化,生成 optimizedLogicalPlan

4. 使用 SparkPlan LogicalPlan 转换成 PhysicalPlan

5. 使用 prepareForExecution() PhysicalPlan 转换成可执行物理计划;

6. 使用 execute() 执行可执行物理计划;

7. 生成 SchemaRDD

在整个运行过程中涉及到多个 SparkSQL 的组件,如 SqlParse analyzer optimizer SparkPlan 等等

2.2.2 hiveContext 的运行过程

hiveContext 总的一个过程如下图所示:

1. SQL 语句经过 HiveQl.parseSql 解析成 Unresolved LogicalPlan ,在这个解析过程中对 hiveql 语句使用 getAst() 获取 AST 树,然后再进行解析;

2. 使用 analyzer 结合数据 hive 源数据 Metastore (新的 catalog )进行绑定,生成 resolved LogicalPlan

3. 使用 optimizer resolved LogicalPlan 进行优化,生成 optimized LogicalPlan ,优化前使用了 ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))) 进行预处理;

4. 使用 hivePlanner LogicalPlan 转换成 PhysicalPlan

5. 使用 prepareForExecution() PhysicalPlan 转换成可执行物理计划;

6. 使用 execute() 执行可执行物理计划;

7. 执行后,使用 map(_.copy) 将结果导入 SchemaRDD

2.3 catalyst 优化器

SparkSQL1.1 总体上由四个模块组成: core catalyst hive hive-Thriftserver

l core 处理数据的输入输出,从不同的数据源获取数据( RDD Parquet json 等),将查询结果输出成 schemaRDD

l catalyst 处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;

l hive hive 数据的处理

l hive-ThriftServer 提供 CLI JDBC/ODBC 接口

在这四个模块中, catalyst 处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是 catalyst 的一个设计图:

其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看, catalyst 主要的实现组件有:

l sqlParse ,完成 sql 语句的语法解析功能,目前只提供了一个简单的 sql 解析器;

l Analyzer ,主要完成绑定工作,将不同来源的 Unresolved LogicalPlan 和数据元数据(如 hive metastore Schema catalog )进行绑定,生成 resolved LogicalPlan

l optimizer resolved LogicalPlan 进行优化,生成 optimized LogicalPlan

l Planner LogicalPlan 转换成 PhysicalPlan

l CostModel ,主要根据过去的性能统计数据,选择最佳的物理执行计划

这些组件的基本实现方法:

l 先将 sql 语句通过解析生成 Tree ,然后在不同阶段使用不同的 Rule 应用到 Tree 上,通过转换完成各个组件的功能。

l Analyzer 使用 Analysis Rules ,配合数据元数据(如 hive metastore Schema catalog ),完善 Unresolved LogicalPlan 的属性而转换成 resolved LogicalPlan

l optimizer 使用 Optimization Rules ,对 resolved LogicalPlan 进行合并、列裁剪、过滤器下推等优化作业而转换成 optimized LogicalPlan

l Planner 使用 Planning Strategies ,对 optimized LogicalPlan

3 SparkSQL CLI

CLI Command-Line Interface ,命令行界面)是指可在用户提示符下键入可执行指令的界面,它通常不支持鼠标,用户通过键盘输入指令,计算机接收到指令后予以执行。 Spark CLI 指的是使用命令界面直接输入 SQL 命令,然后发送到 Spark 集群进行执行,在界面中显示运行过程和最终的结果。

Spark1.1 相较于 Spark1.0 最大的差别就在于 Spark1.1 增加了 Spark SQL CLI ThriftServer ,使得 Hive 用户还有用惯了命令行的 RDBMS 数据库管理员较容易地上手,真正意义上进入了 SQL 时代。

【注】 Spark CLI Spark Thrift Server 实验环境为第二课《 Spark 编译与部署(下) --Spark 编译安装》所搭建

3.1 运行环境说明

3.1.1 硬软件环境

l 主机操作系统: Windows 64 位,双核 4 线程,主频 2.2G 10G 内存

l 虚拟软件: VMware® Workstation 9.0.0 build-812388

l 虚拟机操作系统: CentOS 64 位,单核

l 虚拟机运行环境:

Ø JDK 1.7.0_55 64

Ø Hadoop 2.2.0 (需要编译为 64 位)

Ø Scala 2.11.4

Ø Spark 1.1.0 (需要编译)

Ø Hive 0.13.1

3.1.2 机器网络环境

集群包含三个节点,节点之间可以免密码 SSH 访问,节点 IP 地址和主机名分布如下:

3.2 配置并启动

3.2.1 创建并配置 hive-site.xml

在运行 Spark SQL CLI 中需要使用到 Hive Metastore ,故需要在 Spark 中添加其 uris 。具体方法是在 SPARK_HOME/conf 目录下创建 hive-site.xml 文件,然后在该配置文件中,添加 hive.metastore.uris 属性,具体如下:

<configuration>

<property>

<name>hive.metastore.uris</name>

<value>thrift://hadoop1:9083</value>

<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

</property>

</configuration>

3.2.2 启动 Hive

在使用 Spark SQL CLI 之前需要启动 Hive Metastore (如果数据存放在 HDFS 文件系统,还需要启动 Hadoop HDFS ),使用如下命令可以使 Hive Metastore 启动后运行在后台,可以通过 jobs 查询:

$nohup hive --service metastore > metastore.log 2>&1 &

3.2.3 启动 Spark 集群和 Spark SQL CLI

通过如下命令启动 Spark 集群和 Spark SQL CLI

$cd /app/hadoop/spark-1.1.0

$sbin/start-all.sh

$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g

在集群监控页面可以看到启动了 SparkSQL 应用程序:

这时就可以使用 HQL 语句对 Hive 数据进行查询,另外可以使用 COMMAND ,如使用 set 进行设置参数:默认情况下, SparkSQL Shuffle 的时候是 200 partition ,可以使用如下命令修改该参数:

SET spark.sql.shuffle.partitions=20;

运行同一个查询语句,参数改变后, Task partition )的数量就由 200 变成了 20

3.2.4 命令参数

通过 bin/spark-sql --help 可以查看 CLI 命令参数:

其中 [options] CLI 启动一个 SparkSQL 应用程序的参数,如果不设置 --master 的话,将在启动 spark-sql 的机器以 local 方式运行,只能通过 http:// 机器名 :4040 进行监控;这部分参数,可以参照 Spark1.0.0 应用程序部署工具 spark-submit 的参数。

[cli option] CLI 的参数,通过这些参数 CLI 可以直接运行 SQL 文件、进入命令行运行 SQL 命令等等,类似以前的 Shark 的用法。需要注意的是 CLI 不是使用 JDBC 连接,所以不能连接到 ThriftServer ;但可以配置 conf/hive-site.xml 连接到 Hive Metastore ,然后对 Hive 数据进行查询。

3.3 实战 Spark SQL CLI

3.3.1 获取订单每年的销售单数、销售总额

第一步 设置任务个数,在这里修改为 20

spark-sql>SET spark.sql.shuffle.partitions=20;

第二步 运行 SQL 语句

spark-sql>use hive;

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

第三步 查看运行结果

3.3.2 计算所有订单每年的总金额

第一步 执行 SQL 语句

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

第二步 执行结果

使用 CLI 执行结果如下:

3.3.3 计算所有订单每年最大金额订单的销售额

第一步 执行 SQL 语句

spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear;

第二步 执行结果

使用 CLI 执行结果如下:

4 Spark Thrift Server

ThriftServer 是一个 JDBC/ODBC 接口,用户可以通过 JDBC/ODBC 连接 ThriftServer 来访问 SparkSQL 的数据。 ThriftServer 在启动的时候,会启动了一个 SparkSQL 的应用程序,而通过 JDBC/ODBC 连接进来的客户端共同分享这个 SparkSQL 应用程序的资源,也就是说不同的用户之间可以共享数据; ThriftServer 启动时还开启一个侦听器,等待 JDBC 客户端的连接和提交查询。所以,在配置 ThriftServer 的时候,至少要配置 ThriftServer 的主机名和端口,如果要使用 Hive 数据的话,还要提供 Hive Metastore uris

【注】 Spark CLI Spark Thrift Server 实验环境为第二课《 Spark 编译与部署(下) --Spark 编译安装》所搭建

4.1 配置并启动

4.1.1 创建并配置 hive-site.xml

第一步 创建 hive-site.xml 配置文件

$SPARK_HOME/conf 目录下修改 hive-site.xml 配置文件(如果在 Spark SQL CLI 中已经添加,可以省略):

$cd /app/hadoop/spark-1.1.0/conf

$sudo vi hive-site.xml

第二步 修改配置文件

设置 hadoop1 Metastore 服务器, hadoop2 Thrift Server 服务器,配置内容如下:

<configuration>

<property>

<name>hive.metastore.uris</name>

<value>thrift://hadoop1:9083</value>

<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

</property>

<property>

<name>hive.server2.thrift.min.worker.threads</name>

<value>5</value>

<description>Minimum number of Thrift worker threads</description>

</property>

<property>

<name>hive.server2.thrift.max.worker.threads</name>

<value>500</value>

<description>Maximum number of Thrift worker threads</description>

</property>

<property>

<name>hive.server2.thrift.port</name>

<value>10000</value>

<description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>

</property>

<property>

<name>hive.server2.thrift.bind.host</name>

<value>hadoop2</value>

<description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>

</property>

</configuration>

4.1.2 启动 Hive

hadoop1 节点中,在后台启动 Hive Metastore (如果数据存放在 HDFS 文件系统,还需要启动 Hadoop HDFS ):

$nohup hive --service metastore > metastore.log 2>&1 &

4.1.3 启动 Spark 集群和 Thrift Server

hadoop1 节点启动 Spark 集群

$cd /app/hadoop/spark-1.1.0/sbin

$./start-all.sh

hadoop2 节点上进入 SPARK_HOME/sbin 目录,使用如下命令启动 Thrift Server

$cd /app/hadoop/spark-1.1.0/sbin

$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g

注意 Thrift Server 需要按照配置在 hadoop2 启动!

在集群监控页面可以看到启动了 SparkSQL 应用程序:

4.1.4 命令参数

使用 sbin/start-thriftserver.sh --help 可以查看 ThriftServer 的命令参数:

$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]

Thrift server options: Use value for given property

其中 [options] Thrift Server 启动一个 SparkSQL 应用程序的参数,如果不设置 --master 的话,将在启动 Thrift Server 的机器以 local 方式运行,只能通过 http:// 机器名 :4040 进行监控;这部分参数,可以参照 Spark1.0.0 应用程序部署工具 spark-submit 的参数。在集群中提供 Thrift Server 的话,一定要配置 master executor-memory 等参数。

[thrift server options] Thrift Server 的参数,可以使用 -dproperty=value 的格式来定义;在实际应用上,因为参数比较多,通常使用 conf/hive-site.xml 配置。

4.2 实战 Thrift Server

4.2.1 远程客户端连接

可以在任意节点启动 bin/beeline ,用 !connect jdbc:hive2://hadoop2:10000 连接 ThriftServer ,因为没有采用权限管理,所以用户名用运行 bin/beeline 的用户 hadoop ,密码为空:

$cd /app/hadoop/spark-1.1.0/bin

$./beeline

beeline>!connect jdbc:hive2://hadoop2:10000

4.2.2 基本操作

第一步 显示 hive 数据库所有表

beeline>show database;

beeline>use hive;

beeline>show tables;

第二步 创建表 testThrift

beeline>create table testThrift(field1 String , field2 Int);

beeline>show tables;

第三步 tbStockDetail 表中金额大于 3000 插入到 testThrift 表中

beeline>insert into table testThrift select ordernumber,amount from tbStockDetail where amount>3000;

beeline>select * from testThrift;

第四步 重新创建 testThrift 表中,把年度最大订单插入该表中

beeline>drop table testThrift;

beeline>create table testThrift (field1 String , field2 Int);

beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear;

beeline>select * from testThrift;

4.2.3 计算所有订单每年的订单数

第一步 执行 SQL 语句

spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

第二步 执行结果

Stage 监控页面:

查看 Details for Stage 28

4.2.4 计算所有订单月销售额前十名

第一步 执行 SQL 语句

spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;

第二步 执行结果

Stage 监控页面:

在其第一个 Task 中,从本地读入数据

在后面的 Task 是从内存中获取数据

4.2.5 缓存表数据

第一步 缓存数据

beeline>cache table tbStock;

beeline>select count(*) from tbStock;

第二步 运行 4.2.4 中的“计算所有订单月销售额前十名”

beeline>select count(*) from tbStock;

本次计算划给 11.233 秒,查看 webUI ,数据已经缓存,缓存率为 100%

第三步 在另外节点再次运行

hadoop3 节点启动 bin/beeline ,用 !connect jdbc:hive2://hadoop2:10000 连接 ThriftServer ,然后直接运行对 tbStock 计数(注意没有进行数据库的切换):

用时 0.343 秒,再查看 webUI 中的 stage

Locality Level PROCESS ,显然是使用了缓存表。

从上可以看出, ThriftServer 可以连接多个 JDBC/ODBC 客户端,并相互之间可以共享数据。顺便提一句, ThriftServer 启动后处于监听状态,用户可以使用 ctrl+c 退出 ThriftServer ;而 beeline 的退出使用 !q 命令。

4.2.6 IDEA JDBC 访问

有了 ThriftServer ,开发人员可以非常方便的使用 JDBC/ODBC 来访问 SparkSQL 。下面是一个 scala 代码,查询表 tbStockDetail ,返回 amount>3000 的单据号和交易金额:

第一步 IDEA 创建 class6 包和类 JDBCofSparkSQL

参见《 Spark 编程模型(下) --IDEA 搭建及实战》在 IDEA 中创建 class6 包并新建类 JDBCofSparkSQL 。该类中查询 tbStockDetail 金额大于 3000 的订单:

package class6

import java.sql.DriverManager

object JDBCofSparkSQL {

def main(args: Array[String]) {

Class.forName("org.apache.hive.jdbc.HiveDriver")

val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")

try {

val statement = conn.createStatement

val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail where amount>3000")

while (rs.next) {

val ordernumber = rs.getString("ordernumber")

val amount = rs.getString("amount")

println("ordernumber = %s, amount = %s".format(ordernumber, amount))

} catch {

case e: Exception => e.printStackTrace

conn.close

第二步 查看运行结果

IDEA 中可以观察到,在运行日志窗口中没有运行过程的日志,只显示查询结果

第三步 查看监控结果

Spark 监控界面中观察到,该 Job 有一个编号为 6 Stage ,该 Stage 2 Task ,分别运行在 hadoop1 hadoop2 节点,获取数据为 NODE_LOCAL 方式。

hadoop2 中观察 Thrift Server 运行日志如下: