相关文章推荐
礼貌的哑铃  ·  【异常】java: Internal ...·  3 月前    · 
多情的匕首  ·  Python ...·  1 年前    · 

Azure IoT 操作预览版(由 Azure Arc 启用)当前处于预览状态。 不应在生产环境中使用此预览版软件。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

Kafka 连接器将消息从 Azure IoT MQ 的 MQTT 代理推送到 Kafka 终结点,并以类似的方式在另一端拉取消息。 由于 Azure 事件中心支持 Kafka API ,连接器可直接与事件中心一起使用。

通过 Kafka 终结点配置事件中心连接器

默认情况下,连接器未随 Azure IoT MQ 一起安装。 必须使用指定的主题映射和身份验证凭据显式启用它。 按照以下步骤通过 Kafka 终结点在 IoT MQ 和 Azure 事件中心启用双向通信。

  • 创建事件中心命名空间

  • 为每个 Kafka 主题 创建事件中心

    授予连接器对事件中心命名空间的访问权限

    授予 IoT MQ Arc 扩展对事件中心命名空间的访问权限是建立从 IoT MQ 的 Kakfa 连接器到事件中心的安全连接的最便捷方法。

    将以下 Bicep 模板保存到文件,并在为环境设置有效参数后将其应用于 Azure CLI:

    Bicep 模板假定 Arc 连接群集和事件中心命名空间位于同一资源组中,如果环境不同,请调整模板。

    @description('Location for cloud resources')
    param mqExtensionName string = 'mq'
    param clusterName string = 'clusterName'
    param eventHubNamespaceName string = 'default'
    resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
      name: clusterName
    resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
      name: mqExtensionName
      scope: connectedCluster
    resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
      name: eventHubNamespaceName
    // Role assignment for Event Hubs Data Receiver role
    resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
      name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
      scope: ehNamespace
      properties: {
         // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
        roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
        principalId: mqExtension.identity.principalId
        principalType: 'ServicePrincipal'
    // Role assignment for Event Hubs Data Sender role
    resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
      name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
      scope: ehNamespace
      properties: {
        // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
        roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
        principalId: mqExtension.identity.principalId
        principalType: 'ServicePrincipal'
    
    # Set the required environment variables
    # Resource group for resources
    RESOURCE_GROUP=xxx
    # Bicep template files name
    TEMPLATE_FILE_NAME=xxx
    # MQ Arc extension name
    MQ_EXTENSION_NAME=xxx
    # Arc connected cluster name
    CLUSTER_NAME=xxx
    # Event Hubs namespace name
    EVENTHUB_NAMESPACE=xxx
    az deployment group create \
          --name assign-RBAC-roles \
          --resource-group $RESOURCE_GROUP \
          --template-file $TEMPLATE_FILE_NAME \
          --parameters mqExtensionName=$MQ_EXTENSION_NAME \
          --parameters clusterName=$CLUSTER_NAME \
          --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
    

    KafkaConnector

    KafkaConnector 自定义资源 (CR) 允许配置可以与 Kafka 主机和事件中心通信的 Kafka 连接器。 Kafka 连接器可以使用事件中心作为 Kafka 兼容的终结点在 MQTT 主题和 Kafka 主题之间传输数据。

    以下示例演示使用 IoT MQ 的 Azure 标识连接到事件中心终结点的 KafkaConnector CR,它假定使用快速入门安装了其他 MQ 资源:

    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: KafkaConnector
    metadata:
      name: my-eh-connector
      namespace: azure-iot-operations # same as one used for other MQ resources
    spec:
      image:
        pullPolicy: IfNotPresent
        repository: mcr.microsoft.com/azureiotoperations/kafka
        tag: 0.1.0-preview
      instances: 2
      clientIdPrefix: my-prefix
      kafkaConnection:
        # Port 9093 is Event Hubs' Kakfa endpoint
        # Plug in your Event Hubs namespace name
        endpoint: <NAMESPACE>.servicebus.windows.net:9093
          tlsEnabled: true
        authentication:
          enabled: true
          authType:
            systemAssignedManagedIdentity:
              # plugin in your Event Hubs namespace name
              audience: "https://<EVENTHUBS_NAMESPACE>.servicebus.windows.net" 
      localBrokerConnection:
        endpoint: "aio-mq-dmqtt-frontend:8883"
          tlsEnabled: true
          trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
        authentication:
          kubernetes: {}
    

    下表描述了 KafkaConnector CR 中的字段:

    caConfigMap 配置映射的名称,其中包含用于验证服务器的标识的 CA 证书。 事件中心通信不需要此字段,因为事件中心默认使用受信任的已知 CA。 但是,如果要使用自定义 CA 证书,可以使用此字段。

    指定受信任的 CA 时,请创建一个包含 PEM 格式的 CA 公共部分的 ConfigMap,并在 caConfigMap 属性中指定名称。

    kubectl create configmap ca-pem --from-file path/to/ca.pem
    

    身份验证字段支持不同类型的身份验证方法,例如 SASL、X509 或托管标识。

    SASL 身份验证的配置。 指定 saslType,它可以是纯文本scram-sha-256scram-sha-512,以及指定 secretName 以引用包含用户名和密码的 Kubernetes 机密。 是的,如果使用 SASL 身份验证 X509 身份验证的配置。 指定 secretName 字段。 secretName 字段是机密的名称,其中包含客户端证书和 PEM 格式的客户端密钥,存储为 TLS 机密。 是的,如果使用 X509 身份验证 systemAssignedManagedIdentity 托管标识身份验证的配置。 指定令牌请求的访问群体,该请求必须与事件中心命名空间(https://<NAMESPACE>.servicebus.windows.net匹配,因为连接器是 Kafka 客户端。 系统分配的托管标识会在启用后自动创建并分配给连接器。 是的,如果使用托管标识身份验证

    可以使用 Azure Key Vault 管理 Azure IoT MQ 的机密,而不是 Kubernetes 机密。 若要了解详细信息,请参阅使用 Azure Key Vault 或 Kubernetes 机密管理机密

    对于事件中心,请使用纯文本 SASL 和 $ConnectionString 作为用户名,并将完整连接字符串用作密码。

    kubectl create secret generic cs-secret \
      --from-literal=username='$ConnectionString' \
      --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'
    

    对于 X.509,请使用包含公共证书和私钥的 Kubernetes TLS 机密。

    kubectl create secret tls my-tls-secret \
      --cert=path/to/cert/file \
      --key=path/to/key/file
    

    要使用托管标识,请将其指定为身份验证下的唯一方法。 还需要将角色分配给托管标识,该标识授予从事件中心(例如 Azure 事件中心数据所有者或 Azure 事件中心数据发送方/接收方)发送和接收消息的权限。 要了解详情,请参阅使用 Microsoft Entra ID 对访问事件中心资源的应用程序进行身份验证

    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          audience: https://<NAMESPACE>.servicebus.windows.net
    

    管理本地代理连接

    与 MQTT 桥一样,事件中心连接器充当 IoT MQ MQTT 代理的客户端。 如果已自定义 IoT MQ MQTT 代理的侦听器端口和/或身份验证,请同时替代事件中心连接器的本地 MQTT 连接配置。 要了解详细信息,请参阅 MQTT 桥本地代理连接

    KafkaConnectorTopicMap

    通过 KafkaConnectorTopicMap 自定义资源 (CR),可以定义 MQTT 主题与 Kafka 主题之间的映射,以便进行双向数据传输。 指定对 KafkaConnector CR 和路由列表的引用。 每个路由可以是 MQTT 到 Kafka 路由,也可以是 Kafka 到 MQTT 路由。 例如:

    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: KafkaConnectorTopicMap
    metadata:
      name: my-eh-topic-map
      namespace: <SAME NAMESPACE AS BROKER> # For example "default"
    spec:
      kafkaConnectorRef: my-eh-connector
      compression: snappy
      batching:
        enabled: true
        latencyMs: 1000
        maxMessages: 100
        maxBytes: 1024
      partitionStrategy: property
      partitionKeyProperty: device-id
      copyMqttProperties: true
      routes:
        # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
        - mqttToKafka:
            name: "route1"
            mqttTopic: temperature-alerts/#
            kafkaTopic: receiving-event-hub
            kafkaAcks: one
            qos: 1
            sharedSubscription:
              groupName: group1
              groupMinimumShareNumber: 3
        # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
        - kafkaToMqtt:
            name: "route2"
            consumerGroupId: mqConnector
            kafkaTopic: sending-event-hub
            mqttTopic: heater-commands
            qos: 0
    

    下表描述了 KafkaConnectorTopicMap CR 中的字段:

    copyMqttProperties 用于控制 MQTT 系统和用户属性是否复制到 Kafka 消息标头的布尔值。 用户属性按原样复制。 某些转换是使用系统属性完成的。 默认为 false。 routes MQTT 主题和 Kafka 主题之间数据传输的路由列表。 每个路由都可以有 mqttToKafkakafkaToMqtt 字段,具体取决于数据传输的方向。 请参阅 路由

    压缩字段可为发送到 Kafka 主题的消息启用压缩配置。 压缩有助于减少数据传输所需的网络带宽和存储空间。 但是,压缩还会给进程增加一些开销和延迟。 下表中列出了支持的压缩类型。

    latencyMs 消息在发送前可以缓冲的最大时间间隔(以毫秒为单位)。 如果达到此间隔,则所有缓冲消息都会通过批处理发送,而不管它们的数量或大小。 如果未设置,则默认值为 5。 maxMessages 在发送之前可以缓冲的最大消息数。 如果达到此数量,则所有缓冲消息都会通过批处理发送,而不管它们的大小或缓冲时间。 如果未设置,则默认值为 100000。 maxBytes 在发送之前可以缓冲的最大大小(以字节为单位)。 如果达到此大小,则所有缓冲消息都会通过批处理发送,而不管它们的数量或缓冲时间。 默认值为 1000000 (1 MB)。

    下面是使用批处理的示例:

    batching:
      enabled: true
      latencyMs: 1000
      maxMessages: 100
      maxBytes: 1024
    

    这意味着,当缓冲区中有 100 条消息,缓冲区中有 1024 个字节时,或距离上次发送已有 1000 毫秒时(不论先达到哪个条件),将发送消息。

    分区处理策略

    分区处理策略是一项功能,可用于控制将消息发送到 Kafka 主题时如何将其分配给 Kafka 分区。 Kafka 分区是支持并行处理和容错的 Kafka 主题的逻辑段。 Kafka 主题中的每个消息都有一个分区和一个偏移量,用于标记消息并进行排序。

    默认情况下,Kafka 连接器使用轮循机制算法将消息分配给随机分区。 但是,你可以使用不同的策略根据某些条件(例如 MQTT 主题名称或 MQTT 消息属性)将消息分配到分区。 这有助于实现更好的负载均衡、数据区域或消息排序。

    这意味着具有相同设备 ID 的消息将发送到同一分区。

    路由字段定义 MQTT 主题和 Kafka 主题之间数据传输的路由列表。 每个路由都可以有 mqttToKafkakafkaToMqtt 字段,具体取决于数据传输的方向。

    MQTT 到 Kafka

    mqttToKafka 字段定义将数据从 MQTT 主题传输到 Kafka 主题的路由。

    sharedSubscription 将共享订阅用于 MQTT 主题的配置。 指定 groupName,这是一组订阅者的唯一标识符,以及指定 groupMinimumShareNumber,这是从主题接收消息的组中的订阅服务器数量。 例如,如果 groupName 为“group1”且 groupMinimumShareNumber 为 3,则连接器会创建三个具有相同组名称的订阅服务器,以便从主题接收消息。 此功能允许在多个订阅服务器之间分发消息,而不会丢失任何消息或创建重复消息。

    使用 mqttToKafka 路由的示例:

    mqttToKafka:
      mqttTopic: temperature-alerts/#
      kafkaTopic: receiving-event-hub
      kafkaAcks: one
      qos: 1
      sharedSubscription:
        groupName: group1
        groupMinimumShareNumber: 3
    

    在此示例中,与 temperature-alerts/# 匹配的 MQTT 主题的消息发送到 Kafka 主题 receiving-event-hub(其中 QoS 等效于 1)和共享订阅组“group1”(其中共享数量为 3)。

    Kafka 到 MQTT

    kafkaToMqtt 字段定义将数据从 Kafka 主题传输到 MQTT 主题的路由。

    发送到 MQTT 主题的消息的服务质量 (QoS) 级别。 可能的值为 0 或 1(默认值)。 目前不支持 QoS 2。 如果 QoS 设置为 1,连接器会将消息发布到 MQTT 主题,然后在将消息提交回 Kafka 之前等待确认。 对于 QoS 0,连接器会立即提交回 Kafka,无需 MQTT 确认。

    使用 kafkaToMqtt 路由的示例:

    kafkaToMqtt:
      kafkaTopic: sending-event-hub
      mqttTopic: heater-commands
      qos: 0
    

    在此示例中,来自 Kafka 主题 sending-event-hub* 的消息将发布到 MQTT 主题 heater-commands,且 QoS 级别为 0。

    事件中心名称必须与 Kafka 主题匹配

    每个单独的事件中心(而非命名空间)都必须与路由中指定的预期 Kafka 主题的名称完全相同。 此外,如果连接字符串限定为一个事件中心,则连接字符串 EntityPath 必须相匹配。 此要求是因为事件中心命名空间类似于 Kafka 群集,事件中心名称类似于 Kafka 主题,因此 Kafka 主题名称必须与事件中心名称匹配。

    Kafka 使用者组偏移。

    如果连接器断开或被删除,并使用同一 Kafka 使用者组 ID 重新安装,则使用者组偏移量(Kafka 使用者读取消息的最后一个位置)将存储在 Azure 事件中心。 要了解详细信息,请参阅事件中心使用者组与Kafka 使用者组

    MQTT 版本

    此连接器仅使用 MQTT v5。

    使用 Azure IoT MQ 发布和订阅 MQTT 消息

  •