在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
def convertCase(str):
resStr=""
arr = str.split(" ")
for x in arr:
resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
return resStr
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())
""" Converting function to UDF
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))
df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show()
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
def upperCase(str):
return str.upper()
upperCaseUDF = udf(lambda z:upperCase(z),StringType())
df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show()
def convertCase(str):
resStr=""
arr = str.split(" ")
for x in arr:
resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
return resStr
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
.show(truncate=False)
@udf(returnType=StringType())
def upperCase(str):
return str.upper()
df.withColumn("Cureated Name", upperCase(col("Name"))).show()
pyspark/spark并不能保证子句按从左到右或其他固定的顺序执行。pyspark会将执行根据查询优化与规划,所以and, or, where, having表述会有副作用
No guarantee Name is not null will execute first
If convertUDF(Name) like '%John%' execute first then
you will get runtime error
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
"where Name is not null and convertUDF(Name) like '%John%'") \
.show(truncate=False)
spark.sql("select convertUDF(Name) from NAME_TABLE2") \
.show(truncate=False)
tips:
- 最好在UDF函数内部检验null而不是在外部
- 如果在UDF内部不能检验null,那至少使用if或者case when来在使用UDF前检验null情况
例1,2,4用的是api方法(df.select(”col1”)之类),例3用的是sql方法(spark.sql(”select * from tbl1”)之类)
api方法定义udf的格式为
def func1(str):
return str[0]
convertUDF = udf(lambda z: func1(z),StringType())
convertUDF = udf(lambda z: func1(z))
@udf(returnType=StringType())
def func1(str):
return str[0]
sql方法定义udf的格式为
def func1(str):
return str[0]
spark.udf.register("func1",func1)
PySpark UDF (User Defined Function)
在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。例1 通过select()使用UDFfrom pyspark.sql.functions import col, udffrom pyspark.sql.types import StringTypedef convertCase(str): resStr="" arr = str.split(" ") for
UDF的执行速度很快,通过缓存计划在语句重复执行时降低代码的编译开销,比存储方法的执行效率更高
可用于减少网络流量
UDF放入内存中,设计不当可能导致系统的崩溃,所以必须在必要的时候实施优化,对udf的优化是通过改写原来的udf代码实现,主要包括两种场景
如果udf嵌套复杂,可以重写一个嵌套层数较少且可以实现相同功能的udf,使性能成倍提升
针对过滤类的udf,将过滤率高的放在前面,减少中间结果,避免不必要的计算
二、UDF的使用
1、建hive表
不得不说,udf函数在spark开发中是非常方便的。有了这个提供,我们不仅可以操作spark dataframe。还可以直接操作数仓(hive)而无需再去过多精力研究hive的udf的编写。
值得高兴的是pyspark同样也支持udf的编写,我们知道初期的spark对于python并不十分友好,随着版本的更新也给python提供了更多的接口。
udf函数的编写
这个其实就是把python的函数绑定spark的udf函数中。我们先来初始化我们的数据源。
df = sc.parallelize([
相比于java编写udf,python编写udf就显得简单的多。站在数据处理的角度来说,python也更合适。那python如何编写udf函数呢?
使用方法:
(1)将编写的python代码上传到服务器
(2)添加python文件
(3)使用函数:TRANSFORM (data) USING “python udf_test.py” as (name,address)
对比java编写udf函数来说,少了打包、创建临时函数的过程
编写python代码:
#!/usr/bin/python
# 2.注册自定义函数
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
自定义函数的重点在于定义返回值类型的数据格式,其数据类型基本都是从from pyspark.sql.types import * 导入,常用的包括:
- StructType():结构体
- StructField():结构体中的元素
- LongT...
from datetime import date
train = train.toDF("label", "uId", "adId", "operTime", "siteId", "slotId", "contentId", "netType")
print(train.dtypes)
train.show(n=20)
train = train.with...
pyspark中自定义函数比python中多了一部注册,整体流程是“定义-注册-调用”,其中注册和调用两步在sparksql和DSL中又有所区别,具体如下:
from pyspark.sql import SparkSession
第一步定义一个函数:
def squared_func(number):
return number *number
这一步和在pyt...
Spark中的自定义函数包括三种类型:udf、udaf和udtf。
1. udf(User-Defined Function):用户自定义函数,用于对DataFrame中的每个元素进行单独的处理,返回一个新的值。可以使用Scala、Java或Python编写。
2. udaf(User-Defined Aggregate Function):用户自定义聚合函数,用于对DataFrame中的一组元素进行聚合操作,返回一个新的值。可以使用Scala、Java或Python编写。
3. udtf(User-Defined Table-Generating Function):用户自定义表生成函数,用于将一行数据转换为多行数据,返回一个新的DataFrame。只能使用Scala或Java编写。
这些自定义函数可以帮助我们更好地处理数据,提高Spark的处理效率和灵活性。