原因
之前写的一个项目,由于数据都是json,就直接使用了MongoDB进行数据存储,但是由于读写较为频繁,MongoDB较低的性能就不符合需求了,遂有了迁移的念头。
迁移的是一个扁平结构的collection,选择PostgreSQL是因为有jsonb的支持,部分字段我还是有json需求的。
前期准备
将需要的字段都整理出来,然后在PostgreSQL里建好库。
实施过程
使用mongodbexport导出数据为csv
mongoexport -d wzry -c collection_name --type csv --fields "field1,field2" --out out.csv
在PostgreSQL命令行导入csv
COPY table_name
FROM '/out.csv'
DELIMITER ','
CSV HEADER;
这里要注意一个坑,尽量不要使用图形化管理软件,navicat我显示不出创建好的表,datagrip有个
从文件导入
的功能,理论上应该是可以使用,但是不知道为什么不识别jsonb,说转换失败,但实际上在命令行可以轻松导入。
这里还会对数据进行检查,比如应该是bigint,但是表结构只有int,会报错。
到这数据导入就结束了,非常轻松,300万条数据也很快。
代码修改
python代码改了我一天时间。。。golang还没改。。。这就只说python了
首先自然是driver库的选择,最正统的库自然是psycopg2,要orm的话SQLAlchemy也是可以。但是我需要一个异步的库,就看到了asyncpg
这张图还是蛮有吸引力的
不过这就只能手写sql了
其实查询都好说,真正的难点出现在如何实现MongoDB里的Upsert功能,也就是PostgreSQL如何实现insert or update,实际上这个关键词在google能搜索到一大堆,官方文档也有说明,就是利用
ON CONFLICT
。
但是!有如下还需解决:
- 如何实现像MongoDB这样轻松的upsert任意数量字段
- 如何最高性能批量提交
- 如何获取inserted count and modified count
解决问题
---写到这突然懒了
任意字段
编写函数,for循环很方便
高性能批量提交
可以参考 stackoverflow
asyncpg提供了executemany,但是目测是跟psycopg2的一个样,只是多条语句多次提交,性能堪忧,所以需要自行优化,将多条语句合并成一条。
values (record1....),(record2....),(record3....)
并且的话为了要输入数据,就是构建安全的语句,我们不能像
conn.execute(sql, value1, value2)
这样子做,于是找了找半天终于找到psycopg2的cursor.mogrify可以构建语句而不执行。不过因为是cursor游标类的函数,需要初始化,也就是需要一个连接(实际上用不到),怎么避免,其实可以自己hack一下,也挺有意思的。
inserted count and modified count
可以参考 stackoverflow
然后结合以上几点配合大量循环就可以编写一个通用的函数完成upsert语句的构建
这是我写的一个没那么通用的函数,部分写死了,有一些内容没贴出来,所以直接copy没法用,但是如果有编程能力的应该都能够看得懂自己修改了,欢迎留言
def test(data):
fields = tuple(i[0].data.keys())
_sql = f'''WITH t AS (INSERT INTO public.collection_name ({'"' + '","'.join(fields) + '"'})
VALUES %s
ON CONFLICT ("Id") DO UPDATE SET {','.join(['"' + _x + '" = EXCLUDED."' + _x + '"' for _x in fields if _x != 'Id'])} RETURNING xmax)
SELECT COUNT(*) AS rows,
SUM(CASE WHEN xmax = 0 THEN 1 ELSE 0 END) AS inserted,
SUM(CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END) AS updated
FROM t;''' % data_many(data, keys=fields)
res = await conn.fetchrow(_sql)
result = BulkResult(result.rows + (res['rows'] or 0),
result.inserted + (res['inserted'] or 0),
result.updated + (res['updated'] or 0))
def data_many(ops, keys=None, jsonb=False):
return ','.join(tuple(data_format(x, keys=keys, jsonb=jsonb) for x in ops))
def data_format(args, keys, jsonb=False):
if keys:
args = tuple(args.data[x] for x in keys)
return __cur.mogrify(f"({','.join(tuple('%s' if not jsonb or not isinstance(args[_], Json) else '%s::jsonb' for _ in range(len(args))))})", args).decode('utf-8')
except psycopg2.ProgrammingError as e:
print(args)
raise e
批量update的问题
可以参考 stackoverflow
批量update会面临一个问题就是因为需要构建这么一个SQL
update test as t set