Tech Blog

上一期 MyEncyclopedia公众号文章 SARSA、Q-Learning和Expected SARSA时序差分算法训练CartPole中,我们通过CartPole的OpenAI Gym环境实现了Q-learning算法,这一期,我们将会分析Q-learning算法面临的maximization bias 问题和提出double learning算法来改进。接着,我们将tabular Q-learning算法扩展到用带参函数来近似 Q(s, a),这就是Deepmind 在2015年Nature上发表的Deep Q Network (DQN)思想:用神经网络结合Q-learning算法实现超越人类玩家打Atari游戏的水平。

Q-Learning 回顾

\[ \begin{align*} &\textbf{Q-learning (off-policy TD Control) for estimating } \pi \approx \pi_{*} \\ & \text{Algorithm parameters: step size }\alpha \in ({0,1}]\text{, small }\epsilon > 0 \\ & \text{Initialize }Q(s,a), \text{for all } s \in \mathcal{S}^{+}, a \in \mathcal{A}(s) \text{, arbitrarily except that } Q(terminal, \cdot) = 0 \\ & \text{Loop for each episode:}\\ & \quad \text{Initialize }S\\ & \quad \text{Loop for each step of episode:} \\ & \quad \quad \text{Choose } A \text{ from } S \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \quad \text{Take action }A, \text { observe } R, S^{\prime} \\ & \quad \quad Q(S,A) \leftarrow Q(S,A) + \alpha[R+\gamma \max_{a}Q(S^{\prime}, a) - Q(S,A)] \\ & \quad \quad S \leftarrow S^{\prime}\\ & \quad \text{until }S\text{ is terminal} \\ \end{align*} \]

SARSA、Q-Learning和Expected SARSA时序差分算法训练CartPole 中,我们实现了同样基于 \(\epsilon\)-greedy 策略的Q-learning算法和SARSA算法,两者代码上的区别确实不大,但本质上Q-learning是属于 off-policy 范畴而 SARSA却属于 on-policy 范畴。一种理解方式是,Q-learning相比于SARSA少了第二次从 \(\epsilon\)-greedy 策略采样出下一个action,即S, A, R', S', A' 五元组中最后一个A',而直接通过max操作去逼近 \(q^{*}\)。如此,Q-learning并没有像SARSA完成一次完整的GPI(Generalized Policy Iteration),缺乏on-policy的策略迭代的特点,故而 Q-learning 属于off-policy方法。我们也可以从另一个角度来分析两者的区别。注意到这两个算法不是一定非要使用 \(\epsilon\)-greedy 策略的。对于Q-learning来说,完全可以使用随机策略,理论上已经证明,只要保证每个action以后依然有几率会被探索下去,Q-learning 最终会收敛到最优策略。Q-learning使用 \(\epsilon\)-greedy 是为了能快速收敛。对于SARSA算法来说,则无法使用随机策略,因为随机策略无法形成策略提升。而 \(\epsilon\)-greedy 策略却可以形成策略迭代,完成策略提升,当然,\(\epsilon\)-greedy 策略在 SARSA 算法中也可以保证快速收敛。因此,尽管两者都使用 \(\epsilon\)-greedy 策略再借由环境产生reward和state,它们的作用并非完全一样。至此,我们可以体会到on-policy和off-policy本质的区别。

收敛条件

Tabular Q-Learning 收敛到最佳Q函数的条件如下[2]:

\[ \Sigma^{\infty}_{n=0} \alpha_{n} = {\infty} \quad \text{ AND } \quad \Sigma^{\infty}_{n=0} \alpha^2_{n} \lt {\infty} \]

一种方式是将 \(\alpha\)设置成 (s, a)访问次数的倒数:\(\alpha_{n}(s,a) = 1/ n(s,a )\)

则整体更新公式为

\[ Q(s,a) \leftarrow Q(s,a) + \alpha_n(s, a)[R+\gamma \max_{a^{\prime}}Q(s^{\prime}, a^{\prime}) - Q(s, a)] \]

Q-Learning 最大化偏差问题

Q-Learning 会产生最大化偏差问题(Maximization Bias,在Sutton 教材6.7节),它的原因是用估计值中取最大值去估计真实值中最大是有偏的。这个可以做如下试验来模拟,若有5个 [-3, 3] 的离散均匀分布 \(d_i\)\(\max(\mathbb{E}[d_i]) = 0\),但是若我们用单批采样 \(x_i \sim d_i\)来估算 \(\mathbb{E}[d_i]\)在取max的话,\(\mathbb{E}[{\max(x_i)]}\) 是有bias的。但是如果我们将这个过程分解成选择最大action和评估其值两步,每一步用独立的采样集合就可以做到无偏,这个改进方法称为double learning。具体过程为第一步在\(Q_1\)集合中找到最大的action,第二步在\(Q_2\)中返回此action值,即:

\[ \begin{align*} A^{\star} = \operatorname{argmax}_{a}Q_1(a) \\ Q_2(A^{\star}) = Q_2(\operatorname{argmax}_{a}Q_1(a)) \end{align*} \]

则无限模拟后结果是无偏的:\(\mathbb{E}[Q_2(A^{\star})] = q(A^{\star})\) 下面是简单模拟试验两种方法的均值比较

Maximization Bias

试验完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import random
from math import floor
import numpy as np
import pandas as pd
import seaborn as sns


def uniform(a: int, b: int) -> int:
u = random.random()
return a + floor((b - a + 1) * u)


if __name__ == "__main__":
total_max_bias = 0
avgs_max_bias = []
total_double_sampling = 0
avgs_double_sampling = []

for e in range(1, 100):
samples = np.array([uniform(-3, 3) for _ in range(5)])
max_sample = max(samples)
total_max_bias += max_sample
avgs_max_bias.append(total_max_bias / e)

samples2 = np.array([uniform(-3, 3) for _ in range(5)])
total_double_sampling += samples2[np.argmax(samples)]
avgs_double_sampling.append(total_double_sampling / e)

df = pd.DataFrame({'Max of Samples': avgs_max_bias, 'Double Samples': avgs_double_sampling})
import matplotlib.pyplot as plt
sns.lineplot(data=df)
plt.show()

回到Q-learning 中使用的 \(\epsilon\)-greedy策略,Q-learning可以保证随着\(\epsilon\) 的减小,最大化偏差会 asymptotically 趋近于真实值,但是double learning 可以更快地趋近于真实值。

Maximization Bias vs Double learning

下面是Sutton 强化学习第二版6.7节中完整的Double Q-learning算法。

\[ \begin{align*} &\textbf{Double Q-learning, for estimating } Q_1 \approx Q_2 \approx q_{*} \\ & \text{Algorithm parameters: step size }\alpha \in ({0,1}]\text{, small }\epsilon > 0 \\ & \text{Initialize }Q_1(s,a), \text{ and } Q_2(s,a) \text{, for all } s \in \mathcal{S}^{+}, a \in \mathcal{A}(s) \text{, such that } Q(terminal, \cdot) = 0 \\ & \text{Loop for each episode:}\\ & \quad \text{Initialize }S\\ & \quad \text{Loop for each step of episode:} \\ & \quad \quad \text{Choose } A \text{ from } S \text{ using policy } \epsilon\text{-greedy in } Q_1 + Q_2 \\ & \quad \quad \text{Take action }A, \text { observe } R, S^{\prime} \\ & \quad \quad \text{With 0.5 probability:} \\ & \quad \quad \quad Q_1(S,A) \leftarrow Q_1(S,A) + \alpha \left ( R+\gamma Q_2(S^{\prime}, \operatorname{argmax}_{a}Q_1(S^{\prime}, a)) - Q_1(S,A) \right )\\ & \quad \quad \text{else:} \\ & \quad \quad \quad Q_1(S,A) \leftarrow Q_1(S,A) + \alpha \left ( R+\gamma Q_2(S^{\prime}, \operatorname{argmax}_{a}Q_1(S^{\prime}, a)) - Q_1(S,A) \right )\\ & \quad \quad S \leftarrow S^{\prime}\\ & \quad \text{until }S\text{ is terminal} \\ \end{align*} \]

更详细内容,可以参考 Hado V. Hasselt 的 Double Q-learning paper [3]。

Gradient Q-Learning

Tabular Q-learning由于受制于维度爆炸,无法扩展到高维状态空间,一般近似解决方案是用 approximating function来逼近Q函数。即我们将状态抽象出一组特征 \(s = \vec x= [x_1, x_2, ..., x_n]^T\),Q 用一个 x 的函数来近似表达 \(Q(s, a) \approx g(\vec x; \theta)\),如此,就联系起了深度神经网络。有了函数表达,深度学习还必须的元素是损失函数,这个很自然的可以用 TD error。至此,问题转换成深度学习的几个要素均已具备,Q-learning算法改造成了深度学习中的有监督问题。

估计值:\(Q\left(s, a ; \theta\right)\)

目标值:\(r+\gamma \max _{a^{\prime}} Q\left(s^{\prime}, a^{\prime} ; \theta\right)\)

损失函数:

\[ L\left(\theta\right)=\mathbb{E}_{\left(s, a, r, s^{\prime}\right) \sim \mathrm{U}(D)}\left[\left(r+\gamma \max _{a^{\prime}} Q\left(s^{\prime}, a^{\prime} ; \theta\right)-Q\left(s, a ; \theta\right)\right)^{2}\right] \]

收敛性分析

首先明确一点,至此 gradient q-learning 和 tabular Q-learning 一样,都是没有记忆的,即对于一个新的环境产生的 sample 去做 stochastic online update。

若Q函数是状态特征的线性函数,即 \(Q(s, a; \theta) = \Sigma_i w_i x_i\) ,那么线性Gradient Q-learning的收敛条件和Tabular Q-learning 一样,也为

\[ \Sigma^{\infty}_{n=0} \alpha_{n} = {\infty} \quad \text{ AND } \quad \Sigma^{\infty}_{n=0} \alpha^2_{n} \lt {\infty} \]

若Q函数是非线性函数,即使符合上述条件,也无法保证收敛,本质上源于改变 \(\theta\) 使得 Q 值在 (s, a) 点上减小误差会影响 (s, a) 周边点的误差。

DQN减少不收敛的两个技巧

  1. \(\theta_{i-1} \rightarrow \theta_{i}\) 改变导致max中的估计值和目标值中的Q同时变化,面临着 chasing its own tail 的问题。解决的方法是使用不同的参数来parameterize两个Q,并且目标值的Q网络参数固定一段时间产生一批固定策略下的环境采样。这个技巧称为 Target Network。引入这个 trick 后深度学习的要素变成

估计值:\(Q\left(s, a ; \theta_{i}\right)\)

目标值:\(r+\gamma \max _{a^{\prime}} Q\left(s^{\prime}, a^{\prime} ; \theta_i^{-}\right)\)

损失函数,DQN在Nature上的loss函数: \[ L\left(\theta_{i}\right)=\mathbb{E}_{\left(s, a, r, s^{\prime}\right) \sim \mathrm{U}(D)}\left[\left(r+\gamma \max _{a^{\prime}} Q\left(s^{\prime}, a^{\prime} ; \theta_{i}^{-}\right)-Q\left(s, a ; \theta_{i}\right)\right)^{2}\right] \]

  1. 尽管目标值的 \(Q(;\theta^{-})\)固定了,但是\(\theta_{i-1} \rightarrow \theta_{i}\) 还会使得估计值的 \(Q(s, a;\theta_i)\) 在变化的同时影响其他的 \(Q(s_k, a_j;\theta_i)\),让之前训练过的 (s, a)的点的损失值发生变化,解决的办法是将 online stochastic 改成 batch gradient,也就是将最近的一系列采样值保存下来,这个方法称为 experience replay。

有了这两个优化,Deep Q Network投入实战效果就容易收敛了,以下是Deepmind 发表在Nature 的 Human-level control through deep reinforcement learning [1] 的完整算法流程。

\[ \begin{align*} &\textbf{Deep Q-learning with experience replay}\\ & \text{Initialize replay memory } D\text{ to capacity } N \\ & \text{Initialize action-value function } Q \text{ with random weights } \theta \\ & \text{Initialize target action-value function } \hat{Q} \text{ with weights } \theta^{-} = \theta \\ & \textbf{For} \text{ episode = 1, } M \textbf{ do} \\ & \text{Initialize sequences } s_1 = \{x_1\} \text{ and preprocessed sequence } \phi_1 = \phi(s_1)\\ & \quad \textbf{For } t=\text{ 1, T }\textbf{ do} \\ & \quad \quad \text{With probability }\epsilon \text{ select a random action } a_t \\ & \quad \quad \text{otherwise select } a_t = \operatorname{argmax}_{a}Q(\phi(s_t), a; \theta)\\ & \quad \quad \text{Execute action } a_t \text{ in emulator and observe reward } r_t \text{ and image }x_{t+1}\\ & \quad \quad \text{Set } s_{t+1} = s_t, a_t, x_{t+1} \text{ and preprocess } \phi_{t+1} = \phi(s_{t+1})\\ & \quad \quad \text{Store transition } (\phi_t, a_t, r_t, \phi_{t+1}) \text{ in } D\\ & \quad \quad \text{Sample random minibatch of transitions } (\phi_j, a_j, r_j, \phi_{j+1}) \text{ from } D\\ & \quad \quad \text{Set } y_j= \begin{cases} r_j \quad \quad\quad\quad\text{if episode terminates at step j+1}\\ r_j + \gamma \max_{a^{\prime}}\hat Q(\phi_{j+1}, a^{\prime}; \theta^{-}) \quad \text { otherwise}\\ \end{cases} \\ & \quad \quad \text{Perform a gradient descent step on } (y_j - Q(\phi_j, a_j; \theta))^2 \text{ with respect to the network parameters } \theta\\ & \quad \quad \text{Every C steps reset } \hat Q = Q\\ & \quad \textbf{End For} \\ & \textbf{End For} \end{align*} \]

DQN with Double Q-Learning

DQN 算法和 Double Q-Learning 能不能结合起来呢?Hado van Hasselt 在 Deep Reinforcement Learning with Double Q-learning [4] 中提出参考 Double Q-learning 将 DQN 的目标值改成如下函数,可以进一步提升最初DQN的效果。

目标值:\(r+\gamma Q(s^{\prime}, \max _{a^{\prime}} Q\left(s^{\prime}, a^{\prime}; \theta_t\right); \theta_t^{-})\)

参考资料

  1. Human-level control through deep reinforcement learning Volodymyr Mnih, Koray Kavukcuoglu, David Silver (2015)

  2. CS885 Reinforcement Learning Lecture 4b: May 11, 2018

  3. Double Q-learning Hado V. Hasselt (2010)

  4. Deep Reinforcement Learning with Double Q-learning Hado van Hasselt, Arthur Guez, David Silver (2015)

