在风险控制或者模式匹配等场景下,用户经常会想要在模式仍然能够提供服务的情况下,改变事件需要匹配的模式。在目前的 Flink CEP 中,一个 CEP 算子有一个固定的 CEP 模式,不支持改变。因此,为了达到上述目的,用户必须重启 Flink 作业,并等待相对较长的更新时间。
另一种常见情况是,一个事件流需要与多个模式匹配。虽然当前的 Flink CEP 不支持在一个 CEP 运算符中匹配多个模式,但用户必须为每个模式设置一个 Flink 作业或一个运算符。这可能会浪费内存和计算资源。为此我们支持了动态加载规则,支持任务不重启Flink 作用动态的匹配规则。
风险控制:例如,检测用户行为模式,如果发现异常行为(如短时间内多次登录失败),则可以触发相应的风险控制措施。
用户画像:通过分析用户的行为事件流,可以构建用户画像,进而实现精准营销。
运维监控:在企业服务的运维管理中,CEP 可以用来配置复杂的监控规则,以实现对服务状态的实时监控。
{
"afterMatchStrategy": {
"type": "NO_SKIP"
},
"edges": [
{
"source": "middle",
"target": "end",
"type": "STRICT"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"name": "end",
"nodes": [
{
"condition": {
"expression": "action == 2",
"type": "AVIATOR"
},
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
]
},
"type": "ATOMIC"
},
{
"condition": {
"expression": "action == 1",
"type": "AVIATOR"
},
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
]
},
"type": "ATOMIC"
},
{
"condition": {
"expression": "action == 0",
"type": "AVIATOR"
},
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
]
},
"type": "ATOMIC"
}
],
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
]
},
"type": "COMPOSITE",
"version": 1,
"window": null
}
ADD JAR WITH /data/sftp/11_dynamic-cep-jar-1_dynamic-cep-jar-1.0-SNAPSHOT.jar AS functions.jar;
CREATE TABLE source (
id INT,
name VARCHAR,
productionId INT,
action INT,
eventTime BIGINT,
procTime AS PROCTIME()
) WITH (
'connector' = 'kafka-x',
'topic' = 'stream',
'properties.group.id' = 'stream',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
CREATE TABLE sink
(
id int
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/stream',
'schema-name' = 'stream',
'table-name' = 'stream_cep_001',
'username' = 'drpeco',
'password' = '******',
'sink.buffer-flush.max-rows' = '1024',
'sink.buffer-flush.interval' = '10000',
'sink.all-replace' = 'true',
'sink.parallelism' = '1'
);
INSERT INTO sink
SELECT id_total as id
FROM source
DYNAMIC MATCH_RECOGNIZE (
PARTITION BY productionId
ORDER BY procTime
OUTPUT (id_total int)
WITH_PATTERN (
'tableName' = 'dynamic_cep',
'user' = 'drpeco',
'password' = '******',
'driver' = 'com.mysql.cj.jdbc.Driver',
'jdbcUrl' = 'jdbc:mysql://localhost:3306/cep',
'jdbcIntervalMillis' = '1000'
)
)
AS T;
{
"id": 1,
"name" : "middle",
"productionId" : 11,
"action" : 0,
"eventTime" : 1
};
{
"id": 2,
"name" : "middle",
"productionId" : 11,
"action" : 1,
"eventTime" : 1
};
{
"id": 3,
"name" : "middle",
"productionId" : 11,
"action" : 2,
"eventTime" : 1
}
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack