相关文章推荐
帅气的领带  ·  【Pyspark ...·  3 周前    · 
爱看球的机器猫  ·  使用 JS 的 download ...·  1 年前    · 
才高八斗的骆驼  ·  SQLSERVER ...·  1 年前    · 

PySpark | 自定义函数UDF

1.1 自定义udf

1)首先创建DataFrame

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)

2)自定义的python函数,对df的Name列中的名字转换成大写字母开头

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
        resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

3)将自定义的convertCase函数注册为udf

from pyspark.sql.functions import udf
udf1 = udf(convertCase,StringType())

4)将自定义udf运用到dataframe中

df.select(col("Seqno"), \
    udf1(col("Name")).alias("Name") ) \
   .show(truncate=False)


1.2 注册udf,在sql中使用

有时候,需要将自定义的udf在sql中使用,可以使用下面的方法注册udf

spark.udf.register("udf1", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, udf1(Name) as Name from NAME_TABLE") \
     .show(truncate=False)

1.3 注解形式更方便

@udf(returnType=StringType()) 
def convertCase(str):
    resStr=""