#Reinforcement Learning

从这期开始我们进入Sutton强化学习第二版,第五章蒙特卡洛方法。蒙特卡洛方法是一种在工程各领域都存在的基本方法,在强化领域中,其特点是无需知道环境的dynamics,只需不断模拟记录并分析数据即可逼近理论真实值。蒙特卡洛方法本篇将会用21点游戏作为示例来具体讲解其原理和代码实现。

21点游戏问题

21点游戏是一个经典的赌博游戏。大致规则是玩家和庄家各发两张牌,一张明牌,一张暗牌。玩家和庄家可以决定加牌或停止加牌,新加的牌均为暗牌,最后比较两个玩家的牌面和,更接近21点的获胜。游戏的变化因素是牌Ace,既可以作为11也可以作为1来计算,算作11的时候称作usable。

Sutton教材中的21点游戏规则简化了几个方面用于控制问题状态数:

  • 已发的牌的无状态性:和一副牌的21点游戏不同的是,游戏环境简化为牌是可以无穷尽被补充的,一副牌的某一张被派发后,同样的牌会被补充进来,或者可以认为每次发放的牌都是从一副新牌中抽出的。统计学中的术语称为重复采样(sample with replacement)。这种规则下极端情况下,玩家可以拥有 5个A或者5个2。另外,这会导致玩家无法通过开局看到的3张牌的信息推断后续发牌的概率,如此就大规模减小了游戏状态数。
  • 庄家和玩家独立游戏,无需按轮次要牌。开局给定4张牌后,玩家先行动,加牌直至超21点或者停止要牌,如果超21点,玩家输,否则,等待庄家行动,庄家加牌直至超21点或者停止要牌,如果超21点,庄家输,否则比较两者的总点数。这种方式可以认为当玩家和庄家看到初始的三张牌后独立做一系列决策,最后比较结果,避免了交互模式下因为能观察到每一轮后对方牌数变化产生额外的信息而导致的游戏状态数爆炸。

有以上两个规则的简化,21点游戏问题的总状态数有下面三个维度

  • 自己手中的点数和(12到21)

  • 庄家明牌的点数(A到10)

  • 庄家明牌是否有 A(True, False)。

状态总计总数为三个维度的乘积 10 * 10 * 2 = 200。

关于游戏状态有几个比较subtle的假设或者要素。首先,玩家初始时能看到三张牌,这三张牌确定了状态的三个维度的值,当然也就确定了Agent的初始状态,随后按照独立游戏的规则进行,玩家根据初始状态依照某种策略决策要牌还是结束要牌,新拿的牌更新了游戏状态,玩家转移到新状态下继续做决策。举个例子,假设初始时玩家明牌为8,暗牌为6,庄家明牌为7,则游戏状态为Tuple (14, 7, False)。若玩家的策略为教材中的固定规则策略:没到20或者21继续要牌。下一步玩家拿到牌3,则此时新状态为 (17, 7, False),按照策略继续要牌。

第二个方面是游戏的状态完全等价于玩家观察到的信息。比如尽管初始时有4张牌,真正的状态是这四张牌的值,但是出于简化目的,不考虑partially observable 的情况,即不将暗牌纳入游戏状态中。另外,庄家做决策的时候也无法得知玩家的手中的总牌数。

第三个方面是关于玩家点数。考虑玩家初始时的两张牌为2,3,总点数是5,那么为何不将5加入到游戏状态中呢?原则上是可以将初始总和为2到11都加入到游戏状态,但是意义不大,原因在于我们已经假设了已发牌的无状态性,拿到的这两张牌并不会改变后续补充的牌的出现概率。当玩家初始总和为2到11时一定会追加牌,因为无论第三张牌是什么,都不会超过21点,只会增加获胜概率。若后续第三张牌为8,总和变成13,就进入了有效的游戏状态,因为此时如果继续要牌,获得10,则游戏输掉。因此,我们关心的游戏状态并不完全等价于所有可能的游戏状态。

21点游戏 OpenAI Gym环境

OpenAI Gym 已经实现了Sutton版本的21点游戏环境,并按上述规则来进行。在安装完OpenAI Gym包之后 import BlackjackEnv即可使用。

1
from gym.envs.toy_text import BlackjackEnv

根据这个游戏环境,我们先来定义一些类型,可以令代码更具可读性和抽象化。State 上文说过是由三个分量组成的Tuple。Action 为bool类型 表示是否继续要牌。Reward 为+1或者-1,玩家叫牌过程中为0。StateValue 为书中的 \(V_{\pi}\),实现上是一个Dict。DeterministicPolicy 为一个函数,输入是某一状态,输出是唯一的决策动作。

{linenos
1
2
3
4
5
State = Tuple[int, int, bool]
Action = bool
Reward = float
StateValue = Dict[State, float]
DeterministicPolicy = Callable[[State], Action]

以下代码是 BlackjackEnv 核心代码,step 方法的输入为玩家的决策动作(叫牌还是结束),并输出State, Reward, is_done。简单解释一下代码逻辑,当玩家继续加牌时,需要判断是否超21点,如果没有超过的话,返回下一状态,同时reward 为0,等待下一step方法。若玩家停止叫牌,则按照庄家策略:小于17时叫牌。游戏终局时产生+1表示玩家获胜,-1表示庄家获胜。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class BlackjackEnv(gym.Env):

def step(self, action):
assert self.action_space.contains(action)
if action: # hit: add a card to players hand and return
self.player.append(draw_card(self.np_random))
if is_bust(self.player):
done = True
reward = -1.
else:
done = False
reward = 0.
else: # stick: play out the dealers hand, and score
done = True
while sum_hand(self.dealer) < 17:
self.dealer.append(draw_card(self.np_random))
reward = cmp(score(self.player), score(self.dealer))
if self.natural and is_natural(self.player) and reward == 1.:
reward = 1.5
return self._get_obs(), reward, done, {}

def _get_obs(self):
return (sum_hand(self.player), self.dealer[0], usable_ace(self.player))

下面示例如何调用step方法生成一个episode的数据集。数据集的类型为 List[Tuple[State, Action, Reward]]。

{linenos
1
2
3
4
5
6
7
8
9
10
def gen_episode_data(policy: DeterministicPolicy, env: BlackjackEnv) -> List[Tuple[State, Action, Reward]]:
episode_history = []
state = env.reset()
done = False
while not done:
action = policy(state)
next_state, reward, done, _ = env.step(action)
episode_history.append((state, action, reward))
state = next_state
return episode_history

策略的蒙特卡洛值预测

Monte Carlo Prediction解决如下问题:当给定Agent 策略\(\pi\)时,反复试验来预估策略的 \(V_{\pi}\) 值。具体来说,产生一系列的episode数据之后,对于出现了的所有状态分别计算其Return,再通过 average 某一状态 s 的Return来估计 \(V_{\pi}(s)\),理论上,依据大数定理(Law of large numbers),在可以无限模拟的情况下,Monte Carlo prediction 一定会收敛到真实的 \(V_{\pi}\)。算法实现上有两个略微不同的版本,一个版本称为 First-visit,另一个版本称为 Every-visit,区别在于如何计算出现的状态 s 的 Return值。

对于 First-visit 来说,当状态 s 第一次出现时计算一次 Returns,若继续出现状态 s 不再重复计算。对于Every-visit来说,每次出现 s 计算一次 Returns(s)。举个例子,某episode 数据如下: \[ S_1, R_1, S_2, R_2, S_1, R_3, S_3, R_4 \] First-visit 对于状态S1的Returns计算为

\[ Returns(S_1) = R_1 + R_2 + R_3 + R_4 \]

Every-visit 对于状态S1的Returns计算了两次,因为S1出现了两次。 \[ \begin{align*} Returns(S_1) = \frac{Return_1(S_1) + Return_2(S_1)}2 \\ = \frac{(R_1 + R_2 + R_3 + R_4) + (R_3 + R_4)} 2 \end{align*} \]

下面用Monte Carlo来模拟解得书中示例玩家固定策略的V值,策略具体为:加牌直到手中点数>=20,代码为

{linenos
1
2
3
4
5
6
def fixed_policy(observation):
"""
sticks if the player score is >= 20 and hits otherwise.
"""
score, dealer_score, usable_ace = observation
return 0 if score >= 20 else 1

First-visit MC Predicition

伪代码如下,注意考虑到实现上的高效性,在遍历episode序列数据时是从后向前扫的,这样可以边扫边更新G。

\[ \begin{align*} &\textbf{First-visit MC prediction, for estimating } V \approx v_{\pi} \\ & \text{Input: a policy } \pi \text{ to be evaluated} \\ & \text{Initialize} \\ & \quad V(s) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S} \\ & \quad Returns(s) \leftarrow \text{ an empty list, arbitrarily, for all }s \in \mathcal{S} \\ & \\ & \text{Loop forever (for episode):}\\ & \quad \text{Generate an episode following } \pi: S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless } S_t \text{ appears in } S_0, S_1, ..., S_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t) \\ & \quad \quad \quad \quad V(S_t) \leftarrow \operatorname{average}(Returns(S_t))\\ \end{align*} \]

对应的 python 实现

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def mc_prediction_first_visit(policy: DeterministicPolicy, env: BlackjackEnv,
num_episodes, discount_factor=1.0) -> StateValue:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_episode_data(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s for s_a_r in episode_history[0: t]):
returns_sum[s] += G
returns_count[s] += 1.0

V = defaultdict(float)
V.update({s: returns_sum[s] / returns_count[s] for s in returns_sum.keys()})
return V

Every-visit MC Prediciton

