读取到当前帧数据后,所有模型依次运行。

def one_by_one(weights, frames):
    sessions = [init_session(weight) for weight in weights]
    costs = [[] for _ in range(len(weights))]
    since_infer = time()
    for frame in frames:
        for session in sessions:
            since = time()
            _ = session.run('output', {'input': frame})
            cost = time() - since
            costs[idx].append(cost)
    print([sum(cost) for cost in costs])
    print("infer:", time() - since_infer)
    return

3.3 多线程

为每一个模型分配一个线程。

from threading import Thread
def multit_thread(weights, frames):
    sessions = [init_session(weight) for weight in weights]
    threads = []
    since_infer = time()
    for session in sessions:
        thread = Thread(target=run_session_thread, args=(session, frames))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    print("infer:", time() - since_infer)
    return
def run_session_thread(session, frames):
    costs = []
    for frame in frames:
        since = time()
        _ = session.run('output', {'input': frame})
        costs.append(time() - since)
    print(sum(costs))
    return

3.4 多进程

为每一个模型分配一个进程。
由于 session 不能在进程间传递,因此需要在每个进程的内部单独初始化。如果数据较多,这部分初始化的时间消耗基本可以忽略不急。

from multiprocessing import Manager, Process
def multi_process(weights, frames):
    inputs = Manager().list(frames)
    processes = []
    since_infer = time()
    for weight in weights:
        process = Process(target=run_session_process, args=(weight, inputs))
        process.start()
        processes.append(process)
    for process in processes:
        process.join()
    print("infer:", time() - since_infer)
    return
def run_session_process(weight, frames):
    session = init_session(weight)
    costs = []
    for frame in frames:
        since = time()
        _ = session.run('output', {'input': frame})
        costs.append(time() - since)
    print(sum(costs))
    return

3.5 协程

为每一个模型分配一个协程。

async def coroutine(weights, frames):
    sessions = [init_session(weight) for weight in weights]
    since_infer = time()
    tasks = [
        asyncio.create_task(run_session_coroutine(session, frames))
        for session in sessions
    for task in tasks:
        await task
    print("infer:", time() - since_all)
    return
async def run_session_coroutine(session, frames):
    costs = []
    for frame in frames:
        since = time()
        _ = session.run('output', {'input': frame})
        costs.append(time() - since)
    print(sum(costs))
    return

3.6 其他辅助函数

import cv2
import numpy as np
import onnxruntime as ort
def init_session(weight):
    provider = "CUDAExecutionProvider"
    session = ort.InferenceSession(weight, providers=[provider])
    return session
def load_video():
    # 为了减少读视频的时间,复制相同的图片组成batch
    vcap = cv2.VideoCapture('path_to_video')
    count = 1000
    batch_size = 4
    frames = []
    for _ in range(count):
        _, frame = vcap.read()
        frame = cv2.resize(frame, (256, 256)).transpose((2, 0, 1))
        frame = np.stack([frame] * batch_size, axis=0)
        frames.append(frame.astype(np.float32))
    return frames
def load_weights():
    return ['path_to_weights_0',
            'path_to_weights_1',
            'path_to_weights_2',
            'path_to_weights_3',]

4. 结果及分析

4.1 执行结果

batch_size=4共运行 1000 帧数据,推理结果如下:

  • 在这个场景下,多线程是综合效率最高的方式(时间最短、显存占用合理、GPU 利用率最高);
  • 串行作为最基础的方案,总时间就是每个模型执行时间之和;
  • 多进程的方式,单模型的累积时间与多线程类似,但是总时间有明显增加,且极大增加了显存占用;
  • 用协程的方式,总结果看,与串行模式本质上是一样的。
  • 4.2 结果分析

    4.2.1 关于线程方案

    为什么多线程相比串行可以提高运行效率?

  • 基本的判断是,session.run()函数运行时,既有 CPU 执行的部分,又有 GPU 执行的部分;
  • 如果是串行方案,则 CPU 运行时,GPU 会等待,反之亦然;
  • 当换用多线程方案后,当一个线程从 CPU 执行切换到 GPU 执行后,会继续执行另一个线程的 CPU 部分,并等待 GPU 返回结果。
  • 4.2.2 关于进程方案

    为什么多进程反而降低了运行效率?

  • 基本的判断是,整体执行的瓶颈并不在 CPU 的运算部分,而是在于 GPU 上模型前向推理的计算部分;
  • 因此,用多个进程并没有充分利用系统资源,多个 CPU 核心会争夺同一个 GPU 的计算资源,并增加了调度消耗。
  • 4.2.3 关于协程方案

    为什么看起来协程与串行的效果一样?

    协程方案在执行过程中,从表现上来看:

  • 单个模型的累积时间是逐步print出来的,间隔大致等于每个模型的累积时间(而线程和进程方案中,几乎是同时输出 4 个模型的累积时间,说明是同时运行结束);
  • 显存占用是逐步增加的,最后达到与串行方案一致。
  • 可能的原因:

  • CPU 和 GPU 的任务切换,可能无法触发协程的切换,导致最终的效果是,一个模型完成了所有数据的推理后,再进行下一个模型的推理。
  • 使用协程的必要性:

  • 从线程改为协程,是为了进一步降低线程切换的消耗;
  • 在这个场景下,需要同时执行推理的模型数量一般不会太多,建立同样数量的线程,系统资源的消耗是可控的;
  • 因此,没有使用协程的必要性。
  • 关于协程的使用,也是现学,有可能因为使用方法不当而得出以上的结论。如有错误,欢迎指正。