最近一项工作需要读取数据库中1500万条数据,考虑到数据量太大,不方便直接一次性读取,不然会内存爆炸。想到用pandas.read_sql_query()里有一个chunksize可以分批返回chunksize个数据,于是用pandas试了下,代码如下:

import pandas as pd
from sqlalchemy import create_engine
import psycopg2
import json
class DB_connection(object):
    def __init__(self):
        with open('config_db.json', 'r') as load_f:
            db_config = json.load(load_f)
        self.db_engine = create_engine(''.join(['postgresql+psycopg2://', db_config['USER'], ':', db_config['PASSWORD'], '@', db_config['HOST'], ':', str(db_config['PORT']), '/', db_config['DATABASE']]))
        self.db_conn = self.db_engine.connect()
        self.database = db_config['DATABASE']
    def read_from_table(self):
        data_gen = pd.read_sql_query(
            'SELECT case_id, text FROM first_case',
            self.db_conn, chunksize=2000
        return data_gen

因为pandas.read_sql_query()加上chunksize后返回的是一个iterator。但运行程序时一直卡在那不动,看pandas.read_sql_query()源码才知道它不是真正的分批次读取,而是根据SQL语句全部读取出来后,再把它按chunksize个一批一批地转为iterator然后再返回。

    def read_query(self, sql, index_col=None, coerce_float=True,
                   parse_dates=None, params=None, chunksize=None):
        """Read SQL query into a DataFrame.
        Parameters
        ----------
        sql : string
            SQL query to be executed.
        index_col : string, optional, default: None
            Column name to use as index for the returned DataFrame object.
        coerce_float : boolean, default True
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        params : list, tuple or dict, optional, default: None
            List of parameters to pass to execute method.  The syntax used
            to pass parameters is database driver dependent. Check your
            database driver documentation for which of the five syntax styles,
            described in PEP 249's paramstyle, is supported.
            Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg dict}``, where the arg dict
              corresponds to the keyword arguments of
              :func:`pandas.to_datetime` Especially useful with databases
              without native Datetime support, such as SQLite.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.
        Returns
        -------
        DataFrame
        See also
        --------
        read_sql_table : Read SQL database table into a DataFrame
        read_sql
        args = _convert_params(sql, params)
        result = self.execute(*args)
        columns = result.keys()
        if chunksize is not None:
            return self._query_iterator(result, chunksize, columns,
                                        index_col=index_col,
                                        coerce_float=coerce_float,
                                        parse_dates=parse_dates)
        else:
            data = result.fetchall()
            frame = _wrap_result(data, columns, index_col=index_col,
                                 coerce_float=coerce_float,
                                 parse_dates=parse_dates)
            return frame

上面源码可以看到,它先用execute执行sql语句,然后在判断是否有chunksize,没有就直接返回所有数据,有的话根据chunksize返回一个iterator。所以这不是一个真正的分批次读取,如果数据量大,还是会导致内存爆炸直至卡死。

所以没必要去用pandas.read_sql_query()。所有包装好的方法其实都是在执行SQL语句的基础上加了一些小技巧,这些不一定对你有用。最好方法是直接执行SQL语句,因为SQL语句是最灵活的,也是最直接的,比如我们要分批读取就可以在SQL语句中使用limit和offset。

# conn为数据库链接
cursor = conn.cursor()
query = 'SELECT case_id, text FROM first_case order by case_id limit 1000 offset 0'
cursor.execute(query)

limit a offset b,表示跳过b个数据,读取出a个数据,这样可以固定a, 更新b就可实现一批一批地读取到所有数据。