From 69d5343894da3ad526de4b7017e74f3c975ab026 Mon Sep 17 00:00:00 2001 From: Sebastian Dittert Date: Tue, 3 Oct 2023 16:11:23 +0200 Subject: [PATCH] [Algorithm] Update DDPG Example (#1525) Co-authored-by: vmoens --- .../linux_examples/scripts/run_test.sh | 10 +- examples/ddpg/config.yaml | 38 +++--- examples/ddpg/ddpg.py | 127 ++++++++++-------- examples/ddpg/utils.py | 99 +++++++++----- 4 files changed, 164 insertions(+), 110 deletions(-) diff --git a/.github/unittest/linux_examples/scripts/run_test.sh b/.github/unittest/linux_examples/scripts/run_test.sh index 112152ed7fc..2a9e258c35a 100755 --- a/.github/unittest/linux_examples/scripts/run_test.sh +++ b/.github/unittest/linux_examples/scripts/run_test.sh @@ -66,13 +66,12 @@ python .github/unittest/helpers/coverage_run_parallel.py examples/ppo/ppo_atari. python .github/unittest/helpers/coverage_run_parallel.py examples/ddpg/ddpg.py \ collector.total_frames=48 \ collector.init_random_frames=10 \ - optimization.batch_size=10 \ + optim.batch_size=10 \ collector.frames_per_batch=16 \ - collector.num_workers=4 \ collector.env_per_collector=2 \ collector.collector_device=cuda:0 \ network.device=cuda:0 \ - optimization.utd_ratio=1 \ + optim.utd_ratio=1 \ replay_buffer.size=120 \ env.name=Pendulum-v1 \ logger.backend= @@ -183,13 +182,12 @@ python .github/unittest/helpers/coverage_run_parallel.py examples/dreamer/dreame python .github/unittest/helpers/coverage_run_parallel.py examples/ddpg/ddpg.py \ collector.total_frames=48 \ collector.init_random_frames=10 \ - optimization.batch_size=10 \ + optim.batch_size=10 \ collector.frames_per_batch=16 \ - collector.num_workers=2 \ collector.env_per_collector=1 \ collector.collector_device=cuda:0 \ network.device=cuda:0 \ - optimization.utd_ratio=1 \ + optim.utd_ratio=1 \ replay_buffer.size=120 \ env.name=Pendulum-v1 \ logger.backend= diff --git a/examples/ddpg/config.yaml b/examples/ddpg/config.yaml index 464632f8bf3..5997ccb8fb3 100644 --- a/examples/ddpg/config.yaml +++ b/examples/ddpg/config.yaml @@ -1,45 +1,47 @@ -# Environment +# environment and task env: name: HalfCheetah-v3 task: "" - exp_name: "HalfCheetah-DDPG" - library: gym - frame_skip: 1 - seed: 1 + exp_name: ${env.name}_DDPG + library: gymnasium + max_episode_steps: 1000 + seed: 42 -# Collection +# collector collector: - total_frames: 1000000 - init_random_frames: 10000 + total_frames: 1_000_000 + init_random_frames: 25_000 frames_per_batch: 1000 - max_frames_per_traj: 1000 init_env_steps: 1000 - async_collection: 1 + reset_at_each_iter: False collector_device: cpu env_per_collector: 1 - num_workers: 1 -# Replay Buffer + +# replay buffer replay_buffer: size: 1000000 prb: 0 # use prioritized experience replay + scratch_dir: ${env.exp_name}_${env.seed} -# Optimization -optimization: +# optimization +optim: utd_ratio: 1.0 gamma: 0.99 - loss_function: smooth_l1 - lr: 3e-4 - weight_decay: 2e-4 + loss_function: l2 + lr: 3.0e-4 + weight_decay: 1e-4 batch_size: 256 target_update_polyak: 0.995 +# network network: hidden_sizes: [256, 256] activation: relu device: "cuda:0" + noise_type: "ou" # ou or gaussian -# Logging +# logging logger: backend: wandb mode: online diff --git a/examples/ddpg/ddpg.py b/examples/ddpg/ddpg.py index b77494bc52f..273947569be 100644 --- a/examples/ddpg/ddpg.py +++ b/examples/ddpg/ddpg.py @@ -11,15 +11,19 @@ The helper functions are coded in the utils.py associated with this script. """ +import time + import hydra import numpy as np import torch import torch.cuda import tqdm + from torchrl.envs.utils import ExplorationType, set_exploration_type from torchrl.record.loggers import generate_exp_name, get_logger from utils import ( + log_metrics, make_collector, make_ddpg_agent, make_environment, @@ -33,6 +37,7 @@ def main(cfg: "DictConfig"): # noqa: F821 device = torch.device(cfg.network.device) + # Create logger exp_name = generate_exp_name("DDPG", cfg.env.exp_name) logger = None if cfg.logger.backend: @@ -43,137 +48,149 @@ def main(cfg: "DictConfig"): # noqa: F821 wandb_kwargs={"mode": cfg.logger.mode, "config": cfg}, ) + # Set seeds torch.manual_seed(cfg.env.seed) np.random.seed(cfg.env.seed) - # Create Environments + # Create environments train_env, eval_env = make_environment(cfg) - # Create Agent + # Create agent model, exploration_policy = make_ddpg_agent(cfg, train_env, eval_env, device) - # Create Loss Module and Target Updater + # Create DDPG loss loss_module, target_net_updater = make_loss_module(cfg, model) - # Make Off-Policy Collector + # Create off-policy collector collector = make_collector(cfg, train_env, exploration_policy) - # Make Replay Buffer + # Create replay buffer replay_buffer = make_replay_buffer( - batch_size=cfg.optimization.batch_size, + batch_size=cfg.optim.batch_size, prb=cfg.replay_buffer.prb, buffer_size=cfg.replay_buffer.size, + buffer_scratch_dir="/tmp/" + cfg.replay_buffer.scratch_dir, device=device, ) - # Make Optimizers + # Create optimizers optimizer_actor, optimizer_critic = make_optimizer(cfg, loss_module) - rewards = [] - rewards_eval = [] - # Main loop + start_time = time.time() collected_frames = 0 pbar = tqdm.tqdm(total=cfg.collector.total_frames) - r0 = None - q_loss = None init_random_frames = cfg.collector.init_random_frames num_updates = int( cfg.collector.env_per_collector * cfg.collector.frames_per_batch - * cfg.optimization.utd_ratio + * cfg.optim.utd_ratio ) prb = cfg.replay_buffer.prb - env_per_collector = cfg.collector.env_per_collector - frames_per_batch, frame_skip = cfg.collector.frames_per_batch, cfg.env.frame_skip + frames_per_batch = cfg.collector.frames_per_batch eval_iter = cfg.logger.eval_iter - eval_rollout_steps = cfg.collector.max_frames_per_traj // frame_skip + eval_rollout_steps = cfg.env.max_episode_steps - for i, tensordict in enumerate(collector): + sampling_start = time.time() + for _, tensordict in enumerate(collector): + sampling_time = time.time() - sampling_start + # Update exploration policy exploration_policy.step(tensordict.numel()) - # update weights of the inference policy + + # Update weights of the inference policy collector.update_policy_weights_() - if r0 is None: - r0 = tensordict["next", "reward"].sum(-1).mean().item() pbar.update(tensordict.numel()) tensordict = tensordict.reshape(-1) current_frames = tensordict.numel() + # Add to replay buffer replay_buffer.extend(tensordict.cpu()) collected_frames += current_frames - # optimization steps + # Optimization steps + training_start = time.time() if collected_frames >= init_random_frames: ( actor_losses, q_losses, ) = ([], []) for _ in range(num_updates): - # sample from replay buffer + # Sample from replay buffer sampled_tensordict = replay_buffer.sample().clone() + # Compute loss loss_td = loss_module(sampled_tensordict) - optimizer_critic.zero_grad() - optimizer_actor.zero_grad() - actor_loss = loss_td["loss_actor"] q_loss = loss_td["loss_value"] - (actor_loss + q_loss).backward() + # Update critic + optimizer_critic.zero_grad() + q_loss.backward() optimizer_critic.step() - q_losses.append(q_loss.item()) + # Update actor + optimizer_actor.zero_grad() + actor_loss.backward() optimizer_actor.step() + + q_losses.append(q_loss.item()) actor_losses.append(actor_loss.item()) - # update qnet_target params + # Update qnet_target params target_net_updater.step() - # update priority + # Update priority if prb: replay_buffer.update_priority(sampled_tensordict) - rewards.append( - (i, tensordict["next", "reward"].sum().item() / env_per_collector) + training_time = time.time() - training_start + episode_end = ( + tensordict["next", "done"] + if tensordict["next", "done"].any() + else tensordict["next", "truncated"] ) - train_log = { - "train_reward": rewards[-1][1], - "collected_frames": collected_frames, - } - if q_loss is not None: - train_log.update( - { - "actor_loss": np.mean(actor_losses), - "q_loss": np.mean(q_losses), - } + episode_rewards = tensordict["next", "episode_reward"][episode_end] + + # Logging + metrics_to_log = {} + if len(episode_rewards) > 0: + episode_length = tensordict["next", "step_count"][episode_end] + metrics_to_log["train/reward"] = episode_rewards.mean().item() + metrics_to_log["train/episode_length"] = episode_length.sum().item() / len( + episode_length ) - if logger is not None: - for key, value in train_log.items(): - logger.log_scalar(key, value, step=collected_frames) - if abs(collected_frames % eval_iter) < frames_per_batch * frame_skip: + + if collected_frames >= init_random_frames: + metrics_to_log["train/q_loss"] = np.mean(q_losses) + metrics_to_log["train/a_loss"] = np.mean(actor_losses) + metrics_to_log["train/sampling_time"] = sampling_time + metrics_to_log["train/training_time"] = training_time + + # Evaluation + if abs(collected_frames % eval_iter) < frames_per_batch: with set_exploration_type(ExplorationType.MODE), torch.no_grad(): + eval_start = time.time() eval_rollout = eval_env.rollout( eval_rollout_steps, exploration_policy, auto_cast_to_device=True, break_when_any_done=True, ) + eval_time = time.time() - eval_start eval_reward = eval_rollout["next", "reward"].sum(-2).mean().item() - rewards_eval.append((i, eval_reward)) - eval_str = f"eval cumulative reward: {rewards_eval[-1][1]: 4.4f} (init: {rewards_eval[0][1]: 4.4f})" - if logger is not None: - logger.log_scalar( - "evaluation_reward", rewards_eval[-1][1], step=collected_frames - ) - if len(rewards_eval): - pbar.set_description( - f"reward: {rewards[-1][1]: 4.4f} (r0 = {r0: 4.4f})," + eval_str - ) + metrics_to_log["eval/reward"] = eval_reward + metrics_to_log["eval/time"] = eval_time + if logger is not None: + log_metrics(logger, metrics_to_log, collected_frames) + sampling_start = time.time() collector.shutdown() + end_time = time.time() + execution_time = end_time - start_time + print(f"Training took {execution_time:.2f} seconds to finish") if __name__ == "__main__": diff --git a/examples/ddpg/utils.py b/examples/ddpg/utils.py index ab4083fff28..5709c3ff59e 100644 --- a/examples/ddpg/utils.py +++ b/examples/ddpg/utils.py @@ -1,3 +1,7 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. import torch from torch import nn, optim @@ -10,12 +14,14 @@ EnvCreator, InitTracker, ParallelEnv, + RewardSum, + StepCounter, TransformedEnv, ) -from torchrl.envs.libs.gym import GymEnv -from torchrl.envs.transforms import RewardScaling +from torchrl.envs.libs.gym import GymEnv, set_gym_backend from torchrl.envs.utils import ExplorationType, set_exploration_type from torchrl.modules import ( + AdditiveGaussianWrapper, MLP, OrnsteinUhlenbeckProcessWrapper, SafeModule, @@ -33,17 +39,23 @@ # ----------------- -def env_maker(task, frame_skip=1, device="cpu", from_pixels=False): - return GymEnv(task, device=device, frame_skip=frame_skip, from_pixels=from_pixels) +def env_maker(task, device="cpu", from_pixels=False): + with set_gym_backend("gym"): + return GymEnv( + task, + device=device, + from_pixels=from_pixels, + ) -def apply_env_transforms(env, reward_scaling=1.0): +def apply_env_transforms(env, max_episode_steps=1000): transformed_env = TransformedEnv( env, Compose( InitTracker(), - RewardScaling(loc=0.0, scale=reward_scaling), + StepCounter(max_episode_steps), DoubleToFloat(), + RewardSum(), ), ) return transformed_env @@ -57,7 +69,9 @@ def make_environment(cfg): ) parallel_env.set_seed(cfg.env.seed) - train_env = apply_env_transforms(parallel_env) + train_env = apply_env_transforms( + parallel_env, max_episode_steps=cfg.env.max_episode_steps + ) eval_env = TransformedEnv( ParallelEnv( @@ -80,7 +94,8 @@ def make_collector(cfg, train_env, actor_model_explore): train_env, actor_model_explore, frames_per_batch=cfg.collector.frames_per_batch, - max_frames_per_traj=cfg.collector.max_frames_per_traj, + init_random_frames=cfg.collector.init_random_frames, + reset_at_each_iter=cfg.collector.reset_at_each_iter, total_frames=cfg.collector.total_frames, device=cfg.collector.collector_device, ) @@ -128,17 +143,6 @@ def make_replay_buffer( # ----- -def get_activation(cfg): - if cfg.network.activation == "relu": - return nn.ReLU - elif cfg.network.activation == "tanh": - return nn.Tanh - elif cfg.network.activation == "leaky_relu": - return nn.LeakyReLU - else: - raise NotImplementedError - - def make_ddpg_agent(cfg, train_env, eval_env, device): """Make DDPG agent.""" # Define Actor Network @@ -199,10 +203,22 @@ def make_ddpg_agent(cfg, train_env, eval_env, device): eval_env.close() # Exploration wrappers: - actor_model_explore = OrnsteinUhlenbeckProcessWrapper( - model[0], - annealing_num_steps=1_000_000, - ).to(device) + if cfg.network.noise_type == "ou": + actor_model_explore = OrnsteinUhlenbeckProcessWrapper( + model[0], + annealing_num_steps=1_000_000, + ).to(device) + elif cfg.network.noise_type == "gaussian": + actor_model_explore = AdditiveGaussianWrapper( + model[0], + sigma_end=1.0, + sigma_init=1.0, + mean=0.0, + std=0.1, + ).to(device) + else: + raise NotImplementedError + return model, actor_model_explore @@ -217,14 +233,14 @@ def make_loss_module(cfg, model): loss_module = DDPGLoss( actor_network=model[0], value_network=model[1], - loss_function=cfg.optimization.loss_function, + loss_function=cfg.optim.loss_function, + delay_actor=True, + delay_value=True, ) - loss_module.make_value_estimator(gamma=cfg.optimization.gamma) + loss_module.make_value_estimator(gamma=cfg.optim.gamma) # Define Target Network Updater - target_net_updater = SoftUpdate( - loss_module, eps=cfg.optimization.target_update_polyak - ) + target_net_updater = SoftUpdate(loss_module, eps=cfg.optim.target_update_polyak) return loss_module, target_net_updater @@ -233,11 +249,32 @@ def make_optimizer(cfg, loss_module): actor_params = list(loss_module.actor_network_params.flatten_keys().values()) optimizer_actor = optim.Adam( - actor_params, lr=cfg.optimization.lr, weight_decay=cfg.optimization.weight_decay + actor_params, lr=cfg.optim.lr, weight_decay=cfg.optim.weight_decay ) optimizer_critic = optim.Adam( critic_params, - lr=cfg.optimization.lr, - weight_decay=cfg.optimization.weight_decay, + lr=cfg.optim.lr, + weight_decay=cfg.optim.weight_decay, ) return optimizer_actor, optimizer_critic + + +# ==================================================================== +# General utils +# --------- + + +def log_metrics(logger, metrics, step): + for metric_name, metric_value in metrics.items(): + logger.log_scalar(metric_name, metric_value, step) + + +def get_activation(cfg): + if cfg.network.activation == "relu": + return nn.ReLU + elif cfg.network.activation == "tanh": + return nn.Tanh + elif cfg.network.activation == "leaky_relu": + return nn.LeakyReLU + else: + raise NotImplementedError