Spark编程笔记(3)-键值对RDD

前言

上一篇文章 开始,正式开始介绍Spark编程中的RDD编程基础。

⭐️前文提到,完整的RDD编程章节会分为4部分进行介绍:

  • RDD编程基础
  • 键值对RDD
  • 数据读写
  • 综合案例

本节对上述内容的第2部分——键值对RDD进行详细介绍 。键值对RDD是很重要的一种数据形式。reduceByKey(func)和groupByKey()等聚合函数都需要在键值对中进行使用。

⭐️本文(键值对RDD)目录如下:

前言
键值对RDD的创建
键值对RDD转换操作
一个综合实例
总结

Part 1. 键值对RDD的创建

⭐️键值对RDD的创建和上一篇文章中的RDD创建类似,有2种创建方法。

1)从文件中加载 可以采用多种方式创建键值对RDD,其中一种主要方式是使用map()函数来实现。示例代码如下:

>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
>>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
>>> pairRDD.foreach(print)
('I', 1)
('love', 1)
('Hadoop', 1)
……

2)通过并行集合(列表)创建RDD。代码示例如下:

>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word:(word,1))
>>> pairRDD.foreach(print)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

在后续内容中会发现,rdd.map(lambda word:(word,1))这种创建RDD键值对的方法是非常常用的 。因为在Spark中,RDD编程所使用的聚合函数需要根据键值对中的第一个元素,即键进行聚合,再对第二个元素进行计算。需要注意的是,第二个元素可能包含多个取值,如('word',(1,1,1,1))中,键为字符串'word',值为(1,1,1,1)。详细内容可见文章最后的例子。

Part2.键值对RDD转换操作

⭐️常用的键值对RDD转换操作包括:

  • reduceByKey(func)
  • groupByKey()
  • keys
  • values
  • sortByKey()
  • mapValues(func)
  • join
  • combineByKey

接下来分别在例子中,展示上述转换操作的含义以及使用方法。

1)reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值 。也就是常说的聚合操作。按照键进行聚合,并使用func指定对值要进行的操作。代码展示如下:

>>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)])
>>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print)
('Spark', 2)
('Hive', 1)
('Hadoop', 1)

2)groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组 。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5), 采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5)) 。与reduceByKey(func) 的区别是,groupByKey()过程不对键值对中的值进行计算,只是单纯的罗列在同一个聚合中。代码示意如下:

>>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.groupByKey()
PythonRDD[27] at RDD at PythonRDD.scala:48
>>> pairRDD.groupByKey().foreach(print)
('hadoop', <pyspark.resultiterable.ResultIterable object at
0x7f2c1093ecf8>)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)

观察上述代码的输出结果,可以发现("hadoop",(3,5)) 中的值(3,5)是一种pyspark.resultiterable.ResultIterable类型的文件,在这里不做展开。

⭐️再总结一下reduceByKey和groupByKey的区别:

  • reduceByKey用于对每个key对应的多个value进行merge 操作,最重要的是它能够在本地先进行merge操作,并且 merge操作可以通过函数自定义 reduceByKey和groupByKey的区别;
  • groupByKey也是对每个key进行操作,但只生成一个 sequence,groupByKey本身不能自定义函数,需要先用 groupByKey生成RDD,然后才能对此RDD通过map进行 自定义函数操作。

通过一个示例展示两者的区别:

>>> words = ["one", "two", "two", "three", "three", "three"]
>>> wordPairsRDD = sc.parallelize(words).map(lambda word:(word, 1))
>>> wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b)
>>> wordCountsWithReduce.foreach(print)
('one', 1)
('two', 2)
('three', 3)
>>> wordCountsWithGroup = wordPairsRDD.groupByKey(). \
... map(lambda t:(t[0],sum(t[1])))
>>> wordCountsWithGroup.foreach(print)
('two', 2)
('three', 3)
('one', 1)

显然,上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一 样的,但是,它们的内部运算过程是不同的 。

3)keys

keys只会把Pair RDD中的key返回形成一个新的RDD 。即取出键值对RDD中的键。代码示意如下:

>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Hadoop
Spark
Spark

4)values

values只会把Pair RDD中的value返回形成一个新的RDD 。即取出键值对RDD中的值。代码示意如下:

>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.values().foreach(print)
1

5)sortByKey()

sortByKey()的功能是返回一个根据 排序的RDD 。因此在排序的任务中,也需要先构造键值对RDD。代码示意如下:

>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)

实际上,除了可以根据键进行排序外,Spark还提供了用户自定义排序变量的方法。即使用sortBy() 函数来代替sortByKey()。sortBy() 函数示例如下:

>>> d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42), \
... ("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)])
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x,False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0],False).collect()
[('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)]
>>> d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect()
[('a', 42), ('b', 38), ('f', 29), ('c', 27), ('g', 21), ('e', 17), ('d', 9)]

通过lambda函数来指定, 还是 来作为排序变量。

6)mapValues(func)

对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化 。

>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD1 = pairRDD.mapValues(lambda x:x+1)
>>> pairRDD1.foreach(print)
('Hadoop', 2)
('Spark', 2)
('Hive', 2)
('Spark', 2)

7)join

join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集 。

>>> pairRDD1 = sc.parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
>>> pairRDD2 = sc.parallelize([("spark","fast")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
('spark', (1, 'fast'))