postgresql提供了copy函数,方便批量导入数据。

注意:sql字符串拼接时 表名 专用函数,在字符串两边添加 " 双引号 ,否则 英文中文混合表名出错

def str_tablenm(text,is_return_null=False):
    sql字符串拼接时 表名 专用函数,在字符串两边添加 " 双引号 ,否则 英文中文混合表名出错
    if not text is None and text!='':
        return '"'+str(text)+'"'
    elif not is_return_null:
        return '""'
    else:
        return 'null'
复制代码

copy_from的参数说明:copy_from(file, table, sep='\t', null='\N', size=8192, columns=None)#colunms为元组数据

python写法如下:

#encoding=utf-8
import psycopg2
import StringIO
if __name__=='__main__':
    s = ""
    for i in range(0,1000000):
        s += str(i)+"\taaa\t13434\t1\t2013-01-11\t1\t2\n"
    conn = psycopg2.connect(host='localhost',user="test",password="********",database="test")
    cur = conn.cursor()
    cur.copy_from(StringIO.StringIO(s),'tb_user',columns=('id','userame','passwd','roleid','lasttime','failnum','info'))
    conn.commit()
    cur.close()
    conn.close()
    print('done')
复制代码

#原文链接: blog.csdn.net/rongyongfei… #=================================

#1.查询数据库中数据表是否存在,不存在则创建

bottle上传excel并导入数据库

#==============================上传excel并导入数据库
@post('/api/xlsfiles/')
@exception_handling
def excel_upload():
    data={"state":0,"data":[],"msg":'','rows_count':0}
    upload = request.files.getall('upfile')
    for meta in upload:
        buf = meta.file.read()
        print ('image:',str(buf))
    excel_obj=request.files.getall('upfile')
    #excel_obj=request.files.get('upfile')
    try:
        # 组合成服务器端存储绝对路径
        #先保存再读取,注意后面的os.remove(file_path)
        file_path=upfile_path + '/'+ datetime_helper.to_number('%Y%m%d')+excel_obj[0].filename
        print(file_path)
        # 保存文件
        excel_obj[0].save(file_path, overwrite=True)
        #ecding=getFileChar(excel_obj[0].file)
        excel_data=pd.read_excel(file_path)#,encoding=ecding
        #ecding=getFileChar(excel_obj[0].file)
        excel_data=pd.read_excel(excel_obj[0].file)#不保存直接读取,此方式不能使用getFileChar,encoding=ecding
    except Exception as e:
        print('excel_upload',str(e))
        data["msg"]="读文件出错"
        #return JsonResponse(data)
        return json.dumps(data, cls=json_helper.CJsonEncoder)
    #excel_data["data_time"]=excel_data["data_time"].map(lambda x: str(x).split(" ")[0])
    col_n = ['号码','项目编号']
    pds_tbl = pd.DataFrame(excel_data,columns = col_n)
    #print(pds_tbl)
    #mysql_engine =create_engine('mysql+pymysql://root:jjjinl@localhost:3306/administrationsystem')
    try:
        _phcodes = base_logic_codes.Codes_Logic()
        _phcodes.execute('truncate table tmp_tbl_phonecodes')
        #excel_data.tosql('score',con=mysql_engine,if_exists='append',index=false)
        columns=('telcode','projectid')
        write_to_table(pds_tbl,'tmp_tbl_phonecodes',columns,'append')
        # 实例化phcodes表操作类CurrFlowLogic
        strsql='select f_add_codes()'#执行存储过程,防止重复导入
        s=_phcodes.execute(strsql)
        data["msg"]="上传保存成功"
        data["state"]=1
        data['rows_count']=s[0].get('f_add_codes')
    except Exception as e:
        print('pd2pgsql',str(e))
        data["msg"]="上传保存失败"
    #return JsonResponse(data)
    #os.remove(file_path)
    return json.dumps(data, cls=json_helper.CJsonEncoder)
  • 表不存在可以直接创建 sqlalchemy连接
  • def write_to_table(df, table_name,cols_name, if_exists='append'):#表不存在可以直接创建 sqlalchemy连接
        pwd='qais@123'
        conn_str="postgresql://qais:%s@localhost/db_qais"%urllib.parse.quote_plus(pwd)#?charset=utf8
        dbcfg=db_config.DB
        db_host=dbcfg.get('host')
        db_database=dbcfg.get('database')
        db_uname=dbcfg.get('user')
        db_upwd=dbcfg.get('password')
        conn_str="postgresql://%s:%s@%s/%s"%(db_uname,urllib.parse.quote_plus(db_upwd),db_host,db_database)#?charset=utf8
        db_engine = create_engine(conn_str,encoding='utf-8',echo=False)#
        string_data_io = io.StringIO()
        df.to_csv(string_data_io, sep='|', index=False, header=False)#header行 不当作内容写入,否则需启用readline()存入数据库
        pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine)
        table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                                   index=False, if_exists=if_exists,schema = 'public')#goods_code
        table.create()
        string_data_io.seek(0)
        #string_data_io.readline()  # remove header 如上面header=True,则启用本行
        with db_engine.connect() as connection:
            with connection.connection.cursor() as cursor:
                #copy_cmd = "COPY public.%s (%s) FROM STDIN HEADER DELIMITER '|' CSV" %(table_name,','.join(k for k in cols_name))#goods_code
                #cursor.copy_expert(copy_cmd, string_data_io)
                #或者使用如下一行
                cursor.copy_from(string_data_io, table_name, null='', sep='|',columns=cols_name)
            connection.connection.commit()
    def getFileChar(filename):
        f=open(filename,'rb')
        data=f.read(200)
        f.close()
        #print(chardet.detect(data))
        #print(chardet.detect(data).get('encoding'))