相关文章推荐
卖萌的烤地瓜  ·  MySQL整库同步Kafka - ...·  10 月前    · 
卖萌的烤地瓜  ·  All Aboard, Part 3: ...·  10 月前    · 
卖萌的烤地瓜  ·  typescript - Vue | ...·  11 月前    · 
聪明的作业本  ·  Advanced query ...·  56 分钟前    · 
失望的鸡蛋面  ·  "Microsoft Outlook ...·  58 分钟前    · 
坚强的柿子  ·  mongodb 多表关联处理 : ...·  1小时前    · 
不爱学习的火腿肠  ·  java ...·  2 小时前    · 
旅行中的铁链  ·  错误信息:SSL ShakeHand ...·  3 小时前    · 
憨厚的金鱼  ·  Scanpy数据结构:AnnData - 何帅 ·  3 小时前    · 

MySQL CDC数据表主要用于获取MySQL数据,并可以实时同步数据表中的修改,经常用在复杂的计算场景。例如,作为一张维表和其他数据表做Join操作。在使用中,同一张MySQL表可能被多个作业依赖,当多个任务使用同一张MySQL表做处理时,MySQL数据库会启动多个连接,对MySQL服务器和网络造成很大的压力。

为了缓解对上游MySQL数据库的压力,阿里云Flink实时计算已提供MySQL整库同步到Kafka的能力,通过引入Kafka作为中间层,并使用CDAS整库同步或CTAS整表同步到Kafka来解决。

具体操作是使用CDAS或CTAS语法,在一个作业里将上游的MySQL的数据实时同步到Kafka中。在MySQL整库同步任务启动后,由Kafka JSON Catalog创建Topic,每张MySQL表以Upsert Kafka 的方式写入对应topic。然后直接使用Kafka JSON Catalog中的表代替MySQL表,从而降低多个任务对MySQL数据库造成的压力。

  • 同步的MySQL表必须包含主键。
  • 支持使用自建Kafka集群,EMR的Kafka集群和阿里云消息队列Kafka版。使用阿里云消息队列Kafka版时,只能通过 默认接入点 使用。
  • upsert-kafka表暂未支持作为CTAS和CDAS语法的源表,upsert-kafka表只能作为CTAS和CDAS同步的结果表。
  • Kafka集群的存储空间必须大于源表数据的存储空间,否则会因存储空间不足导致数据丢失。因为整库同步Kafka建立的topic都是compacted topic,即topic的每个消息键(Key)仅保留最近的一条消息,但是数据不会过期,compacted topic里相当于保存了与源库的表相同大小的数据。
  • 创建并启动一个CDAS或CTAS同步任务,将数据库中的表同步到Kafka中。
  • CDAS同步语句
    CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
    AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
    说明 由于Kafka本身没有数据库的概念,所以不存在创建数据库的操作,使用时需要结合IF NOT EXISTS来跳过建库。
  • CTAS同步语句
    CREATE TABLE `kafka-catalog`.`kafka`.`topic`
    AS TABLE `mysql-catalog`.`db`.`table`;
  • 使用同步到Kafka的表。

    整库同步任务建立的Kafka topic名称和MySQL表名相同,分区数和副本数会使用集群的默认配置,并且cleanup.policy会设置为compact。

    使用Kafka JSON Catalog访问MySQL数据库表的对应表,下游作业可以消费Topic中的数据来获取数据库表的最新数据。对于同步到Kafka的表,使用方式有以下两种:
  • 通过Catalog直接使用

    详情请参见 使用Kafka JSON Catalog

    说明 在直接使用时,由于可能发生了Schema变更,Kafka JSON Catalog解析出的Schema可能与MySQL对应表存在差异,例如出现已经删除的字段,部分字段可能出现为null的情况。

    Catalog读取出的Schema由消费到的数据的字段组成。如果存在删除的字段且消息未过期,则会出现一些已经不存在的字段,这样的字段值会为null,该情况无需特殊处理。

  • 通过创建临时表的方式使用
    这种方式支持用户自定义指定Schema。您可以在Schemas tab页查看并复制表配置的WITH部分。基本格式如下所示。
    CREATE TEMPORARY TABLE tempOrder (
      `key_order_id` BIGINT NOT NULL,
      `value_product` STRING,
      PRIMARY KEY (key_order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'order',
      'properties.bootstrap.servers' = 'xxxx',
      'key.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.format' = 'json',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY',
      'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
      'value.json.infer-schema.primitive-as-string' = 'false'
                      
    例如,在订单评论实时分析场景下,假设有用户表(user),订单表(order)和用户评论表(feedback)三张表。各个表包含数据如下图所示。mysql database
    在展示用户订单信息和用户评论时,需要通过关联用户表(user)来获取用户名(name字段)信息。代码示例如下。
    -- 将订单信息和用户表做join,展示每个订单的用户名和商品名。
    SELECT order.id as order_id, product, user.name as user_name
    FROM order LEFT JOIN user
    ON order.user_id = user.id;
    -- 将评论和用户表做join,展示每个评论的内容和对应用户名。
    SELECT feedback.id as feedback_id, comment, user.name as user_name
    FROM feedback LEFT JOIN user
    ON feedback.user_id = user.id;

    对于以上两个SQL任务,user表在两个作业中都被使用了一次。运行时,两个作业都会读取MySQL的全量数据和增量数据。全量读取需要创建MySQL连接,增量读取需要创建Binlog Client。随着作业的不断增多,MySQL连接和Binlog Client资源也会对应增长,会给上游数据库产生极大的压力。

    为了缓解对上游MySQL数据库的压力,可以通过CDAS或CTAS语法在一个作业里将上游的MySQL数据实时同步到Kafka中,然后提供给多个下游作业消费。代码示例如下。
    CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
    AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
    同步任务成功启动后,上游MySQL数据库中的数据会以JSON格式写入Kafka中,一个Kafka Topic可以提供给多个下游作业消费,从而避免多个MySQL CDC Source直连数据库产生压力。代码示例如下。
    -- 将订单信息和Kafka JSON Catalog中的用户表做join,展示每个订单的用户名和商品名。
    SELECT order.id as order_id, product, user.value_name as user_name
    FROM order LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
    ON order.user_id = user.id;
    -- 将评论和Kafka JSON Catalog中的用户表做join,展示每个评论的内容和对应用户名。
    SELECT feedback.id as feedback_id, comment, user.value_name as user_name
    FROM feedback LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
    ON feedback.user_id = user.id;
  • CREATE TABLE AS(CTAS)语句
  • CREATE DATABASE AS(CDAS)语句
  • MySQL的CDC源表
  •  
    推荐文章