本文为您介绍 DataFrame 操作支持的执行方法。
前提条件
您需要提前完成以下步骤,用于操作本文中的示例:
-
准备示例表 pyodps_iris ,详情请参见 Dataframe 数据处理 。
-
创建 DataFrame,详情请参见 从 MaxCompute 表创建 DataFrame 。
延迟执行
DataFrame
上的所有操作并不会立即执行,只有当显式调用
execute
方法,或者调用立即执行的方法时(内部调用的也是
execute
),才会执行这些操作。立即执行的方法如下表所示。
方法 |
说明 |
返回值 |
persist |
将执行结果保存到 MaxCompute 表。 |
PyODPS DataFrame |
execute |
执行并返回全部结果。 |
ResultFrame |
head |
查看开头 N 行数据,这个方法会执行所有结果,并取开头 N 行数据。 |
ResultFrame |
tail |
查看结尾 N 行数据,这个方法会执行所有结果,并取结尾 N 行数据。 |
ResultFrame |
to_pandas |
转换为 Pandas DataFrame 或者 Series,wrap 参数为 True 的时候,返回 PyODPS DataFrame 对象。 |
|
plot,hist,boxplot |
画图有关。 |
不涉及 |
在交互式环境下,PyODPS DataFrame
会在打印或者
repr
的时候,调用
execute
方法,您无需手动调用
execute
。
示例
# 非交互环境执行,需手动调用execute方法
print(iris[iris.sepallength < 5][:5].execute())
# 交互环境执行,自动调用execute方法
print(iris[iris.sepallength < 5][:5])
返回结果:
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
在交互环境中,如果您需要关闭自动调用执行,请进行手动设置,设置方式如下。
from odps import options
options.interactive = False
print(iris[iris.sepallength < 5][:5])
返回结果:
Collection: ref_0
odps.Table
name: hudi_mc_0612.`iris3`
schema:
sepallength : double # 片长度(cm)
sepalwidth : double # 片宽度(cm)
petallength : double # 瓣长度(cm)
petalwidth : double # 瓣宽度(cm)
name : string # 种类
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5
关闭自动调用执行后,打印
repr
对象,会显示整个抽象语法树。 如果需要执行,则必须手动调用
execute
方法。
读取执行结果
execute
或
head
函数输出的结果为
ResultFrame
类型,可从中读取结果。
ResultFrame 是结果集合,不能参与后续计算。
-
ResultFrame 可以迭代取出每条记录。 示例如下:
result = iris.head(3) for r in result: print(list(r))
返回结果:
[4.9, 3.0, 1.4, 0.2, 'Iris-setosa'] [4.7, 3.2, 1.3, 0.2, 'Iris-setosa'] [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']
-
ResultFrame 也支持在安装有 Pandas 的前提下转换为 Pandas DataFrame,或使用 Pandas 后端的 PyODPS DataFrame。
# 返回Pandas DataFrame。 pd_df = iris.head(3).to_pandas() # 返回使用Pandas后端的PyODPS DataFrame。 wrapped_df = iris.head(3).to_pandas(wrap=True)
保存执行结果为 MaxCompute 表
-
对于 Collection,您可以调用
persist
方法,用于返回一个新的 DataFrame 对象,参数为表名。iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris') print(iris2.head(5))
返回结果:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolor
-
persist
可以传入partitions
参数,用于创建一个分区表。它的分区是partitions
所指定的字段。iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name']) print(iris3.data)
返回结果:
odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double partitions: name : string
-
如果您需要写入已经存在的表的某个分区,
persist
可以传入partition
参数,指明写入表的哪个分区(例如ds=******
)。该 DataFrame 的每个字段的类型都必须相同,且都存在于该表中。drop_partition
和create_partition
参数只在此时有效,分别表示是否要删除(如果分区存在)或创建(如果分区不存在)该分区。print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))
返回结果:
sepallength sepalwidth petallength petalwidth name ds 0 4.5 2.3 1.3 0.3 Iris-setosa test 1 5.5 2.3 4.0 1.3 Iris-versicolor test 2 4.9 2.4 3.3 1.0 Iris-versicolor test 3 5.0 2.0 3.5 1.0 Iris-versicolor test 4 6.0 2.2 4.0 1.0 Iris-versicolor test
-
写入表时,您还可以指定表的生命周期。例如下列语句将表的生命周期指定为 10 天。
print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))
返回结果:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolor
-
如果数据源中没有 ODPS 对象,例如数据源仅为 Pandas,在
persist
时需要手动指定 ODPS 入口对象,或者将需要的入口对象标明为全局对象。# 假设入口对象为o。 # 指定入口对象。 df.persist('table_name', odps=o) # 或者可将入口对象标记为全局。 o.to_global() df.persist('table_name')
保存执行结果为 Pandas DataFrame
您可以使用
to_pandas
方法,如果
wrap
参数为
True,将返回
PyODPS DataFrame
对象。
-
示例 1:使用
to_pandas
方法,返回 Pandas DataFrame 对象。print(type(iris[iris.sepalwidth < 2.5].to_pandas()))
返回结果:
<class 'pandas.core.frame.DataFrame'>
-
示例 2:
wrap
参数为 True,返回 PyODPS DataFrame 对象。print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))
返回结果:
<class 'odps.df.core.DataFrame'>
PyODPS
可以执行
open_reader
方法,通过
reader.to_pandas()
转成
Pandas DataFrame。详情请参见
表
。
立即运行设置运行参数
对于立即执行的方法,例如
execute
、
persist
、
to_pandas
等,您可以通过以下方法设置它们运行时的参数(仅对
ODPS SQL
后端有效):
-
设置全局参数。详情请参见 SQL 。
-
在这些立即执行的方法上,使用
hints
参数,可以确保这些参数只作用于当前的计算过程。print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))
返回结果:
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 4.9 2.4 3.3 1.0 Iris-versicolor
运行时显示详细信息
-
如果您需要查看运行时 Instance 的 Logview,则应该修改全局配置。代码示例如下。
from odps import options options.verbose = True print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
返回结果:
Sql compiled: SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` FROM odps_test_sqltask_finance.`pyodps_iris` t1 WHERE t1.`sepallength` < 5 LIMIT 5 Instance ID: Log view:http://logview sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolor
-
您可以指定自己的日志记录函数。 代码示例如下。
my_logs = [] def my_logger(x): my_logs.append(x) options.verbose_log = my_logger print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()) print(my_logs)
返回结果:
sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolor ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', ' Log view:]
缓存中间 Collection 计算结果
DataFrame
的计算过程中,部分
Collection
被多处使用。如果您需要查看中间过程的执行结果, 可以使用
cache
标记某个需要被优先计算的
Collection。示例如下。
cache
会延迟执行,调用
cache
不会触发立即计算。
cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)
# 返回结果
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
# 由于cached已经被计算,所以能立刻取到计算结果。
print(cached.head(3))
#返回结果
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
异步和并行执行
异步执行
DataFrame
支持异步操作,对于立即执行的方法,包括
execute
、
persist
、
head
、
tail
、
to_pandas
(其他方法不支持), 传入
async
参数,即可以将这个操作异步执行,
timeout
参数指定超时时间, 异步返回的是
Future
对象。
future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())
# 返回结果
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
5 6.2 2.2 4.5 1.5 Iris-versicolor
6 5.5 2.4 3.8 1.1 Iris-versicolor
7 5.5 2.4 3.7 1.0 Iris-versicolor
8 6.3 2.3 4.4 1.3 Iris-versicolor
9 5.0 2.3 3.3 1.0 Iris-versicolor
并行执行
您可以使用新的
Delay API
,将立即执行的操作,包括
execute
、
persist
、
head
、
tail
、
to_pandas
(其他方法不支持),变成延迟操作,并返回
Future
对象。当触发
delay
执行时,会去寻找依赖,按照给定的并发度执行,并支持异步执行。
from odps.df import Delay
delay = Delay() # 创建Delay对象。
df = iris[iris.sepal_width < 5].cache() # 有一个共同的依赖。
future1 = df.sepal_width.sum().execute(delay=delay) # 立即返回future对象,此时并没有执行。
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3) # 并发度是3,此时才真正执行。
|==========================================| 1 / 1 (100.00%) 21s
print(future1.result())
# 返回结果
print(future2.result())
# 返回结果
2.272727272727273
上述示例中,共同依赖的对象会先执行,然后再以并发度为
3
分别执行
future1
到
future3
。
delay.execute
也接受
async
操作指定是否异步执行,当异步执行的时候,也可以用
timeout
参数指定超时时间。