参考1: http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

参考2: https://www.cnblogs.com/Jaryer/p/13558701.html
在对spark sql 中的dataframe数据表去除重复数据的时候可以使用 dropDuplicates() 方法

  • 第一个 def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns) 这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
  • 第二个 def dropDuplicates(colNames: Seq[String]) 传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
  • 第三个 def dropDuplicates(colNames: Array[String]) 传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法
  • 第四个 def dropDuplicates(col1: String, cols: String*) 传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。

dropDuplicates去重原则:按数据行的顺序保留每行数据出现的第一条

新建一个 dataframe :

val conf = new SparkConf().setAppName("TTyb").setMaster("local")
val sc = new SparkContext(conf)
val spark = new SQLContext(sc)
val dataFrame = spark.createDataFrame(Seq(
  (1, 1, "2", "5"),
  (2, 2, "3", "6"),
  (2, 2, "35", "68"),
  (2, 2, "34", "67"),
  (2, 2, "38", "68"),
  (3, 2, "36", "69"),
  (1, 3, "4", null)
)).toDF("id", "label", "col1", "col2")

想根据 id 和 lable 来删除重复行,即删掉 id=2 且 lable=2 的重复行。利用 distinct 无法删

dataframe.distinct().show()
+---+-----+----+----+
| id|label|col1|col2|
+---+-----+----+----+
|  1|    1|   2|   5|
|  2|    2|   3|   6|
|  2|    2|  35|  68|
|  2|    2|  34|  67|
|  2|    2|  38|  68|
|  3|    2|  36|  69|
|  1|    3|   4|null|
+---+-----+----+----+

利用 dropDuplicates 可以根据 ID 来删除:

dataFrame.dropDuplicates("id","label").show()
+---+-----+----+----+
| id|label|col1|col2|
+---+-----+----+----+
|  2|    2|   3|   6|
|  1|    1|   2|   5|
|  1|    3|   4|null|
|  3|    2|  36|  69|
+---+-----+----+----+

demo2:

from pyspark.sql import Row
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+
df.dropDuplicates(['name', 'height']).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+

demo3

按time取最新的日期一条数据

from pyspark.sql import Row
from pyspark.sql import function as F
def main(sparkSession):
 df = sc.parallelize([ \
    Row(name='Alice', age=5, time='20100312'), \
	Row(name='Alice', age=5, time='20100312'), \
    Row(name='Alice', age=5, time='20100313'), \
	Row(name='Alice', age=8, time='20100313'), \
    Row(name='Alice', age=10,time='20100314'), \
	Row(name='Zmzzz', age=10,time='20100315')]).toDF()
	df.dropDuplicates().show()
	ddf = dropDuplicates(['name','time'])
	ddf.show()
	dddf = ddf.orderBy(F.col('time').desc())
    ddddf = dddf.dropDuplicates(['name'])
    ddddf.show()
 

原文:https://www.cnblogs.com/TTyb/p/8507237.html

原文链接:
https://www.cnblogs.com/TTyb/p/8507237.html Spark学习笔记(一):Spark概述与运原理  Spark学习笔记(二):RDD编程基础  Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据 Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范 •DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的 1、collect() ,返回值是一个数组,返回dataframe集合所有的 2、collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的 3、count() 返回一个number类型的,返回dataframe集合的数 4、describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以... 问题描述:原始数据data总数是1303638,使用data.drop()后数据总数是1303638,使用data.na.drop()后数据总数是0;为啥data.drop()没有丢弃null或nan的数据? 1)data.drop()如果不传递列名,不会做任何操作; 2)通过以下比较发现,drop是用来丢弃列的,而na.drop是用来丢弃的; 3)通过以下比较发现,dataframe.drop是直接调用的dataset中drop接口; 4)如果想要... 双重group by将去重分成了两步,是分组聚合运算,group by操作能进多个reduce任务并处理,每个reduce都能收到一部分数据然后进分组内去重,不再像distinct只有一个reduce进全局去重.sql中最简单的方式,当数据量小的时候性能还好.当数据量大的时候性能较差.因为distinct全局只有一个reduce任务来做去重操作,极容易发生数据倾斜的情况,整体运效率较慢.DataFrame中,可以先将分区内数据进排序,然后通过dropDuplicates重复的数据删除. awk是一种编程语言,用于在linux/unix下对文本和数据进处理。数据可以来自标准输入(stdin)、一个或多个文件,或其它命令的输出。它支持用户自定义函数和动态正则表达式等先进功能,是linux/unix下的一个强大编程工具。它在命令中使用,但更多是作为脚本来使用。awk有很多内建的功能,比如数组、函数等,这是它和C语言的相同之处,灵活性 Spark 是大数据领域的一大利器,花时间总结了一下 Spark 常用算子,正所谓温故而知新。 Spark 算子按照功能分,可以分成两大类:transform 和 action。Transform 不进实际计算,是惰性的,action 操作才进实际的计算。如何区分两者?看函数返回,如果输入到输出都是RDD类型,则认为是transform操作,反之为action操作。 准备阶段包括spar... 一、使用语法及参数 使用语法: DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False) subset – 指定特定的列 默认所有列 keep:{‘first’, ‘last’, False} – 删除重复项并保留第一次出现的项 默认第一个 keep=F Action 操作 1、 collect() ,返回值是一个数组,返回dataframe集合所有的 2、 collectAsList() 返回值是一个Java类型的数组,返回dataframe集合所有的 3、 count() 返回一个number类型的,返回dataframe集合的数 4、 describe(cols: String*) 返回一个通过数学计算的类表...