本文介绍如何通过spark sql对数据进行各种的合并操作,包括:列合并,行合并,相同key的合并等等。

在实际的数据处理场景中,数据的合并操作非常常用,这里介绍如何通过spark sql来完成常用的合并操作。

准备以下数据:

name,address,age,id,time
david,shenzhen,31,1,201903
eason,shenzhen,27,2,201904
jarry,wuhan,35,3,201904
aarry2,wuhan1,34,4,201904
barry3,wuhan2,33,5,201904
carry4,wuhan3,32,6,201904
darry5,wuhan4,31,7,201903
earry6,wuhan9,30,8,201903
david,shenzhen,31,1,201903
eason,shenzhen,27,2,201904
jarry,wuhan,35,3,201904
aarry2,wuhan1,34,4,201904
barry3,wuhan2,33,5,201904
carry4,wuhan3,32,6,201904
darry5,wuhan4,31,7,201903
earry6,wuhan9,30,8,201903
david,shenzhen,31,1,201903
eason,shenzhen,27,2,201904
carry4,wuhan3,32,6,201904

把以上数据保存到文件:idtimedata.csv中,并把该文件保存到hdfs的/curdata/目录下。

  • sparksession初始化

然后先在代码前进行sparksession的初始化:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.enableHiveSupport().appName("spark sql merge data").getOrCreate()
# 可以把文件放在本地,也可以放到hdfs中,这里放到本地为例
df = spark.read.csv("/Users/hover/work/testdata/idtimedata.csv", header=True)

列数据合并

通过SparkSql的接口函数可以方便的对数据进行合并操作。

1. 把多列合并成一列字符串,并按指定分隔符分割。

假设你有一个dataframe:df1,可以通过以下方法来合并该df1的字段。

>>df = spark.createDataFrame([('abcd', '123')], ['s', 'd'])
>>df.select(F.concat_ws('-', "s", "d").alias('s')).show()
 
# 合并多个列
>> merge_cols = [c for c in df.columns if c != 'id']
>> df.select(F.concat_ws(";", *merge_cols).alias("merged_data")).show(truncate=False)
+------------------------+
|merged_data             |
+------------------------+
|david;shenzhen;31;201903|
|eason;shenzhen;27;201904|
|jarry;wuhan;35;201904   |
|aarry2;wuhan1;34;201904 |
|barry3;wuhan2;33;201904 |
|carry4;wuhan3;32;201904 |
|darry5;wuhan4;31;201903 |
|earry6;wuhan9;30;201903 |

2. 把多列的值合并成一个list

该函数可以把一列的数据合并成一行,并按python的List方式保存,但注意:该函数不会去重。

>> df.select(F.collect_list("id").alias("id_merged")).show(truncate=False)
+---------------------------------------------------------+
|id_merged                                                |
+---------------------------------------------------------+
|[1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 6]|
+---------------------------------------------------------+
 

可以通过sort_array对合并的lis值进行排序。

>> df.select(F.sort_array(F.collect_list("id"))).show(truncate=False)
+---------------------------------------------------------+
|sort_array(collect_list(id), true)                       |
+---------------------------------------------------------+
|[1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 6, 7, 7, 8, 8]|
+---------------------------------------------------------+
 

通过collect_set函数对合并的数据进行去重。

>> df.select(F.sort_array(F.collect_set("id"))).show(truncate=False)
+---------------------------------+
|sort_array(collect_set(id), true)|
+---------------------------------+
|[1, 2, 3, 4, 5, 6, 7, 8]         |
+---------------------------------+
 

这种情况更加常用。先按某个列聚合,再合并其他列的数据。

# 把相同id的人的name聚合成一个list
>> df.groupby("id").agg(F.collect_list("name").alias("name_merge")).show(truncate=False)
+---+------------------------+
|id |name_merge              |
+---+------------------------+
|7  |[darry5, darry5]        |
|3  |[jarry, jarry]          |
|8  |[earry6, earry6]        |
|5  |[barry3, barry3]        |
|6  |[carry4, carry4, carry4]|
|1  |[david, david, david]   |
|4  |[aarry2, aarry2]        |
|2  |[eason, eason, eason]   |
+---+------------------------+
 
>> df.select(F.size(F.collect_list("id"))).show(truncate=False)
+----------------------+
|size(collect_list(id))|
+----------------------+
|19                    |
+----------------------+

3. 合并成一个map格式的数据

通过函数pyspark.sql.functions.create_map(*cols)可以把一个或多个key-value,其中key对应一列的值,而valu对应一列的值。

>> df.withColumn("finRes", F.create_map([df.id, df.name])).select("finRes").show(truncate=False)
+----------------+
|finRes          |
+----------------+
|Map(1 -> david) |
|Map(2 -> eason) |
|Map(3 -> jarry) |
|Map(4 -> aarry2)|
|Map(5 -> barry3)|
|Map(6 -> carry4)|
|Map(7 -> darry5)|
|Map(8 -> earry6)|
|Map(1 -> david) |
|Map(2 -> eason) |
|Map(3 -> jarry) |
|Map(4 -> aarry2)|
 

create_map函数的参数是多个列名,它们必须成对出现。作为key-value。

>> df3 = df.withColumn("finRes", F.create_map([df.id, df.name, df.name, df.age])).select("finRes")
+------------------------------+
|finRes                        |
+------------------------------+
|Map(1 -> david, david -> 31)  |
|Map(2 -> eason, eason -> 27)  |
|Map(3 -> jarry, jarry -> 35)  |
|Map(4 -> aarry2, aarry2 -> 34)|
|Map(5 -> barry3, barry3 -> 33)|
|Map(6 -> carry4, carry4 -> 32)|
|Map(7 -> darry5, darry5 -> 31)|
|Map(8 -> earry6, earry6 -> 30)|
|Map(1 -> david, david -> 31)  |
|Map(2 -> eason, eason -> 27)  |
|Map(3 -> jarry, jarry -> 35)  |
|Map(4 -> aarry2, aarry2 -> 34)|
|Map(5 -> barry3, barry3 -> 33)|
|Map(6 -> carry4, carry4 -> 32)|
|Map(7 -> darry5, darry5 -> 31)|
|Map(8 -> earry6, earry6 -> 30)|
|Map(1 -> david, david -> 31)  |
|Map(2 -> eason, eason -> 27)  |
|Map(6 -> carry4, carry4 -> 32)|
+------------------------------+

需要注意的是:create_map不会对相同的key值进行合并。若想把相同key的值进行合并,需要自己来完成。比如先进行groupby,在进行create_map操作。

>> df.withColumn("finRes", F.create_map([df.id, F.concat_ws(',', df.name, df.age)])).select("finRes").show(truncate=False)
+-------------------+
|finRes             |
+-------------------+
|Map(1 -> david,31) |
|Map(2 -> eason,27) |
|Map(3 -> jarry,35) |
|Map(4 -> aarry2,34)|
|Map(5 -> barry3,33)|
|Map(6 -> carry4,32)|
|Map(7 -> darry5,31)|
|Map(8 -> earry6,30)|

4. 数组值操作(spark-2.4)

在spark-2.4中提供了多个对数组值进行操作的函数:

  • 合并两个数组值的列: array_union

array_union函数可以把两个数组值的列合并在一起。

from pyspark.sql import Row
df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])])
df.select(F.array_union(df.c1, df.c2)).collect()
[Row(array_union(c1, c2)=['b', 'a', 'c', 'd', 'f'])]

注意:在使用该函数时,合并的值必须是[]类型。

  • 取数组值的交集:array_except
pyspark.sql.functions.array_except(col1, col2)

array_except返回一个值的数组,该值出现在col1的数组中,但不在col2数组中,返回的值不会重复。

>>> df = spark.createDataFrame([Row(c1=["b", "a", "c", "e", "e"], c2=["c", "d", "a", "f"])])
>>> df.show()
+---------------+------------+
|             c1|          c2|
+---------------+------------+
|[b, a, c, e, e]|[c, d, a, f]|
+---------------+------------+
>>> df.select(F.array_except(df.c1, df.c2)).show()
+--------------------+
|array_except(c1, c2)|
+--------------------+
|              [b, e]|
+--------------------+

说明:可以看到,该函数返回其值在col1数组中出现,而不在col2数组中出现过的值。并以数组的形式返回。

  • 去数组值的交接:array_intersect
pyspark.sql.functions.array_intersect(col1, col2)

该函数求同时出现在col1和col2的数组中的值,并以数组返回。结果会进行去重。

>>> df = spark.createDataFrame([Row(c1=["b", "a", "c", "c", "e"], c2=["c", "d", "a", "f"])])
>>> df.select(F.array_intersect(df.c1, df.c2)).show()
+-----------------------+
|array_intersect(c1, c2)|
+-----------------------+
|                 [a, c]|
+-----------------------+

说明:可以看到a,c是同时出现在col1和col2数组值中的元素,结果也去重了。

  • 把数组合并成字符串:array_join
pyspark.sql.functions.array_join(col, delimiter, null_replacement=None)

可以指定分隔符,若为None,则忽略。若为NULL则替换成NULL值。

>>> df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data'])
>>> df.select(F.array_join(df.data, "-").alias("joined")).show()
+------+
|joined|
+------+
| a-b-c|
|     a|
+------+
>>> df.select(F.array_join(df.data, ",", "NULL").alias("joined")).show()
+------+
|joined|
+------+
| a,b,c|
|a,NULL|
+------+
  • 返回数组值中的最小和最大值

array_min:返回每个数组值中的最小值。

array_max:返回每个数组值中的最大值。

  • 返回一个新的值,重复一个值count次,并形成一个数组值
pyspark.sql.functions.array_repeat(col, count)

其中count是重复的次数。

>>> df = spark.createDataFrame([('ab',)], ['data'])
>>> df.show()
+----+
|data|
+----+
|  ab|
+----+
>>> df.select(F.array_repeat(df.data, 3).alias('r')).show()
+------------+
|           r|
+------------+
|[ab, ab, ab]|
+------------+
  • 对数组值进行排序:array_sort
pyspark.sql.functions.array_sort(col)

以升序对输入数组进行排序。输入数组的元素必须是可排序的。空元素将放置在返回数组的末尾。

>>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
>>> df.show()
+----------+
|      data|
+----------+
|[2, 1,, 3]|
|       [1]|
|        []|
+----------+
>>> df.select(F.array_sort(df.data).alias('r')).show()
+----------+
|         r|
+----------+
|[1, 2, 3,]|
|       [1]|
|        []|
+----------+
  • 对数组列的值去重:array_distinct
pyspark.sql.functions.array_distinct(col)
>>> df = spark.createDataFrame([([1, 2, 3, 2],), ([4, 5, 5, 4],)], ['data'])
>>> df.show()
+------------+
|        data|
+------------+
|[1, 2, 3, 2]|
|[4, 5, 5, 4]|
+------------+
>>> df.select(F.array_distinct(df.data)).show()
+--------------------+
|array_distinct(data)|
+--------------------+
|           [1, 2, 3]|
|              [4, 5]|
+--------------------+
  • 对数组对应位置的数据进行合并:arrays_zip
pyspark.sql.functions.arrays_zip(*cols)
>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
>>> df.show()
+---------+---------+
|    vals1|    vals2|
+---------+---------+
|[1, 2, 3]|[2, 3, 4]|
+---------+---------+
>>> df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped')).show(truncate=False)
+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
# 若数组值中对应的位置没有值,则合并时设置为空
>>> df = spark.createDataFrame([(([1, 2], [2, 3, 4]))], ['vals1', 'vals2'])
>>> df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped')).show(truncate=False)
+-----------------------+
|zipped                 |
+-----------------------+
|[[1, 2], [2, 3], [, 4]]|
+-----------------------+

行数据合并

  • 合并一个或多个dataframe:union

在使用该函数时需要注意:两个dataframe的列的个数必须相同,并且两个dataframe的列的类型也必须相同,否则会报错。

