本文为您介绍如何使用自定义函数及 Python 第三方库。
使用自定义函数
DataFrame
函数支持对
Sequence
使用
map
,它会对它的每个元素调用自定义函数。
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0
目前,自定义函数无法支持将 List/Dict 类型作为输入或输出。
如果
map
前后,Sequence
的类型发生了变化,则需要显式指定
map
后的类型。
>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0
如果在函数中包含闭包,则函数外闭包变量值的变化会引起函数内该变量值的变化。
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(lambda x: x + i))
结果为
dfs
中每个
SequenceExpr
均为
df.sepal_length+9
。为解决此问题,可以将函数作为另一函数的返回值,或者使用
partial
。两个示例如下。
>>> dfs = []
>>> def get_mapper(i):
>>> return lambda x: x + i
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
map
也支持使用现有的
UDF
函数,传入的参数是
str
类型(函数名)或者
Function
对象,详情请参见
函数
。
map
传入
Python
函数的实现使用了
MaxCompute Python UDF。因此,如果您所在的
Project
不支持
Python UDF,则
map
函数无法使用。除此以外,所有
Python UDF
的限制在此都适用。
目前,默认可使用的第三方库(包含 C)只有 NumPy,第三方库使用详情请参见 使用第三方 Python 库 。
除了调用自定义函数,DataFrame
还提供了很多内置函数,这些函数中部分使用了
map
函数来实现。因此,如果您所在
Project
未开通
Python UDF,则无法使用这些函数(注意:阿里云公共服务暂不提供对
Python UDF
的支持)。
由于字节码定义的差异,Python 3
下使用新语言特性(例如
yield from
)时,代码在使用
Python 2.7
的
MaxCompute Worker
上执行时会发生错误。因此,建议您在
Python 3
下使用
MapReduce API
编写生产作业前,先确认相关代码是否能正常执行。
示例程序:使用计数器
from odps.udf import get_execution_context
def h(x):
ctx = get_execution_context()
counters = ctx.get_counters()
counters.get_counter('df', 'add_one').increment(1)
return x + 1
df.field.map(h)
Logview 的 JSONSummary 中即可找到计数器值。
对一行数据使用自定义函数
如果您需要对一行数据使用自定义函数,可以使用
apply
方法。参数
axis
的值必须设为
1,表示对行进行操作。
apply
的自定义函数接收一个参数,参数为上一步
Collection
的一行数据。您可以通过属性或者偏移获得一个字段的数据。
-
reduce
为 True 时,表示返回结果为 Sequence,否则返回结果为 Collection。names
和types
参数分别指定返回的 Sequence 或 Collection 的字段名和类型。 如果未指定类型,则会默认为 STRING 类型。>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3) sepaladd 0 8.6 1 7.9 2 7.9
-
在
apply
的自定义函数中,reduce
为 False 时,您可以使用yield
关键字返回多行结果。>>> iris.count() >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count() 300
-
您也可以在函数中注释返回的字段和类型,无需在函数调用时再指定它们。
>>> from odps.df import output >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> iris.apply(handle, axis=1).count() 300
-
您也可以使用
map-only
的map_reduce
,该操作与axis=1
的apply
操作是等价的。>>> iris.map_reduce(mapper=handle).count() 300
-
如果您想调用 MaxCompute 上已经存在的 UDTF,函数指定为函数名即可。
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
-
使用
apply
对行操作,且reduce
为 False 时,您可以使用并列多行输出与已有的行结合,用于后续聚合等操作。>>> from odps.df import output >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> iris[iris.category, iris.apply(handle, axis=1)]
对所有列调用自定义聚合
调用
apply
方法,当不指定
axis
,或者
axis
值为
0
时,您可以通过传入一个自定义聚合类对所有
Sequence
进行聚合操作。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
目前,自定义函数无法支持将 LIST/DICT 类型作为初始输入或最终输出结果。
引用资源
自定义函数也能读取 MaxCompute 上的资源(表资源或文件资源),或者引用一个 Collection 作为资源。此时,自定义函数需要写成函数闭包或 Callable 的类。两个示例如下。
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources): # resources按调用顺序传入。
>>> names = set()
>>> fileobj = resources[0] # 文件资源是一个file-like的object。
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # 这里可以通过字段名或者偏移来取。
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
分区表资源在读取时不包含分区字段。
当
axis
值为
1,即在行上操作时,您需要写一个函数闭包或者
Callable
的类。 而对于列上的聚合操作,您只需在
__init__
函数里读取资源即可。
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
这里的
stop_words
存放于本地,但在真正执行时会被上传到
MaxCompute
作为资源引用。
使用第三方 Python 库
您可以把第三方 Python 包( whl 、 egg 、 zip 以及 tar.gz )上传到 MaxCompute,并在全局或在立即执行时指定包文件,请确保指定所有依赖库,否则会导致导入错误。
-
您可以通过 PyODPS 的资源上传接口 create_resource 来完成资源的上传。
以下是使用 python-dateutil 包的举例:
-
使用
pip download
命令,下载包以及其依赖到某个路径。下载后会出现两个包: six-1.10.0-py2.py3-none-any.whl 和 python_dateutil-2.5.3-py2.py3-none-any.whl (注意:这里需要下载支持 Linux 环境的包)。$ pip download python-dateutil -d /to/path/
-
分别把两个文件上传到 MaxCompute 资源。
>>> # 确保资源名后缀正确。 >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
-
现在有个 DataFrame,只有一个 STRING 类型字段。
>>> df datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29
-
全局配置使用到的三方库如下。
>>> from odps import options >>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> options.df.libraries = ['six.whl', 'python_dateutil.whl'] >>> df.datestr.map(get_year) datestr 0 2016 1 2015
或者通过运行方法的
libraries
参数指定使用到的第三方库。>>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl']) datestr 0 2016 1 2015
PyODPS 默认支持执行仅包含 Python 且不含文件操作的第三方库。在较新版本的 MaxCompute 服务下,PyODPS 也支持执行带有二进制代码或带有文件操作的 Python 库。这些库的后缀必须是 cp27-cp27m-manylinux1_x86_64 ,以
archive
方式上传, whl 后缀的包需要重命名为 zip 。同时,作业需要开启odps.isolation.session.enable
选项,或者在 Project 级别开启isolation
。以下示例为您展示如何上传并使用scipy
中的特殊函数。>>> # 对于含有二进制代码的包,必须使用archive方式上传资源,whl后缀需要改为zip。 >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb')) >>> # 如果Project开启了isolation,下面的选项不是必需的。 >>> options.sql.settings = { 'odps.isolation.session.enable': True } >>> def psi(value): >>> # 建议在函数内部import第三方库,以防止不同操作系统下二进制包结构差异造成执行错误。 >>> from scipy.special import psi >>> return float(psi(value)) >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])
对于只提供源码的二进制包,可以在 Linux Shell 中打包成 Wheel 再上传,Mac 和 Windows 中生成的 Wheel 包无法在 MaxCompute 中使用。
python setup.py bdist_wheel
-
-
您也可以通过 MaxCompute Console 上传资源。
-
现在主流的 Python 包都提供了 whl 包,提供了各平台包含二进制文件的包,因此找到可以在 MaxCompute 上运行的包是第一步。
-
其次,需要包含所有的依赖包。各个包的依赖情况如下表所示。
包名
依赖
pandas
numpy、python-dateutil、pytz、six
scipy
numpy
scikit-learn
numpy、scipy
说明其中 numpy 已包含,您只需上传 python-dateutil、pytz、pandas、scipy、sklearn、six 包,pandas、scipy 和 scikit-learn 即可使用。
-
您可进入 python-dateutil 找到 python-dateutil-2.6.0.zip 进行下载。
-
重命名为 python-dateutil.zip ,通过 MaxCompute Console 上传资源。
add archive python-dateutil.zip;
说明pytz 和 six 的上传方式同上,分别找到 pytz-2017.2.zip 和 six-1.11.0.tar.gz 进行下载和上传资源操作。
-
对于 Pandas 这种包含 c 的包,需要找到名字中包含 cp27-cp27m-manylinux1_x86_64 的 whl 包,这样才能在 MaxCompute 上正确执行。因此,您需要找到 pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl 进行下载,然后把后缀改成 zip,在 MaxCompute Console 中执行
add archive pandas.zip;
进行上传。scipy 和 scikit-learn 包的操作同上。
所有包需要下载的资源如下表所示。
包名
文件名
上传资源名
python-dateutil
python-dateutil.zip
pytz
pytz.zip
six
six.tar.gz
pandas
pandas.zip
scipy
scipy.zip
scikit-learn
sklearn.zip
-
指定第三方 Python 库
-
在全局指定使用的库。
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
-
在立即执行的方法中,局部指定使用的库。
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])