PySpark | 自定义函数UDF
1.1 自定义udf
1)首先创建DataFrame
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
2)自定义的python函数,对df的Name列中的名字转换成大写字母开头
def convertCase(str):
resStr=""
arr = str.split(" ")
for x in arr:
resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
return resStr
3)将自定义的convertCase函数注册为udf
from pyspark.sql.functions import udf
udf1 = udf(convertCase,StringType())
4)将自定义udf运用到dataframe中
df.select(col("Seqno"), \
udf1(col("Name")).alias("Name") ) \
.show(truncate=False)
1.2 注册udf,在sql中使用
有时候,需要将自定义的udf在sql中使用,可以使用下面的方法注册udf
spark.udf.register("udf1", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, udf1(Name) as Name from NAME_TABLE") \
.show(truncate=False)
1.3 注解形式更方便
@udf(returnType=StringType())
def convertCase(str):
resStr=""