github: https://github.com/PyMySQL/PyMySQL

Python3 MySQL 数据库连接 - PyMySQL 驱动: https://www.runoob.com/python3/python3-mysql.html

pymysql 是线程安全的( 搜索 thread,可以看到 thread_safe=1,同时函数 thread_safe() 返回 True ): https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py

Mysql  如果数据存在则更新,不存在则插入

https://blog.csdn.net/zhou16333/article/details/95867827

1、PyMySQL 安装

在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。

PyMySQL 下载地址: https://github.com/PyMySQL/PyMySQL

安装 PyMySQL 的 Python 包:pip3 install PyMySQL

2、数据库连接

连接数据库前,请先确认以下事项:

  • 已经创建了数据库 TESTDB .
  • TESTDB 数据库中您已经创建了表 EMPLOYEE
  • EMPLOYEE 表字段为 FIRST_NAME, LAST_NAME, AGE, SEX INCOME
  • 连接数据库 TESTDB 使用的用户名为 "testuser" ,密码为 "test123",你可以可以自己设定或者直接使用 root 用户名及其密码,Mysql 数据库用户授权请使用 Grant 命令。
  • 已经安装了 Python MySQLdb 模块。
  • 如果您对sql语句不熟悉,可以访问 SQL基础教程

示  例:

链接 Mysql 的 TESTDB 数据库:

import pymysql
def main():
    # 打开数据库连接
    mysql_conn = pymysql.connect("localhost", "testuser", "test123", "TESTDB")
        # 使用 cursor() 方法创建一个游标对象 cursor
        cursor = mysql_conn.cursor()
        sql_string = 'insert into test_table (col1, c0l2) values (1,2,3)'        
        cursor.execute(sql_string)
        mysql_conn.commit()
        # 使用 execute()  方法执行 SQL 查询
        cursor.execute("SELECT VERSION()")
        # 使用 fetchone() 方法获取单条数据.
        data = cursor.fetchone()
        print("Database version : %s " % data)
    except BaseException as be:
        # 关闭数据库连接
        mysql_conn.rollback()
    finally:
        mysql_conn.close()
if __name__ == '__main__':
    main()

游标对象:就是对数据库进行具体的操作了,比如增、删、改、查等等一系列操作都可以完成

游标类型 如下:

类型描述
Cursor普通的游标对象,默认创建的游标对象
SSCursor不缓存游标,主要用于当操作需要返回大量数据的时候
DictCursor以字典的形式返回操作结果
SSDictCursor不缓存游标,将结果以字典的形式进行返回

创建数据库表

