Source code for harl.common.base_logger

"""Base logger."""

import time
import os
import numpy as np


class BaseLogger:
    """Base logger class.

    Used for logging information in the on-policy training pipeline.
    """

    def __init__(self, args, algo_args, env_args, num_agents, writter, run_dir):
        """Initialize the logger."""
        self.args = args
        self.algo_args = algo_args
        self.env_args = env_args
        self.task_name = self.get_task_name()
        self.num_agents = num_agents
        self.writter = writter
        self.run_dir = run_dir
        self.log_file = open(
            os.path.join(run_dir, "progress.txt"), "w", encoding="utf-8"
        )

    def get_task_name(self):
        """Get the task name."""
        raise NotImplementedError

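    # Illustrative only (not part of this module): environment-specific
    # loggers are expected to subclass BaseLogger and override
    # get_task_name. A hypothetical example:
    #
    #     class SMACLogger(BaseLogger):
    #         def get_task_name(self):
    #             return self.env_args["map_name"]
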
    def init(self, episodes):
        """Initialize the logger."""
        self.start = time.time()
        self.episodes = episodes
        self.train_episode_rewards = np.zeros(
            self.algo_args["train"]["n_rollout_threads"]
        )
        self.done_episodes_rewards = []

    def episode_init(self, episode):
        """Initialize the logger for each episode."""
        self.episode = episode

    def per_step(self, data):
        """Process data per step."""
        (
            obs,
            share_obs,
            rewards,
            dones,
            infos,
            available_actions,
            values,
            actions,
            action_log_probs,
            rnn_states,
            rnn_states_critic,
        ) = data
        dones_env = np.all(dones, axis=1)
        reward_env = np.mean(rewards, axis=1).flatten()
        self.train_episode_rewards += reward_env
        for t in range(self.algo_args["train"]["n_rollout_threads"]):
            if dones_env[t]:
                self.done_episodes_rewards.append(self.train_episode_rewards[t])
                self.train_episode_rewards[t] = 0

    def episode_log(
        self, actor_train_infos, critic_train_info, actor_buffer, critic_buffer
    ):
        """Log information for each episode."""
        self.total_num_steps = (
            self.episode
            * self.algo_args["train"]["episode_length"]
            * self.algo_args["train"]["n_rollout_threads"]
        )
        self.end = time.time()
        print(
            "Env {} Task {} Algo {} Exp {} updates {}/{} episodes, total num timesteps {}/{}, FPS {}.".format(
                self.args["env"],
                self.task_name,
                self.args["algo"],
                self.args["exp_name"],
                self.episode,
                self.episodes,
                self.total_num_steps,
                self.algo_args["train"]["num_env_steps"],
                int(self.total_num_steps / (self.end - self.start)),
            )
        )

        critic_train_info["average_step_rewards"] = critic_buffer.get_mean_rewards()
        self.log_train(actor_train_infos, critic_train_info)

        print(
            "Average step reward is {}.".format(
                critic_train_info["average_step_rewards"]
            )
        )

        if len(self.done_episodes_rewards) > 0:
            aver_episode_rewards = np.mean(self.done_episodes_rewards)
            print(
                "Some episodes done, average episode reward is {}.\n".format(
                    aver_episode_rewards
                )
            )
            # Log under a tag that matches what is recorded: these are episode
            # rewards, not the per-step rewards logged via log_train.
            self.writter.add_scalar(
                "train/train_episode_rewards",
                aver_episode_rewards,
                self.total_num_steps,
            )
            self.done_episodes_rewards = []

    def eval_init(self):
        """Initialize the logger for evaluation."""
        self.total_num_steps = (
            self.episode
            * self.algo_args["train"]["episode_length"]
            * self.algo_args["train"]["n_rollout_threads"]
        )
        self.eval_episode_rewards = []
        self.one_episode_rewards = []
        for eval_i in range(self.algo_args["eval"]["n_eval_rollout_threads"]):
            self.one_episode_rewards.append([])
            self.eval_episode_rewards.append([])

    def eval_init_off_policy(self, total_num_steps):
        """Initialize the logger for evaluation in the off-policy setting."""
        self.total_num_steps = total_num_steps
        self.eval_episode_rewards = []
        self.one_episode_rewards = []
        for eval_i in range(self.algo_args["eval"]["n_eval_rollout_threads"]):
            self.one_episode_rewards.append([])
            self.eval_episode_rewards.append([])

    def eval_per_step(self, eval_data):
        """Log evaluation information per step."""
        (
            eval_obs,
            eval_share_obs,
            eval_rewards,
            eval_dones,
            eval_infos,
            eval_available_actions,
        ) = eval_data
        for eval_i in range(self.algo_args["eval"]["n_eval_rollout_threads"]):
            self.one_episode_rewards[eval_i].append(eval_rewards[eval_i])
        self.eval_infos = eval_infos

    def eval_thread_done(self, tid):
        """Record the finished episode reward when an evaluation thread is done."""
        self.eval_episode_rewards[tid].append(
            np.sum(self.one_episode_rewards[tid], axis=0)
        )
        self.one_episode_rewards[tid] = []

    def eval_log(self, eval_episode):
        """Log evaluation information at the end of an evaluation run."""
        self.eval_episode_rewards = np.concatenate(
            [rewards for rewards in self.eval_episode_rewards if rewards]
        )
        eval_env_infos = {
            "eval_average_episode_rewards": self.eval_episode_rewards,
            "eval_max_episode_rewards": [np.max(self.eval_episode_rewards)],
        }
        self.log_env(eval_env_infos)
        eval_avg_rew = np.mean(self.eval_episode_rewards)
        print("Evaluation average episode reward is {}.\n".format(eval_avg_rew))
        self.log_file.write(
            ",".join(map(str, [self.total_num_steps, eval_avg_rew])) + "\n"
        )
        self.log_file.flush()

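    # Note (illustrative): each line that eval_log writes to progress.txt has
    # the form "<total_num_steps>,<eval_avg_rew>", so the file can be loaded
    # later with, e.g., np.genfromtxt("progress.txt", delimiter=",").
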
    def log_train(self, actor_train_infos, critic_train_info):
        """Log training information."""
        # log actor
        for agent_id in range(self.num_agents):
            for k, v in actor_train_infos[agent_id].items():
                self.writter.add_scalar(
                    f"train/agent{agent_id}/{k}", v, self.total_num_steps
                )
        # log critic
        for k, v in critic_train_info.items():
            self.writter.add_scalar(f"train/critic/{k}", v, self.total_num_steps)

    def log_env(self, env_infos):
        """Log environment information."""
        for k, v in env_infos.items():
            if len(v) > 0:
                # Evaluation metrics go under a separate "metrics/" namespace.
                self.writter.add_scalar(
                    f"metrics/{k}", np.mean(v), self.total_num_steps
                )

    def close(self):
        """Close the logger."""
        self.log_file.close()
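
For context, below is a minimal usage sketch showing how a concrete logger could be wired up and driven by a training loop. It is not part of harl.common.base_logger: the DemoLogger subclass, the argument dictionaries, and the DummyCriticBuffer stub are all hypothetical, and SummaryWriter (from torch.utils.tensorboard) is assumed to be available.

import os

from torch.utils.tensorboard import SummaryWriter

from harl.common.base_logger import BaseLogger


class DemoLogger(BaseLogger):
    """Hypothetical concrete logger for a toy task."""

    def get_task_name(self):
        # Derive the task name from the env config (illustrative).
        return self.env_args.get("task", "demo-task")


class DummyCriticBuffer:
    """Stand-in for a real critic buffer; episode_log only needs this method."""

    def get_mean_rewards(self):
        return 0.0


run_dir = "./runs/demo"
os.makedirs(run_dir, exist_ok=True)

args = {"env": "demo", "algo": "mappo", "exp_name": "test"}
algo_args = {
    "train": {"n_rollout_threads": 2, "episode_length": 10, "num_env_steps": 200},
    "eval": {"n_eval_rollout_threads": 1},
}
writter = SummaryWriter(run_dir)  # spelling matches the constructor argument

logger = DemoLogger(
    args,
    algo_args,
    {"task": "demo-task"},
    num_agents=2,
    writter=writter,
    run_dir=run_dir,
)
logger.init(episodes=10)
for episode in range(1, 11):
    logger.episode_init(episode)
    # ... collect rollouts here, calling logger.per_step(data) after each step ...
    logger.episode_log(
        actor_train_infos=[{"policy_loss": 0.1}, {"policy_loss": 0.2}],
        critic_train_info={"value_loss": 0.3},
        actor_buffer=None,  # unused by BaseLogger.episode_log
        critic_buffer=DummyCriticBuffer(),
    )
logger.close()
writter.close()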