首发于 flink系列

Flink Table API & SQL

一、整体介绍

Table API & SQL就是批流统一的上层处理API

1、Flink Table API & SQL的特点:

Flink将Table API & SQL作为未来的核心API,因为其具有一些非常重要的特点:

  • 声明式:用户只关心做什么,不用关心怎么做
  • 高性能:支持查询优化,可以获取更好的执行性能
  • 流批统一:相同的统计逻辑,既可以流模式运行,也可以批模式运行
  • 标准稳定:语义遵循SQL标准,不易变动
  • 易理解:语义明确,所见及所得

2、 Table AP I& SQL发展历程

架构升级

Flink 1.9 之前,流处理和批处理有各自独立的api ,而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好

Flink1.9之后,有两个查询处理器Flink Query Processor和Blink Query Processor。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的

查询处理器的选择

Blink Query Processor查询处理器实现流批作业接口的统一,底层的 API 都是Transformation

API稳定性

Flink的上层API问题还很多,主要是类型转换、时间属性等,距离稳定商用还差很远,如果项目一定要选择Flink,建议使用相对稳定的DataStream API

性能对比

目前FlinkSQL性能不如SparkSQL,未来FlinkSQL可能会越来越好

3、动态表(Dynamic Table)

动态表是Flink对流数据的 Table API & SQL的核心概念,表示Table是不断变化的。

查询一个动态表会产生持续查询(Continuous Query),,连续查询永远不会终止,并会生成另一个动态表

流式表查询的处理过程

流式表查询的处理过程:

  • 流被转换为动态表
  • 对动态表计算连续查询,生成新的动态表
  • 生成的动态表被转换回流

通过建立动态表和连续查询来实现在无界流中的SQL操作 。在Continuous上面有个state,表示查询出来的结果会存储在State中,接下来Flink最终还是使用流来进行处理。

所以,可以理解为Flink的Table API和SQL,是一个 逻辑模型 ,通过该逻辑模型可以让我们的数据处理变得更加简单。

二、 API调用

1、创建TableEnvironment

TableEnvironment是flink中集成Table API & SQL的核心概念。所有对表的操作都基于 TableEnvironment

  • 注册 Catalog
  • 在 Catalog 中注册表
  • 执行 SQL 查询
  • 注册用户自定义函数(UDF)
val tableEnv = StreamTableEnvironment.create(env, settings)

2、创建表

TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表,表可以是常规的,也可以是虚拟的(视图,View)

常规表 (Table):一般可以用来描述 外部数据 ,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。

视图 :可以从现有的表中创建,通常是table API或者SQL查询的一个结果。

   tableEnv
  .connect(...)    // 定义表的数据来源,和外部系统建立连接   
  .withFormat(...)    // 定义数据格式化方法   
  .withSchema(...)    // 定义表结构   
  .createTemporaryTable("MyTable")    // 创建临时表

3、表的查询

1、TableAPI

        // 简单转换
        Table inputTable = tableEnv.from("inputTable");
        Table resultTable = inputTable.select("id, temperature")
                .filter("id === 'sensor_6'");
        // 聚合统计
        Table aggTable = inputTable.groupBy("id")
                .select("id, id.count as count, temperature.avg as avgTemp");

2、SQL查询

Flink的SQL查询,基于实现了SQL 标准的 Apache Calcite

在Flink中,用常规字符串来定义SQL查询语句。

SQL 查询的结果,是一个新的 Table

 // 简单转换
tableEnv.sqlQuery("select id, temperature from inputTable where id = 'senosr_6'");
// 聚合统计
tableEnv.sqlQuery("select id, count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");

4、将DataStream 转换成表

val sensorTable: Table = tableEnv.fromDataStream(dataStream)

5、输出表

输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中

resultTable.insertInto("outputTable")    

6、更新模式:

与外部系统交换的消息类型,由更新模式( Update Mode) 指定

1、追加模式:inAppendMode()

动态表和外部连接器只交换插入(Insert)消息

2、撤回模式:inRetractMode()

表和外部连接器交换添加(Add)和撤回(Retract)消息

  • 插入(Insert)会被编码为添加消息;
  • 删除(Delete)则编码为撤回消息;
  • 更新( Update)编码为上一条的撤回(Retract) 和下一条的添加(Add)消息

3、更新插入模式:.inUpsertMode()

动态表和外部连接器交换Upsert和Delete消息,需要一个唯一的key,通过这个key可以传递更新消息

  • 更新和插入都被编码为Upsert消息
  • 删除编码为Delete消息
      tableEnv.connect(new FileSystem().path("output.txt")) // 定义到文件系统的连接
                .withFormat(new Csv()) // 定义以csv格式进行数据格式化
                .withSchema(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .field("temperature",DataTypes.DOUBLE()))
                .createTemporaryTable("outputTable"); //创建临时表
        resultTable.insertInto("outputTable");

6、将Table转换成DataStream

1、追加模式(Append Mode)

用于表只会被插入的场景

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

2、撤回模式(Retract Mode)

  • 用于任何场景
  • 得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据( Insert),还是被删除的数据( Delete)
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(aggTable, Row.class);

三、时间特性

1、处理时间( . proctime)

基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性。

它既不需要提取时间戳,也不需要生成 watermark

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
) WITH (
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗

2、事件时间( .rowtime

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明user_action_time作为事件时间属性,并允许5S的延迟  
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

四、窗口

1、Group Windows

Group Windows 定义在 SQL 查询的 Group By 子句中

①TUMBLE(time_attr, interval):定义一个滚动窗口,第一个参数是时间字段,第二个参数是 窗口长度

②HOP(time_attr, interval, interval):定义一个滑动窗口,第一个参数是时间字段,第二个参数是 窗口滑动步长 ,第三个是 窗口长度

③SESSION(time_attr, interval):定义一个会话窗口,第一个参数是时间字段,第二个参数是 窗口间隔 Gap。

还有一些辅助函数,可以用来选择Group Window的开始和结束时间戳,以及时间属性。

//滑动和会话窗口是类似的(HOP_*SESSION_*
TUMBLE_START(time_attr, interval)
TUMBLE_END(time_attr, interval)
TUMBLE_ROWTIME(time_attr, interval)
TUMBLE_PROCTIME(time_attr, interval)
select userId
,count(orderId) as orderCount
,max(money) as maxMoney
,min(money) as minMoney
from t_order
group by userId
tumble(createTime, INTERVAL '10' SECOND)

2、Over Windows

  • 用 Over 做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
  • 目前仅支持在当前行范围之前的窗口
  • ORDER BY 必须在单一的时间属性上指定
SELECT COUNT(amount) OVER (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
// 也可以做多个聚合
SELECT COUNT(amount) OVER w, SUM(amount) OVER w