#Monte Carlo

这期继续Sutton强化学习第二版,第五章蒙特卡洛方法。在上期21点游戏的策略蒙特卡洛值预测学习了如何用Monte Carlo来预估给定策略 \(\pi\)\(V_{\pi}\) 值之后,这一期我们用Monte Carlo方法来解得21点游戏最佳策略 \(\pi_{*}\)

蒙特卡洛策略提升

回顾一下,在Grid World 策略迭代和值迭代中由于存在Policy Improvement Theorem,我们可以利用环境dynamics信息计算出策略v值,再选取最greedy action的方式改进策略,形成策略提示最终能够不断逼近最佳策略。 \[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} v_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} v_{*} \] Monte Carlo Control方法搜寻最佳策略 \(\pi{*}\),是否也能沿用同样的思路呢?答案是可行的。不过,不同于第四章中我们已知环境MDP就知道状态的前后依赖关系,进而从v值中能推断出策略 \(\pi\),在Monte Carlo方法中,环境MDP是未知的,因而我们只能从action-value中下手,通过海量Monte Carlo试验来近似 \(q_{\pi}\)。有了策略 Q 值,再和MDP策略迭代方法一样,选取最greedy action的策略,这种策略提示方式理论上被证明了最终能够不断逼近最佳策略。 \[ \pi_{0} \stackrel{\mathrm{E}}{\longrightarrow} q_{\pi_{0}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{1} \stackrel{\mathrm{E}}{\longrightarrow} q_{\pi_{1}} \stackrel{\mathrm{I}}{\longrightarrow} \pi_{2} \stackrel{\mathrm{E}}{\longrightarrow} \cdots \stackrel{\mathrm{I}}{\longrightarrow} \pi_{*} \stackrel{\mathrm{E}}{\longrightarrow} q_{*} \]

但是此方法有一个前提要满足,由于数据是依据策略 \(\pi_{i}\) 生成的,理论上需要保证在无限次的模拟过程中,每个状态都必须被无限次访问到,才能保证最终每个状态的Q估值收敛到真实的 \(q_{*}\)。满足这个前提的一个简单实现是强制随机环境初始状态,保证每个状态都能有一定概率被生成。这个思路就是 Monte Carlo Control with Exploring Starts算法,伪代码如下:

\[ \begin{align*} &\textbf{Monte Carlo ES (Exploring Starts), for estimating } \pi \approx \pi_{*} \\ & \text{Initialize:} \\ & \quad \pi(s) \in \mathcal A(s) \text{ arbitrarily for all }s \in \mathcal{S} \\ & \quad Q(s, a) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S}, a \in \mathcal A(s) \\ & \quad Returns(s, a) \leftarrow \text{ an empty list, for all }s \in \mathcal{S}, a \in \mathcal A(s)\\ & \\ & \text{Loop forever (for episode):}\\ & \quad \text{Choose } S_0\in \mathcal{S}, A_0 \in \mathcal A(S_0) \text{ randomly such that all pairs have probability > 0} \\ & \quad \text{Generate an episode from } S_0, A_0 \text{, following } \pi : S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless the pair } S_t, A_t \text{ appears in } S_0, A_0, S_1, A_1, ..., S_{t-1}, A_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t, A_t) \\ & \quad \quad \quad \quad Q(S_t, A_t) \leftarrow \operatorname{average}(Returns(S_t, A_t))\\ & \quad \quad \quad \quad \pi(S_t) \leftarrow \operatorname{argmax}_a Q(S_t, a)\\ \end{align*} \]

下面我们实现21点游戏的Monte Carlo ES 算法。21点游戏只有200个有效的状态,可以满足算法要求的生成episode前先随机选择某一状态的前提条件。

相对于上一篇,我们增加 ActionValue和Policy的类型定义,ActionValue表示 \(q(s, a)\) ,是一个State到动作分布的Dict,Policy 类型也一样。Actions为一维ndarray,维数是离散动作数量。

{linenos
1
2
3
4
5
6
State = Tuple[int, int, bool]
Action = bool
Reward = float
Actions = np.ndarray
ActionValue = Dict[State, Actions]
Policy = Dict[State, Actions]

下面代码示例如何给定 Policy后,依据指定状态state的动作分布采样,决定下一动作。

1
2
3
policy: Policy
A: ActionValue = policy[state]
action = np.random.choice([0, 1], p=A/sum(A))

整个算法的 python 代码实现如下:

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def mc_control_exploring_starts(env: BlackjackEnv, num_episodes, discount_factor=1.0) \
-> Tuple[ActionValue, Policy]:
states = list(product(range(10, 22), range(1, 11), (True, False)))
policy = {s: np.ones(env.action_space.n) * 1.0 / env.action_space.n for s in states}
Q = defaultdict(lambda: np.zeros(env.action_space.n))
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
s0 = random.choice(states)
reset_env_with_s0(env, s0)
episode_history = gen_custom_s0_stochastic_episode(policy, env, s0)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s and s_a_r[1] == a for s_a_r in episode_history[0: t]):
returns_sum[s, a] += G
returns_count[s, a] += 1.0
Q[s][a] = returns_sum[s, a] / returns_count[s, a]
best_a = np.argmax(Q[s])
policy[s][best_a] = 1.0
policy[s][1-best_a] = 0.0

return Q, policy

在MC Exploring Starts 算法中,我们需要指定环境初始状态,一种做法是env.reset()时接受初始状态,但是考虑到不去修改第三方实现的 BlackjackEnv类,采用一个取巧的办法,在调用reset()后直接改写env 的私有变量,这个逻辑封装在 reset_env_with_s0 方法中。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def reset_env_with_s0(env: BlackjackEnv, s0: State) -> BlackjackEnv:
env.reset()
player_sum = s0[0]
oppo_sum = s0[1]
has_usable = s0[2]

env.dealer[0] = oppo_sum
if has_usable:
env.player[0] = 1
env.player[1] = player_sum - 11
else:
if player_sum > 11:
env.player[0] = 10
env.player[1] = player_sum - 10
else:
env.player[0] = 2
env.player[1] = player_sum - 2
return env

算法结果的可视化和理论对比

下图是有Usable Ace情况下的理论最优策略。
理论最佳策略(有Ace)
Monte Carlo方法策略提示的收敛是比较慢的,下图是运行10,000,000次episode后有Usable Ace时的策略 \(\pi_{*}^{\prime}\)。对比理论最优策略,MC ES在不少的状态下还未收敛到理论最优解。
MC ES 10M的最佳策略(有Ace)
同样的,下两张图是无Usable Ace情况下的理论最优策略和试验结果的对比。
理论最佳策略(无Ace)
MC ES 10M的最佳策略(无Ace)
下面的两张图画出了运行代码10,000,000次episode后 \(\pi{*}\)的V值图。
MC ES 10M的最佳V值(有Ace)
MC ES 10M的最佳V值(无Ace)

Exploring Starts 蒙特卡洛控制改进

