在 PySpark 中,cast
函数用于将 DataFrame 或列中的数据类型转换为所需的数据类型。它可以用于将某个列的数据类型更改为其他类型,或者在查询中对特定表达式进行类型转换。
使用 cast
函数的一般语法如下:
df.withColumn("new_column", df["existing_column"].cast(StringType()))
其中,df
是一个 DataFrame,“new_column” 是新列的名称,“existing_column” 是现有列的名称,StringType()
是要转换为的目标数据类型。
例如,将一个整数列转换为浮点数列可以使用以下代码:
from pyspark.sql.functions import col
df = df.withColumn("new_column", col("existing_column").cast("float"))
类型最好使用pyspark.sql.types中的数据类型
此代码将 DataFrame df 中的名为 “existing_column” 的列的数据类型转换为浮点数,并将结果存储在名为 “new_column” 的新列中。
需要注意的是,cast 函数只返回一个新的 DataFrame,它不会修改原始的 DataFrame。如果需要在原始 DataFrame 上进行更改,可以重新分配变量。
另外,cast 函数还可用于在查询中对表达式进行类型转换,而不仅仅限于列。例如:
from pyspark.sql.functions import expr
df = df.withColumn("new_column", expr("CAST(existing_column AS float)"))
以上代码将 existing_column 表达式的数据类型转换为浮点数,并将结果存储在 “new_column” 列中。
groupBy()
在 PySpark 中,groupBy 函数返回的是一个 GroupedData 对象,它代表了对 DataFrame 进行分组后的结果。要展示 GroupedData 的内容,你可以使用一些聚合函数(如 count()、sum()、avg())或转换操作(如 agg()、pivot())来计算和转换数据。
以下是几种常见的方法来展示 GroupedData 的内容:
show()
方法展示结果。from pyspark.sql.functions import count,sum, avg
spark = SparkSession.builder.getOrCreate()
data = [("Alice", 25, 'cn'), ("Bob", None, 'cn'), ("John", 35, 'cn'),
("TOM", 25, 'am'), ("JOKER", None, 'am'), ("SILE", 35, 'am'),
("APPLE", 25, None), ("HABY", None, None), ("TINES", 35, 'yd')]
df = spark.createDataFrame(data, ["name", "age", "city"])
df.show()
》》
+-----+----+----+
| name| age|city|
+-----+----+----+
|Alice| 25| cn|
| Bob|null| cn|
| John| 35| cn|
| TOM| 25| am|
|JOKER|null| am|
| SILE| 35| am|
|APPLE| 25|null|
| HABY|null|null|
|TINES| 35| yd|
+-----+----+----+
print(df.groupBy('city'))
print(type(df.groupBy('city')))
# 聚合后类型变换为GroupedData
result = df.groupBy('city').agg(count("city"),sum('age'))
print(type(result))
result.show()
+----+-----------+--------+
|city|count(city)|sum(age)|
+----+-----------+--------+
| cn| 3| 60|
|null| 0| 25|
| am| 3| 60|
| yd| 1| 35|
+----+-----------+--------+
agg()
在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。
agg 函数常与 groupBy 结合使用,以按照指定的分组条件对数据进行聚合。它可以用于计算各种统计量,如总和、平均值、最大值、最小值等。
以下是 agg 函数的示例用法:
from pyspark.sql.functions import sum, avg, max
df.groupBy("groupColumn").agg(sum("col1"), avg("col2"), max("col3")).show()
在上述代码中,我们首先使用 groupBy 对 DataFrame 进行分组,按照 “groupColumn” 列的值进行分组。然后,通过 agg 函数对每个组进行聚合操作,传递了三个聚合函数:sum、avg 和 max,分别应用于 “col1”、“col2” 和 “col3” 列。最后,使用 show 方法展示聚合结果。
通过 agg 函数,你可以根据需求选择不同的聚合函数,并在多个列上同时应用它们。还可以使用其他支持的聚合函数,如 count、min、collect_list 等。此外,你还可以自定义聚合操作,通过定义自己的聚合函数来实现更灵活的聚合操作。
总之,agg 函数在 PySpark 中用于对 DataFrame 进行聚合操作,可以在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。
collect_list()
collect_list 函数是 PySpark 中用于将指定列的值收集到一个列表中的聚合函数。
该函数常与 groupBy 结合使用,以按照指定的分组条件对数据进行聚合,并将每个组内指定列的值收集到一个列表中。这在需要将某一列的多个值作为一个整体进行处理时非常有用。
只能选一列,数据收集到列表中,列表中元素非row对象(选中的列每行数据都数据)
以下是 collect_list 函数的示例用法:
from pyspark.sql.functions import collect_list
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,count,sum,struct,collect_list
#样例
df.groupBy("groupColumn").agg(collect_list("valueColumn").alias("listValues")).show()
#实际
spark = SparkSession.builder.getOrCreate()
data = [("Alice", 25, 'cn','2023'), ("Bob", None, 'cn','2023'), ("John", 35, 'cn','2023'),
("TOM", 25, 'am','2023'), ("JOKER", None, 'am','2023'), ("SILE", 35, 'am','2023'),
("APPLE", 25, None,'2023'), ("HABY", None, None,'2023'), ("TINES", 35, 'yd','2023')]
df = spark.createDataFrame(data, ["name", "age", "city",'data_date'])
df2 = df.groupBy("city").agg(collect_list('name').alias("ldsx"))
df2.rdd.map(lambda x:print(x)).collect()
Row(city='yd', ldsx=['TINES'])
Row(city=None, ldsx=['APPLE', 'HABY'])
Row(city='am', ldsx=['TOM', 'JOKER', 'SILE'])
Row(city='cn', ldsx=['Alice', 'Bob', 'John'])
df2.show()
+----+------------------+
|city| ldsx|
+----+------------------+
| cn|[Alice, Bob, John]|
|null| [APPLE, HABY]|
| am|[TOM, JOKER, SILE]|
| yd| [TINES]|
+----+------------------+
在上述代码中,我们首先使用 groupBy 对 DataFrame 进行分组,按照 “groupColumn” 列的值进行分组。然后,通过 agg 函数对每个组进行聚合操作,使用 collect_list 函数来收集 “valueColumn” 列的值到一个列表中。最后,使用 alias 方法给聚合结果的列表列起名为 “listValues”,并通过 show 方法展示聚合结果。
使用 collect_list 函数可以将同一组内的多个值收集到一个列表中,方便进一步对列表进行处理或者存储。你也可以结合其他聚合函数一起使用,如 sum、avg 等,来完成更复杂的聚合操作。
总之,collect_list 函数在 PySpark 中用于将指定列的值收集到一个列表中,并适用于对数据进行分组和聚合的场景。
Struct
struct 函数在 PySpark 中的作用是将多个列组合成一个复杂类型(StructType)的单列。它可以用于创建结构化的数据,方便对多个相关列进行处理和操作。
具体而言,struct 函数将传入的列作为参数,并返回一个新的复杂类型列,其中包含了传入的列名和对应的值。这个新的列可以用于进一步的计算、聚合或者存储。
多列融合为一列,聚合后的列数据变成row对象(相当一行数据,选中的几列融合到一起,融合后的列 就是这一行的列不涉及其余行)
以下是 struct 函数的一个示例:
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,count,sum,struct
spark = SparkSession.builder.getOrCreate()
data = [("Alice", 25, 'cn'), ("Bob", None, 'cn'), ("John", 35, 'cn'),
("TOM", 25, 'am'), ("JOKER", None, 'am'), ("SILE", 35, 'am'),
("APPLE", 25, None), ("HABY", None, None), ("TINES", 35, 'yd')]
df = spark.createDataFrame(data, ["name", "age", "city"])
df.show()
df.select(struct("name", "age", "city").alias("combined")).show()
+---------------+
| combined|
+---------------+
|[Alice, 25, cn]|
| [Bob,, cn]|
| [John, 35, cn]|
| [TOM, 25, am]|
| [JOKER,, am]|
| [SILE, 35, am]|
| [APPLE, 25,]|
| [HABY,,]|
|[TINES, 35, yd]|
df.rdd.map(lambda x:print(x)).collect()
Row(combined=Row(name='Alice', age=25, city='cn'))
Row(combined=Row(name='TOM', age=25, city='am'))
Row(combined=Row(name='John', age=35, city='cn'))
Row(combined=Row(name='APPLE', age=25, city=None))
Row(combined=Row(name='HABY', age=None, city=None))
Row(combined=Row(name='SILE', age=35, city='am'))
Row(combined=Row(name='Bob', age=None, city='cn'))
Row(combined=Row(name='TINES', age=35, city='yd'))
Row(combined=Row(name='JOKER', age=None, city='am'))
在上述代码中,我们使用 struct 函数将 “col1”、“col2” 和 “col3” 列组合成一个名为 “combined” 的复杂类型列。通过 select 函数选择该复杂类型列,并设置别名为 “combined”。最后,通过 show 方法展示结果。
这样可以将多个相关的列组合在一起,方便进行后续的分析和处理。例如,你可以对该复杂类型列进行进一步的查询或者将其用于 groupBy 操作来进行数据聚合等。
总之,struct 函数在 PySpark 中提供了一种简洁和灵活的方式来组合多个列,并将它们封装为一个复杂类型的单列。这有助于在处理数据时更好地表示和操作相关信息。
# struct所有列聚合成一列,collect_list按照聚合列把每一行的数据收集到一个列表中
odps_df = df.groupBy("city").agg(
collect_list(struct(*df.columns)).alias("ldsx")
)
+----+--------------------+
|city| ldsx|
+----+--------------------+
| cn|[[Alice, 25, cn, ...|
|null|[[APPLE, 25,, 202...|
| am|[[TOM, 25, am, 20...|
| yd|[[TINES, 35, yd, ...|
+----+--------------------+
Row(city='yd', ldsx=[Row(name='TINES', age=35, city='yd', data_date='2023')])
Row(city='cn', ldsx=[Row(name='Alice', age=25, city='cn', data_date='2023'), Row(name='Bob', age=None, city='cn', data_date='2023'), Row(name='John', age=35, city='cn', data_date='2023')])
Row(city='am', ldsx=[Row(name='TOM', age=25, city='am', data_date='2023'), Row(name='JOKER', age=None, city='am', data_date='2023'), Row(name='SILE', age=35, city='am', data_date='2023')])
Row(city=None, ldsx=[Row(name='APPLE', age=25, city=None, data_date='2023'), Row(name='HABY', age=None, city=None, data_date='2023')])
df = spark.createDataFrame(
... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
>>> df.columns
['age', 'name']
df = spark.createDataFrame([(1, 12), (10, 1), (19, 8)], ["c1", "c2"])
df.cov("c1", "c2")
-18.0
df = spark.createDataFrame([(11, 12), (10, 11), (9, 10)], ["small", "bigger"])
df.cov("small", "bigger")
1.0
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.createTempView("people")
df2 = spark.sql("SELECT * FROM people")
sorted(df.collect()) == sorted(df2.collect())
True
Throw an exception if the table already exists.
df.createTempView("people")
Traceback (most recent call last):
...
AnalysisException: "Temporary table 'people' already exists;"
spark.catalog.dropTempView("people")
True
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.createOrReplaceTempView("people")
df2 = df.filter(df.age > 3)
df2.createOrReplaceTempView("people")
df3 = spark.sql("SELECT * FROM people")
sorted(df3.collect()) == sorted(df2.collect())
createOrReplaceTempView 方法可以用于创建或替换临时视图,而 createTempView 方法只能用于创建新的临时视图。
DataFrame.createGlobalTempView
DataFrame.createGlobalTempView 是 PySpark 中 DataFrame 对象的方法之一。它用于创建一个全局临时视图。
具体来说,createGlobalTempView 方法将当前 DataFrame 对象注册为一个全局临时视图。全局临时视图是一个在整个 Spark 应用程序中可见的、命名的逻辑表,可以基于该视图执行 SQL 查询。
这个方法的作用是将 DataFrame 转换为一个全局可见的虚拟表。通过创建全局临时视图,你可以在整个 Spark 应用程序的不同会话中使用相同的视图名称对 DataFrame 进行查询和操作。
以下是一个示例:
from pyspark.sql import SparkSession
# 创建一个 SparkSession
spark = SparkSession.builder.getOrCreate()
# 创建一个 DataFrame
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# 将 DataFrame 注册为一个全局临时视图
df.createGlobalTempView("my_global_temp_view")
在上述代码中,createGlobalTempView
方法将 DataFrame df
注册为名为 “my_global_temp_view” 的全局临时视图。接下来,你可以在其他 Spark 应用程序的不同会话中使用相同的视图名称对该全局临时视图进行查询和操作,例如:
spark.newSession().sql("SELECT * FROM global_temp.my_global_temp_view").show()
createGlobalTempView 方法的作用是创建一个全局临时视图,使得该视图在整个 Spark 应用程序的不同会话中都是可见的。通过使用全局临时视图,可以实现跨会话的数据共享和查询操作。
可直接覆盖替换 同createTempView 与createOrReplaceTempView
去重完全相同的数据 不能加参数
df = spark.createDataFrame(
... [(14, "Tom"), (23, "Alice"), (23, "Alice")], ["age", "name"])
>>> df.distinct().count()
2
drop_duplicates() is an alias for dropDuplicates() 别名
DataFrame.``dropDuplicates(subset: Optional[List[str]] = None)
不加指定列等价于distinct
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([
... Row(name='Alice', age=5, height=80),
... Row(name='Alice', age=5, height=80),
... Row(name='Alice', age=10, height=80)
... ])
>>> df.dropDuplicates().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
|Alice| 10| 80|
+-----+---+------+
指定列去重
>>> df.dropDuplicates(['name', 'height']).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
+-----+---+------+
删除选中的列drop(col1,col2),drop(*cols),可多选择列删除
from pyspark.sql import Row
>>> df = spark.createDataFrame(
... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
>>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
>>> df.drop('age').show()
+-----+
| name|
+-----+
| Tom|
|Alice|
| Bob|
+-----+
>>> df.drop(df.age).show()
+-----+
| name|
+-----+
| Tom|
|Alice|
| Bob|
+-----+
>>> df.join(df2, df.name == df2.name, 'inner').drop('name').sort('age').show()
+---+------+
|age|height|
+---+------+
| 14| 80|
| 16| 85|
+---+------+
DataFrameNaFunctions.drop别名
df.na.drop()是DataFrameNaFunctions类的一个方法,允许您处理包含空值的列
DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.别名
from pyspark.sql import Row
>>> df = spark.createDataFrame([
... Row(age=10, height=80, name="Alice"),
... Row(age=5, height=None, name="Bob"),
... Row(age=None, height=None, name="Tom"),
... Row(age=None, height=None, name=None),
... ])
>>> df.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
在 PySpark 中,df.na.drop() 和 df.dropna() 都是 DataFrame 对象的方法,用于处理缺失值。它们之间的区别如下:
df.na.drop(**{subset:[col,col]}):这个方法用于删除包含任何缺失值(null 或 NaN)的行。默认情况下,该方法会删除包含任何缺失值的整行数据。你可以通过传递额外的参数来指定其他条件,例如只删除某一列中包含缺失值的行。
df.dropna():这个方法用于删除包含缺失值的行或列。默认情况下,该方法会删除包含任何缺失值的行。然而,**你也可以通过指定 how 参数来操作列上的缺失值,以控制删除行还是列。如果 how='all',则只有当整列都是缺失值时才会删除该列。**参数为:any,all
返回字段类型
df = spark.createDataFrame(
... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
DataFrame.exceptAll
返回一个新的DataFrame,其中包含此DataFrame 中的行,但不包含另一個DataFrame 中的行,同時保留重复项。
总结起来,exceptAll 方法用于计算两个 DataFrame 之间的差集,返回第一个 DataFrame 中存在但在第二个 DataFrame 中不存在的所有行,包括重复的行。差集且不去重
df1 = spark.createDataFrame(
... [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b", 3), ("c", 4)], ["C1", "C2"])
>>> df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
>>> df1.exceptAll(df2).show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| a| 2|
| c| 4|
+---+---+