说明
如果您需要编写公共UDTF,并为多个MaxCompute项目授权UDTF的操作权限,建议UDTF同时兼容Python 2和Python 3。
开启Python 3
MaxCompute默认使用Python 2,如果您要使用Python 3,可以在Session级别设置如下属性开启Python 3,并与SQL语句一起提交执行。
set odps.sql.python.version=cp37;
MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDTF,请手动上传Numpy的WHEEL包。从PyPI或镜像下载Numpy包时,包的文件名为
numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl
。上传包的操作请参见
资源操作
或
Python UDF使用第三方包
。
函数签名及数据类型
函数签名格式如下。
@annotate(<signature>)
signature
为函数签名字符串,用于标识输入参数和返回值的数据类型。执行UDTF时,UDTF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type_list'
其中:
-
type_list
:表示返回值的数据类型。UDTF可以返回多列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
-
arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
arg_type_list
还支持星号(*)或为空(''):
-
当
arg_type_list
为星号(*)时,表示输入参数为任意个数。
-
当
arg_type_list
为空('')时,表示无输入参数。
@annotate("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")
输入参数类型为ARRAY、STRUCT、MAP,返回值类型为MAP、STRUCT。
-
odps.distcache.get_cache_file(resource_name)
:返回指定文件资源的内容。
-
resource_name
为STRING类型,对应当前MaxCompute项目中已存在的文件资源名。如果文件资源名非法或者没有相应的文件资源,会返回异常。
说明
使用UDTF访问资源,在创建UDTF时需要声明引用的资源,否则会报错。
-
返回值为File-like对象。在使用完此对象后,您需要调用
close
方法释放打开的资源文件。
-
odps.distcache.get_cache_table(resource_name)
:返回指定表资源的内容。
-
resource_name
支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。
-
返回值为GENERATOR类型,调用者以遍历方式获取表的内容,每次遍历可得到以数组形式存在的表中的一条记录。
引用文件资源和表资源的代码示例如下。
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
"""读取资源文件和资源表里的pageid、adid_list,生成dict
def __init__(self):
import json
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = record[1]
"""输入pageid,输出pageid以及它对应的所有adid
def process(self, pageid):
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)
按照
开发流程
,完成Python 3 UDTF开发后,您即可通过MaxCompute SQL调用Python 3 UDTF。调用方法如下:
-
在归属MaxCompute项目中使用自定义函数:使用方法与
内建函数
类似,您可以参照内建函数的使用方法使用自定义函数。
-
跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨项目分享信息,请参见
基于Package的跨项目空间资源访问
。
使用MaxCompute Studio完整开发及调用Python 3 UDTF的操作,请参见
开发Python UDF
。