博客 flink sqlserver cdc实时同步(含sqlserver安装配置等)

flink sqlserver cdc实时同步(含sqlserver安装配置等)

   数栈君   发表于 2023-09-20 10:31  504  0

01 引言

官方文档:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md

如果要使用flink cdc做sqlserver的实时同步,需要满足以下条件:

    1.需要安装SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持);

    2.需要开启SQL Server代理;

    3.启用CDC功能。

ok,接下来开始讲解。

02 SQLServer安装

首先需要先安装SqlServer(使用的是2019版本)

主要就是两个步骤:

## 拉取最新镜像
docker pull mcr.microsoft.com/mssql/server:2019-latest
## 运行 SQL Server 容器(密码必须是8个字符,并包含字母、数字和特殊字符,如:abc@123456 ,下面映射主机端口为30027)
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest

03 开启SQLServer代理

首先使用root用户进入容器:

docker exec -it --user root sql_server_2019 bash

进入容器后,执行命令启用SqlServeragent:

/opt/mssql/bin/mssql-conf set sqlagent.enabled true

退出,并重启容器:

exit
docker restart sql_server_2019

具体操作如下:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/eac960f2e06727e702b06c68fc89061c..png
    

04 开启CDC功能

step1:创建’cdc_test’数据库,并使用连接工具登录该数据库,使用以下 SQL 命令启用 CDC 功能:

-- 创建数据库
CREATE DATABASE cdc_test;

-- 启用CDC功能
EXEC sys.sp_cdc_enable_db;

-- 判断当前数据库是否启用了CDC(如果返回1,表示已启用)
SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
  

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1aab434d3ec434903b72c4d8a0d44af3..png
  

step2:选择要进行 CDC 跟踪的表(这里使用orders表作为演示)

-- 创建示例表(orders)
CREATE TABLE orders (
    id int,
    order_date date,
    purchaser int,
    quantity int,
    product_id int,
    PRIMARY KEY ([id])
);

-- schema_name 是表所属的架构(schema)的名称。
-- table_name 是要启用 CDC 跟踪的表的名称。
-- cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。
EXEC sys.sp_cdc_enable_table
   @source_schema = 'dbo',
   @source_name = 'orders',
   @role_name = 'cdc_role';


执行结果如下:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/662700943f5b73e3b699cb267e6e6ec2..png
  

step3:启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 相关的表中,您可以使用这些信息进行数据更改追踪和同步。

-- 查询在当前数据库下所有的表:
SELECT * FROM INFORMATION_SCHEMA.TABLES
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7e003eded7a6b561b69c18e58cdc6437..png
  


05 Flink SQL

ok,现在可以写FlinkSQL了,如下:

-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH (
    'connector' = 'sqlserver-cdc', -- 使用SQL Server CDC连接器
    'hostname' = '10.194.183.120', -- SQL Server主机名
    'port' = '30027', -- SQL Server端口
    'username' = 'sa', -- SQL Server用户名
    'password' = 'abc@123456', -- SQL Server密码
    'database-name' = 'cdc_test', -- 数据库名称
    'schema-name' = 'dbo', -- 模式名称
    'table-name' = 'orders' -- 要捕获更改的表名
);

-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
)
WITH (
    'connector' = 'jdbc', -- 使用JDBC连接器
    'url' = 'jdbc:mysql://10.194.183.120:30025/test', -- MySQL的JDBC URL
    'username' = 'root', -- MySQL用户名
    'password' = 'root', -- MySQL密码
    'table-name' = 'orders' -- 要写入的MySQL表名
);

-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;

启动程序,一切正常:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e0d8a39b642b29340e39c687254e45f3..png
  

06 验证

验证新增:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1469dbe1429a1db669bd8e013ec87aa1..png
  

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6f18a7af794d53cb608e74da162db0ed..png
  


验证修改:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/02ab3a80977fb69419be3e040f0dacba..png
  

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d991e05030761283a20db78278c9ecc3..png
  


验证删除:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/462f06cc7645e723cefc9c90718f0969..png
  

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/141757bdea09d7e0a8711fbd253b2134..png
  




免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群