博客 详解 Flink Catalog 在 ChunJun 中的实践之路

详解 Flink Catalog 在 ChunJun 中的实践之路

   数栈君   发表于 2023-04-06 17:54  772  0

我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink 里还有一个 Catalog(目录) 的概念。


本文将为大家带来 Flink Catalog 的介绍以及 Flink Catalog 在 ChunJun 中的实践之路。


Flink Catalog 简介


Catalog 提供元数据,如数据库、表、分区、视图,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息。


Flink Catalog 作用

数据处理中最关键的一个方面是管理元数据

· 可能是暂时性的元数据,如临时表,或针对表环境注册的 UDFs;

· 或者是永久性的元数据,比如 Hive 元存储中的元数据。


Catalog 提供了一个统一的 API 来管理元数据,并使其可以从表 API 和 SQL 查询语句中来访问。


Catalog 使用户能够引用他们数据系统中的现有元数据,并自动将它们映射到 Flink 的相应元数据。例如,Flink 可以将 JDBC 表自动映射到 Flink 表,用户不必在 Flink 中手动重写 DDL。Catalog 大大简化了用户现有系统开始使用 Flink 所需的步骤,并增强了用户体验。


Flink Catalog 的结构

  //  

● Flink Catalog 原生结构

• GenericInMemoryCatalog:基于内存实现的 Catalog


• Jdbc Catalog:可以将 Flink 通过 JDBC 协议连接到关系数据库,目前 Flink 在1.12和1.13中有不同的实现,包括 MySql Catalog 和 Postgres Catalog


• Hive Catalog:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口


● Flink Iceberg Catalog


● Flink Hudi Catalog

HoodieCatalog、HoodieHiveCatalog

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d9eb7a3b6254c01e69ef3a2f46b535c5..jpg
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b958ba2729294d9129864d8be8028c8b..jpg


Flink Catalog 详解


GenericInMemoryCatalog

final CatalogManager catalogManager =        CatalogManager.newBuilder()                .classLoader(userClassLoader)                .config(tableConfig)                .defaultCatalog(                        settings.getBuiltInCatalogName(),                        new GenericInMemoryCatalog(                                settings.getBuiltInCatalogName(),                                settings.getBuiltInDatabaseName()))                .build();
defaultCatalog = new GenericInMemoryCatalog( defaultCatalogName, settings.getBuiltInDatabaseName());
CatalogManager catalogManager = builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();


GenericInMemoryCatalog 所有的数据都保存在 HashMap 里面,无法持久化。


JDBC Catalog

CREATE CATALOG my_catalog WITH(    'type' = 'jdbc',    'default-database' = '...',    'username' = '...',    'password' = '...',    'base-url' = '...');
USE CATALOG my_catalog;


如果创建并使用 Postgres Catalog 或 MySQL Catalog,请配置 JDBC 连接器和相应的驱动。


JDBC Catalog 支持以下参数:

  //  

• name:必填,Catalog 的名称

• default-database:必填,默认要连接的数据库

• username:必填,Postgres/MySQL 账户的用户名

• password:必填,账户的密码

• base-url: 必填,(不应该包含数据库名)

对于 Postgres Catalog base-url 应为 "jdbc:postgresql://:" 的格式

对于 MySQL   Catalog base-url 应为 "jdbc:mysql://:" 的格式


Hive Catalog

CREATE CATALOG myhive WITH (    'type' = 'hive',    'default-database' = 'mydatabase',    'hive-conf-dir' = '/opt/hive-conf');-- set the HiveCatalog as the current catalog of the sessionUSE CATALOG myhive;


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9148a6730d3842b0d3aa818a690e8457..jpg


Iceberg Catalog

● Hive Catalog 管理 Iceberg 表

