process(self, obs):
def
reset(self):
obs
=
self.venv.reset()
return
self.process(obs)
def
step_wait(self):
obs, rews, dones, infos
=
self.venv.step_wait()
return
self.process(obs), rews, dones, infos
class
CloudpickleWrapper(object):
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
def
__init__
(self, x):
self.x
=
x
def
__getstate__
(self):
import
cloudpickle
return
cloudpickle.dumps(self.x)
def
__setstate__
(self, ob):
import
pickle
self.x
=
pickle.loads(ob)
@contextlib.contextmanager
def
clear_mpi_env_vars():
from mpi4py import MPI will call MPI_Init by default. If the child process has MPI environment variables, MPI will think that the child process is an MPI process just like the parent and do bad things such as hang.
This context manager is a hacky way to clear those environment variables temporarily such as when we are starting multiprocessing
Processes.
removed_environment
=
{}
for
k, v
in
list(os.environ.items()):
for
prefix
in
[
'
OMPI_
'
,
'
PMI_
'
]:
if
k.startswith(prefix):
removed_environment[k]
=
v
del
os.environ[k]
try
:
yield
finally
:
os.environ.update(removed_environment)
class
AlreadySteppingError(Exception):
class
NotSteppingError(Exception):
作为异常类不过多介绍。
class
VecEnv(ABC): 作为抽象类是对gym的环境进行进一步的包装,该类的作用就是进行多环境env的并行操作,也就是并行与环境进行交互和采样。
继承并实现该类进行初始化的时候需要设置并行的环境数和环境的状态空间和动作空间。
该类的主要操作为 reset, step, render , 这三个操作的含义和gym的设定相同,不同的是并行操作部分:
step函数中调用
self.step_async(actions)
保证多个环境都可以并行的收到下步的动作,
self.step_wait()
可以视作阻塞操作用来同步多进程下多个环境的step同步,并将多个环境返回的:
Returns (obs, rews, dones, infos):
- obs: an array of observations, or a dict of
arrays of observations.
- rews: an array of rewards
- dones: an array of "episode done" booleans
- infos: a sequence of info object
obs,rews,dones,infos 向上级返回。
render函数为绘图动作,该函数将多个环境的当前状态的图片进行拼接,
在'human'模式下
将拼接后的图片进行绘图操作,在
'rgb_array'
模式下对拼接后的图片的np.array形式数据进行返回。
多环境当前状态图片的拼接参见:
https://www.cnblogs.com/devilmaycry812839668/p/16025513.html
close函数关闭绘图对象
self.viewer
并调用
close_extras
关闭其他资源。
函数get_viewer生成
绘图对象
self.viewer,绘图对象
self.viewer
为调用gym的rendering模块生成的。
函数
unwrapped判断实现VecEnv类的子类是否属于VecEnvWrapper类,如果是则调用
self.venv.unwrapped
,大致可以理解为该函数是要返回最原始的为包装过的env而不是VecEnv 。
def __getattr__(self, name):
if name.startswith('_'):
raise AttributeError("attempted to get missing private attribute '{}'".format(name))
return getattr(self.venv, name)
def
step_wait(self):
obs, rews, dones, infos
=
self.venv.step_wait()
return
self.process(obs), rews, dones, infos
可以看出这几个抽象类的最后都是通过调用self.env来实现的,比如:
step函数则是调用:
self.venv.step_async(actions)
obs, rews, dones, infos =
self.venv.step_wait()
return
self.process(obs), rews, dones, infos
来实现的。
而reset则是:
def reset(self):
obs = self.venv.reset()
return self.process(obs)
可以看出self.env的env.reset()函数、env.step_wait()函数、
venv.render(
mode=mode)函数比较重要。
在类VecEnv及子类中比较有意思的函数:
def __getattr__(self, name):
if name.startswith('_'):
raise AttributeError("attempted to get missing private attribute '{}'".format(name))
return getattr(self.venv, name)
该函数的含义是调用类对象的成员变量时如果是私有变量则报错,对其他的变量都是返回self.env中对应的同名变量。
class CloudpickleWrapper(object):
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
def __init__(self, x):
self.x = x
def __getstate__(self):
import cloudpickle
return cloudpickle.dumps(self.x)
def __setstate__(self, ob):
import pickle
self.x = pickle.loads(ob)
这个类是实现调用pickle实现序列化时内部是先调用cloudpickle模块实现raw byte类型,该模块可以将变量及函数都转为raw byte类型从而可以调用pickle进行序列化,可以实现多进程间传递python函数等。
只要引入了mpi4py包就会自动设置环境变量,而引入mpi4py后在调用multiprocessing函数生成多个子进程时mpi4py模块会导致子进程挂起,这时如果我们已经引入了mpi4py后还想要调用multiprocessing模块生成多个进程则需要设置环境变量将mpi的环境变量删除,这样话就不会识别到已经引入的mpi4py,在生成多进程执行完操作后再将删除的mpi环境变量加回到环境变量中。