在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。

例1 通过select()使用UDF

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())
""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))
df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show()

例2 通过withColumn()使用UDF

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
def upperCase(str):
    return str.upper()
upperCaseUDF = udf(lambda z:upperCase(z),StringType())   
df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show()

例3 注册UDF并在sql中使用

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)

例4 通过注释创建UDF

@udf(returnType=StringType()) # 或者写@udf
def upperCase(str):
    return str.upper()
df.withColumn("Cureated Name", upperCase(col("Name"))).show()

pyspark/spark并不能保证子句按从左到右或其他固定的顺序执行。pyspark会将执行根据查询优化与规划,所以and, or, where, having表述会有副作用

No guarantee Name is not null will execute first If convertUDF(Name) like '%John%' execute first then you will get runtime error spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \ "where Name is not null and convertUDF(Name) like '%John%'") \ .show(truncate=False)
# 如果某些记录的name值为空,就会有问题
spark.sql("select convertUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

tips:

  1. 最好在UDF函数内部检验null而不是在外部
  2. 如果在UDF内部不能检验null,那至少使用if或者case when来在使用UDF前检验null情况

例1,2,4用的是api方法(df.select(”col1”)之类),例3用的是sql方法(spark.sql(”select * from tbl1”)之类)

api方法定义udf的格式为

# 方法一 函数定义后写udf函数
def func1(str):
	return str[0]
convertUDF = udf(lambda z: func1(z),StringType())
convertUDF = udf(lambda z: func1(z))
# 方法二 在函数定义的上一行写上@udf
@udf(returnType=StringType()) # 或者写@udf
def func1(str):
	return str[0]

sql方法定义udf的格式为

# 方法一:注册udf函数
def func1(str):
	return str[0]
spark.udf.register("func1",func1)

PySpark UDF (User Defined Function)

在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。例1 通过select()使用UDFfrom pyspark.sql.functions import col, udffrom pyspark.sql.types import StringTypedef convertCase(str): resStr="" arr = str.split(" ") for UDF的执行速度很快,通过缓存计划在语句重复执行时降低代码的编译开销,比存储方法的执行效率更高 可用于减少网络流量 UDF放入内存中,设计不当可能导致系统的崩溃,所以必须在必要的时候实施优化,对udf的优化是通过改写原来的udf代码实现,主要包括两种场景 如果udf嵌套复杂,可以重写一个嵌套层数较少且可以实现相同功能的udf,使性能成倍提升 针对过滤类的udf,将过滤率高的放在前面,减少中间结果,避免不必要的计算 二、UDF的使用 1、建hive表 不得不说,udf函数在spark开发中是非常方便的。有了这个提供,我们不仅可以操作spark dataframe。还可以直接操作数仓(hive)而无需再去过多精力研究hive的udf的编写。 值得高兴的是pyspark同样也支持udf的编写,我们知道初期的spark对于python并不十分友好,随着版本的更新也给python提供了更多的接口。 udf函数的编写 这个其实就是把python的函数绑定sparkudf函数中。我们先来初始化我们的数据源。 df = sc.parallelize([
相比于java编写udf,python编写udf就显得简单的多。站在数据处理的角度来说,python也更合适。那python如何编写udf函数呢? 使用方法: (1)将编写的python代码上传到服务器 (2)添加python文件 (3)使用函数:TRANSFORM (data) USING “python udf_test.py” as (name,address) 对比java编写udf函数来说,少了打包、创建临时函数的过程 编写python代码: #!/usr/bin/python # 2.注册自定义函数 from pyspark.sql.functions import udf from pyspark.sql.types import StringType 自定义函数的重点在于定义返回值类型的数据格式,其数据类型基本都是从from pyspark.sql.types import * 导入,常用的包括: - StructType():结构体 - StructField():结构体中的元素 - LongT...
from datetime import date train = train.toDF("label", "uId", "adId", "operTime", "siteId", "slotId", "contentId", "netType") print(train.dtypes) train.show(n=20) train = train.with...
pyspark自定义函数比python中多了一部注册,整体流程是“定义-注册-调用”,其中注册和调用两步在sparksql和DSL中又有所区别,具体如下: from pyspark.sql import SparkSession 第一步定义一个函数: def squared_func(number): return number *number 这一步和在pyt...
Spark中的自定义函数包括三种类型:udf、udaf和udtf。 1. udf(User-Defined Function):用户自定义函数,用于对DataFrame中的每个元素进行单独的处理,返回一个新的值。可以使用Scala、Java或Python编写。 2. udaf(User-Defined Aggregate Function):用户自定义聚合函数,用于对DataFrame中的一组元素进行聚合操作,返回一个新的值。可以使用Scala、Java或Python编写。 3. udtf(User-Defined Table-Generating Function):用户自定义表生成函数,用于将一行数据转换为多行数据,返回一个新的DataFrame。只能使用Scala或Java编写。 这些自定义函数可以帮助我们更好地处理数据,提高Spark的处理效率和灵活性。