相关文章推荐
失眠的啤酒  ·  [Python] ...·  1 年前    · 
俊逸的牛腩  ·  NiFi ...·  1 年前    · 
#except Exception as e: #print(e) #可以打印出异常的类型 except ZeroDivisionError: #抛出异常,执行下面的程序,如果是界面软件可以弹出一个窗口,提示用户输入错误 print(x) else: #如果程序不存在异常,则执行该程序 print(100 / x) finally: print('the program end') #不管程序是否错误,始终都会执行 test(0) # the program end test(1) # 10.0 # 100.0 # the program end
def test2(): 
        assert False, 'error'
    except:
        print('another')
    # 输出 another
    def myLevel(level):
        if level < 1:
            raise ValueError('Invalid level', level)  #自定义异常
        myLevel(0)    
    except ValueError:
        print('level < 1')
    #输出 :level < 1
print('test 2--')
test2()

2.多进程

引入多进程的库文件: import multiprocessing

多进程测试, 需要在主函数 main 中 进行测试
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置

daemon 是守护进程 :daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。#主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。

def worker_1():
    print('this is worker 1')
    time.sleep(3)
    print('worker 1 end')
#     n = 3
#     while n > 0:
#         print("The time is {0}".format(time.ctime()))
#         time.sleep(interval)
#         n -= 1
def worker_2():
    print('this is worker 2')
    time.sleep(3)
    print('worker 2 end')
def worker_3():
    print('this is worker 3')
    time.sleep(3)
    print('worker 3 end')
if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = ())
    p2 = multiprocessing.Process(target = worker_2, args = ())
    p3 = multiprocessing.Process(target = worker_3, args = ())
#     p1.daemon = True    # 设置子程序为 守护进程, 主程序结束后,它就随之结束(即如果没有join函数,将不执行worker1函数), 需要放在start 前面
    p1.start()  #进程调用 start 的时候,将自动调用 run 函数
    p2.start()
    p3.start()      #运行3 个worker 程序花费的时间为3 s, 使用了3 个进程进行处理
    p1.join()
    p2.join()
    p3.join()        #添加 join 函数, 先执行 worker 函数, 再执行主进程函数
    for p in multiprocessing.active_children(): #这是主进程函数
        print('name is :' + p.name + 'pid is :' + str(p.pid) + 'is _alive: '+ str(p.is_alive()))
    print('电脑的核数'+ str(multiprocessing.cpu_count()))    # 电脑的核数4
    print('end----')
#---没有 join 函数---
# name is :Process-1pid is :6488is _alive: True
# name is :Process-2pid is :5660is _alive: True
# name is :Process-3pid is :7776is _alive: True
# 电脑的核数4
# end----
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end    
#---有 join 函数---   等待所有子进程结束,再执行主进程
# this is worker 1
# this is worker 2
# this is worker 3
# worker 1 end
# worker 2 end
# worker 3 end
# 电脑的核数4    #此时执行完线程, active_children() 为空
# end----  

使用多个进程访问共享资源的时候,需要使用Lock 来避免访问的冲突

def worker_first(lock, f):
    with lock:
        with open(f, 'a+') as t:    #with 自动关闭文件
            n = 3
            while n > 0:
                t.write('first write ---')
                n -= 1
def worker_second(lock, f):
    lock.acquire()  #获取锁
        with open(f, 'a+') as t:
            n = 3
            while n > 0:
                t.write('second write ---')
                n -= 1
    finally:
        lock.release()  #使用异常机制,保证最后释放锁
def myProcess():
    lock = multiprocessing.Lock()
    f = r'D:\myfile1.txt'
    p1 = multiprocessing.Process(target=worker_first, args=(lock, f, ))
    p2 = multiprocessing.Process(target=worker_second, args=(lock, f, ))
    p1.start()
    p2.start()
    print('end -----------')
if __name__ == '__main__':
    myProcess()
first write ---first write ---first write ---second write ---second write ---second write ---

进程池 Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,
此时可以发挥进程池的功效。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,
那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的

非阻塞和阻塞进程池

单个函数使用线程池:

def func(msg):
    print('msg is :' + msg)
    time.sleep(2)
    print(msg + ' end')
    return msg + ' re'
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)    #创建进程池
    result = [] 
    for i in range(4):
        msg = '%d'%(i)
#         result.append(pool.apply_async(func, (msg, )))
        pool.apply_async(func, (msg, )) #非阻塞进程池, 维持总的进程数为 processes=3, 当一个进程结束后,添加另一个进程进去
#         pool.apply(func, (msg, ))   #阻塞进程池
    print('start------')
    pool.close()  #  close 函数需要在 join 函数之前, 关闭进程池后,没有进程加入到pool中
    pool.join() # join函数等待所有子进程结束
    print('all program done')
#     print([re.get() for re in result])  #获取结果
#---非阻塞进程池 输出------
# start------
# msg is :0
# msg is :1
# msg is :2
# 0 end
# msg is :3
# 1 end
# 2 end
# 3 end
# all program done    
#---阻塞进程池 输出------ 顺序执行
# msg is :0
# 0 end
# msg is :1
# 1 end
# msg is :2
# 2 end
# msg is :3
# 3 end
# start------
# all program done

多个函数使用线程池

def f1():
    result = 0
    print('this is one')
    for i in range(100000):
        result += i
    return result
def f2():
    result = 1
    print('this is two')
    for i in range(2, 100000):
        result *= i
    return result
if __name__ == '__main__':
    start = time.time()
    myPool = multiprocessing.Pool(processes=2)   
    result = []
    print('start----')
    myFunc = [f1, f2]
    for f in myFunc:
        result.append(myPool.apply_async(f))    #将输出结果保存到列表中
    myPool.close()
    myPool.join()
    print([re.get() for re in result])
    end = time.time()
    print('花费的时间: ' + str(end-start)) #huafei de 时间: 8.397480010986328
    print('end------')

使用get() 函数,同样可以获取pandas 的数据结构;

def f1():
    result = 0
    print('this is one')
    result = pd.DataFrame(np.arange(16).reshape(-1, 4))
    return result
#     for i in range(100000):
#         result += i
#     return result
def f2():
    result = 1
    print('this is two')
    result = pd.DataFrame(np.arange(9).reshape(-1, 3))
    return result    
#     for i in range(2, 100000):
#         result *= i
#     return result
if __name__ == '__main__':
    start = time.time()
    myPool = multiprocessing.Pool(processes=2)   
    result = []
    print('start----')
    myFunc = [f1, f2]
    for f in myFunc:
        result.append(myPool.apply_async(f))    #将输出结果保存到列表中
    myPool.close()
    myPool.join()
    for re in result:
        print(re.get())

输出结果:

start----
this is one
this is two
    0   1   2   3
0   0   1   2   3
1   4   5   6   7
2   8   9  10  11
3  12  13  14  15
   0  1  2
0  0  1  2
1  3  4  5
2  6  7  8
花费的时间: 0.9370532035827637
end------

3.PyQt5 中的QTimer 模块

pyqt5中的多线程的应用,多线程技术涉及3种方法,1.使用计时器QTimer, 2.使用多线程模块QThread ,3.使用事件处理功能
1.使用QTimer模块,创建QTimer实例,将timeout信号连接到槽函数,并调用timeout 信号。 self.timer.start(1000)时间为毫秒
引入模块
from PyQt5.QtCore import QTimer, QDateTime

def timerTest():
    class WinForm(QWidget):  
        def __init__(self,parent=None): 
            super(WinForm,self).__init__(parent) 
            self.setWindowTitle("QTimer demo")
            self.listFile= QListWidget() 
            self.label = QLabel('显示当前时间')
            self.startBtn = QPushButton('开始') 
            self.endBtn = QPushButton('结束') 
            layout = QGridLayout(self) 
            # 初始化一个定时器
            self.timer = QTimer(self)
            # showTime()方法
            self.timer.timeout.connect(self.showTime)   #
            layout.addWidget(self.label,0,0,1,2)   
            layout.addWidget(self.startBtn,1,0) 
            layout.addWidget(self.endBtn,1,1)         
            self.startBtn.clicked.connect( self.startTimer) #首先按下start开始计时,到了2s后,触发timeout 信号
            self.endBtn.clicked.connect( self.endTimer) 
            self.setLayout(layout)   
            QTimer.singleShot(5000, self.showmsg)   #给定的时间间隔后,调用一个槽函数来发射信号
        def showmsg(self):
            QMessageBox.information(self, '提示信息', '你好')
        def showTime(self): 
            # 获取系统现在的时间
            time = QDateTime.currentDateTime() #获取当前时间
            # 设置系统时间显示格式
            timeDisplay = time.toString("yyyy-MM-dd hh:mm:ss dddd")
            # 在标签上显示时间
            self.label.setText( timeDisplay ) 
        def startTimer(self): 
            # 设置计时间隔并启动
            self.timer.start(1000)  #时间为 1 s
            self.startBtn.setEnabled(False)
            self.endBtn.setEnabled(True)
        def endTimer(self): 
            self.timer.stop()
            self.startBtn.setEnabled(True)  #依次循环
            self.endBtn.setEnabled(False)
    if __name__ == "__main__":  
        app = QApplication(sys.argv)  
        form = WinForm()  
        form.show()  
#         QTimer.singleShot(5000, app.quit)    #界面运行5秒后,自动关闭
        sys.exit(app.exec_())
timerTest()

QThread模块,使用该模块开始一个线程,可以创建它的一个子类,然后覆盖QThread.run() 的方法。调用自定义的线程时,调用start()方法,会自动调用run的方法,
QThread 还有 started , finished 信号,可以见信号连接到槽函数中

def threadTest():
    class MainWidget(QWidget):
        def __init__(self,parent=None):
            super(MainWidget,self).__init__(parent)
            self.setWindowTitle("QThread 例子")    
            self.thread = Worker()  #创建线程
            self.listFile = QListWidget()
            self.btnStart = QPushButton('开始')
            self.btnStop = QPushButton('结束')
            layout = QGridLayout(self)
            layout.addWidget(self.listFile,0,0,1,2)
            layout.addWidget(self.btnStart,1,1) 
            layout.addWidget(self.btnStop,2,1)
            self.btnStart.clicked.connect( self.slotStart ) #通过按钮状态实现  线程的开启
            self.btnStop.clicked.connect(self.slotStop)
            self.thread.sinOut.connect(self.slotAdd)
        def slotStop(self):
            self.btnStart.setEnabled(True)
            self.thread.wait(2000)
        def slotAdd(self,file_inf):
            self.listFile.addItem(file_inf) #增加子项
        def slotStart(self):
            self.btnStart.setEnabled(False)
            self.thread.start() #线程开始
#创建一个线程
    class Worker(QThread):
        sinOut = pyqtSignal(str) #自定义一个信号
        def __init__(self, parent=None):
            super(Worker, self).__init__(parent)
            self.work = True
            self.num = 0
        def run(self):
            while self.work == True:
                listStr = 'this is index file {0}'.format(self.num)
                self.sinOut.emit(listStr)   #发射一个信号
                self.num += 1
                self.sleep(2)       #休眠2 秒
        def __del__(self):
            self.work = False
            self.wait()
    if __name__ == "__main__":              
        app = QApplication(sys.argv)
        demo = MainWidget()
        demo.show()
        sys.exit(app.exec_())
# threadTest()

demo 2:

def threadTest2():
    global sec
    sec=0
    class WorkThread(QThread):  #自定义一个线程
        trigger = pyqtSignal()
        def __int__(self):
            super(WorkThread,self).__init__()
        def run(self):
            for i in range(2000000000):
                print('haha ' + str(i)) #此处放运行 量较大的程序
            # 循环完毕后发出信号        
            self.trigger.emit()        
    def countTime():
        global  sec
        sec += 1
        # LED显示数字+1
        lcdNumber.display(sec)          
    def work():
        # 计时器每秒计数
        timer.start(1000)   #每秒运行完后,是lcd增加数字
        # 计时开始    
        workThread.start()       
        # 当获得循环完毕的信号时,停止计数    
        workThread.trigger.connect(timeStop)  
    def timeStop():
        timer.stop()
        print("运行结束用时",lcdNumber.value())
        global sec
        sec=0
    if __name__ == "__main__":      
        app = QApplication(sys.argv) 
        top = QWidget()
        top.resize(300,120)
        # 垂直布局类QVBoxLayout
        layout = QVBoxLayout(top) 
        # 加个显示屏    
        lcdNumber = QLCDNumber()             
        layout.addWidget(lcdNumber)
        button = QPushButton("测试")
        layout.addWidget(button)
        timer = QTimer()
        workThread = WorkThread()
        button.clicked.connect(work)
        # 每次计时结束,触发 countTime
        timer.timeout.connect(countTime)      
        top.show()
        sys.exit(app.exec_())
threadTest2()

项目推荐:

2000多G的计算机各行业电子资源分享(持续更新)

2020年微信小程序全栈项目之喵喵交友【附课件和源码】

Spring Boot开发小而美的个人博客【附课件和源码】

Java微服务实战296集大型视频-谷粒商城【附代码和课件】

Java开发微服务畅购商城实战【全357集大项目】-附代码和课件

最全最详细数据结构与算法视频-【附课件和源码】

多进程 https://www.cnblogs.com/kaituorensheng/p/4445418.html

1.异常处理机制def test(x): try: y = 10 / x print(y) #except Exception as e: #print(e) #可以打印出异常的类型 except ZeroDivisionError: #抛出异常,执行下面的程序,如果是界面软件可以弹出一个窗口,提示用户输入错误 ... Thinkpat T590 在项目很大的情况下,界面加载的东西会很多,而且必须在主线程 加载,无法用 多线程 去处理来节省时间,这时候打开程序的时候,双击exe,会过很长时间才能显示界面。 这时候,心急的用户可能就想:这破软件是不是没运行起来啊,然后有双击了一下exe,这下好了,直接一下开了两个程序。 为了解决这种现象,可以使用异步调用的方式或者单次计时器,让主界面先显示出来,然后再加载其
参考博客:https://blog.csdn.net/kidults/article/details/80091788 [static] void QT imer :: single Shot (int msec, const QObject *receiver, const char *member) QT imer :: single Shot (1000, this, SLOT(slot_index())); 单 single Shot ,表示它只会触发一次,发出一次信号,然后来执行槽函数。
PyQt5 QT imer 定时器 如果要在应用程序 周期性地进行某项操作,比如周期性地检测主机的CPU值,则需要用到 QT imer 定时器, QT imer 类提供了重复的和单次的定时器。要使用定时器,需要先创建一个 QT imer 实例,将其timeout信号连接到相应的槽,并调用start()。然后定时器会以恒定的间隔发出timeout信号,当窗口控件收到timeout信号后,它就会停止这个定时器。 一、 QT imer 的常用方法 from multiprocessing import Pool from PyQt5 import Qt Widgets from PyQt5 . Qt Widgets import * import sys class Window(QWidget): def __init__(self): super().__init__() self.set
【求解答】 pyqt5 主界面和控制算法运行的 多进程 有个特别头大的问题,希望在CSDN里面能得到相关大佬的解答!!! 呜呜呜,无助… 情况说明: 我使用了 pyqt5 开发深度学习算法的应用程序,主 进程 是控制界面运行的,也就是app=QApplication(sys.argv)这个是主 进程 ,而在界面里面有一个按钮(ok),点击是运行深度学习的某种算法。 问题: 我希望在运行算法的时候,主界面不会出现无...
Tqdm 是 Python 进度条库,可以在 Python 长循环 添加一个进度提示信息。用户只需要封装任意的迭代器,是一个快速、扩展性强的进度条工具库。用法:tqdm(iterator)安装方法:pip install tqdm 使用方法一: 传入可迭代对象import time from tqdm import * for i in tqdm(range(1000)): time.sl...
一、继承 QT hread 继承 QT hread,这应该是最常用的方法了。我们可以通过重写虚函数void QT hread::run ()实现我们自己想做的操作,实现新建线程的目的。前面已经介绍了 Qt hread,这里就不重复了。 这种方法,我们每一次要新建一个线程都需要继承 Qt hread,实现一个新的类,有点不太方便。但是相对于Qrunnable,这种方法的好处就是我们可以直接调用对象的sta
Python 使用 PyQt5 实现多层界面设计可以使用 QStackedWidget 组件。 QStackedWidget 组件是一个容器组件,可以在其 嵌入多个子界面,并通过设置当前层来显示不同的子界面。 使用方法如下: 1. 创建 QStackedWidget 对象 2. 创建多个子界面(QWidget 对象) 3. 将子界面添加到 QStackedWidget 4. 设置当前层来显示不同的子界面 下面是一个简单的例子: ``` python import sys from PyQt5 . Qt Widgets import QApplication, QWidget, QStackedWidget, QVBoxLayout class MainWindow(QWidget): def __init__(self): super().__init__() self.initUI() def initUI(self): # 创建 QStackedWidget 对象 stacked_widget = QStackedWidget() # 创建多个子界面 page1 = QWidget() page2 = QWidget() page3 = QWidget() # 将子界面添加到 QStackedWidget stacked_widget.addWidget(page1) stacked_widget.addWidget(page2) stacked_widget.addWidget(page3) # 设置当前层 stacked_widget.setCurrentIndex(0) # 在窗口 添加 QStackedWidget 并显示 layout = QVBoxLayout() layout.addWidget(stacked_widget) self.setLayout(layout) self.show() if __name__ == '__main__': app = QApplication(sys.argv) window = MainWindow() sys.exit(app.exec_()) 在这个例子 ,我们创建了三个子界面(page1、page2、page3)并将它们添加到 QStackedWidget ,然后设置当前层为第一层,就可以在窗口 显示第一