博客 Flink 检查点配置

Flink 检查点配置

   数栈君   发表于 2023-08-21 10:16  819  0

启用检查点

开启自动保存快照 (默认:关闭) :

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);

间隔调整 :

    ●对性能的影响更小,就调大间隔时间

    ●为了更好的容错性,就以调小间隔时间

检查点存储

检查点存储 (CheckpointStorage) : 持久化存储位置

    ●JobManager 的堆内存 (JobManagerCheckpointStorage) : 默认

    ●文件系统 (FileSystemCheckpointStorage) : 常用 , (HDFS , S3)

// 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

通用增量

Rocksdb 状态后端 : 启用增量 checkpoint

    ●Flink 1.15 后 , HashMap , Rocksdb 都能开启通用的增量 checkpoint
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

增量 checkpoint 过程 :

    1.带状态的算子任务 , 将状态更改 , 写入变更日志(记录状态)

    2.状态物化:状态表定期保存,独立于检查点

    3.状态物化完成后,状态变更日志 , 就截断到相应的点

注意点 :

HDFS : 文件数变多
    ●上传变更日志 : IO 宽带较大

    ●序列化状态变更 : CPU 消耗较大

    ●缓存状态变更 : TaskManager 内存消耗较大

    ●Checkpint 最大并发 = 1

    ●Flink 1.15 , Memory 测试阶段

    ●不支持 NO_ClAIM 模式

配置文件 :

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem

# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM

代码配置 :

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-changelog</artifactId>
    <version>${flink.version}</version>
    <scope>runtime</scope>
</dependency>

// 开启changelog:
env.enableChangelogStateBackend(true);

最终检查点

当有界数据 , 部分Task 完成 , Flink 1.14 后 , 它们依然能进行检查点

禁用 (Flink 1.15 后, 默认启用) :

Configuration config = new Configuration();
// 禁用最终检查点
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

配置建议

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取所有配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();

// 检查点模式 (CheckpointingMode) :
// 精确一次 : exactly-once (默认)
// 至少一次 : at-least-once (效率更高)
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 最大并发检查点数量(maxConcurrentCheckpoints):
// 检查点最多有多少个
checkpointConfig.setMaxConcurrentCheckpoints(1)

// 启用非对齐的检查点保存
// 限制: CheckpointingMode= exctly-once , 并发的检查点 = 1
checkpointConfig.enableUnalignedCheckpoints();

// 默认: 0: 用非对齐的检查点
// > 0: 用 对齐的检查点(barrier对齐)
// 当对齐时间 > 阈值, 为: 非对齐检查点(barrier非对齐)
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));

// 超时时间 (checkpointTimeout) :
// 检查点保存的超时时间,当超时就丢弃
// 单位 : 长整型毫秒数
checkpointConfig.setCheckpointTimeout(60000)

//最小间隔时间 (minPauseBetweenCheckpoints):
// 上个 checkpoint 完成后, 最快多久触发另个 checkpoint
checkpointConfig.setMinPauseBetweenCheckpoints(500)

// 开启检查点的外部持久化
// DELETE_ON_CANCELLATION: 作业取消时, 自动删除外部检查点,但作业失败退出,就保留检查点
// RETAIN_ON_CANCELLATION:作业取消时, 也保留外部检查点
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)


// 检查点异常时, 是否整个任务失败
// true : 失败提出
// false: 丢弃, 并继续运行
checkpointConfig.setFailOnCheckpointingErrors(true)


免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群