博客 Flink创建Hudi的Sink动态表

Flink创建Hudi的Sink动态表

   数栈君   发表于 2023-08-23 09:49  586  0

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。

createDynamicTableSink

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
        checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
            "Option [path] should not be empty.");
        setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        sanityCheck(conf, schema);
        setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
        setupSortOptions(conf, context.getConfiguration());
        return new HoodieTableSink(conf, schema);
    }

createDynamicTableSource

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
        Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
            new ValidationException("Option [path] should not be empty.")));
        setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
        return new HoodieTableSource(
            schema,
            path,
            context.getCatalogTable().getPartitionKeys(),
            conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
            conf);
    }

创建Sink表过程

1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“Option [path] should not be empty.”。

2、做兼容性设置(setupTableOptions):

2.1、如果设置了 hoodie.table.recordkey.fields,但没有设置 hoodie.datasource.write.recordkey.field,则将 hoodie.datasource.write.recordkey.field 的值设置为 hoodie.table.recordkey.fields 的值;

2.2、如果设置了 hoodie.table.precombine.field,但没有设置 precombine.field,则将 precombine.field 的值设置为 hoodie.table.precombine.field 的值;

2.3、如果设置了 hoodie.datasource.write.hive_style_partitioning,但没有设置 hoodie.datasource.write.hive_style_partitioning,则将 hoodie.datasource.write.hive_style_partitioning 的值设置为 hoodie.datasource.write.hive_style_partitioning 的值。

3、必要选项检查:

3.1、检查表的类型(checkTableType),如果 table.type 的值为空,则不做处理,否则必须为 COPY_ON_WRITE 或者 MERGE_ON_READ,不然抛异常Invalid table type: TABLETYPE . Table type should be either MERGE_ON_READ or COPY_ON_WRITE.“;

3.2、如果为非 Append 模式,则检查是否设置了 hoodie.datasource.write.recordkey.field 和 precombine.field。

4、依次设置:

4.1、表名(hoodie.table.name);

4.2、主键(hoodie.datasource.write.recordkey.field);

4.3、分区(hoodie.datasource.write.partitionpath.field);

4.4、如果是 index 类型为 BUCKET,则设置桶(bucket)的键 hoodie.bucket.index.hash.field;

4.4.1、如果还没有设置 hoodie.bucket.index.hash.field,则使用 hoodie.datasource.write.recordkey.field 的值作为 hoodie.bucket.index.hash.field 的值;
4.4.2、否则进一步检查 hoodie.bucket.index.hash.field 的值是否为 hoodie.datasource.write.recordkey.field 值的子集。假设 hoodie.datasource.write.recordkey.field 值为“ds,dh”,则 hoodie.bucket.index.hash.field 值可以为“ds”、“dh”或“ds,dh”。
4.5、设置压缩选项:
4.5.1、设置 archive.min_commits,
4.5.1、设置 archive.max_commits。
4.6、设置Hive选项:
4.6.1、如果没有设置 hive_sync.db,则设置 hive_sync.db;
4.6.2、如果没有设置 hive_sync.table,则设置 hive_sync.table。
4.7、设置read选项,如果不是增量查询则什么也不做;否则设置 hoodie.datasource.query.type 值为 incremental 。
4.8、设置write选项:如果 write.operation 为默认值且为 cow 表,则实则 write.precombine 为 true 。
4.9、如果没有设置 source.avro-schema.path 和 source.avro-schema,则设置 source.avro-schema 。

5、设置排序选项(flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java):

5.1、设置 Flink 的 table.exec.sort.max-num-file-handles

5.2、设置 Flink 的 table.exec.spill-compression.enabled

5.3、设置 Flink 的 table.exec.spill-compression.block-size

5.4、设置 Flink 的 table.exec.sort.async-merge-enabled

Append 模式

write.operation 值为 insert,并且为 mor 表;或则为 cow 表,但是 write.insert.cluster 值为 false。

    ●write.insert.cluster

该选项用于控制是否在写入时合并小文件,仅对 cow 类型表有效,默认为 false。如果设置为 true,则每次写入前先合并小文件,这会降低写吞吐量,但可提高读性能。



免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群