本文介绍如何通过spark sql对数据进行各种的合并操作,包括:列合并,行合并,相同key的合并等等。
在实际的数据处理场景中,数据的合并操作非常常用,这里介绍如何通过spark sql来完成常用的合并操作。
准备以下数据:
name,address,age,id,time
david,shenzhen,31,1,201903
eason,shenzhen,27,2,201904
jarry,wuhan,35,3,201904
aarry2,wuhan1,34,4,201904
barry3,wuhan2,33,5,201904
carry4,wuhan3,32,6,201904
darry5,wuhan4,31,7,201903
earry6,wuhan9,30,8,201903
david,shenzhen,31,1,201903
eason,shenzhen,27,2,201904
jarry,wuhan,35,3,201904
aarry2,wuhan1,34,4,201904
barry3,wuhan2,33,5,201904
carry4,wuhan3,32,6,201904
darry5,wuhan4,31,7,201903
earry6,wuhan9,30,8,201903
david,shenzhen,31,1,201903
eason,shenzhen,27,2,201904
carry4,wuhan3,32,6,201904
把以上数据保存到文件:idtimedata.csv中,并把该文件保存到hdfs的/curdata/目录下。
然后先在代码前进行sparksession的初始化:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.enableHiveSupport().appName("spark sql merge data").getOrCreate()
# 可以把文件放在本地,也可以放到hdfs中,这里放到本地为例
df = spark.read.csv("/Users/hover/work/testdata/idtimedata.csv", header=True)
通过SparkSql的接口函数可以方便的对数据进行合并操作。
假设你有一个dataframe:df1,可以通过以下方法来合并该df1的字段。
>>df = spark.createDataFrame([('abcd', '123')], ['s', 'd'])
>>df.select(F.concat_ws('-', "s", "d").alias('s')).show()
# 合并多个列
>> merge_cols = [c for c in df.columns if c != 'id']
>> df.select(F.concat_ws(";", *merge_cols).alias("merged_data")).show(truncate=False)
+------------------------+
|merged_data |
+------------------------+
|david;shenzhen;31;201903|
|eason;shenzhen;27;201904|
|jarry;wuhan;35;201904 |
|aarry2;wuhan1;34;201904 |
|barry3;wuhan2;33;201904 |
|carry4;wuhan3;32;201904 |
|darry5;wuhan4;31;201903 |
|earry6;wuhan9;30;201903 |
该函数可以把一列的数据合并成一行,并按python的List方式保存,但注意:该函数不会去重。
>> df.select(F.collect_list("id").alias("id_merged")).show(truncate=False)
+---------------------------------------------------------+
|id_merged |
+---------------------------------------------------------+
|[1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 6]|
+---------------------------------------------------------+
可以通过sort_array对合并的lis值进行排序。
>> df.select(F.sort_array(F.collect_list("id"))).show(truncate=False)
+---------------------------------------------------------+
|sort_array(collect_list(id), true) |
+---------------------------------------------------------+
|[1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 6, 7, 7, 8, 8]|
+---------------------------------------------------------+
通过collect_set函数对合并的数据进行去重。
>> df.select(F.sort_array(F.collect_set("id"))).show(truncate=False)
+---------------------------------+
|sort_array(collect_set(id), true)|
+---------------------------------+
|[1, 2, 3, 4, 5, 6, 7, 8] |
+---------------------------------+
这种情况更加常用。先按某个列聚合,再合并其他列的数据。
# 把相同id的人的name聚合成一个list
>> df.groupby("id").agg(F.collect_list("name").alias("name_merge")).show(truncate=False)
+---+------------------------+
|id |name_merge |
+---+------------------------+
|7 |[darry5, darry5] |
|3 |[jarry, jarry] |
|8 |[earry6, earry6] |
|5 |[barry3, barry3] |
|6 |[carry4, carry4, carry4]|
|1 |[david, david, david] |
|4 |[aarry2, aarry2] |
|2 |[eason, eason, eason] |
+---+------------------------+
>> df.select(F.size(F.collect_list("id"))).show(truncate=False)
+----------------------+
|size(collect_list(id))|
+----------------------+
|19 |
+----------------------+
通过函数pyspark.sql.functions.create_map(*cols)可以把一个或多个key-value,其中key对应一列的值,而valu对应一列的值。
>> df.withColumn("finRes", F.create_map([df.id, df.name])).select("finRes").show(truncate=False)
+----------------+
|finRes |
+----------------+
|Map(1 -> david) |
|Map(2 -> eason) |
|Map(3 -> jarry) |
|Map(4 -> aarry2)|
|Map(5 -> barry3)|
|Map(6 -> carry4)|
|Map(7 -> darry5)|
|Map(8 -> earry6)|
|Map(1 -> david) |
|Map(2 -> eason) |
|Map(3 -> jarry) |
|Map(4 -> aarry2)|
create_map函数的参数是多个列名,它们必须成对出现。作为key-value。
>> df3 = df.withColumn("finRes", F.create_map([df.id, df.name, df.name, df.age])).select("finRes")
+------------------------------+
|finRes |
+------------------------------+
|Map(1 -> david, david -> 31) |
|Map(2 -> eason, eason -> 27) |
|Map(3 -> jarry, jarry -> 35) |
|Map(4 -> aarry2, aarry2 -> 34)|
|Map(5 -> barry3, barry3 -> 33)|
|Map(6 -> carry4, carry4 -> 32)|
|Map(7 -> darry5, darry5 -> 31)|
|Map(8 -> earry6, earry6 -> 30)|
|Map(1 -> david, david -> 31) |
|Map(2 -> eason, eason -> 27) |
|Map(3 -> jarry, jarry -> 35) |
|Map(4 -> aarry2, aarry2 -> 34)|
|Map(5 -> barry3, barry3 -> 33)|
|Map(6 -> carry4, carry4 -> 32)|
|Map(7 -> darry5, darry5 -> 31)|
|Map(8 -> earry6, earry6 -> 30)|
|Map(1 -> david, david -> 31) |
|Map(2 -> eason, eason -> 27) |
|Map(6 -> carry4, carry4 -> 32)|
+------------------------------+
需要注意的是:create_map不会对相同的key值进行合并。若想把相同key的值进行合并,需要自己来完成。比如先进行groupby,在进行create_map操作。
>> df.withColumn("finRes", F.create_map([df.id, F.concat_ws(',', df.name, df.age)])).select("finRes").show(truncate=False)
+-------------------+
|finRes |
+-------------------+
|Map(1 -> david,31) |
|Map(2 -> eason,27) |
|Map(3 -> jarry,35) |
|Map(4 -> aarry2,34)|
|Map(5 -> barry3,33)|
|Map(6 -> carry4,32)|
|Map(7 -> darry5,31)|
|Map(8 -> earry6,30)|
在spark-2.4中提供了多个对数组值进行操作的函数:
array_union函数可以把两个数组值的列合并在一起。
from pyspark.sql import Row
df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])])
df.select(F.array_union(df.c1, df.c2)).collect()
[Row(array_union(c1, c2)=['b', 'a', 'c', 'd', 'f'])]
注意:在使用该函数时,合并的值必须是[]类型。
pyspark.sql.functions.array_except(col1, col2)
array_except返回一个值的数组,该值出现在col1的数组中,但不在col2数组中,返回的值不会重复。
>>> df = spark.createDataFrame([Row(c1=["b", "a", "c", "e", "e"], c2=["c", "d", "a", "f"])])
>>> df.show()
+---------------+------------+
| c1| c2|
+---------------+------------+
|[b, a, c, e, e]|[c, d, a, f]|
+---------------+------------+
>>> df.select(F.array_except(df.c1, df.c2)).show()
+--------------------+
|array_except(c1, c2)|
+--------------------+
| [b, e]|
+--------------------+
说明:可以看到,该函数返回其值在col1数组中出现,而不在col2数组中出现过的值。并以数组的形式返回。
pyspark.sql.functions.array_intersect(col1, col2)
该函数求同时出现在col1和col2的数组中的值,并以数组返回。结果会进行去重。
>>> df = spark.createDataFrame([Row(c1=["b", "a", "c", "c", "e"], c2=["c", "d", "a", "f"])])
>>> df.select(F.array_intersect(df.c1, df.c2)).show()
+-----------------------+
|array_intersect(c1, c2)|
+-----------------------+
| [a, c]|
+-----------------------+
说明:可以看到a,c是同时出现在col1和col2数组值中的元素,结果也去重了。
pyspark.sql.functions.array_join(col, delimiter, null_replacement=None)
可以指定分隔符,若为None,则忽略。若为NULL则替换成NULL值。
>>> df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data'])
>>> df.select(F.array_join(df.data, "-").alias("joined")).show()
+------+
|joined|
+------+
| a-b-c|
| a|
+------+
>>> df.select(F.array_join(df.data, ",", "NULL").alias("joined")).show()
+------+
|joined|
+------+
| a,b,c|
|a,NULL|
+------+
array_min:返回每个数组值中的最小值。
array_max:返回每个数组值中的最大值。
- 返回一个新的值,重复一个值count次,并形成一个数组值
pyspark.sql.functions.array_repeat(col, count)
其中count是重复的次数。
>>> df = spark.createDataFrame([('ab',)], ['data'])
>>> df.show()
+----+
|data|
+----+
| ab|
+----+
>>> df.select(F.array_repeat(df.data, 3).alias('r')).show()
+------------+
| r|
+------------+
|[ab, ab, ab]|
+------------+
pyspark.sql.functions.array_sort(col)
以升序对输入数组进行排序。输入数组的元素必须是可排序的。空元素将放置在返回数组的末尾。
>>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
>>> df.show()
+----------+
| data|
+----------+
|[2, 1,, 3]|
| [1]|
| []|
+----------+
>>> df.select(F.array_sort(df.data).alias('r')).show()
+----------+
| r|
+----------+
|[1, 2, 3,]|
| [1]|
| []|
+----------+
pyspark.sql.functions.array_distinct(col)
>>> df = spark.createDataFrame([([1, 2, 3, 2],), ([4, 5, 5, 4],)], ['data'])
>>> df.show()
+------------+
| data|
+------------+
|[1, 2, 3, 2]|
|[4, 5, 5, 4]|
+------------+
>>> df.select(F.array_distinct(df.data)).show()
+--------------------+
|array_distinct(data)|
+--------------------+
| [1, 2, 3]|
| [4, 5]|
+--------------------+
- 对数组对应位置的数据进行合并:arrays_zip
pyspark.sql.functions.arrays_zip(*cols)
>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
>>> df.show()
+---------+---------+
| vals1| vals2|
+---------+---------+
|[1, 2, 3]|[2, 3, 4]|
+---------+---------+
>>> df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped')).show(truncate=False)
+------------------------+
|zipped |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
# 若数组值中对应的位置没有值,则合并时设置为空
>>> df = spark.createDataFrame([(([1, 2], [2, 3, 4]))], ['vals1', 'vals2'])
>>> df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped')).show(truncate=False)
+-----------------------+
|zipped |
+-----------------------+
|[[1, 2], [2, 3], [, 4]]|
+-----------------------+
在使用该函数时需要注意:两个dataframe的列的个数必须相同,并且两个dataframe的列的类型也必须相同,否则会报错。
>>> d2 = [([1], 1), ([3], 2)]
>>> df2 = spark.createDataFrame(d2, ['c1', 'c2'])
>>> df2.show()
+---+---+
| c1| c2|
+---+---+
|[1]| 1|
|[3]| 2|
+---+---+
>>> d1 = [{'name': 'Alice', 'age': 1}]
>>> df1 = spark.createDataFrame(d1, ['c1', 'c2'])
>>> df1.show()
+-----+---+
| c1| c2|
+-----+---+
|Alice| 1|
| bob| 2|
+-----+---+
>>> df2.union(df1).show()
报错,错误信息如下:
pyspark.sql.utils.AnalysisException: u"Union can only be performed on tables with the compatible column types. string <> array<bigint> at the first column of the second table;;\n'Union\n:- LogicalRDD [c1#684, c2#685L], false\n+- LogicalRDD [c1#609, c2#610L], false\n"
- 找出两个dataframe中的相同数据行: intersect
intersect(other)
intersect函数返回同时在两个dataframe中存在的行。要 注意:该函数要求两个dataframe的列和列的类型必须相同。否则会报错。
>>> d2 = [('name1', 1), ('name2', 2)]
>>> df2 = spark.createDataFrame(d2, ['c1', 'c2'])
>>> d1 = [('bob', 3)]
>>> df1 = spark.createDataFrame(d1, ['c1', 'c2'])
>>> df1.intersect(df2).show()
+---+---+
| c1| c2|
+---+---+
+---+---+
>>> d1 = [('bob', 3), ('name1', 1)]
>>> df1 = spark.createDataFrame(d1, ['c1', 'c2'])
>>> df1.intersect(df2).show()
+-----+---+
| c1| c2|
+-----+---+
|name1| 1|
+-----+---+
本节讲述了如何通过spark-sql(python)来完成数据的合并。包括列和行的合并,对于日常的数据处理来说这些操作都非常有用,需要熟练掌握。但本节没有包括join的操作讲解,join操作情况比较多,放在另外一节进行讲解。
Usage: spark-sql-perf [options]
-b <value> | --benchmark <value>
the name of the benchmark to run
-m <value> | --master <value> | --filter <value>
a filter on the name of the queries to run
-i <value> | --iteration
concat()的一个特殊形式,表示concat with separator,两个参数之间加上特定的分隔符。如果分割符为null,则返回null,参数为null,则忽略该参数。2、将Hadoop etc/hadoop文件夹中的 hdfs-site.xml、core-site.xml 拷贝到spark的conf下。1、拷贝Hive conf文件夹中的 hive-site.xml 文件夹到 spark的conf下(配置需要与资料中的文件保持一致!连接参数的函数,返回结果为连接参数的字符串。
Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz)
Spark Doris Connector Version:1.0.1
Spark Version:2.x
Scala Version:2.11
Apache Doris是一个现代MPP分析数据库产品。它可以提供亚秒级查询和高效的实时数据分析。通过它的分布式架构,高达10PB级的数据集将得到很好的支持,易于操作。
Apache Doris可以满足各种数据分析需求,包括历史数据报告、实时数据分析、交互式数据分析和探索性数据分析。让您的数据分析更容易!
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系
array(expr, …)
返回给定元素组成的数组。
SELECT array(1, 2, 3);
+--------------+
|array(1, 2, 3)|
+--------------+
| [1, 2, 3]|
+--------------+
array_cont
一、首先使用sparksql读取需要合并的数据。当然有两种情况,
一种是读取全部数据,即需要合并所有小文件。
第二种是合并部分数据,比如只查询某一天的数据,只合并某一个天分区下的小文件。
val df: DataFrame = spark.sql("sql")
二、将读取到的数据写入临时文件中。此处需注意使用coalesce方法对文件进行合并。
df.coalesce(1).wri...
spark sql一.概述1 spark历史2 Spark-SQL 概述2.1 特点2.2 作用2.3 Spark SQL架构图3 Dataset演进历史3.1 RDD3.1.1 优点3.1.2 缺点3.2 DataFrame3.2.1 优点3.2.2 缺点3.2.3 核心特征3.3 Dataset3.3.1 区别3.3.2 特点4 SparkSQL API4.1创建SparkSession4.2...
一、引子项目中遇到这样一张表:userSididid_Typetags_1email性别:男s_1email年龄:12s_113866660000phone会员:是s_2email性别:男要求对这个表按照sid 进行聚合,将所有的id聚合成一个json,所有的tag聚合成一个json。在hive和Spark中,对tag的聚合相对简单,用聚合函数collect_list 或者collect_set(...