Python 插入百万数据的时间优化与 OOM 问题的解决
原创一.背景
我们小组需要从 IT 部门同步客户信息和机构信息到本地,这两部分数据大概各 400W,总共 800W 的数据量。IT 部门提供两个存储过程用于分别获取这两部分数据,因此在使用 Python 处理数据时,只能调用存储过程将两部分数据分别一次性全部读入内存再处理。每个存储过程从 IT 部门的数据库获取数据大概需要 20min 的时间,总共 40min 的样子。
二 . 问题
之前一位已经离开公司的同事已经实现了一个版本的该需求的功能,思路是一个 Python 类的两个方法分别处理客户信息和机构信息,使用 MySQLdb 模块拼接一条 sql 语句便插入一条数据。在实际执行过程中的现象是程序大概执行了 4 个小时,然后进程被 Linux 的 OOM killer( Out_Of_Memory killer ) 机制给杀掉了。下图是截取自 Linux 的系统日志,可以看到 anon-rss 系统的物理内存是 9G 多些,然而进程使用的 total-vm 虚拟内存已经到达了 11G+。
这样就有两个问题需要优化处理:
- 处理时间过长,扣除固定的获取数据的 40min,竟然 3 个多小时都没能将 800w 的数据入本地 mysql。
- 内存溢出,内存使用过大被 Linux 内核杀死导致入库的数据不完整。
三. 定位与解决
1. 时间优化
通过打印处理时间,可以很容易发现处理时间主要都消耗在数据入本地 mysql 上。原始的入库逻辑是拼接一条 insert 语句,然后执行一次入库动作,这样的效率肯定不高。提升数据库入库的效率的一条原则就是以“insert table(XX) values(XX),(XX)…”来代替多次插入单条数据。因此对代码进行改造,简单粗暴地将读入的 400w 数据拼接成一条“insert table(XX) values(XX),(XX)…”的 sql 语句。但是执行的结果就是“Got a packet bigger than 'max_allowed_packet' bytes”,原来 mysql 对输入 sql 语句长度有限制,最大能够多长就是有“max_allowed_packet”这个参数决定的。
数据库才允许 16M 的输入,而 400w 的数据将就 900M 的长度,肯定处理不了。另外查资料发现“max_allowed_packet”不支持热修改,也就是或者修改配置文件然后重启 db,或者执行“set global max_allowed_packet=XX”语句,但是要重新登录。这样对于 python 脚本的执行都不太友好,所以想到的办法是把 400w 的数据进行拆分,组装成小于 16M 的 insert 语句执行。思路:使用迭代器对查询的结果集进行分割处理,返回“(XXX),(XXX)”形式的 insert 语句后半部分,以便拼接 sql 字符串。
class NextValues(object):
def __init__(self, properties, interval, source_list):
# 查询结果集
self.source_list = source_list
# 结果集长度
self.max = len(source_list)
# 分割间距
self.interval = interval
self.step = self.max / self.interval + 1
# 需要进行插入字段名称列表
self.properties = properties
self.index = 0
def __iter__(self):
return self
def next(self):
values = bytearray('')
if (self.index <= self.step):
for j in range(0, self.interval):
flag = self.index * self.interval + j
# 超过最大范围则返回
if (flag >= len(self.source_list)):
break
item = self.source_list[flag]
value = bytearray(",(%s)" % (','.join(["'%s'"] * len(self.properties))))
# 替换实际的数据
for proper in self.properties:
value = value.replace("%s", str(item.get(proper, '')).replace("'", r"\'"), 1)
values += value
self.index += 1
# 去除开头多余的逗号
if values:
return values[1:]
raise StopIteration
使用方法:
# 需要插入的字段名称列表
properties = ['CUSTOMER_ID', 'PARTY_NUMBER', 'ORG_ID', 'CUSTOMER_NAME', 'ACCOUNT_NUMBER', 'NAME', 'ADDRESS_ID','ADDRESS1', 'REGION_CODE', 'REGION_NAME']
# 初始化迭代器用于获取 value 值
next_value = NextValues(properties, 200000, result_set.result_info)
for values in next_value:
if not values:
break