
Lambda Expression + pySpark

2 followers

I'm trying to compare a column in a Spark DataFrame with a given date: if the column's date is earlier than the given date, add n hours, otherwise add x hours.

from datetime import timedelta
addhours = lambda x, y: x + timedelta(hours=14) if x < y else x + timedelta(hours=10)

where y would hold a specified static date, and this is then applied to the DataFrame column:

df = df.withColumn("newDate", checkDate(df.Time, F.lit('2015-01-01') ))

Here is a sample of df:

from pyspark.sql import functions as F
import datetime
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2020-02-01 10:00:00')],["OriginTz", "Time"])

I'm a bit new to Spark DataFrames :)
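
For reference, here is a minimal sketch of the UDF route the snippet above seems to be reaching for, assuming the sample df above and reusing the checkDate name from the question (the accepted answer below avoids the UDF entirely):

from datetime import timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

# hypothetical UDF equivalent of the addhours lambda above
checkDate = F.udf(
    lambda x, y: x + timedelta(hours=14) if x < y else x + timedelta(hours=10),
    TimestampType())

# cast both arguments to timestamp so the Python comparison inside the UDF is valid
df = df.withColumn(
    "newDate",
    checkDate(F.col("Time").cast("timestamp"), F.lit('2015-01-01').cast("timestamp")))
df.show()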

pyspark
azure-databricks
pyspark-dataframes
Sreedhar
Posted on 2020-05-21
2 Answers
notNull
Posted on 2020-05-21
Accepted
0 upvotes

Use a when + otherwise expression instead of a udf.

Example:

from pyspark.sql import functions as F
#we are casting to timestamp and date so that we can compare in when
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
withColumn("literal",F.lit('2015-01-01').cast("date")).\
withColumn("Time",F.col("Time").cast("timestamp"))
df.show()
#+---------------+-------------------+----------+
#|       OriginTz|               Time|   literal|
#+---------------+-------------------+----------+
#|America/NewYork|2020-02-01 10:00:00|2015-01-01|
#| Africa/Nairobi|2003-02-01 10:00:00|2015-01-01|
#+---------------+-------------------+----------+
#use unix_timestamp to get epoch seconds, add 10*3600 (10 hrs) or 14*3600 (14 hrs), then convert back with to_timestamp
df.withColumn("new_date",F.when(F.col("Time") > F.col("literal"),F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 10 * 3600)).\
    otherwise(F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 14 * 3600))).\
show()
#+---------------+-------------------+----------+-------------------+
#|       OriginTz|               Time|   literal|           new_date|
#+---------------+-------------------+----------+-------------------+
#|America/NewYork|2020-02-01 10:00:00|2015-01-01|2020-02-01 20:00:00|
#| Africa/Nairobi|2003-02-01 10:00:00|2015-01-01|2003-02-02 00:00:00|
#+---------------+-------------------+----------+-------------------+

If you don't want to add the literal value as a DataFrame column:

lit_val='2015-01-01'
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
withColumn("Time",F.col("Time").cast("timestamp"))
df.withColumn("new_date",F.when(F.col("Time") > F.lit(lit_val).cast("date"),F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 10 * 3600)).\
    otherwise(F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 14 * 3600))).\
show()
#+---------------+-------------------+-------------------+
#|       OriginTz|               Time|           new_date|
#+---------------+-------------------+-------------------+
#|America/NewYork|2020-02-01 10:00:00|2020-02-01 20:00:00|
#| Africa/Nairobi|2003-02-01 10:00:00|2003-02-02 00:00:00|
#+---------------+-------------------+-------------------+
murtihash
Posted on 2020-05-21
0 upvotes

You can also use .expr with interval. That way, you don't have to convert to another format.

from pyspark.sql import functions as F
df.withColumn("new_date", F.expr("""IF(Time<y, Time + interval 14 hours, Time + interval 10 hours)""")).show()
#+---------------+-------------------+----------+-------------------+
#|       OriginTz|               Time|         y|           new_date|
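
Note: the snippet above assumes the cutoff date already exists as a column named y. A minimal sketch of setting that up on the same sample data, assuming the same '2015-01-01' cutoff used in the accepted answer:

from pyspark.sql import functions as F
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
    withColumn("Time", F.col("Time").cast("timestamp")).\
    withColumn("y", F.lit('2015-01-01').cast("date"))
df.withColumn("new_date", F.expr("""IF(Time < y, Time + interval 14 hours, Time + interval 10 hours)""")).show()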