博客 Pyspark dataframe基本内置方法(5)

Pyspark dataframe基本内置方法(5)

   数栈君   发表于 2024-11-27 12:02  280  0

toDF 设置新列名

列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。

from pyspark.sql.functions import lit

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+
data.toDF(*['n1','n2','n3','n5','n4']).show()
+-----+---+---+---+---+
| n1| n2| n3| n5| n4|
+-----+---+---+---+---+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+---+---+

toJSON row对象转换json字符串

把dataframe的row对象转换为json字符串,返回rdd

data.rdd.first()
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
# data.toJSON()返回rdd类型
data.toJSON().first()
'{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'

toLocallterator 获取迭代器

返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。

d1 = data.toLocalIterator()
d1
<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
# 便利迭代器
for i in d1:
print(i)

Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
Row(name='test1', age='20', id='1', gender='女', new_id='1')
Row(name='test2', age='26', id='1', gender='男', new_id='1')
Row(name='test3', age='19', id='1', gender='女', new_id='1')
Row(name='test4', age='51', id='1', gender='女', new_id='1')
Row(name='test5', age='13', id='1', gender='男', new_id='1')

toPandas 转换python dataframe

需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。

data.toPandas()
type(data.toPandas())
<class 'pandas.core.frame.DataFrame'>
name age id gender new_id
0 ldsx 12 1 男 1
1 test1 20 1 女 1
2 test2 26 1 男 1
3 test3 19 1 女 1
4 test4 51 1 女 1
5 test5 13 1 男 1

transform dataframe转换

参数为处理函数,返回值必须为dataframe

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+

# 处理函数自定义最后返回了dataframe
def ldsx(spark_df):
colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]
return spark_df.toDF(*colums)

data.transform(ldsx).show()
+------+------+------+------+------+
|0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
+------+------+------+------+------+
| ldsx| 12| 1| 男| 1|
| test1| 20| 1| 女| 1|
| test2| 26| 1| 男| 1|
| test3| 19| 1| 女| 1|
| test4| 51| 1| 女| 1|
| test5| 13| 1| 男| 1|
+------+------+------+------+------+

union unionALL 并集不去重(按列顺序)

获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合

df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
df1.show()
+---+-----+
| id|value|
+---+-----+
| 1| A|
| 2| B|
| 3| C|
| 3| C|
+---+-----+
df2.show()
+---+-----+
| id|value|
+---+-----+
| 3| C|
| 4| D|
+---+-----+
df1.union(df2)
DataFrame[id: bigint, value: string]
df1.union(df2).show()
+---+-----+
| id|value|
+---+-----+
| 1| A|
| 2| B|
| 3| C|
| 3| C|
| 3| C|
| 4| D|
+---+-----+

# 去重使用distinct
df1.union(df2).distinct().show()
+---+-----+
| id|value|
+---+-----+
| 2| B|
| 1| A|
| 3| C|
| 4| D|
+---+-----+

unionByName 并集不去重(按列名)

是否允许缺失列:allowMissingColumns,默认不允许

# 按照列名合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
| 1| 2| 3|
| 6| 4| 5|
+----+----+----+


# 对于不存在列进行填补
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
# allowMissingColumns True默认填补null
df1.unionByName(df2, allowMissingColumns=True).show()

+----+----+----+----+----+
|col0|col1|col2|col3|col4|
+----+----+----+----+----+
| 1| 2| 3|NULL|NULL|
|NULL| 4| 5| 6| 7|
+----+----+----+----+----+

unpivot 反转表(宽表转长表)

ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值

宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,

valueColumnName 对应 variableColumnName 存储值。

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+
# 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
+---+------+-------+
| id| c_col|c_value|
+---+------+-------+
| 1| age| 12|
| 1| name| ldsx|
| 1|gender| 男|
| 1| age| 20|
| 1| name| test1|
| 1|gender| 女|
| 1| age| 26|
| 1| name| test2|
| 1|gender| 男|
| 1| age| 19|
| 1| name| test3|
| 1|gender| 女|
| 1| age| 51|
| 1| name| test4|
| 1|gender| 女|
| 1| age| 13|
| 1| name| test5|
| 1|gender| 男|
+---+------+-------+

withColumn 添加列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

# 使用d1上的列或者用常量列
d1.withColumn('c_value2',d1.c_value).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
| 1| age| 12| 12|
| 1| name| ldsx| ldsx|
| 1|gender| 男| 男|
| 1| age| 20| 20|
| 1| name| test1| test1|
| 1|gender| 女| 女|
| 1| age| 26| 26|
| 1| name| test2| test2|
| 1|gender| 男| 男|
| 1| age| 19| 19|
| 1| name| test3| test3|
| 1|gender| 女| 女|
| 1| age| 51| 51|
| 1| name| test4| test4|
| 1|gender| 女| 女|
| 1| age| 13| 13|
| 1| name| test5| test5|
| 1|gender| 男| 男|
+---+------+-------+--------+
# 使用常量补充列
from pyspark.sql.functions import lit
d1.withColumn('c_value2',lit('ldsx')).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
| 1| age| 12| ldsx|
| 1| name| ldsx| ldsx|
| 1|gender| 男| ldsx|
| 1| age| 20| ldsx|
| 1| name| test1| ldsx|
| 1|gender| 女| ldsx|
| 1| age| 26| ldsx|
| 1| name| test2| ldsx|
| 1|gender| 男| ldsx|
| 1| age| 19| ldsx|
| 1| name| test3| ldsx|
| 1|gender| 女| ldsx|
| 1| age| 51| ldsx|
| 1| name| test4| ldsx|
| 1|gender| 女| ldsx|
| 1| age| 13| ldsx|
| 1| name| test5| ldsx|
| 1|gender| 男| ldsx|
+---+------+-------+--------+
# 使用表达式设置列
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
df.show()
+------+
|number|
+------+
| 1|
| 2|
| 3|
| 4|
+------+
from pyspark.sql.functions import col, when
df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
------+----------+
|number|new_number|
+------+----------+
| 1| Low|
| 2| Low|
| 3| High|
| 4| High|
+------+----------+

withColumns 添加多列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+---+-----+----+----+
|age| name|age2|age3|
+---+-----+----+----+
| 2|Alice| 4| 5|
| 5| Bob| 7| 8|
+---+-----+----+----+

# 可使用表达式
df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
+---+-----+----+---+
|age| name| h1| h2|
+---+-----+----+---+
| 2|Alice|High| 5|
| 5| Bob|High| 8|
+---+-----+----+---+

withColumnRenamed 列重命名

不存在的列重命名报错,返回新dataframe。

列,重命名列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()
+----+-----+
|age2| name|
+----+-----+
| 2|Alice|
| 5| Bob|
+----+-----+

withColumnsRenamed 多列重命名

字典,列名的映射

df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
+-------+--------+
|new_age|new_name|
+-------+--------+
| 2| Alice|
| 5| Bob|
+-------+--------+

withMetadata 设置元数据

更新元数据,返回新dataframe

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
# 查看列的元数据
df.schema['age'].metadata
{}
# 设置元数据
df_meta = df.withMetadata('age', {'foo': 'bar'})
df_meta.schema['age'].metadata
{'foo': 'bar'}

write 存储表

write.saveAsTable

当追加插入的时候dataframe只需要scheam一致,会自动匹配

  • name: str, 表名

  • format: Optional[str] = None, 格式类型 hive,parquet…

  • mode: Optional[str] = None, 写入方式

    • partitionBy: Optional[Union[str, List[str]]] = None, 分区列表

  • df.show()
    +---+-----+
    |age| name|
    +---+-----+
    | 2|Alice|
    | 5| Bob|
    +---+-----+
    # 覆盖重写
    df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])

    # 追加写入
    df.write.saveAsTable('ldsx_test','parquet','append',['age'])

    # 另一种写法
    df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')

    http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c9d6aa7f9841e345b5188e4b17e1d857..png

  • http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6ff472a764c2ab3b225a8b99de320de8..png

  • insertInto

    不会对scheam进行校验,按位置插入

  • d2.show()
    +-----+----+
    |name1|age1|
    +-----+----+
    |ldsx1| 2|
    |ldsx2| 3|
    +-----+----+
    d2.write.insertInto('ldsx_test')
    d2.schema
    StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])

    本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

    《数据资产管理白皮书》下载地址:

    《行业指标体系白皮书》下载地址:

    《数据治理行业实践白皮书》下载地址:

    《数栈V6.0产品白皮书》下载地址:

    想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

    同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群