相关文章推荐
八块腹肌的匕首  ·  Linux sed 命令 | ·  1 年前    · 
直爽的牛肉面  ·  Use ...·  1 年前    · 
慈祥的紫菜汤  ·  Request for the ...·  1 年前    · 

1. 概述

concurrent.futures 是 3.2 中引入的新模块,它为异步执行可调用对象提供了高层接口。
可以使用 ThreadPoolExecutor 来进行多线程编程,ProcessPoolExecutor 进行多进程编程,两者实现了同样的接口,这些接口由抽象类 Executor 定义。
这个模块提供了两大类型,一个是执行器类 Executor,另一个是 Future 类。
执行器用来管理工作池,future 用来管理工作计算出来的结果,通常不用直接操作 future 对象,因为有丰富的 API。

2. Executor Object 执行器对象

concurrent.futures.Executor 类

这个抽象类提供了一系列方法,可以用于异步执行调用。
它不能直接使用,只能通过子类化出来的具体类来使用。

它定义的方法有:

submit(fn, *args, **kwargs)

安排可调用对象 fn 以 fn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

类似内置函数 map(func, *iterables),但是有两点不同:

  • 立即获取 iterables 而不会惰性获取;
  • 异步执行 func,并支持多次并发调用。
  • 它返回一个迭代器。
    从调用 Executor.map() 开始的 timeout 秒之后,如果在迭代器上调用了 __next__() 并且无可用结果的话,迭代器会抛出 concurrent.futures.TimeoutError 异常。
    timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间。

    如果 func 调用抛出了异常,那么该异常会在从迭代器获取值的时候抛出。

    当使用 ProcessPoolExecutor 的时候,这个方法会把 iterables 划分成多个块,作为独立的任务提交到进程池。这些块的近似大小可以通过给 chunksize 指定一个正整数。对于很长的 iterables,使用较大的 chunksize 而不是采用默认值 1,可以显著提高性能。对于 ThreadPoolExecutor,chunksize 不起作用。

    注意:不管并发任务的执行次序如何,map 总是基于输入顺序来返回值。map 返回的迭代器,在主程序迭代的时候,会等待每一项的响应。

    shutdown(wait=True)

    告诉执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源。
    在 shutdown 之后再调用 Executor.submit() 和 Executor.map() 会报运行时错误 RuntimeError。
    如果 wait 为 True,那么这个方法会在所有等待的 future 都执行完毕,并且属于执行器 executor 的资源都释放完之后才会返回。
    如果 wait 为 False,本方法会立即返回。属于执行器的资源会在所有等待的 future 执行完毕之后释放。
    不管 wait 取值如何,整个 Python 程序在等待的 future 执行完毕之前不会退出。
    你可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。

    import shutil
    with ThreadPoolExecutor(max_workers=4) as e:
        e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
        e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
        e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
        e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
    

     执行器类 Executor 实现了上下文协议,可以用做上下文管理器。它能并发执行任务,等待它们全部完成。当上下文管理器退出时,自动调用 shutdown() 方法。

    3. ThreadPoolExecutor 线程池执行器

    ThreadPoolExecutor 线程池执行器是 Executor 执行器的子类,通过线程池来执行异步调用。它管理一组工作线程,当工作线程有富余的时候,给它们传递任务。
    当属于一个 Future 对象的可调用对象等待另一个 Future 的返回时,会发生死锁 deadlock。
    举个例子:

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED, ProcessPoolExecutor
    from concurrent.futures import Future
    from multiprocessing import Pool
    def get_html(times):
        time.sleep(times)
        print("get page {} success".format(times))
        return times
    executor = ThreadPoolExecutor(max_workers=2)
    # 通过submit函数提交执行的函数到线程池中, submit 是立即返回
    task1 = executor.submit(get_html, (3))
    task2 = executor.submit(get_html, (2))
    # 要获取已经成功的task的返回
    urls = [3, 2, 4]
    all_task = [executor.submit(get_html, (url)) for url in urls]
    wait(all_task, return_when=FIRST_COMPLETED)
    print("main")
    for future in as_completed(all_task):
        data = future.result()
        print("get {} page".format(data))
    # 通过executor的map获取已经完成的task的值
    for data in executor.map(get_html, urls):
        print("get {} page".format(data))
    # done方法用于判定某个任务是否完成
    print(task1.done())
    print(task2.cancel())
    time.sleep(3)
    print(task1.done())
    # result方法可以获取task的执行结果
    print(task1.result())

    concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

    这个 Executor 子类最多用 max_workers 个线程来异步执行调用。

    initializer 是一个可选的可调用对象,会在每个 worker 线程启动之前调用。
    initargs 是传递给 initializer 的参数元组。
    如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenThreadPool 异常,继续提交 submit 任务也会抛出此异常。

    4. ThreadPoolExecutor 例子

    import concurrent.futures
    import urllib.request
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']
    # Retrieve a single page and report the URL and contents
    def load_url(url, timeout):
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))
    

    5. ProcessPoolExecutor 进程池执行器

    ProcessPoolExecutor 进程池执行器类是 Executor 执行器类的子类,使用进程池来异步执行调用。
    ProcessPoolExecutor 使用了 multiprocessing 模块,这允许它可以规避 Global Interpreter Lock,但是也意味着只能执行和返回可序列化的(picklable)对象。

    __main__ 模块必须被 worker 子进程导入,这意味着 ProcessPoolExecutor 在交互解释器中无法工作。

    在已经被提交到 ProcessPoolExecutor 中的可调用对象内使用 Executor 或者 Future 方法会导致死锁。

    concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

    这个 Executor 子类最多用 max_workers 个进程来异步执行调用。
    如果不指定 max_workers 或者为 None,它默认为本机的处理器数量。
    如果 max_workers 小于等于 0,会抛出 ValueError 异常。
    mp_context 是多进程上下文(multiprocessing context)或者 None,它会被用来启动 workers。如果不指定 mp_context 或者为 None,会使用默认的多进程上下文环境。

    initializer 是一个可选的可调用对象,会在每个 worker 进程启动之前调用。
    initargs 是传递给 initializer 的参数元组。
    如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenProcessPool 异常,继续提交 submit 任务也会抛出此异常。

    7. Future 对象

    Future 类封装了可调用对象的异步执行。
    Future 实例通过 Executor.submit() 创建。

    concurrent.futures.Future

    封装了可调用对象的异步执行。

  • Future 实例通过 Executor.submit() 创建,除非用于测试,不应该直接手动创建。
  • cancel() 尝试取消调用,如果该调用正在执行中,无法取消,本方法返回 False,其他情况下调用会被取消,并返回 True。
  • cancelled() 如果调用已经被成功取消,返回 True。
  • running() 如果调用正在执行,无法被取消,则返回 True。
  • done() 如果调用成功被取消或者已经执行完毕,返回 True。
  • result(timeout=None) 返回调用的返回值。如果调用还没有完成,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutError。timeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。如果 future 在完成之前被取消了,会抛出 CancelledError 异常。
  • exception(timeout=None)
    返回被调用抛出的异常。如果调用还没有执行完毕,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutError。timeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。
    如果 future 在完成之前被取消了,会抛出 CancelledError 异常。
    如果调用完成并且没有抛出异常,返回 None。

    8. 模块函数

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

    等待 Future 实例完成,这些实例可能由多个不同的执行器实例创建,通过 fs 指定这些 Future 实例。返回具名元组,该元组有两个元素,每个元素都是一个集合。第一个元素名叫 done,该集合包括已完成的 futures;第二个元素名叫 not_done,该集合包括未完成的 futures。
    timeout 用来控制返回之前等待的最大秒数,可以是整数或者浮点数。如果不指定或为 None,不限制等待时间。
    return_when 指明函数何时应该返回。它必须是下列常量之一:

  • FIRST_COMPLETED:函数在任意一个 future 完成或者被取消时返回。
  • FIRST_EXCEPTION:函数在任意一个 future 因为异常而结束时返回。如果没有 future 抛出异常,它等价于 ALL_COMPLETED。
  • ALL_COMPLETED:当所有 future 完成或者被取消时函数才会返回。
  • concurrent.futures.as_completed(fs, timeout=None)

    当通过 fs 指定的 Future 实例全部执行完毕或者被取消后,返回这些 Future 实例组成的迭代器。fs 中的 Future 实例可以被不同的执行器创建。任何在 as_completed() 调用之前就已经完成的 Future 实例会被最先生成。

    查看源码发现,实际上这是一个用到了 yield from 的生成器函数,所以调用返回一个生成器。

    如果从 as_completed() 调用开始,经过 timeout 秒之后,对返回的迭代器调用 __next__() 时结果仍不可用,则会抛出 concurrent.futures.TimeoutError 异常。timeout 可以是整数或者浮点数,如果 timeout 没有指定或者为 None,则不限制等待时间。

    import concurrent.futures
    import random
    import time
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'url',
            'http://some-made-up-domain.com/']
    class CrawlerFramework(object):
        def a(self, url):
            # print(url)
            time.sleep(random.random())
            return "ok"
    if __name__ == "__main__":
        crawler_framework = CrawlerFramework()
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
            future = executor.submit(crawler_framework.a, URLS[0])  # 单个
                print(future.result())  # 获取异步返回的结果,这里看到这个异步是否报错
            except Exception as e:
                print("异步执行错误:{}".format(e))
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
            future_to_url = {executor.submit(crawler_framework.a, url): url for url in URLS}  # 多个
            for future in concurrent.futures.as_completed(future_to_url):
                    url = future_to_url[future]  # 获取传入异步调用函数的参数
                    data = future.result()  # 获取异步返回的结果,这里看到这个异步是否报错
                    print(url)
                    print(data)
                except Exception as e:
                    print("异步执行错误:{}".format(e))
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
            future_to_url = executor.map(crawler_framework.a, URLS)  # 多个
                print(future_to_url.__next__())  # 获取异步返回的结果,这里看到这个异步是否报错
            except Exception as e:
                print("异步执行错误:{}".format(e))

    9.简单使用

    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
       future_to_url = executor.map(self.business_query, cur.fetchall())
           print(future_to_url.__next__())  # 获取异步返回的结果,这里看到这个异步是否报错
       except Exception as e:
           print("异步执行错误:{}".format(e))
    with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
       future_to_url = executor.map(sy_zc_crawler.parse_yaml, path_list)
           print(future_to_url.__next__())  # 获取异步返回的结果,这里看到这个异步是否报错
       except Exception as e:
           print("异步执行错误:{}".format(e))