
Common pyspark.sql.functions methods in PySpark (3) (array operations)

数栈君, posted 2024-11-29 11:50

pyspark sql functions

from pyspark.sql import functions as fs

concat: merge multiple columns into one

Concatenates multiple input columns into a single column. The function works with string, numeric, binary and compatible array columns.

# assumed setup for this example (columns s and d are not defined above)
df = spark.createDataFrame([('abcd', '123')], ['s', 'd'])
df.select(fs.concat(df.s, df.d).alias('s')).show()
+-------+
|      s|
+-------+
|abcd123|
+-------+
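
Since concat also accepts array columns, the following minimal sketch (with assumed example data and column names) shows two array columns being merged element-wise into one array:

# hypothetical data: concat also merges array columns into one array
df2 = spark.createDataFrame([([1, 2], [3, 4])], ['a', 'b'])
df2.select(fs.concat(df2.a, df2.b).alias('merged')).show()
+------------+
|      merged|
+------------+
|[1, 2, 3, 4]|
+------------+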

array: combine columns into an array

df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
df.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  2|
|  Bob|  5|
+-----+---+

df.select(fs.array('age', 'age').alias("arr")).show()
+------+
|   arr|
+------+
|[2, 2]|
|[5, 5]|
+------+

array_contains: check whether an array contains a value

df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.show()
+---------+
|     data|
+---------+
|[a, b, c]|
|       []|
+---------+
# check whether the array-typed column data contains 'a'
df.select(fs.array_contains(df.data, 'a')).show()
+-----------------------+
|array_contains(data, a)|
+-----------------------+
|                   true|
|                  false|
+-----------------------+
df.select(fs.array_contains(df.data, 'z')).show()
+-----------------------+
|array_contains(data, z)|
+-----------------------+
|                  false|
|                  false|
+-----------------------+
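
Because array_contains returns a boolean column, it can also be used directly as a filter condition; a minimal sketch on the same df:

# keep only rows whose array contains 'a'
df.filter(fs.array_contains(df.data, 'a')).show()
+---------+
|     data|
+---------+
|[a, b, c]|
+---------+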

array_join: join the elements of an array with a delimiter

Concatenates the elements of the column using the delimiter.

df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data'])
df.show()
+---------+
|     data|
+---------+
|[a, b, c]|
|[a, null]|
+---------+
df.select(fs.array_join(df.data, ",").alias("joined")).show()
+------+
|joined|
+------+
| a,b,c|
|     a|
+------+
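
By default null elements are skipped, as shown above. array_join also accepts an optional null_replacement argument that substitutes a string for nulls instead; a minimal sketch on the same df (the "NA" placeholder is just an example value):

df.select(fs.array_join(df.data, ",", null_replacement="NA").alias("joined")).show()
+------+
|joined|
+------+
| a,b,c|
|  a,NA|
+------+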

create_map: create a map

df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
df.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  2|
|  Bob|  5|
+-----+---+

df.select(fs.create_map('name', 'age').alias("map")).show()
+------------+
|         map|
+------------+
|{Alice -> 2}|
|  {Bob -> 5}|
+------------+
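
create_map takes alternating key and value columns, so several pairs can be packed into one map. A minimal sketch on the same df, using fs.lit for constant keys (the key names here are hypothetical; age is cast to string so both map values share one type):

df.select(fs.create_map(fs.lit('n'), df.name, fs.lit('a'), df.age.cast('string')).alias("map")).show()
+--------------------+
|                 map|
+--------------------+
|{n -> Alice, a -> 2}|
|  {n -> Bob, a -> 5}|
+--------------------+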

slice: select a range of elements from an array

Returns an array containing all the elements of x starting at the given index (counting from the end if the start index is negative) up to the specified length.

Parameters:

1. the array column to slice

2. start index (array indexes start at 1)

3. length

df = spark.createDataFrame([([1, 2, 3],), ([4, 5, 6, 7, 8, 9],)], ['x'])
df.show()
+------------------+
|                 x|
+------------------+
|         [1, 2, 3]|
|[4, 5, 6, 7, 8, 9]|
+------------------+
df.select(fs.slice(df.x, 2, 5).alias("sliced")).show()
+---------------+
|         sliced|
+---------------+
|         [2, 3]|
|[5, 6, 7, 8, 9]|
+---------------+
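
As a sketch of the negative-start case mentioned above, a start of -3 begins three elements from the end of each array:

df.select(fs.slice(df.x, -3, 2).alias("sliced")).show()
+------+
|sliced|
+------+
|[1, 2]|
|[7, 8]|
+------+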

array_append: append another column's value to an array column

from pyspark.sql import Row
# creation method 1 (using Row)
df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2="c")])
# creation method 2 (tuple plus column names); this is the df used below
df = spark.createDataFrame([(["c", "b", "a"], 'd')], ['data', 'd2'])
df.show()
+---------+---+
|     data| d2|
+---------+---+
|[c, b, a]|  d|
+---------+---+
# append the value of d2 directly to the array
df.select(fs.array_append(df.data, df.d2)).show()
+----------------------+
|array_append(data, d2)|
+----------------------+
|          [c, b, a, d]|
+----------------------+
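
array_append is available since Spark 3.4, and the value does not have to come from another column; a constant can be appended with fs.lit. A minimal sketch on the same df (the literal 'z' is just an example):

df.select(fs.array_append(df.data, fs.lit('z')).alias('appended')).show()
+------------+
|    appended|
+------------+
|[c, b, a, z]|
+------------+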

array_sort: sort an array

df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
df.show()
+---------------+
|           data|
+---------------+
|[2, 1, null, 3]|
|            [1]|
|             []|
+---------------+
df.select(fs.array_sort(df.data).alias('r')).show()
+---------------+
|              r|
+---------------+
|[1, 2, 3, null]|
|            [1]|
|             []|
+---------------+
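
As the output shows, array_sort sorts in ascending order and places null elements at the end. For a descending sort, sort_array with asc=False can be used instead; a minimal sketch on the same df (when descending, sort_array also puts nulls at the end):

df.select(fs.sort_array(df.data, asc=False).alias('r')).show()
+---------------+
|              r|
+---------------+
|[3, 2, 1, null]|
|            [1]|
|             []|
+---------------+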

array_insert: insert a value into an array

All of the arguments are column expressions:

arr: the array column

pos: the index to insert at (starting from 1)

value: the value to insert

df = spark.createDataFrame(
    [(['a', 'b', 'c'], 2, 'd'), (['c', 'b', 'a'], -2, 'd')],
    ['data', 'pos', 'val']
)
df.show()
+---------+---+---+
|     data|pos|val|
+---------+---+---+
|[a, b, c]|  2|  d|
|[c, b, a]| -2|  d|
+---------+---+---+
df.select(fs.array_insert(df.data, df.pos.cast('integer'), df.val).alias('data')).show()
+------------+
|        data|
+------------+
|[a, d, b, c]|
|[c, b, d, a]|
+------------+

array_remove: remove elements

Parameters:

1. the array column to operate on

2. the element to remove

df = spark.createDataFrame([([1, 2, 3, 1, 1],), ([],)], ['data'])
df.show()
+---------------+
|           data|
+---------------+
|[1, 2, 3, 1, 1]|
|             []|
+---------------+

df.select(fs.array_remove(df.data, 1)).show()
+---------------------+
|array_remove(data, 1)|
+---------------------+
|               [2, 3]|
|                   []|
+---------------------+

array_distinct: remove duplicates from an array

df = spark.createDataFrame([([1, 2, 3, 2],), ([4, 5, 5, 4],)], ['data'])
df.show()
+------------+
|        data|
+------------+
|[1, 2, 3, 2]|
|[4, 5, 5, 4]|
+------------+

df.select(fs.array_distinct(df.data)).show()
+--------------------+
|array_distinct(data)|
+--------------------+
|           [1, 2, 3]|
|              [4, 5]|
+--------------------+

array_intersect: intersection of two arrays

df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])])
df.show()
+---------+------------+
|       c1|          c2|
+---------+------------+
|[b, a, c]|[c, d, a, f]|
+---------+------------+

df.select(fs.array_intersect(df.c1, df.c2)).show()
+-----------------------+
|array_intersect(c1, c2)|
+-----------------------+
|                 [a, c]|
+-----------------------+

array_union: union of two arrays

from pyspark.sql import Row
df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])])
df.show()
+---------+------------+
|       c1|          c2|
+---------+------------+
|[b, a, c]|[c, d, a, f]|
+---------+------------+
df.select(fs.array_union(df.c1, df.c2)).show()
+-------------------+
|array_union(c1, c2)|
+-------------------+
|    [b, a, c, d, f]|
+-------------------+

array_except: difference of two arrays

Returns the elements of column 1 that are not in column 2.

df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])])
df.show()
+---------+------------+
|       c1|          c2|
+---------+------------+
|[b, a, c]|[c, d, a, f]|
+---------+------------+

df.select(fs.array_except(df.c1, df.c2)).show()
+--------------------+
|array_except(c1, c2)|
+--------------------+
|                 [b]|
+--------------------+

array_compact: remove null values from an array

df = spark.createDataFrame([([1, None, 2, 3],), ([4, 5, None, 4],)], ['data'])
df.show()
+---------------+
|           data|
+---------------+
|[1, null, 2, 3]|
|[4, 5, null, 4]|
+---------------+
df.select(fs.array_compact(df.data)).show()
+-------------------+
|array_compact(data)|
+-------------------+
|          [1, 2, 3]|
|          [4, 5, 4]|
+-------------------+
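
array_compact was added in Spark 3.4. On earlier versions the same result can be obtained with the higher-order filter function (available since Spark 3.1); a minimal sketch on the same df:

# keep only the non-null elements of each array
df.select(fs.filter(df.data, lambda x: x.isNotNull()).alias('compacted')).show()
+---------+
|compacted|
+---------+
|[1, 2, 3]|
|[4, 5, 4]|
+---------+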

transform: transform each element of an array column

df = spark.createDataFrame([(1, [1, 2, 3, 4])], ("key", "values"))
df.show()
+---+------------+
|key|      values|
+---+------------+
|  1|[1, 2, 3, 4]|
+---+------------+
df.select(fs.transform("values", lambda x: x * 2).alias("doubled")).show()
+------------+
|     doubled|
+------------+
|[2, 4, 6, 8]|
+------------+


# transform can also pass the element index to the function as a second argument
def alternate(x, i):
    # keep elements at even indexes, negate the ones at odd indexes
    return fs.when(i % 2 == 0, x).otherwise(-x)

df.select(fs.transform("values", alternate).alias("alternated")).show()
+--------------+
|    alternated|
+--------------+
|[1, -2, 3, -4]|
+--------------+

This article is reposted; the copyright belongs to the original author. If it infringes, please contact us to have it removed.