如果数据库连接存在我们可以使用execute()方法来为数据库创建表,如下所示创建表EMPLOYEE:

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL,如果表存在则删除
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
# 使用预处理语句创建表
sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,  
         SEX CHAR(1),
         INCOME FLOAT )"""
cursor.execute(sql)
# 关闭数据库连接
db.close()

查询  数据

Python 查询 Mysql 使用 fetchone() 方法获取单条数据,使用 fetchall() 方法获取多条数据。

  • fetchone():  该方法获取下一个查询结果集。结果集是一个对象
  • fetchall():  接收全部的返回结果行.
  • rowcount:  这是一个只读属性,并返回执行execute()方法后影响的行数。

查询 EMPLOYEE 表中 salary(工资)字段大于 1000 的所有数据:

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \
       WHERE INCOME > %s" % (1000)
   # 执行SQL语句
   cursor.execute(sql)
   # 获取所有记录列表
   results = cursor.fetchall()
   for row in results:
      fname = row[0]
      lname = row[1]
      age = row[2]
      sex = row[3]
      income = row[4]
       # 打印结果
      print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
             (fname, lname, age, sex, income ))
except:
   print ("Error: unable to fetch data")
# 关闭数据库连接
db.close()
import pymysql
class DB():
    def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'):
        # 建立连接 
        self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)
        # 创建游标,操作设置为字典类型        
        self.cur = self.conn.cursor(cursor = pymysql.cursors.DictCursor)
    def __enter__(self):
        # 返回游标        
        return self.cur
    def __exit__(self, exc_type, exc_val, exc_tb):
        # 提交数据库并执行        
        self.conn.commit()
        # 关闭游标        
        self.cur.close()
        # 关闭数据库连接        
        self.conn.close()
if __name__ == '__main__':
    with DB(host='192.168.68.129',user='root',passwd='zhumoran',db='text3') as db:
        db.execute('select * from course')
        print(db)
        for i in db:
            print(i)

插入  数据

执行 SQL INSERT 语句向表 EMPLOYEE 插入记录:

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
   # 执行sql语句
   cursor.execute(sql)
   # 提交到数据库执行
   db.commit()
except:
   # 如果发生错误则回滚
   db.rollback()
# 关闭数据库连接
db.close()

以上例子也可以写成如下形式:

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
# SQL 插入语句
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
       LAST_NAME, AGE, SEX, INCOME) \
       VALUES ('%s', '%s',  %s,  '%s',  %s)" % \
       ('Mac', 'Mohan', 20, 'M', 2000)
   # 执行sql语句
   cursor.execute(sql)
   # 执行sql语句
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
# 关闭数据库连接
db.close()

以下代码使用变量向SQL语句中传递参数:

..................................
user_id = "test123"
password = "password"
con.execute('insert into Login values( %s,  %s)' % \
             (user_id, password))
..................................

单条插入数据:

#!/usr/bin/python3 import pymysql # 打开数据库连接 db = pymysql.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 插入语句 里面的数据类型要对应 sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \ LAST_NAME, AGE, SEX, INCOME) \ VALUES ('%s', '%s', %s, '%s', %s)" % \ ('Mac', 'Mohan', 20, 'M', 2000) # 执行sql语句 cursor.execute(sql) # 执行sql语句 db.commit() except: # 发生错误时回滚 db.rollback() # 关闭数据库连接 db.close()

批量插入数据:

注意:批量插入数据单条插入数据 的区别:

  • 批量插入:VALUES (%s, %s, %s, %s, %s,) 里面 不用引号
  • 单条插入:VALUES ('%s', '%s', '%s', '%s', '%s') 里面 需要引号
#!/usr/bin/env python
# -*-encoding:utf-8-*-
import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","root","123","testdb")
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# SQL 插入语句
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
       LAST_NAME, AGE, SEX, INCOME) \
       VALUES (%s,%s,%s,%s,%s)"
# 区别与单条插入数据,VALUES ('%s', '%s',  %s,  '%s', %s) 里面不用引号
val = (('li', 'si', 16, 'F', 1000),
       ('Bruse', 'Jerry', 30, 'F', 3000),
       ('Lee', 'Tomcat', 40, 'M', 4000),
       ('zhang', 'san', 18, 'M', 1500))
   # 执行sql语句
   cursor.executemany(sql,val)
   # 提交到数据库执行
   db.commit()
except:
   # 如果发生错误则回滚
   db.rollback()
# 关闭数据库连接
db.close()

更新  数据

更新操作用于更新数据表的数据,以下实例将 TESTDB 表中 SEX 为 'M' 的 AGE 字段递增 1:

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
   # 执行SQL语句
   cursor.execute(sql)
   # 提交到数据库执行
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
# 关闭数据库连接
db.close()

批量 更新

使用 pymysql 的 course.executemany(sql, update_list) 进行批量更新

  • sql:更新一条的 sql 语句模板;
  • update_list:一个列表套元组的结构;

示  例:

db = pymysql.connect(user='root', password='mysql', database='test', host='127.0.0.1', port=3306, charset='utf8mb4')
name_list = ["re", "gh", "ds", "D"]  # 存储name的值
age_list = ["10", "20", "30", "40"]  # 存储age的值
id_list = ["1", "2", "3", "4"]  # 存储id的值
val_list = [[name_list[i], age_list[i], id_list[i]] for i in range(len(id_list))]
print(val_list)
# [['re', '10', '1'], ['gh', '20', '2'], ['ds', '30', '3'], ['D', '40', '4']]
with db.cursor() as cursor:
        sql = "UPDATE test SET name=(%s), age=(%s) WHERE id=(%s)"
        cursor.executemany(sql, val_list)
        db.commit()
    except:
        db.rollback()
db.close()

pymysql 批量 --- 增、删、改、查

注意:插入数字也是 %s

# coding=utf-8
import time
import pymysql.cursors
conn= pymysql.connect(host='rm-xxx.mysql.rds.aliyuncs.com',
                       port=3306,
                       user='dba',
                       password='xxxxx',
                       db='app',
                       charset='utf8')
cursor= conn.cursor()
# conn.ping(reconnect=True)
count= 0
posts=[]
for postin posts:
        sql= 'DELETE FROM user_like WHERE user_id=%s and like_post_id=%s'
        ret= cursor.executemany(sql, ((1,2), (3,4), (5,6)))
        conn.commit()
    except Exception as e:
        print("batch Exception:", e)
    count+=1
cursor.close()
conn.close()
# 基本sql语句写法
# INSERT INTO star(name,gender) VALUES(“XX”, 20)
# SELECT * FROM app.user_post WHERE post_id LIKE '%xxxx%';
# UPDATE app.user_post SET post_id=replace(post_id,'\'','’);
# UPDATE app.user_post SET  province = ‘xxx', city =‘xxx';
# DELETE FROM app.user_post where updated_at = '0000-00-00 00:00:00’;
# 带参数构造语句的基本写法
# sql = 'select user_id, post_id from user_post where user_id="{user_id}" and post_id="{post_id}"'.format(user_id=user_id, post_id=post_id)
# sql = 'SELECT count(*) FROM user_like where like_post_id = "%s"' % ("xxx")
# sql = 'update star set gender="{gender}", height="{height}" where star_id="{star_id}"'.format(gender='M', height=180, star_id=123456789)

删除  数据

删除操作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
   # 执行SQL语句
   cursor.execute(sql)
   # 提交修改
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
# 关闭连接
db.close()

执行 事务

事务机制可以确保数据一致性。

对于支持事务的数据库, 在 Python 数据库编程中,当游标建立之时,就自动开始了一个隐形的数据库事务。commit()方法游标的所有更新操作,rollback()方法回滚当前游标的所有操作。每一个方法都开始了一个新的事务。

事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。

  • 原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
  • 一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
  • 隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
  • 持久性(durability)。持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
# SQL删除记录语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
   # 执行SQL语句
   cursor.execute(sql)
   # 向数据库提交
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()

pymysqlpool

线程安全 pymysqlpool

# -*-coding: utf-8-*-
# Author : Christopher Lee
# License: Apache License
# File   : test_example.py
# Date   : 2017-06-18 01-23
# Version: 0.0.1
# Description: simple test.
import logging
import string
import threading
import pandas as pd
import random
from pymysqlpool import ConnectionPool
config = {
    'pool_name': 'test',
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'chris',
    'database': 'test',
    'pool_resize_boundary': 50,
    'enable_auto_resize': True,
    # 'max_pool_size': 10
logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    level=logging.DEBUG)
def connection_pool():
    # Return a connection pool instance
    pool = ConnectionPool(**config)
    # pool.connect()
    return pool
def test_pool_cursor(cursor_obj=None):
    cursor_obj = cursor_obj or connection_pool().cursor()
    with cursor_obj as cursor:
        print('Truncate table user')
        cursor.execute('TRUNCATE user')
        print('Insert one record')
        result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))
        print(result, cursor.lastrowid)
        print('Insert multiple records')
        users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)]
        result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)
        print(result)
        print('View items in table user')
        cursor.execute('SELECT * FROM user')
        for user in cursor:
            print(user)
        print('Update the name of one user in the table')
        cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16')
        cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')
        print(cursor.fetchone())
        print('Delete the last record')
        cursor.execute('DELETE FROM user WHERE id = 16')
def test_pool_connection():
    with connection_pool().connection(autocommit=True) as conn:
        test_pool_cursor(conn.cursor())
def test_with_pandas():
    with connection_pool().connection() as conn:
        df = pd.read_sql('SELECT * FROM user', conn)
        print(df)
def delete_users():
    with connection_pool().cursor() as cursor:
        cursor.execute('TRUNCATE user')
def add_users(users, conn):
    def execute(c):
        c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)
        c.commit()
    if conn:
        execute(conn)
        return
    with connection_pool().connection() as conn:
        execute(conn)
def add_user(user, conn=None):
    def execute(c):
        c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)
        c.commit()
    if conn:
        execute(conn)
        return
    with connection_pool().connection() as conn:
        execute(conn)
def list_users():
    with connection_pool().cursor() as cursor:
        cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')
        print('...')
        for x in sorted(cursor, key=lambda d: d['id']):
            print(x)
def random_user():
    name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize()
    age = random.randint(10, 40)
    return name, age
def worker(id_, batch_size=1, explicit_conn=True):
    print('[{}] Worker started...'.format(id_))
    def do(conn=None):
        for _ in range(batch_size):
            add_user(random_user(), conn)
    if not explicit_conn:
        return
    with connection_pool().connection() as c:
        do(c)
    print('[{}] Worker finished...'.format(id_))
def bulk_worker(id_, batch_size=1, explicit_conn=True):
    print('[{}] Bulk worker started...'.format(id_))
    def do(conn=None):
        add_users([random_user() for _ in range(batch_size)], conn)
        time.sleep(3)
    if not explicit_conn:
        return
    with connection_pool().connection() as c:
        do(c)
    print('[{}] Worker finished...'.format(id_))
def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False):
    delete_users()
    wk = worker if not bulk_insert else bulk_worker
    for i in range(batch_number):
        wk(i, batch_size, explicit_conn)
    list_users()
def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False):
    delete_users()
    wk = worker if not bulk_insert else bulk_worker
    threads = []
    for i in range(batch_number):
        t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn))
        threads.append(t)
        t.start()
    [t.join() for t in threads]
    list_users()
if __name__ == '__main__':
    import time
    start = time.perf_counter()
    test_pool_cursor()
    test_pool_connection()
    test_with_pandas()
    test_with_multi_threads(20, 10, True, bulk_insert=True)
    test_with_single_thread(1, 10, True, bulk_insert=True)
    elapsed = time.perf_counter() - start
    print('Elapsed time is: "{}"'.format(elapsed))
python 标准数据库接口为 Python DB-API,Python DB-API为开发人员提供了数据库应用编程接口不同的数据库你需要下载不同的DB API模块,例如你需要访问Mysql数据库,你需要下载MySQL数据库模块。DB-API 是一个规范. 它定义了一系列必须的对象和数据库存取方式, 以便为各种各样的底层数据库系统和多种多样的数据库接口程序提供一致的访问接口 。Python的DB-... import pymysql def insert_to_mysql(to_db_list): mysql_db = pymysql.connect(host="HOST_IP", port=3306, user="username", password="password", database="db", charset="utf8") cursor = mysql_db.cursor() sql = "INSERT INTO `your_db`.`your_table`(`colum1`, `colum2`, `colum3`) VA  pip install pymysql…tar.gz  连接数据库  conn=pymysql.connect(host=‘服务器 IP’, port=3306, user=‘用户名’, passwd=‘密码’, db=‘数据库名’, charset=‘utf8’)  port=3306,... # 创建连接对象也就是为了连接到本地的数据库 engine = pymysql.connect(host='xxx', user='xxx', password='xxx', database='xxx', ) regonls = pd.read_csv('xxx.csv') cursor = engine.cursor() # 批量询 for i in tqdm(range(len(re
如果对于sqlalchemy框架不会使用,可以看——Python操作MySql——使用SQLAlchemy ORM操作数据库 1 使用sqlalchemy 框架,连接数据库 from random import random from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String   最近做web网站的测试,遇到很多需要批量造数据的功能;比如某个页面展示数据条数需要达到10000条进行测试,此时手动构造数据肯定是不可能的,此时只能通过python脚本进行自动构造数据;本次构造数据主要涉及到在某个表里面批量添加数据、在关联的几个表中同步批量添加数据、批量询某个表中符合条件的数据、批量更新某个表中符合条件的数据等。   二、数据添加   即批量添加数据到某个表中。 insert_data.py import pymysql import random import time from get_userinfo import get_userinfo 由于上篇文章中批量了文件,有的时候数据库也需要批量一下,之前的做法是使用宝塔的phpMyAdmin导出一个已经修好了的sql文件,然后依次去其他数据库里导入,效率不说极低,也算低了,且都是些重复性的劳动,所以打算用Python批量执行sql 版本:Python3.6 系统:MacOS IDE:PyCharm 第三方库:pymysql Show Code import pymysql host = 'xxx.65.9.191' username = 'root' password = 'root' def connectMySQL(): print('
SQLAlchemy是Python编程语言下的一款开源软件,提供了SQL工具包及对象关系映射(ORM)工具,使用MIT许可证发行。SQLAlchemy首次发行于2006年2月,并迅速地在Python社区中最广泛使用的ORM工具之一,不亚于Django的ORM框架。 本文将介绍如何使用SQLAlchemy操作MySQL,完成基础的表创建,表格数据的新询、修删除(CRUD)等操作。 首先我们需要确认当前的Python环境下已经安装sqlalchemy和pymysql模块。 我们使用本地的MySQL数据库数据库为orm_test。新建一张users表,字段为id,name,
import pymysql #import datetime #day = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')#参数值插入时间 db = pymysql.connect(host='\u670d\u52a1\u5668IP', user='\u8d26\u53f7', passwd='\u5bc6\u7801', port=端口号) cur = db.cursor() cur.execute('use 数据库') #批量创建测试账号 SELECT XM,ID FROM JCXX WHERE XM LIKE '%(%' OPEN XM_Cursor FETCH FROM XM_Cursor into @XM,@ID WHILE @@FETCH_STATUS=0 BEGIN set @COUNT = PATINDEX( '%(%', @XM) IF @COUNT > 0 BEGIN SET @YXM = SUBSTRING(@XM,0,@COUNT) UPDATE JCXX SET XM = @YXM WHERE ID = @ID PRINT @YXM FETCH NEXT FROM XM_Cursor into @XM,@ID close XM_Cursor DEALLOCATE XM_Cursor import pymysql conn = pymysql.connect(host='localhost', user='root', password='password', database='test') 2. 创建游标对象: cursor = conn.cursor() 3. 定义 SQL 语句: sql = "INSERT INTO table_name (column1, column2, column3) VALUES (%s, %s, %s)" 4. 定义数据列表: data = [ ('value1', 'value2', 'value3'), ('value4', 'value5', 'value6'), ('value7', 'value8', 'value9') 5. 执行批量插入操作: cursor.executemany(sql, data) 6. 提交事务: conn.commit() 7. 关闭游标和连接: cursor.close() conn.close() 以上就是使用 Python pymysql 批量添加数据到数据库的方法。