博客 Pyspark dataframe基本内置方法(2)

Pyspark dataframe基本内置方法(2)

   数栈君   发表于 2024-11-22 11:01  234  0

Pyspark dataframe

from pyspark.sql import  SparkSession,Row
from pyspark.sql.types import *

def init_spark():
spark = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \
.config('hive.metastore.uris', 'thrift://hadoop01:9083') \
.config('spark.master', "local[2]") \
.enableHiveSupport().getOrCreate()
return spark
spark = init_spark()

# 设置字段类型
schema = StructType([
StructField("name", StringType(), True),
StructField("age", StringType(), True),
StructField("id", StringType(), True),
StructField("gender", StringType(), True),
])

count 统计数量

返回dataframe中row的数量

name|age| id|gender|
+-----+---+---+------+
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
+-----+---+---+------+
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)
PyDev console: starting.
data.count()
6

createGlobalTempView 创建全局视图表

创建的临时表名已存在报错,查询需要使用global_temp

data.createGlobalTempView('ldsx')
# 临时表名存在后重复设置报错
data.createGlobalTempView('ldsx')
Traceback (most recent call last):
pyspark.errors.exceptions.captured.AnalysisException: [TEMP_TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create the temporary view `ldsx` because it already exists.
Choose a different name, drop or replace the existing view, or add the IF NOT EXISTS clause to tolerate pre-existing views.
#查询时需要使用global_temp
spark.sql('select * from global_temp.ldsx').show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
+-----+---+---+------+

createOrReplaceGlobalTempView 创建全局视图表

创建的全局临时视图名已经存在的,将会进行替换操作不会报错

data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
+-----+---+---+------+
# 使用dataframe创建全局视图ldsx
data.createOrReplaceGlobalTempView('ldsx')
# 使用新的dataframe创建全局视图ldsx
spark.createDataFrame([(1,2,3)],['a','b','c']).createOrReplaceGlobalTempView('ldsx')
# 结果显示最新的dataframe内容
spark.sql('select * from global_temp.ldsx').show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
+---+---+---+

createTempView 创建临时视图

创建的临时表名已存在会报错,sql不需要使用全局域搜索

data.createTempView('ldsx_1')
spark.sql('select * from ldsx_1').show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
+-----+---+---+------+

createOrReplaceTempView 创建临时视图

创建临时视图名已经存在的,将会进行替换操作不会报错

data.createOrReplaceTempView('ldsx_1')
data.createOrReplaceTempView('ldsx_1')

crossJoin 返回笛卡尔积

df.show()
+---+-----+
|age| name|
+---+-----+
| 14| Tom|
| 23|Alice|
| 16| Bob|
+---+-----+
df2.show()
+------+----+
|height|name|
+------+----+
| 80| Tom|
| 85| Bob|
+------+----+
df.crossJoin(df2).show()
+---+-----+------+----+
|age| name|height|name|
+---+-----+------+----+
| 14| Tom| 80| Tom|
| 14| Tom| 85| Bob|
| 23|Alice| 80| Tom|
| 16| Bob| 80| Tom|
| 23|Alice| 85| Bob|
| 16| Bob| 85| Bob|
+---+-----+------+----+

cube 维度统计

选中两列的唯一值,分别作为横纵坐标 统计出现次数。

df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"])
df.show()
+---+---+
| c1| c2|
+---+---+
| 1| 11|
| 1| 11|
| 3| 10|
| 4| 8|
| 4| 8|
+---+---+
df.crosstab("c1", "c2").show()
# 3 跟 10组合数量,3 跟11组合数量为 0 ,3跟8组合数量为0 以此类推
+-----+---+---+---+
|c1_c2| 10| 11| 8|
+-----+---+---+---+
| 3| 1| 0| 0|
| 1| 0| 2| 0|
| 4| 0| 0| 2|
+-----+---+---+---+

describe 统计列的基本信息

返回数量,平均值,标准方差,最小值,最大值(字符串也可统计)。

df = spark.createDataFrame(
[("Bob", 13, 40.3, 150.5), ("Alice", 12, 37.8, 142.3), ("Tom", 11, 44.1, 142.2)],
["name", "age", "weight", "height"]
)
df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
| Bob| 13| 40.3| 150.5|
|Alice| 12| 37.8| 142.3|
| Tom| 11| 44.1| 142.2|
+-----+---+------+------+
df.describe(['age']).show()
+-------+----+
|summary| age|
+-------+----+
| count| 3|
| mean|12.0|
| stddev| 1.0|
| min| 11|
| max| 13|
+-------+----+
df.describe(['name']).show()
+-------+-----+
|summary| name|
+-------+-----+
| count| 3|
| mean| null|
| stddev| null|
| min|Alice|
| max| Tom|
+-------+-----+

distinct 去重

去重完全重复数据返回dataframe

df.show()
+---+------+
|age| name|
+---+------+
| 14| Tom|
| 23| Alice|
| 23| Alice|
| 23|Alice1|
+---+------+
df.distinct().show()
+---+------+
|age| name|
+---+------+
| 14| Tom|
| 23| Alice|
| 23|Alice1|
+---+------+

drop 删除列

df.show()
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
| Bob| 13| 40.3| 150.5|
|Alice| 12| 37.8| 142.3|
| Tom| 11| 44.1| 142.2|
+-----+---+------+------+
df.drop('name').show()
+---+------+------+
|age|weight|height|
+---+------+------+
| 13| 40.3| 150.5|
| 12| 37.8| 142.3|
| 11| 44.1| 142.2|
+---+------+------+
df.drop(*['name','age']).show()
+------+------+
|weight|height|
+------+------+
| 40.3| 150.5|
| 37.8| 142.3|
| 44.1| 142.2|
+------+------+

dropDuplicates 去重

drop_duplicates 别名效果一样

from pyspark.sql import Row
df = spark.createDataFrame([
Row(name='Alice', age=5, height=80),
Row(name='Alice', age=5, height=80),
Row(name='Alice', age=10, height=80)
])
df.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
|Alice| 5| 80|
|Alice| 10| 80|
+-----+---+------+
df.dropDuplicates().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
|Alice| 10| 80|
+-----+---+------+
df.dropDuplicates(['name']).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
+-----+---+------+

dropna 删除null值

dropna() 参数可选项 all,全部为空的行,any只要存在null行就删掉,默认为any

df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
df.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
df.dropna().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
df.dropna('all').show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 10| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
+----+------+-----+

dtypes 查看列类型

df.dtypes
[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]

exceptall 剔除交集数据

返回一个新的DataFrame,其中包含此DataFrame中的行,但不包含在另一个DataFrame中,同时保留重复项。

df1,df2,就是df1剔除(df1与df2交集)。有几个剔除几个重复项保留

df1 = spark.createDataFrame(
[("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df1.show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| a| 1|
| a| 2|
| b| 3|
| c| 4|
+---+---+
df2.show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| b| 3|
+---+---+
df1.exceptAll(df2).show()
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| a| 2|
| c| 4|
+---+---+

explan 查看执行计划

不加参数(逻辑和物理)计划展示。

可选参数,指定计划的预期输出格式。
simple:只打印一份实物计划。
extended:打印逻辑和物理计划。
codegen:打印物理计划和生成的代码(如果可用)。
codegen:打印逻辑计划和统计数据(如果可用)。
formatted:将解释输出分为两部分:物理计划大纲和节点详细信息。

df1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[C1#1146,C2#1147L]

fillna 填充null值

fillna() 别名 na.fill() ,如果列的类型不符合填充的类型,则这列不填补

df.show()
+----+------+-----+----+
| age|height| name|bool|
+----+------+-----+----+
| 10| 80.5|Alice|null|
| 5| null| Bob|null|
|null| null| Tom|null|
|null| null| null|true|
+----+------+-----+----+

df.na.fill(100).show()
+---+------+-----+----+
|age|height| name|bool|
+---+------+-----+----+
| 10| 80.5|Alice|null|
| 5| 100.0| Bob|null|
|100| 100.0| Tom|null|
|100| 100.0| null|true|
+---+------+-----+----+
# 针对填充
df.na.fill({'age': 50, 'name': 'ldsx','bool':'false','height':100}).show()
+---+------+-----+-----+
|age|height| name| bool|
+---+------+-----+-----+
| 10| 80.5|Alice|false|
| 5| 100.0| Bob|false|
| 50| 100.0| Tom|false|
| 50| 100.0| ldsx| true|
+---+------+-----+-----+

filter 过滤

使用sql表达式可以把filter换成where

f = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
df.filter(df.age>=5).show()
+---+----+
|age|name|
+---+----+
| 5| Bob|
+---+----+

first 获取第一条数据的row对象

df.first()
Row(age=2, name='Alice')

————————————————

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群