读取到当前帧数据后,所有模型依次运行。
def one_by_one(weights, frames):
sessions = [init_session(weight) for weight in weights]
costs = [[] for _ in range(len(weights))]
since_infer = time()
for frame in frames:
for session in sessions:
since = time()
_ = session.run('output', {'input': frame})
cost = time() - since
costs[idx].append(cost)
print([sum(cost) for cost in costs])
print("infer:", time() - since_infer)
return
3.3 多线程
为每一个模型分配一个线程。
from threading import Thread
def multit_thread(weights, frames):
sessions = [init_session(weight) for weight in weights]
threads = []
since_infer = time()
for session in sessions:
thread = Thread(target=run_session_thread, args=(session, frames))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print("infer:", time() - since_infer)
return
def run_session_thread(session, frames):
costs = []
for frame in frames:
since = time()
_ = session.run('output', {'input': frame})
costs.append(time() - since)
print(sum(costs))
return
3.4 多进程
为每一个模型分配一个进程。
由于 session 不能在进程间传递,因此需要在每个进程的内部单独初始化。如果数据较多,这部分初始化的时间消耗基本可以忽略不急。
from multiprocessing import Manager, Process
def multi_process(weights, frames):
inputs = Manager().list(frames)
processes = []
since_infer = time()
for weight in weights:
process = Process(target=run_session_process, args=(weight, inputs))
process.start()
processes.append(process)
for process in processes:
process.join()
print("infer:", time() - since_infer)
return
def run_session_process(weight, frames):
session = init_session(weight)
costs = []
for frame in frames:
since = time()
_ = session.run('output', {'input': frame})
costs.append(time() - since)
print(sum(costs))
return
3.5 协程
为每一个模型分配一个协程。
async def coroutine(weights, frames):
sessions = [init_session(weight) for weight in weights]
since_infer = time()
tasks = [
asyncio.create_task(run_session_coroutine(session, frames))
for session in sessions
for task in tasks:
await task
print("infer:", time() - since_all)
return
async def run_session_coroutine(session, frames):
costs = []
for frame in frames:
since = time()
_ = session.run('output', {'input': frame})
costs.append(time() - since)
print(sum(costs))
return
3.6 其他辅助函数
import cv2
import numpy as np
import onnxruntime as ort
def init_session(weight):
provider = "CUDAExecutionProvider"
session = ort.InferenceSession(weight, providers=[provider])
return session
def load_video():
# 为了减少读视频的时间,复制相同的图片组成batch
vcap = cv2.VideoCapture('path_to_video')
count = 1000
batch_size = 4
frames = []
for _ in range(count):
_, frame = vcap.read()
frame = cv2.resize(frame, (256, 256)).transpose((2, 0, 1))
frame = np.stack([frame] * batch_size, axis=0)
frames.append(frame.astype(np.float32))
return frames
def load_weights():
return ['path_to_weights_0',
'path_to_weights_1',
'path_to_weights_2',
'path_to_weights_3',]
4. 结果及分析
4.1 执行结果
以batch_size=4
共运行 1000 帧数据,推理结果如下:
在这个场景下,多线程是综合效率最高的方式(时间最短、显存占用合理、GPU 利用率最高);
串行作为最基础的方案,总时间就是每个模型执行时间之和;
多进程的方式,单模型的累积时间与多线程类似,但是总时间有明显增加,且极大增加了显存占用;
用协程的方式,总结果看,与串行模式本质上是一样的。
4.2 结果分析
4.2.1 关于线程方案
为什么多线程相比串行可以提高运行效率?
基本的判断是,session.run()
函数运行时,既有 CPU 执行的部分,又有 GPU 执行的部分;
如果是串行方案,则 CPU 运行时,GPU 会等待,反之亦然;
当换用多线程方案后,当一个线程从 CPU 执行切换到 GPU 执行后,会继续执行另一个线程的 CPU 部分,并等待 GPU 返回结果。
4.2.2 关于进程方案
为什么多进程反而降低了运行效率?
基本的判断是,整体执行的瓶颈并不在 CPU 的运算部分,而是在于 GPU 上模型前向推理的计算部分;
因此,用多个进程并没有充分利用系统资源,多个 CPU 核心会争夺同一个 GPU 的计算资源,并增加了调度消耗。
4.2.3 关于协程方案
为什么看起来协程与串行的效果一样?
协程方案在执行过程中,从表现上来看:
单个模型的累积时间是逐步print
出来的,间隔大致等于每个模型的累积时间(而线程和进程方案中,几乎是同时输出 4 个模型的累积时间,说明是同时运行结束);
显存占用是逐步增加的,最后达到与串行方案一致。
可能的原因:
CPU 和 GPU 的任务切换,可能无法触发协程的切换,导致最终的效果是,一个模型完成了所有数据的推理后,再进行下一个模型的推理。
使用协程的必要性:
从线程改为协程,是为了进一步降低线程切换的消耗;
在这个场景下,需要同时执行推理的模型数量一般不会太多,建立同样数量的线程,系统资源的消耗是可控的;
因此,没有使用协程的必要性。
关于协程的使用,也是现学,有可能因为使用方法不当而得出以上的结论。如有错误,欢迎指正。