Every-visit 代码实现相对更简单一些,t 从后往前遍历时更新对应s的状态变量。如下所示

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def mc_prediction_every_visit(policy: DeterministicPolicy, env: BlackjackEnv,
num_episodes, discount_factor=1.0) -> StateValue:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_episode_data(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
returns_sum[s] += G
returns_count[s] += 1.0

V = defaultdict(float)
V.update({s: returns_sum[s] / returns_count[s] for s in returns_sum.keys()})
return V

策略 V值 3D 可视化

运行first-visit 算法,模拟10000次episode,fixed_policy的V值的3D图为下面两张图,分别是不含usable Ace和包含usable Ace。总的说来,一旦玩家能到达20点或21点获胜概率极大,到达13-17获胜概率较小,在11-13时有一定获胜概率,比较符合经验直觉。

first-visit MC 10000次没有usable A的V值
first-visit MC 10000次含有usable A的V值

同样运行every-visit 算法,模拟10000次的V值图。对比两种方法结果比较接近。

every-visit MC 10000次没有usable A的V值
every-visit MC 10000次含有usable A的V值

上一期 通过代码学Sutton强化学习1:Grid World OpenAI环境和策略评价算法,我们引入了 Grid World 问题,实现了对应的OpenAI Gym 环境,也分析了其最佳策略和对应的V值。这一期中,继续通过这个例子详细讲解策略提升(Policy Improvment)、策略迭代(Policy Iteration)、值迭代(Value Iteration)和异步迭代方法。

回顾 Grid World 问题

Grid World 问题
在Grid World 中,Agent初始可以出现在编号1-14的网格中,Agent 每往四周走一步得到 -1 reward,因此需要尽快走到两个出口。当然最佳策略是以最小步数往出口逃离,如下所示。
Grid World 最佳策略

最佳策略对应的状态V值和3D heatmap如下

1
2
3
4
[[ 0. -1. -2. -3.]
[-1. -2. -3. -2.]
[-2. -3. -2. -1.]
[-3. -2. -1. 0.]]

Grid World V值 3D heatmap

策略迭代

上一篇中,我们知道如何evaluate 给定policy \(\pi\)\(v_{\pi}\)值,那么是否可能在此基础上改进生成更好的策略 \(\pi^{\prime}\)。如果可以,能否最终找到最佳策略\({\pi}_{*}\)?答案是肯定的,因为存在策略提升定理(Policy Improvement Theorem)。

策略提升定理

在 4.2 节 Policy Improvement Theorem 可以证明,利用 \(v_{\pi}\) 信息对于每个状态采取最 greedy 的 action (又称exploitation)能够保证生成的新 \({\pi}^{\prime}\) 是不差于旧的policy \({\pi}\),即

\[ q_{\pi}(s, {\pi}^{\prime}(s)) \gt v_{\pi}(s) \]

\[ v_{\pi^{\prime}}(s) \gt v_{\pi}(s) \]

因此,可以通过在当前policy求得v值,再选取最greedy action的方式形成如下迭代,就能够不断逼近最佳策略。

\[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} v_{*} \]

策略迭代算法

以下为书中4.3的policy iteration伪代码。其中policy evaluation的算法在上一篇中已经实现。Policy improvement 的精髓在于一次遍历所有状态后,通过policy 的最大Q值找到该状态的最佳action,并更新成最新policy,循环直至没有 action 变更。

\[ \begin{align*} &\textbf{Policy Iteration (using iterative policy evaluation) for estimating } \pi\approx {\pi}_{*} \\ &1. \quad \text{Initialization} \\ & \quad \quad V(s) \in \mathbb R\text{ and } \pi(s) \in \mathcal A(s) \text{ arbitrarily for all }s \in \mathcal{S} \\ & \\ &2. \quad \text{Policy Evaluation} \\ & \quad \quad \text{Loop:}\\ & \quad \quad \Delta \leftarrow 0\\ & \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ & \quad \quad \quad \quad v \leftarrow V(s) \\ & \quad \quad \quad \quad V(s) \leftarrow \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ & \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ & \quad \quad \text{until } \Delta < \theta \text{ (a small positive number determining the accuracy of estimation)}\\ & \\ &3. \quad \text{Policy Improvement} \\ & \quad \quad policy\text{-}stable\leftarrow true \\ & \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ & \quad \quad \quad \quad old\text{-}action\leftarrow \pi(s) \\ & \quad \quad \quad \quad \pi(s) \leftarrow \operatorname{argmax}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ & \quad \quad \quad \quad \text{If } old\text{-}action \neq \pi\text{,then }policy\text{-}stable\leftarrow false \\ & \quad \quad \text{If } policy\text{-}stable \text{, then stop and return }V \approx v_{*} \text{ and } \pi\approx {\pi}_{*}\text{; else go to 2} \end{align*} \]

注意到状态Q值 \(q_{\pi}(s, a)\) 会被多处调用,将其封装为单独的函数。

\[ \begin{aligned} q_{\pi}(s, a) &=\sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right] \end{aligned} \]

Q值函数实现如下:

{linenos
1
2
3
4
5
6
def action_value(env: GridWorldEnv, state: State, V: StateValue, gamma=1.0) -> ActionValue:
q = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
q[a] += prob * (reward + gamma * V[next_state])
return q

有了 action_value 和上期的 policy_evaluate,policy iteration 实现完全对应上面的伪代码。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def policy_improvement(env: GridWorldEnv, policy: Policy, V: StateValue, gamma=1.0) -> bool:
policy_stable = True

for s in range(env.nS):
old_action = np.argmax(policy[s])
Q_s = action_value(env, s, V)
best_action = np.argmax(Q_s)
policy[s] = np.eye(env.nA)[best_action]

if old_action != best_action:
policy_stable = False
return policy_stable


def policy_iteration(env: GridWorldEnv, policy: Policy, gamma=1.0) -> Tuple[Policy, StateValue]:
iter = 0
while True:
V = policy_evaluate(policy, env, gamma)
policy_stable = policy_improvement(env, policy, V)
iter += 1

if policy_stable:
return policy, V

Grid World 例子通过两轮迭代就可以收敛,以下是初始时随机策略的V值和第一次迭代后的V值。
初始随机策略 V 值
第一次迭代后的 V 值

值迭代

值迭代( Value Iteration)的本质是,将policy iteration中的policy evaluation过程从不断循环到收敛直至小于theta,改成只执行一遍,并直接用最佳Q值更新到状态V值,如此可以不用显示地算出\({\pi}\) 而直接在V值上迭代。具体迭代公式如下:

\[ \begin{aligned} v_{k+1}(s) & \doteq \max _{a} \mathbb{E}\left[R_{t+1}+\gamma v_{k}\left(S_{t+1}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\max_{a} q_{\pi_k}(s, a) \\ &=\max _{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{k}\left(s^{\prime}\right)\right] \end{aligned} \]

完整的伪代码为:

\[ \begin{align*} &\textbf{Value Iteration, for estimating } \pi\approx \pi_{*} \\ & \text{Algorithm parameter: a small threshold } \theta > 0 \text{ determining accuracy of estimation} \\ & \text{Initialize } V(s), \text{for all } s \in \mathcal{S}^{+} \text{, arbitrarily except that } V (terminal) = 0\\ & \\ &1: \text{Loop:}\\ &2: \quad \quad \Delta \leftarrow 0\\ &3: \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ &4: \quad \quad \quad \quad v \leftarrow V(s) \\ &5: \quad \quad \quad \quad V(s) \leftarrow \operatorname{max}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ &6: \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ &7: \text{until } \Delta < \theta \\ & \\ & \text{Output a deterministic policy, }\pi\approx \pi_{*} \text{, such that} \\ & \quad \quad \pi(s) \leftarrow \operatorname{argmax}_{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \end{align*} \]

代码实现也比较直接,可以复用上面已经实现的 action_value 函数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def value_iteration(env:GridWorldEnv, gamma=1.0, theta=0.0001) -> Tuple[Policy, StateValue]:
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action_value = np.max(action_values)
delta = max(delta, np.abs(best_action_value - V[s]))
V[s] = best_action_value
if delta < theta:
break

policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action = np.argmax(action_values)
policy[s, best_action] = 1.0

return policy, V

异步迭代

在第4.5节中提到了DP迭代方式的改进版:异步方式迭代(Asychronous Iteration)。这里的异步是指每一轮无需全部扫一遍所有状态,而是根据上一轮变化的状态决定下一轮需要最多计算的状态数,类似于Dijkstra最短路径算法中用 heap 来维护更新节点集合,减少运算量。下面我们通过异步值迭代来演示异步迭代的工作方式。

下图表示状态的变化方向,若上一轮 \(V(s)\) 发生更新,那么下一轮就要考虑状态 s 可能会影响到上游状态的集合( p1,p2),避免下一轮必须遍历所有状态的V值计算。

Async 反向传播

要做到部分更新就必须知道每个状态可能影响到的上游状态集合,上图对应的映射关系可以表示为

\[ \begin{align*} s'_1 &\rightarrow \{s\} \\ s'_2 &\rightarrow \{s\} \\ s &\rightarrow \{p_1, p_2\} \end{align*} \]

建立映射关系的代码如下,build_reverse_mapping 返回类型为 Dict[State, Set[State]]。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
def build_reverse_mapping(env:GridWorldEnv) -> Dict[State, Set[State]]:
MAX_R, MAX_C = env.shape[0], env.shape[1]
mapping = {s: set() for s in range(0, MAX_R * MAX_C)}
action_delta = {Action.UP: (-1, 0), Action.DOWN: (1, 0), Action.LEFT: (0, -1), Action.RIGHT: (0, 1)}
for s in range(0, MAX_R * MAX_C):
r = s // MAX_R
c = s % MAX_R
for a in list(Action):
neighbor_r = min(MAX_R - 1, max(0, r + action_delta[a][0]))
neighbor_c = min(MAX_C - 1, max(0, c + action_delta[a][1]))
s_ = neighbor_r * MAX_R + neighbor_c
mapping[s_].add(s)
return mapping

有了描述状态依赖的映射 dict 后,代码也比较简洁,changed_state_set 变量保存了这轮必须计算的状态集合。新的一轮迭代时,将下一轮需要计算的状态保存到 changed_state_set_ 中,本轮结束后,changed_state_set 更新成changed_state_set_,开始下一轮循环直至没有状态需要更新。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def value_iteration_async(env:GridWorldEnv, gamma=1.0, theta=0.0001) -> Tuple[Policy, StateValue]:
mapping = build_reverse_mapping(env)

V = np.zeros(env.nS)
changed_state_set = set(s for s in range(env.nS))

iter = 0
while len(changed_state_set) > 0:
changed_state_set_ = set()
for s in changed_state_set:
action_values = action_value(env, s, V, gamma=gamma)
best_action_value = np.max(action_values)
v_diff = np.abs(best_action_value - V[s])
if v_diff > theta:
changed_state_set_.update(mapping[s])
V[s] = best_action_value
changed_state_set = changed_state_set_
iter += 1

policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
action_values = action_value(env, s, V, gamma=gamma)
best_action = np.argmax(action_values)
policy[s, best_action] = 1.0

return policy, V
比较值迭代和异步值迭代方法后发现,值迭代用了4次循环,每次涉及所有状态,总计算状态数为 4 x 16 = 64。异步值迭代也用了4次循环,但是总计更新了54个状态。由于Grid World 的状态数很少,异步值迭代优势并不明显,但是对于状态数众多并且迭代最终集中在少部分状态的环境下,节省的计算量还是很可观的。

经典教材Reinforcement Learning: An Introduction 第二版由强化领域权威Richard S. Sutton 和 Andrew G. Barto 完成编写,内容深入浅出,非常适合初学者。在本篇中,引入Grid World示例,结合强化学习核心概念,并用python代码实现OpenAI Gym的模拟环境,进一步实现策略评价算法。

Grid World 问题

第四章例子4.1提出了一个简单的离散空间状态问题:Grid World,其大致意思是在4x4的网格世界中有14个格子是非终点状态,在这些非终点状态的格子中可以往上下左右四个方向走,直至走到两个终点状态格子,则游戏结束。每走一步,Agent收获reward -1,表示Agent希望在Grid World中尽早出去。另外,Agent在Grid World边缘时,无法继续往外只能呆在原地,reward也是-1。

Finite MDP 模型

先来回顾一下强化学习的建模基础:有限马尔可夫决策过程(Finite Markov Decision Process, Finite MDP)。如下图,强化学习模型将世界抽象成两个实体,强化学习解决目标的主体Agent和其他外部环境。它们之间的交互过程遵从有限马尔可夫决策过程:若Agent在t时间步骤时处于状态 \(S_t\),采取动作 \(A_t\),然后环境根据自身机制,产生Reward \(R_{t+1}\) 并将Agent状态变为 \(S_{t+1}\)

环境自身机制又称为dynamics,工程上可以看成一个输入(S, A),输出(S, R)的方法。由于MDP包含随机过程,某个输入并不能确定唯一输出,而会根据概率分布输出不同的(S, R)。Finite MDP简化了时间对于模型的影响,因为(S, R)只和(S, A)有关,不和时间t有关。另外,有限指的是S,A,R的状态数量是有限的。

数学上dynamics可以如下表示

\[ p\left(s^{\prime}, r \mid s, a\right) \doteq \operatorname{Pr}\left\{S_{t}=s^{\prime}, R_{t}=r \mid S_{t-1}=s, A_{t-1}=a\right\} \]

即是四元组作为输入的概率函数 \(p: S \times R \times S \times A \rightarrow [0, 1]\)

满足 \[ \sum_{s^{\prime} \in \mathcal{S}} \sum_{r \in \mathcal{R}} p\left(s^{\prime}, r \mid s, a\right)=1, \text { for all } s \in \mathcal{S}, a \in \mathcal{A}(s) \]

以Grid World为例,当Agent处于编号1的网格时,可以往四个方向走,往任意方向走都只产生一种 S, R,因为这个简单的游戏是确定性的,不存在某一动作导致stochastic状态。例如,在1号网格往左就到了终点网格(编号0),得到Reward -1这个规则可以如下表示 \[ p\left(s^{\prime}=0, r=-1 \mid s=1, a=\text{L}\right) = 1 \] 因此,状态s=1的所有dynamics概率映射为

\[ \begin{aligned} p\left(s^{\prime}=0, r=-1 \mid s=1, a=\text{L}\right) &=& 1 \\ p\left(s^{\prime}=2, r=-1 \mid s=1, a=\text{R}\right) &=& 1 \\ p\left(s^{\prime}=1, r=-1 \mid s=1, a=\text{U}\right) &=& 1 \\ p\left(s^{\prime}=5, r=-1 \mid s=1, a=\text{D}\right) &=& 1 \end{aligned} \]

强化学习的目的

在给定了问题以及定义了强化学习的模型之后,强化学习的目的当然是通过学习让Agent能够学到最佳策略\(\pi_{*}\),也就是在某个状态下的行动分布,记成 \(\pi(a|s)\)。对应在数值上的优化目标是Agent在一系列过程中采取某种策略的reward总和的期望(Expected Return)。下面公式定义了t步往后的reward总和,其中 \(\gamma\) 为discount factor,用于权衡短期和长期reward对于当前Agent的效用影响。等式最后一步的意义是t步后的reward总和等价于t步所获的立即reward \(R_{t+1}\),加上t+1步后的reward总和 \(\gamma G_{t+1}\)

\[ \begin{aligned} G_{t} & \doteq R_{t+1}+\gamma R_{t+2}+\gamma^{2} R_{t+3}+\gamma^{3} R_{t+4}+\cdots \\ &=R_{t+1}+\gamma\left(R_{t+2}+\gamma R_{t+3}+\gamma^{2} R_{t+4}+\cdots\right) \\ &=R_{t+1}+\gamma G_{t+1} \end{aligned} \]

有了reward总和的定义,评价Agent策略 \(\pi\) 就可以定义成Agent在状态 s 时采用此策略的Expected Return。

\[ v_{\pi}(s) \doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \]

下面公式推导了 \(v_{\pi}(s)\) 数值上和相关状态 \(s{\prime}\) 的关系:

\[ \begin{aligned} v_{\pi}(s) &\doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \\ &=\mathbb{E}_{\pi}\left[\sum_{k=0}^{\infty} \gamma^{k} R_{t+k+1} \mid S_{t}=s\right]\\ &=\mathbb{E}_{\pi}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}} \sum_{r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma \mathbb{E}_{\pi}\left[G_{t+1} \mid S_{t+1}=s^{\prime}\right]\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right] \quad \text { for all } s \in \mathcal{S} \end{aligned} \]

注意到如果将 \(v_{\pi}(s)\) 看成未知数,上式即形成 \(\mid \mathcal{S} \mid\) 个未知变量的方程组,可以在数值上解得各个 \(v_{\pi}(s)\)

书中用Backup Diagram来表示递推关系,下图是\(v_{\pi}(s)\)的backup diagram。

尽管v值可以来衡量策略,但由于\(v_{\pi}(s)\) 是Agent在策略\(\pi(a|s)\)的Expected Return,将不同的action拆出来单独计算Expected Return,这样的做法有时更为直接,这就是著名的Q Learning中的q 值,记成\(q_{\pi}(s, a)\)

\[ q_{\pi}(s, a) \doteq \mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s, A_{t}=a\right] \]

下面是 $q_{}(s, a) $ 的递推 backup diagram。

Bellman 最佳原则

对于所有状态集合\(\mathcal{S}\),策略\({\pi}\)的评价指标 \(v_{\pi}(s)\) 是一个向量,本质上是无法相互比较的。但由于存在Bellman 最佳原则(Bellman's principle of optimality):在有限状态情况下,一定存在一个或者多个最好的策略 \({\pi}_{*}\),它在所有状态下的v值都是最好的,即 \(v_{\pi_{*}}(s) \ge v_{\pi^{\prime}}(s) \text { for all } s \in \mathcal{S}\)

因此,最佳v值定义为最佳策略 \({\pi}_{*}\) 对应的 v 值

\[ v_{*}(s) \doteq \max_{\pi} v_{\pi}(s) \]

同理,也存在最佳q值,记为 \[ \begin{aligned} q_{*}(s, a) &\doteq \max_{\pi} q_{\pi}(s,a) \end{aligned} \]

\(v_{*}(s)\) 改写成递推形式,称为 Bellman Optimality Equation,推导如下

\[ \begin{aligned} v_{*}(s) &=\max _{a \in \mathcal{A}(s)} q_{\pi_{*}}(s, a) \\ &=\max _{a} \mathbb{E}_{\pi_{*}}\left[G_{t} \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \mathbb{E}_{\pi_{*}}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \mathbb{E}\left[R_{t+1}+\gamma v_{*}\left(S_{t+1}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\max _{a} \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{*}\left(s^{\prime}\right)\right] \end{aligned} \]

直觉上可以理解为状态 s 对应的最佳v值是只采取此状态下的最佳动作后的Expected Return。

最佳q值递归形式的意义为最佳策略下状态s时采取行动 a 的Expected Return,等于所有可能后续状态 s' 下采取最优行动的Expected Return的均值。推导如下:

\[ \begin{aligned} q_{*}(s, a) &=\mathbb{E}\left[R_{t+1}+\gamma \max _{a^{\prime}} q_{*}\left(S_{t+1}, a^{\prime}\right) \mid S_{t}=s, A_{t}=a\right] \\ &=\sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma \max _{a^{\prime}} q_{*}\left(s^{\prime}, a^{\prime}\right)\right] \end{aligned} \]

\(v_{*}(s), q_{*}(s, a)\) 的backup diagram 如下图

Grid World 最佳策略和V值

Grid World 的最佳策略如下:尽可能快的走出去

Grid World最佳策略

上面的2D图中不同颜色表示不同V值,终点格子的红色表示0,隔着一步的黄色为-1,隔两步的绿色为-2,最远的紫色为-3。下面是立体图示。

Grid World最佳策略V值

Grid World OpenAI Gym 环境

下面是OpenAI Gym框架下Grid World环境的代码实现。本质是在GridWorldEnv构造函数中构建MDP,类型定义如下

1
2
3
4
5
6
MDP = Dict[State, Dict[Action, List[Tuple[Prob, State, Reward, bool]]]]

# P[state][action] = [
# (prob1, next_state1, reward1, is_done),
# (prob2, next_state2, reward2, is_done), ...]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Action(Enum):
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3

State = int
Reward = float
Prob = float
Policy = Dict[State, Dict[Action, Prob]]
Value = List[float]
StateSet = Set[int]
NonTerminalStateSet = Set[int]
MDP = Dict[State, Dict[Action, List[Tuple[Prob, State, Reward, bool]]]]
# P[s][a] = [(prob, next_state, reward, is_done), ...]

class GridWorldEnv(discrete.DiscreteEnv):
"""
Grid World environment described in Sutton and Barto Reinforcement Learning 2nd, chapter 4.
"""

def __init__(self, shape=[4,4]):
self.shape = shape
nS = np.prod(shape)
nA = len(list(Action))
MAX_R = shape[0]
MAX_C = shape[1]
self.grid = np.arange(nS).reshape(shape)
isd = np.ones(nS) / nS

# P[s][a] = [(prob, next_state, reward, is_done), ...]
P: MDP = {}
action_delta = {Action.UP: (-1, 0), Action.DOWN: (1, 0), Action.LEFT: (0, -1), Action.RIGHT: (0, 1)}
for s in range(0, MAX_R * MAX_C):
P[s] = {a.value : [] for a in list(Action)}
is_terminal = self.is_terminal(s)
if is_terminal:
for a in list(Action):
P[s][a.value] = [(1.0, s, 0, True)]
else:
r = s // MAX_R
c = s % MAX_R
for a in list(Action):
neighbor_r = min(MAX_R-1, max(0, r + action_delta[a][0]))
neighbor_c = min(MAX_C-1, max(0, c + action_delta[a][1]))
s_ = neighbor_r * MAX_R + neighbor_c
P[s][a.value] = [(1.0, s_, -1, False)]

super(GridWorldEnv, self).__init__(nS, nA, P, isd)

策略评估(Policy Evaluation)

策略评估需要解决在给定环境dynamics和Agent策略 \(\pi\)下,计算策略的v值 \(v_{\pi}\)。由于所有数量关系都已知,可以通过解方程组的方式求得,但通常会通过数值迭代的方式来计算,即通过一系列 \(v_{0}, v_{1}, ..., v_{k}\) 收敛至 \(v_{\pi}\)。如下迭代方式已经得到证明,当 \(k \rightarrow \infty\) 一定收敛至 \(v_{\pi}\)

\[ \begin{aligned} v_{k+1}(s) & \doteq \mathbb{E}_{\pi}\left[R_{t+1}+\gamma v_{k}\left(S_{t+1}\right) \mid S_{t}=s\right] \\ &=\sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma v_{k}\left(s^{\prime}\right)\right] \end{aligned} \]

书中具体伪代码如下

\[ \begin{align*} &\textbf{Iterative Policy Evaluation, for estimating } V\approx v_{\pi} \\ & \text{Input } {\pi}, \text{the policy to be evaluated} \\ & \text{Algorithm parameter: a small threshold } \theta > 0 \text{ determining accuracy of estimation} \\ & \text{Initialize } V(s), \text{for all } s \in \mathcal{S}^{+} \text{, arbitrarily except that } V (terminal) = 0\\ & \\ &1: \text{Loop:}\\ &2: \quad \quad \Delta \leftarrow 0\\ &3: \quad \quad \text{Loop for each } s \in \mathcal{S}:\\ &4: \quad \quad \quad \quad v \leftarrow V(s) \\ &5: \quad \quad \quad \quad V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s^{\prime}, r} p\left(s^{\prime}, r \mid s, a\right)\left[r+\gamma V\left(s^{\prime}\right)\right] \\ &6: \quad \quad \quad \quad \Delta \leftarrow \max(\Delta, |v-V(s)|) \\ &7: \text{until } \Delta < \theta \end{align*} \]

下面是python 代码实现,注意这里单run迭代时,新的v值直接覆盖数组里的旧v值,这种做法在书中被证明不仅有效,甚至更为高效。这种做法称为原地(in place)更新。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def policy_evaluate(policy: Policy, env: GridWorldEnv, gamma=1.0, theta=0.0001):
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
v = 0
for a, action_prob in enumerate(policy[s]):
for prob, next_state, reward, done in env.P[s][a]:
v += action_prob * prob * (reward + gamma * V[next_state])
delta = max(delta, np.abs(v - V[s]))
V[s] = v
if delta < theta:
break
return np.array(V)

输入策略为随机选择方向,运行上面的policy_evaluate最终多轮收敛后的V值输出为

{linenos
1
2
3
4
[[  0.         -13.99931242 -19.99901152 -21.99891199]
[-13.99931242 -17.99915625 -19.99908389 -19.99909436]
[-19.99901152 -19.99908389 -17.99922697 -13.99942284]
[-21.99891199 -19.99909436 -13.99942284 0. ]]
在3D V值图中可以发现,由于是随机选择方向的策略, Agent在每个格子的V值绝对数值要比最佳V值大,意味着随机策略下Agent在Grid World会得到更多的负reward。
Grid World随机策略V值

上一篇我们从原理层面解析了AlphaGo Zero如何改进MCTS算法,通过不断自我对弈,最终实现从零棋力开始训练直至能够打败任何高手。在本篇中,我们在已有的N子棋OpenAI Gym 环境中用Pytorch实现一个简化版的AlphaGo Zero算法。本篇所有代码在 github MyEncyclopedia/ConnectNGym 中,其中部分参考了SongXiaoJun 的 AlphaZero_Gomoku

AlphaGo Zero MCTS 树节点

上一篇中,我们知道AlphaGo Zero 的MCTS树搜索是基于传统MCTS 的UCT (UCB for Tree)的改进版PUCT(Polynomial Upper Confidence Trees)。局面节点的PUCT值由两部分组成,分别是代表Exploitation的action value Q值,和代表Exploration的U值。 \[ PUCT(s, a) =Q(s,a) + U(s,a) \] U值计算由这些参数决定:系数\(c_{puct}\),节点先验概率P(s, a) ,父节点访问次数,本节点的访问次数。具体公式如下 \[ U(s, a)=c_{p u c t} \cdot P(s, a) \cdot \frac{\sqrt{\Sigma_{b} N(s, b)}}{1+N(s, a)} \]

因此在实现过程中,对于一个树节点来说,需要保存其Q值、节点访问次数 _visit_num和先验概率 _prior。其中,_prior在节点初始化后不变,Q值和 visit_num随着游戏MCTS模拟进程而改变。此外,节点保存了 parent和_children变量,用于维护父子关系。c_puct为class variable,作为全局参数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
class TreeNode:
"""
MCTS Tree Node
"""

c_puct: ClassVar[int] = 5 # class-wise global param c_puct, exploration weight factor.

_parent: TreeNode
_children: Dict[int, TreeNode] # map from action to TreeNode
_visit_num: int
_Q: float # Q value of the node, which is the mean action value.
_prior: float

和上面的计算公式相对应,下列代码根据节点状态计算PUCT(s, a)。

{linenos
1
2
3
4
5
6
7
8
9
10
class TreeNode:

def get_puct(self) -> float:
"""
Computes AlphaGo Zero PUCT (polynomial upper confidence trees) of the node.

:return: Node PUCT value.
"""
U = (TreeNode.c_puct * self._prior * np.sqrt(self._parent._visit_num) / (1 + self._visit_num))
return self._Q + U

AlphaGo Zero MCTS在playout时遇到已经被展开的节点,会根据selection规则选择子节点,该规则本质上是在所有子节点中选择最大的PUCT值的节点。

\[ a=\operatorname{argmax}_a(PUCT(s, a))=\operatorname{argmax}_a(Q(s,a) + U(s,a)) \]

{linenos
1
2
3
4
5
6
7
8
9
class TreeNode:

def select(self) -> Tuple[Pos, TreeNode]:
"""
Selects an action(Pos) having max UCB value.

:return: Action and corresponding node
"""
return max(self._children.items(), key=lambda act_node: act_node[1].get_puct())

新的叶节点一旦在playout时产生,关联的 v 值会一路向上更新至根节点,具体新节点的v值将在下一节中解释。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class TreeNode:

def propagate_to_root(self, leaf_value: float):
"""
Updates current node with observed leaf_value and propagates to root node.

:param leaf_value:
:return:
"""
if self._parent:
self._parent.propagate_to_root(-leaf_value)
self._update(leaf_value)

def _update(self, leaf_value: float):
"""
Updates the node by newly observed leaf_value.

:param leaf_value:
:return:
"""
self._visit_num += 1
# new Q is updated towards deviation from existing Q
self._Q += 0.5 * (leaf_value - self._Q)

AlphaGo Zero MCTS Player 实现

AlphaGo Zero MCTS 在训练阶段分为如下几个步骤。游戏初始局面下,整个局面树的建立由子节点的不断被探索而丰富起来。AlphaGo Zero对弈一次即产生了一次完整的游戏开始到结束的动作系列。在对弈过程中的某一游戏局面,需要采样海量的playout,又称MCTS模拟,以此来决定此局面的下一步动作。一次playout可视为在真实游戏状态树的一种特定采样,playout可能会产生游戏结局,生成真实的v值;也可能explore 到新的叶子节点,此时v值依赖策略价值网络的输出,目的是利用训练的神经网络来产生高质量的游戏对战局面。每次playout会从当前给定局面递归向下,向下的过程中会遇到下面三种节点情况。

  • 若局面节点是游戏结局(叶子节点),可以得到游戏的真实价值 z。从底部节点带着z向上更新沿途节点的Q值,直至根节点(初始局面)。
  • 若局面节点从未被扩展过(叶子节点),此时会将局面编码输入到策略价值双头网络,输出结果为网络预估的action分布和v值。Action分布作为节点先验概率P(s, a)来初始化子节点,预估的v值和上面真实游戏价值z一样,从叶子节点向上沿途更新到根节点。
  • 若局面节点已经被扩展过,则根据PUCT的select规则继续选择下一节点。

海量的playout模拟后,建立了游戏状态树的节点信息。但至此,AI玩家只是收集了信息,还仍未给定局面落子,而落子的决定由Play规则产生。下图展示了给定局面(Current节点)下,MCST模拟进行的多次playout探索后生成的局面树,play规则根据这些节点信息,产生Current 节点的动作分布 \(\pi\) ,确定下一步落子。

MCTS Playout和Play关系

Play 给定局面

对于当前需要做落子决定的某游戏局面\(s_0\),根据如下play公式生成落子分布 $$ ,子局面的落子概率正比于其访问次数的某次方。其中,某次方的倒数称为温度参数(Temperature)。

\[ \pi\left(a \mid s_{0}\right)=\frac{N\left(s_{0}, a\right)^{1 / \tau}}{\sum_{b} N\left(s_{0}, b\right)^{1 / \tau}} \]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MCTSAlphaGoZeroPlayer(BaseAgent):

def _next_step_play_act_probs(self, game: ConnectNGame) -> Tuple[List[Pos], ActionProbs]:
"""
For the given game status, run playouts number of times specified by self._playout_num.
Returns the action distribution according to AlphaGo Zero MCTS play formula.

:param game:
:return: actions and their probability
"""

for n in range(self._playout_num):
self._playout(copy.deepcopy(game))

act_visits = [(act, node._visit_num) for act, node in self._current_root._children.items()]
acts, visits = zip(*act_visits)
act_probs = softmax(1.0 / MCTSAlphaGoZeroPlayer.temperature * np.log(np.array(visits) + 1e-10))

return acts, act_probs

在训练模式时,考虑到偏向exploration的目的,在\(\pi\) 落子分布的基础上增加了 Dirichlet 分布。

\[ P(s,a) = (1-\epsilon)*\pi(a \mid s) + \epsilon * \boldsymbol{\eta} \quad (\boldsymbol{\eta} \sim \operatorname{Dir}(0.3)) \]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MCTSAlphaGoZeroPlayer(BaseAgent):

def get_action(self, board: PyGameBoard) -> Pos:
"""
Method defined in BaseAgent.

:param board:
:return: next move for the given game board.
"""
return self._get_action(copy.deepcopy(board.connect_n_game))[0]

def _get_action(self, game: ConnectNGame) -> Tuple[MoveWithProb]:
epsilon = 0.25
avail_pos = game.get_avail_pos()
move_probs: ActionProbs = np.zeros(game.board_size * game.board_size)
assert len(avail_pos) > 0

# the pi defined in AlphaGo Zero paper
acts, act_probs = self._next_step_play_act_probs(game)
move_probs[list(acts)] = act_probs
if self._is_training:
# add Dirichlet Noise when training in favour of exploration
p_ = (1-epsilon) * act_probs + epsilon * np.random.dirichlet(0.3 * np.ones(len(act_probs)))
move = np.random.choice(acts, p=p_)
assert move in game.get_avail_pos()
else:
move = np.random.choice(acts, p=act_probs)

self.reset()
return move, move_probs

一次完整的对弈

一次完整的AI对弈就是从初始局面迭代play直至游戏结束,对弈生成的数据是一系列的 $(s, , z) $。

如下图 s0 到 s5 是某次井字棋的对弈。最终结局是先手黑棋玩家赢,即对于黑棋玩家 z = +1。需要注意的是:z = +1 是对于所有黑棋面临的局面,即s0, s2, s4,而对应的其余白棋玩家来说 z = -1。

一局完整对弈

\[ \begin{align*} &0: (s_0, \vec{\pi_0}, +1) \\ &1: (s_1, \vec{\pi_1}, -1) \\ &2: (s_2, \vec{\pi_2}, +1) \\ &3: (s_3, \vec{\pi_3}, -1) \\ &4: (s_4, \vec{\pi_4}, +1) \end{align*} \]

以下代码展示如何在AI对弈时收集数据 $(s, , z) $

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MCTSAlphaGoZeroPlayer(BaseAgent):

def self_play_one_game(self, game: ConnectNGame) \
-> List[Tuple[NetGameState, ActionProbs, NDArray[(Any), np.float]]]:
"""

:param game:
:return:
Sequence of (s, pi, z) of a complete game play. The number of list is the game play length.
"""

states: List[NetGameState] = []
probs: List[ActionProbs] = []
current_players: List[np.float] = []

while not game.game_over:
move, move_probs = self._get_action(game)
states.append(convert_game_state(game))
probs.append(move_probs)
current_players.append(game.current_player)
game.move(move)

current_player_z = np.zeros(len(current_players))
current_player_z[np.array(current_players) == game.game_result] = 1.0
current_player_z[np.array(current_players) == -game.game_result] = -1.0
self.reset()

return list(zip(states, probs, current_player_z))

Playout 代码实现

一次playout会从当前局面根据PUCT selection规则下沉到叶子节点,如果此叶子节点非游戏终结点,则会扩展当前节点生成下一层新节点,其先验分布由策略价值网络输出的action分布决定。一次playout最终会得到叶子节点的 v 值,并沿着MCTS树向上更新沿途的所有父节点 Q值。 从上一篇文章已知,游戏节点的数量随着参数而指数级增长,举例来说,井字棋(k=3,m=n=3)的状态数量是5478,k=3,m=n=4时是6035992 ,k=m=n=4时是9722011 。如果我们将初始局面节点作为根节点,同时保存海量playout探索得到的局面节点,实现时会发现我们无法将所有探索到的局面节点都保存在内存中。这里的一种解决方法是在一次self play中每轮playout之后,将根节点重置成落子的节点,从而有效控制整颗局面树中的节点数量。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MCTSAlphaGoZeroPlayer(BaseAgent):

def _playout(self, game: ConnectNGame):
"""
From current game status, run a sequence down to a leaf node, either because game ends or unexplored node.
Get the leaf value of the leaf node, either the actual reward of game or action value returned by policy net.
And propagate upwards to root node.

:param game:
"""
player_id = game.current_player

node = self._current_root
while True:
if node.is_leaf():
break
act, node = node.select()
game.move(act)

# now game state is a leaf node in the tree, either a terminal node or an unexplored node
act_and_probs: Iterator[MoveWithProb]
act_and_probs, leaf_value = self._policy_value_net.policy_value_fn(game)

if not game.game_over:
# case where encountering an unexplored leaf node, update leaf_value estimated by policy net to root
for act, prob in act_and_probs:
game.move(act)
child_node = node.expand(act, prob)
game.undo()
else:
# case where game ends, update actual leaf_value to root
if game.game_result == ConnectNGame.RESULT_TIE:
leaf_value = ConnectNGame.RESULT_TIE
else:
leaf_value = 1 if game.game_result == player_id else -1
leaf_value = float(leaf_value)

# Update leaf_value and propagate up to root node
node.propagate_to_root(-leaf_value)

编码游戏局面

为了将信息有效的传递给策略神经网络,必须从当前玩家的角度编码游戏局面。局面不仅要反映棋盘上黑白棋子的位置,也需要考虑最后一个落子的位置以及是否为当前玩家棋局。因此,我们将某局面按照当前玩家来编码,返回类型为4个棋盘大小组成的ndarray,即shape [4, board_size, board_size],其中

  1. 第一个数组编码当前玩家的棋子位置
  2. 第二个数组编码对手玩家棋子位置
  3. 第三个表示最后落子位置
  4. 第四个全1表示此局面为先手(黑棋)局面,全0表示白棋局面

例如之前游戏对弈中的前四步:

s1->s2 后局面s2的编码:当前玩家为黑棋玩家,编码局面s2 返回如下ndarray,数组[0] 为s2黑子位置,[1]为白子位置,[2]表示最后一个落子(1, 1) ,[3] 全1表示当前是黑棋落子的局面。

编码黑棋玩家局面 s2
s2->s3 后局面s3的编码:当前玩家为白棋玩家,编码返回如下,数组[0] 为s3白子位置,[1]为黑子位置,[2]表示最后一个落子(1, 0) ,[3] 全0表示当前是白棋落子的局面。
编码白棋玩家局面 s3

具体代码实现如下。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
NetGameState = NDArray[(4, Any, Any), np.int]


def convert_game_state(game: ConnectNGame) -> NetGameState:
"""
Converts game state to type NetGameState as ndarray.

:param game:
:return:
Of shape 4 * board_size * board_size.
[0] is current player positions.
[1] is opponent positions.
[2] is last move location.
[3] all 1 meaning move by black player, all 0 meaning move by white.
"""
state_matrix = np.zeros((4, game.board_size, game.board_size))

if game.action_stack:
actions = np.array(game.action_stack)
move_curr = actions[::2]
move_oppo = actions[1::2]
for move in move_curr:
state_matrix[0][move] = 1.0
for move in move_oppo:
state_matrix[1][move] = 1.0
# indicate the last move location
state_matrix[2][actions[-1]] = 1.0
if len(game.action_stack) % 2 == 0:
state_matrix[3][:, :] = 1.0 # indicate the colour to play
return state_matrix[:, ::-1, :]

策略价值网络训练

策略价值网络是一个共享参数 \(\theta\) 的双头网络,给定上面的游戏局面编码会产生预估的p和v。

\[ \vec{p_{\theta}}, v_{\theta}=f_{\theta}(s) \] 结合真实游戏对弈后产生三元组数据 $(s, , z) $ ,按照论文中的loss 来训练神经网络。 \[ l=\sum_{t}\left(v_{\theta}\left(s_{t}\right)-z_{t}\right)^{2}-\vec{\pi_{t}} \cdot \log \left(\vec{p_{\theta}}\left(s_{t}\right)\right) + c {\lVert \theta \rVert}^2 \]

下面代码为Pytorch backward部分。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def backward_step(self, state_batch: List[NetGameState], probs_batch: List[ActionProbs],
value_batch: List[NDArray[(Any), np.float]], lr) -> Tuple[float, float]:
if self.use_gpu:
state_batch = Variable(torch.FloatTensor(state_batch).cuda())
probs_batch = Variable(torch.FloatTensor(probs_batch).cuda())
value_batch = Variable(torch.FloatTensor(value_batch).cuda())
else:
state_batch = Variable(torch.FloatTensor(state_batch))
probs_batch = Variable(torch.FloatTensor(probs_batch))
value_batch = Variable(torch.FloatTensor(value_batch))

self.optimizer.zero_grad()
for param_group in self.optimizer.param_groups:
param_group['lr'] = lr

log_act_probs, value = self.policy_value_net(state_batch)
# loss = (z - v)^2 - pi*T * log(p) + c||theta||^2
value_loss = F.mse_loss(value.view(-1), value_batch)
policy_loss = -torch.mean(torch.sum(probs_batch * log_act_probs, 1))
loss = value_loss + policy_loss
loss.backward()
self.optimizer.step()
entropy = -torch.mean(torch.sum(torch.exp(log_act_probs) * log_act_probs, 1))
return loss.item(), entropy.item()

参考资料

AlphaGo Zero是Deepmind 最后一代AI围棋算法,因为已经达到了棋类游戏AI的终极目的:给定任何游戏规则,AI从零出发只通过自我对弈的方式提高,最终可以取得超越任何对手(包括顶级人类棋手和上一代AlphaGo)的能力。换种方式说,当给定足够多的时间和计算资源,可以取得无限逼近游戏真实解的能力。这一篇,我们深入分析AlphaGo Zero的设计理念和关键组件的细节并解释组件之间的关联。下一篇中,我们将在已有的N子棋OpenAI Gym 环境中用Pytorch实现一个简化版的AlphaGo Zero算法。

AlphaGo Zero 综述

AlphaGo Zero 作为Deepmind在围棋领域的最后一代AI Agent,已经可以达到棋类游戏的终极目标:在只给定游戏规则的情况下,AI 棋手从最初始的随机状态开始,通过不断的自我对弈的强化学习来实现超越以往任何人类棋手和上一代Alpha的能力,并且同样的算法和模型应用到了其他棋类也得出相同的效果。这一篇,从原理上来解析AlphaGo Zero的运行方式。

AlphaGo Zero 算法由三种元素构成:强化学习(RL)、深度学习(DL)和蒙特卡洛树搜索(MCTS,Monte Carlo Tree Search)。核心思想是基于神经网络的Policy Iteration强化学习,即最终学的是一个深度学习的policy network,输入是某棋盘局面 s,输出是此局面下可走位的概率分布:\(p(a|s)\)

在第一代AlphaGo算法中,这个初始policy network通过收集专业人类棋手的海量棋局训练得来,再采用传统RL 的Monte Carlo Tree Search Rollout 技术来强化现有的AI对于局面落子(Policy Network)的判断。Monte Carlo Tree Search Rollout 简单说来就是海量棋局模拟,AI Agent在通过现有的Policy Network策略完成一次从某局面节点到最终游戏胜负结束的对弈,这个完整的对弈叫做rollout,又称playout。完成一次rollout之后,通过局面树层层回溯到初始局面节点,并在回溯过程中同步修订所有经过的局面节点的统计指标,修正原先policy network对于落子导致输赢的判断。通过海量并发的棋局模拟来提升基准policy network,即在各种局面下提高好的落子的\(p(a_{win}|s)\),降低坏的落子的\(p(a_{lose}|s)\)

举例如下井字棋局面:
局面s

基准policy network返回 p(s) 如下 \[ p(a|s) = \begin{align*} \left\lbrace \begin{array}{r@{}l} 0.1, & & a = (0,2) \\ 0.05, & & a = (1,0) \\ 0.5, & & a = (1,1) \\ 0.05, & & a = (1,2)\\ 0.2, & & a = (2,0) \\ 0.05, & & a = (2,1) \\ 0.05, & & a = (2,2) \end{array} \right. \end{align*} \] 通过海量并发模拟后,修订成如下的action概率分布,然后通过policy iteration迭代新的网络来逼近 \(p'\) 就提高了棋力。 \[ p'(a|s) = \begin{align*} \left\lbrace \begin{array}{r@{}l} 0, & & a = (0,2) \\ 0, & & a = (1,0) \\ 0.9, & & a = (1,1) \\ 0, & & a = (1,2)\\ 0, & & a = (2,0) \\ 0, & & a = (2,1) \\ 0.1, & & a = (2,2) \end{array} \right. \end{align*} \]

蒙特卡洛树搜索(MCTS)概述

Monte Carlo Tree Search 是Monte Carlo 在棋类游戏中的变种,棋类游戏的一大特点是可以用动作(move)联系的决策树来表示,树的节点数量取决于分支的数量和树的深度。MCTS的目的是在树节点非常多的情况下,通过实验模拟(rollout, playout)的方式来收集尽可能多的局面输赢情况,并基于这些统计信息,将搜索资源的重点均衡地放在未被探索的节点和值得探索的节点上,减少在大概率输的节点上的模拟资源投入。传统MCTS有四个过程:Selection, Expansion, Simulation 和Backpropagation。下图是Wikipedia 的例子:

  • Selection:从根节点出发,根据现有统计的信息和selection规则,选择子节点递归向下做决定,后面我们会详细介绍AlphaGo的UCB规则。图中节点的数字,例如根节点11/21,分别代表赢的次数和总模拟次数。从根节点一路向下分别选择节点 7/10, 1/6直到叶子节点3/3,叶子节点表示它未被探索过。
  • Expansion:由于3/3节点未被探索过,初始化其所有子节点为0/0,图中3/3只有一个子节点。后面我们会看到神经网络在初始化子节点的时候起到的指导作用,即所有子节点初始权重并非相同,而是由神经网络给出估计。
  • Simulation:重复selection和expansion,根据游戏规则递归向下直至游戏结束。
  • Backpropagation:游戏结束在终点节点产生游戏真实的价值,回溯向上调整所有父节点的统计状态。

权衡 Exploration 和 Exploitation

在不断扩张决策树并收集节点统计信息的同时,MCTS根据规则来权衡探索目的(采样不足)或利用目的来做决策,这个权衡规则叫做Upper Confidence Bound(UCB)。典型的UCB公式如下:w表示通过节点的赢的次数,n表示通过节点的总次数,N是父节点的访问次数,c是调节Exploration 和 Exploitation权重的超参。

\[ {\frac{w_i}{n_i}} + c \sqrt{\frac{\ln N_i}{n_i}} \]

假设某节点有两个子节点s1, s2,它们的统计指标为 s1: w/n = 3/4,s2: w/n = 6/8,由于两者输赢比率一样,因此根据公式,访问次数少的节点出于Exploration的目的胜出,MCTS最终决定从s局面走向s1。

从第一性原理来理解AlphaGo Zero

前一代的AlphaGo已经战胜了世界冠军,取得了空前的成就,AlphaGo Zero 的设计目标变得更加General,去除围棋相关的处理和知识,用统一的框架和算法来解决棋类问题。 1. 无人工先验数据

改进之前需要专家棋手对弈数据来冷启动初始棋力

  1. 无特定游戏特征工程

    无需围棋特定技巧,只包含下棋规则,可以适用到所有棋类游戏

  2. 单一神经网络

    统一Policy Network和Value Network,使用一个共享参数的双头神经网络

  3. 简单树搜索

    去除传统MCTS的Rollout 方式,用神经网络来指导MCTS更有效产生搜索策略

搜索空间的两个优化原则

尽管理论上围棋是有解的,即先手必赢、被逼平或必输,通过遍历所有可能局面可以求得解。同理,通过海量模拟所有可能游戏局面,也可以无限逼近所有局面下的真实输赢概率,直至收敛于局面落子的确切最佳结果。但由于围棋棋局的数目远远大于宇宙原子数目,3^361 >> 10^80,因此需要将计算资源有效的去模拟值得探索的局面,例如对于显然的被动局面减小模拟次数,所以如何有效地减小搜索空间是AlphaGo Zero 需要解决的重大问题。David Silver 在Deepmind AlphaZero - Mastering Games Without Human Knowledge中提到AlphaGo Zero 采用两个原则来有效减小搜索空间。

原则1: 通过Value Network减少搜索的深度

Value Network 通过预测给定局面的value来直接预测最终结果,思想和上一期Minimax DP 策略中直接缓存当前局面的胜负状态一样,减少每次必须靠模拟到最后才能知道当前局面的输赢概率,或者需要多层树搜索才能知道输赢概率。

原则2: 通过Policy Network减少搜索的宽度

搜索广度的减少是由Policy Network预估来达成的,将下一步搜索局限在高概率的动作上,大幅度提升原先MCTS新节点生成后冷启动的搜索宽度。

神经网络结构

AlphaGo Zero 使用一个单一的深度神经网络来完成policy 和value的预测。具体实现方式是将policy network和value network合并成一个共享参数 $ $ 的双头网络。其中z是真实游戏结局的效用,范围为[-1, 1] 。

\[ (p, v)=f_{\theta}(s) \] \[ p_{a}=\operatorname{Pr}(a \mid s) \] \[ v = \mathop{\mathbb{E}}[z|s] \]

Monte Carlo Tree Search (MCTS) 建立了棋局搜索树,节点的初始状态由神经网络输出的p和v值来估计,由此初始的动作策略和价值预判就会建立在高手的水平之上。模拟一局游戏之后向上回溯,会同步更新路径上节点的统计数值并生成更好的MCTS搜索策略 \(\vec{\pi}\)。进一步来看,MCTS和神经网络互相形成了正循环。神经网络指导了未知节点的MCTS初始搜索策略,产生自我对弈游戏结局后,通过减小 \(\vec{p}\)\(\vec{\pi}\)的 Loss ,最终又提高了神经网络对于局面的估计能力。神经网络value network的提升也是通过不断减小网络预测的结果和最终结果的差异来提升。 因此,具体神经网络的Loss函数由三部分组成,value network的损失,policy network的损失以及正则项。 \[ l=\sum_{t}\left(v_{\theta}\left(s_{t}\right)-z_{t}\right)^{2}-\vec{\pi}_{t} \cdot \log \left(\vec{p}_{\theta}\left(s_{t}\right)\right) + c {\lVert \theta \rVert}^2 \]

AlphaGo Zero MCTS 具体过程

AlphaGo Plays Games Against Itself

AlphaGo Zero的MCTS和传统MCTS都有相似的四个过程,但AlphaGo Zero的MCTS步骤相对更复杂。 首先,除了W/N统计指标之外,AlphaGo Zero的MCTS保存了决策边 a|s 的Q(s,a):Action Value,也就是Q-Learning中的Q值,其初始值由神经网络给出。此外,Q 值也用于串联自底向上更新节点的Value值。具体说来,当某个新节点被Explore后,会将网络给出的Q值向上传递,并逐层更新父节点的Q值。当游戏结局产生时,也会向上更新所有父节点的Q值。 此外对于某一游戏局面s进行多次模拟,每次在局面s出发向下探索,每次探索在已知节点按Selection规则深入一步,直至达到未探索的局面或者游戏结束,产生Q值后向上回溯到最初局面s,回溯过程中更新路径上的局面的统计值或者Q值。在多次模拟结束后根据Play的算法,决定局面s的下一步行动。尽管每次模拟探索可能会深入多层,但最终play阶段的算法规则仅决定给定局面s的下一层落子动作。多次向下探索的优势在于:

  1. 探索和采样更多的叶子节点,在更多信息下做决策。

  2. 通过average out多次模拟下一层落子决定,尽可能提升MCTS策略的下一步判断能力,提高 \(\pi\) 能力,更有效指导神经网络,提高其学习效率。

New Policy Network V' is Trained to Predict Winner
  1. Selection:

从游戏局面s开始,选择a向下递归,直至未展开的节点(搜索树中的叶子节点)或者游戏结局。具体在局面s下选择a的规则由以下UCB(Upper Confidence Bound)决定
\[ a=\operatorname{argmax}_a(Q(s,a) + u(s,a)) \]

其中,Q(s,a) 和u(s,a) 项分别代表Exploitation 和Exploration。两项相加来均衡Exploitation和Exploration,保证初始时每个节点被explore,在有足够多的信息时逐渐偏向exploitation。

\[ u(s, a)=c_{p u c t} \cdot P(s, a) \cdot \frac{\sqrt{\Sigma_{b} N(s, b)}}{1+N(s, a)} \]

  1. Expand

当遇到一个未展开的节点(搜索树中的叶子节点)时,对其每个子节点使用现有网络进行预估,即

\[ (p(s), v(s))=f_{\theta}(s) \]

  1. Backup

当新的叶子节点展开时或者到达终点局面时,向上更新父节点的Q值,具体公式为 \[ Q(s, a)=\frac{1}{N(s, a)} \sum_{s^{\prime} \mid s, a \rightarrow s^{\prime}} V\left(s^{\prime}\right) \]

  1. Play

多次模拟结束后,使用得到搜索概率分布 ${a} $来确定最终的落子动作。正比于访问次数的某次方 $ {a} N(s, a)^{1 / }\(,其中\)$为温度参数(temperature parameter)。

New Policy Network V' is Trained to Predict Winner

参考资料

继上一篇介绍了Minimax 和Alpha Beta 剪枝算法之后,本篇选择了Leetcode中的井字棋游戏题目,积累相关代码后实现井字棋游戏并扩展到五子棋和N子棋(战略井字棋),随后用Minimax和Alpha Beta剪枝算法解得小规模下N子棋的游戏结局,并分析其状态数量和每一步的最佳策略。后续篇章中,我们基于本篇代码完成一个N子棋的OpenAI Gym 图形环境,可用于人机对战或机器对战,并最终实现棋盘规模稍大的五子棋或者N子棋中的蒙特卡洛树搜索(MCTS)算法。

Leetcode 上的井字棋系列

Leetcode 1275. 找出井字棋的获胜者 (简单)

A 和 B 在一个 3 x 3 的网格上玩井字棋。
井字棋游戏的规则如下:
玩家轮流将棋子放在空方格 (" ") 上。
第一个玩家 A 总是用 "X" 作为棋子,而第二个玩家 B 总是用 "O" 作为棋子。
"X" 和 "O" 只能放在空方格中,而不能放在已经被占用的方格上。
只要有 3 个相同的(非空)棋子排成一条直线(行、列、对角线)时,游戏结束。
如果所有方块都放满棋子(不为空),游戏也会结束。
游戏结束后,棋子无法再进行任何移动。
给你一个数组 moves,其中每个元素是大小为 2 的另一个数组(元素分别对应网格的行和列),它按照 A 和 B 的行动顺序(先 A 后 B)记录了两人各自的棋子位置。
如果游戏存在获胜者(A 或 B),就返回该游戏的获胜者;如果游戏以平局结束,则返回 "Draw";如果仍会有行动(游戏未结束),则返回 "Pending"。
你可以假设 moves 都 有效(遵循井字棋规则),网格最初是空的,A 将先行动。

示例 1:
输入:moves = [[0,0],[2,0],[1,1],[2,1],[2,2]]
输出:"A"
解释:"A" 获胜,他总是先走。
"X " "X " "X " "X " "X "
" " -> " " -> " X " -> " X " -> " X "
" " "O " "O " "OO " "OOX"

示例 2: 输入:moves = [[0,0],[1,1],[0,1],[0,2],[1,0],[2,0]]
输出:"B"
解释:"B" 获胜。
"X " "X " "XX " "XXO" "XXO" "XXO"
" " -> " O " -> " O " -> " O " -> "XO " -> "XO "
" " " " " " " " " " "O "

第一种解法,检查A或者B赢的所有可能情况:某玩家占据8种连线的任意一种情况则胜利,我们使用八个变量来保存所有情况。下面的代码使用了一个小技巧,将moves转换成3x3的棋盘状态数组,元素的值为1,-1和0。1,-1代表两个玩家,0代表空的棋盘格子,其优势在于后续我们只需累加棋盘的值到八个变量中关联的若干个,再检查这八个变量是否满足取胜条件。例如,row[0]表示第一行的状态,当遍历一次所有棋盘格局后,row[0]为第一行的3个格子的总和,只有当row[0] == 3 才表明玩家A占据了第一行,-3表明玩家B占据了第一行。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# AC
from typing import List

class Solution:
def tictactoe(self, moves: List[List[int]]) -> str:
board = [[0] * 3 for _ in range(3)]
for idx, xy in enumerate(moves):
player = 1 if idx % 2 == 0 else -1
board[xy[0]][xy[1]] = player

turn = 0
row, col = [0, 0, 0], [0, 0, 0]
diag1, diag2 = False, False
for r in range(3):
for c in range(3):
turn += board[r][c]
row[r] += board[r][c]
col[c] += board[r][c]
if r == c:
diag1 += board[r][c]
if r + c == 2:
diag2 += board[r][c]

oWin = any(row[r] == 3 for r in range(3)) or any(col[c] == 3 for c in range(3)) or diag1 == 3 or diag2 == 3
xWin = any(row[r] == -3 for r in range(3)) or any(col[c] == -3 for c in range(3)) or diag1 == -3 or diag2 == -3

return "A" if oWin else "B" if xWin else "Draw" if len(moves) == 9 else "Pending"

下面我们给出另一种解法,这种解法虽然代码较多,但可以不必遍历棋盘每个格子,比上一种严格遍历一次棋盘的解法略为高效。原理如下,题目保证了moves过程中不会产生输赢结果,因此我们直接检查最后一个棋子向外的八个方向,若任意方向有三连子,则此玩家获胜。这种解法主要是为后续井字棋扩展到五子棋时判断每个落子是否产生输赢做代码准备。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# AC
from typing import List

class Solution:
def checkWin(self, r: int, c: int) -> bool:
north = self.getConnectedNum(r, c, -1, 0)
south = self.getConnectedNum(r, c, 1, 0)

east = self.getConnectedNum(r, c, 0, 1)
west = self.getConnectedNum(r, c, 0, -1)

south_east = self.getConnectedNum(r, c, 1, 1)
north_west = self.getConnectedNum(r, c, -1, -1)

north_east = self.getConnectedNum(r, c, -1, 1)
south_west = self.getConnectedNum(r, c, 1, -1)

if (north + south + 1 >= 3) or (east + west + 1 >= 3) or \
(south_east + north_west + 1 >= 3) or (north_east + south_west + 1 >= 3):
return True
return False

def getConnectedNum(self, r: int, c: int, dr: int, dc: int) -> int:
player = self.board[r][c]
result = 0
i = 1
while True:
new_r = r + dr * i
new_c = c + dc * i
if 0 <= new_r < 3 and 0 <= new_c < 3:
if self.board[new_r][new_c] == player:
result += 1
else:
break
else:
break
i += 1
return result

def tictactoe(self, moves: List[List[int]]) -> str:
self.board = [[0] * 3 for _ in range(3)]
for idx, xy in enumerate(moves):
player = 1 if idx % 2 == 0 else -1
self.board[xy[0]][xy[1]] = player

# only check last move
r, c = moves[-1]
win = self.checkWin(r, c)
if win:
return "A" if len(moves) % 2 == 1 else "B"

return "Draw" if len(moves) == 9 else "Pending"

Leetcode 794. 有效的井字游戏 (中等)

用字符串数组作为井字游戏的游戏板 board。当且仅当在井字游戏过程中,玩家有可能将字符放置成游戏板所显示的状态时,才返回 true。
该游戏板是一个 3 x 3 数组,由字符 " ","X" 和 "O" 组成。字符 " " 代表一个空位。
以下是井字游戏的规则:
玩家轮流将字符放入空位(" ")中。
第一个玩家总是放字符 “X”,且第二个玩家总是放字符 “O”。
“X” 和 “O” 只允许放置在空位中,不允许对已放有字符的位置进行填充。
当有 3 个相同(且非空)的字符填充任何行、列或对角线时,游戏结束。
当所有位置非空时,也算为游戏结束。
如果游戏结束,玩家不允许再放置字符。

示例 1:
输入: board = ["O ", " ", " "]
输出: false
解释: 第一个玩家总是放置“X”。

示例 2:
输入: board = ["XOX", " X ", " "]
输出: false
解释: 玩家应该是轮流放置的。

示例 3:
输入: board = ["XXX", " ", "OOO"]
输出: false

示例 4:
输入: board = ["XOX", "O O", "XOX"]
输出: true
说明:

游戏板 board 是长度为 3 的字符串数组,其中每个字符串 board[i] 的长度为 3。 board[i][j] 是集合 {" ", "X", "O"} 中的一个字符。

这道题第一反应是需要DFS来判断给定状态是否可达,但其实可以用上面1275的思路,即通过检验最终棋盘的一些特点来判断给定状态是否合法。比如,X和O的数量只有可能相同,或X比O多一个。其关键在于需要找到判断状态合法的充要条件,就可以在\(O(1)\) 时间复杂度完成判断。 此外,这道题给了我们井字棋所有可能状态数量的启示。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# AC
from typing import List

class Solution:

def convertCell(self, c:str):
return 1 if c == 'X' else -1 if c == 'O' else 0

def validTicTacToe(self, board: List[str]) -> bool:
turn = 0
row, col = [0, 0, 0], [0, 0, 0]
diag1, diag2 = False, False
for r in range(3):
for c in range(3):
turn += self.convertCell(board[r][c])
row[r] += self.convertCell(board[r][c])
col[c] += self.convertCell(board[r][c])
if r == c:
diag1 += self.convertCell(board[r][c])
if r + c == 2:
diag2 += self.convertCell(board[r][c])

xWin = any(row[r] == 3 for r in range(3)) or any(col[c] == 3 for c in range(3)) or diag1 == 3 or diag2 == 3
oWin = any(row[r] == -3 for r in range(3)) or any(col[c] == -3 for c in range(3)) or diag1 == -3 or diag2 == -3
if (xWin and turn == 0) or (oWin and turn == 1):
return False
return (turn == 0 or turn == 1) and (not xWin or not oWin)

Leetcode 348. 判定井字棋胜负 (中等,加锁)

请在 n × n 的棋盘上,实现一个判定井字棋(Tic-Tac-Toe)胜负的神器,判断每一次玩家落子后,是否有胜出的玩家。
在这个井字棋游戏中,会有 2 名玩家,他们将轮流在棋盘上放置自己的棋子。
在实现这个判定器的过程中,你可以假设以下这些规则一定成立:
每一步棋都是在棋盘内的,并且只能被放置在一个空的格子里;
一旦游戏中有一名玩家胜出的话,游戏将不能再继续;
一个玩家如果在同一行、同一列或者同一斜对角线上都放置了自己的棋子,那么他便获得胜利。

示例: 给定棋盘边长 n = 3, 玩家 1 的棋子符号是 "X",玩家 2 的棋子符号是 "O"。
TicTacToe toe = new TicTacToe(3);
toe.move(0, 0, 1); -> 函数返回 0 (此时,暂时没有玩家赢得这场对决)
|X| | |
| | | | // 玩家 1 在 (0, 0) 落子。
| | | |

toe.move(0, 2, 2); -> 函数返回 0 (暂时没有玩家赢得本场比赛)
|X| |O|
| | | | // 玩家 2 在 (0, 2) 落子。
| | | |

toe.move(2, 2, 1); -> 函数返回 0 (暂时没有玩家赢得比赛)
|X| |O|
| | | | // 玩家 1 在 (2, 2) 落子。
| | |X|

toe.move(1, 1, 2); -> 函数返回 0 (暂没有玩家赢得比赛)
|X| |O|
| |O| | // 玩家 2 在 (1, 1) 落子。
| | |X|

toe.move(2, 0, 1); -> 函数返回 0 (暂无玩家赢得比赛)
|X| |O|
| |O| | // 玩家 1 在 (2, 0) 落子。
|X| |X|

toe.move(1, 0, 2); -> 函数返回 0 (没有玩家赢得比赛)
|X| |O|
|O|O| | // 玩家 2 在 (1, 0) 落子.
|X| |X|

toe.move(2, 1, 1); -> 函数返回 1 (此时,玩家 1 赢得了该场比赛)
|X| |O|
|O|O| | // 玩家 1 在 (2, 1) 落子。
|X|X|X|

348 是道加锁题,对于每次玩家的move,可以用1275第二种解法中的checkWin 函数。下面代码给出了另一种基于1275解法一的方法:保存八个关键变量,每次落子后更新这个子所关联的某几个变量。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# AC
class TicTacToe:

def __init__(self, n:int):
"""
Initialize your data structure here.
:type n: int
"""
self.row, self.col, self.diag1, self.diag2, self.n = [0] * n, [0] * n, 0, 0, n

def move(self, row:int, col:int, player:int) -> int:
"""
Player {player} makes a move at ({row}, {col}).
@param row The row of the board.
@param col The column of the board.
@param player The player, can be either 1 or 2.
@return The current winning condition, can be either:
0: No one wins.
1: Player 1 wins.
2: Player 2 wins.
"""
if player == 2:
player = -1

self.row[row] += player
self.col[col] += player
if row == col:
self.diag1 += player
if row + col == self.n - 1:
self.diag2 += player

if self.n in [self.row[row], self.col[col], self.diag1, self.diag2]:
return 1
if -self.n in [self.row[row], self.col[col], self.diag1, self.diag2]:
return 2
return 0


井字棋最佳策略

井字棋的规模可以很自然的扩展成四子棋或五子棋等,区别在于棋盘大小和胜利时的连子数量。这类游戏最一般的形式为 M,n,k-game,中文可能翻译为战略井字游戏,表示棋盘大小为M x N,当k连子时获胜。 下面的ConnectNGame类实现了战略井字游戏(M=N)中,两个玩家轮流下子、更新棋盘状态和判断每次落子输赢等逻辑封装。其中undo方法用于撤销最后一个落子,方便在后续寻找最佳策略时回溯。

ConnectNGame

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ConnectNGame:

PLAYER_A = 1
PLAYER_B = -1
AVAILABLE = 0
RESULT_TIE = 0
RESULT_A_WIN = 1
RESULT_B_WIN = -1

def __init__(self, N:int = 3, board_size:int = 3):
assert N <= board_size
self.N = N
self.board_size = board_size
self.board = [[ConnectNGame.AVAILABLE] * board_size for _ in range(board_size)]
self.gameOver = False
self.gameResult = None
self.currentPlayer = ConnectNGame.PLAYER_A
self.remainingPosNum = board_size * board_size
self.actionStack = []

def move(self, r: int, c: int) -> int:
"""

:param r:
:param c:
:return: None: game ongoing
"""
assert self.board[r][c] == ConnectNGame.AVAILABLE
self.board[r][c] = self.currentPlayer
self.actionStack.append((r, c))
self.remainingPosNum -= 1
if self.checkWin(r, c):
self.gameOver = True
self.gameResult = self.currentPlayer
return self.currentPlayer
if self.remainingPosNum == 0:
self.gameOver = True
self.gameResult = ConnectNGame.RESULT_TIE
return ConnectNGame.RESULT_TIE
self.currentPlayer *= -1

def undo(self):
if len(self.actionStack) > 0:
lastAction = self.actionStack.pop()
r, c = lastAction
self.board[r][c] = ConnectNGame.AVAILABLE
self.currentPlayer = ConnectNGame.PLAYER_A if len(self.actionStack) % 2 == 0 else ConnectNGame.PLAYER_B
self.remainingPosNum += 1
self.gameOver = False
self.gameResult = None
else:
raise Exception('No lastAction')

def getAvailablePositions(self) -> List[Tuple[int, int]]:
return [(i,j) for i in range(self.board_size) for j in range(self.board_size) if self.board[i][j] == ConnectNGame.AVAILABLE]

def getStatus(self) -> Tuple[Tuple[int, ...]]:
return tuple([tuple(self.board[i]) for i in range(self.board_size)])

其中checkWin和1275解法二中的逻辑一致。

Minimax 算法

此战略井字游戏的逻辑代码,结合之前的minimax算法,可以实现游戏最佳策略。

先定义一个通用的策略基类和抽象方法 action。action表示给定一个棋盘状态,返回一个动作决定。返回Tuple的第一个int值表示估计走这一步的结局,第二个值类型是Tuple[int, int],表示这次落子的位置,例如(1,1)。

{linenos
1
2
3
4
5
6
7
8
class Strategy(ABC):

def __init__(self):
super().__init__()

@abstractmethod
def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
pass
MinimaxStrategy 的逻辑和之前的minimax模版算法大致相同,多了保存最佳move对应的动作,用于最后返回。
{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class MinimaxStrategy(Strategy):
def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
self.game = copy.deepcopy(game)
result, move = self.minimax()
return result, move

def minimax(self) -> Tuple[int, Tuple[int, int]]:
game = self.game
bestMove = None
assert not game.gameOver
if game.currentPlayer == ConnectNGame.PLAYER_A:
ret = -math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
result, oppMove = self.minimax()
game.undo()
ret = max(ret, result)
bestMove = move if ret == result else bestMove
if ret == 1:
return 1, move
return ret, bestMove
else:
ret = math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
result, oppMove = self.minimax()
game.undo()
ret = min(ret, result)
bestMove = move if ret == result else bestMove
if ret == -1:
return -1, move
return ret, bestMove
通过上面的代码可以画出初始两步的井字棋最终结局。对于先手O来说可以落9个位置,排除对称位置后只有三种,分别为角落,边上和正中。但无论哪一个位置作为先手,最好的结局都是被对方逼平,不存在必赢的开局。所以井字棋的结局是:如果两个玩家都采用最优策略(无失误),游戏结果为双方逼平。
井字棋第一步结局
下面分别画出三种开局后进一步的游戏结局。
井字棋角落开局
井字棋边上开局
井字棋中间开局

井字棋游戏状态数和解

有趣的是井字棋游戏的状态数量,简单的上限估算是\(3^9=19683\)。这显然是个较宽泛的上限,因为很多状态在游戏结束后无法达到。 这篇文章 Tic-Tac-Toe (Naughts and Crosses, Cheese and Crackers, etc 中列出了每一步的状态数,合计5478个。

Moves Positions Terminal Positions
0 1
1 9
2 72
3 252
4 756
5 1260 120
6 1520 148
7 1140 444
8 390 168
9 78 78
Total 5478 958

我们已经实现了井字棋的minimax策略,算法本质上遍历了所有情况,稍加改造后增加dp数组,就可以确认上面的总状态数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

class CountingMinimaxStrategy(Strategy):
def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
self.game = copy.deepcopy(game)
self.dpMap = {}
result, move = self.minimax(game.getStatus())
return result, move

def minimax(self, gameStatus: Tuple[Tuple[int, ...]]) -> Tuple[int, Tuple[int, int]]:
# print(f'Current {len(strategy.dpMap)}')

if gameStatus in self.dpMap:
return self.dpMap[gameStatus]

game = self.game
bestMove = None
assert not game.gameOver
if game.currentPlayer == ConnectNGame.PLAYER_A:
ret = -math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
result, oppMove = self.minimax(game.getStatus())
self.dpMap[game.getStatus()] = result, oppMove
else:
self.dpMap[game.getStatus()] = result, move
game.undo()
ret = max(ret, result)
bestMove = move if ret == result else bestMove
self.dpMap[gameStatus] = ret, bestMove
return ret, bestMove
else:
ret = math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)

if result is None:
assert not game.gameOver
result, oppMove = self.minimax(game.getStatus())
self.dpMap[game.getStatus()] = result, oppMove
else:
self.dpMap[game.getStatus()] = result, move
game.undo()
ret = min(ret, result)
bestMove = move if ret == result else bestMove
self.dpMap[gameStatus] = ret, bestMove
return ret, bestMove


if __name__ == '__main__':
tic_tac_toe = ConnectNGame(N=3, board_size=3)
strategy = CountingMinimaxStrategy()
strategy.action(tic_tac_toe)
print(f'Game States Number {len(strategy.dpMap)}')

运行程序证实了井字棋状态数为5478,下面是一些极小规模时代码运行结果:

3x3 4x4
k=3 5478 (Draw) 6035992 (Win)
k=4 9722011 (Draw)
k=5

根据 Wikipedia M,n,k-game, 列出了一些小规模下的游戏解:

3x3 4x4 5x5 6x6
k=3 Draw Win Win Win
k=4 Draw Draw Win
k=5 Draw Draw

值得一提的是,五子棋(棋盘15x15或以上)被 L. Victor Allis证明是先手赢。

Alpha-Beta剪枝策略

Alpha Beta 剪枝策略的代码如下(和之前代码比较类似,不再赘述):

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class AlphaBetaStrategy(Strategy):
def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
self.game = game
result, move = self.alpha_beta(self.game.getStatus(), -math.inf, math.inf)
return result, move

def alpha_beta(self, gameStatus: Tuple[Tuple[int, ...]], alpha:int=None, beta:int=None) -> Tuple[int, Tuple[int, int]]:
game = self.game
bestMove = None
assert not game.gameOver
if game.currentPlayer == ConnectNGame.PLAYER_A:
ret = -math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
result, oppMove = self.alpha_beta(game.getStatus(), alpha, beta)
game.undo()
alpha = max(alpha, result)
ret = max(ret, result)
bestMove = move if ret == result else bestMove
if alpha >= beta or ret == 1:
return ret, move
return ret, bestMove
else:
ret = math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
result, oppMove = self.alpha_beta(game.getStatus(), alpha, beta)
game.undo()
beta = min(beta, result)
ret = min(ret, result)
bestMove = move if ret == result else bestMove
if alpha >= beta or ret == -1:
return ret, move
return ret, bestMove

Alpha Beta 的DP版本中,由于lru_cache无法指定cache的有效参数,递归函数并没有传入alpha, beta。因此我们将alpha,beta参数隐式放入自己维护的栈中,并保证栈的状态和alpha_beta_dp函数调用状态一致。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class AlphaBetaDPStrategy(Strategy):
def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
self.game = game
self.alphaBetaStack = [(-math.inf, math.inf)]
result, move = self.alpha_beta_dp(self.game.getStatus())
return result, move

@lru_cache(maxsize=None)
def alpha_beta_dp(self, gameStatus: Tuple[Tuple[int, ...]]) -> Tuple[int, Tuple[int, int]]:
alpha, beta = self.alphaBetaStack[-1]
game = self.game
bestMove = None
assert not game.gameOver
if game.currentPlayer == ConnectNGame.PLAYER_A:
ret = -math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
self.alphaBetaStack.append((alpha, beta))
result, oppMove = self.alpha_beta_dp(game.getStatus())
self.alphaBetaStack.pop()
game.undo()
alpha = max(alpha, result)
ret = max(ret, result)
bestMove = move if ret == result else bestMove
if alpha >= beta or ret == 1:
return ret, move
return ret, bestMove
else:
ret = math.inf
for pos in game.getAvailablePositions():
move = pos
result = game.move(*pos)
if result is None:
assert not game.gameOver
self.alphaBetaStack.append((alpha, beta))
result, oppMove = self.alpha_beta_dp(game.getStatus())
self.alphaBetaStack.pop()
game.undo()
beta = min(beta, result)
ret = min(ret, result)
bestMove = move if ret == result else bestMove
if alpha >= beta or ret == -1:
return ret, move
return ret, bestMove

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×