问题描述

多设备并发往 Azure Storage Blob 的 Container 存数据是否可以?

问题解答

可以! Azure Storage 是支持的并发存储数据的,Blob 可以使用乐观并发或悲观并发模型的,具体实现可以参考文档:​ ​https://docs.microsoft.com/zh-cn/azure/storage/blobs/concurrency-manage?tabs=dotnet#pessimistic-concurrency-for-blobs ​

当前,非常多的新应用程序通常允许多名用户同时查看和更新数据。尤其是在多名用户可以更新相同数据的情况下。

开发人员通常考虑下面三个主要数据并发策略:

  • 乐观并发 :执行更新的应用程序将在其更新过程中确定数据是否自该应用程序上次读取此数据以来已发生更改。 例如,如果两名查看 wiki 页面的用户对该页面进行更新, 则 wiki 平台必须确保第二次更新不会覆盖第一次更新 。 此外还必须确保两名用户都了解其更新是否成功。 此策略最常用于 Web 应用程序中。
  • 悲观并发 :要执行更新的应用程序会 对对象上锁,以防其他用户在该锁释放前更新数据。 例如,在进行主/辅数据复制且只有主对象执行更新的情况下,该对象通常会长时间以独占的形式锁定数据,以确保其他任何对象都不能更新该数据。
  • 以最后写入者为准 :一种方法,它允许更新操作继续进行,而不需要首先确定其他应用程序是否自数据被读取以来已更新该数据。 当数据分区时,通常会使用此方法,这样就不可能有多个用户同时访问相同的数据。 该策略可能还适用于正在处理短期数据流的情况。

Azure 存储支持所有三个策略,但是它在为乐观和悲观并发提供全面支持的能力方面与众不同。

Azure 存储旨在采用强大的一致性模型,确保在服务执行插入或更新操作后,后续读取操作会返回最新更新。

乐观并发和悲观并发实现的关键是在 UploadAsync 方法中传递的 BlobUploadOptions 对象。比如,乐观并发中,通过判断 IfMatch 中记录每一次文件修改后,返回的ETag值,来验证当前的修改是否是在最新文件的基础上进行修改,避免造成对其他修改者所修改的内容进行覆盖。

// Set the If-Match condition to the original ETag.
BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
{
Conditions = new BlobRequestConditions()
{
IfMatch = originalETag
}
};

悲观并发中,则是通过使用 LeaseId (租约ID)来控制当前的操作不允许其他用户操作!

BlobLeaseClient blobLeaseClient = blobClient.GetBlobLeaseClient();

// Acquire a lease on the blob.
BlobLease blobLease = await blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(15));

// Set the request condition to include the lease ID.
BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
{
Conditions = new BlobRequestConditions()
{
LeaseId = blobLease.LeaseId
}
};

乐观并发代码示例:

Azure 存储会为每个已存储的对象分配一个标识符。 只要对对象执行写入操作,就会更新此标识符。

该标识符作为 HTTP GET 响应的一部分在 ETag 标头(通过 HTTP 协议定义)中返回到客户端。

执行更新的客户端可以将原始 ETag 连同条件标头一起发送,以确保只有在满足特定条件的情况下才会进行更新。

例如,如果指定了 If-Match 标头,Azure 存储会验证更新请求中指定的 ETag 的值与所更新对象的 ETag 的值是否相同。

下面的代码示例演示如何在用于检查 blob 的 ETag 值的写入请求中构造 If-Match 条件。

Azure 存储会评估 blob 的当前 ETag 是否与请求中提供的 ETag 相同,只有在两个 ETag 值匹配时才执行写入操作。

如果其他进程已在此期间更新该 blob,则 Azure 存储会返回 HTTP 412(“不满足前提条件”)状态消息。

private static async Task DemonstrateOptimisticConcurrencyBlob(BlobClient blobClient)
{
Console.WriteLine("Demonstrate optimistic concurrency");

BlobContainerClient containerClient = blobClient.GetParentBlobContainerClient();

try
{
// Create the container if it does not exist.
await containerClient.CreateIfNotExistsAsync();

// Upload text to a new block blob.
string blobContents1 = "First update. Overwrite blob if it exists.";
byte[] byteArray = Encoding.ASCII.GetBytes(blobContents1);

ETag originalETag;

using (MemoryStream stream = new MemoryStream(byteArray))
{
// 从 Azure 存储检索 blob。 响应包括用于标识对象的当前版本的 HTTP ETag 标头值。
BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true);
originalETag = blobContentInfo.ETag;
Console.WriteLine("Blob added. Original ETag = {0}", originalETag);
}

// This code simulates an update by another client.
// No ETag was provided, so original blob is overwritten and ETag updated.
string blobContents2 = "Second update overwrites first update.";
byteArray = Encoding.ASCII.GetBytes(blobContents2);

using (MemoryStream stream = new MemoryStream(byteArray))
{
BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true);
Console.WriteLine("Blob updated. Updated ETag = {0}", blobContentInfo.ETag);
}

