postgresql提供了copy函数,方便批量导入数据。
注意:sql字符串拼接时 表名 专用函数,在字符串两边添加 " 双引号 ,否则 英文中文混合表名出错
def str_tablenm(text,is_return_null=False):
sql字符串拼接时 表名 专用函数,在字符串两边添加 " 双引号 ,否则 英文中文混合表名出错
if not text is None and text!='':
return '"'+str(text)+'"'
elif not is_return_null:
return '""'
else:
return 'null'
复制代码
copy_from的参数说明:copy_from(file, table, sep='\t', null='\N', size=8192, columns=None)#colunms为元组数据
python写法如下:
#encoding=utf-8
import psycopg2
import StringIO
if __name__=='__main__':
s = ""
for i in range(0,1000000):
s += str(i)+"\taaa\t13434\t1\t2013-01-11\t1\t2\n"
conn = psycopg2.connect(host='localhost',user="test",password="********",database="test")
cur = conn.cursor()
cur.copy_from(StringIO.StringIO(s),'tb_user',columns=('id','userame','passwd','roleid','lasttime','failnum','info'))
conn.commit()
cur.close()
conn.close()
print('done')
复制代码
#原文链接: blog.csdn.net/rongyongfei… #=================================
#1.查询数据库中数据表是否存在,不存在则创建
bottle上传excel并导入数据库
#==============================上传excel并导入数据库
@post('/api/xlsfiles/')
@exception_handling
def excel_upload():
data={"state":0,"data":[],"msg":'','rows_count':0}
upload = request.files.getall('upfile')
for meta in upload:
buf = meta.file.read()
print ('image:',str(buf))
excel_obj=request.files.getall('upfile')
#excel_obj=request.files.get('upfile')
try:
# 组合成服务器端存储绝对路径
#先保存再读取,注意后面的os.remove(file_path)
file_path=upfile_path + '/'+ datetime_helper.to_number('%Y%m%d')+excel_obj[0].filename
print(file_path)
# 保存文件
excel_obj[0].save(file_path, overwrite=True)
#ecding=getFileChar(excel_obj[0].file)
excel_data=pd.read_excel(file_path)#,encoding=ecding
#ecding=getFileChar(excel_obj[0].file)
excel_data=pd.read_excel(excel_obj[0].file)#不保存直接读取,此方式不能使用getFileChar,encoding=ecding
except Exception as e:
print('excel_upload',str(e))
data["msg"]="读文件出错"
#return JsonResponse(data)
return json.dumps(data, cls=json_helper.CJsonEncoder)
#excel_data["data_time"]=excel_data["data_time"].map(lambda x: str(x).split(" ")[0])
col_n = ['号码','项目编号']
pds_tbl = pd.DataFrame(excel_data,columns = col_n)
#print(pds_tbl)
#mysql_engine =create_engine('mysql+pymysql://root:jjjinl@localhost:3306/administrationsystem')
try:
_phcodes = base_logic_codes.Codes_Logic()
_phcodes.execute('truncate table tmp_tbl_phonecodes')
#excel_data.tosql('score',con=mysql_engine,if_exists='append',index=false)
columns=('telcode','projectid')
write_to_table(pds_tbl,'tmp_tbl_phonecodes',columns,'append')
# 实例化phcodes表操作类CurrFlowLogic
strsql='select f_add_codes()'#执行存储过程,防止重复导入
s=_phcodes.execute(strsql)
data["msg"]="上传保存成功"
data["state"]=1
data['rows_count']=s[0].get('f_add_codes')
except Exception as e:
print('pd2pgsql',str(e))
data["msg"]="上传保存失败"
#return JsonResponse(data)
#os.remove(file_path)
return json.dumps(data, cls=json_helper.CJsonEncoder)
表不存在可以直接创建 sqlalchemy连接
def write_to_table(df, table_name,cols_name, if_exists='append'):#表不存在可以直接创建 sqlalchemy连接
pwd='qais@123'
conn_str="postgresql://qais:%s@localhost/db_qais"%urllib.parse.quote_plus(pwd)#?charset=utf8
dbcfg=db_config.DB
db_host=dbcfg.get('host')
db_database=dbcfg.get('database')
db_uname=dbcfg.get('user')
db_upwd=dbcfg.get('password')
conn_str="postgresql://%s:%s@%s/%s"%(db_uname,urllib.parse.quote_plus(db_upwd),db_host,db_database)#?charset=utf8
db_engine = create_engine(conn_str,encoding='utf-8',echo=False)#
string_data_io = io.StringIO()
df.to_csv(string_data_io, sep='|', index=False, header=False)#header行 不当作内容写入,否则需启用readline()存入数据库
pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine)
table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
index=False, if_exists=if_exists,schema = 'public')#goods_code
table.create()
string_data_io.seek(0)
#string_data_io.readline() # remove header 如上面header=True,则启用本行
with db_engine.connect() as connection:
with connection.connection.cursor() as cursor:
#copy_cmd = "COPY public.%s (%s) FROM STDIN HEADER DELIMITER '|' CSV" %(table_name,','.join(k for k in cols_name))#goods_code
#cursor.copy_expert(copy_cmd, string_data_io)
#或者使用如下一行
cursor.copy_from(string_data_io, table_name, null='', sep='|',columns=cols_name)
connection.connection.commit()
def getFileChar(filename):
f=open(filename,'rb')
data=f.read(200)
f.close()
#print(chardet.detect(data))
#print(chardet.detect(data).get('encoding'))