PyODPS支持对MaxCompute表的基本操作,包括创建表、创建表的Schema、同步表更新、获取表数据、删除表、表分区操作以及如何将表转换为DataFrame对象。

背景信息

PyODPS提供对MaxCompute表的基本操作方法。

操作

说明

基本操作

列出项目空间下的所有表、判断表是否存在、获取表等基本操作。

创建表的Schema

使用PyODPS创建表的Schema。

创建表

使用PyODPS创建表。

同步表更新

使用PyODPS同步表更新。

写入表数据

使用PyODPS向表中写入数据。

向表中插入一行记录

使用PyODPS向表中插入一行记录。

获取表数据

使用PyODPS获取表中数据。

删除表

使用PyODPS删除表。

转换表为DataFrame

使用PyODPS转换表为DataFrame。

表分区

使用PyODPS判断是否为分区表、遍历表全部分区、判断分区是否存在、创建分区等。

数据上传下载通道

使用PyODPS操作Tunnel向MaxCompute中上传或者下载数据。

说明

更多PyODPS方法说明,请参见 Python SDK方法说明

前提条件:准备运行环境

PyODPS支持在DataWorks的PyODPS节点或本地PC环境中运行,运行前您需先选择运行工具并准备好运行环境。
  • 使用DataWorks:创建好PyODPS 2节点或PyODPS 3节点,详情请参见 通过DataWorks使用PyODPS
  • 使用本地PC环境:安装好PyODPS并初始化ODPS入口对象。

基本操作

当前项目内的表操作

  • 列出项目空间下的所有表:

    o.list_tables() 方法可以列出项目空间下的所有表。

    # 列出项目空间下的所有表。
    for table in o.list_tables():
        print(table)
  • 判断表是否存在:

    o.exist_table() 方法可以判断表是否存在。

    print(o.exist_table('pyodps_iris'))
    # 返回True表示表pyodps_iris存在。
  • 获取表:

    入口对象的 o.get_table() 方法可以获取表。

    • 获取表的schema信息。

      t = o.get_table('pyodps_iris')
      print(t.schema)  # 获取表pyodps_iris的schema

      返回值示例如下。

      odps.Schema {
        sepallength           double      # 片长度(cm)
        sepalwidth            double      # 片宽度(cm)
        petallength           double      # 瓣长度(cm)
        petalwidth            double      # 瓣宽度(cm)
        name                  string      # 种类
      }
    • 获取表列信息。

      t = o.get_table('pyodps_iris')
      print(t.schema.columns)  # 获取表pyodps_iris的schema中的列信息

      返回值示例如下。

      [<column sepallength, type double>,
       <column sepalwidth, type double>,
       <column petallength, type double>,
       <column petalwidth, type double>,
       <column name, type string>]
    • 获取表的某个列信息。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'])  # 获取表pyodps_iris的sepallength列信息

      返回值示例如下。

      <column sepallength, type double>
    • 获取表的某个列的备注信息。

      t = o.get_table('pyodps_iris')
      print(t.schema['sepallength'].comment)  # 获取表pyodps_iris的sepallength列的备注信息

      返回示例如下。

      片长度(cm)
    • 获取表的生命周期。

      t = o.get_table('pyodps_iris')
      print(t.lifecycle)  # 获取表pyodps_iris的生命周期

      返回值示例如下。

      -1
    • 获取表的创建时间。

      t = o.get_table('pyodps_iris')
      print(t.creation_time)  # 获取表pyodps_iris的创建时间
    • 获取表是否是虚拟视图。

      t = o.get_table('pyodps_iris')
      print(t.is_virtual_view)  # 获取表pyodps_iris是否是虚拟视图,返回False,表示不是。

    与上述示例类似,您也可以通过 t.size t.comment 来获取表的大小、表备注等信息。

    跨项目的表操作

    您可以通过 project 参数,跨项目获取表。

    t = o.get_table('table_name', project='other_project')

    其中 other_project 为所跨的项目, table_name 为跨项目获取的表名称。

创建表的Schema

