最近做了套接口测试系统,后台定时任务会批量去跑测试用例,用的是多线程,以前多线程模块用的是 自己封装的包 ,但因为一些原因近期替换成了python3标准库中的ThreadPoolExecutor,于是问题开始了……

项目用的是:Flask + Flask-SQLAlchemy + Flask-APScheduler + ThreadPoolExecutor

替换原因,一些未知原因,任务运行完成后,个别线程无法正常退出,导致线程泄漏

二、问题描述

定时任务运行时大约有20%的概率会出现问题一与问题二

pymysql.err.InternalError: Packet sequence number wrong - got 45 e

大致意思是当前数据库连接发了一个序号的45的包,但收到的包序号不是45,错乱了。网上查找了各种资料,大致都说是多线程引起,即多个线程共用了同一个数据库连接解决方案有如下:

使用了多线程,多线程共享了同一个数据库连接,但每个execute前没有加上互斥锁 方法一:每个execute前加上互斥锁 lock.acquire() cursor.execute(command,data) lock.release()   每个线程拥有自己的数据库连接,即在线程调用函数中加上数据库连接代码 所有线程共用一个连接池,需要考虑线程总数和连接池连接数上限的问题

尝试了各种方法,还把每个线程db.session中数据库连接对象的内存地址打印出来了,确定每个线程都是唯一的,没有共用。

db.session是个全局对象,所有线程引用的都是同一个对象,不是每个线程一个。但是数据库连接只是db.session对象中的一个属性,这个属性存储在db.session.registry.registry中,registry是werkzeug.local.Local()类型的对象为key-value存储结构,key为当前线程ID,value为每个线程独立的值。每个线程一个连接,互不影响互不干扰。可以通过如下语句获取当前线程的数据库连接对象:db.session.registry.registry.get(threading.get_ident()))

(sqlalchemy.exc.InvalidRequestError) Can’t reconnect until invalid transaction is rolled back

大致的意思是在同一个数据库连接中,在发起新的数据库请求前,必须先关闭掉之前报错的事务,不要认为自己没有开启事务,采用Flask-SQLAlchemy时,即使是最简单的select语句,也会自动开启事务,查询完成时会自动回滚。如下日志:

2019-08-10 12:42:18,312 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2019-08-10 12:42:18,312 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- BEGIN (implicit)
2019-08-10 12:42:18,319 INFO sqlalchemy.engine.base.Engine SELECT user.status AS user_status, user.id AS user_id, user.username AS user_username, user.name AS user_name, user.phone AS user_phone, user.email AS user_email, user.address AS user_address, user.remark AS user_remark, user.create_time AS user_create_time
FROM user
WHERE user.status = %(status_1)s
2019-08-10 12:42:18,319 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- SELECT user.status AS user_status, user.id AS user_id, user.username AS user_username, user.name AS user_name, user.phone AS user_phone, user.email AS user_email, user.address AS user_address, user.remark AS user_remark, user.create_time AS user_create_time
FROM user
WHERE user.status = %(status_1)s
2019-08-10 12:42:18,319 INFO sqlalchemy.engine.base.Engine {‘status_1’: 1}
2019-08-10 12:42:18,319 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- {‘status_1’: 1}
2019-08-10 12:42:18,329 INFO sqlalchemy.engine.base.Engine ROLLBACK
2019-08-10 12:42:18,329 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- ROLLBACK

sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1040, ‘Too many connections’)

默认情况下mysql服务器端最大允许154个连接,此错误即是客户端发起的连接数太多,已经超过了数据库服务端的最大限制,从而导致的报错。但是ThreadPoolExecutor只开了20个线程,加上个别的访问请求,肯定不多超过154的限制,但定时任务运行时查看Mysql的连接(show processlist)时却一直在涨,直到超过限制报错。

三、问题分析

问题一分析

前面的描述中已经排除了多线程间互相的干扰,再对比下以前自己封装的多线程包,发现自己封装的多线程包,在定时任务每次运行时都会重新起20个线程,但ThreadPoolExecutor总计起20个线程,后续每次调度时任务都是在同一批线程中运行,那存在的可能就是在同一个线程内,上一次运行时的状态影响了本次,从而导致底层的pymysql在操作数据库时收发序号错乱。只要在任务开始时,确保数据库连接是初始的,不存在遗留事务即可解决此问题。

问题二分析

在一些可能出错的地方,代码都已经加上了异常处理,但总会有漏网之鱼,整个用例运行期间,因各种原因偶尔会有错误出现,再加上此问题与问题一互相干扰纠缠在一起,每次出现也都是同时出现。

问题三分析

