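Gym is the de facto standard in reinforcement learning, so we ultimately wrap the game in the OpenAI Gym interface. All the code for this post is at github.com/MyEncyclopedia/ConnectNGym.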
Pygame implementation of Tic-Tac-Toe and Gomoku
Tic-Tac-Toe in Pygame, played between two human players
### A minimal Pygame primer
```python
import sys
import pygame

pygame.init()
display = pygame.display.set_mode((800, 600))  # 800x600 window
clock = pygame.time.Clock()

while True:
    # Drain the event queue; exit when the window is closed.
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            pygame.quit()
            sys.exit(0)
    pygame.display.update()  # redraw the window once per frame
    clock.tick(1)            # cap the loop at 1 frame per second
```
PyGameBoard core code
PyGameBoard Class Diagram
```python
from typing import Tuple

import pygame

# ConnectNGame is the game-logic class from the repository.
# Excerpt: self.clock, check_event, _render and isGameOver are defined elsewhere in the class.


class PyGameBoard:

    def __init__(self, connectNGame: ConnectNGame):
        self.connectNGame = connectNGame
        pygame.init()

    def next_user_input(self) -> Tuple[int, int]:
        # Block until the user clicks a valid cell, rendering at up to 60 FPS.
        self.action = None
        while self.action is None:
            self.check_event()
            self._render()
            self.clock.tick(60)
        return self.action

    def move(self, r: int, c: int) -> int:
        return self.connectNGame.move(r, c)


if __name__ == '__main__':
    connectNGame = ConnectNGame()
    pygameBoard = PyGameBoard(connectNGame)
    while not pygameBoard.isGameOver():
        pos = pygameBoard.next_user_input()
        pygameBoard.move(*pos)

    pygame.quit()
```
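The position is represented as a `Tuple[int, int]` of (row, column); for example, (0, 0) is the top-left corner of the board.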
```python
# Methods of PyGameBoard.

def check_event(self):
    for e in pygame.event.get():
        if e.type == pygame.QUIT:
            pygame.quit()
            sys.exit(0)
        elif e.type == pygame.MOUSEBUTTONDOWN:
            self._handle_user_input(e)

def _handle_user_input(self, e: Event) -> Tuple[int, int]:
    # Top-left corner and side length of the clickable board area, in pixels.
    origin_x = self.start_x - self.edge_size
    origin_y = self.start_y - self.edge_size
    size = (self.board_size - 1) * self.grid_size + self.edge_size * 2
    pos = e.pos
    if origin_x <= pos[0] <= origin_x + size and origin_y <= pos[1] <= origin_y + size:
        if not self.connectNGame.gameOver:
            # Convert the pixel offset into a (row, column) cell.
            x = pos[0] - origin_x
            y = pos[1] - origin_y
            r = int(y // self.grid_size)
            c = int(x // self.grid_size)
            valid = self.connectNGame.checkAction(r, c)
            if valid:
                self.action = (r, c)
                return self.action
```
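The pixel-to-cell conversion in `_handle_user_input` can be tried without opening a window. The sketch below is a simplified, standalone version: `pixel_to_cell` and its parameters are hypothetical names introduced here, and it assumes the board is a plain square of `board_size` x `board_size` cells, each `grid_size` pixels wide, starting at `origin`. It ignores the `edge_size` margin that the real class accounts for.

```python
from typing import Optional, Tuple


def pixel_to_cell(pos: Tuple[int, int], origin: Tuple[int, int],
                  grid_size: int, board_size: int) -> Optional[Tuple[int, int]]:
    """Map a mouse position in pixels to a (row, col) cell, or None if off the board."""
    x = pos[0] - origin[0]  # horizontal offset selects the column
    y = pos[1] - origin[1]  # vertical offset selects the row
    if x < 0 or y < 0:
        return None
    r, c = y // grid_size, x // grid_size
    if r >= board_size or c >= board_size:
        return None
    return r, c


# A click at (250, 130) on a 3x3 board whose top-left corner is at (100, 100).
print(pixel_to_cell((250, 130), (100, 100), grid_size=100, board_size=3))
```

Note that the vertical pixel coordinate determines the row and the horizontal one the column, which is why the original code computes `r` from `y` and `c` from `x`.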
OpenAI Gym interface specification
```python
class ConnectNGym(gym.Env):

    def reset(self) -> ConnectNGame:
        """Resets the state of the environment and returns an initial observation.

        Returns:
            observation (object): the initial observation.
        """
        raise NotImplementedError

    def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
        """Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.

        Accepts an action and returns a tuple (observation, reward, done, info).

        Args:
            action (object): an action provided by the agent

        Returns:
            observation (object): agent's observation of the current environment
            reward (float) : amount of reward returned after previous action
            done (bool): whether the episode has ended, in which case further step() calls will return undefined results
            info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
        """
        raise NotImplementedError

    def render(self, mode='human'):
        """
        Renders the environment.

        The set of supported modes varies per environment. (And some
        environments do not support rendering at all.) By convention,
        if mode is:

        - human: render to the current display or terminal and
          return nothing. Usually for human consumption.
        - rgb_array: Return an numpy.ndarray with shape (x, y, 3),
          representing RGB values for an x-by-y pixel image, suitable
          for turning into a video.
        - ansi: Return a string (str) or StringIO.StringIO containing a
          terminal-style text representation. The text can include newlines
          and ANSI escape sequences (e.g. for colors).

        Note:
            Make sure that your class's metadata 'render.modes' key includes
            the list of supported modes. It's recommended to call super()
            in implementations to use the functionality of this method.

        Args:
            mode (str): the mode to render with
        """
        raise NotImplementedError
```
The reset method
```python
def reset(self) -> ConnectNGame
```
The step method
```python
def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]
```
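After the agent chooses an action, the environment executes it and returns four values: 1. the observation of the environment after the action; 2. the reward the environment gives back to the agent for the action; 3. whether the episode has ended; 4. auxiliary information.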
State ((0, 0, 0), (0, 0, 0), (0, 0, 0))
Agent A chooses action = (0, 0); ConnectNGym.step then returns: status = ((1, 0, 0), (0, 0, 0), (0, 0, 0)), reward = 0, game_end = False
State ((1, 0, 0), (0, 0, 0), (0, 0, 0))
Agent B chooses action = (1, 1); ConnectNGym.step then returns: status = ((1, 0, 0), (0, -1, 0), (0, 0, 0)), reward = 0, game_end = False
State ((1, 0, 0), (0, -1, 0), (0, 0, 0))
Terminal state ((1, 1, 1), (-1, -1, 0), (0, 0, 0))
At this point step returns: status = ((1, 1, 1), (-1, -1, 0), (0, 0, 0)), reward = 1, game_end = True
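The state sequence above can be replayed with a small, Gym-free sketch. Everything here is illustrative and not the repository's code: `winner` and `step` are hypothetical helpers, the board is a plain tuple of tuples, a draw is assumed to give reward 0, and the order of the three moves between the second and the terminal state is an assumption, since the walkthrough does not list them.

```python
from typing import Optional, Tuple

Status = Tuple[Tuple[int, ...], ...]
PLAYER_A, PLAYER_B, AVAILABLE = 1, -1, 0


def winner(status: Status) -> Optional[int]:
    """Return the player owning a full row, column or diagonal, else None."""
    n = len(status)
    lines = [list(row) for row in status]                           # rows
    lines += [[status[r][c] for r in range(n)] for c in range(n)]   # columns
    lines.append([status[i][i] for i in range(n)])                  # main diagonal
    lines.append([status[i][n - 1 - i] for i in range(n)])          # anti-diagonal
    for line in lines:
        if line[0] != AVAILABLE and all(v == line[0] for v in line):
            return line[0]
    return None


def step(status: Status, player: int, action: Tuple[int, int]) -> Tuple[Status, int, bool]:
    """Place `player` at `action`; return (new status, reward, game_end)."""
    r, c = action
    assert status[r][c] == AVAILABLE, "cell already taken"
    board = [list(row) for row in status]
    board[r][c] = player
    new_status = tuple(tuple(row) for row in board)
    won = winner(new_status)
    full = all(v != AVAILABLE for row in new_status for v in row)
    reward = won if won is not None else 0  # assumed: a draw yields 0
    return new_status, reward, won is not None or full


status = ((0, 0, 0), (0, 0, 0), (0, 0, 0))
# Assumed move order; only the first two moves are given in the walkthrough.
moves = [(PLAYER_A, (0, 0)), (PLAYER_B, (1, 1)), (PLAYER_A, (0, 1)),
         (PLAYER_B, (1, 0)), (PLAYER_A, (0, 2))]
for player, action in moves:
    status, reward, game_end = step(status, player, action)
    print(status, reward, game_end)
```

The reward stays 0 until Agent A completes the top row, at which point the sketch returns reward 1 and `game_end = True`, matching the terminal state above.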
The render method
```python
def render(self, mode='human')
```
ConnectNGym code
```python
import copy
import time
from typing import List, Tuple

import gym
from gym import spaces

# ConnectNGame, PyGameBoard and REWARD_NONE are defined elsewhere in the repository.


class ConnectNGym(gym.Env):

    def __init__(self, pygameBoard: PyGameBoard, isGUI=True, displaySec=2):
        self.pygameBoard = pygameBoard
        self.isGUI = isGUI
        self.displaySec = displaySec
        self.action_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
        self.observation_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
        self.seed()
        self.reset()

    def reset(self) -> ConnectNGame:
        self.pygameBoard.connectNGame.reset()
        # Return a copy so callers cannot mutate the environment's own game object.
        return copy.deepcopy(self.pygameBoard.connectNGame)

    def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
        r, c = action
        reward = REWARD_NONE
        result = self.pygameBoard.move(r, c)
        if self.pygameBoard.isGameOver():
            reward = result

        # done is True once move() has produced a game result.
        return copy.deepcopy(self.pygameBoard.connectNGame), reward, result is not None, None

    def render(self, mode='human'):
        if not self.isGUI:
            # Text mode: print the board, then pause.
            self.pygameBoard.connectNGame.drawText()
            time.sleep(self.displaySec)
        else:
            self.pygameBoard.display(sec=self.displaySec)

    def get_available_actions(self) -> List[Tuple[int, int]]:
        return self.pygameBoard.getAvailablePositions()
```
```python
def similarStatus(self, status: Tuple[Tuple[int, ...]]) -> List[Tuple[Tuple[int, ...]]]:
    # Collect the four 90-degree rotations of the board.
    ret = []
    rotatedS = status
    for _ in range(4):
        rotatedS = self.rotate(rotatedS)
        ret.append(rotatedS)
    return ret

def rotate(self, status: Tuple[Tuple[int, ...]]) -> Tuple[Tuple[int, ...]]:
    N = len(status)
    board = [[ConnectNGame.AVAILABLE] * N for _ in range(N)]

    # Cell (r, c) moves to (c, N - 1 - r).
    for r in range(N):
        for c in range(N):
            board[c][N - 1 - r] = status[r][c]

    return tuple([tuple(board[i]) for i in range(N)])
```
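The index mapping in `rotate` sends cell (r, c) to (c, N - 1 - r), which is a 90-degree clockwise rotation, so four applications return the original board. A standalone sketch, with the methods rewritten as free functions and `0` standing in for `ConnectNGame.AVAILABLE` (both changes are assumptions made so the example runs without the repository), makes this easy to check:

```python
from typing import List, Tuple

Status = Tuple[Tuple[int, ...], ...]


def rotate(status: Status) -> Status:
    """Rotate a square board 90 degrees clockwise: cell (r, c) moves to (c, n-1-r)."""
    n = len(status)
    board = [[0] * n for _ in range(n)]
    for r in range(n):
        for c in range(n):
            board[c][n - 1 - r] = status[r][c]
    return tuple(tuple(row) for row in board)


def similar_status(status: Status) -> List[Status]:
    """Return the four successive rotations; the last one equals `status` itself."""
    ret = []
    rotated = status
    for _ in range(4):
        rotated = rotate(rotated)
        ret.append(rotated)
    return ret


corner = ((1, 0, 0), (0, 0, 0), (0, 0, 0))
for s in similar_status(corner):
    print(s)
```

A stone in the top-left corner visits the top-right, bottom-right and bottom-left corners before coming back, so all four boards can share one stored result.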
Precomputing the Minimax strategy
```python
class PlannedMinimaxStrategy(Strategy):
    def __init__(self, game: ConnectNGame):
        super().__init__()
        self.game = copy.deepcopy(game)
        self.dpMap = {}  # game status -> precomputed minimax result
        # Solve the whole game once up front (minimax is defined elsewhere in the class).
        self.result = self.minimax(game.getStatus())

    def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
        game = copy.deepcopy(game)

        player = game.currentPlayer
        bestResult = player * -1  # start from the worst case: the opponent wins
        bestMove = None
        for move in game.getAvailablePositions():
            # Try the move, look up the resulting status, then take it back.
            game.move(*move)
            status = game.getStatus()
            game.undo()

            result = self.dpMap[status]

            if player == ConnectNGame.PLAYER_A:
                bestResult = max(bestResult, result)
            else:
                bestResult = min(bestResult, result)
            # Keep this move if it achieves the best result seen so far.
            bestMove = move if bestResult == result else bestMove
            print(f'move {move} => {result}')

        return bestResult, bestMove
```
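The `minimax` method that fills `dpMap` is not shown above. As a rough illustration of the idea only, here is a self-contained memoized minimax for 3x3 Tic-Tac-Toe. It is a sketch under stated assumptions, not the repository's implementation: all helper names (`winner`, `available`, `current_player`, `place`, `best_move`) are hypothetical, boards are plain tuples, Player A (1) is assumed to move first, and results are 1 for an A win, -1 for a B win and 0 for a draw.

```python
from typing import Dict, List, Optional, Tuple

Status = Tuple[Tuple[int, ...], ...]
Move = Tuple[int, int]
PLAYER_A, PLAYER_B, AVAILABLE = 1, -1, 0
N = 3

dp_map: Dict[Status, int] = {}  # status -> minimax result under perfect play


def winner(status: Status) -> Optional[int]:
    lines = [list(row) for row in status]
    lines += [[status[r][c] for r in range(N)] for c in range(N)]
    lines.append([status[i][i] for i in range(N)])
    lines.append([status[i][N - 1 - i] for i in range(N)])
    for line in lines:
        if line[0] != AVAILABLE and all(v == line[0] for v in line):
            return line[0]
    return None


def available(status: Status) -> List[Move]:
    return [(r, c) for r in range(N) for c in range(N) if status[r][c] == AVAILABLE]


def current_player(status: Status) -> int:
    # A moves first, so A is to move whenever an even number of stones is on the board.
    stones = sum(1 for row in status for v in row if v != AVAILABLE)
    return PLAYER_A if stones % 2 == 0 else PLAYER_B


def place(status: Status, move: Move, player: int) -> Status:
    return tuple(tuple(player if (i, j) == move else v for j, v in enumerate(row))
                 for i, row in enumerate(status))


def minimax(status: Status) -> int:
    """Solve `status` recursively, caching every visited status in dp_map."""
    if status in dp_map:
        return dp_map[status]
    won, moves = winner(status), available(status)
    if won is not None:
        result = won
    elif not moves:
        result = 0  # board full, nobody won
    else:
        player = current_player(status)
        results = [minimax(place(status, m, player)) for m in moves]
        result = max(results) if player == PLAYER_A else min(results)
    dp_map[status] = result
    return result


def best_move(status: Status) -> Move:
    """Pick the move whose successor has the best cached result (requires a solved dp_map)."""
    player = current_player(status)
    pick = max if player == PLAYER_A else min
    return pick(available(status), key=lambda m: dp_map[place(status, m, player)])


empty = ((0, 0, 0), (0, 0, 0), (0, 0, 0))
print(minimax(empty))
print(best_move(((1, 1, 0), (-1, -1, 0), (0, 0, 0))))
```

Because the sketch explores every child without pruning, one call on the empty board caches all reachable states, and move selection afterwards is a dictionary lookup per candidate move, which mirrors how `action` reads `self.dpMap`.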
Agent classes and game-play logic
The Agent class abstraction is not part of the OpenAI Gym interface.
```python
class BaseAgent(object):
    def __init__(self):
        pass

    def act(self, game: PyGameBoard, available_actions):
        # Default behaviour: pick a random legal move.
        return random.choice(available_actions)
```
AIAgent implements `act` by delegating to the strategy's `action` method.
```python
class AIAgent(BaseAgent):
    def __init__(self, strategy: Strategy):
        self.strategy = strategy

    def act(self, game: PyGameBoard, available_actions):
        result, move = self.strategy.action(game.connectNGame)
        assert move in available_actions
        return move
```
HumanAgent implements `act` by delegating to PyGameBoard's `next_user_input` method.
```python
class HumanAgent(BaseAgent):
    def __init__(self):
        pass

    def act(self, game: PyGameBoard, available_actions):
        # Block until the human player clicks a valid cell.
        return game.next_user_input()
```
Agent Class Diagram
```python
import time


def play_ai_vs_ai(env: ConnectNGym):
    plannedMinimaxAgent = AIAgent(PlannedMinimaxStrategy(env.pygameBoard.connectNGame))
    play(env, plannedMinimaxAgent, plannedMinimaxAgent)


def play(env: ConnectNGym, agent1: BaseAgent, agent2: BaseAgent):
    agents = [agent1, agent2]

    while True:
        env.reset()
        done = False
        agent_id = -1
        while not done:
            # Alternate between the two agents.
            agent_id = (agent_id + 1) % 2
            available_actions = env.get_available_actions()
            agent = agents[agent_id]
            # Use the board owned by env rather than a module-level global.
            action = agent.act(env.pygameBoard, available_actions)
            _, reward, done, info = env.step(action)
            env.render()

            if done:
                print(f'result={reward}')
                time.sleep(3)
                break


if __name__ == '__main__':
    pygameBoard = PyGameBoard(connectNGame=ConnectNGame(board_size=3, N=3))
    env = ConnectNGym(pygameBoard)
    env.render()

    play_ai_vs_ai(env)
```
Class diagram overview