#Pygame

继上一篇完成了井字棋(N子棋)的minimax 最佳策略后,我们基于Pygame来创造一个图形游戏环境,可供人机和机器对弈,为后续模拟AlphaGo的自我强化学习算法做环境准备。OpenAI Gym 在强化学习领域是事实标准,我们最终封装成OpenAI Gym的接口。本篇所有代码都在github.com/MyEncyclopedia/ConnectNGym

井字棋、五子棋 Pygame 实现

Pygame 井字棋玩家对弈效果

Python 上有Tkinter,PyQt等跨平台GUI类库,主要用于桌面程序编程,但此类库容量较大,编程也相对麻烦。Pygame具有代码少,开发快的优势,比较适合快速开发五子棋这类桌面小游戏。 ### Pygame 极简入门

与所有的GUI开发相同,Pygame也是基于事件的单线程编程模型。下面的例子包含了显示一个最简单GUI窗口,操作系统产生事件并发送到Pygame窗口,while True 控制了python主线程永远轮询事件。我们在这里仅仅判断了当前是否是关闭应用程序事件,如果是则退出进程。此外,clock 用于控制FPS。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
import sys
import pygame
pygame.init()
display = pygame.display.set_mode((800,600))
clock = pygame.time.Clock()

while True:
for event in pygame.event.get():
if event.type == pygame.QUIT:
sys.exit(0)
else:
pygame.display.update()
clock.tick(1)

PyGameBoard 主体代码

