-
可以通过create 语句控制字段个数和顺序 , 决定后面insert into的select语句中的字段顺序受影响,所以insert into的select只能跟create定义的顺序一样,不能改别名
例如create table mysink (id varchar ,name varchar) with(...)
-
不可以通过insert 或select语句控制输出个数
-
insert into mysink (id ,name ) select * from source是错误的
-
insert into mysink select id from source是错误的(select中的字段一定要跟create语句中一致--->如insert into mysink select id,name from source)
-
可以在create语句中使用udf
-
比如可以解决,source没有time字段,无法定义watremark,可以通过udf解析得到time字段
GetHeartTime是udf,返回timestamp
CREATE TABLE sourceTable (
request_uri STRING,
ts as GetHeartTime(request_uri),
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
'connector.type' = 'socket',
'connector.host' = 'localhost',
'connector.port' = '21'
- flinksql 1.10.0想要通过sql创建sql函数时 , 如果时tablefunction,需要重写getResultTypepublic
//flinksql 1.10.0
public class ParseUriRow extends TableFunction<Row> {
public void eval(String data) {
Row row = new Row(fnames.length);
collect(row);
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(
Types.STRING
public class Split2Tuple extends TableFunction<Tuple2<String, String>> {
public void eval(String str) {
Tuple2 tuple = new Tuple2();
//根据业务
String[] split = str.split(",");
tuple.setField(split[0],0);
tuple.setField(split[1],1);
collect(tuple);//TableFunction可以collect多次(一行转多行)
@Override
public TypeInformation<Tuple2<String, String>> getResultType() {
return Types.TUPLE(Types.STRING,Types.STRING);
//在1.11版本中做修改,使用注解或者重写getTypeInference方法
public class Split2Row extends TableFunction<Row> {
* Row形式返回不限制,但需要重写getResultType
//@FunctionHint(output = @DataTypeHint("ROW<s STRING, i STRING>"))
public void eval(String str) {
//根据业务
Row row = new Row(2);
String[] split = str.split(",");
row.setField(0,split[0]);
row.setField(1,split[1]);
collect(row);//TableFunction可以collect多次(一行转多行)
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
// 指定输入参数的类型,必要时参数会被隐式转换
.typedArguments(STRING())
// specify a strategy for the result data type of the function
.outputTypeStrategy(callContext -> {
return Optional.of(DataTypes.ROW(DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())));
.build();
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
" game_id varchar, " +
" PRIMARY KEY (id) NOT ENFORCED " +
" ) " +
" with ( " +
"'connector' = 'jdbc', " +
" 'url' = 'jdbc:mysql://xxxxx:3306/flinksql?useSSL=false' , " +
" 'username' = 'root' , " +
" 'password' = 'root', " +
" 'table-name' = 'mysqlsink' , " +
" 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
" 'sink.buffer-flush.interval' = '2s', " +
" 'sink.buffer-flush.max-rows' = '300' " +
" )");
tableEnvironment.executeSql("insert overwrite mysqlsink select a,cast(b as varchar) b from mySource");
月初的时候,Flink 终于发布 1.11.0 版本, CDC 的功能还是比较期待的(虽然比预期差很多)当然是升级一波了最新的代码已经上传到 GitHub :https://github.com/springMoon/sqlSubmit跑 sqlSubmit 的代码,随便来个 kafka to kafka 的sql在执行这句的时候:env.execute(Common.jobName)...
Flink Table Sink 的三种模式本质上是如何监控结果表并产生 changelog,这可以应用于所有需要将表转为流的场景,包括同一个 Flink 应用的不同表间的联动。三种模式中 Append 模式只支持表的INSERT,最为简单;Upsert 模式依赖业务主键提供INSERTUPDATE和DELETE全部三类变更,比较实用;Retract 模式同样支持三类变更且不要求业务主键,但会将UPDATE翻译为旧数据的撤回和新数据的累加,实现上比较复杂。
1.抽象出来一个动态表,并未进行存储,是Flink支持流数据的tableAPI和sql的核心概念,随时间变化的,查询动态表会生成一个连续的查询,结果是一个动态表2.hive进入命令行需要先启动元数据服务,在查数据的时候数据是不变的3.除非是有界流,否则连续的查询是不会停止的4.将流转化(定义)成动态表,在动态表上计算一个连续的查询,生成一个新的动态表,最后转换成流,连续查询从不停止,会根据输入表上的更新对结果表进行更新。......
Flink SQL的Sink表只支持全字段数据插入,不支持指定字段数据插入和更新操作,那后面结果表增加字段如何处理?
首先我们需要给Sink指定主键,如果输出存储是声明了主键(primary key)的数据库(例如,RDS/ES/HBASE等),数据流的输出结果有以下2种情况:
如果根据主键查询的数据在数据库中不存在,则会将该数据插入数据库。
如果根据主键查询的数据在数据库中存在,则会根据主键更新数据。
这里跟mysql数据库的for update效果一样,所以我们可以使用这个特性进行分批次插入。解决了
IS NULL 、 IS NOT NULL
--非空判断
value1 IS DISTINCT FROM value2、value1 IS NOT DISTINCT FROM value2、
--不同于
value1 BETWEEN [ ASYMMETRIC | SYMMETR
Insert into select请慎用。这天xxx接到一个需求,需要将表A的数据迁移到表B中去做一个备份。本想通过程序先查询查出来然后批量插入。但xxx觉得这样有点慢,需要耗费大量的网络I/O,决定采取别的方法进行实现。通过在Baidu的海洋里遨游,他发现了可以使用insert into select实现,这样就可以避免使用网络I/O,直接使用SQL依靠数据库I/O完成,这样简直不要太棒了。然后他就被开除了。
事故发生的经过。
由于数据数据库中order_today数据量过大,当时好像有700W了
Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。
Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域
Table API & SQL
Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet A...
Flink 1.9 版本开源了很多 Blink 方面的功能,尤其是在 SQL 方面,这使得我们在开发 Flink 实时任务变得更加方便。目前 Blink SQL 支持了 Create Table 功能,以及维表的功能。我们的实时任务整体流程为,读取Kafka的数据,然后去关联 HBase 维表的数据,最后在输出到 Kafka 中,虽然整体流程跑通,但是其中也遇到了很多坑,这里记录一下,和大家...
文章目录碎碎念1. Flink的特点1.1 Flink vs Spark Streaming2. Java快速使用3. Flink部署模式3.1 Standalone模式3.2 yarn模式1. Sesstion Cluster模式2. Per Job Cluster 模式4. Flink运行架构4.1 Flink运行时的组件作业管理器(JobManager)资源管理器(ResourceManager)任务管理器(TaskManager)分发器(Dispatcher)4.2 作业提交流程4.3 作业调度原理
导读: 最近在看 Flink 源码的时候发现到一段实用的代码,该代码实现了 java 动态编译以及生成 jar 文件。将其进行改进后可以应用到我们的平台上,实现在平台页面上编写 java 代码语句,提交后由后台进行编译和打成 Jar 包再上传到指定的文件存储系统,从而代替之前在本地自己手动打 UDF 包的方式。下面我将对这段代码做一些简单分析,希望对各位有所帮助。核心代码public class ...
public class RecordSeclectId
implements KeySelector<Record, String> {
private static final long serialVersionUID = 4780234853172462378L;
@Override
public String getKey(Record value) throw...