#PyTorch

本篇是深度强化学习动手系列文章,自MyEncyclopedia公众号文章深度强化学习之:DQN训练超级玛丽闯关发布后收到不少关注和反馈,这一期,让我们实现目前主流深度强化学习算法PPO来打另一个红白机经典游戏1942。

相关文章链接如下:

强化学习开源环境集

视频论文解读:PPO算法

视频论文解读:组合优化的强化学习方法

解读TRPO论文,深度强化学习结合传统优化方法

解读深度强化学习基石论文:函数近似的策略梯度方法

NES 1942 环境安装

红白机游戏环境可以由OpenAI Retro来模拟,OpenAI Retro还在 Gym 集成了其他的经典游戏环境,包括Atari 2600,GBA,SNES等。

不过,受到版权原因,除了一些基本的rom,大部分游戏需要自行获取rom。

环境准备部分相关代码如下

1
pip install gym-retro
1
python -m retro.import /path/to/your/ROMs/directory/

OpenAI Gym 输入动作类型

在创建 retro 环境时,可以在retro.make中通过参数use_restricted_actions指定 action space,即按键的配置。

1
env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)

可选参数如下,FILTERED,DISCRETE和MULTI_DISCRETE 都可以指定过滤的动作,过滤动作需要通过配置文件加载。

1
2
3
4
5
6
7
8
class Actions(Enum):
"""
Different settings for the action space of the environment
"""
ALL = 0 #: MultiBinary action space with no filtered actions
FILTERED = 1 #: MultiBinary action space with invalid or not allowed actions filtered out
DISCRETE = 2 #: Discrete action space for filtered actions
MULTI_DISCRETE = 3 #: MultiDiscete action space for filtered actions

DISCRETE和MULTI_DISCRETE 是 Gym 里的 Action概念,它们的基类都是gym.spaces.Space,可以通过 sample()方法采样,下面具体一一介绍。

  • Discrete:对应一维离散空间,例如,Discrete(n=4) 表示 [0, 3] 范围的整数。
1
2
3
from gym.spaces import Discrete
space = Discrete(4)
print(space.sample())

输出是

1
3
  • Box:对应多维连续空间,每一维的范围可以用 [low,high] 指定。 举例,Box(low=-1.0, high=2, shape=(3, 4,), dtype=np.float32) 表示 shape 是 [3, 4],每个范围在 [-1, 2] 的float32型 tensor。
1
2
3
4
from gym.spaces import Box
import numpy as np
space = Box(low=-1.0, high=2.0, shape=(3, 4), dtype=np.float32)
print(space.sample())

输出是

1
2
3
[[-0.7538084   0.96901214  0.38641307 -0.05045208]
[-0.85486996 1.3516271 0.3222616 1.2540635 ]
[-0.29908678 -0.8970335 1.4869047 0.7007356 ]]

  • MultiBinary: 0或1的多维离散空间。例如,MultiBinary([3,2]) 表示 shape 是3x2的0或1的tensor。
    1
    2
    3
    from gym.spaces import MultiBinary
    space = MultiBinary([3,2])
    print(space.sample())

输出是

1
2
3
[[1 0]
[1 1]
[0 0]]
  • MultiDiscrete:多维整型离散空间。例如,MultiDiscrete([5,2,2]) 表示三维Discrete空间,第一维范围在 [0-4],第二,三维范围在[0-1]。
1
2
3
from gym.spaces import MultiDiscrete
space = MultiDiscrete([5,2,2])
print(space.sample())

输出是

1
[2 1 0]
  • Tuple:组合成 tuple 复合空间。举例来说,可以将 Box,Discrete,Discrete组成tuple 空间:Tuple(spaces=(Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), Discrete(n=3), Discrete(n=2)))
1
2
3
4
from gym.spaces import *
import numpy as np
space = Tuple(spaces=(Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), Discrete(n=3), Discrete(n=2)))
print(space.sample())

输出是

