Flink Table API & SQL
一、整体介绍
Table API & SQL就是批流统一的上层处理API
1、Flink Table API & SQL的特点:
Flink将Table API & SQL作为未来的核心API,因为其具有一些非常重要的特点:
- 声明式:用户只关心做什么,不用关心怎么做
- 高性能:支持查询优化,可以获取更好的执行性能
- 流批统一:相同的统计逻辑,既可以流模式运行,也可以批模式运行
- 标准稳定:语义遵循SQL标准,不易变动
- 易理解:语义明确,所见及所得
2、 Table AP I& SQL发展历程
架构升级
Flink 1.9 之前,流处理和批处理有各自独立的api ,而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好
Flink1.9之后,有两个查询处理器Flink Query Processor和Blink Query Processor。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的
查询处理器的选择
Blink Query Processor查询处理器实现流批作业接口的统一,底层的 API 都是Transformation
API稳定性
Flink的上层API问题还很多,主要是类型转换、时间属性等,距离稳定商用还差很远,如果项目一定要选择Flink,建议使用相对稳定的DataStream API
性能对比
目前FlinkSQL性能不如SparkSQL,未来FlinkSQL可能会越来越好
3、动态表(Dynamic Table)
动态表是Flink对流数据的 Table API & SQL的核心概念,表示Table是不断变化的。
查询一个动态表会产生持续查询(Continuous Query),,连续查询永远不会终止,并会生成另一个动态表
流式表查询的处理过程:
- 流被转换为动态表
- 对动态表计算连续查询,生成新的动态表
- 生成的动态表被转换回流
通过建立动态表和连续查询来实现在无界流中的SQL操作 。在Continuous上面有个state,表示查询出来的结果会存储在State中,接下来Flink最终还是使用流来进行处理。
所以,可以理解为Flink的Table API和SQL,是一个 逻辑模型 ,通过该逻辑模型可以让我们的数据处理变得更加简单。
二、 API调用
1、创建TableEnvironment
TableEnvironment是flink中集成Table API & SQL的核心概念。所有对表的操作都基于 TableEnvironment
- 注册 Catalog
- 在 Catalog 中注册表
- 执行 SQL 查询
- 注册用户自定义函数(UDF)
val tableEnv = StreamTableEnvironment.create(env, settings)
2、创建表
TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表,表可以是常规的,也可以是虚拟的(视图,View)
常规表 (Table):一般可以用来描述 外部数据 ,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。
视图 :可以从现有的表中创建,通常是table API或者SQL查询的一个结果。
tableEnv
.connect(...) // 定义表的数据来源,和外部系统建立连接
.withFormat(...) // 定义数据格式化方法
.withSchema(...) // 定义表结构
.createTemporaryTable("MyTable") // 创建临时表
3、表的查询
1、TableAPI
// 简单转换
Table inputTable = tableEnv.from("inputTable");
Table resultTable = inputTable.select("id, temperature")
.filter("id === 'sensor_6'");
// 聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temperature.avg as avgTemp");
2、SQL查询
Flink的SQL查询,基于实现了SQL 标准的 Apache Calcite
在Flink中,用常规字符串来定义SQL查询语句。
SQL 查询的结果,是一个新的 Table 。
// 简单转换
tableEnv.sqlQuery("select id, temperature from inputTable where id = 'senosr_6'");
// 聚合统计
tableEnv.sqlQuery("select id, count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");
4、将DataStream 转换成表
val sensorTable: Table = tableEnv.fromDataStream(dataStream)
5、输出表
输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中
resultTable.insertInto("outputTable")
6、更新模式:
与外部系统交换的消息类型,由更新模式( Update Mode) 指定
1、追加模式:inAppendMode()
动态表和外部连接器只交换插入(Insert)消息
2、撤回模式:inRetractMode()
表和外部连接器交换添加(Add)和撤回(Retract)消息
- 插入(Insert)会被编码为添加消息;
- 删除(Delete)则编码为撤回消息;
- 更新( Update)编码为上一条的撤回(Retract) 和下一条的添加(Add)消息
3、更新插入模式:.inUpsertMode()
动态表和外部连接器交换Upsert和Delete消息,需要一个唯一的key,通过这个key可以传递更新消息
- 更新和插入都被编码为Upsert消息
- 删除编码为Delete消息
tableEnv.connect(new FileSystem().path("output.txt")) // 定义到文件系统的连接
.withFormat(new Csv()) // 定义以csv格式进行数据格式化
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE()))
.createTemporaryTable("outputTable"); //创建临时表
resultTable.insertInto("outputTable");
6、将Table转换成DataStream
1、追加模式(Append Mode)
用于表只会被插入的场景
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
2、撤回模式(Retract Mode)
- 用于任何场景
- 得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据( Insert),还是被删除的数据( Delete)
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(aggTable, Row.class);
三、时间特性
1、处理时间( . proctime)
基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性。
它既不需要提取时间戳,也不需要生成 watermark
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
) WITH (
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗
2、事件时间( .rowtime )
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明user_action_time作为事件时间属性,并允许5S的延迟
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
四、窗口
1、Group Windows
Group Windows 定义在 SQL 查询的 Group By 子句中
①TUMBLE(time_attr, interval):定义一个滚动窗口,第一个参数是时间字段,第二个参数是 窗口长度 。
②HOP(time_attr, interval, interval):定义一个滑动窗口,第一个参数是时间字段,第二个参数是 窗口滑动步长 ,第三个是 窗口长度 。
③SESSION(time_attr, interval):定义一个会话窗口,第一个参数是时间字段,第二个参数是 窗口间隔 Gap。
还有一些辅助函数,可以用来选择Group Window的开始和结束时间戳,以及时间属性。
//滑动和会话窗口是类似的(HOP_*,SESSION_*)
TUMBLE_START(time_attr, interval)
TUMBLE_END(time_attr, interval)
TUMBLE_ROWTIME(time_attr, interval)
TUMBLE_PROCTIME(time_attr, interval)
select userId
,count(orderId) as orderCount
,max(money) as maxMoney
,min(money) as minMoney
from t_order
group by userId
tumble(createTime, INTERVAL '10' SECOND)
2、Over Windows
- 用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
- 目前仅支持在当前行范围之前的窗口
- ORDER BY 必须在单一的时间属性上指定
SELECT COUNT(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
// 也可以做多个聚合
SELECT COUNT(amount) OVER w, SUM(amount) OVER w