在风险控制或者模式匹配等场景下,用户经常会想要在模式仍然能够提供服务的情况下,改变事件需要匹配的模式。在目前的 Flink CEP 中,一个 CEP 算子有一个固定的 CEP 模式,不支持改变。因此,为了达到上述目的,用户必须重启 Flink 作业,并等待相对较长的更新时间。
另一种常见情况是,一个事件流需要与多个模式匹配。虽然当前的 Flink CEP 不支持在一个 CEP 运算符中匹配多个模式,但用户必须为每个模式设置一个 Flink 作业或一个运算符。这可能会浪费内存和计算资源。为此我们支持了动态加载规则,支持任务不重启Flink 作用动态的匹配规则。
风险控制:例如,检测用户行为模式,如果发现异常行为(如短时间内多次登录失败),则可以触发相应的风险控制措施。
用户画像:通过分析用户的行为事件流,可以构建用户画像,进而实现精准营销。
运维监控:在企业服务的运维管理中,CEP 可以用来配置复杂的监控规则,以实现对服务状态的实时监控。
{ "afterMatchStrategy": { "type": "NO_SKIP" }, "edges": [ { "source": "middle", "target": "end", "type": "STRICT" }, { "source": "start", "target": "middle", "type": "SKIP_TILL_NEXT" } ], "name": "end", "nodes": [ { "condition": { "expression": "action == 2", "type": "AVIATOR" }, "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ] }, "type": "ATOMIC" }, { "condition": { "expression": "action == 1", "type": "AVIATOR" }, "name": "middle", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ] }, "type": "ATOMIC" }, { "condition": { "expression": "action == 0", "type": "AVIATOR" }, "name": "start", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ] }, "type": "ATOMIC" } ], "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ] }, "type": "COMPOSITE", "version": 1, "window": null}ADD JAR WITH /data/sftp/11_dynamic-cep-jar-1_dynamic-cep-jar-1.0-SNAPSHOT.jar AS functions.jar;
CREATE TABLE source ( id INT, name VARCHAR, productionId INT, action INT, eventTime BIGINT, procTime AS PROCTIME()) WITH ( 'connector' = 'kafka-x', 'topic' = 'stream', 'properties.group.id' = 'stream', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json');
CREATE TABLE sink ( id int) WITH ( 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://localhost:3306/stream', 'schema-name' = 'stream', 'table-name' = 'stream_cep_001', 'username' = 'drpeco', 'password' = '******', 'sink.buffer-flush.max-rows' = '1024', 'sink.buffer-flush.interval' = '10000', 'sink.all-replace' = 'true', 'sink.parallelism' = '1' );INSERT INTO sinkSELECT id_total as idFROM source DYNAMIC MATCH_RECOGNIZE ( PARTITION BY productionId ORDER BY procTime OUTPUT (id_total int) WITH_PATTERN ( 'tableName' = 'dynamic_cep', 'user' = 'drpeco', 'password' = '******', 'driver' = 'com.mysql.cj.jdbc.Driver', 'jdbcUrl' = 'jdbc:mysql://localhost:3306/cep', 'jdbcIntervalMillis' = '1000' ) ) AS T;
{ "id": 1, "name" : "middle", "productionId" : 11, "action" : 0, "eventTime" : 1};{ "id": 2, "name" : "middle", "productionId" : 11, "action" : 1, "eventTime" : 1};{ "id": 3, "name" : "middle", "productionId" : 11, "action" : 2, "eventTime" : 1}《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack