相关文章推荐
火爆的围巾  ·  iOS AFN Error ...·  1 年前    · 
阳刚的豆腐  ·  how to set maxlength ...·  1 年前    · 
绅士的匕首  ·  Java8 Stream ...·  1 年前    · 

最近有一个朋友问我,Flink使用窗口计算的时候,如果设置了允许数据晚到时间,这个时候又来了一条属于前面窗口的数据,但是前面窗口的计算结果已经写到mysql了,怎么更正之前的结果,而不是在写入一条数据呢?今天这篇文章就来介绍一下怎么使用Flink的窗口函数更新之前计算的不完全的(不准确的结果)

对于晚到的数据,一般有几种常用的处理方式:

1,直接丢弃掉(这个也是窗口的默认做法,也就是说一个迟到的元素不会创建一个新的窗口)

2,用测流输出的方式,拿到晚到的元素,可以进行,相关的逻辑处理,或者保存起来.

3,更新之前的窗口计算结果,也就是今天要介绍的做法.

由于存在晚到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。

如果我们要求一个operator支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到侧输出流。

window operator API提供了方法来明确声明我们要等待迟到元素。当使用event-time window,我们可以指定一个时间段叫做allowed lateness。window operator如果设置了allowed lateness,这个window operator在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内(allowed lateness设置的)保留所有的元素。

当迟到元素在allowed lateness时间内到达时,这个迟到元素会被实时处理并发送到触发器(trigger)。当水位线没过了窗口结束时间+allowed lateness时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。

首先来看一下具体的代码实现:</

最近有一个朋友问我,Flink使用窗口计算的时候,如果设置了允许数据晚到时间,这个时候又来了一条属于前面窗口的数据,但是前面窗口的计算结果已经写到mysql了,怎么更正之前的结果,而不是在写入一条数据呢?今天这篇文章就来介绍一下怎么使用Flink的窗口函数更新之前计算的不完全的(不准确的结果)对于晚到的数据,一般有几种常用的处理方式:1,直接丢弃掉(这个也是窗口的默认做法,也就是说一个迟到的元素不会创建一个新的窗口)2,用测流输出的方式,拿到晚到的元素,可以进行,相关的逻辑处理,或者保存起.
Flink 的一个重要特性就是有状态计算(stateful processing)。 Flink 提供了简单易用的 API 来存储和获取状态。但是,我们还是要理解 API 背后的原理,才能更好的使用。本文分为 3 个部分: Flink 支持的三种 State Backend state 文件格式 state 持久化及故障恢复 我们首先看下 state 究竟存储在哪里。 State 存储方式 Fli...
import org.apache. flink .api.scala.createTypeInformation import org.apache. flink .streaming.api.TimeCharacteristic import org.apache. flink .streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache. flink .streaming.api.scala.function.ProcessAllW
对于,我们说聚合统计的结果 数据 来说,使用插入肯定是不行的.因为涉及到,聚合统计,比如 计算 数据 的个数,获取平均数这样,这种的,得出的结果,需要首先把 数据 撤回,然后再生成 一个新的 数据 . 这里在 flink 中就有个模式,叫做 更新 模式 ,可以看到: 这里有3中模式, 1.追加模式:表只是做插..
Window( 窗口 计算) 窗口 计算是流计算的核心,通过使用 窗口 对无限的流 数据 划分成固定大小的 buckets,然后基于落入同一个bucket( 窗口 )中的元素执行计算。 Flink 窗口 计算分为两大类。 一类基于keyed-stream 窗口 计算。 stream .keyBy(...) <- 分组 .window(...) <- 必须: "assigner" 窗口 分配器 [.trigger(...)]
本文将重点跟大家讲解 Flink 的状态管理机制,包括状态要解决的问题、 Flink 几种不同类型的状态、Keyed State和Operator List State的使用方法等。相关代码参见的github:https://github.com/luweizheng/ flink -tutorials。图片文字均为原创,转载请联系本专栏。 为什么要管理状态 有状态的计算是流处理框架要实现的重要功能,因...
在做用户画像标签时,很多实时指标都需要追溯历史 数据 ,如用户截止此刻的订单数,需要将用户有史以来的所有订单数进行统计,而实时 数据 一般只能回溯7天。这种情况下就需要先将离线 数据 预先加载到sink端,或者将 数据 保存到 Flink State中去。 如果离线 数据 计算结果 在mysql中,那么可以使用类似维表的方式直接进行Stream操作 加载离线写入State,实时作业以此restore 理论上是可以的,但是按照我上面的写法,代价比较大,因为我的作业中有subtask会处于Finished状态,导致不会触发ch.
import org.apache. flink .api.java.io.jdbc.JDBCOutputFormat import org.apache. flink .api.scala._ import org.apache... 【Flink 实战系列】Could not find any factory for identifier ‘wechat-alarm‘ that implements ‘org.apache.fli Flink SQL Gateway REST Endpoint 使用第二弹