本篇是TSP问题从DP算法到深度学习系列第三篇,在这一篇中,我们会开始进入深度学习领域来求近似解法。本文会介绍并实现指针网络(Pointer Networks),一种seq-to-seq模型,它的设计目的就是为了解决TSP问题或者凸包(Convex Hull)问题。本文代码在 https://github.com/MyEncyclopedia/blog/tree/master/tsp/ptr_net_pytorch 中。

Pointer Networks

随着深度学习 seq-to-seq 模型作为概率近似模型在各领域的成功,TSP问题似乎也可以用同样的思路去解决。然而,传统的seq-to-seq 模型其输出的类别是预先固定的。例如,NLP RNN生成模型每一步会从 \(|V|\) 大的词汇表中产生一个单词。 然而,有很大一类问题,譬如TSP问题、凸包(Convex Hull)问题、Delaunay三角剖分问题,输出的类别不是事先固定的,而是随着输入而变化的。 Pointer Networks 的出现解决了这种限制:输出的类别可以通过指向某个输入,以此克服类别的问题,因此形象地取名为指针网络(Pointer Networks)。先来看看原论文中提到的三个问题。

凸包问题(Convex Hull)

如下图所示,需要在给定的10个点中找到若干个点,使得这些点包住了所有点。问题输入是不确定个数 n 个点的位置信息,输出是 k (k<=n)个点的。 这个经典的算法问题已经被证明找出精确解等价于排序问题(wikipedia 链接),因此时间复杂度为 \(O(n*log(n))\)

image info

