
Data Lake Iceberg in Practice: The Effect of Running Flink SQL Inserts, Updates, and Deletes from a Backend Job

Posted by 数栈君 on 2023-03-31 16:25

Preface
This post demonstrates, through backend Java code, the effect of executing inserts, updates, and deletes with Flink SQL against a MySQL CDC source.

I. Java Backend Code
1. Code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCDC {

    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environments
        // 1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1.2 Table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // CDC source table mapped onto the MySQL table xxzh_stock.stock_basic
        String createSql = "CREATE TABLE stock_basic_source(\n" +
                "  `i` INT NOT NULL,\n" +
                "  `ts_code` CHAR(10) NOT NULL,\n" +
                "  `symbol` CHAR(10) NOT NULL,\n" +
                "  `name` CHAR(10) NOT NULL,\n" +
                "  `area` CHAR(20) NOT NULL,\n" +
                "  `industry` CHAR(20) NOT NULL,\n" +
                "  `list_date` CHAR(10) NOT NULL,\n" +
                "  `actural_controller` CHAR(100),\n" +
                "  PRIMARY KEY(i) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'hadoop103',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'hive',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'xxzh_stock',\n" +
                "  'table-name' = 'stock_basic'\n" +
                ")";
        // TODO 2. Create the dynamic table
        tableEnv.executeSql(createSql);

        // TODO 3. Query the table and print its changelog. print() blocks while the
        // job runs, and since everything here goes through the Table API there are
        // no DataStream operators: a trailing env.execute() would fail because no
        // operators are defined in the streaming topology, so none is needed.
        tableEnv.executeSql("select * from stock_basic_source").print();
    }

}
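As a variation, if you want the change flags programmatically rather than only in print()'s table output, Flink 1.13 can hand the query result back as a changelog DataStream. A minimal sketch against the same table (the mapping and job name below are illustrative, not from the original code):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.Table;

// ... inside main(), after tableEnv.executeSql(createSql) ...
Table stocks = tableEnv.sqlQuery("SELECT * FROM stock_basic_source");
// Each Row carries a RowKind: +I (insert), -U/+U (update before/after), -D (delete).
tableEnv.toChangelogStream(stocks)
        .map(row -> row.getKind().shortString() + " " + row)
        .returns(Types.STRING)
        .print();
// Unlike executeSql(...).print(), this builds a DataStream topology,
// so here env.execute() IS required.
env.execute("stock-basic-changelog");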


2. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-iceberg-learning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>flink-iceberg-learning</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <!-- project compiler -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- maven compiler -->
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <!-- sdk -->
        <java.version>1.8</java.version>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- engine -->
        <hadoop.version>2.7.2</hadoop.version>
        <flink.version>1.13.5</flink.version>
        <flink.cdc.version>2.0.2</flink.cdc.version>
        <iceberg.version>0.12.1</iceberg.version>
        <hive.version>2.3.6</hive.version>
        <!-- <scope.type>provided</scope.type> -->
        <scope.type>compile</scope.type>
    </properties>

    <dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- planner for Flink <= 1.13 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- planner for Flink 1.14 -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- iceberg dependency -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>${iceberg.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- hive dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-llap-tez</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-api-jdo</artifactId>
            <version>4.2.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


II. Runtime Results
1. Startup: the initial snapshot
On startup, the mysql-cdc connector first takes a full snapshot of stock_basic, so every existing row arrives as an insert (+I):
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | i | ts_code | symbol | name | area | industry | list_date | actural_controller |
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 2 | 000004.SZ | 000004 | 国华网安 | 深圳 | 软件服务 | 19910114 | 三体人 |
| +I | 1 | 000002.SZ | 000002 | 万科A | 深圳 | 全国地产 | 19910129 | 星星之火!!! |
| +I | 4 | 000006.SZ | 000006 | 深振业A | 深圳 | 区域地产 | 19920427 | 深圳市人民政府国有资产监督... |
| +I | 3 | 000005.SZ | 000005 | ST星源 | 深圳 | 环境保护 | 19901210 | 郑列列,丁芃 |
| +I | 6 | 000008.SZ | 000008 | 神州高铁 | 北京 | 运输设备 | 19920507 | 国家开发投资集团有限公司 |
| +I | 5 | 000007.SZ | 000007 | *ST全新 | 深圳 | 酒店餐饮 | 19920413 | (NULL) |
| +I | 7 | 000009.SZ | 000009 | 中国宝安 | 深圳 | 电气设备 | 19910625 | (NULL) |
| +I | 0 | 000001.SZ | 000001 | 平安银行 | 深圳 | 银行 | 19910403 | (NULL) |

2. Inserting data
Insert two new rows into the MySQL table:
INSERT INTO `stock_basic` VALUES ('8', '000010.SZ', '000010', '美丽生态', '深圳', '建筑工程', '19951027', '沈玉兴');
INSERT INTO `stock_basic` VALUES ('9', '000011.SZ', '000011', '深物业A', '深圳', '区域地产', '19920330', '深圳市人民政府国有资产监督管理委员会');
Console output:

| +I | 8 | 000010.SZ | 000010 | 美丽生态 | 深圳 | 建筑工程 | 19951027 | 沈玉兴 |
| +I | 9 | 000011.SZ | 000011 | 深物业A | 深圳 | 区域地产 | 19920330 | 深圳市人民政府国有资产监督... |

3. Updating data
Update the row with i = 0, setting its actural_controller column to 11111; the statement and the resulting console output are shown below.
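The original post does not show the update statement itself; a MySQL statement matching the output below (reconstructed, so treat it as an assumption) would be:

UPDATE stock_basic SET actural_controller = '11111' WHERE i = 0;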

In the changelog, -U is the row image before the update (the retraction) and +U is the row image after it:

| -U | 0 | 000001.SZ | 000001 | 平安银行 | 深圳 | 银行 | 19910403 | (NULL) |
| +U | 0 | 000001.SZ | 000001 | 平安银行 | 深圳 | 银行 | 19910403 | 11111 |

4. Deleting data
Delete the row with i = 9; the console emits a single delete (-D) record, shown after the statement below.
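Again the statement is not shown in the original; a matching MySQL delete (reconstructed, an assumption) would be:

DELETE FROM stock_basic WHERE i = 9;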

| -D | 9 | 000011.SZ | 000011 | 深物业A | 深圳 | 区域地产 | 19920330 | 深圳市人民政府国有资产监督... |
Summary
Embedding Flink SQL in Java code surfaces the full changelog of a MySQL table: inserts arrive as +I, updates as -U/+U pairs, and deletes as -D.
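The pom above already ships iceberg-flink-runtime even though this demo only prints to the console. As a pointer to where the series is heading, the sketch below registers an Iceberg Hadoop catalog from the same Java job so the changelog could later be written into the lake; the warehouse path is an assumption, not from this post:

// Sketch only: register an Iceberg Hadoop catalog (the HDFS warehouse path is assumed).
tableEnv.executeSql(
        "CREATE CATALOG hadoop_catalog WITH (\n" +
        "  'type' = 'iceberg',\n" +
        "  'catalog-type' = 'hadoop',\n" +
        "  'warehouse' = 'hdfs://hadoop103:8020/warehouse/iceberg',\n" +
        "  'property-version' = '1'\n" +
        ")");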

