#Dynamic Programming

本篇是TSP问题从DP算法到深度学习系列第四篇,这一篇我们会详细举例并比较在 seq-to-seq 或者Markov Chain中的一些常见的搜索概率最大的状态序列的算法。这些方法在深度学习的seq-to-seq 中被用作decoding。在第五篇中,我们使用强化学习时也会使用了本篇中讲到的方法。

马尔科夫链问题

在 seq-to-seq 问题中,我们经常会遇到需要从现有模型中找概率最大的可能状态序列。这类问题在机器学习算法和控制领域广泛存在,抽象出来可以表达成马尔可夫链模型:给定初始状态的分布和系统的状态转移方程(称为系统动力,dynamics),找寻最有可能的状态序列。

举个例子,假设系统有 \(n\) 个状态,初始状态由 $s_0 = [0.35, 0.25, 0.4] $ 指定,表示初始时三种状态的分布为 0.35,0.25和0.4。

状态转移矩阵由 \(T\) 表达,其中 $ T[i][j]$ 表示从状态 \(i\) 到状态 \(j\) 的概率。注意下面的矩阵 \(T\) 每行的和为 1.0,对应了从任意状态出发,下一时刻的所有可能转移概率和为1。 \[ T= \begin{matrix} & \begin{matrix}0&1&2\end{matrix} \\\\ \begin{matrix}0\\\\1\\\\2\end{matrix} & \begin{bmatrix}0.3&0.6&0.1\\\\0.4&0.2&0.4\\\\0.3&0.3&0.4\end{bmatrix}\\\\ \end{matrix} \]

至此,系统的所有参数都定下来了。接下去的各个时刻的状态分布可以通过矩阵乘法来算得。比如,记\(s_1\)\(t_1\) 时刻状态分布,计算方法为 \(s_0\) 乘以 \(T\),动画如下:

\(s_1\) 数值计算结果如下。

\[ s_1 = \begin{bmatrix}0.35& 0.25& 0.4\end{bmatrix} \begin{matrix} \begin{bmatrix}0.3&0.6&0.1\\\\0.4&0.2&0.4\\\\0.3&0.3&0.4\end{bmatrix}\\\\ \end{matrix} = \begin{bmatrix}0.325& 0.35& 0.255\end{bmatrix} \] 矩阵左乘行向量可以理解为矩阵每一行的线性组合,直觉上理解为下一时刻的状态分布是上一时刻初始状态分布乘以转移关系的线性组合。 \[ \begin{bmatrix}0.35& 0.25& 0.4\end{bmatrix} \begin{matrix} \begin{bmatrix}0.3&0.6&0.1\\\\0.4&0.2&0.4\\\\0.3&0.3&0.4\end{bmatrix}\\\\ \end{matrix} = 0.35 \times \begin{bmatrix}0.35& 0.6& 0.1\end{bmatrix} + 0.25 \times \begin{bmatrix}0.4& 0.2& 0.4\end{bmatrix} + 0.4 \times \begin{bmatrix}0.3& 0.3& 0.4\end{bmatrix} \] 同样的,后面每一个时刻都可以由上一个状态分布向量乘以 \(T\),当然这里我们假设每个时刻的转移矩阵是不变的。当然,问题也可以是每个时刻都有不同的转移矩阵来定义,例如深度学习 seq-to-seq 模型。当然,这个设定的变化不会影响搜索最可能状态序列的算法。出于简单考虑,本篇中我们假定所有时刻的状态转移矩阵都是 \(T\)

下面我们通过多种算法来找出由上述参数定义的系统中前三个时刻的最有可能序列,即概率最大的 \(s_0 \rightarrow s_1 \rightarrow s_2\)

\(L\) 是阶段数,\(N\) 是每个阶段的状态数,则我们的例子中 \(L=N=3\) 。并且,总共有 \(N^L\) 种不同的路径。

穷竭搜索

若给定一条路径,计算特定路径的概率是很直接的,例如,若给定路径为 \(2(s_0) \rightarrow 1(s_1) \rightarrow 2(s_2)\),则这条路径的概率为

\[ p(2 \rightarrow 1 \rightarrow 2) = s_0[2] \times T[2][1] \times T[1][2] = 0.4 \times 0.3 \times 0.4 = 0.048 \]

因此,我们可以通过枚举所有 \(N^L\) 条路径并计算每条路径的概率来找到最有可能的状态序列。