\[ \begin{align*} &\text{Input: } \mathcal{P} &=& \left\{ P_{1}, \ldots, P_{10} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{2,4,3,5,6,7,2\} \end{align*} \]

TSP 问题

TSP 和凸包问题很类似,输入为不确定个数的 n 个点信息,输出为这 n 个点的某序列。在。。。中,我们可以将确定解的时间复杂度从 \(O(n!)\) 降到 \(O(n^2*2^n)\)

image info

\[ \begin{align*} &\text{Input: } \mathcal{P} &= &\left\{P_{1}, \ldots, P_{6} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{1,3,2,4,5,6,1\} \end{align*} \]

Delaunay三角剖分

Delaunay三角剖分问题是将平面上的散点集划分成三角形,使得在可能形成的三角剖分中,所形成的三角形的最小角最大。这个问题的输出是若干个集合,每个集合代表一个三角形,由输入点的编号表示。 image info

\[ \begin{align*} &\text{Input: } \mathcal{P} &=& \left\{P_{1}, \ldots, P_{5} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{(1,2,4),(1,4,5),(1,3,5),(1,2,3)\} \end{align*} \]

Seq-to-Seq 模型

现在假设n是固定的,传统基本的seq-to-seq模型(参数部分记为 \(\theta\) ),训练数据若记为\((\mathcal{P}, C^{\mathcal{P}})\),,将拟合以下条件概率:

\[ \begin{equation} p\left(\mathcal{C}^{\mathcal{P}} | \mathcal{P} ; \theta\right)=\prod_{i=1}^{m(\mathcal{P})} p\left(C_{i} | C_{1}, \ldots, C_{i-1}, \mathcal{P} ; \theta\right) \end{equation} \] 训练的方向是找到 \(\theta^{*}\) 来最大化上述联合概率,即: \[ \begin{equation} \theta^{*}=\underset{\theta}{\arg \max } \sum_{\mathcal{P}, \mathcal{C}^{\mathcal{P}}} \log p\left(\mathcal{C}^{\mathcal{P}} | \mathcal{P} ; \theta\right) \end{equation} \]

Content Based Input Attention

一种增强基本seq-to-seq模型的方法是加入attention机制。记encoder和decoder隐藏状态分别是 $ (e_{1}, , e_{n}) $ 和 $ (d_{1}, , d_{m()}) $。seq-to-seq第 i 次输出了 \(d_i\),注意力机制额外计算第i步的注意力向量 \(d_i^{\prime}\),并将其和\(d_i\)连接后作为隐藏状态。\(d_i^{\prime}\)的计算方式如下,输入 $ (e_{1}, , e_{n}) $ 和 i 对应的权重向量 $ (a_{1}^{i}, , a_{n}^{i}) $做点乘。

\[ d_{i} = \sum_{j=1}^{n} a_{j}^{i} e_{j} \]

$ (a_{1}^{i}, , a_{n}^{i}) $ 是向量 $ (u_{1}^{i}, , u_{n}^{i}) $ softmax后的值, \(u_{j}^{i}\) 表示 \(d_{i}\)\(e_{j}\)的距离,Pointer Networks论文中的距离为如下的tanh公式。

\[ \begin{eqnarray} u_{j}^{i} &=& v^{T} \tanh \left(W_{1} e_{j}+W_{2} d_\right) \quad j \in(1, \ldots, n) \\ a_{j}^{i} &=& \operatorname{softmax}\left(u_{j}^{i}\right) \quad j \in(1, \ldots, n) \end{eqnarray} \]

更多Attention计算方式

FloydHub Blog - Attention Mechanism 中,作者清楚地解释了两种经典的attention方法,第一种称为Additive Attention,由Dzmitry Bahdanau 提出,也就是Pointer Networks中通过tanh的计算方式,第二种称为 Multiplicative Attention,由Thang Luong*提出。

Luong Attention 有三种方法计算 \(d_{i}\)\(e_{j}\) 的距离(或者可以认为向量间的对齐得分)。

\[ \operatorname{score} \left( d_i, e_j \right)= \begin{cases} d_i^{\top} e_j & \text { dot } \\ d_i^{\top} W_a e_j & \text { general } \\ v_a^{\top} \tanh \left( W_a \left[ d_i ; e_j \right] \right) & \text { concat } \end{cases} \]

Pointer Networks

image info

Pointer Networks 基于Additive Attention,其创新之处在于用 \(u^i_j\) 作为第j个输入的评分,即第 i 次输出为1-n个输入中 \(u^i_j\) 得分最高的j作为输出,这样巧妙的解决了n不是预先固定的限制。

\[ \begin{eqnarray*} u_{j}^{i} &=& v^{T} \tanh \left(W_{1} e_{j}+W_{2} d_{i}\right) \quad j \in(1, \ldots, n) \\ p\left(C_{i} | C_{1}, \ldots, C_{i-1}, \mathcal{P}\right) &=& \operatorname{softmax}\left(u^{i}\right) \end{eqnarray*} \]

PyTorch 代码实现

在本系列第二篇 episode 2,中,我们说明过TSP数据集的格式,每一行字段意义如下

1
x0, y0, x1, y1, ... output 1 v1 v2 v3 ... 1

转换成PyTorch Dataset

每一个case会转换成nd.ndarray,共有五个分量,分别是 (input, input_len, output_in, output_out, output_len) 并且分装成pytorch的 Dataset类。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
from torch.utils.data import Dataset

class TSPDataset(Dataset):
"each data item of form (input, input_len, output_in, output_out, output_len)"
data: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]

def __len__(self):
return len(self.data)

def __getitem__(self, index):
input, input_len, output_in, output_out, output_len = self.data[index]
return input, input_len, output_in, output_out, output_len
image info

PyTorch pad_packed_sequence 优化技巧

PyTorch 实现 seq-to-seq 模型一般会使用 pack_padded_sequence 以及 pad_packed_sequence 来减少计算量,本质上可以认为根据pad大小分批进行矩阵运算,减少被pad的矩阵元素导致的无效运算,详细的解释可以参考 https://github.com/sgrvinod/a-PyTorch-Tutorial-to-Image-Captioning#decoder-1。

image info

对应代码如下:

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class RNNEncoder(nn.Module):
rnn: Union[nn.LSTM, nn.GRU, nn.RNN]

def __init__(self, rnn_type: str, bidirectional: bool, num_layers: int, input_size: int, hidden_size: int, dropout: float):
super(RNNEncoder, self).__init__()
if bidirectional:
assert hidden_size % 2 == 0
hidden_size = hidden_size // 2
self.rnn = rnn_init(rnn_type, input_size=input_size, hidden_size=hidden_size, bidirectional=bidirectional,num_layers=num_layers, dropout=dropout)

def forward(self, src: Tensor, src_lengths: Tensor, hidden: Tensor = None) -> Tuple[Tensor, Tensor]:
lengths = src_lengths.view(-1).tolist()
packed_src = pack_padded_sequence(src, lengths)
memory_bank, hidden_final = self.rnn(packed_src, hidden)
memory_bank = pad_packed_sequence(memory_bank)[0]
return memory_bank, hidden_final

注意力机制相关代码

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Attention(nn.Module):
linear_out: nn.Linear

def __init__(self, dim: int):
super(Attention, self).__init__()
self.linear_out = nn.Linear(dim * 2, dim, bias=False)

def score(self, src: Tensor, target: Tensor) -> Tensor:
batch_size, src_len, dim = src.size()
_, target_len, _ = target.size()
target_ = target
src_ = src.transpose(1, 2)
return torch.bmm(target_, src_)

def forward(self, src: Tensor, target: Tensor, src_lengths: Tensor) -> Tuple[Tensor, Tensor]:
assert target.dim() == 3

batch_size, src_len, dim = src.size()
_, target_len, _ = target.size()

align_score = self.score(src, target)

mask = sequence_mask(src_lengths)
# (batch_size, max_len) -> (batch_size, 1, max_len)
mask = mask.unsqueeze(1)
align_score.data.masked_fill_(~mask, -float('inf'))
align_score = F.softmax(align_score, -1)

c = torch.bmm(align_score, src)

concat_c = torch.cat([c, target], -1)
attn_h = self.linear_out(concat_c)

return attn_h, align_score

参考资料

这一期我们进入第六章:时序差分学习(Temporal-Difference Learning)。TD Learning本质上是加了bootstrapping的蒙特卡洛(MC),也是model-free的方法,但实践中往往比蒙特卡洛收敛更快。我们选取OpenAI Gym中经典的CartPole环境来讲解TD。更多相关内容,欢迎关注 本公众号 MyEncyclopedia

CartPole OpenAI 环境

如图所示,小车上放了一根杆,杆会根据物理系统定理因重力而倒下,我们可以控制小车往左或者往右,目的是尽可能地让杆保持树立状态。

CartPole OpenAI Gym

CartPole 观察到的状态是四维的float值,分别是车位置,车速度,杆角度和杆角速度。下表为四个维度的值范围。给到小车的动作,即action space,只有两种:0,表示往左推;1,表示往右推。

Min Max
Cart Position -4.8 4.8
Cart Velocity -Inf Inf
Pole Angle -0.418 rad (-24 deg) 0.418 rad (24 deg)
Pole Angular Velocity -Inf Inf

离散化连续状态

从上所知,CartPole step() 函数返回了4维ndarray,类型为float32的连续状态空间。对于传统的tabular方法来说第一步必须离散化状态,目的是可以作为Q table的主键来查找。下面定义的State类型是离散化后的具体类型,另外 Action 类型已经是0和1,不需要做离散化处理。

{linenos
1
2
State = Tuple[int, int, int, int]
Action = int

离散化处理时需要考虑的一个问题是如何设置每个维度的分桶策略。分桶策略会决定性地影响训练的效果。原则上必须将和action以及reward强相关的维度做细粒度分桶,弱相关或者无关的维度做粗粒度分桶。举个例子,小车位置本身并不能影响Agent采取的下一动作,当给定其他三维状态的前提下,因此我们对小车位置这一维度仅设置一个桶(bucket size=1)。而杆的角度和角速度是决定下一动作的关键因素,因此我们分别设置成6个和12个。

以下是离散化相关代码,四个维度的 buckets=(1, 2, 6, 12)。self.q是action value的查找表,具体类型是shape 为 (1, 2, 6, 12, 2) 的ndarray。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class CartPoleAbstractAgent(metaclass=abc.ABCMeta):
def __init__(self, buckets=(1, 2, 6, 12), discount=0.98, lr_min=0.1, epsilon_min=0.1):
self.env = gym.make('CartPole-v0')

env = self.env
# [position, velocity, angle, angular velocity]
self.dims_config = [(env.observation_space.low[0], env.observation_space.high[0], 1),
(-0.5, 0.5, 1),
(env.observation_space.low[2], env.observation_space.high[2], 6),
(-math.radians(50) / 1., math.radians(50) / 1., 12)]
self.q = np.zeros(buckets + (self.env.action_space.n,))
self.pi = np.zeros_like(self.q)
self.pi[:] = 1.0 / env.action_space.n

def to_bin_idx(self, val: float, lower: float, upper: float, bucket_num: int) -> int:
percent = (val + abs(lower)) / (upper - lower)
return min(bucket_num - 1, max(0, int(round((bucket_num - 1) * percent))))

def discretize(self, obs: np.ndarray) -> State:
discrete_states = tuple([self.to_bin_idx(obs[d], *self.dims_config[d]) for d in range(len(obs))])
return discrete_states

train() 方法串联起来 agent 和 env 交互的流程,包括从 env 得到连续状态转换成离散状态,更新 Agent 的 Q table 甚至 Agent的执行policy,choose_action会根据执行 policy 选取action。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def train(self, num_episodes=2000):
for e in range(num_episodes):
print(e)
s: State = self.discretize(self.env.reset())

self.adjust_learning_rate(e)
self.adjust_epsilon(e)
done = False

while not done:
action: Action = self.choose_action(s)
obs, reward, done, _ = self.env.step(action)
s_next: State = self.discretize(obs)
a_next = self.choose_action(s_next)
self.update_q(s, action, reward, s_next, a_next)
s = s_next

choose_action 的默认实现为基于现有 Q table 的 \(\epsilon\)-greedy 策略。

{linenos
1
2
3
4
5
def choose_action(self, state) -> Action:
if np.random.random() < self.epsilon:
return self.env.action_space.sample()
else:
return np.argmax(self.q[state])

抽象出公共的基类代码 CartPoleAbstractAgent 之后,SARSA、Q-Learning和Expected SARSA只需要复写 update_q 抽象方法即可。

{linenos
1
2
3
4
class CartPoleAbstractAgent(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
pass

TD Learning的精髓

在上一期,本公众号 MyEncyclopedia 的21点游戏的蒙特卡洛On-Policy控制介绍了Monte Carlo方法,知道MC需要在环境中模拟直至最终结局。若记\(G_t\)为t步以后的最终return,则 MC online update 版本更新为:

\[ V(S_t) \leftarrow V(S_t) + \alpha[G_{t} - V(S_t)] \]

可以认为 \(V(S_t)\) 向着目标为 \(G_t\) 更新了一小步。

而TD方法可以只模拟下一步,得到 \(R_{t+1}\),而余下步骤的return,\(G_t - R_{t+1}\) 用已有的 \(V(S_{t+1})\) 来估计,或者统计上称作bootstrapping。这样 TD 的更新目标值变成 \(R_{t+1} + \gamma V(S_{t+1})\),整体online update 公式则为: \[ V(S_t) \leftarrow V(S_t) + \alpha[R_{t+1} + \gamma V(S_{t+1})- V(S_t)] \]

概念上,如果只使用下一步 \(R_{t+1}\) 值然后bootstrap称为 TD(0),用于区分使用多步后的reward的TD方法。另外,变化的数值 \(R_{t+1} + \gamma V(S_{t+1})- V(S_t)\) 称为TD error。

另外一个和Monte Carlo的区别在于一般TD方法保存更精细的Q值,\(Q(S_t, A_t)\),并用Q值来boostrap,而MC一般用V值也可用Q值。

SARSA: On-policy TD 控制

SARSA的命名源于一次迭代产生了五元组 \(S_t,A_t,R_{t+1},S_{t+1},A_{t+1}\)。SARSA利用五个值做 action-value的 online update:

\[ Q(S_t,A_t) \leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma Q(S_{t+1}, A_{t+1}) - Q(S_t,A_t)] \]

对应的Q table更新实现为:

{linenos
1
2
3
4
class SarsaAgent(CartPoleAbstractAgent):

def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
self.q[s][a] += self.lr * (r + self.discount * (self.q[s_next][a_next]) - self.q[s][a])

SARSA 在执行policy 后的Q值更新是对于针对于同一个policy的,完成了一次策略迭代(policy iteration),这个特点区分于后面的Q-learning算法,这也是SARSA 被称为 On-policy 的原因。下面是完整算法伪代码。

\[ \begin{align*} &\textbf{Sarsa (on-policy TD Control) for estimating } Q \approx q_{*} \\ & \text{Algorithm parameters: step size }\alpha \in ({0,1}]\text{, small }\epsilon > 0 \\ & \text{Initialize }Q(s,a), \text{for all } s \in \mathcal{S}^{+}, a \in \mathcal{A}(s) \text{, arbitrarily except that } Q(terminal, \cdot) = 0 \\ & \text{Loop for each episode:}\\ & \quad \text{Initialize }S\\ & \quad \text{Choose } A \text{ from } S \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \text{Loop for each step of episode:} \\ & \quad \quad \text{Take action }A, \text { observe } R, S^{\prime} \\ & \quad \quad \text{Choose }A^{\prime} \text { from } S^{\prime} \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \quad Q(S,A) \leftarrow Q(S,A) + \alpha[R+\gamma Q(S^{\prime}, A^{\prime}) - Q(S,A)] \\ & \quad \quad S \leftarrow S^{\prime}; A \leftarrow A^{\prime} \\ & \quad \text{until }S\text{ is terminal} \\ \end{align*} \]

SARSA 训练分析

SARSA收敛较慢,1000次episode后还无法持久稳定,后面的Q-learning 和 Expected Sarsa 都可以在1000次episode学习长时间保持不倒的状态。

Q-Learning: Off-policy TD 控制

Q-Learning 是深度学习时代前强化学习领域中的著名算法,它的 online update 公式为: \[ Q(S_t,A_t) \leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma \max_{a}Q(S_{t+1}, a) - Q(S_t,A_t)] \]

对应的 update_q() 方法具体实现

{linenos
1
2
3
4
class QLearningAgent(CartPoleAbstractAgent):

def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
self.q[s][a] += self.lr * (r + self.discount * np.max(self.q[s_next]) - self.q[s][a])

本质上用现有的Q table中最好的action来bootrap 对应的最佳Q值,推导如下:

\[ \begin{aligned} q_{*}(s, a) &=\mathbb{E}\left[R_{t+1}+\gamma \max _{a^{\prime}} q_{*}\left(S_{t+1}, a^{\prime}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\mathbb{E}[R \mid S_{t}=s, A_{t}=a] + \gamma\sum_{s^{\prime}} p\left(s^{\prime}\mid s, a\right)\max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right) \\ &\approx r + \gamma \max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right) \end{aligned} \]

Q-Learning 被称为 off-policy 的原因是它并没有完成一次policy iteration,而是直接用已有的 Q 来不断近似 \(Q_{*}\)

对比下面的Q-Learning 伪代码和之前的 SARSA 版本可以发现,Q-Learning少了一次模拟后的 \(A_{t+1}\),这也是Q-Learning 中执行policy和预估Q值(即off-policy)分离的一个特征。

\[ \begin{align*} &\textbf{Q-learning (off-policy TD Control) for estimating } \pi \approx \pi_{*} \\ & \text{Algorithm parameters: step size }\alpha \in ({0,1}]\text{, small }\epsilon > 0 \\ & \text{Initialize }Q(s,a), \text{for all } s \in \mathcal{S}^{+}, a \in \mathcal{A}(s) \text{, arbitrarily except that } Q(terminal, \cdot) = 0 \\ & \text{Loop for each episode:}\\ & \quad \text{Initialize }S\\ & \quad \text{Loop for each step of episode:} \\ & \quad \quad \text{Choose } A \text{ from } S \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \quad \text{Take action }A, \text { observe } R, S^{\prime} \\ & \quad \quad Q(S,A) \leftarrow Q(S,A) + \alpha[R+\gamma \max_{a}Q(S^{\prime}, a) - Q(S,A)] \\ & \quad \quad S \leftarrow S^{\prime}\\ & \quad \text{until }S\text{ is terminal} \\ \end{align*} \]

Q-Learning 训练分析

Q-Learning 1000次episode就可以持久稳定住。

SARSA 改进版 Expected SARSA

Expected SARSA 改进了 SARSA 的地方在于考虑到了在某一状态下的现有策略动作分布,以此来减少variance,加快收敛,具体更新规则为:

\[ \begin{aligned} Q(S_t,A_t) &\leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma \mathbb{E}_{\pi}[Q(S_{t+1}, A_{t+1} \mid S_{t+1})] - Q(S_t,A_t)] \\ &\leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma \sum_{a} \pi\left(a\mid S_{t+1}\right) Q(S_{t+1}, a) - Q(S_t,A_t)] \\ \end{aligned} \]

注意在实现中,update_q() 不仅更新了Q table,还显示更新了执行policy \(\pi\)

{linenos
1
2
3
4
5
6
7
8
9
class ExpectedSarsaAgent(CartPoleAbstractAgent):

def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
self.q[s][a] = self.q[s][a] + self.lr * (r + self.discount * np.dot(self.pi[s_next], self.q[s_next]) - self.q[s][a])
# update pi[s]
best_a = np.random.choice(np.where(self.q[s] == max(self.q[s]))[0])
n_actions = self.env.action_space.n
self.pi[s][:] = self.epsilon / n_actions
self.pi[s][best_a] = 1 - (n_actions - 1) * (self.epsilon / n_actions)

同样的,Expected SARSA 1000次迭代也能比较好的学到最佳policy。

快速幂运算是一种利用位运算和DP思想求的\(x^n\)的数值算法,它将时间复杂度\(O(n)\)降到\(O(log(n))\)。快速幂运算结合矩阵乘法,可以巧解不少DP问题。本篇会由浅入深,从最基本的快速幂运算算法,到应用矩阵快速幂运算解DP问题,结合三道Leetcode题目来具体讲解。

Leetcode 50. Pow(x, n) (Medium)

Leetcode 50. Pow(x, n) 是实数的快速幂运算问题,题目如下。

Implement pow(x, n), which calculates x raised to the power n (i.e. \(x^n\)).

Example 1:

1
2
Input: x = 2.00000, n = 10
Output: 1024.00000

Example 2:

1
2
Input: x = 2.10000, n = 3
Output: 9.26100

Example 3:

1
2
3
Input: x = 2.00000, n = -2
Output: 0.25000
Explanation: 2-2 = 1/22 = 1/4 = 0.25

快速幂运算解法分析

假设n是32位的int类型,将n写成二进制形式,那么n可以写成最多32个某位为 1(第k位为1则值为\(2^k\))的和。那么\(x^n\)最多可以由32个 \(x^{2^k}\)的乘积组合,例如:

\[ x^{\text{10011101}_{2}} = x^{1} \times x^{\text{100}_{2}} \times x^{\text{1000}_{2}} \times x^{\text{10000}_{2}} \times x^{\text{10000000}_{2}} \]

快速幂运算的特点就是通过32次循环,每次循环根据上轮\(x^{2^k}\)的值进行平方后得出这一轮的值:\(x^{2^k} \times x^{2^k} = x^{2^{k+1}}\),即循环计算出如下数列

\[ x^{1}, x^2=x^{\text{10}_{2}}, x^4=x^{\text{100}_{2}}, x^8=x^{\text{1000}_{2}}, x^{16}=x^{\text{10000}_{2}}, ..., x^{128} = x^{\text{10000000}_{2}} \]

在循环时,如果n的二进制形式在本轮对应的位的值是1,则将这次结果累乘计入最终结果。

下面是python 3 的代码,由于循环为32次,所以容易看出算法复杂度为 \(O(log(n))\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# AC
# Runtime: 32 ms, faster than 54.28% of Python3 online submissions for Pow(x, n).
# Memory Usage: 14.2 MB, less than 5.04% of Python3 online submissions for Pow(x, n).

class Solution:
def myPow(self, x: float, n: int) -> float:
ret = 1.0
i = abs(n)
while i != 0:
if i & 1:
ret *= x
x *= x
i = i >> 1
return 1.0 / ret if n < 0 else ret

对应的 Java 的代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// AC
// Runtime: 1 ms, faster than 42.98% of Java online submissions for Pow(x, n).
// Memory Usage: 38.7 MB, less than 48.31% of Java online submissions for Pow(x, n).

class Solution {
public double myPow(double x, int n) {
double ret = 1.0;
long i = Math.abs((long) n);
while (i != 0) {
if ((i & 1) > 0) {
ret *= x;
}
x *= x;
i = i >> 1;
}

return n < 0 ? 1.0 / ret : ret;
}
}

矩阵快速幂运算

快速幂运算也可以应用到计算矩阵的幂,即上面的x从实数变为方形矩阵。实现上,矩阵的幂需要矩阵乘法:$ A_{r c} B_{c p}$ ,Python中可以用numpy的 np.matmul(A, B)来完成,而Java版本中我们手动实现简单的矩阵相乘算法,从三重循环看出其算法复杂度为\(O(r \times c \times p)\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int[][] matrixProd(int[][] A, int[][] B) {
int R = A.length;
int C = B[0].length;
int P = A[0].length;
int[][] ret = new int[R][C];
for (int r = 0; r < R; r++) {
for (int c = 0; c < C; c++) {
for (int p = 0; p < P; p++) {
ret[r][c] += A[r][p] * B[p][c];
}
}
}
return ret;
}

Leetcode 509. Fibonacci Number (Easy)

有了快速矩阵幂运算,我们来看看如何具体解题。Fibonacci问题作为最基本的DP问题,在上一篇Leetcode 679 24 Game 的 Python 函数式实现中我们用python独有的yield来巧解,这次再拿它来做演示。

The Fibonacci numbers, commonly denoted F(n) form a sequence, called the Fibonacci sequence, such that each number is the sum of the two preceding ones, starting from 0 and 1. That is,

1
2
F(0) = 0,   F(1) = 1
F(N) = F(N - 1) + F(N - 2), for N > 1.

Given N, calculate F(N).

Example 1:

1
2
3
Input: 2
Output: 1
Explanation: F(2) = F(1) + F(0) = 1 + 0 = 1.

Example 2:

1
2
3
Input: 3
Output: 2
Explanation: F(3) = F(2) + F(1) = 1 + 1 = 2.

Example 3:

1
2
3
Input: 4
Output: 3
Explanation: F(4) = F(3) + F(2) = 2 + 1 = 3.

转换为矩阵幂运算

Fibonacci的二阶递推式如下:

\[ \begin{align*} F(n) =& F(n-1) + F(n-2) \\ F(n-1) =& F(n-1) \end{align*} \]

等价的矩阵递推形式为:

\[ \begin{bmatrix}F(n)\\F(n-1)\end{bmatrix} = \begin{bmatrix}1 & 1\\1 & 0\end{bmatrix} \begin{bmatrix}F(n-1)\\F(n-2)\end{bmatrix} \]

也就是每轮左乘一个2维矩阵。其循环形式为,即矩阵幂的形式:

\[ \begin{bmatrix}F(n)\\F(n-1)\end{bmatrix} = \begin{bmatrix}1 & 1\\1 & 0\end{bmatrix}^{n-1} \begin{bmatrix}F(1)\\F(0)\end{bmatrix} \]

AC代码

有了上面的矩阵幂公式,代码稍作改动即可。Java 版本代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* AC
* Runtime: 0 ms, faster than 100.00% of Java online submissions for Fibonacci Number.
* Memory Usage: 37.9 MB, less than 18.62% of Java online submissions for Fibonacci Number.
*
* Method: Matrix Fast Power Exponentiation
* Time Complexity: O(log(N))
**/
class Solution {
public int fib(int N) {
if (N <= 1) {
return N;
}
int[][] M = {{1, 1}, {1, 0}};
// powers = M^(N-1)
N--;
int[][] powerDouble = M;
int[][] powers = {{1, 0}, {0, 1}};
while (N > 0) {
if (N % 2 == 1) {
powers = matrixProd(powers, powerDouble);
}
powerDouble = matrixProd(powerDouble, powerDouble);
N = N / 2;
}

return powers[0][0];
}

public int[][] matrixProd(int[][] A, int[][] B) {
int R = A.length;
int C = B[0].length;
int P = A[0].length;
int[][] ret = new int[R][C];
for (int r = 0; r < R; r++) {
for (int c = 0; c < C; c++) {
for (int p = 0; p < P; p++) {
ret[r][c] += A[r][p] * B[p][c];
}
}
}
return ret;
}

}

Python 3的numpy.matmul() 版本代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# AC
# Runtime: 256 ms, faster than 26.21% of Python3 online submissions for Fibonacci Number.
# Memory Usage: 29.4 MB, less than 5.25% of Python3 online submissions for Fibonacci Number.

class Solution:

def fib(self, N: int) -> int:
if N <= 1:
return N

import numpy as np
F = np.array([[1, 1], [1, 0]])

N -= 1
powerDouble = F
powers = np.array([[1, 0], [0, 1]])
while N > 0:
if N % 2 == 1:
powers = np.matmul(powers, powerDouble)
powerDouble = np.matmul(powerDouble, powerDouble)
N = N // 2

return powers[0][0]

或者也可以直接调用numpy.matrix_power() 代替手动的快速矩阵幂运算。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# AC
# Runtime: 116 ms, faster than 26.25% of Python3 online submissions for Fibonacci Number.
# Memory Usage: 29.2 MB, less than 5.25% of Python3 online submissions for Fibonacci Number.

class Solution:

def fib(self, N: int) -> int:
if N <= 1:
return N

from numpy.linalg import matrix_power
import numpy as np
F = np.array([[1, 1], [1, 0]])
F = matrix_power(F, N - 1)

return F[0][0]

Leetcode 1411. Number of Ways to Paint N × 3 Grid (Hard)

下面来看一道稍难一点的DP问题,1411. Number of Ways to Paint N × 3 Grid

You have a grid of size n x 3 and you want to paint each cell of the grid with exactly one of the three colours: Red, Yellow or Green while making sure that no two adjacent cells have the same colour (i.e no two cells that share vertical or horizontal sides have the same colour).

You are given n the number of rows of the grid.

Return the number of ways you can paint this grid. As the answer may grow large, the answer must be computed modulo 10^9 + 7.

Example 1:

1
2
3
Input: n = 1
Output: 12
Explanation: There are 12 possible way to paint the grid as shown:

Example 2:

1
2
Input: n = 2
Output: 54

Example 3:

1
2
Input: n = 3
Output: 246

Example 4:

1
2
Input: n = 7
Output: 106494

Example 5:

1
2
Input: n = 5000
Output: 30228214

标准DP解法

分析题目容易发现第i行的状态只取决于第i-1行的状态,第i行会有两种不同状态:三种颜色都有或者只有两种颜色。这个问题容易识别出是经典的双状态DP问题,那么我们定义dp2[i]为第i行只有两种颜色的数量,dp3[i]为第i行有三种颜色的数量。

先考虑dp3[i]和i-1行的关系。假设第i行包含3种颜色,即dp3[i],假设具体颜色为红,绿,黄,若i-1行包含两种颜色(即dp2[i-1]),此时dp2[i-1]只有以下2种可能:

dp2[i-1] -> dp3[i]
还是dp3[i] 红,绿,黄情况,若i-1行包含三种颜色(从dp3[i-1]转移过来),此时dp3[i-1]也只有以下2种可能:
dp3[i-1] -> dp3[i]

因此,dp3[i]= dp2[i-1] * 2 + dp3[i-1] * 2。

同理,若第i行包含两种颜色,即dp2[i],假设具体颜色为绿,黄,绿,若i-1行是两种颜色(dp2[i-1]),此时dp2[i-1]有如下3种可能:

dp2[i-1] -> dp2[i]
dp2[i]的另一种情况是由dp3[i-1]转移过来,则dp3[i-1]有2种可能,枚举如下:
dp3[i-1] -> dp2[i]

因此,dp2[i] = dp2[i-1] * 3 + dp3[i-1] * 2。 初始值dp2[1] = 6,dp3[1] = 6,最终答案为dp2[i] + dp3[i]。

很容易写出普通DP版本的Python 3代码,时间复杂度为\(O(n)\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
# AC
# Runtime: 36 ms, faster than 98.88% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.
# Memory Usage: 13.9 MB, less than 58.66% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.

class Solution:
def numOfWays(self, n: int) -> int:
MOD = 10 ** 9 + 7
dp2, dp3 = 6, 6
n -= 1
while n > 0:
dp2, dp3 = (dp2 * 3 + dp3 * 2) % MOD, (dp2 * 2 + dp3 * 2) % MOD
n -= 1
return (dp2 + dp3) % MOD

快速矩阵幂运算解法

和Fibonacci一样,我们将DP状态转移方程转换成矩阵乘法:

\[ \begin{bmatrix}dp2(n)\\dp3(n)\end{bmatrix} = \begin{bmatrix}3 & 2\\2 & 2\end{bmatrix} \begin{bmatrix}dp2(n-1)\\dp3(n-1)\end{bmatrix} \]

代入初始值,转换成矩阵幂形式

\[ \begin{bmatrix}dp2(n)\\dp3(n)\end{bmatrix} = \begin{bmatrix}3 & 2\\2 & 2\end{bmatrix}^{n-1}\begin{bmatrix}6\\6\end{bmatrix} \]

代码几乎和Fibonacci一模一样,仅仅多了mod 计算。下面是Java版本。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

/**
AC
Runtime: 0 ms, faster than 100.00% of Java online submissions for Number of Ways to Paint N × 3 Grid.
Memory Usage: 35.7 MB, less than 97.21% of Java online submissions for Number of Ways to Paint N × 3 Grid.
**/

class Solution {
public int numOfWays(int n) {
long MOD = (long) (1e9 + 7);
long[][] ret = {{6, 6}};
long[][] m = {{3, 2}, {2, 2}};
n -= 1;
while(n > 0) {
if ((n & 1) > 0) {
ret = matrixProd(ret, m, MOD);
}
m = matrixProd(m, m, MOD);
n >>= 1;
}
return (int) ((ret[0][0] + ret[0][1]) % MOD);

}

public long[][] matrixProd(long[][] A, long[][] B, long MOD) {
int R = A.length;
int C = B[0].length;
int P = A[0].length;
long[][] ret = new long[R][C];
for (int r = 0; r < R; r++) {
for (int c = 0; c < C; c++) {
for (int p = 0; p < P; p++) {
ret[r][c] += A[r][p] * B[p][c];
ret[r][c] = ret[r][c] % MOD;
}
}
}
return ret;
}

}

Python 3实现为

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# AC
# Runtime: 88 ms, faster than 39.07% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.
# Memory Usage: 30.2 MB, less than 11.59% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.

class Solution:
def numOfWays(self, n: int) -> int:
import numpy as np

MOD = int(1e9 + 7)
ret = np.array([[6, 6]])
m = np.array([[3, 2], [2, 2]])

n -= 1
while n > 0:
if n % 2 == 1:
ret = np.matmul(ret, m) % MOD
m = np.matmul(m, m) % MOD
n = n // 2
return int((ret[0][0] + ret[0][1]) % MOD)

这期继续Sutton强化学习第二版,第五章蒙特卡洛方法。在上期21点游戏的策略蒙特卡洛值预测学习了如何用Monte Carlo来预估给定策略 \(\pi\)\(V_{\pi}\) 值之后,这一期我们用Monte Carlo方法来解得21点游戏最佳策略 \(\pi_{*}\)

蒙特卡洛策略提升

回顾一下,在Grid World 策略迭代和值迭代中由于存在Policy Improvement Theorem,我们可以利用环境dynamics信息计算出策略v值,再选取最greedy action的方式改进策略,形成策略提示最终能够不断逼近最佳策略。 \[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} v_{*} \] Monte Carlo Control方法搜寻最佳策略 \(\pi{*}\),是否也能沿用同样的思路呢?答案是可行的。不过,不同于第四章中我们已知环境MDP就知道状态的前后依赖关系,进而从v值中能推断出策略 \(\pi\),在Monte Carlo方法中,环境MDP是未知的,因而我们只能从action-value中下手,通过海量Monte Carlo试验来近似 \(q_{\pi}\)。有了策略 Q 值,再和MDP策略迭代方法一样,选取最greedy action的策略,这种策略提示方式理论上被证明了最终能够不断逼近最佳策略。 \[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} q_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} q_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} q_{*} \]

但是此方法有一个前提要满足,由于数据是依据策略 \(\pi_{i}\) 生成的,理论上需要保证在无限次的模拟过程中,每个状态都必须被无限次访问到,才能保证最终每个状态的Q估值收敛到真实的 \(q_{*}\)。满足这个前提的一个简单实现是强制随机环境初始状态,保证每个状态都能有一定概率被生成。这个思路就是 Monte Carlo Control with Exploring Starts算法,伪代码如下:

\[ \begin{align*} &\textbf{Monte Carlo ES (Exploring Starts), for estimating } \pi \approx \pi_{*} \\ & \text{Initialize:} \\ & \quad \pi(s) \in \mathcal A(s) \text{ arbitrarily for all }s \in \mathcal{S} \\ & \quad Q(s, a) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S}, a \in \mathcal A(s) \\ & \quad Returns(s, a) \leftarrow \text{ an empty list, for all }s \in \mathcal{S}, a \in \mathcal A(s)\\ & \\ & \text{Loop forever (for episode):}\\ & \quad \text{Choose } S_0\in \mathcal{S}, A_0 \in \mathcal A(S_0) \text{ randomly such that all pairs have probability > 0} \\ & \quad \text{Generate an episode from } S_0, A_0 \text{, following } \pi : S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless the pair } S_t, A_t \text{ appears in } S_0, A_0, S_1, A_1, ..., S_{t-1}, A_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t, A_t) \\ & \quad \quad \quad \quad Q(S_t, A_t) \leftarrow \operatorname{average}(Returns(S_t, A_t))\\ & \quad \quad \quad \quad \pi(S_t) \leftarrow \operatorname{argmax}_a Q(S_t, a)\\ \end{align*} \]

下面我们实现21点游戏的Monte Carlo ES 算法。21点游戏只有200个有效的状态,可以满足算法要求的生成episode前先随机选择某一状态的前提条件。

相对于上一篇,我们增加 ActionValue和Policy的类型定义,ActionValue表示 \(q(s, a)\) ,是一个State到动作分布的Dict,Policy 类型也一样。Actions为一维ndarray,维数是离散动作数量。

{linenos
1
2
3
4
5
6
State = Tuple[int, int, bool]
Action = bool
Reward = float
Actions = np.ndarray
ActionValue = Dict[State, Actions]
Policy = Dict[State, Actions]

下面代码示例如何给定 Policy后,依据指定状态state的动作分布采样,决定下一动作。

1
2
3
policy: Policy
A: ActionValue = policy[state]
action = np.random.choice([0, 1], p=A/sum(A))

整个算法的 python 代码实现如下:

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def mc_control_exploring_starts(env: BlackjackEnv, num_episodes, discount_factor=1.0) \
-> Tuple[ActionValue, Policy]:
states = list(product(range(10, 22), range(1, 11), (True, False)))
policy = {s: np.ones(env.action_space.n) * 1.0 / env.action_space.n for s in states}
Q = defaultdict(lambda: np.zeros(env.action_space.n))
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
s0 = random.choice(states)
reset_env_with_s0(env, s0)
episode_history = gen_custom_s0_stochastic_episode(policy, env, s0)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s and s_a_r[1] == a for s_a_r in episode_history[0: t]):
returns_sum[s, a] += G
returns_count[s, a] += 1.0
Q[s][a] = returns_sum[s, a] / returns_count[s, a]
best_a = np.argmax(Q[s])
policy[s][best_a] = 1.0
policy[s][1-best_a] = 0.0

return Q, policy

在MC Exploring Starts 算法中,我们需要指定环境初始状态,一种做法是env.reset()时接受初始状态,但是考虑到不去修改第三方实现的 BlackjackEnv类,采用一个取巧的办法,在调用reset()后直接改写env 的私有变量,这个逻辑封装在 reset_env_with_s0 方法中。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def reset_env_with_s0(env: BlackjackEnv, s0: State) -> BlackjackEnv:
env.reset()
player_sum = s0[0]
oppo_sum = s0[1]
has_usable = s0[2]

env.dealer[0] = oppo_sum
if has_usable:
env.player[0] = 1
env.player[1] = player_sum - 11
else:
if player_sum > 11:
env.player[0] = 10
env.player[1] = player_sum - 10
else:
env.player[0] = 2
env.player[1] = player_sum - 2
return env

算法结果的可视化和理论对比

下图是有Usable Ace情况下的理论最优策略。
理论最佳策略(有Ace)
Monte Carlo方法策略提示的收敛是比较慢的,下图是运行10,000,000次episode后有Usable Ace时的策略 \(\pi_{*}^{\prime}\)。对比理论最优策略,MC ES在不少的状态下还未收敛到理论最优解。
MC ES 10M的最佳策略(有Ace)
同样的,下两张图是无Usable Ace情况下的理论最优策略和试验结果的对比。
理论最佳策略(无Ace)
MC ES 10M的最佳策略(无Ace)
下面的两张图画出了运行代码10,000,000次episode后 \(\pi{*}\)的V值图。
MC ES 10M的最佳V值(有Ace)
MC ES 10M的最佳V值(无Ace)

Exploring Starts 蒙特卡洛控制改进

为了避免Monte Carlo ES Control在初始时必须访问到任意状态的限制,教材中介绍了一种改进算法,On-policy first-visit MC control for \(\epsilon \text{-soft policies}\) ,它同样基于Monte Carlo 预估Q值,但用 \(\epsilon \text{-soft}\) 策略来代替最有可能的action策略作为下一次迭代策略,\(\epsilon \text{-soft}\) 本质上来说就是对于任意动作都保留 \(\epsilon\) 小概率的访问可能,权衡了exploration和exploitation,由于每个动作都可能被无限次访问到,Explorting Starts中的强制随机初始状态就可以去除了。Monte Carlo ES Control 和 On-policy first-visit MC control for \(\epsilon \text{-soft policies}\) 都属于on-policy算法,其区别于off-policy的本质在于预估 \(q_{\pi}(s,a)\)时是否从同策略\(\pi\)生成的数据来计算。一个比较subtle的例子是著名的Q-Learning,因为根据这个定义,Q-Learning属于off-policy。

\[ \begin{align*} &\textbf{On-policy first-visit MC control (for }\epsilon \textbf{-soft policies), estimating } \pi \approx \pi_{*} \\ & \text{Algorithm parameter: small } \epsilon > 0 \\ & \text{Initialize:} \\ & \quad \pi \leftarrow \text{ an arbitrary } \epsilon \text{-soft policy} \\ & \quad Q(s, a) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S}, a \in \mathcal A(s) \\ & \quad Returns(s, a) \leftarrow \text{ an empty list, for all }s \in \mathcal{S}, a \in \mathcal A(s)\\ & \\ & \text{Repeat forever (for episode):}\\ & \quad \text{Generate an episode following } \pi : S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless the pair } S_t, A_t \text{ appears in } S_0, A_0, S_1, A_1, ..., S_{t-1}, A_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t, A_t) \\ & \quad \quad \quad \quad Q(S_t, A_t) \leftarrow \operatorname{average}(Returns(S_t, A_t))\\ & \quad \quad \quad \quad A^{*} \leftarrow \operatorname{argmax}_a Q(S_t, a)\\ & \quad \quad \quad \quad \text{For all } a \in \mathcal A(S_t):\\ & \quad \quad \quad \quad \quad \pi(a|S_t) \leftarrow \begin{cases} 1 - \epsilon + \epsilon / |\mathcal A(S_t)| & \text{ if } a = A^{*}\\ \epsilon / |\mathcal A(S_t)| & \text{ if } a \neq A^{*}\\ \end{cases} \\ \end{align*} \]