// Now try to update the blob using the original ETag value.
string blobContents3 = "Third update. If-Match condition set to original ETag.";
byteArray = Encoding.ASCII.GetBytes(blobContents3);

// Set the If-Match condition to the original ETag.

// 在更新 blob 时,应将在步骤 1 中获得的 ETag 值包括在写入请求的 If-Match 条件标头中。 Azure 存储会将请求中的 ETag 值与 blob 当前的 ETag 值进行比较。
BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
{
Conditions = new BlobRequestConditions()
{
IfMatch = originalETag
}
};

using (MemoryStream stream = new MemoryStream(byteArray))
{
// This call should fail with error code 412 (Precondition Failed).
//如果 blob 当前的 ETag 值不同于请求中提供的 If-Match 条件标头中指定的 ETag 值,则 Azure 存储会返回 HTTP 状态代码412(“不满足前提条件”)。 此错误向客户端表明,另一进程在客户端首先检索 blob 后已更新该 blob。
//如果 blob 的当前 ETag 值与请求的 If-Match 条件标头中的 ETag 的版本相同,则 Azure 存储会执行请求的操作,并更新该 blob 的当前 ETag 值。
BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, blobUploadOptions);
}
}
catch (RequestFailedException e)
{
if (e.Status == (int)HttpStatusCode.PreconditionFailed)
{
Console.WriteLine(
@"Precondition failure as expected. Blob's ETag does not match ETag provided.");
}
else
{
Console.WriteLine(e.Message);
throw;
}
}
}

悲观并发代码示例:

若要锁定 Blob 以供独占使用,您可以对该 Blob 获得租约。

获取租约时,可以指定租期。 有限期租约的有效期可为 15 到 60 秒。

租约也可以是无限期的,这相当于一个排他锁。

可续订有限期租约来延长租约,也可在租约完成后将其释放。

Azure 存储在有限期租约到期时会自动释放这些租约。

下面的代码示例演示了如何获取 blob 的独占租约,通过提供租约 ID 来更新 blob 的内容,然后释放租约。

如果租约有效,但写入请求中未提供租约 ID,则写入操作会失败,并出现错误代码 412(“不满足前提条件”)。

public static async Task DemonstratePessimisticConcurrencyBlob(BlobClient blobClient)
{
Console.WriteLine("Demonstrate pessimistic concurrency");

BlobContainerClient containerClient = blobClient.GetParentBlobContainerClient();
BlobLeaseClient blobLeaseClient = blobClient.GetBlobLeaseClient();

try
{
// Create the container if it does not exist.
await containerClient.CreateIfNotExistsAsync();

// Upload text to a blob.
string blobContents1 = "First update. Overwrite blob if it exists.";
byte[] byteArray = Encoding.ASCII.GetBytes(blobContents1);
using (MemoryStream stream = new MemoryStream(byteArray))
{
BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, overwrite: true);
}

// Acquire a lease on the blob.
BlobLease blobLease = await blobLeaseClient.AcquireAsync(TimeSpan.FromSeconds(15));
Console.WriteLine("Blob lease acquired. LeaseId = {0}", blobLease.LeaseId);

// Set the request condition to include the lease ID.
BlobUploadOptions blobUploadOptions = new BlobUploadOptions()
{
Conditions = new BlobRequestConditions()
{
LeaseId = blobLease.LeaseId
}
};

// Write to the blob again, providing the lease ID on the request.
// The lease ID was provided, so this call should succeed.
string blobContents2 = "Second update. Lease ID provided on request.";
byteArray = Encoding.ASCII.GetBytes(blobContents2);

using (MemoryStream stream = new MemoryStream(byteArray))
{
BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream, blobUploadOptions);
}

// This code simulates an update by another client.
// The lease ID is not provided, so this call fails.
string blobContents3 = "Third update. No lease ID provided.";
byteArray = Encoding.ASCII.GetBytes(blobContents3);

using (MemoryStream stream = new MemoryStream(byteArray))
{
// This call should fail with error code 412 (Precondition Failed).
BlobContentInfo blobContentInfo = await blobClient.UploadAsync(stream);
}
}
catch (RequestFailedException e)
{
if (e.Status == (int)HttpStatusCode.PreconditionFailed)
{
Console.WriteLine(
@"Precondition failure as expected. The lease ID was not provided.");
}
else
{
Console.WriteLine(e.Message);
throw;
}
}
finally
{
await blobLeaseClient.ReleaseAsync();
}
}

参考资料

在 Blob 存储中管理并发 : ​ ​https://learn.microsoft.com/zh-cn/azure/storage/blobs/concurrency-manage?tabs=dotnet#pessimistic-concurrency-for-blobs​

当在复杂的环境中面临问题,格物之道需:浊而静之徐清,安以动之徐生。 云中,恰是如此!