博客 Spark SQL概述、数据抽象以及应用

Spark SQL概述、数据抽象以及应用

   数栈君   发表于 2024-01-12 11:03  338  0

1 Spark SQL 概述
Hive 是将 SQL 转为 MapReduce。

SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f320e3de0414b4df20484184fe6709ec..png
  
在学习Spark SQL前,需要了解数据分类。

2 数据分类
数据分为如下几类:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/05ea58fe19a4f519d0c3e297a33bef86..png

总结:

RDD 主要用于处理非结构化数据 、半结构化数据、结构化;
SparkSQL 是一个既支持 SQL 又支持命令式数据处理的工具;
SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)。
3 Spark SQL 数据抽象
3.1 DataFrame 和 DataSet
Spark SQL数据抽象可以分为两类:

① DataFrame:DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL 的操作 + 优化

② DataSet:DataSet是DataFrame的进一步发展,它比RDD保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的,提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dateset[Row]

RDD、DataFrame、DataSet的关系如下:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/28e9ffbcf2a21ef219dfd5bc1c41f646..png
  

RDD[Person]:以 Person 为类型参数,但不了解其内部结构。

DataFrame:提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。

DataSet[Person]:不光有 schema 信息,还有类型信息。

3.2 举例
假设 RDD 中的两行数据长这样:

RDD[Person]:
1
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/34afe870c594e7a22f2a62e0da725eb5..png
  
那么 DataFrame 中的数据长这样

DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化
1
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b4182e1498bcee141201a8041e35bc46..png
  
那么 Dataset 中的数据长这样:

Dataset[Person] = DataFrame + 泛型:
1
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/82ab7b05b64034b626452e9bf3ec338f..png
  
Dataset 也可能长这样:Dataset[Row]:

即 DataFrame = DataSet[Row]:
1
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/349b5bbc5285dc8117e736c954e7a913..png
  

总结:

DataFrame = RDD - 泛型 + Schema + SQL + 优化
DataSet = DataFrame + 泛型
DataSet = RDD + Schema + SQL + 优化
4 Spark SQL 应用
4.1 创建 DataFrame/DataSet
方式一:读取本地文件

① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。

vim /root/person.txt
1
内容如下:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
1
2
3
4
5
6
② 打开 spark-shell

spark/bin/spark-shell

##创建 RDD

val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]
1
2
3
4
5
③ 定义 case class(相当于表的 schema)

case class Person(id:Int, name:String, age:Int)
1
④ 将 RDD 和 case class 关联

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
1
⑤ 将 RDD 转换成 DataFrame

val personDF = personRDD.toDF //DataFrame
1
⑥ 查看数据和 schema

personDF.show
1
⑦ 注册表

personDF.createOrReplaceTempView("t_person")
1
⑧ 执行 SQL

spark.sql("select id,name from t_person where id > 3").show
1
⑨ 也可以通过 SparkSession 构建 DataFrame

val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema
1
2
3
方式二:读取 json 文件

val jsonDF= spark.read.json("file:///resources/people.json")
1
接下来就可以使用 DataFrame 的函数操作

jsonDF.show
1
注意:直接读取 json 文件有schema 信息,因为json文件本身含有Schema信息,SparkSQL 可以自动解析。

方式三:读取 parquet 文件

val parquetDF=spark.read.parquet("file:///resources/users.parquet")
1
接下来就可以使用 DataFrame 的函数操作

parquetDF.show
1
注意:直接读取 parquet 文件有 schema 信息,因为 parquet 文件中保存了列的信息。

4.2 两种查询风格:DSL 和 SQL
DSL风格示例:

personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
1
2
3
4
SQL 风格示例:

spark.sql("select * from t_person").show
1
总结:

DataFrame 和 DataSet 都可以通过RDD来进行创建;
也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过 RDD+Schema;
通过 josn/parquet 会有完整的约束;
不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL!
3.4.3 Spark SQL 多数据源交互
读取 json 文件:

spark.read.json("D:\\data\\output\\json").show()
1
读取 csv 文件:

spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
1
读取 parquet 文件:

spark.read.parquet("D:\\data\\output\\parquet").show()
1
读取 mysql 表:

val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
1
2
3
4
5
写入 json 文件:

personDF.write.json("D:\\data\\output\\json")
1
写入 csv 文件:

personDF.write.csv("D:\\data\\output\\csv")
1
写入 parquet 文件:

personDF.write.parquet("D:\\data\\output\\parquet")
1
写入 mysql 表:

val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
————————————————
版权声明:本文为CSDN博主「杨林伟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_20042935/article/details/125536640

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!


《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack  
0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群