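Siddhi: Real-Time Computation and CEP
Siddhi
Siddhi is an event-driven stream processing and complex event processing (CEP) platform. It integrates with Java and Python, and can run as a microservice on bare metal, virtual machines, Docker, and Kubernetes.
Siddhi App
A Siddhi App defines real-time stream processing logic in a SQL-like syntax inside a .siddhi file. The file contains sources, sinks, streams, queries, tables, functions, and other processing logic.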
System Requirements
Memory: 128 MB (minimum), 500 MB (recommended); more may be needed depending on how much data is stored and processed in memory.
Cores:2+
JDK:8 or 11
Maven:3.0.4+
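Siddhi features
1. SQL-like language
2. Web-based editor
a. Graphical drag-and-drop design
b. SQL editor
c. Deployment
3. CI/CD pipeline
a. The Siddhi test framework supports unit, integration, and black-box testing across a complete DevOps workflow
b. App version management based on .siddhi files
4. Multiple deployment modes
a. Java and Python libraries
b. Standalone microservice on bare metal, VMs, or Docker
c. Distributed microservice deployment on Kubernetes
5. Functionality
Data filtering, consuming and publishing events, data transformation, data store integration, data aggregation, service integration and error handling, rule processing, machine learning on streams, scatter-gather and data pipelines, snapshot and restore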
Siddhi architecture
Documentation link
https://siddhi.io/en/v5.1/docs/siddhi-as-a-java-library/
CEP
The pattern is a state machine implementation that detects event occurrences from events arriving via one or more event streams over time. It can repetitively match patterns, count event occurrences, and use logical event ordering (using and, or, and not).
By detecting predefined event patterns in near real time, patterns are how Siddhi implements CEP.
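1. Syntax keywords
->
Denotes the order in which events occur: preceding event -> subsequent event
every
Restarts the state machine so that the pattern is matched repeatedly. Without this keyword, the pattern is matched only once.
within <time gap>
Defines the time span within which all matching events must occur.
<<min count>:<max count>>
Defines the minimum and maximum number of events that should match.
<n1:n2> matches n1 to n2 events
<n:> matches n or more events
<:n> matches up to n events
<n> matches exactly n events
and
Both conditions, on two different events, must be matched.
or
Only one of the two conditions needs to be matched.
not <condition1> and <condition2>
Detects an event matching <condition2> before any event matches <condition1>.
not <condition1> for <time period>
Detects that no event matches <condition1> during the specified <time period>.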
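<event reference>
<event reference>.<attribute name>
Accesses an attribute of a matched event. When a condition matches a collection of events, individual events are addressed by index, where index 0 refers to the first event and the special index last refers to the last available event in the collection.
e1[0].symbol: the symbol attribute of the first event in e1
e1[3].price: the price attribute of the fourth event in e1 (indexes start at 0)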
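2. Non-occurrence of events
Siddhi detects non-occurrence of events using the not keyword, and its effective non-occurrence checking period is bounded either by fulfillment of a condition associated by and, or via an expiry time using for <time period>.
a. not ... for <time period> detects that no matching event arrived within the given period.
b. not ... and <condition> instead ends the check as soon as an event matching <condition> arrives.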
3. Logical correlation of multiple conditions
Siddhi can only logically correlate two conditions at a time using keywords such as and, or, and not. When more than two conditions need to be logically correlated, use multiple pattern queries in a chaining manner, at a time correlating two logical conditions and streaming the output to a downstream query to logically correlate the results with other logical conditions.
a. and, or, and not currently correlate only two conditions at a time.
b. To correlate more conditions, chain several CEP queries together.
4. CEP examples
a. Send an alert if a room's temperature rises by 5 degrees or more within 10 minutes.
from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
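To make the state-machine behaviour of the query above concrete, here is a minimal plain-Python simulation. It is only a sketch of the matching logic, not Siddhi's API or its implementation: the Temp class, its ts timestamp field (in seconds), and the detect_rises function are hypothetical names introduced for illustration, and events are assumed to arrive in time order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Temp:
    ts: int         # event time in seconds (hypothetical field, not in TempStream)
    room_no: int
    temp: float


WINDOW = 10 * 60    # "within 10 min", expressed in seconds


def detect_rises(events):
    """Return (roomNo, initialTemp, finalTemp) tuples for a time-ordered event list."""
    pending = []    # partial matches: e1 events still waiting for their e2
    alerts = []
    for ev in events:
        still_pending = []
        for e1 in pending:
            if ev.ts - e1.ts > WINDOW:
                continue                      # partial match expired
            if ev.room_no == e1.room_no and e1.temp + 5 <= ev.temp:
                alerts.append((e1.room_no, e1.temp, ev.temp))   # e2 found
            else:
                still_pending.append(e1)      # non-matching events are skipped
        still_pending.append(ev)              # `every`: each event also starts a new match
        pending = still_pending
    return alerts
```

For example, with room 1 readings of 20.0 at t=0, 22.0 at t=60 and 26.0 at t=300, the sketch reports one rise from 20.0 to 26.0, while the 22.0 reading stays pending because 26.0 is less than 5 degrees above it.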
b. Find the temperature difference in a room between two regulator events (two streams are defined).
define stream TempStream (deviceID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, tempSet double, isOn bool);
from every e1=RegulatorStream -> e2=TempStream[e1.roomNo==roomNo]<1:> -> e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TempDiffStream;
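The counting part of the query above (<1:> together with e2[0] and e2[last]) can be illustrated with a small plain-Python simulation. This is a simplified sketch under my own assumptions, not Siddhi code: events are modelled as tuples ("reg", roomNo) and ("temp", roomNo, temp), and temp_diffs is a hypothetical helper name.

```python
def temp_diffs(events):
    """Simulate: every e1=Regulator -> e2=Temp<1:> -> e3=Regulator, per room.

    events: time-ordered tuples ("reg", roomNo) or ("temp", roomNo, temp).
    Returns a list of (roomNo, tempDiff) tuples.
    """
    open_matches = []   # each entry: (roomNo, temps collected so far as e2)
    out = []
    for ev in events:
        kind, room = ev[0], ev[1]
        if kind == "temp":
            for r, temps in open_matches:
                if r == room:
                    temps.append(ev[2])       # collected into the e2 collection
        else:
            remaining = []
            for r, temps in open_matches:
                if r == room and len(temps) >= 1:          # <1:> needs at least one e2
                    out.append((r, temps[0] - temps[-1]))  # e2[0].temp - e2[last].temp
                else:
                    remaining.append((r, temps))
            remaining.append((room, []))      # `every`: this event also starts a new match as e1
            open_matches = remaining
    return out
```

Here temps[0] plays the role of e2[0] and temps[-1] the role of e2[last]. A regulator event that arrives before any temperature reading has been collected does not complete the match, because the count bound requires at least one e2 event.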
c. Send the stop control action to the regulator via RegulatorActionStream when the key is removed from the hotel room.
define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream RoomKeyStream(deviceID long, roomNo int, action string);
from every e1=RegulatorStateChangeStream[ action == 'on' ] -> e2=RoomKeyStream[ e1.roomNo == roomNo and action == 'removed' ] or e3=RegulatorStateChangeStream[ e1.roomNo == roomNo and action == 'off' ]
select e1.roomNo, ifThenElse(e2 is null, 'none', 'stop') as action
having action != 'none'
insert into RegulatorActionStream;
d. Generate an alert if the regulator gets switched off before the temperature reaches 12 degrees.
define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream TempStream (deviceID long, roomNo int, temp double);
from every e1=RegulatorStateChangeStream[action == 'start'] -> not TempStream[e1.roomNo == roomNo and temp <= 12] and e2=RegulatorStateChangeStream[e1.roomNo == roomNo and action == 'off']
select e1.roomNo as roomNo
insert into AlertStream;
e. Raise an alert if the room temperature has not reached the set temperature within 5 minutes of the regulator being switched on.
define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream TempStream (deviceID long, roomNo int, temp double);
from e1=RegulatorStateChangeStream[action == 'start'] -> not TempStream[e1.roomNo == roomNo and temp <= e1.tempSet] for 5 min
select e1.roomNo as roomNo
insert into AlertStream;
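The absence check in the query above (not ... for 5 min) can be sketched in plain Python as follows. This is an illustrative simplification rather than Siddhi's implementation: it assumes every temperature event up to the end of the period has already been observed, whereas a real engine fires on a timer. The absence_alert function and the tuple layouts are hypothetical.

```python
def absence_alert(start, temps, period=5 * 60):
    """Return the roomNo to alert on, or None if the set temperature was reached.

    start: (ts, roomNo, tempSet) for the regulator 'start' event.
    temps: iterable of (ts, roomNo, temp) temperature events.
    period: length of the absence check in seconds ("for 5 min").
    """
    start_ts, room, temp_set = start
    for ts, r, temp in temps:
        in_period = start_ts <= ts <= start_ts + period
        if in_period and r == room and temp <= temp_set:
            return None      # a matching event occurred, so `not` is violated
    return room              # nothing matched during the period: raise the alert
```

A reading that satisfies the condition only after the period has ended, or that belongs to another room, does not cancel the alert.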
Sequence
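A sequence is also a state machine implementation. It detects occurrences of consecutive events over time. Constraint: all matching events must arrive consecutively, and no non-matching event may arrive in between the events of a matching sequence.
1. Syntax keywords
,
Denotes the event that immediately follows the given event.
The other keywords are the same as for patterns above.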
2. Sequence examples
a. Send an alert when the temperature increases by more than one degree between two consecutive temperature events.
from every e1=TempStream, e2=TempStream[temp > e1.temp + 1]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
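Because a sequence only ever compares an event with the one that immediately follows it, the query above behaves like a pairwise scan over the stream. A minimal plain-Python sketch of that idea, where consecutive_rises is a hypothetical helper and temps is assumed to be the list of temperature values in arrival order:

```python
def consecutive_rises(temps):
    """Return (initialTemp, finalTemp) for each adjacent pair rising by more than 1."""
    alerts = []
    for prev, cur in zip(temps, temps[1:]):   # only directly adjacent events are paired
        if cur > prev + 1:
            alerts.append((prev, cur))
    return alerts
```

For the readings 20.0, 20.5, 22.0 this reports only the pair (20.5, 22.0). A pattern written with -> would also pair 20.0 with 22.0, since patterns skip the non-matching 20.5 event while sequences do not.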
b. Identify a regulator activation event immediately followed by both temperature sensor and humidity sensor activation events, in either order.
define stream TempStream(deviceID long, isActive bool);
define stream HumidStream(deviceID long, isActive bool);
define stream RegulatorStream(deviceID long, isOn bool);
from every e1=RegulatorStream[isOn == true], e2=TempStream and e3=HumidStream
select e2.isActive as tempSensorActive, e3.isActive as humidSensorActive
insert into StateNotificationStream;
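Summary:
I took a quick look at Siddhi, a lightweight stream processing and CEP engine. For my current project at least, I think it could be used in two ways:
1. On the edge side, combined with the many Source types it already supports, for data collection and edge computing;
2. Integrated with distributed time series database storage to provide lightweight stream processing and CEP, because other frameworks such as Flink, Storm, and Spark Streaming are simply too heavyweight for this.
Built on a full-featured distributed time series database, this would provide complete data collection, storage, and computation on the edge side.
Oh well. Lately I keep meaning to find time to write a series on time series databases and another on data structures and algorithms, but there really is a lot going on. Tomorrow I'll compile Siddhi and see how the code works.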