1
2
(array([ 0.22640526,  0.75286865, -0.6309239 ], dtype=float32), 0, 1)

  • Dict:组合成有名字的复合空间。例如,Dict({'position':Discrete(2), 'velocity':Discrete(3)})
    1
    2
    3
    from gym.spaces import *
    space = Dict({'position':Discrete(2), 'velocity':Discrete(3)})
    print(space.sample())

输出是

1
OrderedDict([('position', 1), ('velocity', 1)])

NES 1942 动作空间配置

了解了 gym/retro 的动作空间,我们来看看1942的默认动作空间

1
2
env = retro.make(game='1942-Nes')
print("The size of action is: ", env.action_space.shape)

1
The size of action is:  (9,)

表示有9个 Discrete 动作,包括 start, select这些控制键。

从训练1942角度来说,我们希望指定最少的有效动作取得最好的成绩。根据经验,我们知道这个游戏最重要的键是4个方向加上 fire 键。限定游戏动作空间,官方的做法是在创建游戏环境时,指定预先生成的动作输入配置文件。但是这个方式相对麻烦,我们采用了直接指定按键的二进制表示来达到同样的目的,此时,需要设置 use_restricted_actions=retro.Actions.FILTERED。

下面的代码限制了6种按键,并随机play。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
action_list = [
# No Operation
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# Left
[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
# Right
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
# Down
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
# Up
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
# B
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
]

def random_play(env, action_list, sleep_seconds=0.01):
env.viewer = None
state = env.reset()
score = 0
for j in range(10000):
env.render()
time.sleep(sleep_seconds)
action = np.random.randint(len(action_list))

next_state, reward, done, _ = env.step(action_list[action])
state = next_state
score += reward
if done:
print("Episode Score: ", score)
env.reset()
break

env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
random_play(env, action_list)

来看看其游戏效果,全随机死的还是比较快。

图像输入处理

一般对于通过屏幕像素作为输入的RL end-to-end训练来说,对图像做预处理很关键。因为原始图像较大,一方面我们希望能尽量压缩图像到比较小的tensor,另一方面又要保证关键信息不丢失,比如子弹的图像不能因为图片缩小而消失。另外的一个通用技巧是将多个连续的frame合并起来组成立体的frame,这样可以有效表示连贯动作。

下面的代码通过 pipeline 将游戏每帧原始图像从shape (224, 240, 3) 转换成 (4, 84, 84),也就是原始的 width=224,height=240,rgb=3转换成 width=84,height=240,stack_size=4的黑白图像。具体 pipeline为

  1. MaxAndSkipEnv:每两帧过滤一帧图像,减少数据量。

  2. FrameDownSample:down sample 图像到指定小分辨率 84x84,并从彩色降到黑白。

  3. FrameBuffer:合并连续的4帧,形成 (4, 84, 84) 的图像输入

1
2
3
4
5
6
7
def build_env():
env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
env = MaxAndSkipEnv(env, skip=2)
env = FrameDownSample(env, (1, -1, -1, 1))
env = FrameBuffer(env, 4)
env.seed(0)
return env

观察图像维度变换

1
2
3
4
5
env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
print("Initial shape: ", env.observation_space.shape)

env = build_env(env)
print("Processed shape: ", env.observation_space.shape)

确保shape 从 (224, 240, 3) 转换成 (4, 84, 84)

1
2
Initial shape:  (224, 240, 3)
Processed shape: (4, 84, 84)

FrameDownSample实现如下,我们使用了 cv2 类库来完成黑白化和图像缩放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class FrameDownSample(ObservationWrapper):
def __init__(self, env, exclude, width=84, height=84):
super(FrameDownSample, self).__init__(env)
self.exclude = exclude
self.observation_space = Box(low=0,
high=255,
shape=(width, height, 1),
dtype=np.uint8)
self._width = width
self._height = height

def observation(self, observation):
# convert image to gray scale
screen = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)

# crop screen [up: down, left: right]
screen = screen[self.exclude[0]:self.exclude[2], self.exclude[3]:self.exclude[1]]

# to float, and normalized
screen = np.ascontiguousarray(screen, dtype=np.float32) / 255

# resize image
screen = cv2.resize(screen, (self._width, self._height), interpolation=cv2.INTER_AREA)
return screen

MaxAndSkipEnv,每两帧过滤一帧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MaxAndSkipEnv(Wrapper):
def __init__(self, env=None, skip=4):
super(MaxAndSkipEnv, self).__init__(env)
self._obs_buffer = deque(maxlen=2)
self._skip = skip

def step(self, action):
total_reward = 0.0
done = None
for _ in range(self._skip):
obs, reward, done, info = self.env.step(action)
self._obs_buffer.append(obs)
total_reward += reward
if done:
break
max_frame = np.max(np.stack(self._obs_buffer), axis=0)
return max_frame, total_reward, done, info

def reset(self):
self._obs_buffer.clear()
obs = self.env.reset()
self._obs_buffer.append(obs)
return obs

FrameBuffer,将最近的4帧合并起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class FrameBuffer(ObservationWrapper):
def __init__(self, env, num_steps, dtype=np.float32):
super(FrameBuffer, self).__init__(env)
obs_space = env.observation_space
self._dtype = dtype
self.observation_space = Box(low=0, high=255, shape=(num_steps, obs_space.shape[0], obs_space.shape[1]), dtype=self._dtype)

def reset(self):
frame = self.env.reset()
self.buffer = np.stack(arrays=[frame, frame, frame, frame])
return self.buffer

def observation(self, observation):
self.buffer[:-1] = self.buffer[1:]
self.buffer[-1] = observation
return self.buffer

最后,visualize 处理后的图像,同样还是在随机play中,确保关键信息不丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def random_play_preprocessed(env, action_list, sleep_seconds=0.01):
import matplotlib.pyplot as plt

env.viewer = None
state = env.reset()
score = 0
for j in range(10000):
time.sleep(sleep_seconds)
action = np.random.randint(len(action_list))

plt.imshow(state[-1], cmap="gray")
plt.title('Pre Processed image')
plt.pause(sleep_seconds)

next_state, reward, done, _ = env.step(action_list[action])
state = next_state
score += reward
if done:
print("Episode Score: ", score)
env.reset()
break

matplotlib 动画输出

CNN Actor & Critic

Actor 和 Critic 模型相同,输入是 (4, 84, 84) 的图像,输出是 [0, 5] 的action index。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Actor(nn.Module):
def __init__(self, input_shape, num_actions):
super(Actor, self).__init__()
self.input_shape = input_shape
self.num_actions = num_actions

self.features = nn.Sequential(
nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, stride=1),
nn.ReLU()
)

self.fc = nn.Sequential(
nn.Linear(self.feature_size(), 512),
nn.ReLU(),
nn.Linear(512, self.num_actions),
nn.Softmax(dim=1)
)

def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), -1)
x = self.fc(x)
dist = Categorical(x)
return dist

PPO核心代码

先计算 \(r_t(\theta)\),这里采用了一个技巧,对 \(\pi_\theta\) 取 log,相减再取 exp,这样可以增强数值稳定性。

1
2
3
4
dist = self.actor_net(state)
new_log_probs = dist.log_prob(action)
ratio = (new_log_probs - old_log_probs).exp()
surr1 = ratio * advantage

surr1 对应PPO论文中的 \(L^{CPI}\)

然后计算 surr2,对应 \(L^{CLIP}\) 中的 clip 部分,clip可以由 torch.clamp 函数实现。\(L^{CLIP}\) 则对应 actor_loss。

1
2
surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantage
actor_loss = - torch.min(surr1, surr2).mean()

最后,计算总的 loss \(L_t^{CLIP+VF+S}\),包括 actor_loss,critic_loss 和 policy的 entropy。

1
2
3
4
entropy = dist.entropy().mean()

critic_loss = (return_ - value).pow(2).mean()
loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy

上述完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for _ in range(self.ppo_epoch):
for state, action, old_log_probs, return_, advantage in sample_batch():
dist = self.actor_net(state)
value = self.critic_net(state)

entropy = dist.entropy().mean()
new_log_probs = dist.log_prob(action)

ratio = (new_log_probs - old_log_probs).exp()
surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantage

actor_loss = - torch.min(surr1, surr2).mean()
critic_loss = (return_ - value).pow(2).mean()

loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy

# Minimize the loss
self.actor_optimizer.zero_grad()
self.critic_optimizer.zero_grad()
loss.backward()
self.actor_optimizer.step()
self.critic_optimizer.step()

补充一下 GAE 的计算,advantage 根据公式

可以转换成如下代码

1
2
3
4
5
6
7
8
9
def compute_gae(self, next_value):
gae = 0
returns = []
values = self.values + [next_value]
for step in reversed(range(len(self.rewards))):
delta = self.rewards[step] + self.gamma * values[step + 1] * self.masks[step] - values[step]
gae = delta + self.gamma * self.tau * self.masks[step] * gae
returns.insert(0, gae + values[step])
return returns

外层 Training 代码

外层调用代码基于随机 play 的逻辑,agent.act()封装了采样和 forward prop,agent.step() 则封装了 backprop 和参数学习迭代的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for i_episode in range(start_epoch + 1, n_episodes + 1):
state = env.reset()
score = 0
timestamp = 0

while timestamp < 10000:
action, log_prob, value = agent.act(state)
next_state, reward, done, info = env.step(action_list[action])
score += reward
timestamp += 1

agent.step(state, action, value, log_prob, reward, done, next_state)
if done:
break
else:
state = next_state

训练结果

让我们来看看学习的效果吧,注意我们的飞机学到了一些关键的技巧,躲避子弹;飞到角落尽快击毙敌机;一定程度预测敌机出现的位置并预先走到位置。

上一期 MyEncyclopedia公众号文章 从Q-Learning 演化到 DQN,我们从原理上讲解了DQN算法,这一期,让我们通过代码来实现任天堂游戏机中经典的超级玛丽的自动通关吧。本文所有代码在 https://github.com/MyEncyclopedia/reinforcement-learning-2nd/tree/master/super_mario。

DQN 算法回顾

上期详细讲解了DQN中的两个重要的技术:Target Network 和 Experience Replay,正是有了它们才使得 Deep Q Network在实战中容易收敛,以下是Deepmind 发表在Nature 的 Human-level control through deep reinforcement learning 的完整算法流程。

超级玛丽 NES OpenAI 环境

安装基于OpenAI gym的超级玛丽环境执行下面的 pip 命令即可。

1
pip install gym-super-mario-bros

我们先来看一下游戏环境的输入和输出。下面代码采用随机的action来和游戏交互。有了 组合游戏系列3: 井字棋、五子棋的OpenAI Gym GUI环境 对于OpenAI Gym 接口的介绍,现在对于其基本的交互步骤已经不陌生了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import gym_super_mario_bros
from random import random, randrange
from gym_super_mario_bros.actions import RIGHT_ONLY
from nes_py.wrappers import JoypadSpace
from gym import wrappers

env = gym_super_mario_bros.make('SuperMarioBros-v0')
env = JoypadSpace(env, RIGHT_ONLY)

# Play randomly
done = False
env.reset()

step = 0
while not done:
action = randrange(len(RIGHT_ONLY))
state, reward, done, info = env.step(action)
print(done, step, info)
env.render()
step += 1

env.close()

游戏render效果如下

。。。

注意我们在游戏环境初始化的时候用了参数 RIGHT_ONLY,它定义成五种动作的list,表示仅使用右键的一些组合,适用于快速训练来完成Mario第一关。

1
2
3
4
5
6
7
RIGHT_ONLY = [
['NOOP'],
['right'],
['right', 'A'],
['right', 'B'],
['right', 'A', 'B'],
]

观察一些 info 输出内容,coins表示金币获得数量,flag_get 表示是否取得最后的旗子,time 剩余时间,以及 Mario 大小状态和所在的 x,y位置。

1
2
3
4
5
6
7
8
9
10
11
12
{
"coins":0,
"flag_get":False,
"life":2,
"score":0,
"stage":1,
"status":"small",
"time":381,
"world":1,
"x_pos":594,
"y_pos":89
}