(Flink) default_database.flink_table -> (Iceberg) default_database.flink_tableCREATE TABLE flink_table (    id   BIGINT,    data STRING) WITH (    'connector'='iceberg',    'catalog-name'='hive_prod',    'uri'='thrift://localhost:9083',    'warehouse'='hdfs://nn:8020/path/to/warehouse');(Flink)default_database.flink_table -> (Iceberg) hive_db.hive_iceberg_tableCREATE TABLE flink_table (    id   BIGINT,    data STRING) WITH (    'connector'='iceberg',    'catalog-name'='hive_prod',    'catalog-database'='hive_db',    'catalog-table'='hive_iceberg_table',    'uri'='thrift://localhost:9083',    'warehouse'='hdfs://nn:8020/path/to/warehouse');


● Hadoop Catalog 管理 Iceberg 表

CREATE TABLE flink_table (    id   BIGINT,    data STRING) WITH (    'connector'='iceberg',    'catalog-name'='hadoop_prod',    'catalog-type'='hadoop',    'warehouse'='hdfs://nn:8020/path/to/warehouse');


● 自定义 Catalog 管理 Iceberg 表

CREATE TABLE flink_table (    id   BIGINT,    data STRING) WITH (    'connector'='iceberg',    'catalog-name'='custom_prod',    'catalog-impl'='com.my.custom.CatalogImpl',     -- More table properties for the customized catalog    'my-additional-catalog-config'='my-value',     ...);


  //  

• connector:iceberg


• catalog-name:用户指定的目录名称,这是必须的,因为连接器没有任何默认值


• catalog-type:内置目录的 hive 或 hadoop(默认为hive),或者对于使用 catalog-impl 的自定义目录实现,不做设置


• catalog-impl:自定义目录实现的全限定类名,如果 catalog-type 没有被设置,则必须被设置,更多细节请参见自定义目录


• catalog-database: 后台目录中的 iceberg 数据库名称,默认使用当前的 Flink 数据库名称


• catalog-table: 后台目录中的冰山表名,默认使用 Flink CREATE TABLE 句子中的表名


Hudi Catalog

create catalog hudi with( 'type' = 'hudi', 'mode' = 'hms',  'hive.conf.dir'='/etc/hive/conf');
--- 创建数据库供hudi使用create database hudi.hudidb;
--- order表CREATE TABLE hudi.hudidb.orders_hudi( uuid INT, ts INT, num INT, PRIMARY KEY(uuid) NOT ENFORCED) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ');
select * from hudi.hudidb.orders_hudi;


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3c7eeae60057777b538785b5520a36da..jpg
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/fc87236652cdaa44c758e79ab9d58502..jpg


Flink Catalog 在 ChunJun 中的实践


下面将为大家介绍本文的重头戏,Flink Catalog 在 ChunJun 中的实践之路。


直接引入开源 Catalog

ChunJun 目前的所有 Catalog 为以下四种:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/05e7e0f669ae46a65d89007524e11553..jpg


● Hive Catalog 需要的依赖

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/983958a3aa7b62b0320572d237a1d64b..jpg


● Iceberg Catalog 需要的依赖

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a031d7e537001d0bd6163b45561d76b0..jpg


● JDBC Catalog

JDBC 因为 Flink 1.12 和 1.13 API 有变化,因此需要涉及源码的改动,改动一些 API 后,从源码引入。


● DT Catalog

结合内部业务,自定义的一种 Catalog ,下文将会进行详细介绍。


DT Catalog -存储元数据表设计

● 创建 mysql 元数据表 database_info