运行定时任务时,随着运行用例数的增加,数据库连接也一直在增加,由此判断必定是连接没有复用,每次都新建了连接导致的,虽然代码中确实是采用了连接池,但感觉没起作用。使用以下语句,打印出线程中的连接池信息,发现每运行一个任务,都新建了一个连接池,池中只有一个连接,确实是没有达到复用的目的,要解决的是运行不同用例时不要创建新的池,只要共用原来的池即可。

logging.info(f"连接池内存地址:{id(db.engine.pool)}")
logging.info(f"当前session地址:{id(db.session)}")
logging.info(f"当前数据库连接内存地址:{id(db.session.registry.registry.get(threading.get_ident()))}")

四、问题解决

1.问题一和问题二

问题一与问题二纠缠在一起,大致确定属同一问题,即线程里有未处理异常,导致后续再次运行时出错。
原来用自已封装的多线程包时,因为每次都是起一批新线程,即使出现未处理异常也只会影响当前线程的当前用例。

1.1 自己封装的多线程包

每次起20个线程,当出现未处理异常时,会导致此线程down掉,剩余19线程继续干活,干完后这次线程全部正常退出,下次时再次新起20个线程,错误不会传递不会累加

1.2 ThreadPoolExecutor

在应用启动时初始化一次,创建20个线程,运行用例碰到未处理异常时,线程不会down掉,会继续干活,那些打开的未关闭事务会影响后续用例运行,错误会传递会累加,只要能消除错误传递即可。

executor = ThreadPoolExecutor(max_workers=20)

1.3 解决方案

【运行逻辑段】中的代码可能会报异常,导致db.session.close()语句没有运行,当前线程占用的连接没有还给连接池,再运行下一条用例时,会继续使用原来的连接,因一些遗留事务或遗留数据,导致报错。

def run_test_case(case_id):
	# 运行逻辑
	xxxxxxxxxxxxxxxx
	# 运行逻辑
	db.session.close()

想要确保获取到的连接是“干净”的,只要在【运行逻辑段】前面再加一条语句即可,如下代码。如果线程中已经存在连接时则会先关闭,后续要使用时会自动申请新连接;当线程中不存在连接时会申请连接并马上关闭,后续要使用时又会自动申请新连接,这样虽然会做一些无用功,但能确保连接“干净”。

def run_test_case(case_id):
	db.session.close()
	# 运行逻辑
	xxxxxxxxxxxxxxxx
	# 运行逻辑
	db.session.close()
 
  1. db.session.close()语句运行后,线程会将连接还给连接池,连接池不会断开与数据库的物理连接,只会做一些ROLLBACK操作,将一些未提交的数据或未提交的事务回滚,确保下次分配出去时是干净的。
  2. 代码中使用db.session.query()或User.query()等查询语句或别的操作语句时,线程会自动从连接池中申请到新的连接。

定时任务中要用到应用模型,必须在启动时推送应用上下文,否则会报错找不到应用的错误,如下代码。把推送上下文的语句放在了run_test_case函数中,但这样会带来一个恶果,即每条用例运行时都会创建一个app对象,每个app对象都有自己的连接池,导致连接池达不到复用的目的,越占越多,最终超出数据库连接从而报错。

def run_test_case(case_id):
	from .. import create_app
	app = create_app()
    app.app_context().push()
	# 运行逻辑
	xxxxxxxxxxxxxxxx
	# 运行逻辑
	db.session.close()

原来采用自己封装的多线程包时,因为每次都是新建一批线程,等所有用例运行完成后这些线程会退出,连接又会释放,连接只在当次内累加最终没有超出数据库上限,所以没有此错误。
采用标准库中的ThreadPoolExecutor后,因为线程在整个应用的生命周期中会一直存在,导致创建的连接都没有释放,最终总会超出数据库限制。解决此问题,只要不在每次运行用例时都重复推送上下文不创建新应用与新连接池即可。如下代码,把推送应用上下文的语句独立成一个函数,传给initializer参数,则只会在线程创建时初始化一次。

# 线程初始化函数
def init_context():
    from .. import app
    app.app_context().push()
    logging.info("推送应用上下文完成")
