|
|
月球上的南瓜 · DiffDefense:通过扩散模型防御对抗 ...· 2 年前 · |
|
|
仗义的面包 · byte数组 ios json ...· 2 年前 · |
|
|
斯文的皮带 · vue使用 monaco editor ...· 2 年前 · |
|
|
买醉的闹钟 · RSA加密解密(无数据大小限制,php、go ...· 2 年前 · |
|
|
很酷的鸡蛋 · JSP 开发环境搭建 | 菜鸟教程· 2 年前 · |
SparkSQL 的前身是 Shark ,给熟悉 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具, Hive 应运而生,它是当时唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具。但是 MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O ,降低的运行效率,为了提高 SQL-on-Hadoop 的效率,大量的 SQL-on-Hadoop 工具开始产生,其中表现较为突出的是:
l MapR 的 Drill
l Cloudera 的 Impala
l Shark
其中 Shark 是伯克利实验室 Spark 生态环境的组件之一,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在 Spark 引擎上,从而使得 SQL 查询的速度得到 10-100 倍的提升。
但是,随着 Spark 的发展,对于野心勃勃的 Spark 团队来说, Shark 对于 Hive 的太多依赖(如采用 Hive 的语法解析器、查询优化器等等),制约了 Spark 的 One Stack Rule Them All 的既定方针,制约了 Spark 各个组件的相互集成,所以提出了 SparkSQL 项目。 SparkSQL 抛弃原有 Shark 的代码,汲取了 Shark 的一些优点,如内存列存储( In-Memory Columnar Storage )、 Hive 兼容性等,重新开发了 SparkSQL 代码;由于摆脱了对 Hive 的依赖性, SparkSQL 无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天空”。
l 数据兼容方面 不但兼容 Hive ,还可以从 RDD 、 parquet 文件、 JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据;
l 性能优化方面 除了采取 In-Memory Columnar Storage 、 byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等;
l 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展。
2014 年 6 月 1 日 Shark 项目和 SparkSQL 项目的主持人 Reynold Xin 宣布:停止对 Shark 的开发,团队将所有资源放 SparkSQL 项目上,至此, Shark 的发展画上了句话,但也因此发展出两个直线: SparkSQL 和 Hive on Spark 。
其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive ,只是兼容 Hive ;而 Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,也就是说, Hive 将不再受限于一个引擎,可以采用 Map-Reduce 、 Tez 、 Spark 等引擎。
Shark 的出现,使得 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高:
那么,摆脱了 Hive 的限制, SparkSQL 的性能又有怎么样的表现呢?虽然没有 Shark 相对于 Hive 那样瞩目地性能提升,但也表现得非常优异:
为什么 SparkSQL 的性能会得到怎么大的提升呢?主要 SparkSQL 在下面几点做了优化:
A :内存列存储( In-Memory Columnar Storage )
SparkSQL 的表数据在内存中存储不是采用原生态的 JVM 对象存储方式,而是采用内存列存储,如下图所示。
该存储方式无论在空间占用量和读取吞吐率上都占有很大优势。
对于原生态的 JVM 对象存储方式,每个对象通常要增加 12-16 字节的额外开销,对于一个 270MB 的 TPC-H lineitem table 数据,使用这种方式读入内存,要使用 970MB 左右的内存空间(通常是 2 ~ 5 倍于原生数据空间);另外,使用这种方式,每个数据记录产生一个 JVM 对象,如果是大小为 200B 的数据记录, 32G 的堆栈将产生 1.6 亿个对象,这么多的对象,对于 GC 来说,可能要消耗几分钟的时间来处理( JVM 的垃圾收集时间与堆栈中的对象数量呈线性相关)。显然这种内存存储方式对于基于内存计算的 Spark 来说,很昂贵也负担不起。
对于内存列存储来说,将所有原生数据类型的列采用原生数组来存储,将 Hive 支持的复杂数据类型(如 array 、 map 等)先序化后并接成一个字节数组来存储。这样,每个列创建一个 JVM 对象,从而导致可以快速的 GC 和紧凑的数据存储;额外的,还可以使用低廉 CPU 开销的高效压缩方法(如字典编码、行长度编码等压缩方法)降低内存开销;更有趣的是,对于分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。
B :字节码生成技术( bytecode generation ,即 CG )
在数据库查询中有一个昂贵的操作是查询语句中的表达式,主要是由于 JVM 的内存模型引起的。比如如下一个查询:
SELECT a + b FROM table
在这个查询里,如果采用通用的 SQL 语法途径去处理,会先生成一个表达式树(有两个节点的 Add 树,参考后面章节),在物理处理这个表达式树的时候,将会如图所示的 7 个步骤:
1. 调用虚函数 Add.eval() ,需要确认 Add 两边的数据类型
2. 调用虚函数 a.eval() ,需要确认 a 的数据类型
3. 确定 a 的数据类型是 Int ,装箱
4. 调用虚函数 b.eval() ,需要确认 b 的数据类型
5. 确定 b 的数据类型是 Int ,装箱
6. 调用 Int 类型的 Add
7. 返回装箱后的计算结果
其中多次涉及到虚函数的调用,虚函数的调用会打断 CPU 的正常流水线处理,减缓执行。
Spark1.1.0 在 catalyst 模块的 expressions 增加了 codegen 模块,如果使用动态字节码生成技术(配置 spark.sql.codegen 参数), SparkSQL 在执行物理计划的时候,对匹配的表达式采用特定的代码,动态编译,然后运行。如上例子,匹配到 Add 方法:
然后,通过调用,最终调用:
最终实现效果类似如下伪代码:
val a: Int = inputRow.getInt(0)
val b: Int = inputRow.getInt(1)
val result: Int = a + b
resultRow.setInt(0, result)
对于 Spark1.1.0 ,对 SQL 表达式都作了 CG 优化,具体可以参看 codegen 模块。 CG 优化的实现主要还是依靠 scala2.10 的运行时放射机制( runtime reflection )。对于 SQL 查询的 CG 优化,可以简单地用下图来表示:
C : Scala 代码优化
另外, SparkSQL 在使用 Scala 编写代码的时候,尽量避免低效的、容易 GC 的代码;尽管增加了编写代码的难度,但对于用户来说,还是使用统一的接口,没受到使用上的困难。下图是一个 Scala 代码优化的示意图:
类似于关系型数据库, SparkSQL 也是语句也是由 Projection ( a1 , a2 , a3 )、 Data Source ( tableA )、 Filter ( condition )组成,分别对应 sql 查询过程中的 Result 、 Data Source 、 Operation ,也就是说 SQL 语句按 Result-->Data Source-->Operation 的次序来描述的。
当执行 SparkSQL 语句的顺序为:
1. 对读入的 SQL 语句进行解析( Parse ),分辨出 SQL 语句中哪些词是关键词(如 SELECT 、 FROM 、 WHERE ),哪些是表达式、哪些是 Projection 、哪些是 Data Source 等,从而判断 SQL 语句是否规范;
2. 将 SQL 语句和数据库的数据字典(列、表、视图等等)进行绑定( Bind ),如果相关的 Projection 、 Data Source 等都是存在的话,就表示这个 SQL 语句是可以执行的;
3. 一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划( Optimize );
4. 计划执行( Execute ),按 Operation-->Data Source-->Result 的次序来进行的,在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的 SQL 语句,可能直接从数据库的缓冲池中获取返回结果。
SparkSQL 对 SQL 语句的处理和关系型数据库对 SQL 语句的处理采用了类似的方法,首先会将 SQL 语句进行解析( Parse ),然后形成一个 Tree ,在后续的如绑定、优化等处理过程都是对 Tree 的操作,而操作的方法是采用 Rule ,通过模式匹配,对不同类型的节点采用不同的操作。在整个 sql 语句的处理过程中, Tree 和 Rule 相互配合,完成了解析、绑定(在 SparkSQL 中称为 Analysis )、优化、物理计划等过程,最终生成可以执行的物理计划。
l Tree 的相关代码定义在 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
l Logical Plans 、 Expressions 、 Physical Operators 都可以使用 Tree 表示
l Tree 的具体操作是通过 TreeNode 来实现的
Ø SparkSQL 定义了 catalyst.trees 的日志,通过这个日志可以形象的表示出树的结构
Ø TreeNode 可以使用 scala 的集合操作方法(如 foreach, map, flatMap, collect 等)进行操作
Ø 有了 TreeNode ,通过 Tree 中各个 TreeNode 之间的关系,可以对 Tree 进行遍历操作,如使用 transformDown 、 transformUp 将 Rule 应用到给定的树段,然后用结果替代旧的树段;也可以使用 transformChildrenDown 、 transformChildrenUp 对一个给定的节点进行操作,通过迭代将 Rule 应用到该节点以及子节点。
l TreeNode 可以细分成三种类型的 Node :
Ø UnaryNode 一元节点,即只有一个子节点。如 Limit 、 Filter 操作
Ø BinaryNode 二元节点,即有左右子节点的二叉节点。如 Jion 、 Union 操作
Ø LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如 SetCommand
l Rule 的相关代码定义在 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
l Rule 在 SparkSQL 的 Analyzer 、 Optimizer 、 SparkPlan 等各个组件中都有应用到
l Rule 是一个抽象类,具体的 Rule 实现是通过 RuleExecutor 完成
l Rule 通过定义 batch 和 batchs ,可以简便的、模块化地对 Tree 进行 transform 操作
l Rule 通过定义 Once 和 FixedPoint ,可以对 Tree 进行一次操作或多次操作(如对某些 Tree 进行多次迭代操作的时候,达到 FixedPoint 次数迭代或达到前后两次的树结构没变化才停止操作,具体参看 RuleExecutor.apply )
SparkSQL 有两个分支, sqlContext 和 hiveContext , sqlContext 现在只支持 SQL 语法解析器( SQL-92 语法); hiveContext 现在支持 SQL 语法解析器和 hivesql 语法解析器,默认为 hiveSQL 语法解析器,用户可以通过配置切换成 SQL 语法解析器,来运行 hiveSQL 不支持的语法,
sqlContext 总的一个过程如下图所示:
1. SQL 语句经过 SqlParse 解析成 UnresolvedLogicalPlan ;
2. 使用 analyzer 结合数据数据字典( catalog )进行绑定,生成 resolvedLogicalPlan ;
3. 使用 optimizer 对 resolvedLogicalPlan 进行优化,生成 optimizedLogicalPlan ;
4. 使用 SparkPlan 将 LogicalPlan 转换成 PhysicalPlan ;
5. 使用 prepareForExecution() 将 PhysicalPlan 转换成可执行物理计划;
6. 使用 execute() 执行可执行物理计划;
7. 生成 SchemaRDD 。
在整个运行过程中涉及到多个 SparkSQL 的组件,如 SqlParse 、 analyzer 、 optimizer 、 SparkPlan 等等
hiveContext 总的一个过程如下图所示:
1. SQL 语句经过 HiveQl.parseSql 解析成 Unresolved LogicalPlan ,在这个解析过程中对 hiveql 语句使用 getAst() 获取 AST 树,然后再进行解析;
2. 使用 analyzer 结合数据 hive 源数据 Metastore (新的 catalog )进行绑定,生成 resolved LogicalPlan ;
3. 使用 optimizer 对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan ,优化前使用了 ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))) 进行预处理;
4. 使用 hivePlanner 将 LogicalPlan 转换成 PhysicalPlan ;
5. 使用 prepareForExecution() 将 PhysicalPlan 转换成可执行物理计划;
6. 使用 execute() 执行可执行物理计划;
7. 执行后,使用 map(_.copy) 将结果导入 SchemaRDD 。
SparkSQL1.1 总体上由四个模块组成: core 、 catalyst 、 hive 、 hive-Thriftserver :
l core 处理数据的输入输出,从不同的数据源获取数据( RDD 、 Parquet 、 json 等),将查询结果输出成 schemaRDD ;
l catalyst 处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
l hive 对 hive 数据的处理
l hive-ThriftServer 提供 CLI 和 JDBC/ODBC 接口
在这四个模块中, catalyst 处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是 catalyst 的一个设计图:
其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看, catalyst 主要的实现组件有:
l sqlParse ,完成 sql 语句的语法解析功能,目前只提供了一个简单的 sql 解析器;
l Analyzer ,主要完成绑定工作,将不同来源的 Unresolved LogicalPlan 和数据元数据(如 hive metastore 、 Schema catalog )进行绑定,生成 resolved LogicalPlan ;
l optimizer 对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan ;
l Planner 将 LogicalPlan 转换成 PhysicalPlan ;
l CostModel ,主要根据过去的性能统计数据,选择最佳的物理执行计划
这些组件的基本实现方法:
l 先将 sql 语句通过解析生成 Tree ,然后在不同阶段使用不同的 Rule 应用到 Tree 上,通过转换完成各个组件的功能。
l Analyzer 使用 Analysis Rules ,配合数据元数据(如 hive metastore 、 Schema catalog ),完善 Unresolved LogicalPlan 的属性而转换成 resolved LogicalPlan ;
l optimizer 使用 Optimization Rules ,对 resolved LogicalPlan 进行合并、列裁剪、过滤器下推等优化作业而转换成 optimized LogicalPlan ;
l Planner 使用 Planning Strategies ,对 optimized LogicalPlan
CLI ( Command-Line Interface ,命令行界面)是指可在用户提示符下键入可执行指令的界面,它通常不支持鼠标,用户通过键盘输入指令,计算机接收到指令后予以执行。 Spark CLI 指的是使用命令界面直接输入 SQL 命令,然后发送到 Spark 集群进行执行,在界面中显示运行过程和最终的结果。
Spark1.1 相较于 Spark1.0 最大的差别就在于 Spark1.1 增加了 Spark SQL CLI 和 ThriftServer ,使得 Hive 用户还有用惯了命令行的 RDBMS 数据库管理员较容易地上手,真正意义上进入了 SQL 时代。
【注】 Spark CLI 和 Spark Thrift Server 实验环境为第二课《 Spark 编译与部署(下) --Spark 编译安装》所搭建
l 主机操作系统: Windows 64 位,双核 4 线程,主频 2.2G , 10G 内存
l 虚拟软件: VMware® Workstation 9.0.0 build-812388
l 虚拟机操作系统: CentOS 64 位,单核
l 虚拟机运行环境:
Ø JDK : 1.7.0_55 64 位
Ø Hadoop : 2.2.0 (需要编译为 64 位)
Ø Scala : 2.11.4
Ø Spark : 1.1.0 (需要编译)
Ø Hive : 0.13.1
集群包含三个节点,节点之间可以免密码 SSH 访问,节点 IP 地址和主机名分布如下:
在运行 Spark SQL CLI 中需要使用到 Hive Metastore ,故需要在 Spark 中添加其 uris 。具体方法是在 SPARK_HOME/conf 目录下创建 hive-site.xml 文件,然后在该配置文件中,添加 hive.metastore.uris 属性,具体如下:
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop1:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
</configuration>
在使用 Spark SQL CLI 之前需要启动 Hive Metastore (如果数据存放在 HDFS 文件系统,还需要启动 Hadoop 的 HDFS ),使用如下命令可以使 Hive Metastore 启动后运行在后台,可以通过 jobs 查询:
$nohup hive --service metastore > metastore.log 2>&1 &
通过如下命令启动 Spark 集群和 Spark SQL CLI :
$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh
$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g
在集群监控页面可以看到启动了 SparkSQL 应用程序:
这时就可以使用 HQL 语句对 Hive 数据进行查询,另外可以使用 COMMAND ,如使用 set 进行设置参数:默认情况下, SparkSQL Shuffle 的时候是 200 个 partition ,可以使用如下命令修改该参数:
SET spark.sql.shuffle.partitions=20;
运行同一个查询语句,参数改变后, Task ( partition )的数量就由 200 变成了 20 。
通过 bin/spark-sql --help 可以查看 CLI 命令参数:
其中 [options] 是 CLI 启动一个 SparkSQL 应用程序的参数,如果不设置 --master 的话,将在启动 spark-sql 的机器以 local 方式运行,只能通过 http:// 机器名 :4040 进行监控;这部分参数,可以参照 Spark1.0.0 应用程序部署工具 spark-submit 的参数。
[cli option] 是 CLI 的参数,通过这些参数 CLI 可以直接运行 SQL 文件、进入命令行运行 SQL 命令等等,类似以前的 Shark 的用法。需要注意的是 CLI 不是使用 JDBC 连接,所以不能连接到 ThriftServer ;但可以配置 conf/hive-site.xml 连接到 Hive 的 Metastore ,然后对 Hive 数据进行查询。
第一步 设置任务个数,在这里修改为 20 个
spark-sql>SET spark.sql.shuffle.partitions=20;
第二步 运行 SQL 语句
spark-sql>use hive;
spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
第三步 查看运行结果
第一步 执行 SQL 语句
spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
第二步 执行结果
使用 CLI 执行结果如下:
spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear;
使用 CLI 执行结果如下:
ThriftServer 是一个 JDBC/ODBC 接口,用户可以通过 JDBC/ODBC 连接 ThriftServer 来访问 SparkSQL 的数据。 ThriftServer 在启动的时候,会启动了一个 SparkSQL 的应用程序,而通过 JDBC/ODBC 连接进来的客户端共同分享这个 SparkSQL 应用程序的资源,也就是说不同的用户之间可以共享数据; ThriftServer 启动时还开启一个侦听器,等待 JDBC 客户端的连接和提交查询。所以,在配置 ThriftServer 的时候,至少要配置 ThriftServer 的主机名和端口,如果要使用 Hive 数据的话,还要提供 Hive Metastore 的 uris 。
【注】 Spark CLI 和 Spark Thrift Server 实验环境为第二课《 Spark 编译与部署(下) --Spark 编译安装》所搭建
第一步 创建 hive-site.xml 配置文件
在 $SPARK_HOME/conf 目录下修改 hive-site.xml 配置文件(如果在 Spark SQL CLI 中已经添加,可以省略):
$cd /app/hadoop/spark-1.1.0/conf
$sudo vi hive-site.xml
第二步 修改配置文件
设置 hadoop1 为 Metastore 服务器, hadoop2 为 Thrift Server 服务器,配置内容如下:
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop1:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>hive.server2.thrift.min.worker.threads</name>
<value>5</value>
<description>Minimum number of Thrift worker threads</description>
</property>
<property>
<name>hive.server2.thrift.max.worker.threads</name>
<value>500</value>
<description>Maximum number of Thrift worker threads</description>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
<description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop2</value>
<description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>
</property>
</configuration>
在 hadoop1 节点中,在后台启动 Hive Metastore (如果数据存放在 HDFS 文件系统,还需要启动 Hadoop 的 HDFS ):
$nohup hive --service metastore > metastore.log 2>&1 &
在 hadoop1 节点启动 Spark 集群
$cd /app/hadoop/spark-1.1.0/sbin
$./start-all.sh
在 hadoop2 节点上进入 SPARK_HOME/sbin 目录,使用如下命令启动 Thrift Server
$cd /app/hadoop/spark-1.1.0/sbin
$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g
注意 : Thrift Server 需要按照配置在 hadoop2 启动!
在集群监控页面可以看到启动了 SparkSQL 应用程序:
使用 sbin/start-thriftserver.sh --help 可以查看 ThriftServer 的命令参数:
$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]
Thrift server options: Use value for given property
其中 [options] 是 Thrift Server 启动一个 SparkSQL 应用程序的参数,如果不设置 --master 的话,将在启动 Thrift Server 的机器以 local 方式运行,只能通过 http:// 机器名 :4040 进行监控;这部分参数,可以参照 Spark1.0.0 应用程序部署工具 spark-submit 的参数。在集群中提供 Thrift Server 的话,一定要配置 master 、 executor-memory 等参数。
[thrift server options] 是 Thrift Server 的参数,可以使用 -dproperty=value 的格式来定义;在实际应用上,因为参数比较多,通常使用 conf/hive-site.xml 配置。
可以在任意节点启动 bin/beeline ,用 !connect jdbc:hive2://hadoop2:10000 连接 ThriftServer ,因为没有采用权限管理,所以用户名用运行 bin/beeline 的用户 hadoop ,密码为空:
$cd /app/hadoop/spark-1.1.0/bin
$./beeline
beeline>!connect jdbc:hive2://hadoop2:10000
第一步 显示 hive 数据库所有表
beeline>show database;
beeline>use hive;
beeline>show tables;
第二步 创建表 testThrift
beeline>create table testThrift(field1 String , field2 Int);
beeline>show tables;
第三步 把 tbStockDetail 表中金额大于 3000 插入到 testThrift 表中
beeline>insert into table testThrift select ordernumber,amount from tbStockDetail where amount>3000;
beeline>select * from testThrift;
第四步 重新创建 testThrift 表中,把年度最大订单插入该表中
beeline>drop table testThrift;
beeline>create table testThrift (field1 String , field2 Int);
beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear;
beeline>select * from testThrift;
spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
Stage 监控页面:
查看 Details for Stage 28
spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;
Stage 监控页面:
在其第一个 Task 中,从本地读入数据
在后面的 Task 是从内存中获取数据
第一步 缓存数据
beeline>cache table tbStock;
beeline>select count(*) from tbStock;
第二步 运行 4.2.4 中的“计算所有订单月销售额前十名”
beeline>select count(*) from tbStock;
本次计算划给 11.233 秒,查看 webUI ,数据已经缓存,缓存率为 100% :
第三步 在另外节点再次运行
在 hadoop3 节点启动 bin/beeline ,用 !connect jdbc:hive2://hadoop2:10000 连接 ThriftServer ,然后直接运行对 tbStock 计数(注意没有进行数据库的切换):
用时 0.343 秒,再查看 webUI 中的 stage :
Locality Level 是 PROCESS ,显然是使用了缓存表。
从上可以看出, ThriftServer 可以连接多个 JDBC/ODBC 客户端,并相互之间可以共享数据。顺便提一句, ThriftServer 启动后处于监听状态,用户可以使用 ctrl+c 退出 ThriftServer ;而 beeline 的退出使用 !q 命令。
有了 ThriftServer ,开发人员可以非常方便的使用 JDBC/ODBC 来访问 SparkSQL 。下面是一个 scala 代码,查询表 tbStockDetail ,返回 amount>3000 的单据号和交易金额:
第一步 在 IDEA 创建 class6 包和类 JDBCofSparkSQL
参见《 Spark 编程模型(下) --IDEA 搭建及实战》在 IDEA 中创建 class6 包并新建类 JDBCofSparkSQL 。该类中查询 tbStockDetail 金额大于 3000 的订单:
package class6
import java.sql.DriverManager
object JDBCofSparkSQL {
def main(args: Array[String]) {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")
try {
val statement = conn.createStatement
val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail where amount>3000")
while (rs.next) {
val ordernumber = rs.getString("ordernumber")
val amount = rs.getString("amount")
println("ordernumber = %s, amount = %s".format(ordernumber, amount))
} catch {
case e: Exception => e.printStackTrace
conn.close
第二步 查看运行结果
在 IDEA 中可以观察到,在运行日志窗口中没有运行过程的日志,只显示查询结果
第三步 查看监控结果
从 Spark 监控界面中观察到,该 Job 有一个编号为 6 的 Stage ,该 Stage 有 2 个 Task ,分别运行在 hadoop1 和 hadoop2 节点,获取数据为 NODE_LOCAL 方式。
在 hadoop2 中观察 Thrift Server 运行日志如下:
|
|
月球上的南瓜 · DiffDefense:通过扩散模型防御对抗性攻击,arXiv - CS - Computer Vision and Pattern Recognition - X-MOL 2 年前 |
|
|
很酷的鸡蛋 · JSP 开发环境搭建 | 菜鸟教程 2 年前 |