游戏图像处理

Deep Reinforcement Learning 一般是 end-to-end learning,意味着游戏的 screen image 作为observation直接视为真实状态,喂给神经网络训练。于此相反的另一种做法是,通过游戏环境拿到内部状态,例如所有相关物品的位置和属性作为模型输入。这两种方式的区别有两点。第一点,用观察到的屏幕像素代替真正的状态 s,在partially observable 的环境时可能因为 non-stationarity 导致无法很好的工作,而拿内部状态利用了额外的作弊信息,在partially observable环境中也可以工作。第二点,第一种方式屏幕像素维度比较高,输入数据量大,需要神经网络的大量训练拟合,第二种方式,内部真实状态往往维度低得多,训练起来很快,但缺点是因为除了内部状态往往还需要游戏相关规则作为输入,因此generalization能力不如前者强。

这里,我们当然采样屏幕像素的 end-to-end 方式了,自然首要任务是将游戏帧图像有效处理。超级玛丽游戏环境的屏幕输出是 (240, 256, 3) shape的 numpy array,通过下面一系列的转换,尽可能的在不影响训练效果的情况下减小采样到的数据量。

  1. MaxAndSkipFrameWrapper:每4个frame连在一起,采取同样的动作,降低frame数量。

  2. FrameDownsampleWrapper:将原始的 (240, 256, 3) down sample 到 (84, 84, 1)

  3. ImageToPyTorchWrapper:转换成适合 pytorch 的 (1, 84, 84) shape

  4. FrameBufferWrapper:保存最后4次屏幕采样

  5. NormalizeFloats:Normalize 成 [0., 1.0] 的浮点值

1
2
3
4
5
6
7
8
9
def wrap_environment(env_name: str, action_space: list) -> Wrapper:
env = make(env_name)
env = JoypadSpace(env, action_space)
env = MaxAndSkipFrameWrapper(env)
env = FrameDownsampleWrapper(env)
env = ImageToPyTorchWrapper(env)
env = FrameBufferWrapper(env, 4)
env = NormalizeFloats(env)
return env

CNN 模型

模型比较简单,三个卷积层后做 softmax输出,输出维度数为离散动作数。act() 采用了epsilon-greedy 模式,即在epsilon小概率时采取随机动作来 explore,大于epsilon时采取估计的最可能动作来 exploit。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class DQNModel(nn.Module):
def __init__(self, input_shape, num_actions):
super(DQNModel, self).__init__()
self._input_shape = input_shape
self._num_actions = num_actions

self.features = nn.Sequential(
nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, stride=1),
nn.ReLU()
)

self.fc = nn.Sequential(
nn.Linear(self.feature_size, 512),
nn.ReLU(),
nn.Linear(512, num_actions)
)

def forward(self, x):
x = self.features(x).view(x.size()[0], -1)
return self.fc(x)

def act(self, state, epsilon, device):
if random() > epsilon:
state = torch.FloatTensor(np.float32(state)).unsqueeze(0).to(device)
q_value = self.forward(state)
action = q_value.max(1)[1].item()
else:
action = randrange(self._num_actions)
return action

Experience Replay 缓存

实现采用了 Pytorch CartPole DQN 的官方代码,本质是一个最大为 capacity 的 list 保存采样的 (s, a, r, s', is_done) 五元组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayMemory:

def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.position = 0

def push(self, *args):
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position + 1) % self.capacity

def sample(self, batch_size):
return random.sample(self.memory, batch_size)

def __len__(self):
return len(self.memory)

DQNAgent

我们将 DQN 的逻辑封装在 DQNAgent 类中。DQNAgent 成员变量包括两个 DQNModel,一个ReplayMemory。

train() 方法中会每隔一定时间将 Target Network 的参数同步成现行Network的参数。在td_loss_backprop()方法中采样 ReplayMemory 中的五元组,通过minimize TD error方式来改进现行 Network 参数 \(\theta\)。Loss函数为:

\[ L\left(\theta_{i}\right)=\mathbb{E}_{\left(s, a, r, s^{\prime}\right) \sim \mathrm{U}(D)}\left[\left(r+\gamma \max _{a^{\prime}} Q_{target}\left(s^{\prime}, a^{\prime} ; \theta_{i}^{-}\right)-Q\left(s, a ; \theta_{i}\right)\right)^{2}\right] \]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class DQNAgent():

def act(self, state, episode_idx):
self.update_epsilon(episode_idx)
action = self.model.act(state, self.epsilon, self.device)
return action

def process(self, episode_idx, state, action, reward, next_state, done):
self.replay_mem.push(state, action, reward, next_state, done)
self.train(episode_idx)

def train(self, episode_idx):
if len(self.replay_mem) > self.initial_learning:
if episode_idx % self.target_update_frequency == 0:
self.target_model.load_state_dict(self.model.state_dict())
self.optimizer.zero_grad()
self.td_loss_backprop()
self.optimizer.step()

def td_loss_backprop(self):
transitions = self.replay_mem.sample(self.batch_size)
batch = Transition(*zip(*transitions))

state = Variable(FloatTensor(np.float32(batch.state))).to(self.device)
action = Variable(LongTensor(batch.action)).to(self.device)
reward = Variable(FloatTensor(batch.reward)).to(self.device)
next_state = Variable(FloatTensor(np.float32(batch.next_state))).to(self.device)
done = Variable(FloatTensor(batch.done)).to(self.device)

q_values = self.model(state)
next_q_values = self.target_net(next_state)

q_value = q_values.gather(1, action.unsqueeze(-1)).squeeze(-1)
next_q_value = next_q_values.max(1)[0]
expected_q_value = reward + self.gamma * next_q_value * (1 - done)

loss = (q_value - expected_q_value.detach()).pow(2)
loss = loss.mean()
loss.backward()

外层 Training 代码

最后是外层调用代码,基本和以前文章一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def train(env, args, agent):
for episode_idx in range(args.num_episodes):
episode_reward = 0.0
state = env.reset()

while True:
action = agent.act(state, episode_idx)
if args.render:
env.render()
next_state, reward, done, stats = env.step(action)
agent.process(episode_idx, state, action, reward, next_state, done)
state = next_state
episode_reward += reward
if done:
print(f'{episode_idx}: {episode_reward}')
break

本篇是TSP问题从DP算法到深度学习系列第三篇,在这一篇中,我们会开始进入深度学习领域来求近似解法。本文会介绍并实现指针网络(Pointer Networks),一种seq-to-seq模型,它的设计目的就是为了解决TSP问题或者凸包(Convex Hull)问题。本文代码在 https://github.com/MyEncyclopedia/blog/tree/master/tsp/ptr_net_pytorch 中。

Pointer Networks

随着深度学习 seq-to-seq 模型作为概率近似模型在各领域的成功,TSP问题似乎也可以用同样的思路去解决。然而,传统的seq-to-seq 模型其输出的类别是预先固定的。例如,NLP RNN生成模型每一步会从 \(|V|\) 大的词汇表中产生一个单词。 然而,有很大一类问题,譬如TSP问题、凸包(Convex Hull)问题、Delaunay三角剖分问题,输出的类别不是事先固定的,而是随着输入而变化的。 Pointer Networks 的出现解决了这种限制:输出的类别可以通过指向某个输入,以此克服类别的问题,因此形象地取名为指针网络(Pointer Networks)。先来看看原论文中提到的三个问题。

凸包问题(Convex Hull)

如下图所示,需要在给定的10个点中找到若干个点,使得这些点包住了所有点。问题输入是不确定个数 n 个点的位置信息,输出是 k (k<=n)个点的。 这个经典的算法问题已经被证明找出精确解等价于排序问题(wikipedia 链接),因此时间复杂度为 \(O(n*log(n))\)

image info

\[ \begin{align*} &\text{Input: } \mathcal{P} &=& \left\{ P_{1}, \ldots, P_{10} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{2,4,3,5,6,7,2\} \end{align*} \]

TSP 问题

TSP 和凸包问题很类似,输入为不确定个数的 n 个点信息,输出为这 n 个点的某序列。在。。。中,我们可以将确定解的时间复杂度从 \(O(n!)\) 降到 \(O(n^2*2^n)\)

image info

\[ \begin{align*} &\text{Input: } \mathcal{P} &= &\left\{P_{1}, \ldots, P_{6} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{1,3,2,4,5,6,1\} \end{align*} \]

Delaunay三角剖分

Delaunay三角剖分问题是将平面上的散点集划分成三角形,使得在可能形成的三角剖分中,所形成的三角形的最小角最大。这个问题的输出是若干个集合,每个集合代表一个三角形,由输入点的编号表示。 image info

\[ \begin{align*} &\text{Input: } \mathcal{P} &=& \left\{P_{1}, \ldots, P_{5} \right\} \\ &\text{Output: } C^{\mathcal{P}} &=& \{(1,2,4),(1,4,5),(1,3,5),(1,2,3)\} \end{align*} \]

Seq-to-Seq 模型

现在假设n是固定的,传统基本的seq-to-seq模型(参数部分记为 \(\theta\) ),训练数据若记为\((\mathcal{P}, C^{\mathcal{P}})\),,将拟合以下条件概率:

\[ \begin{equation} p\left(\mathcal{C}^{\mathcal{P}} | \mathcal{P} ; \theta\right)=\prod_{i=1}^{m(\mathcal{P})} p\left(C_{i} | C_{1}, \ldots, C_{i-1}, \mathcal{P} ; \theta\right) \end{equation} \] 训练的方向是找到 \(\theta^{*}\) 来最大化上述联合概率,即: \[ \begin{equation} \theta^{*}=\underset{\theta}{\arg \max } \sum_{\mathcal{P}, \mathcal{C}^{\mathcal{P}}} \log p\left(\mathcal{C}^{\mathcal{P}} | \mathcal{P} ; \theta\right) \end{equation} \]

Content Based Input Attention

一种增强基本seq-to-seq模型的方法是加入attention机制。记encoder和decoder隐藏状态分别是 $ (e_{1}, , e_{n}) $ 和 $ (d_{1}, , d_{m()}) $。seq-to-seq第 i 次输出了 \(d_i\),注意力机制额外计算第i步的注意力向量 \(d_i^{\prime}\),并将其和\(d_i\)连接后作为隐藏状态。\(d_i^{\prime}\)的计算方式如下,输入 $ (e_{1}, , e_{n}) $ 和 i 对应的权重向量 $ (a_{1}^{i}, , a_{n}^{i}) $做点乘。

\[ d_{i} = \sum_{j=1}^{n} a_{j}^{i} e_{j} \]

$ (a_{1}^{i}, , a_{n}^{i}) $ 是向量 $ (u_{1}^{i}, , u_{n}^{i}) $ softmax后的值, \(u_{j}^{i}\) 表示 \(d_{i}\)\(e_{j}\)的距离,Pointer Networks论文中的距离为如下的tanh公式。

\[ \begin{eqnarray} u_{j}^{i} &=& v^{T} \tanh \left(W_{1} e_{j}+W_{2} d_\right) \quad j \in(1, \ldots, n) \\ a_{j}^{i} &=& \operatorname{softmax}\left(u_{j}^{i}\right) \quad j \in(1, \ldots, n) \end{eqnarray} \]

更多Attention计算方式

FloydHub Blog - Attention Mechanism 中,作者清楚地解释了两种经典的attention方法,第一种称为Additive Attention,由Dzmitry Bahdanau 提出,也就是Pointer Networks中通过tanh的计算方式,第二种称为 Multiplicative Attention,由Thang Luong*提出。

Luong Attention 有三种方法计算 \(d_{i}\)\(e_{j}\) 的距离(或者可以认为向量间的对齐得分)。

\[ \operatorname{score} \left( d_i, e_j \right)= \begin{cases} d_i^{\top} e_j & \text { dot } \\ d_i^{\top} W_a e_j & \text { general } \\ v_a^{\top} \tanh \left( W_a \left[ d_i ; e_j \right] \right) & \text { concat } \end{cases} \]

Pointer Networks

image info

Pointer Networks 基于Additive Attention,其创新之处在于用 \(u^i_j\) 作为第j个输入的评分,即第 i 次输出为1-n个输入中 \(u^i_j\) 得分最高的j作为输出,这样巧妙的解决了n不是预先固定的限制。

\[ \begin{eqnarray*} u_{j}^{i} &=& v^{T} \tanh \left(W_{1} e_{j}+W_{2} d_{i}\right) \quad j \in(1, \ldots, n) \\ p\left(C_{i} | C_{1}, \ldots, C_{i-1}, \mathcal{P}\right) &=& \operatorname{softmax}\left(u^{i}\right) \end{eqnarray*} \]

PyTorch 代码实现

在本系列第二篇 episode 2,中,我们说明过TSP数据集的格式,每一行字段意义如下

1
x0, y0, x1, y1, ... output 1 v1 v2 v3 ... 1

转换成PyTorch Dataset

每一个case会转换成nd.ndarray,共有五个分量,分别是 (input, input_len, output_in, output_out, output_len) 并且分装成pytorch的 Dataset类。

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
from torch.utils.data import Dataset

class TSPDataset(Dataset):
"each data item of form (input, input_len, output_in, output_out, output_len)"
data: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]

def __len__(self):
return len(self.data)

def __getitem__(self, index):
input, input_len, output_in, output_out, output_len = self.data[index]
return input, input_len, output_in, output_out, output_len
image info

PyTorch pad_packed_sequence 优化技巧

PyTorch 实现 seq-to-seq 模型一般会使用 pack_padded_sequence 以及 pad_packed_sequence 来减少计算量,本质上可以认为根据pad大小分批进行矩阵运算,减少被pad的矩阵元素导致的无效运算,详细的解释可以参考 https://github.com/sgrvinod/a-PyTorch-Tutorial-to-Image-Captioning#decoder-1。

image info

对应代码如下:

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class RNNEncoder(nn.Module):
rnn: Union[nn.LSTM, nn.GRU, nn.RNN]

def __init__(self, rnn_type: str, bidirectional: bool, num_layers: int, input_size: int, hidden_size: int, dropout: float):
super(RNNEncoder, self).__init__()
if bidirectional:
assert hidden_size % 2 == 0
hidden_size = hidden_size // 2
self.rnn = rnn_init(rnn_type, input_size=input_size, hidden_size=hidden_size, bidirectional=bidirectional,num_layers=num_layers, dropout=dropout)

def forward(self, src: Tensor, src_lengths: Tensor, hidden: Tensor = None) -> Tuple[Tensor, Tensor]:
lengths = src_lengths.view(-1).tolist()
packed_src = pack_padded_sequence(src, lengths)
memory_bank, hidden_final = self.rnn(packed_src, hidden)
memory_bank = pad_packed_sequence(memory_bank)[0]
return memory_bank, hidden_final

注意力机制相关代码

{linenos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Attention(nn.Module):
linear_out: nn.Linear

def __init__(self, dim: int):
super(Attention, self).__init__()
self.linear_out = nn.Linear(dim * 2, dim, bias=False)

def score(self, src: Tensor, target: Tensor) -> Tensor:
batch_size, src_len, dim = src.size()
_, target_len, _ = target.size()
target_ = target
src_ = src.transpose(1, 2)
return torch.bmm(target_, src_)

def forward(self, src: Tensor, target: Tensor, src_lengths: Tensor) -> Tuple[Tensor, Tensor]:
assert target.dim() == 3

batch_size, src_len, dim = src.size()
_, target_len, _ = target.size()

align_score = self.score(src, target)

mask = sequence_mask(src_lengths)
# (batch_size, max_len) -> (batch_size, 1, max_len)
mask = mask.unsqueeze(1)
align_score.data.masked_fill_(~mask, -float('inf'))
align_score = F.softmax(align_score, -1)

c = torch.bmm(align_score, src)

concat_c = torch.cat([c, target], -1)
attn_h = self.linear_out(concat_c)

return attn_h, align_score

参考资料

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×