数栈对于 Flink CEP 规则热更新的扩展
// Native Flink CEP (DataStream API): match three consecutive events
//   start  (id == 42) -> middle (volume >= 10.0) -> end (name == "end")
// and emit the sum of the three matched events' volumes.
// NOTE(review): assumes Event exposes getId(), getName() and getVolume() — confirm.
DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        })
        // next() = strict contiguity: "middle" must immediately follow "start".
        .next("middle")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getVolume() >= 10.0;
            }
        })
        .next("end")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                // Constant-first equals: a null name simply does not match.
                return "end".equals(event.getName());
            }
        });

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Double> result = patternStream.process(
        new PatternProcessFunction<Event, Double>() {
            /**
             * Emits the total volume of one complete match.
             *
             * @param match events of the match, keyed by pattern stage name
             * @param ctx   process-function context (unused)
             * @param out   collector receiving the summed volume
             */
            @Override
            public void processMatch(
                    Map<String, List<Event>> match,
                    Context ctx,
                    Collector<Double> out) throws Exception {
                // No quantifiers are used, so each stage holds exactly one
                // event and get(0) is that stage's event.
                double total = 0d;
                for (List<Event> events : match.values()) {
                    total += events.get(0).getVolume();
                }
                out.collect(total);
            }
        });
-- Kafka-backed source table (JSON records with id / name / volume).
-- procTime is a processing-time computed column used as the ORDER BY key
-- of the MATCH_RECOGNIZE query.
CREATE TABLE source (
    id INT,
    name VARCHAR,
    volume DOUBLE,
    procTime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'dtstack',
    'properties.group.id' = 'dtstack',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'format' = 'json'
);
-- Native Flink SQL form of the CEP pattern: three consecutive rows
--   A (id = 42), B (volume >= 10.0), C (name = 'end')
-- ordered by processing time; one output row per match with the summed volume.
SELECT total
FROM source
    MATCH_RECOGNIZE (
        ORDER BY procTime
        MEASURES
            A.volume + B.volume + C.volume AS total
        ONE ROW PER MATCH
        PATTERN (A B C)
        DEFINE
            A AS A.id = 42,
            B AS B.volume >= 10.0,
            C AS C.name = 'end'
    ) AS T;
// DTStack dynamic CEP (DataStream API): instead of a hard-coded Pattern, the
// pattern processors are discovered from a JDBC table (JDBC_URL / TABLE_NAME),
// so rules can be changed without redeploying the job.
// NOTE(review): presumably the table is polled every JDBC_INTERVAL_MILLIS,
// per the "Periodic" factory name — confirm.
DataStream<Event> input = ...;

// NOTE(review): upstream FLIP-200 declares CEP.dynamicPatterns as returning a
// SingleOutputStreamOperator<R>, not a PatternStream — confirm against DTStack's API.
PatternStream<Double> output = CEP.dynamicPatterns(
        input,
        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                JDBC_URL,
                JDBC_DRIVE,
                TABLE_NAME,
                null, // NOTE(review): meaning of this argument is not shown here — confirm
                JDBC_INTERVAL_MILLIS),
        TimeBehaviour.ProcessingTime,
        // Output type produced by the dynamically loaded pattern processors.
        TypeInformation.of(new TypeHint<Double>() {}));
-- Kafka-backed source table (JSON records with id / name / volume) for the
-- dynamic-rule query; procTime is a processing-time computed column used as
-- its ORDER BY key.
CREATE TABLE source (
    id INT,
    name VARCHAR,
    volume DOUBLE,
    procTime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'dtstack',
    'properties.group.id' = 'dtstack',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'format' = 'json'
);
-- DTStack DYNAMIC MATCH_RECOGNIZE extension: the pattern is not written in the
-- query; it is loaded from the JDBC table configured in WITH_PATTERN.
-- OUTPUT declares the columns emitted per match.
-- 'driver' must be the fully qualified MySQL Connector/J class name.
-- NOTE(review): 'jdbcIntervalMillis' presumably sets how often the rule table
-- is re-read — confirm.
SELECT total
FROM source
    DYNAMIC MATCH_RECOGNIZE (
        ORDER BY procTime
        OUTPUT (total DOUBLE)
        WITH_PATTERN (
            'tableName' = 'dynamic_cep',
            'user' = 'dtstack',
            'password' = '***',
            'driver' = 'com.mysql.cj.jdbc.Driver',
            'jdbcUrl' = 'jdbc:mysql://127.0.0.1:3306/cep',
            'jdbcIntervalMillis' = '1000'
        )
    ) AS T;
{
"name": "rule",
"type": "COMPOSITE",
"edges": [{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
},{
"source": "middle",
"target": "end",
"type": "SKIP_TILL_NEXT"
}],
"nodes": [{
"name": "start",
"condition": {
"expression": "id == 45",
"type": "AVIATOR"
},
"type": "ATOMIC"
},
{
"name": "middle",
"condition": {
"expression": "volume >= 10.0",
"type": "AVIATOR"
},
"type": "ATOMIC"
}, {
"name": "end",
"condition": {
"expression": "name == 'end'",
"type": "AVIATOR"
},
"type": "ATOMIC"
}
]...
}
《数据资产管理白皮书》下载地址：https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址：https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址：https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址：https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友，请浏览袋鼠云官网：https://www.dtstack.com/?src=bbs
同时，欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」，交流最新开源技术信息，群号码：30537511，项目地址：https://github.com/DTStack