Python分片上传

更新时间:

在上传大文件(超过5 GB)到OSS的过程中,如果出现网络中断、程序异常退出等问题导致文件上传失败,您需要使用分片上传的方式上传大文件。分片上传通过将待上传的大文件分成多个较小的碎片(Part),充分利用网络带宽和服务器资源并发上传多个Part,加快上传完成时间,并在Part上传完成之后调用 CompleteMultipartUpload 接口将这些Part组合成一个完整的Object。

注意事项

  • 本文以华东1(杭州)外网Endpoint为例。如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的Region与Endpoint的对应关系,请参见 访问域名和数据中心

  • 本文以从环境变量读取访问凭证为例。如何配置访问凭证,请参见 配置访问凭证

  • 本文以OSS域名新建OSSClient为例。如果您希望通过自定义域名、STS等方式新建OSSClient,请参见 初始化

  • 要分片上传,您必须有 oss:PutObject 权限。具体操作,请参见 为RAM用户授权自定义的权限策略

分片上传流程

分片上传(Multipart Upload)分为以下三个步骤:

  1. 初始化一个分片上传事件。

    调用bucket.init_multipart_upload方法返回OSS创建的全局唯一的uploadId。

  2. 上传分片。

    调用bucket.upload_part方法上传分片数据。

    说明
    • 对于同一个uploadId,分片号(partNumber)标识了该分片在整个文件内的相对位置。如果使用同一个分片号上传了新的数据,那么OSS上这个分片已有的数据将会被覆盖。

    • OSS将收到的分片数据的MD5值放在ETag头内返回给用户。

    • OSS计算上传数据的MD5值,并与SDK计算的MD5值比较,如果不一致则返回InvalidDigest错误码。

  3. 完成分片上传。

    所有分片上传完成后,调用bucket.complete_multipart_upload方法将所有分片合并成完整的Object。

分片上传完整示例

所有分片上传完成后,您可以通过以下两种方式将所有分片合并成完整的Object:

  • 通过Body传递分片信息的方式将所有分片合并成完整的Object

    # -*- coding: utf-8 -*-
    import os
    from oss2 import SizedFileAdapter, determine_part_size
    from oss2.models import PartInfo
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    # 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    # Endpoint以华东1(杭州)为例,其他Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
    key = 'exampledir/exampleobject.txt'
    # 填写本地文件的完整路径,例如D:\\localpath\\examplefile.txt。
    filename = 'D:\\localpath\\examplefile.txt'
    total_size = os.path.getsize(filename)
    # determine_part_size方法用于确定分片大小。
    part_size = determine_part_size(total_size, preferred_size=100 * 1024)
    # 初始化分片。
    # 如需在初始化分片时设置文件存储类型,请在init_multipart_upload中设置相关Headers,参考如下。
    # headers = dict()
    # 指定该Object的网页缓存行为。
    # headers['Cache-Control'] = 'no-cache'
    # 指定该Object被下载时的名称。
    # headers['Content-Disposition'] = 'oss_MultipartUpload.txt'
    # 指定该Object的内容编码格式。
    # headers['Content-Encoding'] = 'utf-8'
    # 指定过期时间,单位为毫秒。
    # headers['Expires'] = '1000'
    # 指定初始化分片上传时是否覆盖同名Object。此处设置为true,表示禁止覆盖同名Object。
    # headers['x-oss-forbid-overwrite'] = 'true'
    # 指定上传该Object的每个Part时使用的服务器端加密方式。
    # headers[OSS_SERVER_SIDE_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
    # 指定Object的加密算法。如果未指定此选项,表明Object使用AES256加密算法。
    # headers[OSS_SERVER_SIDE_DATA_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
    # 表示KMS托管的用户主密钥。
    # headers[OSS_SERVER_SIDE_ENCRYPTION_KEY_ID] = '9468da86-3509-4f8d-a61e-6eab1eac****'
    # 指定Object的存储类型。
    # headers['x-oss-storage-class'] = oss2.BUCKET_STORAGE_CLASS_STANDARD
    # 指定Object的对象标签,可同时设置多个标签。
    # headers[OSS_OBJECT_TAGGING] = 'k1=v1&k2=v2&k3=v3'
    # upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id
    upload_id = bucket.init_multipart_upload(key).upload_id
    # 根据upload_id执行取消分片上传事件或者列举已上传分片的操作。
    # 如果您需要根据您需要uploadId执行取消分片上传事件的操作,您需要在调用InitiateMultipartUpload完成初始化分片之后获取uploadId。
    # 如果您需要根据您需要uploadId执行列举已上传分片的操作,您需要在调用InitiateMultipartUpload完成初始化分片之后,且在调用CompleteMultipartUpload完成分片上传之前获取uploadId。
    # print("UploadID:", upload_id)
    parts = []
    # 逐个上传分片。
    with open(filename, 'rb') as fileobj:
        part_number = 1
        offset = 0
        while offset < total_size:
            num_to_upload = min(part_size, total_size - offset)
            # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
            result = bucket.upload_part(key, upload_id, part_number,
                                        SizedFileAdapter(fileobj, num_to_upload))
            parts.append(PartInfo(part_number, result.etag))
            offset += num_to_upload
            part_number += 1
    # 完成分片上传。
    # 如需在完成分片上传时设置相关Headers,请参考如下示例代码。
    headers = dict()
    # 设置文件访问权限ACL。此处设置为OBJECT_ACL_PRIVATE,表示私有权限。
    # headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE
    bucket.complete_multipart_upload(key, upload_id, parts, headers=headers)
    # bucket.complete_multipart_upload(key, upload_id, parts)
    重要

    网络情况较好时,建议增大分片大小。反之,减小分片大小。

  • 通过服务端List分片数据的方式合并成完整的Object

    说明

    如果您希望使用该方式合并成完整Object,请确保已通过以下示例代码中指定的upload_id上传了多个分片。

    # -*- coding: utf-8 -*-
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    # 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    # Endpoint以华东1(杭州)为例,其他Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
    key = 'exampledir/exampleobject.txt'
    # 填写本地文件的完整路径,例如D:\\localpath\\examplefile.txt。
    filename = 'D:\\localpath\\examplefile.txt'
    # 填写upload_id。您需要在调用InitiateMultipartUpload完成初始化分片之后,且在调用CompleteMultipartUpload完成分片上传之前获取upload_id。
    upload_id = '0004B9894A22E5B1888A1E29F823****'
    # 完成分片上传。
    # 如需在完成分片上传时设置文件访问权限ACL,请在complete_multipart_upload函数中设置相关headers,参考如下。
    headers = dict()
    # headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE
    # 如果指定了x-oss-complete-all:yes,则OSS会列举当前uploadId已上传的所有Part,然后按照PartNumber的序号排序并执行CompleteMultipartUpload操作。
    # 如果指定了x-oss-complete-all:yes,则不允许继续指定Body,否则报错。
    headers["x-oss-complete-all"] = 'yes'
    bucket.complete_multipart_upload(key, upload_id, None, headers=headers)

取消分片上传事件

您可以调用bucket.abort_multipart_upload方法来取消分片上传事件。当一个分片上传事件被取消后,无法再使用此uploadId做任何操作,已经上传的分片数据会被删除。

以下代码用于取消分片上传事件。

# -*- coding: utf-8 -*-
import os
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
# 填写Bucket名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
key = 'exampledir/exampleobject.txt'
# 填写upload_id。upload_id来源于调用InitiateMultipartUpload完成初始化分片之后的返回结果。
upload_id = 'yourUploadId'
# 取消指定upload_id的分片上传事件,已上传的分片会被删除。
bucket.abort_multipart_upload(key, upload_id)

列举已上传的分片信息

以下代码用于列举已上传的分片信息:

# -*- coding: utf-8 -*-
import os
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# Endpoint以杭州为例,其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
key = 'exampledir/exampleobject.txt'
# 填写upload_id。upload_id来源于调用InitiateMultipartUpload完成初始化分片之后,且在调用CompleteMultipartUpload完成分片上传之前的返回结果。
upload_id = 'yourUploadId'
# 列举指定upload_id对应的已上传分片信息。
for part_info in oss2.PartIterator(bucket, key, upload_id):
    print('part_number:', part_info.part_number)
    print('etag:', part_info.etag)
    print('size:', part_info.size)

列举分片上传事件

  • 列举指定Object的分片上传事件

    以下代码用于列举指定Object的分片上传事件:

    # -*- coding: utf-8 -*-
    import os
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    # 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    # Endpoint以杭州为例,其它Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
    key = 'exampledir/exampleobject.txt'
    # 列举Object的所有分片上传事件。对于同一个Object,每次调用init_multipart_upload均会返回不同的upload_id。
    # 一个upload_id对应一个分片上传事件。
    for upload_info in oss2.ObjectUploadIterator(bucket, key):
        print('key:', upload_info.key)
        print('upload_id:', upload_info.upload_id)
  • 列举存储空间下的所有分片事件

    以下代码用于列举存储空间下的所有分片上传事件:

    # -*- coding: utf-8 -*-
    import os
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    # 阿里云账号AccessKey拥有所有API的访问权限,风险很高阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    # Endpoint以杭州为例,其它Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    # 列举存储空间下的所有分片上传事件。
    for upload_info in oss2.MultipartUploadIterator(bucket):
        print('key:', upload_info.key)
        print('upload_id:', upload_info.upload_id)
  • 列举存储空间下指定前缀Object的分片上传事件

    以下代码用于列举存储空间下指定前缀Object的分片上传事件:

    # -*- coding: utf-8 -*-
    import os
    import oss2
    from oss2.credentials import EnvironmentVariableCredentialsProvider
    # 阿里云账号AccessKey拥有所有API的访问权限,风险很高阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
    # Endpoint以杭州为例,其它Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    # 列举存储空间下以'test'为前缀的Object的分片上传事件。
    for upload_info in oss2.MultipartUploadIterator(bucket, prefix='test'):
        print('key:', upload_info.key)
        print('upload_id:', upload_info.upload_id)

常见问题

如何删除碎片?

您可以通过以下两种方式删除碎片:

  • 自动删除

    通过生命周期规则在指定时间内自动删除碎片。具体步骤,请参见 清理过期碎片

  • 手动删除

    调用 AbortMultipartUpload 接口取消MultipartUpload事件并删除对应的碎片。更多信息,请参见 AbortMultipartUpload

相关文档

  • 分片上传的完整实现涉及三个API接口,详情如下:

  • 关于取消分片上传事件的API接口说明,请参见 AbortMultipartUpload

  • 关于列举已上传分片的API接口说明,请参见 ListParts

  • 关于列举所有执行中的分片上传事件(即已完成初始化但未完成(Complete)或者未中止(Abort)的分片上传事件)的API接口说明,请参见 ListMultipartUploads