
PySpark DataFrame Built-in Methods (3)

Posted by 数栈君 on 2024-11-22 11:14

df.foreach: apply a function to each row

df.foreach(f) is equivalent to df.rdd.foreach(f)

df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
def func(row):
    print(row.name)

# each Row object is passed into func and executed
df.foreach(func)
Alice
Bob

foreachPartition: apply a function to each partition

df.show()
+---+-----+
|age| name|
+---+-----+
| 14| Tom|
| 23|Alice|
| 16| Bob|
+---+-----+
def func(itr):
    for person in itr:
        print(person.name)

df.foreachPartition(func)
Tom
Alice
Bob
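foreachPartition is typically used when a per-partition setup cost, such as a database connection, should be paid once per partition instead of once per row. A minimal sketch, where get_connection() and conn.insert() are hypothetical helpers for an external store:

def save_partition(rows):
    conn = get_connection()            # hypothetical: open one connection per partition
    for row in rows:
        conn.insert(row.asDict())      # hypothetical: write each row to the external store
    conn.close()

df.foreachPartition(save_partition)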

freqItems: frequent items per column

df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
| 1| 11|
| 1| 11|
| 3| 10|
| 4| 8|
| 4| 8|
+---+---+
df.freqItems(["c1", "c2"]).show()
+------------+------------+
|c1_freqItems|c2_freqItems|
+------------+------------+
| [1, 3, 4]| [8, 10, 11]|
+------------+------------+
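freqItems also takes an optional support argument (default 0.01) giving the minimum frequency for an item to be reported; the underlying algorithm is approximate and may return false positives. A minimal sketch on the same data (the exact output depends on the data):

# only report items appearing in at least ~50% of rows (approximate)
df.freqItems(["c1"], support=0.5).show()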

groupBy: group rows

df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 2| Bob|
| 2| Bob|
| 5| Bob|
+---+-----+

df.groupBy("name").agg({"age": "sum"}).show()
+-----+--------+
| name|sum(age)|
+-----+--------+
| Bob| 9|
|Alice| 2|
+-----+--------+

df.groupBy("name").agg({"age": "max"}).withColumnRenamed('max(age)','new_age').sort('new_age').show()
+-----+-------+
| name|new_age|
+-----+-------+
|Alice| 2|
| Bob| 5|
+-----+-------+
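The rename step can be avoided by aggregating with pyspark.sql.functions and alias; a minimal sketch that is equivalent to the example above:

from pyspark.sql import functions as F

df.groupBy("name").agg(F.max("age").alias("new_age")).sort("new_age").show()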

head: get the first n rows

df.head(2)
[Row(age=2, name='Alice'), Row(age=2, name='Bob')]
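Called without an argument, head() returns a single Row (or None for an empty DataFrame) instead of a list:

df.head()
# Row(age=2, name='Alice')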

hint: query optimization hints

When joining large tables, Spark's default strategy may not be optimal; hint() lets you suggest a join strategy.

Other hints include merge, shuffle_hash, and coalesce.

from pyspark.sql import Row

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")])
df.join(df2, "name").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]
   +- SortMergeJoin [name#1641], [name#1645], Inner
      :- Sort [name#1641 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(name#1641, 200), ENSURE_REQUIREMENTS, [plan_id=1916]
      :     +- Filter isnotnull(name#1641)
      :        +- Scan ExistingRDD[age#1640L,name#1641]
      +- Sort [name#1645 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(name#1645, 200), ENSURE_REQUIREMENTS, [plan_id=1917]
            +- Filter isnotnull(name#1645)
               +- Scan ExistingRDD[height#1644L,name#1645]

df.join(df2.hint("broadcast"), "name").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#1641, age#1640L, height#1644L]
   +- BroadcastHashJoin [name#1641], [name#1645], Inner, BuildRight, false
      :- Filter isnotnull(name#1641)
      :  +- Scan ExistingRDD[age#1640L,name#1641]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=1946]
         +- Filter isnotnull(name#1645)
            +- Scan ExistingRDD[height#1644L,name#1645]
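The broadcast function from pyspark.sql.functions produces the same plan as the "broadcast" hint; a minimal sketch:

from pyspark.sql.functions import broadcast

df.join(broadcast(df2), "name").explain()   # also yields a BroadcastHashJoin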

intersect: intersection (deduplicated)

df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| b| 3|
| c| 4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| b| 3|
+---+---+
df1.intersect(df2).show()
+---+---+
| C1| C2|
+---+---+
| b| 3|
| a| 1|
+---+---+

intersectAll: intersection (keeping duplicates)

df1.intersectAll(df2).show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| b| 3|
+---+---+

isEmpty: check whether the DataFrame is empty

# returns True if empty, False otherwise
df1.isEmpty()
False
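For comparison, a DataFrame with no rows returns True; a minimal sketch using an explicit DDL schema (isEmpty is available from Spark 3.3 onward):

spark.createDataFrame([], "c1 INT").isEmpty()
# True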

join: join two DataFrames

Note that the way the join condition is written may affect which columns appear in the show() output.

Join on a single column

df2.show()
+------+----+
|height|name|
+------+----+
| 80| Tom|
| 85| Bob|
+------+----+
df4.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
df4.join(df2,df4.name == df2.name,how='left').show()
+----+------+-----+------+----+
| age|height| name|height|name|
+----+------+-----+------+----+
| 5| null| Bob| 85| Bob|
| 10| 80|Alice| null|null|
|null| null| Tom| 80| Tom|
|null| null| null| null|null|
+----+------+-----+------+----+
df4.join(df2,df4.name == df2.name).show()
+----+------+----+------+----+
| age|height|name|height|name|
+----+------+----+------+----+
| 5| null| Bob| 85| Bob|
|null| null| Tom| 80| Tom|
+----+------+----+------+----+

# columns with the same name are merged into one
df4.join(df2,'name').show()
+-----+----+------+------+
| name| age|height|height|
+-----+----+------+------+
|Alice| 10| 80| 80|
| Bob| 5| null| 85|
| Tom|null| null| 80|
+-----+----+------+------+

Join on multiple columns

df2.show()
+------+-----+
|height| name|
+------+-----+
| 80| Tom|
| 85| Bob|
| 80|Alice|
+------+-----+
df4.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
df4.join(df2,[df4.name == df2.name,df4.height == df2.height]).show()
+---+------+-----+------+-----+
|age|height| name|height| name|
+---+------+-----+------+-----+
| 10| 80|Alice| 80|Alice|
+---+------+-----+------+-----+

# columns with the same name are merged into one
df4.join(df2,['name','height']).show()
+-----+------+---+
| name|height|age|
+-----+------+---+
|Alice| 80| 10|
+-----+------+---+

df4.join(df2,[df4.name == df2.name,df4.height==df2.height],how='left').show()
+----+------+-----+------+-----+
| age|height| name|height| name|
+----+------+-----+------+-----+
| 10| 80|Alice| 80|Alice|
| 5| null| Bob| null| null|
|null| null| Tom| null| null|
|null| null| null| null| null|
+----+------+-----+------+-----+



df4.join(df2,'name').show()
+-----+----+------+------+
| name| age|height|height|
+-----+----+------+------+
|Alice| 10| 80| 80|
| Bob| 5| null| 85|
| Tom|null| null| 80|
+-----+----+------+------+
df4.join(df2,'name').select(df4.height).show()
+------+
|height|
+------+
| 80|
| null|
| null|
+------+
df4.join(df2,'name').select(df4.height,df2.height).show()
+------+------+
|height|height|
+------+------+
| 80| 80|
| null| 85|
| null| 80|
+------+------+
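Another common way to keep the two height columns apart is to alias each DataFrame before joining and select with qualified names; a minimal sketch:

a = df4.alias("a")
b = df2.alias("b")
a.join(b, "name").select("a.height", "b.height").show()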

limit: limit the number of rows

df = spark.createDataFrame( [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
df.limit(1).show()
+---+----+
|age|name|
+---+----+
| 14| Tom|
+---+----+
df.limit(0).show()
+---+----+
|age|name|
+---+----+
+---+----+
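Unlike head, limit returns a new DataFrame (evaluated lazily) rather than a list of Row objects collected to the driver:

type(df.limit(1))   # <class 'pyspark.sql.dataframe.DataFrame'>
type(df.head(1))    # <class 'list'> of Row objects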

mapInPandas: map over batches of rows as pandas DataFrames

The function takes an iterator of pandas DataFrames and yields pandas DataFrames.

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
    for pdf in iterator:
        print(pdf, type(pdf))
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, df.schema).show()
# inside filter_func each batch is handled as a pandas DataFrame
id age
0 1 21 <class 'pandas.core.frame.DataFrame'>
id age
0 2 30 <class 'pandas.core.frame.DataFrame'>
+---+---+
| id|age|
+---+---+
| 1| 21|
+---+---+
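mapInPandas can also transform each batch, as long as every yielded pandas DataFrame matches the declared output schema; a minimal sketch that doubles the age column (the DDL output schema here is written out as an assumption):

def double_age(iterator):
    for pdf in iterator:
        pdf["age"] = pdf["age"] * 2    # modify the pandas batch in place
        yield pdf

df.mapInPandas(double_age, "id long, age long").show()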

mapInArrow: map over batches of rows as Arrow RecordBatches

The function takes an iterator of pyarrow.RecordBatch objects and yields RecordBatch objects.

import pyarrow  
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
    for batch in iterator:
        print(batch, type(batch))
        pdf = batch.to_pandas()
        print(pdf, type(pdf))
        yield pyarrow.RecordBatch.from_pandas(pdf[pdf.id == 1])

df.mapInArrow(filter_func, df.schema).show()
pyarrow.RecordBatch
id: int64
age: int64 <class 'pyarrow.lib.RecordBatch'>
id age
0 1 21 <class 'pandas.core.frame.DataFrame'>
pyarrow.RecordBatch
id: int64
age: int64 <class 'pyarrow.lib.RecordBatch'>
id age
0 2 30 <class 'pandas.core.frame.DataFrame'>
+---+---+
| id|age|
+---+---+
| 1| 21|
+---+---+

fill: fill null values (df.na.fill)

d1 = spark.sql("SELECT 1 AS c1, int(NULL) AS c2")
d1.show()
+---+----+
| c1| c2|
+---+----+
| 1|null|
+---+----+
d1.na.fill(2).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
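fill also accepts a dict to fill specific columns with different values (df.fillna is an equivalent spelling); a minimal sketch:

d1.na.fill({"c2": 0}).show()   # only c2 is filled; columns not in the dict are left untouched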

orderBy: sort rows

df.orderBy('age').show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 2| Bob|
| 5| Bob|
+---+-----+
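Descending order can be requested with the desc() method on a column (or functions.desc); a minimal sketch:

df.orderBy(df.age.desc()).show()   # largest age first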

persist: persist / cache the DataFrame

from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
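The current level can be checked with df.storageLevel, and the cache released with unpersist(); a minimal sketch continuing the snippet above:

df.storageLevel    # StorageLevel(True, False, False, False, 1) after a DISK_ONLY persist
df.unpersist()     # release the cached data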

printSchema: print the schema

Prints the schema in tree format.

df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 2| Bob|
| 5| Bob|
+---+-----+

df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
