这是我预期的数据集 sparkDF ,从月=5年=2020年开始使用 sparkDF2 ,在这之前使用 sparkDF1

Id   Value  month   Year
1    672        4   2020  
1    353        6   2020
2    683        6   2019  
3    363        4   2021

熊猫的替代方案是

df = df1.mask((df1['month'].ge(5) & df1['Year'].eq(2020)) | df1['Year'].ge(2021), df2)
    
1 个评论
对不起,这是两个不同的数据集,第一个数据集在月=5年=2020之前更有效,另一个在月=5年=2020之后更有效。
python
dataframe
pyspark
Nabih Bawazir
Nabih Bawazir
发布于 2022-04-21
2 个回答
wwnde
wwnde
发布于 2022-04-21
已采纳
0 人赞同

选项1:过滤器和unionBy

s=((df1.month >= 5)&(df1.Year == 2020))|(df1.Year >= 2021)
s1=((df2.month >= 5)&(df2.Year == 2020))|(df2.Year >= 2021)
new = df1.where(~s).unionByName(df2.where(s1)).orderBy('Id')
new.show()
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  1|  353|    6|2020|
|  2|  683|    6|2019|
|  3|  363|    4|2021|
+---+-----+-----+----+

选项2:如果你有pandas代码,你可以使用pandas udfs。pandas udf的问题是,包括两个数据帧的数据帧使用cogroup方法,会产生洗牌。在你的情况下。我会使用pandas的combine_first正是你所做的. code below

输入pandas作为pd

def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
  l =l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021))
  return l.combine_first(r)
df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show() 
import pandas as pd
def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
  t =l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021),r)
  return t
df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show() 
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  3|  363|    4|2021|
|  2|  683|    6|2019|