初始化方法有如下两种:

  • 通过表的列以及可选的分区进行初始化。

    from odps.models import Schema, Column, Partition
    columns = [
        Column(name='num', type='bigint', comment='the column'),
        Column(name='num2', type='double', comment='the column2'),
    partitions = [Partition(name='pt', type='string', comment='the partition')]
    schema = Schema(columns=columns, partitions=partitions)

    初始化后,您可获取字段信息、分区信息等。

    • 获取所有字段信息。

      print(schema.columns)

      返回示例如下。

      [<column num, type bigint>,
       <column num2, type double>,
       <partition pt, type string>]
    • 获取分区字段。

      print(schema.partitions)

      返回示例如下。

      [<partition pt, type string>]
    • 获取非分区字段名称。

      print(schema.names)

      返回示例如下。

      ['num', 'num2']
    • 获取非分区字段类型。

      print(schema.types)

      返回示例如下。

      [bigint, double]
  • 使用 Schema.from_lists() 方法。该方法更容易调用,但无法直接设置列和分区的注释。

    from odps.models import Schema
    schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
    print(schema.columns)

    返回值示例如下。

    [<column num, type bigint>,
     <column num2, type double>,
     <partition pt, type string>]

创建表

您可以使用 o.create_table() 方法创建表,使用方式有两种:使用表Schema方式、使用字段名和字段类型方式。同时创建表时表字段的数据类型有一定的限制条件,详情如下。

使用表Schema创建表

使用表Schema创建表时,您需要先创建表的Schema,然后通过Schema创建表。

#创建表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通过schema创建表
table = o.create_table('my_new_table', schema)
#只有不存在表时,才创建表。
table = o.create_table('my_new_table', schema, if_not_exists=True)
#设置生命周期。
table = o.create_table('my_new_table', schema, lifecycle=7)

表创建完成后,您可以通过 print(o.exist_table('my_new_table')) 验证表是否创建成功,返回 True 表示表创建成功。

使用字段名及字段类型创建表

#创建分区表my_new_table,可传入(表字段列表,分区字段列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
#创建非分区表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)

表创建完成后,您可以通过 print(o.exist_table('my_new_table')) 验证表是否创建成功,返回 True 表示表创建成功。

使用字段名及字段类型创建表:新数据类型

未打开新数据类型开关时(默认关闭),创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果您需要创建TINYINT和STRUCT等新数据类型字段的表,可以打开 options.sql.use_odps2_extension = True 开关,示例如下。

from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')

同步表更新

当一个表被其他程序更新,例如改变了Schema,可以调用 reload() 方法同步表的更新。

#表schema变更
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通过reload()同步表更新
table = o.create_table('my_new_table', schema)
table.reload()

写入表数据

  • 使用入口对象的 write_table() 方法写入数据。

    重要

    对于分区表,如果分区不存在,可以使用 create_partition 参数指定创建分区。

    records = [[111, 1.0],                 # 此处可以是list。
              [222, 2.0],
              [333, 3.0],
              [444, 4.0]]
    o.write_table('my_new_table', records, partition='pt=test', create_partition=True)  #创建pt=test分区并写入数据
    说明
    • 每次调用 write_table() 方法,MaxCompute都会在服务端生成一个文件。该操作耗时较长,同时文件过多会降低后续的查询效率。因此,建议您在使用此方法时,一次性写入多组数据,或者传入一个生成器对象。

    • 调用 write_table() 方法向表中写入数据时会追加到原有数据中。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,请手动清除原有数据。对于非分区表,需要调用 table.truncate() 方法;对于分区表,需要删除分区后再建立新的分区。

  • 对表对象调用 open_writer() 方法写入数据。

    t = o.get_table('my_new_table')
    with t.open_writer(partition='pt=test02', create_partition=True) as writer:  #创建pt=test02分区并写入数据
        records = [[1, 1.0],                 # 此处可以是List。
                  [2, 2.0],
                  [3, 3.0],
                  [4, 4.0]]
        writer.write(records)  # 这里Records可以是可迭代对象。

    如果是多级分区表,写入示例如下。

    t = o.get_table('test_table')
    with t.open_writer(partition='pt1=test1,pt2=test2') as writer:  # 多级分区写法。
        records = [t.new_record([111, 'aaa', True]),   # 也可以是Record对象。
                   t.new_record([222, 'bbb', False]),
                   t.new_record([333, 'ccc', True]),
                   t.new_record([444, '中文', False])]
        writer.write(records)
  • 使用多进程并行写数据。

    每个进程写数据时共享同一个Session_ID,但是有不同的Block_ID。每个Block对应服务端的一个文件。主进程执行Commit,完成数据上传。

    import random
    from multiprocessing import Pool
    from odps.tunnel import TableTunnel
    def write_records(tunnel, table, session_id, block_id):
        # 对使用指定的ID创建Session。
        local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
        # 创建Writer时指定Block_ID。
        with local_session.open_record_writer(block_id) as writer:
            for i in range(5):
                # 生成数据并写入对应Block。
                record = table.new_record([random.randint(1, 100), random.random()])
                writer.write(record)
    if __name__ == '__main__':
        N_WORKERS = 3
        table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
        tunnel = TableTunnel(o)
        upload_session = tunnel.create_upload_session(table.name)
        # 每个进程使用同一个Session_ID。
        session_id = upload_session.id
        pool = Pool(processes=N_WORKERS)
        futures = []
        block_ids = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
            block_ids.append(i)
        [f.get() for f in futures]
        # 最后执行Commit,并指定所有Block。
        upload_session.commit(block_ids)

向表中插入一行记录

Record表示表的一行记录,对表对象调用 new_record() 方法即可创建一个新的Record。

t = o.get_table('test_table')
r = t.new_record(['val0', 'val1'])  # 值的个数必须等于表Schema的字段数。
r2 = t.new_record()     # 可以不传入值。
r2[0] = 'val0' # 通过偏移设置值。
r2['field1'] = 'val1'  # 通过字段名设置值。
r2.field1 = 'val1'  # 通过属性设置值。
print(record[0])  # 取第0个位置的值。
print(record['c_double_a'])  # 通过字段取值。
print(record.c_double_a)  # 通过属性取值。
print(record[0: 3])  # 切片操作。
print(record[0, 2, 3])  # 取多个位置的值。
print(record['c_int_a', 'c_double_a'])  # 通过多个字段取值。

获取表数据

获取表数据的方法有多种,常用方法如下:

  • 使用入口对象的 read_table() 方法。

    # 处理一条记录。
    for record in o.read_table('my_new_table', partition='pt=test'):
        print(record)
  • 如果您仅需要查看每个表最开始的小于1万条数据,可以对表对象调用 head() 方法。

    t = o.get_table('my_new_table')
    # 处理每个Record对象。
    for record in t.head(3):
        print(record)
  • 调用 open_reader() 方法读取数据。

    • 使用 with 表达式的写法如下。

      t = o.get_table('my_new_table')
      with t.open_reader(partition='pt=test') as reader:
      count = reader.count
      for record in reader[5:10]  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
          print(record)  # 处理一条记录,例如打印记录本身
    • 不使用 with 表达式的写法如下。

      reader = t.open_reader(partition='pt=test')
      count = reader.count
      for record in reader[5:10]  # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
          print(record)  # 处理一条记录,例如打印记录本身

删除表

使用 delete_table() 方法删除已经存在的表。

o.delete_table('my_table_name', if_exists=True)  # 只有表存在时,才删除表。
t.drop()  # Table对象存在时,直接调用Drop方法删除。

转换表为DataFrame

PyODPS提供了 DataFrame框架 ,支持以更方便的方式查询和操作MaxCompute数据。使用 to_df() 方法,即可转化为DataFrame对象。

table = o.get_table('my_table_name')
df = table.to_df()

表分区

  • 判断是否为分区表。

    table = o.get_table('my_new_table')
    if table.schema.partitions:
        print('Table %s is partitioned.' % table.name)
  • 遍历表全部分区。

    table = o.get_table('my_new_table')
    for partition in table.partitions:  # 遍历所有分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='pt=test'):  # 遍历 pt=test 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    for partition in table.iterate_partitions(spec='dt>20230119'):  # 遍历 dt>20230119 分区下的二级分区
        print(partition.name)  # 具体的遍历步骤,这里是打印分区名
    重要

    PyODPS自0.11.3版本开始,支持为 iterate_partitions 指定逻辑表达式,如上述示例中的 dt>20230119

  • 判断分区是否存在。

    table = o.get_table('my_new_table')
    table.exist_partition('pt=test,sub=2015')
  • 获取分区。

    table = o.get_table('my_new_table')
    partition = table.get_partition('pt=test')
    print(partition.creation_time)
    partition.size
  • 创建分区。

    t = o.get_table('my_new_table')
    t.create_partition('pt=test', if_not_exists=True)  # 指定if_not_exists参数,分区不存在时才创建分区。
  • 删除分区。

    t = o.get_table('my_new_table')
    t.delete_partition('pt=test', if_exists=True)  # 指定if_exists参数,分区存在时才删除分区。
    partition.drop()  # 分区对象存在时,直接对分区对象调用Drop方法删除。

数据上传下载通道

Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。

  • 上传数据示例

    from odps.tunnel import TableTunnel
    table = o.get_table('my_table')
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    # 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
    upload_session.commit([0])
  • 下载数据示例

    from odps.tunnel import TableTunnel
    tunnel = TableTunnel(odps)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # 处理每条记录。
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            print(record)  # 具体的遍历步骤,这里是打印记录对象
说明
  • PyODPS不支持上传外部表,例如OSS和OTS的表。

  • 不推荐直接使用Tunnel接口,推荐您直接使用表对象的写和读接口。

  • 如果您安装了CPython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。