#Algorithm

In the last episode, we finished the minimax strategy for Connect-N games, including Tic-Tac-Toe and Gomoku. In this episode, we implement a GUI environment based on the Pygame library that supports human vs. human, AI vs. AI and human vs. AI play, which is essential for AlphaGo Zero style self-play reinforcement learning. The environment is then embedded into OpenAI Gym, the standard interface in game reinforcement learning. All code in this series is in the ConnectNGym GitHub repository.

Connect-N Pygame Implementation

Pygame Tic-Tac-Toe Human Play

Python has several well-known cross-platform GUI libraries, such as Tkinter and PyQt. They mainly target desktop GUI programming; their APIs are complicated and the learning curve is steep. In contrast, Pygame is tailored to small desktop games, so we adopt it.

### Pygame 101

Like other GUI frameworks, Pygame is built on a single-threaded, event-driven model. Below is the simplest desktop Pygame application, which just shows a window. The while True loop keeps retrieving events dispatched by the OS to the window. In this example we handle only the quit event (the user clicking the close button) and exit the whole process. In addition, the clock variable controls the frame rate (FPS), which we won't elaborate on.

    import sys
    import pygame

    pygame.init()
    display = pygame.display.set_mode((800, 600))
    clock = pygame.time.Clock()

    while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                sys.exit(0)
            else:
                pygame.display.update()
        clock.tick(1)

PyGameBoard Class

The PyGameBoard class encapsulates GUI interaction and rendering logic. In the last episode we coded the ConnectNGame class; PyGameBoard is instantiated with a pre-initialized ConnectNGame instance. It handles GUI mouse events to determine the next valid move and then updates its internal state, which is just the ConnectNGame instance passed in. Concretely, the PyGameBoard instance method next_user_input(self) loops until the current player picks a valid action.

PyGameBoard Class Diagram
    class PyGameBoard:

        def __init__(self, connectNGame: ConnectNGame):
            self.connectNGame = connectNGame
            pygame.init()

        def next_user_input(self) -> Tuple[int, int]:
            self.action = None
            while not self.action:
                self.check_event()
                self._render()
                self.clock.tick(60)
            return self.action

        def move(self, r: int, c: int) -> int:
            return self.connectNGame.move(r, c)


    if __name__ == '__main__':
        connectNGame = ConnectNGame()
        pygameBoard = PyGameBoard(connectNGame)
        while not pygameBoard.isGameOver():
            pos = pygameBoard.next_user_input()
            pygameBoard.move(*pos)

        pygame.quit()

Following Pygame 101, the method check_event() handles events dispatched by the OS and consumes only the player's mouse events. The method _handle_user_input() converts a mouse event into row and column indices, validates the move and records it in the form of Tuple[int, int]. For instance, (0, 0) is the upper-left corner position.

    def check_event(self):
        for e in pygame.event.get():
            if e.type == pygame.QUIT:
                pygame.quit()
                sys.exit(0)
            elif e.type == pygame.MOUSEBUTTONDOWN:
                self._handle_user_input(e)

    def _handle_user_input(self, e: Event) -> Tuple[int, int]:
        origin_x = self.start_x - self.edge_size
        origin_y = self.start_y - self.edge_size
        size = (self.board_size - 1) * self.grid_size + self.edge_size * 2
        pos = e.pos
        if origin_x <= pos[0] <= origin_x + size and origin_y <= pos[1] <= origin_y + size:
            if not self.connectNGame.gameOver:
                x = pos[0] - origin_x
                y = pos[1] - origin_y
                r = int(y // self.grid_size)
                c = int(x // self.grid_size)
                valid = self.connectNGame.checkAction(r, c)
                if valid:
                    self.action = (r, c)
                    return self.action
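
The pixel-to-cell conversion can also be looked at in isolation. The sketch below is a simplified, hypothetical helper (pixel_to_cell is not part of the repository) that assumes the board is a plain grid of square cells of grid_size pixels whose top-left corner sits at origin; it ignores the edge_size margin used above.

```python
from typing import Optional, Tuple


def pixel_to_cell(pos: Tuple[int, int], origin: Tuple[int, int],
                  grid_size: int, board_size: int) -> Optional[Tuple[int, int]]:
    """Map a mouse position to (row, col), or None if it falls off the board.

    Assumption: cells are grid_size x grid_size squares starting at origin.
    """
    x = pos[0] - origin[0]
    y = pos[1] - origin[1]
    if x < 0 or y < 0:
        return None
    # Integer division turns a pixel offset into a cell index.
    r, c = y // grid_size, x // grid_size
    if r >= board_size or c >= board_size:
        return None
    return r, c


print(pixel_to_cell((250, 120), (0, 0), 100, 3))
```

Note that the y coordinate selects the row and the x coordinate selects the column, which is why the result is (row, col) rather than (x, y).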

Integrated into OpenAI Gym

OpenAI Gym specifies how an Agent interacts with an Env. Env is defined as gym.Env, and the major task in creating a new game environment is subclassing it and overriding the reset, step and render methods. Let's see what the interface of our ConnectNGym looks like.

    class ConnectNGym(gym.Env):

        def reset(self) -> ConnectNGame:
            """Resets the state of the environment and returns an initial observation.

            Returns:
                observation (object): the initial observation.
            """
            raise NotImplementedError

        def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
            """Run one timestep of the environment's dynamics. When end of
            episode is reached, you are responsible for calling `reset()`
            to reset this environment's state.

            Accepts an action and returns a tuple (observation, reward, done, info).

            Args:
                action (object): an action provided by the agent

            Returns:
                observation (object): agent's observation of the current environment
                reward (float) : amount of reward returned after previous action
                done (bool): whether the episode has ended, in which case further step() calls will return undefined results
                info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
            """
            raise NotImplementedError

        def render(self, mode='human'):
            """
            Renders the environment.

            The set of supported modes varies per environment. (And some
            environments do not support rendering at all.) By convention,
            if mode is:

            - human: render to the current display or terminal and
              return nothing. Usually for human consumption.
            - rgb_array: Return an numpy.ndarray with shape (x, y, 3),
              representing RGB values for an x-by-y pixel image, suitable
              for turning into a video.
            - ansi: Return a string (str) or StringIO.StringIO containing a
              terminal-style text representation. The text can include newlines
              and ANSI escape sequences (e.g. for colors).

            Note:
                Make sure that your class's metadata 'render.modes' key includes
                the list of supported modes. It's recommended to call super()
                in implementations to use the functionality of this method.

            Args:
                mode (str): the mode to render with
            """
            raise NotImplementedError

Method reset()

def reset(self) -> ConnectNGame

Resets the environment's internal state and returns the corresponding initial state as observed by the agent. ConnectNGym holds an instance of ConnectNGame as its internal state. Because board games are fully observable, the state observed by the agent is exactly the board game's internal state, so reset() returns a deepcopy of the ConnectNGame instance.
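
Why a deepcopy rather than the instance itself or a shallow copy? The following sketch uses a hypothetical stand-in class, TinyGame (not the real ConnectNGame), to show that only a deep copy keeps the agent from accidentally mutating the environment's internal board.

```python
import copy


class TinyGame:
    """Hypothetical stand-in for ConnectNGame: just a mutable 3x3 board."""

    def __init__(self) -> None:
        self.board = [[0] * 3 for _ in range(3)]


internal = TinyGame()

# What reset() would hand to the agent: a fully independent copy.
observation = copy.deepcopy(internal)
observation.board[0][0] = 1          # the agent scribbles on its copy

# A shallow copy still shares the nested row lists with the original.
shallow = copy.copy(internal)
shallow.board[1][1] = -1

print(internal.board[0][0])  # untouched by the deep copy
print(internal.board[1][1])  # modified through the shallow copy
```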

Method step()

def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]

Once the agent selects an action and hands it back to the env, the env executes the action, changes its internal state via step() and returns the following four items.

  1. The new state observed by the agent
  2. The reward associated with the action
  3. Environment terminated or not
  4. Other auxiliary information

step() is the core API of gym.Env. The following sequence of game state transitions illustrates its inputs and outputs.

Initial State:
State ((0, 0, 0), (0, 0, 0), (0, 0, 0))

Agent A selects action = (0, 0). ConnectNGym.step() executes the action and returns

status = ((1, 0, 0), (0, 0, 0), (0, 0, 0)), reward = 0, game_end = False

State ((1, 0, 0), (0, 0, 0), (0, 0, 0))

Agent B selects action = (1, 1). ConnectNGym.step() executes the action and returns

status = ((1, 0, 0), (0, -1, 0), (0, 0, 0)), reward = 0, game_end = False

State ((1, 0, 0), (0, -1, 0), (0, 0, 0))

Repeating the process, the game may end in the following terminal state after five moves.

Terminal State ((1, 1, 1), (-1, -1, 0), (0, 0, 0))

The last step() returns

status = ((1, 1, 1), (-1, -1, 0), (0, 0, 0)), reward = 1, game_end = True
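
The transitions above can be replayed with a small, self-contained sketch. TinyTicTacToeEnv below is a hypothetical toy class written for illustration only: it mimics the reset/step contract but does not inherit from gym.Env and is not the ConnectNGym implementation.

```python
from typing import List, Optional, Tuple

State = Tuple[Tuple[int, ...], ...]


class TinyTicTacToeEnv:
    """Gym-style reset/step on a 3x3 board; does not depend on gym itself."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> State:
        self.board: List[List[int]] = [[0] * 3 for _ in range(3)]
        self.player = 1  # 1 = agent A, -1 = agent B
        return self._state()

    def _state(self) -> State:
        return tuple(tuple(row) for row in self.board)

    def _winner(self) -> Optional[int]:
        b = self.board
        lines = [list(b[r]) for r in range(3)]                      # rows
        lines += [[b[r][c] for r in range(3)] for c in range(3)]    # columns
        lines += [[b[i][i] for i in range(3)],                      # diagonals
                  [b[i][2 - i] for i in range(3)]]
        for line in lines:
            if abs(sum(line)) == 3:
                return line[0]
        return None

    def step(self, action: Tuple[int, int]) -> Tuple[State, int, bool, None]:
        r, c = action
        assert self.board[r][c] == 0, "cell already taken"
        self.board[r][c] = self.player
        winner = self._winner()
        full = all(cell != 0 for row in self.board for cell in row)
        done = winner is not None or full
        reward = winner if winner is not None else 0
        self.player *= -1
        return self._state(), reward, done, None


env = TinyTicTacToeEnv()
trace = [env.step(a) for a in [(0, 0), (1, 1), (0, 1), (1, 0), (0, 2)]]
for status, reward, game_end, _ in trace:
    print(status, reward, game_end)
```

Only the last call reports game_end = True with a non-zero reward; every earlier call returns reward 0.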

Method render()

def render(self, mode='human')

Renders the env. The mode parameter distinguishes whether the player is a human or an AI agent.

ConnectNGym Implementation

    class ConnectNGym(gym.Env):

        def __init__(self, pygameBoard: PyGameBoard, isGUI=True, displaySec=2):
            self.pygameBoard = pygameBoard
            self.isGUI = isGUI
            self.displaySec = displaySec
            self.action_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
            self.observation_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
            self.seed()
            self.reset()

        def reset(self) -> ConnectNGame:
            self.pygameBoard.connectNGame.reset()
            return copy.deepcopy(self.pygameBoard.connectNGame)

        def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
            # assert self.action_space.contains(action)

            r, c = action
            reward = REWARD_NONE
            result = self.pygameBoard.move(r, c)
            if self.pygameBoard.isGameOver():
                reward = result

            return copy.deepcopy(self.pygameBoard.connectNGame), reward, result is not None, None

        def render(self, mode='human'):
            if not self.isGUI:
                self.pygameBoard.connectNGame.drawText()
                time.sleep(self.displaySec)
            else:
                self.pygameBoard.display(sec=self.displaySec)

        def get_available_actions(self) -> List[Tuple[int, int]]:
            return self.pygameBoard.getAvailablePositions()

Connect-N Enhanced Minimax Strategy

The following animation shows two minimax AI players playing Tic-Tac-Toe (k=3, m=n=3). We know from the previous episode that Tic-Tac-Toe is solved as a draw: when both players play optimally, the second player can always force the first player into a tie, which matches the result in the animation.

Minimax AI Self-Play

Game State Rotation Enhancement

In the last episode, we confirmed that Tic-Tac-Toe has 5478 states in total. The number grows exponentially as k, m and n increase. For instance, with k=3, m=n=4 the total number of states is 6035992, and with k=4, m=n=4 it is 9722011. We can improve the Minimax DP strategy by pruning game states that are rotations of an already solved game state. That is, once a game state is solved, we cache not only this state but also the three other states derived from it by rotation, since they share the same result.

For example, game state below has same result as other three rotated ones.

Game State 1
Other Rotated 3 States
    def similarStatus(self, status: Tuple[Tuple[int, ...]]) -> List[Tuple[Tuple[int, ...]]]:
        ret = []
        rotatedS = status
        for _ in range(4):
            rotatedS = self.rotate(rotatedS)
            ret.append(rotatedS)
        return ret

    def rotate(self, status: Tuple[Tuple[int, ...]]) -> Tuple[Tuple[int, ...]]:
        N = len(status)
        board = [[ConnectNGame.AVAILABLE] * N for _ in range(N)]

        for r in range(N):
            for c in range(N):
                board[c][N - 1 - r] = status[r][c]

        return tuple([tuple(board[i]) for i in range(N)])
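
To see the rotation at work outside the class, here is a minimal standalone sketch. The function names rotate and rotations are illustrative, and the index mapping is the same clockwise 90 degree turn as above. It also shows that a symmetric state, such as a lone center stone, yields fewer than four distinct boards.

```python
from typing import List, Tuple

State = Tuple[Tuple[int, ...], ...]


def rotate(state: State) -> State:
    """Rotate a square board 90 degrees clockwise."""
    n = len(state)
    # The new cell (r, c) takes the value of the old cell (n - 1 - c, r).
    return tuple(tuple(state[n - 1 - c][r] for c in range(n)) for r in range(n))


def rotations(state: State) -> List[State]:
    """Return the distinct boards obtained by 0, 90, 180 and 270 degree turns."""
    seen: List[State] = []
    current = state
    for _ in range(4):
        if current not in seen:
            seen.append(current)
        current = rotate(current)
    return seen


corner = ((1, 0, 0), (0, 0, 0), (0, 0, 0))
center = ((0, 0, 0), (0, 1, 0), (0, 0, 0))

print(len(rotations(corner)), len(rotations(center)))
```

Rotations are only part of the board's symmetries; mirror images would allow further pruning but are not handled in this sketch.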

Minimax Strategy Precomputation

In the last version of the Minimax DP strategy, we searched for the best game result given a game state, and we used pruning to return early once the result was already the best possible. However, an AI agent still had to call minimax for every new game state it encountered. This is very inefficient because the same game states are solved again and again during the top-down recursion. An obvious improvement is to compute all game states in a first step and cache them. Later, for each state encountered, we only need to aggregate the cached results over all possible next moves of that state. The code that aggregates over possible moves is listed below.

    class PlannedMinimaxStrategy(Strategy):
        def __init__(self, game: ConnectNGame):
            super().__init__()
            self.game = copy.deepcopy(game)
            self.dpMap = {}  # game_status => result, move
            self.result = self.minimax(game.getStatus())

        def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
            game = copy.deepcopy(game)

            player = game.currentPlayer
            bestResult = player * -1  # assume opponent win as worst result
            bestMove = None
            for move in game.getAvailablePositions():
                game.move(*move)
                status = game.getStatus()
                game.undo()

                result = self.dpMap[status]

                if player == ConnectNGame.PLAYER_A:
                    bestResult = max(bestResult, result)
                else:
                    bestResult = min(bestResult, result)
                # update bestMove if any improvement
                bestMove = move if bestResult == result else bestMove
                print(f'move {move} => {result}')

            return bestResult, bestMove
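
The precompute-then-lookup idea can be sketched end to end for plain 3x3 Tic-Tac-Toe. The code below is an independent illustration, not the repository's PlannedMinimaxStrategy: it assumes a flat 9-tuple board encoding and the hypothetical helper names winner, to_move, solve and best_move.

```python
from typing import Dict, Tuple

State = Tuple[int, ...]  # 9 cells, row-major; 1 = player A, -1 = player B, 0 = empty

LINES = [(0, 1, 2), (3, 4, 5), (6, 7, 8),
         (0, 3, 6), (1, 4, 7), (2, 5, 8),
         (0, 4, 8), (2, 4, 6)]


def winner(s: State) -> int:
    """Return 1 or -1 if that player has three in a row, else 0."""
    for a, b, c in LINES:
        if s[a] != 0 and s[a] == s[b] == s[c]:
            return s[a]
    return 0


def to_move(s: State) -> int:
    """Player A moves whenever both players have placed the same number of stones."""
    return 1 if s.count(1) == s.count(-1) else -1


values: Dict[State, int] = {}  # state => game result under optimal play


def solve(s: State) -> int:
    """Memoised minimax without pruning: fills `values` for every reachable state."""
    if s in values:
        return values[s]
    w = winner(s)
    if w != 0 or 0 not in s:
        values[s] = w
        return w
    player = to_move(s)
    results = [solve(s[:i] + (player,) + s[i + 1:]) for i in range(9) if s[i] == 0]
    values[s] = max(results) if player == 1 else min(results)
    return values[s]


def best_move(s: State) -> Tuple[int, int]:
    """Choose a move by table lookup only; returns (result, cell index)."""
    player = to_move(s)
    options = [(values[s[:i] + (player,) + s[i + 1:]], i) for i in range(9) if s[i] == 0]
    return max(options) if player == 1 else min(options)


EMPTY: State = (0,) * 9
solve(EMPTY)  # one-off precomputation

threat = (1, 1, 0, -1, -1, 0, 0, 0, 0)  # A to move and can complete the top row
print(len(values), values[EMPTY], best_move(threat))
```

After the single call to solve, best_move never recurses: each decision costs at most nine dictionary lookups, which is the point of precomputing.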

Agent Class and Playing Logic

We also construct an Agent class hierarchy, allowing AI players and human players to share common code.

BaseAgent is the root class; its default act() method picks a random action from all available actions.

    class BaseAgent(object):
        def __init__(self):
            pass

        def act(self, game: PyGameBoard, available_actions):
            return random.choice(available_actions)

AIAgent has its act() behavior delegated to strategy's action() method.

    class AIAgent(BaseAgent):
        def __init__(self, strategy: Strategy):
            self.strategy = strategy

        def act(self, game: PyGameBoard, available_actions):
            result, move = self.strategy.action(game.connectNGame)
            assert move in available_actions
            return move

HumanAgent delegates act() to PyGameBoard next_user_input().

    class HumanAgent(BaseAgent):
        def __init__(self):
            pass

        def act(self, game: PyGameBoard, available_actions):
            return game.next_user_input()

Agent Class Diagram

Below are code snippets showing how Agent, ConnectNGym and PyGameBoard are connected to play a game.

    def play_ai_vs_ai(env: ConnectNGym):
        plannedMinimaxAgent = AIAgent(PlannedMinimaxStrategy(env.pygameBoard.connectNGame))
        play(env, plannedMinimaxAgent, plannedMinimaxAgent)


    def play(env: ConnectNGym, agent1: BaseAgent, agent2: BaseAgent):
        agents = [agent1, agent2]

        while True:
            env.reset()
            done = False
            agent_id = -1
            while not done:
                agent_id = (agent_id + 1) % 2
                available_actions = env.get_available_actions()
                agent = agents[agent_id]
                # use the board owned by env instead of relying on a global variable
                action = agent.act(env.pygameBoard, available_actions)
                _, reward, done, info = env.step(action)
                env.render(True)

                if done:
                    print(f'result={reward}')
                    time.sleep(3)
                    break


    if __name__ == '__main__':
        pygameBoard = PyGameBoard(connectNGame=ConnectNGame(board_size=3, N=3))
        env = ConnectNGym(pygameBoard)
        env.render(True)

        play_ai_vs_ai(env)

Class Diagram Overview

This episode extends the last one, where the Minimax and Alpha-Beta Pruning algorithms were introduced. We will solve several Tic-Tac-Toe problems from LeetCode, gathering intuition and building blocks for Tic-Tac-Toe game logic, which extends naturally to Connect-N games such as Gomoku (N=5). Then we solve Tic-Tac-Toe using Minimax and Alpha-Beta pruning for small N and analyze the state space. In the following episodes, based on the building blocks here, we will implement a Connect-N OpenAI Gym GUI environment, where we can play against the computer visually or compare different computer algorithms. Finally, we demonstrate how to implement Monte Carlo Tree Search for the Connect-N game.

Leetcode Tic-Tac-Toe Problems

1275. Find Winner on a Tic Tac Toe Game (Easy)

Tic-tac-toe is played by two players A and B on a 3 x 3 grid.
Here are the rules of Tic-Tac-Toe:
Players take turns placing characters into empty squares (" ").
The first player A always places "X" characters, while the second player B always places "O" characters.
"X" and "O" characters are always placed into empty squares, never on filled ones.
The game ends when there are 3 of the same (non-empty) character filling any row, column, or diagonal.
The game also ends if all squares are non-empty.
No more moves can be played if the game is over. Given an array moves where each element is another array of size 2 corresponding to the row and column of the grid where they mark their respective character in the order in which A and B play.
Return the winner of the game if it exists (A or B), in case the game ends in a draw return "Draw", if there are still movements to play return "Pending".
You can assume that moves is valid (It follows the rules of Tic-Tac-Toe), the grid is initially empty and A will play first.

Example 1:
Input: moves = [[0,0],[2,0],[1,1],[2,1],[2,2]]
Output: "A"
Explanation: "A" wins, he always plays first.
"X  "    "X  "    "X  "    "X  "    "X  "
"   " -> "   " -> " X " -> " X " -> " X "
"   "    "O  "    "O  "    "OO "    "OOX"

Example 2:
Input: moves = [[0,0],[1,1],[0,1],[0,2],[1,0],[2,0]]
Output: "B"
Explanation: "B" wins.
"X  "    "X  "    "XX "    "XXO"    "XXO"    "XXO"
"   " -> " O " -> " O " -> " O " -> "XO " -> "XO "
"   "    "   "    "   "    "   "    "   "    "O  "

Example 3:
Input: moves = [[0,0],[1,1],[2,0],[1,0],[1,2],[2,1],[0,1],[0,2],[2,2]]
Output: "Draw"
Explanation: The game ends in a draw since there are no moves to make.
"XXO"
"OOX"
"XOX"

Example 4:
Input: moves = [[0,0],[1,1]]
Output: "Pending"
Explanation: The game has not finished yet.
"X  "
" O "
"   "

The intuitive solution is to enumerate all 8 possible winning lines: 3 vertical, 3 horizontal and 2 diagonal. We keep 8 variables, one per winning line. A simple trick is to convert the board state into a 3x3 2D array whose cells hold -1, 1 or 0. This way we traverse the board exactly once and compute all 8 variables by summing the corresponding cell values. For example, row[0] covers the first-row winning line and is the sum of the 3 cells in the first row during the board traversal. It indicates a win for the first player only when it equals 3 and a win for the second player when it equals -3.

    # AC
    from typing import List

    class Solution:
        def tictactoe(self, moves: List[List[int]]) -> str:
            # player A ("X") is stored as 1, player B ("O") as -1
            board = [[0] * 3 for _ in range(3)]
            for idx, xy in enumerate(moves):
                player = 1 if idx % 2 == 0 else -1
                board[xy[0]][xy[1]] = player

            row, col = [0, 0, 0], [0, 0, 0]
            diag1, diag2 = 0, 0
            for r in range(3):
                for c in range(3):
                    row[r] += board[r][c]
                    col[c] += board[r][c]
                    if r == c:
                        diag1 += board[r][c]
                    if r + c == 2:
                        diag2 += board[r][c]

            # a line sums to 3 only if "X" owns all three cells, to -3 only if "O" does
            xWin = any(row[r] == 3 for r in range(3)) or any(col[c] == 3 for c in range(3)) or diag1 == 3 or diag2 == 3
            oWin = any(row[r] == -3 for r in range(3)) or any(col[c] == -3 for c in range(3)) or diag1 == -3 or diag2 == -3

            return "A" if xWin else "B" if oWin else "Draw" if len(moves) == 9 else "Pending"

Below we give another AC solution. Despite having more code, it is more efficient than the previous one because, for a given game state, it does not need to visit every cell on the board. How is this achieved? The problem guarantees that each move is valid, so it is sufficient to examine the neighbours of the final move and see whether any line through the final move creates a winning condition. Later we will reuse the code in this solution to build the Tic-Tac-Toe game logic.

    # AC
    from typing import List

    class Solution:
        def checkWin(self, r: int, c: int) -> bool:
            north = self.getConnectedNum(r, c, -1, 0)
            south = self.getConnectedNum(r, c, 1, 0)

            east = self.getConnectedNum(r, c, 0, 1)
            west = self.getConnectedNum(r, c, 0, -1)

            south_east = self.getConnectedNum(r, c, 1, 1)
            north_west = self.getConnectedNum(r, c, -1, -1)

            north_east = self.getConnectedNum(r, c, -1, 1)
            south_west = self.getConnectedNum(r, c, 1, -1)

            if (north + south + 1 >= 3) or (east + west + 1 >= 3) or \
                    (south_east + north_west + 1 >= 3) or (north_east + south_west + 1 >= 3):
                return True
            return False

        def getConnectedNum(self, r: int, c: int, dr: int, dc: int) -> int:
            player = self.board[r][c]
            result = 0
            i = 1
            while True:
                new_r = r + dr * i
                new_c = c + dc * i
                if 0 <= new_r < 3 and 0 <= new_c < 3:
                    if self.board[new_r][new_c] == player:
                        result += 1
                    else:
                        break
                else:
                    break
                i += 1
            return result

        def tictactoe(self, moves: List[List[int]]) -> str:
            self.board = [[0] * 3 for _ in range(3)]
            for idx, xy in enumerate(moves):
                player = 1 if idx % 2 == 0 else -1
                self.board[xy[0]][xy[1]] = player

            # only check last move
            r, c = moves[-1]
            win = self.checkWin(r, c)
            if win:
                return "A" if len(moves) % 2 == 1 else "B"

            return "Draw" if len(moves) == 9 else "Pending"

Leetcode 794. Valid Tic-Tac-Toe State (Medium)

A Tic-Tac-Toe board is given as a string array board. Return True if and only if it is possible to reach this board position during the course of a valid tic-tac-toe game.
The board is a 3 x 3 array, and consists of characters " ", "X", and "O". The " " character represents an empty square.
Here are the rules of Tic-Tac-Toe:
Players take turns placing characters into empty squares (" ").
The first player A always places "X" characters, while the second player B always places "O" characters.
"X" and "O" characters are always placed into empty squares, never on filled ones.
The game ends when there are 3 of the same (non-empty) character filling any row, column, or diagonal.
The game also ends if all squares are non-empty.
No more moves can be played if the game is over.

Example 1:
Input: board = ["O  ", "   ", "   "]
Output: false
Explanation: The first player always plays "X".

Example 2:
Input: board = ["XOX", " X ", "   "]
Output: false
Explanation: Players take turns making moves.

Example 3:
Input: board = ["XXX", "   ", "OOO"]
Output: false

Example 4:
Input: board = ["XOX", "O O", "XOX"]
Output: true

Note: board is a length-3 array of strings, where each string board[i] has length 3.
Each board[i][j] is a character in the set {" ", "X", "O"}.

Surely, it can be solved using DFS, checking whether the given state can be reached from the initial state. However, this involves searching a lot of states. Could we do better? There are obvious properties we can rely on. For example, the number of X is either equal to the number of O or exactly one more. If we can enumerate a set of necessary and sufficient conditions for reachability, we can solve the problem in O(1) time.

    # AC
    from typing import List

    class Solution:

        def convertCell(self, c: str):
            return 1 if c == 'X' else -1 if c == 'O' else 0

        def validTicTacToe(self, board: List[str]) -> bool:
            # turn = (number of X) - (number of O); it must be 0 or 1
            turn = 0
            row, col = [0, 0, 0], [0, 0, 0]
            diag1, diag2 = 0, 0
            for r in range(3):
                for c in range(3):
                    turn += self.convertCell(board[r][c])
                    row[r] += self.convertCell(board[r][c])
                    col[c] += self.convertCell(board[r][c])
                    if r == c:
                        diag1 += self.convertCell(board[r][c])
                    if r + c == 2:
                        diag2 += self.convertCell(board[r][c])

            xWin = any(row[r] == 3 for r in range(3)) or any(col[c] == 3 for c in range(3)) or diag1 == 3 or diag2 == 3
            oWin = any(row[r] == -3 for r in range(3)) or any(col[c] == -3 for c in range(3)) or diag1 == -3 or diag2 == -3
            # X can only win right after X's move, O only right after O's move
            if (xWin and turn == 0) or (oWin and turn == 1):
                return False
            return (turn == 0 or turn == 1) and (not xWin or not oWin)

Leetcode 348. Design Tic-Tac-Toe (Medium, Locked)

Design a Tic-tac-toe game that is played between two players on a n x n grid.
You may assume the following rules:
A move is guaranteed to be valid and is placed on an empty block.
Once a winning condition is reached, no more moves is allowed.
A player who succeeds in placing n of their marks in a horizontal, vertical, or diagonal row wins the game.

Example:
Given n = 3, assume that player 1 is "X" and player 2 is "O" in the board.
TicTacToe toe = new TicTacToe(3);

toe.move(0, 0, 1); -> Returns 0 (no one wins)
|X| | |
| | | | // Player 1 makes a move at (0, 0).
| | | |

toe.move(0, 2, 2); -> Returns 0 (no one wins)
|X| |O|
| | | | // Player 2 makes a move at (0, 2).
| | | |

toe.move(2, 2, 1); -> Returns 0 (no one wins)
|X| |O|
| | | | // Player 1 makes a move at (2, 2).
| | |X|

toe.move(1, 1, 2); -> Returns 0 (no one wins)
|X| |O|
| |O| | // Player 2 makes a move at (1, 1).
| | |X|

toe.move(2, 0, 1); -> Returns 0 (no one wins)
|X| |O|
| |O| | // Player 1 makes a move at (2, 0).
|X| |X|

toe.move(1, 0, 2); -> Returns 0 (no one wins)
|X| |O|
|O|O| | // Player 2 makes a move at (1, 0).
|X| |X|

toe.move(2, 1, 1); -> Returns 1 (player 1 wins)
|X| |O|
|O|O| | // Player 1 makes a move at (2, 1).
|X|X|X|

Follow up:
Could you do better than O(n^2) per move() operation?

348 is a locked problem. For each player's move, we could reuse the checkWin function from the second solution to 1275. Here we show another solution, based on the first solution to 1275, in which the winning-line counters are kept as state and each move touches only the few counters associated with it.

    # AC
    class TicTacToe:

        def __init__(self, n: int):
            """
            Initialize your data structure here.
            :type n: int
            """
            self.row, self.col, self.diag1, self.diag2, self.n = [0] * n, [0] * n, 0, 0, n

        def move(self, row: int, col: int, player: int) -> int:
            """
            Player {player} makes a move at ({row}, {col}).
            @param row The row of the board.
            @param col The column of the board.
            @param player The player, can be either 1 or 2.
            @return The current winning condition, can be either:
                    0: No one wins.
                    1: Player 1 wins.
                    2: Player 2 wins.
            """
            if player == 2:
                player = -1

            self.row[row] += player
            self.col[col] += player
            if row == col:
                self.diag1 += player
            if row + col == self.n - 1:
                self.diag2 += player

            if self.n in [self.row[row], self.col[col], self.diag1, self.diag2]:
                return 1
            if -self.n in [self.row[row], self.col[col], self.diag1, self.diag2]:
                return 2
            return 0


Optimal Strategy of Tic-Tac-Toe

Tic-tac-toe and Gomoku (Connect Five in a Row) share the same rules and are generally considered m,n,k-games, where the board size is m x n and the winning condition is k stones in a row.

The ConnectNGame class implements an m,n,k-game on a square MxM board. It encapsulates the logic of checking each move and can also undo the last move, which facilitates backtracking in the game search algorithms later.

ConnectNGame

    class ConnectNGame:

        PLAYER_A = 1
        PLAYER_B = -1
        AVAILABLE = 0
        RESULT_TIE = 0
        RESULT_A_WIN = 1
        RESULT_B_WIN = -1

        def __init__(self, N: int = 3, board_size: int = 3):
            assert N <= board_size
            self.N = N
            self.board_size = board_size
            self.board = [[ConnectNGame.AVAILABLE] * board_size for _ in range(board_size)]
            self.gameOver = False
            self.gameResult = None
            self.currentPlayer = ConnectNGame.PLAYER_A
            self.remainingPosNum = board_size * board_size
            self.actionStack = []

        def move(self, r: int, c: int) -> int:
            """
            Place the current player's stone at (r, c).

            :param r: row index of the move
            :param c: column index of the move
            :return: None: game ongoing; otherwise the game result
            """
            assert self.board[r][c] == ConnectNGame.AVAILABLE
            self.board[r][c] = self.currentPlayer
            self.actionStack.append((r, c))
            self.remainingPosNum -= 1
            if self.checkWin(r, c):
                self.gameOver = True
                self.gameResult = self.currentPlayer
                return self.currentPlayer
            if self.remainingPosNum == 0:
                self.gameOver = True
                self.gameResult = ConnectNGame.RESULT_TIE
                return ConnectNGame.RESULT_TIE
            self.currentPlayer *= -1

        def undo(self):
            if len(self.actionStack) > 0:
                lastAction = self.actionStack.pop()
                r, c = lastAction
                self.board[r][c] = ConnectNGame.AVAILABLE
                self.currentPlayer = ConnectNGame.PLAYER_A if len(self.actionStack) % 2 == 0 else ConnectNGame.PLAYER_B
                self.remainingPosNum += 1
                self.gameOver = False
                self.gameResult = None
            else:
                raise Exception('No lastAction')

        def getAvailablePositions(self) -> List[Tuple[int, int]]:
            return [(i, j) for i in range(self.board_size) for j in range(self.board_size) if self.board[i][j] == ConnectNGame.AVAILABLE]

        def getStatus(self) -> Tuple[Tuple[int, ...]]:
            return tuple([tuple(self.board[i]) for i in range(self.board_size)])

Note that the checkWin code, omitted here, is identical to the second solution to 1275.
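
For boards larger than 3x3 the same idea only needs the constant 3 replaced by the board size and by N. The following is a minimal standalone sketch of such a generalisation; the function names count_dir and check_win and their signatures are assumptions for illustration, not the methods of ConnectNGame. It assumes the cell at (r, c) is already occupied.

```python
from typing import List


def count_dir(board: List[List[int]], r: int, c: int, dr: int, dc: int) -> int:
    """Count consecutive stones equal to board[r][c], walking away from (r, c) in direction (dr, dc)."""
    size, player, n = len(board), board[r][c], 0
    r, c = r + dr, c + dc
    while 0 <= r < size and 0 <= c < size and board[r][c] == player:
        n += 1
        r, c = r + dr, c + dc
    return n


def check_win(board: List[List[int]], r: int, c: int, n: int) -> bool:
    """True if the stone at (r, c) lies on a line of at least n equal stones."""
    # Four axes: vertical, horizontal and the two diagonals; each is walked both ways.
    for dr, dc in ((1, 0), (0, 1), (1, 1), (1, -1)):
        if count_dir(board, r, c, dr, dc) + count_dir(board, r, c, -dr, -dc) + 1 >= n:
            return True
    return False


board = [
    [1, 1, 1, 1, 0],
    [0, -1, 0, 0, 0],
    [0, 0, -1, 0, 0],
    [0, 0, 0, -1, 0],
    [0, 0, 0, 0, 0],
]
print(check_win(board, 0, 2, 4), check_win(board, 2, 2, 4))
```

Because only the lines through the last move are inspected, the cost per move is proportional to N rather than to the number of cells on the board.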

Minimax Strategy

Now that we have the Connect-N game logic, let's finish the minimax algorithm to solve the game.

We define a generic strategy base class whose action method must be overridden. The action method takes a ConnectNGame instance describing the current game state and returns a tuple of 2 elements. The first element is the estimated or exact game result after taking the action specified by the second element. The second element has the form Tuple[int, int] and denotes the position of the move, for instance (1, 1).

    class Strategy(ABC):

        def __init__(self):
            super().__init__()

        @abstractmethod
        def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
            pass

The MinimaxStrategy code is very similar to the previous minimax algorithms. The only added piece is the corresponding move returned by the action method.
    class MinimaxStrategy(Strategy):
        def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
            self.game = copy.deepcopy(game)
            result, move = self.minimax()
            return result, move

        def minimax(self) -> Tuple[int, Tuple[int, int]]:
            game = self.game
            bestMove = None
            assert not game.gameOver
            if game.currentPlayer == ConnectNGame.PLAYER_A:
                ret = -math.inf
                for pos in game.getAvailablePositions():
                    move = pos
                    result = game.move(*pos)
                    if result is None:
                        assert not game.gameOver
                        result, oppMove = self.minimax()
                    game.undo()
                    ret = max(ret, result)
                    bestMove = move if ret == result else bestMove
                    if ret == 1:
                        return 1, move
                return ret, bestMove
            else:
                ret = math.inf
                for pos in game.getAvailablePositions():
                    move = pos
                    result = game.move(*pos)
                    if result is None:
                        assert not game.gameOver
                        result, oppMove = self.minimax()
                    game.undo()
                    ret = min(ret, result)
                    bestMove = move if ret == result else bestMove
                    if ret == -1:
                        return -1, move
                return ret, bestMove

We plot up to the first 2 moves with the code above. The first player O has 9 possible positions but, due to symmetry, only 3 kinds of moves, which we call corner, edge and center, respectively. The following graph shows that whichever of the 9 positions the first player takes, the best result is a draw. So the solution of tic-tac-toe is a draw.
Tic-tac-toe 9 First Step
Plot first step of 3 kinds of moves one by one below.
Tic-tac-toe First Step Corner
Tic-tac-toe First Step Edge
Tic-tac-toe First Step Center

Tic-tac-toe Solution and Number of States

An interesting question is the number of game states of tic-tac-toe. A loose upper bound is \(3^9=19683\), which includes many unreachable states. The article "Tic-Tac-Toe (Naughts and Crosses, Cheese and Crackers, etc.)" lists the number of states after each move. The total number is 5478.

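| Moves | Positions | Terminal Positions |
|-------|-----------|--------------------|
| 0     | 1         |                    |
| 1     | 9         |                    |
| 2     | 72        |                    |
| 3     | 252       |                    |
| 4     | 756       |                    |
| 5     | 1260      | 120                |
| 6     | 1520      | 148                |
| 7     | 1140      | 444                |
| 8     | 390       | 168                |
| 9     | 78        | 78                 |
| Total | 5478      | 958                |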
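
The per-move counts can be reproduced independently of the minimax code with a breadth-first enumeration. The sketch below is a standalone illustration that assumes a flat 9-tuple board encoding; is_terminal and count_by_ply are hypothetical helper names, not functions from the repository.

```python
from typing import List, Set, Tuple

State = Tuple[int, ...]  # 9 cells, row-major; 1 = first player, -1 = second, 0 = empty

LINES = [(0, 1, 2), (3, 4, 5), (6, 7, 8),
         (0, 3, 6), (1, 4, 7), (2, 5, 8),
         (0, 4, 8), (2, 4, 6)]


def is_terminal(s: State) -> bool:
    """A position is terminal if someone has three in a row or the board is full."""
    won = any(s[a] != 0 and s[a] == s[b] == s[c] for a, b, c in LINES)
    return won or 0 not in s


def count_by_ply() -> Tuple[List[int], List[int]]:
    """Return (positions, terminal positions) reachable after 0..9 moves."""
    frontier: Set[State] = {(0,) * 9}
    positions: List[int] = []
    terminals: List[int] = []
    for ply in range(10):
        player = 1 if ply % 2 == 0 else -1
        positions.append(len(frontier))
        terminals.append(sum(1 for s in frontier if is_terminal(s)))
        nxt: Set[State] = set()
        for s in frontier:
            if is_terminal(s):
                continue  # play stops here, so terminal states are not expanded
            for i in range(9):
                if s[i] == 0:
                    nxt.add(s[:i] + (player,) + s[i + 1:])
        frontier = nxt
    return positions, terminals


positions, terminals = count_by_ply()
print(positions, sum(positions))
print(terminals, sum(terminals))
```

Using a set per move number is what removes duplicates: the same position reached through different move orders is counted once.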

We can verify the number by slightly changing the existing code into the code below.


    class CountingMinimaxStrategy(Strategy):
        def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
            self.game = copy.deepcopy(game)
            self.dpMap = {}
            result, move = self.minimax(game.getStatus())
            return result, move

        def minimax(self, gameStatus: Tuple[Tuple[int, ...]]) -> Tuple[int, Tuple[int, int]]:
            # print(f'Current {len(strategy.dpMap)}')

            if gameStatus in self.dpMap:
                return self.dpMap[gameStatus]

            game = self.game
            bestMove = None
            assert not game.gameOver
            if game.currentPlayer == ConnectNGame.PLAYER_A:
                ret = -math.inf
                for pos in game.getAvailablePositions():
                    move = pos
                    result = game.move(*pos)
                    if result is None:
                        assert not game.gameOver
                        result, oppMove = self.minimax(game.getStatus())
                        self.dpMap[game.getStatus()] = result, oppMove
                    else:
                        self.dpMap[game.getStatus()] = result, move
                    game.undo()
                    ret = max(ret, result)
                    bestMove = move if ret == result else bestMove
                self.dpMap[gameStatus] = ret, bestMove
                return ret, bestMove
            else:
                ret = math.inf
                for pos in game.getAvailablePositions():
                    move = pos
                    result = game.move(*pos)

                    if result is None:
                        assert not game.gameOver
                        result, oppMove = self.minimax(game.getStatus())
                        self.dpMap[game.getStatus()] = result, oppMove
                    else:
                        self.dpMap[game.getStatus()] = result, move
                    game.undo()
                    ret = min(ret, result)
                    bestMove = move if ret == result else bestMove
                self.dpMap[gameStatus] = ret, bestMove
                return ret, bestMove


    if __name__ == '__main__':
        tic_tac_toe = ConnectNGame(N=3, board_size=3)
        strategy = CountingMinimaxStrategy()
        strategy.action(tic_tac_toe)
        print(f'Game States Number {len(strategy.dpMap)}')

Running the code confirms that the total number is 5478. The results for some small-scale game configurations are listed below.

|     | 3x3         | 4x4            |
|-----|-------------|----------------|
| k=3 | 5478 (Draw) | 6035992 (Win)  |
| k=4 |             | 9722011 (Draw) |
| k=5 |             |                |

According to the Wikipedia article on the m,n,k-game, below are the results for some game configurations.

|     | 3x3  | 4x4  | 5x5  | 6x6  |
|-----|------|------|------|------|
| k=3 | Draw | Win  | Win  | Win  |
| k=4 |      | Draw | Draw | Win  |
| k=5 |      |      | Draw | Draw |

It's worth mentioning that Gomoku (Connect Five in a Row) on a board of size MxM >= 15x15 was proved by L. Victor Allis to be a Win for the first player.

Alpha-Beta Pruning Strategy

Alpha Beta Pruning Strategy is pasted below.

class AlphaBetaStrategy(Strategy):
    def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
        self.game = game
        result, move = self.alpha_beta(self.game.getStatus(), -math.inf, math.inf)
        return result, move

    def alpha_beta(self, gameStatus: Tuple[Tuple[int, ...]], alpha: int = None, beta: int = None) -> Tuple[int, Tuple[int, int]]:
        game = self.game
        bestMove = None
        assert not game.gameOver
        if game.currentPlayer == ConnectNGame.PLAYER_A:
            ret = -math.inf
            for pos in game.getAvailablePositions():
                move = pos
                result = game.move(*pos)
                if result is None:
                    assert not game.gameOver
                    result, oppMove = self.alpha_beta(game.getStatus(), alpha, beta)
                game.undo()
                alpha = max(alpha, result)
                ret = max(ret, result)
                bestMove = move if ret == result else bestMove
                if alpha >= beta or ret == 1:
                    return ret, move
            return ret, bestMove
        else:
            ret = math.inf
            for pos in game.getAvailablePositions():
                move = pos
                result = game.move(*pos)
                if result is None:
                    assert not game.gameOver
                    result, oppMove = self.alpha_beta(game.getStatus(), alpha, beta)
                game.undo()
                beta = min(beta, result)
                ret = min(ret, result)
                bestMove = move if ret == result else bestMove
                if alpha >= beta or ret == -1:
                    return ret, move
            return ret, bestMove

Next we rewrite alpha-beta pruning with DP. The alpha and beta parameters are omitted from alpha_beta_dp because lru_cache builds its cache key from all arguments and cannot be told which of them are the effective parameters. Instead, we keep alpha and beta in a stack variable and maintain the stack in step with the alpha_beta_dp call stack.

class AlphaBetaDPStrategy(Strategy):
    def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
        self.game = game
        self.alphaBetaStack = [(-math.inf, math.inf)]
        result, move = self.alpha_beta_dp(self.game.getStatus())
        return result, move

    @lru_cache(maxsize=None)
    def alpha_beta_dp(self, gameStatus: Tuple[Tuple[int, ...]]) -> Tuple[int, Tuple[int, int]]:
        # alpha and beta come from the stack instead of the argument list
        alpha, beta = self.alphaBetaStack[-1]
        game = self.game
        bestMove = None
        assert not game.gameOver
        if game.currentPlayer == ConnectNGame.PLAYER_A:
            ret = -math.inf
            for pos in game.getAvailablePositions():
                move = pos
                result = game.move(*pos)
                if result is None:
                    assert not game.gameOver
                    self.alphaBetaStack.append((alpha, beta))
                    result, oppMove = self.alpha_beta_dp(game.getStatus())
                    self.alphaBetaStack.pop()
                game.undo()
                alpha = max(alpha, result)
                ret = max(ret, result)
                bestMove = move if ret == result else bestMove
                if alpha >= beta or ret == 1:
                    return ret, move
            return ret, bestMove
        else:
            ret = math.inf
            for pos in game.getAvailablePositions():
                move = pos
                result = game.move(*pos)
                if result is None:
                    assert not game.gameOver
                    self.alphaBetaStack.append((alpha, beta))
                    result, oppMove = self.alpha_beta_dp(game.getStatus())
                    self.alphaBetaStack.pop()
                game.undo()
                beta = min(beta, result)
                ret = min(ret, result)
                bestMove = move if ret == result else bestMove
                if alpha >= beta or ret == -1:
                    return ret, move
            return ret, bestMove

In this series, we deal with zero-sum turn-based board game algorithms, a subtype of combinatorial games. We start off with small search space problems, introduce classic algorithms and the corresponding combinatorial game theory, and ultimately end with modern approximating Deep RL techniques. From there, once the stepping stones are laid, we are able to learn and appreciate how AlphaGo works. In this first episode, we illustrate 3 classic gaming problems in leetcode and solve them from a brute force version to a DP version, then finally rewrite them using classic gaming algorithms, minimax and alpha-beta pruning.

Leetcode 292 Nim Game (Easy)

Let's start with an easy Leetcode gaming problem, Leetcode 292 Nim Game.

You are playing the following Nim Game with your friend: There is a heap of stones on the table, each time one of you take turns to remove 1 to 3 stones. The one who removes the last stone will be the winner. You will take the first turn to remove the stones.
Both of you are very clever and have optimal strategies for the game. Write a function to determine whether you can win the game given the number of stones in the heap.

Example:
Input: 4
Output: false
Explanation: If there are 4 stones in the heap, then you will never win the game;
No matter 1, 2, or 3 stones you remove, the last stone will always be removed by your friend.

Let \(f(n)\) be the result, either Win or Lose, when it is your turn to make an optimal move with \(n\) stones left. The first non-trivial case is \(f(4)\). Playing optimally means that if any move leads to Win, you will definitely choose it. So you try removing 1, 2 or 3 stones and see whether your opponent, facing the remaining stones, has any chance to win. Obviously, \(f(1) = f(2) = f(3) = Win\). Therefore, \(f(4)\) is guaranteed to be Lose. Generally, the recurrence relation is given by \[ f(n) = \neg (f(n-1) \land f(n-2) \land f(n-3)) \]

This translates straightforwardly to following Python 3 code.
# TLE
# Time Complexity: O(exponential)
class Solution_BruteForce:

    def canWinNim(self, n: int) -> bool:
        if n <= 3:
            return True
        for i in range(1, 4):
            if not self.canWinNim(n - i):
                return True
        return False

Since this brute force version recurses in the same manner as the naive Fibonacci computation, its complexity is exponential and it won't pass the tests. This can be visually verified by the following call graph. Notice that node 5 is expanded entirely twice and node 4 is expanded 4 times.
292 Nim Game Brute Force Call Graph, n=7
As when optimizing the computation of Fibonacci numbers, we cache the results for smaller numbers and compute larger values based on previous ones. In Python, we can achieve the DP cache effect by merely adding one line, the magical decorator lru_cache. In this way, runtime complexity is drastically reduced to \(O(N)\).
# RecursionError: maximum recursion depth exceeded in comparison n=1348820612
# Time Complexity: O(N)
from functools import lru_cache

class Solution_DP:
    @lru_cache(maxsize=None)
    def canWinNim(self, n: int) -> bool:
        if n <= 3:
            return True
        for i in range(1, 4):
            if not self.canWinNim(n - i):
                return True
        return False

Plotting the call graph below helps to verify that. This time, node 5 and 4 are not explored to bottom multiple times. The green node denotes such cache hit.
292 Nim Game DP Call Graph, n=7

However, for this problem, lru_cache is not enough to AC because for large n, such as 1348820612, the implementation suffers from stack overflow. We can, of course, rewrite it as an iterative forward loop, but that still gets TLE.

# TLE for 1348820612
# Time Complexity: O(N)
class Solution:
    def canWinNim(self, n: int) -> bool:
        if n <= 3:
            return True
        # results for the three previous stone counts
        last3, last2, last1 = True, True, True
        for i in range(4, n+1):
            this = not (last3 and last2 and last1)
            last3, last2, last1 = last2, last1, this
        return last1

So AC code requires sublinear complexity at most. The last version also gives us some intuition that Win and Lose may repeat with a period of 4. Actually, if you list all \(f(n)\) one by one, it's obvious that any \(n \mod 4 = 0\) leads to Lose and the other cases lead to Win. Why? Suppose you start with \(4k+i\ (i=1,2,3)\) stones. You can always remove \(i\) stones and leave \(4k\) stones to your opponent. Whatever he removes, you are handed back a situation \(4k_1 + i_1\ (i_1 = 1,2,3)\) with \(k_1 = k - 1\). This pattern repeats until you are left with 1, 2 or 3 stones, which you remove to win.

Win Lose Distribution
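The period-4 claim can be checked against the recurrence for small \(n\). The following is a minimal sketch; the helper name `nim_table` is illustrative and not from the original solutions.

```python
from typing import List


def nim_table(limit: int) -> List[bool]:
    """f[n] is True when the player to move wins with n stones (index 0 is unused)."""
    f = [False] * (limit + 1)
    for n in range(1, limit + 1):
        if n <= 3:
            f[n] = True
        else:
            # f(n) = not (f(n-1) and f(n-2) and f(n-3))
            f[n] = not (f[n - 1] and f[n - 2] and f[n - 3])
    return f


table = nim_table(40)
losing = [n for n in range(1, 41) if not table[n]]
print(losing)  # every losing stone count up to 40
```

The losing counts printed are exactly the multiples of 4 in the range, which is what the closed form `n % 4 != 0` encodes.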

Below is one liner AC version.

# AC
# Time Complexity: O(1)
class Solution:
    def canWinNim(self, n: int) -> bool:
        return not (n % 4 == 0)

Leetcode 486 Predict the Winner (Medium)

Let's exercise a harder problem, Leetcode 486 Predict the Winner.

Given an array of scores that are non-negative integers. Player 1 picks one of the numbers from either end of the array followed by the player 2 and then player 1 and so on. Each time a player picks a number, that number will not be available for the next player. This continues until all the scores have been chosen. The player with the maximum score wins.
Given an array of scores, predict whether player 1 is the winner. You can assume each player plays to maximize his score.

Example 1:
Input: [1, 5, 2]
Output: False
Explanation: Initially, player 1 can choose between 1 and 2.
If he chooses 2 (or 1), then player 2 can choose from 1 (or 2) and 5. If player 2 chooses 5, then player 1 will be left with 1 (or 2).
So, final score of player 1 is 1 + 2 = 3, and player 2 is 5.
Hence, player 1 will never be the winner and you need to return False.

Example 2:
Input: [1, 5, 233, 7]
Output: True
Explanation: Player 1 first chooses 1. Then player 2 have to choose between 5 and 7. No matter which number player 2 choose, player 1 can choose 233.
Finally, player 1 has more score (234) than player 2 (12), so you need to return True representing player1 can win.

A player can choose the leftmost or the rightmost number and leave the remaining array to his opponent. Let us define maxDiff(l, r) to be the maximum score difference the current player can get when facing the subarray \([l, r]\).

\[ \operatorname{maxDiff}(l, r) = \max \begin{cases} nums[l] - \operatorname{maxDiff}(l + 1, r) \\ nums[r] - \operatorname{maxDiff}(l, r - 1) \end{cases} \]
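As a worked instance of the recurrence, take Example 1, nums = [1, 5, 2], with 0-based indices and the single-element base case \(\operatorname{maxDiff}(i, i) = nums[i]\) (the same base case the code uses):

\[ \begin{aligned} \operatorname{maxDiff}(1, 2) &= \max(5 - 2,\ 2 - 5) = 3 \\ \operatorname{maxDiff}(0, 1) &= \max(1 - 5,\ 5 - 1) = 4 \\ \operatorname{maxDiff}(0, 2) &= \max(1 - 3,\ 2 - 4) = -2 \end{aligned} \]

Since \(\operatorname{maxDiff}(0, 2) < 0\), player 1 cannot win, which agrees with the expected output False.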

Runtime complexity can be written as following recurrence. \[ f(n) = 2f(n-1) = O(2^n) \]

Surprisingly, this time brute force version passed, but on the edge of rejection (6300ms).

# AC
# Time Complexity: O(2^N)
# Slow: 6300ms
from typing import List

class Solution:

    def maxDiff(self, l: int, r: int) -> int:
        if l == r:
            return self.nums[l]
        return max(self.nums[l] - self.maxDiff(l + 1, r), self.nums[r] - self.maxDiff(l, r - 1))

    def PredictTheWinner(self, nums: List[int]) -> bool:
        self.nums = nums
        return self.maxDiff(0, len(nums) - 1) >= 0

Exponential runtime complexity can also be verified by call graph below.
486 Predict the Winner Brute Force Call Graph, n=4

Again, be aware that we have repeated computation over the same node. For example, the [1-2] node is expanded entirely for the second time when going from the root to the right node. Applying the same lru_cache trick, a one-line decorator on maxDiff, we pass again, with runtime complexity \(O(n^2)\) and running time 43ms. A trivial change but a substantial improvement!

# AC
# Time Complexity: O(N^2)
# Fast: 43ms
from functools import lru_cache
from typing import List

class Solution:

    @lru_cache(maxsize=None)
    def maxDiff(self, l: int, r: int) -> int:
        if l == r:
            return self.nums[l]
        return max(self.nums[l] - self.maxDiff(l + 1, r), self.nums[r] - self.maxDiff(l, r - 1))

    def PredictTheWinner(self, nums: List[int]) -> bool:
        self.nums = nums
        return self.maxDiff(0, len(nums) - 1) >= 0

Taking look at DP version call graph, this time, [1-2] node is not re-computed in right branch.
486 Predict the Winner DP Call Graph, n=4

Leetcode 464 Can I Win (Medium)

A similar but slightly more difficult problem is Leetcode 464 Can I Win, where a bit mask is employed together with DP.

In the "100 game," two players take turns adding, to a running total, any integer from 1..10. The player who first causes the running total to reach or exceed 100 wins.
What if we change the game so that players cannot re-use integers?
For example, two players might take turns drawing from a common pool of numbers of 1..15 without replacement until they reach a total >= 100.
Given an integer maxChoosableInteger and another integer desiredTotal, determine if the first player to move can force a win, assuming both players play optimally.
You can always assume that maxChoosableInteger will not be larger than 20 and desiredTotal will not be larger than 300.

Example
Input:
maxChoosableInteger = 10
desiredTotal = 11
Output:
false
Explanation:
No matter which integer the first player choose, the first player will lose.
The first player can choose an integer from 1 up to 10.
If the first player choose 1, the second player can only choose integers from 2 up to 10.
The second player will win by choosing 10 and get a total = 11, which is >= desiredTotal.
Same with other integers chosen by the first player, the second player will always win.

# AC
# Time Complexity: O(2^m * m), m: maxChoosableInteger
from functools import lru_cache

class Solution:
    @lru_cache(maxsize=None)
    def recurse(self, status: int, currentTotal: int) -> bool:
        for i in range(1, self.maxChoosableInteger + 1):
            # bit i of status is set once integer i has been used
            if not (status >> i & 1):
                new_status = 1 << i | status
                if currentTotal + i >= self.desiredTotal:
                    return True
                if not self.recurse(new_status, currentTotal + i):
                    return True
        return False

    def canIWin(self, maxChoosableInteger: int, desiredTotal: int) -> bool:
        self.maxChoosableInteger = maxChoosableInteger
        self.desiredTotal = desiredTotal

        # integer sum of 1..m; if it cannot reach desiredTotal, nobody can win
        total = maxChoosableInteger * (maxChoosableInteger + 1) // 2
        if total < desiredTotal:
            return False
        return self.recurse(0, 0)

There are \(2^m\) states and for each state we need to probe at most \(m\) options, so the overall runtime complexity is \(O(m 2^m)\), where \(m\) is maxChoosableInteger.
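The \(2^m\) states come from encoding the set of used integers as the bits of a single int. A minimal sketch of that encoding follows; the helper names `is_used` and `mark_used` are illustrative and do not appear in the solution above, which inlines the same expressions.

```python
def is_used(status: int, i: int) -> bool:
    """True when bit i of status is set, i.e. integer i was already chosen."""
    return bool(status >> i & 1)


def mark_used(status: int, i: int) -> int:
    """Return a new status with bit i set."""
    return status | 1 << i


status = 0                     # nothing chosen yet
status = mark_used(status, 3)  # choose 3
status = mark_used(status, 1)  # choose 1
print(bin(status))             # 0b1010: bits 1 and 3 are set
```

Because the status is a plain int, it is hashable and can serve directly as an lru_cache key.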

Minimax Algorithm

Up till now, we've seen several zero-sum turn-based games in leetcode. In fact, there is a more general algorithm for this type of game, named the minimax algorithm with alternate moves. The general setting is that two players play in turn. The first player tries to maximize the game value and the second player tries to minimize it. For example, the following graph shows all nodes, each labelled by its value. Computing from the bottom up, the first player (max) can get optimal value -7, assuming both players play optimally.

Wikipedia Minimax Example

Pseudo code in Python 3 is listed below.

import math

# Node, is_terminal and evaluate_terminal are placeholders for a concrete game.
def minimax(node: Node, depth: int, maximizingPlayer: bool) -> int:
    if depth == 0 or is_terminal(node):
        return evaluate_terminal(node)
    if maximizingPlayer:
        value = -math.inf
        for child in node:
            value = max(value, minimax(child, depth - 1, False))
        return value
    else:  # minimizing player
        value = math.inf
        for child in node:
            value = min(value, minimax(child, depth - 1, True))
        return value
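To make the pseudo code concrete, here is a minimal runnable sketch on a toy tree. It assumes a tree is given as nested Python lists whose leaves are integer values, and it drops the depth limit because the toy tree is searched completely. The name `minimax_tree` and the tree itself are illustrative; this is not the Wikipedia example.

```python
from typing import List, Union

# A leaf is its value; an inner node is the list of its children.
Tree = Union[int, List["Tree"]]


def minimax_tree(node: Tree, maximizing: bool) -> int:
    if isinstance(node, int):
        return node  # terminal node: its value is the game value
    values = [minimax_tree(child, not maximizing) for child in node]
    return max(values) if maximizing else min(values)


# The root is a max node with two min children.
tree = [[3, 5], [2, 9]]
print(minimax_tree(tree, True))  # 3 = max(min(3, 5), min(2, 9))
```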

Minimax: 486 Predict the Winner

We know leetcode 486 Predict the Winner is a zero-sum turn-based game. Hence, theoretically, we can come up with a minimax algorithm for it. The difficulty lies in how to define the value or utility. In the previous section, we defined maxDiff(l, r) to be the maximum difference for the current player, who is left with subarray \([l, r]\). In the most basic case, where only one element x is left, it's intuitive to define +x for the max player and -x for the min player. Merging this with the minimax algorithm, it naturally follows that the total reward of the max player is \(+a_1 + a_2 + ... = A\) and the reward of the min player is \(-b_1 - b_2 - ... = -B\); the max player aims for \(\max(A-B)\) while the min player aims for \(\min(A-B)\). With that in mind, the code is not hard to implement.

# AC
from functools import lru_cache
from typing import List

class Solution:
    # max_player: max(A - B)
    # min_player: min(A - B)
    @lru_cache(maxsize=None)
    def minimax(self, l: int, r: int, isMaxPlayer: bool) -> int:
        if l == r:
            return self.nums[l] * (1 if isMaxPlayer else -1)

        if isMaxPlayer:
            return max(
                self.nums[l] + self.minimax(l + 1, r, not isMaxPlayer),
                self.nums[r] + self.minimax(l, r - 1, not isMaxPlayer))
        else:
            return min(
                -self.nums[l] + self.minimax(l + 1, r, not isMaxPlayer),
                -self.nums[r] + self.minimax(l, r - 1, not isMaxPlayer))

    def PredictTheWinner(self, nums: List[int]) -> bool:
        self.nums = nums
        v = self.minimax(0, len(nums) - 1, True)
        return v >= 0

Minimax 486 Case [1, 5, 2, 4]

Minimax: 464 Can I Win

For this problem, as is common in other win-lose-tie games without intermediate intrinsic value, it's typical to define +1 when the max player wins, -1 when the min player wins and 0 for a tie. Note the shortcut case for both players. For example, the max player can report Win (value=1) once he finds that the winning condition (>=desiredTotal) is satisfied while enumerating his possible moves. This makes sense because once he gets 1 while maximizing, no further probing can return a larger value. The same optimization is generalized in the next improved algorithm, alpha-beta pruning.

# AC
import math
from functools import lru_cache

class Solution:
    # self.maxChoosableInteger, self.desiredTotal and self.allUsed are read here
    # but set elsewhere (the caller is not shown in this excerpt).
    # precondition: currentTotal < desiredTotal
    @lru_cache(maxsize=None)
    def minimax(self, status: int, currentTotal: int, isMaxPlayer: bool) -> int:
        if status == self.allUsed:
            return 0  # draw: no winner

        if isMaxPlayer:
            value = -math.inf
            for i in range(1, self.maxChoosableInteger + 1):
                if not (status >> i & 1):
                    new_status = 1 << i | status
                    if currentTotal + i >= self.desiredTotal:
                        return 1  # shortcut
                    value = max(value, self.minimax(new_status, currentTotal + i, not isMaxPlayer))
                    if value == 1:
                        return 1
            return value
        else:
            value = math.inf
            for i in range(1, self.maxChoosableInteger + 1):
                if not (status >> i & 1):
                    new_status = 1 << i | status
                    if currentTotal + i >= self.desiredTotal:
                        return -1  # shortcut
                    value = min(value, self.minimax(new_status, currentTotal + i, not isMaxPlayer))
                    if value == -1:
                        return -1
            return value
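The listing above does not show how the search is started. The following is a self-contained sketch of the same +1 / 0 / -1 minimax with a driver. It assumes that "all integers used" is a bit mask with bits 1..m set and that the first player wins exactly when the root value is +1. The function name `can_i_win` and the closure layout are illustrative, not the author's code.

```python
from functools import lru_cache


def can_i_win(max_choosable: int, desired_total: int) -> bool:
    # Assumption: bit i (1 <= i <= m) marks integer i as used; bit 0 is never set.
    all_used = (1 << (max_choosable + 1)) - 2

    @lru_cache(maxsize=None)
    def minimax(status: int, current_total: int, is_max_player: bool) -> int:
        if status == all_used:
            return 0  # every integer is used and nobody reached the total: draw
        win = 1 if is_max_player else -1
        best = -win  # worst possible outcome for the player to move
        for i in range(1, max_choosable + 1):
            if status >> i & 1:
                continue  # integer i is already taken
            if current_total + i >= desired_total:
                return win  # shortcut: this move wins immediately
            value = minimax(status | 1 << i, current_total + i, not is_max_player)
            best = max(best, value) if is_max_player else min(best, value)
            if best == win:
                return win  # shortcut: nothing can beat a forced win
        return best

    return minimax(0, 0, True) == 1


print(can_i_win(10, 11))  # False, as in the problem's example
```

A draw (value 0) is reported as False here, which matches the earlier DP solution returning False when the integers cannot reach desiredTotal.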

Alpha-Beta Pruning

We sensed there is room for optimization during searching, as illustrated in the 464 Can I Win minimax algorithm. Let's formalize this idea, called alpha-beta pruning. For each node, we maintain two values, alpha and beta, which represent the minimum score that the maximizing player is assured of and the maximum score that the minimizing player is assured of, respectively. The root node has initial alpha = −∞ and beta = +∞, forming the valid interval [−∞, +∞]. During top-down traversal, a child node inherits the interval [alpha, beta] from its parent node. If the updated alpha or beta in the child node no longer forms a valid interval, the branch can be pruned and the node returns immediately. Take the following example from Wikimedia.

  1. Root node, initially: alpha = −∞, beta = +∞

  2. Root node, after 4 is returned, alpha = 4, beta = +∞

  3. Root node, after 5 is returned, alpha = 5, beta = +∞

  4. Rightmost Min node, initially: alpha = 5, beta = +∞

  5. Rightmost Min node, after 1 is returned: alpha = 5, beta = 1

Here we see [5, 1] is no longer a valid interval, so the node returns without further probing its 2nd and 3rd children. Why? If another child returns a value > 1, say 2, the min node still settles on 1, the smaller value it has already found. If another child returns a value < 1, the node's value will be abandoned by the root node, a max node, which is already guaranteed a value >= 5. So in this situation, whatever the other children return does not change the result.

Wikimedia Alpha Beta Pruning Example

Pseudo code in Python 3 is listed below.

import math

# Node, is_terminal and evaluate_terminal are placeholders for a concrete game.
def alpha_beta(node: Node, depth: int, α: int, β: int, maximizingPlayer: bool) -> int:
    if depth == 0 or is_terminal(node):
        return evaluate_terminal(node)
    if maximizingPlayer:
        value = -math.inf
        for child in node:
            value = max(value, alpha_beta(child, depth - 1, α, β, False))
            α = max(α, value)
            if α >= β:
                break  # β cut-off
        return value
    else:
        value = math.inf
        for child in node:
            value = min(value, alpha_beta(child, depth - 1, α, β, True))
            β = min(β, value)
            if β <= α:
                break  # α cut-off
        return value
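The cut-off can be observed on a toy tree. The sketch below assumes a tree given as nested Python lists with integer leaves and records every leaf it evaluates. The tree is made up so that it follows the alpha and beta values of the walkthrough above (alpha becomes 4, then 5, and the last min node returns after seeing 1); it is not the Wikimedia figure itself, and the name `alpha_beta_tree` is illustrative.

```python
import math
from typing import List, Union

# A leaf is its value; an inner node is the list of its children.
Tree = Union[int, List["Tree"]]


def alpha_beta_tree(node: Tree, alpha: float, beta: float,
                    maximizing: bool, visited: List[int]) -> float:
    if isinstance(node, int):
        visited.append(node)  # record which leaves are actually evaluated
        return node
    if maximizing:
        value = -math.inf
        for child in node:
            value = max(value, alpha_beta_tree(child, alpha, beta, False, visited))
            alpha = max(alpha, value)
            if alpha >= beta:
                break  # beta cut-off
        return value
    value = math.inf
    for child in node:
        value = min(value, alpha_beta_tree(child, alpha, beta, True, visited))
        beta = min(beta, value)
        if beta <= alpha:
            break  # alpha cut-off
    return value


visited: List[int] = []
tree = [[4, 7], [5, 6], [1, 9, 8]]  # max root over three min nodes
best = alpha_beta_tree(tree, -math.inf, math.inf, True, visited)
print(best, visited)  # leaves 9 and 8 are never evaluated
```

The root value is 5 and only five of the seven leaves are visited: once the last min node sees 1 with alpha = 5, its interval [5, 1] is invalid and the remaining children are skipped.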

Alpha-Beta Pruning: 486 Predict the Winner

# AC
import math
from typing import List

class Solution:
    # curr accumulates A - B along the path from the root
    def alpha_beta(self, l: int, r: int, curr: int, isMaxPlayer: bool, alpha: int, beta: int) -> int:
        if l == r:
            return curr + self.nums[l] * (1 if isMaxPlayer else -1)

        if isMaxPlayer:
            ret = self.alpha_beta(l + 1, r, curr + self.nums[l], not isMaxPlayer, alpha, beta)
            alpha = max(alpha, ret)
            if alpha >= beta:
                return alpha
            ret = max(ret, self.alpha_beta(l, r - 1, curr + self.nums[r], not isMaxPlayer, alpha, beta))
            return ret
        else:
            ret = self.alpha_beta(l + 1, r, curr - self.nums[l], not isMaxPlayer, alpha, beta)
            beta = min(beta, ret)
            if alpha >= beta:
                return beta
            ret = min(ret, self.alpha_beta(l, r - 1, curr - self.nums[r], not isMaxPlayer, alpha, beta))
            return ret

    def PredictTheWinner(self, nums: List[int]) -> bool:
        self.nums = nums
        v = self.alpha_beta(0, len(nums) - 1, 0, True, -math.inf, math.inf)
        return v >= 0

Alpha-Beta Pruning: 464 Can I Win

# AC
import math
from functools import lru_cache

class Solution:
    # self.maxChoosableInteger, self.desiredTotal and self.allUsed are read here
    # but set elsewhere (the caller is not shown in this excerpt).
    # precondition: currentTotal < desiredTotal
    @lru_cache(maxsize=None)
    def alpha_beta(self, status: int, currentTotal: int, isMaxPlayer: bool, alpha: int, beta: int) -> int:
        if status == self.allUsed:
            return 0  # draw: no winner

        if isMaxPlayer:
            value = -math.inf
            for i in range(1, self.maxChoosableInteger + 1):
                if not (status >> i & 1):
                    new_status = 1 << i | status
                    if currentTotal + i >= self.desiredTotal:
                        return 1  # shortcut
                    value = max(value, self.alpha_beta(new_status, currentTotal + i, not isMaxPlayer, alpha, beta))
                    alpha = max(alpha, value)
                    if alpha >= beta:
                        return value
            return value
        else:
            value = math.inf
            for i in range(1, self.maxChoosableInteger + 1):
                if not (status >> i & 1):
                    new_status = 1 << i | status
                    if currentTotal + i >= self.desiredTotal:
                        return -1  # shortcut
                    value = min(value, self.alpha_beta(new_status, currentTotal + i, not isMaxPlayer, alpha, beta))
                    beta = min(beta, value)
                    if alpha >= beta:
                        return value
            return value

C++, Java, Javascript for 486 Predict the Winner

As a bonus, we AC leetcode 486 in C++, Java and Javascript with a bottom-up iterative DP. We illustrate this method for the other languages not just because lru_cache is not available in non-Python languages, but also because there are other ways to solve the problem. Notice the topological ordering of the DP dependencies: larger subproblems are built from smaller, already solved ones. In addition, it's worth mentioning that this approach always fills the whole table, on the order of \(n^2\) loop iterations, whereas the top-down caching approach only visits the states it actually reaches and can therefore need fewer.
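For comparison with the earlier Python solutions, the same bottom-up table can be sketched in Python. The function name `predict_the_winner` is illustrative; the loops mirror the Java, C++ and Javascript versions that follow.

```python
from typing import List


def predict_the_winner(nums: List[int]) -> bool:
    n = len(nums)
    # dp[l][r] is maxDiff(l, r): the best score difference for the player to move
    dp = [[0] * n for _ in range(n)]
    for i in range(n):
        dp[i][i] = nums[i]
    # l runs downwards and r upwards, so dp[l + 1][r] and dp[l][r - 1]
    # are already filled when dp[l][r] is computed
    for l in range(n - 1, -1, -1):
        for r in range(l + 1, n):
            dp[l][r] = max(nums[l] - dp[l + 1][r], nums[r] - dp[l][r - 1])
    return dp[0][n - 1] >= 0


print(predict_the_winner([1, 5, 2]))       # False (Example 1)
print(predict_the_winner([1, 5, 233, 7]))  # True (Example 2)
```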

Java AC Code

// AC
class Solution {
    public boolean PredictTheWinner(int[] nums) {
        int n = nums.length;
        int[][] dp = new int[n][n];
        for (int i = 0; i < n; i++) {
            dp[i][i] = nums[i];
        }

        for (int l = n - 1; l >= 0; l--) {
            for (int r = l + 1; r < n; r++) {
                dp[l][r] = Math.max(
                        nums[l] - dp[l + 1][r],
                        nums[r] - dp[l][r - 1]);
            }
        }
        return dp[0][n - 1] >= 0;
    }
}

C++ AC Code

// AC
class Solution {
public:
    bool PredictTheWinner(vector<int>& nums) {
        int n = nums.size();
        vector<vector<int>> dp(n, vector<int>(n, 0));
        for (int i = 0; i < n; i++) {
            dp[i][i] = nums[i];
        }
        for (int l = n - 1; l >= 0; l--) {
            for (int r = l + 1; r < n; r++) {
                dp[l][r] = max(nums[l] - dp[l + 1][r], nums[r] - dp[l][r - 1]);
            }
        }
        return dp[0][n - 1] >= 0;
    }
};

Javascript AC Code

/**
 * @param {number[]} nums
 * @return {boolean}
 */
var PredictTheWinner = function(nums) {
    const n = nums.length;
    const dp = new Array(n).fill().map(() => new Array(n));

    for (let i = 0; i < n; i++) {
        dp[i][i] = nums[i];
    }

    for (let l = n - 1; l >= 0; l--) {
        // r starts right after l (the loop variable i is out of scope here)
        for (let r = l + 1; r < n; r++) {
            dp[l][r] = Math.max(nums[l] - dp[l + 1][r], nums[r] - dp[l][r - 1]);
        }
    }

    return dp[0][n - 1] >= 0;
};

This is the fifth episode of the series TSP From DP to Deep Learning. In this episode, we turn to Reinforcement Learning, in particular a model-free policy gradient method that embeds a pointer network to learn the minimal tour without supervised best-tour labels in the dataset. Full list of this series is listed below.

Pointer Network Refresher

In previous episode Pointer Networks in PyTorch, we implemented Pointer Networks in PyTorch with a 2D Euclidean dataset.

Recall that the input is a graph as a sequence of \(n\) cities in a two dimensional space

\[ s=\{\mathbf{x_i}\}_{i=1}^n, \mathbf{x}_{i} \in \mathbb{R}^{2} \]

The output is a permutation of the points \(\pi\), that visits each city exactly once and returns to starting point with minimal distance.

Let us define the total distance of a \(\pi\) with respect to \(s\) as \(L\)

\[ L(\pi | s)=\left\|\mathbf{x}_{\pi(n)}-\mathbf{x}_{\pi(1)}\right\|_{2}+\sum_{i=1}^{n-1}\left\|\mathbf{x}_{\pi(i)}-\mathbf{x}_{\pi(i+1)}\right\|_{2} \]
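The tour length \(L(\pi | s)\) can be computed directly from its definition. The following is a minimal sketch, assuming cities are given as (x, y) tuples and a tour as a list of 0-based city indices (the formula above is 1-based); the function name `tour_length` is illustrative.

```python
import math
from typing import Sequence, Tuple


def tour_length(points: Sequence[Tuple[float, float]], tour: Sequence[int]) -> float:
    """Sum of Euclidean distances along the tour, including the edge back to the start."""
    n = len(tour)
    return sum(
        math.dist(points[tour[i]], points[tour[(i + 1) % n]])  # (i + 1) % n closes the loop
        for i in range(n)
    )


square = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
print(tour_length(square, [0, 1, 2, 3]))  # walking around the unit square
print(tour_length(square, [0, 2, 1, 3]))  # a longer tour that crosses both diagonals
```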

The stochastic policy \(p(\pi | s; \theta)\), parameterized by \(\theta\), aims to assign high probabilities to short tours and low probabilities to long tours. The joint probability is factorized with the chain rule.

\[ p(\pi | s; \theta) = \prod_{i=1}^{n} p\left({\pi(i)} | {\pi(1)}, \ldots, {\pi(i-1)} , s; \theta\right) \]

The loss of the model is cross entropy between the network’s output probabilities \(\pi\) and the best tour \(\hat{\pi}\) generated by a TSP solver.

The contribution of Pointer Networks is that they lift the fixed-vocabulary constraint: the output indexes positions of the particular input instance, so the set of output choices is dynamic instead of coming from a fixed-size vocabulary.

Reinforcement Learning

Neural Combinatorial Optimization with Reinforcement Learning combines the power of Reinforcement Learning (RL) and Deep Learning to further eliminate the constraint of Pointer Networks that the training dataset has to have supervised labels of the best tour. With deep RL, training cases do not need to come with a solution, which is a common pattern in deep RL. In the paper, a model-free policy-based RL method is adopted.

Model-Free Policy Gradient Methods

Chapter 8, Planning and Learning with Tabular Methods, of the authoritative RL book distinguishes two major approaches in RL. One is model-based RL and the other is model-free RL. The distinction between the two relies on the concept of a model, which is stated as follows:

By a model of the environment we mean anything that an agent can use to predict how the environment will respond to its actions.

So model-based methods demand a model of the environment, and hence dynamic programming and heuristic search fall into this category. With a model in mind, the utility of a state can be computed in various ways, and a planning stage that essentially builds the policy is needed before the agent can take any action. In contrast, model-free methods, without building a model, are more direct, ignoring irrelevant information and just focusing on the policy which is ultimately needed. Typical examples of model-free methods are Monte Carlo Control and Temporal-Difference Learning.

Model-based methods rely on planning as their primary component, while model-free methods primarily rely on learning.

In the TSP problem, the model is fully determined by all the given points, and no feedback is generated for each individual decision. So it's unclear how to map a state value to a tour. Therefore, we turn to model-free methods. Chapter 13, Policy Gradient Methods, describes a particular approximate model-free approach that learns a parameterized policy that can select actions without consulting a value function. This approach fits perfectly with the aforementioned pointer networks, where the parameterized policy \(p(\pi | s; \theta)\) is already defined.

The training objective is the expected tour length under the policy, given an input graph \(s\):

\[ J(\theta | s) = \mathbb{E}_{\pi \sim p_{\theta}(\cdot | s)} L(\pi | s) \]

Monte Carlo Policy Gradient: REINFORCE with Baseline

In order to minimize the expected tour length (equivalently, to maximize the reward, taken as the negative tour length), a typical way is to update the parameters \(\theta\) along the gradient \(\nabla_{\theta} J(\theta | s)\).

\[ \nabla_{\theta} J(\theta | s)=\nabla_{\theta} \mathbb{E}_{\pi \sim p_{\theta}(\cdot | s)} L(\pi | s) \]

The RHS of the equation above is the derivative of an expectation, which cannot be computed or approximated directly. Here comes the well-known REINFORCE trick that turns it into the expectation of a derivative, which can be approximated easily with Monte Carlo sampling, where the expectation is replaced by an average over samples.

\[ \nabla_{\theta} J(\theta | s)=\mathbb{E}_{\pi \sim p_{\theta}(. | s)}\left[L(\pi | s) \nabla_{\theta} \log p_{\theta}(\pi | s)\right] \]

Another common trick, subtracting a baseline \(b(s)\), leads the derivative of reward to the following equation. Note that \(b(s)\) denotes a baseline function that must not depend on \(\pi\). \[ \nabla_{\theta} J(\theta | s)=\mathbb{E}_{\pi \sim p_{\theta}(. | s)}\left[(L(\pi | s)-b(s)) \nabla_{\theta} \log p_{\theta}(\pi | s)\right] \]

The trick is explained as follows:

Because the baseline could be uniformly zero, this update is a strict generalization of REINFORCE. In general, the baseline leaves the expected value of the update unchanged, but it can have a large effect on its variance.

Finally, the equation can be approximated with Monte Carlo sampling, assuming we draw \(B\) i.i.d. graphs \(s_{1}, s_{2}, \ldots, s_{B} \sim \mathcal{S}\) and sample a single tour per graph, \(\pi_{i} \sim p_{\theta}(\cdot | s_{i})\), as follows \[ \nabla_{\theta} J(\theta) \approx \frac{1}{B} \sum_{i=1}^{B}\left(L\left(\pi_{i} | s_{i}\right)-b\left(s_{i}\right)\right) \nabla_{\theta} \log p_{\theta}\left(\pi_{i} | s_{i}\right) \]
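The estimator can be tried on a toy problem where the exact gradient is known. The sketch below is not the TSP setup: it assumes a single state with three actions, a softmax policy over logits \(\theta\), and a fixed cost per action standing in for the tour length. All names (`softmax`, `exact_gradient`, `reinforce_gradient`) are illustrative. For a softmax policy, \(\nabla_{\theta_j} \log p_\theta(a) = \mathbb{1}[a = j] - p_j\), and the exact gradient of the expected cost is \(p_j (c_j - J)\).

```python
import math
import random
from typing import List


def softmax(theta: List[float]) -> List[float]:
    m = max(theta)
    exps = [math.exp(t - m) for t in theta]
    z = sum(exps)
    return [e / z for e in exps]


def exact_gradient(theta: List[float], costs: List[float]) -> List[float]:
    """Gradient of J = sum_a p_a * c_a with respect to the logits."""
    p = softmax(theta)
    j = sum(pa * c for pa, c in zip(p, costs))
    return [pa * (c - j) for pa, c in zip(p, costs)]


def reinforce_gradient(theta: List[float], costs: List[float], baseline: float,
                       samples: int, rng: random.Random) -> List[float]:
    """Monte Carlo average of (cost - baseline) * grad log p(a)."""
    p = softmax(theta)
    k = len(theta)
    grad = [0.0] * k
    for _ in range(samples):
        a = rng.choices(range(k), weights=p)[0]  # sample an action from the policy
        advantage = costs[a] - baseline
        for j in range(k):
            grad[j] += advantage * ((1.0 if j == a else 0.0) - p[j])
    return [g / samples for g in grad]


rng = random.Random(0)
theta = [0.0, 0.0, 0.0]
costs = [1.0, 2.0, 4.0]
exact = exact_gradient(theta, costs)
plain = reinforce_gradient(theta, costs, 0.0, 50000, rng)
with_baseline = reinforce_gradient(theta, costs, 7.0 / 3.0, 50000, rng)
print(exact, plain, with_baseline)
```

Both estimates approach the exact gradient, with or without the baseline. The baseline here is the expected cost under the uniform policy, a constant that does not depend on the sampled action, as required.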

Actor Critic Methods

REINFORCE with baseline works quite well, but it also has a disadvantage.

REINFORCE with baseline is unbiased and will converge asymptotically to a local minimum, but like all Monte Carlo methods it tends to learn slowly (produce estimates of high variance) and to be inconvenient to implement online or for continuing problems.

A typical improvement is actor–critic methods, which not only learn an approximate policy, the actor's job, but also learn an approximate value function, the critic's job. They reduce variance and accelerate learning via a bootstrapping critic, which introduces bias that is often beneficial. The detailed algorithm from the paper is illustrated below.

\[ \begin{align*} &\textbf{Algorithm Actor-critic training} \\ &1: \quad \textbf{ procedure } \text{ TRAIN(training set }S \text{, training steps }T \text{, batch size } B \text{)} \\ &2: \quad \quad \text{Initialize pointer network params } \theta \\ &3: \quad \quad \text{Initialize critic network params } \theta_{v} \\ &4: \quad \quad \textbf{for }t=1 \text{ to } T \textbf{ do }\\ &5: \quad \quad \quad s_{i} \sim \operatorname{SAMPLE INPUT } (S) \text{ for } i \in\{1, \ldots, B\} \\ &6: \quad \quad \quad \pi_{i} \sim \operatorname{SAMPLE SOLUTION } \left(p_{\theta}\left(\cdot | s_{i}\right)\right) \text{ for } i \in\{1, \ldots, B\} \\ &7: \quad \quad \quad b_{i} \leftarrow b_{\theta_{v}}\left(s_{i}\right) \text{ for } i \in\{1, \ldots, B\} \\ &8: \quad \quad \quad g_{\theta} \leftarrow \frac{1}{B} \sum_{i=1}^{B}\left(L\left(\pi_{i} | s_{i}\right)-b_{i}\right) \nabla_{\theta} \log p_{\theta}\left(\pi_{i} | s_{i}\right) \\ &9: \quad \quad \quad \mathcal{L}_{v} \leftarrow \frac{1}{B} \sum_{i=1}^{B} \left\| b_{i}-L\left(\pi_{i}\right) \right\| _{2}^{2} \\ &10: \quad \quad \quad \theta \leftarrow \operatorname{ADAM} \left( \theta, g_{\theta} \right) \\ &11: \quad \quad \quad \theta_{v} \leftarrow \operatorname{ADAM}\left(\theta_{v}, \nabla_{\theta_{v}} \mathcal{L}_{v}\right) \\ &12: \quad \quad \textbf{end for} \\ &13: \quad \textbf{return } \theta \\ &14: \textbf{end procedure} \end{align*} \]

Implementation in PyTorch

Beam Search in OpenNMT-py

In Episode 4, Search for Most Likely Sequence, a 3x3 rectangular trellis is given and several decoding methods are illustrated in plain Python. For PyTorch, the OpenNMT-py package supports efficient batched beam search. Because its BeamSearch class is complicated to use, the same trellis problem is reproduced below through its API. For details, please refer to Implementing Beam Search — Part 1: A Source Code Analysis of OpenNMT-py

from copy import deepcopy
from math import exp
import torch
from onmt.translate import BeamSearch, GNMTGlobalScorer

def run_example():
    BEAM_SIZE = 2
    N_BEST = 1
    BATCH_SZ = 1
    SEQ_LEN = 3

    initial = [0.35, 0.25, 0.4]
    transition_matrix = [
        [0.3, 0.6, 0.1],
        [0.4, 0.2, 0.4],
        [0.3, 0.4, 0.4]]

    beam = BeamSearch(BEAM_SIZE, BATCH_SZ, 0, 1, 2, N_BEST, GNMTGlobalScorer(0.7, 0., "avg", "none"), 0, 30, False, 0, set(), False, 0.)
    device_init = torch.zeros(1, 1)
    beam.initialize(device_init, torch.randint(0, 30, (BATCH_SZ,)))

    def printBestNPaths(beam: BeamSearch, step: int):
        print(f'\nstep {step} beam results:')
        for k in range(BEAM_SIZE):
            best_path = beam.alive_seq[k].squeeze().tolist()[1:]
            prob = exp(beam.topk_log_probs[0][k])
            print(f'prob {prob:.3f} with path {best_path}')

    init_scores = torch.log(torch.tensor([initial], dtype=torch.float))
    init_scores = deepcopy(init_scores.repeat(BATCH_SZ * BEAM_SIZE, 1))
    beam.advance(init_scores, None)
    printBestNPaths(beam, 0)

    for step in range(SEQ_LEN - 1):
        idx_list = beam.topk_ids.squeeze().tolist()
        beam_transition = []
        for idx in idx_list:
            beam_transition.append(transition_matrix[idx])
        beam_transition_tensor = torch.log(torch.tensor(beam_transition))

        beam.advance(beam_transition_tensor, None)
        beam.update_finished()

        printBestNPaths(beam, step + 1)

The output is as follows. With beam size \(k=2\) and 3 steps, the most likely sequence is \(0 \rightarrow 1 \rightarrow 0\), whose probability is 0.084.

step 0 beam results:
prob 0.400 with path [2]
prob 0.350 with path [0]

step 1 beam results:
prob 0.210 with path [0, 1]
prob 0.160 with path [2, 1]

step 2 beam results:
prob 0.084 with path [0, 1, 0]
prob 0.000 with path [0, 1, 2]

RL with PointerNetwork

The complete code is on github TSP RL. Below are partial core classes.

from typing import List, Tuple

import torch
from torch import nn, Tensor
from torch.autograd import Variable

# PointerNet is not shown here; see the complete code.

class CombinatorialRL(nn.Module):
    actor: PointerNet

    def __init__(self, rnn_type, use_embedding, embedding_size, hidden_size, seq_len, num_glimpse, tanh_exploration, use_tanh, attention):
        super(CombinatorialRL, self).__init__()

        self.actor = PointerNet(rnn_type, use_embedding, embedding_size, hidden_size, seq_len, num_glimpse, tanh_exploration, use_tanh, attention)

    def forward(self, batch_input: Tensor) -> Tuple[Tensor, List[Tensor], List[Tensor], List[Tensor]]:
        """
        Args:
            batch_input: [batch_size * 2 * seq_len]
        Returns:
            R: Tensor of shape [batch_size]
            action_prob_list: List of [seq_len], tensor shape [batch_size]
            action_list: List of [seq_len], tensor shape [batch_size * 2]
            action_idx_list: List of [seq_len], tensor shape [batch_size]
        """
        batch_size = batch_input.size(0)
        seq_len = batch_input.size(2)
        prob_list, action_idx_list = self.actor(batch_input)

        # gather the (x, y) coordinates of the vertex chosen at each step
        action_list = []
        batch_input = batch_input.transpose(1, 2)
        for action_id in action_idx_list:
            action_list.append(batch_input[[x for x in range(batch_size)], action_id.data, :])
        # gather the probability of the vertex chosen at each step
        action_prob_list = []
        for prob, action_id in zip(prob_list, action_idx_list):
            action_prob_list.append(prob[[x for x in range(batch_size)], action_id.data])

        R = self.reward(action_list)

        return R, action_prob_list, action_list, action_idx_list

    def reward(self, sample_solution: List[Tensor]) -> Tensor:
        """
        Computes total distance of tour
        Args:
            sample_solution: list of size N, each tensor of shape [batch_size * 2]

        Returns:
            tour_len: [batch_size]

        """
        batch_size = sample_solution[0].size(0)
        n = len(sample_solution)
        tour_len = Variable(torch.zeros([batch_size]))

        for i in range(n - 1):
            tour_len += torch.norm(sample_solution[i] - sample_solution[i + 1], dim=1)
        # close the tour: last vertex back to the first
        tour_len += torch.norm(sample_solution[n - 1] - sample_solution[0], dim=1)
        return tour_len
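The reward above is simply the length of the closed tour. As a sanity check that does not need PyTorch, here is a minimal plain-Python sketch of the same quantity for a single tour; the function name `tour_length` and the sample points are assumptions made for this illustration.

```python
import math
from typing import List, Tuple


def tour_length(points: List[Tuple[float, float]]) -> float:
    """Length of the closed tour that visits points in the given order."""
    n = len(points)
    # (i + 1) % n wraps around, so the last point connects back to the first
    return sum(math.dist(points[i], points[(i + 1) % n]) for i in range(n))


square = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
crossed = [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 1.0)]
print(tour_length(square), tour_length(crossed))
```

Visiting the corners of the unit square in order gives the perimeter, while the crossed order uses both diagonals and is longer, which is exactly the signal the policy gradient exploits.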


This is the fourth episode of the series TSP From DP to Deep Learning. In this episode, we systematically compare different search algorithms for finding the most likely sequence in a simplified Markov chain setting. These algorithms can be reused in the decoding stage of deep learning models, which will be illustrated with reinforcement learning in the next episode. The full list of episodes in this series is given below.

Problem as Markov Chain

In sequence-to-sequence problems, we are repeatedly faced with the same task of determining the best, or most likely, output sequence. This kind of problem recurs across algorithms and machine learning: we are given initial states and the dynamics of the system, and the goal is to find the most likely path. The corresponding mathematical concept is called a Markov chain.

Let us describe the problem in the context of a Markov chain. Suppose there are \(n\) states, and the initial state distribution is given by \(s_0 = [0.35, 0.25, 0.4]\). The transition matrix is \(T\), where \(T[i][j]\) denotes the probability of transitioning from \(i\) to \(j\). Notice that each row sums to \(1.0\). \[ T= \begin{matrix} & \begin{matrix}0&1&2\end{matrix} \\ \begin{matrix}0\\1\\2\end{matrix} & \begin{bmatrix}0.3&0.6&0.1\\0.4&0.2&0.4\\0.3&0.3&0.4\end{bmatrix}\\ \end{matrix} \]

Probability of the next state \(s_1\) is derived by multiplication of \(s_0\) and \(T\), which can be visually interpreted by animation below.

The actual probability distribution of \(s_1\) is computed numerically below. Recall that left-multiplying a matrix by a row vector yields a linear combination of the matrix's rows, weighted by the entries of that vector.

\[ s_1 = \begin{bmatrix}0.35& 0.25& 0.4\end{bmatrix} \begin{bmatrix}0.3&0.6&0.1\\0.4&0.2&0.4\\0.3&0.3&0.4\end{bmatrix} = \begin{bmatrix}0.325& 0.38& 0.295\end{bmatrix} \] Again, state \(s_2\) can be derived in the same way as \(s_1 \times T\), assuming the transition dynamics remain the same. However, in deep learning problems, the dynamics usually depend on \(s_i\), or vary with the stage.
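The row-vector times matrix product can be checked with a few lines of plain Python. This is a minimal sketch; the helper name `next_state` is an assumption made for this illustration.

```python
from typing import List


def next_state(s: List[float], T: List[List[float]]) -> List[float]:
    """Row vector times matrix: result[j] = sum_i s[i] * T[i][j]."""
    return [sum(s[i] * T[i][j] for i in range(len(s))) for j in range(len(T[0]))]


s0 = [0.35, 0.25, 0.4]
T = [[0.3, 0.6, 0.1],
     [0.4, 0.2, 0.4],
     [0.3, 0.3, 0.4]]

s1 = next_state(s0, T)
s2 = next_state(s1, T)
print(s1, sum(s1))
print(s2, sum(s2))
```

Because every row of \(T\) sums to 1, each resulting distribution should also sum to 1 (up to floating point error), which is a quick way to catch arithmetic slips.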

Suppose there are only 3 stages in our problem, e.g., \(s_0 \rightarrow s_1 \rightarrow s_2\). Let \(L\) be the number of stages and \(N\) be the number of vertices in each stage. Hence \(L=N=3\) in our problem setting. There could be \(N^L\) different paths starting from initial stage and to final stage.

Let us compute an arbitrary path probability as an example, \(2(s_0) \rightarrow 1(s_1) \rightarrow 2(s_2)\). The total probability is

\[ p(2 \rightarrow 1 \rightarrow 2) = s_0[2] \times T[2][1] \times T[1][2] = 0.4 \times 0.3 \times 0.4 = 0.048 \]
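The same computation works for any path. Below is a minimal sketch; the function name `path_probability` is an assumption made for this illustration.

```python
from typing import List, Sequence


def path_probability(initial: List[float],
                     transition: List[List[float]],
                     path: Sequence[int]) -> float:
    """Probability of visiting the vertices of `path` stage by stage."""
    prob = initial[path[0]]
    # multiply in one transition probability per consecutive pair
    for prev_v, v in zip(path, path[1:]):
        prob *= transition[prev_v][v]
    return prob


s0 = [0.35, 0.25, 0.4]
T = [[0.3, 0.6, 0.1],
     [0.4, 0.2, 0.4],
     [0.3, 0.3, 0.4]]
print(path_probability(s0, T, (2, 1, 2)))
```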

First, we implement \(N^L\) exhaustive or brute force search.

The following Python 3 function returns one most likely sequence and its probability. Running it on our example yields probability 0.084, which is attained by both \(0 \rightarrow 1 \rightarrow 0\) and \(0 \rightarrow 1 \rightarrow 2\); the function reports one of them.

from itertools import product
from typing import List, Tuple

def search_brute_force(initial: List, transition: List, L: int) -> Tuple[float, Tuple]:
    # product enumerates all N^L ordered paths; combinations_with_replacement
    # would skip paths such as 0 -> 1 -> 0 because it only yields sorted tuples
    vertices = range(len(initial))

    max_prop = 0.0
    max_route = None
    for path in product(vertices, repeat=L):
        prob = initial[path[0]]  # start from the initial state
        for prev_v, v in zip(path, path[1:]):
            prob *= transition[prev_v][v]
        if prob > max_prop:
            max_prop = prob
            max_route = path
    return max_prop, max_route

Exhaustive search always finds the most likely sequence but, like searching for a needle in a haystack, it comes at the cost of exponential runtime complexity \(O(N^L)\). The simplest alternative, known as greedy search, picks the single most likely vertex in each stage and expands only that vertex into the next stage. This strategy is, of course, not guaranteed to find the most likely sequence, but it is fast. See the animation below.

Code in Python 3 is given below. NumPy's np.argmax() is used for clarity. Notice there are effectively 2 loops (the second is hidden inside np.argmax), so the runtime complexity is \(O(N\times L)\).

from typing import List, Tuple

import numpy as np

def search_greedy(initial: List, transition: List, L: int) -> Tuple[float, List]:
    max_route = []
    max_prop = 0.0
    # path probability of reaching each vertex of the current stage
    states = np.array(initial)

    for l in range(0, L):
        max_v = int(np.argmax(states))
        max_route.append(max_v)
        max_prop = float(states[max_v])
        # expand only the chosen vertex into the next stage
        states = max_prop * np.array(transition[max_v])

    return max_prop, max_route

We can improve the greedy strategy a little by expanding more vertices in each stage. In beam search with beam width \(k\), we identify the \(k\) nodes with the highest probability in each stage and expand these \(k\) nodes into the next stage. In our example with \(k=2\), we select the top 2 nodes in stage \(s_0\), expand them and evaluate \(2 \times 3\) nodes in stage \(s_1\), then again select 2 nodes and evaluate 6 nodes in stage \(s_2\). Beam search, like the greedy strategy, is not guaranteed to find the most likely sequence, but it widens the search space while keeping complexity linear.

Below is an implementation in Python 3 that uses PriorityQueue to select the top \(k\) nodes. Because PriorityQueue pops the smallest item first, a class decorated with @total_ordering is defined with a reversed comparison, so that the item with the highest probability is popped first. The runtime complexity is \(O(k\times N \times L)\).

from functools import total_ordering
from queue import PriorityQueue
from typing import List, Tuple

def search_beam(initial: List, transition: List, L: int, K: int) -> Tuple[float, Tuple]:
    N = len(initial)
    current_q = PriorityQueue()
    next_q = PriorityQueue()

    @total_ordering
    class PQItem(object):
        def __init__(self, prob, route):
            self.prob = prob
            self.route = route
            self.last_v = int(route[-1])

        def __eq__(self, other):
            return self.prob == other.prob

        def __lt__(self, other):
            # reversed on purpose: higher probability is popped first
            return self.prob > other.prob

    for v in range(N):
        next_q.put(PQItem(initial[v], str(v)))

    for l in range(1, L):
        current_q = next_q
        next_q = PriorityQueue()
        k = K
        # expand only the K most likely routes of the current stage
        while not current_q.empty() and k > 0:
            item = current_q.get()
            prob, route, prev_v = item.prob, item.route, item.last_v
            k -= 1
            for v in range(N):
                nextItem = PQItem(prob * transition[prev_v][v], route + str(v))
                next_q.put(nextItem)

    max_item = next_q.get()

    return max_item.prob, list(map(lambda x: int(x), max_item.route))

Viterbi DP

Similar to the TSP DP version, there is a dynamic programming approach, widely known as the Viterbi algorithm, that always finds the sequence with maximum probability while reducing runtime complexity from \(O(N^L)\) to \(O(L \times N \times N)\) (corresponding to the 3 loops in the code below). The core idea is that, in each stage, an array keeps the probability of the most likely sequence ending at each vertex, and this dp array is used as input to the next stage. For example, let \(dp[1][0]\) be the highest probability among paths that end at vertex 0 in stage \(s_1\). \[ dp[1][0] = \max \{s_0[0] \rightarrow s_1[0], s_0[1] \rightarrow s_1[0], s_0[2] \rightarrow s_1[0]\} \]

The illustrative code below returns the max probability but not the route, in order to emphasize the 3-loop pattern and the max operation that form the essence of the algorithm.

from typing import List

def search_dp(initial: List, transition: List, L: int) -> float:
    N = len(initial)
    dp = [[0.0 for c in range(N)] for r in range(L)]
    dp[0] = initial[:]

    for l in range(1, L):
        for v in range(N):
            for prev_v in range(N):
                dp[l][v] = max(dp[l][v], dp[l - 1][prev_v] * transition[prev_v][v])

    return max(dp[L-1])
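To recover the route as well, one common extension is to store, for every stage and vertex, which predecessor achieved the max, and then walk these back-pointers from the best final vertex. The sketch below is one possible way to do this; the function name `viterbi_with_route` and the `back` table are assumptions made for this illustration, not part of the original code.

```python
from typing import List, Tuple


def viterbi_with_route(initial: List[float],
                       transition: List[List[float]],
                       L: int) -> Tuple[float, List[int]]:
    N = len(initial)
    dp = [initial[:]]   # dp[l][v]: best probability of a path ending at v in stage l
    back = []           # back[l - 1][v]: predecessor of v on that best path
    for _ in range(1, L):
        prev = dp[-1]
        cur, ptr = [], []
        for v in range(N):
            best_prev = max(range(N), key=lambda u: prev[u] * transition[u][v])
            cur.append(prev[best_prev] * transition[best_prev][v])
            ptr.append(best_prev)
        dp.append(cur)
        back.append(ptr)

    # start from the best final vertex and follow back-pointers to stage 0
    last = max(range(N), key=lambda v: dp[-1][v])
    route = [last]
    for ptr in reversed(back):
        route.append(ptr[route[-1]])
    route.reverse()
    return dp[-1][last], route


s0 = [0.35, 0.25, 0.4]
T = [[0.3, 0.6, 0.1],
     [0.4, 0.2, 0.4],
     [0.3, 0.3, 0.4]]
best_prob, best_route = viterbi_with_route(s0, T, 3)
print(best_prob, best_route)
```

The extra table costs \(O(L \times N)\) memory and leaves the time complexity at \(O(L \times N \times N)\). When several routes tie for the maximum, this sketch reports only one of them.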

Probabilistic Sampling

All algorithms described above are deterministic. However, in NLP deep learning decoding, determinism has a disadvantage: the decoder may get trapped in repeated phrases or sentences. For example, text like the following is commonly generated:

This is the best of best of best of ...

One way to get out of the loop is to resort to probabilistic sampling. For example, we can sample one vertex in each stage according to the vertex weights or according to the total path probability.

For demonstration purpose, here is the code based on greedy strategy which probabilistically determines one node at each stage.

import random
from typing import List, Tuple

def search_prob_greedy(initial: List, transition: List, L: int) -> Tuple[float, Tuple]:
    N = len(initial)
    max_route = []
    max_prop = 0.0
    vertices = [i for i in range(N)]
    prob = initial[:]

    for l in range(0, L):
        # sample one vertex, weighted by the path probability of reaching it
        v_lst = random.choices(vertices, prob)
        v = v_lst[0]
        max_route.append(v)
        max_prop = prob[v]
        prob = [prob[v] * transition[v][v_target] for v_target in range(N)]

    return max_prop, max_route

This is the third episode of the series TSP From DP to Deep Learning. In this episode, we enter the realm of deep learning and introduce Pointer Networks, a type of sequence-to-sequence model tailored to problems like TSP or Convex Hull. The full list of episodes in this series is given below.

Pointer Networks

In a traditional sequence-to-sequence RNN, the number of output classes is fixed in advance. For instance, a word-generating RNN emits one word from a vocabulary of size \(|V|\) at each time step. However, there is a large class of problems, such as Convex Hull, Delaunay Triangulation and TSP, where the range of each output is not pre-defined but varies with the size of the input. Pointer Networks overcome this constraint by selecting the \(i\)-th input with a probability derived from the attention score.

Convex Hull

In the following example, 10 points are given and the output is a sequence of points that bounds the set of all points. Each value in the output sequence is an integer ranging from 1 to 10, the number of input points in this concrete example. In general, finding the exact solution has been proven to be equivalent to sorting, with time complexity \(O(n*log(n))\).


\[ \begin{align*} &\text{Input: } \mathcal{P} &=& \left\{P_{1}, \ldots, P_{10} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{2,4,3,5,6,7,2\} \end{align*} \]
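For reference, an exact \(O(n \log n)\) convex hull can be computed with Andrew's monotone chain algorithm, where the sort dominates the cost. This is a swapped-in classical technique for comparison, not something used by Pointer Networks; the function name and sample points are assumptions made for this sketch, and it returns coordinates rather than the 1-based indices shown above.

```python
from typing import List, Tuple

Point = Tuple[float, float]


def convex_hull(points: List[Point]) -> List[Point]:
    """Andrew's monotone chain; returns hull vertices in counter-clockwise order."""
    pts = sorted(set(points))
    if len(pts) <= 2:
        return pts

    def cross(o: Point, a: Point, b: Point) -> float:
        # > 0 when o -> a -> b makes a counter-clockwise (left) turn
        return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

    lower: List[Point] = []
    for p in pts:
        while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
            lower.pop()
        lower.append(p)

    upper: List[Point] = []
    for p in reversed(pts):
        while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
            upper.pop()
        upper.append(p)

    # the last point of each chain is the first point of the other
    return lower[:-1] + upper[:-1]


sample = [(0, 0), (1, 0), (1, 1), (0, 1), (0.5, 0.5)]
hull = convex_hull(sample)
print(hull)
```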

TSP

TSP is almost identical to the Convex Hull problem, except that the output sequence has fixed length. In the previous episode, we reduced the complexity of the exact solution from \(O(n!)\) to \(O(n^2*2^n)\).


\[ \begin{align*} &\text{Input: } \mathcal{P} &= &\left\{P_{1}, \ldots, P_{6} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{1,3,2,4,5,6,1\} \end{align*} \]

Delaunay Triangulation

A Delaunay triangulation for a set of points in a plane is a triangulation such that the circumcircle of every triangle is empty, meaning it has no point from \(\mathcal{P}\) in its interior. This kind of problem outputs a sequence of sets, where each item in a set refers to a point of the input set \(\mathcal{P}\).

\[ \begin{align*} &\text{Input: } \mathcal{P} &=& \left\{P_{1}, \ldots, P_{5} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{(1,2,4),(1,4,5),(1,3,5),(1,2,3)\} \end{align*} \]

Sequence-to-Sequence Model

Suppose now that \(n\) is fixed. Given a training pair \((\mathcal{P}, C^{\mathcal{P}})\), the vanilla sequence-to-sequence model parameterized by \(\theta\) computes the conditional probability

\[ \begin{equation} p\left(\mathcal{C}^{\mathcal{P}} | \mathcal{P} ; \theta\right)=\prod_{i=1}^{m(\mathcal{P})} p\left(C_{i} | C_{1}, \ldots, C_{i-1}, \mathcal{P} ; \theta\right) \end{equation} \] The parameters of the model are learnt by maximizing the conditional probabilities for the training set, i.e. \[ \begin{equation} \theta^{*}=\underset{\theta}{\arg \max } \sum_{\mathcal{P}, \mathcal{C}^{\mathcal{P}}} \log p\left(\mathcal{C}^{\mathcal{P}} | \mathcal{P} ; \theta\right) \end{equation} \]

Content Based Input Attention

When attention is applied to vanilla sequence-to-sequence model, better result is obtained.

Let encoder and decoder states be \( (e_{1}, \ldots, e_{n}) \) and \( (d_{1}, \ldots, d_{m(\mathcal{P})}) \), respectively. At each output time \(i\), compute the attention vector \(d'_{i}\) as a linear combination of \( (e_{1}, \ldots, e_{n}) \) with weights \( (a_{1}^{i}, \ldots, a_{n}^{i}) \): \[ d'_{i} = \sum_{j=1}^{n} a_{j}^{i} e_{j} \]

\( (a_{1}^{i}, \ldots, a_{n}^{i}) \) is the softmax of \( (u_{1}^{i}, \ldots, u_{n}^{i}) \), and \(u_{j}^{i}\) can be viewed as an alignment score measuring how well \(d_{i}\) matches \(e_{j}\). Notice that \(v\), \(W_1\), and \(W_2\) are learnable parameters of the model.

\[ \begin{eqnarray} u_{j}^{i} &=& v^{T} \tanh \left(W_{1} e_{j}+W_{2} d_{i}\right) \quad j \in(1, \ldots, n) \\ a_{j}^{i} &=& \operatorname{softmax}\left(u_{j}^{i}\right) \quad j \in(1, \ldots, n) \end{eqnarray} \]

Pointer Networks


Pointer Networks do not blend the encoder states \(e_j\) to propagate extra information to the decoder; instead, they use \(u^i_j\) as pointers to the input elements.

\[ \begin{eqnarray*} u_{j}^{i} &=& v^{T} \tanh \left(W_{1} e_{j}+W_{2} d_{i}\right) \quad j \in(1, \ldots, n) \\ p\left(C_{i} | C_{1}, \ldots, C_{i-1}, \mathcal{P}\right) &=& \operatorname{softmax}\left(u^{i}\right) \end{eqnarray*} \]
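The two equations above can be traced with tiny hand-picked numbers. The sketch below uses plain Python lists instead of tensors; the helper names, the 2-dimensional states and the identity weight matrices are all assumptions chosen to keep the arithmetic easy to follow, not values from a trained model.

```python
import math
from typing import List

Vector = List[float]
Matrix = List[List[float]]


def matvec(W: Matrix, x: Vector) -> Vector:
    return [sum(w * xi for w, xi in zip(row, x)) for row in W]


def pointer_scores(enc_states: List[Vector], d_i: Vector,
                   W1: Matrix, W2: Matrix, v: Vector) -> Vector:
    """u_j = v^T tanh(W1 e_j + W2 d_i), one score per input element."""
    w2d = matvec(W2, d_i)
    scores = []
    for e_j in enc_states:
        hidden = [math.tanh(a + b) for a, b in zip(matvec(W1, e_j), w2d)]
        scores.append(sum(vk * hk for vk, hk in zip(v, hidden)))
    return scores


def softmax(u: Vector) -> Vector:
    m = max(u)  # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in u]
    total = sum(exps)
    return [x / total for x in exps]


identity = [[1.0, 0.0], [0.0, 1.0]]
v = [1.0, 1.0]
enc = [[0.0, 0.0], [1.0, 1.0], [-1.0, -1.0]]  # three input elements
d = [0.5, 0.5]                                  # current decoder state

p = softmax(pointer_scores(enc, d, identity, identity, v))
print(p)
```

The output distribution has exactly one entry per input element, so its size follows the input length rather than a fixed vocabulary, which is the point of the pointer mechanism.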

More on Attention

FloydHub Blog - Attention Mechanism gives a clear and detailed explanation of the differences and similarities between the first classic type of attention, commonly referred to as Additive Attention and proposed by Dzmitry Bahdanau, and the second classic type, known as Multiplicative Attention and proposed by Thang Luong.

Luong Attention defines three alignment scoring functions, each measuring how well \(d_{i}\) matches \(e_{j}\).

\[ \operatorname{score} \left( d_i, e_j \right)= \begin{cases} d_i^{\top} e_j & \text { dot } \\ d_i^{\top} W_a e_j & \text { general } \\ v_a^{\top} \tanh \left( W_a \left[ d_i ; e_j \right] \right) & \text { concat } \end{cases} \]

PyTorch Implementation

In episode 2, we have introduced TSP dataset where each case is a line, of following form.

x0, y0, x1, y1, ... output 1 v1 v2 v3 ... 1

PyTorch Dataset

Each case is converted to (input, input_len, output_in, output_out, output_len), each of type np.ndarray with appropriate padding, and encapsulated in a class extending PyTorch Dataset.

from typing import List, Tuple

import numpy as np
from torch.utils.data import Dataset

class TSPDataset(Dataset):
    "each data item of form (input, input_len, output_in, output_out, output_len)"
    data: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        input, input_len, output_in, output_out, output_len = self.data[index]
        return input, input_len, output_in, output_out, output_len

PyTorch pad_packed_sequence

Code in PyTorch seq-to-seq model typically utilizes pack_padded_sequence and pad_packed_sequence API to reduce computational cost. A detailed explanation is given here https://github.com/sgrvinod/a-PyTorch-Tutorial-to-Image-Captioning#decoder-1.

from typing import Tuple, Union

from torch import nn, Tensor
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

# rnn_init is a helper that is not shown here; see the complete code.

class RNNEncoder(nn.Module):
    rnn: Union[nn.LSTM, nn.GRU, nn.RNN]

    def __init__(self, rnn_type: str, bidirectional: bool, num_layers: int, input_size: int, hidden_size: int, dropout: float):
        super(RNNEncoder, self).__init__()
        if bidirectional:
            assert hidden_size % 2 == 0
            hidden_size = hidden_size // 2
        self.rnn = rnn_init(rnn_type, input_size=input_size, hidden_size=hidden_size, bidirectional=bidirectional, num_layers=num_layers, dropout=dropout)

    def forward(self, src: Tensor, src_lengths: Tensor, hidden: Tensor = None) -> Tuple[Tensor, Tensor]:
        lengths = src_lengths.view(-1).tolist()
        # pack so that the RNN skips the padded positions
        packed_src = pack_padded_sequence(src, lengths)
        memory_bank, hidden_final = self.rnn(packed_src, hidden)
        # unpack back to a padded tensor
        memory_bank = pad_packed_sequence(memory_bank)[0]
        return memory_bank, hidden_final

Attention Code

from typing import Tuple

import torch
import torch.nn.functional as F
from torch import nn, Tensor

# sequence_mask is a helper that is not shown here; see the complete code.

class Attention(nn.Module):
    linear_out: nn.Linear

    def __init__(self, dim: int):
        super(Attention, self).__init__()
        self.linear_out = nn.Linear(dim * 2, dim, bias=False)

    def score(self, src: Tensor, target: Tensor) -> Tensor:
        batch_size, src_len, dim = src.size()
        _, target_len, _ = target.size()
        target_ = target
        src_ = src.transpose(1, 2)
        # dot-product score: (batch, target_len, dim) x (batch, dim, src_len)
        return torch.bmm(target_, src_)

    def forward(self, src: Tensor, target: Tensor, src_lengths: Tensor) -> Tuple[Tensor, Tensor]:
        assert target.dim() == 3

        batch_size, src_len, dim = src.size()
        _, target_len, _ = target.size()

        align_score = self.score(src, target)

        mask = sequence_mask(src_lengths)
        # (batch_size, max_len) -> (batch_size, 1, max_len)
        mask = mask.unsqueeze(1)
        # padded source positions get zero weight after softmax
        align_score.data.masked_fill_(~mask, -float('inf'))
        align_score = F.softmax(align_score, -1)

        c = torch.bmm(align_score, src)

        concat_c = torch.cat([c, target], -1)
        attn_h = self.linear_out(concat_c)

        return attn_h, align_score

Complete PyTorch implementation source code is also available on github.


This is second episode of series: TSP From DP to Deep Learning.

AIZU TSP Bottom Up Iterative DP

In the last episode, we provided a top down recursive DP in Python 3 and Java 8. Now we improve on it by converting it to a bottom up iterative DP. Below is a graph with 3 vertices, with the top down recursive calls drawn in full.

Looking from bottom up, we could identify corresponding topological computing order with ease. First, we compute all bit states with 3 ones, then 2 ones, then 1 one.

Pseudo Java code is below.

for (int bitset_num = N; bitset_num >= 0; bitset_num--) {
    while (hasNextCombination(bitset_num)) {
        int state = nextCombination(bitset_num);
        // compute dp[state][v], v-th bit is set in state
        for (int v = 0; v < N; v++) {
            for (int u = 0; u < N; u++) {
                // for each u not reached by this state
                if (!include(state, u)) {
                    dp[state][v] = min(dp[state][v],
                                       dp[new_state_include_u][u] + dist[v][u]);
                }
            }
        }
    }
}

For example, dp[00010][1] is the min distance starting from vertex 0 and having just arrived at vertex 1: \(0 \rightarrow 1 \rightarrow ? \rightarrow ? \rightarrow ? \rightarrow 0\). To find the total min distance, we need to enumerate all possible u for the first question mark. \[ (0 \rightarrow 1) + \min \left\lbrace \begin{array}{ll} 2 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,2) & \qquad\text{ new_state=[00110][2] } \\ 3 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,3) & \qquad\text{ new_state=[01010][3] } \\ 4 \rightarrow ? \rightarrow ? \rightarrow 0 + dist(1,4) & \qquad\text{ new_state=[10010][4] } \end{array} \right. \]

Java Iterative DP Code

AC code is available in Python 3 and Java 8. The core Java code is shown below.

public long solve() {
    int N = g.V_NUM;
    long[][] dp = new long[1 << N][N];
    // init dp[][] with MAX
    for (int i = 0; i < dp.length; i++) {
        Arrays.fill(dp[i], Integer.MAX_VALUE);
    }
    dp[(1 << N) - 1][0] = 0;

    for (int state = (1 << N) - 2; state >= 0; state--) {
        for (int v = 0; v < N; v++) {
            for (int u = 0; u < N; u++) {
                if (((state >> u) & 1) == 0) {
                    dp[state][v] = Math.min(dp[state][v], dp[state | 1 << u][u] + g.edges[v][u]);
                }
            }
        }
    }
    return dp[0][0] == Integer.MAX_VALUE ? -1 : dp[0][0];
}

In this way, runtime complexity can be spotted easily, three for loops leading to O(\(2^n * n * n\)) = O(\(2^n*n^2\) ).
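For readers following along in Python, the same three loops can be sketched as below. This is a minimal port that mirrors the Java solve() above and is not the accepted submission; the function name `tsp_bottom_up`, the use of `float('inf')` for missing edges and the sample points are assumptions made for this illustration.

```python
import math
from typing import List

INF = float('inf')


def tsp_bottom_up(dist: List[List[float]]) -> float:
    """dist[v][u] is the edge weight from v to u, INF if the edge is missing."""
    n = len(dist)
    full = (1 << n) - 1
    # dp[state][v]: min cost to finish the tour from v, given visited set `state`
    dp = [[INF] * n for _ in range(1 << n)]
    dp[full][0] = 0
    # iterate states downwards so that state | (1 << u) is already computed
    for state in range(full - 1, -1, -1):
        for v in range(n):
            for u in range(n):
                if not (state >> u) & 1:
                    cand = dp[state | (1 << u)][u] + dist[v][u]
                    if cand < dp[state][v]:
                        dp[state][v] = cand
    return -1 if dp[0][0] == INF else dp[0][0]


# corners of the unit square with Euclidean distances
pts = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
square_dist = [[math.dist(a, b) for b in pts] for a in pts]
print(tsp_bottom_up(square_dist))
```

Iterating `state` from large to small works because adding a vertex always increases the numeric value of the bitmask, so every state the transition reads has already been filled in.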

DP on Euclidean Dataset

With the TSP DP now clear, we move on to the PTR_NET dataset on Google Drive, published by Oriol Vinyals, an author of Pointer Networks. Each line in the dataset has the following pattern:

x0, y0, x1, y1, ... output 1 v1 v2 v3 ... 1

It first lists n points in (x, y) coordinate, followed by "output", then followed by one of the minimal distance tours, starting and ending with vertex 1 (indexed from 1 not 0).
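A line in this format can be split into coordinates and a 0-based tour with a few lines of Python. This is a minimal sketch: the function name `parse_tsp_line` and the short sample line are made up for illustration, and it assumes whitespace-separated tokens as in the examples below.

```python
from typing import List, Tuple


def parse_tsp_line(line: str) -> Tuple[List[Tuple[float, float]], List[int]]:
    """Split 'x0 y0 x1 y1 ... output 1 v1 ... 1' into (points, 0-based tour)."""
    coord_part, tour_part = line.split('output')
    values = [float(tok) for tok in coord_part.split()]
    # even positions are x coordinates, odd positions are y coordinates
    points = list(zip(values[0::2], values[1::2]))
    # the dataset indexes vertices from 1, so shift to 0-based
    tour = [int(tok) - 1 for tok in tour_part.split()]
    return points, tour


sample = '0.0 0.0 1.0 0.0 1.0 1.0 output 1 2 3 1'  # made-up 3-point case
points, tour = parse_tsp_line(sample)
print(points, tour)
```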

Some examples of 10 vertices are:

0.607122 0.664447 0.953593 0.021519 0.757626 0.921024 0.586376 0.433565 0.786837 0.052959 0.016088 0.581436 0.496714 0.633571 0.227777 0.971433 0.665490 0.074331 0.383556 0.104392 output 1 3 8 6 10 9 5 2 4 7 1 
0.930534 0.747036 0.277412 0.938252 0.794592 0.794285 0.961946 0.261223 0.070796 0.384302 0.097035 0.796306 0.452332 0.412415 0.341413 0.566108 0.247172 0.890329 0.429978 0.232970 output 1 3 2 9 6 5 8 7 10 4 1
0.686712 0.087942 0.443054 0.277818 0.494769 0.985289 0.559706 0.861138 0.532884 0.351913 0.712561 0.199273 0.554681 0.657214 0.909986 0.277141 0.931064 0.639287 0.398927 0.406909 output 1 5 2 10 7 4 3 9 8 6 1

Plot first example using code below.

import matplotlib.pyplot as plt
points='0.607122 0.664447 0.953593 0.021519 0.757626 0.921024 0.586376 0.433565 0.786837 0.052959 0.016088 0.581436 0.496714 0.633571 0.227777 0.971433 0.665490 0.074331 0.383556 0.104392'
float_list = list(map(lambda x: float(x), points.split(' ')))

x,y = [],[]
for idx, p in enumerate(float_list):
    if idx % 2 == 0:
        x.append(p)
    else:
        y.append(p)

for i in range(0, len(x)):
    for j in range(0, len(x)):
        if i == j:
            continue
        plt.plot((x[i],x[j]),(y[i],y[j]))

plt.show()

TSP Case Fully Connected

Now plot the optimal tour: \[ 1 \rightarrow 3 \rightarrow 8 \rightarrow 6 \rightarrow 10 \rightarrow 9 \rightarrow 5 \rightarrow 2 \rightarrow 4 \rightarrow 7 \rightarrow 1 \]

tour_str = '1 3 8 6 10 9 5 2 4 7 1'
tour = list(map(lambda x: int(x), tour_str.split(' ')))

for i in range(0, len(tour)-1):
    p1 = tour[i] - 1
    p2 = tour[i + 1] - 1
    plt.plot((x[p1],x[p2]),(y[p1],y[p2]))
plt.show()

TSP Case Minimal Tour

Python Code Illustrated

Init Graph Edges

Based on previous top down version, several changes are made. First, we need to have an edge between every 2 vertices and due to our matrix representation of the directed edge, edges of 2 directions are initialized.

g: Graph = Graph(N)
for v in range(N):
    for u in range(N):
        diff_x = coordinates[v][0] - coordinates[u][0]
        diff_y = coordinates[v][1] - coordinates[u][1]
        dist: float = math.sqrt(diff_x * diff_x + diff_y * diff_y)
        g.setDist(u, v, dist)
        g.setDist(v, u, dist)

Auxiliary Variable to Track Tour Vertices

One major enhancement is to record the optimal tour during enumerating. We introduce another variable parent[bitstate][v] to track next vertex u, with shortest path.

ret: float = FLOAT_INF
u_min: int = -1
for u in range(self.g.v_num):
    if (state & (1 << u)) == 0:
        s: float = self._recurse(u, state | 1 << u)
        if s + edges[v][u] < ret:
            ret = s + edges[v][u]
            u_min = u
dp[state][v] = ret
self.parent[state][v] = u_min

After minimal tour distance is found, one optimal tour is formed with the help of parent variable.

def _form_tour(self):
    self.tour = [0]
    bit = 0
    v = 0
    for _ in range(self.g.v_num - 1):
        v = self.parent[bit][v]
        self.tour.append(v)
        bit = bit | (1 << v)
    self.tour.append(0)

Note that for each test case, only one tour is given after "output". Our code may form a different tour but it has same distance as what the dataset generates, which can be verified by following code snippet. See full code on github.

tsp: TSPSolver = TSPSolver(g)
tsp.solve()

# recompute the length of the tour given in the dataset
output_dist: float = 0.0
output_tour = list(map(lambda x: int(x) - 1, output.split(' ')))
for v in range(1, len(output_tour)):
    pre_v = output_tour[v-1]
    curr_v = output_tour[v]
    diff_x = coordinates[pre_v][0] - coordinates[curr_v][0]
    diff_y = coordinates[pre_v][1] - coordinates[curr_v][1]
    dist: float = math.sqrt(diff_x * diff_x + diff_y * diff_y)
    output_dist += dist

passed = abs(tsp.dist - output_dist) < 10e-5
if passed:
    print(f'passed dist={tsp.dist}, tour={tsp.tour}')
else:
    print(f'Min Tour Distance = {output_dist}, Computed Tour Distance = {tsp.dist}, Expected Tour = {output_tour}, Result = {tsp.tour}')

The travelling salesman problem (TSP) is a classic NP-hard algorithmic problem. In this series, we will first solve TSP exactly by getting the TSP problem on AIZU accepted (AC) with dynamic programming, and then move on to training a Pointer Network with PyTorch to obtain approximate solutions with deep learning and reinforcement learning. The complete episodes are listed as follows:

TSP Problem Review

TSP can be modelled as a graph problem where both directed and undirected graphs, and both completely and partially connected graphs, are applicable. The following picture from Wikipedia TSP is an undirected but complete TSP with four vertices, A, B, C, D. TSP requires a tour with minimal total distance, starting from an arbitrarily picked vertex and ending at the same vertex while covering all vertices exactly once. For example, \(A \rightarrow B \rightarrow C \rightarrow D \rightarrow A\) and \(A \rightarrow C \rightarrow B \rightarrow D \rightarrow A\) are valid tours, and among all tours there is only one minimal distance value (though multiple tours with the same minimum may exist).

Wikipedia 4 Vertices Example
Despite different types of graphs, notice that we can always employ an adjacency matrix to represent a graph. The above graph can thus be represented by this matrix

\[ \begin{matrix} & \begin{matrix}A&B&C&D\end{matrix} \\ \begin{matrix}A\\B\\C\\D\end{matrix} & \begin{bmatrix}-&20&42&35\\20&-&30&34\\42&30&-&12\\35&34&12&-\end{bmatrix}\\ \end{matrix} \]

Of course, TSP typically takes the form of n coordinates in a plane, corresponding to a complete and undirected graph, because in a plane every pair of vertices is connected by an edge that has the same distance in both directions.

AIZU TSP Online Judge

AIZU has a TSP problem where a directed and incomplete graph with V vertices and E directed edges is given, and the expected output is the minimal total distance. The example below has 4 vertices and 6 edges.

This test case has minimal tour distance 16, with corresponding tour being \(0\rightarrow1\rightarrow3\rightarrow2\rightarrow0\), as shown in red edges. However, the AIZU problem may not have a valid result because not every pair of vertices is guaranteed to be connected. In that case, -1 is required, which can also be interpreted as infinity.

Brute Force Solution

A naive way is to enumerate all possible routes starting from vertex 0 and keep the minimal total distance found so far. The Python code below enumerates the routes for a graph with 4 vertices.

from itertools import permutations
v = [1,2,3]
p = permutations(v)
for t in list(p):
    print([0] + list(t) + [0])

The possible routes are

[0, 1, 2, 3, 0]
[0, 1, 3, 2, 0]
[0, 2, 1, 3, 0]
[0, 2, 3, 1, 0]
[0, 3, 1, 2, 0]
[0, 3, 2, 1, 0]
This approach has a runtime complexity of O(\(n!\)), which won't pass AIZU.
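To turn the enumeration into an actual solver, each route's total distance has to be evaluated and the minimum kept. The sketch below does this for a small directed graph; the function name `tsp_brute_force`, the use of `float('inf')` for missing edges and the edge weights are assumptions chosen for illustration.

```python
from itertools import permutations
from typing import List

INF = float('inf')


def tsp_brute_force(dist: List[List[float]]) -> float:
    """Try every route 0 -> ... -> 0; return -1 if no valid tour exists."""
    n = len(dist)
    best = INF
    for perm in permutations(range(1, n)):
        route = (0,) + perm + (0,)
        # a missing edge contributes INF, which rules the route out
        total = sum(dist[a][b] for a, b in zip(route, route[1:]))
        best = min(best, total)
    return -1 if best == INF else best


# directed graph with 4 vertices and 6 edges: (source, target, distance)
edges = [(0, 1, 2), (1, 2, 3), (1, 3, 9), (2, 0, 1), (2, 3, 6), (3, 2, 4)]
dist = [[INF] * 4 for _ in range(4)]
for s, t, d in edges:
    dist[s][t] = d
print(tsp_brute_force(dist))
```

With n vertices there are \((n-1)!\) routes to score, so this is only practical for very small graphs.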

Dynamic Programming

To AC AIZU TSP, we need to beat the factorial runtime complexity by using bitmask dynamic programming. First, let us map the visited state to a binary value. In the 4-vertex case, the state is "0110" if vertices 2 and 1 have already been visited, and here the current vertex is 1. Since we also need to track the current vertex to start from, we extend dp from one dimension to two dimensions, \(dp[bitstate][v]\). In the example, it's \(dp["0110"][1]\). The transition formula is given by \[ dp[bitstate][v] = \min ( dp[bitstate \cup \{u\}][u] + dist(v,u) \mid u \notin bitstate ) \]
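The set operations in the formula map directly onto integer bit operations. The helper names below are assumptions made for this small sketch.

```python
def visit(state: int, u: int) -> int:
    """bitstate union {u}: set the u-th bit."""
    return state | (1 << u)


def is_visited(state: int, u: int) -> bool:
    """u in bitstate: test the u-th bit."""
    return (state >> u) & 1 == 1


state = 0
state = visit(state, 2)
state = visit(state, 1)
unvisited = [u for u in range(4) if not is_visited(state, u)]
print(format(state, '04b'), unvisited)
```

Because a state is just an integer, it can be used directly as an index into the dp table.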

The resulting time complexity is O(\(n^2*2^n\) ), since there are \(2^n * n\) total states and for each state one more round loop is needed. Factorial and exponential functions are significantly different.

| n | \(n!\) | \(n^2*2^n\) |
|---|---|---|
| 8 | 40320 | 16384 |
| 10 | 3628800 | 102400 |
| 12 | 479001600 | 589824 |
| 14 | 87178291200 | 3211264 |
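The comparison can be reproduced, or extended to other values of n, with a short script. The function names are assumptions made for this sketch.

```python
import math


def brute_force_ops(n: int) -> int:
    return math.factorial(n)


def bitmask_dp_ops(n: int) -> int:
    return n * n * 2 ** n


for n in (8, 10, 12, 14):
    print(n, brute_force_ops(n), bitmask_dp_ops(n))
```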

Pause a second and think about why bitmask DP works here. Notice there are lots of redundant sub calls, one of which is highlighted in the red ellipse below.

In this episode, a straightforward top-down memoized DP is given in Python 3 and Java 8. The benefit of the top-down approach is that we don't need to work out a topological ordering when iterating over all states. Notice the trick in the Java version: missing edges and the running minimum res are both initialized to Integer.MAX_VALUE, so that a single statement is enough to update the new dp value.

res = Math.min(res, s + g.edges[v][u]);

However, this brevity comes at the cost of clarity, and care should be taken when dealing with the actual INF (unreachable) case. The Python version could have used the same trick, perhaps by initializing with a large value representing INF. But for clarity, it handles the different cases explicitly in if-else statements and uses -1 (INT_INF) as the INF marker.

INT_INF = -1

if s != INT_INF and edges[v][u] != INT_INF:
    if ret == INT_INF:
        ret = s + edges[v][u]
    else:
        ret = min(ret, s + edges[v][u])
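For comparison, here is a sketch of what the single-statement trick could look like in Python. It is not the version used in this episode: it swaps in math.inf as the INF value and functools.lru_cache for the dp table, and the function name tsp_with_inf and the edge list are illustrative assumptions.

```python
import math
from functools import lru_cache


def tsp_with_inf(n, edge_list):
    """Top-down bitmask DP where unreachable is math.inf instead of -1."""
    INF = math.inf
    dist = [[INF] * n for _ in range(n)]
    for src, dest, w in edge_list:
        dist[src][dest] = w
    full = (1 << n) - 1

    @lru_cache(maxsize=None)
    def recurse(state: int, v: int):
        if state == full:
            # The tour is complete only if it ended back at vertex 0.
            return 0 if v == 0 else INF
        best = INF
        for u in range(n):
            if (state & (1 << u)) == 0:
                # INF + anything stays INF, so one min() covers every case.
                best = min(best, recurse(state | (1 << u), u) + dist[v][u])
        return best

    ans = recurse(0, 0)
    return -1 if ans == INF else ans


# Hypothetical 4-vertex directed graph: (src, dest, distance).
EDGES = [(0, 1, 2), (1, 2, 3), (1, 3, 9), (2, 0, 1), (2, 3, 6), (3, 2, 4)]
print(tsp_with_inf(4, EDGES))
```

The update collapses to one line, but the conversion back to -1 has to happen once at the very end.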

Below is the complete AC code in Java 8 and Python 3. It can also be downloaded from GitHub.

AIZU Java 8 Recursive Version

// passed http://judge.u-aizu.ac.jp/onlinejudge/description.jsp?id=DPL_2_A
import java.util.Arrays;
import java.util.Scanner;

public class Main {
    public static class Graph {
        public final int V_NUM;
        public final int[][] edges;

        public Graph(int V_NUM) {
            this.V_NUM = V_NUM;
            this.edges = new int[V_NUM][V_NUM];
            // A missing edge is represented by Integer.MAX_VALUE (INF).
            for (int i = 0; i < V_NUM; i++) {
                Arrays.fill(this.edges[i], Integer.MAX_VALUE);
            }
        }

        public void setDist(int src, int dest, int dist) {
            this.edges[src][dest] = dist;
        }

    }

    public static class TSP {
        public final Graph g;
        // dp[state][v]: -1 means not computed yet.
        long[][] dp;

        public TSP(Graph g) {
            this.g = g;
        }

        public long solve() {
            int N = g.V_NUM;
            dp = new long[1 << N][N];
            for (int i = 0; i < dp.length; i++) {
                Arrays.fill(dp[i], -1);
            }

            long ret = recurse(0, 0);
            return ret == Integer.MAX_VALUE ? -1 : ret;
        }

        private long recurse(int state, int v) {
            int ALL = (1 << g.V_NUM) - 1;
            if (dp[state][v] >= 0) {
                return dp[state][v];
            }
            // Base case: every vertex visited and we are back at vertex 0.
            if (state == ALL && v == 0) {
                dp[state][v] = 0;
                return 0;
            }
            long res = Integer.MAX_VALUE;
            for (int u = 0; u < g.V_NUM; u++) {
                if ((state & (1 << u)) == 0) {
                    long s = recurse(state | 1 << u, u);
                    // long arithmetic keeps INF + INF from overflowing.
                    res = Math.min(res, s + g.edges[v][u]);
                }
            }
            dp[state][v] = res;
            return res;

        }

    }

    public static void main(String[] args) {

        Scanner in = new Scanner(System.in);
        int V = in.nextInt();
        int E = in.nextInt();
        Graph g = new Graph(V);
        while (E > 0) {
            int src = in.nextInt();
            int dest = in.nextInt();
            int dist = in.nextInt();
            g.setDist(src, dest, dist);
            E--;
        }
        System.out.println(new TSP(g).solve());
    }
}

AIZU Python 3 Recursive Version

from typing import List

INT_INF = -1


class Graph:
    v_num: int
    edges: List[List[int]]

    def __init__(self, v_num: int):
        self.v_num = v_num
        # A missing edge is represented by INT_INF.
        self.edges = [[INT_INF for c in range(v_num)] for r in range(v_num)]

    def setDist(self, src: int, dest: int, dist: int):
        self.edges[src][dest] = dist


class TSPSolver:
    g: Graph
    dp: List[List[int]]

    def __init__(self, g: Graph):
        self.g = g
        # None means the state has not been computed yet.
        self.dp = [[None for c in range(g.v_num)] for r in range(1 << g.v_num)]

    def solve(self) -> int:
        return self._recurse(0, 0)

    def _recurse(self, v: int, state: int) -> int:
        """

        :param v: current vertex
        :param state: bitmask of the vertices visited so far
        :return: minimal remaining distance, -1 means INF
        """
        dp = self.dp
        edges = self.g.edges

        if dp[state][v] is not None:
            return dp[state][v]

        # Base case: every vertex visited and we are back at vertex 0.
        if (state == (1 << self.g.v_num) - 1) and (v == 0):
            dp[state][v] = 0
            return dp[state][v]

        ret: int = INT_INF
        for u in range(self.g.v_num):
            if (state & (1 << u)) == 0:
                s: int = self._recurse(u, state | 1 << u)
                if s != INT_INF and edges[v][u] != INT_INF:
                    if ret == INT_INF:
                        ret = s + edges[v][u]
                    else:
                        ret = min(ret, s + edges[v][u])
        dp[state][v] = ret
        return ret


def main():
    V, E = map(int, input().split())
    g: Graph = Graph(V)
    for _ in range(E):
        src, dest, dist = map(int, input().split())
        g.setDist(src, dest, dist)

    tsp: TSPSolver = TSPSolver(g)
    print(tsp.solve())


if __name__ == "__main__":
    main()
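For contrast with the recursive versions, here is a sketch of how a bottom-up variant might look, where the state ordering has to be chosen by hand. It is not part of the submitted solutions: the function name tsp_bottom_up and the edge list are illustrative assumptions, and math.inf stands in for INF.

```python
import math


def tsp_bottom_up(n, edge_list):
    """Iterative bitmask DP over the same dp[state][v] definition."""
    INF = math.inf
    dist = [[INF] * n for _ in range(n)]
    for src, dest, w in edge_list:
        dist[src][dest] = w

    full = (1 << n) - 1
    dp = [[INF] * n for _ in range(1 << n)]
    dp[full][0] = 0  # base case: everything visited, back at vertex 0

    # dp[state] depends only on supersets of state, which are numerically
    # larger, so walking the states downwards is a valid topological order.
    for state in range(full - 1, -1, -1):
        for v in range(n):
            best = INF
            for u in range(n):
                if (state & (1 << u)) == 0:
                    best = min(best, dp[state | (1 << u)][u] + dist[v][u])
            dp[state][v] = best

    ans = dp[0][0]
    return -1 if ans == INF else ans


# Hypothetical 4-vertex directed graph: (src, dest, distance).
EDGES = [(0, 1, 2), (1, 2, 3), (1, 3, 9), (2, 0, 1), (2, 3, 6), (3, 2, 4)]
print(tsp_bottom_up(4, EDGES))
```

The loop also fills entries where v is not in state, which are never needed; that wasted work is the price for not tracking reachability, and it is one reason the top-down version is simpler to get right.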
