本文介绍如何通过社区版Flink将Kafka的数据同步至 AnalyticDB PostgreSQL版

前提条件

  • 已将社区版Flink的客户端IP地址添加至 AnalyticDB PostgreSQL版 白名单中,设置白名单的方法,请参见 设置白名单
  • 在Flink的客户端的 $FLINK_HOME/lib 路径上部署Kafka Connector相关依赖,您可以直接使用Flink官网提供的Table API Kafka Connector,具体内容,请参见 Apache Kafka SQL Connector
    本文示例中使用的依赖如下。
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.17-SNAPSHOT</version>
    </dependency>
  • 在Flink的客户端的 $FLINK_HOME/lib 路径上部署 AnalyticDB PostgreSQL Connector相关依赖。获取 AnalyticDB PostgreSQL Connector的JAR包,请参见 AnalyticDB PostgreSQL Connector

    本文示例中使用的 AnalyticDB PostgreSQL Connector版本为1.13。实际使用时建议选择Flink引擎版本相近的版本。

注意事项

阿里云实时计算Flink版与社区版Flink操作存在差异,但是使用过程基本相同,如需使用阿里云实时计算Flink版,请参见 阿里云实时计算Flink版文档

操作步骤

  1. 在Flink上创建一张表,用于读取Kafka表的数据。
    CREATE TABLE KafkaTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING,
      `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      `shard` BIGINT METADATA FROM 'partition' VIRTUAL,
      `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
    );

    Kafka Connector参数说明如下:

    参数 是否必填 说明
    connector Connector名称,此处固定为 kafka
    topic Kafka的Topic名称。
    properties.bootstrap.servers Kafka客户端的连接地址和端口。
    properties.group.id Kafka的消费组ID。
    scan.startup.mode 数据消费的起始点位,具体介绍,请参见 起始消费点位
    value.format 序列化和反序列化Kafka消息体时使用的格式。更多格式和相关配置的介绍,请参见 格式

    更多Kafka Connector参数介绍,请参见 连接器参数

    Kafka的每条消息(Record)中均包含了一些元数据(例如timestamp、offset、partition等),这些元数据在业务中可以起到一定作用。测试表中的event_time、meta_offset和shard列就是从Kafka消息中获取到的有用信息。关于可用的元数据的介绍,请参见 可用的元数据

  2. AnalyticDB PostgreSQL版 上创建一张目标表。
    CREATE TABLE ADBPGTargetTable (
      user_id BIGINT primary key,
      item_id BIGINT,
      behavior VARCHAR,
      event_time TIMESTAMP,
      shard BIGINT,            -- partition在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的partition列名替换为shard。
      meta_offset BIGINT       -- offset在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的offset列名替换为meta_offset。
    );
  3. 在Flink上创建一张表,用于将数据同步至 AnalyticDB PostgreSQL版 ,建议表结构需与步骤1创建的源表结构相同。
    CREATE TABLE ADBPGTargetTable (
      `user_id` BIGINT primary key,
      `item_id` BIGINT,
      `behavior` STRING,
      `event_time` TIMESTAMP(3),
      `shard` BIGINT,            -- partition在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的partition列名替换为shard。
      `meta_offset` BIGINT       -- offset在AnalyticDB PostgreSQL中是保留关键字,因此需要将Kafka表原有的offset列名替换为meta_offset。
    ) WITH (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'Password01',
       'tablename' = 'ADBPGTargetTable',
       'username' = 'user01',
       'url' = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    AnalyticDB PostgreSQL Connector参数说明如下:

    参数 是否必填 说明
    connector Connector名称,格式为 adbpg-nightly-版本号

    例如本次示例使用的 AnalyticDB PostgreSQL Connector是1.13版本,那么Connector名称为 adbpg-nightly-1.13

    url AnalyticDB PostgreSQL版 的JDBC连接地址。格式为 jdbc:postgresql://<连接地址:端口>/<连接的数据库名称> ,示例如下 jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres
    tablename AnalyticDB PostgreSQL版 的表名。
    username AnalyticDB PostgreSQL版 的数据库账号。
    password AnalyticDB PostgreSQL版 的数据库账号密码。
    maxretrytimes SQL执行失败后重试次数,默认值为3次。
    batchsize 一次批量写入的最大条数,默认值为50000条。
    exceptionmode 数据写入过程中出现异常时的处理策略。支持以下两种处理策略:
    • ignore(默认):数据写入异常时,忽略出现异常的数据。
    • strict:数据写入异常时,故障转移(Failover)并报错。
    conflictmode 当出现主键冲突或者唯一索引冲突时的处理策略。支持以下四种处理策略:
    • ignore :忽略主键冲突,保留之前的数据。
    • strict:主键冲突时,故障转移(Failover)并报错。
    • update:主键冲突时,更新为新增的数据。
    • upsert(默认):主键冲突时,采用UPSERT方式写入数据。

      AnalyticDB PostgreSQL版 通过 INSERT ON CONFLICT COPY ON CONFLICT 实现UPSERT写入数据。

      如果目标表为分区表,则需要内核小版本为V6.3.6.1及以上。如何升级内核小版本,请参见 版本升级

    targetschema AnalyticDB PostgreSQL版 的Schema,默认值为public。
    writemode 写入方式。取值说明:
    • 0 :采用BATCH INSERT方式写入数据。
    • 1(默认):采用COPY API写入数据。
    • 2:采用BATCH UPSERT方式写入数据。
    verbose 是否输出connector运行日志。取值说明:
    • 0(默认):不输出运行日志。
    • 1:输出运行日志。
    retrywaittime 出现异常后,重试的间隔时间。单位为毫秒(ms),默认值为100。
    batchwritetimeoutms 攒批写入数据时最长攒批时间,超过此时间会触发写入。单位为毫秒(ms),默认值为50000。
    connectionmaxactive 连接池参数,单个Task manager中连接池最大连接数。默认值为5。
    casesensitive 列名和表名是否区分大小写,取值说明:
    • 0(默认):不区分大小写。
    • 1:区分大小写。
  4. 在Flink上执行INSERT INTO命令将Kafka数据同步至 AnalyticDB PostgreSQL版
    INSERT INTO ADBPGTargetTable SELECT * FROM KafkaSourceTable;

您可以在Flink控制台查看执行情况,示例如下。

Flink将kafka数据同步至adbpg