Azure IoT 操作预览版(由 Azure Arc 启用)当前处于预览状态。
不应在生产环境中使用此预览版软件。
有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅
Microsoft Azure 预览版的补充使用条款
。
Kafka 连接器将消息从 Azure IoT MQ 的 MQTT 代理推送到 Kafka 终结点,并以类似的方式在另一端拉取消息。 由于
Azure 事件中心支持 Kafka API
,连接器可直接与事件中心一起使用。
默认情况下,连接器未随 Azure IoT MQ 一起安装。 必须使用指定的主题映射和身份验证凭据显式启用它。 按照以下步骤通过 Kafka 终结点在 IoT MQ 和 Azure 事件中心启用双向通信。
创建事件中心命名空间
。
为每个 Kafka 主题
创建事件中心
。
授予连接器对事件中心命名空间的访问权限
授予 IoT MQ Arc 扩展对事件中心命名空间的访问权限是建立从 IoT MQ 的 Kakfa 连接器到事件中心的安全连接的最便捷方法。
将以下 Bicep 模板保存到文件,并在为环境设置有效参数后将其应用于 Azure CLI:
Bicep 模板假定 Arc 连接群集和事件中心命名空间位于同一资源组中,如果环境不同,请调整模板。
@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'
resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
name: clusterName
resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
name: mqExtensionName
scope: connectedCluster
resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
name: eventHubNamespaceName
// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
# Set the required environment variables
# Resource group for resources
RESOURCE_GROUP=xxx
# Bicep template files name
TEMPLATE_FILE_NAME=xxx
# MQ Arc extension name
MQ_EXTENSION_NAME=xxx
# Arc connected cluster name
CLUSTER_NAME=xxx
# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx
az deployment group create \
--name assign-RBAC-roles \
--resource-group $RESOURCE_GROUP \
--template-file $TEMPLATE_FILE_NAME \
--parameters mqExtensionName=$MQ_EXTENSION_NAME \
--parameters clusterName=$CLUSTER_NAME \
--parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
KafkaConnector
KafkaConnector 自定义资源 (CR) 允许配置可以与 Kafka 主机和事件中心通信的 Kafka 连接器。 Kafka 连接器可以使用事件中心作为 Kafka 兼容的终结点在 MQTT 主题和 Kafka 主题之间传输数据。
以下示例演示使用 IoT MQ 的 Azure 标识连接到事件中心终结点的 KafkaConnector CR,它假定使用快速入门安装了其他 MQ 资源:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.1.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<EVENTHUBS_NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
下表描述了 KafkaConnector CR 中的字段:
caConfigMap
配置映射的名称,其中包含用于验证服务器的标识的 CA 证书。 事件中心通信不需要此字段,因为事件中心默认使用受信任的已知 CA。 但是,如果要使用自定义 CA 证书,可以使用此字段。
指定受信任的 CA 时,请创建一个包含 PEM 格式的 CA 公共部分的 ConfigMap,并在 caConfigMap
属性中指定名称。
kubectl create configmap ca-pem --from-file path/to/ca.pem
身份验证字段支持不同类型的身份验证方法,例如 SASL、X509 或托管标识。
SASL 身份验证的配置。 指定 saslType
,它可以是纯文本、scram-sha-256 或 scram-sha-512,以及指定 secretName
以引用包含用户名和密码的 Kubernetes 机密。
是的,如果使用 SASL 身份验证
X509 身份验证的配置。 指定 secretName
字段。 secretName
字段是机密的名称,其中包含客户端证书和 PEM 格式的客户端密钥,存储为 TLS 机密。
是的,如果使用 X509 身份验证
systemAssignedManagedIdentity
托管标识身份验证的配置。 指定令牌请求的访问群体,该请求必须与事件中心命名空间(https://<NAMESPACE>.servicebus.windows.net
)匹配,因为连接器是 Kafka 客户端。 系统分配的托管标识会在启用后自动创建并分配给连接器。
是的,如果使用托管标识身份验证
可以使用 Azure Key Vault 管理 Azure IoT MQ 的机密,而不是 Kubernetes 机密。 若要了解详细信息,请参阅使用 Azure Key Vault 或 Kubernetes 机密管理机密。
对于事件中心,请使用纯文本 SASL 和 $ConnectionString
作为用户名,并将完整连接字符串用作密码。
kubectl create secret generic cs-secret \
--from-literal=username='$ConnectionString' \
--from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'
对于 X.509,请使用包含公共证书和私钥的 Kubernetes TLS 机密。
kubectl create secret tls my-tls-secret \
--cert=path/to/cert/file \
--key=path/to/key/file
要使用托管标识,请将其指定为身份验证下的唯一方法。 还需要将角色分配给托管标识,该标识授予从事件中心(例如 Azure 事件中心数据所有者或 Azure 事件中心数据发送方/接收方)发送和接收消息的权限。 要了解详情,请参阅使用 Microsoft Entra ID 对访问事件中心资源的应用程序进行身份验证。
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
audience: https://<NAMESPACE>.servicebus.windows.net
管理本地代理连接
与 MQTT 桥一样,事件中心连接器充当 IoT MQ MQTT 代理的客户端。 如果已自定义 IoT MQ MQTT 代理的侦听器端口和/或身份验证,请同时替代事件中心连接器的本地 MQTT 连接配置。 要了解详细信息,请参阅 MQTT 桥本地代理连接。
KafkaConnectorTopicMap
通过 KafkaConnectorTopicMap 自定义资源 (CR),可以定义 MQTT 主题与 Kafka 主题之间的映射,以便进行双向数据传输。 指定对 KafkaConnector CR 和路由列表的引用。 每个路由可以是 MQTT 到 Kafka 路由,也可以是 Kafka 到 MQTT 路由。 例如:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
name: my-eh-topic-map
namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
kafkaConnectorRef: my-eh-connector
compression: snappy
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
partitionStrategy: property
partitionKeyProperty: device-id
copyMqttProperties: true
routes:
# Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
- mqttToKafka:
name: "route1"
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
# Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
- kafkaToMqtt:
name: "route2"
consumerGroupId: mqConnector
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
下表描述了 KafkaConnectorTopicMap CR 中的字段:
copyMqttProperties
用于控制 MQTT 系统和用户属性是否复制到 Kafka 消息标头的布尔值。 用户属性按原样复制。 某些转换是使用系统属性完成的。 默认为 false。
routes
MQTT 主题和 Kafka 主题之间数据传输的路由列表。 每个路由都可以有 mqttToKafka
或 kafkaToMqtt
字段,具体取决于数据传输的方向。 请参阅 路由。
压缩字段可为发送到 Kafka 主题的消息启用压缩配置。 压缩有助于减少数据传输所需的网络带宽和存储空间。 但是,压缩还会给进程增加一些开销和延迟。 下表中列出了支持的压缩类型。
latencyMs
消息在发送前可以缓冲的最大时间间隔(以毫秒为单位)。 如果达到此间隔,则所有缓冲消息都会通过批处理发送,而不管它们的数量或大小。 如果未设置,则默认值为 5。
maxMessages
在发送之前可以缓冲的最大消息数。 如果达到此数量,则所有缓冲消息都会通过批处理发送,而不管它们的大小或缓冲时间。 如果未设置,则默认值为 100000。
maxBytes
在发送之前可以缓冲的最大大小(以字节为单位)。 如果达到此大小,则所有缓冲消息都会通过批处理发送,而不管它们的数量或缓冲时间。 默认值为 1000000 (1 MB)。
下面是使用批处理的示例:
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
这意味着,当缓冲区中有 100 条消息,缓冲区中有 1024 个字节时,或距离上次发送已有 1000 毫秒时(不论先达到哪个条件),将发送消息。
分区处理策略
分区处理策略是一项功能,可用于控制将消息发送到 Kafka 主题时如何将其分配给 Kafka 分区。 Kafka 分区是支持并行处理和容错的 Kafka 主题的逻辑段。 Kafka 主题中的每个消息都有一个分区和一个偏移量,用于标记消息并进行排序。
默认情况下,Kafka 连接器使用轮循机制算法将消息分配给随机分区。 但是,你可以使用不同的策略根据某些条件(例如 MQTT 主题名称或 MQTT 消息属性)将消息分配到分区。 这有助于实现更好的负载均衡、数据区域或消息排序。
这意味着具有相同设备 ID 的消息将发送到同一分区。
路由字段定义 MQTT 主题和 Kafka 主题之间数据传输的路由列表。 每个路由都可以有 mqttToKafka
或 kafkaToMqtt
字段,具体取决于数据传输的方向。
MQTT 到 Kafka
mqttToKafka
字段定义将数据从 MQTT 主题传输到 Kafka 主题的路由。
sharedSubscription
将共享订阅用于 MQTT 主题的配置。 指定 groupName
,这是一组订阅者的唯一标识符,以及指定 groupMinimumShareNumber
,这是从主题接收消息的组中的订阅服务器数量。 例如,如果 groupName 为“group1”且 groupMinimumShareNumber 为 3,则连接器会创建三个具有相同组名称的订阅服务器,以便从主题接收消息。 此功能允许在多个订阅服务器之间分发消息,而不会丢失任何消息或创建重复消息。
使用 mqttToKafka
路由的示例:
mqttToKafka:
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
在此示例中,与 temperature-alerts/# 匹配的 MQTT 主题的消息发送到 Kafka 主题 receiving-event-hub(其中 QoS 等效于 1)和共享订阅组“group1”(其中共享数量为 3)。
Kafka 到 MQTT
kafkaToMqtt
字段定义将数据从 Kafka 主题传输到 MQTT 主题的路由。
发送到 MQTT 主题的消息的服务质量 (QoS) 级别。 可能的值为 0 或 1(默认值)。 目前不支持 QoS 2。 如果 QoS 设置为 1,连接器会将消息发布到 MQTT 主题,然后在将消息提交回 Kafka 之前等待确认。 对于 QoS 0,连接器会立即提交回 Kafka,无需 MQTT 确认。
使用 kafkaToMqtt
路由的示例:
kafkaToMqtt:
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
在此示例中,来自 Kafka 主题 sending-event-hub* 的消息将发布到 MQTT 主题 heater-commands,且 QoS 级别为 0。
事件中心名称必须与 Kafka 主题匹配
每个单独的事件中心(而非命名空间)都必须与路由中指定的预期 Kafka 主题的名称完全相同。 此外,如果连接字符串限定为一个事件中心,则连接字符串 EntityPath
必须相匹配。 此要求是因为事件中心命名空间类似于 Kafka 群集,事件中心名称类似于 Kafka 主题,因此 Kafka 主题名称必须与事件中心名称匹配。
Kafka 使用者组偏移。
如果连接器断开或被删除,并使用同一 Kafka 使用者组 ID 重新安装,则使用者组偏移量(Kafka 使用者读取消息的最后一个位置)将存储在 Azure 事件中心。 要了解详细信息,请参阅事件中心使用者组与Kafka 使用者组。
MQTT 版本
此连接器仅使用 MQTT v5。
使用 Azure IoT MQ 发布和订阅 MQTT 消息