大家好,请教一个问题
我有一条进行
session window
的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
一直是 No Watermark。 暂时找不到排查问题的思路。
Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了
EventTime mode 模式
,
Blink Planner。
create
table
test
(
user_id
varchar
,
action
varchar
,
event_time
TIMESTAMP
(
3
)
,
WATERMARK
FOR
event_time
AS
event_time
-
INTERVAL
'10'
SECOND
)
with
(
.
.
.
)
;
insert
into
console
select
user_id
,
f_get_str
(
bind_id
)
as
id_list
select
action
as
bind_id
,
user_id
,
event_time
SELECT
user_id
,
action
,
PROCTIME
(
)
as
proc_time
,
event_time
FROM
test
where
user_id
is
not
null
and
user_id
<>
''
and
CHARACTER_LENGTH
(
user_id
)
=
24
group
by
SESSION
(
event_time
,
INTERVAL
'10'
SECOND
)
,
user_id
关于这个问题我进行了一些 debug,发现了 watermark 对应的一个
physical relnode
是
StreamExecWatermarkAssigner
在
translateToPlanInternal
中生成了如下一个 class 代码:
public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator {
public WatermarkGenerator$2(Object[] references) throws Exception { }
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { }
@Override
public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12);
field$3 = null;
if (!isNull$3) {
field$3 = row.getTimestamp(12, 3);
isNull$4 = isNull$3 || false; result$5 = null;
if (!isNull$4) {
result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
if (isNull$4) {
return null;
} else {
return result$5.getMillisecond();
@Override
public void close() throws Exception { } }
其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());
确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
的定义获取的watermark。
在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗?
这里倒是有一个比较hack的方法:
将生成的类放在一个java文件之中,然后修改改下GeneratedClass
下的newInstance
方法,如果classname ==“WatermarkGenerator$2”
则将刚才的类则返回 new WatermarkGenerator$2
这个类。
我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。
问题的原因定位到了。
由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
发现在 WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)
这个 op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 table.exec.source.idle-timeout = 10s
参数即可。
当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
java中两个一维数组相加 java一维数组和二维数组
一维数组1.定义(类型名[] 数组名 = new 类型名[数组长度])public class Hee {
public static void main(String[] args){
int[] arr = new int[5];//这里定义了一个长度为5的一维数组
}2.调用(数组名[索引])import java.util.Scanner;
public class Hee