伪代码对应的 Python 实现如下。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def mc_control_epsilon_greedy(env: BlackjackEnv, num_episodes, discount_factor=1.0, epsilon=0.1) \
-> Tuple[ActionValue, Policy]:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

states = list(product(range(10, 22), range(1, 11), (True, False)))
policy = {s: np.ones(env.action_space.n) * 1.0 / env.action_space.n for s in states}
Q = defaultdict(lambda: np.zeros(env.action_space.n))

def update_epsilon_greedy_policy(policy: Policy, Q: ActionValue, s: State):
policy[s] = np.ones(env.action_space.n, dtype=float) * epsilon / env.action_space.n
best_action = np.argmax(Q[s])
policy[s][best_action] += (1.0 - epsilon)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_stochastic_episode(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s and s_a_r[1] == a for s_a_r in episode_history[0: t]):
returns_sum[s, a] += G
returns_count[s, a] += 1.0
Q[s][a] = returns_sum[s, a] / returns_count[s, a]
update_epsilon_greedy_policy(policy, Q, s)

return Q, policy

从这期开始我们进入Sutton强化学习第二版,第五章蒙特卡洛方法。蒙特卡洛方法是一种在工程各领域都存在的基本方法,在强化领域中,其特点是无需知道环境的dynamics,只需不断模拟记录并分析数据即可逼近理论真实值。蒙特卡洛方法本篇将会用21点游戏作为示例来具体讲解其原理和代码实现。

