3、对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持。

4、模仿futures模块但适用于事件循环使用的Future类。

5、基于yield from的协程和任务,可以让你用顺序的方式编写并发代码。

6、当我们必须使用一个会产生阻塞IO的调用时,有接口可以把这个调用转移到线程池中执行。

7、模仿threading模块中的同步原语,可以用在单线程内的协程之间。

1、高并发编程中的三个要素

  • 事件循环
  • 回调(驱动生成器)
  • epoll(IO多路复用)
  • 2、asyncio

    asyncio是python用于解决异步io编程的一整套解决方案:

    其他常见的异步io框架:tornado、gevent、twisted(scrapy、django channels等都基于twisted)
    

    3、tornado

    tornado自己实现了web服务器!

    django和flask需要借助第三方的web服务器部署(如uwsgi、gunicorn+nginx)

    tornado可以直接部署: nginx+tornado(nginx实现了一些tornado没有实现的功能)

    使用asyncio

    # Using asyncio: run ten coroutines concurrently on one event loop.
    import asyncio
    import time
    async def get_html(url):
        """Simulate fetching *url*: print, wait 2 s without blocking the loop, print."""
        print("start get url")
        # time.sleep() would block the whole event loop (and every coroutine
        # on it); asyncio.sleep() suspends only this coroutine. Awaiting it is
        # what hands control back to the loop.
        await asyncio.sleep(2)
        print("end get url")
    if __name__ == "__main__":
        start_time = time.time()
        # The event loop does the select/epoll work for us. Create it
        # explicitly: asyncio.get_event_loop() with no running loop is
        # deprecated (3.10-3.13) and raises RuntimeError on 3.14.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # asyncio.wait() needs tasks/futures: passing bare coroutines was
        # deprecated in 3.8 and raises TypeError since 3.11, so wrap each one.
        tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
        loop.run_until_complete(asyncio.wait(tasks))
        # The ten 2 s sleeps overlap, so this prints roughly 2, not 20.
        print(time.time()-start_time)
        loop.close()
    

    获取协程的返回值

    # Get a coroutine's return value, and attach a done-callback to its task.
    import asyncio
    import time
    from functools import partial
    async def get_html(url):
        """Pretend to fetch *url*; after a 2 s non-blocking wait return "whw"."""
        print("start get url")
        await asyncio.sleep(2)
        return "whw"
    # add_done_callback() calls its callback with the finished future as the
    # only argument. To pass extra arguments, bind them with functools.partial:
    # the bound arguments come first, so the future must be the LAST parameter.
    def callback(url, future):
        """Done-callback: *url* is bound via partial, *future* is the finished task."""
        print(url)
        print("send email to whw")
    if __name__ == "__main__":
        start_time = time.time()
        # Use an explicit loop (get_event_loop() without a running loop is
        # deprecated and raises on Python 3.14).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # loop.create_task() wraps the coroutine in a Task and schedules it.
        task = loop.create_task(get_html("http://www.imooc.com"))
        # Pass a callable, not a call. partial(...) returns a new callable
        # with "http://www.imooc.com" pre-bound as *url*.
        task.add_done_callback(partial(callback, "http://www.imooc.com"))
        loop.run_until_complete(task)
        print(task.result())
        loop.close()
        # Output:
        # start get url
        # http://www.imooc.com
        # send email to whw
        # whw
    
    # Get a coroutine's return value via asyncio.ensure_future().
    import asyncio
    import time
    from functools import partial
    async def get_html(url):
        """Pretend to fetch *url*; after a 2 s non-blocking wait return "whw"."""
        print("start get url")
        await asyncio.sleep(2)
        return "whw"
    def callback(url, future):
        """Done-callback (unused in this example): *url* bound via partial."""
        print(url)
        print("send email to whw")
    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # ensure_future() wraps the coroutine in a Task (a Future subclass),
        # just like loop.create_task(); pass the loop explicitly.
        get_future = asyncio.ensure_future(get_html("http://www.imooc.com"), loop=loop)
        loop.run_until_complete(get_future)
        print(get_future.result())  # whw
        loop.close()
        # Output:
        # start get url
        # whw
    

    wait与gather

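    gather更加高级(high-level),使用更加灵活:它直接接受协程、按传入顺序返回全部结果,还可以把任务分组并按组取消;wait则要求传入task/future,返回(done, pending)两个集合,并支持timeout和return_when参数。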

    wait的方法

    # wait vs gather: the asyncio.wait version.
    import asyncio
    import time
    async def get_html(url):
        """Simulate fetching *url* with a 2 s non-blocking wait."""
        print("start get url")
        await asyncio.sleep(2)
        print("end get url")
    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # This section demonstrates asyncio.wait. wait() needs tasks, not bare
        # coroutines (TypeError since Python 3.11), so wrap each coroutine.
        tasks = [loop.create_task(get_html("http://www.imooc.com")) for i in range(10)]
        # wait() resolves to two sets of tasks: (done, pending).
        done, pending = loop.run_until_complete(asyncio.wait(tasks))
        print(time.time()-start_time)
        loop.close()
    

    gather的方法

    # wait vs gather: the asyncio.gather version.
    import asyncio
    import time
    async def get_html(url):
        """Simulate fetching *url* with a 2 s non-blocking wait."""
        print("start get url")
        await asyncio.sleep(2)
        print("end get url")
    if __name__ == "__main__":
        start_time = time.time()
        # gather() called outside a running loop looks up the current loop,
        # so create one and make it current (get_event_loop() is deprecated).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # gather is more high-level than wait: it accepts coroutines directly,
        # returns results in argument order, and can be nested to form groups.
        group1 = [get_html("http://projectsedu.com") for i in range(2)]
        group2 = [get_html("http://www.imooc.com") for i in range(2)]
        group1 = asyncio.gather(*group1)
        group2 = asyncio.gather(*group2)
        # A whole group can be cancelled at once. If you do, the outer gather
        # below raises CancelledError unless given return_exceptions=True.
        # group2.cancel()
        # Groups are futures themselves, so they can be gathered again.
        loop.run_until_complete(asyncio.gather(group1, group2))
        print(time.time() - start_time)
        loop.close()
    

    run_until_complete与run_forever

    run_until_complete运行完指定的协程(future)后会自动停止;run_forever则会一直运行,直到有代码调用loop.stop()。

    # Every future/task keeps a reference to its loop (future.get_loop()), so a
    # done-callback on any future can stop the loop: that is how
    # run_until_complete() stops by itself. The loop in turn references the
    # futures' scheduled callbacks, so the two can form reference cycles.
    import asyncio
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # run_forever() never returns on its own; something must call loop.stop()
    # (here a callback scheduled for the first iteration).
    loop.call_soon(loop.stop)
    loop.run_forever()
    # run_until_complete() stops the loop automatically once the given
    # awaitable is done, and returns its result.
    result = loop.run_until_complete(asyncio.sleep(0, result="done"))
    loop.close()
    

    如何取消future(task)

    import asyncio
    import time
    async def get_html(sleep_times):
        """Sleep *sleep_times* seconds without blocking, printing before and after."""
        print("waiting")
        await asyncio.sleep(sleep_times)
        print("done after {}s".format(sleep_times))
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # asyncio.wait() needs tasks (bare coroutines raise TypeError on 3.11+).
        task1 = loop.create_task(get_html(2))
        task2 = loop.create_task(get_html(3))
        task3 = loop.create_task(get_html(3))
        tasks = [task1, task2, task3]
        try:
            loop.run_until_complete(asyncio.wait(tasks))
        # Pressing Ctrl+C while the loop runs lands here: cancel every task.
        except KeyboardInterrupt:
            # asyncio.Task.all_tasks() was removed in Python 3.9;
            # asyncio.all_tasks(loop) returns the loop's unfinished tasks.
            all_tasks = asyncio.all_tasks(loop)
            for task in all_tasks:
                print("cancel task")
                # cancel() returns True if cancellation was requested and
                # False if the task had already finished.
                print(task.cancel())
            # Calling stop() before run_forever() makes the loop run one more
            # iteration and then return, so the cancelled tasks get to
            # process their CancelledError.
            loop.stop()
            loop.run_forever()
        finally:
            loop.close()
    

    协程的嵌套(互相调用)

    详见asyncio官方文档:

    协程相互调用文档

    import asyncio
    async def compute(x, y):
        """Return x + y after a 1 s non-blocking pause."""
        print("Compute %s + %s ..." % (x, y))
        await asyncio.sleep(1.0)
        return x + y
    async def print_sum(x, y):
        """Await the nested coroutine compute() and print its result."""
        # Awaiting a coroutine runs it inside the current task; print_sum is
        # suspended until compute() returns its value.
        result = await compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    # Explicit loop: get_event_loop() without a running loop is deprecated
    # and raises on Python 3.14.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(print_sum(1, 2))
    loop.close()
    

    其他函数:call_soon/call_at/call_later/call_soon_threadsafe

    call_soon即刻执行

    ——并不是真的立刻执行:回调被放进就绪队列,在事件循环的下一次迭代时按注册顺序(先进先出)执行。

    import asyncio
    def callback(sleep_times):
        """Print which callback ran; *sleep_times* is only a label here."""
        print("success time {}".format(sleep_times))
    def stoploop(loop):
        """Stop *loop*; scheduled as a callback so run_forever() returns."""
        loop.stop()
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # call_soon queues callbacks for the next loop iteration. They run in
        # registration (FIFO) order, so this prints 2, 1, 3 and then stops.
        loop.call_soon(callback, 2)
        loop.call_soon(callback, 1)
        loop.call_soon(callback, 3)
        loop.call_soon(stoploop, loop)
        loop.run_forever()
        loop.close()
    

    call_later

    call_later(delay, callback, *args)在delay秒之后调用回调;因为要等待延迟,它注册的回调会晚于call_soon注册的回调执行。

    import asyncio
    def callback(sleep_times):
        """Print which callback ran; *sleep_times* is its delay label."""
        print("success time {}".format(sleep_times))
    def stoploop(loop):
        """Stop *loop*; scheduled as a callback so run_forever() returns."""
        loop.stop()
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # call_later(delay, callback, *args): run callback(*args) after
        # `delay` seconds. The first argument is the delay.
        loop.call_later(2, callback, 2)
        loop.call_later(1, callback, 1)
        loop.call_later(3, callback, 3)
        # call_soon callbacks run on the next iteration, i.e. before any timer.
        loop.call_soon(callback, 4)
        # Stop once the last timer (3 s) has fired. call_soon(stoploop, loop)
        # would stop before any call_later callback ran, and with no stop at
        # all run_forever() would never return.
        loop.call_later(4, stoploop, loop)
        loop.run_forever()
        loop.close()
        # Output:
        # success time 4
        # success time 1
        # success time 2
        # success time 3
    

    call_at

    import asyncio
    def callback(sleep_times, loop):
        """Print the loop's clock at the moment this callback runs."""
        print("success time {}".format(loop.time()))
    def stoploop(loop):
        """Stop *loop*; scheduled as a callback so run_forever() returns."""
        loop.stop()
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # loop.time() reads the loop's internal monotonic clock (not wall time).
        now = loop.time()
        # call_at(when, ...) takes an absolute time on that same clock.
        loop.call_at(now+2, callback, 2, loop)
        loop.call_at(now+1, callback, 1, loop)
        loop.call_at(now+3, callback, 3, loop)
        # call_soon callbacks run on the next iteration, before any timer.
        loop.call_soon(callback, 4, loop)
        # Stop after the last timer has fired; otherwise run_forever() never returns.
        loop.call_at(now+4, stoploop, loop)
        loop.run_forever()
        loop.close()
        # Example output (values depend on the monotonic clock):
        # success time 22962.654698165
        # success time 22963.65639768
        # success time 22964.658318416
        # success time 22965.659323424
    

    call_soon_threadsafe

    线程安全版本的call_soon —— 用法与call_soon一样,区别在于它是线程安全的:从其他线程向事件循环提交回调时必须使用它。

    线程池与asyncio结合完成阻塞IO请求

    —— 如果现实中某一个模块或者接口必须使用阻塞的话(pymysql等),可以将其放在线程池中运行!

    # 使用多线程:在协程中集成阻塞io
    import socket
    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    from urllib.parse import urlparse
    """ 阻塞的接口 """
    def get_url(url):
        # 通过socket请求html
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = "/"
        #建立socket连接
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # client.setblocking(False)
        client.connect((host, 80)) # 阻塞不会消耗cpu
        #不停的询问连接是否建立好, 需要while循环不停的去检查状态
        #做计算任务或者再次发起其他的连接请求
        client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
        data = b""
        while True:
            d = client.recv(1024)
            if d:
                data += d
            else:
                break
        data = data.decode("utf8")
        html_data = data.split("\r\n\r\n")[1]
        print(html_data)
        client.close()
    if __name__ == "__main__":
        import time
        start_time = time.time()
        loop = asyncio.get_event_loop()
        # 线程池设为3
        executor = ThreadPoolExecutor(3)
        tasks = []
        for url in range(20):
            url = "http://shop.projectsedu.com/goods/{}/".format(url)
            # 将某个阻塞IO的函数放在executor中运行
            # 第一个参数是线程池 —— 第二个参数是阻塞函数,第三个参数是阻塞IO函数用到的参数
            task = loop.run_in_executor(executor, get_url, url)
            tasks.append(task)
        loop.run_until_complete(asyncio.wait(tasks))
        print("last time:{}".format(time.time()-start_time))
    

    asyncio模拟简单的http请求

    asyncio没有提供http协议接口,只提供了TCP/UDP协议接口。

    如果想使用http协议请求url的话可以使用aiohttp——搭建http服务器或者做爬虫。

    # asyncio has no HTTP interface (use aiohttp for that); this speaks HTTP
    # by hand over asyncio's TCP streams.
    import asyncio
    import socket
    from urllib.parse import urlparse
    async def get_url(url):
        """Coroutine version of get_url: fetch *url* over asyncio streams.

        Returns the raw response text: status line, headers and body, with
        the server's line endings preserved.
        """
        url = urlparse(url)
        host = url.netloc  # keeps ":port" if present, as the Host header expects
        path = url.path or "/"
        # open_connection is a coroutine: awaiting it yields to the loop
        # instead of blocking. Connect to the bare hostname; netloc may carry ":port".
        reader, writer = await asyncio.open_connection(url.hostname, url.port or 80)
        try:
            writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
            await writer.drain()
            all_lines = []
            # StreamReader supports "async for": one line (bytes, terminator
            # included) per iteration until EOF.
            async for raw_line in reader:
                all_lines.append(raw_line.decode("utf8", errors="replace"))
        finally:
            # Always release the connection, even if reading fails.
            writer.close()
            await writer.wait_closed()
        # Lines keep their "\r\n" terminators, so join with "" (joining with
        # "\n" would insert an extra line break after every line).
        return "".join(all_lines)
    async def main():
        """Fetch 20 pages concurrently and print each response as it finishes."""
        tasks = []
        for i in range(20):
            url = "http://shop.projectsedu.com/goods/{}/".format(i)
            tasks.append(asyncio.ensure_future(get_url(url)))
        # as_completed yields awaitables in completion order, not submission order.
        for task in asyncio.as_completed(tasks):
            result = await task
            print(result)
    if __name__ == "__main__":
        import time
        start_time = time.time()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(main())
        print('last time:{}'.format(time.time()-start_time))
        loop.close()