#except Exception as e:
#print(e) #可以打印出异常的类型
except ZeroDivisionError: #抛出异常,执行下面的程序,如果是界面软件可以弹出一个窗口,提示用户输入错误
print(x)
else: #如果程序不存在异常,则执行该程序
print(100 / x)
finally:
print('the program end') #不管程序是否错误,始终都会执行
test(0)
# the program end
test(1)
# 10.0
# 100.0
# the program end
def test2():
assert False, 'error'
except:
print('another')
# 输出 another
def myLevel(level):
if level < 1:
raise ValueError('Invalid level', level) #自定义异常
myLevel(0)
except ValueError:
print('level < 1')
#输出 :level < 1
print('test 2--')
test2()
2.多进程
引入多进程的库文件: import multiprocessing
多进程测试, 需要在主函数 main 中 进行测试
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置
daemon 是守护进程 :daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。#主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。
def worker_1():
print('this is worker 1')
time.sleep(3)
print('worker 1 end')
# n = 3
# while n > 0:
# print("The time is {0}".format(time.ctime()))
# time.sleep(interval)
# n -= 1
def worker_2():
print('this is worker 2')
time.sleep(3)
print('worker 2 end')
def worker_3():
print('this is worker 3')
time.sleep(3)
print('worker 3 end')
if __name__ == "__main__":
p1 = multiprocessing.Process(target = worker_1, args = ())
p2 = multiprocessing.Process(target = worker_2, args = ())
p3 = multiprocessing.Process(target = worker_3, args = ())
# p1.daemon = True # 设置子程序为 守护进程, 主程序结束后,它就随之结束(即如果没有join函数,将不执行worker1函数), 需要放在start 前面
p1.start() #进程调用 start 的时候,将自动调用 run 函数
p2.start()
p3.start() #运行3 个worker 程序花费的时间为3 s, 使用了3 个进程进行处理
p1.join()
p2.join()
p3.join() #添加 join 函数, 先执行 worker 函数, 再执行主进程函数
for p in multiprocessing.active_children(): #这是主进程函数
print('name is :' + p.name + 'pid is :' + str(p.pid) + 'is _alive: '+ str(p.is_alive()))
print('电脑的核数'+ str(multiprocessing.cpu_count())) # 电脑的核数4
print('end----')
#---没有 join 函数---
# name is :Process-1pid is :6488is _alive: True
# name is :Process-2pid is :5660is _alive: True
# name is :Process-3pid is :7776is _alive: True
# 电脑的核数4
# end----
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end
#---有 join 函数--- 等待所有子进程结束,再执行主进程
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end
# 电脑的核数4 #此时执行完线程, active_children() 为空
# end----
使用多个进程访问共享资源的时候,需要使用Lock 来避免访问的冲突
def worker_first(lock, f):
with lock:
with open(f, 'a+') as t: #with 自动关闭文件
n = 3
while n > 0:
t.write('first write ---')
n -= 1
def worker_second(lock, f):
lock.acquire() #获取锁
with open(f, 'a+') as t:
n = 3
while n > 0:
t.write('second write ---')
n -= 1
finally:
lock.release() #使用异常机制,保证最后释放锁
def myProcess():
lock = multiprocessing.Lock()
f = r'D:\myfile1.txt'
p1 = multiprocessing.Process(target=worker_first, args=(lock, f, ))
p2 = multiprocessing.Process(target=worker_second, args=(lock, f, ))
p1.start()
p2.start()
print('end -----------')
if __name__ == '__main__':
myProcess()
first write ---first write ---first write ---second write ---second write ---second write ---
进程池 Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,
此时可以发挥进程池的功效。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,
那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
非阻塞和阻塞进程池
单个函数使用线程池:
def func(msg):
print('msg is :' + msg)
time.sleep(2)
print(msg + ' end')
return msg + ' re'
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3) #创建进程池
result = []
for i in range(4):
msg = '%d'%(i)
# result.append(pool.apply_async(func, (msg, )))
pool.apply_async(func, (msg, )) #非阻塞进程池, 维持总的进程数为 processes=3, 当一个进程结束后,添加另一个进程进去
# pool.apply(func, (msg, )) #阻塞进程池
print('start------')
pool.close() # close 函数需要在 join 函数之前, 关闭进程池后,没有进程加入到pool中
pool.join() # join函数等待所有子进程结束
print('all program done')
# print([re.get() for re in result]) #获取结果
#---非阻塞进程池 输出------
# start------
# msg is :0
# msg is :1
# msg is :2
# 0 end
# msg is :3
# 1 end
# 2 end
# 3 end
# all program done
#---阻塞进程池 输出------ 顺序执行
# msg is :0
# 0 end
# msg is :1
# 1 end
# msg is :2
# 2 end
# msg is :3
# 3 end
# start------
# all program done
多个函数使用线程池
def f1():
result = 0
print('this is one')
for i in range(100000):
result += i
return result
def f2():
result = 1
print('this is two')
for i in range(2, 100000):
result *= i
return result
if __name__ == '__main__':
start = time.time()
myPool = multiprocessing.Pool(processes=2)
result = []
print('start----')
myFunc = [f1, f2]
for f in myFunc:
result.append(myPool.apply_async(f)) #将输出结果保存到列表中
myPool.close()
myPool.join()
print([re.get() for re in result])
end = time.time()
print('花费的时间: ' + str(end-start)) #huafei de 时间: 8.397480010986328
print('end------')
使用get() 函数,同样可以获取pandas 的数据结构;
def f1():
result = 0
print('this is one')
result = pd.DataFrame(np.arange(16).reshape(-1, 4))
return result
# for i in range(100000):
# result += i
# return result
def f2():
result = 1
print('this is two')
result = pd.DataFrame(np.arange(9).reshape(-1, 3))
return result
# for i in range(2, 100000):
# result *= i
# return result
if __name__ == '__main__':
start = time.time()
myPool = multiprocessing.Pool(processes=2)
result = []
print('start----')
myFunc = [f1, f2]
for f in myFunc:
result.append(myPool.apply_async(f)) #将输出结果保存到列表中
myPool.close()
myPool.join()
for re in result:
print(re.get())
输出结果:
start----
this is one
this is two
0 1 2 3
0 0 1 2 3
1 4 5 6 7
2 8 9 10 11
3 12 13 14 15
0 1 2
0 0 1 2
1 3 4 5
2 6 7 8
花费的时间: 0.9370532035827637
end------
3.PyQt5 中的QTimer 模块
pyqt5中的多线程的应用,多线程技术涉及3种方法,1.使用计时器QTimer, 2.使用多线程模块QThread ,3.使用事件处理功能
1.使用QTimer模块,创建QTimer实例,将timeout信号连接到槽函数,并调用timeout 信号。 self.timer.start(1000)时间为毫秒
引入模块
from PyQt5.QtCore import QTimer, QDateTime
def timerTest():
class WinForm(QWidget):
def __init__(self,parent=None):
super(WinForm,self).__init__(parent)
self.setWindowTitle("QTimer demo")
self.listFile= QListWidget()
self.label = QLabel('显示当前时间')
self.startBtn = QPushButton('开始')
self.endBtn = QPushButton('结束')
layout = QGridLayout(self)
# 初始化一个定时器
self.timer = QTimer(self)
# showTime()方法
self.timer.timeout.connect(self.showTime) #
layout.addWidget(self.label,0,0,1,2)
layout.addWidget(self.startBtn,1,0)
layout.addWidget(self.endBtn,1,1)
self.startBtn.clicked.connect( self.startTimer) #首先按下start开始计时,到了2s后,触发timeout 信号
self.endBtn.clicked.connect( self.endTimer)
self.setLayout(layout)
QTimer.singleShot(5000, self.showmsg) #给定的时间间隔后,调用一个槽函数来发射信号
def showmsg(self):
QMessageBox.information(self, '提示信息', '你好')
def showTime(self):
# 获取系统现在的时间
time = QDateTime.currentDateTime() #获取当前时间
# 设置系统时间显示格式
timeDisplay = time.toString("yyyy-MM-dd hh:mm:ss dddd")
# 在标签上显示时间
self.label.setText( timeDisplay )
def startTimer(self):
# 设置计时间隔并启动
self.timer.start(1000) #时间为 1 s
self.startBtn.setEnabled(False)
self.endBtn.setEnabled(True)
def endTimer(self):
self.timer.stop()
self.startBtn.setEnabled(True) #依次循环
self.endBtn.setEnabled(False)
if __name__ == "__main__":
app = QApplication(sys.argv)
form = WinForm()
form.show()
# QTimer.singleShot(5000, app.quit) #界面运行5秒后,自动关闭
sys.exit(app.exec_())
timerTest()
QThread模块,使用该模块开始一个线程,可以创建它的一个子类,然后覆盖QThread.run() 的方法。调用自定义的线程时,调用start()方法,会自动调用run的方法,
QThread 还有 started , finished 信号,可以见信号连接到槽函数中
def threadTest():
class MainWidget(QWidget):
def __init__(self,parent=None):
super(MainWidget,self).__init__(parent)
self.setWindowTitle("QThread 例子")
self.thread = Worker() #创建线程
self.listFile = QListWidget()
self.btnStart = QPushButton('开始')
self.btnStop = QPushButton('结束')
layout = QGridLayout(self)
layout.addWidget(self.listFile,0,0,1,2)
layout.addWidget(self.btnStart,1,1)
layout.addWidget(self.btnStop,2,1)
self.btnStart.clicked.connect( self.slotStart ) #通过按钮状态实现 线程的开启
self.btnStop.clicked.connect(self.slotStop)
self.thread.sinOut.connect(self.slotAdd)
def slotStop(self):
self.btnStart.setEnabled(True)
self.thread.wait(2000)
def slotAdd(self,file_inf):
self.listFile.addItem(file_inf) #增加子项
def slotStart(self):
self.btnStart.setEnabled(False)
self.thread.start() #线程开始
#创建一个线程
class Worker(QThread):
sinOut = pyqtSignal(str) #自定义一个信号
def __init__(self, parent=None):
super(Worker, self).__init__(parent)
self.work = True
self.num = 0
def run(self):
while self.work == True:
listStr = 'this is index file {0}'.format(self.num)
self.sinOut.emit(listStr) #发射一个信号
self.num += 1
self.sleep(2) #休眠2 秒
def __del__(self):
self.work = False
self.wait()
if __name__ == "__main__":
app = QApplication(sys.argv)
demo = MainWidget()
demo.show()
sys.exit(app.exec_())
# threadTest()
demo 2:
def threadTest2():
global sec
sec=0
class WorkThread(QThread): #自定义一个线程
trigger = pyqtSignal()
def __int__(self):
super(WorkThread,self).__init__()
def run(self):
for i in range(2000000000):
print('haha ' + str(i)) #此处放运行 量较大的程序
# 循环完毕后发出信号
self.trigger.emit()
def countTime():
global sec
sec += 1
# LED显示数字+1
lcdNumber.display(sec)
def work():
# 计时器每秒计数
timer.start(1000) #每秒运行完后,是lcd增加数字
# 计时开始
workThread.start()
# 当获得循环完毕的信号时,停止计数
workThread.trigger.connect(timeStop)
def timeStop():
timer.stop()
print("运行结束用时",lcdNumber.value())
global sec
sec=0
if __name__ == "__main__":
app = QApplication(sys.argv)
top = QWidget()
top.resize(300,120)
# 垂直布局类QVBoxLayout
layout = QVBoxLayout(top)
# 加个显示屏
lcdNumber = QLCDNumber()
layout.addWidget(lcdNumber)
button = QPushButton("测试")
layout.addWidget(button)
timer = QTimer()
workThread = WorkThread()
button.clicked.connect(work)
# 每次计时结束,触发 countTime
timer.timeout.connect(countTime)
top.show()
sys.exit(app.exec_())
threadTest2()
项目推荐:
2000多G的计算机各行业电子资源分享(持续更新)
2020年微信小程序全栈项目之喵喵交友【附课件和源码】
Spring Boot开发小而美的个人博客【附课件和源码】
Java微服务实战296集大型视频-谷粒商城【附代码和课件】
Java开发微服务畅购商城实战【全357集大项目】-附代码和课件
最全最详细数据结构与算法视频-【附课件和源码】
多进程
https://www.cnblogs.com/kaituorensheng/p/4445418.html
1.异常处理机制def test(x): try: y = 10 / x print(y) #except Exception as e: #print(e) #可以打印出异常的类型 except ZeroDivisionError: #抛出异常,执行下面的程序,如果是界面软件可以弹出一个窗口,提示用户输入错误 ...
Thinkpat T590
在项目很大的情况下,界面加载的东西会很多,而且必须在主线程
中
加载,无法用
多线程
去处理来节省时间,这时候打开程序的时候,双击exe,会过很长时间才能显示界面。
这时候,心急的用户可能就想:这破软件是不是没运行起来啊,然后有双击了一下exe,这下好了,直接一下开了两个程序。
为了解决这种现象,可以使用异步调用的方式或者单次计时器,让主界面先显示出来,然后再加载其
参考博客:https://blog.csdn.net/kidults/article/details/80091788
[static] void
QT
imer
::
single
Shot
(int msec, const QObject *receiver, const char *member)
QT
imer
::
single
Shot
(1000, this, SLOT(slot_index()));
单
single
Shot
,表示它只会触发一次,发出一次信号,然后来执行槽函数。
PyQt5
之
QT
imer
定时器
如果要在应用程序
中
周期性地进行某项操作,比如周期性地检测主机的CPU值,则需要用到
QT
imer
定时器,
QT
imer
类提供了重复的和单次的定时器。要使用定时器,需要先创建一个
QT
imer
实例,将其timeout信号连接到相应的槽,并调用start()。然后定时器会以恒定的间隔发出timeout信号,当窗口控件收到timeout信号后,它就会停止这个定时器。
一、
QT
imer
类
中
的常用方法
from multiprocessing import Pool
from
PyQt5
import
Qt
Widgets
from
PyQt5
.
Qt
Widgets import *
import sys
class Window(QWidget):
def __init__(self):
super().__init__()
self.set
【求解答】
pyqt5
主界面和控制算法运行的
多进程
有个特别头大的问题,希望在CSDN里面能得到相关大佬的解答!!! 呜呜呜,无助…
情况说明: 我使用了
pyqt5
开发深度学习算法的应用程序,主
进程
是控制界面运行的,也就是app=QApplication(sys.argv)这个是主
进程
,而在界面里面有一个按钮(ok),点击是运行深度学习的某种算法。
问题: 我希望在运行算法的时候,主界面不会出现无...
Tqdm 是
Python
进度条库,可以在
Python
长循环
中
添加一个进度提示信息。用户只需要封装任意的迭代器,是一个快速、扩展性强的进度条工具库。用法:tqdm(iterator)安装方法:pip install tqdm 使用方法一: 传入可迭代对象import time
from tqdm import *
for i in tqdm(range(1000)):
time.sl...
一、继承
QT
hread
继承
QT
hread,这应该是最常用的方法了。我们可以通过重写虚函数void
QT
hread::run ()实现我们自己想做的操作,实现新建线程的目的。前面已经介绍了
Qt
hread,这里就不重复了。
这种方法,我们每一次要新建一个线程都需要继承
Qt
hread,实现一个新的类,有点不太方便。但是相对于Qrunnable,这种方法的好处就是我们可以直接调用对象的sta
在
Python
中
使用
PyQt5
实现多层界面设计可以使用 QStackedWidget 组件。
QStackedWidget 组件是一个容器组件,可以在其
中
嵌入多个子界面,并通过设置当前层来显示不同的子界面。
使用方法如下:
1. 创建 QStackedWidget 对象
2. 创建多个子界面(QWidget 对象)
3. 将子界面添加到 QStackedWidget
中
4. 设置当前层来显示不同的子界面
下面是一个简单的例子:
```
python
import sys
from
PyQt5
.
Qt
Widgets import QApplication, QWidget, QStackedWidget, QVBoxLayout
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
# 创建 QStackedWidget 对象
stacked_widget = QStackedWidget()
# 创建多个子界面
page1 = QWidget()
page2 = QWidget()
page3 = QWidget()
# 将子界面添加到 QStackedWidget
中
stacked_widget.addWidget(page1)
stacked_widget.addWidget(page2)
stacked_widget.addWidget(page3)
# 设置当前层
stacked_widget.setCurrentIndex(0)
# 在窗口
中
添加 QStackedWidget 并显示
layout = QVBoxLayout()
layout.addWidget(stacked_widget)
self.setLayout(layout)
self.show()
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
sys.exit(app.exec_())
在这个例子
中
,我们创建了三个子界面(page1、page2、page3)并将它们添加到 QStackedWidget
中
,然后设置当前层为第一层,就可以在窗口
中
显示第一