本文为您介绍Debezium格式的使用示例、配置选项和类型映射。

背景信息

Debezium 是一个CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自MySQL、PostgreSQL、Oracle、Microsoft SQL Server和许多其他数据库的更改实时流式传输到Kafka中。Debezium为变更日志提供了统一的格式结构,并支持使用JSON和Apache Avro序列化消息。支持Debezium格式的连接器有 消息队列Kafka 对象存储OSS

Flink支持将Debezium JSON和Avro消息解析为INSERT、UPDATE或DELETE消息到Flink SQL系统中。在很多情况下,利用这个特性非常的有用,例如:

  • 将增量数据从数据库同步到其他系统

  • 日志审计

  • 数据库的实时物化视图

  • 数据库表的temporal join变更历史

Flink还支持将Flink SQL中的INSERT、UPDATE或DELETE消息编码为Debezium格式的JSON消息或Avro消息,输出到Kafka等存储中。

重要

目前Flink还不支持将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Debezium消息。

使用示例

假设MySQL products表有4列(id、name、description、weight),一个JSON格式的从MySQL products表捕获的更新操作的简单示例如下:

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
};
说明

示例中各字段的含义,详情请参见 Debezium

上面的JSON消息是products表上的一条更新事件,其中id = 111的行的weight值从5.18更改为5.15。假设此消息已同步到一个名为products_binlog的Kafka topic ,则可以使用以下DDL来使用此topic并解析更改事件。

-- 使用'debezium-json' format来解析Debezium的JSON消息
CREATE TABLE topic_products (
-- schema与MySQL的products表完全相同
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
-- 使用'debezium-json' format来解析Debezium的JSON消息
-- 如果Debezium用Avro编码消息,请使用'debezium-avro-confluent'
);

在某些情况下,在设置Debezium Kafka Connect时,可能会开启Kafka的配置value.converter.schemas.enable,用来在消息体中包含schema信息。Debezium JSON消息可能如下所示:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    "source": {...},
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
}

为了解析这一类信息,您需要在上述DDL WITH子句中添加选项 'debezium-json.schema-include' = 'true' (默认为false)。通常情况下,建议不要包含schema的描述,因为这样会使消息变得非常冗长,并降低解析性能。

在将主题注册为Flink表之后,可以将Debezium消息用作变更日志源。

-- MySQL "products" 的实时物化视图。
-- 计算相同产品的最新平均重量。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- 将 MySQL "products" 表的所有数据和增量更改同步到。
-- Elasticsearch "products" 索引,供将来查找。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

配置选项

Flink提供了debezium-avro-confluent和debezium-json两种格式来解析Debezium生成的Avro格式和JSON格式的消息。

debezium-avro-confluent

使用debezium-avro-confluent来解析Debezium Avro消息。

参数

是否必选

默认值

类型

描述

format

(none)

String

指定要使用的格式,解析Debezium Avro消息时,参数取值为debezium-avro-confluent。

debezium-avro-confluent.basic-auth.credentials-source

(none)

String

Schema Registry的基本身份验证凭据的source。

debezium-avro-confluent.basic-auth.user-info

(none)

String

Schema Registry的基本认证用户信息。

debezium-avro-confluent.bearer-auth.credentials-source

(none)

String

Schema Registry的持有者身份验证凭据的source。

debezium-avro-confluent.bearer-auth.token

(none)

String

Schema Registry的持有者认证令牌。

debezium-avro-confluent.properties

(none)

Map

属性映射,该映射被转发到Schema Registry。对于没有通过Flink配置选项正式公开的选项非常有用。

重要

Flink选项具有更高的优先级。

debezium-avro-confluent.ssl.keystore.location

(none)

String

SSL keystore的位置。

debezium-avro-confluent.ssl.keystore.password

(none)

String

SSL keystore的密码。

debezium-avro-confluent.ssl.truststore.location

(none)

String

SSL truststore的位置。

debezium-avro-confluent.ssl.truststore.password

(none)

String

SSL truststore的密码。

debezium-avro-confluent.subject

(none)

String

Confluent Schema Registry subject,在该subject下注册此格式在序列化期间使用的模式。默认情况下,如果使用kafka和upsert-kafka连接器作为值或键格式,则使用<Topname>-value或<topname>-key作为默认subject名称。对于filesystem连接器,当其作为sink使用时,必须使用subject选项。

debezium-avro-confluent.url

(none)

String

用于获取或注册schemas的Confluent Schema Registry的URL。

debezium-json

