博客 Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享

Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享

   数栈君   发表于 2022-11-18 15:36  91  0

一、前言

ChunJun(原 FlinkX)是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具,既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 binlog,Kafka 等。同时 ChunJun 也是一个支持原生 FlinkSql 所有语法和特性的计算框架。

ChunJun 具有丰富的插件种类,多达 40 种,如常见的 mysql、binlog、logminer 等,大部分插件都支持 source/reader、sink/writer 及维表功能。目前很多用户在思考能否在 Dinky 上使用 ChunJun 的插件以提供更全面的能力。那本文将带来如何在 Dinky 上集成 ChunJun 丰富的插件,其实简单,那我们开始吧。

二、部署 Flink+ChunJun

编译

注意,如果需要集成 Dinky,需要将 ChunJun 项目下的 chunjun-core 的 pom 文件中的 logback-classic 和 logback-core 注释掉,否则容易在 Dinky 执行 sql 任务的时候报错。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ce3abb6ab59b0d2c94b117df0b7a1338..png
 然后执行:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a16e8140bbcf574643ef84450264405e..png

部署

使用 ChunJun 需要先部署 Flink 集群,其部署本文不再做指导。

值得注意的是,如果你需要调用 Flinkx 的 connect jar 的话,则需要将 classloader.resolve-order 改成 parent-first。修改完成配置以后,把 Flinkx 的 jar 包复制过来,主要是 chunjun-clients-master.jar(Flinkx 现在改名 ChunJun )以及 chunjun 的其它 connector 放到 flink/lib 目录下,如图所示。 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/72cbdeb700d4f09502ee1394706d538b..png

异常处理

如果启动集群时出现异常,即 Flink standalone 集群加载 flinkx-dist 里 jar 包之后,集群无法启动,日志报错:Exception in thread "main" java.lang.NoSuchFieldError: EMPTY_BYTE_ARRAY.

Exception in thread"main"java.lang.NoSuchFieldError:EMPTY_BYTE_ARRAY at org.apache.logging.log4j.core.config.ConfigurationSource.(ConfigurationSource.java:56) at org.apache.logging.log4j.core.config.NullConfiguration.< init>(NullConfiguration.java:32) at org.apache.logging.log4j.core.LoggerContext.< clinit>(LoggerContext.java:85) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.log4j.LogManager.< clinit>(LogManager.java:72) at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.< clinit>(ClusterEntrypoint.java:107)

原因:这个报错是因为 log4j 版本不统一导致的,因为 flinkx-dist 中部分插件引用的还是旧版本的 log4j 依赖,导致集群启动过程中,出现了类冲突问题;

方案:临时方案是将 flink lib 中 log4j 相关的 jar 包名字前加上字符 ‘a‘,使得 flink standalone jvm 优先加载。 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e6e01370a64e89ffa8cbf752fc2a293f..png

三、部署 Dinky

编译

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7db239ea38d0b87adc08c71ecd5dece5..png
 编译完成后的压缩包在 Dinky 根目录下的 build 文件夹下。

部署

1、上传 dlink 压缩包到部署服务器

2、解压

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/38a61cf453c2f9b083bff3a8c5032e0e..png
 3、数据库初始化

4、把 flink 的 jar 放到 dlink 目录下

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6832604658ffd518becccb03ded0a7fc..png

切换 Dinky 的 Flink 版本

因为目前 flinkx 的稳定版本是 1.12.7,所以我们把 dlink 默认的 client 版本修改为 1.12

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/141d587b34b32742a04a993ee054b9df..png
 lib 下的目录如图:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/155a1d81b20ae4cd891ddbe205f17610..png
 注意:因为我没有用上 dlink-connector-jdbc 的 jar 包,所以图中的 dlink-connector-jdbc-1.13-0.6.4-SNAPSHOT.jar 没有换成 1.12 版本的,可以去掉。

启动

启动命令 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/da8dd81a7d4747fdd02c6b8f53b15598..png

注册集群实例

在集群实例中注册已经启动的 Flink 集群。 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1bcc19329164ad5ae2379070195802f7..png

四、示例分享

添加依赖

这里演示 mysql->mysql 的同步作业,所以需要 Flinkx 的 mysql-connector.jar 以及核心 jar。 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ae595b5fed74f2201c1ccda5f0e372c3..png

编写作业

Mysql DDL:

CREATE TABLE datasource_classify ( id int unsigned NOT NULL AUTO_INCREMENT COMMENT ' 自增 id', classify_code varchar (64) NOT NULL COMMENT ' 类型栏唯一编码 ', sorted int NOT NULL DEFAULT '0' COMMENT ' 类型栏排序字段 默认从 0 开始 ', classify_name varchar (64) NOT NULL COMMENT ' 类型名称 包含全部和常用栏 ', is_deleted tinyint NOT NULL DEFAULT '0' COMMENT ' 是否删除,1 删除,0 未删除 ', gmt_create datetime DEFAULT CURRENT_TIMESTAMP, gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY classify_code (classify_code) ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT=' 数据源分类表 ';

Flink Sql:

CREATE TABLE source ( id bigint, classify_code STRING, sorted int, classify_name STRING, is_deleted int, gmt_create timestamp(9), gmt_modified timestamp(9), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://192.168.31.101:3306/datasource?useSSL=false', 'table-name' = 'datasource_classify', 'username' = 'root', 'password' = 'root' ,'scan.fetch-size' = '2' ,'scan.query-timeout' = '10' );

CREATE TABLE sink ( id bigint, classify_code STRING, sorted int, classify_name STRING, is_deleted int, gmt_create timestamp(9), gmt_modified timestamp(9), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false', 'table-name' = 'datasource_classify', 'username' = 'root', 'password' = 'root' ,'scan.fetch-size' = '2' ,'scan.query-timeout' = '10' );

insert into sink select * from source u;

执行任务

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a2459b898cd071b02f58e5d4728cf4d9..png
 选中 Yarn Session 模式提交作业。 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2598692c67b48b054609f44ca0a6c7bc..png
 提交后可从执行历史查看作业提交状况。 http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/47d18dd6ef6a975f9a81dbb7c5f384cc..png
 进程中可以看的 Flink 集群上批作业执行完成。

对比数据

源库: http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ad0bcf9d91a2d63b4fa9fd467e2e7410..png
 目标库: http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ad3670166cedfa3c4d1426a511ef9e31..png
 同步成功,很丝滑。

五、总结

在集成 ChunJun 的时候遇到的问题大部分都是缺包以及包冲突,所以只需要注意一下这个问题就能比较好的进行集成。

在集成服务的时候建议是,先把 Flink 和 ChunJun 进行集成,确保服务能够正常启用以后再进行 Dinky 的集成,这样有利于快速定位查找问题,如果遇到文章之外的问题,也可以查看 Dinky 官网 FAQ | Dinky (dlink.top) chunjun 的官网 QuickStart | ChunJun 纯钧 (dtstack.github.io/chunjun/),看看是否有类似问题的解决办法作为参考。

六、用户体验

因为本人目前还是处于学习使用的过程中,所以很多功能没有好好使用,待自己研究更加透彻后希望写一篇文章,优化官网的用户手册。以下的优缺点以及建议都是目前我在使用学习的过程中遇到的问题。

优点

Dinky 最吸引我的地方应该就是 sql 编辑模版了,直接快捷键生成 sql 模版,在开发测试中屡试不爽。在集成了 ChunJun (Flinkx) 以后,能够做到多源数据的离线跑批任务及日常小批量实时任务的同步。支持各种类型的任务执行方式。

缺点

ui 上适配还有点小问题,例如:打开 F12 调整宽度后,再关闭,页面 ui 不会自适应,需要刷新。

期待改进点

1、更多的自定义异常、业务异常

2、增加新的向导模式,结合数据源,通过 webUI 可以一键引入字段或者勾选需要的字段,生成 Flink Sql 的一大部分配置

CREATE TABLE 表名 (-- 页面勾选字段,字段从元数据直接拉取 id bigint, classify_code STRING, sorted int, classify_name STRING, is_deleted int, gmt_create timestamp (9), gmt_modified timestamp (9), PRIMARY KEY (id) NOT ENFORCED ) WITH ( -- 从选择的数据中获取 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false', 'table-name' = 'datasource_classify', 'username' = 'root', 'password' = 'root' , -- 其它非主要配置有用户自己填写 ); 3、sql 历史版本管理,目前我已经提交 Feature 并被合并到 0.6.5 版本中。

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术 qun」,交流最新开源技术信息,qun 号码:30537511,项目地址:https://github.com/DTStack/Taier

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群