数栈对于 Flink CEP 规则热更新的扩展
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("start") .where( new SimpleCondition<Event>() { @Override public boolean filter (Event event) { return event.getId() == 42; } ) .next("middle").where( new SimpleCondition<Event> { @Override public boolean filter(SubEvent subEvent) { return subEvent.getVolume() >= 10.0; } } ).next("end" ).where ( new SimpleCondition<Event>() { @Override public boolean filter (Event event) { return event.getName().equals("end"); } } );
PatternStream<Event> patternStream = CEP.pattern(input, pattern);DataStream<Double> result = patternStream.process ( new PatternProcessFunction<Event, Double>() { @Override public void processMatch( Map<String, List<Event>> pattern, Context ctx, Collector<Double> out) throws Exception { Double total = 0d; for (Map.Entry<String, List<Event>> entry : pattern.entrySet()) ‹ total = total + entry.getValue().get(0).getVolume(); } out.collect(total); });CREATE TABLE source (id INT,name VARCHAR,volume DOUBLEprocTime AS PROCTIME ()) WITH ('connector' = 'kafka','topic' = 'dtstack','properties.group.id' = 'dtstack','scan.startup.mode' = 'latest-offset','properties.bootstrap.servers' = '127.0.0.1:9092','format'= 'json')SELECT totalFROM sourceMATCH_RECOGNIZE (ORDER BY procTimeMEASURESA. volume + B.volume + C.volume as totalONE ROW PER MATCHPATTERN (A B C)DEFINEA AS A. id == 42,B AS B. volume >= 10.0C AS C. name == 'end')
DataStream<Event> input = ...;
PatternStream<Double> output = CEP.dynamicPatterns( input, new JDBCPeriodicPatternProcessorDiscovererFactory<>( JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS), TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint<Double>(){}))CREATE TABLE source ( id INT, name VARCHAR, volume DOUBLE procTime AS PROCTIME ()) WITH ( 'connector' = 'kafka', 'topic' = 'dtstack', 'properties.group.id' = 'dtstack', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'format'= 'json')
SELECT totalFROM source DYNAMIC MATCH_RECOGNIZE ( ORDER BY procTime OUTPUT (total double) WITH_PATTERN ( 'tableName' = 'dynamic_cep', 'user' = 'dtstack', 'password' = '***', 'driver' = 'com.mysql.cj jdbc.Driver', 'jdbcUrl' = 'jdbc:mysql://127.0.0.1:3306/cep', 'jdbcIntervalMillis' = '1000' ) )AS T;{ "name": "rule", "type": "COMPOSITE", "edges": [{ "source": "start", "target": "middle", "type": "SKIP_TILL_NEXT" },{ "source": "middle", "target": "end", "type": "SKIP_TILL_NEXT" }], "nodes": [{ "name": "start", "condition": { "expression": "id == 45", "type": "AVIATOR" }, "type": "ATOMIC" }, { "name": "middle", "condition": { "expression": "volume >= 10.0", "type": "AVIATOR" }, "type": "ATOMIC" }, { "name": "end", "condition": { "expression": "name == 'end'", "type": "AVIATOR" }, "type": "ATOMIC" } ]... }《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack