博客 Flink cdc 介绍及使用 FlinkCDC读取mysql 及 jdbc 连接参数配置

Flink cdc 介绍及使用 FlinkCDC读取mysql 及 jdbc 连接参数配置

   数栈君   发表于 2023-07-18 11:54  1421  0

1. Flink cdc 介绍


CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC。

目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛;

Flink 的 cdc 是基于日志的:实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog文件当作流的数据源;保障数据一致性,因为 binlog 文件包含了所有历史变更明细;保障实时性,因为类似 binlog的日志文件是可以流式消费的,提供的是实时数据。

2. 常见cdc开源方案


3. Flink cdc 使用案例
3.1 Mysql开启binlog
修改配置文件

vi /etc/my.cnf
1
my.cnf文件内容

# 第一个参数是打开binlog日志
log_bin=ON

# 第二个参数是binlog日志的基本文件名,后面会追加标识来表示每一个文件
log_bin_basename=/usr/local/mysql/log-bin/mysql-bin

# 第三个参数指定的是binlog文件的索引文件,这个文件管理了所有的binlog文件的目录
log_bin_index=/usr/local/mysql/log-bin/mysql-bin.index
1
2
3
4
5
6
7
8
修改完成后 查看 binlog 开启状态

show variables like '%log_bin%';
1
如下图所示 ON 为开启状态



3.2 Flink cdc读取mysql 及 jdbc 连接参数配置
flink-connector-mysql-cdc 2.2 版本之前没有找到关于 jdbc 连接参数的配置,此处以 2.2 为主

3.2.1 Maven POM 文件
<dependencies>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<!--mysql cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
<!-- <scope>provided</scope>-->
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>

<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>

<!--本地调试flink ui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.13.0</version>
<scope>compile</scope>
</dependency>

</dependencies>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
3.2.2 Flink CDC 代码
Flink cdc

package flink_cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkCDC_Mysql {
public static void main(String[] args) throws Exception {

//创建执行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

Properties prop = new Properties();
prop.setProperty("autoReconnect","true");

//创建 Flink-MySQL-CDC 的 Source
//initial (default): 在第一次启动时对被监视的数据库表执行初始快照,并继续读取最新的binlog (开启断点续传后从上次消费offset继续消费)
//latest-offset: 永远不要在第一次启动时对被监视的数据库表执行快照,只从binlog的末尾读取,这意味着只有自连接器启动以来的更改
//timestamp: 永远不要在第一次启动时对监视的数据库表执行快照,直接从指定的时间戳读取binlog。使用者将从头遍历binlog,并忽略时间戳小于指定时间戳的更改事件
//specific-offset: 不允许在第一次启动时对监视的数据库表进行快照,直接从指定的偏移量读取binlog。
MySqlSource<String> build = MySqlSource.<String>builder()
.serverTimeZone("UTC")
.hostname("localhost")
.port(3306)
.username("root")
.password("123456")
.databaseList("test")
//tableList为可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
.tableList("test.test")
.startupOptions(StartupOptions.latest())
//自定义反序列化器
.deserializer(new FlinkCdcDataDeserializationSchema())
//jdbc连接参数配置
.jdbcProperties(prop)
.build();


//使用 CDC Source 从 MySQL 读取数据
DataStreamSource<String> mysqlDS = env.fromSource(build, WatermarkStrategy.noWatermarks(), "MysqlSource");

//打印数据
mysqlDS.printToErr("------>").setParallelism(1);

mysqlDS.addSink(new MysqlSink());

//6.执行任务
env.execute("FlinkCDC_mysql");


}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
自定义反序列化类
自定义反序列化类解析读入mysql的binlog为指定的json格式(实现接口DebeziumDeserializationSchema重写deserialize、getProducedType方法)

package flink_cdc;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class FlinkCdcDataDeserializationSchema implements DebeziumDeserializationSchema<String> {

@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

Struct valueStruct = (Struct)sourceRecord.value();
Struct sourceStruct = valueStruct.getStruct("source");

//获取数据库名称,表名,操作类型
String database = sourceStruct.getString("db");
String table = sourceStruct.getString("table");
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();

if (type.equals("create")) type="insert";
JSONObject jsonObject = new JSONObject();
jsonObject.put("database",database);
jsonObject.put("table",table);
jsonObject.put("type",type);

//格式转换
Struct beforeStruct = valueStruct.getStruct("before");
JSONObject beforeDataJson = new JSONObject();
if (beforeStruct != null) {
for (Field field : beforeStruct.schema().fields()) {
beforeDataJson.put(field.name(),beforeStruct.get(field));
}
}

Struct afterStruct = valueStruct.getStruct("after");
JSONObject afterDataJson = new JSONObject();
if (afterStruct != null) {
for (Field field : afterStruct.schema().fields()) {
afterDataJson.put(field.name(),afterStruct.get(field));
}
}

jsonObject.put("beforeData",beforeDataJson);
jsonObject.put("afterData",afterDataJson);

//向下游传递数据
collector.collect(jsonObject.toJSONString());

}

@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
Mysql Sink
不需要可忽略

package flink_cdc;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlSink extends RichSinkFunction<String> {

Connection connection = null;
PreparedStatement insertSmt = null;

@Override
public void open(Configuration parameters) throws Exception {
String url = "jdbc:mysql://localhost:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false";
connection = DriverManager.getConnection(url,"root","123456");
insertSmt = connection.prepareStatement("REPLACE into test2(id,name) values (?,?)");
}

@Override
public void invoke(String value, Context context) throws Exception {

System.err.println(value);
JSONObject jsonObject = JSONObject.parseObject(value);
System.out.println(jsonObject.get("afterData"));

TestBean afterData = JSONObject.parseObject(JSONObject.toJSONString(jsonObject.get("afterData")), TestBean.class);

//直接执行更新语句
insertSmt.setInt(1,afterData.getId());
insertSmt.setString(2,afterData.getName());
insertSmt.execute();
}

@Override
public void close() throws Exception {
insertSmt.close();
connection.close();
}


}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
sink测试实体

package flink_cdc;

public class TestBean {

private int id;

private String name;


public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
4. 补充
基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
假设我们正在经营电子商务业务,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。 对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

通过Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:


基于 Flink CDC 同步 MySQL 分库分表构建实时数据湖
在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。 但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

将数据从 MySQL 同步到 Iceberg 为例的整个流程,架构图如下所示:


也可以使用不同的 source 比如 Oracle/Postgres 和 sink 比如 Hudi 来构建自己的 ETL 流程。

Flink cdc 官方文档地址:https://ververica.github.io/flink-cdc-connectors/master/index.html

免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群