一,flink cep简介

flink cep 主要用来处理复杂事件的库,如实时提取一段时间内股票涨,跌,涨,跌的事件并作出对应的决策。
官网地址https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/libs/cep.html

二,demo

需求1:要在事件序列中提取 连续数据是 10,20,10 的事件
需求2:要在事件序列中提取 连续数据是 10 (10-50)+ 0 (第一个数是10,后续连续到来一个或者多个10到50之间的数字(终止条件:出现大于50的数字))的事件 最后到来一个是0的数字
如下代码实现

* cepDemo create by 梁丰 object DemoFlinkCep1 { case class EventData(value:Int) def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment environment.setParallelism(1) //输入数据 val input: DataStream[String] = environment.socketTextStream("liangfeng02",9999) //模拟股票数据 val sharesStream: DataStream[EventData] = input.map(x=>EventData(x.toInt)) //sharesStream.print() //创建匹配模式 val pattern: Pattern[EventData, EventData] = createPattern1 //pattern 和输入流进行关联 val patternStream: PatternStream[EventData] = CEP.pattern(sharesStream,pattern) //打印结果 printPatternStreamMethod2(patternStream) environment.execute("flinkDemo") * 创建匹配模式 10 20 10 * @return def createPattern() ={ val pattern: Pattern[EventData, EventData] = Pattern .begin[EventData]("start").where(_.value.equals(10)) .next("next").where(_.value.equals(20)) .next("next1").where(_.value.equals(10)) pattern * 创建匹配模式 10 (10-50)+ 0 * @return def createPattern1() ={ val pattern: Pattern[EventData, EventData] = Pattern .begin[EventData]("start").where(_.value.equals(10)) .next("next1").oneOrMore.where(_.value > 10).until(_.value > 50) .followedBy("end").where(_.value.equals(0)) pattern * 打印匹配的事件 * @param patternStream def printPatternStreamMethod1(patternStream: PatternStream[EventData]): Unit ={ val result: DataStream[Map[String, Iterable[EventData]]] = patternStream.select((data: Map[String, Iterable[EventData]]) => { val maybeLongs: Option[Iterable[EventData]] = data.get("start") result.print() * 打印匹配的事件和超时的事件 * @param patternStream def printPatternStreamMethod2(patternStream: PatternStream[EventData]): Unit ={ val opt: OutputTag[Map[String, Iterable[EventData]]] = OutputTag[Map[String, Iterable[EventData]]]("outPutTag") val result: DataStream[Map[String, Iterable[EventData]]] = patternStream.flatSelect(opt) { (data: Map[String, Iterable[EventData]], timeStap: Long, out: Collector[Map[String, Iterable[EventData]]]) => {out.collect(data)} } { (data: Map[String, Iterable[EventData]], out: Collector[Map[String, Iterable[EventData]]]) => {out.collect(data)}} result.print("factEvent:") result.getSideOutput(opt).print("outTimeEvent:")

如上需求和代码 其中重点就是创建匹配模式具体的api可以官网上面详细查看,接下来我们探究下如何提取复杂事件模型

三,NFA~b 模型

以下内容参考 《基于复杂事件处理的模型和算法研究》-魏仕杰
一,查询的约束条件(sase+语言为例)
查询约束条件就是确定事件的约束条件
查询的约束条件分为两类:时序约束和谓词约束
时序约束:描述的是事件间的时序关系的约束
谓词约束:描述的是单个事件的属性的约束
时序约束分类:
1,序列算子:定义了模式匹配结构为按照时间先后顺序得到事件序列的顺序结构,用SEQ表示,在PATTERN子句中定义,例如定义SEQ(A,B,C) 事件类型 A,B,C 按照时间顺序A->B->C的方式匹配 在flink中类似 begin, next,follewby 方法对状态进行组合
2,时间窗口:规定一个时间,约束事件在这段时间内发生 在within 语句中定义。在flunk中和within 类似
3,克林闭包算子:表示一个事件发生一次或者多次的事件序列,如A+表示事件A发生一次或者多次的事件序列,类似flunk当中的oneormore()方法
4,扩展克林算子:对克林闭包的扩展,A[n,m]表示事件A发生n到m次的事件序列,类似flunk当中的time函数
谓词约束:是针对事件属性的约束,可以是单个事件的属性约束(如订单金额大于100)
也可以是事件间的属性约束(如当前订单金额大于前一笔订单金额)
类似flunk当中的where方法
二,NFA~b模型
NFA~b是非确定性有限自动机(NFA)结合缓存匹配的模型。是NFA模型的改进,其中匹配缓存用与在运行时存储查询出来的结果。
如下图示: 在这里插入图片描述

在这里插入图片描述 NFA~b模型由三个部分组成
1,状态
在非确定有限自动机中状态有两种情况:
a)非克林闭包事件由一个状态来表示
b)克林闭包事件由两个状态来表示
上图中每个圆圈代表一个状态其中 A事件分为两个A[1] 和A[i]。A[1]是该克林闭包算子的开始条件,当事件到达满足A[1]时候表示A事件开始(开始克林闭包算子),并且把A[1]匹配的事件放入A[1]状态对应的缓存当中,到达下一个状态A[i],当下一个事件到达和A[i]匹配的则放入A[i]所对应的缓存当中。当克林算子满足终止条件终止时候(例如A[] 只匹配十次 和flink的time(10)方法类似),表示匹配过程已经满足克林闭包查询条件约束,接着进入到下一个状态B,B是一个非克林闭包事件的状态,当B匹配成功同样把事件存入B的缓存当中进入到最终状态F,表示匹配结束
2,有向边
与状态有关的有向边代表事件到达该状态时可以采取的操作。执行操作的条件由模型指定的事件类型,约束条件,终止条件,事件选择策略等决定。
每条有向边代表着状态越迁的条件,对事件流的操作(是否消耗事件),是否把事件存入缓存之中,其中图中的实线表示消耗事件并且把事件存入缓存如begin take ,图中ignore对应的虚线表示消耗事件但是不存入缓存,即忽略这个事件,图中process对应的虚线表示不消耗事件也不存入缓存,此时事件进入下一个状态
有向边分为四种:
begin:消费事件存入缓存状态越迁到下一个状态
ignore:表示跳过不相关事件并且依然处于该状态被忽略事件不存入缓存,开始状态没有ignore边因为匹配必须从一个选定的事件开始,事件是否忽略和有向边的执行条件有关(下文讲解)
take:表示选择当前时间,消费事件并且存入缓存中,还是处于当前状态,是否take和有向边的执行条件有关(下文讲解)。
process:表示完成克林闭包的选择策略,如设置啦次数限制等终止条件或者时间窗口的限制,状态越迁到下一个状态 是否process和有向边的执行条件有关(下文讲解)
3,run(运行)
模型会产生多个可能的事件序列也就是多个run,每个run表示对到达事件的一次处理,处理结果存入缓存当中。一个run代表一个根据模型匹配的事件序列。一个事件到来会对多个run进行匹配。

NFA~b的条件设定
如上图中的 A[1]_begin ,表示begin有向边的条件(a[1].price=20),其他有向边也有对应的条件,有向边的执行条件有如下:
1,相关事件定义:在where语句中定义的谓词约束即位相关事件,如A[1]_begin 中的谓词约束 a[1].price=20,这些谓词约束用来设置begin和take执行的条件,可以是单个事件的属性值约束也可以是多个事件之间的关系约束,或者是对事件聚合后的结果约束。不满足约束的是无关事件,满足的即为相关事件。
2,事件选择策略:从事件流中选择和克林闭包算子相关的事件时,需要有相关的选择策略,如一些查询希望在事件流中查询连续的事件,而另外的希望查询不连续的事件。
skip till next match:在策略中忽略无关事件,对相邻的相关事件进行连续的匹配选择,注意是连续的相关事件。两个相关事件之间的相关事件都的选择。克林闭包使用该策略直到满足终止原则的条件,如上query1的查询选择从price从20开始并且不断增长的事件,忽略掉波动下降的事件。即选择连续的相关事件。
skip till any match:该策略选择的时候更加灵活,考虑如下情况
如果来的事件价格如下 20,30,100,40 ,50 根据上面的选择策略只能选择 20,30,100 的最长序列,但是如果100 是个特例呢,那么后面的40 ,50就丢失啦。此时我们可以略掉100的价格,则事件序列是20,30,40,50。即选择相关事件但是可以不连续,中间可以选择的忽略掉一些相关事件。对每一个到来的相关事件都可以对他作出非确定的决策:选择加入还是忽略跳过。
3,终止准则:克林闭包不能无限匹配选择下去,得设置一个终止准则,如上图中的a[i]_foced-stop = a[i].time-a[1].time>w || b_begin (到达设置的时间窗口事件,或者b事件开始)

四,NFA~b的缓存模型和数据结构

以下内容参考论文:https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf

一,Shared Versioned Match Buffer
共享的版本的匹配缓存:一个存储各个state中匹配的事件和事件间关系(带有版本)的共享的缓存
思考:如果到来一个事件可以take 也可以process 即由一个run分裂为两个run,那么这两个run的事件序列该如何存储呢?
实例:我们以以下实例进行分析
1,查询语句
查询描述:以大于1000股票交易量的开始 之后当价格持续增长时候 交易量大跌
在这里插入图片描述
2,事件序列:
在这里插入图片描述
如上序列:在状态为a[i]时,e6到来,此时750<999*0.8 满足 b的take条件,同时125>(100+120+120+121+120)/5 满足a[i]的take条件,此时一个run会分裂为两个如上的 R1和R3,此时两个run当中前五个事件是相同的,如果分别为每个run都建立一个缓存会有大量的冗余,而且每个事件都可能产生分裂,所以冗余量可能会很大下图中的三个run建立三个buffer a,b,c 数据会有大量重复,而d中带上版本号的数据就会减少冗余。

3,缓存模型
在这里插入图片描述
如上图所示,用右边的带有版本号的边去关联数据,来代替左边的三个buffer。
我们如何选出 e3,e4,e6 的事件序列 或e1,e2,e3,e4,e5,e6 的事件序列呢?
如果不带版本号的话 会出现回溯到e1,e2,e3,e4,e6 的事件序列,因为e6->e4->e3 中 e4 ->e3 存在两条边,而我们只需要 e6->e4->e3 的序列 。
解决方法:添加版本号,论文中的版本号规则如下
如上 y1.(yi)* 其中i代表当前的state, 如a[i] 中的事件 边的版本只有两个数,b中的版本有三个。yi的值是出现分裂就加一初始值为0 e4<-e5 版本为1.0 e5<-e6 分裂为两条 则在当前state中的e5<-e6 版本加一为 1.1 而到b state的 e5<-e6 数加一为 1.0.0。
那么我们如何通过生成的版本号进行回溯出来最终需要的事件序列呢?
解决方法:回溯时候长度相同的版本号只能最后一位数字不同且是累加的,长度不同的版本号短的版本号必须作为长的版本号的前缀。
如上图中 e4<-e6 的版本号是2.0.0 e3<-e4 有两个版本号,一个2.0 一个是1.0 显然1.0不是2.0.0的前缀所以被排除掉 选择2.0版本的 e3<-e4。最终得到 e3<-e4<-e6 序列 而不是 e1<-e2<-e3<-e4<-e6
flink中版本会有不同 是根据事件数来决定i的值(后文我们通过源码分析)yi的值是当前事件可以产生分裂边的数量决定

二,Computation state
计算状态,每个run会有一个当前的需要进行计算的计算状态,里面保存啦当前run的版本信息当前的所在的state 和一些需要和新到达事件进行比较以判断是否take ignore process的历史事件数据,每次新到达事件会根据Computation state 来进行判断是否take或 ignore或 process。
论文中的数据结构如下 在这里插入图片描述
保存啦版本好,当前状态,最近事件,开始事件事件 和一些聚合数据 如 :总金额总数量来计算平均金额来判断新到达事件是否满足a[i]的条件,还有交易量来判断新到达事件是否满足 b状态条件。当然还有数据结构的优化策略,这里不做介绍。
flink中Computation state会略有不同,下篇文章我们通过源码分析。

参考:
1《基于复杂事件处理的模型和算法研究》-魏仕杰
2 https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf

一,flink cep简介flink cep 主要用来处理复杂事件的库,如实时提取一段时间内股票涨,跌,涨,跌的事件并作出对应的决策。官网地址https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/libs/cep.html二,demo需求1:要在事件序列中提取 连续数据是 10,20,10 的事件需求2:要在...
Flink 是新的stream计算引擎,用java实现。既可以处理streamdata也可以处理batchdata,可以同时兼顾Spark以及Sparkstreaming的功能,与Spark不同的是, Flink 本质上只有stream的概念,batch被认为是specialstream。 Flink 在运行中主要有三个组件组成,JobClient,JobManager和TaskManager。主要工作 原理 如下图用户首先提交 Flink 程序到JobClient,经过JobClient的处理、解析、优化提交到JobManager,最后由TaskManager运行task。JobClient是 Flink 程序和
1.1 引入 在 Flink RichFunction&state这篇博文中我们一起学习了下如何结合使用keyBy state和TreeSet在一条无界流中进行全局的分组求top n操作,可以解决一些实时看板相关的业务问题。在 Flink BroadcastStream这篇博文中我们也学习到了如何使用广播流来处理监控规则经常变更的程序日志监控业务。 那么现在我们又遇到了一个新的业务需求:判断一个用户在点击了商品之后是否立即进行了下单付款操作,如果是的话将用户名和点击时间以及下单付款时
本文根据 Apache Flink 系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享。通过一些简单的实际例子,从概念 原理 ,到如何使用,再到功能的扩展,希望能够给计划使用或者已经使用的同学一些帮助。 主要的内容分为如下三个部分: Flink CEP 概念以及使用场景。 如何使用 Flink CEP 。 如何扩展 Flink CEP Flink CEP 概念以及使用场景 1.什么是 CEP CEP 的意思是复杂事件处理,例如:起床–>洗漱–>吃饭–>上班等一系列串
复杂事件处理( CEP )是一种基于流处理的技术,将系统数据看作不同类型的事件,通 过 分析 事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最 终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和 分析 ,从实时数据中发掘有价值的信息 一. CEP 相关概念 1. 配置依赖 在使用 Flink CEP 组件之前,需要将 Flink CEP 的依赖库引入项目工程中。 <dependency> <groupId>org. (1)定义 复合事件处理(Complex Event Processing, CEP )是一种基于动态环境中事件流的 分析 技术,事件在这 里通常是有意义的状态变化,通过 分析 事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合 关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终 分析 得到更复杂的复合事件 (2)特征 CEP 的特征如下: 目标:从有序的简单事件流中发现一些高阶特征; 输入:一个或多个简单事
### 回答1: Flink checkpoint 是一种在 Flink 流处理程序中用于容错和重启的机制。在运行时, Flink 会定期将当前的状态和数据点快照存储在外部存储(如 HDFS)中,以便在发生故障时能够重新启动程序。这些快照称为 checkpoint。在重启后, Flink 会恢复到最近一次 checkpoint 的状态,并从该点继续处理数据。 ### 回答2: Flink 是一个分布式流处理框架,其能够处理大规模的数据流并保持准确。为了实现这一点, Flink 通过 checkpoint技术提供了容错能力。 Checkpoint是 Flink 中的一项重要技术。它通过定期保存作业的状态来保证作业的容错性。在出现故障或意外关闭节点时, Flink 可以从最近的 checkpoint中恢复作业状态。 Flink 检查点的工作流程如下: 1.触发checkpoint:Checkpoint操作是由coordinator控制的。当某个作业达到checkpoint的时间间隔时,coordinator会通知所有正在运行的方法进行checkpoint操作。 2.快照进程:当接收到checkpoint请求时,任务会调用快照进程,将状态存储在分布式文件系统中。此过程中,用户自定义状态将被存储在Chkpoint根路径下的“checkpoint_metadata”目录中,而系统状态将被存储在“chk-xxxxx”目录中。 3.确认存储:当所有任务都完成了检查点操作之后,coordinator就会确认这些数据已经在文件系统上完全存储。 4.分布式完成:一旦所有任务都已经完成了checkpoint操作并且协调者确认了数据存储,协调者将向所有节点发送分布式完成信号。 5.恢复作业:当作业故障时, Flink 会使用最近的checkpoint,并使用该checkpoint来恢复作业状态。同时,它还会重新启动作业并从源中获取任何未处理的数据,这样能保证数据的完整性。 通过使用 Flink 的checkpoint技术,您可以确保数据流作业的容错性,即使在出现故障时也可以恢复您的作业,而不会丢失你的数据。此外, Flink 还提供了多种配置可供您调整checkpoint的频率和存储位置。 ### 回答3: Flink Checkpoint是Apache Flink 框架中的一种机制,它可以保证数据处理的一致性和可靠性。Checkpoint记录了当前流式任务的状态和进度,并将其存储在分布式存储系统中,例如:HDFS或者S3,以便在任务出现故障时进行恢复。以下是 Flink Checkpoint的 原理 。 1. 核心 原理 Flink 的Checkpoint是基于分布式快照的机制,其核心 原理 是在任务执行过程中,定时使用快照来记录该任务的状态。将快照存储在分布式存储系统中,以便在任务异常时进行恢复。 2. Checkpoint的机制 在 Flink 中,Checkpoint是自动触发的,当一个任务完成一定数量的事件时,会自动执行一次Checkpoint操作。这个数量称为Checkpoint的阈值。Checkpoint会使任务暂停,所有的状态数据都会被序列化并写入快照中,并将该快照存储在分布式存储系统中。一旦Checkpoint操作完成,任务会再次启动,并从上一次Checkpoint的状态继续处理。 3. Checkpoint的类型 Flink 提供了两种Checkpoint类型:exactly-once和at-least-once。其中,exactly-once类型的Checkpoint可以保证数据处理的一致性和可靠性,但是对任务性能的影响较大;而at-least-once类型的Checkpoint虽然性能更好,但是会牺牲一些数据处理的一致性和可靠性。 4. Checkpoint的实现 在 Flink 中,Checkpoint是通过Checkpoint Coordinator来实现的。Checkpoint Coordinator是 Flink 的一个组件,它负责管理Checkpoint的触发、快照、存储和恢复。除此之外,Checkpoint Coordinator还负责监控所有参与Checkpoint的任务状态,如果任务异常则会从之前的快照中进行恢复。 5. Checkpoint的应用场景 Flink 的Checkpoint机制可以应用于各种流式任务场景中,例如:实时数据 分析 、流式ETL、实时指标计算等等。它可以保证任务处理的一致性与可靠性,让我们可以同时享受高性能与可靠性。