本文介绍如何通过阿里云实时计算Flink版实时读写云原生数据仓库AnalyticDB PostgreSQL版数据。
说明
如果您通过公网访问,请添加公网IP至白名单。--创建名称为adbpg_dim_table的表。
CREATE TABLE adbpg_dim_table(
id int,
username text,
PRIMARY KEY(id)
);
--向adbpg_dim_table的表中插入50行数据,其中id字段的值为从1到50的整数,而username字段的值为username字符串后面跟随当前行数的文本表示。
INSERT INTO adbpg_dim_table(id, username)
SELECT i, 'username'||i::text
FROM generate_series(1, 50) AS t(i);
您可以使用select * from adbpg_dim_table order by id;语句查看插入后的数据。
创建一张名为adbpg_sink_table的表,用于Flink写入结果数据。
CREATE TABLE adbpg_sink_table(
id int,
username text,
score int
);
作业参数 | 说明 | 示例 |
---|---|---|
文件名称 | 作业的名称。 说明 | adbpg-test |
存储位置 | 指定该作业的代码文件所属的文件夹。 您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。 | 作业草稿 |
引擎版本 | 当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。 | vvr-8.0.1-flink-1.17 |
单击创建。
---创建一个datagen源表。本示例中无需修改WITH参数。
CREATE TEMPORARY TABLE datagen_source (
id INT,
score INT
) WITH (
'connector' = 'datagen',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='50',
'fields.score.kind'='random',
'fields.score.min'='70',
'fields.score.max'='100'
);
--创建adbpg维表。需根据您的实际情况修改WITH参数。
CREATE TEMPORARY TABLE dim_adbpg(
id int,
username varchar,
PRIMARY KEY(id) not ENFORCED
) WITH(
'connector' = 'adbpg',
'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
'tablename' = 'adbpg_dim_table',
'username' = 'flinktest',
'password' = '${secret_values.adb_password}',
'maxRetryTimes'='2', --写入数据失败后,重试写入的最大次数。
'cache'='lru', --缓存策略,
'cacheSize'='100' --缓存大小
);
--创建adbpg结果表。需根据您的实际情况修改WITH参数。
CREATE TEMPORARY TABLE sink_adbpg (
id int,
username varchar,
score int
) WITH (
'connector' = 'adbpg',
'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
'tablename' = 'adbpg_sink_table',
'username' = 'flinktest',
'password' = '${secret_values.adb_password}',
'maxRetryTimes' = '2',
'conflictMode' = 'ignore',--当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。
'retryWaitTime' = '200' --重试的时间间隔。
);
--维表和源表join后的结果插入adbpg结果表。
INSERT INTO sink_adbpg
SELECT ts.id,ts.username,ds.score
FROM datagen_source AS ds
JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
on ds.id = ts.id;
根据实际情况修改参数。
本示例中无需修改datagen源表。您需要根据实际情况修改adbpg维表和结果表参数,具体说明如下。涉及的连接器更多相关参数和类型映射请参见相关文档。
参数 | 是否必填 | 说明 |
---|---|---|
url | 是 | AnalyticDB PostgreSQL版的JDBC连接地址。格式为jdbc:postgresql://<地址>:<端口>/<连接的数据库名称>。您可在云原生数据仓库 AnalyticDB PostgreSQL版控制台对应实例的数据库连接页面查看。 |
tablename | 是 | AnalyticDB PostgreSQL版的表名。 |
username | 是 | AnalyticDB PostgreSQL版的数据库账号。 |
password | 是 | AnalyticDB PostgreSQL版的数据库账号密码。 |
targetSchema | 否 | Schema名称。默认为public。如果您使用了对应数据库下其他Schema,请填写此参数。 |
在作业开发页面顶部,单击深度检查,进行语法检查。
单击部署。
在运维中心 > 作业运维页面,单击目标作业操作列下的启动。
————————————————
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/segwy/article/details/142653490
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack