canal format
Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
将增量数据从数据库同步到其他系统
日志审计
数据库的实时物化视图
关联维度数据库的变更历史,等等。
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
示例:
1、在mysql中创建学生表,插入几条数据
CREATE TABLE `student` (
`id` varchar(20) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint(20) DEFAULT NULL,
`gender` varchar(255) DEFAULT NULL,
`clazz` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2、在flink中创建kafka soure表指定数据的格式为canl-json
canal采集的数据包含三种类型,INSERT, UPDATE,DELETE
FLink 会自动将三种类型转换成变更日志流。同时会自动解析数据
CREATE TABLE student_kafka (
id STRING,
name STRING,
age bigint,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'bigdata.student',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'canal-json' -- 使用 canal-json 格式
);
3、统计班级的人数
select clazz,count(1) as c
from student_kafka
group by clazz
4、将统计的结果保存到数据库中
CREATE TABLE clazz_num (
clazz STRING,
c BIGINT,
PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'clazz_num',
'username' ='root',
'password' = '123456'
);
insert into clazz_num
select clazz,count(1) as c
from student_kafka
group by clazz
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack