执行并获取结果

执行并获取结果

本文为您介绍 DataFrame 操作支持的执行方法。

前提条件

您需要提前完成以下步骤,用于操作本文中的示例:

延迟执行

DataFrame 上的所有操作并不会立即执行,只有当显式调用 execute 方法,或者调用立即执行的方法时(内部调用的也是 execute ),才会执行这些操作。立即执行的方法如下表所示。

方法

说明

返回值

persist

将执行结果保存到 MaxCompute 表。

PyODPS DataFrame

execute

执行并返回全部结果。

ResultFrame

head

查看开头 N 行数据,这个方法会执行所有结果,并取开头 N 行数据。

ResultFrame

tail

查看结尾 N 行数据,这个方法会执行所有结果,并取结尾 N 行数据。

ResultFrame

to_pandas

转换为 Pandas DataFrame 或者 Series,wrap 参数为 True 的时候,返回 PyODPS DataFrame 对象。

  • wrap True 时,返回 PyODPS DataFrame。

  • wrap False 时,返回 Pandas DataFrame。False 为默认值。

plot,hist,boxplot

画图有关。

不涉及

说明

在交互式环境下,PyODPS DataFrame 会在打印或者 repr 的时候,调用 execute 方法,您无需手动调用 execute

示例

# 非交互环境执行,需手动调用execute方法
print(iris[iris.sepallength < 5][:5].execute())
# 交互环境执行,自动调用execute方法
print(iris[iris.sepallength < 5][:5])

返回结果:

   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

在交互环境中,如果您需要关闭自动调用执行,请进行手动设置,设置方式如下。

from odps import options
options.interactive = False
print(iris[iris.sepallength < 5][:5])

返回结果:

Collection: ref_0
  odps.Table
    name: hudi_mc_0612.`iris3`
    schema:
      sepallength           : double      # 片长度(cm)
      sepalwidth            : double      # 片宽度(cm)
      petallength           : double      # 瓣长度(cm)
      petalwidth            : double      # 瓣宽度(cm)
      name                  : string      # 种类
Collection: ref_1
  Filter[collection]
    collection: ref_0
    predicate:
      Less[sequence(boolean)]
        sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
        Scalar[int8]
Slice[collection]
  collection: ref_1
  stop:
    Scalar[int8]
      5

关闭自动调用执行后,打印 repr 对象,会显示整个抽象语法树。 如果需要执行,则必须手动调用 execute 方法。

读取执行结果

execute head 函数输出的结果为 ResultFrame 类型,可从中读取结果。

说明

ResultFrame 是结果集合,不能参与后续计算。

  • ResultFrame 可以迭代取出每条记录。 示例如下:

    result = iris.head(3)
    for r in result:
        print(list(r))

    返回结果:

    [4.9, 3.0, 1.4, 0.2, 'Iris-setosa']
    [4.7, 3.2, 1.3, 0.2, 'Iris-setosa']
    [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']
  • ResultFrame 也支持在安装有 Pandas 的前提下转换为 Pandas DataFrame,或使用 Pandas 后端的 PyODPS DataFrame。

    # 返回Pandas DataFrame。
    pd_df = iris.head(3).to_pandas()
    # 返回使用Pandas后端的PyODPS DataFrame。
    wrapped_df = iris.head(3).to_pandas(wrap=True)  

保存执行结果为 MaxCompute

  • 对于 Collection,您可以调用 persist 方法,用于返回一个新的 DataFrame 对象,参数为表名。

    iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris')
    print(iris2.head(5))

    返回结果:

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          5.5         2.3          4.0         1.3  Iris-versicolor
    2          4.9         2.4          3.3         1.0  Iris-versicolor
    3          5.0         2.0          3.5         1.0  Iris-versicolor
    4          6.0         2.2          4.0         1.0  Iris-versicolor
  • persist 可以传入 partitions 参数,用于创建一个分区表。它的分区是 partitions 所指定的字段。

    iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name'])
    print(iris3.data)

    返回结果:

    odps.Table
      name: odps_test_sqltask_finance.`pyodps_iris`
      schema:
        sepallength           : double
        sepalwidth            : double
        petallength           : double
        petalwidth            : double
      partitions:
        name                  : string
  • 如果您需要写入已经存在的表的某个分区, persist 可以传入 partition 参数,指明写入表的哪个分区(例如 ds=****** )。该 DataFrame 的每个字段的类型都必须相同,且都存在于该表中。 drop_partition create_partition 参数只在此时有效,分别表示是否要删除(如果分区存在)或创建(如果分区不存在)该分区。

    print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))

    返回结果:

       sepallength  sepalwidth  petallength  petalwidth             name    ds
    0          4.5         2.3          1.3         0.3      Iris-setosa  test
    1          5.5         2.3          4.0         1.3  Iris-versicolor  test
    2          4.9         2.4          3.3         1.0  Iris-versicolor  test
    3          5.0         2.0          3.5         1.0  Iris-versicolor  test
    4          6.0         2.2          4.0         1.0  Iris-versicolor  test
  • 写入表时,您还可以指定表的生命周期。例如下列语句将表的生命周期指定为 10 天。

    print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))

    返回结果:

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          5.5         2.3          4.0         1.3  Iris-versicolor
    2          4.9         2.4          3.3         1.0  Iris-versicolor
    3          5.0         2.0          3.5         1.0  Iris-versicolor
    4          6.0         2.2          4.0         1.0  Iris-versicolor
  • 如果数据源中没有 ODPS 对象,例如数据源仅为 Pandas,在 persist 时需要手动指定 ODPS 入口对象,或者将需要的入口对象标明为全局对象。

    # 假设入口对象为o。
    # 指定入口对象。
    df.persist('table_name', odps=o)
    # 或者可将入口对象标记为全局。
    o.to_global()
    df.persist('table_name')

保存执行结果为 Pandas DataFrame

您可以使用 to_pandas 方法,如果 wrap 参数为 True,将返回 PyODPS DataFrame 对象。

  • 示例 1:使用 to_pandas 方法,返回 Pandas DataFrame 对象。

    print(type(iris[iris.sepalwidth < 2.5].to_pandas()))

    返回结果:

    <class 'pandas.core.frame.DataFrame'>
  • 示例 2: wrap 参数为 True,返回 PyODPS DataFrame 对象。

    print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))

    返回结果:

    <class 'odps.df.core.DataFrame'>
说明

PyODPS 可以执行 open_reader 方法,通过 reader.to_pandas() 转成 Pandas DataFrame。详情请参见

立即运行设置运行参数

对于立即执行的方法,例如 execute persist to_pandas 等,您可以通过以下方法设置它们运行时的参数(仅对 ODPS SQL 后端有效):

  • 设置全局参数。详情请参见 SQL

  • 在这些立即执行的方法上,使用 hints 参数,可以确保这些参数只作用于当前的计算过程。

    print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))

    返回结果:

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          4.9         2.4          3.3         1.0  Iris-versicolor

运行时显示详细信息

  • 如果您需要查看运行时 Instance Logview,则应该修改全局配置。代码示例如下。

    from odps import options
    options.verbose = True
    print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())

    返回结果:

    Sql compiled:
    SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
    FROM odps_test_sqltask_finance.`pyodps_iris` t1
    WHERE t1.`sepallength` < 5
    LIMIT 5
    Instance ID:
      Log view:http://logview
       sepalwidth  petallength  petalwidth             name
    0         2.3          1.3         0.3      Iris-setosa
    1         2.4          3.3         1.0  Iris-versicolor
  • 您可以指定自己的日志记录函数。 代码示例如下。

    my_logs = []
    def my_logger(x):
        my_logs.append(x)
    options.verbose_log = my_logger
    print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
    print(my_logs)

    返回结果:

       sepalwidth  petallength  petalwidth             name
    0         2.3          1.3         0.3      Iris-setosa
    1         2.4          3.3         1.0  Iris-versicolor
    ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', '  Log view:]

缓存中间 Collection 计算结果

DataFrame 的计算过程中,部分 Collection 被多处使用。如果您需要查看中间过程的执行结果, 可以使用 cache 标记某个需要被优先计算的 Collection。示例如下。

说明

cache 会延迟执行,调用 cache 不会触发立即计算。

cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)
# 返回结果
   sepallength             name
0          4.5      Iris-setosa
1          5.5  Iris-versicolor
2          4.9  Iris-versicolor
# 由于cached已经被计算,所以能立刻取到计算结果。
print(cached.head(3))
#返回结果
   sepallength             name
0          4.5      Iris-setosa
1          5.5  Iris-versicolor
2          4.9  Iris-versicolor

异步和并行执行

异步执行

DataFrame 支持异步操作,对于立即执行的方法,包括 execute persist head tail to_pandas (其他方法不支持), 传入 async 参数,即可以将这个操作异步执行, timeout 参数指定超时时间, 异步返回的是 Future 对象。

future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())
# 返回结果
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.5         2.3          1.3         0.3      Iris-setosa
1          5.5         2.3          4.0         1.3  Iris-versicolor
2          4.9         2.4          3.3         1.0  Iris-versicolor
3          5.0         2.0          3.5         1.0  Iris-versicolor
4          6.0         2.2          4.0         1.0  Iris-versicolor
5          6.2         2.2          4.5         1.5  Iris-versicolor
6          5.5         2.4          3.8         1.1  Iris-versicolor
7          5.5         2.4          3.7         1.0  Iris-versicolor
8          6.3         2.3          4.4         1.3  Iris-versicolor
9          5.0         2.3          3.3         1.0  Iris-versicolor

并行执行

您可以使用新的 Delay API ,将立即执行的操作,包括 execute persist head tail to_pandas (其他方法不支持),变成延迟操作,并返回 Future 对象。当触发 delay 执行时,会去寻找依赖,按照给定的并发度执行,并支持异步执行。

from odps.df import Delay
delay = Delay()  # 创建Delay对象。
df = iris[iris.sepal_width < 5].cache()  # 有一个共同的依赖。
future1 = df.sepal_width.sum().execute(delay=delay)  # 立即返回future对象,此时并没有执行。
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3)  # 并发度是3,此时才真正执行。
|==========================================|   1 /  1  (100.00%)        21s
print(future1.result())
# 返回结果
print(future2.result())
# 返回结果
2.272727272727273

上述示例中,共同依赖的对象会先执行,然后再以并发度为 3 分别执行 future1 future3

delay.execute 也接受 async 操作指定是否异步执行,当异步执行的时候,也可以用 timeout 参数指定超时时间。