# 创建定时任务线程池    
executor = ThreadPoolExecutor(
    max_workers=20, initializer=init_context
 # 测试用例运行函数
def run_test_case(case_id):    
	# 运行逻辑
	xxxxxxxxxxxxxxxx
	# 运行逻辑
	db.session.close()
# 线程初始化函数2
def init_context2():
    from .. import create_app()
    app = create_app()
    app.app_context().push()
    logging.info("推送应用上下文完成")
 

注意init_context与init_context2函数的区别,init_context2会创建一个新的连接池,不会与应用共用一个。推荐用init_context,这样整个应用进程中只存在一个连接池。

  • 碰到问题一时先入为主了,一直在多线程方面排查,导致浪费了不少时间;
  • 替换多线程实现时考虑不周,简单的替换完成后大多情况下也正常,忽略了实现方式的差异。在动手替换前应该把代码通读一遍,先记录哪些可能需要修改的点,不然一旦替换成功后,思维容易定势;

按照以上解释虽然确实解决了我遇到的问题,但还有一些存在疑问的地方,比如问题一与问题二,按照上面的解释,问题应该是在所有有数据库操作的地方随机出现,但实际上问题总是在查询某一张表时出现,且查询这张表没有任何特别之处。这个按照上面的描述是解释不通的,可能还有某些没搞明白的地方,等一段时间来再回头看看是否能明悟。

7.1 为什么“问题一”与“问题二”总是在同一个地方出现

接口系统是在线的Web服务,框架用的是Flask,通过接口调用触发测试用例运行。

7.1.1 运行单个用例

调用参数:GET:/run_case?case_id=1
此时是通过case_id获取用例对象,然后将用例对象传递给测试引擎,完成后将引擎返回结果返回给前端。此时没有用到ThreadPoolExecutor对象,整个过程都是在flask的线程中完成的,flask框架会自动管理数据库连接,故没有碰到问题。

7.1.2 运行多个用例

调用参数:GET:/run_case?module_id=1
此时会先查询出模块下所有的测试用例对象,然后将测试用例集传递给测试引擎,测试引擎为了加速执行,会启用ThreadPoolExecutor线程,ThreadPoolExecutor线程始终存在,且里面的数据库连接已经脱离了flask的管理范涛,不会自动管理需要手工管理,一旦出现未处理异常,后续再使用此线程时则会碰到问题一与问题二。因为执行用例的步骤始终一致,都是先查询某张表,故错误始终出在同一处。
——2021.07.23

背景最近做了套接口测试系统,后台定时任务会批量去跑测试用例,用的是多线程,以前多线程模块用的是自己封装的包,但因为一些原因近期替换成了python标准库中的ThreadPoolExecutor,于是问题开始了……项目用的是:Flask + Flask-SQLAlchemy + Flask-APScheduler问题描述定时任务运行时大约有20%的概率会出现问题一与问题二问题一py...
扫描服务器ip开放端口,用线程池ThreadPoolExecutor,i7的cpu可以开到600个左右现成,大概20s左右扫描完65535个端口,根据电脑配置适当降低线程数 #!/usr/local/python3.6.3/bin/python3.6 # coding = utf-8 import socket import datetime import re from concurrent.futures import ThreadPoolExecutor, wait DEBUG = False # 判断ip地址输入是否符合规范 def check_ip(ipAddr): compi
# 耗cpu的操作,用多进程编程, 对于io操作来说,使用多线程编程 import time from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor def fib(n): if n <= 2: return 1 return fib(n - 2) + fib(n - 1) if __name__ == '__main__':
众所周知,python3多线程有threading,很好的支持了多线程,那么问题来了,为什么还需要线程池呢,其实很好回答,如果你要爬取网站有八百页,每页设置一个线程,难道能开启八百个么,光切换的时间也很高了吧。这时候就需要用到线程池,可以设置一个20的线程池,同时只有20个县城在运行,剩下的排队。直接上讲解 线程池模块 在threading中是没有线程池相关功能的,想要运行线程池需要自己重写,很明显向我这么懒不可能重写,而且自己编写线程池很难写的比较完美,还需要考虑复杂情况下的线程同步,很容易发生死锁。所以
参照c++的线程池,使用python的threading库实现线程池。import threading import time # 线程池的任务,包含一个可调用对象和一个参数数组 class ThreadTask(object): def __init__(self, job, args=list()): self.task = job self.args =
传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。 一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这... 普通多线程方案会使用“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。 一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。 使用线程池,线程预先被创建并放入线程池中,同时处理完当前任务之后并不销
ThreadPoolExecutorPython中的一个线程池实现,它可以用来管理和调度线程的执行。在ThreadPoolExecutor中,启动的线程都是守护线程。[1]守护线程是一种特殊类型的线程,当主线程退出时,守护线程会随之结束。但是,在ThreadPoolExecutor中,即使主线程退出了,进程并不会立即退出,而是等待所有线程池中的线程执行完毕后才会退出。这是因为ThreadPoolExecutor在设计上为了避免突然中断线程造成其他不良影响,比如文件写入未完成等,注册了atexit退出方法。简单来说,当调用线程退出时,并不会立即退出,而是会调用注册在atexit上的方法,而线程池的退出方法就是等待线程池中的所有线程执行完毕后再退出。所以,即使线程池中的线程是守护线程,但是主线程退出后,进程仍然会等待线程池中的线程执行完毕后才会退出。