21点游戏问题

21点游戏是一个经典的赌博游戏。大致规则是玩家和庄家各发两张牌,一张明牌,一张暗牌。玩家和庄家可以决定加牌或停止加牌,新加的牌均为暗牌,最后比较两个玩家的牌面和,更接近21点的获胜。游戏的变化因素是牌Ace,既可以作为11也可以作为1来计算,算作11的时候称作usable。

Sutton教材中的21点游戏规则简化了几个方面用于控制问题状态数:

  • 已发的牌的无状态性:和一副牌的21点游戏不同的是,游戏环境简化为牌是可以无穷尽被补充的,一副牌的某一张被派发后,同样的牌会被补充进来,或者可以认为每次发放的牌都是从一副新牌中抽出的。统计学中的术语称为重复采样(sample with replacement)。这种规则下极端情况下,玩家可以拥有 5个A或者5个2。另外,这会导致玩家无法通过开局看到的3张牌的信息推断后续发牌的概率,如此就大规模减小了游戏状态数。
  • 庄家和玩家独立游戏,无需按轮次要牌。开局给定4张牌后,玩家先行动,加牌直至超21点或者停止要牌,如果超21点,玩家输,否则,等待庄家行动,庄家加牌直至超21点或者停止要牌,如果超21点,庄家输,否则比较两者的总点数。这种方式可以认为当玩家和庄家看到初始的三张牌后独立做一系列决策,最后比较结果,避免了交互模式下因为能观察到每一轮后对方牌数变化产生额外的信息而导致的游戏状态数爆炸。

有以上两个规则的简化,21点游戏问题的总状态数有下面三个维度

  • 自己手中的点数和(12到21)

  • 庄家明牌的点数(A到10)

  • 庄家明牌是否有 A(True, False)。

状态总计总数为三个维度的乘积 10 * 10 * 2 = 200。

关于游戏状态有几个比较subtle的假设或者要素。首先,玩家初始时能看到三张牌,这三张牌确定了状态的三个维度的值,当然也就确定了Agent的初始状态,随后按照独立游戏的规则进行,玩家根据初始状态依照某种策略决策要牌还是结束要牌,新拿的牌更新了游戏状态,玩家转移到新状态下继续做决策。举个例子,假设初始时玩家明牌为8,暗牌为6,庄家明牌为7,则游戏状态为Tuple (14, 7, False)。若玩家的策略为教材中的固定规则策略:没到20或者21继续要牌。下一步玩家拿到牌3,则此时新状态为 (17, 7, False),按照策略继续要牌。

第二个方面是游戏的状态完全等价于玩家观察到的信息。比如尽管初始时有4张牌,真正的状态是这四张牌的值,但是出于简化目的,不考虑partially observable 的情况,即不将暗牌纳入游戏状态中。另外,庄家做决策的时候也无法得知玩家的手中的总牌数。

第三个方面是关于玩家点数。考虑玩家初始时的两张牌为2,3,总点数是5,那么为何不将5加入到游戏状态中呢?原则上是可以将初始总和为2到11都加入到游戏状态,但是意义不大,原因在于我们已经假设了已发牌的无状态性,拿到的这两张牌并不会改变后续补充的牌的出现概率。当玩家初始总和为2到11时一定会追加牌,因为无论第三张牌是什么,都不会超过21点,只会增加获胜概率。若后续第三张牌为8,总和变成13,就进入了有效的游戏状态,因为此时如果继续要牌,获得10,则游戏输掉。因此,我们关心的游戏状态并不完全等价于所有可能的游戏状态。

21点游戏 OpenAI Gym环境

OpenAI Gym 已经实现了Sutton版本的21点游戏环境,并按上述规则来进行。在安装完OpenAI Gym包之后 import BlackjackEnv即可使用。

1
from gym.envs.toy_text import BlackjackEnv

根据这个游戏环境,我们先来定义一些类型,可以令代码更具可读性和抽象化。State 上文说过是由三个分量组成的Tuple。Action 为bool类型 表示是否继续要牌。Reward 为+1或者-1,玩家叫牌过程中为0。StateValue 为书中的 \(V_{\pi}\),实现上是一个Dict。DeterministicPolicy 为一个函数,输入是某一状态,输出是唯一的决策动作。

{linenos
1
2
3
4
5
State = Tuple[int, int, bool]
Action = bool
Reward = float
StateValue = Dict[State, float]
DeterministicPolicy = Callable[[State], Action]

以下代码是 BlackjackEnv 核心代码,step 方法的输入为玩家的决策动作(叫牌还是结束),并输出State, Reward, is_done。简单解释一下代码逻辑,当玩家继续加牌时,需要判断是否超21点,如果没有超过的话,返回下一状态,同时reward 为0,等待下一step方法。若玩家停止叫牌,则按照庄家策略:小于17时叫牌。游戏终局时产生+1表示玩家获胜,-1表示庄家获胜。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class BlackjackEnv(gym.Env):

def step(self, action):
assert self.action_space.contains(action)
if action: # hit: add a card to players hand and return
self.player.append(draw_card(self.np_random))
if is_bust(self.player):
done = True
reward = -1.
else:
done = False
reward = 0.
else: # stick: play out the dealers hand, and score
done = True
while sum_hand(self.dealer) < 17:
self.dealer.append(draw_card(self.np_random))
reward = cmp(score(self.player), score(self.dealer))
if self.natural and is_natural(self.player) and reward == 1.:
reward = 1.5
return self._get_obs(), reward, done, {}

def _get_obs(self):
return (sum_hand(self.player), self.dealer[0], usable_ace(self.player))

下面示例如何调用step方法生成一个episode的数据集。数据集的类型为 List[Tuple[State, Action, Reward]]。

{linenos
1
2
3
4
5
6
7
8
9
10
def gen_episode_data(policy: DeterministicPolicy, env: BlackjackEnv) -> List[Tuple[State, Action, Reward]]:
episode_history = []
state = env.reset()
done = False
while not done:
action = policy(state)
next_state, reward, done, _ = env.step(action)
episode_history.append((state, action, reward))
state = next_state
return episode_history

策略的蒙特卡洛值预测

Monte Carlo Prediction解决如下问题:当给定Agent 策略\(\pi\)时,反复试验来预估策略的 \(V_{\pi}\) 值。具体来说,产生一系列的episode数据之后,对于出现了的所有状态分别计算其Return,再通过 average 某一状态 s 的Return来估计 \(V_{\pi}(s)\),理论上,依据大数定理(Law of large numbers),在可以无限模拟的情况下,Monte Carlo prediction 一定会收敛到真实的 \(V_{\pi}\)。算法实现上有两个略微不同的版本,一个版本称为 First-visit,另一个版本称为 Every-visit,区别在于如何计算出现的状态 s 的 Return值。

对于 First-visit 来说,当状态 s 第一次出现时计算一次 Returns,若继续出现状态 s 不再重复计算。对于Every-visit来说,每次出现 s 计算一次 Returns(s)。举个例子,某episode 数据如下: \[ S_1, R_1, S_2, R_2, S_1, R_3, S_3, R_4 \] First-visit 对于状态S1的Returns计算为

\[ Returns(S_1) = R_1 + R_2 + R_3 + R_4 \]

Every-visit 对于状态S1的Returns计算了两次,因为S1出现了两次。 \[ \begin{align*} Returns(S_1) = \frac{Return_1(S_1) + Return_2(S_1)}2 \\ = \frac{(R_1 + R_2 + R_3 + R_4) + (R_3 + R_4)} 2 \end{align*} \]

下面用Monte Carlo来模拟解得书中示例玩家固定策略的V值,策略具体为:加牌直到手中点数>=20,代码为

{linenos
1
2
3
4
5
6
def fixed_policy(observation):
"""
sticks if the player score is >= 20 and hits otherwise.
"""
score, dealer_score, usable_ace = observation
return 0 if score >= 20 else 1

First-visit MC Predicition

伪代码如下,注意考虑到实现上的高效性,在遍历episode序列数据时是从后向前扫的,这样可以边扫边更新G。

\[ \begin{align*} &\textbf{First-visit MC prediction, for estimating } V \approx v_{\pi} \\ & \text{Input: a policy } \pi \text{ to be evaluated} \\ & \text{Initialize} \\ & \quad V(s) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S} \\ & \quad Returns(s) \leftarrow \text{ an empty list, arbitrarily, for all }s \in \mathcal{S} \\ & \\ & \text{Loop forever (for episode):}\\ & \quad \text{Generate an episode following } \pi: S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless } S_t \text{ appears in } S_0, S_1, ..., S_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t) \\ & \quad \quad \quad \quad V(S_t) \leftarrow \operatorname{average}(Returns(S_t))\\ \end{align*} \]

对应的 python 实现

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def mc_prediction_first_visit(policy: DeterministicPolicy, env: BlackjackEnv,
num_episodes, discount_factor=1.0) -> StateValue:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_episode_data(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s for s_a_r in episode_history[0: t]):
returns_sum[s] += G
returns_count[s] += 1.0

V = defaultdict(float)
V.update({s: returns_sum[s] / returns_count[s] for s in returns_sum.keys()})
return V

Every-visit MC Prediciton

Every-visit 代码实现相对更简单一些,t 从后往前遍历时更新对应s的状态变量。如下所示

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def mc_prediction_every_visit(policy: DeterministicPolicy, env: BlackjackEnv,
num_episodes, discount_factor=1.0) -> StateValue:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_episode_data(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
returns_sum[s] += G
returns_count[s] += 1.0

V = defaultdict(float)
V.update({s: returns_sum[s] / returns_count[s] for s in returns_sum.keys()})
return V

策略 V值 3D 可视化

运行first-visit 算法,模拟10000次episode,fixed_policy的V值的3D图为下面两张图,分别是不含usable Ace和包含usable Ace。总的说来,一旦玩家能到达20点或21点获胜概率极大,到达13-17获胜概率较小,在11-13时有一定获胜概率,比较符合经验直觉。

first-visit MC 10000次没有usable A的V值
first-visit MC 10000次含有usable A的V值

同样运行every-visit 算法,模拟10000次的V值图。对比两种方法结果比较接近。

every-visit MC 10000次没有usable A的V值
every-visit MC 10000次含有usable A的V值

本篇是TSP问题从DP算法到深度学习系列第二篇。

AIZU TSP 自底向上迭代DP解

上一篇中,我们用Python 3和Java 8完成了自顶向下递归版本的DP解。我们继续改进代码,将它转换成标准DP方式:自底向上的迭代DP版本。下图是3个点TSP问题的递归调用图。

将这个图反过来检查状态的依赖关系,可以很容易发现规律:首先计算状态位含有一个1的点,接着是两个1的节点,最后是状态位三个1的点。简而言之,在计算状态位为n+1个1的节点时需要用到n个1的节点的计算结果,如果能依照这样的 topological 顺序来的话,就可以去除递归,写成迭代(循环)版本的DP。

迭代算法的Java 伪代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (int bitset_num = N; bitset_num >=0; bitset_num++) {
while(hasNextCombination(bitset_num)) {
int state = nextCombination(bitset_num);
// compute dp[state][v], v-th bit is set in state
for (int v = 0; v < n; v++) {
for (int u = 0; u < n; u++) {
// for each u not reached by this state
if (!include(state, u)) {
dp[state][v] = min(dp[state][v],
dp[new_state_include_u][u] + dist[v][u]);
}
}
}
}
}

举例来说,dp[00010][1] 是从顶点0出发,刚经过顶点1的最小距离 \(0 \rightarrow 1 \rightarrow ? \rightarrow ? \rightarrow ? \rightarrow 0\)

为了找到最小距离值,就必须遍历所有可能的下一个可能的顶点u (第一个问号位置)。 \[ (0 \rightarrow 1) + \begin{align*} \min \left\lbrace \begin{array}{r@{}l} 2 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,2) \qquad\text{ new_state=[00110][2] } \qquad\\\\ 3 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,3) \qquad\text{ new_state=[01010][3] } \qquad\\\\ 4 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,4) \qquad\text{ new_state=[10010][4] } \qquad \end{array} \right. \end{align*} \]

迭代DP AC代码

以下是AC 的Java 算法核心代码。完整代码在 github/MyEncyclopedia 的tsp/alg_aizu/Main_loop.java

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public long solve() {
int N = g.V_NUM;
long[][] dp = new long[1 << N][N];
// init dp[][] with MAX
for (int i = 0; i < dp.length; i++) {
Arrays.fill(dp[i], Integer.MAX_VALUE);
}
dp[(1 << N) - 1][0] = 0;

for (int state = (1 << N) - 2; state >= 0; state--) {
for (int v = 0; v < N; v++) {
for (int u = 0; u < N; u++) {
if (((state >> u) & 1) == 0) {
dp[state][v] = Math.min(dp[state][v], dp[state | 1 << u][u] + g.edges[v][u]);
}
}
}
}
return dp[0][0] == Integer.MAX_VALUE ? -1 : dp[0][0];
}

很显然,时间算法复杂度对应了三重 for 循环,为 O(\(2^n * n * n\)) = O(\(2^n*n^2\) )。

类似的,Python 3 AC 代码如下。完整代码在 github/MyEncyclopedia 的tsp/alg_aizu/TSP_loop.py

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TSPSolver:
g: Graph

def __init__(self, g: Graph):
self.g = g

def solve(self) -> int:
"""
:param v:
:param state:
:return: -1 means INF
"""
N = self.g.v_num
dp = [[INT_INF for c in range(N)] for r in range(1 << N)]

dp[(1 << N) - 1][0] = 0

for state in range((1 << N) - 2, -1, -1):
for v in range(N):
for u in range(N):
if ((state >> u) & 1) == 0:
if dp[state | 1 << u][u] != INT_INF and self.g.edges[v][u] != INT_INF:
if dp[state][v] == INT_INF:
dp[state][v] = dp[state | 1 << u][u] + self.g.edges[v][u]
else:
dp[state][v] = min(dp[state][v], dp[state | 1 << u][u] + self.g.edges[v][u])
return dp[0][0]

一个欧式空间TSP数据集

至此,TSP的DP解法全部讲解完毕。接下去,我们引入一个二维欧式空间的TSP数据集 PTR_NET on Google Drive ,这个数据集是 Pointer Networks 的作者 Oriol Vinyals 用于模型的训练测试而引入的。

数据集的每一行格式如下:

1
x1, y1, x2, y2, ... output 1 v1 v2 v3 ... 1

一行开始为n个点的x, y坐标,接着是 output,再接着是1,表示从顶点1出发,经v1,v2,...,返回1,注意顶点编号从1开始。

十个顶点数据集的一些数据示例如下:

1
2
3
4
0.607122 0.664447 0.953593 0.021519 0.757626 0.921024 0.586376 0.433565 0.786837 0.052959 0.016088 0.581436 0.496714 0.633571 0.227777 0.971433 0.665490 0.074331 0.383556 0.104392 output 1 3 8 6 10 9 5 2 4 7 1 
0.930534 0.747036 0.277412 0.938252 0.794592 0.794285 0.961946 0.261223 0.070796 0.384302 0.097035 0.796306 0.452332 0.412415 0.341413 0.566108 0.247172 0.890329 0.429978 0.232970 output 1 3 2 9 6 5 8 7 10 4 1
0.686712 0.087942 0.443054 0.277818 0.494769 0.985289 0.559706 0.861138 0.532884 0.351913 0.712561 0.199273 0.554681 0.657214 0.909986 0.277141 0.931064 0.639287 0.398927 0.406909 output 1 5 2 10 7 4 3 9 8 6 1

画出第一个例子的全部顶点和边。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import matplotlib.pyplot as plt
points='0.607122 0.664447 0.953593 0.021519 0.757626 0.921024 0.586376 0.433565 0.786837 0.052959 0.016088 0.581436 0.496714 0.633571 0.227777 0.971433 0.665490 0.074331 0.383556 0.104392'
float_list = list(map(lambda x: float(x), points.split(' ')))

x,y = [],[]
for idx, p in enumerate(float_list):
if idx % 2 == 0:
x.append(p)
else:
y.append(p)

for i in range(0, len(x)):
for j in range(0, len(x)):
if i == j:
continue
plt.plot((x[i],x[j]),(y[i],y[j]))

plt.show()
全连接的图

这个例子的最短TSP旅程为 \[ 1 \rightarrow 3 \rightarrow 8 \rightarrow 6 \rightarrow 10 \rightarrow 9 \rightarrow 5 \rightarrow 2 \rightarrow 4 \rightarrow 7 \rightarrow 1 \]

