
Real-time dashboard practice with Flink CDC and Superset

数栈君, posted 2023-11-13 14:02

Environment:

1)EMR-Flink-Cluster3.36.1(HDFS2.8.5 YARN2.8.5 Flink1.12-vvr-3.0.2)

2)Rds-Mysql 5.7.26

3)EMR-Hadoop-Cluster4.9.0(SuperSet0.36.0)

Architecture:



Testing availability of the EMR-Flink cluster:
Configuration applied to the initial Flink cluster:

1) Set the HDFS parameter dfs.webhdfs.enabled to true to enable web access to HDFS (enabled by default in Hadoop 3);

2) Set hadoop.http.authentication.simple.anonymous.allowed to true to allow anonymous access to HDFS/YARN;

3) Because EMR is used to manage Flink jobs, JAR packages are uploaded to OSS and jobs interact with OSS, so the Flink cluster must be able to reach OSS. This requires the following additional configuration:

1. Deploy jindofs-sdk on the cluster:

On every node of the EMR-Flink cluster, copy jindo-flink-sink (jindo-flink-sink-3.5.0.jar) and jindofs-sdk (jindofs-sdk-3.5.0.jar) from /lib/b2jindosdk-current/lib/ on the EMR-Hadoop cluster (EMR-4.9.0) into /lib/flink-current/lib/.


2. To enable smartdata after deploying jindofs-sdk on the cluster: add the configuration entry {key: "fs.jfs.cache.oss.credentials.provider", value: "com.aliyun.emr.fs.auth.SimpleAliyunCredentialsProvider,com.aliyun.emr.fs.auth.EcsStsCredentialsProvider"} to core-site.xml, then create the directory /usr/lib/b2smartdata-current/conf/ with a bigboot.cfg file inside it (on every node),

with the following content:

[bigboot]

default.credential.provider=ECS_ROLE

Run the WordCount job:

Create a new Flink job in the EMR data development console and run the following command:

run -m yarn-cluster -d ossref://shdr-cse-tools/diy005/TEST_EMR/TestJob/WordCount.jar --input oss://shdr-cse-tools/diy005/TEST_EMR/TestJob/wordcountinput --output oss://shdr-cse-tools/diy005/TEST_EMR/TestJob/wordcountoutput

selecting EMR-Flink-Cluster3.36.1 as the execution cluster.

Result:



The job succeeds, and the WordCount result files appear in OSS.


Running a flink-sql-cdc test on the EMR-Flink cluster:
JARs referenced by sql-client:

For the JDBC connector (batch processing when used as a source):

1)flink-connector-jdbc_2.11-1.12.0.jar

2)mysql-connector-java-8.0.16.jar

For the mysql-cdc connector (stream processing when used as a source):

1)flink-connector-mysql-cdc-1.1.1.jar

Note: upload the JARs to /opt/apps/ecm/service/flink/1.12-vvr-3.0.2/package/flink-1.12-vvr-3.0.2/lib.

Test goal:

As data in mysql-table-source changes, compute its latest record count in real time and write it to mysql-table-sink.
Start the Flink cluster:

Because EMR runs Flink jobs in per-job-on-YARN mode, no long-running Flink cluster exists; a new one must be started, in one of two ways:

1) bin/start-cluster.sh to start a standalone Flink cluster;

2) start a Flink-on-YARN cluster in yarn-session mode.

Option 2 is used here (-s task slots per TaskManager, -jm/-tm JobManager/TaskManager memory in MB, -nm application name, -d detached):

yarn-session.sh -s 4 -jm 2048 -tm 4096 -nm test_dcu -d

Start the sql-client:

cd /opt/apps/ecm/service/flink/1.12-vvr-3.0.2/package/flink-1.12-vvr-3.0.2/

sql-client.sh embedded -s yarn-session -j lib/flink-connector-jdbc_2.11-1.12.0.jar -j lib/mysql-connector-java-8.0.16.jar -j lib/flink-connector-mysql-cdc-1.1.1.jar

Create a streaming MySQL source with the mysql-cdc connector:

First, create the source table z_flinkcdc_test.mysql_users in the RDS MySQL database:

CREATE TABLE `mysql_users` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8

In sql-client, create the MySQL source table default.mysql_users:

CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'rm-uf6xxxxxxt6q.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'xxx_latest',
  'password' = 'xxxxxx',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'z_flinkcdc_test',
  'table-name' = 'mysql_users'
);

Create the MySQL sink via the JDBC connector:

First, create the target table z_flinkcdc_test.mysql_user_cnt in the RDS MySQL database:

CREATE TABLE `mysql_user_cnt` (
  `cnt` bigint(20) NOT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8

In sql-client, create the MySQL sink table default.mysql_user_cnt:

CREATE TABLE mysql_user_cnt (
  cnt BIGINT PRIMARY KEY NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://rm-uf6xxxxxxt6q.mysql.rds.aliyuncs.com:3306/z_flinkcdc_test?useSSL=false&autoReconnect=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = 'mysql_user_cnt',
  'username' = 'xxx_latest',
  'password' = 'xxxxxx',
  'lookup.cache.max-rows' = '3000',
  'lookup.cache.ttl' = '10s',
  'lookup.max-retries' = '3'
);

Create the flink-sql-cdc streaming job:

insert into mysql_user_cnt select count(1) as cnt from (select name from mysql_users group by name)s
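The statement above maintains the number of distinct name values and upserts it into the sink on every change. As a minimal sketch of why this stays correct under inserts, updates, and deletes, here is a hypothetical Python model (not Flink code; the class and the simplified event shape are invented for illustration) of a changelog with retractions:

```python
from collections import Counter

# Hypothetical sketch, not Flink code: it mirrors the semantics of
#   SELECT COUNT(1) FROM (SELECT name FROM mysql_users GROUP BY name)
# over a CDC changelog. An update retracts the old row and accumulates
# the new one, so the distinct-name count stays consistent.
class DistinctNameCount:
    def __init__(self):
        self.names = Counter()  # name -> number of rows carrying that name

    def apply(self, op, old=None, new=None):
        """Apply a simplified CDC event ('insert', 'update', or 'delete')
        and return the current count written to mysql_user_cnt."""
        if op in ('update', 'delete'):   # retract the old value
            self.names[old] -= 1
            if self.names[old] == 0:
                del self.names[old]
        if op in ('insert', 'update'):   # accumulate the new value
            self.names[new] += 1
        return len(self.names)           # number of distinct names
```

Replaying the article's test sequence below on 15 initial distinct names yields 16, 15, and 14, matching the values observed in mysql_user_cnt.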

The job is submitted and runs successfully on the Flink cluster:



The records-received/sent counts show that this CDC job also synchronizes historical data:



Verify insert, update, and delete:

1) Insert a row with a new name new1 that does not yet exist:

insert into mysql_users (name) values ('new1')

The target table shows the insert was captured:

MySQL [z_flinkcdc_test]> select * from mysql_user_cnt order by ts desc limit 1;
+-----+---------------------+
| cnt | ts                  |
+-----+---------------------+
|  16 | 2021-08-17 15:07:51 |
+-----+---------------------+

2) Update an existing row so its name becomes a duplicate:

MySQL [z_flinkcdc_test]> select name from mysql_users where name like 'a%';
+------+
| name |
+------+
| a    |
| a1   |
| a2   |
| a3   |
| a4   |
+------+
5 rows in set (0.00 sec)

MySQL [z_flinkcdc_test]> update mysql_users set name='a1' where name='a2';

The target table shows the update was captured:

MySQL [z_flinkcdc_test]> select * from mysql_user_cnt order by ts desc limit 1;
+-----+---------------------+
| cnt | ts                  |
+-----+---------------------+
|  15 | 2021-08-17 15:12:36 |
+-----+---------------------+

3) Delete new1:

delete from mysql_users where name='new1'

The target table shows the delete event was captured:

MySQL [z_flinkcdc_test]> select * from mysql_user_cnt order by ts desc limit 1;
+-----+---------------------+
| cnt | ts                  |
+-----+---------------------+
|  14 | 2021-08-17 15:17:27 |
+-----+---------------------+

Creating the real-time dashboard with Superset:
Create a database source for Superset

1) Sources -> Databases -> Add a new record ->

Fill in the SQLAlchemy URI as shown below and click Test Connection to verify connectivity:
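For reference, a SQLAlchemy URI for the RDS MySQL instance used above looks roughly like the following; the pymysql driver prefix is one common choice (any installed SQLAlchemy MySQL driver works), and the host, user, and password are the placeholder values from the earlier connector config:

```
mysql+pymysql://xxx_latest:xxxxxx@rm-uf6xxxxxxt6q.mysql.rds.aliyuncs.com:3306/z_flinkcdc_test
```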



The remaining options can be left at their defaults.
Create a table source for Superset

1) Sources -> Tables -> Add a new record ->

Select the target table z_flinkcdc_test.mysql_user_cnt as shown:



Use SQL Lab to find the target columns to display



Reference SQL:

select cnt,ts from mysql_user_cnt where date_format(ts,'%Y-%m-%d')=CURRENT_DATE() order by ts;

Generate a Chart and Dashboard from the Results above

Click Explore in the figure above to open the chart creation page:



Note: the Results must include a time-series column in order to use line charts and similar visualizations.
Create a Dashboard from the current Chart

Click the +Save button above to save the Chart and add it to a Dashboard:

Click Save & go to dashboard to see the generated real-time dashboard:

V1:

ChangeLog: flink-sql-cdc-test done; next, capture real-project needs

Time:2021-08-17 17:04:42
————————————————
Copyright notice: this is an original article by CSDN blogger "csdn_lan", licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/csdn_lan/article/details/119760915

