CEP
# 概念
在大数据分析领域,有一类复杂事件,就是多个事件的组合。他们不仅有先后顺序,有时也有时间约束。
使用 SQL 和 DataStream API 很难完成,即使是使用 process function 我们的复杂度也十分高。
因此 Flink 提供了专门处理复杂事件的库:CEP。让我们可以轻松解决这种棘手问题。
CEP 其实就是 Complex Event Processing 缩写,Flink CEP 就是专门处理复杂事件的库。
CEP 流程可以分为三个步骤:
定义一个匹配规则。
我们称之为模式,Pattern。主要分为两步:
- 每个简单事件的特征。
- 简单事件的组合关系。
当然,也可更复杂,比如一个事件之后是否跳过、一个匹配后面是否跳过后面的匹配等……
将匹配规则应用到事件流上,检测满足条件的复杂事件。
对复杂事件进行处理,得到结果进行输出。
CEP 主要用于实时流分析处理,可以在复杂的、看似不相关的事件流找出有意义的组合。可以做风险控制、用户画像、运维监控等。
很多大数据框架都有 CEP 解决方案,但是没有专门的库。可以说 Flink CEP 是当前最佳解决方案。
# 起步
增加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
案例
假设一个场景:一个用户三次登陆失败,则需要报警。
那么我们应该首先定义一个 POJO 类:
@Data @NoArgsConstructor @AllArgsConstructor public class LoginEvent { private String userId; private String ipAddress; private String eventType; private Long timestamp; }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); KeyedStream<LoginEvent, String> stream = env .fromElements( new LoginEvent("user1", "192.168.0.1", "fail", 2000L), new LoginEvent("user1", "192.168.0.2", "fail", 3000L), new LoginEvent("user2", "192.168.0.2", "fail", 4000L), new LoginEvent("user1", "192.168.0.1", "fail", 5000L), new LoginEvent("user1", "192.168.0.1", "fail", 6000L), new LoginEvent("user1", "192.168.0.1", "fail", 7000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<LoginEvent>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() { @Override public long extractTimestamp(LoginEvent loginEvent, long l) { return loginEvent.getTimestamp(); } }) ) .keyBy(LoginEvent::getUserId); // 1. 定义 Pattern,监测连续三个登录失败事件 Pattern<LoginEvent, LoginEvent> pattern = Pattern .<LoginEvent>begin("first") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return "fail".equals(loginEvent.getEventType()); } }) .next("second") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return "fail".equals(loginEvent.getEventType()); } }) .next("third") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return "fail".equals(loginEvent.getEventType()); } }); // 2. Pattern 应用到流上,匹配复杂事件,得到 PatternStream PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern); // 3. 复杂事件筛选,然后包装为字符串报警输出 patternStream.select(new PatternSelectFunction<LoginEvent, String>() { @Override public String select(Map<String, List<LoginEvent>> map) throws Exception { LoginEvent first = map.get("first").get(0); LoginEvent second = map.get("second").get(0); LoginEvent third = map.get("third").get(0); return String.format( "%s 连续三次登录失败,时间为:%s %s %s", first.getUserId(), first.getTimestamp(), second.getTimestamp(), third.getTimestamp() ); } }); env.execute();
# Pattern API
Flink CEP 的核心是复杂事件的模式匹配,Flink CEP 库提供了 Pattern 类,基于它可以调用一系列方法来定义匹配模式,这就是所谓的模式 API,Pattern API。
# 个体模式
Pattern 其实就是将一组简单事件组合成复杂事件的匹配规则,由于流中的事件是有先后顺序的,因此一个匹配规则就可以表达为先后发生的一个个简单事件,串联在一起。
这里的每个简单事件也不是随便选取的,也是要遵循一定的规则,所以我们将每个简单事件的匹配规则叫做个体模式。
每个个体模式都是由连接词开始定义的,比如 begine、next 等。这是 Pattern 的一个方法,返回的是一个 Pattern。
pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
上面的代码中,这就是一个个体模式。它通过一个 where 条件来将事件进行过滤选择。个体事件也可以接受多个事件,就是量词 quantifier。
量词 quantifier
量词,跟在个体模式后面,可以用于指定循环的次数。从这个角度来看,个体模式可以包括单例模式和循环模式。默认情况下,个体模式是单例模式,但是有了量词之后,可以转换为循环模式。
比如我们之前说的,匹配的事件出现多次,就可以采用量词,而不必一次性写三个相同的个体模式。
oneOrMore()
:个体模式 a 出现一次或多次,我们表示为 a+。times(times)
:个体模式出现 times 次,例如 a.times(3) 为 a a a。times(fromTimes, toTimes)
:指定匹配事件出现的次数范围,最小为 fromTimes 次,最大为 toTimes 次。greedy()
:贪心匹配,在循环模式之后,总是尽可能多的去匹配。比如a.times(2, 4).greedy()
,它会匹配aaaa
。optional()
:可选匹配,在循环模式之后,表示可以满足也可以不满足。
那么连续三次登录失败的事件就可以表示为:
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3);
条件 conditions
对于每个个体模式,核心在于匹配条件,也就是 where 的条件。其中条件分为:
限定子类型
增加类型限制,例如:
pattern.subtype(LoginEvent.class)
简单条件
最简单的匹配,直接使用
SimpleCondition
即可,例如:pattern.where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.getUserId().startsWith("A"); } });
迭代条件
我们需要当前事件和之前的事件作对比,那么这就叫做迭代条件。
Flink CEP 提供了 IterativeCondition 抽象类,其中需要一个 filter 方法,给到两个参数:当前事件、上下文。
pattern .oneOrMore() .where(new IterativeCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception { // 可以直接写简单条件 if (!loginEvent.getUserId().startsWith("A")) { return false; } // context.getEventsForPattern("middle") 获取当前模式之前,已经匹配到的事件(我们之前已经命名为 middle) int count = 1; for (LoginEvent event : context.getEventsForPattern("middle")) { count = event.getUserId().startsWith("A") ? count + 1 : count; } return count > 100; } });
组合条件
组合条件,无非就是 and 和 or,假如是 and,可以在条件之后再接一个条件,假如是 or,可以在条件后先接一个 or。
pattern .oneOrMore() .subtype(LoginEvent.class) .or(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.getUserId().startsWith("A"); } });
终止条件
针对于循环模式,顾名思义,遇到某个特定事件之后就不再继续循环匹配。
- 终止条件只能和
oneOrMore()
、oneOrMore().optional()
结合使用。 - 终止条件需要调用
.until()
实现,传入一个IterativeCondition
为参数。
pattern .oneOrMore() .until(new IterativeCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception { return loginEvent.getUserId().startsWith("A"); } });
- 终止条件只能和
# 组合模式
个体模式按照一定顺序进行链接就是组合模式,也叫做模式序列。
可以使用连接词(例如 begin、next、followedBy)等串联,每个连接词之后得到的都是 Pattern 对象。
初始模式
每个组合模式都必须要有一个初始模式开头,必须通过 Pattern.<T>begin("str")
来创建。
近邻条件
有了初始模式之后,我们就可以追加模式,组合模式序列了。模式之间组合使用连接词实现,这些连接词组成近邻条件,有三种近邻关系:
- 严格近邻:必须按照顺序一个个出现,中间不会有其他任何事件,对应
.next()
连接词。 - 宽松近邻:只关心事件发生顺序,中间是否有其他事件不关心,对应
.followedBy()
连接词。 - 非确定性宽松临近:可以重复使用之前已经匹配过的事件,所以可以使用同一个事件作为开始,匹配结果宽松很多。对应
followedByAny()
。
其他限制
注意,因为宽松近邻和非确定性宽松近邻没有中间的事件限制,所以严格来说,我们不能保证之后没有这种数据,那就会一直监测,这显然不是我们希望的。
Flink CEP 针对这种情况,可以给模式加一个时间限制,通过 .within()
就可以实现,此方法位置不限,并且多次调用取最小的时间间隔为准。
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.followedBy("second")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.getUserId().startsWith("A");
}
})
.within(Time.seconds(10));
对于使用了 oneOrMore()
、times()
的量词的循环模式来说,默认采用的是宽松临近,相当于使用 followedBy()
链接,这显然不是我们想要看到的。
我们可以通过 consecutive()
来改变,它会保证所有匹配事件是严格连续的。
Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3)
.consecutive();
另外也可以调用 allowCombinations()
,它可以取得和 followedByAny()
相同的效果。
Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3)
.allowCombinations();
# 模式组
之前的模式序列现在可以被连接了,多个模式序列就可以形成模式组。在模式组中,每个模式序列就被当做某阶段的匹配条件,返回 GroupPattern。
而 GroupPattern 是 Pattern 的自雷,所以量词和方法一般也可以在模式组中使用。
Pattern<LoginEvent, LoginEvent> group1 = Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3)
.allowCombinations();
Pattern<LoginEvent, LoginEvent> group2 = Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3)
.consecutive();
Pattern<LoginEvent, LoginEvent> group3 = Pattern
.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3);
GroupPattern<LoginEvent, LoginEvent> groupPattern = Pattern.begin(group1).next(group2).followedBy(group3);
# 跳过匹配
跳过匹配是对于事件的处理方式,我们直接举例。
假设我们要使用宽松近邻模式,检测 a 事件之后的 b 事件算一个组合事件,写法为 a + followedBy + b
。
假如 a 重复多次,再跟上 b,比如 a1 a2 a3 b
,那么我们的跳过匹配策略有:
不跳过 NO_SKIP
(a1 b)、(a1 a2 b)、(a1 a2 a3 b) (a2 b)、(a2 a3 b) (a3 b)
跳至下一个 SKIP_TO_NEXT
(a1 a2 a3 b)、(a2 a3 b)、(a3 b)
每个事件作为开头只会出现一次,这个效果等同于使用
.greedy()
跳过所有子匹配 SKIP_PAST_LAST_EVENT
(a1 a2 a3 b)
跳至第一个 SKIP_TO_FIRST[a],也就是只留下 a1 的匹配
(a1 a2 a3 b)、(a1 a2 b)、(a1 b)
调至最后一个 SKIP_TP_LAST[a],也就是只留下 a3 的匹配
(a1 a2 a3 b)、(a3 b)
具体使用,可以再 Pattern 的初始模式中,在 begine 的第二个参数传入:
Pattern
.<LoginEvent>begin("first", AfterMatchSkipStrategy.noSkip())
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return "fail".equals(loginEvent.getEventType());
}
})
.times(3)
.allowCombinations();
# 模式检测处理
# 处理匹配事件
有了 pattern 和 stream 之后,就可以将 pattern 应用到 stream 上了,简单使用:
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
// 注意,得到的是一个 list
LoginEvent event = map.get("first").get(0);
return Objects.isNull(event) ? null : event.getUserId();
}
});
pattern stream 还有一个 flatSelect,顾名思义,是扁平化处理,它没有返回值,而是多了一个 collector 输出数据
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
@Override
public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> collector) throws Exception {
LoginEvent event = map.get("first").get(0);
if (Objects.isNull(event)) {
return;
}
collector.collect(event.getUserId());
collector.collect(event.getEventType());
collector.collect(event.getIpAddress());
}
});
在 1.8 之后,CEP 有了匹配事件的通用处理方式,就是直接调用 process()
,传入 PatternProcessFunction,可以访问一个上下文,进行更多操作。
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
long processingTime = context.currentProcessingTime();
LoginEvent event = map.get("first").get(0);
if (Objects.isNull(event)) {
return;
}
collector.collect(event.getUserId() + processingTime);
}
});
# 处理超时事件和迟到数据
超时事件到侧输出流
实现接口 TimedOutPartitialMatchHandler,将超时的匹配事件放到一个 map 中,作为方法参数,将 context 上下文作为第二个参数。
class CustomerPatternProcessFunction extends PatternProcessFunction<LoginEvent, String> implements TimedOutPartialMatchHandler<LoginEvent> {
// 正常事件处理
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
}
// 超时事件处理
@Override
public void processTimedOutMatch(Map<String, List<LoginEvent>> map, Context context) throws Exception {
LoginEvent event = map.get("first").get(0);
OutputTag<LoginEvent> outputTag = new OutputTag<>("time-out");
context.output(outputTag, event);
}
}
迟到数据到侧输出流
默认情况下,迟到数据直接丢弃,不过 CEP 同样提供了将迟到数据输出到侧输出流的方式。
OutputTag<LoginEvent> lateDataTag = new OutputTag<>("late-data");
SingleOutputStreamOperator<String> result = patternStream
// 迟到数据输出到侧输出流
.sideOutputLateData(lateDataTag)
.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
long processingTime = context.currentProcessingTime();
LoginEvent event = map.get("first").get(0);
if (Objects.isNull(event)) {
return;
}
collector.collect(event.getUserId() + processingTime);
}
});
// 提取侧输出流
result.getSideOutput(lateDataTag);