Hands-on data lake Iceberg: getting to know Iceberg through Spark 3

Posted by 数栈君 on 2023-03-31 16:10

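Summary
Install spark-3.2.0-bin-hadoop3.2.tgz together with Iceberg 0.13.0, which at the time of writing was the most stable combination in the community (spark-3.2.1 did not work in my tests).
Test Spark's create, read, update and delete operations on Iceberg, plus time travel.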

1. Install Spark 3
Prerequisites: Hadoop is installed, and HADOOP_HOME and HADOOP_CONF_DIR are exported in /etc/profile.
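Spark puts HADOOP_CONF_DIR on its classpath to pick up core-site.xml and hdfs-site.xml. This is why the scheme-less warehouse path /tmp/iceberg/warehouse used below ends up on HDFS (resolved against fs.defaultFS). A quick way to confirm from a Scala REPL (plain scala or spark-shell) that both variables actually reached the JVM is sketched here. `missingHadoopVars` is a hypothetical helper, not part of Spark or Hadoop.

```scala
// Hypothetical helper: returns the Hadoop-related variables that are unset or blank.
val requiredHadoopVars = Seq("HADOOP_HOME", "HADOOP_CONF_DIR")

def missingHadoopVars(env: Map[String, String]): Seq[String] =
  requiredHadoopVars.filter(name => env.get(name).forall(_.trim.isEmpty))

// On the Spark host, check the environment the current JVM was started with.
println(s"missing: ${missingHadoopVars(sys.env).mkString(", ")}")
```

If anything is reported as missing, re-source /etc/profile (or open a new login shell) before starting spark-sql.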
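Download the package from the Spark website. I first used spark-3.2.1-bin-hadoop3.2.tgz: spark-sql works with it, but spark-shell fails (see 3.4.1), so spark-3.2.0-bin-hadoop3.2 is the recommended choice.
Extract the archive, then launch spark-sql from the Spark directory.
About the command: --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 downloads the Iceberg runtime automatically (only on the first run; afterwards it is taken from the Ivy cache under ~/.ivy2, as the log below shows).
--conf declares the catalogs.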

bin/spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=/tmp/iceberg/warehouse
Output:

[root@hadoop103 spark-3.2.1-bin-hadoop3.2]# bin/spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.local.type=hadoop --conf spark.sql.catalog.local.warehouse=/tmp/iceberg/warehouse
:: loading settings :: url = jar:file:/opt/software/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.2_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-70be9d21-1481-4c47-95f9-4ac13aaf8782;1.0
confs: [default]
found org.apache.iceberg#iceberg-spark-runtime-3.2_2.12;0.13.0 in central
:: resolution report :: resolve 100ms :: artifacts dl 3ms
:: modules in use:
org.apache.iceberg#iceberg-spark-runtime-3.2_2.12;0.13.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-70be9d21-1481-4c47-95f9-4ac13aaf8782
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/4ms)
22/02/14 11:43:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/14 11:43:13 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/02/14 11:43:13 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/02/14 11:43:15 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/02/14 11:43:15 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore root@10.233.65.40
Spark master: local[*], Application Id: local-1644810191872
spark-sql>
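The launch command is just one --packages coordinate plus six --conf pairs. The sketch below rebuilds it from its parts in plain Scala (no Spark needed), which makes the version coupling explicit: the runtime artifact name encodes the Spark minor version (3.2) and the Scala binary version (2.12), and the Iceberg release (0.13.0) comes last. The names `icebergPackage`, `icebergConfs` and `launchCommand` are illustrative; the keys and values are exactly the ones used in the command, and the comments summarize what each setting does.

```scala
// Versions used in this article; the runtime artifact must match the Spark minor
// version and the Scala binary version of your Spark build.
val sparkMajorMinor = "3.2"
val scalaBinary = "2.12"
val icebergVersion = "0.13.0"

val icebergPackage =
  s"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorMinor}_${scalaBinary}:$icebergVersion"

// Same keys and values as the command above, in the same order.
val icebergConfs: Seq[(String, String)] = Seq(
  // Iceberg SQL extensions (stored procedures, row-level UPDATE/DELETE/MERGE)
  "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  // Wrap Spark's built-in catalog so it also handles Iceberg tables in the Hive metastore
  "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog",
  "spark.sql.catalog.spark_catalog.type" -> "hive",
  // An extra catalog named "local": tables live as directories under the warehouse path
  "spark.sql.catalog.local" -> "org.apache.iceberg.spark.SparkCatalog",
  "spark.sql.catalog.local.type" -> "hadoop",
  "spark.sql.catalog.local.warehouse" -> "/tmp/iceberg/warehouse"
)

// Renders "<binary> --packages <pkg> --conf k=v ..." as one command line.
def launchCommand(binary: String): String =
  (Seq(binary, "--packages", icebergPackage) ++
    icebergConfs.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }).mkString(" ")

println(launchCommand("bin/spark-sql"))
println(launchCommand("bin/spark-shell"))
```

The second line printed is the spark-shell variant used later in 3.4.2; only the binary differs.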

2. Test Iceberg CRUD
2.1 The CRUD statements (the DELETE has no WHERE clause, so it removes every row):
CREATE TABLE local.db.table (id bigint, data string) USING iceberg;

INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
SELECT count(1) as count, data FROM local.db.table GROUP BY data;
select * from local.db.table;

INSERT INTO local.db.table VALUES (4, 'd'), (5, 'e'), (6, 'f');
select * from local.db.table;
update local.db.table set data='apple' where id=1;
delete from local.db.table;
select * from local.db.table;
INSERT INTO local.db.table VALUES (7, 'g'), (8, 'h');
select * from local.db.table;


2.2 Output:
spark-sql> CREATE TABLE local.db.table (id bigint, data string) USING iceberg;
Time taken: 0.072 seconds
spark-sql> INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
Time taken: 0.233 seconds
spark-sql> SELECT count(1) as count, data FROM local.db.table GROUP BY data;
1 a
1 b
1 c
Time taken: 0.161 seconds, Fetched 3 row(s)
spark-sql> select * from local.db.table;
1 a
2 b
3 c
Time taken: 0.095 seconds, Fetched 3 row(s)
spark-sql> INSERT INTO local.db.table VALUES (4, 'd'), (5, 'e'), (6, 'f');
Time taken: 0.19 seconds
spark-sql> select * from local.db.table;
1 a
2 b
3 c
4 d
5 e
6 f
Time taken: 0.115 seconds, Fetched 6 row(s)
spark-sql> update local.db.table set data='apple' where id=1;
Time taken: 1.883 seconds
spark-sql> delete from local.db.table;
Time taken: 0.198 seconds
spark-sql> select * from local.db.table;
Time taken: 0.047 seconds
spark-sql> INSERT INTO local.db.table VALUES (7, 'g'), (8, 'h');
Time taken: 0.181 seconds
spark-sql> select * from local.db.table;
7 g
8 h
Time taken: 0.079 seconds, Fetched 2 row(s)
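Every INSERT, UPDATE and DELETE above commits a new, immutable snapshot, while CREATE TABLE on its own commits none; that is why the snapshot queries in section 3 show exactly five snapshots. The toy model below replays the same statements to show that bookkeeping. `ToyTable` is hypothetical and not Iceberg's API: it tracks rows instead of data files and numbers snapshots 1, 2, 3 and so on instead of using Iceberg's random 64-bit ids.

```scala
// Toy model, NOT Iceberg's API: each write commits an immutable snapshot with a parent pointer.
case class Snapshot(id: Int, parentId: Option[Int], operation: String, rows: Vector[(Long, String)])

class ToyTable {
  private var log = Vector.empty[Snapshot] // after CREATE TABLE: no snapshot yet

  def history: Vector[Snapshot] = log
  def current: Option[Snapshot] = log.lastOption
  private def currentRows: Vector[(Long, String)] = current.map(_.rows).getOrElse(Vector.empty)

  // Old snapshots are never modified; a commit only appends a new one.
  private def commit(operation: String, rows: Vector[(Long, String)]): Unit =
    log = log :+ Snapshot(log.size + 1, current.map(_.id), operation, rows)

  def insert(values: (Long, String)*): Unit = commit("append", currentRows ++ values)
  def update(id: Long, data: String): Unit =
    commit("overwrite", currentRows.map { case (i, d) => if (i == id) (i, data) else (i, d) })
  def deleteAll(): Unit = commit("delete", Vector.empty)

  // Time travel: any earlier snapshot is still readable by id.
  def rowsAt(snapshotId: Int): Vector[(Long, String)] =
    log.find(_.id == snapshotId).map(_.rows).getOrElse(Vector.empty)
}

// Replay the statements from 2.1.
val t = new ToyTable
t.insert(1L -> "a", 2L -> "b", 3L -> "c")
t.insert(4L -> "d", 5L -> "e", 6L -> "f")
t.update(1L, "apple")
t.deleteAll()
t.insert(7L -> "g", 8L -> "h")
t.history.foreach(s => println(s"${s.id} parent=${s.parentId} ${s.operation} ${s.rows}"))
```

The operations come out as append, append, overwrite, delete, append, the same sequence the real snapshots table reports in 3.1.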

2.3 Files created on HDFS by the SQL above
[root@hadoop103 iceberg]# hadoop fs -ls /tmp/iceberg/warehouse/db/table/data
Found 9 items
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/data/00000-12-725752c2-020c-41a8-a636-4096e98c139b-00001.parquet
-rw-r--r-- 2 root supergroup 642 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/data/00000-17-7fd36618-5679-4768-8a98-e99b88192b64-00001.parquet
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/data/00000-223-e882ea95-6224-42b2-b449-d57f5ef061f4-00001.parquet
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/data/00001-13-b1a90028-e5ae-4564-be89-97fe5ba26e52-00001.parquet
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/data/00001-18-67012ef6-ff46-44a5-88ab-71e4d43ecdad-00001.parquet
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/data/00001-224-7b43bf9d-4437-4d62-beb6-f4f665f0b380-00001.parquet
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/data/00002-14-85ca6677-70c5-4b7b-a800-22d95e5489eb-00001.parquet
-rw-r--r-- 2 root supergroup 643 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/data/00002-19-14269447-20ed-4983-b66c-02983409ed5f-00001.parquet
-rw-r--r-- 2 root supergroup 686 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/data/00175-23-2d571fde-d1d0-4a30-b52f-b6c69ac9ecf3-00001.parquet
[root@hadoop103 iceberg]# hadoop fs -ls /tmp/iceberg/warehouse/db/table/metadata
Found 20 items
-rw-r--r-- 2 root supergroup 5824 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/2e735a5c-bd99-46f7-af49-0e26bc51ec2f-m0.avro
-rw-r--r-- 2 root supergroup 5778 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/2e735a5c-bd99-46f7-af49-0e26bc51ec2f-m1.avro
-rw-r--r-- 2 root supergroup 5866 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/2e735a5c-bd99-46f7-af49-0e26bc51ec2f-m2.avro
-rw-r--r-- 2 root supergroup 5825 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/771a8e19-a87e-489f-88d1-9480553237e9-m0.avro
-rw-r--r-- 2 root supergroup 5867 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/954eb317-6a86-413b-94a2-d59e25e294c6-m0.avro
-rw-r--r-- 2 root supergroup 5860 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/metadata/daeb600d-166d-4ab5-8e8c-899382c24038-m0.avro
-rw-r--r-- 2 root supergroup 5877 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/fae8df3a-78fa-43a6-838f-6783e58f04ec-m0.avro
-rw-r--r-- 2 root supergroup 5779 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/fae8df3a-78fa-43a6-838f-6783e58f04ec-m1.avro
-rw-r--r-- 2 root supergroup 3797 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/snap-1588410421234526207-1-2e735a5c-bd99-46f7-af49-0e26bc51ec2f.avro
-rw-r--r-- 2 root supergroup 3826 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/snap-3074595041692363385-1-954eb317-6a86-413b-94a2-d59e25e294c6.avro
-rw-r--r-- 2 root supergroup 3768 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/snap-4321345386411511567-1-771a8e19-a87e-489f-88d1-9480553237e9.avro
-rw-r--r-- 2 root supergroup 3848 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/snap-5972104378811544858-1-fae8df3a-78fa-43a6-838f-6783e58f04ec.avro
-rw-r--r-- 2 root supergroup 3754 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/metadata/snap-7801623062552576504-1-daeb600d-166d-4ab5-8e8c-899382c24038.avro
-rw-r--r-- 2 root supergroup 1168 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/metadata/v1.metadata.json
-rw-r--r-- 2 root supergroup 2070 2022-02-14 14:00 /tmp/iceberg/warehouse/db/table/metadata/v2.metadata.json
-rw-r--r-- 2 root supergroup 3006 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/v3.metadata.json
-rw-r--r-- 2 root supergroup 4045 2022-02-14 14:01 /tmp/iceberg/warehouse/db/table/metadata/v4.metadata.json
-rw-r--r-- 2 root supergroup 4984 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/v5.metadata.json
-rw-r--r-- 2 root supergroup 5920 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/v6.metadata.json
-rw-r--r-- 2 root supergroup 1 2022-02-14 14:02 /tmp/iceberg/warehouse/db/table/metadata/version-hint.text
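How to read this layout: data/ holds the Parquet data files, nine in total (3 + 3 from the two inserts, 1 rewritten by the UPDATE, 2 from the last insert). The DELETE removed none of them physically; it only dropped them from the new snapshot. metadata/ holds one vN.metadata.json per table version (v1 from CREATE TABLE, v2 to v6 from the five writes), one snap-*.avro manifest list per snapshot, the *-mN.avro manifests, and version-hint.text, whose one-byte content is the latest version number. The helpers below are hypothetical and only decode the naming patterns visible in this listing.

```scala
// Hypothetical helpers that decode the file names seen in the listing above.

// version-hint.text contains the latest metadata version, e.g. "6" -> v6.metadata.json
def currentMetadataFile(tableLocation: String, versionHint: String): String =
  s"$tableLocation/metadata/v${versionHint.trim.toInt}.metadata.json"

// Manifest list: snap-<snapshotId>-<attempt>-<commitUUID>.avro
val ManifestListName = """snap-(\d+)-(\d+)-([0-9a-f-]+)\.avro""".r
// Manifest: <commitUUID>-m<index>.avro
val ManifestName = """([0-9a-f-]+)-m(\d+)\.avro""".r

def snapshotIdOf(fileName: String): Option[Long] = fileName match {
  case ManifestListName(id, _, _) => Some(id.toLong)
  case _ => None
}

def commitUuidOf(fileName: String): Option[String] = fileName match {
  case ManifestName(uuid, _) => Some(uuid)
  case _ => None
}

// Manifests written in the same commit share the manifest list's UUID;
// older manifests that a snapshot still references keep their own names.
val listFile = "snap-1588410421234526207-1-2e735a5c-bd99-46f7-af49-0e26bc51ec2f.avro"
println(snapshotIdOf(listFile)) // Some(1588410421234526207)
println(commitUuidOf("2e735a5c-bd99-46f7-af49-0e26bc51ec2f-m2.avro"))
```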


3. Snapshot management
3.1 Inspect snapshot details
List all snapshots of the table:
SELECT * FROM local.db.table.snapshots;

What the snapshot columns mean:
desc local.db.table.snapshots;

spark-sql> SELECT * FROM local.db.table.snapshots;
2022-02-14 14:00:34.539 7801623062552576504 NULL append /tmp/iceberg/warehouse/db/table/metadata/snap-7801623062552576504-1-daeb600d-166d-4ab5-8e8c-899382c24038.avro {"added-data-files":"3","added-files-size":"1929","added-records":"3","changed-partition-count":"1","spark.app.id":"local-1644810838618","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1929","total-position-deletes":"0","total-records":"3"}
2022-02-14 14:01:12.485 3074595041692363385 7801623062552576504 append /tmp/iceberg/warehouse/db/table/metadata/snap-3074595041692363385-1-954eb317-6a86-413b-94a2-d59e25e294c6.avro {"added-data-files":"3","added-files-size":"1928","added-records":"3","changed-partition-count":"1","spark.app.id":"local-1644810838618","total-data-files":"6","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"3857","total-position-deletes":"0","total-records":"6"}
2022-02-14 14:01:31.531 5972104378811544858 3074595041692363385 overwrite /tmp/iceberg/warehouse/db/table/metadata/snap-5972104378811544858-1-fae8df3a-78fa-43a6-838f-6783e58f04ec.avro {"added-data-files":"1","added-files-size":"686","added-records":"1","changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"643","spark.app.id":"local-1644810838618","total-data-files":"6","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"3900","total-position-deletes":"0","total-records":"6"}
2022-02-14 14:02:04.778 1588410421234526207 5972104378811544858 delete /tmp/iceberg/warehouse/db/table/metadata/snap-1588410421234526207-1-2e735a5c-bd99-46f7-af49-0e26bc51ec2f.avro {"changed-partition-count":"1","deleted-data-files":"6","deleted-records":"6","removed-files-size":"3900","spark.app.id":"local-1644810838618","total-data-files":"0","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"0","total-position-deletes":"0","total-records":"0"}
2022-02-14 14:02:47.404 4321345386411511567 1588410421234526207 append /tmp/iceberg/warehouse/db/table/metadata/snap-4321345386411511567-1-771a8e19-a87e-489f-88d1-9480553237e9.avro {"added-data-files":"2","added-files-size":"1286","added-records":"2","changed-partition-count":"1","spark.app.id":"local-1644810838618","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1286","total-position-deletes":"0","total-records":"2"}
Time taken: 0.126 seconds, Fetched 5 row(s)
spark-sql> desc local.db.table.snapshots;
committed_at timestamp
snapshot_id bigint
parent_id bigint
operation string
manifest_list string
summary map<string,string>
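The summary column is a plain string map whose counters are cumulative. For the copy-on-write writes in this example, a snapshot's total-records should equal its parent's total plus added-records minus deleted-records. Below is a small plain-Scala check of that rule against the summaries printed above, abridged to the record counters; `counter` and `checkTotals` are illustrative helpers.

```scala
// Abridged copies of the summary maps printed by the snapshots query above.
val summaries: Seq[(String, Map[String, String])] = Seq(
  "append"    -> Map("added-records" -> "3", "total-records" -> "3"),
  "append"    -> Map("added-records" -> "3", "total-records" -> "6"),
  "overwrite" -> Map("added-records" -> "1", "deleted-records" -> "1", "total-records" -> "6"),
  "delete"    -> Map("deleted-records" -> "6", "total-records" -> "0"),
  "append"    -> Map("added-records" -> "2", "total-records" -> "2")
)

// Summary values are strings; a missing counter counts as 0.
def counter(summary: Map[String, String], key: String): Long =
  summary.get(key).map(_.toLong).getOrElse(0L)

// For each snapshot: (operation, total expected from the parent, total reported).
def checkTotals(rows: Seq[(String, Map[String, String])]): Seq[(String, Long, Long)] = {
  val reported = rows.map { case (_, s) => counter(s, "total-records") }
  rows.zip(0L +: reported).map { case ((op, s), parentTotal) =>
    (op, parentTotal + counter(s, "added-records") - counter(s, "deleted-records"), counter(s, "total-records"))
  }
}

checkTotals(summaries).foreach(println)
```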

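3.2 Inspect the files of a snapshot
Data files of the current table. Only the two files of the current snapshot are listed, even though nine Parquet files are still on HDFS (see 2.3):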

spark-sql> select * from local.db.table.files;
0 /tmp/iceberg/warehouse/db/table/data/00000-223-e882ea95-6224-42b2-b449-d57f5ef061f4-00001.parquet PARQUET 0 1 643 {1:46,2:48} {1:1,2:1} {1:0,2:0} {} {1:,2:g} {1:,2:g} NULL [4] NULL 0
0 /tmp/iceberg/warehouse/db/table/data/00001-224-7b43bf9d-4437-4d62-beb6-f4f665f0b380-00001.parquet PARQUET 0 1 643 {1:46,2:48} {1:1,2:1} {1:0,2:0} {} {1,2:h} {1,2:h} NULL [4] NULL 0
Time taken: 0.137 seconds, Fetched 2 row(s)
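In this output, lower_bounds and upper_bounds map column ids to binary values (column 1 is id, column 2 is data). Iceberg's single-value serialization stores a bigint as 8 little-endian bytes and a string as its UTF-8 bytes, so the id bounds print as unprintable characters while the data bounds read g and h. The sketch decodes such bytes and shows what the statistics are for: skipping files whose [min, max] range cannot contain a value. `FileStats` and `filesThatMayContain` are illustrative, not Iceberg classes. The id bounds 7 and 8 are inferred from the single row in each file.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Single-value binary encoding used for bounds: bigint = 8 bytes little-endian, string = UTF-8.
def encodeLong(v: Long): Array[Byte] =
  ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(v).array()
def decodeLong(bytes: Array[Byte]): Long =
  ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong()
def decodeString(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

// Illustrative per-file stats for column 1 (id) of the two current files.
case class FileStats(path: String, lowerId: Array[Byte], upperId: Array[Byte])

val currentFiles = Seq(
  FileStats("00000-223-e882ea95-6224-42b2-b449-d57f5ef061f4-00001.parquet", encodeLong(7L), encodeLong(7L)),
  FileStats("00001-224-7b43bf9d-4437-4d62-beb6-f4f665f0b380-00001.parquet", encodeLong(8L), encodeLong(8L))
)

// For a filter "id = v", a file can be skipped when v lies outside its [lower, upper] range.
def filesThatMayContain(v: Long, files: Seq[FileStats]): Seq[String] =
  files.filter(f => decodeLong(f.lowerId) <= v && v <= decodeLong(f.upperId)).map(_.path)

println(encodeLong(8L).mkString("[", ", ", "]")) // [8, 0, 0, 0, 0, 0, 0, 0]
println(filesThatMayContain(8L, currentFiles))
```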

3.3 Query the snapshot history
spark-sql> select * from local.db.table.history;
2022-02-14 14:00:34.539 7801623062552576504 NULL true
2022-02-14 14:01:12.485 3074595041692363385 7801623062552576504 true
2022-02-14 14:01:31.531 5972104378811544858 3074595041692363385 true
2022-02-14 14:02:04.778 1588410421234526207 5972104378811544858 true
2022-02-14 14:02:47.404 4321345386411511567 1588410421234526207 true
Time taken: 0.08 seconds, Fetched 5 row(s)
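The history columns are made_current_at, snapshot_id, parent_id and is_current_ancestor. The last flag is true when a snapshot lies on the parent chain of the current snapshot. Here all five do; after a rollback, the snapshots that were rolled back over would show false. Below is a plain-Scala sketch of that derivation using the ids above (`parentOf` and `ancestorsOf` are illustrative names).

```scala
// snapshot_id -> parent_id, copied from the history query above
val parentOf: Map[Long, Option[Long]] = Map(
  7801623062552576504L -> None,
  3074595041692363385L -> Some(7801623062552576504L),
  5972104378811544858L -> Some(3074595041692363385L),
  1588410421234526207L -> Some(5972104378811544858L),
  4321345386411511567L -> Some(1588410421234526207L)
)

// The snapshot itself, then its parent, grandparent and so on back to the first snapshot.
def ancestorsOf(snapshotId: Long): List[Long] =
  snapshotId :: parentOf.getOrElse(snapshotId, None).map(p => ancestorsOf(p)).getOrElse(Nil)

val currentSnapshotId = 4321345386411511567L
val lineage = ancestorsOf(currentSnapshotId)

// is_current_ancestor for every snapshot of the table
parentOf.keys.toSeq.sorted.foreach(id => println(s"$id ${lineage.contains(id)}"))
```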

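3.4 Time travel: read a snapshot's contents by snapshot id
3.4.1 Another pitfall
Querying from spark-shell on spark-3.2.1-bin-hadoop3.2 fails with the IncompatibleClassChangeError below, which points to a binary incompatibility between the Iceberg 0.13.0 runtime and Spark 3.2.1. Switching to spark-3.2.0-bin-hadoop3.2 fixes it.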

scala> spark.read.option("snapshot-id","7801623062552576504").format("iceberg").load("/tmp/iceberg/warehouse/db/table")
res0: org.apache.spark.sql.DataFrame = [id: bigint, data: string]

scala> res0.show
java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck has interface org.apache.spark.sql.catalyst.plans.logical.BinaryNode as super class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
3.4.2 Rerun with Spark 3.2.0
Launch spark-shell:

[root@hadoop103 spark-3.2.0-bin-hadoop3.2]# bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.local.type=hadoop --conf spark.sql.catalog.local.warehouse=/tmp/iceberg/warehouse
Reading by snapshot id returns the table as it was at that snapshot:

scala> spark.read.option("snapshot-id","7801623062552576504").format("iceberg").load("/tmp/iceberg/warehouse/db/table")
res4: org.apache.spark.sql.DataFrame = [id: bigint, data: string]

scala> spark.read.option("snapshot-id","7801623062552576504").format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
+---+----+
| id|data|
+---+----+
| 1| a|
| 2| b|
| 3| c|
+---+----+


scala> spark.read.option("snapshot-id","3074595041692363385").format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
+---+----+
| id|data|
+---+----+
| 1| a|
| 2| b|
| 3| c|
| 4| d|
| 5| e|
| 6| f|
+---+----+


scala> spark.read.option("snapshot-id","5972104378811544858").format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
+---+-----+
| id| data|
+---+-----+
| 1|apple|
| 2| b|
| 3| c|
| 4| d|
| 5| e|
| 6| f|
+---+-----+


scala> spark.read.option("snapshot-id","1588410421234526207").format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
+---+----+
| id|data|
+---+----+
+---+----+


scala> spark.read.option("snapshot-id","4321345386411511567").format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
+---+----+
| id|data|
+---+----+
| 7| g|
| 8| h|
+---+----+


Without a snapshot id, the current (latest) snapshot is read:

scala> spark.read.format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
+---+----+
| id|data|
+---+----+
| 7| g|
| 8| h|
+---+----+
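Besides snapshot-id, the Iceberg reader also accepts an as-of-timestamp option, a timestamp in milliseconds since the epoch. It resolves to the last snapshot that became current at or before that instant, according to the history table from 3.3. The sketch reproduces that lookup with the timestamps shown there. It assumes they are in the Asia/Shanghai time zone, which is a guess; substitute the time zone of your Spark session. The commented spark-shell line at the end shows how the value would be passed and is not executed here.

```scala
import java.time.{LocalDateTime, ZoneId}

// ASSUMPTION: the history timestamps are in Asia/Shanghai; use your Spark session's time zone.
val zone = ZoneId.of("Asia/Shanghai")
def epochMillis(ts: String): Long =
  LocalDateTime.parse(ts.replace(' ', 'T')).atZone(zone).toInstant().toEpochMilli()

// (made_current_at in epoch millis, snapshot_id), from the history query in 3.3
val historyLog: Seq[(Long, Long)] = Seq(
  epochMillis("2022-02-14 14:00:34.539") -> 7801623062552576504L,
  epochMillis("2022-02-14 14:01:12.485") -> 3074595041692363385L,
  epochMillis("2022-02-14 14:01:31.531") -> 5972104378811544858L,
  epochMillis("2022-02-14 14:02:04.778") -> 1588410421234526207L,
  epochMillis("2022-02-14 14:02:47.404") -> 4321345386411511567L
)

// The last snapshot made current at or before the given time (None if none existed yet).
def snapshotAsOf(timestampMillis: Long): Option[Long] =
  historyLog.filter { case (madeCurrentAt, _) => madeCurrentAt <= timestampMillis }.lastOption.map(_._2)

println(snapshotAsOf(epochMillis("2022-02-14 14:01:20")))

// Corresponding spark-shell read (not executed here):
// spark.read.option("as-of-timestamp", epochMillis("2022-02-14 14:01:20").toString)
//   .format("iceberg").load("/tmp/iceberg/warehouse/db/table").show
```

A time between the second insert and the UPDATE resolves to the second snapshot, so such a read would return rows 1 to 6 with the original value a.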

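3.5 Query the incremental data between two snapshots
Pass start-snapshot-id and end-snapshot-id as read options. The start snapshot is exclusive and the end snapshot inclusive: the result contains rows 4 to 6 from the second insert, but not rows 1 to 3 from the first.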

scala> spark.read.format("iceberg").option("start-snapshot-id","7801623062552576504").option("end-snapshot-id","3074595041692363385").load("/tmp/iceberg/warehouse/db/table").show
+---+----+
| id|data|
+---+----+
| 4| d|
| 5| e|
| 6| f|
+---+----+

Conclusion
This gave a first hands-on look at CRUD operations and snapshots in Iceberg.

Content sourced from the internet; it will be removed on request in case of infringement.


