相关文章推荐
谈吐大方的水煮鱼  ·  XMLHttpRequest ...·  11 月前    · 
任性的柿子  ·  Stable ...·  1 年前    · 

根据条件对Pyspark的行进行分组

0 人关注

我有一个有6列的表,我想根据 "记录 "归档,按 "ID1 "和 "ID2 "来分组行。我的记录字段要么是 "IN",要么是 "OUT",它们按日期排序。

这里是我的输入样本...

data = [("ACC.PXP","7246","2020-02-24T14:49:00",None,None,'IN'),
    ("ACC.PXP","7246","2021-03-09T08:20:00","Hospital","Foundation","OUT"),
    ("ACC.PXP","7246","2021-04-05T17:17:00","Hospital","Foundation","IN")
df = spark.createDataFrame(data=data,schema=['ID1','ID2','date','type','name','record'])
df.show(truncate=False)
+-------+----+-------------------+--------+----------+------+
|ID1    |ID2 |date               |type    |name      |record|
+-------+----+-------------------+--------+----------+------+
|ACC.PXP|7246|2020-02-24T14:49:00|null    |null      |IN    |
|ACC.PXP|7246|2021-03-09T08:20:00|Hospital|Foundation|OUT   |
|ACC.PXP|7246|2021-04-05T17:17:00|Hospital|Foundation|IN    |

以下是我想要的结果

data2 = [("ACC.PXP","7246","2020-02-24T14:49:00",None,None, "2021-03-09T08:20:00","Hospital","Foundation"),
    ("ACC.PXP","7246","2021-04-05T17:17:00","Hospital","Foundation", None,None,None)
df2 = spark.createDataFrame(data=data2,schema=['ID1','ID2','date','type','name','date1','type1','name1'])
df2.show(truncate=False)
+-------+----+-------------------+--------+----------+-------------------+--------+----------+
|ID1    |ID2 |date               |type    |name      |date1              |type1   |name1     |
+-------+----+-------------------+--------+----------+-------------------+--------+----------+
|ACC.PXP|7246|2020-02-24T14:49:00|null    |null      |2021-03-09T08:20:00|Hospital|Foundation|
|ACC.PXP|7246|2021-04-05T17:17:00|Hospital|Foundation|null               |null    |null      |
+-------+----+-------------------+--------+----------+-------------------+--------+----------+
    
2 个评论
@sammywemmy 你对如何处理这个问题有什么想法吗?
Hi @ScootCork 我在 stackoverflow.com/questions/57435858/. ..看到了你的答案,我的问题与你的答案很相似。你认为你能帮助解决这个问题吗? 谢谢
apache-spark
pyspark
apache-spark-sql
Sisay
Sisay
发布于 2021-06-15
2 个回答
Anna K.
Anna K.
发布于 2021-06-16
0 人赞同

你可以按id和 "count "分组,并对 "record "列作如下透视。

import pyspark.sql.functions as F
from pyspark.sql import Window
w = Window.partitionBy("ID1", "ID2", "record").orderBy("date")
df1 = (df
   .withColumn("count", F.row_number().over(w))
   .groupBy("ID1", "ID2", "count")
   .pivot("record")
   .agg(F.first("date"), F.first( "type"), F.first("name"))
   .select("ID1", "ID2", 
           F.col("IN_first(date)").alias("date"),  
           F.col("IN_first(type)").alias("type"), 
           F.col("IN_first(name)").alias("name"),
           F.col("OUT_first(date)").alias("date1"),
           F.col("OUT_first(date)").alias("type1"),
           F.col("OUT_first(name)").alias("name1"))

这将产生所需的输出表。然而,我想提醒你的是,与你的解决方案一样,这个解决方案假设每个ID对的第一个日期条目是记录=IN,并且如果按日期排序,这些行是按照IN-OUT-IN-OUT...的顺序排列的。否则,这个解决方案将无法正常工作。

谢谢你 @Anna
嗨,@Anna,你的代码在按日期排序时对IN-ONT-IN...序列运行良好。但是,如果我的数据不好,第一条记录从OUT开始,我想做和以前一样的事情,怎么办?
Sisay
Sisay
发布于 2021-06-16
0 人赞同

我相信有人会想出一个更快、更短、更优雅的pyspark代码。但这个代码也很好用。

from pyspark.sql import functions as F
from pyspark.sql import Window
data = [("ACC.PXP","7246","2020-02-24T14:49:00",None,None,'IN'),
    ("ACC.PXP","7246","2021-03-09T08:20:00","Hospital","Foundation","OUT"),
    ("ACC.PXP","7246","2021-04-05T17:17:00","Hospital","Foundation","IN")
sdf = spark.createDataFrame(data=data,schema=['ID1','ID2','date','type','name','record'])
sdf.show(truncate=False)
## split the data frame into two based on record type and give it a one 
sdf_1 = sdf.filter("record == 'IN'").withColumn('ones', F.lit(1))
sdf_2 = (sdf.filter("record == 'OUT'").withColumnRenamed('date', 'date1')\
                                      .withColumnRenamed('type', 'type1')\
                                        .withColumnRenamed('name', 'name1')\
                                          .withColumn('ones', F.lit(1))
## partition it by id1 and id2 
windowSpec1 = Window.partitionBy("ID1","ID2").orderBy("date")
windowSpec2 = Window.partitionBy("ID1","ID2").orderBy("date1")
## creat a count column to count the number of 'IN' and 'OUT'
sdf_1 = (sdf_1.withColumn('counter', F.sum('ones').over(windowSpec1))\
              .drop("ones", "record")
sdf_2 = (sdf_2.withColumn('counter', F.sum('ones').over(windowSpec2))\
               .withColumnRenamed('ID1','r_ID1')\
                 .withColumnRenamed('ID2','r_ID2')\
                   .withColumnRenamed('counter','r_counter')\
                     .drop("ones", "record")
## merge the two dataframes back
sdf_merged = (sdf_1.join(sdf_2, ( sdf_1.ID1 == sdf_2.r_ID1) & 
                                        (sdf_1.ID2 == sdf_2.r_ID2)  & 
                                        (sdf_1.counter  == sdf_2.r_counter), 
                                        how ='left')\
                                .drop(sdf_2.r_ID1).drop(sdf_2.r_ID2).drop(sdf_2.r_counter).drop(sdf_1.counter)\
                                .orderBy(F.asc('date'))
sdf_merged.show()
+-------+----+-------------------+--------+----------+-------------------+--------+----------+
|    ID1| ID2|               date|    type|      name|              date1|   type1|     name1|
+-------+----+-------------------+--------+----------+-------------------+--------+----------+