在不涉及并发概念的情况下,一个单进程单线程的程序执行情况可能是这样的:调用一个函数,发出调用的代码开始等待函数执行完成,直到函数返回结果,如果函数抛出异常,则可以把调用函数的代码放到
try/except
语句块中,来捕获和处理异常。
但是,当涉及到并发时,情况就没这么简单了。在启用多线程(或多进程)后,你无法在一个线程(或进程)中知道另一个线程(或进程)被调用的函数何时执行完成,也无法轻松得知函数调用结果或捕获异常。只能采用某种通知的方式,来进行线程(或进程)间通信,这可能是一个信号,也可能是一个消息队列等。
本文主要讲解如何让 Python 能够同时处理多个任务,即如何使用并发模型编程。
PEP 492
),开始原生支持了协程,我们不再需要编写难懂的
yeild from
来使用生成器实现协程功能了。
Python 协程通常在单线程的事件循环中运行。协程是一个可以挂起自身并在以后恢复的“函数”,
async
用来定义协程,一个协程必须显式的使用
await
关键字主动让出控制权,另一个协程才有机会在主事件循环的调度下并发的执行。
协程版本旋转指针程序需要对
spin
和
slow
两个函数做如下修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
import asyncio import itertools
async def spin(msg: str) -> None: for char in itertools.cycle(r'\|/-'): status = f'\r{char} {msg}' print(status, end='', flush=True) try: await asyncio.sleep(0.1) except asyncio.CancelledError: break blanks = ' ' * len(status) print(f'\r{blanks}\r', end='')
async def slow() -> int: await asyncio.sleep(3) return 42
|
首先我们使用
async def
将
spin
定义为一个协程,让其不再是一个常规的函数。
spin
协程取消了第二个参数,因为 Python 没有为协程提供
Event
对象来进行通信,我们需要采用其他招式。
在原来使用
Event
通信的地方替换成了由
try/except
包裹的
await asyncio.sleep(0.1)
语句块代码。这段代码块有如下三个作用:
await asyncio.sleep(0.1)
的作用类似
time.sleep
,可以让程序暂停 0.1 秒。不同的是,使用
await asyncio.sleep
暂停时不阻塞其他协程。
因为这里加入了
await
关键字,代码执行到这里时,当前协程会主动让出控制权,不再继续往下执行,由事件循环来调度其他协程执行。
如果在控制当前协程的
Task
实例中调用
cancel
方法(有关
Task
的内容稍后会进行讲解),
await asyncio.sleep(0.1)
会抛出
CancelledError
异常,这里使用
try/except
捕获异常后退出循环。这样,我们就在多个协程间利用异常机制完成了通信,而不必借助于额外的
Event
对象。
slow
函数也被改造为一个协程,其内部原来编写的阻塞代码
time.sleep(3)
被替换为了
await asyncio.sleep(3)
。
可以发现,其实协程与普通的函数在定义上差别不大,只不过多了两个关键字
async
和
await
。但二者在执行方式上大有不同,普通函数在使用
()
运算符调用时(即
spin()
)会立刻执行,而协程在使用
spin()
时只会创建一个协程对象,不会执行。
要执行上面两个协程对象,我们还要对
supervisor
和
main
函数进行改造:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
async def supervisor() -> int: spinner = asyncio.create_task(spin('thinking!')) print(f'spinner object: {spinner}') result = await slow() spinner.cancel() return result
def main() -> None: result = asyncio.run(supervisor()) print(f'Answer: {result}')
if __name__ == '__main__': main()
|
supervisor
函数同样被修改为协程,
spin('thinking!')
并不会像函数一样立即执行,只会创建一个协程对象,将它传递给
asyncio.create_task
,我们可以得到一个
asyncio.Task
对象,这个
Task
对象包装了协程对象并调度其执行,它还提供控制和查询协程对象运行状态的方法。
使用
await
关键字来调用
slow
协程,这将阻塞
supervisor
程序(但会让出控制权,使其他协程得以执行),直到
slow
返回,返回结果赋值给
result
变量。
接着调用了
spinner.cancel()
,
Task.cancel
方法调用后,将立即在
Task
所包装的协程对象即
spin
协程中抛出
CancelledError
异常,
spin
中需要使用
try/except
捕获
await asyncio.sleep(0.1)
抛出的异常,这样,就实现了不同协程之间通过异常进行通信。
main
是唯一的普通函数,没有被改造为协程。
main
函数中的
asyncio.run
是整个协程的启动入口,
asyncio.run
函数启动事件循环,驱动
supervisor()
协程运行,最终也将启动其他协程。
在以上示例代码中,我们可以总结出运行协程的 3 种方式:
asyncio.run(coroutine())
:在一个常规函数中调用,是协程启动入口,将开启协程的事件循环,调用后保持阻塞,直至拿到
coroutine()
的返回结果。
asyncio.create_task(coroutine())
:在协程中调用,接收另一个协程对象并调度其最终执行,返回的
Task
对象是对协程对象的包装,并且提供控制和查询协程对象运行状态的方法。
await coroutine()
:在协程中调用,
await
关键字主动让出执行控制权,终止当前协程执行,直至拿到
coroutine()
的返回结果。同时这也是一个表达式,返回结果即为
coroutine()
返回值。
下面是协程版本旋转指针程序测试效果:
在协程版本中,
spinner
是一个
Task
对象,其字符串表示形式为
<Task pending name='Task-2' coro=<spin() running at /Users/jianghushinian/spin/spinner_async.py:8>>
。
根据以上示例代码,我们可以总结出 Python 协程的最大特点:一处异步,处处异步。在协程中任何耗时操作都会减慢事件循环,由于事件循环是单线程管理的,所以这会影响其他所有协程。在编写协程代码,要格外小心,不要写出同步阻塞的代码。好在如今 Python 已经从语法层面原生支持协程,比使用生成器实现协程的年代要好多了。
给你留个小作业:尝试将
slow
协程中
await asyncio.sleep(3)
替换成普通的
time.sleep(3)
观察下效果并思考为什么。
ChatGPT研究社
」,军哥还是公众号「Python之禅」的出品人,目前星球已经有近 500 人加入,专栏文章更新近 30 篇,星球不限于分享 ChatGPT 内容,还有不定期抽奖。同时,我也会不定期在星球分享 AIGC 相关内容。