下面是Python 3的穷竭搜索代码,输出为最有可能的概率及其路径。样例问题的输出为 0.084 和状态序列 \(0 \rightarrow 1 \rightarrow 2\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def search_brute_force(initial: List, transition: List, L: int) -> Tuple[float, Tuple]:
from itertools import combinations_with_replacement
v = [0, 1, 2]
path_all = combinations_with_replacement(v, L)

max_prop = 0.0
max_route = None
prob = 0.0
for path in list(path_all):
for idx, v in enumerate(path):
if idx == 0:
prob = initial[v] # reset to initial state
else:
prev_v = path[idx-1]
prob *= transition[prev_v][v]
if prob > max_prop:
max_prop = max(max_prop, prob)
max_route = path
return max_prop, max_route

贪心搜索

穷竭搜索一定会找到最有可能的状态序列,但是算法复杂度是指数级的 \(O(N^L)\)。一种最简化的策略是,每一时刻都只选取下一时刻最可能的状态,显然这种策略没有考虑全局最优,只考虑下一步最优,因此称为贪心策略。当然,贪心策略虽然牺牲全局最优解但是换取了很快的时间复杂度。贪心搜索算法动画如下。

Python 3 实现中我们利用了 numpy 类库,主要是 np.argmax() 可以让代码简洁。代码本质上是两重循环,(一层循环是np.argmax中),对应时间算法复杂度是 \(O(N\times L)\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def search_greedy(initial: List, transition: List, L: int) -> Tuple[float, Tuple]:
import numpy as np
max_route = []
max_prop = 0.0
states = np.array(initial)

prev_max_v = None
for l in range(0, L):
max_v = np.argmax(states)
max_route.append(max_v)
if l == 0:
max_prop = initial[max_v]
else:
max_prop = max_prop * transition[prev_max_v][max_v]
states = max_prop * states
prev_max_v = max_v

return max_prop, max_route

Beam 搜索

贪心策略只考虑了下一步的最大概率状态,若我们改进一下贪心策略,将下一步的最大 \(k\) 个状态保留下来就是beam 搜索了。具体来说, \(k\) beam search表示每个阶段保留 \(k\) 个最大概率路径,下一阶段扩展这 \(k\) 条路径至 \(k \times N\) 条路径再选取最大的top k。以上例来说,选取\(k=2\),则初始 \(s_0\)时选取最大概率的两种状态 0和 2,下一阶段 \(s_1\),计算以0和2开始的共 \(2 \times 3\) 条路径,并保留其中最大概率的两条,如此往复。显然,beam search也无法找到全局最优解,但是它能以线性时间复杂度探索更多的路径空间。

以下是Python 3 的代码实现,利用了 PriorityQueue 选取 \(k\) 路径。由于PriorityQueue 无法自定义比较关系,我们定义了 @total_ordering 标注的类来实现比较关心。时间算法复杂度是 \(O(k\times N \times L)\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def search_beam(initial: List, transition: List, L: int, K: int) -> Tuple[float, Tuple]:
N = len(initial)
from queue import PriorityQueue
current_q = PriorityQueue()
next_q = PriorityQueue()

from functools import total_ordering
@total_ordering
class PQItem(object):
def __init__(self, prob, route):
self.prob = prob
self.route = route
self.last_v = int(route[-1])

def __eq__(self, other):
return self.prob == other.prob

def __lt__(self, other):
return self.prob > other.prob

for v in range(N):
next_q.put(PQItem(initial[v], str(v)))

for l in range(1, L):
current_q = next_q
next_q = PriorityQueue()
k = K
while not current_q.empty() and k > 0:
item = current_q.get()
prob, route, prev_v = item.prob, item.route, item.last_v
k -= 1
for v in range(N):
nextItem = PQItem(prob * transition[prev_v][v], route + str(v))
next_q.put(nextItem)

max_item = next_q.get()

return max_item.prob, list(map(lambda x: int(x), max_item.route))

Viterbi 动态规划

和之前TSP 动态规划算法的思想一样,最有可能状态路径问题解法有可以将指数时间复杂度 \(O(N^L)\) 降到多项式时间复杂度 \(O(L \times N \times N)\) 的算法,就是大名鼎鼎的 Viterbi 算法(维特比算法)。核心思想是在每个阶段,用数组保存每个状态结尾路径的阶段最大概率(及其对应路径)。在不考虑优化空间的情况下,我们开一个二维数组 \(dp[][]\),第一维表示阶段序号,第二维表示状态序号。例如,\(dp[1][0]\)\(s_1\) 阶段时以状态0结尾的所有路径中的最大概率,即 \[ dp[1][0] = \max \\{s_0[0] \rightarrow s_1[0], s_0[1] \rightarrow s_1[0], s_0[2] \rightarrow s_1[0]\\} \]

实现代码中没有返回路径本身而只是其概率值,目的是通过简洁的三层循环来表达算法精髓。

{linenos
1
2
3
4
5
6
7
8
9
10
11
def search_dp(initial: List, transition: List, L: int) -> float:
N = len(initial)
dp = [[0.0 for c in range(N)] for r in range(L)]
dp[0] = initial[:]

for l in range(1, L):
for v in range(N):
for prev_v in range(N):
dp[l][v] = max(dp[l][v], dp[l - 1][prev_v] * transition[prev_v][v])

return max(dp[L-1])

概率采用

以上所有的算法都是确定性的。在NLP 深度学习decoding 时候会带来一个问题:确定性容易导致生成重复的短语或者句子。比如,确定性算法很容易生成如下句子。

1
This is the best of best of best of ...
一种简单的方法是采用概率采用的方式回避这个问题。也就是我们不寻找确定的局部最优或者全局最优的解,而是通过局部路径或者全局路径的概率信息进行采样生成序列。例如,对于穷竭搜索的 \(N^L\) 条路径计算得到对应概率,转变成 \(N^L\) 个点的 categorical 分布,采样生成某条路径。也可以如下改造贪心或者beam 这类阶段性生成算法一个时刻一个时刻的输出采样的状态序列。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def search_prob_greedy(initial: List, transition: List, L: int) -> Tuple[float, Tuple]:
import random
N = len(initial)
max_route = []
max_prop = 0.0
vertices = [i for i in range(N)]
prob = initial[:]

for l in range(0, L):
v_lst = random.choices(vertices, prob)
v = v_lst[0]
max_route.append(v)
max_prop = prob[v]
prob = [prob[v] * transition[v][v_target] for v_target in range(N)]

return max_prop, max_route

这一期我们进入第六章:时序差分学习(Temporal-Difference Learning)。TD Learning本质上是加了bootstrapping的蒙特卡洛(MC),也是model-free的方法,但实践中往往比蒙特卡洛收敛更快。我们选取OpenAI Gym中经典的CartPole环境来讲解TD。更多相关内容,欢迎关注 本公众号 MyEncyclopedia

CartPole OpenAI 环境

如图所示,小车上放了一根杆,杆会根据物理系统定理因重力而倒下,我们可以控制小车往左或者往右,目的是尽可能地让杆保持树立状态。

CartPole OpenAI Gym

CartPole 观察到的状态是四维的float值,分别是车位置,车速度,杆角度和杆角速度。下表为四个维度的值范围。给到小车的动作,即action space,只有两种:0,表示往左推;1,表示往右推。

Min Max
Cart Position -4.8 4.8
Cart Velocity -Inf Inf
Pole Angle -0.418 rad (-24 deg) 0.418 rad (24 deg)
Pole Angular Velocity -Inf Inf

离散化连续状态

从上所知,CartPole step() 函数返回了4维ndarray,类型为float32的连续状态空间。对于传统的tabular方法来说第一步必须离散化状态,目的是可以作为Q table的主键来查找。下面定义的State类型是离散化后的具体类型,另外 Action 类型已经是0和1,不需要做离散化处理。

{linenos
1
2
State = Tuple[int, int, int, int]
Action = int

离散化处理时需要考虑的一个问题是如何设置每个维度的分桶策略。分桶策略会决定性地影响训练的效果。原则上必须将和action以及reward强相关的维度做细粒度分桶,弱相关或者无关的维度做粗粒度分桶。举个例子,小车位置本身并不能影响Agent采取的下一动作,当给定其他三维状态的前提下,因此我们对小车位置这一维度仅设置一个桶(bucket size=1)。而杆的角度和角速度是决定下一动作的关键因素,因此我们分别设置成6个和12个。

以下是离散化相关代码,四个维度的 buckets=(1, 2, 6, 12)。self.q是action value的查找表,具体类型是shape 为 (1, 2, 6, 12, 2) 的ndarray。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class CartPoleAbstractAgent(metaclass=abc.ABCMeta):
def __init__(self, buckets=(1, 2, 6, 12), discount=0.98, lr_min=0.1, epsilon_min=0.1):
self.env = gym.make('CartPole-v0')

env = self.env
# [position, velocity, angle, angular velocity]
self.dims_config = [(env.observation_space.low[0], env.observation_space.high[0], 1),
(-0.5, 0.5, 1),
(env.observation_space.low[2], env.observation_space.high[2], 6),
(-math.radians(50) / 1., math.radians(50) / 1., 12)]
self.q = np.zeros(buckets + (self.env.action_space.n,))
self.pi = np.zeros_like(self.q)
self.pi[:] = 1.0 / env.action_space.n

def to_bin_idx(self, val: float, lower: float, upper: float, bucket_num: int) -> int:
percent = (val + abs(lower)) / (upper - lower)
return min(bucket_num - 1, max(0, int(round((bucket_num - 1) * percent))))

def discretize(self, obs: np.ndarray) -> State:
discrete_states = tuple([self.to_bin_idx(obs[d], *self.dims_config[d]) for d in range(len(obs))])
return discrete_states

train() 方法串联起来 agent 和 env 交互的流程,包括从 env 得到连续状态转换成离散状态,更新 Agent 的 Q table 甚至 Agent的执行policy,choose_action会根据执行 policy 选取action。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def train(self, num_episodes=2000):
for e in range(num_episodes):
print(e)
s: State = self.discretize(self.env.reset())

self.adjust_learning_rate(e)
self.adjust_epsilon(e)
done = False

while not done:
action: Action = self.choose_action(s)
obs, reward, done, _ = self.env.step(action)
s_next: State = self.discretize(obs)
a_next = self.choose_action(s_next)
self.update_q(s, action, reward, s_next, a_next)
s = s_next

choose_action 的默认实现为基于现有 Q table 的 \(\epsilon\)-greedy 策略。

{linenos
1
2
3
4
5
def choose_action(self, state) -> Action:
if np.random.random() < self.epsilon:
return self.env.action_space.sample()
else:
return np.argmax(self.q[state])

抽象出公共的基类代码 CartPoleAbstractAgent 之后,SARSA、Q-Learning和Expected SARSA只需要复写 update_q 抽象方法即可。

{linenos
1
2
3
4
class CartPoleAbstractAgent(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
pass

TD Learning的精髓

在上一期,本公众号 MyEncyclopedia 的21点游戏的蒙特卡洛On-Policy控制介绍了Monte Carlo方法,知道MC需要在环境中模拟直至最终结局。若记\(G_t\)为t步以后的最终return,则 MC online update 版本更新为:

\[ V(S_t) \leftarrow V(S_t) + \alpha[G_{t} - V(S_t)] \]

可以认为 \(V(S_t)\) 向着目标为 \(G_t\) 更新了一小步。

而TD方法可以只模拟下一步,得到 \(R_{t+1}\),而余下步骤的return,\(G_t - R_{t+1}\) 用已有的 \(V(S_{t+1})\) 来估计,或者统计上称作bootstrapping。这样 TD 的更新目标值变成 \(R_{t+1} + \gamma V(S_{t+1})\),整体online update 公式则为: \[ V(S_t) \leftarrow V(S_t) + \alpha[R_{t+1} + \gamma V(S_{t+1})- V(S_t)] \]

概念上,如果只使用下一步 \(R_{t+1}\) 值然后bootstrap称为 TD(0),用于区分使用多步后的reward的TD方法。另外,变化的数值 \(R_{t+1} + \gamma V(S_{t+1})- V(S_t)\) 称为TD error。

另外一个和Monte Carlo的区别在于一般TD方法保存更精细的Q值,\(Q(S_t, A_t)\),并用Q值来boostrap,而MC一般用V值也可用Q值。

SARSA: On-policy TD 控制

SARSA的命名源于一次迭代产生了五元组 \(S_t,A_t,R_{t+1},S_{t+1},A_{t+1}\)。SARSA利用五个值做 action-value的 online update:

\[ Q(S_t,A_t) \leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma Q(S_{t+1}, A_{t+1}) - Q(S_t,A_t)] \]

对应的Q table更新实现为:

{linenos
1
2
3
4
class SarsaAgent(CartPoleAbstractAgent):

def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
self.q[s][a] += self.lr * (r + self.discount * (self.q[s_next][a_next]) - self.q[s][a])

SARSA 在执行policy 后的Q值更新是对于针对于同一个policy的,完成了一次策略迭代(policy iteration),这个特点区分于后面的Q-learning算法,这也是SARSA 被称为 On-policy 的原因。下面是完整算法伪代码。

\[ \begin{align*} &\textbf{Sarsa (on-policy TD Control) for estimating } Q \approx q_{*} \\ & \text{Algorithm parameters: step size }\alpha \in ({0,1}]\text{, small }\epsilon > 0 \\ & \text{Initialize }Q(s,a), \text{for all } s \in \mathcal{S}^{+}, a \in \mathcal{A}(s) \text{, arbitrarily except that } Q(terminal, \cdot) = 0 \\ & \text{Loop for each episode:}\\ & \quad \text{Initialize }S\\ & \quad \text{Choose } A \text{ from } S \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \text{Loop for each step of episode:} \\ & \quad \quad \text{Take action }A, \text { observe } R, S^{\prime} \\ & \quad \quad \text{Choose }A^{\prime} \text { from } S^{\prime} \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \quad Q(S,A) \leftarrow Q(S,A) + \alpha[R+\gamma Q(S^{\prime}, A^{\prime}) - Q(S,A)] \\ & \quad \quad S \leftarrow S^{\prime}; A \leftarrow A^{\prime} \\ & \quad \text{until }S\text{ is terminal} \\ \end{align*} \]

SARSA 训练分析

SARSA收敛较慢,1000次episode后还无法持久稳定,后面的Q-learning 和 Expected Sarsa 都可以在1000次episode学习长时间保持不倒的状态。

Q-Learning: Off-policy TD 控制

Q-Learning 是深度学习时代前强化学习领域中的著名算法,它的 online update 公式为: \[ Q(S_t,A_t) \leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma \max_{a}Q(S_{t+1}, a) - Q(S_t,A_t)] \]

对应的 update_q() 方法具体实现

{linenos
1
2
3
4
class QLearningAgent(CartPoleAbstractAgent):

def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
self.q[s][a] += self.lr * (r + self.discount * np.max(self.q[s_next]) - self.q[s][a])

本质上用现有的Q table中最好的action来bootrap 对应的最佳Q值,推导如下:

\[ \begin{aligned} q_{*}(s, a) &=\mathbb{E}\left[R_{t+1}+\gamma \max _{a^{\prime}} q_{*}\left(S_{t+1}, a^{\prime}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\mathbb{E}[R \mid S_{t}=s, A_{t}=a] + \gamma\sum_{s^{\prime}} p\left(s^{\prime}\mid s, a\right)\max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right) \\ &\approx r + \gamma \max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right) \end{aligned} \]

Q-Learning 被称为 off-policy 的原因是它并没有完成一次policy iteration,而是直接用已有的 Q 来不断近似 \(Q_{*}\)

对比下面的Q-Learning 伪代码和之前的 SARSA 版本可以发现,Q-Learning少了一次模拟后的 \(A_{t+1}\),这也是Q-Learning 中执行policy和预估Q值(即off-policy)分离的一个特征。

\[ \begin{align*} &\textbf{Q-learning (off-policy TD Control) for estimating } \pi \approx \pi_{*} \\ & \text{Algorithm parameters: step size }\alpha \in ({0,1}]\text{, small }\epsilon > 0 \\ & \text{Initialize }Q(s,a), \text{for all } s \in \mathcal{S}^{+}, a \in \mathcal{A}(s) \text{, arbitrarily except that } Q(terminal, \cdot) = 0 \\ & \text{Loop for each episode:}\\ & \quad \text{Initialize }S\\ & \quad \text{Loop for each step of episode:} \\ & \quad \quad \text{Choose } A \text{ from } S \text{ using policy derived from } Q \text{ (e.g., } \epsilon\text{-greedy)} \\ & \quad \quad \text{Take action }A, \text { observe } R, S^{\prime} \\ & \quad \quad Q(S,A) \leftarrow Q(S,A) + \alpha[R+\gamma \max_{a}Q(S^{\prime}, a) - Q(S,A)] \\ & \quad \quad S \leftarrow S^{\prime}\\ & \quad \text{until }S\text{ is terminal} \\ \end{align*} \]

Q-Learning 训练分析

Q-Learning 1000次episode就可以持久稳定住。

SARSA 改进版 Expected SARSA

Expected SARSA 改进了 SARSA 的地方在于考虑到了在某一状态下的现有策略动作分布,以此来减少variance,加快收敛,具体更新规则为:

\[ \begin{aligned} Q(S_t,A_t) &\leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma \mathbb{E}_{\pi}[Q(S_{t+1}, A_{t+1} \mid S_{t+1})] - Q(S_t,A_t)] \\ &\leftarrow Q(S_t,A_t) + \alpha[R_{t+1}+\gamma \sum_{a} \pi\left(a\mid S_{t+1}\right) Q(S_{t+1}, a) - Q(S_t,A_t)] \\ \end{aligned} \]

注意在实现中,update_q() 不仅更新了Q table,还显示更新了执行policy \(\pi\)

{linenos
1
2
3
4
5
6
7
8
9
class ExpectedSarsaAgent(CartPoleAbstractAgent):

def update_q(self, s: State, a: Action, r, s_next: State, a_next: Action):
self.q[s][a] = self.q[s][a] + self.lr * (r + self.discount * np.dot(self.pi[s_next], self.q[s_next]) - self.q[s][a])
# update pi[s]
best_a = np.random.choice(np.where(self.q[s] == max(self.q[s]))[0])
n_actions = self.env.action_space.n
self.pi[s][:] = self.epsilon / n_actions
self.pi[s][best_a] = 1 - (n_actions - 1) * (self.epsilon / n_actions)

同样的,Expected SARSA 1000次迭代也能比较好的学到最佳policy。

快速幂运算是一种利用位运算和DP思想求的\(x^n\)的数值算法,它将时间复杂度\(O(n)\)降到\(O(log(n))\)。快速幂运算结合矩阵乘法,可以巧解不少DP问题。本篇会由浅入深,从最基本的快速幂运算算法,到应用矩阵快速幂运算解DP问题,结合三道Leetcode题目来具体讲解。

Leetcode 50. Pow(x, n) (Medium)

Leetcode 50. Pow(x, n) 是实数的快速幂运算问题,题目如下。

Implement pow(x, n), which calculates x raised to the power n (i.e. \(x^n\)).

Example 1:

1
2
Input: x = 2.00000, n = 10
Output: 1024.00000

Example 2:

1
2
Input: x = 2.10000, n = 3
Output: 9.26100

Example 3:

1
2
3
Input: x = 2.00000, n = -2
Output: 0.25000
Explanation: 2-2 = 1/22 = 1/4 = 0.25

快速幂运算解法分析

假设n是32位的int类型,将n写成二进制形式,那么n可以写成最多32个某位为 1(第k位为1则值为\(2^k\))的和。那么\(x^n\)最多可以由32个 \(x^{2^k}\)的乘积组合,例如:

\[ x^{\text{10011101}_{2}} = x^{1} \times x^{\text{100}_{2}} \times x^{\text{1000}_{2}} \times x^{\text{10000}_{2}} \times x^{\text{10000000}_{2}} \]

快速幂运算的特点就是通过32次循环,每次循环根据上轮\(x^{2^k}\)的值进行平方后得出这一轮的值:\(x^{2^k} \times x^{2^k} = x^{2^{k+1}}\),即循环计算出如下数列

\[ x^{1}, x^2=x^{\text{10}_{2}}, x^4=x^{\text{100}_{2}}, x^8=x^{\text{1000}_{2}}, x^{16}=x^{\text{10000}_{2}}, ..., x^{128} = x^{\text{10000000}_{2}} \]

在循环时,如果n的二进制形式在本轮对应的位的值是1,则将这次结果累乘计入最终结果。

下面是python 3 的代码,由于循环为32次,所以容易看出算法复杂度为 \(O(log(n))\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# AC
# Runtime: 32 ms, faster than 54.28% of Python3 online submissions for Pow(x, n).
# Memory Usage: 14.2 MB, less than 5.04% of Python3 online submissions for Pow(x, n).

class Solution:
def myPow(self, x: float, n: int) -> float:
ret = 1.0
i = abs(n)
while i != 0:
if i & 1:
ret *= x
x *= x
i = i >> 1
return 1.0 / ret if n < 0 else ret

对应的 Java 的代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// AC
// Runtime: 1 ms, faster than 42.98% of Java online submissions for Pow(x, n).
// Memory Usage: 38.7 MB, less than 48.31% of Java online submissions for Pow(x, n).

class Solution {
public double myPow(double x, int n) {
double ret = 1.0;
long i = Math.abs((long) n);
while (i != 0) {
if ((i & 1) > 0) {
ret *= x;
}
x *= x;
i = i >> 1;
}

return n < 0 ? 1.0 / ret : ret;
}
}

矩阵快速幂运算

快速幂运算也可以应用到计算矩阵的幂,即上面的x从实数变为方形矩阵。实现上,矩阵的幂需要矩阵乘法:$ A_{r c} B_{c p}$ ,Python中可以用numpy的 np.matmul(A, B)来完成,而Java版本中我们手动实现简单的矩阵相乘算法,从三重循环看出其算法复杂度为\(O(r \times c \times p)\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int[][] matrixProd(int[][] A, int[][] B) {
int R = A.length;
int C = B[0].length;
int P = A[0].length;
int[][] ret = new int[R][C];
for (int r = 0; r < R; r++) {
for (int c = 0; c < C; c++) {
for (int p = 0; p < P; p++) {
ret[r][c] += A[r][p] * B[p][c];
}
}
}
return ret;
}

Leetcode 509. Fibonacci Number (Easy)

有了快速矩阵幂运算,我们来看看如何具体解题。Fibonacci问题作为最基本的DP问题,在上一篇Leetcode 679 24 Game 的 Python 函数式实现中我们用python独有的yield来巧解,这次再拿它来做演示。

The Fibonacci numbers, commonly denoted F(n) form a sequence, called the Fibonacci sequence, such that each number is the sum of the two preceding ones, starting from 0 and 1. That is,

1
2
F(0) = 0,   F(1) = 1
F(N) = F(N - 1) + F(N - 2), for N > 1.

Given N, calculate F(N).

Example 1:

1
2
3
Input: 2
Output: 1
Explanation: F(2) = F(1) + F(0) = 1 + 0 = 1.

Example 2:

1
2
3
Input: 3
Output: 2
Explanation: F(3) = F(2) + F(1) = 1 + 1 = 2.

Example 3:

1
2
3
Input: 4
Output: 3
Explanation: F(4) = F(3) + F(2) = 2 + 1 = 3.

转换为矩阵幂运算

Fibonacci的二阶递推式如下:

\[ \begin{align*} F(n) =& F(n-1) + F(n-2) \\ F(n-1) =& F(n-1) \end{align*} \]

等价的矩阵递推形式为:

\[ \begin{bmatrix}F(n)\\F(n-1)\end{bmatrix} = \begin{bmatrix}1 & 1\\1 & 0\end{bmatrix} \begin{bmatrix}F(n-1)\\F(n-2)\end{bmatrix} \]

也就是每轮左乘一个2维矩阵。其循环形式为,即矩阵幂的形式:

\[ \begin{bmatrix}F(n)\\F(n-1)\end{bmatrix} = \begin{bmatrix}1 & 1\\1 & 0\end{bmatrix}^{n-1} \begin{bmatrix}F(1)\\F(0)\end{bmatrix} \]

AC代码

有了上面的矩阵幂公式,代码稍作改动即可。Java 版本代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* AC
* Runtime: 0 ms, faster than 100.00% of Java online submissions for Fibonacci Number.
* Memory Usage: 37.9 MB, less than 18.62% of Java online submissions for Fibonacci Number.
*
* Method: Matrix Fast Power Exponentiation
* Time Complexity: O(log(N))
**/
class Solution {
public int fib(int N) {
if (N <= 1) {
return N;
}
int[][] M = {{1, 1}, {1, 0}};
// powers = M^(N-1)
N--;
int[][] powerDouble = M;
int[][] powers = {{1, 0}, {0, 1}};
while (N > 0) {
if (N % 2 == 1) {
powers = matrixProd(powers, powerDouble);
}
powerDouble = matrixProd(powerDouble, powerDouble);
N = N / 2;
}

return powers[0][0];
}

public int[][] matrixProd(int[][] A, int[][] B) {
int R = A.length;
int C = B[0].length;
int P = A[0].length;
int[][] ret = new int[R][C];
for (int r = 0; r < R; r++) {
for (int c = 0; c < C; c++) {
for (int p = 0; p < P; p++) {
ret[r][c] += A[r][p] * B[p][c];
}
}
}
return ret;
}

}

Python 3的numpy.matmul() 版本代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# AC
# Runtime: 256 ms, faster than 26.21% of Python3 online submissions for Fibonacci Number.
# Memory Usage: 29.4 MB, less than 5.25% of Python3 online submissions for Fibonacci Number.

class Solution:

def fib(self, N: int) -> int:
if N <= 1:
return N

import numpy as np
F = np.array([[1, 1], [1, 0]])

N -= 1
powerDouble = F
powers = np.array([[1, 0], [0, 1]])
while N > 0:
if N % 2 == 1:
powers = np.matmul(powers, powerDouble)
powerDouble = np.matmul(powerDouble, powerDouble)
N = N // 2

return powers[0][0]

或者也可以直接调用numpy.matrix_power() 代替手动的快速矩阵幂运算。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# AC
# Runtime: 116 ms, faster than 26.25% of Python3 online submissions for Fibonacci Number.
# Memory Usage: 29.2 MB, less than 5.25% of Python3 online submissions for Fibonacci Number.

class Solution:

def fib(self, N: int) -> int:
if N <= 1:
return N

from numpy.linalg import matrix_power
import numpy as np
F = np.array([[1, 1], [1, 0]])
F = matrix_power(F, N - 1)

return F[0][0]

Leetcode 1411. Number of Ways to Paint N × 3 Grid (Hard)

下面来看一道稍难一点的DP问题,1411. Number of Ways to Paint N × 3 Grid

You have a grid of size n x 3 and you want to paint each cell of the grid with exactly one of the three colours: Red, Yellow or Green while making sure that no two adjacent cells have the same colour (i.e no two cells that share vertical or horizontal sides have the same colour).

You are given n the number of rows of the grid.

Return the number of ways you can paint this grid. As the answer may grow large, the answer must be computed modulo 10^9 + 7.

Example 1:

1
2
3
Input: n = 1
Output: 12
Explanation: There are 12 possible way to paint the grid as shown:

Example 2:

1
2
Input: n = 2
Output: 54

Example 3:

1
2
Input: n = 3
Output: 246

Example 4:

1
2
Input: n = 7
Output: 106494

Example 5:

1
2
Input: n = 5000
Output: 30228214

标准DP解法

分析题目容易发现第i行的状态只取决于第i-1行的状态,第i行会有两种不同状态:三种颜色都有或者只有两种颜色。这个问题容易识别出是经典的双状态DP问题,那么我们定义dp2[i]为第i行只有两种颜色的数量,dp3[i]为第i行有三种颜色的数量。

先考虑dp3[i]和i-1行的关系。假设第i行包含3种颜色,即dp3[i],假设具体颜色为红,绿,黄,若i-1行包含两种颜色(即dp2[i-1]),此时dp2[i-1]只有以下2种可能:

dp2[i-1] -> dp3[i]
还是dp3[i] 红,绿,黄情况,若i-1行包含三种颜色(从dp3[i-1]转移过来),此时dp3[i-1]也只有以下2种可能:
dp3[i-1] -> dp3[i]

因此,dp3[i]= dp2[i-1] * 2 + dp3[i-1] * 2。

同理,若第i行包含两种颜色,即dp2[i],假设具体颜色为绿,黄,绿,若i-1行是两种颜色(dp2[i-1]),此时dp2[i-1]有如下3种可能:

dp2[i-1] -> dp2[i]
dp2[i]的另一种情况是由dp3[i-1]转移过来,则dp3[i-1]有2种可能,枚举如下:
dp3[i-1] -> dp2[i]

因此,dp2[i] = dp2[i-1] * 3 + dp3[i-1] * 2。 初始值dp2[1] = 6,dp3[1] = 6,最终答案为dp2[i] + dp3[i]。

很容易写出普通DP版本的Python 3代码,时间复杂度为\(O(n)\)

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
# AC
# Runtime: 36 ms, faster than 98.88% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.
# Memory Usage: 13.9 MB, less than 58.66% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.

class Solution:
def numOfWays(self, n: int) -> int:
MOD = 10 ** 9 + 7
dp2, dp3 = 6, 6
n -= 1
while n > 0:
dp2, dp3 = (dp2 * 3 + dp3 * 2) % MOD, (dp2 * 2 + dp3 * 2) % MOD
n -= 1
return (dp2 + dp3) % MOD

快速矩阵幂运算解法

和Fibonacci一样,我们将DP状态转移方程转换成矩阵乘法:

\[ \begin{bmatrix}dp2(n)\\dp3(n)\end{bmatrix} = \begin{bmatrix}3 & 2\\2 & 2\end{bmatrix} \begin{bmatrix}dp2(n-1)\\dp3(n-1)\end{bmatrix} \]

代入初始值,转换成矩阵幂形式

\[ \begin{bmatrix}dp2(n)\\dp3(n)\end{bmatrix} = \begin{bmatrix}3 & 2\\2 & 2\end{bmatrix}^{n-1}\begin{bmatrix}6\\6\end{bmatrix} \]

代码几乎和Fibonacci一模一样,仅仅多了mod 计算。下面是Java版本。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

/**
AC
Runtime: 0 ms, faster than 100.00% of Java online submissions for Number of Ways to Paint N × 3 Grid.
Memory Usage: 35.7 MB, less than 97.21% of Java online submissions for Number of Ways to Paint N × 3 Grid.
**/

class Solution {
public int numOfWays(int n) {
long MOD = (long) (1e9 + 7);
long[][] ret = {{6, 6}};
long[][] m = {{3, 2}, {2, 2}};
n -= 1;
while(n > 0) {
if ((n & 1) > 0) {
ret = matrixProd(ret, m, MOD);
}
m = matrixProd(m, m, MOD);
n >>= 1;
}
return (int) ((ret[0][0] + ret[0][1]) % MOD);

}

public long[][] matrixProd(long[][] A, long[][] B, long MOD) {
int R = A.length;
int C = B[0].length;
int P = A[0].length;
long[][] ret = new long[R][C];
for (int r = 0; r < R; r++) {
for (int c = 0; c < C; c++) {
for (int p = 0; p < P; p++) {
ret[r][c] += A[r][p] * B[p][c];
ret[r][c] = ret[r][c] % MOD;
}
}
}
return ret;
}

}

Python 3实现为

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# AC
# Runtime: 88 ms, faster than 39.07% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.
# Memory Usage: 30.2 MB, less than 11.59% of Python3 online submissions for Number of Ways to Paint N × 3 Grid.

class Solution:
def numOfWays(self, n: int) -> int:
import numpy as np

MOD = int(1e9 + 7)
ret = np.array([[6, 6]])
m = np.array([[3, 2], [2, 2]])

n -= 1
while n > 0:
if n % 2 == 1:
ret = np.matmul(ret, m) % MOD
m = np.matmul(m, m) % MOD
n = n // 2
return int((ret[0][0] + ret[0][1]) % MOD)

本篇是TSP问题从DP算法到深度学习系列第二篇。

AIZU TSP 自底向上迭代DP解

上一篇中,我们用Python 3和Java 8完成了自顶向下递归版本的DP解。我们继续改进代码,将它转换成标准DP方式:自底向上的迭代DP版本。下图是3个点TSP问题的递归调用图。

将这个图反过来检查状态的依赖关系,可以很容易发现规律:首先计算状态位含有一个1的点,接着是两个1的节点,最后是状态位三个1的点。简而言之,在计算状态位为n+1个1的节点时需要用到n个1的节点的计算结果,如果能依照这样的 topological 顺序来的话,就可以去除递归,写成迭代(循环)版本的DP。

迭代算法的Java 伪代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (int bitset_num = N; bitset_num >=0; bitset_num++) {
while(hasNextCombination(bitset_num)) {
int state = nextCombination(bitset_num);
// compute dp[state][v], v-th bit is set in state
for (int v = 0; v < n; v++) {
for (int u = 0; u < n; u++) {
// for each u not reached by this state
if (!include(state, u)) {
dp[state][v] = min(dp[state][v],
dp[new_state_include_u][u] + dist[v][u]);
}
}
}
}
}

举例来说,dp[00010][1] 是从顶点0出发,刚经过顶点1的最小距离 \(0 \rightarrow 1 \rightarrow ? \rightarrow ? \rightarrow ? \rightarrow 0\)

为了找到最小距离值,就必须遍历所有可能的下一个可能的顶点u (第一个问号位置)。 \[ (0 \rightarrow 1) + \begin{align*} \min \left\lbrace \begin{array}{r@{}l} 2 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,2) \qquad\text{ new_state=[00110][2] } \qquad\\\\ 3 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,3) \qquad\text{ new_state=[01010][3] } \qquad\\\\ 4 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,4) \qquad\text{ new_state=[10010][4] } \qquad \end{array} \right. \end{align*} \]

迭代DP AC代码

以下是AC 的Java 算法核心代码。完整代码在 github/MyEncyclopedia 的tsp/alg_aizu/Main_loop.java

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public long solve() {
int N = g.V_NUM;
long[][] dp = new long[1 << N][N];
// init dp[][] with MAX
for (int i = 0; i < dp.length; i++) {
Arrays.fill(dp[i], Integer.MAX_VALUE);
}
dp[(1 << N) - 1][0] = 0;

for (int state = (1 << N) - 2; state >= 0; state--) {
for (int v = 0; v < N; v++) {
for (int u = 0; u < N; u++) {
if (((state >> u) & 1) == 0) {
dp[state][v] = Math.min(dp[state][v], dp[state | 1 << u][u] + g.edges[v][u]);
}
}
}
}
return dp[0][0] == Integer.MAX_VALUE ? -1 : dp[0][0];
}

很显然,时间算法复杂度对应了三重 for 循环,为 O(\(2^n * n * n\)) = O(\(2^n*n^2\) )。

类似的,Python 3 AC 代码如下。完整代码在 github/MyEncyclopedia 的tsp/alg_aizu/TSP_loop.py

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TSPSolver:
g: Graph

def __init__(self, g: Graph):
self.g = g

def solve(self) -> int:
"""
:param v:
:param state:
:return: -1 means INF
"""
N = self.g.v_num
dp = [[INT_INF for c in range(N)] for r in range(1 << N)]

dp[(1 << N) - 1][0] = 0

for state in range((1 << N) - 2, -1, -1):
for v in range(N):
for u in range(N):
if ((state >> u) & 1) == 0:
if dp[state | 1 << u][u] != INT_INF and self.g.edges[v][u] != INT_INF:
if dp[state][v] == INT_INF:
dp[state][v] = dp[state | 1 << u][u] + self.g.edges[v][u]
else:
dp[state][v] = min(dp[state][v], dp[state | 1 << u][u] + self.g.edges[v][u])
return dp[0][0]

一个欧式空间TSP数据集

至此,TSP的DP解法全部讲解完毕。接下去,我们引入一个二维欧式空间的TSP数据集 PTR_NET on Google Drive ,这个数据集是 Pointer Networks 的作者 Oriol Vinyals 用于模型的训练测试而引入的。

数据集的每一行格式如下:

1
x1, y1, x2, y2, ... output 1 v1 v2 v3 ... 1

一行开始为n个点的x, y坐标,接着是 output,再接着是1,表示从顶点1出发,经v1,v2,...,返回1,注意顶点编号从1开始。

十个顶点数据集的一些数据示例如下:

1
2
3
4
0.607122 0.664447 0.953593 0.021519 0.757626 0.921024 0.586376 0.433565 0.786837 0.052959 0.016088 0.581436 0.496714 0.633571 0.227777 0.971433 0.665490 0.074331 0.383556 0.104392 output 1 3 8 6 10 9 5 2 4 7 1 
0.930534 0.747036 0.277412 0.938252 0.794592 0.794285 0.961946 0.261223 0.070796 0.384302 0.097035 0.796306 0.452332 0.412415 0.341413 0.566108 0.247172 0.890329 0.429978 0.232970 output 1 3 2 9 6 5 8 7 10 4 1
0.686712 0.087942 0.443054 0.277818 0.494769 0.985289 0.559706 0.861138 0.532884 0.351913 0.712561 0.199273 0.554681 0.657214 0.909986 0.277141 0.931064 0.639287 0.398927 0.406909 output 1 5 2 10 7 4 3 9 8 6 1

画出第一个例子的全部顶点和边。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import matplotlib.pyplot as plt
points='0.607122 0.664447 0.953593 0.021519 0.757626 0.921024 0.586376 0.433565 0.786837 0.052959 0.016088 0.581436 0.496714 0.633571 0.227777 0.971433 0.665490 0.074331 0.383556 0.104392'
float_list = list(map(lambda x: float(x), points.split(' ')))

x,y = [],[]
for idx, p in enumerate(float_list):
if idx % 2 == 0:
x.append(p)
else:
y.append(p)

for i in range(0, len(x)):
for j in range(0, len(x)):
if i == j:
continue
plt.plot((x[i],x[j]),(y[i],y[j]))

plt.show()
全连接的图

这个例子的最短TSP旅程为 \[ 1 \rightarrow 3 \rightarrow 8 \rightarrow 6 \rightarrow 10 \rightarrow 9 \rightarrow 5 \rightarrow 2 \rightarrow 4 \rightarrow 7 \rightarrow 1 \]

{linenos
1
2
3
4
5
6
7
8
tour_str = '1 3 8 6 10 9 5 2 4 7 1'
tour = list(map(lambda x: int(x), tour_str.split(' ')))

for i in range(0, len(tour)-1):
p1 = tour[i] - 1
p2 = tour[i + 1] - 1
plt.plot((x[p1],x[p2]),(y[p1],y[p2]))
plt.show()
最短路径

PTR_NET TSP 的Python代码

初始化Init Graph Edges

在之前的自顶向下的递归版本中,需要做一些改动。首先,是图的初始化,我们依然延续之前的邻接矩阵来表示,由于这次的图是无向图,对于任意两个顶点,需要初始化双向的边。

{linenos
1
2
3
4
5
6
7
8
g: Graph = Graph(N)
for v in range(N):
for u in range(N):
diff_x = coordinates[v][0] - coordinates[u][0]
diff_y = coordinates[v][1] - coordinates[u][1]
dist: float = math.sqrt(diff_x * diff_x + diff_y * diff_y)
g.setDist(u, v, dist)
g.setDist(v, u, dist)

辅助变量记录父节点

另一大改动是需要在遍历过程中保存的顶点关联信息,以便在最终找到最短路径值时可以回溯对应的完整路径。在下面代码中,使用parent[bitstate][v] 来保存此状态下最小路径对应的顶点u。

{linenos
1
2
3
4
5
6
7
8
9
10
11
ret: float = FLOAT_INF
u_min: int = -1
for u in range(self.g.v_num):
if (state & (1 << u)) == 0:
s: float = self._recurse(u, state | 1 << u)
if s + edges[v][u] < ret:
ret = s + edges[v][u]
u_min = u
dp[state][v] = ret
self.parent[state][v] = u_min

当最终最短行程确定后,根据parent的信息可以按图索骥找到完整的行程顶点信息。

{linenos
1
2
3
4
5
6
7
8
9
def _form_tour(self):
self.tour = [0]
bit = 0
v = 0
for _ in range(self.g.v_num - 1):
v = self.parent[bit][v]
self.tour.append(v)
bit = bit | (1 << v)
self.tour.append(0)

需要注意的是,有可能存在多个最短行程,它们的距离值是一致的。这种情况下,代码输出的最短路径可能和数据集output后行程路径不一致,但是的两者的总距离是一致的。下面的代码验证了这一点。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tsp: TSPSolver = TSPSolver(g)
tsp.solve()

output_dist: float = 0.0
output_tour = list(map(lambda x: int(x) - 1, output.split(' ')))
for v in range(1, len(output_tour)):
pre_v = output_tour[v-1]
curr_v = output_tour[v]
diff_x = coordinates[pre_v][0] - coordinates[curr_v][0]
diff_y = coordinates[pre_v][1] - coordinates[curr_v][1]
dist: float = math.sqrt(diff_x * diff_x + diff_y * diff_y)
output_dist += dist

passed = abs(tsp.dist - output_dist) < 10e-5
if passed:
print(f'passed dist={tsp.tour}')
else:
print(f'Min Tour Distance = {output_dist}, Computed Tour Distance = {tsp.dist}, Expected Tour = {output_tour}, Result = {tsp.tour}')

本文所有代码在 github/MyEncyclopedia tsp/alg_plane 中。

上一期 通过代码学Sutton强化学习1:Grid World OpenAI环境和策略评价算法,我们引入了 Grid World 问题,实现了对应的OpenAI Gym 环境,也分析了其最佳策略和对应的V值。这一期中,继续通过这个例子详细讲解策略提升(Policy Improvment)、策略迭代(Policy Iteration)、值迭代(Value Iteration)和异步迭代方法。

回顾 Grid World 问题

Grid World 问题
在Grid World 中,Agent初始可以出现在编号1-14的网格中,Agent 每往四周走一步得到 -1 reward,因此需要尽快走到两个出口。当然最佳策略是以最小步数往出口逃离,如下所示。
Grid World 最佳策略

最佳策略对应的状态V值和3D heatmap如下

1
2
3
4
[[ 0. -1. -2. -3.]
[-1. -2. -3. -2.]
[-2. -3. -2. -1.]
[-3. -2. -1. 0.]]

Grid World V值 3D heatmap

策略迭代

上一篇中,我们知道如何evaluate 给定policy \(\pi\)\(v_{\pi}\)值,那么是否可能在此基础上改进生成更好的策略 \(\pi^{\prime}\)。如果可以,能否最终找到最佳策略\({\pi}_{*}\)?答案是肯定的,因为存在策略提升定理(Policy Improvement Theorem)。

策略提升定理

在 4.2 节 Policy Improvement Theorem 可以证明,利用 \(v_{\pi}\) 信息对于每个状态采取最 greedy 的 action (又称exploitation)能够保证生成的新 \({\pi}^{\prime}\) 是不差于旧的policy \({\pi}\),即

\[ q_{\pi}(s, {\pi}^{\prime}(s)) \gt v_{\pi}(s) \]

\[ v_{\pi^{\prime}}(s) \gt v_{\pi}(s) \]

因此,可以通过在当前policy求得v值,再选取最greedy action的方式形成如下迭代,就能够不断逼近最佳策略。

\[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} v_{*} \]

策略迭代算法

以下为书中4.3的policy iteration伪代码。其中policy evaluation的算法在上一篇中已经实现。Policy improvement 的精髓在于一次遍历所有状态后,通过policy 的最大Q值找到该状态的最佳action,并更新成最新policy,循环直至没有 action 变更。

\[ \begin{align*} &\textbf{Policy Iteration (using iterative policy evaluation) for estimating } \pi\approx {\pi}_{*} \\ &1. \quad \text{Initialization} \\ & \quad \quad V(s) \in \mathbb R\text{ and } \pi(s) \in \mathcal A(s) \text{ arbitrarily for all }s \in \mathcal{S} \\ & \\ &2. \quad \text{Policy Evaluation} \\ & \quad \quad \text{Loop:}\\ & \quad \quad \Delta \leftarrow 0\\ & \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ & \quad \quad \quad \quad v \leftarrow V(s) \\ & \quad \quad \quad \quad V(s) \leftarrow \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ & \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ & \quad \quad \text{until } \Delta < \theta \text{ (a small positive number determining the accuracy of estimation)}\\ & \\ &3. \quad \text{Policy Improvement} \\ & \quad \quad policy\text{-}stable\leftarrow true \\ & \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ & \quad \quad \quad \quad old\text{-}action\leftarrow \pi(s) \\ & \quad \quad \quad \quad \pi(s) \leftarrow \operatorname{argmax}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ & \quad \quad \quad \quad \text{If } old\text{-}action \neq \pi\text{,then }policy\text{-}stable\leftarrow false \\ & \quad \quad \text{If } policy\text{-}stable \text{, then stop and return }V \approx v_{*} \text{ and } \pi\approx {\pi}_{*}\text{; else go to 2} \end{align*} \]

注意到状态Q值 \(q_{\pi}(s, a)\) 会被多处调用,将其封装为单独的函数。

\[ \begin{aligned} q_{\pi}(s, a) &=\sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right] \end{aligned} \]

Q值函数实现如下:

{linenos
1
2
3
4
5
6
def action_value(env: GridWorldEnv, state: State, V: StateValue, gamma=1.0) -> ActionValue:
q = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
q[a] += prob * (reward + gamma * V[next_state])
return q

有了 action_value 和上期的 policy_evaluate,policy iteration 实现完全对应上面的伪代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def policy_improvement(env: GridWorldEnv, policy: Policy, V: StateValue, gamma=1.0) -> bool:
policy_stable = True

for s in range(env.nS):
old_action = np.argmax(policy[s])
Q_s = action_value(env, s, V)
best_action = np.argmax(Q_s)
policy[s] = np.eye(env.nA)[best_action]

if old_action != best_action:
policy_stable = False
return policy_stable


def policy_iteration(env: GridWorldEnv, policy: Policy, gamma=1.0) -> Tuple[Policy, StateValue]:
iter = 0
while True:
V = policy_evaluate(policy, env, gamma)
policy_stable = policy_improvement(env, policy, V)
iter += 1

if policy_stable:
return policy, V

Grid World 例子通过两轮迭代就可以收敛,以下是初始时随机策略的V值和第一次迭代后的V值。
初始随机策略 V 值
第一次迭代后的 V 值

值迭代

值迭代( Value Iteration)的本质是,将policy iteration中的policy evaluation过程从不断循环到收敛直至小于theta,改成只执行一遍,并直接用最佳Q值更新到状态V值,如此可以不用显示地算出\({\pi}\) 而直接在V值上迭代。具体迭代公式如下:

\[ \begin{aligned} v_{k+1}(s) & \doteq \max _{a} \mathbb{E}\left[R_{t+1}+\gamma v_{k}\left(S_{t+1}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\max_{a} q_{\pi_k}(s, a) \\ &=\max _{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{k}\left(s^{\prime}\right)\right] \end{aligned} \]

完整的伪代码为:

\[ \begin{align*} &\textbf{Value Iteration, for estimating } \pi\approx \pi_{*} \\ & \text{Algorithm parameter: a small threshold } \theta > 0 \text{ determining accuracy of estimation} \\ & \text{Initialize } V(s), \text{for all } s \in \mathcal{S}^{+} \text{, arbitrarily except that } V (terminal) = 0\\ & \\ &1: \text{Loop:}\\ &2: \quad \quad \Delta \leftarrow 0\\ &3: \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ &4: \quad \quad \quad \quad v \leftarrow V(s) \\ &5: \quad \quad \quad \quad V(s) \leftarrow \operatorname{max}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ &6: \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ &7: \text{until } \Delta < \theta \\ & \\ & \text{Output a deterministic policy, }\pi\approx \pi_{*} \text{, such that} \\ & \quad \quad \pi(s) \leftarrow \operatorname{argmax}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \end{align*} \]

代码实现也比较直接,可以复用上面已经实现的 action_value 函数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def value_iteration(env:GridWorldEnv, gamma=1.0, theta=0.0001) -> Tuple[Policy, StateValue]:
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action_value = np.max(action_values)
delta = max(delta, np.abs(best_action_value - V[s]))
V[s] = best_action_value
if delta < theta:
break

policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action = np.argmax(action_values)
policy[s, best_action] = 1.0

return policy, V

异步迭代

在第4.5节中提到了DP迭代方式的改进版:异步方式迭代(Asychronous Iteration)。这里的异步是指每一轮无需全部扫一遍所有状态,而是根据上一轮变化的状态决定下一轮需要最多计算的状态数,类似于Dijkstra最短路径算法中用 heap 来维护更新节点集合,减少运算量。下面我们通过异步值迭代来演示异步迭代的工作方式。

下图表示状态的变化方向,若上一轮 \(V(s)\) 发生更新,那么下一轮就要考虑状态 s 可能会影响到上游状态的集合( p1,p2),避免下一轮必须遍历所有状态的V值计算。

Async 反向传播

要做到部分更新就必须知道每个状态可能影响到的上游状态集合,上图对应的映射关系可以表示为

\[ \begin{align*} s'_1 &\rightarrow \{s\} \\ s'_2 &\rightarrow \{s\} \\ s &\rightarrow \{p_1, p_2\} \end{align*} \]

建立映射关系的代码如下,build_reverse_mapping 返回类型为 Dict[State, Set[State]]。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
def build_reverse_mapping(env:GridWorldEnv) -> Dict[State, Set[State]]:
MAX_R, MAX_C = env.shape[0], env.shape[1]
mapping = {s: set() for s in range(0, MAX_R * MAX_C)}
action_delta = {Action.UP: (-1, 0), Action.DOWN: (1, 0), Action.LEFT: (0, -1), Action.RIGHT: (0, 1)}
for s in range(0, MAX_R * MAX_C):
r = s // MAX_R
c = s % MAX_R
for a in list(Action):
neighbor_r = min(MAX_R - 1, max(0, r + action_delta[a][0]))
neighbor_c = min(MAX_C - 1, max(0, c + action_delta[a][1]))
s_ = neighbor_r * MAX_R + neighbor_c
mapping[s_].add(s)
return mapping

有了描述状态依赖的映射 dict 后,代码也比较简洁,changed_state_set 变量保存了这轮必须计算的状态集合。新的一轮迭代时,将下一轮需要计算的状态保存到 changed_state_set_ 中,本轮结束后,changed_state_set 更新成changed_state_set_,开始下一轮循环直至没有状态需要更新。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def value_iteration_async(env:GridWorldEnv, gamma=1.0, theta=0.0001) -> Tuple[Policy, StateValue]:
mapping = build_reverse_mapping(env)

V = np.zeros(env.nS)
changed_state_set = set(s for s in range(env.nS))

iter = 0
while len(changed_state_set) > 0:
changed_state_set_ = set()
for s in changed_state_set:
action_values = action_value(env, s, V, gamma=gamma)
best_action_value = np.max(action_values)
v_diff = np.abs(best_action_value - V[s])
if v_diff > theta:
changed_state_set_.update(mapping[s])
V[s] = best_action_value
changed_state_set = changed_state_set_
iter += 1

policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action = np.argmax(action_values)
policy[s, best_action] = 1.0

return policy, V
比较值迭代和异步值迭代方法后发现,值迭代用了4次循环,每次涉及所有状态,总计算状态数为 4 x 16 = 64。异步值迭代也用了4次循环,但是总计更新了54个状态。由于Grid World 的状态数很少,异步值迭代优势并不明显,但是对于状态数众多并且迭代最终集中在少部分状态的环境下,节省的计算量还是很可观的。

经典教材Reinforcement Learning: An Introduction 第二版由强化领域权威Richard S. Sutton 和 Andrew G. Barto 完成编写,内容深入浅出,非常适合初学者。在本篇中,引入Grid World示例,结合强化学习核心概念,并用python代码实现OpenAI Gym的模拟环境,进一步实现策略评价算法。

Grid World 问题

第四章例子4.1提出了一个简单的离散空间状态问题:Grid World,其大致意思是在4x4的网格世界中有14个格子是非终点状态,在这些非终点状态的格子中可以往上下左右四个方向走,直至走到两个终点状态格子,则游戏结束。每走一步,Agent收获reward -1,表示Agent希望在Grid World中尽早出去。另外,Agent在Grid World边缘时,无法继续往外只能呆在原地,reward也是-1。

Finite MDP 模型

先来回顾一下强化学习的建模基础:有限马尔可夫决策过程(Finite Markov Decision Process, Finite MDP)。如下图,强化学习模型将世界抽象成两个实体,强化学习解决目标的主体Agent和其他外部环境。它们之间的交互过程遵从有限马尔可夫决策过程:若Agent在t时间步骤时处于状态 \(S_t\),采取动作 \(A_t\),然后环境根据自身机制,产生Reward \(R_{t+1}\) 并将Agent状态变为 \(S_{t+1}\)

环境自身机制又称为dynamics,工程上可以看成一个输入(S, A),输出(S, R)的方法。由于MDP包含随机过程,某个输入并不能确定唯一输出,而会根据概率分布输出不同的(S, R)。Finite MDP简化了时间对于模型的影响,因为(S, R)只和(S, A)有关,不和时间t有关。另外,有限指的是S,A,R的状态数量是有限的。

数学上dynamics可以如下表示

\[ p\left(s^{\prime}, r \mid s, a\right) \doteq \operatorname{Pr}\left\{S_{t}=s^{\prime}, R_{t}=r \mid S_{t-1}=s, A_{t-1}=a\right\} \]

即是四元组作为输入的概率函数 \(p: S \times R \times S \times A \rightarrow [0, 1]\)

满足 \[ \sum_{s^{\prime} \in \mathcal{S}} \sum_{r \in \mathcal{R}} p\left(s^{\prime}, r \mid s, a\right)=1, \text { for all } s \in \mathcal{S}, a \in \mathcal{A}(s) \]

以Grid World为例,当Agent处于编号1的网格时,可以往四个方向走,往任意方向走都只产生一种 S, R,因为这个简单的游戏是确定性的,不存在某一动作导致stochastic状态。例如,在1号网格往左就到了终点网格(编号0),得到Reward -1这个规则可以如下表示 \[ p\left(s^{\prime}=0, r=-1 \mid s=1, a=\text{L}\right) = 1 \] 因此,状态s=1的所有dynamics概率映射为

\[ \begin{aligned} p\left(s^{\prime}=0, r=-1 \mid s=1, a=\text{L}\right) &=& 1 \\ p\left(s^{\prime}=2, r=-1 \mid s=1, a=\text{R}\right) &=& 1 \\ p\left(s^{\prime}=1, r=-1 \mid s=1, a=\text{U}\right) &=& 1 \\ p\left(s^{\prime}=5, r=-1 \mid s=1, a=\text{D}\right) &=& 1 \end{aligned} \]

强化学习的目的

在给定了问题以及定义了强化学习的模型之后,强化学习的目的当然是通过学习让Agent能够学到最佳策略\(\pi_{*}\),也就是在某个状态下的行动分布,记成 \(\pi(a|s)\)。对应在数值上的优化目标是Agent在一系列过程中采取某种策略的reward总和的期望(Expected Return)。下面公式定义了t步往后的reward总和,其中 \(\gamma\) 为discount factor,用于权衡短期和长期reward对于当前Agent的效用影响。等式最后一步的意义是t步后的reward总和等价于t步所获的立即reward \(R_{t+1}\),加上t+1步后的reward总和 \(\gamma G_{t+1}\)

\[ \begin{aligned} G_{t} & \doteq R_{t+1}+\gamma R_{t+2}+\gamma^{2} R_{t+3}+\gamma^{3} R_{t+4}+\cdots \\ &=R_{t+1}+\gamma\left(R_{t+2}+\gamma R_{t+3}+\gamma^{2} R_{t+4}+\cdots\right) \\ &=R_{t+1}+\gamma G_{t+1} \end{aligned} \]

有了reward总和的定义,评价Agent策略 \(\pi\) 就可以定义成Agent在状态 s 时采用此策略的Expected Return。

\[ v_{\pi}(s) \doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \]

下面公式推导了 \(v_{\pi}(s)\) 数值上和相关状态 \(s{\prime}\) 的关系:

\[ \begin{aligned} v_{\pi}(s) &\doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \\ &=\mathbb{E}_{\pi}\left[\sum_{k=0}^{\infty} \gamma^{k} R_{t+k+1} \mid S_{t}=s\right]\\ &=\mathbb{E}_{\pi}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}} \sum_{r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma \mathbb{E}_{\pi}\left[G_{t+1} \mid S_{t+1}=s^{\prime}\right]\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right] \quad \text { for all } s \in \mathcal{S} \end{aligned} \]

注意到如果将 \(v_{\pi}(s)\) 看成未知数,上式即形成 \(\mid \mathcal{S} \mid\) 个未知变量的方程组,可以在数值上解得各个 \(v_{\pi}(s)\)

书中用Backup Diagram来表示递推关系,下图是\(v_{\pi}(s)\)的backup diagram。

尽管v值可以来衡量策略,但由于\(v_{\pi}(s)\) 是Agent在策略\(\pi(a|s)\)的Expected Return,将不同的action拆出来单独计算Expected Return,这样的做法有时更为直接,这就是著名的Q Learning中的q 值,记成\(q_{\pi}(s, a)\)

\[ q_{\pi}(s, a) \doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s, A_{t}=a\right] \]

下面是 $q_{}(s, a) $ 的递推 backup diagram。

Bellman 最佳原则

对于所有状态集合\(\mathcal{S}\),策略\({\pi}\)的评价指标 \(v_{\pi}(s)\) 是一个向量,本质上是无法相互比较的。但由于存在Bellman 最佳原则(Bellman's principle of optimality):在有限状态情况下,一定存在一个或者多个最好的策略 \({\pi}_{*}\),它在所有状态下的v值都是最好的,即 \(v_{\pi_{*}}(s) \ge v_{\pi^{\prime}}(s) \text { for all } s \in \mathcal{S}\)

因此,最佳v值定义为最佳策略 \({\pi}_{*}\) 对应的 v 值

\[ v_{*}(s) \doteq \max_{\pi} v_{\pi}(s) \]

同理,也存在最佳q值,记为 \[ \begin{aligned} q_{*}(s, a) &\doteq \max_{\pi} q_{\pi}(s,a) \end{aligned} \]

\(v_{*}(s)\) 改写成递推形式,称为 Bellman Optimality Equation,推导如下

\[ \begin{aligned} v_{*}(s) &=\max _{a \in \mathcal{A}(s)} q_{\pi_{*}}(s, a) \\ &=\max _{a} \mathbb{E}_{\pi_{*}}\left[G_{t} \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \mathbb{E}_{\pi_{*}}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \mathbb{E}\left[R_{t+1}+\gamma v_{*}\left(S_{t+1}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{*}\left(s^{\prime}\right)\right] \end{aligned} \]

直觉上可以理解为状态 s 对应的最佳v值是只采取此状态下的最佳动作后的Expected Return。

最佳q值递归形式的意义为最佳策略下状态s时采取行动 a 的Expected Return,等于所有可能后续状态 s' 下采取最优行动的Expected Return的均值。推导如下:

\[ \begin{aligned} q_{*}(s, a) &=\mathbb{E}\left[R_{t+1}+\gamma \max _{a^{\prime}} q_{*}\left(S_{t+1}, a^{\prime}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma \max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right)\right] \end{aligned} \]

\(v_{*}(s), q_{*}(s, a)\) 的backup diagram 如下图

Grid World 最佳策略和V值

Grid World 的最佳策略如下:尽可能快的走出去

Grid World最佳策略

上面的2D图中不同颜色表示不同V值,终点格子的红色表示0,隔着一步的黄色为-1,隔两步的绿色为-2,最远的紫色为-3。下面是立体图示。

Grid World最佳策略V值

Grid World OpenAI Gym 环境

下面是OpenAI Gym框架下Grid World环境的代码实现。本质是在GridWorldEnv构造函数中构建MDP,类型定义如下

1
2
3
4
5
6
MDP = Dict[State, Dict[Action, List[Tuple[Prob, State, Reward, bool]]]]

# P[state][action] = [
# (prob1, next_state1, reward1, is_done),
# (prob2, next_state2, reward2, is_done), ...]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Action(Enum):
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3

State = int
Reward = float
Prob = float
Policy = Dict[State, Dict[Action, Prob]]
Value = List[float]
StateSet = Set[int]
NonTerminalStateSet = Set[int]
MDP = Dict[State, Dict[Action, List[Tuple[Prob, State, Reward, bool]]]]
# P[s][a] = [(prob, next_state, reward, is_done), ...]

class GridWorldEnv(discrete.DiscreteEnv):
"""
Grid World environment described in Sutton and Barto Reinforcement Learning 2nd, chapter 4.
"""

def __init__(self, shape=[4,4]):
self.shape = shape
nS = np.prod(shape)
nA = len(list(Action))
MAX_R = shape[0]
MAX_C = shape[1]
self.grid = np.arange(nS).reshape(shape)
isd = np.ones(nS) / nS

# P[s][a] = [(prob, next_state, reward, is_done), ...]
P: MDP = {}
action_delta = {Action.UP: (-1, 0), Action.DOWN: (1, 0), Action.LEFT: (0, -1), Action.RIGHT: (0, 1)}
for s in range(0, MAX_R * MAX_C):
P[s] = {a.value : [] for a in list(Action)}
is_terminal = self.is_terminal(s)
if is_terminal:
for a in list(Action):
P[s][a.value] = [(1.0, s, 0, True)]
else:
r = s // MAX_R
c = s % MAX_R
for a in list(Action):
neighbor_r = min(MAX_R-1, max(0, r + action_delta[a][0]))
neighbor_c = min(MAX_C-1, max(0, c + action_delta[a][1]))
s_ = neighbor_r * MAX_R + neighbor_c
P[s][a.value] = [(1.0, s_, -1, False)]

super(GridWorldEnv, self).__init__(nS, nA, P, isd)

策略评估(Policy Evaluation)

策略评估需要解决在给定环境dynamics和Agent策略 \(\pi\)下,计算策略的v值 \(v_{\pi}\)。由于所有数量关系都已知,可以通过解方程组的方式求得,但通常会通过数值迭代的方式来计算,即通过一系列 \(v_{0}, v_{1}, ..., v_{k}\) 收敛至 \(v_{\pi}\)。如下迭代方式已经得到证明,当 \(k \rightarrow \infty\) 一定收敛至 \(v_{\pi}\)

\[ \begin{aligned} v_{k+1}(s) & \doteq \mathbb{E}_{\pi}\left[R_{t+1}+\gamma v_{k}\left(S_{t+1}\right) \mid S_{t}=s\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{k}\left(s^{\prime}\right)\right] \end{aligned} \]

书中具体伪代码如下

\[ \begin{align*} &\textbf{Iterative Policy Evaluation, for estimating } V\approx v_{\pi} \\ & \text{Input } {\pi}, \text{the policy to be evaluated} \\ & \text{Algorithm parameter: a small threshold } \theta > 0 \text{ determining accuracy of estimation} \\ & \text{Initialize } V(s), \text{for all } s \in \mathcal{S}^{+} \text{, arbitrarily except that } V (terminal) = 0\\ & \\ &1: \text{Loop:}\\ &2: \quad \quad \Delta \leftarrow 0\\ &3: \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ &4: \quad \quad \quad \quad v \leftarrow V(s) \\ &5: \quad \quad \quad \quad V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ &6: \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ &7: \text{until } \Delta < \theta \end{align*} \]

下面是python 代码实现,注意这里单run迭代时,新的v值直接覆盖数组里的旧v值,这种做法在书中被证明不仅有效,甚至更为高效。这种做法称为原地(in place)更新。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def policy_evaluate(policy: Policy, env: GridWorldEnv, gamma=1.0, theta=0.0001):
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
v = 0
for a, action_prob in enumerate(policy[s]):
for prob, next_state, reward, done in env.P[s][a]:
v += action_prob * prob * (reward + gamma * V[next_state])
delta = max(delta, np.abs(v - V[s]))
V[s] = v
if delta < theta:
break
return np.array(V)

输入策略为随机选择方向,运行上面的policy_evaluate最终多轮收敛后的V值输出为

{linenos
1
2
3
4
[[  0.         -13.99931242 -19.99901152 -21.99891199]
[-13.99931242 -17.99915625 -19.99908389 -19.99909436]
[-19.99901152 -19.99908389 -17.99922697 -13.99942284]
[-21.99891199 -19.99909436 -13.99942284 0. ]]
在3D V值图中可以发现,由于是随机选择方向的策略, Agent在每个格子的V值绝对数值要比最佳V值大,意味着随机策略下Agent在Grid World会得到更多的负reward。
Grid World随机策略V值

旅行商问题(TSP)是计算机算法中经典的NP hard 问题。 在本系列文章中,我们将首先使用动态规划 AC aizu中的TSP问题,然后再利用深度学习求大规模下的近似解。深度学习应用解决问题时先以PyTorch实现监督学习算法 Pointer Network,进而结合强化学习来无监督学习,提高数据使用效率。 本系列完整列表如下:

  • 第一篇: 递归DP方法 AC AIZU TSP问题

  • 第二篇: 二维空间TSP数据集及其DP解法

  • 第三篇: 深度学习 Pointer Networks 的 Pytorch实现

  • 第四篇: 搜寻最有可能路径:Viterbi算法和其他

  • 第五篇: 深度强化学习无监督算法的 Pytorch实现

TSP 问题回顾

TSP可以用图模型来表达,无论有向图或无向图,无论全连通图或者部分连通的图都可以作为TSP问题。 Wikipedia TSP 中举了一个无向全连通的TSP例子。如下图所示,四个顶点A,B,C,D构成无向全连通图。TSP问题要求在所有遍历所有点后返回初始点的回路中找到最短的回路。例如,\(A \rightarrow B \rightarrow C \rightarrow D \rightarrow A\)\(A \rightarrow C \rightarrow B \rightarrow D \rightarrow A\) 都是有效的回路,但是TSP需要返回这些回路中的最短回路(注意,最短回路可能会有多条)。

Wikipedia 4个顶点组成的图

无论是哪种类型的图,我们都能用邻接矩阵表示出一个图。上面的Wikipedia中的图可以用下面的矩阵来描述。

\[ \begin{matrix} & \begin{matrix}A&B&C&D\end{matrix} \\\\ \begin{matrix}A\\\\B\\\\C\\\\D\end{matrix} & \begin{bmatrix}-&20&42&35\\\\20&-&30&34\\\\42&30&-&12\\\\35&34&12&-\end{bmatrix}\\\\ \end{matrix} \]

当然,大多数情况下,TSP问题会被限定在欧氏空间,即二维地图中的全连通无向图。因为,如果将顶点表示一个地理位置,一般来说它可以和其他所有顶点连通,回来的距离相同,由此构成无向图。

AIZU TSP 问题

AIZU在线题库 有一道有向不完全连通图的TSP问题。给定V个顶点和E条边,输出最小回路值。例如,题目里的例子如下所示,由4个顶点和6条单向边构成。

这个示例的答案是16,对应的回路是 \(0\rightarrow1\rightarrow3\rightarrow2\rightarrow0\),由下图的红色边构成。注意,这个题目可能不存在合法解,原因是无回路存在,此时返回-1,可以合理地理解成无穷大。

暴力解法

一种暴力方法是枚举所有可能的从某一顶点的回路,取其中的最小值即可。下面的 Python 示例如何枚举4个顶点构成的图中从顶点0出发的所有回路。

{linenos
1
2
3
4
5
from itertools import permutations
v = [1,2,3]
p = permutations(v)
for t in list(p):
print([0] + list(t) + [0])

所有从顶点0出发的回路如下:

{linenos
1
2
3
4
5
6
[0, 1, 2, 3, 0]
[0, 1, 3, 2, 0]
[0, 2, 1, 3, 0]
[0, 2, 3, 1, 0]
[0, 3, 1, 2, 0]
[0, 3, 2, 1, 0]

很显然,这种方式的时间复杂度是 O(\(n!\)),无法通过AIZU。

动态规划求解

我们可以使用位状态压缩的动态规划来AC这道题。 首先,需要将回路过程中的状态编码成二进制的表示。例如,在四顶点的例子中,如果顶点2和1都被访问过,并且此时停留在顶点1。将已经访问的顶点对应的位置1,那么编码成0110,此外,还需要保存当前顶点的位置,因此我们将代表状态的数组扩展成二维,第一维是位状态,第二维是顶点所在位置,即 \(dp[bitstate][v]\)。这个例子的状态表示就是 \(dp["0110"][1]\)

状态转移方程如下: \[ dp[bitstate][v] = \min ( dp[bitstate \cup \{u\}][u] + dist(v,u) \mid u \notin bitstate ) \] 这种方法对应的时间复杂度是 O(\(n^2*2^n\) ),因为总共有 \(2^n * n\) 个状态,而每个状态又需要一次遍历。虽然都是指数级复杂度,但是它们的巨大区别由下面可以看出区别。

\(n!\) \(n^2*2^n\)
n=8 40320 16384
n=10 3628800 102400
n=12 479001600 589824
n=14 87178291200 3211264

暂停思考一下为什么状态压缩DP能工作。注意到之前暴力解法中其实是有很多重复计算,下面红圈表示重复的计算节点。

在本篇中,我们将会用Python 3和Java 8 实现自顶向下的DP 缓存版本。这种方式比较符合直觉,因为我们不需要预先考虑计算节点的依赖关系。在Java中我们使用了一个小技巧,dp数组初始化成Integer.MAX_VALUE,如此只需要一条语句就能完成更新dp值。

1
res = Math.min(res, s + g.edges[v][u]);

当然,为了AC 这道题,我们需要区分出真正无法到达的情况并返回-1。 在Python实现中,也可以使用同样的技巧,但是这次示例一般的实现方法:将dp数组初始化成-1并通过 if-else 来区分不同情况。

1
2
3
4
5
6
7
INT_INF = -1

if s != INT_INF and edges[v][u] != INT_INF:
if ret == INT_INF:
ret = s + edges[v][u]
else:
ret = min(ret, s + edges[v][u])

下面附完整的Python 3和Java 8的AC代码,同步在 github

AIZU Java 8 递归DP版本

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// passed http://judge.u-aizu.ac.jp/onlinejudge/description.jsp?id=DPL_2_A
import java.util.Arrays;
import java.util.Scanner;

public class Main {
public static class Graph {
public final int V_NUM;
public final int[][] edges;

public Graph(int V_NUM) {
this.V_NUM = V_NUM;
this.edges = new int[V_NUM][V_NUM];
for (int i = 0; i < V_NUM; i++) {
Arrays.fill(this.edges[i], Integer.MAX_VALUE);
}
}

public void setDist(int src, int dest, int dist) {
this.edges[src][dest] = dist;
}

}

public static class TSP {
public final Graph g;
long[][] dp;

public TSP(Graph g) {
this.g = g;
}

public long solve() {
int N = g.V_NUM;
dp = new long[1 << N][N];
for (int i = 0; i < dp.length; i++) {
Arrays.fill(dp[i], -1);
}

long ret = recurse(0, 0);
return ret == Integer.MAX_VALUE ? -1 : ret;
}

private long recurse(int state, int v) {
int ALL = (1 << g.V_NUM) - 1;
if (dp[state][v] >= 0) {
return dp[state][v];
}
if (state == ALL && v == 0) {
dp[state][v] = 0;
return 0;
}
long res = Integer.MAX_VALUE;
for (int u = 0; u < g.V_NUM; u++) {
if ((state & (1 << u)) == 0) {
long s = recurse(state | 1 << u, u);
res = Math.min(res, s + g.edges[v][u]);
}
}
dp[state][v] = res;
return res;

}

}

public static void main(String[] args) {

Scanner in = new Scanner(System.in);
int V = in.nextInt();
int E = in.nextInt();
Graph g = new Graph(V);
while (E > 0) {
int src = in.nextInt();
int dest = in.nextInt();
int dist = in.nextInt();
g.setDist(src, dest, dist);
E--;
}
System.out.println(new TSP(g).solve());
}
}

AIZU Python 3 递归DP版本

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from typing import List

INT_INF = -1

class Graph:
v_num: int
edges: List[List[int]]

def __init__(self, v_num: int):
self.v_num = v_num
self.edges = [[INT_INF for c in range(v_num)] for r in range(v_num)]

def setDist(self, src: int, dest: int, dist: int):
self.edges[src][dest] = dist


class TSPSolver:
g: Graph
dp: List[List[int]]

def __init__(self, g: Graph):
self.g = g
self.dp = [[None for c in range(g.v_num)] for r in range(1 << g.v_num)]

def solve(self) -> int:
return self._recurse(0, 0)

def _recurse(self, v: int, state: int) -> int:
"""

:param v:
:param state:
:return: -1 means INF
"""
dp = self.dp
edges = self.g.edges

if dp[state][v] is not None:
return dp[state][v]

if (state == (1 << self.g.v_num) - 1) and (v == 0):
dp[state][v] = 0
return dp[state][v]

ret: int = INT_INF
for u in range(self.g.v_num):
if (state & (1 << u)) == 0:
s: int = self._recurse(u, state | 1 << u)
if s != INT_INF and edges[v][u] != INT_INF:
if ret == INT_INF:
ret = s + edges[v][u]
else:
ret = min(ret, s + edges[v][u])
dp[state][v] = ret
return ret


def main():
V, E = map(int, input().split())
g: Graph = Graph(V)
for _ in range(E):
src, dest, dist = map(int, input().split())
g.setDist(src, dest, dist)

tsp: TSPSolver = TSPSolver(g)
print(tsp.solve())


if __name__ == "__main__":
main()

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×