
Hudi Integration with Spark

Posted by 数栈君 on 2024-07-31 15:27

Environment Setup


Install Spark

1) Spark versions supported by Hudi

Hudi            | Supported Spark 3 version
----------------|------------------------------------------------
0.12.x          | 3.3.x, 3.2.x, 3.1.x
0.11.x          | 3.2.x (default build, Spark bundle only), 3.1.x
0.10.x          | 3.1.x (default build), 3.0.x
0.7.0-0.9.0     | 3.0.x
0.6.0 and prior | Not supported

2) Download, install, and configure Spark
Bash
# Copy the compiled bundle jar into Spark's jars directory
cp /opt/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/spark-3.2.2/jars

# If you don't want to build it yourself, downloading the matching jar from Maven into Spark's jars directory also works
https://search.maven.org/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.1/jar


Using Spark SQL


Create a Table

1) Start spark-sql

Bash
spark-sql \
--master yarn --deploy-mode client \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'


2) Create a table

SQL
-- Create a COW table, specifying primaryKey and preCombineField

create table spark_hudi (
  id int, name string, price double, ts bigint
) using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts');
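
The DDL above creates a non-partitioned COW table. For comparison, a partitioned MOR table adds a partition column and a partitioned by clause; the sketch below is illustrative only (the table name spark_hudi_mor and the dt column are invented for this example):

SQL
-- Hypothetical sketch: a MOR table partitioned by dt
create table spark_hudi_mor (
  id int, name string, price double, ts bigint, dt string
) using hudi
partitioned by (dt)
tblproperties (type = 'mor', primaryKey = 'id', preCombineField = 'ts');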




3) Insert data into the table

SQL
-- By default, if a preCombineField is provided, insert into performs an upsert; otherwise it performs a plain insert.

insert into spark_hudi select 1, 'a1', 20, 1000;
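
Continuing the hypothetical partitioned sketch from step 2, an insert into a partitioned table also supplies a value for the partition column:

SQL
-- Hypothetical sketch: the last column (dt) supplies the partition value
insert into spark_hudi_mor select 1, 'a1', 20, 1000, '2024-07-31';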


4) Time travel query

SQL
-- Modify the row with id = 1

insert into spark_hudi select 1, 'a1_1', 20, 1000;

-- Then run a time travel query against the timestamp of the first commit

select * from spark_hudi timestamp as of '20231126202835692' where id = 1;

-- The query now returns the data from the first insert
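
To find a commit timestamp to use in the query above, Hudi's SQL procedures can list recent commits; a minimal sketch, assuming the show_commits procedure shipped with Hudi 0.11+:

SQL
-- List recent commits; a reported commit time can be plugged into "timestamp as of"
call show_commits(table => 'spark_hudi', limit => 10);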


5) Execute update

SQL
-- Update statements require the table to define a preCombineField

update spark_hudi set price = price * 2, ts = 1111 where id = 1;
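
A quick verification query (illustrative) shows the effect; given the rows written so far, price should now be doubled to 40.0 and ts set to 1111:

SQL
-- Check the updated row
select id, name, price, ts from spark_hudi where id = 1;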


6) Execute merge into

SQL
-- Prepare a source table and insert data

create table merge_source (
  id int, name string, price double, ts bigint
) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into spark_hudi as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *;
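
merge into also supports conditional matched clauses, including deletes. A minimal sketch, assuming we want to drop matched rows whose source name starts with 'old' and update the rest:

SQL
-- Hypothetical sketch: conditional delete inside merge into
merge into spark_hudi as target
using merge_source as source
on target.id = source.id
when matched and source.name like 'old%' then delete
when matched then update set *
when not matched then insert *;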


7) Execute delete

SQL
delete from spark_hudi where id = 1;


8) Execute bulk_insert

SQL
-- Enable bulk_insert for insert into; bulk_insert requires the non-strict insert mode
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into spark_hudi select 2, 'a1_2', 20, 1002;
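
These settings stay in effect for the rest of the session. To return to the default upsert-on-insert behavior afterwards, they can be switched back (a sketch, assuming upsert is the default insert mode as in Hudi 0.12):

SQL
-- Restore the default write behavior for subsequent inserts
set hoodie.sql.bulk.insert.enable=false;
set hoodie.sql.insert.mode=upsert;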



