删除特定列上的连续重复数据 pyspark

0 人关注

我想从数据框架中删除一个子集内的连续重复的列。我 在这里 找到了一个关于如何做的解决方案,但只适用于单列

有一个这样的数据框架。

test_df = spark.createDataFrame([
  (2,3.0,"a", "2020-01-01"),
  (2,6.0,"a", "2020-01-02"),
  (3,2.0,"a", "2020-01-02"),
  (4,1.0,"b", "2020-01-04"),
  (4,9.0,"b", "2020-01-05"),
  (4,7.0,"b", "2020-01-05"),
  (2,3.0,"a", "2020-01-08"),
  (4,7.0,"b", "2020-01-09")
], ("id", "num","st", "date"))
##############
id   num   st    date
2,   3.0,  "a"  "2020-01-01"
2,   6.0,  "a"  "2020-01-02"
3,   2.0,  "a"  "2020-01-02"
4,   1.0,  "b"  "2020-01-04"
4,   9.0,  "b"  "2020-01-05"
4,   7.0,  "b"  "2020-01-05"
2,   3.0,  "a"  "2020-01-08"
4,   7.0,  "b"  "2020-01-09"

我想在一个特定的列[id,st]中删除连续的重复数据,当连续的案例出现时,保留第一条记录(按日期排序)。如果两个样本出现在同一天,不能正确排序,可以随机选择。结果会是这样的。

##############
id   num   st    date
2,   3.0,  "a"   "2020-01-01"
3,   2.0,  "a"   "2020-01-02"
4,   1.0,  "b"   "2020-01-04"
2,   3.0,  "a"   "2020-01-08"
4,   7.0,  "b"   "2020-01-09"

我怎么能这样做呢?

5 个评论
我相信你会保留第一个记录,但你如何定义正确的顺序?
@Steven 是的第一张唱片,Sr我没有明确提到它。编辑了主帖
这种操作已经被回答过几次了。这里有一个。 stackoverflow.com/questions/68076603/ ...
我无法使用你的代码样本。理论上,它似乎也应该适用于这个用例,但是当创建ID列时,我所有的行都被标记为id=1。当运行最后的df4代码时,它不会是预期的结果。
python
dataframe
apache-spark
pyspark
apache-spark-sql
jiwidi
jiwidi
发布于 2021-08-09
1 个回答
Kafels
Kafels
发布于 2021-08-17
已采纳
0 人赞同

你可以将这些列 id st ,并使用 lag 函数验证前一个值是否与当前值相同。

from pyspark.sql import Window
import pyspark.sql.functions as f
window = Window.orderBy(f.col('date'))
before_dedup_df = (test_df
                   .withColumn('_custom_id', f.concat(f.col('id'), f.col('st')))
                   .withColumn('_consecutive', f.col('_custom_id').eqNullSafe(f.lag('_custom_id').over(window))))
# +---+---+---+----------+----------+------------+
# |id |num|st |date      |_custom_id|_consecutive|
# +---+---+---+----------+----------+------------+
# |2  |3.0|a  |2020-01-01|2a        |false       |
# |2  |6.0|a  |2020-01-02|2a        |true        |
# |3  |2.0|a  |2020-01-02|3a        |false       |
# |4  |1.0|b  |2020-01-04|4b        |false       |
# |4  |9.0|b  |2020-01-05|4b        |true        |
# |4  |7.0|b  |2020-01-05|4b        |true        |
# |2  |3.0|a  |2020-01-08|2a        |false       |
# |4  |7.0|b  |2020-01-09|4b        |false       |
# +---+---+---+----------+----------+------------+
dedup_df = (before_dedup_df
            .where(~f.col('_consecutive'))
            .drop('_custom_id', '_consecutive'))
# +---+---+---+----------+
# |id |num|st |date      |
# +---+---+---+----------+
# |2  |3.0|a  |2020-01-01|
# |3  |2.0|a  |2020-01-02|