>>> d2 = [([1], 1), ([3], 2)]
>>> df2 = spark.createDataFrame(d2, ['c1', 'c2'])
>>> df2.show()
+---+---+
| c1| c2|
+---+---+
|[1]|  1|
|[3]|  2|
+---+---+
>>> d1 = [{'name': 'Alice', 'age': 1}]
>>> df1 = spark.createDataFrame(d1, ['c1', 'c2'])
>>> df1.show()
+-----+---+
|   c1| c2|
+-----+---+
|Alice|  1|
|  bob|  2|
+-----+---+
>>> df2.union(df1).show()
报错,错误信息如下:
pyspark.sql.utils.AnalysisException: u"Union can only be performed on tables with the compatible column types. string <> array<bigint> at the first column of the second table;;\n'Union\n:- LogicalRDD [c1#684, c2#685L], false\n+- LogicalRDD [c1#609, c2#610L], false\n"
  • 找出两个dataframe中的相同数据行: intersect
intersect(other)

intersect函数返回同时在两个dataframe中存在的行。要 注意:该函数要求两个dataframe的列和列的类型必须相同。否则会报错。

>>> d2 = [('name1', 1), ('name2', 2)]
>>> df2 = spark.createDataFrame(d2, ['c1', 'c2'])
>>> d1 = [('bob', 3)]
>>> df1 = spark.createDataFrame(d1, ['c1', 'c2'])
>>> df1.intersect(df2).show()
+---+---+
| c1| c2|
+---+---+
+---+---+
>>> d1 = [('bob', 3), ('name1', 1)]
>>> df1 = spark.createDataFrame(d1, ['c1', 'c2'])
>>> df1.intersect(df2).show()
+-----+---+
|   c1| c2|
+-----+---+
|name1|  1|
+-----+---+

本节讲述了如何通过spark-sql(python)来完成数据的合并。包括列和行的合并,对于日常的数据处理来说这些操作都非常有用,需要熟练掌握。但本节没有包括join的操作讲解,join操作情况比较多,放在另外一节进行讲解。

Usage: spark-sql-perf [options] -b <value> | --benchmark <value> the name of the benchmark to run -m <value> | --master <value> | --filter <value> a filter on the name of the queries to run -i <value> | --iteration concat()的一个特殊形式,表示concat with separator,两个参数之间加上特定的分隔符。如果分割符为null,则返回null,参数为null,则忽略该参数。2、将Hadoop etc/hadoop文件夹中的 hdfs-site.xml、core-site.xml 拷贝到spark的conf下。1、拷贝Hive conf文件夹中的 hive-site.xml 文件夹到 spark的conf下(配置需要与资料中的文件保持一致!连接参数的函数,返回结果为连接参数的字符串。 Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析数据库产品。它可以提供亚秒级查询和高效的实时数据分析。通过它的分布式架构,高达10PB级的数据集将得到很好的支持,易于操作。 Apache Doris可以满足各种数据分析需求,包括历史数据报告、实时数据分析、交互式数据分析和探索性数据分析。让您的数据分析更容易! 本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系 array(expr, …) 返回给定元素组成的数组。 SELECT array(1, 2, 3); +--------------+ |array(1, 2, 3)| +--------------+ | [1, 2, 3]| +--------------+ array_cont 一、首先使用sparksql读取需要合并数据。当然有两种情况,   一种是读取全部数据,即需要合并所有小文件。   第二种是合并部分数据,比如只查询某一天的数据,只合并某一个天分区下的小文件。 val df: DataFrame = spark.sql("sql") 二、将读取到的数据写入临时文件中。此处需注意使用coalesce方法对文件进行合并。 df.coalesce(1).wri... spark sql一.概述1 spark历史2 Spark-SQL 概述2.1 特点2.2 作用2.3 Spark SQL架构图3 Dataset演进历史3.1 RDD3.1.1 优点3.1.2 缺点3.2 DataFrame3.2.1 优点3.2.2 缺点3.2.3 核心特征3.3 Dataset3.3.1 区别3.3.2 特点4 SparkSQL API4.1创建SparkSession4.2... 一、引子项目中遇到这样一张表:userSididid_Typetags_1email性别:男s_1email年龄:12s_113866660000phone会员:是s_2email性别:男要求对这个表按照sid 进行聚合,将所有的id聚合成一个json,所有的tag聚合成一个json。在hive和Spark中,对tag的聚合相对简单,用聚合函数collect_list 或者collect_set(...