-- 创建表的 sqlcreate table database_info(    `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '项目ID',-- database id    `catalog_name`  varchar(255) COMMENT 'catalog 名字',    `database_name` varchar(255) COMMENT 'database 名字',    `catalog_type`  varchar(30) COMMENT 'catalog 类型, eg: mysql,oracle...',    `project_id`    int(11)            NOT NULL COMMENT '项目ID',    `tenant_id`     int(11)            NOT NULL COMMENT '租户ID') ENGINE = InnoDB  DEFAULT CHARSET = utf8;
-- 创建索引CREATE INDEX idx_catalog_name_database_name_project_id_tenant_id ON database_info (`catalog_name`, `database_name`, `project_id`, `tenant_id`);


● 创建 mysql 元数据表 table_info

-- 创建表的 sqlcreate table table_info(    `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT,    `database_id`    bigint COMMENT 'database_info 表的 id',    `table_name`  varchar(255) COMMENT '表名',    `project_id`    int(11)            NOT NULL COMMENT '项目ID',    `tenant_id`     int(11)            NOT NULL COMMENT '租户ID') ENGINE = InnoDB  DEFAULT CHARSET = utf8;
-- 创建索引CREATE INDEX idx_catalog_id_project_id_tenant_id ON table_info (`database_id`, `project_id`, `tenant_id`);CREATE INDEX idx_database_id_table_name_project_id_tenant_id ON table_info (`database_id`, `table_name`, `project_id`, `tenant_id`);


● 创建 mysql 元数据表 properties_info

create table properties_info(    `id`       bigint PRIMARY KEY NOT NULL AUTO_INCREMENT ,    `table_id` bigint(20) COMMENT 'table_info 表的 id',    `key`      varchar(255) COMMENT '表的属性 key',    `value`    varchar(255) COMMENT '表的属性 value') ENGINE = InnoDB  DEFAULT CHARSET = utf8;
CREATE INDEX idx_table_id ON properties_info (table_id);


● properties_info 里面存了什么?

schema.0.name=id,  schema.0.data-type=INT NOT NULL,  schema.1.name=name,  schema.1.data-type=VARCHAR(2147483647)  schema.2.name=age,  schema.2.data-type=BIGINT,  schema.primary-key.name=PK_3386,  schema.primary-key.columns=id,
connector=jdbc, url=jdbc:mysql: //172.16.83.218:3306/wujuan?useSSL=false, username=drpeco, password=DT@Stack#123,
comment=, scan.auto-commit=true, lookup.cache.max-rows=20000, scan.fetch-size=10, lookup.cache.ttl=700000  table-name=t2,


使用 DT Catalog

● 创建 DT Catalog

CREATE CATALOG catalog1WITH (    'type' = 'dt',    'default-database' = 'default_database',    'driver' = 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://xxx:3306/catalog_default',    'username' = 'drpeco',    'password' = 'DT@Stack#123',    'project-id' = '1',    'tenant-id' = '1'  );


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/93242ac788c980141b0dff3a04ff85e1..jpg


● 创建 Database

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]Drop a database with the given database name. If the database to drop does not exist, an exception is thrown.IF EXISTSIf the database does not exist, nothing happens.RESTRICTDropping a non-empty database triggers an exception. Enabled by default.CASCADEDropping a non-empty database also drops all associated tables and functions.

create database if not exists catalog1.database1
drop database if exists catalog1.database1 -- 删除非空数据库,连通数据库中的所有表也一起删除drop database if exists catalog1.database1 CASCADE


● 创建 Table

1)Rename Table

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_nameRename the given table name to another new table name


2)Set or Alter Table Properties

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...) Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.


