My PPO implementation fails to train on CarRacing-v3


I wrote a PPO-based reinforcement learning script for the Gymnasium CarRacing-v3 environment.

(The code was generated with the help of Gemini)

However, even after 200,000 frames, the training does not seem to improve.

<Result>

Frame: 40000 | Mean Reward: 0.07 | Loss: 3.1248
Frame: 80000 | Mean Reward: 0.04 | Loss: 7.0629
Frame: 120000 | Mean Reward: 0.06 | Loss: 3.9565
Frame: 160000 | Mean Reward: 0.07 | Loss: 5.4525
Frame: 200000 | Mean Reward: 0.03 | Loss: 3.8550

I would like to know if there are any issues or mistakes in the code below.

import gymnasium as gym
from collections import deque
from gymnasium.spaces import Box
from gymnasium.wrappers import GrayscaleObservation, ResizeObservation
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import numpy as np

max_frames = 200000
action_repeat = 4
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class EarlyStopWrapper(gym.Wrapper):
    # Truncates an episode (with a -5 penalty) after `patience` consecutive
    # negative rewards, starting after the first 50 steps.
    def __init__(self, env, patience=40):
        super().__init__(env)
        self.patience = patience
        self.neg_counter = 0
        self.step_count = 0

    def reset(self, **kwargs):
        self.neg_counter = 0
        self.step_count = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.step_count += 1
        if self.step_count > 50:
            if reward < 0:
                self.neg_counter += 1
            else:
                self.neg_counter = 0
            if self.neg_counter >= self.patience:
                truncated = True
                reward -= 5.0
        return obs, reward, terminated, truncated, info


class ActionRepeat(gym.Wrapper):
    # Repeats each action `repeat` times and sums the rewards.
    def __init__(self, env, repeat):
        super().__init__(env)
        self.repeat = repeat

    def step(self, action):
        total_reward = 0.0
        terminated = False
        truncated = False
        for _ in range(self.repeat):
            obs, reward, term, trunc, info = self.env.step(action)
            total_reward += reward
            terminated = term or terminated
            truncated = trunc or truncated
            if terminated or truncated:
                break
        return obs, total_reward, terminated, truncated, info


class CustomRewardWrapper(gym.Wrapper):
    # Scales positive rewards by 1.2.
    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        if reward > 0:
            reward *= 1.2
        return obs, reward, terminated, truncated, info


class FrameStack(gym.Wrapper):
    # Stacks the last `num_stack` frames along a new leading axis.
    def __init__(self, env, num_stack):
        super().__init__(env)
        self.num_stack = num_stack
        self.frames = deque(maxlen=num_stack)
        low = np.repeat(env.observation_space.low[np.newaxis, ...], num_stack, axis=0)
        high = np.repeat(env.observation_space.high[np.newaxis, ...], num_stack, axis=0)
        self.observation_space = Box(low=low, high=high, dtype=env.observation_space.dtype)

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        for _ in range(self.num_stack):
            self.frames.append(obs)
        return self._get_obs(), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(obs)
        return self._get_obs(), reward, terminated, truncated, info

    def _get_obs(self):
        return np.array(self.frames)


def preprocess(obs):
    # Convert the uint8 frame stack to a float tensor scaled to [0, 1].
    obs = torch.from_numpy(obs).float() / 255.0
    return obs


def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    # Orthogonal weight initialization with constant bias.
    torch.nn.init.orthogonal_(layer.weight, std)
    torch.nn.init.constant_(layer.bias, bias_const)
    return layer


class CarPolicy(nn.Module):
    # Shared CNN trunk with heads for action mean, action std, and state value.
    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            layer_init(nn.Conv2d(4, 32, kernel_size=3, stride=2)),
            nn.ReLU(),
            layer_init(nn.Conv2d(32, 64, kernel_size=3, stride=2)),
            nn.ReLU(),
            layer_init(nn.Conv2d(64, 128, kernel_size=3, stride=2)),
            nn.ReLU(),
            nn.Flatten()
        )
        self.fc = nn.Sequential(layer_init(nn.Linear(15488, 256)), nn.ReLU())
        self.fc_mu = layer_init(nn.Linear(256, 3), std=0.01)
        self.fc_std = layer_init(nn.Linear(256, 3), std=0.01)
        self.fc_value = layer_init(nn.Linear(256, 1), std=1)

    def forward(self, x):
        x = self.fc(self.conv(x))
        raw_mu = self.fc_mu(x)
        mu_steer = torch.tanh(raw_mu[:, 0:1])      # steering in [-1, 1]
        mu_gas = torch.sigmoid(raw_mu[:, 1:2])     # gas in [0, 1]
        mu_brake = torch.sigmoid(raw_mu[:, 2:3])   # brake in [0, 1]
        mu = torch.cat([mu_steer, mu_gas, mu_brake], dim=1)
        std = F.softplus(self.fc_std(x)) + 0.001
        std = torch.clamp(std, 0.001, 1.0)
        dist = Normal(mu, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        value = self.fc_value(x)
        return action, log_prob, value, dist


def compute_gae(next_value, rewards, masks, values, gamma=0.99, tau=0.95):
    # Generalized Advantage Estimation over one rollout.
    values = values + [next_value]
    gae = 0
    returns = []
    advantages = []
    for step in reversed(range(len(rewards))):
        delta = rewards[step] + gamma * values[step + 1] * masks[step] - values[step]
        gae = delta + gamma * tau * masks[step] * gae
        advantages.append(gae)
        returns.append(gae + values[step])
    returns.reverse()
    advantages.reverse()
    returns = torch.tensor(returns, dtype=torch.float32).to(device)
    advantages = torch.tensor(advantages, dtype=torch.float32).to(device)
    return returns, advantages


def main():
    env = gym.make("CarRacing-v3", render_mode=None)
    env = GrayscaleObservation(env, keep_dim=False)
    env = EarlyStopWrapper(env, patience=20)
    env = CustomRewardWrapper(env)
    env = ActionRepeat(env, repeat=action_repeat)
    env = FrameStack(env, num_stack=4)

    model = CarPolicy().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    batch_size = 2000

    obs, _ = env.reset()
    total_frame = 0
    while total_frame < max_frames:
        # Collect a rollout of `batch_size` environment steps.
        states, actions, log_probs, rewards, masks, values = [], [], [], [], [], []
        for _ in range(batch_size):
            input_tensor = preprocess(obs).unsqueeze(0).to(device)
            with torch.no_grad():
                action, log_prob, value, _ = model(input_tensor)
            real_action = action[0].cpu().numpy()
            next_obs, reward, done, truncated, _ = env.step(real_action)

            states.append(input_tensor)
            actions.append(action)
            log_probs.append(log_prob)
            rewards.append(reward)
            masks.append(1 - (done or truncated))
            values.append(value.item())

            obs = next_obs
            if done or truncated:
                obs, _ = env.reset()

        # Bootstrap the value of the last state, then compute returns and advantages.
        next_input = preprocess(obs).unsqueeze(0).to(device)
        with torch.no_grad():
            _, _, next_value, _ = model(next_input)
        returns, advantages = compute_gae(next_value.item(), rewards, masks, values)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        states_batch = torch.cat(states)
        actions_batch = torch.cat(actions)
        old_log_probs_batch = torch.cat(log_probs)

        # PPO update: 10 epochs over the collected rollout.
        for _ in range(10):
            _, new_log_probs, new_values, dist = model(states_batch)
            ratio = torch.exp(new_log_probs - old_log_probs_batch)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = F.mse_loss(new_values.squeeze(), returns)
            entropy_loss = dist.entropy().mean()
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        total_frame += batch_size * action_repeat
    env.close()


if __name__ == "__main__":
    main()
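
For reference, the inner update loop is meant to implement the standard PPO clipped surrogate objective. Below is a tiny standalone sketch of that computation with made-up numbers (the tensors and values are purely illustrative and not part of the training script), just to show what I understand the actor loss to be doing:

import torch

# Clipped surrogate objective on dummy data (illustrative only).
old_log_probs = torch.tensor([-1.2, -0.8, -1.5])   # log-probs recorded during the rollout
new_log_probs = torch.tensor([-1.0, -0.9, -1.1])   # log-probs under the current policy
advantages = torch.tensor([0.5, -0.3, 1.0])        # normalized advantage estimates

clip_eps = 0.2
ratio = torch.exp(new_log_probs - old_log_probs)   # pi_new / pi_old per sample
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
actor_loss = -torch.min(surr1, surr2).mean()       # same form as the actor loss in my loop
print(actor_loss.item())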