为了避免Monte Carlo ES Control在初始时必须访问到任意状态的限制,教材中介绍了一种改进算法,On-policy first-visit MC control for \(\epsilon \text{-soft policies}\) ,它同样基于Monte Carlo 预估Q值,但用 \(\epsilon \text{-soft}\) 策略来代替最有可能的action策略作为下一次迭代策略,\(\epsilon \text{-soft}\) 本质上来说就是对于任意动作都保留 \(\epsilon\) 小概率的访问可能,权衡了exploration和exploitation,由于每个动作都可能被无限次访问到,Explorting Starts中的强制随机初始状态就可以去除了。Monte Carlo ES Control 和 On-policy first-visit MC control for \(\epsilon \text{-soft policies}\) 都属于on-policy算法,其区别于off-policy的本质在于预估 \(q_{\pi}(s,a)\)时是否从同策略\(\pi\)生成的数据来计算。一个比较subtle的例子是著名的Q-Learning,因为根据这个定义,Q-Learning属于off-policy。

\[ \begin{align*} &\textbf{On-policy first-visit MC control (for }\epsilon \textbf{-soft policies), estimating } \pi \approx \pi_{*} \\ & \text{Algorithm parameter: small } \epsilon > 0 \\ & \text{Initialize:} \\ & \quad \pi \leftarrow \text{ an arbitrary } \epsilon \text{-soft policy} \\ & \quad Q(s, a) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S}, a \in \mathcal A(s) \\ & \quad Returns(s, a) \leftarrow \text{ an empty list, for all }s \in \mathcal{S}, a \in \mathcal A(s)\\ & \\ & \text{Repeat forever (for episode):}\\ & \quad \text{Generate an episode following } \pi : S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless the pair } S_t, A_t \text{ appears in } S_0, A_0, S_1, A_1, ..., S_{t-1}, A_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t, A_t) \\ & \quad \quad \quad \quad Q(S_t, A_t) \leftarrow \operatorname{average}(Returns(S_t, A_t))\\ & \quad \quad \quad \quad A^{*} \leftarrow \operatorname{argmax}_a Q(S_t, a)\\ & \quad \quad \quad \quad \text{For all } a \in \mathcal A(S_t):\\ & \quad \quad \quad \quad \quad \pi(a|S_t) \leftarrow \begin{cases} 1 - \epsilon + \epsilon / |\mathcal A(S_t)| & \text{ if } a = A^{*}\\ \epsilon / |\mathcal A(S_t)| & \text{ if } a \neq A^{*}\\ \end{cases} \\ \end{align*} \]

伪代码对应的 Python 实现如下。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def mc_control_epsilon_greedy(env: BlackjackEnv, num_episodes, discount_factor=1.0, epsilon=0.1) \
-> Tuple[ActionValue, Policy]:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

states = list(product(range(10, 22), range(1, 11), (True, False)))
policy = {s: np.ones(env.action_space.n) * 1.0 / env.action_space.n for s in states}
Q = defaultdict(lambda: np.zeros(env.action_space.n))

def update_epsilon_greedy_policy(policy: Policy, Q: ActionValue, s: State):
policy[s] = np.ones(env.action_space.n, dtype=float) * epsilon / env.action_space.n
best_action = np.argmax(Q[s])
policy[s][best_action] += (1.0 - epsilon)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_stochastic_episode(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s and s_a_r[1] == a for s_a_r in episode_history[0: t]):
returns_sum[s, a] += G
returns_count[s, a] += 1.0
Q[s][a] = returns_sum[s, a] / returns_count[s, a]
update_epsilon_greedy_policy(policy, Q, s)

return Q, policy

从这期开始我们进入Sutton强化学习第二版,第五章蒙特卡洛方法。蒙特卡洛方法是一种在工程各领域都存在的基本方法,在强化领域中,其特点是无需知道环境的dynamics,只需不断模拟记录并分析数据即可逼近理论真实值。蒙特卡洛方法本篇将会用21点游戏作为示例来具体讲解其原理和代码实现。

21点游戏问题

21点游戏是一个经典的赌博游戏。大致规则是玩家和庄家各发两张牌,一张明牌,一张暗牌。玩家和庄家可以决定加牌或停止加牌,新加的牌均为暗牌,最后比较两个玩家的牌面和,更接近21点的获胜。游戏的变化因素是牌Ace,既可以作为11也可以作为1来计算,算作11的时候称作usable。

Sutton教材中的21点游戏规则简化了几个方面用于控制问题状态数:

  • 已发的牌的无状态性:和一副牌的21点游戏不同的是,游戏环境简化为牌是可以无穷尽被补充的,一副牌的某一张被派发后,同样的牌会被补充进来,或者可以认为每次发放的牌都是从一副新牌中抽出的。统计学中的术语称为重复采样(sample with replacement)。这种规则下极端情况下,玩家可以拥有 5个A或者5个2。另外,这会导致玩家无法通过开局看到的3张牌的信息推断后续发牌的概率,如此就大规模减小了游戏状态数。
  • 庄家和玩家独立游戏,无需按轮次要牌。开局给定4张牌后,玩家先行动,加牌直至超21点或者停止要牌,如果超21点,玩家输,否则,等待庄家行动,庄家加牌直至超21点或者停止要牌,如果超21点,庄家输,否则比较两者的总点数。这种方式可以认为当玩家和庄家看到初始的三张牌后独立做一系列决策,最后比较结果,避免了交互模式下因为能观察到每一轮后对方牌数变化产生额外的信息而导致的游戏状态数爆炸。

有以上两个规则的简化,21点游戏问题的总状态数有下面三个维度

  • 自己手中的点数和(12到21)

  • 庄家明牌的点数(A到10)

  • 庄家明牌是否有 A(True, False)。

状态总计总数为三个维度的乘积 10 * 10 * 2 = 200。

关于游戏状态有几个比较subtle的假设或者要素。首先,玩家初始时能看到三张牌,这三张牌确定了状态的三个维度的值,当然也就确定了Agent的初始状态,随后按照独立游戏的规则进行,玩家根据初始状态依照某种策略决策要牌还是结束要牌,新拿的牌更新了游戏状态,玩家转移到新状态下继续做决策。举个例子,假设初始时玩家明牌为8,暗牌为6,庄家明牌为7,则游戏状态为Tuple (14, 7, False)。若玩家的策略为教材中的固定规则策略:没到20或者21继续要牌。下一步玩家拿到牌3,则此时新状态为 (17, 7, False),按照策略继续要牌。

第二个方面是游戏的状态完全等价于玩家观察到的信息。比如尽管初始时有4张牌,真正的状态是这四张牌的值,但是出于简化目的,不考虑partially observable 的情况,即不将暗牌纳入游戏状态中。另外,庄家做决策的时候也无法得知玩家的手中的总牌数。

第三个方面是关于玩家点数。考虑玩家初始时的两张牌为2,3,总点数是5,那么为何不将5加入到游戏状态中呢?原则上是可以将初始总和为2到11都加入到游戏状态,但是意义不大,原因在于我们已经假设了已发牌的无状态性,拿到的这两张牌并不会改变后续补充的牌的出现概率。当玩家初始总和为2到11时一定会追加牌,因为无论第三张牌是什么,都不会超过21点,只会增加获胜概率。若后续第三张牌为8,总和变成13,就进入了有效的游戏状态,因为此时如果继续要牌,获得10,则游戏输掉。因此,我们关心的游戏状态并不完全等价于所有可能的游戏状态。

21点游戏 OpenAI Gym环境

OpenAI Gym 已经实现了Sutton版本的21点游戏环境,并按上述规则来进行。在安装完OpenAI Gym包之后 import BlackjackEnv即可使用。