{linenos
1
2
3
4
5
6
7
8
tour_str = '1 3 8 6 10 9 5 2 4 7 1'
tour = list(map(lambda x: int(x), tour_str.split(' ')))

for i in range(0, len(tour)-1):
p1 = tour[i] - 1
p2 = tour[i + 1] - 1
plt.plot((x[p1],x[p2]),(y[p1],y[p2]))
plt.show()
最短路径

PTR_NET TSP 的Python代码

初始化Init Graph Edges

在之前的自顶向下的递归版本中,需要做一些改动。首先,是图的初始化,我们依然延续之前的邻接矩阵来表示,由于这次的图是无向图,对于任意两个顶点,需要初始化双向的边。

{linenos
1
2
3
4
5
6
7
8
g: Graph = Graph(N)
for v in range(N):
for u in range(N):
diff_x = coordinates[v][0] - coordinates[u][0]
diff_y = coordinates[v][1] - coordinates[u][1]
dist: float = math.sqrt(diff_x * diff_x + diff_y * diff_y)
g.setDist(u, v, dist)
g.setDist(v, u, dist)

辅助变量记录父节点

另一大改动是需要在遍历过程中保存的顶点关联信息,以便在最终找到最短路径值时可以回溯对应的完整路径。在下面代码中,使用parent[bitstate][v] 来保存此状态下最小路径对应的顶点u。

{linenos
1
2
3
4
5
6
7
8
9
10
11
ret: float = FLOAT_INF
u_min: int = -1
for u in range(self.g.v_num):
if (state & (1 << u)) == 0:
s: float = self._recurse(u, state | 1 << u)
if s + edges[v][u] < ret:
ret = s + edges[v][u]
u_min = u
dp[state][v] = ret
self.parent[state][v] = u_min

当最终最短行程确定后,根据parent的信息可以按图索骥找到完整的行程顶点信息。

{linenos
1
2
3
4
5
6
7
8
9
def _form_tour(self):
self.tour = [0]
bit = 0
v = 0
for _ in range(self.g.v_num - 1):
v = self.parent[bit][v]
self.tour.append(v)
bit = bit | (1 << v)
self.tour.append(0)

需要注意的是,有可能存在多个最短行程,它们的距离值是一致的。这种情况下,代码输出的最短路径可能和数据集output后行程路径不一致,但是的两者的总距离是一致的。下面的代码验证了这一点。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tsp: TSPSolver = TSPSolver(g)
tsp.solve()

output_dist: float = 0.0
output_tour = list(map(lambda x: int(x) - 1, output.split(' ')))
for v in range(1, len(output_tour)):
pre_v = output_tour[v-1]
curr_v = output_tour[v]
diff_x = coordinates[pre_v][0] - coordinates[curr_v][0]
diff_y = coordinates[pre_v][1] - coordinates[curr_v][1]
dist: float = math.sqrt(diff_x * diff_x + diff_y * diff_y)
output_dist += dist

passed = abs(tsp.dist - output_dist) < 10e-5
if passed:
print(f'passed dist={tsp.tour}')
else:
print(f'Min Tour Distance = {output_dist}, Computed Tour Distance = {tsp.dist}, Expected Tour = {output_tour}, Result = {tsp.tour}')

本文所有代码在 github/MyEncyclopedia tsp/alg_plane 中。

上一期 通过代码学Sutton强化学习1:Grid World OpenAI环境和策略评价算法,我们引入了 Grid World 问题,实现了对应的OpenAI Gym 环境,也分析了其最佳策略和对应的V值。这一期中,继续通过这个例子详细讲解策略提升(Policy Improvment)、策略迭代(Policy Iteration)、值迭代(Value Iteration)和异步迭代方法。

回顾 Grid World 问题

Grid World 问题
在Grid World 中,Agent初始可以出现在编号1-14的网格中,Agent 每往四周走一步得到 -1 reward,因此需要尽快走到两个出口。当然最佳策略是以最小步数往出口逃离,如下所示。
Grid World 最佳策略

最佳策略对应的状态V值和3D heatmap如下

1
2
3
4
[[ 0. -1. -2. -3.]
[-1. -2. -3. -2.]
[-2. -3. -2. -1.]
[-3. -2. -1. 0.]]

Grid World V值 3D heatmap

策略迭代

上一篇中,我们知道如何evaluate 给定policy \(\pi\)\(v_{\pi}\)值,那么是否可能在此基础上改进生成更好的策略 \(\pi^{\prime}\)。如果可以,能否最终找到最佳策略\({\pi}_{*}\)?答案是肯定的,因为存在策略提升定理(Policy Improvement Theorem)。

策略提升定理

在 4.2 节 Policy Improvement Theorem 可以证明,利用 \(v_{\pi}\) 信息对于每个状态采取最 greedy 的 action (又称exploitation)能够保证生成的新 \({\pi}^{\prime}\) 是不差于旧的policy \({\pi}\),即

\[ q_{\pi}(s, {\pi}^{\prime}(s)) \gt v_{\pi}(s) \]

\[ v_{\pi^{\prime}}(s) \gt v_{\pi}(s) \]

因此,可以通过在当前policy求得v值,再选取最greedy action的方式形成如下迭代,就能够不断逼近最佳策略。

\[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} v_{*} \]

策略迭代算法

以下为书中4.3的policy iteration伪代码。其中policy evaluation的算法在上一篇中已经实现。Policy improvement 的精髓在于一次遍历所有状态后,通过policy 的最大Q值找到该状态的最佳action,并更新成最新policy,循环直至没有 action 变更。

\[ \begin{align*} &\textbf{Policy Iteration (using iterative policy evaluation) for estimating } \pi\approx {\pi}_{*} \\ &1. \quad \text{Initialization} \\ & \quad \quad V(s) \in \mathbb R\text{ and } \pi(s) \in \mathcal A(s) \text{ arbitrarily for all }s \in \mathcal{S} \\ & \\ &2. \quad \text{Policy Evaluation} \\ & \quad \quad \text{Loop:}\\ & \quad \quad \Delta \leftarrow 0\\ & \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ & \quad \quad \quad \quad v \leftarrow V(s) \\ & \quad \quad \quad \quad V(s) \leftarrow \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ & \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ & \quad \quad \text{until } \Delta < \theta \text{ (a small positive number determining the accuracy of estimation)}\\ & \\ &3. \quad \text{Policy Improvement} \\ & \quad \quad policy\text{-}stable\leftarrow true \\ & \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ & \quad \quad \quad \quad old\text{-}action\leftarrow \pi(s) \\ & \quad \quad \quad \quad \pi(s) \leftarrow \operatorname{argmax}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ & \quad \quad \quad \quad \text{If } old\text{-}action \neq \pi\text{,then }policy\text{-}stable\leftarrow false \\ & \quad \quad \text{If } policy\text{-}stable \text{, then stop and return }V \approx v_{*} \text{ and } \pi\approx {\pi}_{*}\text{; else go to 2} \end{align*} \]

注意到状态Q值 \(q_{\pi}(s, a)\) 会被多处调用,将其封装为单独的函数。

\[ \begin{aligned} q_{\pi}(s, a) &=\sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right] \end{aligned} \]

Q值函数实现如下:

{linenos
1
2
3
4
5
6
def action_value(env: GridWorldEnv, state: State, V: StateValue, gamma=1.0) -> ActionValue:
q = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
q[a] += prob * (reward + gamma * V[next_state])
return q

有了 action_value 和上期的 policy_evaluate,policy iteration 实现完全对应上面的伪代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def policy_improvement(env: GridWorldEnv, policy: Policy, V: StateValue, gamma=1.0) -> bool:
policy_stable = True

for s in range(env.nS):
old_action = np.argmax(policy[s])
Q_s = action_value(env, s, V)
best_action = np.argmax(Q_s)
policy[s] = np.eye(env.nA)[best_action]

if old_action != best_action:
policy_stable = False
return policy_stable


def policy_iteration(env: GridWorldEnv, policy: Policy, gamma=1.0) -> Tuple[Policy, StateValue]:
iter = 0
while True:
V = policy_evaluate(policy, env, gamma)
policy_stable = policy_improvement(env, policy, V)
iter += 1

if policy_stable:
return policy, V

Grid World 例子通过两轮迭代就可以收敛,以下是初始时随机策略的V值和第一次迭代后的V值。
初始随机策略 V 值
第一次迭代后的 V 值

值迭代

值迭代( Value Iteration)的本质是,将policy iteration中的policy evaluation过程从不断循环到收敛直至小于theta,改成只执行一遍,并直接用最佳Q值更新到状态V值,如此可以不用显示地算出\({\pi}\) 而直接在V值上迭代。具体迭代公式如下:

\[ \begin{aligned} v_{k+1}(s) & \doteq \max _{a} \mathbb{E}\left[R_{t+1}+\gamma v_{k}\left(S_{t+1}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\max_{a} q_{\pi_k}(s, a) \\ &=\max _{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{k}\left(s^{\prime}\right)\right] \end{aligned} \]

完整的伪代码为:

\[ \begin{align*} &\textbf{Value Iteration, for estimating } \pi\approx \pi_{*} \\ & \text{Algorithm parameter: a small threshold } \theta > 0 \text{ determining accuracy of estimation} \\ & \text{Initialize } V(s), \text{for all } s \in \mathcal{S}^{+} \text{, arbitrarily except that } V (terminal) = 0\\ & \\ &1: \text{Loop:}\\ &2: \quad \quad \Delta \leftarrow 0\\ &3: \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ &4: \quad \quad \quad \quad v \leftarrow V(s) \\ &5: \quad \quad \quad \quad V(s) \leftarrow \operatorname{max}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ &6: \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ &7: \text{until } \Delta < \theta \\ & \\ & \text{Output a deterministic policy, }\pi\approx \pi_{*} \text{, such that} \\ & \quad \quad \pi(s) \leftarrow \operatorname{argmax}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \end{align*} \]

代码实现也比较直接,可以复用上面已经实现的 action_value 函数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def value_iteration(env:GridWorldEnv, gamma=1.0, theta=0.0001) -> Tuple[Policy, StateValue]:
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action_value = np.max(action_values)
delta = max(delta, np.abs(best_action_value - V[s]))
V[s] = best_action_value
if delta < theta:
break

policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action = np.argmax(action_values)
policy[s, best_action] = 1.0

return policy, V

异步迭代

在第4.5节中提到了DP迭代方式的改进版:异步方式迭代(Asychronous Iteration)。这里的异步是指每一轮无需全部扫一遍所有状态,而是根据上一轮变化的状态决定下一轮需要最多计算的状态数,类似于Dijkstra最短路径算法中用 heap 来维护更新节点集合,减少运算量。下面我们通过异步值迭代来演示异步迭代的工作方式。

下图表示状态的变化方向,若上一轮 \(V(s)\) 发生更新,那么下一轮就要考虑状态 s 可能会影响到上游状态的集合( p1,p2),避免下一轮必须遍历所有状态的V值计算。

Async 反向传播

要做到部分更新就必须知道每个状态可能影响到的上游状态集合,上图对应的映射关系可以表示为

\[ \begin{align*} s'_1 &\rightarrow \{s\} \\ s'_2 &\rightarrow \{s\} \\ s &\rightarrow \{p_1, p_2\} \end{align*} \]

建立映射关系的代码如下,build_reverse_mapping 返回类型为 Dict[State, Set[State]]。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
def build_reverse_mapping(env:GridWorldEnv) -> Dict[State, Set[State]]:
MAX_R, MAX_C = env.shape[0], env.shape[1]
mapping = {s: set() for s in range(0, MAX_R * MAX_C)}
action_delta = {Action.UP: (-1, 0), Action.DOWN: (1, 0), Action.LEFT: (0, -1), Action.RIGHT: (0, 1)}
for s in range(0, MAX_R * MAX_C):
r = s // MAX_R
c = s % MAX_R
for a in list(Action):
neighbor_r = min(MAX_R - 1, max(0, r + action_delta[a][0]))
neighbor_c = min(MAX_C - 1, max(0, c + action_delta[a][1]))
s_ = neighbor_r * MAX_R + neighbor_c
mapping[s_].add(s)
return mapping

有了描述状态依赖的映射 dict 后,代码也比较简洁,changed_state_set 变量保存了这轮必须计算的状态集合。新的一轮迭代时,将下一轮需要计算的状态保存到 changed_state_set_ 中,本轮结束后,changed_state_set 更新成changed_state_set_,开始下一轮循环直至没有状态需要更新。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def value_iteration_async(env:GridWorldEnv, gamma=1.0, theta=0.0001) -> Tuple[Policy, StateValue]:
mapping = build_reverse_mapping(env)

V = np.zeros(env.nS)
changed_state_set = set(s for s in range(env.nS))

iter = 0
while len(changed_state_set) > 0:
changed_state_set_ = set()
for s in changed_state_set:
action_values = action_value(env, s, V, gamma=gamma)
best_action_value = np.max(action_values)
v_diff = np.abs(best_action_value - V[s])
if v_diff > theta:
changed_state_set_.update(mapping[s])
V[s] = best_action_value
changed_state_set = changed_state_set_
iter += 1

policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action = np.argmax(action_values)
policy[s, best_action] = 1.0

return policy, V
比较值迭代和异步值迭代方法后发现,值迭代用了4次循环,每次涉及所有状态,总计算状态数为 4 x 16 = 64。异步值迭代也用了4次循环,但是总计更新了54个状态。由于Grid World 的状态数很少,异步值迭代优势并不明显,但是对于状态数众多并且迭代最终集中在少部分状态的环境下,节省的计算量还是很可观的。

Leetcode 679 24 Game (Hard)

先来介绍一下24点游戏题目,大家一定都玩过,就是给定4个牌面数字,用加减乘除计算24点。

本篇会用两种偏函数式的 Python 3解法来AC 24 Game。

Leetcode 679 24 Game (Hard) > You have 4 cards each containing a number from 1 to 9. You need to judge whether they could operated through *, /, +, -, (, ) to get the value of 24.

Example 1:

Input: [4, 1, 8, 7]

Output: True

Explanation: (8-4) * (7-1) = 24

Example 2:

Input: [1, 2, 1, 2]

Output: False

itertools.permutations

先来介绍一下Python itertools.permutations 的用法,正好用Leetcode 中的Permutation问题来示例。Permutations 的输入可以是List,返回是 generator 实例,用于生成所有排列。简而言之,python 的 generator 可以和List一样,用 for 语句来全部遍历产生的值。和List不同的是,generator 的所有值并不必须全部初始化,一般按需产生从而大量减少内存占用。下面在介绍 yield 时我们会看到如何合理构造 generator。

Leetcode 46 Permutations (Medium) > Given a collection of distinct integers, return all possible permutations.

Example:

Input: [1,2,3]

Output: [ [1,2,3], [1,3,2], [2,1,3], [2,3,1], [3,1,2], [3,2,1]]

用 permutations 很直白,代码只有一行。

{linenos
1
2
3
4
5
6
7
8
9
10
11
# AC
# Runtime: 36 ms, faster than 91.78% of Python3 online submissions for Permutations.
# Memory Usage: 13.9 MB, less than 66.52% of Python3 online submissions for Permutations.

from itertools import permutations
from typing import List


class Solution:
def permute(self, nums: List[int]) -> List[List[int]]:
return [p for p in permutations(nums)]

itertools.combinations

有了排列就少不了组合,itertools.combinations 可以产生给定List的k个元素组合 \(\binom{n}{k}\),用一道算法题来举例,同样也是一句语句就可以AC。

Leetcode 77 Combinations (Medium)

Given two integers n and k, return all possible combinations of k numbers out of 1 ... n. You may return the answer in any order.

Example 1:

Input: n = 4, k = 2

Output: [ [2,4], [3,4], [2,3], [1,2], [1,3], [1,4],]

Example 2:

Input: n = 1, k = 1

Output: [[1]]

{linenos
1
2
3
4
5
6
7
8
9
# AC
# Runtime: 84 ms, faster than 95.43% of Python3 online submissions for Combinations.
# Memory Usage: 15.2 MB, less than 68.98% of Python3 online submissions for Combinations.
from itertools import combinations
from typing import List

class Solution:
def combine(self, n: int, k: int) -> List[List[int]]:
return [c for c in combinations(list(range(1, n + 1)), k)]

itertools.product

当有多维度的对象需要迭代笛卡尔积时,可以用 product(iter1, iter2, ...)来生成generator,等价于多重 for 循环。

1
2
[lst for lst in product([1, 2, 3], ['a', 'b'])]
[(i, s) for i in [1, 2, 3] for s in ['a', 'b']]

这两种方式都生成了如下结果

1
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]

再举一个Leetcode的例子来实战product generator。

Leetcode 17. Letter Combinations of a Phone Number (Medium)

Given a string containing digits from 2-9 inclusive, return all possible letter combinations that the number could represent. A mapping of digit to letters (just like on the telephone buttons) is given below. Note that 1 does not map to any letters.

Example:

Input: "23"

Output: ["ad", "ae", "af", "bd", "be", "bf", "cd", "ce", "cf"].

举例来说,下面的代码当输入 digits 是 '352' 时,iter_dims 的值是 ['def', 'jkl', 'abc'],再输入给 product 后会产生 'dja', 'djb', 'djc', 'eja', 共 3 x 3 x 3 = 27个组合的值。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# AC
# Runtime: 24 ms, faster than 94.50% of Python3 online submissions for Letter Combinations of a Phone Number.
# Memory Usage: 13.7 MB, less than 83.64% of Python3 online submissions for Letter Combinations of a Phone Number.

from itertools import product
from typing import List


class Solution:
def letterCombinations(self, digits: str) -> List[str]:
if digits == "":
return []
mapping = {'2':'abc', '3':'def', '4':'ghi', '5':'jkl', '6':'mno', '7':'pqrs', '8':'tuv', '9':'wxyz'}
iter_dims = [mapping[i] for i in digits]

result = []
for lst in product(*iter_dims):
result.append(''.join(lst))

return result

yield 示例

Python具有独特的itertools generator,可以花式AC代码,接下来讲解如何进一步构造 generator。Python 定义只要函数中使用了yield关键字,这个函数就是 generator。Generator 在计算机领域的标准名称是 coroutine,即协程,是一种特殊的函数:当返回上层调用时自身能够保存调用栈状态,并在上层函数处理完逻辑后跳入到这个 generator,恢复之前的状态再继续运行下去。Yield语句也举一道经典的Fibonacci 问题。

Leetcode 509. Fibonacci Number (Easy)

The Fibonacci numbers, commonly denoted F(n) form a sequence, called the Fibonacci sequence, such that each number is the sum of the two preceding ones, starting from 0 and 1. That is, F(0) = 0, F(1) = 1 F(N) = F(N - 1) + F(N - 2), for N > 1. Given N, calculate F(N).

Example 1:

Input: 2

Output: 1

Explanation: F(2) = F(1) + F(0) = 1 + 0 = 1.

Example 2:

Input: 3

Output: 2

Explanation: F(3) = F(2) + F(1) = 1 + 1 = 2.

Example 3:

Input: 4

Output: 3

Explanation: F(4) = F(3) + F(2) = 2 + 1 = 3.

Fibonacci 的一般标准解法是循环迭代方式,可以以O(n)时间复杂度和O(1) 空间复杂度来AC。下面的 yield 版本中,我们构造了fib_next generator,它保存了最后两个值作为内部迭代状态,外部每调用一次可以得到下一个fib(n),如此外部只需不断调用直到满足题目给定次数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# AC
# Runtime: 28 ms, faster than 85.56% of Python3 online submissions for Fibonacci Number.
# Memory Usage: 13.8 MB, less than 58.41% of Python3 online submissions for Fibonacci Number.

class Solution:
def fib(self, N: int) -> int:
if N <= 1:
return N
i = 2
for fib in self.fib_next():
if i == N:
return fib
i += 1

def fib_next(self):
f_last2, f_last = 0, 1
while True:
f = f_last2 + f_last
f_last2, f_last = f_last, f
yield f

yield from 示例

上述yield用法之后,再来演示 yield from 的用法。Yield from 始于Python 3.3,用于嵌套generator时的控制转移,一种典型的用法是有多个generator嵌套时,外层的outer_generator 用 yield from 这种方式等价代替如下代码。

1
2
3
def outer_generator():
for i in inner_generator():
yield i

用一道算法题目来具体示例。

Leetcode 230. Kth Smallest Element in a BST (Medium)

Given a binary search tree, write a function kthSmallest to find the kth smallest element in it.

Example 1: Input: root = [3,1,4,null,2], k = 1

1
2
3
4
5
  3
/ \
1 4
\
2
Output: 1

Example 2:

Input: root = [5,3,6,2,4,null,null,1], k = 3

1
2
3
4
5
6
7
         5
/ \
3 6
/ \
2 4
/
1
Output: 3

直觉思路上,我们只要从小到大有序遍历每个节点直至第k个。因为给定的树是Binary Search Tree,有序遍历意味着以左子树、节点本身和右子树的访问顺序递归下去就行。由于ordered_iter是generator,递归调用自己的过程就是嵌套使用generator的过程。下面是yield版本。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# AC
# Runtime: 48 ms, faster than 90.31% of Python3 online submissions for Kth Smallest Element in a BST.
# Memory Usage: 17.9 MB, less than 14.91% of Python3 online submissions for Kth Smallest Element in a BST.

class Solution:
def kthSmallest(self, root: TreeNode, k: int) -> int:
def ordered_iter(node):
if node:
for sub_node in ordered_iter(node.left):
yield sub_node
yield node
for sub_node in ordered_iter(node.right):
yield sub_node

for node in ordered_iter(root):
k -= 1
if k == 0:
return node.val

等价于如下 yield from 版本:

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# AC
# Runtime: 56 ms, faster than 63.74% of Python3 online submissions for Kth Smallest Element in a BST.
# Memory Usage: 17.7 MB, less than 73.33% of Python3 online submissions for Kth Smallest Element in a BST.

class Solution:
def kthSmallest(self, root: TreeNode, k: int) -> int:
def ordered_iter(node):
if node:
yield from ordered_iter(node.left)
yield node
yield from ordered_iter(node.right)

for node in ordered_iter(root):
k -= 1
if k == 0:
return node.val

24 点问题之函数式枚举解法

看明白了itertools.permuations,combinations,product,yield以及yield from,我们回到本篇最初的24点游戏问题。

24点游戏的本质是枚举出所有可能运算,如果有一种方式得到24返回True,否则返回Flase。进一步思考所有可能的运算,包括下面三个维度:

  1. 4个数字的所有排列,比如给定 [1, 2, 3, 4],可以用permutations([1, 2, 3, 4]) 生成这个维度的所有可能

  2. 三个位置的操作符号的全部可能,可以用 product([+, -, *, /], repeat=3) 生成,具体迭代结果为:[+, +, +],[+, +, -],...

  3. 给定了前面两个维度后,还有一个比较不容易察觉但必要的维度:运算优先级。比如在给定数字顺序 [1, 2, 3, 4]和符号顺序 [+, *, -]之后可能的四种操作树

四种运算优先级

能否算得24点只需要枚举这三个维度笛卡尔积的运算结果

(维度1:数字组合) x (维度2:符号组合) x (维度3:优先级组合)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# AC
# Runtime: 112 ms, faster than 57.59% of Python3 online submissions for 24 Game.
# Memory Usage: 13.7 MB, less than 85.60% of Python3 online submissions for 24 Game.

import math
from itertools import permutations, product
from typing import List

class Solution:

def iter_trees(self, op1, op2, op3, a, b, c, d):
yield op1(op2(a, b), op3(c, d))
yield op1(a, op2(op3(b, c), d))
yield op1(a, op2(b, op3(c, d)))
yield op1(op2(a, op3(b, c)), d)

def judgePoint24(self, nums: List[int]) -> bool:
mul = lambda x, y: x * y
plus = lambda x, y: x + y
div = lambda x, y: x / y if y != 0 else math.inf
minus = lambda x, y: x - y

op_lst = [plus, minus, mul, div]

for ops in product(op_lst, repeat=3):
for val in permutations(nums):
for v in self.iter_trees(ops[0], ops[1], ops[2], val[0], val[1], val[2], val[3]):
if abs(v - 24) < 0.0001:
return True
return False

24 点问题之 DFS yield from 解法

一种常规的思路是,在四个数组成的集合中先选出任意两个数,枚举所有可能的计算,再将剩余的三个数组成的集合递归调用下去,直到叶子节点只剩一个数,如下图所示。

DFS 调用示例

下面的代码是这种思路的 itertools + yield from 解法,recurse方法是generator,会自我递归调用。当只剩下两个数时,用 yield 返回两个数的所有可能运算得出的值,其他非叶子情况下则自我调用使用yield from,例如4个数任选2个先计算再合成3个数的情况。这种情况下,比较麻烦的是由于4个数可能有相同值,若用 combinations(lst, 2) 先任选两个数,后续要生成剩余两个数加上第三个计算的数的集合代码会繁琐。因此,我们改成任选4个数index中的两个,剩余的indices 可以通过集合操作来完成。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# AC
# Runtime: 116 ms, faster than 56.23% of Python3 online submissions for 24 Game.
# Memory Usage: 13.9 MB, less than 44.89% of Python3 online submissions for 24 Game.

import math
from itertools import combinations, product, permutations
from typing import List

class Solution:

def judgePoint24(self, nums: List[int]) -> bool:
mul = lambda x, y: x * y
plus = lambda x, y: x + y
div = lambda x, y: x / y if y != 0 else math.inf
minus = lambda x, y: x - y

op_lst = [plus, minus, mul, div]

def recurse(lst: List[int]):
if len(lst) == 2:
for op, values in product(op_lst, permutations(lst)):
yield op(values[0], values[1])
else:
# choose 2 indices from lst of length n
for choosen_idx_lst in combinations(list(range(len(lst))), 2):
# remaining indices not choosen (of length n-2)
idx_remaining_set = set(list(range(len(lst)))) - set(choosen_idx_lst)

# remaining values not choosen (of length n-2)
value_remaining_lst = list(map(lambda x: lst[x], idx_remaining_set))
for op, idx_lst in product(op_lst, permutations(choosen_idx_lst)):
# 2 choosen values are lst[idx_lst[0]], lst[idx_lst[1]
value_remaining_lst.append(op(lst[idx_lst[0]], lst[idx_lst[1]]))
yield from recurse(value_remaining_lst)
value_remaining_lst = value_remaining_lst[:-1]

for v in recurse(nums):
if abs(v - 24) < 0.0001:
return True

经典教材Reinforcement Learning: An Introduction 第二版由强化领域权威Richard S. Sutton 和 Andrew G. Barto 完成编写,内容深入浅出,非常适合初学者。在本篇中,引入Grid World示例,结合强化学习核心概念,并用python代码实现OpenAI Gym的模拟环境,进一步实现策略评价算法。

Grid World 问题

第四章例子4.1提出了一个简单的离散空间状态问题:Grid World,其大致意思是在4x4的网格世界中有14个格子是非终点状态,在这些非终点状态的格子中可以往上下左右四个方向走,直至走到两个终点状态格子,则游戏结束。每走一步,Agent收获reward -1,表示Agent希望在Grid World中尽早出去。另外,Agent在Grid World边缘时,无法继续往外只能呆在原地,reward也是-1。

Finite MDP 模型

先来回顾一下强化学习的建模基础:有限马尔可夫决策过程(Finite Markov Decision Process, Finite MDP)。如下图,强化学习模型将世界抽象成两个实体,强化学习解决目标的主体Agent和其他外部环境。它们之间的交互过程遵从有限马尔可夫决策过程:若Agent在t时间步骤时处于状态 \(S_t\),采取动作 \(A_t\),然后环境根据自身机制,产生Reward \(R_{t+1}\) 并将Agent状态变为 \(S_{t+1}\)

环境自身机制又称为dynamics,工程上可以看成一个输入(S, A),输出(S, R)的方法。由于MDP包含随机过程,某个输入并不能确定唯一输出,而会根据概率分布输出不同的(S, R)。Finite MDP简化了时间对于模型的影响,因为(S, R)只和(S, A)有关,不和时间t有关。另外,有限指的是S,A,R的状态数量是有限的。

数学上dynamics可以如下表示

\[ p\left(s^{\prime}, r \mid s, a\right) \doteq \operatorname{Pr}\left\{S_{t}=s^{\prime}, R_{t}=r \mid S_{t-1}=s, A_{t-1}=a\right\} \]

即是四元组作为输入的概率函数 \(p: S \times R \times S \times A \rightarrow [0, 1]\)

满足 \[ \sum_{s^{\prime} \in \mathcal{S}} \sum_{r \in \mathcal{R}} p\left(s^{\prime}, r \mid s, a\right)=1, \text { for all } s \in \mathcal{S}, a \in \mathcal{A}(s) \]

以Grid World为例,当Agent处于编号1的网格时,可以往四个方向走,往任意方向走都只产生一种 S, R,因为这个简单的游戏是确定性的,不存在某一动作导致stochastic状态。例如,在1号网格往左就到了终点网格(编号0),得到Reward -1这个规则可以如下表示 \[ p\left(s^{\prime}=0, r=-1 \mid s=1, a=\text{L}\right) = 1 \] 因此,状态s=1的所有dynamics概率映射为

\[ \begin{aligned} p\left(s^{\prime}=0, r=-1 \mid s=1, a=\text{L}\right) &=& 1 \\ p\left(s^{\prime}=2, r=-1 \mid s=1, a=\text{R}\right) &=& 1 \\ p\left(s^{\prime}=1, r=-1 \mid s=1, a=\text{U}\right) &=& 1 \\ p\left(s^{\prime}=5, r=-1 \mid s=1, a=\text{D}\right) &=& 1 \end{aligned} \]

强化学习的目的

在给定了问题以及定义了强化学习的模型之后,强化学习的目的当然是通过学习让Agent能够学到最佳策略\(\pi_{*}\),也就是在某个状态下的行动分布,记成 \(\pi(a|s)\)。对应在数值上的优化目标是Agent在一系列过程中采取某种策略的reward总和的期望(Expected Return)。下面公式定义了t步往后的reward总和,其中 \(\gamma\) 为discount factor,用于权衡短期和长期reward对于当前Agent的效用影响。等式最后一步的意义是t步后的reward总和等价于t步所获的立即reward \(R_{t+1}\),加上t+1步后的reward总和 \(\gamma G_{t+1}\)

\[ \begin{aligned} G_{t} & \doteq R_{t+1}+\gamma R_{t+2}+\gamma^{2} R_{t+3}+\gamma^{3} R_{t+4}+\cdots \\ &=R_{t+1}+\gamma\left(R_{t+2}+\gamma R_{t+3}+\gamma^{2} R_{t+4}+\cdots\right) \\ &=R_{t+1}+\gamma G_{t+1} \end{aligned} \]

有了reward总和的定义,评价Agent策略 \(\pi\) 就可以定义成Agent在状态 s 时采用此策略的Expected Return。

\[ v_{\pi}(s) \doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \]

下面公式推导了 \(v_{\pi}(s)\) 数值上和相关状态 \(s{\prime}\) 的关系:

\[ \begin{aligned} v_{\pi}(s) &\doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \\ &=\mathbb{E}_{\pi}\left[\sum_{k=0}^{\infty} \gamma^{k} R_{t+k+1} \mid S_{t}=s\right]\\ &=\mathbb{E}_{\pi}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}} \sum_{r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma \mathbb{E}_{\pi}\left[G_{t+1} \mid S_{t+1}=s^{\prime}\right]\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right] \quad \text { for all } s \in \mathcal{S} \end{aligned} \]

注意到如果将 \(v_{\pi}(s)\) 看成未知数,上式即形成 \(\mid \mathcal{S} \mid\) 个未知变量的方程组,可以在数值上解得各个 \(v_{\pi}(s)\)

书中用Backup Diagram来表示递推关系,下图是\(v_{\pi}(s)\)的backup diagram。

尽管v值可以来衡量策略,但由于\(v_{\pi}(s)\) 是Agent在策略\(\pi(a|s)\)的Expected Return,将不同的action拆出来单独计算Expected Return,这样的做法有时更为直接,这就是著名的Q Learning中的q 值,记成\(q_{\pi}(s, a)\)

\[ q_{\pi}(s, a) \doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s, A_{t}=a\right] \]

下面是 $q_{}(s, a) $ 的递推 backup diagram。

Bellman 最佳原则

对于所有状态集合\(\mathcal{S}\),策略\({\pi}\)的评价指标 \(v_{\pi}(s)\) 是一个向量,本质上是无法相互比较的。但由于存在Bellman 最佳原则(Bellman's principle of optimality):在有限状态情况下,一定存在一个或者多个最好的策略 \({\pi}_{*}\),它在所有状态下的v值都是最好的,即 \(v_{\pi_{*}}(s) \ge v_{\pi^{\prime}}(s) \text { for all } s \in \mathcal{S}\)

因此,最佳v值定义为最佳策略 \({\pi}_{*}\) 对应的 v 值

\[ v_{*}(s) \doteq \max_{\pi} v_{\pi}(s) \]

同理,也存在最佳q值,记为 \[ \begin{aligned} q_{*}(s, a) &\doteq \max_{\pi} q_{\pi}(s,a) \end{aligned} \]

\(v_{*}(s)\) 改写成递推形式,称为 Bellman Optimality Equation,推导如下

\[ \begin{aligned} v_{*}(s) &=\max _{a \in \mathcal{A}(s)} q_{\pi_{*}}(s, a) \\ &=\max _{a} \mathbb{E}_{\pi_{*}}\left[G_{t} \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \mathbb{E}_{\pi_{*}}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \mathbb{E}\left[R_{t+1}+\gamma v_{*}\left(S_{t+1}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{*}\left(s^{\prime}\right)\right] \end{aligned} \]

直觉上可以理解为状态 s 对应的最佳v值是只采取此状态下的最佳动作后的Expected Return。

最佳q值递归形式的意义为最佳策略下状态s时采取行动 a 的Expected Return,等于所有可能后续状态 s' 下采取最优行动的Expected Return的均值。推导如下:

\[ \begin{aligned} q_{*}(s, a) &=\mathbb{E}\left[R_{t+1}+\gamma \max _{a^{\prime}} q_{*}\left(S_{t+1}, a^{\prime}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma \max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right)\right] \end{aligned} \]

\(v_{*}(s), q_{*}(s, a)\) 的backup diagram 如下图

Grid World 最佳策略和V值

Grid World 的最佳策略如下:尽可能快的走出去

Grid World最佳策略

上面的2D图中不同颜色表示不同V值,终点格子的红色表示0,隔着一步的黄色为-1,隔两步的绿色为-2,最远的紫色为-3。下面是立体图示。

Grid World最佳策略V值

Grid World OpenAI Gym 环境

下面是OpenAI Gym框架下Grid World环境的代码实现。本质是在GridWorldEnv构造函数中构建MDP,类型定义如下

1
2
3
4
5
6
MDP = Dict[State, Dict[Action, List[Tuple[Prob, State, Reward, bool]]]]

# P[state][action] = [
# (prob1, next_state1, reward1, is_done),
# (prob2, next_state2, reward2, is_done), ...]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Action(Enum):
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3

State = int
Reward = float
Prob = float
Policy = Dict[State, Dict[Action, Prob]]
Value = List[float]
StateSet = Set[int]
NonTerminalStateSet = Set[int]
MDP = Dict[State, Dict[Action, List[Tuple[Prob, State, Reward, bool]]]]
# P[s][a] = [(prob, next_state, reward, is_done), ...]

class GridWorldEnv(discrete.DiscreteEnv):
"""
Grid World environment described in Sutton and Barto Reinforcement Learning 2nd, chapter 4.
"""

def __init__(self, shape=[4,4]):
self.shape = shape
nS = np.prod(shape)
nA = len(list(Action))
MAX_R = shape[0]
MAX_C = shape[1]
self.grid = np.arange(nS).reshape(shape)
isd = np.ones(nS) / nS

# P[s][a] = [(prob, next_state, reward, is_done), ...]
P: MDP = {}
action_delta = {Action.UP: (-1, 0), Action.DOWN: (1, 0), Action.LEFT: (0, -1), Action.RIGHT: (0, 1)}
for s in range(0, MAX_R * MAX_C):
P[s] = {a.value : [] for a in list(Action)}
is_terminal = self.is_terminal(s)
if is_terminal:
for a in list(Action):
P[s][a.value] = [(1.0, s, 0, True)]
else:
r = s // MAX_R
c = s % MAX_R
for a in list(Action):
neighbor_r = min(MAX_R-1, max(0, r + action_delta[a][0]))
neighbor_c = min(MAX_C-1, max(0, c + action_delta[a][1]))
s_ = neighbor_r * MAX_R + neighbor_c
P[s][a.value] = [(1.0, s_, -1, False)]

super(GridWorldEnv, self).__init__(nS, nA, P, isd)

策略评估(Policy Evaluation)

策略评估需要解决在给定环境dynamics和Agent策略 \(\pi\)下,计算策略的v值 \(v_{\pi}\)。由于所有数量关系都已知,可以通过解方程组的方式求得,但通常会通过数值迭代的方式来计算,即通过一系列 \(v_{0}, v_{1}, ..., v_{k}\) 收敛至 \(v_{\pi}\)。如下迭代方式已经得到证明,当 \(k \rightarrow \infty\) 一定收敛至 \(v_{\pi}\)

\[ \begin{aligned} v_{k+1}(s) & \doteq \mathbb{E}_{\pi}\left[R_{t+1}+\gamma v_{k}\left(S_{t+1}\right) \mid S_{t}=s\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{k}\left(s^{\prime}\right)\right] \end{aligned} \]

书中具体伪代码如下

\[ \begin{align*} &\textbf{Iterative Policy Evaluation, for estimating } V\approx v_{\pi} \\ & \text{Input } {\pi}, \text{the policy to be evaluated} \\ & \text{Algorithm parameter: a small threshold } \theta > 0 \text{ determining accuracy of estimation} \\ & \text{Initialize } V(s), \text{for all } s \in \mathcal{S}^{+} \text{, arbitrarily except that } V (terminal) = 0\\ & \\ &1: \text{Loop:}\\ &2: \quad \quad \Delta \leftarrow 0\\ &3: \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ &4: \quad \quad \quad \quad v \leftarrow V(s) \\ &5: \quad \quad \quad \quad V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ &6: \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ &7: \text{until } \Delta < \theta \end{align*} \]

下面是python 代码实现,注意这里单run迭代时,新的v值直接覆盖数组里的旧v值,这种做法在书中被证明不仅有效,甚至更为高效。这种做法称为原地(in place)更新。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def policy_evaluate(policy: Policy, env: GridWorldEnv, gamma=1.0, theta=0.0001):
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
v = 0
for a, action_prob in enumerate(policy[s]):
for prob, next_state, reward, done in env.P[s][a]:
v += action_prob * prob * (reward + gamma * V[next_state])
delta = max(delta, np.abs(v - V[s]))
V[s] = v
if delta < theta:
break
return np.array(V)

输入策略为随机选择方向,运行上面的policy_evaluate最终多轮收敛后的V值输出为

{linenos
1
2
3
4
[[  0.         -13.99931242 -19.99901152 -21.99891199]
[-13.99931242 -17.99915625 -19.99908389 -19.99909436]
[-19.99901152 -19.99908389 -17.99922697 -13.99942284]
[-21.99891199 -19.99909436 -13.99942284 0. ]]
在3D V值图中可以发现,由于是随机选择方向的策略, Agent在每个格子的V值绝对数值要比最佳V值大,意味着随机策略下Agent在Grid World会得到更多的负reward。
Grid World随机策略V值
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×