
Practicing the Iceberg data lake: Flink + Iceberg CDC scenario (version problem, test failed)

Posted by 数栈君 on 2023-03-31 16:12

Overview
Test Flink CDC, and observe how updated rows land in Iceberg when the source data changes.

Flink 1.14.3
Iceberg 0.13.0
CDC 2.2

Test scenario:

1. MySQL data preparation
1.1 Prepare the data (initialization)
create database xxzh_stock character set utf8;

CREATE TABLE `stock_basic` (
`i` bigint(20) DEFAULT NULL,
`ts_code` varchar(10),
`symbol` varchar(10),
`name` varchar(10),
`area` varchar(20),
`industry` varchar(20),
`list_date` varchar(10),
`actural_controller` varchar(100) DEFAULT NULL,
KEY `ix_stock_basic_index` (`i`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;



-- ----------------------------
-- Records of stock_basic
-- ----------------------------
INSERT INTO `stock_basic` VALUES ('0', '000001.SZ', '000001', '平安银行', '深圳', '银行', '19910403', null);
INSERT INTO `stock_basic` VALUES ('1', '000002.SZ', '000002', '万科A', '深圳', '全国地产', '19910129', null);
INSERT INTO `stock_basic` VALUES ('2', '000004.SZ', '000004', '国华网安', '深圳', '软件服务', '19910114', '李映彤');


1.2 Enable the MySQL binlog
Edit the MySQL configuration, /etc/my.cnf, to enable the binlog and capture changes for the xxzh_stock database (binlog-do-db=xxzh_stock).
Append the following at the end of the file:

server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=xxzh_stock

Restart MySQL so the configuration takes effect:

[root@hadoop103 conf]# service mysqld restart
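Before wiring up CDC, it is worth confirming the binlog settings actually took effect. A quick check from the MySQL client (my addition, standard MySQL commands):

SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
SHOW MASTER STATUS;                   -- current binlog file and position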
2. Flink table definitions
2.1 Launch the Flink SQL client
Include the Iceberg, Kafka, Hive, and CDC jars:

[root@hadoop101 ~]# sql-client.sh embedded -j /opt/software/iceberg0.13/iceberg-flink-runtime-1.14-0.13.0.jar -j /opt/software/iceberg0.13/flink-sql-connector-hive-2.3.6_2.12-1.14.3.jar -j /opt/software/flink-sql-connector-kafka_2.12-1.14.3.jar -j /opt/software/flink-connector-mysql-cdc-1.4.0.jar shell

CDC connector download: https://repo.maven.apache.org/maven2/com/alibaba/ververica/flink-connector-mysql-cdc/1.4.0/flink-connector-mysql-cdc-1.4.0.jar
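For example, fetching it into the directory used above (my addition):

wget -P /opt/software https://repo.maven.apache.org/maven2/com/alibaba/ververica/flink-connector-mysql-cdc/1.4.0/flink-connector-mysql-cdc-1.4.0.jar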

2.2 Prepare the MySQL source table
CREATE TABLE stock_basic_source(
`i` BIGINT NOT NULL, -- bigint(20) in MySQL
`ts_code` CHAR(10) NOT NULL, -- varchar(10) in MySQL; values like '000001.SZ' are strings, not INT
`symbol` CHAR(10) NOT NULL,
`name` CHAR(10) NOT NULL,
`area` CHAR(20) NOT NULL,
`industry` CHAR(20) NOT NULL,
`list_date` CHAR(10) NOT NULL,
`actural_controller` CHAR(100), -- DEFAULT NULL in MySQL, so left nullable here
PRIMARY KEY(i) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop103',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'xxzh_stock',
'table-name' = 'stock_basic'
);

PRIMARY KEY(i) NOT ENFORCED is required here: the MySQL table itself has no primary key (only a secondary index on i), so Flink cannot derive one, and without it the query fails:

Flink SQL> select * from stock_basic_source;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

2.3 Prepare the Iceberg sink table
CREATE CATALOG hive_catalog6 WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://hadoop101:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://user/hive/warehouse/hive_catalog6' -- note: this URI looks like it is missing the NameNode authority, e.g. hdfs://<namenode>:8020/user/... (my guess, worth checking)
);
use catalog hive_catalog6;

CREATE DATABASE xxzh_stock_mysql_db;
USE xxzh_stock_mysql_db;

CREATE TABLE stock_basic_iceberg_sink(
`i` BIGINT NOT NULL,
`ts_code` CHAR(10) NOT NULL,
`symbol` CHAR(10) NOT NULL,
`name` CHAR(10) NOT NULL,
`area` CHAR(20) NOT NULL,
`industry` CHAR(20) NOT NULL,
`list_date` CHAR(10) NOT NULL,
`actural_controller` CHAR(100), -- nullable, matching the source
PRIMARY KEY(i) NOT ENFORCED
);
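A note that is not in the original post: a CDC changelog carries updates and deletes, and writing those into Iceberg requires the v2 table format; in Iceberg 0.13 upsert writes were still experimental. The sink would typically be declared with the corresponding table properties, along these lines (the _v2 table name is illustrative):

CREATE TABLE stock_basic_iceberg_sink_v2(
`i` BIGINT NOT NULL,
`ts_code` CHAR(10) NOT NULL,
`symbol` CHAR(10) NOT NULL,
`name` CHAR(10) NOT NULL,
`area` CHAR(20) NOT NULL,
`industry` CHAR(20) NOT NULL,
`list_date` CHAR(10) NOT NULL,
`actural_controller` CHAR(100),
PRIMARY KEY(i) NOT ENFORCED
) WITH (
'format-version'='2', -- Iceberg v2 format, required for row-level updates/deletes
'write.upsert.enabled'='true' -- upsert writes, experimental in Iceberg 0.13
);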


3. Write from MySQL to Iceberg via Flink
3.1 Stream the data from MySQL into Iceberg
Flink SQL> insert into hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink select * from stock_basic_source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c83f55f408926ad987375f4fa3bf7df4
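With the job running, a way to exercise the update scenario from the overview (my addition, not part of the original transcript) is to query the sink, change a row in MySQL, and query again:

-- In the Flink SQL client: confirm the initial snapshot landed
SELECT * FROM hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink;

-- In MySQL: modify a row; the CDC source should pick the change up from the binlog
UPDATE xxzh_stock.stock_basic SET actural_controller = '测试' WHERE i = 1;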

4. Catalog path notes
You have to switch back to default_catalog to query the source table, or address it with a fully qualified path under default_catalog.

4.1 Switching catalogs
Flink SQL> select * from stock_basic_source;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Object 'stock_basic_source' not found

Flink SQL> show current catalog;
+----------------------+
| current catalog name |
+----------------------+
| hive_catalog6        |
+----------------------+
1 row in set

Flink SQL> show catalogs;
+-----------------+
| catalog name    |
+-----------------+
| default_catalog |
| hive_catalog6   |
+-----------------+
2 rows in set

Flink SQL> use catalog default_catalog;
[INFO] Execute statement succeed.

Flink SQL> select * from stock_basic_source;

4.2 Fully qualified path
See the INSERT in 3.1, which addresses the sink as hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink; the source table can be addressed the same way, as sketched below.
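For example (my addition; the source table lives in default_catalog.default_database, as the TableSourceScan log lines in 5.2 confirm):

SELECT * FROM default_catalog.default_database.stock_basic_source;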

5. Error handling
5.1 Missing CDC jar
Querying the source table fails with the error below. The fix is to put flink-connector-mysql-cdc-1.4.0.jar on the classpath when launching the SQL client (see 2.1).

Flink SQL> select * from stock_basic_source;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
iceberg
kafka
print
upsert-kafka

5.2 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder
2022-02-17 16:24:47,934 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-02-17 16:24:47,934 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@2772b695
2022-02-17 16:24:47,936 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@16ba5737
2022-02-17 16:24:47,936 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4ae44fbc
2022-02-17 16:24:47,936 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-02-17 16:24:47,936 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@276604d1
2022-02-17 16:24:47,936 INFO org.apache.flink.runtime.taskmanager.Task [] - SinkMaterializer -> IcebergStreamWriter (1/1)#3 (f31a1159e5464010d0ea8c9325e3db32) switched from DEPLOYING to INITIALIZING.
2022-02-17 16:24:47,941 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, stock_basic_source]], fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) -> NotNullEnforcer(fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) (1/1)#3 (3b4e39607a907745d88639c7300b088a) switched from DEPLOYING to INITIALIZING.
2022-02-17 16:24:47,942 WARN org.apache.flink.metrics.MetricGroup [] - The operator name NotNullEnforcer(fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) exceeded the 80 characters length limit and was truncated.
2022-02-17 16:24:47,943 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c), deploy into slot with allocation id 6fa395ed1d6e1678faaa162efd8ca0ff.
2022-02-17 16:24:47,946 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, stock_basic_source]], fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) exceeded the 80 characters length limit and was truncated.
2022-02-17 16:24:47,946 INFO com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction [] - Consumer subtask 0 has no restore state.
2022-02-17 16:24:47,947 INFO org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c) switched from CREATED to DEPLOYING.
2022-02-17 16:24:47,947 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c) [DEPLOYING].
2022-02-17 16:24:47,947 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5d60868d
2022-02-17 16:24:47,947 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@595f09f9
2022-02-17 16:24:47,948 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-02-17 16:24:47,948 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@7406169e
2022-02-17 16:24:47,948 INFO org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink hive_catalog6.xxzh_stock_mysql_db.stock_basic_iceberg_sink (1/1)#3 (9dc85e56ba89021f64f46a5c55b86e9c) switched from DEPLOYING to INITIALIZING.
2022-02-17 16:24:47,955 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, stock_basic_source]], fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) -> NotNullEnforcer(fields=[i, ts_code, symbol, name, area, industry, list_date, actural_controller]) (1/1)#3 (3b4e39607a907745d88639c7300b088a) switched from INITIALIZING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:166)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)


Problem analysis:
At that point the CDC connector only supported Flink up to 1.13.3; see the pom on GitHub: https://github.com/ververica/flink-cdc-connectors/blob/master/pom.xml. Flink 1.14 also changed its shaded Guava package (guava18 to guava30), so a CDC jar built against Flink 1.13 can no longer resolve the org.apache.flink.shaded.guava18 classes, which is exactly the NoClassDefFoundError above.

Fix:
Downgrade Flink, or compile a CDC connector yourself; see http://betheme.net/news/txtlist_i66236v.html

It's hard to stay in a good mood when you hit this kind of problem...

Download the source: https://github.com/ververica/flink-cdc-connectors

Tried to build it: it turns out there is no blink planner artifact for 1.14.3, so, fine, better to give up on that path. Without blink-planner there would be too many pitfalls down the road, and the community (there is a CDC community-support DingTalk group linked on GitHub) also advises against using Flink 1.14.

Downgrade it is!
The latest patch release of Flink 1.13 is 1.13.5, and the CDC connector targets Flink 1.13.3, so recompile the CDC connector: change the Flink and Scala versions in the CDC project's pom.xml.
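A sketch of the pom edit (the property names follow the usual Flink-ecosystem convention used in that pom; the exact values are my assumption, verify against the file):

<!-- pom.xml of flink-cdc-connectors: pin Flink and Scala to match the cluster -->
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.12</scala.binary.version>

Then a typical rebuild (exact flags are my assumption) yields a reactor summary like the one below:

mvn clean install -DskipTests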

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink-cdc-connectors 2.2-SNAPSHOT:
[INFO]
[INFO] flink-cdc-connectors ............................... SUCCESS [ 11.822 s]
[INFO] flink-connector-debezium ........................... SUCCESS [ 6.094 s]
[INFO] flink-connector-test-util .......................... SUCCESS [ 0.812 s]
[INFO] flink-connector-mysql-cdc .......................... SUCCESS [ 11.715 s]
[INFO] flink-connector-postgres-cdc ....................... SUCCESS [ 1.001 s]
[INFO] flink-connector-oracle-cdc ......................... SUCCESS [ 1.162 s]
[INFO] flink-connector-mongodb-cdc ........................ SUCCESS [ 1.119 s]
[INFO] flink-connector-sqlserver-cdc ...................... SUCCESS [ 0.818 s]
[INFO] flink-sql-connector-mysql-cdc ...................... SUCCESS [ 6.187 s]
[INFO] flink-sql-connector-postgres-cdc ................... SUCCESS [ 4.314 s]
[INFO] flink-sql-connector-mongodb-cdc .................... SUCCESS [ 4.149 s]
[INFO] flink-sql-connector-oracle-cdc ..................... SUCCESS [ 6.307 s]
[INFO] flink-sql-connector-sqlserver-cdc .................. SUCCESS [ 3.716 s]
[INFO] flink-format-changelog-json ........................ SUCCESS [ 0.321 s]
[INFO] flink-cdc-e2e-tests ................................ SUCCESS [ 0.906 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:00 min
[INFO] Finished at: 2022-02-17T21:00:10+08:00
[INFO] ------------------------------------------------------------------------

Summary
I didn't think the versions through at the start, and now I have to roll them all back; a painful process.
