相关文章推荐
酒量大的充电器  ·  Controller action ...·  3 周前    · 
酒量大的充电器  ·  如何将 LINQ ...·  2 月前    · 
酒量大的充电器  ·  标注属性面板·  5 月前    · 
酒量大的充电器  ·  XML 与 HTML 对比 - ...·  8 月前    · 
酒量大的充电器  ·  Problem in login with ...·  9 月前    · 
威武的香瓜  ·  [转]Entity Framework ...·  2 小时前    · 

从MongoDB迁移到PostgreSQL

in DB with 0 comment

原因

之前写的一个项目,由于数据都是json,就直接使用了MongoDB进行数据存储,但是由于读写较为频繁,MongoDB较低的性能就不符合需求了,遂有了迁移的念头。

迁移的是一个扁平结构的collection,选择PostgreSQL是因为有jsonb的支持,部分字段我还是有json需求的。

前期准备

将需要的字段都整理出来,然后在PostgreSQL里建好库。

实施过程

使用mongodbexport导出数据为csv

mongoexport -d wzry -c collection_name --type csv --fields "field1,field2"  --out out.csv

在PostgreSQL命令行导入csv

COPY table_name 
FROM '/out.csv'
DELIMITER ','
CSV HEADER;

这里要注意一个坑,尽量不要使用图形化管理软件,navicat我显示不出创建好的表,datagrip有个 从文件导入 的功能,理论上应该是可以使用,但是不知道为什么不识别jsonb,说转换失败,但实际上在命令行可以轻松导入。

这里还会对数据进行检查,比如应该是bigint,但是表结构只有int,会报错。

到这数据导入就结束了,非常轻松,300万条数据也很快。

代码修改

python代码改了我一天时间。。。golang还没改。。。这就只说python了

首先自然是driver库的选择,最正统的库自然是psycopg2,要orm的话SQLAlchemy也是可以。但是我需要一个异步的库,就看到了asyncpg

性能对比
这张图还是蛮有吸引力的

不过这就只能手写sql了

其实查询都好说,真正的难点出现在如何实现MongoDB里的Upsert功能,也就是PostgreSQL如何实现insert or update,实际上这个关键词在google能搜索到一大堆,官方文档也有说明,就是利用 ON CONFLICT

但是!有如下还需解决:

  • 如何实现像MongoDB这样轻松的upsert任意数量字段
  • 如何最高性能批量提交
  • 如何获取inserted count and modified count

解决问题

---写到这突然懒了

任意字段

编写函数,for循环很方便

高性能批量提交

可以参考 stackoverflow

asyncpg提供了executemany,但是目测是跟psycopg2的一个样,只是多条语句多次提交,性能堪忧,所以需要自行优化,将多条语句合并成一条。 values (record1....),(record2....),(record3....)

并且的话为了要输入数据,就是构建安全的语句,我们不能像 conn.execute(sql, value1, value2) 这样子做,于是找了找半天终于找到psycopg2的cursor.mogrify可以构建语句而不执行。不过因为是cursor游标类的函数,需要初始化,也就是需要一个连接(实际上用不到),怎么避免,其实可以自己hack一下,也挺有意思的。

inserted count and modified count

可以参考 stackoverflow

然后结合以上几点配合大量循环就可以编写一个通用的函数完成upsert语句的构建

这是我写的一个没那么通用的函数,部分写死了,有一些内容没贴出来,所以直接copy没法用,但是如果有编程能力的应该都能够看得懂自己修改了,欢迎留言

def test(data):
    fields = tuple(i[0].data.keys())
    _sql = f'''WITH t AS  (INSERT INTO public.collection_name ({'"' + '","'.join(fields) + '"'})
        VALUES %s
        ON CONFLICT ("Id") DO UPDATE SET {','.join(['"' + _x + '" = EXCLUDED."' + _x + '"' for _x in fields if _x != 'Id'])} RETURNING xmax)
        SELECT COUNT(*) AS rows,
        SUM(CASE WHEN xmax = 0 THEN 1 ELSE 0 END) AS inserted,
        SUM(CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END) AS updated
        FROM t;''' % data_many(data, keys=fields)
    res = await conn.fetchrow(_sql)
    result = BulkResult(result.rows + (res['rows'] or 0),
        result.inserted + (res['inserted'] or 0),
        result.updated + (res['updated'] or 0))
def data_many(ops, keys=None, jsonb=False):
    return ','.join(tuple(data_format(x, keys=keys, jsonb=jsonb) for x in ops))
def data_format(args, keys, jsonb=False):
    if keys:
        args = tuple(args.data[x] for x in keys)
        return __cur.mogrify(f"({','.join(tuple('%s' if not jsonb or not isinstance(args[_], Json) else '%s::jsonb' for _ in range(len(args))))})", args).decode('utf-8')
    except psycopg2.ProgrammingError as e:
        print(args)
        raise e

批量update的问题

可以参考 stackoverflow

批量update会面临一个问题就是因为需要构建这么一个SQL

update test as t set
 
推荐文章