-- Kafka source table: consumes JSON records from topic 'dataflowcz'.
-- Lives in the default catalog and is referenced later as
-- default_catalog.default_database.kafka_flinkSqlTest.
CREATE TABLE kafka_flinkSqlTest (
    f1 STRING,
    f2 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dataflowcz',
    'properties.bootstrap.servers' = '172.18.26.218:9092',
    'properties.group.id' = 'testGroup3',
    -- start reading from the newest offset; no historical replay
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
-- Register a Hive catalog (configuration loaded from the given HDFS dir)
-- and make it current, so unqualified table names resolve against Hive.
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = 'hdfs://lab-cdh-ylxx0:8020/bdops'
);
USE CATALOG myhive;
-- Switch to the Hive dialect so Hive DDL syntax (ROW FORMAT, STORED AS,
-- TBLPROPERTIES) is accepted.
SET 'table.sql-dialect' = 'hive';
-- Partitioned Hive sink table. The partition-commit policy makes Flink both
-- register each finished partition in the Hive metastore and drop a
-- _SUCCESS marker file into it.
CREATE TABLE IF NOT EXISTS cmdata.test1 (
    f1 STRING,
    f2 STRING
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    LINES TERMINATED BY '\n'  -- fix: original had no whitespace between '\t' and LINES
STORED AS TEXTFILE
TBLPROPERTIES (
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- Continuously stream rows from the Kafka source into partition dt='20230228'.
-- NOTE(review): the partition value is hard-coded; for a long-running streaming
-- job a time-derived dynamic partition is usually intended — confirm with author.
INSERT INTO cmdata.test1
SELECT
    f1,
    f2,
    '20230228'
FROM default_catalog.default_database.kafka_flinkSqlTest;
# CLI command (not SQL): gracefully stop the running Flink job with a savepoint.
# First argument is the Flink job id, -yid is the YARN application id,
# -p is the target directory for the savepoint on HDFS.
./flink stop cc1b4809b520d9f85e6f70a6613aeda3 -yid application_1676268563167_0325 -p hdfs://lab-cdh-ylxx0:8020/flink/taskname/savepoint
-- Recovery: resubmit the job restoring state from the savepoint taken above.
set '$internal.pipeline.job-id' = 'cc1b4809b520d9f85e6f70a6613aeda3'; -- pin the job id so the restarted job keeps the same id
set 'execution.savepoint.path' = 'hdfs://lab-cdh-ylxx0:8020/flink/taskname/savepoint/savepoint-cc1b48-0c6ecbef25f9';-- savepoint path to restore state from
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; -- allow skipping savepoint state that cannot be mapped to the new job
免责声明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack