声明:
python协程系列文章的上一篇,即第五篇,详细介绍了asyncio的核心概念,asyncio的设计架构,Task类的详细作用,本文为系列文章的第六篇,将介绍更加底层的API,以
EventLoop和Future
为主,介绍他们的设计理念,包含的方法以及使用技巧。
一、事件循环EventLoop
事件循环是asyncio的核心,异步任务的运行、任务完成之后的回调、网络IO操作、子进程的运行,都是通过事件循环完成的。在前一篇文章中,已经提到过,在python3.7中,我们甚至完全不用管事件循环,只需要使用高层API,即asyncio中的方法,我们很少直接与事件循环打交道,但是为了更加熟悉asyncio的运行原理,最好还是了解EventLoop的设计原理。
1、事件循环的创建、获取、设置(上文已经介绍过了)
(1)asyncio.
get_running_loop
()。python3.7新添加的
(2)asyncio.
get_event_loop
()
(3)asyncio.set_event_loop
(
loop
)
(4)asyncio.
new_event_loop
()
2、运行和停止事件循环
(1)loop.
run_until_complete
(
future
)。运行事件循环,直到future运行结束
(2)loop.
run_forever
()。在python3.7中已经取消了,表示事件循环会一直运行,直到遇到stop。
(3)loop.
stop
()。停止事件循环
(4)loop.
is_running
()。如果事件循环依然在运行,则返回True
(5)loop.
is_closed
()。如果事件循环已经close,则返回True
(6)loop.
close
()。关闭事件循环
3、创建Future和Task
(1)loop.
create_future
(coroutine) ,返回future对象
(2)loop.
create_task
(
corootine
) ,返回task对象
(3)loop.
set_task_factory
(
factory
)
(4)loop.
get_task_factory
()
4、事件循环的时钟
loop.time()。可以这么理解,事件循环内部也维护着一个时钟,可以查看事件循环现在运行的时间点是多少,就像普通的time.time()类似,它返回的是一个
浮点数值
,比如下面的代码。
import asyncio
async def hello1(a,b):
print('准备做加法运算')
await asyncio.sleep(3)
return a+b
loop=asyncio.get_event_loop()
t1=loop.time() #开始时间
print(t1)
loop.run_until_complete(hello1(3,4))
t2=loop.time() #结束时间
print(t2)
print(t2-t1) #时间间隔
'''运行结果为:
28525.671
准备做加法运算
28528.703
3.0320000000028813
5、计划执行回调函数(CallBacks)
(1)loop.call_later(delay, callback, *args, context=None)
首先简单的说一下它的含义,就是事件循环在delay多长时间之后才执行callback函数,它的返回值是asyncio.TimerHandle类的一个实例对象。
(2)loop.call_at(when, callback, *args, context=None)
即在某一个时刻进行调用计划的回调函数,第一个参数不再是delay而是when,表示一个绝对的时间点,结合前面的loop.time使用,它的使用方法和call_later()很类似。它的返回值是asyncio.TimerHandle类的一个实例对象。
(3)loop.
call_soon
(callback, *args, context=None)
在下一个迭代的时间循环中立刻调用回调函数,用法同上面。它的返回值是asyncio.Handle类的一个实例对象。
(4)loop.
call_soon_threadsafe
(callback, *args, context=None)
这是call_soon()函数的线程安全版本,计划回调函数必须在另一个线程中使用。
需要注意的是:上面的几个回调函数都只使用了“位置参数”哦,asyncio中,大部分的计划回调函数都不支持“关键字参数”,如果是想要使用关键字参数,则推荐使用functools.aprtial()对方法进一步包装,详细可以参考前面的python标准库系列文章。
# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
下面来看一下具体的使用例子。
import asyncio
def callback(n):
print('我是回调函数,参数为: {0} '.format(n))
async def main(loop):
print('在异步函数中注册回调函数')
loop.call_later(2, callback, 1)
loop.call_later(1, callback, 2)
loop.call_soon(callback, 3)
await asyncio.sleep(4)
loop = asyncio.get_event_loop()
print('进入事件循环')
loop.run_until_complete(main(loop))
print('关闭事件循环')
loop.close()
'''运行结果为:
进入事件循环
在异步函数中注册回调函数
我是回调函数,参数为: 3
我是回调函数,参数为: 2
我是回调函数,参数为: 1
关闭事件循环
再看一个简单的例子:
import asyncio
def callback(a, loop):
print("我的参数为 {0},执行的时间为{1}".format(a,loop.time()))
#call_later, call_at
if __name__ == "__main__":
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_later(5, callback, 5, loop) #第一个参数设置的时间5.5秒后执行,
loop.call_at(now+2, callback, 2, loop) #在指定的时间,运行,当前时间+2秒
loop.call_at(now+1, callback, 1, loop)
loop.call_at(now+3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever() #要用这个run_forever运行,因为没有传入协程,这个函数在3.7中已经被取消
except KeyboardInterrupt:
print("Goodbye!")
'''运行结果为:
我的参数为 4,执行的时间为266419.843
我的参数为 1,执行的时间为266420.843
我的参数为 2,执行的时间为266421.859
我的参数为 3,执行的时间为266422.859
我的参数为 5,执行的时间为266424.843
总结注意事项:
(1)CallBack函数只能够定义为同步方法,不能够定义为async方法,及不能使用async和@asyncio.coroutine修饰;
(2)每一个CallBack方法只会调用一次,如果在同一个时刻有另个CallBack方法需要调用,则他们的执行顺序是不确定的;
(3)注意使用functools.partial()去修饰带有关键字参数的CallBack方法;
(4)如何理解?对于一般的异步函数,我们需要将它放在时间循环里面,然后通过事件循环去循环调用它,而因为CallBack并不是异步函数,它是定义为普通的同步方法,所以不能够放在时间循环里面,但是如果我依然想要让事件循环去执行它怎么办呢?那就不放进事件循环,直接让事件循环“立即、稍后、在什么时候”去执行它不就行了嘛,call的含义就是“执行”。
二、底层API之Future
1、Future的定义概览
Future的本质是一个类。他表示的是异步操作的最终将要返回的结果,故而命名为Future,它不是线程安全的。Future对象是awaitable的,参见系类文章的前面,
class asyncio.
Future
(*, loop=None)
2、asyncio中关于Future的几个方法
(1)asyncio.
isfuture
(obj) 。判断一个对象是不是Future,注意python中一切皆对象哦,包括函数,当obj是下面几种情况时返回true:
- asyncio.Future的实例对象
- asyncio.Task的实例对象
- 一个具有
_asyncio_future_blocking
属性的对象
(2)asyncio.
ensure_future
(obj, *, loop=None)。将一个obj包装成Future
(3)asyncio.
wrap_future
(future, *, loop=None)
将concurrent.futures.Future
对象包装成一个 asyncio.Future
对象。
3、Future对象的常用方法
(1)result
()。返回Future执行的结果返回值
如果Future被执行完成,如果使用set_result()方法设置了一个结果,那个设置的value就会被返回;
如果Future被执行完成,如果使用set_exception()方法设置了一个异常,那么使用这个方法也会触发异常;
如果Future被取消了,那么使用这个方法会触发CancelledError异常;
如果Future的结果不可用或者是不可达,那么使用这个方法也会触发InvalidStateError异常;
(2)set_result
(result)
标记Future已经执行完毕,并且设置它的返回值。
(3)set_exception
(exception)
标记Future已经执行完毕,并且触发一个异常。
(4)done
()
如果Future1执行完毕,则返回 True
。
(5)cancelled
()
判断任务是否取消。
(6)add_done_callback
(callback, *, context=None)
在Future完成之后,给它添加一个回调方法,这个方法就相当于是loop.call_soon()方法,参见前面,如下例子:
如果要回调带有关键字参数的函数,也需要使用partial方法哦。
(7)remove_done_callback
(callback)
(8)cancel
()
(9)exception
()
(10)get_loop
()。返回Future所绑定的事件循环
三、集中回答以下几个问题
通过前面的讲解,已经讲清楚了asyncio架构里面的一些基本东西,现在可以来集中回答以下一些常见的问题了,弄清楚这希尔问题,可以方便我们更加深入的理解协程。
1、很多个协程一起运行有创建新的线程吗?
协程运行时,都是在一个线程中运行的,没有创建新的线程。如下
import asyncio
import time
import threading
a=time.time()
async def hello1():
print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(3)
print("Hello again 01 end")
async def hello2():
print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(2)
print("Hello again 02 end")
async def hello3():
print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(1)
print("Hello again 03 end")
loop = asyncio.get_event_loop()
tasks = [hello1(), hello2(),hello3()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
b=time.time()
print('---------------------------------------')
print(b-a)
'''运行结果为:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello again 03 end
Hello again 02 end
Hello again 01 end
---------------------------------------
2.994506597518921
从上面那个可以看出,三个不同的协程函数都是在一个线程完成的。但是并不是意味着,多个协程函数只能在一个线程中执行,同样可以创建新的线程,其实我们完全可以在新的线程中重新创建一个事件循环,具体的实例参见后面。
2、线程一定效率更高吗?
也不是绝对的,当然在一般情况下,异步方式的执行效率是更高的,就比如上面的三个函数,如果按照同步的方式执行,则一共需要6秒的时间,但是采用协程则只需要最长的那个时间3秒,这自然是提高了工作效率,那是不是一定会提高呢?也不一定,这与协程的调用方式是由密切关系的。如下所示:
import asyncio
import time
import threading
a=time.time()
async def hello1():
print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(3)
print("Hello again 01 end")
async def hello2():
print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(2)
print("Hello again 02 end")
async def hello3():
print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
await hello2()
await hello1()
print("Hello again 03 end")
loop = asyncio.get_event_loop()
tasks = [hello3()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
b=time.time()
print('---------------------------------------')
print(b-a)
'''运行结果为:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 02 end
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 01 end
Hello again 03 end
---------------------------------------
5.008373498916626
我们发现一个问题,上面执行的顺序完全不是异步执行,执行的时间也没有得到改善,究其原因,是因为上面是通过hello3去调用hello1和hello2的,这和同步调用的方式完全是一样的,即使我定义的都是异步方法,它既没有提高执行效率,还会有阻塞。
结论:在有很多个异步方式的时候,一定要尽量避免这种异步函数的直接调用,这和同步是没什么区别的,一定要通过事件循环loop,“让事件循环在各个异步函数之间不停游走”,这样才不会造成阻塞。
3、协程会不会有阻塞呢?
异步方式依然会有阻塞的,当我们定义的很多个异步方法彼此之间有一来的时候,比如,我必须要等到函数1执行完毕,函数2需要用到函数1的返回值,如上面的例子2所示,就会造成阻塞,这也是异步编程的难点之一,如何合理配置这些资源,尽量减少函数之间的明确依赖,这是很重要的。
4、协程的4种状态
协程函数相比于一般的函数来说,我们可以将协程包装成任务Task,任务Task就在于可以跟踪它的状态,我就知道它具体执行到哪一步了,一般来说,协程函数具有4种状态,可以通过相关的模块进行查看,请参见前面的文章,他的四种状态为:
- Pending
- Running
- Done
- Cacelled
创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,中途需要取消,就需要先把task取消,即为cancelled。
四、多任务实现并发
python异步协程函数的最终目的是实现并发,这样才能提高工作效率。
我们经常看见下面这样的代码,即:
tasks = asyncio.gather(*[task1,task2,task3])
loop.run_until_complete(tasks)
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)
#甚至可以写在一起,即
loop.run_until_complete(asyncio.gather(*[task1,task2,task3])
asyncio.gather(asyncio.wait([task1,task2,task3]))
上面这些都是一些简单的应用,可以同时进行多任务,进行并发,但是如果我们每一个任务都有返回值,而且需要获取这些返回值,这样做显然还不够,还需要做进一步的处理。
asyncio实现并发的思想是一样的,只是实现的手段稍有区别,主要有以下几种实现方式:
(1)使用gather同时注册多个任务,实现并发
awaitable asyncio.
gather
(*aws, loop=None, return_exceptions=False)
注意事项:
gather的返回值是它所绑定的所有任务的执行结果,而且顺序是不变的,即返回的result的顺序和绑定的顺序是保持一致的。
除此之外,它是awaitable的,所以,如果需要获取多个任务的返回值,既然是awaitable的,就需要将它放在一个函数里面,所以我们引入一个包装多个任务的入口main,这也是python3.7的思想。如下:
# import asyncio
# import time
# import threading
# a=time.time()
# async def hello1():
# print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(3)
# print("Hello again 01 end")
# async def hello2():
# print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(2)
# print("Hello again 02 end")
# async def hello3():
# print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
# await hello2()
# await hello1()
# print("Hello again 03 end")
# loop = asyncio.get_event_loop()
# tasks = [hello3()]
# loop.run_until_complete(asyncio.wait(tasks))
# loop.close()
# b=time.time()
# print('---------------------------------------')
# print(b-a)
import asyncio
import time
async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模拟耗时任务3秒
print("Hello again 01 end")
return a+b
async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模拟耗时任务2秒
print("Hello again 02 end")
return a-b
async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模拟耗时任务4秒
print("Hello again 03 end")
return a*b
async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
results=await asyncio.gather(task1,task2,task3)
for result in results: #通过迭代获取函数的结果,每一个元素就是相对应的任务的返回值,顺序都没变
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
'''运行结果为:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
(2)使用wait可以同时注册多个任务,实现并发
await asyncio.
wait
(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
它与gather不同的地方是他的参数是集合类型,而且他的返回类型是这样一个形式,即
(done, pending). #返回dones是已经完成的任务,pending是未完成的任务,都是集合类型,不同的是每一个元素不再是返回值,而是某一个task哦,
相同的是它依然也是awaitable的,故而也需要定义在一个异步函数main()中,如下。
#前面的代码和上面一样
async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
done,pending=await asyncio.wait([task1,task2,task3])
for done_task in done:
print(done_task.result()) #这里返回的是一个任务,不是直接的返回值,故而需要使用result函数进行获取
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
#运行结果也一样
(3)使用as_completed可以同时注册多个任务,实现并发
这个方法使用的比较少,与前面的两个gather和wait不同的是,它不是awaitable。使用实例参见前面的一篇文章,参见如下:
(4)主调方获取任务的运行结果
上面的运行结果,都是在main()函数里面获取的运行结果,那可不可以不再main()里面获取结果呢,,当然是可以的,我们可以这样做,
async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
return await asyncio.gather(task1,task2,task3) #不在这里获取结果,只是返回
loop = asyncio.get_event_loop()
results=loop.run_until_complete(main()) #在这里再获取返回函数值,然后迭代获取
for result in results:
print(result)
loop.close()
#y运行结果同上
或者是如下:
async def main(): #封装多任务的入口函数
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
return await asyncio.wait([task1,task2,task3]) #不在这里获取结果,只是返回
loop = asyncio.get_event_loop()
done,pending=loop.run_until_complete(main()) #在这里再获取返回函数值,然后迭代获取
for done_task in done:
print(done_task.result())
loop.close()
五、Future补充下一篇预告
1、Future补充
asyncio中的Future类是模仿concurrent.futures.Future类而设计的,关于concurrent.futures.Future,可以查阅相关的文档。它们之间的主要区别是:
(1)asyncio.Future对象是awaitable的,但是concurrent.futures.Future对象是不能够awaitable的;
(2)asyncio.Future.result()和asyncio.Future.exception()是不接受关键字参数timeout的;
(3)当Future没有完成的时候,asyncio.Future.result()和asyncio.Future.exception()将会触发一个InvalidStateError异常;
(4)使用asyncio.Future.add_done_callback()注册的回调函数不会立即执行,它可以使用loop.call_soon代替;
(5)asyncio里面的Future和concurrent.futures.wait()以及concurrent.futures.as_completed()是不兼容的。
有兴趣的小伙伴可以自己学一下concurrent.futures哦!
2、下一篇预告
多线程+asyncio协程。实现更加强健的程序。
声明:python协程系列文章的上一篇,即第五篇,详细介绍了asyncio的核心概念,asyncio的设计架构,Task类的详细作用,本文为系列文章的第六篇,将介绍更加底层的API,以EventLoop和Future为主,介绍他们的设计理念,包含的方法以及使用技巧。一、事件循环EventLoop事件循环是asyncio的核心,异步任务的运行、任务完成之后的回调、网络IO操作、子进程...
进程,是计算机中已运行程序的实体。程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例。
线程的定义:
操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
进程和线程的关系:
一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
CPU的最小调度单元是线程不是进程,所以单进程多线程也可以利用多核CPU.
协程的定义:
协程通过在线程中实现调度,避免了陷入内核级别的上下文切换造成的性能损失,进而突破了线程在IO上的性能瓶颈。
协程和线程的关系
协程是在语言层面实现对线程的调
协程多任务通常运行在同一进程的单线程环境中,但如果有要将多个基于
asyncio的
协程运行在不同线程中的需求,可在主线程中使用
asyncio.
new_
event_l
oop()创建一个
new_l
oop,在子线程中使用
asyncio.set_
event_l
oop(
new_l
oop)为子线程设置其
事件循环。
给出如下例子做参考:
一、事件循环EventLoop
事件循环是asyncio的核心,异步任务的运行、任务完成之后的回调、网络IO操作、子进程的运行,都是通过事件循环完成的。在前一篇文章中,已经提到过,在python3.7中,我们甚至完全不用管事件循环,只需要使用高层API,即asyncio中的方法,我们很少直接与事件循环打交道,但是为了更加熟悉asyncio的运行原理,最好还是了解EventLoop的设计原理。
1、事件循环的创建、获取、设置(上文已经介绍过了)
(1)asyncio.get_running_loop(
张浩Java考试成绩未达到自己的目标。为了表明自己勤奋学习的决心,他决定写一百遍“好好学习,天天向上!”根据前面学习的内容我们知道System.out.println(“好好学习,天天向上!”)可以将这句话打印出来,但是现在是要打印100次,那是不是我们需要写100次呢?答案肯定是否定的,在java中我们要避免写重读的代码,我们要尽量“偷懒”让代码能重复使用。那如何才能写一次输出语句,让它运行100次呢?接下来我们就要学习流程控制中的--循环结构。
一、什么是循环结构
1.循环简单理解就是重读
所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。
Asyncio 是并发(concurrency)的一种方式。当然对 Python 来说,并发编程还可以通过线程(threading)和多进程(multiprocessing)来实现。Asyncio 并不能带来真正的并行(parallelism)。
当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。
可交给 asyncio 执行的任务,称为协程(corout
Python协程是一种轻量级的多任务并发方法,它可以在单线程中实现并发执行。asyncio是Python 3.4版本之后新增的标准库,提供了对异步IO的支持,可以用于编写高效的异步IO应用程序。asyncio中的协程被称为coroutine,使用async关键字定义。
在asyncio中,可以使用async关键字定义协程,使用await关键字来等待协程执行完成。同时,asyncio提供了事件循环(event loop)来管理协程的执行。事件循环是一个类似于死循环的机制,在其中不断地检查协程的状态并执行它们。
下面是一个简单的使用asyncio的例子:
```python
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
在上面的代码中,我们定义了一个协程hello,它先输出“Hello”,然后等待1秒钟,最后输出“World”。我们使用事件循环来执行这个协程。
asyncio还提供了一些常用的工具函数和类,例如:
- asyncio.get_event_loop():获取事件循环对象。
- asyncio.sleep():让协程等待一段时间。
- asyncio.gather():并发执行多个协程。
- asyncio.ensure_future():将协程加入事件循环。
- asyncio.Queue():实现协程之间的消息传递。
使用asyncio可以方便地编写高效的异步IO应用程序。但需要注意的是,在使用asyncio时,应该尽量避免阻塞操作,因为阻塞操作会阻塞事件循环的执行,影响程序的性能。如果需要进行阻塞操作,可以使用asyncio提供的线程池或进程池来进行处理。