def test2():
assert False, 'error'
except:
print('another')
2.多进程
引入多进程的库文件: import multiprocessing
多进程测试, 需要在主函数 main 中 进行测试
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置
daemon 是守护进程 :daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。#主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。
def worker_1():
print('this is worker 1')
time.sleep(3)
print('worker 1 end')
#---没有 join 函数---
# name is :Process-1pid is :6488is _alive: True
# name is :Process-2pid is :5660is _alive: True
# name is :Process-3pid is :7776is _alive: True
# 电脑的核数4
# end----
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end
#---有 join 函数--- 等待所有子进程结束,再执行主进程
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end
# 电脑的核数4 #此时执行完线程, active_children() 为空
# end----
使用多个进程访问共享资源的时候,需要使用Lock 来避免访问的冲突
def worker_first(lock, f):
with lock:
with open(f, 'a+') as t:
first write ---first write ---first write ---second write ---second write ---second write ---
进程池 Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,
此时可以发挥进程池的功效。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,
那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
非阻塞和阻塞进程池
单个函数使用线程池:
def func(msg):
print('msg is :' + msg)
time.sleep(2)
print(msg + ' end')
return msg + ' re'
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
#---非阻塞进程池 输出------
# start------
# msg is :0
# msg is :1
# msg is :2
# 0 end
# msg is :3
# 1 end
# 2 end
# 3 end
# all program done
#---阻塞进程池 输出------ 顺序执行
# msg is :0
# 0 end
# msg is :1
# 1 end
# msg is :2
# 2 end
# msg is :3
# 3 end
# start------
# all program done
多个函数使用线程池
def f1():
result = 0
print('this is one')
for i in range(100000):
result += i
return result
def f2():
result = 1
print('this is two')
for i in range(2, 100000):
result *= i
return result
if __name__ == '__main__':
start = time.time()
myPool = multiprocessing.Pool(processes=2)
result = []
print('start----')
myFunc = [f1, f2]
for f in myFunc:
result.append(myPool.apply_async(f))
使用get() 函数,同样可以获取pandas 的数据结构;
def f1():
result = 0
print('this is one')
result = pd.DataFrame(np.arange(16).reshape(-1, 4))
return result
输出结果:
start----
this is one
this is two
0 1 2 3
0 0 1 2 3
1 4 5 6 7
2 8 9 10 11
3 12 13 14 15
0 1 2
0 0 1 2
1 3 4 5
2 6 7 8
花费的时间: 0.9370532035827637
end------
3.PyQt5 中的QTimer 模块
pyqt5中的多线程的应用,多线程技术涉及3种方法,1.使用计时器QTimer, 2.使用多线程模块QThread ,3.使用事件处理功能
1.使用QTimer模块,创建QTimer实例,将timeout信号连接到槽函数,并调用timeout 信号。 self.timer.start(1000)时间为毫秒
引入模块
from PyQt5.QtCore import QTimer, QDateTime
def timerTest():
class WinForm(QWidget):
def __init__(self,parent=None):
super(WinForm,self).__init__(parent)
self.setWindowTitle("QTimer demo")
self.listFile= QListWidget()
self.label = QLabel('显示当前时间')
self.startBtn = QPushButton('开始')
self.endBtn = QPushButton('结束')
layout = QGridLayout(self)
QThread模块,使用该模块开始一个线程,可以创建它的一个子类,然后覆盖QThread.run() 的方法。调用自定义的线程时,调用start()方法,会自动调用run的方法,
QThread 还有 started , finished 信号,可以见信号连接到槽函数中
def threadTest():
class MainWidget(QWidget):
def __init__(self,parent=None):
super(MainWidget,self).__init__(parent)
self.setWindowTitle("QThread 例子")
self.thread = Worker()
demo 2:
def threadTest2():
global sec
sec=0
class WorkThread(QThread):