1
from gym.envs.toy_text import BlackjackEnv

根据这个游戏环境,我们先来定义一些类型,可以令代码更具可读性和抽象化。State 上文说过是由三个分量组成的Tuple。Action 为bool类型 表示是否继续要牌。Reward 为+1或者-1,玩家叫牌过程中为0。StateValue 为书中的 \(V_{\pi}\),实现上是一个Dict。DeterministicPolicy 为一个函数,输入是某一状态,输出是唯一的决策动作。

{linenos
1
2
3
4
5
State = Tuple[int, int, bool]
Action = bool
Reward = float
StateValue = Dict[State, float]
DeterministicPolicy = Callable[[State], Action]

以下代码是 BlackjackEnv 核心代码,step 方法的输入为玩家的决策动作(叫牌还是结束),并输出State, Reward, is_done。简单解释一下代码逻辑,当玩家继续加牌时,需要判断是否超21点,如果没有超过的话,返回下一状态,同时reward 为0,等待下一step方法。若玩家停止叫牌,则按照庄家策略:小于17时叫牌。游戏终局时产生+1表示玩家获胜,-1表示庄家获胜。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class BlackjackEnv(gym.Env):

def step(self, action):
assert self.action_space.contains(action)
if action: # hit: add a card to players hand and return
self.player.append(draw_card(self.np_random))
if is_bust(self.player):
done = True
reward = -1.
else:
done = False
reward = 0.
else: # stick: play out the dealers hand, and score
done = True
while sum_hand(self.dealer) < 17:
self.dealer.append(draw_card(self.np_random))
reward = cmp(score(self.player), score(self.dealer))
if self.natural and is_natural(self.player) and reward == 1.:
reward = 1.5
return self._get_obs(), reward, done, {}

def _get_obs(self):
return (sum_hand(self.player), self.dealer[0], usable_ace(self.player))

下面示例如何调用step方法生成一个episode的数据集。数据集的类型为 List[Tuple[State, Action, Reward]]。

{linenos
1
2
3
4
5
6
7
8
9
10
def gen_episode_data(policy: DeterministicPolicy, env: BlackjackEnv) -> List[Tuple[State, Action, Reward]]:
episode_history = []
state = env.reset()
done = False
while not done:
action = policy(state)
next_state, reward, done, _ = env.step(action)
episode_history.append((state, action, reward))
state = next_state
return episode_history

策略的蒙特卡洛值预测

Monte Carlo Prediction解决如下问题:当给定Agent 策略\(\pi\)时,反复试验来预估策略的 \(V_{\pi}\) 值。具体来说,产生一系列的episode数据之后,对于出现了的所有状态分别计算其Return,再通过 average 某一状态 s 的Return来估计 \(V_{\pi}(s)\),理论上,依据大数定理(Law of large numbers),在可以无限模拟的情况下,Monte Carlo prediction 一定会收敛到真实的 \(V_{\pi}\)。算法实现上有两个略微不同的版本,一个版本称为 First-visit,另一个版本称为 Every-visit,区别在于如何计算出现的状态 s 的 Return值。

对于 First-visit 来说,当状态 s 第一次出现时计算一次 Returns,若继续出现状态 s 不再重复计算。对于Every-visit来说,每次出现 s 计算一次 Returns(s)。举个例子,某episode 数据如下: \[ S_1, R_1, S_2, R_2, S_1, R_3, S_3, R_4 \] First-visit 对于状态S1的Returns计算为

\[ Returns(S_1) = R_1 + R_2 + R_3 + R_4 \]

Every-visit 对于状态S1的Returns计算了两次,因为S1出现了两次。 \[ \begin{align*} Returns(S_1) = \frac{Return_1(S_1) + Return_2(S_1)}2 \\ = \frac{(R_1 + R_2 + R_3 + R_4) + (R_3 + R_4)} 2 \end{align*} \]

下面用Monte Carlo来模拟解得书中示例玩家固定策略的V值,策略具体为:加牌直到手中点数>=20,代码为

{linenos
1
2
3
4
5
6
def fixed_policy(observation):
"""
sticks if the player score is >= 20 and hits otherwise.
"""
score, dealer_score, usable_ace = observation
return 0 if score >= 20 else 1

First-visit MC Predicition

伪代码如下,注意考虑到实现上的高效性,在遍历episode序列数据时是从后向前扫的,这样可以边扫边更新G。

\[ \begin{align*} &\textbf{First-visit MC prediction, for estimating } V \approx v_{\pi} \\ & \text{Input: a policy } \pi \text{ to be evaluated} \\ & \text{Initialize} \\ & \quad V(s) \in \mathbb R \text{, arbitrarily, for all }s \in \mathcal{S} \\ & \quad Returns(s) \leftarrow \text{ an empty list, arbitrarily, for all }s \in \mathcal{S} \\ & \\ & \text{Loop forever (for episode):}\\ & \quad \text{Generate an episode following } \pi: S_0, A_0, R_1, S_1, A_1, R_2, ..., S_{T-1}, A_{T-1}, R_T\\ & \quad G \leftarrow 0\\ & \quad \text{Loop for each step of episode, } t = T-1, T-2, ..., 0:\\ & \quad \quad \quad G \leftarrow \gamma G + R_{t+1}\\ & \quad \quad \quad \text{Unless } S_t \text{ appears in } S_0, S_1, ..., S_{t-1}\\ & \quad \quad \quad \quad \text{Append } G \text { to }Returns(S_t) \\ & \quad \quad \quad \quad V(S_t) \leftarrow \operatorname{average}(Returns(S_t))\\ \end{align*} \]

对应的 python 实现

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def mc_prediction_first_visit(policy: DeterministicPolicy, env: BlackjackEnv,
num_episodes, discount_factor=1.0) -> StateValue:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_episode_data(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
if not any(s_a_r[0] == s for s_a_r in episode_history[0: t]):
returns_sum[s] += G
returns_count[s] += 1.0

V = defaultdict(float)
V.update({s: returns_sum[s] / returns_count[s] for s in returns_sum.keys()})
return V

Every-visit MC Prediciton

Every-visit 代码实现相对更简单一些,t 从后往前遍历时更新对应s的状态变量。如下所示

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def mc_prediction_every_visit(policy: DeterministicPolicy, env: BlackjackEnv,
num_episodes, discount_factor=1.0) -> StateValue:
returns_sum = defaultdict(float)
returns_count = defaultdict(float)

for episode_i in range(1, num_episodes + 1):
episode_history = gen_episode_data(policy, env)

G = 0
for t in range(len(episode_history) - 1, -1, -1):
s, a, r = episode_history[t]
G = discount_factor * G + r
returns_sum[s] += G
returns_count[s] += 1.0

V = defaultdict(float)
V.update({s: returns_sum[s] / returns_count[s] for s in returns_sum.keys()})
return V

策略 V值 3D 可视化

运行first-visit 算法,模拟10000次episode,fixed_policy的V值的3D图为下面两张图,分别是不含usable Ace和包含usable Ace。总的说来,一旦玩家能到达20点或21点获胜概率极大,到达13-17获胜概率较小,在11-13时有一定获胜概率,比较符合经验直觉。

first-visit MC 10000次没有usable A的V值
first-visit MC 10000次含有usable A的V值

同样运行every-visit 算法,模拟10000次的V值图。对比两种方法结果比较接近。

every-visit MC 10000次没有usable A的V值
every-visit MC 10000次含有usable A的V值

Leetcode 1227 是一道有意思的概率题,本篇将从多个角度来讨论这道题。题目如下

有 n 位乘客即将登机,飞机正好有 n 个座位。第一位乘客的票丢了,他随便选了一个座位坐下。 剩下的乘客将会: 如果他们自己的座位还空着,就坐到自己的座位上, 当他们自己的座位被占用时,随机选择其他座位,第 n 位乘客坐在自己的座位上的概率是多少?

示例 1: 输入:n = 1 输出:1.00000 解释:第一个人只会坐在自己的位置上。

示例 2: 输入: n = 2 输出: 0.50000 解释:在第一个人选好座位坐下后,第二个人坐在自己的座位上的概率是 0.5。

提示: 1 <= n <= 10^5

假设规模为n时答案为f(n),一般来说,这种递推问题在数学形式上可能有关于n的简单数学表达式(closed form),或者肯定有f(n)关于f(n-k)的递推表达式。工程上,我们可以通过通过多次模拟即蒙特卡罗模拟来算得近似的数值解。

Monte Carlo 模拟发现规律

首先,我们先来看看如何高效的用代码来模拟。根据题意的描述过程,直接可以写出下面代码。seats为n大小的bool 数组,每个位置表示此位置是否已经被占据。然后依次给第i个人按题意分配座位。注意,每次参数随机数范围在[0,n-1],因此,会出现已经被占据的情况,此时需要再次随机,直至分配到空位。

暴力直接模拟

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def simulate_bruteforce(n: int) -> bool:
"""
Simulates one round. Unbounded time complexity.
:param n: total number of seats
:return: True if last one has last seat, otherwise False
"""

seats = [False for _ in range(n)]

for i in range(n-1):
if i == 0: # first one, always random
seats[random.randint(0, n - 1)] = True
else:
if not seats[i]: # i-th has his seat
seats[i] = True
else:
while True:
rnd = random.randint(0, n - 1) # random until no conflicts
if not seats[rnd]:
seats[rnd] = True
break
return not seats[n-1]

运行上面的代码来模拟 n 从 2 到10 的情况,每种情况跑500次模拟,输出如下

1
2
3
4
5
6
7
8
9
10
1 => 1.0
2 => 0.55
3 => 0.54
4 => 0.486
5 => 0.488
6 => 0.498
7 => 0.526
8 => 0.504
9 => 0.482
10 => 0.494

发现当 n>=2 时,似乎概率都是0.5。

标准答案

其实,这道题的标准答案就是 n=1 为1,n>=2 为0.5。下面是 python 3 标准答案。本篇后面会从多个角度来探讨为什么是0.5 。

{linenos
1
2
3
class Solution:
def nthPersonGetsNthSeat(self, n: int) -> float:
return 1.0 if n == 1 else 0.5

O(n) 改进算法

上面的暴力直接模拟版本有个最大的问题是当n很大时,随机分配座位会产生大量冲突,因此,最坏复杂度是没有任何上限的。解决方法是每次发生随机分配时保证不冲突,能直接选到空位。下面是一种最坏复杂度O(n)的模拟过程,seats数组初始话成 0,1,...,n-1,表示座位号。当第i个人登机时,seats[i:n] 的值为他可以选择的座位集合,而seats[0:i]为已经被占据的座位集合。由于[i: n]是连续空间,产生随机数就能保证不冲突。当第i个人选完座位时,将他选中的seats[k]和seats[i] 交换,保证第i+i个人面临的seats[i+1:n]依然为可选座位集合。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def simulate_online(n: int) -> bool:
"""
Simulates one round of complexity O(N).
:param n: total number of seats
:return: True if last one has last seat, otherwise False
"""

seats = [i for i in range(n)]

def swap(i, j):
tmp = seats[i]
seats[i] = seats[j]
seats[j] = tmp

# for each person, the seats array idx available are [i, n-1]
for i in range(n-1):
if i == 0: # first one, always random
rnd = random.randint(0, n - 1)
swap(rnd, 0)
else:
if seats[i] == i: # i-th still has his seat
pass
else:
rnd = random.randint(i, n - 1) # selects idx from [i, n-1]
swap(rnd, i)
return seats[n-1] == n - 1

递推思维

这一节我们用数学递推思维来解释0.5的解。令f(n) 为第 n 位乘客坐在自己的座位上的概率,考察第一个人的情况(first step analysis),有三种可能

  1. 第一个人选了第一个即自己的座位,那么最后一个人一定能保证坐在自己的座位。
  2. 第一个人选了最后一个人的座位,无论中间什么过程,最后一个人无法坐到自己座位
  3. 第一个人选了第i个座位,(1<i<n),那么第i个人前面的除了第一个外的人都会坐在自己位置上,第i个人由于没有自己座位,随机在剩余的座位1,座位 [i+1,n] 中随机选择,此时,问题转变为f(n-i+1),如下图所示。
第一个人选了位置i
第i个人将问题转换成f(n-i+1)

通过上面分析,得到概率递推关系如下

\[ f(n) = \begin{align*} \left\lbrace \begin{array}{r@{}l} 1 & & p=\frac{1}{n} \quad \text{选了第一个位置} \\\\\\ f(n-i+1) & & p=\frac{1}{n} \quad \text{选了第i个位置,1<i<n} \\\\\\ 0 & & p=\frac{1}{n} \quad \text{选了第n个位置} \end{array} \right. \end{align*} \]

即f(n)的递推式为: \[ f(n) = \frac{1}{n} + \frac{1}{n} \times [ f(n-1) + f(n-2) + ...+ f(2)], \quad n>=2 \] 同理,f(n+1)递推式如下 \[ f(n+1) = \frac{1}{n+1} + \frac{1}{n+1} \times [ f(n) + f(n-1) + ...+ f(2)] \] \((n+1)f(n+1) - nf(n)\) 抵消 \(f(n-1) + ...f(2)\) 项,可得 \[ (n+1)f(n+1) - nf(n) = f(n) \]\[ f(n+1) = f(n) = \frac{1}{2} \quad n>=2 \]

用数学归纳法也可以证明 n>=2 时 f(n)=0.5。

简化的思考方式

我们再仔细思考一下上面的第三种情况,就是第一个人坐了第i个座位,1<i<n,此时,程序继续,不产生结果,直至产生结局1或者2,也就是case 1和2是真正的结局节点,它们产生的概率相同,因此答案是1/2。

从调用图可以看出这种关系,由于中间节点 f(4),f(3),f(2)生成Case 1和2的概率一样,因此无论它们之间是什么关系,最后结果都是1/2.

知乎上有个很形象的类比理解方式

考虑一枚硬币,正面向上的概率为 1/n,反面也是,立起来的概率为 (n-2)/n 。我们规定硬币立起来重新抛,但重新抛时,n会至少减小1。求结果为反面的概率。这样很显然结果为 1/2 。

这里,正面向上对应Case 2,反面对应Case 1。

这种思想可以写出如下代码,seats为 n 大小的bool 数组,当第i个人(0<i<n)发现自己座位被占的话,此时必然seats[0]没有被占,同时seats[i+1:]都是空的。假设seats[0]被占的话,要么是第一个人占的,要么是第p个人(p<i)坐了,两种情况下乱序都已经恢复了,此时第i个座位一定是空的。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def simulate(n: int) -> bool:
"""
Simulates one round of complexity O(N).
:param n: total number of seats
:return: True if last one has last seat, otherwise False
"""

seats = [False for _ in range(n)]

for i in range(n-1):
if i == 0: # first one, always random
rnd = random.randint(0, n - 1)
seats[rnd] = True
else:
if not seats[i]: # i-th still has his seat
seats[i] = True
else:
# 0 must not be available, now we have 0 and [i+1, n-1],
rnd = random.randint(i, n - 1)
if rnd == i:
seats[0] = True
else:
seats[rnd] = True
return not seats[n-1]

上一篇我们从原理层面解析了AlphaGo Zero如何改进MCTS算法,通过不断自我对弈,最终实现从零棋力开始训练直至能够打败任何高手。在本篇中,我们在已有的N子棋OpenAI Gym 环境中用Pytorch实现一个简化版的AlphaGo Zero算法。本篇所有代码在 github MyEncyclopedia/ConnectNGym 中,其中部分参考了SongXiaoJun 的 AlphaZero_Gomoku

AlphaGo Zero MCTS 树节点

上一篇中,我们知道AlphaGo Zero 的MCTS树搜索是基于传统MCTS 的UCT (UCB for Tree)的改进版PUCT(Polynomial Upper Confidence Trees)。局面节点的PUCT值由两部分组成,分别是代表Exploitation的action value Q值,和代表Exploration的U值。 \[ PUCT(s, a) =Q(s,a) + U(s,a) \] U值计算由这些参数决定:系数\(c_{puct}\),节点先验概率P(s, a) ,父节点访问次数,本节点的访问次数。具体公式如下 \[ U(s, a)=c_{p u c t} \cdot P(s, a) \cdot \frac{\sqrt{\Sigma_{b} N(s, b)}}{1+N(s, a)} \]

因此在实现过程中,对于一个树节点来说,需要保存其Q值、节点访问次数 _visit_num和先验概率 _prior。其中,_prior在节点初始化后不变,Q值和 visit_num随着游戏MCTS模拟进程而改变。此外,节点保存了 parent和_children变量,用于维护父子关系。c_puct为class variable,作为全局参数。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
class TreeNode:
"""
MCTS Tree Node
"""

c_puct: ClassVar[int] = 5 # class-wise global param c_puct, exploration weight factor.

_parent: TreeNode
_children: Dict[int, TreeNode] # map from action to TreeNode
_visit_num: int
_Q: float # Q value of the node, which is the mean action value.
_prior: float

和上面的计算公式相对应,下列代码根据节点状态计算PUCT(s, a)。

{linenos
1
2
3
4
5
6
7
8
9
10
class TreeNode:

def get_puct(self) -> float:
"""
Computes AlphaGo Zero PUCT (polynomial upper confidence trees) of the node.

:return: Node PUCT value.
"""
U = (TreeNode.c_puct * self._prior * np.sqrt(self._parent._visit_num) / (1 + self._visit_num))
return self._Q + U

AlphaGo Zero MCTS在playout时遇到已经被展开的节点,会根据selection规则选择子节点,该规则本质上是在所有子节点中选择最大的PUCT值的节点。

\[ a=\operatorname{argmax}_a(PUCT(s, a))=\operatorname{argmax}_a(Q(s,a) + U(s,a)) \]

{linenos
1
2
3
4
5
6
7
8
9
class TreeNode:

def select(self) -> Tuple[Pos, TreeNode]:
"""
Selects an action(Pos) having max UCB value.

:return: Action and corresponding node
"""
return max(self._children.items(), key=lambda act_node: act_node[1].get_puct())

新的叶节点一旦在playout时产生,关联的 v 值会一路向上更新至根节点,具体新节点的v值将在下一节中解释。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class TreeNode:

def propagate_to_root(self, leaf_value: float):
"""
Updates current node with observed leaf_value and propagates to root node.

:param leaf_value:
:return:
"""
if self._parent:
self._parent.propagate_to_root(-leaf_value)
self._update(leaf_value)

def _update(self, leaf_value: float):
"""
Updates the node by newly observed leaf_value.

:param leaf_value:
:return:
"""
self._visit_num += 1
# new Q is updated towards deviation from existing Q
self._Q += 0.5 * (leaf_value - self._Q)

AlphaGo Zero MCTS Player 实现

AlphaGo Zero MCTS 在训练阶段分为如下几个步骤。游戏初始局面下,整个局面树的建立由子节点的不断被探索而丰富起来。AlphaGo Zero对弈一次即产生了一次完整的游戏开始到结束的动作系列。在对弈过程中的某一游戏局面,需要采样海量的playout,又称MCTS模拟,以此来决定此局面的下一步动作。一次playout可视为在真实游戏状态树的一种特定采样,playout可能会产生游戏结局,生成真实的v值;也可能explore 到新的叶子节点,此时v值依赖策略价值网络的输出,目的是利用训练的神经网络来产生高质量的游戏对战局面。每次playout会从当前给定局面递归向下,向下的过程中会遇到下面三种节点情况。

  • 若局面节点是游戏结局(叶子节点),可以得到游戏的真实价值 z。从底部节点带着z向上更新沿途节点的Q值,直至根节点(初始局面)。
  • 若局面节点从未被扩展过(叶子节点),此时会将局面编码输入到策略价值双头网络,输出结果为网络预估的action分布和v值。Action分布作为节点先验概率P(s, a)来初始化子节点,预估的v值和上面真实游戏价值z一样,从叶子节点向上沿途更新到根节点。
  • 若局面节点已经被扩展过,则根据PUCT的select规则继续选择下一节点。

海量的playout模拟后,建立了游戏状态树的节点信息。但至此,AI玩家只是收集了信息,还仍未给定局面落子,而落子的决定由Play规则产生。下图展示了给定局面(Current节点)下,MCST模拟进行的多次playout探索后生成的局面树,play规则根据这些节点信息,产生Current 节点的动作分布 \(\pi\) ,确定下一步落子。

MCTS Playout和Play关系

Play 给定局面

对于当前需要做落子决定的某游戏局面\(s_0\),根据如下play公式生成落子分布 $$ ,子局面的落子概率正比于其访问次数的某次方。其中,某次方的倒数称为温度参数(Temperature)。

\[ \pi\left(a \mid s_{0}\right)=\frac{N\left(s_{0}, a\right)^{1 / \tau}}{\sum_{b} N\left(s_{0}, b\right)^{1 / \tau}} \]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MCTSAlphaGoZeroPlayer(BaseAgent):

def _next_step_play_act_probs(self, game: ConnectNGame) -> Tuple[List[Pos], ActionProbs]:
"""
For the given game status, run playouts number of times specified by self._playout_num.
Returns the action distribution according to AlphaGo Zero MCTS play formula.

:param game:
:return: actions and their probability
"""

for n in range(self._playout_num):
self._playout(copy.deepcopy(game))

act_visits = [(act, node._visit_num) for act, node in self._current_root._children.items()]
acts, visits = zip(*act_visits)
act_probs = softmax(1.0 / MCTSAlphaGoZeroPlayer.temperature * np.log(np.array(visits) + 1e-10))

return acts, act_probs

在训练模式时,考虑到偏向exploration的目的,在\(\pi\) 落子分布的基础上增加了 Dirichlet 分布。

\[ P(s,a) = (1-\epsilon)*\pi(a \mid s) + \epsilon * \boldsymbol{\eta} \quad (\boldsymbol{\eta} \sim \operatorname{Dir}(0.3)) \]

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MCTSAlphaGoZeroPlayer(BaseAgent):

def get_action(self, board: PyGameBoard) -> Pos:
"""
Method defined in BaseAgent.

:param board:
:return: next move for the given game board.
"""
return self._get_action(copy.deepcopy(board.connect_n_game))[0]

def _get_action(self, game: ConnectNGame) -> Tuple[MoveWithProb]:
epsilon = 0.25
avail_pos = game.get_avail_pos()
move_probs: ActionProbs = np.zeros(game.board_size * game.board_size)
assert len(avail_pos) > 0

# the pi defined in AlphaGo Zero paper
acts, act_probs = self._next_step_play_act_probs(game)
move_probs[list(acts)] = act_probs
if self._is_training:
# add Dirichlet Noise when training in favour of exploration
p_ = (1-epsilon) * act_probs + epsilon * np.random.dirichlet(0.3 * np.ones(len(act_probs)))
move = np.random.choice(acts, p=p_)
assert move in game.get_avail_pos()
else:
move = np.random.choice(acts, p=act_probs)

self.reset()
return move, move_probs

一次完整的对弈

一次完整的AI对弈就是从初始局面迭代play直至游戏结束,对弈生成的数据是一系列的 $(s, , z) $。

如下图 s0 到 s5 是某次井字棋的对弈。最终结局是先手黑棋玩家赢,即对于黑棋玩家 z = +1。需要注意的是:z = +1 是对于所有黑棋面临的局面,即s0, s2, s4,而对应的其余白棋玩家来说 z = -1。

一局完整对弈

\[ \begin{align*} &0: (s_0, \vec{\pi_0}, +1) \\ &1: (s_1, \vec{\pi_1}, -1) \\ &2: (s_2, \vec{\pi_2}, +1) \\ &3: (s_3, \vec{\pi_3}, -1) \\ &4: (s_4, \vec{\pi_4}, +1) \end{align*} \]

以下代码展示如何在AI对弈时收集数据 $(s, , z) $

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MCTSAlphaGoZeroPlayer(BaseAgent):

def self_play_one_game(self, game: ConnectNGame) \
-> List[Tuple[NetGameState, ActionProbs, NDArray[(Any), np.float]]]:
"""

:param game:
:return:
Sequence of (s, pi, z) of a complete game play. The number of list is the game play length.
"""

states: List[NetGameState] = []
probs: List[ActionProbs] = []
current_players: List[np.float] = []

while not game.game_over:
move, move_probs = self._get_action(game)
states.append(convert_game_state(game))
probs.append(move_probs)
current_players.append(game.current_player)
game.move(move)

current_player_z = np.zeros(len(current_players))
current_player_z[np.array(current_players) == game.game_result] = 1.0
current_player_z[np.array(current_players) == -game.game_result] = -1.0
self.reset()

return list(zip(states, probs, current_player_z))

Playout 代码实现

一次playout会从当前局面根据PUCT selection规则下沉到叶子节点,如果此叶子节点非游戏终结点,则会扩展当前节点生成下一层新节点,其先验分布由策略价值网络输出的action分布决定。一次playout最终会得到叶子节点的 v 值,并沿着MCTS树向上更新沿途的所有父节点 Q值。 从上一篇文章已知,游戏节点的数量随着参数而指数级增长,举例来说,井字棋(k=3,m=n=3)的状态数量是5478,k=3,m=n=4时是6035992 ,k=m=n=4时是9722011 。如果我们将初始局面节点作为根节点,同时保存海量playout探索得到的局面节点,实现时会发现我们无法将所有探索到的局面节点都保存在内存中。这里的一种解决方法是在一次self play中每轮playout之后,将根节点重置成落子的节点,从而有效控制整颗局面树中的节点数量。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MCTSAlphaGoZeroPlayer(BaseAgent):

def _playout(self, game: ConnectNGame):
"""
From current game status, run a sequence down to a leaf node, either because game ends or unexplored node.
Get the leaf value of the leaf node, either the actual reward of game or action value returned by policy net.
And propagate upwards to root node.

:param game:
"""
player_id = game.current_player

node = self._current_root
while True:
if node.is_leaf():
break
act, node = node.select()
game.move(act)

# now game state is a leaf node in the tree, either a terminal node or an unexplored node
act_and_probs: Iterator[MoveWithProb]
act_and_probs, leaf_value = self._policy_value_net.policy_value_fn(game)

if not game.game_over:
# case where encountering an unexplored leaf node, update leaf_value estimated by policy net to root
for act, prob in act_and_probs:
game.move(act)
child_node = node.expand(act, prob)
game.undo()
else:
# case where game ends, update actual leaf_value to root
if game.game_result == ConnectNGame.RESULT_TIE:
leaf_value = ConnectNGame.RESULT_TIE
else:
leaf_value = 1 if game.game_result == player_id else -1
leaf_value = float(leaf_value)

# Update leaf_value and propagate up to root node
node.propagate_to_root(-leaf_value)

编码游戏局面

为了将信息有效的传递给策略神经网络,必须从当前玩家的角度编码游戏局面。局面不仅要反映棋盘上黑白棋子的位置,也需要考虑最后一个落子的位置以及是否为当前玩家棋局。因此,我们将某局面按照当前玩家来编码,返回类型为4个棋盘大小组成的ndarray,即shape [4, board_size, board_size],其中

  1. 第一个数组编码当前玩家的棋子位置
  2. 第二个数组编码对手玩家棋子位置
  3. 第三个表示最后落子位置
  4. 第四个全1表示此局面为先手(黑棋)局面,全0表示白棋局面

例如之前游戏对弈中的前四步:

s1->s2 后局面s2的编码:当前玩家为黑棋玩家,编码局面s2 返回如下ndarray,数组[0] 为s2黑子位置,[1]为白子位置,[2]表示最后一个落子(1, 1) ,[3] 全1表示当前是黑棋落子的局面。

编码黑棋玩家局面 s2
s2->s3 后局面s3的编码:当前玩家为白棋玩家,编码返回如下,数组[0] 为s3白子位置,[1]为黑子位置,[2]表示最后一个落子(1, 0) ,[3] 全0表示当前是白棋落子的局面。
编码白棋玩家局面 s3

具体代码实现如下。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
NetGameState = NDArray[(4, Any, Any), np.int]


def convert_game_state(game: ConnectNGame) -> NetGameState:
"""
Converts game state to type NetGameState as ndarray.

:param game:
:return:
Of shape 4 * board_size * board_size.
[0] is current player positions.
[1] is opponent positions.
[2] is last move location.
[3] all 1 meaning move by black player, all 0 meaning move by white.
"""
state_matrix = np.zeros((4, game.board_size, game.board_size))

if game.action_stack:
actions = np.array(game.action_stack)
move_curr = actions[::2]
move_oppo = actions[1::2]
for move in move_curr:
state_matrix[0][move] = 1.0
for move in move_oppo:
state_matrix[1][move] = 1.0
# indicate the last move location
state_matrix[2][actions[-1]] = 1.0
if len(game.action_stack) % 2 == 0:
state_matrix[3][:, :] = 1.0 # indicate the colour to play
return state_matrix[:, ::-1, :]

策略价值网络训练

策略价值网络是一个共享参数 \(\theta\) 的双头网络,给定上面的游戏局面编码会产生预估的p和v。

\[ \vec{p_{\theta}}, v_{\theta}=f_{\theta}(s) \] 结合真实游戏对弈后产生三元组数据 $(s, , z) $ ,按照论文中的loss 来训练神经网络。 \[ l=\sum_{t}\left(v_{\theta}\left(s_{t}\right)-z_{t}\right)^{2}-\vec{\pi_{t}} \cdot \log \left(\vec{p_{\theta}}\left(s_{t}\right)\right) + c {\lVert \theta \rVert}^2 \]

下面代码为Pytorch backward部分。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def backward_step(self, state_batch: List[NetGameState], probs_batch: List[ActionProbs],
value_batch: List[NDArray[(Any), np.float]], lr) -> Tuple[float, float]:
if self.use_gpu:
state_batch = Variable(torch.FloatTensor(state_batch).cuda())
probs_batch = Variable(torch.FloatTensor(probs_batch).cuda())
value_batch = Variable(torch.FloatTensor(value_batch).cuda())
else:
state_batch = Variable(torch.FloatTensor(state_batch))
probs_batch = Variable(torch.FloatTensor(probs_batch))
value_batch = Variable(torch.FloatTensor(value_batch))

self.optimizer.zero_grad()
for param_group in self.optimizer.param_groups:
param_group['lr'] = lr

log_act_probs, value = self.policy_value_net(state_batch)
# loss = (z - v)^2 - pi*T * log(p) + c||theta||^2
value_loss = F.mse_loss(value.view(-1), value_batch)
policy_loss = -torch.mean(torch.sum(probs_batch * log_act_probs, 1))
loss = value_loss + policy_loss
loss.backward()
self.optimizer.step()
entropy = -torch.mean(torch.sum(torch.exp(log_act_probs) * log_act_probs, 1))
return loss.item(), entropy.item()

参考资料

AlphaGo Zero是Deepmind 最后一代AI围棋算法,因为已经达到了棋类游戏AI的终极目的:给定任何游戏规则,AI从零出发只通过自我对弈的方式提高,最终可以取得超越任何对手(包括顶级人类棋手和上一代AlphaGo)的能力。换种方式说,当给定足够多的时间和计算资源,可以取得无限逼近游戏真实解的能力。这一篇,我们深入分析AlphaGo Zero的设计理念和关键组件的细节并解释组件之间的关联。下一篇中,我们将在已有的N子棋OpenAI Gym 环境中用Pytorch实现一个简化版的AlphaGo Zero算法。

AlphaGo Zero 综述

AlphaGo Zero 作为Deepmind在围棋领域的最后一代AI Agent,已经可以达到棋类游戏的终极目标:在只给定游戏规则的情况下,AI 棋手从最初始的随机状态开始,通过不断的自我对弈的强化学习来实现超越以往任何人类棋手和上一代Alpha的能力,并且同样的算法和模型应用到了其他棋类也得出相同的效果。这一篇,从原理上来解析AlphaGo Zero的运行方式。

AlphaGo Zero 算法由三种元素构成:强化学习(RL)、深度学习(DL)和蒙特卡洛树搜索(MCTS,Monte Carlo Tree Search)。核心思想是基于神经网络的Policy Iteration强化学习,即最终学的是一个深度学习的policy network,输入是某棋盘局面 s,输出是此局面下可走位的概率分布:\(p(a|s)\)

在第一代AlphaGo算法中,这个初始policy network通过收集专业人类棋手的海量棋局训练得来,再采用传统RL 的Monte Carlo Tree Search Rollout 技术来强化现有的AI对于局面落子(Policy Network)的判断。Monte Carlo Tree Search Rollout 简单说来就是海量棋局模拟,AI Agent在通过现有的Policy Network策略完成一次从某局面节点到最终游戏胜负结束的对弈,这个完整的对弈叫做rollout,又称playout。完成一次rollout之后,通过局面树层层回溯到初始局面节点,并在回溯过程中同步修订所有经过的局面节点的统计指标,修正原先policy network对于落子导致输赢的判断。通过海量并发的棋局模拟来提升基准policy network,即在各种局面下提高好的落子的\(p(a_{win}|s)\),降低坏的落子的\(p(a_{lose}|s)\)

举例如下井字棋局面:
局面s

基准policy network返回 p(s) 如下 \[ p(a|s) = \begin{align*} \left\lbrace \begin{array}{r@{}l} 0.1, & & a = (0,2) \\ 0.05, & & a = (1,0) \\ 0.5, & & a = (1,1) \\ 0.05, & & a = (1,2)\\ 0.2, & & a = (2,0) \\ 0.05, & & a = (2,1) \\ 0.05, & & a = (2,2) \end{array} \right. \end{align*} \] 通过海量并发模拟后,修订成如下的action概率分布,然后通过policy iteration迭代新的网络来逼近 \(p'\) 就提高了棋力。 \[ p'(a|s) = \begin{align*} \left\lbrace \begin{array}{r@{}l} 0, & & a = (0,2) \\ 0, & & a = (1,0) \\ 0.9, & & a = (1,1) \\ 0, & & a = (1,2)\\ 0, & & a = (2,0) \\ 0, & & a = (2,1) \\ 0.1, & & a = (2,2) \end{array} \right. \end{align*} \]

蒙特卡洛树搜索(MCTS)概述

Monte Carlo Tree Search 是Monte Carlo 在棋类游戏中的变种,棋类游戏的一大特点是可以用动作(move)联系的决策树来表示,树的节点数量取决于分支的数量和树的深度。MCTS的目的是在树节点非常多的情况下,通过实验模拟(rollout, playout)的方式来收集尽可能多的局面输赢情况,并基于这些统计信息,将搜索资源的重点均衡地放在未被探索的节点和值得探索的节点上,减少在大概率输的节点上的模拟资源投入。传统MCTS有四个过程:Selection, Expansion, Simulation 和Backpropagation。下图是Wikipedia 的例子:

  • Selection:从根节点出发,根据现有统计的信息和selection规则,选择子节点递归向下做决定,后面我们会详细介绍AlphaGo的UCB规则。图中节点的数字,例如根节点11/21,分别代表赢的次数和总模拟次数。从根节点一路向下分别选择节点 7/10, 1/6直到叶子节点3/3,叶子节点表示它未被探索过。
  • Expansion:由于3/3节点未被探索过,初始化其所有子节点为0/0,图中3/3只有一个子节点。后面我们会看到神经网络在初始化子节点的时候起到的指导作用,即所有子节点初始权重并非相同,而是由神经网络给出估计。
  • Simulation:重复selection和expansion,根据游戏规则递归向下直至游戏结束。
  • Backpropagation:游戏结束在终点节点产生游戏真实的价值,回溯向上调整所有父节点的统计状态。

权衡 Exploration 和 Exploitation

在不断扩张决策树并收集节点统计信息的同时,MCTS根据规则来权衡探索目的(采样不足)或利用目的来做决策,这个权衡规则叫做Upper Confidence Bound(UCB)。典型的UCB公式如下:w表示通过节点的赢的次数,n表示通过节点的总次数,N是父节点的访问次数,c是调节Exploration 和 Exploitation权重的超参。

\[ {\frac{w_i}{n_i}} + c \sqrt{\frac{\ln N_i}{n_i}} \]

假设某节点有两个子节点s1, s2,它们的统计指标为 s1: w/n = 3/4,s2: w/n = 6/8,由于两者输赢比率一样,因此根据公式,访问次数少的节点出于Exploration的目的胜出,MCTS最终决定从s局面走向s1。

从第一性原理来理解AlphaGo Zero

前一代的AlphaGo已经战胜了世界冠军,取得了空前的成就,AlphaGo Zero 的设计目标变得更加General,去除围棋相关的处理和知识,用统一的框架和算法来解决棋类问题。 1. 无人工先验数据

改进之前需要专家棋手对弈数据来冷启动初始棋力

  1. 无特定游戏特征工程

    无需围棋特定技巧,只包含下棋规则,可以适用到所有棋类游戏

  2. 单一神经网络

    统一Policy Network和Value Network,使用一个共享参数的双头神经网络

  3. 简单树搜索

    去除传统MCTS的Rollout 方式,用神经网络来指导MCTS更有效产生搜索策略

搜索空间的两个优化原则

尽管理论上围棋是有解的,即先手必赢、被逼平或必输,通过遍历所有可能局面可以求得解。同理,通过海量模拟所有可能游戏局面,也可以无限逼近所有局面下的真实输赢概率,直至收敛于局面落子的确切最佳结果。但由于围棋棋局的数目远远大于宇宙原子数目,3^361 >> 10^80,因此需要将计算资源有效的去模拟值得探索的局面,例如对于显然的被动局面减小模拟次数,所以如何有效地减小搜索空间是AlphaGo Zero 需要解决的重大问题。David Silver 在Deepmind AlphaZero - Mastering Games Without Human Knowledge中提到AlphaGo Zero 采用两个原则来有效减小搜索空间。

原则1: 通过Value Network减少搜索的深度

Value Network 通过预测给定局面的value来直接预测最终结果,思想和上一期Minimax DP 策略中直接缓存当前局面的胜负状态一样,减少每次必须靠模拟到最后才能知道当前局面的输赢概率,或者需要多层树搜索才能知道输赢概率。

原则2: 通过Policy Network减少搜索的宽度

搜索广度的减少是由Policy Network预估来达成的,将下一步搜索局限在高概率的动作上,大幅度提升原先MCTS新节点生成后冷启动的搜索宽度。

神经网络结构

AlphaGo Zero 使用一个单一的深度神经网络来完成policy 和value的预测。具体实现方式是将policy network和value network合并成一个共享参数 $ $ 的双头网络。其中z是真实游戏结局的效用,范围为[-1, 1] 。

\[ (p, v)=f_{\theta}(s) \] \[ p_{a}=\operatorname{Pr}(a \mid s) \] \[ v = \mathop{\mathbb{E}}[z|s] \]

Monte Carlo Tree Search (MCTS) 建立了棋局搜索树,节点的初始状态由神经网络输出的p和v值来估计,由此初始的动作策略和价值预判就会建立在高手的水平之上。模拟一局游戏之后向上回溯,会同步更新路径上节点的统计数值并生成更好的MCTS搜索策略 \(\vec{\pi}\)。进一步来看,MCTS和神经网络互相形成了正循环。神经网络指导了未知节点的MCTS初始搜索策略,产生自我对弈游戏结局后,通过减小 \(\vec{p}\)\(\vec{\pi}\)的 Loss ,最终又提高了神经网络对于局面的估计能力。神经网络value network的提升也是通过不断减小网络预测的结果和最终结果的差异来提升。 因此,具体神经网络的Loss函数由三部分组成,value network的损失,policy network的损失以及正则项。 \[ l=\sum_{t}\left(v_{\theta}\left(s_{t}\right)-z_{t}\right)^{2}-\vec{\pi}_{t} \cdot \log \left(\vec{p}_{\theta}\left(s_{t}\right)\right) + c {\lVert \theta \rVert}^2 \]

AlphaGo Zero MCTS 具体过程

AlphaGo Plays Games Against Itself

AlphaGo Zero的MCTS和传统MCTS都有相似的四个过程,但AlphaGo Zero的MCTS步骤相对更复杂。 首先,除了W/N统计指标之外,AlphaGo Zero的MCTS保存了决策边 a|s 的Q(s,a):Action Value,也就是Q-Learning中的Q值,其初始值由神经网络给出。此外,Q 值也用于串联自底向上更新节点的Value值。具体说来,当某个新节点被Explore后,会将网络给出的Q值向上传递,并逐层更新父节点的Q值。当游戏结局产生时,也会向上更新所有父节点的Q值。 此外对于某一游戏局面s进行多次模拟,每次在局面s出发向下探索,每次探索在已知节点按Selection规则深入一步,直至达到未探索的局面或者游戏结束,产生Q值后向上回溯到最初局面s,回溯过程中更新路径上的局面的统计值或者Q值。在多次模拟结束后根据Play的算法,决定局面s的下一步行动。尽管每次模拟探索可能会深入多层,但最终play阶段的算法规则仅决定给定局面s的下一层落子动作。多次向下探索的优势在于:

  1. 探索和采样更多的叶子节点,在更多信息下做决策。

  2. 通过average out多次模拟下一层落子决定,尽可能提升MCTS策略的下一步判断能力,提高 \(\pi\) 能力,更有效指导神经网络,提高其学习效率。

New Policy Network V' is Trained to Predict Winner
  1. Selection:

从游戏局面s开始,选择a向下递归,直至未展开的节点(搜索树中的叶子节点)或者游戏结局。具体在局面s下选择a的规则由以下UCB(Upper Confidence Bound)决定
\[ a=\operatorname{argmax}_a(Q(s,a) + u(s,a)) \]

其中,Q(s,a) 和u(s,a) 项分别代表Exploitation 和Exploration。两项相加来均衡Exploitation和Exploration,保证初始时每个节点被explore,在有足够多的信息时逐渐偏向exploitation。

\[ u(s, a)=c_{p u c t} \cdot P(s, a) \cdot \frac{\sqrt{\Sigma_{b} N(s, b)}}{1+N(s, a)} \]

  1. Expand

当遇到一个未展开的节点(搜索树中的叶子节点)时,对其每个子节点使用现有网络进行预估,即

\[ (p(s), v(s))=f_{\theta}(s) \]

  1. Backup

当新的叶子节点展开时或者到达终点局面时,向上更新父节点的Q值,具体公式为 \[ Q(s, a)=\frac{1}{N(s, a)} \sum_{s^{\prime} \mid s, a \rightarrow s^{\prime}} V\left(s^{\prime}\right) \]

  1. Play

多次模拟结束后,使用得到搜索概率分布 ${a} $来确定最终的落子动作。正比于访问次数的某次方 $ {a} N(s, a)^{1 / }\(,其中\)$为温度参数(temperature parameter)。

New Policy Network V' is Trained to Predict Winner

参考资料

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×