《PostgreSQL 高并发任务分配系统 实践》

另一方面,如果业务要求完全无缝的自增ID,我也有对应的文档提及。

《PostgreSQL 无缝自增ID的实现 - by advisory lock》

那么接下来按本文开头提到的几个要求进行设计。

设计一、含组ID,每个组一个序列,序列和文本在单个组内唯一

1、设计一个UDF,自动生成与组ID一对一的序列,并返回序列的值。

create or replace function get_per_gp_id(    
  text,   -- 序列名前缀  
  int     -- 分组ID,作为序列名后缀  
) returns int8 as $$    
declare    
begin    
  return nextval(($1||$2)::regclass);    
  exception when others then    
    execute 'create sequence if not exists '||$1||$2||' start with 0 minvalue 0' ;    
    return nextval(($1||$2)::regclass);    
$$ language plpgsql strict;    

2、创建测试表

create table tbl1(    
  gid int,   -- 分组ID  
  ts text,   -- 文本  
  sn int8,   -- 自增序列值  
  unique(gid,ts),     
  unique(gid,sn)    

3、创建一个UDF,当输入组ID和文本时,如果文本存在,返回已有的序列,如果文本不存在则分配一个唯一ID,并返回这个ID。

create or replace function ins1(    
  int,   -- 分组ID  
  text   -- 文本  
) returns int8 as $$    
declare    
  res int8;    
begin    
  -- 查看该分组内该文本是否已存在  
  select sn into res from tbl1 where gid=$1 and ts=$2;    
  if found then    
    return res;    
    -- 不存在,则生成一个ID  
    insert into tbl1 (gid,ts,sn) values ($1, $2, get_per_gp_id('seq_', $1)) returning sn into res;    
    return res;    
  end if;    
  exception when others then    
    -- 异常则可能是其他并行会话正在生成该序列,重新查询,并返回SN。  
    select sn into res from tbl1 where gid=$1 and ts=$2;    
    if found then    
      return res;    
      raise ;    
    end if;    
$$ language plpgsql strict;    
设计二、不含组ID,文本和序列全局唯一

1、创建一个序列即可

create sequence seq_tbl2_sn start with 0 minvalue 0;    

2、创建测试表

create table tbl2(    
  ts text unique,   -- 文本  
  sn int8 default nextval('public.seq_tbl2_sn'::regclass) unique  -- 序列  

3、创建一个UDF,当输入文本时,如果文本已存在,返回文本对应的序列,如果文本不存在,则分配一个唯一序列值,同时返回该值。

create or replace function ins2(    
) returns int8 as $$    
declare    
  res int8;    
begin    
  -- 查看该文本是否已存在  
  select sn into res from tbl2 where ts=$1;    
  if found then    
    return res;    
    -- 不存在,则生成一个ID  
    insert into tbl2 (ts) values ($1) returning sn into res;    
    return res;    
  end if;    
  exception when others then    
    -- 异常则可能是其他并行会话正在生成该序列,重新查询,并返回SN。  
    select sn into res from tbl2 where ts=$1;    
    if found then    
      return res;    
      raise ;    
    end if;    
$$ language plpgsql strict;    
设计三、含组ID,并且全局唯一

我们假设字典空间为40亿,则使用INT4。

如果字典空间超过40亿,则需要使用INT8。

1、创建序列,设置起始值为INT4的最小值

create sequence seq_tbl_dict minvalue -2147483648 start with -2147483648;  

2、创建测试表

create table tbl_dict(    
  gid int2,    -- 组ID  
  ts text,     -- 文本  
  sn int4 default nextval('public.seq_tbl_dict'::regclass),  -- 序列  
  unique (gid,ts),  
  unique (sn)  

3、创建一个UDF,当输入文本时,如果文本已存在,返回文本对应的序列,如果文本不存在,则分配一个唯一序列值,同时返回该值。

create or replace function get_sn(int2, text) returns int as $$  
declare  
  res int;  
begin    
  -- 乐观查询  
  select sn into res from tbl_dict where gid=$1 and ts=$2;   
  if found then   
    return res;   
  end if;  
  -- 如果没有查到,则插入  
  insert into tbl_dict values($1,$2,nextval('public.seq_tbl_dict'::regclass)) on conflict (gid,ts) do nothing returning sn into res;  
  if found then  
    return res;  
  -- 如果插入冲突,则继续查询返回sn  
    select sn into res from tbl_dict where gid=$1 and ts=$2;  
    return res;  
  end if;  
$$ language plpgsql strict;  
批量操作用法
select ins1(gid, ts) from (values (),(),.....()) as t(gid, ts);    
select ins2(ts) from (values (),(),.....()) as t(ts);    

例子与性能,分配100条文本的ID约2毫秒

select ins1(id, 'test'||id) from generate_series(1,100) t(id);    
...........    
(100 rows)    
Time: 1.979 ms    
写操作压测

1、包括组ID

vi test1.sql    
\set gid random(1,10)    
\set ts random(1,100000000)    
select ins1(:gid, md5(:ts::text));    
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 56 -j 56 -T 120    
transaction type: ./test1.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 56    
number of threads: 56    
duration: 120 s    
number of transactions actually processed: 18082960    
latency average = 0.232 ms    
latency stddev = 0.517 ms    
tps = 150680.114138 (including connections establishing)    
tps = 150687.227354 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set gid random(1,10)    
         0.000  \set ts random(1,100000000)    
         0.230  select ins1(:gid, md5(:ts::text));    

2、不包括组ID

vi test2.sql    
\set ts random(1,100000000)    
select ins2(md5(:ts::text));    
pgbench -M prepared -n -r -P 1 -f ./test2.sql -c 56 -j 56 -T 120    
transaction type: ./test2.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 56    
number of threads: 56    
duration: 120 s    
number of transactions actually processed: 11515008    
latency average = 0.584 ms    
latency stddev = 0.766 ms    
tps = 95613.170828 (including connections establishing)    
tps = 95618.249995 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set ts random(1,100000000)    
         0.582  select ins2(md5(:ts::text));    

3、包括组ID,且全局唯一

vi test3.sql    
\set gid random(1,10)    
\set ts random(1,100000000)    
select get_sn(:gid, md5(:ts::text));    
pgbench -M prepared -n -r -P 1 -f ./test3.sql -c 56 -j 56 -T 120    
transaction type: ./test3.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 56  
number of threads: 56  
duration: 120 s  
number of transactions actually processed: 7665708  
latency average = 0.877 ms  
latency stddev = 0.666 ms  
tps = 63868.058538 (including connections establishing)  
tps = 63875.166407 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set gid random(1,10)    
         0.000  \set ts random(1,100000000)    
         0.875  select get_sn(:gid, md5(:ts::text));  
postgres=# select * from tbl_dict  limit 10;
 gid |                ts                |     sn      
-----+----------------------------------+-------------
   9 | 8021bdb598f73577a063b50bdf0cef31 | -2147483648
   3 | e1988c3c7a80dcd1b1c1bdcf2ac31fe7 | -2147483646
   7 | 6ee09b73df8ae9bb97a4ebd4c51bd212 | -2147483647
   1 | fa8303da6ea2b6e995a1e090fb9cd9f2 | -2147483645
   7 | ca1c614104f1ad3af92d8d9a2911a5b6 | -2147483643
   8 | 4641dd1162f46e8be5f643facc85df94 | -2147483644
   6 | 88250e10f0d27cdebbf5c5eb4a7032a3 | -2147483641
   2 | 5718da726fd20d8fd12d56e9bf2d7e9e | -2147483642
   1 | 687e553016fe6bd1dba3ca6126b8b5b8 | -2147483639
  10 | a4707645d604dd1ad9ba96ff303cf9d9 | -2147483638
(10 rows)

空洞来源,序列不可逆转的使用。即使事务失败,耗费掉的序列值也不可能被返回。

实测符合要求。

postgres=# select gid,count(*),min(sn),max(sn),((max(sn)+1)/count(*)::float8-1)*100||' %' from tbl1 group by gid;    
 gid |  count  | min |   max   | ?column?     
-----+---------+-----+---------+----------    
   1 | 1790599 |   0 | 1790598 | 0 %    
   2 | 1793384 |   0 | 1793383 | 0 %    
   3 | 1791533 |   0 | 1791532 | 0 %    
   4 | 1792755 |   0 | 1792754 | 0 %    
   5 | 1793897 |   0 | 1793896 | 0 %    
   6 | 1794786 |   0 | 1794785 | 0 %    
   7 | 1792282 |   0 | 1792281 | 0 %    
   8 | 1790630 |   0 | 1790629 | 0 %    
   9 | 1791303 |   0 | 1791302 | 0 %    
  10 | 1790307 |   0 | 1790306 | 0 %    
(10 rows)    
postgres=# select count(*),min(sn),max(sn),((max(sn)+1)/count(*)::float8-1)*100||' %' from tbl2;    
  count   | min |   max    |        ?column?            
----------+-----+----------+------------------------    
 10877124 |   0 | 10877128 | 4.59680334685686e-05 %    
(1 row)    
读操作压测

只要写满了,就只是返回SN,所以只要略微修改一下压测脚本

vi test1.sql    
\set gid random(1,10)    
\set ts random(1,10000)    
select ins1(:gid, md5(:ts::text));    
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 56 -j 56 -T 120    
transaction type: ./test1.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 56    
number of threads: 56    
duration: 120 s    
number of transactions actually processed: 69229025    
latency average = 0.097 ms    
latency stddev = 0.040 ms    
tps = 574906.288558 (including connections establishing)    
tps = 575063.117108 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.002  \set gid random(1,10)    
         0.001  \set ts random(1,10000)    
         0.098  select ins1(:gid, md5(:ts::text));    

或者你可以换成SELECT

vi test3.sql    
\set gid random(1,10)    
\set ts random(1,10000)    
select * from tbl1 where gid=:gid and ts=md5(:ts::text);    
pgbench -M prepared -n -r -P 1 -f ./test3.sql -c 56 -j 56 -T 120    
transaction type: ./test3.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 56    
number of threads: 56    
duration: 120 s    
number of transactions actually processed: 90985807    
latency average = 0.074 ms    
latency stddev = 0.009 ms    
tps = 758067.503368 (including connections establishing)    
tps = 758109.672642 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set gid random(1,10)    
         0.001  \set ts random(1,10000)    
         0.074  select * from tbl1 where gid=:gid and ts=md5(:ts::text);    

数据库本身没有做任何优化,同时使用了ECS虚拟机环境,还有一定的性能提升空间。或者可以按GID拆成多个库,实现100万QPS不是问题。

使用PostgreSQL的UDF,序列等功能,可以实现本文开头要求的“全局ID分配服务”的设计。

同时本例用到的PG实例为ECS虚拟机实例,读性能相比物理机要差一倍左右,单机实现100万的读,在物理机下面是没有问题的。

如果考虑将来的扩展性,可以将GID分配到不同的实例上,实现横向扩展,做到单个PG实例100万,多个实例100万*N的读TPS。

《PostgreSQL 单机3.9万亿/天(计数器、序列、自增)》

《PostgreSQL 无缝自增ID的实现 - by advisory lock》

《PostgreSQL sharding有序UUID最佳实践 - serial global uuid stored in 64bit int8》

《PostgreSQL 优化CASE - 无序UUID性能问题诊断》

《PostgreSQL 高并发任务分配系统 实践》

PolarDB 开源版 使用PostGIS 以及泰森多边形 解决 "零售、配送、综合体、教培、连锁店等经营"|"通信行业基站建设功率和指向" 的地理最优解问题 [视频]云原生数据仓库 AnalyticDB PostgreSQL 版解析与实践(上)|学习笔记(四)
快速学习[视频]云原生数据仓库 AnalyticDB PostgreSQL 版解析与实践(上)
serverless 入门与实践 | 动手实践1: 基于Serverless 数据库 RDS 和函数计算一键部署相册应用
serverless 入门与实践 | 动手实践1: 基于Serverless 数据库 RDS 和函数计算一键部署相册应用
postgresql 标签分组实战(可用于用户画像的实践)-数组篇
基于数组方式方面的基础应用,如有更大数据量的标签组合的时候,请参考下德哥写的文章 https://developer.aliyun.com/article/307731