map
map(func, preservesPartitioning=False)
最基本的转化操作,对数据集中的每一个元素,应用一个具名/匿名 函数进行才处理; 一个或多个map可以异步进行,因为它们不会产生副作用。
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())
output- [('a', 1), ('b', 1), ('c', 1)]
flatMap
flatMap(func, preservesPartitioning=False)
与map的操作类似,但会进一步拍平数据,表示会去掉一层嵌套
rdd = sc.parallelize([2, 3, 4])
sorted(rdd.flatMap(lambda x: range(1, x)).collect())
output: [1, 1, 1, 2, 2, 3]
sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
output: [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
filter
filter(func)
一般是依据括号中的一个布尔型表达式,来筛选出满足为真的元素
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
distinct
distinct(numPartitions=None)
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
group by
groupBy(func, numPartitions=None, partitionFunc=)
依据func 中提供的条件,对原始RDD进行分组聚合
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
sortBy
sortBy(keyfunc, ascending=True, numPartitions=None)
依据 keyfunc 对原RDD进行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
count
count()
不接收参数,返回一个long类型的值,代表RDD中的元素个数
sc.parallelize([2, 3, 4]).count()
3
collect
collect()
输出一个由RDD中所有元素组成的列表 一般只在小规模数据中使用,避免输出一个过大的列表
take
take(n)
返回RDD的前n个元素(随机的)
top
top(n, key=None)
和top的功能类似,但是top会将元素排序并按照降序输出。
first
fisrt()
返回RDD中的第一个元素,与take(1)很相似,但是不同之处在于: take(1)返回的是由一个元素组成的列表; 而first( ) 返回的只是一个具体的元素。
reduce
reduce(func)
使用指定的满足交换律和结合律的运算符,来归约RDD中的所有元素。
foreach
foreach(func)
对数据集中的每一个元素应用具名/匿名函数,与map类似,但是不同之处在于: map是转化操作,无法输出;而foreach是行动操作,可以有输出函数