"""MAPPO algorithm."""
import numpy as np
import torch
import torch.nn as nn
from harl.utils.envs_tools import check
from harl.utils.models_tools import get_grad_norm
from harl.algorithms.actors.on_policy_base import OnPolicyBase
class MAPPO(OnPolicyBase):
    def __init__(self, args, obs_space, act_space, device=torch.device("cpu")):
        """Initialize MAPPO algorithm.

        Args:
            args: (dict) arguments.
            obs_space: (gym.spaces or list) observation space.
            act_space: (gym.spaces) action space.
            device: (torch.device) device to use for tensor operations.
        """
        super(MAPPO, self).__init__(args, obs_space, act_space, device)
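        # PPO-specific hyperparameters read from the algorithm config:
        # clip_param is the clipping range epsilon of the surrogate objective,
        # ppo_epoch is the number of optimization epochs over each rollout,
        # actor_num_mini_batch is the number of minibatches per epoch,
        # entropy_coef weights the entropy bonus, and max_grad_norm bounds the
        # gradient norm when use_max_grad_norm is set.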
        self.clip_param = args["clip_param"]
        self.ppo_epoch = args["ppo_epoch"]
        self.actor_num_mini_batch = args["actor_num_mini_batch"]
        self.entropy_coef = args["entropy_coef"]
        self.use_max_grad_norm = args["use_max_grad_norm"]
        self.max_grad_norm = args["max_grad_norm"]
    def update(self, sample):
        """Update actor network.

        Args:
            sample: (Tuple) contains data batch with which to update networks.

        Returns:
            policy_loss: (torch.Tensor) actor (policy) loss value.
            dist_entropy: (torch.Tensor) action entropies.
            actor_grad_norm: (torch.Tensor) gradient norm from actor update.
            imp_weights: (torch.Tensor) importance sampling weights.
        """
        (
            obs_batch,
            rnn_states_batch,
            actions_batch,
            masks_batch,
            active_masks_batch,
            old_action_log_probs_batch,
            adv_targ,
            available_actions_batch,
        ) = sample
        old_action_log_probs_batch = check(old_action_log_probs_batch).to(**self.tpdv)
        adv_targ = check(adv_targ).to(**self.tpdv)
        active_masks_batch = check(active_masks_batch).to(**self.tpdv)
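        # Note: check() converts numpy arrays to torch tensors where needed, and
        # self.tpdv (presumably set up in OnPolicyBase) carries the dtype/device
        # kwargs, so these lines move the batch onto the training device.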
        # Reshape to do a single forward pass for all steps.
        action_log_probs, dist_entropy, _ = self.evaluate_actions(
            obs_batch,
            rnn_states_batch,
            actions_batch,
            masks_batch,
            available_actions_batch,
            active_masks_batch,
        )
        # update actor
        imp_weights = getattr(torch, self.action_aggregation)(
            torch.exp(action_log_probs - old_action_log_probs_batch),
            dim=-1,
            keepdim=True,
        )
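        # imp_weights is the importance ratio r(theta) = pi_theta(a|o) / pi_theta_old(a|o),
        # recovered from the log-prob difference and aggregated over action
        # dimensions via self.action_aggregation (e.g. torch.prod or torch.mean,
        # depending on the config). The clipped surrogate below follows PPO:
        # L = E[min(r * A, clip(r, 1 - eps, 1 + eps) * A)] with eps = clip_param.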
        surr1 = imp_weights * adv_targ
        surr2 = (
            torch.clamp(imp_weights, 1.0 - self.clip_param, 1.0 + self.clip_param) * adv_targ
        )

        if self.use_policy_active_masks:
            policy_action_loss = (
                -torch.sum(torch.min(surr1, surr2), dim=-1, keepdim=True) * active_masks_batch
            ).sum() / active_masks_batch.sum()
        else:
            policy_action_loss = -torch.sum(
                torch.min(surr1, surr2), dim=-1, keepdim=True
            ).mean()

        policy_loss = policy_action_loss
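        # The entropy bonus (weighted by entropy_coef) is subtracted from the
        # policy loss before backprop to discourage premature convergence to a
        # deterministic policy; only policy_loss itself is returned for logging.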
        self.actor_optimizer.zero_grad()
        (policy_loss - dist_entropy * self.entropy_coef).backward()

        if self.use_max_grad_norm:
            actor_grad_norm = nn.utils.clip_grad_norm_(
                self.actor.parameters(), self.max_grad_norm
            )
        else:
            actor_grad_norm = get_grad_norm(self.actor.parameters())

        self.actor_optimizer.step()

        return policy_loss, dist_entropy, actor_grad_norm, imp_weights
    def train(self, actor_buffer, advantages, state_type):
        """Perform a training update for non-parameter-sharing MAPPO using minibatch GD.

        Args:
            actor_buffer: (OnPolicyActorBuffer) buffer containing training data related to actor.
            advantages: (np.ndarray) advantages.
            state_type: (str) type of state.

        Returns:
            train_info: (dict) contains information regarding training update (e.g. loss, grad norms, etc).
        """
        train_info = {}
        train_info["policy_loss"] = 0
        train_info["dist_entropy"] = 0
        train_info["actor_grad_norm"] = 0
        train_info["ratio"] = 0
        if np.all(actor_buffer.active_masks[:-1] == 0.0):
            return train_info

        if state_type == "EP":
            advantages_copy = advantages.copy()
            advantages_copy[actor_buffer.active_masks[:-1] == 0.0] = np.nan
            mean_advantages = np.nanmean(advantages_copy)
            std_advantages = np.nanstd(advantages_copy)
            advantages = (advantages - mean_advantages) / (std_advantages + 1e-5)
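        # For the EP state type, advantages are normalized to zero mean and unit
        # variance before the PPO epochs; inactive steps are masked out with NaN
        # so that nanmean/nanstd ignore them, and 1e-5 guards against division
        # by zero.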
        for _ in range(self.ppo_epoch):
            if self.use_recurrent_policy:
                data_generator = actor_buffer.recurrent_generator_actor(
                    advantages, self.actor_num_mini_batch, self.data_chunk_length
                )
            elif self.use_naive_recurrent_policy:
                data_generator = actor_buffer.naive_recurrent_generator_actor(
                    advantages, self.actor_num_mini_batch
                )
            else:
                data_generator = actor_buffer.feed_forward_generator_actor(
                    advantages, self.actor_num_mini_batch
                )
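            # Each generator is expected to yield actor_num_mini_batch sample
            # tuples per epoch, matching the layout unpacked in update(): the
            # recurrent variants keep chunks of consecutive steps for the RNN
            # state, while the feed-forward variant shuffles individual steps.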
            for sample in data_generator:
                policy_loss, dist_entropy, actor_grad_norm, imp_weights = self.update(sample)

                train_info["policy_loss"] += policy_loss.item()
                train_info["dist_entropy"] += dist_entropy.item()
                train_info["actor_grad_norm"] += actor_grad_norm
                train_info["ratio"] += imp_weights.mean()

        num_updates = self.ppo_epoch * self.actor_num_mini_batch

        for k in train_info.keys():
            train_info[k] /= num_updates

        return train_info
    def share_param_train(self, actor_buffer, advantages, num_agents, state_type):
        """Perform a training update for parameter-sharing MAPPO using minibatch GD.

        Args:
            actor_buffer: (list[OnPolicyActorBuffer]) buffer containing training data related to actor.
            advantages: (np.ndarray) advantages.
            num_agents: (int) number of agents.
            state_type: (str) type of state.

        Returns:
            train_info: (dict) contains information regarding training update (e.g. loss, grad norms, etc).
        """
        train_info = {}
        train_info["policy_loss"] = 0
        train_info["dist_entropy"] = 0
        train_info["actor_grad_norm"] = 0
        train_info["ratio"] = 0
if state_type == "EP":
advantages_ori_list = []
advantages_copy_list = []
for agent_id in range(num_agents):
advantages_ori = advantages.copy()
advantages_ori_list.append(advantages_ori)
advantages_copy = advantages.copy()
advantages_copy[actor_buffer[agent_id].active_masks[:-1] == 0.0] = np.nan
advantages_copy_list.append(advantages_copy)
advantages_ori_tensor = np.array(advantages_ori_list)
advantages_copy_tensor = np.array(advantages_copy_list)
mean_advantages = np.nanmean(advantages_copy_tensor)
std_advantages = np.nanstd(advantages_copy_tensor)
normalized_advantages = (advantages_ori_tensor - mean_advantages) / (
std_advantages + 1e-5
)
advantages_list = []
for agent_id in range(num_agents):
advantages_list.append(normalized_advantages[agent_id])
elif state_type == "FP":
advantages_list = []
for agent_id in range(num_agents):
advantages_list.append(advantages[:, :, agent_id])
        for _ in range(self.ppo_epoch):
            data_generators = []
            for agent_id in range(num_agents):
                if self.use_recurrent_policy:
                    data_generator = actor_buffer[agent_id].recurrent_generator_actor(
                        advantages_list[agent_id],
                        self.actor_num_mini_batch,
                        self.data_chunk_length,
                    )
                elif self.use_naive_recurrent_policy:
                    data_generator = actor_buffer[agent_id].naive_recurrent_generator_actor(
                        advantages_list[agent_id], self.actor_num_mini_batch
                    )
                else:
                    data_generator = actor_buffer[agent_id].feed_forward_generator_actor(
                        advantages_list[agent_id], self.actor_num_mini_batch
                    )
                data_generators.append(data_generator)
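            # With parameter sharing, one minibatch is drawn from every agent's
            # generator at each step and the eight sample fields are concatenated
            # along the batch dimension, so a single gradient update of the
            # shared actor uses all agents' data.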
            for _ in range(self.actor_num_mini_batch):
                batches = [[] for _ in range(8)]
                for generator in data_generators:
                    sample = next(generator)
                    for i in range(8):
                        batches[i].append(sample[i])
                for i in range(7):
                    batches[i] = np.concatenate(batches[i], axis=0)
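                # The last field, available_actions_batch, may be None (e.g. for
                # continuous action spaces), in which case it cannot be
                # concatenated and is passed through as None.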
                if batches[7][0] is None:
                    batches[7] = None
                else:
                    batches[7] = np.concatenate(batches[7], axis=0)

                policy_loss, dist_entropy, actor_grad_norm, imp_weights = self.update(
                    tuple(batches)
                )

                train_info["policy_loss"] += policy_loss.item()
                train_info["dist_entropy"] += dist_entropy.item()
                train_info["actor_grad_norm"] += actor_grad_norm
                train_info["ratio"] += imp_weights.mean()

        num_updates = self.ppo_epoch * self.actor_num_mini_batch

        for k in train_info.keys():
            train_info[k] /= num_updates

        return train_info