-- 创建表CREATE TABLE if not exists catalog1.default_database.table1(    id      int,    name    string,    age     bigint,    primary key ( id) not enforced) with (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',    'table-name' = 't2',    'username' = 'drpeco',    'password' = 'DT@Stack#123');
-- 删除表drop table if exists mysql_catalog2.wujuan_database2.wujuan_table
-- 重命名表名ALTER TABLE catalog1.default_database.table1 RENAME TO table2;
-- 设置表属性ALTER TABLE catalog1.default_database.table1 SET ('tablename'='t2','url'='dbc:mysql://172.16.83.218:3306/wujuan?useSSL=false')


使用 DTCatalog 的具体场景和实现原理

● 全部是 DDL,只有 Catalog 的创建

CREATE CATALOG catalog1WITH (    'type' = 'DT',    'default-database' = 'default_database',    'driver' = 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default?autoReconnect=true&failOverReadOnly=false',    'username' = 'drpeco',    'password' = 'DT@Stack#123',    'project-id' = '1',    'tenant-id' = '1'  );

· 可以执行,但是没有意义,ChunJun 不会存储 Catalog 信息,只有平台存储;

· 不支持语法校验。


● 全部是 DDL,包含 Catalog、Database、Table 的创建

-- 初始化 CatalogCREATE CATALOG catalog1WITH (    'type' = 'dt',    'default-database' = 'default_database',    'driver' = 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',    'username' = 'drpeco',    'password' = 'DT@Stack#123',    'project-id' = '1',    'tenant-id' = '1'  );-- 创建数据库create database if not exists database1-- 创建表CREATE TABLE if not exists catalog1.default_database.table1(    id      int,    name    string,    age     bigint,    primary key ( id) not enforced) with (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',    'table-name' = 't2',    'username' = 'drpeco',    'password' = 'DT@Stack#123');

· 无论创建数据库、表,删除数据库、表,必须包含 create catalog 语句;

· 可以执行,可以创建数据库和表;

· 不支持语法校验。


// 抛出异常的逻辑StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tEnv);TableResult execute = statementSet.execute();       -->tableEnvironment.executeInternal(operations);        -->Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);     -->StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations); -->
// 抛出异常的方法public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations){ if (transformations.size() <= 0) { throw new IllegalStateException( "No operators defined in streaming topology. Cannot generate StreamGraph."); } ... return generator.generate();}
// 如果没有 insert 语句的时候,无法生成 JobGraph,但是 DDL 是执行成功的。// 因此捕获 FlinkX 抛出的特殊异常,此语句的异常 Message 是 FlinkX 里面处理的。try {    PackagedProgramUtils.createJobGraph(program, flinkConfig, 1, false);} catch (ProgramInvocationException e) {    // 仅执行 DDL FlinkX 抛出的异常    if (!e.getMessage().contains("OnlyExecuteDDL")) {        throw e;    }}
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8c4f8e2f64e977c68ce8cdce4196eb27..jpg


● DDL + DML,包含 create + insert 语句

1)初始化 Catalog

CREATE CATALOG catalog1WITH (    'type' = 'dt',    'default-database' = 'default_database',    'driver' = 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',    'username' = 'drpeco',    'password' = 'DT@Stack#123',    'project-id' = '1',    'tenant-id' = '1'  );


2.1)创建数据库

create database if not exists database1


2.2)创建源表

CREATE TABLE if not exists catalog1.default_database.table1(    id      int,    name    string,    age     bigint,    primary key ( id) not enforced) with (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',    'table-name' = 't2',    'username' = 'drpeco',    'password' = 'DT@Stack#123');


3.1)创建数据库

create database if not exists catalog1.database2;


3.2)创建结果表

CREATE TABLE if not exists catalog1.database2.table2(    id      int,    name    string,    age     bigint,    primary key ( id) not enforced) with ( 'connector' = 'print');


4)执行任务

insert into catalog1.database2.table2 select * from catalog1.database1.table1

· 不可以执行,可以提交;

· 支持语法校验。


● DML,只有 Insert 语句

-- 初始化 CatalogCREATE CATALOG catalog1WITH (  'type' = 'dt',  'default-database' = 'default_database',  'driver' = 'com.mysql.cj.jdbc.Driver',  'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',  'username' = 'drpeco',  'password' = 'DT@Stack#123',  'project-id' = '1',  'tenant-id' = '1');
-- 执行任务insert into catalog1.database2.table2 select * from catalog1.database1.table1

· 如果 Catalog 的 数据库和表都已经创建好了,那么直接写 insert 就可以提交任务;

· 不可以执行,可以提交;

· 支持语法校验。

140页深度干货,囊括15个典型成功案例,覆盖金融、集团、政务、制造、港口5大行业,全书从方法论到实践全面解码数据治理,开辟数据治理新范式,丰富内容可免费获取!

免费获取链接:https://fs80.cn/4w2atu

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs


同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群