pyspark之DataFrame数据处理学习【数据去重之一】
1、重复数据,例如
spark = SparkSession.builder.appName("dataDeal").getOrCreate()
df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 1| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
| 4| 144.5| 5.9| 33| M|
| 5| 133.2| 5.7| 54| F|
| 3| 124.1| 5.2| 23| F|
| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+
上面的数据中存在如下问题:
有两行id等于3并且完全相同
id为1和4的两行是一样的数据,只是id不同,可以假定为是同一个人的数据
有两行的id等于5,这看上去是一个异常数据,因为他们看上去不像是同一个人的数据
2、检查是否有重复数据采用.distinct()方法
print ('Count of rows:{0}'.format(df.count()))
print ('Count of distinct rows:{0}'.format(df.distinct().count()))
>>> print ('Count of rows:{0}'.format(df.count()))
Count of rows:7
>>> print ('Count of distinct rows:{0}'.format(df.distinct().count()))
Count of distinct rows:6
>>> df.columns
['id', 'weight', 'height', 'age', 'gender']
可以看到返回的两个值不等,一个为6,一个为7.所以,可以判断出我们的数据集中有完全相同的行(即重复的数据)
3、移除重复的数据采用.dropDuplicates()方法
1)、#移除重复的数据
df = df.dropDuplicates()
#查看去重后的数据
df.show()
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 4| 144.5| 5.9| 33| M|
| 1| 144.5| 5.9| 33| M|
| 5| 129.2| 5.3| 42| M|
| 5| 133.2| 5.7| 54| F|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
+---+------+------+---+------+
通过结果可以看出,删除了一行id为3的记录
2)、接着可以通过重复之前的工作检查与id无关的重复数据
#对除id以外的列进行对比
print ("Count of ids:{0}".format(df.count()))
print ("Count of distinct ids:{0}".format(df.select([c for c in df.columns if c != 'id']).distinct().count()))
Count of ids:6
Count of distinct ids:5
可以继续使用.dropDuplicates()删除重复数据,但是需要使用subset参数来指定只处理除id以外的列。subset参数指明.dropDuplicates()方法只查找subset参数指定的列
#去掉除id以外其他属性相同的数据
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 5| 133.2| 5.7| 54| F|
| 4| 144.5| 5.9| 33| M|
| 2| 167.2| 5.4| 45| M|
| 3| 124.1| 5.2| 23| F|
| 5| 129.2| 5.3| 42| M|
+---+------+------+---+------+
从结果可以看出,现在的数据没有任何一行是重复的(既没有完全相同的记录也没有除id以外相同的记录)
#去掉除id以外其他属性相同的数据
>>> df = df.dropDuplicates(subset=[c for c in df.columns if c in [ 'weight','height','age']])
>>> df.show()
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
| 2| 167.2| 5.4| 45| M|
| 5| 133.2| 5.7| 54| F|
| 5| 129.2| 5.3| 42| M|
| 3| 124.1| 5.2| 23| F|
| 4| 144.5| 5.9| 33| M|
+---+------+------+---+------+
3)检测是否有重复的id
#计算id的总数和id的唯一数
import pyspark.sql.functions as fn
>>> df.agg(fn.count('id').alias('count'),fn.countDistinct('id').alias('distinct')).show()
+-----+--------+
|count|distinct|
+-----+--------+
| 5| 4|
+-----+--------+
.count()方法和.countDistinct()方法分别计算DataFrame的行数和id的唯一数。.alias()方法可以对返回的列指定一个别名。
从结果中可以看出,总共5条记录,但只有4个唯一id。假设id相同的数据是偶然事件,异常值,则将每一行给定一个唯一的id
4)#重新给每行分配id
df.withColumn('new_id',fn.monotonically_increasing_id()).show()
+---+------+------+---+------+-------------+
| id|weight|height|age|gender| new_id|
+---+------+------+---+------+-------------+
| 2| 167.2| 5.4| 45| M| 68719476736|
| 5| 133.2| 5.7| 54| F| 395136991232|
| 5| 129.2| 5.3| 42| M| 884763262976|
| 3| 124.1| 5.2| 23| F| 962072674304|
| 4| 144.5| 5.9| 33| M|1331439861760|
+---+------+------+---+------+-------------+
.monotonicallymonotonically_increasing_id()方法给每条记录提供一个唯一且递增的id
参考:
https://blog.csdn.net/xiaoql520/article/details/78774581
pyspark之DataFrame数据处理学习【数据去重之一】1、重复数据,例如spark = SparkSession.builder.appName("dataDeal").getOrCreate()df = spark.createDataFrame([ (1, 144.5, 5.9, 33, 'M'), (2, 167.2, 5.4, 45, 'M'), ...
a=[['A',1,'aa'],['A',2,'aa'],['B',3,'aa'],['C',4,'bb']]
df = pd.
DataFrame
(a)
df.columns = ['Type','Num','type2']
df.groupby('Type').
import pandas as pd
sql="select * from uds.amz_daily_sales where purchase_date>='2021-04-01' "
data=pd.read_sql(sql=sql,con=db)
SELECT * FROM table_name --取出table中所有
数据
SELECT * FROM table_name limit 3 --取出table中的前3行
数据
SELECT * FROM table_name WHERE column_name = 'abc' --从table中...
df.dtypes #查看各行的
数据
格式
df['列名'].astype(int)#转换某列的
数据
类型res_df['T'] = res_df['T'].apply(int)
#切片操作
df.iloc[1:.
Spark SQL中的
DataFrame
类似于一张关系型
数据
表。在关系型
数据
库中对单表或进行的查询操作,在
DataFrame
中都可以通过调用其API接口来实现。可以参考,Scala提供的
DataFrame
API。
本文中的代码基于Spark-1.6.2的文档实现。
一、
DataFrame
对象的生成
Spark-SQL可以以其他RDD对象、parquet文...
最好可以用RDD的就不要用
DataFrame
今日就遇到执行出现 SparkContext异常停止,怀疑是
DataFrame
的distinct操作和groupby一样并不在本地合并为最小集,导致最后崩溃;而后换成RDD.distinct()却是可以的。
经多次测试都是以上结论
测试
数据
一亿两千万条
结论:能用RDD的相关操作,就别用
DataFrame
,比如排序、统计count、disti...
// 首先读取零售业的采购
数据
,然后对
数据
进行重划分以减少分区数量(因为我们事先知道仅有少量
数据
存储在大量的小文件里),
// 最后将这些
数据
缓存起来以便后续的快速访问
val df = spark.read.format("csv")
.option("header", true)
.option("inferSchema", true)
.load(inputPath)
.coalesce(5) // 小分区合并
DataFrame
也是一种不可变的分布式
数据
集,类似于Python Pandas中的
DataFrame
和关系
数据
库中的表。在分布式
数据
集上施加表结构之后,就可以使用Spark SQL查询结构化的
数据
或者使用Spark表达式方法。
1. Spark SQL性能
未引入
DataFrame
之前,使用Python操作RDD时的查询速度比使用Scala和Java的查询慢很多,因为
Pyspark
需要将所有........................................................
其中,withColumn方法接受两个参数,第一个参数是新列的名称,第二个参数是新列的值。在上面的例子中,我们使用了lit函数来创建一个常量值作为新列的值。如果需要根据已有的列计算新列的值,可以使用
pyspark
.sql.functions中的其他函数来实现。
### 回答2:
在
Pyspark
中,我们可以使用withColumn()方法为
DataFrame
对象添加新的一列。
withColumn()方法需要两个参数:第一个参数为新列的名称,第二个参数为新列的值或新列的计算方法。
以下是一个示例代码:
from
pyspark
.sql.functions import col
# 创建一个
DataFrame
对象
df = spark.create
DataFrame
([(1, "John", 25), (2, "Mary", 30)], ["id", "name", "age"])
# 添加一个新列"gender"
df = df.withColumn("gender", col("age") % 2)
# 展示
DataFrame
对象
df.show()
在上面的代码中,我们使用create
DataFrame
()方法创建了一个
DataFrame
对象,其中包含三列:id、name和age。
接着,我们使用withColumn()方法为该对象添加了一个新列gender。这个新列的值是根据age列的值计算得到的,使用了col()函数和%运算符。
最后,我们使用show()方法展示了更新后的
DataFrame
对象。
可以看到,新的
DataFrame
对象中有了一个名为gender的列,它的值分别是0和1,代表了age列的奇偶性。
### 回答3:
在
pyspark
中,我们可以使用withcolumn函数将新的一列添加到
dataframe
中。withcolumn需要两个参数,一个是新的列名,一个是新的列所要包含的值。以下是具体的步骤:
1. 导入
pyspark
.sql.functions包
```python
from
pyspark
.sql.functions import *
2. 创建一个
dataframe
例如,我们创建一个包含姓名和年龄的
dataframe
:
```python
from
pyspark
.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
data = [("John", 25), ("Lisa", 30), ("Tom", 20)]
df = spark.create
DataFrame
(data=schema)
3. 使用withcolumn函数添加新的一列
例如,我们添加一个新的列"gender",需要根据年龄判断性别:
```python
df1 = df.withColumn("gender", when(df.age >= 18, "Male").otherwise("Female"))
以上代码中,wehn函数判断age是否大于等于18,如果是则设置gender为"Male",否则设置为"Female"。
4. 查看新的
dataframe
```python
df1.show()
输出结果为:
+----+---+------+
|name|age|gender|
+----+---+------+
|John| 25| Male|
|Lisa| 30| Male|
|Tom | 20| Male|
+----+---+------+
以上是在
pyspark
中添加新的一列的基本步骤,具体使用可以根据需求进行修改。