Pyspark学习笔记(五)RDD的操作
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
前言
提示:本篇博客讲的是RDD的各种操作,包括转换操作、行动操作、键值对操作
一、PySpark RDD 转换操作
PySpark RDD 转换操作(Transformation) 是惰性求值,用于将一个 RDD 转换/更新为另一个。由于RDD本质上是不可变的,转换操作总是创建一个或多个新的RDD而不更新现有的RDD,因此,一系列RDD转换创建了一个RDD谱系(依赖图)。
1.窄操作
这些计算数据存在于单个分区上,这意味着分区之间不会有任何数据移动。
常见的执行窄操作的一般有:
map()
,
mapPartition()
,
flatMap()
,
filter()
,
union()
2.宽操作
这些计算数据存在于许多分区上,这意味着分区之间将有数据移动以执行更广泛的转换。 由于这些对数据进行混洗,因此它们也称为混洗转换,所以与窄操作相比,是更加昂贵的操作。
常见的执行宽操作的一些方法是:
groupBy()
,
groupByKey()
,
join()
,
repartition()
等
3.常见的转换操作表
转换操作 |
描述 |
---|---|
map(<func>) |
是所有转换操作中最基本的。它应用一个具名函数或者匿名函数,对数据集内的所有元素执行同一操作。https://sparkbyexamples.com/pyspark/pyspark-map-transformation/ |
flatMap(<func>) |
与map的操作类似,但会进一步拍平数据,表示会去掉一层嵌套.https://sparkbyexamples.com/pyspark/pyspark-flatmap-transformation/ |
mapPartition(<func>) |
类似于map,但在每个分区上执行转换函数,mapPartitions() 的输出返回与输入 RDD 相同的行数,这比map函数提供更好的性能; |
filter(<func>) |
一般是依据括号中的一个布尔型表达式,来筛选出满足为真的元素 |
union( ) |
类似于sql中的union函数,就是将两个RDD执行合并操作;但是pyspark中的union操作似乎不会自动去重,如果需要去重就使用下面的distinct |
distinct( ) |
去除RDD中的重复值;带有参数numPartitions,默认值为None,可以对去重后的数据重新分区 |
groupBy(<func>) |
对元素进行分组。可以是具名函数,也可以是匿名,用来确定对所有元素进行分组的键,或者指定用于对元素进行求值以确定其分组方式的表达式.https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/ |
sortBy(<keyfunc>,ascending=True) |
将RDD按照参数选出的指定数据集的键进行排序.使用groupBy 和 sortBy的示例:#求余数,并按余数,对原数据进行聚合分组#然后按照升序对各个组内的数据,进行排序 rdd = sc.parallelize([1, 1, 2, 3, 5, 8])result = rdd.groupBy(lambda x: x % 2).collect()sorted([(x, sorted(y)) for (x, y) in result])[(0, [2, 8]), (1, [1, 1, 3, 5])] |
repartition( ) |
重新分区,之前的博客的【并行化】 一节已经描述过 |
coalesce( ) |
重新分区,之前的博客的【并行化】一节已经描述过: |
cache( ) |
缓存,之前博文RDD【持久化】一节已经描述过; |
persist( ) |
持久化,之前博文RDD【持久化】一节已经描述过 |
二、pyspark 行动操作
PySpark RDD行动操作(Actions) 是将 值返回给驱动程序的 PySpark 操作 .行动操作会触发之前的转换操作进行执行。常见的一些行动操作。
行动操作 |
描述 |
---|---|
count() |
该操作不接受参数,返回一个long类型值,代表rdd的元素个数 |
collect() |
返回一个由RDD中所有元素组成的列表(没有限制输出数量,所以要注意RDD的大小) |
take(n) |
返回RDD的前n个元素(无特定顺序)(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中) |
takeOrdered(n, key) |
从一个按照升序排列的RDD,或者按照key中提供的方法升序排列的RDD, 返回前n个元素(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中) https://spark.apache.org/docs/2.2.1/api/python/pyspark.html#pyspark.RDD |
takeSample(withReplacement, num, seed=None) |
返回此 RDD 的固定大小的采样子集 |
top(n) |
返回RDD的前n个元素(按照降序输出, 排序方式由元素类型决定) |
first() |
返回RDD的第一个元素,也是不考虑元素顺序 |
reduce(<func>) |
使用指定的满足交换律/结合律的运算符来归约RDD中的所有元素.指定接收两个输入的 匿名函数(lambda x, y: …)#示例,求和操作Numbers=sc.parallelize([1,2,3,4,])Numbers.reduce(lambda x, y: x+y)#返回10 |
fold(zeroV, <func>) |
使用给定的func和zeroV把RDD中的每个分区的元素集合,然后把每个分区聚合结果再聚合;和reduce类似,但是不满足交换律需特别注意的是,zeroV要在计算的开头和结尾都加上:Numbers=sc.parallelize([1,2,3,4,])Numbers.fold(10, lambda x, y: x+y)#运算过程为 10 + 1+2+3+4 + 10 |
foreach(<func>) |
把具名或者匿名函数,应用到RDD的所有元素上.和map类似,但是由于foreach是行动操作,所以可以执行一些输出类的函数,比如print |
countByValue() |
将此 RDD 中每个唯一值的计数作为 (value, count) 对的字典返回.sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())[(1, 2), (2, 3)] |
aggregate(zeroValue, seqOp, combOp) |
使用给定的函数和初始值,对每个分区的聚合进行聚合,然后对聚合的结果进行聚合seqOp 能够返回与当前RDD不同的类型,比如说返回U,RDD本是T,所以会再用一个combine函数,将两种不同的类型U和T聚合起来 >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)(10, 4) >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)(0,0)#这篇博文的示例较为详细https://blog.csdn.net/Li_peipei/article/details/84447234 |
|
|
三、键值对RDD的操作
键值对RDD,就是PairRDD,元素的形式是(key,value),键值对RDD是会被经常用到的一类RDD,它的一些操作函数大致可以分为四类: ·字典函数