PyGameBoard类封装了Pygame实现游戏交互和显示的逻辑。上一篇中,我们完成了ConnectNGame逻辑,这里PyGameBoard需要在初始化时,指定传入ConnectNGame 实例(见下图),支持通过API 方式改变其状态,也支持GUI交互方式等待人类玩家的输入。next_user_input(self)实现了等待人类玩家输入的逻辑,本质上是循环检查GUI事件直到有合法的落子产生。
PyGameBoard Class Diagram
{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class PyGameBoard:

def __init__(self, connectNGame: ConnectNGame):
self.connectNGame = connectNGame
pygame.init()

def next_user_input(self) -> Tuple[int, int]:
self.action = None
while not self.action:
self.check_event()
self._render()
self.clock.tick(60)
return self.action

def move(self, r: int, c: int) -> int:
return self.connectNGame.move(r, c)

if __name__ == '__main__':
connectNGame = ConnectNGame()
pygameBoard = PyGameBoard(connectNGame)
while not pygameBoard.isGameOver():
pos = pygameBoard.next_user_input()
pygameBoard.move(*pos)

pygame.quit()

check_event 较之极简版本增加了处理用户输入事件,这里我们仅支持人类玩家鼠标输入。方法_handle_user_input 将鼠标点击事件转换成棋盘行列值,并判断点击位置是否合法,合法则返回落子位置,类型为Tuple[int, int],例如(0, 0)表示棋盘最左上角位置。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def check_event(self):
for e in pygame.event.get():
if e.type == pygame.QUIT:
pygame.quit()
sys.exit(0)
elif e.type == pygame.MOUSEBUTTONDOWN:
self._handle_user_input(e)

def _handle_user_input(self, e: Event) -> Tuple[int, int]:
origin_x = self.start_x - self.edge_size
origin_y = self.start_y - self.edge_size
size = (self.board_size - 1) * self.grid_size + self.edge_size * 2
pos = e.pos
if origin_x <= pos[0] <= origin_x + size and origin_y <= pos[1] <= origin_y + size:
if not self.connectNGame.gameOver:
x = pos[0] - origin_x
y = pos[1] - origin_y
r = int(y // self.grid_size)
c = int(x // self.grid_size)
valid = self.connectNGame.checkAction(r, c)
if valid:
self.action = (r, c)
return self.action

OpenAI Gym 接口规范

OpenAI Gym规范了Agent和环境(Env)之间的互动,核心抽象接口类是gym.Env,自定义的游戏环境需要继承Env,并实现 reset、step和render方法。下面我们看一下如何具体实现ConnectNGym的这几个方法:

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ConnectNGym(gym.Env):

def reset(self) -> ConnectNGame:
"""Resets the state of the environment and returns an initial observation.

Returns:
observation (object): the initial observation.
"""
raise NotImplementedError


def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
"""Run one timestep of the environment's dynamics. When end of
episode is reached, you are responsible for calling `reset()`
to reset this environment's state.

Accepts an action and returns a tuple (observation, reward, done, info).

Args:
action (object): an action provided by the agent

Returns:
observation (object): agent's observation of the current environment
reward (float) : amount of reward returned after previous action
done (bool): whether the episode has ended, in which case further step() calls will return undefined results
info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
"""
raise NotImplementedError



def render(self, mode='human'):
"""
Renders the environment.

The set of supported modes varies per environment. (And some
environments do not support rendering at all.) By convention,
if mode is:

- human: render to the current display or terminal and
return nothing. Usually for human consumption.
- rgb_array: Return an numpy.ndarray with shape (x, y, 3),
representing RGB values for an x-by-y pixel image, suitable
for turning into a video.
- ansi: Return a string (str) or StringIO.StringIO containing a
terminal-style text representation. The text can include newlines
and ANSI escape sequences (e.g. for colors).

Note:
Make sure that your class's metadata 'render.modes' key includes
the list of supported modes. It's recommended to call super()
in implementations to use the functionality of this method.

Args:
mode (str): the mode to render with
"""
raise NotImplementedError

reset 方法

1
def reset(self) -> ConnectNGame

重置环境状态,并返回给Agent重置后环境下观察到的状态。ConnectNGym内部维护了ConnectNGame实例作为自身状态,每个agent落子后会更新这个实例。由于棋类游戏对于玩家来说是完全信息的,我们直接返回ConnectNGame的deepcopy。

step 方法

1
def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]

Agent 选择了某一action后,由环境来执行这个action并返回4个值:1. 执行后的环境Agent观察到的状态;2. 环境执行了这个action回馈给agent的reward;3. 环境是否结束;4. 其余信息。

step方法是最核心的接口,因此举例来说明ConnectNGym中的输入和输出:

初始状态
状态 ((0, 0, 0), (0, 0, 0), (0, 0, 0))

Agent A 选择action = (0, 0),执行ConnectNGym.step 后返回值:status = ((1, 0, 0), (0, 0, 0), (0, 0, 0)),reward = 0,game_end = False

状态 ((1, 0, 0), (0, 0, 0), (0, 0, 0))

Agent B 选择action = (1, 1),执行ConnectNGym.step 后返回值:status = ((1, 0, 0), (0, -1, 0), (0, 0, 0)),reward = 0,game_end = False

状态 ((1, 0, 0), (0, -1, 0), (0, 0, 0))
重复此过程直至游戏结束,下面是5步后游戏可能达到的最终状态
终结状态 ((1, 1, 1), (-1, -1, 0), (0, 0, 0))

此时step的返回值为:status = ((1, 1, 1), (-1, -1, 0), (0, 0, 0)),reward = 1,game_end = True

render 方法

1
def render(self, mode='human')

展现环境,通过mode区分是否是人类玩家。

ConnectNGym 代码

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class ConnectNGym(gym.Env):

def __init__(self, pygameBoard: PyGameBoard, isGUI=True, displaySec=2):
self.pygameBoard = pygameBoard
self.isGUI = isGUI
self.displaySec = displaySec
self.action_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
self.observation_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
self.seed()
self.reset()

def reset(self) -> ConnectNGame:
self.pygameBoard.connectNGame.reset()
return copy.deepcopy(self.pygameBoard.connectNGame)

def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
# assert self.action_space.contains(action)

r, c = action
reward = REWARD_NONE
result = self.pygameBoard.move(r, c)
if self.pygameBoard.isGameOver():
reward = result

return copy.deepcopy(self.pygameBoard.connectNGame), reward, not result is None, None

def render(self, mode='human'):
if not self.isGUI:
self.pygameBoard.connectNGame.drawText()
time.sleep(self.displaySec)
else:
self.pygameBoard.display(sec=self.displaySec)

def get_available_actions(self) -> List[Tuple[int, int]]:
return self.pygameBoard.getAvailablePositions()

井字棋(N子棋)Minimax策略玩家

图中当k=3,m=n=3即井字棋游戏中,两个minimax策略玩家的对弈效果,游戏结局符合已知的结论:井字棋的解是先手被对方逼平。

Minimax策略AI对弈

镜像游戏状态的DP处理

上一篇中,我们确认了井字棋的总状态数是5478。当k=3, m=n=4时是6035992,k=4, m=n=4时是9722011,总的来说游戏状态数是以指数级增长的。上一版minimax DP策略还有改善的空间,第一种是旋转格局的处理。对于任意一种棋盘格局可以得到90度旋转后的另外三种格局,它们的最佳结局是一致的。因此,我们在递归过程中解得某一棋盘格局后,将其另外三种旋转后格局的解也一起缓存起来。例如:

游戏状态1
旋转后的三种游戏状态
{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def similarStatus(self, status: Tuple[Tuple[int, ...]]) -> List[Tuple[Tuple[int, ...]]]:
ret = []
rotatedS = status
for _ in range(4):
rotatedS = self.rotate(rotatedS)
ret.append(rotatedS)
return ret

def rotate(self, status: Tuple[Tuple[int, ...]]) -> Tuple[Tuple[int, ...]]:
N = len(status)
board = [[ConnectNGame.AVAILABLE] * N for _ in range(N)]

for r in range(N):
for c in range(N):
board[c][N - 1 - r] = status[r][c]

return tuple([tuple(board[i]) for i in range(N)])

Minimax 策略预计算

之前我们对每个棋局去计算最佳的下一步,并在此过程中做了剪枝,即当已经找到当前玩家必胜落子时直接返回。这对于单一局面的计算是较优的,但是AI Agent 需要在每一步都重复这个过程,当棋盘大小>3时运算非常耗时,因此我们来做第二种优化。初始空棋盘时使用Minimax来保证遍历所有状态,缓存所有棋局的最佳结果。对于AI Agent面临的每个棋局只需查找此棋局下所有的可能落子位置,并返回最佳决定,这样大大减少了每次棋局下重复的minimax递归计算。相关代码如下。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class PlannedMinimaxStrategy(Strategy):
def __init__(self, game: ConnectNGame):
super().__init__()
self.game = copy.deepcopy(game)
self.dpMap = {} # game_status => result, move
self.result = self.minimax(game.getStatus())


def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
game = copy.deepcopy(game)

player = game.currentPlayer
bestResult = player * -1 # assume opponent win as worst result
bestMove = None
for move in game.getAvailablePositions():
game.move(*move)
status = game.getStatus()
game.undo()

result = self.dpMap[status]

if player == ConnectNGame.PLAYER_A:
bestResult = max(bestResult, result)
else:
bestResult = min(bestResult, result)
# update bestMove if any improvement
bestMove = move if bestResult == result else bestMove
print(f'move {move} => {result}')

return bestResult, bestMove

Agent 类和对弈逻辑

Agent 类的抽象并不是 OpenAI Gym的规范,出于代码扩展性,我们也封装了Agent基类及其子类,包括AI玩家和人类玩家。BaseAgent需要子类实现 act方法,默认实现为随机决定。

{linenos
1
2
3
4
5
6
class BaseAgent(object):
def __init__(self):
pass

def act(self, game: PyGameBoard, available_actions):
return random.choice(available_actions)

AIAgent 实现act并代理给 strategy 的action方法。

{linenos
1
2
3
4
5
6
7
8
class AIAgent(BaseAgent):
def __init__(self, strategy: Strategy):
self.strategy = strategy

def act(self, game: PyGameBoard, available_actions):
result, move = self.strategy.action(game.connectNGame)
assert move in available_actions
return move

HumanAgent 实现act并代理给 PyGameBoard 的next_user_input方法。

{linenos
1
2
3
4
5
6
class HumanAgent(BaseAgent):
def __init__(self):
pass

def act(self, game: PyGameBoard, available_actions):
return game.next_user_input()
Agent Class Diagram

下面代码展示如何将Agent,ConnectNGym,PyGameBoard 等所有上述类串联起来,完成人人对弈,人机对弈。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def play_ai_vs_ai(env: ConnectNGym):
plannedMinimaxAgent = AIAgent(PlannedMinimaxStrategy(env.pygameBoard.connectNGame))
play(env, plannedMinimaxAgent, plannedMinimaxAgent)


def play(env: ConnectNGym, agent1: BaseAgent, agent2: BaseAgent):
agents = [agent1, agent2]

while True:
env.reset()
done = False
agent_id = -1
while not done:
agent_id = (agent_id + 1) % 2
available_actions = env.get_available_actions()
agent = agents[agent_id]
action = agent.act(pygameBoard, available_actions)
_, reward, done, info = env.step(action)
env.render(True)

if done:
print(f'result={reward}')
time.sleep(3)
break


if __name__ == '__main__':
pygameBoard = PyGameBoard(connectNGame=ConnectNGame(board_size=3, N=3))
env = ConnectNGym(pygameBoard)
env.render(True)

play_ai_vs_ai(env)
Class Diagram 总览
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×