pyspark之DataFrame数据处理学习【数据去重之一】

1、重复数据,例如

spark = SparkSession.builder.appName("dataDeal").getOrCreate()
df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+
上面的数据中存在如下问题:
有两行id等于3并且完全相同

id为1和4的两行是一样的数据,只是id不同,可以假定为是同一个人的数据

有两行的id等于5,这看上去是一个异常数据,因为他们看上去不像是同一个人的数据

2、检查是否有重复数据采用.distinct()方法

print ('Count of rows:{0}'.format(df.count()))
print ('Count of distinct rows:{0}'.format(df.distinct().count()))
>>> print ('Count of rows:{0}'.format(df.count()))
Count of rows:7
>>> print ('Count of distinct rows:{0}'.format(df.distinct().count()))
Count of distinct rows:6
>>> df.columns
['id', 'weight', 'height', 'age', 'gender']

可以看到返回的两个值不等,一个为6,一个为7.所以,可以判断出我们的数据集中有完全相同的行(即重复的数据)

3、移除重复的数据采用.dropDuplicates()方法

1)、#移除重复的数据
df = df.dropDuplicates()
#查看去重后的数据
df.show()

>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  4| 144.5|   5.9| 33|     M|
|  1| 144.5|   5.9| 33|     M|
|  5| 129.2|   5.3| 42|     M|
|  5| 133.2|   5.7| 54|     F|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+

通过结果可以看出,删除了一行id为3的记录

2)、接着可以通过重复之前的工作检查与id无关的重复数据

#对除id以外的列进行对比
print ("Count of ids:{0}".format(df.count()))
print ("Count of distinct ids:{0}".format(df.select([c for c in df.columns if c != 'id']).distinct().count()))
Count of ids:6
Count of distinct ids:5
可以继续使用.dropDuplicates()删除重复数据,但是需要使用subset参数来指定只处理除id以外的列。subset参数指明.dropDuplicates()方法只查找subset参数指定的列

#去掉除id以外其他属性相同的数据
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+
从结果可以看出,现在的数据没有任何一行是重复的(既没有完全相同的记录也没有除id以外相同的记录)

#去掉除id以外其他属性相同的数据

>>> df = df.dropDuplicates(subset=[c for c in df.columns if c in [ 'weight','height','age']])
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  2| 167.2|   5.4| 45|     M|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
+---+------+------+---+------+

3)检测是否有重复的id

#计算id的总数和id的唯一数
import pyspark.sql.functions as fn
>>> df.agg(fn.count('id').alias('count'),fn.countDistinct('id').alias('distinct')).show()
+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+
.count()方法和.countDistinct()方法分别计算DataFrame的行数和id的唯一数。.alias()方法可以对返回的列指定一个别名。

从结果中可以看出,总共5条记录,但只有4个唯一id。假设id相同的数据是偶然事件,异常值,则将每一行给定一个唯一的id

4)#重新给每行分配id
df.withColumn('new_id',fn.monotonically_increasing_id()).show()
+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  2| 167.2|   5.4| 45|     M|  68719476736|
|  5| 133.2|   5.7| 54|     F| 395136991232|
|  5| 129.2|   5.3| 42|     M| 884763262976|
|  3| 124.1|   5.2| 23|     F| 962072674304|
|  4| 144.5|   5.9| 33|     M|1331439861760|
+---+------+------+---+------+-------------+
.monotonicallymonotonically_increasing_id()方法给每条记录提供一个唯一且递增的id

参考: https://blog.csdn.net/xiaoql520/article/details/78774581

pyspark之DataFrame数据处理学习【数据去重之一】1、重复数据,例如spark = SparkSession.builder.appName("dataDeal").getOrCreate()df = spark.createDataFrame([    (1, 144.5, 5.9, 33, 'M'),    (2, 167.2, 5.4, 45, 'M'),    ... a=[['A',1,'aa'],['A',2,'aa'],['B',3,'aa'],['C',4,'bb']] df = pd. DataFrame (a) df.columns = ['Type','Num','type2'] df.groupby('Type'). import pandas as pd sql="select * from uds.amz_daily_sales where purchase_date>='2021-04-01' " data=pd.read_sql(sql=sql,con=db) SELECT * FROM table_name --取出table中所有 数据 SELECT * FROM table_name limit 3 --取出table中的前3行 数据 SELECT * FROM table_name WHERE column_name = 'abc' --从table中... df.dtypes #查看各行的 数据 格式 df['列名'].astype(int)#转换某列的 数据 类型res_df['T'] = res_df['T'].apply(int) #切片操作 df.iloc[1:.
Spark SQL中的 DataFrame 类似于一张关系型 数据 表。在关系型 数据 库中对单表或进行的查询操作,在 DataFrame 中都可以通过调用其API接口来实现。可以参考,Scala提供的 DataFrame API。   本文中的代码基于Spark-1.6.2的文档实现。 一、 DataFrame 对象的生成   Spark-SQL可以以其他RDD对象、parquet文...
最好可以用RDD的就不要用 DataFrame 今日就遇到执行出现 SparkContext异常停止,怀疑是 DataFrame 的distinct操作和groupby一样并不在本地合并为最小集,导致最后崩溃;而后换成RDD.distinct()却是可以的。 经多次测试都是以上结论 测试 数据 一亿两千万条 结论:能用RDD的相关操作,就别用 DataFrame ,比如排序、统计count、disti...
// 首先读取零售业的采购 数据 ,然后对 数据 进行重划分以减少分区数量(因为我们事先知道仅有少量 数据 存储在大量的小文件里), // 最后将这些 数据 缓存起来以便后续的快速访问 val df = spark.read.format("csv") .option("header", true) .option("inferSchema", true) .load(inputPath) .coalesce(5) // 小分区合并
DataFrame 也是一种不可变的分布式 数据 集,类似于Python Pandas中的 DataFrame 和关系 数据 库中的表。在分布式 数据 集上施加表结构之后,就可以使用Spark SQL查询结构化的 数据 或者使用Spark表达式方法。 1. Spark SQL性能 未引入 DataFrame 之前,使用Python操作RDD时的查询速度比使用Scala和Java的查询慢很多,因为 Pyspark 需要将所有........................................................
其中,withColumn方法接受两个参数,第一个参数是新列的名称,第二个参数是新列的值。在上面的例子中,我们使用了lit函数来创建一个常量值作为新列的值。如果需要根据已有的列计算新列的值,可以使用 pyspark .sql.functions中的其他函数来实现。 ### 回答2: 在 Pyspark 中,我们可以使用withColumn()方法为 DataFrame 对象添加新的一列。 withColumn()方法需要两个参数:第一个参数为新列的名称,第二个参数为新列的值或新列的计算方法。 以下是一个示例代码: from pyspark .sql.functions import col # 创建一个 DataFrame 对象 df = spark.create DataFrame ([(1, "John", 25), (2, "Mary", 30)], ["id", "name", "age"]) # 添加一个新列"gender" df = df.withColumn("gender", col("age") % 2) # 展示 DataFrame 对象 df.show() 在上面的代码中,我们使用create DataFrame ()方法创建了一个 DataFrame 对象,其中包含三列:id、name和age。 接着,我们使用withColumn()方法为该对象添加了一个新列gender。这个新列的值是根据age列的值计算得到的,使用了col()函数和%运算符。 最后,我们使用show()方法展示了更新后的 DataFrame 对象。 可以看到,新的 DataFrame 对象中有了一个名为gender的列,它的值分别是0和1,代表了age列的奇偶性。 ### 回答3: 在 pyspark 中,我们可以使用withcolumn函数将新的一列添加到 dataframe 中。withcolumn需要两个参数,一个是新的列名,一个是新的列所要包含的值。以下是具体的步骤: 1. 导入 pyspark .sql.functions包 ```python from pyspark .sql.functions import * 2. 创建一个 dataframe 例如,我们创建一个包含姓名和年龄的 dataframe : ```python from pyspark .sql.types import StructType, StructField, IntegerType, StringType schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True) data = [("John", 25), ("Lisa", 30), ("Tom", 20)] df = spark.create DataFrame (data=schema) 3. 使用withcolumn函数添加新的一列 例如,我们添加一个新的列"gender",需要根据年龄判断性别: ```python df1 = df.withColumn("gender", when(df.age >= 18, "Male").otherwise("Female")) 以上代码中,wehn函数判断age是否大于等于18,如果是则设置gender为"Male",否则设置为"Female"。 4. 查看新的 dataframe ```python df1.show() 输出结果为: +----+---+------+ |name|age|gender| +----+---+------+ |John| 25| Male| |Lisa| 30| Male| |Tom | 20| Male| +----+---+------+ 以上是在 pyspark 中添加新的一列的基本步骤,具体使用可以根据需求进行修改。