参考1:
http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html
参考2:
https://www.cnblogs.com/Jaryer/p/13558701.html
在对spark sql 中的dataframe数据表去除重复数据的时候可以使用
dropDuplicates()
方法
-
第一个
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
-
第二个
def dropDuplicates(colNames: Seq[String])
传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
-
第三个
def dropDuplicates(colNames: Array[String])
传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法
-
第四个
def dropDuplicates(col1: String, cols: String*)
传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。
dropDuplicates去重原则:按数据行的顺序保留每行数据出现的第一条
新建一个 dataframe :
val conf = new SparkConf().setAppName("TTyb").setMaster("local")
val sc = new SparkContext(conf)
val spark = new SQLContext(sc)
val dataFrame = spark.createDataFrame(Seq(
(1, 1, "2", "5"),
(2, 2, "3", "6"),
(2, 2, "35", "68"),
(2, 2, "34", "67"),
(2, 2, "38", "68"),
(3, 2, "36", "69"),
(1, 3, "4", null)
)).toDF("id", "label", "col1", "col2")
想根据 id 和 lable 来删除重复行,即删掉 id=2 且 lable=2 的重复行。利用 distinct 无法删
dataframe.distinct().show()
+---+-----+----+----+
| id|label|col1|col2|
+---+-----+----+----+
| 1| 1| 2| 5|
| 2| 2| 3| 6|
| 2| 2| 35| 68|
| 2| 2| 34| 67|
| 2| 2| 38| 68|
| 3| 2| 36| 69|
| 1| 3| 4|null|
+---+-----+----+----+
利用 dropDuplicates 可以根据 ID 来删除:
dataFrame.dropDuplicates("id","label").show()
+---+-----+----+----+
| id|label|col1|col2|
+---+-----+----+----+
| 2| 2| 3| 6|
| 1| 1| 2| 5|
| 1| 3| 4|null|
| 3| 2| 36| 69|
+---+-----+----+----+
from pyspark.sql import Row
df = sc.parallelize([ \
Row(name='Alice', age=5, height=80), \
Row(name='Alice', age=5, height=80), \
Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
|Alice| 10| 80|
+-----+---+------+
df.dropDuplicates(['name', 'height']).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
+-----+---+------+
按time取最新的日期一条数据
from pyspark.sql import Row
from pyspark.sql import function as F
def main(sparkSession):
df = sc.parallelize([ \
Row(name='Alice', age=5, time='20100312'), \
Row(name='Alice', age=5, time='20100312'), \
Row(name='Alice', age=5, time='20100313'), \
Row(name='Alice', age=8, time='20100313'), \
Row(name='Alice', age=10,time='20100314'), \
Row(name='Zmzzz', age=10,time='20100315')]).toDF()
df.dropDuplicates().show()
ddf = dropDuplicates(['name','time'])
ddf.show()
dddf = ddf.orderBy(F.col('time').desc())
ddddf = dddf.dropDuplicates(['name'])
ddddf.show()
原文:https://www.cnblogs.com/TTyb/p/8507237.html
原文链接:
https://www.cnblogs.com/TTyb/p/8507237.html
Spark学习笔记(一):Spark概述与运行原理
Spark学习笔记(二):RDD编程基础
Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据
Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范
•DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的
1、collect() ,返回值是一个数组,返回dataframe集合所有的行
2、collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
3、count() 返回一个number类型的,返回dataframe集合的行数
4、describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以...
问题描述:原始数据data总行数是1303638,使用data.drop()后数据总行数是1303638,使用data.na.drop()后数据总行数是0;为啥data.drop()没有丢弃null或nan的数据?
1)data.drop()如果不传递列名,不会做任何操作;
2)通过以下比较发现,drop是用来丢弃列的,而na.drop是用来丢弃行的;
3)通过以下比较发现,dataframe.drop是直接调用的dataset中drop接口;
4)如果想要...
双重group by将去重分成了两步,是分组聚合运算,group by操作能进行多个reduce任务并行处理,每个reduce都能收到一部分数据然后进行分组内去重,不再像distinct只有一个reduce进行全局去重.sql中最简单的方式,当数据量小的时候性能还好.当数据量大的时候性能较差.因为distinct全局只有一个reduce任务来做去重操作,极容易发生数据倾斜的情况,整体运行效率较慢.DataFrame中,可以先将分区内数据进行排序,然后通过dropDuplicates将重复的数据删除.
awk是一种编程语言,用于在linux/unix下对文本和数据进行处理。数据可以来自标准输入(stdin)、一个或多个文件,或其它命令的输出。它支持用户自定义函数和动态正则表达式等先进功能,是linux/unix下的一个强大编程工具。它在命令行中使用,但更多是作为脚本来使用。awk有很多内建的功能,比如数组、函数等,这是它和C语言的相同之处,灵活性
Spark 是大数据领域的一大利器,花时间总结了一下 Spark 常用算子,正所谓温故而知新。
Spark 算子按照功能分,可以分成两大类:transform 和 action。Transform 不进行实际计算,是惰性的,action 操作才进行实际的计算。如何区分两者?看函数返回,如果输入到输出都是RDD类型,则认为是transform操作,反之为action操作。
准备阶段包括spar...
一、使用语法及参数
使用语法:
DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)
subset – 指定特定的列 默认所有列
keep:{‘first’, ‘last’, False} – 删除重复项并保留第一次出现的项 默认第一个
keep=F
Action 操作
1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
2、 collectAsList() 返回值是一个Java类型的数组,返回dataframe集合所有的行
3、 count() 返回一个number类型的,返回dataframe集合的行数
4、 describe(cols: String*) 返回一个通过数学计算的类表...