在 PySpark 中,DataFrame 的 .na
属性用于处理缺失值(NaN、null 或空值)。.na
属性提供了一组方法来处理和操作缺失值。以下是一些常用的方法:
drop()
删除包含任何缺失值的行
df.na.drop()
删除指定列中包含缺失值的行。
df.na.drop(subset=["col1", "col2"])
使用指定的值填充指定列中的缺失值。
df.na.fill(0, subset=["col1", "col2"])
将指定列中的特定值替换为给定的新值
df.na.replace("old_value", "new_value", subset=["col1", "col2"])
这些方法都返回一个新的 DataFrame,原始 DataFrame 不会被修改。
以下是一个使用 .na
方法处理缺失值的示例
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# 创建一个包含缺失值的 DataFrame
data = [("Alice", 25, None), ("Bob", None, 30), ("John", 35, 40)]
df = spark.createDataFrame(data, ["name", "age", "score"])
# 删除包含缺失值的行
df_without_na = df.na.drop()
# 填充缺失值
df_filled = df.na.fill(0, subset=["age"])
# 替换特定值
df_replaced = df.na.replace("Alice", "Lucy", subset=["name"])
# 显示处理后的 DataFrame
df_without_na.show()
df_filled.show()
df_replaced.show()
在上述示例中,我们首先创建了包含缺失值的 DataFrame。然后使用 .na.drop() 方法删除了包含任何缺失值的行,使用 .na.fill() 方法填充了缺失值,并使用 .na.replace() 方法替换了特定值。
最后,我们分别打印出经过处理后的 DataFrame。
col
from pyspark.sql.functions import col
pyspark.sql.functions.col() 是一个函数,用于引用 DataFrame 中的列。它主要用于在 Spark SQL 或 PySpark 中构建复杂的表达式和转换操作。
使用 col() 函数,你可以通过列名获取 DataFrame 中的列,并将其用作其他函数的参数或进行列之间的操作。
以下是一些 col() 函数的常见用法示例:
选择列:
df.select(col("column_name"))
进行条件过滤:
df.filter(col("column_name") > 5)
创建新列:
df.withColumn("new_column", col("column1") + col("column2"))
嵌套函数调用:
df.withColumn("new_column", sqrt(col("column1")))
通过使用 col() 函数,你可以对 DataFrame 的列执行各种转换和操作,例如选择、过滤、计算等。它提供了一种方便的方式来处理列级别的操作,同时使代码更易读和可维护。
withColumns()
在 PySpark 中,df.withColumn() 方法用于创建一个新的 DataFrame,并添加新的列或替换现有的列。它的语法如下:
df.withColumn(colName, col)
其中:
colName:要添加或替换的列的名称。
col:使用函数、表达式或已存在的列生成的新列。
withColumn() 方法允许你对现有 DataFrame 进行变换操作,例如添加新的计算列、重命名现有列、替换现有列的值等。它返回一个新的 DataFrame,而不会修改原始 DataFrame。
以下是一些 withColumn() 方法的常见用法示例:
添加计算列:
df.withColumn("new_column", df["column1"] + 1)
重命名列:
df.withColumnRenamed("old_column", "new_column")
替换列的值:
df.withColumn("column1", when(df["column1"] < 0, 0).otherwise(df["column1"]))
基于现有列创建新列:
df.withColumn("new_column", concat(df["first_name"], lit(" "), df["last_name"]))
通过使用 withColumn() 方法,你可以按照需要对 DataFrame 进行列级别的变换和操作。它提供了一种灵活的方式来构建和转换 DataFrame,以适应特定的数据处理需求。
when() otherwise()
在 PySpark 中,when() 函数用于执行条件逻辑操作。它通常与 otherwise() 方法一起使用来实现基于条件的列操作。
when() 函数接受一个条件表达式和一个要返回的值作为参数。它会根据条件表达式的结果决定返回的值。
以下是一个示例代码,展示了如何使用 when() 函数:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
spark = SparkSession.builder.getOrCreate()
# 创建一个包含数字列的 DataFrame
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
# 使用 when 函数进行条件判断
df_updated = df.withColumn("new_number", when(col("number") < 3, "Low").otherwise("High"))
df_updated.show()
输出结果为:
+------+----------+
|number|new_number|
+------+----------+
| 1| Low|
| 2| Low|
| 3| High|
| 4| High|
+------+----------+
在上述示例中,我们创建了一个包含数字列的 DataFrame,并使用 when() 函数对 “number” 列进行条件判断。当数字小于 3 时,将 “new_number” 列设置为 “Low”;否则,设置为 “High”。
通过在 when() 函数中指定条件和相应的返回值,你可以根据特定条件对列进行处理和转换。这为数据操作提供了灵活性和可扩展性。
replace(str, search, replace)
pyspark.sql.functions.replace() 函数用于替换字符串中的特定子字符串。它的语法如下:
replace(str, search, replace)
其中:
str:要进行替换操作的字符串列或表达式。
search:要搜索并替换的子字符串。
replace:用于替换匹配项的新字符串。
这个函数将在给定的字符串列或表达式中查找所有匹配 search 的子字符串,并用 replace 进行替换。
以下是一个示例,展示了如何使用 replace() 函数:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, replace
spark = SparkSession.builder.getOrCreate()
# 创建一个包含字符串的 DataFrame
data = [("Alice", "Hello, Alice!"), ("Bob", "Hello, Bob!"), ("John", "Hi, John!")]
df = spark.createDataFrame(data, ["name", "message"])
# 使用 replace() 替换字符串中的子字符串
df_replaced = df.withColumn("new_message", replace(col("message"), "Hello", "Hi"))
df_replaced.show(truncate=False)
输出结果为:
+----+--------------+-------------------+
|name|message |new_message |
+----+--------------+-------------------+
|Alice|Hello, Alice! |Hi, Alice! |
|Bob |Hello, Bob! |Hi, Bob! |
|John |Hi, John! |Hi, John! |
+----+--------------+-------------------+
在上述示例中,我们创建了一个包含名字和消息的 DataFrame。然后,使用 replace() 函数将消息中的 “Hello” 替换为 “Hi”,并将结果保存在新列 “new_message” 中。
通过使用 replace() 函数,你可以对字符串列中的特定子字符串进行替换操作,使得数据处理更加灵活和方便。
Row对象
在 PySpark RDD 中,每一行数据通常不是 Row 对象,而是普通的 Python 对象(例如元组、列表等)或其他用户自定义的对象。
当使用 Spark 读取数据源创建 RDD 时,每一行会被解析为一个 Python 对象,通常是元组。例如,使用 sparkContext.parallelize() 方法创建 RDD,可以将元组作为每个元素,其中元组的每个元素表示一行数据的字段。
以下是一个示例代码,展示了如何创建包含元组的 RDD:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# 创建一个 RDD 包含元组作为每一行数据
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob"), (3, "Charlie")])
# 输出每一行数据
for row in rdd.collect():
print(row)
输出结果为:
(1, 'Alice')
(2, 'Bob')
(3, 'Charlie')
在上述示例中,我们使用 parallelize() 方法创建了一个 RDD,其中每个元素都是一个元组,表示一行数据的字段。
注意,如果你使用 DataFrame 或 Dataset API 来操作数据,那么每一行数据将会以 Row 对象的形式存在。但是,在 RDD 中,每一行数据通常是用普通的 Python 对象来表示的。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [("Alice", 25, None), ("Bob", None, 30), ("John", 35, 40)]
df = spark.createDataFrame(data, ["name", "age", "score"])
df.show()
+-----+----+-----+
| name| age|score|
+-----+----+-----+
|Alice| 25| null|
| Bob|null| 30|
| John| 35| 40|
print(type(df.rdd))
print(df.rdd.collect())
》》 <class 'pyspark.rdd.RDD'>
》》 [Row(name='Alice', age=25, score=None), Row(name='Bob', age=None, score=30), Row(name='John', age=35, score=40)]
# 便利每一行dataframe
z=df.rdd.map(lambda x: print(type(x))).collect()
print(z)
》》 <class 'pyspark.sql.types.Row'>
》》 <class 'pyspark.sql.types.Row'>
》》 <class 'pyspark.sql.types.Row'>
》》 [None, None, None]
StructType,StructField,数据类型
StructType
在 PySpark 中,pyspark.sql.types.StructType 是用于定义 DataFrame 的结构或模式的类。它用于指定每个字段的名称和数据类型,并且可以嵌套定义复杂的结构。
以下是一个示例代码,展示了如何使用 StructType 定义一个包含多个字段的结构:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()
# 定义结构(模式)
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=True),
StructField("city", StringType(), nullable=True)
])
# 创建 DataFrame 使用定义的结构
data = [("Alice", 30, "New York"), ("Bob", 35, "San Francisco"), ("Charlie", None, None)]
df = spark.createDataFrame(data, schema=schema)
df.show()
输出结果为:
+-------+----+-------------+
| name| age| city|
+-------+----+-------------+
| Alice| 30| New York|
| Bob| 35|San Francisco|
|Charlie|null| null|
+-------+----+-------------+
在上述示例中,我们导入了 StructType 和相关的类。然后,通过创建一个包含多个 StructField 的列表,定义了一个结构(模式)schema。每个 StructField 都需要指定字段名、字段类型和可选的 nullable 参数。
最后,我们使用 createDataFrame() 方法将数据和定义的结构传递给 SparkSession 来创建 DataFrame。
StructType定义DataFrame优点
使用 StructType 来定义 DataFrame 的模式(schema)有以下几个好处:
指定字段的名称和数据类型:通过使用 StructType,你可以明确指定每个字段的名称和数据类型。这对于确保数据按照预期的方式进行解析和处理非常重要。如果数据源文件或输入数据的结构发生变化,使用结构化模式可以提前捕捉到这些变化,并在读取数据时进行验证和处理。
提供类型安全性:定义一个结构化模式可以为 DataFrame 提供类型安全性。根据字段的数据类型,PySpark 在查询和操作期间可以进行类型检查,并提供更好的错误提示。这可以避免由于类型不匹配而导致的运行时错误,提高代码的可维护性和稳定性。
优化执行计划:指定准确的模式可以帮助 PySpark 优化执行计划。根据字段的数据类型,PySpark 可以选择最佳的执行策略,并应用相关的优化技术。这可以提高查询性能和整体数据处理效率。
数据一致性:使用结构化模式可以确保 DataFrame 中的数据与实际数据的结构相匹配。这可以防止数据中的格式错误、缺失值或其他异常情况影响数据处理的准确性。通过强制执行模式,你可以确保 DataFrame 数据的一致性和完整性。
元数据描述:结构化模式提供了有关 DataFrame 的元数据描述。通过查看模式,你可以了解 DataFrame 中每个字段的名称、数据类型、是否可为空等详细信息。这对于数据文档、数据质量控制和数据分析非常有用。
总而言之,使用 StructType 来定义 DataFrame 的模式可以提供更好的代码健壮性、类型安全性以及查询性能优化。它确保数据按照预期的方式进行处理,并提供了额外的元数据信息,有助于数据处理的准确性和可维护性。
StructField
StructField 是 PySpark 中用于定义结构化数据模式的类之一。它用于描述 DataFrame 或表的字段,并指定每个字段的名称、数据类型和是否可为空。
StructField 的构造函数如下:
StructField(name, dataType, nullable=True)
参数说明:
name: 字段名称,为字符串类型。
dataType: 字段的数据类型,可以使用 pyspark.sql.types 中提供的数据类型,例如 StringType()、IntegerType() 等。
nullable: 指定该字段是否可以为空,默认为 True。
通过使用 StructField,你可以在 StructType 中定义多个字段,从而构建复杂的数据模式。这样做有助于确保 DataFrame 或表的结构与实际数据的结构相匹配,并提供了类型安全性和数据一致性。
pyspark.sql.types中的数据类型
pyspark.sql.types 模块提供了多种数据类型类,用于在 PySpark 中定义字段或列的数据类型。以下是一些常用的数据类型及其作用:
StringType: 表示字符串类型的数据。
IntegerType: 表示整数类型的数据。
FloatType: 表示浮点数类型的数据。
DoubleType: 表示双精度浮点数类型的数据。
BooleanType: 表示布尔类型的数据,取值为 True 或 False。
DateType: 表示日期类型的数据。
TimestampType: 表示时间戳类型的数据。
ArrayType: 表示数组类型的数据,可以包含不同类型的元素。
StructType: 表示结构类型的数据,类似于关系型数据库的表结构。
MapType: 表示键值对类型的数据,其中键和值可以具有不同的数据类型。
from pyspark.sql.types import IntegerType
# 定义一个整数类型的字段
age_field = StructField("age", IntegerType(), nullable=True)