Option 1: filter and unionByName
s=((df1.month >= 5)&(df1.Year == 2020))|(df1.Year >= 2021)
s1=((df2.month >= 5)&(df2.Year == 2020))|(df2.Year >= 2021)
new = df1.where(~s).unionByName(df2.where(s1)).orderBy('Id')
new.show()
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  1|  353|    6|2020|
|  2|  683|    6|2019|
|  3|  363|    4|2021|
+---+-----+-----+----+
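To check the selection logic without a Spark session, the same filter-and-union idea can be sketched in plain pandas. The two frames below are invented for illustration and are not the data from the question, and `on_or_after_cutoff` is a hypothetical helper name.

```python
import pandas as pd

# Hypothetical inputs, made up for this sketch (not the asker's data).
df1 = pd.DataFrame({
    "Id": [1, 2, 3],
    "Value": [10, 20, 30],
    "month": [4, 6, 7],
    "Year": [2020, 2020, 2019],
})
df2 = pd.DataFrame({
    "Id": [4, 5, 6],
    "Value": [40, 50, 60],
    "month": [3, 5, 1],
    "Year": [2020, 2020, 2021],
})


def on_or_after_cutoff(df: pd.DataFrame) -> pd.Series:
    """Same predicate as s / s1 above: month >= 5 in 2020, or any later year."""
    return ((df["month"] >= 5) & (df["Year"] == 2020)) | (df["Year"] >= 2021)


# Keep df1 rows that fail the predicate and df2 rows that pass it,
# then stack them and sort by Id (the pandas analogue of unionByName + orderBy).
new = (
    pd.concat(
        [df1[~on_or_after_cutoff(df1)], df2[on_or_after_cutoff(df2)]],
        ignore_index=True,
    )
    .sort_values("Id")
    .reset_index(drop=True)
)
print(new)
```

With these made-up frames, Ids 1 and 3 survive from df1 and Ids 5 and 6 come from df2.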
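Option 2: If you already have pandas code, you can use pandas UDFs. The drawback is that a pandas UDF involving two DataFrames has to go through cogroup, which incurs a shuffle. In your case I would use pandas' combine_first, or exactly what you did. Code below: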
import pandas as pd
def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # Blank out rows of l dated May 2020 or later, then fill the gaps from r
    l = l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021))
    return l.combine_first(r)
df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()
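To see what mask_filter does to a single co-grouped pair of frames, here is a pandas-only sketch. The `left` and `right` frames are invented for illustration. The sketch also assumes the two frames have the same number of rows in the same order, since both `mask` and `combine_first` align on index labels.

```python
import pandas as pd

# Hypothetical pair of frames, standing in for one cogroup's l and r.
left = pd.DataFrame(
    {"Id": [1, 2], "Value": [100, 200], "month": [4, 6], "Year": [2020, 2020]}
)
right = pd.DataFrame(
    {"Id": [1, 2], "Value": [111, 222], "month": [4, 6], "Year": [2020, 2020]}
)

# True for rows dated May 2020 or later.
cond = (left["month"].ge(5) & left["Year"].eq(2020)) | left["Year"].ge(2021)

# Step 1: mask() replaces every cell of a matching row with NaN.
masked = left.mask(cond)

# Step 2: combine_first() fills those NaN cells from right, aligned by index.
via_combine_first = masked.combine_first(right)

# One-step form: give mask() the replacement frame directly.
via_mask_other = left.mask(cond, right)

print(via_combine_first)
print(via_mask_other)
```

In both results row 0 keeps the value from `left` and row 1 takes the value from `right`. The one-step form is what the next snippet uses.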
# Alternative: pass r to mask() directly as the replacement values
import pandas as pd
def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    t = l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021), r)
    return t
df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  3|  363|    4|2021|
|  2|  683|    6|2019|