从零开始的机器学习实战(十六):强化学习
本章是《Hands-On Machine Learning with Scikit-Learn & TensorFlow》的最后一章
强化学习(RL)是近些年深度学习最令人激动的领域之一,虽然早在1950s强化学习就有了研究,但没有搞出什么大新闻,但在2013年有了革命性的成果,Deepmind的研究者用RL成功设计了一个玩雅达利游戏的项目,仅仅输入游戏像素而不需要了解先验知识,2016年又设计了可以打败围棋冠军李世石的系统,这在历史上是从未接近过的目标!关键在于他们将深度学习运用到强化学习领域,结果却超越了他们最疯狂的设想。现在RL依旧是最活跃的领域,Deepmind也被谷歌在2014年以超过五亿美元收购。
学习优化奖励
在强化学习中,智能体在环境中观察并做出决策,作为回报,它们获得奖励(你可以认为正奖励代表愉快,负奖励代表痛苦),它的目标就是让愉快最大,痛苦最小。
这是一个相当广泛的设置,可以适用于各种各样的任务。下面有几个例子
- 智能体可以是控制一个机械狗的程序。环境就是真实的世界,智能体通过许多的传感器例如摄像机或者传感器来观察,它可以通过给电机发送信号来行动。到达目的正奖励,浪费时间或者走错方向或摔倒了就得到负奖励。
- 智能体可以是控制 MS.Pac-Man 的程序(雅达利游戏仿真),行为是 9 个操纵杆位(上下左右中间等等),观察是屏幕,回报就是游戏点数。
- 智能体也可以是棋盘游戏的程序例如:围棋。
- 智能体调整到目标温度以节能时会得到正奖励,当人们需要自己去调节温度时它会得到负奖励,所以智能体必须学会预见人们的需要。
- 智能体观测股票市场价格以实时决定买卖。奖励的依据显然为挣钱或者赔钱。
也可以没有正奖励,比如智能体在迷宫内每时每刻都受到负奖励,因此要尽快走出去
策略搜索
策略是智能体用来决定如何行动的,比如,策略是一个神经网络,观测时输入,行为是输出,如图:
策略可以是任何算法,甚至不必是确定的。比如,你训练一个智能吸尘器,奖励是30分钟内的吸尘数量,它的策略是以p的概率直走,以1-p的概率转弯,角度在-r到r之间随机选择。策略是随机策略。
我们怎么来训练呢,我们要选取两个策略参数,P和R,我们需要进行策略搜索,当然可以尝试多次随机组合,但是在通常参数空间太大的情况下,随机的选择就像大海捞针。比较有效的方法有:
- 遗传算法。如随机创造100个第一代的策略基因,随后杀死 80 个糟糕的策略,让 20 个幸存策略繁衍 4 代。后代只是它父辈基因的复制加上一些随机变异,迭代下去找到好的策略。
-
优化技术,通过评估奖励关于策略参数的梯度(
策略梯度
(PG)),跟随梯度向更高的奖励(梯度上升)调整这些参数。如,吸尘器机器人,你可以稍微增加概率P并评估这是否增加了机器人在 30 分钟内拾起的灰尘的量;如果确实增加了,就相对应增加
P
,否则减少P
。
我们将使用 Tensorflow 来实现 PG 算法,但是我们需要首先智能体创造一个生存的环境,所以现在是介绍 OpenAI 的时候了。
OpenAI 的介绍
强化学习的一个挑战是,要训练智能体,就要首先有一个环境,比如你要智能体学习玩雅达利游戏,你要有一个雅达利游戏的仿真,你要训练一个行走的机器人,环境可以是真实世界,但这样通常有很多限制:跌倒了你不能简单的撤销,也不能加速训练即使你计算力强大,同时训练1000个机器人价值昂贵,简而言之,训练在现实世界中是困难和缓慢的,所以你通常需要一个模拟环境,至少需要引导训练。
OpenAI gym 是一个工具包,它提供各种各样的模拟环境(雅达利游戏,2D/3D物理环境),来帮助你训练RL算法程序。
$ pip install --upgrade gym #pip安装
然后创建一个环境,例中是 CartPole 环境。这是一个 2D 模拟,其中推车可以被左右加速,以平衡放置在它上面的平衡杆(如图):需要使用
reset()
初始化,会返回第一个观察结果,观测的结果包含四个浮点的 1D Numpy 向量:这些浮点数代表推车的水平位置(0 为中心)、其速度、杆的角度(0 维垂直)及其角速度。
>>> import gym
>>> env = gym.make("CartPole-v0")
[2016-10-14 16:03:23,199] Making new env: MsPacman-v0
>>> obs = env.reset()
array([-0.03799846,-0.03288115,0.02337094,0.00720711])
>>> env.render() #render()方法显示如图所示的环境
来询问环境什么动作是可能的:
>>> env.action_space
Discrete(2)
表示可能的动作是整数 0 和 1,表示向左(0)或右(1)的加速。其他环境可能有更多的动作。因为杆子向右倾斜,让我们向右加速推车:
>>> action = 1 # accelerate right
>>> obs, reward, done, info = env.step(action)
array([-0.03865608, 0.16189797, 0.02351508, -0.27801135])
>>> reward
False
{}
step()
表示执行给定的动作并返回四个值:
-
obs
:新的观测,小车现在正在向右走(obs[1]>0
)。平衡杆仍然向右倾斜(obs[2]>0
),但是他的角速度现在为负(obs[3]<0
),所以它在下一步后可能会向左倾斜。 -
reward
:无论做什么,每一步都会得到 1.0 奖励,所以游戏的目标就是尽可能长的运行。 -
done
:当游戏结束时这个值会为True
。当平衡杆倾斜太多,之后,必须重新设置环境 -
info
:提供额外的调试信息。这些数据不应该用于训练(这是作弊)。
我们编写一个简单的策略,当杆向左倾斜时向左加速,当杆向右倾斜时向右加速:
def basic_policy(obs):
angle = obs[2]
return 0 if angle < 0 else 1
totals = []
for episode in range(500):
episode_rewards = 0
obs = env.reset()
for step in range(1000): # 最多1000 步,我们不想让它永远运行下去
action = basic_policy(obs)
obs, reward, done, info = env.step(action)
episode_rewards += reward
if done:
break
totals.append(episode_rewards)
>>> import numpy as np
>>> np.mean(totals), np.std(totals), np.min(totals), np.max(totals)
(42.125999999999998, 9.1237121830974033, 24.0, 68.0)
我们发现最多只有68步,这不理想。从模拟中可以观察到推车越来越强烈地左右摆动,直到平衡杆倾斜太多。
神经网络策略
让我们创建一个神经网络策略:和之前的一样,把观测作为输入,动作作为输出。在CartPole里,只需要一个神经元来决定左或右,如图,输出代表动作0(左)的概率
假设输出是0.7,我们将有0.7概率向左,0.3概率向右。这可能让你感到奇怪?为什么不选择概率高的呢?
实际上,我们需要使智能体在 探索新的行为 和 利用那些已知可行的行动 之间找到正确的平衡。比如你去餐馆就餐,如果每次都点最爱吃的,就会错过机会探索一些新出的菜品,即使它们更好吃。
这个CartPole 问题是简单的,它无噪声也观测到了全部状态。因此无需考虑过去,但是在其他环境下,你可能需要考虑过去。比如,有一些隐藏状态,那么你也需要考虑过去,例如环境仅有推车的位置,你要考虑先前的观测,以便估计当前的速度;或者观测是有噪声的的,通常你想用过去的观察来估计最可能的当前状态。
import tensorflow as tf
from tensorflow.contrib.layers import fully_connected
# 1. 声明神经网络结构
n_inputs = 4 # == env.observation_space.shape[0]
n_hidden = 4 # 这只是个简单的测试,不需要过多的隐藏层
n_outputs = 1 # 只输出向左加速的概率
initializer = tf.contrib.layers.variance_scaling_initializer()
# 2. 建立神经网络
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = fully_connected(X, n_hidden, activation_fn=tf.nn.elu,weights_initializer=initializer)
# 隐层激活函数使用指数线性函数
logits = fully_connected(hidden, n_outputs, activation_fn=None,weights_initializer=initializer)
outputs = tf.nn.sigmoid(logits) #如果多个状态,可能需要softmax
# 3. 在概率基础上随机选择动作
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
#如果softmax输出全部概率,此处可不必
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1) #按概率随机选择一个
init = tf.global_variables_initializer()
评价行为:信用分配问题
如果我们知道每步最好的行为,训练很容易的,我们只要最小化目标和输出的交叉熵就可以了。但是强化学习奖励常常是稀疏和延迟的,假设100步杆子没有倒下来,我们不知道哪些步是获得好结果的关键,如果某次杆子倒下去了,最后一步也不该负全责,可能之前某次已经出了大的偏差。这被称为信用分配问题:当智能体得到奖励时,很难知道哪些行为应该被信任(或责备)。就像一只小狗在行为良好后的几小时得到奖励,它会明白它为什么获得回报吗?
一个通常的方法是,评价一个策略基于未来的步骤奖励的总和,通常会用一个衰减率r,如图所示,假设r=0.8,第3步获得-50的奖励,那么依次按0.8向前递推,第二部的奖励是-40,第一步是-22。如果衰减率接近0,那么未来的奖励就无足轻重,如果衰减率接近1,那么未来和现在一样重要。典型的衰减率通常为是 0.95 或 0.99。如果衰减率为 0.95,那么未来 13 步的奖励大约是即时奖励的一半( 0.95^{13}\approx0.5 ),而当衰减率为 0.99,未来 69 步的奖励是即时奖励的一半。在 CartPole 环境下,行为具有相对短期的影响,因此选择 0.95 的衰减率是合理的。
当然,如果好的动作后面跟着坏的动作,好的动作也会受到牵连得到低分,但是我们训练的足够久的话,好动作平均会比坏的动作得分要高,因此我们要训练多轮,并对每个动作的得分进行标准化。
策略梯度
正如前面所讨论的,策略梯度(PG)算法沿着高回报的梯度来优化策略参数。一种流行的 PG 算法,称为增强算法,在 1992 由 Ronald Williams 提出。这是一个常见的变体:
- 首先,让神经网络策略玩几次游戏,并在每一步计算梯度,这使得智能体更可能选择行为,但不应用这些梯度。
- 运行几次后,计算每个动作的得分(如上)。
- 每个梯度向量乘以相应的动作得分:分数为正的(好的动作),应用较早计算的梯度,增大未来选择的概率。但是,分数是负的(坏动作),应用负梯度来减小可能。
- 最后,计算所有得到的梯度向量的平均值,并使用它来执行梯度下降步骤。
下面是TF的实现
在执行阶段,算法将运行策略,并在每个步骤中评估这些梯度张量并存储它们的值。在多次运行之后,它如先前所解释的调整这些梯度(即,通过动作分数乘以它们并使它们归一化),并计算调整后的梯度的平均值。接下来,需要将结果梯度反馈到优化器,以便它可以执行优化步骤。这意味着对于每一个梯度向量我们需要一个占位符。
n_inputs = 4
n_hidden = 4
n_outputs = 1
initializer = tf.contrib.layers.variance_scaling_initializer()
learning_rate = 0.01
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = fully_connected(X, n_hidden, activation_fn=tf.nn.elu,weights_initializer=initializer)
logits = fully_connected(hidden, n_outputs, activation_fn=None,weights_initializer=initializer)
outputs = tf.nn.sigmoid(logits)
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)
y = 1. - tf.to_float(action)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)
#调用优化器的compute_gradients()方法,而不是minimize()方法。这是因为我们想要在使用它们之前调整梯度
gradients = [grad for grad, variable in grads_and_vars]
gradient_placeholders = []
grads_and_vars_feed = []
for grad, variable in grads_and_vars:
gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape())
gradient_placeholders.append(gradient_placeholder)
grads_and_vars_feed.append((gradient_placeholder, variable))
training_op = optimizer.apply_gradients(grads_and_vars_feed)
#调用优化器的apply_gradients()函数,该函数接受梯度向量/变量对的列表。
#我们不给它原始的梯度向量,而是给它一个包含更新梯度的列表
init = tf.global_variables_initializer()
saver = tf.train.Saver()
计算总折扣奖励,给予原始奖励,以及归一化多次循环的结果
def discount_rewards(rewards, discount_rate):
discounted_rewards = np.empty(len(rewards))
cumulative_rewards = 0
for step in reversed(range(len(rewards))):
cumulative_rewards = rewards[step] + cumulative_rewards * discount_rate
discounted_rewards[step] = cumulative_rewards
return discounted_rewards
def discount_and_normalize_rewards(all_rewards, discount_rate): #归一化
all_discounted_rewards = [discount_rewards(rewards) for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean)/reward_std
for discounted_rewards in all_discounted_rewards]
n_iterations = 250 # 训练迭代次数
n_max_steps = 1000 # 每一次的最大步长
n_games_per_update = 10 # 每迭代十次训练一次策略网络
save_iterations = 10 # 每十次迭代保存模型
discount_rate = 0.95
with tf.Session() as sess:
init.run()
for iteration in range(n_iterations):
all_rewards = [] #每一次的所有奖励
all_gradients = [] #每一次的所有梯度
for game in range(n_games_per_update):
current_rewards = [] #当前步的所有奖励
current_gradients = [] #当前步的所有梯度
obs = env.reset()
for step in range(n_max_steps):
action_val, gradients_val = sess.run([action, gradients],
feed_dict={X: obs.reshape(1, n_inputs)}) # 一个obs
obs, reward, done, info = env.step(action_val[0][0])
current_rewards.append(reward)
current_gradients.append(gradients_val)
if done:
break
all_rewards.append(current_rewards)
all_gradients.append(current_gradients)
# 此时我们每10次运行一次策略,即使用迭代10次的结果来优化当前的策略。
all_rewards = discount_and_normalize_rewards(all_rewards)
feed_dict = {}
for var_index, grad_placeholder in enumerate(gradient_placeholders):
# 将梯度与行为分数相乘,并计算平均值
mean_gradients = np.mean(
[reward * all_gradients[game_index][step][var_index]
for game_index, rewards in enumerate(all_rewards)
for step, reward in enumerate(rewards)],
axis=0)
feed_dict[grad_placeholder] = mean_gradients
sess.run(training_op, feed_dict=feed_dict)
if iteration % save_iterations == 0:
saver.save(sess, "./my_policy_net_pg.ckpt")
在运行了这 10 次之后,我们使用
discount_and_normalize_rewards()
函数计算动作得分;我们遍历每个可训练变量,在所有次数和所有步骤中,通过其相应的动作分数来乘以每个梯度向量;并且我们计算结果的平均值。最后,我们运行训练操作,给它提供平均梯度(对每个可训练变量提供一个)。我们继续每 10 个训练次数保存一次模型。
尽管它相对简单,但是该算法是非常强大的。你可以用它来解决更难的问题,而不仅仅是平衡一辆手推车上的平衡杆。事实上,AlphaGo 是基于类似的 PG 算法(加上蒙特卡罗树搜索,这超出了本书的范围)。
马尔可夫决策过程
在20世纪初,Markov 研究了没有记忆的随机过程,称为
马尔可夫链
。它具有固定数量的状态,并且在每个步骤中随机地从一个状态演化到另一个状态。它从状态
S
演变为状态
S'
的概率是固定的,它只依赖于
(S, S')
对,而不是依赖于过去的状态(没有记忆)。
下图展示了一个例子。从状态
S0
开始,下一步有 70% 的概率保持不变。但最终必然离开那个状态,并且永远不会回来,因为没有其他状态回到
S0
。如果它进入状态
S1
,那么它很可能会进入状态
S2
(90% 的概率),然后立即回到状态
S1
(以 100% 的概率)。它可以在这两个状态之间交替多次,但最终它会落入状态
S3
并永远留在那里(这是一个终端状态)。马尔可夫链在不同学科有着很多不同的应用。
马尔可夫决策过程(MDP)最初在 20 世纪 50 年代由 Richard Bellman 描述的。它类似于马尔可夫链,但有一个不同:状态转移中,一个智能体可以选择几种可能的动作中的一个,并且转移概率取决于所选择的动作。此外,一些状态转移返回一些奖励(正或负),智能体的目标是找到一个策略,随着时间的推移将最大限度地提高奖励。
如图,智能体从
S0
开始,有
A0
、
A1
或
A2
三个动作可以选择。可以选择a1呆在原地不动,但是,如果它选择动作A0,它有 70% 的概率获得 10 奖励,并保持在状态S0。在状态
S0
中,清楚地知道
A0
是最好的选择,在状态S2中,智能体别无选择,只能采取行动
A1
,但是在状态
S1
中,智能体否应该保持不动(
A0
)或通过火(
A2
),这是不明确的。
Bellman 找到了一种估计任何状态
S
的最佳状态值的方法,他提出了
V(s)
,它是智能体在其采取最佳行为达到状态
s
后所有衰减未来奖励的总和的平均期望。
这引出了一种算法,可以精确估计每个可能状态的最优状态值:
- 首先将所有状态值估计初始化为零
- 然后用数值迭代算法迭代更新它们(见公式 16-2)
给定足够的时间,这些估计保证收敛到最优状态值,对应于最优策略。
该算法是动态规划的一个例子,它将了一个复杂的问题变为可处理的子问题,迭代地处理
但这仍不能明确应该让智能体采取什么动作,Bellman 发现了一种非常类似的算法来估计
最优状态-动作值
(
state-action values
),通常称为
Q 值
。状态行动
(S, A)
对的最优 Q 值,记为
Q(s, a)
,是智能体在到达状态
S
,然后选择动作
A
之后平均衰减未来奖励的期望的总和。但是在它看到这个动作的结果之前,假设它在该动作之后的动作是最优的。
当智能体处于状态
S
时,它应该选择具有最高 Q 值的动作
可以写一个简单的实现:
nan=np.nan # 代表不可能的动作
T = np.array([ # shape=[s, a, s']
[[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
[[0.0, 1.0, 0.0], [nan, nan, nan], [0.0, 0.0, 1.0]],
[[nan, nan, nan], [0.8, 0.1, 0.1], [nan, nan, nan]], ])
R = np.array([ # shape=[s, a, s']
[[10., 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]],
[[10., 0.0, 0.0], [nan, nan, nan], [0.0, 0.0, -50.]],
[[nan, nan, nan], [40., 0.0, 0.0], [nan, nan, nan]], ])
possible_actions = [[0, 1, 2], [0, 2], [1]]
Q = np.full((3, 3), -np.inf) # -inf 对应着不可能的动作
for state, actions in enumerate(possible_actions):
Q[state, actions] = 0.0 # 对所有可能的动作初始化为0.0
learning_rate = 0.01
discount_rate = 0.95
n_iterations = 100
for iteration in range(n_iterations):
Q_prev = Q.copy()
for s in range(3):
for a in possible_actions[s]:
Q[s, a] = np.sum([T[s, a, sp] * (R[s, a, sp] + discount_rate * np.max(Q_prev[sp]))
for sp in range(3)])
array([[ 21.89498982, 20.80024033, 16.86353093],
[ 1.11669335, -inf, 1.17573546],
[ -inf, 53.86946068, -inf]])
>>> np.argmax(Q, axis=1) # 每一状态的最优动作
array([0, 2, 1])
这给我们这个 MDP 的最佳策略, 0.95 的衰减率时:状态
S1
选择通过火焰!而衰减率降低到 0.9,最好的动作变成保持不变。因为如果你认为现在比未来更重要,那么未来奖励的前景是不值得立刻经历痛苦(通过火)。
时间差分学习与 Q 学习
离散动作的强化学习常常都是马尔可夫决策,但是问题在于智能体最开始不知道转移的可能性(T)和奖励(R)。要想知道奖励,至少要经过一次,要知道转移的概率,要经过多次来估计得到。
时间差分学习 (TD 学习)算法与数值迭代算法非常类似,智能体使用探索策略,如随机探索 MDP,并且随着它的发展,TD 学习算法基于实际观察到的转换和奖励来更新状态值的估计。
-
a
是学习率(例如 0.01)
类似地,此时的Q 学习算法是
TD 学习与随机梯度下降有许多相似之处,特别是它一次处理一个样本的行为。就像 SGD 一样,只有当你逐渐降低学习速率时,它才能真正收敛
下面是实现
import numpy.random as rnd
learning_rate0 = 0.05
learning_rate_decay = 0.1
n_iterations = 20000
s = 0 # 在状态 0开始
Q = np.full((3, 3), -np.inf) # -inf 对应着不可能的动作
for state, actions in enumerate(possible_actions):
Q[state, actions] = 0.0 # 对于所有可能的动作初始化为 0.0
for iteration in range(n_iterations):
a = rnd.choice(possible_actions[s]) # 随机选择动作
sp = rnd.choice(range(3), p=T[s, a]) # 使用 T[s, a] 挑选下一状态
reward = R[s, a, sp]
learning_rate = learning_rate0 / (1 + iteration * learning_rate_decay)
Q[s, a] = learning_rate * Q[s, a] + (1 - learning_rate) * (reward + discount_rate * np.max(Q[sp]))
s = sp # 移动至下一状态
给定足够的迭代,该算法将收敛到最优 Q 值。这被称为离线策略算法,因为正在训练的策略不是正在执行的策略。
策略探索
只有对策略探索清楚,Q学习才会起作用。单纯随机的探索需要花很长的时间,一个更好的方法是:
ε 贪婪策略
,它以概率
ε
随机地探索或以概率为
1-ε
选择具有最高 Q 值的动作。ε 贪婪策略的优点在于,与完全随机策略相比,将花费越来越多的时间来探索环境中有趣的部分,因为 Q 值估计越来越好,同时仍花费一些时间访问 MDP 的未知区域。我们常常让
ε
初始的时候为很高的值(如1),然后逐渐减小它(如下降到 0.05)。
另一种方法是鼓励探索策略来尝试它以前没有尝试过的行动。这可以被实现为附加于 Q 值估计的奖金
近似Q学习
Q 学习的主要问题是,它不能很好地扩展到具有许多状态和动作的大(甚至中等)的 MDP,因为状态数量非常多,你绝对无法追踪每一个 Q 值的估计值。
解决方案是找到一个函数,使用可管理数量的参数来近似 Q 值。这被称为近似 Q 学习。之前人们一直在手工提取Q值,但是 DeepMind 表明使用深度神经网络可以工作得更好,特别是对于复杂的问题。它不需要任何特征工程。用于估计 Q 值的 DNN 被称为深度 Q 网络(DQN),并且使用近似 Q 学习的 DQN 被称为深度 Q 学习。
学习去使用深度 Q 学习来玩 Ms.Pac-Man
首先要安装一些环境
$ brew install cmake boost boost-python sdl2 swig wget
$ pip3 install --upgrade 'gym[all]'
你可以创建一个吃豆人小姐的环境
>>> env = gym.make("MsPacman-v0")
>>> obs = env.reset()
>>> obs.shape # [长,宽,通道]
(210, 160, 3)
>>> env.action_space
Discrete(9)
有九个离散动作可用,它对应于操纵杆的九个可能位置(左、右、上、下、中、左上等)。
观察结果是 Atari 屏幕的截图(上图左),表示为 3D Numpy 矩阵。这些图像有点大,所以我们将创建一个小的预处理函数,将图像裁剪并缩小到88×80像素,将其转换成灰度,并提高 Ms.Pac-Man 的对比度。这将减少 DQN 所需的计算量,并加快培训练。
mspacman_color = np.array([210, 164, 74]).mean()
def preprocess_observation(obs):
img = obs[1:176:2, ::2] # 裁剪
img = img.mean(axis=2) # 灰度化
img[img==mspacman_color] = 0 # 提升对比度
img = (img - 128) / 128 - 1 # 正则化为-1到1.
return img.reshape(88, 80, 1)
接下来,让我们创建 DQN。它可以只取一个状态动作对
(S,A)
作为输入,并输出相应的 Q 值
Q(s,a)
的估计值,
我们将使用的训练算法需要两个具有相同架构(但不同参数)的 DQN:一个将在训练期间用于驱动 Ms.Pac-Man(the
actor
,行动者),另一个将观看行动者并从其试验和错误中学习(the
critic
,评判者)。每隔一定时间,我们把评判者网络复制给行动者网络。因为我们需要两个相同的 DQN,所以我们将创建一个
q_network()
函数来构建它们
from tensorflow.contrib.layers import convolution2d, fully_connected
input_height = 88
input_width = 80
input_channels = 1
conv_n_maps = [32, 64, 64]
conv_kernel_sizes = [(8,8), (4,4), (3,3)]
conv_strides = [4, 2, 1]
conv_paddings = ["SAME"]*3
conv_activation = [tf.nn.relu]*3
n_hidden_in = 64 * 11 * 10 # conv3 有 64 个 11x10 映射
each n_hidden = 512
hidden_activation = tf.nn.relu
n_outputs = env.action_space.n # 9个离散动作
initializer = tf.contrib.layers.variance_scaling_initializer()
def q_network(X_state, scope):
prev_layer = X_state
conv_layers = []
with tf.variable_scope(scope) as scope:
for n_maps, kernel_size, stride, padding, activation in zip(conv_n_maps, conv_kernel_sizes,
conv_strides,
conv_paddings, conv_activation):
prev_layer = convolution2d(prev_layer,
num_outputs=n_maps,
kernel_size=kernel_size,
stride=stride, padding=padding,
activation_fn=activation,
weights_initializer=initializer)
conv_layers.append(prev_layer)
last_conv_layer_flat = tf.reshape(prev_layer, shape=[-1, n_hidden_in])
hidden = fully_connected(last_conv_layer_flat, n_hidden,
activation_fn=hidden_activation, weights_initializer=initializer)
outputs = fully_connected(hidden, n_outputs,
activation_fn=None,
weights_initializer=initializer)
trainable_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
scope=scope.name)
trainable_vars_by_name = {var.name[len(scope.name):]: var
for var in trainable_vars}
return outputs, trainable_vars_by_name
其中,
trainable_vars_by_name
字典收集了所有 DQN 的可训练变量。当我们创建操作以将评论家 DQN 复制到行动者 DQN 时,这将是有用的。字典的键是变量的名称,去掉与范围名称相对应的前缀的一部分。看起来像这样:
>>> trainable_vars_by_name
{'/Conv/biases:0': <tensorflow.python.ops.variables.Variable at 0x121cf7b50>, '/Conv/weights:0': <tensorflow.python.ops.variables.Variable...>,
'/Conv_1/biases:0': <tensorflow.python.ops.variables.Variable...>, '/Conv_1/weights:0': <tensorflow.python.ops.variables.Variable...>, '/Conv_2/biases:0': <tensorflow.python.ops.variables.Variable...>, '/Conv_2/weights:0': <tensorflow.python.ops.variables.Variable...>, '/fully_connected/biases:0': <tensorflow.python.ops.variables.Variable...>, '/fully_connected/weights:0': <tensorflow.python.ops.variables.Variable...>, '/fully_connected_1/biases:0': <tensorflow.python.ops.variables.Variable...>, '/fully_connected_1/weights:0': <tensorflow.python.ops.variables.Variable...>}
输入占位符,以及复制评论家 DQN 给行动者 DQN 的操作:
X_state = tf.placeholder(tf.float32,
shape=[None, input_height, input_width,input_channels])
actor_q_values, actor_vars = q_network(X_state, scope="q_networks/actor")
critic_q_values, critic_vars = q_network(X_state, scope="q_networks/critic")
copy_ops = [actor_var.assign(critic_vars[var_name])
for var_name, actor_var in actor_vars.items()]
copy_critic_to_actor = tf.group(*copy_ops) #tf.group()函数将所有赋值操作分组到一个方便的操作中
行动者 DQN 可以用来扮演 Ms.Pac-Man(最初非常糟糕)。正如前面所讨论的,你希望它足够深入地探究游戏,所以通常情况下你想将它用 ε 贪婪策略或另一种探索策略相结合。
评论家将试图使其预测的 Q 值去匹配行动者通过其经验的游戏估计的 Q 值。具体的说
- 让行动者玩一段时间,把所有的经验保存在回放记忆存储器中。
- 从回放存储器中采样一批记忆,并且我们将估计这些存储器中的 Q 值。
- 将使用监督学习技术训练评论家 DQN 去预测这些 Q 值。每隔几个训练周期,我们会把评论家 DQN 复制到行动者 DQN。
回放记忆是可选的,但强烈推荐使它存在。没有它,你会训练评论家 DQN 使用连续的经验,这可能是相关的。这将引入大量的偏差并且减慢训练算法的收敛性。通过使用回放记忆,我们确保馈送到训练算法的存储器可以是不相关的。
添加评论家 DQN 的训练操作:
X_action = tf.placeholder(tf.int32, shape=[None])
q_value = tf.reduce_sum(critic_q_values * tf.one_hot(X_action, n_outputs), axis=1, keep_dims=True)
#选择的动作相对应的 Q 值
假设目标Q值将通过占位符馈入。我们还创建了一个不可训练的变量
global_step
。优化器的
minimize()
操作将负责增加它。另外,我们创建了
init
操作和
Saver
::
y = tf.placeholder(tf.float32, shape=[None, 1])
cost = tf.reduce_mean(tf.square(y - q_value))
global_step = tf.Variable(0, trainable=False, name='global_step')
optimizer = tf.train.AdamOptimizer(learning_rate)
training_op = optimizer.minimize(cost, global_step=global_step)
init = tf.global_variables_initializer()
saver = tf.train.Saver()
编写一个小函数来随机地从回放记忆中采样一批处理:
from collections import deque
replay_memory_size = 10000
replay_memory = deque([], maxlen=replay_memory_size)
def sample_memories(batch_size):
indices = rnd.permutation(len(replay_memory))[:batch_size]
cols = [[], [], [], [], []] # state, action, reward, next_state, continue
for idx in indices:
memory = replay_memory[idx]
for col, value in zip(cols, memory):
col.append(value)
cols = [np.array(col) for col in cols]
return (cols[0], cols[1], cols[2].reshape(-1, 1), cols[3],cols[4].reshape(-1, 1))
使用 ε 贪婪策略,并在 50000 个训练步骤中逐步将
ε
从 1 降低到 0.05。
eps_min = 0.05
eps_max = 1.0
eps_decay_steps = 50000
def epsilon_greedy(q_values, step):
epsilon = max(eps_min, eps_max - (eps_max-eps_min) * step/eps_decay_steps)
if rnd.rand() < epsilon:
return rnd.randint(n_outputs) # 随机动作
else:
return np.argmax(q_values) # 最优动作
初始化变量:
n_steps = 100000 # 总的训练步长
training_start = 1000 # 在游戏1000次迭代后开始训练
training_interval = 3 # 每3次迭代训练一次
save_steps = 50 # 每50训练步长保存模型
copy_steps = 25 # 每25训练步长后复制评论家Q值到行动者
discount_rate = 0.95
skip_start = 90 # 跳过游戏开始(只是等待时间)
batch_size = 50
iteration = 0 # 游戏迭代
checkpoint_path = "./my_dqn.ckpt"
done = True # env 需要被重置
开始训练:
with tf.Session() as sess:
if os.path.isfile(checkpoint_path):
saver.restore(sess, checkpoint_path)
else:
init.run()
while True:
step = global_step.eval()
if step >= n_steps:
break
iteration += 1
if done: # 游戏结束,重来
obs = env.reset()
for skip in range(skip_start): # 跳过游戏开头
obs, reward, done, info = env.step(0)
state = preprocess_observation(obs)
# 行动者评估要干什么
q_values = actor_q_values.eval(feed_dict={X_state: [state]})
action = epsilon_greedy(q_values, step)
# 行动者开始玩游戏
obs, reward, done, info = env.step(action)
next_state = preprocess_observation(obs)
# 让我们记下来刚才发生了啥
replay_memory.append((state, action, reward, next_state, 1.0 - done)) state = next_state
if iteration < training_start or iteration % training_interval != 0: continue
# 评论家学习
X_state_val, X_action_val, rewards, X_next_state_val, continues = ( sample_memories(batch_size))
next_q_values = actor_q_values.eval( feed_dict={X_state: X_next_state_val})
max_next_q_values = np.max(next_q_values, axis=1, keepdims=True)
y_val = rewards + continues * discount_rate * max_next_q_values
training_op.run(feed_dict={X_state: X_state_val,X_action: X_action_val, y: y_val})
# 复制评论家Q值到行动者