Combinatorial Game Series 5: AlphaGo Zero Algorithm in Practice for Tic-Tac-Toe and Gomoku

In the previous post we analyzed, at the level of principles, how AlphaGo Zero improves the MCTS algorithm and, through continuous self-play, trains itself from zero playing strength until it can defeat any master. In this post, we implement a simplified version of the AlphaGo Zero algorithm in PyTorch on top of the Connect-N OpenAI Gym environment built earlier in this series. All code for this post is available at github MyEncyclopedia/ConnectNGym; parts of it are adapted from SongXiaoJun's AlphaZero_Gomoku.

AlphaGo Zero MCTS Tree Node

From the previous post we know that AlphaGo Zero's MCTS tree search is based on PUCT (Polynomial Upper Confidence Trees), an improved version of the traditional MCTS UCT (UCB for Trees). The PUCT value of a position node consists of two parts: the action value Q, representing exploitation, and the U value, representing exploration. \[ PUCT(s, a) = Q(s,a) + U(s,a) \] The U value is determined by the coefficient \(c_{puct}\), the node's prior probability \(P(s, a)\), the parent's visit count, and the node's own visit count: \[ U(s, a)=c_{puct} \cdot P(s, a) \cdot \frac{\sqrt{\sum_{b} N(s, b)}}{1+N(s, a)} \]

Accordingly, in the implementation a tree node needs to store its Q value, its visit count _visit_num, and its prior probability _prior. _prior is fixed once the node is initialized, while Q and _visit_num change as the MCTS simulations of the game proceed. In addition, each node keeps _parent and _children to maintain the parent-child relationships. c_puct is a class variable that serves as a global parameter.

```python
from __future__ import annotations
from typing import ClassVar, Dict, Tuple

import numpy as np


class TreeNode:
    """
    MCTS Tree Node
    """

    c_puct: ClassVar[int] = 5  # class-wise global param c_puct, exploration weight factor.

    _parent: TreeNode
    _children: Dict[int, TreeNode]  # map from action to TreeNode
    _visit_num: int
    _Q: float  # Q value of the node, which is the mean action value.
    _prior: float
```

Corresponding to the formula above, the following code computes PUCT(s, a) from the node's state.

```python
class TreeNode:

    def get_puct(self) -> float:
        """
        Computes AlphaGo Zero PUCT (polynomial upper confidence trees) of the node.

        :return: Node PUCT value.
        """
        U = (TreeNode.c_puct * self._prior * np.sqrt(self._parent._visit_num) / (1 + self._visit_num))
        return self._Q + U
```

When AlphaGo Zero MCTS reaches an already expanded node during a playout, it picks a child according to the selection rule, which essentially chooses the child with the largest PUCT value among all children.

\[ a=\operatorname{argmax}_a(PUCT(s, a))=\operatorname{argmax}_a(Q(s,a) + U(s,a)) \]

```python
class TreeNode:

    def select(self) -> Tuple[Pos, TreeNode]:
        """
        Selects an action(Pos) having max UCB value.

        :return: Action and corresponding node
        """
        return max(self._children.items(), key=lambda act_node: act_node[1].get_puct())
```

Once a new leaf node is produced during a playout, its associated value v is propagated up the tree all the way to the root; how the new node's v value is obtained is explained in the next section.

```python
class TreeNode:

    def propagate_to_root(self, leaf_value: float):
        """
        Updates current node with observed leaf_value and propagates to root node.

        :param leaf_value:
        :return:
        """
        if self._parent:
            # the parent evaluates the position from the opponent's perspective, hence the sign flip
            self._parent.propagate_to_root(-leaf_value)
        self._update(leaf_value)

    def _update(self, leaf_value: float):
        """
        Updates the node by newly observed leaf_value.

        :param leaf_value:
        :return:
        """
        self._visit_num += 1
        # incremental running mean: Q_n = Q_{n-1} + (leaf_value - Q_{n-1}) / n
        self._Q += (leaf_value - self._Q) / self._visit_num
```
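The playout code later in this post also calls TreeNode.is_leaf() and TreeNode.expand(act, prob), which are not listed above. Below is a minimal sketch of these two methods and of the constructor, consistent with the attributes shown earlier; the constructor signature, the default prior, and the fact that expand returns the new child are assumptions for illustration and may differ from the actual ConnectNGym code.

```python
class TreeNode:

    def __init__(self, parent: TreeNode = None, prior: float = 1.0):
        self._parent = parent
        self._children = {}  # map from action (Pos) to child TreeNode
        self._visit_num = 0
        self._Q = 0.0
        self._prior = prior

    def is_leaf(self) -> bool:
        # a node is a leaf if it has not been expanded yet
        return len(self._children) == 0

    def expand(self, action: Pos, prior: float) -> TreeNode:
        # attach a child node for the given action, with prior P(s, a) from the policy-value net
        child = TreeNode(parent=self, prior=prior)
        self._children[action] = child
        return child
```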

AlphaGo Zero MCTS Player Implementation

During training, AlphaGo Zero MCTS proceeds through the following steps. Starting from the initial game position, the search tree grows as child nodes keep being explored. One self-play game by AlphaGo Zero produces a complete sequence of moves from the opening to the end of the game. At any given position in the game, a large number of playouts (also called MCTS simulations) are sampled in order to decide the next move from that position. A playout can be viewed as one particular sample path through the true game state tree: it may reach an end-of-game state, yielding a true value v, or it may explore a new leaf node, in which case v comes from the output of the policy-value network; the goal is to use the trained network to produce high-quality game positions. Each playout descends recursively from the given current position, and on the way down it encounters one of the following three kinds of nodes.

  • If the node is an end-of-game position (a leaf node), the true game value z is obtained. z is carried upwards from this bottom node, updating the Q values of the nodes along the path up to the root (the initial position).
  • If the node has never been expanded (a leaf node), its encoded position is fed into the two-headed policy-value network, which outputs a predicted action distribution and a value v. The action distribution initializes the child nodes as their prior probabilities P(s, a); the predicted v, just like the true game value z above, is propagated from the leaf up to the root.
  • If the node has already been expanded, the next node is chosen according to the PUCT select rule.

After a large number of playout simulations, the nodes of the game state tree have been populated with statistics. At this point, however, the AI player has only gathered information and has not yet committed to a move for the given position; the actual move is decided by the play rule. The figure below shows the tree built around a given position (the Current node) after many MCTS playouts; the play rule then uses these node statistics to produce the action distribution \(\pi\) for the Current node and determine the next move.

Relationship between MCTS playout and play

Play for a Given Position

For the current position \(s_0\) that requires a move decision, the move distribution \(\vec{\pi}\) is generated by the play formula below: the probability of each child move is proportional to its visit count raised to the power \(1/\tau\), where \(\tau\) is called the temperature parameter.

\[ \pi\left(a \mid s_{0}\right)=\frac{N\left(s_{0}, a\right)^{1 / \tau}}{\sum_{b} N\left(s_{0}, b\right)^{1 / \tau}} \]

```python
class MCTSAlphaGoZeroPlayer(BaseAgent):

    def _next_step_play_act_probs(self, game: ConnectNGame) -> Tuple[List[Pos], ActionProbs]:
        """
        For the given game status, run playouts number of times specified by self._playout_num.
        Returns the action distribution according to AlphaGo Zero MCTS play formula.

        :param game:
        :return: actions and their probability
        """

        for n in range(self._playout_num):
            self._playout(copy.deepcopy(game))

        act_visits = [(act, node._visit_num) for act, node in self._current_root._children.items()]
        acts, visits = zip(*act_visits)
        act_probs = softmax(1.0 / MCTSAlphaGoZeroPlayer.temperature * np.log(np.array(visits) + 1e-10))

        return acts, act_probs
```
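As an aside, softmax(1/τ · log N) computed above is algebraically the same as the play formula \(N^{1/\tau} / \sum_b N^{1/\tau}\); the log/softmax form is simply more numerically stable. The small standalone snippet below (made-up visit counts, not code from the player class) illustrates the equivalence and shows how a lower temperature concentrates \(\pi\) on the most visited action.

```python
import numpy as np

def softmax(x: np.ndarray) -> np.ndarray:
    # numerically stable softmax
    e = np.exp(x - np.max(x))
    return e / e.sum()

visits = np.array([30.0, 10.0, 5.0, 5.0])  # hypothetical visit counts N(s0, a)

for temperature in (1.0, 0.5, 0.1):
    pi_softmax = softmax(1.0 / temperature * np.log(visits + 1e-10))
    pi_direct = visits ** (1.0 / temperature) / np.sum(visits ** (1.0 / temperature))
    # both formulations agree; smaller temperature pushes pi towards the argmax of visits
    print(temperature, np.round(pi_softmax, 3), np.allclose(pi_softmax, pi_direct))
```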

In training mode, to bias towards exploration, Dirichlet noise is added on top of the \(\pi\) move distribution.

\[ P(s,a) = (1-\epsilon)*\pi(a \mid s) + \epsilon * \boldsymbol{\eta} \quad (\boldsymbol{\eta} \sim \operatorname{Dir}(0.3)) \]

```python
class MCTSAlphaGoZeroPlayer(BaseAgent):

    def get_action(self, board: PyGameBoard) -> Pos:
        """
        Method defined in BaseAgent.

        :param board:
        :return: next move for the given game board.
        """
        return self._get_action(copy.deepcopy(board.connect_n_game))[0]

    def _get_action(self, game: ConnectNGame) -> Tuple[MoveWithProb]:
        epsilon = 0.25
        avail_pos = game.get_avail_pos()
        move_probs: ActionProbs = np.zeros(game.board_size * game.board_size)
        assert len(avail_pos) > 0

        # the pi defined in AlphaGo Zero paper
        acts, act_probs = self._next_step_play_act_probs(game)
        move_probs[list(acts)] = act_probs
        if self._is_training:
            # add Dirichlet Noise when training in favour of exploration
            p_ = (1 - epsilon) * act_probs + epsilon * np.random.dirichlet(0.3 * np.ones(len(act_probs)))
            move = np.random.choice(acts, p=p_)
            assert move in game.get_avail_pos()
        else:
            move = np.random.choice(acts, p=act_probs)

        self.reset()
        return move, move_probs
```

A Complete Self-Play Game

A complete AI self-play game iterates the play step from the initial position until the game ends; the data generated by one game is a sequence of triples \((s, \vec{\pi}, z)\).

In the figure below, s0 to s5 form one tic-tac-toe game. The final result is a win for black, the first player, i.e. z = +1 for the black player. Note that z = +1 applies to all positions faced by the black player, namely s0, s2 and s4, while for the white player the corresponding positions have z = -1.

A complete game

\[ \begin{align*} &0: (s_0, \vec{\pi_0}, +1) \\ &1: (s_1, \vec{\pi_1}, -1) \\ &2: (s_2, \vec{\pi_2}, +1) \\ &3: (s_3, \vec{\pi_3}, -1) \\ &4: (s_4, \vec{\pi_4}, +1) \end{align*} \]

The following code shows how the \((s, \vec{\pi}, z)\) data are collected during AI self-play.

```python
class MCTSAlphaGoZeroPlayer(BaseAgent):

    def self_play_one_game(self, game: ConnectNGame) \
            -> List[Tuple[NetGameState, ActionProbs, NDArray[(Any), np.float]]]:
        """

        :param game:
        :return:
            Sequence of (s, pi, z) of a complete game play. The length of the list equals the number of moves in the game.
        """

        states: List[NetGameState] = []
        probs: List[ActionProbs] = []
        current_players: List[np.float] = []

        while not game.game_over:
            move, move_probs = self._get_action(game)
            states.append(convert_game_state(game))
            probs.append(move_probs)
            current_players.append(game.current_player)
            game.move(move)

        current_player_z = np.zeros(len(current_players))
        current_player_z[np.array(current_players) == game.game_result] = 1.0
        current_player_z[np.array(current_players) == -game.game_result] = -1.0
        self.reset()

        return list(zip(states, probs, current_player_z))
```

Playout Implementation

A playout descends from the current position following the PUCT selection rule until it reaches a leaf node. If the leaf is not an end-of-game position, the node is expanded, creating the next level of child nodes whose priors come from the action distribution output by the policy-value network. The playout finally obtains the leaf's value v and propagates it up the MCTS tree, updating the Q values of all parent nodes along the way. From the previous post we know that the number of game states grows exponentially with the game parameters: for example, tic-tac-toe (k=3, m=n=3) has 5478 states, k=3, m=n=4 has 6035992 states, and k=m=n=4 has 9722011 states. If we keep the initial position as the root and retain every node discovered by the massive number of playouts, we quickly find that the tree does not fit in memory. One way to deal with this is, within a self-play game, to reset the root to the node of the move actually played after each round of playouts, which effectively bounds the number of nodes kept in the tree.
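The reset() calls seen earlier in _get_action and self_play_one_game serve this purpose. A minimal sketch of how the root might be reset is shown below, assuming the player keeps its root in self._current_root and using the TreeNode constructor sketched earlier; the simple variant just discards the whole tree, while _advance_root (a hypothetical helper, not from the repository) re-roots the tree at the child of the move actually played so that its subtree statistics are reused. The _playout method itself is listed after this sketch.

```python
class MCTSAlphaGoZeroPlayer(BaseAgent):

    def reset(self):
        # simplest variant: throw away the search tree and start the next decision from scratch
        self._current_root = TreeNode(parent=None, prior=1.0)

    def _advance_root(self, move: Pos):
        # alternative: keep the subtree below the move that was actually played
        if move in self._current_root._children:
            self._current_root = self._current_root._children[move]
            self._current_root._parent = None
        else:
            self._current_root = TreeNode(parent=None, prior=1.0)
```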

```python
class MCTSAlphaGoZeroPlayer(BaseAgent):

    def _playout(self, game: ConnectNGame):
        """
        From current game status, run a sequence down to a leaf node, either because game ends or unexplored node.
        Get the leaf value of the leaf node, either the actual reward of game or action value returned by policy net.
        And propagate upwards to root node.

        :param game:
        """
        player_id = game.current_player

        node = self._current_root
        while True:
            if node.is_leaf():
                break
            act, node = node.select()
            game.move(act)

        # now game state is a leaf node in the tree, either a terminal node or an unexplored node
        act_and_probs: Iterator[MoveWithProb]
        act_and_probs, leaf_value = self._policy_value_net.policy_value_fn(game)

        if not game.game_over:
            # case where encountering an unexplored leaf node, update leaf_value estimated by policy net to root
            for act, prob in act_and_probs:
                game.move(act)
                child_node = node.expand(act, prob)
                game.undo()
        else:
            # case where game ends, update actual leaf_value to root
            if game.game_result == ConnectNGame.RESULT_TIE:
                leaf_value = ConnectNGame.RESULT_TIE
            else:
                leaf_value = 1 if game.game_result == player_id else -1
            leaf_value = float(leaf_value)

        # Update leaf_value and propagate up to root node
        node.propagate_to_root(-leaf_value)
```

Encoding the Game Position

To pass information to the policy network effectively, the game position must be encoded from the current player's perspective. The encoding reflects not only the positions of the black and white stones on the board, but also the location of the last move and which player is to move. We therefore encode a position with respect to the current player and return an ndarray of four board-sized planes, i.e. of shape [4, board_size, board_size], where

  1. the first plane encodes the current player's stone positions,
  2. the second plane encodes the opponent's stone positions,
  3. the third plane marks the location of the last move,
  4. the fourth plane is all ones if the first player (black) is to move in this position, and all zeros if white is to move.

Take, for example, the first few moves of the game shown earlier.

Encoding of position s2 after s1->s2: the current player is black; encoding s2 returns the following ndarray, where plane [0] holds the black stones of s2, [1] the white stones, [2] marks the last move (1, 1), and [3] is all ones, indicating that black is to move.

Encoding of position s2 from the black player's perspective

Encoding of position s3 after s2->s3: the current player is white; the encoding returns the following, where plane [0] holds the white stones of s3, [1] the black stones, [2] marks the last move (1, 0), and [3] is all zeros, indicating that white is to move.

Encoding of position s3 from the white player's perspective

The implementation is as follows.

```python
NetGameState = NDArray[(4, Any, Any), np.int]


def convert_game_state(game: ConnectNGame) -> NetGameState:
    """
    Converts game state to type NetGameState as ndarray.

    :param game:
    :return:
        Of shape 4 * board_size * board_size.
        [0] is current player positions.
        [1] is opponent positions.
        [2] is last move location.
        [3] all 1 meaning move by black player, all 0 meaning move by white.
    """
    state_matrix = np.zeros((4, game.board_size, game.board_size))

    if game.action_stack:
        actions = np.array(game.action_stack)
        move_curr = actions[::2]
        move_oppo = actions[1::2]
        for move in move_curr:
            state_matrix[0][move] = 1.0
        for move in move_oppo:
            state_matrix[1][move] = 1.0
        # indicate the last move location
        state_matrix[2][actions[-1]] = 1.0
    if len(game.action_stack) % 2 == 0:
        state_matrix[3][:, :] = 1.0  # indicate the colour to play
    return state_matrix[:, ::-1, :]
```

Training the Policy-Value Network

The policy-value network is a two-headed network with shared parameters \(\theta\); given the game position encoding above, it outputs the predicted p and v.

\[ \vec{p_{\theta}}, v_{\theta}=f_{\theta}(s) \] Combining these predictions with the triples \((s, \vec{\pi}, z)\) produced by real self-play games, the network is trained with the loss from the paper: \[ l=\sum_{t}\left(v_{\theta}\left(s_{t}\right)-z_{t}\right)^{2}-\vec{\pi_{t}} \cdot \log \left(\vec{p_{\theta}}\left(s_{t}\right)\right) + c {\lVert \theta \rVert}^2 \]
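The exact network architecture is not the focus of this post. For concreteness, a minimal PyTorch sketch of such a shared-body, two-headed network over the [4, board_size, board_size] encoding could look like the following; the layer sizes and the class name PolicyValueDualNet are illustrative assumptions, not the configuration used in ConnectNGym.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class PolicyValueDualNet(nn.Module):
    """Illustrative two-headed policy-value network f_theta(s) -> (log p, v)."""

    def __init__(self, board_size: int):
        super().__init__()
        self.board_size = board_size
        # shared convolutional body over the 4-plane game state encoding
        self.conv1 = nn.Conv2d(4, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # policy head: per-position move logits over board_size * board_size actions
        self.policy_conv = nn.Conv2d(64, 4, kernel_size=1)
        self.policy_fc = nn.Linear(4 * board_size * board_size, board_size * board_size)
        # value head: a single scalar in [-1, 1]
        self.value_conv = nn.Conv2d(64, 2, kernel_size=1)
        self.value_fc1 = nn.Linear(2 * board_size * board_size, 64)
        self.value_fc2 = nn.Linear(64, 1)

    def forward(self, state: torch.Tensor):
        x = F.relu(self.conv1(state))
        x = F.relu(self.conv2(x))
        p = F.relu(self.policy_conv(x)).view(state.size(0), -1)
        log_p = F.log_softmax(self.policy_fc(p), dim=1)  # log action probabilities
        v = F.relu(self.value_conv(x)).view(state.size(0), -1)
        v = torch.tanh(self.value_fc2(F.relu(self.value_fc1(v))))  # value in [-1, 1]
        return log_p, v
```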

The following code shows the PyTorch backward step.

```python
def backward_step(self, state_batch: List[NetGameState], probs_batch: List[ActionProbs],
                  value_batch: List[NDArray[(Any), np.float]], lr) -> Tuple[float, float]:
    if self.use_gpu:
        state_batch = Variable(torch.FloatTensor(state_batch).cuda())
        probs_batch = Variable(torch.FloatTensor(probs_batch).cuda())
        value_batch = Variable(torch.FloatTensor(value_batch).cuda())
    else:
        state_batch = Variable(torch.FloatTensor(state_batch))
        probs_batch = Variable(torch.FloatTensor(probs_batch))
        value_batch = Variable(torch.FloatTensor(value_batch))

    self.optimizer.zero_grad()
    for param_group in self.optimizer.param_groups:
        param_group['lr'] = lr

    log_act_probs, value = self.policy_value_net(state_batch)
    # loss = (z - v)^2 - pi^T * log(p) + c||theta||^2
    # the L2 term c||theta||^2 is assumed to be applied via the optimizer's weight_decay
    value_loss = F.mse_loss(value.view(-1), value_batch)
    policy_loss = -torch.mean(torch.sum(probs_batch * log_act_probs, 1))
    loss = value_loss + policy_loss
    loss.backward()
    self.optimizer.step()
    entropy = -torch.mean(torch.sum(torch.exp(log_act_probs) * log_act_probs, 1))
    return loss.item(), entropy.item()
```
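To tie everything together, training alternates between self-play data collection and gradient steps. The sketch below shows one plausible way to wire self_play_one_game and backward_step together; the buffer size, batch size, learning rate, and the game_factory / policy_value_net / player objects are assumptions for illustration rather than the exact ConnectNGym training script. As noted above, the \(c \lVert \theta \rVert^2\) term of the loss is assumed to be supplied by the optimizer's weight_decay.

```python
import random
from collections import deque


def train_loop(policy_value_net, player, game_factory,
               epochs: int = 1000, batch_size: int = 512, lr: float = 2e-3):
    # replay buffer of (s, pi, z) triples collected from self-play games
    data_buffer = deque(maxlen=10000)

    for epoch in range(epochs):
        # 1. self-play one complete game with the current network and collect (s, pi, z)
        play_data = player.self_play_one_game(game_factory())
        data_buffer.extend(play_data)

        # 2. once enough data has accumulated, run one gradient step on a sampled mini-batch
        if len(data_buffer) >= batch_size:
            batch = random.sample(data_buffer, batch_size)
            state_batch = [d[0] for d in batch]
            probs_batch = [d[1] for d in batch]
            value_batch = [d[2] for d in batch]
            loss, entropy = policy_value_net.backward_step(state_batch, probs_batch,
                                                           value_batch, lr)
```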

References

  • From Monte Carlo Simulation and Mathematical Recursion to Intuition: Leetcode 1227 Airplane Seat Assignment Probability
  • Combinatorial Game Series 4: In-depth Analysis of the AlphaGo Zero Reinforcement Learning Algorithm

Author and License: contact MyEncyclopedia for authorization.
myencyclopedia.top link https://blog.myencyclopedia.top/zh/2020/combinatorial-game-5-alphago-zero-connect-n/
github.io link https://myencyclopedia.github.io/zh/2020/combinatorial-game-5-alphago-zero-connect-n/
