FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。
它允许你在无界的事件流中检测事件模式,让你有机会掌握数据中重要的事项。
Flink CEP 首先需要用户创建定义一个个pattern,然后通过链表将由前后逻辑关系的pattern串在一起,构成模式匹配的逻辑表达。
可以应用的场景: 直播平台异常检测(扫X), 顺风车路径异常检测(XD) 等等.....
Maven坐标
我下的是1.9.1
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.9.1</version>
</dependency>
需要注意的是: 应用模式匹配的DataStream中的事件必须实现正确的equals()和hashCode()方法,因为FlinkCEP使用它们来比较和匹配事件。
入门 demo
场景: 用户登录, 在整个模式匹配的规则在5秒内,如果连续两次登录失败,则发出警告。。
LoginEvent
package com.ronnie.flink.demo.cep;
public class LoginEvent {
private String userId;//用户ID
private String ip;//登录IP
private String type;//登录类型
public LoginEvent() {
public LoginEvent(String userId, String ip, String type) {
this.userId = userId;
this.ip = ip;
this.type = type;
public String getUserId() {
return userId;
public void setUserId(String userId) {
this.userId = userId;
public String getIp() {
return ip;
public void setIp(String ip) {
this.ip = ip;
public String getType() {
return type;
public void setType(String type) {
this.type = type;
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ip='" + ip + '\'' +
", type='" + type + '\'' +
public LoginWarning(String userId, String type, String ip) {
this.userId = userId;
this.type = type;
this.ip = ip;
@Override
public String toString() {
return "LoginWarning{" +
"userId='" + userId + '\'' +
", type='" + type + '\'' +
", ip='" + ip + '\'' +
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
* 整个模式匹配的规则在5秒内,如果连续两次登录失败,则发出警告。。
public class LoginFailWarningDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource loginEventStream = env.fromCollection(Arrays.asList(
new LoginEvent("1", "192.168.0.1", "fail"),
new LoginEvent("1", "192.168.0.2", "fail"),
new LoginEvent("1", "192.168.0.3", "fail"),
new LoginEvent("2", "192.168.10,10", "fail"),
new LoginEvent("2", "192.168.10,10", "success")
// 开启一个模式匹配规则
Pattern<LoginEvent, LoginEvent> begin = Pattern.begin("begin");
// 模式匹配的条件
Pattern<LoginEvent, LoginEvent> p1 = begin.where(new IterativeCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
// 过滤掉非fail的, 只返回fail的
return loginEvent.getType().equals("fail");
// 追加一个新的模式。 匹配事件必须直接跟着先前的匹配事件(严格连续性)
Pattern<LoginEvent, LoginEvent> next = p1.next("next");
// 新模式的匹配条件
Pattern<LoginEvent, LoginEvent> p2 = next.where(new IterativeCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
// 第二次匹配
return loginEvent.getType().equals("fail");
// 定义事件序列进行模式匹配的最大间隔。如果未完成的事件序列超过此时间, 则将其丢弃:
Pattern<LoginEvent, LoginEvent> p3 = p2.within(Time.seconds(5)); // 注意导的是窗口的时间
PatternStream patternStream = CEP.pattern(loginEventStream.keyBy("userId"), p3);
patternStream.select(new PatternSelectFunction<LoginEvent, LoginWarning>() {
@Override
public LoginWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
List<LoginEvent> begin01 = pattern.get("begin");
System.out.println("------- Begin List ------");
begin01.stream().forEach(System.out::println);
System.out.println("------- next list ------");
List<LoginEvent> next01 = pattern.get("next");
next01.stream().forEach(System.out::println);
return new LoginWarning(next01.get(0).getUserId(), next01.get(0).getType(), next01.get(0).getIp());
loginEventStream.printToErr();
env.execute();
Pattern API允许你定义要从输入流中提取的复杂模式序列。
每个复杂模式序列都是由多个简单模式组成,即寻找具有相同属性的单个事件的模式。
可以将模式序列视为此类模式的结构图,基于用户指定的条件从一个模式转换到下一个模式,例如, event.getName().equals("start")。
匹配是一系列输入事件,通过一系列有效的模式转换访问复杂模式图中的所有模式。
每个模式必须具有唯一的名称, 以便后续可以使用该名称来标识匹配的事件。
注意模式名称不能包含字符“:”
Pattern可以是单单个,也可以是循环模式。
单个模式接受单个事件,而循环模式可以接受多个事件。
在模式匹配符号中,模式“a b + c?d”(或“a”,后跟一个或多个“b”,可选地后跟“c”,后跟“d”),a,c ?,和d是单例模式,而b +是循环模式。 [吐槽一下: 搞得跟正则似的]
默认情况下,模式是单个模式,可以使用Quantifiers将其转换为循环模式。
每个模式可以有一个或多个条件,基于它接受事件。
1.1. Quantifiers
在FlinkCEP中,您可以使用以下方法指定循环模式:pattern.oneOrMore(),用于期望一个或多个事件发生的模式(例如之前提到的b +);
pattern.times(#ofTimes), 用于期望给定类型事件的特定出现次数的模式
patterntimes(#fromTimes,#toTimes),用于期望给定类型事件的最小出现次数和最大出现次数的模式
可以使用pattern.greedy()方法使循环模式变得贪婪,但是还不能使组模式变得贪婪。
可以使用pattern.optional()方法使得所有模式,循环与否,变为可选。
```java
// expecting 4 occurrences 出现4次
start.times(4);
// expecting 0 or 4 occurrences 出现 0 或 4 次
start.times(4).optional();
// expecting 2, 3 or 4 occurrences 出现 2, 3, 4 次
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
// 出现 2, 3, 4次 并重复尽可能多次
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences 出现0, 2, 3, 4次
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences 出现一次或多次
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
// 出现一次或多次 并 重复多次
start.oneOrMore().greedy();
// expecting 0 or more occurrences 出现0或多次
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
// 出现 0 或 多次并重复尽可能多次
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences 出现两次或多次
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
// 出现两次或多次并尽可能重复多次
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences and repeating as many as possible
//出现0, 2或多次并重复尽可能多次
start.timesOrMore(2).optional().greedy();
1.2. Conditions
在每个模式中,从一个模式转到下一个模式,可以指定其他条件。您可以将使用下面这些条件:
传入事件的属性,例如其值应大于5,或大于先前接受的事件的平均值。
匹配事件的连续性,例如检测模式a,b,c,序列中间不能有任何非匹配事件。
1.3. Conditions on Properties
可以通过pattern.where(),pattern.or()或pattern.until()方法指定事件属性的条件。 条件可以是IterativeConditions或SimpleConditions。
迭代条件:
可以指定一个条件,该条件基于先前接受的事件的属性或其子集的统计信息来接受后续事件。
简单条件:
这种类型的条件扩展了前面提到的IterativeCondition类,并且仅根据事件本身的属性决定是否接受事件。
还可以通过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型的子类型。
组合条件:
可以将子类型条件与其他条件组合使用。 这适用于所有条件。 您可以通过顺序调用where()来任意组合条件。
最终结果将是各个条件的结果的逻辑AND。 要使用OR组合条件,可以使用or()方法
停止条件:
在循环模式(oneOrMore()和oneOrMore().optional())的情况下,还可以指定停止条件,例如: 接受值大于5的事件,直到值的总和小于50。
连续事件条件:
FlinkCEP支持事件之间以下形式进行连续:
严格连续性:希望所有匹配事件一个接一个地出现,中间没有任何不匹配的事件。
宽松连续性:忽略匹配的事件之间出现的不匹配事件。 不能忽略两个事件之间的匹配事件。
非确定性轻松连续性:进一步放宽连续性,允许忽略某些匹配事件的其他匹配。
对于循环模式(例如oneOrMore()和times()),默认是宽松的连续性。
想要严格的连续性,必须使用consecutive()显式指定它。
如果你想要非确定性的松弛连续性,你可以使用allowCombinations()方法。
单个循环模式中的连续性,并且需要在该上下文中理解consecutive()和allowCombinations()。
1.4 API 简介
where(condition)
- 定义当前模式的条件。 为了匹配模式,事件必须满足条件。 多个连续的where(),其条件为AND:
or(condition)
添加与现有条件进行OR运算的新条件。 只有在至少通过其中一个条件时,事件才能匹配该模式:
until(condition)
指定循环模式的停止条件。 意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。
仅适用于oneOrMore()
注意:它允许在基于事件的条件下清除相应模式的状态。
subtype(subClass)
定义当前模式的子类型条件。 如果事件属于此子类型,则事件只能匹配该模式:
pattern.subtype(SubEvent.class);
oneOrMore()
指定此模式至少发生一次匹配事件。
默认情况下, 使用宽松的内部连续性。
注意点: 建议使用until() 或 within() 来启用状态清除
pattern.oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition 可替代的条件
timesOrMore(#times)
指定此模式至少需要 #times 次出现匹配事件。
默认情况下, 使用宽松的内部连续性(在后续事件之间)。
pattern.timesOrMore(2);
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
如果不应用,则使用宽松的连续性(如followBy())。
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
notNext() 如果不希望一个事件类型紧接着另一个类型出现。 notFollowedBy() 不希望两个事件之间任何地方出现该事件。
注意 模式序列不能以notFollowedBy()结束。
注意 NOT模式前面不能有可选模式
// strict contiguity 强连续性
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity 松连续性
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity 不可确认的松连续性
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity 非模式强连续性
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity 非模式松连续性
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
宽松连续性指的是仅第一个成功匹配的事件会被匹配到,然而非确定性宽松连续性,相同的开始会有多个匹配结果发出。距离,如果一个模式是"a b",给定输入序列是"a c b1 b2"。对于不同连续性会有不同输出。
a和b之间严格连续性,将会返回{},也即是没有匹配。因为c的出现导致a,被抛弃了。
a和b之间宽松连续性,返回的是{a,b1},因为宽松连续性将会抛弃为匹配成功的元素,直至匹配到下一个要匹配的事件
a和b之间非确定性宽松连续性,返回的是{a,b1},{a,b2}。
可以为begin,followBy,followByAny和next定义一个模式序列作为条件。
模式序列将被逻辑地视为匹配条件,而且将返回GroupPattern并且 可对GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。
PatternPatte <Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
// strict contiguity
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
Pattern<Event, ?> start = Pattern.<Event>begin(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
next(#name)
追加一个新的模式。匹配事件必须直接跟着先前的匹配事件(严格连续性):
Pattern<Event, ?> next = start.next("middle");
next(#pattern_sequence)
追加一个新的模式。匹配事件必须直接接着先前的匹配事件(严格连续性):
Pattern<Event, ?> next = start.next(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
followedBy(#name)
追加加新模式。 匹配事件和先前匹配事件(宽松连续)之间可能发生其他非匹配事件:
Pattern<Event, ?> followedBy = start.followedBy("middle")
followedBy(#pattern_sequence)
追加新模式。 匹配事件和先前匹配事件(宽松连续)之间可能发生其他非匹配事件:
Pattern<Event, ?> followedBy = start.followedBy(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
followedByAny(#name)
添加新模式。 匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny("middle");
followedByAny(#pattern_sequence)
添加新模式。 匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
notNext()
添加新的否定模式。 匹配(否定)事件必须直接跟着先前的匹配事件(严格连续性)才能丢弃部分匹配:
Pattern<Event, ?> notNext = start.notNext("not");
notFollowedBy()
追加一个新的否定模式匹配。即使在匹配(否定)事件和先前匹配事件(宽松连续性)之间发生其他事件,也将丢弃部分匹配事件序列:
Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
within(time)
定义事件序列进行模式匹配的最大时间间隔。 如果未完成的事件序列超过此时间,则将其丢弃:
pattern.within(Time.seconds(10));
给定输入流 input,模式 pattern 和可选的比较器 comparator,用于在EventTime的情况下对具有相同时间戳的事件进行排序或在同一时刻到达,通过调用以下命令创建PatternStream
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
根据实际情况,创建的流可以是有key,也可以是无key的。
注意点: 在无key的流上使用模式,将导致job的并行度为1。
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
@Override
public OUT select(Map<String, List<IN>> pattern) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
return new OUT(startEvent, endEvent);
PatternFlatSelectFunction类似于PatternSelectFunction,唯一的区别是它可以返回任意数量的结果。 为此,select方法有一个额外的Collector参数,用于将输出元素向下游转发。
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
@Override
public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
for (int i = 0; i < startEvent.getValue(); i++ ) {
collector.collect(new OUT(startEvent, endEvent));
处理超时部分模式
每当模式具有通过within关键字附加的时间窗口长度时,部分事件序列可能因为超出时间窗口长度而被丢弃。
为了对这些超时的部分匹配作出相应的处理,select和flatSelect API调用允许指定超时处理程序。
超时处理程序接收到目前为止由模式匹配的所有事件,以及检测到超时时的时间戳。
为了处理部分模式,select和flatSelect API提供了一个带参数的重载版本
PatternTimeoutFunction/ PatternFlatTimeoutFunction。
OutputTag 超时的匹配将会在其中返回。
PatternSelectFunction / PatternFlatSelectFunction。
PatternStreamPatte <Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
outputTag,
new PatternSelectFunction<Event, ComplexEvent>() {...}
DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
outputTag,
new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);