Azure 数据资源管理器支持使用 更改源 Azure Cosmos DB for NoSql 引入数据 。 Cosmos DB 更改源数据连接是一个引入管道,用于侦听 Cosmos DB 更改源并将数据引入到数据资源管理器表中。 更改源侦听新文档和更新的文档,但不记录删除。 有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述

每个数据连接都侦听特定的 Cosmos DB 容器,并将数据引入到指定的表中(多个连接可以在一个表中引入数据)。 该引入方法支持流式引入(如果已启用)和批量引入。

本文介绍如何设置 Cosmos DB 更改源数据连接,以便使用系统托管标识将数据引入到 Azure 数据资源管理器中。 在开始之前,请查看 注意事项

使用以下步骤设置连接器:

步骤 1: 选择 Azure 数据资源管理器表并配置其表映射

步骤 2: 创建 Cosmos DB 数据连接

步骤 3: 测试数据连接

  • Azure 订阅。 创建 免费 Azure 帐户
  • Azure 数据资源管理器群集和数据库。 创建群集和数据库
  • Cosmos DB for NoSQL 帐户 中的一个容器。
  • 如果 Cosmos DB 帐户阻止网络访问(例如使用 专用终结点 ),则必须为 Cosmos DB 帐户 创建托管专用终结点 。 这是群集调用更改源 API 时所必需的。
  • 步骤 1:选择 Azure 数据资源管理器表并配置其表映射

    在创建数据连接之前,请创建一个表,你将在其中存储引入的数据并应用与源 Cosmos DB 容器中的架构匹配的映射。 如果你的方案需要的不仅仅是简单的字段映射,则可以使用 更新策略来转换和映射从更改源中引入的数据

    下面显示了 Cosmos DB 容器中某个项的示例架构:

    "id": "17313a67-362b-494f-b948-e2a8e95e237e", "name": "Cousteau", "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==", "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/", "_etag": "\"000037fc-0000-0700-0000-626a44110000\"", "_attachments": "attachments/", "_ts": 1651131409

    使用以下步骤创建表并应用表映射:

  • 在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择要在其中创建表的数据库。

  • 运行以下命令来创建一个名为 TestTable 的表。

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  • 运行以下命令来创建表映射。

    该命令将 Cosmos DB JSON 文档中的自定义属性映射到 TestTable 表中的列,如下所示:

    Cosmos DB 属性 {"column":"Id","path":"$.id"}, {"column":"Name","path":"$.name"}, {"column":"_ts","path":"$._ts"}, {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}

    使用更新策略转换和映射数据

    如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。

    更新策略是在数据引入到表时转换数据的一种方式。 它们以 Kusto 查询语言编写,在引入管道上运行。 它们可用于转换从 Cosmos DB 更改源中引入的数据,例如在以下场景中:

  • 你的文档包含数组,如果使用 mv-expand 运算符将它们转换为多行,则这些数组将更易于查询。
  • 你想要筛选出文档。 例如,可以使用 where 运算符按类型筛选出文档。
  • 你的复杂逻辑无法在表映射中表示。
  • 有关如何创建和管理更新策略的信息,请参阅更新策略概述

    步骤 2:创建 Cosmos DB 数据连接

    可使用以下方法创建数据连接器:

    Azure 门户 ARM 模板
  • (可选)在“高级设置”部分下执行以下操作:

  • 指定“事件检索开始日期”。 这是连接器开始引入数据的时间。 如果未指定时间,连接器将从你创建数据连接的时间开始引入数据。 建议的日期格式是 ISO 8601 UTC 标准,按以下方式指定:yyyy-MM-ddTHH:mm:ss.fffffffZ

  • 选择“用户分配”,然后选择标识。 默认情况下,连接使用系统分配的托管标识。 如有必要,可以使用用户分配的标识。

  • 授予数据连接权限以访问 Cosmos DB 帐户。 通过向 Cosmos DB 提供数据连接访问权限,它就可以从数据库中访问和检索数据。 你将需要群集的主体 ID,该 ID 可在 Azure 门户中找到。 有关详细信息,请参阅为群集配置托管标识

  • 以下步骤将这些角色分配给主体 ID:
    • Cosmos DB 内置数据读取者
    • 不能使用 Azure 门户的“角色分配”功能分配“Cosmos DB 内置数据读取者”角色。
    • Cosmos DB 帐户读者角色
    • 使用 Azure CLI 授予访问权限:运行以下 CLI 命令(请使用下表中的信息将占位符替换为适当的值):

      az cosmosdb sql role assignment create --account-name <CosmosDbAccountName> --resource-group <CosmosDbResourceGroup> --role-definition-id 00000000-0000-0000-0000-000000000001 --principal-id <ClusterPrincipalId> --scope "/"
      az role assignment create --role fbdf93bf-df7d-467e-a4d2-9458aa1360c8 --assignee <ClusterPrincipalId> --scope <CosmosDBAccountResourceId>
      
    • 使用 ARM 模板授予访问权限:在 Cosmos DB 帐户资源组中部署以下模板:

      "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", "contentVersion": "1.0.0.0", "parameters": { "clusterPrincipalId": { "type": "string", "metadata": { "description": "The principle ID of your cluster." } "cosmosDbAccount": { "type": "string", "metadata": { "description": "The name of your Cosmos DB account." } "cosmosDbAccountResourceId": { "type": "string", "metadata": { "description": "The resource ID of your Cosmos DB account." } "variables": { "cosmosDataReader": "00000000-0000-0000-0000-000000000001", "dataRoleDefinitionId": "[format('/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DocumentDB/databaseAccounts/{2}/sqlRoleDefinitions/{3}', subscription().subscriptionId, resourceGroup().name, parameters('cosmosDbAccount'), variables('cosmosDataReader'))]", "roleAssignmentId": "[guid(parameters('cosmosDbAccountResourceId'), parameters('clusterPrincipalId'))]", "rbacRoleDefinitionId": "[format('/subscriptions/{0}/providers/Microsoft.Authorization/roleDefinitions/{1}', subscription().subscriptionId, 'fbdf93bf-df7d-467e-a4d2-9458aa1360c8')]" "resources": [ "type": "Microsoft.DocumentDB/databaseAccounts/sqlRoleAssignments", "apiVersion": "2022-08-15", "name": "[concat(parameters('cosmosDbAccount'), '/', guid(parameters('clusterPrincipalId'), parameters('cosmosDbAccount')))]", "properties": { "principalId": "[parameters('clusterPrincipalId')]", "roleDefinitionId": "[variables('dataRoleDefinitionId')]", "scope": "[resourceId('Microsoft.DocumentDB/databaseAccounts', parameters('cosmosDbAccount'))]" "type": "Microsoft.Authorization/roleAssignments", "apiVersion": "2022-04-01", "name": "[variables('roleAssignmentId')]", "scope": "[format('Microsoft.DocumentDb/databaseAccounts/{0}', parameters('cosmosDbAccount'))]", "properties": { "description": "Giving RBAC reader on Cosmos DB", "principalId": "[parameters('clusterPrincipalId')]", "principalType": "ServicePrincipal", "roleDefinitionId": "[variables('rbacRoleDefinitionId')]"
    • 部署以下 ARM 模板以创建 Cosmos DB 数据连接。 将占位符替换为适当的值。

      "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", "contentVersion": "1.0.0.0", "parameters": { "kustoClusterName": { "type": "string", "metadata": { "description": "Kusto Cluster name" } "kustoDbName": { "type": "string", "metadata": { "description": "Kusto Database name" } "kustoConnectionName": { "type": "string", "metadata": { "description": "Kusto Database connection name" } "kustoLocation": { "type": "string", "metadata": { "description": "Location (Azure Region) of the Kusto cluster" } "kustoTable": { "type": "string", "metadata": { "description": "Kusto Table name where to ingest data" } "kustoMappingRuleName": { "type": "string", "defaultValue": "", "metadata": { "description": "Mapping name of the Kusto Table (if omitted, default mapping is applied)" } "managedIdentityResourceId": { "type": "string", "metadata": { "description": "ARM resource ID of the managed identity (cluster resource ID for system or user identity)" } "cosmosDbAccountResourceId": { "type": "string", "metadata": { "description": "ARM resource ID of Cosoms DB account" } "cosmosDbDatabase": { "type": "string", "metadata": { "description": "Cosmos DB Database name" } "cosmosDbContainer": { "type": "string", "metadata": { "description": "Cosmos DB container name" } "retrievalStartDate": { "type": "string", "defaultValue": "", "metadata": { "description": "Date-time at which to start the data retrieval; default: 'now' if not provided. Recommended format: yyyy-MM-ddTHH:mm:ss.fffffffZ" } "variables": { }, "resources": [{ "type": "Microsoft.Kusto/Clusters/Databases/DataConnections", "apiVersion": "2022-11-11", "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDbName'), '/', parameters('kustoConnectionName'))]", "location": "[parameters('kustoLocation')]", "kind": "CosmosDb", "properties": { "tableName": "[parameters('kustoTable')]", "mappingRuleName": "[parameters('kustoMappingRuleName')]", "managedIdentityResourceId": "[parameters('managedIdentityResourceId')]", "cosmosDbAccountResourceId": "[parameters('cosmosDbAccountResourceId')]", "cosmosDbDatabase": "[parameters('cosmosDbDatabase')]", "cosmosDbContainer": "[parameters('cosmosDbContainer')]", "retrievalStartDate": "[parameters('retrievalStartDate')]"

      Azure 数据资源管理器具有用于数据引入的聚合(批处理)策略,旨在优化引入过程。 默认批处理策略配置为在批满足以下条件之一时封装该批:最大延迟时间为 5 分钟、总大小为 1 GB 或 1000 个 blob。 因此,你可能会遇到延迟。 有关详细信息,请参阅批处理策略。 若要降低延迟,请将表配置为支持流式处理。 请参阅流式处理策略

      以下注意事项适用于 Cosmos DB 更改源:

    • 更改源不会公开“删除”事件。

      Cosmos DB 更改源仅包括新文档和更新的文档。 如果需要了解有关已删除的文档的信息,可以配置源以使用软标记将 Cosmos DB 文档标记为已删除。 这会添加一个属性来更新用于指示文档是否已被删除的事件。 然后,可以在查询中使用 where 运算符来筛选出它们。

      例如,如果将 deleted 属性映射到名为“IsDeleted”的表列,则可以使用以下查询筛选出已删除的文档:

      TestTable
      | where not(IsDeleted)
      
    • 更改源仅公开文档的最新更新。

      若要了解第二个注意事项的影响,请查看以下方案:

      Cosmos DB 容器包含文档 A 和 B。下表显示了对名为 foo 的属性的更改:

      文档 ID 属性 foo 文档时间戳 (_ts)

      将 API 结果与 Cosmos DB 文档中所做的一系列更改进行比较时,你会发现它们不匹配。 文档 A 的更新事件(已在更改表中的时间戳 40 所在一行突出显示)未显示在 API 调用结果中。

      为了了解未显示该事件的原因,我们将检查 API 调用在时间戳 35 和 55 之间对文档 A 所做的更改。 在这两次调用之间,文档 A 更改了两次,如下所示:

      文档 ID 属性 foo 文档时间戳 (_ts)

      在时间戳 55 时进行 API 调用的时候,更改源 API 返回文档的最新版本。 在本例中,最新版本的文档 A 是在时间戳 50 时的更新,即属性 foo 从“粉红色”到“胭脂红色”的更新。

      由于这种情况,数据连接器可能会错过一些中间文档更改。 例如,如果数据连接服务关闭几分钟,或者文档更改频率高于 API 轮询频率,则可能会错过某些事件。 但是,每个文档的最新状态均会被捕获。

      使用 Cosmos DB 数据连接对 Cosmos DB 容器的请求单位 (RU) 使用量影响有多大?

      连接器在容器的每个物理分区上调用 Cosmos DB 更改源 API,每秒最多调用一次。 以下成本与这些调用相关:

  •