本文介绍了 AnalyticDB PostgreSQL版 中数据合并的方法和背后的原理,进而介绍如何使用批量操作,快速地更新数据。

更新,又称为合并(Merge),指把数据最新版本更新到 AnalyticDB PostgreSQL版 中。如果数据已经存在,则将它们替换为新版本;如果不存在,将它们插入数据库中。这种数据合并一般在离线完成。例如,设置每天一次批量把数据更新到 AnalyticDB PostgreSQL版 中。也有用户需要实时更新,要求做到分钟级甚至秒级延迟。

简单更新过程

无论怎么做数据合并,都是对数据的修改,即执行Update、Delete、Insert、Copy等操作。以用户发起一次Update操作为例(对列存表单行记录的更新), AnalyticDB PostgreSQL版 中的数据更新过程如下图所示。

步骤说明如下:

  • 用户发送Update的SQL请求至主节点。
  • 主节点发起分布式事务,对被Update的表加锁( AnalyticDB PostgreSQL版 不允许并行Update同一张表),然后把更新请求分发到对应的子节点。
  • 子节点通过索引扫描,定位到要更新的数据,并更新数据。对于列存表,更新逻辑就是删除旧的数据行,并在表的尾端写入新的数据行。列存表中被更新的数据页面会写入内存缓存区,对应的表文件长度的变化(因为尾端写入了数据,所以数据表对应的文件长度增大了)会写入日志(xlog文件)。
  • 在Update命令结束前,内存中被更新的数据页面和xlog日志,都要同步到Mirror节点。同步完成后,主节点结束分布式事务,返回用户执行成功的消息。
  • 可以看出,整个过程的链条很长,SQL语句解析、分布式事务、锁,主节点子节点之间的连接建立、子节点与Mirror数据和日志同步等操作,都会耗费CPU或I/O资源,同时拖慢整个请求的响应时间。因此,对于 AnalyticDB PostgreSQL版 来说,应该尽量避免单行数据的更新,尽量批量地更新数据,即:

  • 把更新放到一个SQL语句,减少语句解析、节点通信、数据同步等开销。
  • 把更新放到一个事务,避免不必要的事务开销。
  • 简而言之,就是尽量以“成批”的形式进行数据的合并和更新。

    批量Update

    怎样用一个SQL实现多个独立数据行的Update?

    1.准备目标表

    假设有张表需要更新(称为目标表,target_table),这张表的定义如下。

    create table target_table(c1 int, c2 int, primary key (c1));
    insert into target_table select generate_series(1, 10000000);

    一般目标表都非常大,这里假设往target_table里面插入1千万行数据。为了能快速更新,target_table上要有索引。这里定义了primary key,会隐含地创建一个唯一值索引(unique index)。

    2.准备中间表

    为了做批量Update,需要用到中间表(Stage Table,示例中的source_table),即为了更新数据临时创建的表。为了更新target_table的数据,可以先把新数据插入到中间表source_table中。然后,把新数据通过 COPY命令 OSS外部表 等方式导入到source_table。以下示例直接插入了一些数据。

    create table source_table(c1 int, c2 int);
    insert into source_table select generate_series(1, 100), generate_series(1,100);

    3.批量更新

    source_table数据准备好后,执行如下 update set … from … where .. 语句,即可实现批量的Update。

    注意 :为了最大限度的使用到索引,在执行Update前,要使用 set optimizer=on 启用ORCA优化器(如果不启用ORCA优化器,则需要执行 set enable_nestloop = on 才能使用到索引)。

    set optimizer=on;
    update target_table set c2 = source_table.c2 from source_table where target_table.c1= source_table.c1;

    该Update的执行计划如下:

    => explain update target_table set c2 = source_table.c2 from source_table where target_table.c1= source_table.c1;
                                                             QUERY PLAN
    -----------------------------------------------------------------------------------------------------------------------------
     Update  (cost=0.00..586.10 rows=25 width=1)
       ->  Result  (cost=0.00..581.02 rows=50 width=26)
             ->  Redistribute Motion 4:4  (slice1; segments: 4)  (cost=0.00..581.02 rows=50 width=22)
                   Hash Key: public.target_table.c1
                   ->  Assert  (cost=0.00..581.01 rows=50 width=22)
                         Assert Cond: NOT public.target_table.c1 IS NULL
                         ->  Split  (cost=0.00..581.01 rows=50 width=22)
                               ->  Nested Loop  (cost=0.00..581.01 rows=25 width=18)
                                     Join Filter: true
                                     ->  Table Scan on source_table  (cost=0.00..431.00 rows=25 width=8)
                                     ->  Index Scan using target_table_pkey on target_table  (cost=0.00..150.01 rows=1 width=14)
                                           Index Cond: public.target_table.c1 = source_table.c1

    可以看到, AnalyticDB PostgreSQL版 选择了索引。但是,如果往source_table里面加入更多数据,优化器会认为使用Nest Loop关联方法+索引扫描,没有不使用索引高效,而会选取Hash关联方法+表扫描方式来执行。例如:

    postgres=> insert into source_table select generate_series(1, 1000), generate_series(1,1000);
    INSERT 0 1000
    postgres=> analyze source_table;
    ANALYZE
    postgres=> explain update target_table set c2 = source_table.c2 from source_table where target_table.c1= source_table.c1;
                                                  QUERY PLAN
    ------------------------------------------------------------------------------------------------------
     Update  (cost=0.00..1485.82 rows=275 width=1)
       ->  Result  (cost=0.00..1429.96 rows=550 width=26)
             ->  Assert  (cost=0.00..1429.94 rows=550 width=22)
                   Assert Cond: NOT public.target_table.c1 IS NULL
                   ->  Split  (cost=0.00..1429.93 rows=550 width=22)
                         ->  Hash Join  (cost=0.00..1429.92 rows=275 width=18)
                               Hash Cond: public.target_table.c1 = source_table.c1
                               ->  Table Scan on target_table  (cost=0.00..477.76 rows=2500659 width=14)
                               ->  Hash  (cost=431.01..431.01 rows=275 width=8)
                                     ->  Table Scan on source_table  (cost=0.00..431.01 rows=275 width=8)

    上述批量Update方式,减少了SQL编译、节点间通信、事务等开销,可以大大提升数据更新性能并减少对资源的消耗。

    批量Delete

    对于Delete操作,采用和上述批量Update类似的中间表,然后使用下面的带有“Using”子句的Delete来实现批量删除:

    delete from target_table using source_table where target_table.c1 = source_table.c1;

    可以看到,这种批量的Delete同样使用了索引。

    explain delete from target_table using source_table where target_table.c1 = source_table.c1;
                                                 QUERY PLAN
    -----------------------------------------------------------------------------------------------------
     Delete (slice0; segments: 4)  (rows=50 width=10)
       ->  Nested Loop  (cost=0.00..41124.40 rows=50 width=10)
             ->  Seq Scan on source_table  (cost=0.00..6.00 rows=50 width=4)
             ->  Index Scan using target_table_pkey on target_table  (cost=0.00..205.58 rows=1 width=14)
                   Index Cond: target_table.c1 = source_table.c1

    利用Delete+Insert做数据合并

    如何实现批量的数据合并?在做数据合并时,需要先把待合入的数据放入中间表中。

  • 如果预先知道待合入的数据,在目标表中都已经有对应的数据行,即可通过Update语句实现数据合入。
  • 但多数情况下,待合入的数据中,一部分是在目标表中已存在记录的数据,还有一部分是新增的,目标表中没有对应记录。这种情况下,需要使用一次批量Delete+一次批量Insert。代码示例如下。
  • set optimizer=on;
    delete from target_table using source_table where target_table.c1 = source_table.c1;
    insert into target_table select * from source_table;

    利用 Values() 表达式做实时更新

    使用中间表,需要维护中间表生命周期。有的用户想实时批量更新数据到 AnalyticDB PostgreSQL版 ,即持续性地同步数据或合并数据到 AnalyticDB PostgreSQL版

    如果采用上面的方法,需要反复创建、删除(或Truncate)中间表。其实,可以利用Values表达式,达到类似中间表的效果,却不用维护表。方法是先将待更新的数据拼成一个Values表达式,然后按如下方式执行Update或Delete:

    update target_table set c2 = t.c2 from (values(1,1),(2,2),(3,3),…(2000,2000)) as t(c1,c2) where target_table.c1=t.c1
    delete from target_table using (values(1,1),(2,2),(3,3),…(2000,2000)) as t(c1,c2) where target_table.c1 = t.c1
    注意 使用 set optimizer=on; set enable_nestloop=on; 都可以生成使用索引的查询计划。但在比较复杂的情形下,比如索引字段有多个、涉及分区表等,必须要使用ORCA优化器才能匹配上索引。
    • 本页导读 (1)
    文档反馈