参考:利用pandas的to_sql将数据插入MySQL数据库和所踩过的坑
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://{}:{}@{}/{}?charset={}".format('用户名', '登录密码', '127.0.0.1:3306', '数据库名','字符编码'))
con = engine.connect()#创建连接
df.to_sql(name='rumousdata', con=con, if_exists='append', index=False)
文章:python使用dbutils的PooledDB连接池,操作数据库
使用优势:
- 1、使用dbutils的PooledDB连接池,操作数据库。
这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间 - 2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。
使用PooledDB解决。
self.__class__.__pool = PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, setsession, reset,
host=host, port=port, db=db,
user=user, passwd=passwd,
charset=charset,
cursorclass=pymysql.cursors.DictCursor
python3.6 使用 pymysql 连接 Mysql 数据库及 简单的增删改查操作
import pymysql #导入 pymysql
#打开数据库连接
db= pymysql.connect(host="localhost",user="root",
password="123456",db="test",port=3307)
# 使用cursor()方法获取操作游标
cur = db.cursor()
#1.查询操作
# 编写sql 查询语句 user 对应我的表名
sql = "select * from user"
cur.execute(sql) #执行sql语句
results = cur.fetchall() #获取查询的所有记录
print("id","name","password")
#遍历结果
for row in results :
id = row[0]
name = row[1]
password = row[2]
print(id,name,password)
except Exception as e:
raise e
finally:
db.close() #关闭连接
import pymysql
#2.插入操作
db= pymysql.connect(host="localhost",user="root",
password="123456",db="test",port=3307)
# 使用cursor()方法获取操作游标
cur = db.cursor()
sql_insert ="""insert into user(id,username,password) values(4,'liu','1234')"""
cur.execute(sql_insert)
db.commit()
except Exception as e:
#错误回滚
db.rollback()
finally:
db.close()
# 单条插入
coin_cnts_2 = ['46213000',
'14000',
'1952',
'249',
'62000',
'10000',
'48',
'6446',
'121000']
for n,cot in tqdm(enumerate(coin_cnts_2),total = len(coin_cnts_2)):
sql = 'UPDATE douyin_data_1 SET coin_cnt = {} WHERE id = {} '.format(cot,n+1)
mq.execute(sql)
# 批量插入
conn = pymysql.connect(host=mq.host, user=mq.user,password=mq.password,database=mq.database)
cursor = conn.cursor()
sql = 'UPDATE douyin_data_1 SET coin_cnt = %s WHERE id = %s '
data_info = [(cot,n) for n,cot in enumerate(coin_cnts_2)]
%time cursor.executemany(sql, data_info)
插入的速度比较慢
import pymysql
#3.更新操作
db= pymysql.connect(host="localhost",user="root",
password="123456",db="test",port=3307)
# 使用cursor()方法获取操作游标
cur = db.cursor()
sql_update ="update user set username = '%s' where id = %d"
cur.execute(sql_update % ("xiongda",3)) #像sql语句传递参数
db.commit()
except Exception as e:
#错误回滚
db.rollback()
finally:
db.close()
import pymysql
#4.删除操作
db= pymysql.connect(host="localhost",user="root",
password="123456",db="test",port=3307)
# 使用cursor()方法获取操作游标
cur = db.cursor()
sql_delete ="delete from user where id = %d"
cur.execute(sql_delete % (3)) #像sql语句传递参数
db.commit()
except Exception as e:
#错误回滚
db.rollback()
finally:
db.close()
pd.to_sql()知道这些就够用了
DataFrame.to_sql(name, con, schema=None,
if_exists='fail', index=True, index_label=None,
chunksize=None, dtype=None, method=None)
参见pandas.to_sql函数,主要有以下几个参数:
- name: 输出的表名
- con: 与read_sql中相同,数据库链接
- if_exits: 三个模式:fail,若表存在,则不输出;replace:若表存在,覆盖原来表里的数据;append:若表存在,将数据写到原表的后面。默认为fail
- index:是否将df的index单独写到一列中
- index_label:指定列作为df的index输出,此时index为True
- chunksize: 同read_sql
- dtype: 指定列的输出到数据库中的数据类型。字典形式储存:{column_name: sql_dtype}。常见的数据类型有sqlalchemy.types.INTEGER(), sqlalchemy.types.NVARCHAR(),sqlalchemy.Datetime()等,具体数据类型可以参考这里
使用to_sql
的方式写回数据库之中。
import pandas as pd
from sqlalchemy import create_engine
yconnect = create_engine('mysql+pymysql://user:password@111.111.111.111:3306/your_database_name?charset=utf8')
pd.io.sql.to_sql(dataframe_name,'form_name', yconnect, schema='your_database_name', if_exists='append')
将数据写入mysql的数据库,但需要先通过sqlalchemy.create_engine建立连接,且字符编码设置为utf8,否则有些latin字符不能处理
- 第二个参数tablename,form_name,是将导入的数据库中的表名
- 第四个参数your_database_name是将导入的数据库名字
- if_exists='append’的意思是,如果表tablename存在,则将数据添加到这个表的后面
- fail的意思如果表存在,啥也不做
- replace的意思,如果表存在,删了表,再建立一个新表,把数据插入
- append的意思,如果表存在,把数据插入,如果表不存在创建一个表!!
字段字符格式的初始化:
df.to_sql(name='table',
con=con,
if_exists='append',
index=False,
dtype={'col1':sqlalchemy.types.INTEGER(),
'col2':sqlalchemy.types.NVARCHAR(length=255),
'col_time':sqlalchemy.DateTime(),
'col_bool':sqlalchemy.types.Boolean
注
如果不提供dtype,to_sql会自动根据df列的dtype选择默认的数据类型输出,比如字符型会以sqlalchemy.types.TEXT类型输出,相比NVARCHAR,TEXT类型的数据所占的空间更大,所以一般会指定输出为NVARCHAR;
而如果df的列的类型为np.int64时,将会导致无法识别并转换成INTEGER型,需要事先转换成int类型(用map,apply函数可以方便的转换)。
自建格式的一些格式要求:
df.to_sql('emp_backup', engine, if_exists='replace', index=False,
dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
'GENDER': sqlalchemy.types.String(length=20),
'AGE': sqlalchemy.types.BigInteger(),
'EMAIL': sqlalchemy.types.String(length=50),
'PHONE_NR': sqlalchemy.types.String(length=50),
'EDUCATION': sqlalchemy.types.String(length=50),
'MARITAL_STAT': sqlalchemy.types.String(length=50),
'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
其他包括:
from sqlalchemy import exc
from sqlalchemy import schema as sa_schema
from sqlalchemy import types as sqltypes
from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy.engine import reflection
from sqlalchemy.sql import compiler
from sqlalchemy.sql import text
from sqlalchemy.types import BIGINT
from sqlalchemy.types import BINARY
from sqlalchemy.types import CHAR
from sqlalchemy.types import DATE
from sqlalchemy.types import DATETIME
from sqlalchemy.types import DECIMAL
from sqlalchemy.types import FLOAT
from sqlalchemy.types import INT # noqa
from sqlalchemy.types import INTEGER
from sqlalchemy.types import NCHAR
from sqlalchemy.types import NUMERIC
from sqlalchemy.types import NVARCHAR
from sqlalchemy.types import REAL
from sqlalchemy.types import SMALLINT
from sqlalchemy.types import TEXT
from sqlalchemy.types import TIME
from sqlalchemy.types import TIMESTAMP
from sqlalchemy.types import Unicode
from sqlalchemy.types import VARBINARY
from sqlalchemy.types import VARCHAR
参考:https://blog.csdn.net/ydyang1126/article/details/78222125
# -*- coding: utf-8 -*-
import pandas as pd
import pymysql
conn = pymysql.connect(host='127.0.0.1', \
user='root',password='123456', \
db='TESTDB',charset='utf8', \
use_unicode=True)
sql = 'select GroupName from group limit 20'
df = pd.read_sql(sql, con=conn)
print(df.head())
df.to_csv("data.csv")
conn.close()
如果数据源本身是来自数据库,通过脚本操作是比较方便的。如果数据源是来自 CSV 之类的文本文件,可以手写 SQL 语句或者利用 pandas get_schema() 方法,如下例:
import sqlalchemy
print(pd.io.sql.get_schema(df, 'emp_backup', keys='EMP_ID',
dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
'GENDER': sqlalchemy.types.String(length=20),
'AGE': sqlalchemy.types.BigInteger(),
'EMAIL': sqlalchemy.types.String(length=50),
'PHONE_NR': sqlalchemy.types.String(length=50),
'EDUCATION': sqlalchemy.types.String(length=50),
'MARITAL_STAT': sqlalchemy.types.String(length=50),
'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
}, con=engine))
get_schema()
并不是一个公开的方法,没有文档可以查看。生成的 SQL 语句如下:
CREATE TABLE emp_backup (
`EMP_ID` BIGINT NOT NULL AUTO_INCREMENT,
`GENDER` VARCHAR(20),
`AGE` BIGINT,
`EMAIL` VARCHAR(50),
`PHONE_NR` VARCHAR(50),
`EDUCATION` VARCHAR(50),
`MARITAL_STAT` VARCHAR(50),
`NR_OF_CHILDREN` BIGINT,
CONSTRAINT emp_pk PRIMARY KEY (`EMP_ID`)
当然,如果数据源本身就是 mysql,当然不用大费周章来创建数据表的结构,直接使用 create table like xxx 就行。以下代码展示了这种用法:
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://user:password@localhost/stonetest?charset=utf8')
df = pd.read_sql('emp_master', engine)
# Copy table structure
with engine.connect() as con:
con.execute('DROP TABLE if exists emp_backup')
con.execute('CREATE TABLE emp_backup LIKE emp_master;')
df.to_sql('emp_backup', engine, index=False, if_exists='append')
update djk_corpus set create_time='2019-09-29 14:09:45'
python的to_sql那点儿事
to_sql结论
- 可以对齐字段(dataframe的columns和数据库字段一一对齐)
- 可以缺少字段(dataframe的columns可以比数据库字段少)
- 不可以多出字段,会报错
-if_exists='append’进行新增(bug:如果设置了PK,ignore 和 replace会报错) - 一定要先创建好数据库,设置好格式,
- 否则使用if_exists='append’自动创建的字段格式乱七八糟
- 不能replace!!!!
to_sql代码
#构建数据库连接
engine=create_engine(f'mysql+pymysql://{user}:{passwd}@{host}:3306/{db}')
#可以对齐字段,以及缺少字段;不可以增加字段
data.to_sql(sql_name,engine,index=False,if_exists='append')
自定义w_sql (迭代后版本)
# 定义写入数据库函数
def w_sql(sql_name,data,db_name,host=host,user=user,passwd=passwd):
zd=""
for j in data.columns:
zd=zd+j+","
connent = pymysql.connect(host=host, user=user, passwd=passwd, db=db_name, charset='utf8mb4') #连接数据库
cursor = connent.cursor()#创建游标
for i in data.values:
va=""
for j in i:
if pd.isnull(j):
va=va+","+'null' #缺失值判断和转换
else:
va=va+","+'"'+str(j)+'"'
# sql=u"""insert ignore into %s (%s) values(%s)"""%(sql_name,zd[:-1],va[1:])
sql=u"""replace into %s (%s) values(%s)"""%(sql_name,zd[:-1],va[1:])
cursor.execute(sql)
connent.commit() #提交事务
cursor.close()#关闭游标
connent.close()#断开连接
sql = "show full columns from douyin_data_1; "
mq.query(sql)
ALTER TABLE `数据集` CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
SELECT column_name FROM information_schema.columns
WHERE table_name='xxxx表名';
某个数据库下有多少张表
import pymysql
# 连接database
user = "xxx"
password = "xxx"
host = '1.1.1.1'
port = 3306
conn = pymysql.connect(host=host, user=user,password=password,database="xxx")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
database = 'd-lab'
sql = "SELECT table_name FROM information_schema.tables WHERE table_schema='{}' AND table_type='base table';".format(database)
cursor.execute(sql) # 执行sql语句
cursor.fetchall() # 获取查询的所有记录
不同表的列名:
'show columns from {}; '.format(form_name)
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
engine = create_engine('mysql+pymysql://user:password@localhost/stonetest?charset=utf8')
df = pd.read_sql('emp_master', engine)
# make sure emp_master_backup table has been created
# so the table schema is what we want
df.to_sql('emp_backup', engine, index=False, if_exists='append')
也可以在 to_sql() 方法中,通过 dtype 参数指定字段的类型,然后在 mysql 中 通过 alter table 命令将字段 EMP_ID 变成 primary key。
df.to_sql('emp_backup', engine, if_exists='replace', index=False,
dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
'GENDER': sqlalchemy.types.String(length=20),
'AGE': sqlalchemy.types.BigInteger(),
'EMAIL': sqlalchemy.types.String(length=50),
'PHONE_NR': sqlalchemy.types.String(length=50),
'EDUCATION': sqlalchemy.types.String(length=50),
'MARITAL_STAT': sqlalchemy.types.String(length=50),
'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
with engine.connect() as con:
con.execute('ALTER TABLE emp_backup ADD PRIMARY KEY (`EMP_ID`);')
其中包括:
- left join(左联接) 返回包括左表中的所有记录和右表中联结字段相等的记录
- right join(右联接) 返回包括右表中的所有记录和左表中联结字段相等的记录
- inner join(等值连接) 只返回两个表中联结字段相等的行
select * from A
innerjoin B
on A.aID = B.bID
最简单的文字匹配
select * FROM xiaohongshu_article_3 WHERE content REGEXP "家居"
通配符查询:
MySql的like语句中的通配符:百分号、下划线和escape
FROM [user] WHERE u_name REGEXP ‘^三’; # 以三开头的句子
FROM [user] WHERE u_name REGEXP ‘三$’; # 以三结尾的句子
其中:
%:表示任意个或多个字符。可匹配任意类型和长度的字符。
select * from user where username like '%huxiao';
select * from user where username like 'huxiao%';
select * from user where username like '%huxiao%';
SELECT * FROM [user] WHERE u_name LIKE ‘%三%猫%’
其中,%huxiao,代表句尾
‘%三%猫%’
虽然能搜索出“三脚猫”,但不能搜索出符合条件的“张猫三”。
**_:表示任意单个字符。**匹配单个任意字符,它常用来限制表达式的字符长度语句:(可以代表一个中文字符)
select * from user where username like '_';
select * from user where username like 'huxia_';
select * from user where username like 'h_xiao';
如果我就真的要查%或者_,怎么办呢?使用escape,转义字符后面的%或_就不作为通配符了,注意前面没有转义字符的%和_仍然起通配符作用
Sql代码
select username from gg_user where username like '%xiao/_%' escape '/';
select username from gg_user where username like '%xiao/%%' escape '/';
SELECT * FROM `magazine` WHERE CONCAT(`title`,`tag`,`description`) LIKE ‘%关键字%’
单字段多like模糊匹配:
SELECT * FROM [user] WHERE u_name LIKE '%三%' AND u_name LIKE '%猫%'
多字段 + 多like联合匹配:(参考:https://www.cnblogs.com/chywx/p/10154990.html)
select * from djk_corpus where concat(TestSubject,AnswerA,AnswerB,AnswerC,AnswerD,AnswerE) NOT REGEXP "png|jpg|jpeg|gif"
TestSubject,AnswerA等这几个字段,都不包含,png,jpg等,这些字段
匹配 p1 或 p2 或 p3。例如,‘z|food’ 能匹配 “z” 或 “food”。’(z|f)ood’ 则匹配 “zood” 或 “food”。
但是不能写成’p1&p2’,只能用"|"来写
还可以使用其他,但是需要注意顺序关系:
*
(星号)和+
(加号)都可以匹配多个该符号之前的字符。但是,+
至少表示一个字符,而*
可以表示0个字符。
SELECT * FROM baike369 WHERE name REGEXP 'a*c';
SELECT * FROM baike369 WHERE name REGEXP 'a+c';
其他方式:
select * from weibo_posts_21092h where
content like '%益生菌%'
OR content like '%益生元%'
OR content like '%微生态%'
OR content like '%后生元%'
OR content like '%微生物%'
content like '%皮肤%'
OR content like '%肌肤%'
OR content like '%口腔%'
OR content like '%肠道%'
OR content like '%身体%'
OR content like '%人体%'
来自:MySQL匹配指定字符串的查询
从baike369表的name字段中查询包含“a”到“w”字母和数字以外的字符的记录。SQL代码如下:
SELECT * FROM baike369 WHERE name REGEXP '[^a-w0-9]';
查看name字段中查询包含“a”到“w”字母和数字以外的字符的记录的操作效果。
使用方括号([])可以将需要查询的字符组成一个字符集;通过“[abc]”可以查询包含a、b和c等3个字母中任何一个的记录。
SELECT * FROM baike369 WHERE name REGEXP '[ceo]';
name字段中查询出包含数字的记录。
SELECT * FROM baike369 WHERE name REGEXP '[0-9]';
查询包含数字或者字母a、b和c的记录
SELECT * FROM baike369 WHERE name REGEXP '[0-9a-c]';
可行的是下面的这种:
data[data['col'].str.contains('(?=.*组合)(?=.*衣物)^.*$' , regex=True )]****
不可行的有:
data[data['col'].str.contains('组合(.*?)衣物' , regex=True )]
data[data['col'].str.contains('组合*衣物' , regex=True )]
data[data['col'].str.contains('组合+衣物' , regex=True )]
data[data['col'].str.contains('.*[(组合)(衣物)].*' , regex=True )]
ProgrammingError: (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near \'\\\\"]", "1", "", "", "", "", "", "", "", "", "", "水", "7732-18-5", "WATER\' at line 2')
有人说是因为转义问题,所以:
可以使用:pymysql.escape_string(cdv)
对文字进行一定编码(有点像,as.numbic() )
import pymysql
pymysql.escape_string('大花田菁(SESBANIAGRANDIFLORA)花')
(1366, "Incorrect string value: '\xF0\x9F\x92\x9C\xF0\x9F...' for column 'text' at row,\xF0\x9F\x92\x9C\xF0\x9F,
这种原因因为MySQL不识别有些字符的utf8编码(如表情字符),这时你需要指定连接字符编码为utf8mb4。数据表对应字段编码也改成utf8mb4
[ERR] 1366 - Incorrect string value: ‘\xE4\xB8\xAD\xE5\x86\xB6…’ for column ‘楼盘名称’ at row 1
文章:python3 pandas to_sql填坑
Execution failed on sql ‘SELECT name FROM sqlite_master WHERE type=‘table’ AND name=?;’: not all arguments converted during string formatting
数据库链接不再使用pymysql,而改用sqlalchemy,con=engine 而不是con=db 官方文档
但是,如果按照如上写法,在python3.6(我的python版本)环境下会出现找不到mysqldb模块错误!
正确的写法如下,因为python3将mysqldb改为pymysql了!!!
mysql+pymysql://root:root@localhost:3306/tushare?charset=utf8
当我用pymmsql执行,
cursor.execute('SELECT * FROM persons WHERE salesrep=John ')
DatabaseError: Execution failed on sql 'select * from fresh_ax_product where itemid = "d"': (207, b"Invalid column name 'd'.DB-Lib error message 20018, severity 16:\nGeneral SQL Server error: Check messages from the SQL Server\n")
那么这里用pymmsql 来进行一些where语句的时候,就需要一些特殊的写入方式:
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
参考:Python连接SQL Server数据库 - pymssql使用基础
Cannot resolve the collation conflict between "Chinese_PRC_CI_AS" and "Chinese_PRC_Stroke_90_CI_AS_WS" in the equal to operation. (468)
select a1.*,a2.* from
(select *,(left(txno,4)) ShopNumber from xxx1) a1
left join xxx2 a2
on a1.fet1 = a2.fet2
COLLATE Chinese_PRC_CI_AS
import pymysql
class mysql_query():
def __init__(self,user = "root",password = "123456",\
host = '47.103.114.113',port = 3306,\
database="d-lab"):
self.user = user
self.password = password
self.host = host
self.port = port
self.database = database
def query(self,sql):
conn = pymysql.connect(host=self.host, user=self.user,password=self.password,database=self.database)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
#sql = "SELECT * FROM xiaohongshu_article_2 limit 1000"
cursor.execute(sql) # 执行sql语句
query_out = cursor.fetchall() # 获取查询的所有记录
except Exception as e:
print(e)
finally:
conn.close()
return query_out
def execute(self,sql):
conn = pymysql.connect(host=self.host, user=self.user,password=self.password,database=self.database)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
#sql = "SELECT * FROM xiaohongshu_article_2 limit 1000"
cursor.execute(sql) # 执行sql语句
conn.commit() #提交到数据库执行
except Exception as e:
print(e)
conn.rollback() #发生错误后回滚
finally:
conn.close()
return 'success'
mq = mysql_query(user = "xxx",password = "xxx",\
host = 'xxx',port = 3306,\
database="xxx")
sql = "select count(*) from data "
mq.query(sql)
python使用dbutils的PooledDB连接池,操作数据库
使用DBUtils数据库连接池中的连接,操作数据库
OperationalError: (2006, ‘MySQL server has gone away’)
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql
class MysqlClient(object):
__pool = None;
def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
maxusage=100, setsession=None, reset=True,
host='127.0.0.1', port=3306, db='test',
user='root', passwd='123456', charset='utf8mb4'):
:param mincached:连接池中空闲连接的初始数量
:param maxcached:连接池中空闲连接的最大数量
:param maxshared:共享连接的最大数量
:param maxconnections:创建连接池的最大数量
:param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
:param maxusage:单个连接的最大重复使用次数
:param setsession:optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
:param reset:how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param host:数据库ip地址
:param port:数据库端口
:param db:库名
:param user:用户名
:param passwd:密码
:param charset:字符编码
if not self.__pool:
self.__class__.__pool = PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, setsession, reset,
host=host, port=port, db=db,
user=user, passwd=passwd,
charset=charset,
cursorclass=pymysql.cursors.DictCursor
self._conn = None
self._cursor = None
self.__get_conn()
def __get_conn(self):
self._conn = self.__pool.connection();
self._cursor = self._conn.cursor();
def close(self):
self._cursor.close()
self._conn.close()
except Exception as e:
print e
def __execute(self, sql, param=()):
count = self._cursor.execute(sql, param)
print count
return count
@staticmethod
def __dict_datetime_obj_to_str(result_dict):
"""把字典里面的datatime对象转成字符串,使json转换不出错"""
if result_dict:
result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
result_dict.update(result_replace)
return result_dict
def select_one(self, sql, param=()):
"""查询单个结果"""
count = self.__execute(sql, param)
result = self._cursor.fetchone()
""":type result:dict"""
result = self.__dict_datetime_obj_to_str(result)
return count, result
def select_many(self, sql, param=()):
查询多个结果
:param sql: qsl语句
:param param: sql参数
:return: 结果数量和查询结果集
count = self.__execute(sql, param)
result = self._cursor.fetchall()
""":type result:list"""
[self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
return count, result
def execute(self, sql, param=()):
count = self.__execute(sql, param)
return count
def begin(self):
"""开启事务"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""结束事务"""
if option == 'commit':
self._conn.autocommit()
else:
self._conn.rollback()
if __name__ == "__main__":
mc = MysqlClient()
sql1 = 'SELECT * FROM shiji WHERE id = 1'
result1 = mc.select_one(sql1)
print json.dumps(result1[1], ensure_ascii=False)
sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
param = (2, 3, 4)
print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)
利用to_sql导入数据
import pandas as pd
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
yconnect = create_engine('mysql+pymysql://user:password@111.111.111.111:3306/your_database_name?charset=utf8mb4')
df = pd.DataFrame({'序号':[2],'id':[2],'name2':["user 10"],'time':[datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")]})
df.to_sql(name='test_8', con=yconnect, if_exists='append', index=False,dtype = {'time':sqlalchemy.DateTime()})
导入的定义不同字段的数据格式
如果,表格里面该字段已经是时间格式了,那么就可以直接插入:
# sql语句:
table_name = 'test_8'
sql = "update {} set time='{}' where id= {}".format(table_name,datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),2)
out = mc._cursor.execute(sql,)
>>> update test_8 set time='2020-12-15 09:39:05' where id= 2
反正几种py时间生成的方式,都是符合规定:
update_time=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
update_time=time.strftime('%Y-%m-%d %X',time.localtime(time.time()))
简单写了一个可以连接mysql / mmsql的小函数,通过Pandas直接调用
import pymssql
import pymysql
import pandas as pd
import numpy as np
from collections import Counter
def connect_db(host= '0.0.0.0' , user='xxx',\
password= 'xxx', database = 'xxx'):
conn = pymssql.connect(host , user , password, database)
return conn
def fetch_data_from_db(sql_query, host, user, password, database):
Fetch data from SQL server
con = connect_db(host , user , password, database)
cursor = con.cursor()
cursor.execute(sql_query)
result = cursor.fetchall()
col_result = cursor.description
columns = []
for i in range(len(col_result)):
columns.append(col_result[i][0])
df = pd.DataFrame(list(result), columns = columns)
con.close()
return df
class pandas_read():
def __init__(self,sql_config,types = 'mmsql'):
self.sql_config = sql_config
if types == 'mmsql':
self.conn = self.get_conn_mmsql(sql_config)
if types == 'mysql':
self.conn = self.get_conn_mysql(sql_config)
def get_conn_mmsql(self,mmsql_config):
conn = pymssql.connect(mmsql_config['host'] , mmsql_config['user'] ,\
mmsql_config['password'], mmsql_config['db'])
return conn
def get_conn_mysql(self,mysql_config):
conn = pymysql.connect(host=mysql_config['host'], \
user=mysql_config['user'],password=mysql_config['password'], \
db=mysql_config['db'],charset='utf8', \
use_unicode=True)
return conn
def read_sql(self,sql):
return pd.read_sql(sql, con=self.conn)
if __name__ == "__main__":
user = 'xxx'
password = 'xxx'
host = '0.0.0.0'
database = 'xxx'
sql_query = 'select top 5000 * from database'
data = fetch_data_from_db(sql_query, host, user, password, database)
# pandas 读入
sql_config = {
'user':user,
'password':password,
'host':host,
'db':'xxxdb'
pr = pandas_read(sql_config,types = 'mmsql')
sql = 'select * from database'
data = pr.read_sql(sql)
之前的一篇:python︱mysql数据库连接——pyodbc0 安装依赖pip3 install --pre pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple pip3 install --pre sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple1 连接方式1.1 pymysql.connect直接连import pymysql# 连接databaseuser = "xxx
PyMySQL介绍
PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2中则使用mysqldb。
Django中也可以使用PyMySQL连接MySQL数据库。
PyMySQL安装
#终端中安装pymysql
pip install pymysql
PyMySQL的使用
sudo pip3 install pymysql
2.基本使用
from pymysql import connect
# 1.创建链接
coon = connect()
* 参数host:连接的mysql主机,如果本机是'localhost
pwd = input('请输入密码:')
# 1.连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='t1', charset='utf8')
print(conn)
# 2.创建游标
cursor = conn.cursor()
#注意%s
项目中遇到了数据库连接池太多,严重影响性能,就重新设置了mysql的存活时间,特在此记录一下。
[参考链接](https://blog.csdn.net/weixin_35826751/article/details/113152007?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0.control&spm=1001.2101.3001.4242)
creator:数据库驱动模块,如常见的pymysql,pymssql,cx_Oracle模块。无默认值
mincached:初始化连接池时创建的连接数。默认为0,即初始化时不创建连接。(建议默认0,假如非0的话,在某些数据库不可用时,整个项目会启动不了)
maxcached:池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认)
maxsh...
import pymysql, os, configparser
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
class Config(object):
# Config().get_content("user_information")
配置文件里面...
因为Anaconda中没有DBUtils包,所以pip命令安装的DBUtils是假的,在pycharm 的project interpreter中是没有dbutils的。
Anaconda正确安装DButils姿势:
1.anaconda search ...
DBUtils 是一套用于管理数据库连接池的包,为高频度高并发的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。最常用的两个外部接口是 PersistentDB 和 PooledDB,前者提供了单个线程专用的数据库连接池,后者则是进程内所有线程共享的数据库连接池。
DBUtils是一套Python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。DBUtils来...
db = pymysql.connect("localhost", "root", "password", "test")
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()
print("Database version : %s " % data)
# 关闭数据库连接
db.close()
在这个例子中,我们通过pymysql连接到了MySQL数据库,并查询了MySQL的版本信息。我们首先使用connect()函数建立了数据库连接。connect函数的参数依次为:
- host:MySQL服务器地址
- user:用户名
- password:密码
- database:要连接的数据库
如果连接成功,我们就可以使用cursor()方法创建一个游标对象。游标对象可以用来执行SQL查询和获取结果数据。接下来,我们使用execute()方法执行了一个SQL查询,这个查询返回了MySQL的版本信息。最后,我们使用fetchone()方法获取了查询结果。
查询结果是一个元组,我们可以通过下标或者字段名来获取查询结果的具体值。最后,我们使用close()方法关闭数据库连接。