每个数据连接都侦听特定的 Cosmos DB 容器,并将数据引入到指定的表中(多个连接可以在一个表中引入数据)。 该引入方法支持流式引入(如果已启用)和批量引入。
在创建数据连接之前,请创建一个表,你将在其中存储引入的数据并应用与源 Cosmos DB 容器中的架构匹配的映射。 如果你的方案需要的不仅仅是简单的字段映射,则可以使用
更新策略来转换和映射从更改源中引入的数据
。
"id": "17313a67-362b-494f-b948-e2a8e95e237e",
"name": "Cousteau",
"_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
"_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
"_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
"_attachments": "attachments/",
"_ts": 1651131409
运行以下命令来创建一个名为 TestTable 的表。
.create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
运行以下命令来创建表映射。
该命令将 Cosmos DB JSON 文档中的自定义属性映射到 TestTable 表中的列,如下所示:
Cosmos DB 属性
{"column":"Id","path":"$.id"},
{"column":"Name","path":"$.name"},
{"column":"_ts","path":"$._ts"},
{"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。
更新策略是在数据引入到表时转换数据的一种方式。 它们以 Kusto 查询语言编写,在引入管道上运行。 它们可用于转换从 Cosmos DB 更改源中引入的数据,例如在以下场景中:
你的文档包含数组,如果使用 mv-expand
运算符将它们转换为多行,则这些数组将更易于查询。
你想要筛选出文档。 例如,可以使用 where
运算符按类型筛选出文档。
你的复杂逻辑无法在表映射中表示。
有关如何创建和管理更新策略的信息,请参阅更新策略概述。
步骤 2:创建 Cosmos DB 数据连接
可使用以下方法创建数据连接器:
Azure 门户
ARM 模板
(可选)在“高级设置”部分下执行以下操作:
指定“事件检索开始日期”。 这是连接器开始引入数据的时间。 如果未指定时间,连接器将从你创建数据连接的时间开始引入数据。 建议的日期格式是 ISO 8601 UTC 标准,按以下方式指定:yyyy-MM-ddTHH:mm:ss.fffffffZ
。
选择“用户分配”,然后选择标识。 默认情况下,连接使用系统分配的托管标识。 如有必要,可以使用用户分配的标识。
授予数据连接权限以访问 Cosmos DB 帐户。 通过向 Cosmos DB 提供数据连接访问权限,它就可以从数据库中访问和检索数据。 你将需要群集的主体 ID,该 ID 可在 Azure 门户中找到。 有关详细信息,请参阅为群集配置托管标识。
以下步骤将这些角色分配给主体 ID:
- Cosmos DB 内置数据读取者
- 不能使用 Azure 门户的“角色分配”功能分配“Cosmos DB 内置数据读取者”角色。
- Cosmos DB 帐户读者角色
使用 Azure CLI 授予访问权限:运行以下 CLI 命令(请使用下表中的信息将占位符替换为适当的值):
az cosmosdb sql role assignment create --account-name <CosmosDbAccountName> --resource-group <CosmosDbResourceGroup> --role-definition-id 00000000-0000-0000-0000-000000000001 --principal-id <ClusterPrincipalId> --scope "/"
az role assignment create --role fbdf93bf-df7d-467e-a4d2-9458aa1360c8 --assignee <ClusterPrincipalId> --scope <CosmosDBAccountResourceId>
使用 ARM 模板授予访问权限:在 Cosmos DB 帐户资源组中部署以下模板:
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"clusterPrincipalId": {
"type": "string",
"metadata": { "description": "The principle ID of your cluster." }
"cosmosDbAccount": {
"type": "string",
"metadata": { "description": "The name of your Cosmos DB account." }
"cosmosDbAccountResourceId": {
"type": "string",
"metadata": { "description": "The resource ID of your Cosmos DB account." }
"variables": {
"cosmosDataReader": "00000000-0000-0000-0000-000000000001",
"dataRoleDefinitionId": "[format('/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DocumentDB/databaseAccounts/{2}/sqlRoleDefinitions/{3}', subscription().subscriptionId, resourceGroup().name, parameters('cosmosDbAccount'), variables('cosmosDataReader'))]",
"roleAssignmentId": "[guid(parameters('cosmosDbAccountResourceId'), parameters('clusterPrincipalId'))]",
"rbacRoleDefinitionId": "[format('/subscriptions/{0}/providers/Microsoft.Authorization/roleDefinitions/{1}', subscription().subscriptionId, 'fbdf93bf-df7d-467e-a4d2-9458aa1360c8')]"
"resources": [
"type": "Microsoft.DocumentDB/databaseAccounts/sqlRoleAssignments",
"apiVersion": "2022-08-15",
"name": "[concat(parameters('cosmosDbAccount'), '/', guid(parameters('clusterPrincipalId'), parameters('cosmosDbAccount')))]",
"properties": {
"principalId": "[parameters('clusterPrincipalId')]",
"roleDefinitionId": "[variables('dataRoleDefinitionId')]",
"scope": "[resourceId('Microsoft.DocumentDB/databaseAccounts', parameters('cosmosDbAccount'))]"
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2022-04-01",
"name": "[variables('roleAssignmentId')]",
"scope": "[format('Microsoft.DocumentDb/databaseAccounts/{0}', parameters('cosmosDbAccount'))]",
"properties": {
"description": "Giving RBAC reader on Cosmos DB",
"principalId": "[parameters('clusterPrincipalId')]",
"principalType": "ServicePrincipal",
"roleDefinitionId": "[variables('rbacRoleDefinitionId')]"
部署以下 ARM 模板以创建 Cosmos DB 数据连接。 将占位符替换为适当的值。
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"kustoClusterName": {
"type": "string",
"metadata": { "description": "Kusto Cluster name" }
"kustoDbName": {
"type": "string",
"metadata": { "description": "Kusto Database name" }
"kustoConnectionName": {
"type": "string",
"metadata": { "description": "Kusto Database connection name" }
"kustoLocation": {
"type": "string",
"metadata": { "description": "Location (Azure Region) of the Kusto cluster" }
"kustoTable": {
"type": "string",
"metadata": { "description": "Kusto Table name where to ingest data" }
"kustoMappingRuleName": {
"type": "string",
"defaultValue": "",
"metadata": { "description": "Mapping name of the Kusto Table (if omitted, default mapping is applied)" }
"managedIdentityResourceId": {
"type": "string",
"metadata": { "description": "ARM resource ID of the managed identity (cluster resource ID for system or user identity)" }
"cosmosDbAccountResourceId": {
"type": "string",
"metadata": { "description": "ARM resource ID of Cosoms DB account" }
"cosmosDbDatabase": {
"type": "string",
"metadata": { "description": "Cosmos DB Database name" }
"cosmosDbContainer": {
"type": "string",
"metadata": { "description": "Cosmos DB container name" }
"retrievalStartDate": {
"type": "string",
"defaultValue": "",
"metadata": { "description": "Date-time at which to start the data retrieval; default: 'now' if not provided. Recommended format: yyyy-MM-ddTHH:mm:ss.fffffffZ" }
"variables": { },
"resources": [{
"type": "Microsoft.Kusto/Clusters/Databases/DataConnections",
"apiVersion": "2022-11-11",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDbName'), '/', parameters('kustoConnectionName'))]",
"location": "[parameters('kustoLocation')]",
"kind": "CosmosDb",
"properties": {
"tableName": "[parameters('kustoTable')]",
"mappingRuleName": "[parameters('kustoMappingRuleName')]",
"managedIdentityResourceId": "[parameters('managedIdentityResourceId')]",
"cosmosDbAccountResourceId": "[parameters('cosmosDbAccountResourceId')]",
"cosmosDbDatabase": "[parameters('cosmosDbDatabase')]",
"cosmosDbContainer": "[parameters('cosmosDbContainer')]",
"retrievalStartDate": "[parameters('retrievalStartDate')]"
Azure 数据资源管理器具有用于数据引入的聚合(批处理)策略,旨在优化引入过程。 默认批处理策略配置为在批满足以下条件之一时封装该批:最大延迟时间为 5 分钟、总大小为 1 GB 或 1000 个 blob。 因此,你可能会遇到延迟。 有关详细信息,请参阅批处理策略。 若要降低延迟,请将表配置为支持流式处理。 请参阅流式处理策略。
以下注意事项适用于 Cosmos DB 更改源:
更改源不会公开“删除”事件。
Cosmos DB 更改源仅包括新文档和更新的文档。 如果需要了解有关已删除的文档的信息,可以配置源以使用软标记将 Cosmos DB 文档标记为已删除。 这会添加一个属性来更新用于指示文档是否已被删除的事件。 然后,可以在查询中使用 where
运算符来筛选出它们。
例如,如果将 deleted 属性映射到名为“IsDeleted”的表列,则可以使用以下查询筛选出已删除的文档:
TestTable
| where not(IsDeleted)
更改源仅公开文档的最新更新。
若要了解第二个注意事项的影响,请查看以下方案:
Cosmos DB 容器包含文档 A 和 B。下表显示了对名为 foo 的属性的更改:
文档 ID
属性 foo
文档时间戳 (_ts)
将 API 结果与 Cosmos DB 文档中所做的一系列更改进行比较时,你会发现它们不匹配。 文档 A 的更新事件(已在更改表中的时间戳 40 所在一行突出显示)未显示在 API 调用结果中。
为了了解未显示该事件的原因,我们将检查 API 调用在时间戳 35 和 55 之间对文档 A 所做的更改。 在这两次调用之间,文档 A 更改了两次,如下所示:
文档 ID
属性 foo
文档时间戳 (_ts)
在时间戳 55 时进行 API 调用的时候,更改源 API 返回文档的最新版本。 在本例中,最新版本的文档 A 是在时间戳 50 时的更新,即属性 foo 从“粉红色”到“胭脂红色”的更新。
由于这种情况,数据连接器可能会错过一些中间文档更改。 例如,如果数据连接服务关闭几分钟,或者文档更改频率高于 API 轮询频率,则可能会错过某些事件。 但是,每个文档的最新状态均会被捕获。
使用 Cosmos DB 数据连接对 Cosmos DB 容器的请求单位 (RU) 使用量影响有多大?
连接器在容器的每个物理分区上调用 Cosmos DB 更改源 API,每秒最多调用一次。 以下成本与这些调用相关: