博客 flink 从savepoint、checkpoint中恢复数据

flink 从savepoint、checkpoint中恢复数据

   数栈君   发表于 2023-09-26 09:55  1740  0

前言

提示:flink checkpoint重启:

  flink作业因为故障导致restart strategy失败或升级flink版本重新发布任务,这时就需要从最近的checkpoint恢复。一般而言有两种方案,第一种方案是开启checkpoint且任务取消时不删除checkpoint(调整参数execution.checkpointing.externalized-checkpoint-retention),第二种方案是定时触发savepoint(编写代码调用flink rest api)。

一、savepoint是什么?

   checkpoint的生命周期由flink来管理,flink负责checkpoint的创建、维护和释放,过程中没有与用户交互。与checkpoint不同,savepoint则由用户来创建、维护和删除的,savepoint是事先规划好的、手动备份并用于恢复。
  Savepoint 由两部分组成:稳定存储(例如 HDFS…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。

二、如何从savepoint/checkpoint中恢复数据

  savepoint/checkpoint恢复逻辑是一致的,即保存一份快照数据,重启作业时从快照数据重启

1.flink sql流作业

从kafka消费数据写入hive分区表中,采用的flink on yarn模式

CREATE TABLE kafka_flinkSqlTest(
    f1 STRING,
    f2 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dataflowcz',
    'properties.bootstrap.servers' = '172.18.26.218:9092',
    'properties.group.id' = 'testGroup3',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);



create catalog myhive with (
    'type' = 'hive',
    'hive-conf-dir' = 'hdfs://lab-cdh-ylxx0:8020/bdops'
);

use catalog myhive;
-- 切换到hive方言
SET 'table.sql-dialect' = 'hive';
create table IF NOT EXISTS cmdata.test1 (
    f1 string,
    f2 string
) partitioned by (dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LINES TERMINATED BY '\n' stored as TEXTFILE tblproperties(
'sink.partition-commit.policy.kind'='metastore,success-file'
);

insert into cmdata.test1
select
    f1
   ,f2
,'20230228'
from default_catalog.default_database.kafka_flinkSqlTest;

2.查看flink web ui
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/59c8713cb8142510489e12196d147605..png
  
job id为cc1b4809b520d9f85e6f70a6613aeda3
yarn application id 为application_1676268563167_0325

3.手动停止作业并设置savepoint

./flink stop cc1b4809b520d9f85e6f70a6613aeda3 -yid application_1676268563167_0325 -p hdfs://lab-cdh-ylxx0:8020/flink/taskname/savepoint

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d519d316a3a12836d4ff44430b1530aa..png
  

4.查看生成的文件

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/61473aad90a4b902f1428ce9fad27886..png
  
元数据文件(_metadata)

5.添加参数,进行作业恢复

set '$internal.pipeline.job-id' = 'cc1b4809b520d9f85e6f70a6613aeda3'; --设置作业id
set 'execution.savepoint.path' = 'hdfs://lab-cdh-ylxx0:8020/flink/taskname/savepoint/savepoint-cc1b48-0c6ecbef25f9';-- 设置恢复路径
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; --允许跳过无法还原的保存点状态
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/05717d310f749ea3759140bf3226cf60..png
  
查看flink web ui:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/14c3cddea9900d7c19bb93fd24103765..png






免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群