使用debezium-json来解析Debezium JSON消息。

参数

是否必选

默认值

类型

描述

format

(none)

String

指定要使用的格式,解析Debezium JSON消息时,参数取值为debezium-json。

debezium-json.schema-include

false

Boolean

设置Debezium Kafka Connect时,可以启用Kafka配置value.converter.schemas.enable,以在消息中包含schema。此选项表明Debezium JSON消息是否包含schema。

参考取值如下:

  • true:Debezium JSON消息包含schema。

  • false:Debezium JSON消息不包含schema。

debezium-json.ignore-parse-errors

false

Boolean

参数取值如下:

  • true:当解析异常时,跳过当前字段或行。

  • false(默认值):报出错误,作业启动失败。

debezium-json.timestamp-format.standard

SQL

String

指定输入和输出时间戳格式。参数取值如下:

  • SQL:解析yyyy-MM-dd HH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30 12:13:14.123,并以相同格式输出时间戳。

  • ISO-8601:解析yyyy-MM-ddTHH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30T12:13:14.123,并以相同的格式输出时间戳。

debezium-json.map-null-key.mode

FAIL

String

指定处理Map中key值为空的方法。参数取值如下:

  • FAIL:在Map中key值为空的时候抛出异常。

  • DROP:丢弃Map中key值为空的数据项。

  • LITERAL:使用字符串常量来替换Map中的空key值。字符串常量的值由canal-json.map-null-key.literal定义。

debezium-json.map-null-key.literal

null

String

当debezium-json.map-null-key.mode的值是LITERAL时,指定字符串常量替换Map中的空key值。

debezium-json.encode.decimal-as-plain-number

false

Boolean

参数取值如下:

  • true:所有DECIMAL类型的数据保持原状,不使用科学计数法表示,例:0.000000027表示为0.000000027。

  • false:所有DECIMAL类型的数据,使用科学计数法表示,例如0.000000027表示为2.7E-8。

类型映射

目前,Debezium使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息,请参考JSON Format文档和Confluent Avro Format文档。

其他使用说明

可用的元数据

以下格式元数据可以在DDL语句中声明为只读(VIRTUAL)列。

重要

只有当相应的连接器转发格式元数据时,格式元数据字段才可用。目前,只有Kafka连接器能够声明其值格式的元数据字段。

数据类型

说明

schema

STRING NULL

描述有效负载模式的JSON字符串。如果模式未包含在Debezium记录中,则为Null。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件时的时间戳。对应于Debezium记录中的ts_ms字段。

source.timestamp

TIMESTAMP_LTZ(3) NULL

源系统创建事件的时间戳。对应于Debezium记录中的source.ts_ts字段。

source.database

STRING NULL

原始数据库。对应于Debezium记录中的source.db字段(如果可用)。

source.schema

STRING NULL

原始数据库的模式。对应于Debezium记录中的source.schema字段(如果可用)。

source.table

STRING NULL

原始数据库的表。对应于Debezium记录中的source.table或source.collection字段(如果可用)。

source.properties

MAP<STRING, STRING> NULL

各种源属性的映射。对应于Debezium记录中的source字段。

下面的例子展示了如何在Kafka中访问Debezium元数据字段:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

常见问题

故障时投递重复的变更事件

在正常的操作环境下,Debezium能够以exactly-once的语义投递每条变更事件,Flink能够正常消费Debezium产生的变更事件。在非正常情况下(例如有故障发生),Debezium只能保证at-least-once的投递语义。此时,Debezium可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件,可能导致Flink query的运行得到错误的结果或者非预期的异常。因此,在这种情况下,建议将作业参数table.exec.source.cdc-events-duplicate设置成true,并在该源表上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子,使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。

说明

关于Debezium的消息投递语义的更多信息,请参见 Debezium

无法正确解析Debezium Postgres Connector产生的数据

如果您正在使用 Debezium PostgreSQL Connector 捕获变更到Kafka,请确保被监控表的 REPLICA IDENTITY 已经被配置成FULL了,默认值是DEFAULT。否则,Flink SQL将无法正确解析Debezium数据。

当配置为FULL时,更新和删除事件将完整包含所有列的之前的值。当为其他配置时,更新和删除事件的before字段将只包含PRIMARY KEY字段的值,或者为null(没有PRIMARY KEY)。您可以通过运行ALTER TABLE <your-table-name> REPLICA IDENTITY FULL来更改REPLICA IDENTITY的配置。

说明

更多信息,请参见 Debezium connector for PostgreSQL