使用自定义函数及Python第三方库

使用自定义函数及Python第三方库

本文为您介绍如何使用自定义函数及 Python 第三方库。

使用自定义函数

DataFrame 函数支持对 Sequence 使用 map ,它会对它的每个元素调用自定义函数。

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0
说明

目前,自定义函数无法支持将 List/Dict 类型作为输入或输出。

如果 map 前后,Sequence 的类型发生了变化,则需要显式指定 map 后的类型。

>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

如果在函数中包含闭包,则函数外闭包变量值的变化会引起函数内该变量值的变化。

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

结果为 dfs 中每个 SequenceExpr 均为 df.sepal_length+9 。为解决此问题,可以将函数作为另一函数的返回值,或者使用 partial 。两个示例如下。

>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

map 也支持使用现有的 UDF 函数,传入的参数是 str 类型(函数名)或者 Function 对象,详情请参见 函数

map 传入 Python 函数的实现使用了 MaxCompute Python UDF。因此,如果您所在的 Project 不支持 Python UDF,则 map 函数无法使用。除此以外,所有 Python UDF 的限制在此都适用。

目前,默认可使用的第三方库(包含 C)只有 NumPy,第三方库使用详情请参见 使用第三方 Python

除了调用自定义函数,DataFrame 还提供了很多内置函数,这些函数中部分使用了 map 函数来实现。因此,如果您所在 Project 未开通 Python UDF,则无法使用这些函数(注意:阿里云公共服务暂不提供对 Python UDF 的支持)。

说明

由于字节码定义的差异,Python 3 下使用新语言特性(例如 yield from )时,代码在使用 Python 2.7 MaxCompute Worker 上执行时会发生错误。因此,建议您在 Python 3 下使用 MapReduce API 编写生产作业前,先确认相关代码是否能正常执行。

示例程序:使用计数器

from odps.udf import get_execution_context
def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1
df.field.map(h)

Logview JSONSummary 中即可找到计数器值。

对一行数据使用自定义函数

如果您需要对一行数据使用自定义函数,可以使用 apply 方法。参数 axis 的值必须设为 1,表示对行进行操作。 apply 的自定义函数接收一个参数,参数为上一步 Collection 的一行数据。您可以通过属性或者偏移获得一个字段的数据。

  • reduce True 时,表示返回结果为 Sequence,否则返回结果为 Collection。 names types 参数分别指定返回的 Sequence Collection 的字段名和类型。 如果未指定类型,则会默认为 STRING 类型。

    >>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
       sepaladd
    0       8.6
    1       7.9
    2       7.9
  • apply 的自定义函数中, reduce False 时,您可以使用 yield 关键字返回多行结果。

    >>> iris.count()
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
    300
  • 您也可以在函数中注释返回的字段和类型,无需在函数调用时再指定它们。

    >>> from odps.df import output
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>> iris.apply(handle, axis=1).count()
    300
  • 您也可以使用 map-only map_reduce ,该操作与 axis=1 apply 操作是等价的。

    >>> iris.map_reduce(mapper=handle).count()
    300
  • 如果您想调用 MaxCompute 上已经存在的 UDTF,函数指定为函数名即可。

    >>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
  • 使用 apply 对行操作,且 reduce False 时,您可以使用并列多行输出与已有的行结合,用于后续聚合等操作。

    >>> from odps.df import output
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>> iris[iris.category, iris.apply(handle, axis=1)]

对所有列调用自定义聚合

调用 apply 方法,当不指定 axis ,或者 axis 值为 0 时,您可以通过传入一个自定义聚合类对所有 Sequence 进行聚合操作。

class Agg(object):
    def buffer(self):
        return [0.0, 0]
    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1
    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]
    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
说明

目前,自定义函数无法支持将 LIST/DICT 类型作为初始输入或最终输出结果。

引用资源

自定义函数也能读取 MaxCompute 上的资源(表资源或文件资源),或者引用一个 Collection 作为资源。此时,自定义函数需要写成函数闭包或 Callable 的类。两个示例如下。

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources按调用顺序传入。
>>>     names = set()
>>>     fileobj = resources[0] # 文件资源是一个file-like的object。
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # 这里可以通过字段名或者偏移来取。
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False
说明

分区表资源在读取时不包含分区字段。

axis 值为 1,即在行上操作时,您需要写一个函数闭包或者 Callable 的类。 而对于列上的聚合操作,您只需在 __init__ 函数里读取资源即可。

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python
说明

这里的 stop_words 存放于本地,但在真正执行时会被上传到 MaxCompute 作为资源引用。

使用第三方 Python

您可以把第三方 Python 包( whl egg zip 以及 tar.gz )上传到 MaxCompute,并在全局或在立即执行时指定包文件,请确保指定所有依赖库,否则会导致导入错误。

  • 您可以通过 PyODPS 的资源上传接口 create_resource 来完成资源的上传。

    以下是使用 python-dateutil 包的举例:

    1. 使用 pip download 命令,下载包以及其依赖到某个路径。下载后会出现两个包: six-1.10.0-py2.py3-none-any.whl python_dateutil-2.5.3-py2.py3-none-any.whl (注意:这里需要下载支持 Linux 环境的包)。

      $ pip download python-dateutil -d /to/path/
    2. 分别把两个文件上传到 MaxCompute 资源。

      >>> # 确保资源名后缀正确。
      >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    3. 现在有个 DataFrame,只有一个 STRING 类型字段。

      >>> df
                     datestr
      0  2016-08-26 14:03:29
      1  2015-08-26 14:03:29
    4. 全局配置使用到的三方库如下。

      >>> from odps import options
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
      >>> df.datestr.map(get_year)
         datestr
      0     2016
      1     2015

      或者通过运行方法的 libraries 参数指定使用到的第三方库。

      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015

    PyODPS 默认支持执行仅包含 Python 且不含文件操作的第三方库。在较新版本的 MaxCompute 服务下,PyODPS 也支持执行带有二进制代码或带有文件操作的 Python 库。这些库的后缀必须是 cp27-cp27m-manylinux1_x86_64 ,以 archive 方式上传, whl 后缀的包需要重命名为 zip 。同时,作业需要开启 odps.isolation.session.enable 选项,或者在 Project 级别开启 isolation 。以下示例为您展示如何上传并使用 scipy 中的特殊函数。

    >>> # 对于含有二进制代码的包,必须使用archive方式上传资源,whl后缀需要改为zip。
    >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
    >>> # 如果Project开启了isolation,下面的选项不是必需的。
    >>> options.sql.settings = { 'odps.isolation.session.enable': True }
    >>> def psi(value):
    >>>     # 建议在函数内部import第三方库,以防止不同操作系统下二进制包结构差异造成执行错误。
    >>>     from scipy.special import psi
    >>>     return float(psi(value))
    >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

    对于只提供源码的二进制包,可以在 Linux Shell 中打包成 Wheel 再上传,Mac Windows 中生成的 Wheel 包无法在 MaxCompute 中使用。

    python setup.py bdist_wheel
  • 您也可以通过 MaxCompute Console 上传资源。

    1. 现在主流的 Python 包都提供了 whl 包,提供了各平台包含二进制文件的包,因此找到可以在 MaxCompute 上运行的包是第一步。

    2. 其次,需要包含所有的依赖包。各个包的依赖情况如下表所示。

      包名

      依赖

      pandas

      numpy、python-dateutil、pytz、six

      scipy

      numpy

      scikit-learn

      numpy、scipy

      说明

      其中 numpy 已包含,您只需上传 python-dateutil、pytz、pandas、scipy、sklearn、six 包,pandas、scipy scikit-learn 即可使用。

    3. 您可进入 python-dateutil 找到 python-dateutil-2.6.0.zip 进行下载。

    4. 重命名为 python-dateutil.zip ,通过 MaxCompute Console 上传资源。

      add archive python-dateutil.zip;
      说明

      pytz six 的上传方式同上,分别找到 pytz-2017.2.zip six-1.11.0.tar.gz 进行下载和上传资源操作。

    5. 对于 Pandas 这种包含 c 的包,需要找到名字中包含 cp27-cp27m-manylinux1_x86_64 whl 包,这样才能在 MaxCompute 上正确执行。因此,您需要找到 pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl 进行下载,然后把后缀改成 zip,在 MaxCompute Console 中执行 add archive pandas.zip; 进行上传。scipy scikit-learn 包的操作同上。

    所有包需要下载的资源如下表所示。

    包名

    文件名

    上传资源名

    python-dateutil

    python-dateutil-2.6.0.zip

    python-dateutil.zip

    pytz

    pytz-2017.2.zip

    pytz.zip

    six

    six-1.11.0.tar.gz

    six.tar.gz

    pandas

    pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip

    pandas.zip

    scipy

    scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip

    scipy.zip

    scikit-learn

    scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip

    sklearn.zip

指定第三方 Python

  • 在全局指定使用的库。

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • 在立即执行的方法中,局部指定使用的库。

    >>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])