#创建一个SparkSession对象,方便下面使用
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('exam1')\
.enableHiveSupport()\
.getOrCreate()
# 通过文件创建RDD
sc = spark.sparkContext
sc.textFile(name, minPartitions=None, use_unicode=True)
Example=sc.textFile(“hdfs://exam_dir/running_logs/”)
# 代码内部创建RDD
#Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
Rdd = spark.sparkContext.parallelize(data)
#创建空RDD
rdd = spark.sparkContext.emptyRDD
rdd2 = spark.sparkContext.parallelize( [ ],10) #This creates 10 partitions
从数据源创建RDD
一般是使用SparkSession中的函数,SparkSession对象提供了read method,返回一个DataFrameReader对象。用该对象将数据读取到DataFrame中,DataFrame是一种特殊的RDD,老版本中称为SchemaRDD。 比如说,spark现在是一个已经被创建的SparkSession对象,然后调用read方法,spark.read就是一个DataFrameReader对象,然后就调用该对象(DataFrameReader)的一系列方法
常见的RDD类型
PairRDD: 由键值对组成的RDD,比如前面提到的用wholeTextFiles()方法读取的内容就是以键值对的形式存在
DoubleRDD: 由双精度浮点数组成的RDD。
DataFrame:以前的版本被称为SchemaRDD,按一组有固定名字和类型的列来组织的分布式数据集。DataFrame等价于sparkSQL中的关系型表! 所以我们在使用sparkSQL的时候常常要创建这个DataFrame,在sparkSQL部分会提及。
HadoopRDD:提供读取存储在HDFS上的数据的RDD。
重新分区
第一:使用repartition(numPartitions)从所有节点混洗数据的方法,也称为完全混洗, repartition()方法是一项非常昂贵的操作,因为它会从集群中的所有节点打乱数据。
第二:使用coalesce(n)方法从最小节点混洗数据,仅用于减少分区数。 这是repartition()使用合并降低跨分区数据移动的优化或改进版本。
第三:使用partitionBy(numPartitions, partiontionFunc=portable_hash)函数
混洗操作
Shuffle 是 PySpark 用来在不同执行器甚至跨机器重新分配数据的机制。 可能导致shuffle的操作包括:
repartition和coalesce等重新分区操作,
groupByKey和reduceByKey等聚合操作(计数除外),
以及cogroup和join等连接操作
PySpark Shuffle 是一项昂贵的操作,因为它涉及以下内容 ·磁盘输入/输出 ·涉及数据序列化和反序列化 ·网络输入/输出
混洗分区大小和性能 根据数据集大小,较多的内核和内存混洗可能有益或有害我们的任务。 ①当处理较少的数据量时,通常应该减少 shuffle 分区, 否则最终会得到许多分区文件,每个分区中的记录数较少,形成了文件碎片化。
②另一方面,当有太多数据且分区数量较少时,会导致运行时间较长的任务较少,有时也可能会出现内存不足错误。
获得正确大小的 shuffle 分区总是很棘手,需要多次运行不同的值才能达到优化的数量。当在 PySpark task上遇到性能问题时,这是要寻找的关键属性之一
RDD持久化与重用
RDD主要创建和存在于执行器的内存中。默认情况下,RDD是易逝对象,仅在需要的时候存在。 在它们被转化为新的RDD,并不被其他操作所依赖后,这些RDD就会被删除。 若一RDD在多个行动操作中用到,就每次都会重新计算,则可调用cache()或persist( )方法缓存或持久化RDD。
窄依赖(窄操作)- 宽依赖(宽操作)
窄操作:
①多个操作可以合并为一个阶段,比如同时对一个数据集进行的map操作或者filter操作可以在数据集的各元 素的一轮遍历中处理;
②子RDD只依赖于一个父RDD
③不需要进行节点间的数据混洗
宽操作:
①通常需要数据混洗
②RDD有多个依赖,比如在join或者union的时候
————————————————