1 Spark SQL 概述
Hive 是将 SQL 转为 MapReduce。
SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行
在学习Spark SQL前,需要了解数据分类。
2 数据分类
数据分为如下几类:
总结:
RDD 主要用于处理非结构化数据 、半结构化数据、结构化;
SparkSQL 是一个既支持 SQL 又支持命令式数据处理的工具;
SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)。
3 Spark SQL 数据抽象
3.1 DataFrame 和 DataSet
Spark SQL数据抽象可以分为两类:
① DataFrame:DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL 的操作 + 优化
② DataSet:DataSet是DataFrame的进一步发展,它比RDD保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的,提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dateset[Row]
RDD、DataFrame、DataSet的关系如下:
RDD[Person]:以 Person 为类型参数,但不了解其内部结构。
DataFrame:提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。
DataSet[Person]:不光有 schema 信息,还有类型信息。
3.2 举例
假设 RDD 中的两行数据长这样:
RDD[Person]:
1
那么 DataFrame 中的数据长这样
DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化
1
那么 Dataset 中的数据长这样:
Dataset[Person] = DataFrame + 泛型:
1
Dataset 也可能长这样:Dataset[Row]:
即 DataFrame = DataSet[Row]:
1
总结:
DataFrame = RDD - 泛型 + Schema + SQL + 优化
DataSet = DataFrame + 泛型
DataSet = RDD + Schema + SQL + 优化
4 Spark SQL 应用
4.1 创建 DataFrame/DataSet
方式一:读取本地文件
① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。
vim /root/person.txt
1
内容如下:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
1
2
3
4
5
6
② 打开 spark-shell
spark/bin/spark-shell
##创建 RDD
val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]
1
2
3
4
5
③ 定义 case class(相当于表的 schema)
case class Person(id:Int, name:String, age:Int)
1
④ 将 RDD 和 case class 关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
1
⑤ 将 RDD 转换成 DataFrame
val personDF = personRDD.toDF //DataFrame
1
⑥ 查看数据和 schema
personDF.show
1
⑦ 注册表
personDF.createOrReplaceTempView("t_person")
1
⑧ 执行 SQL
spark.sql("select id,name from t_person where id > 3").show
1
⑨ 也可以通过 SparkSession 构建 DataFrame
val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema
1
2
3
方式二:读取 json 文件
val jsonDF= spark.read.json("file:///resources/people.json")
1
接下来就可以使用 DataFrame 的函数操作
jsonDF.show
1
注意:直接读取 json 文件有schema 信息,因为json文件本身含有Schema信息,SparkSQL 可以自动解析。
方式三:读取 parquet 文件
val parquetDF=spark.read.parquet("file:///resources/users.parquet")
1
接下来就可以使用 DataFrame 的函数操作
parquetDF.show
1
注意:直接读取 parquet 文件有 schema 信息,因为 parquet 文件中保存了列的信息。
4.2 两种查询风格:DSL 和 SQL
DSL风格示例:
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
1
2
3
4
SQL 风格示例:
spark.sql("select * from t_person").show
1
总结:
DataFrame 和 DataSet 都可以通过RDD来进行创建;
也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过 RDD+Schema;
通过 josn/parquet 会有完整的约束;
不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL!
3.4.3 Spark SQL 多数据源交互
读取 json 文件:
spark.read.json("D:\\data\\output\\json").show()
1
读取 csv 文件:
spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
1
读取 parquet 文件:
spark.read.parquet("D:\\data\\output\\parquet").show()
1
读取 mysql 表:
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
1
2
3
4
5
写入 json 文件:
personDF.write.json("D:\\data\\output\\json")
1
写入 csv 文件:
personDF.write.csv("D:\\data\\output\\csv")
1
写入 parquet 文件:
personDF.write.parquet("D:\\data\\output\\parquet")
1
写入 mysql 表:
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
————————————————
版权声明:本文为CSDN博主「杨林伟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_20042935/article/details/125536640
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!