Utils Package

Includes common utilities, configuration tools, environment creation, data management, and optimization algorithms.

Includes common utilities, controllers, rewards, wrappers and custom callbacks.

Submodules

utils.checkpoint_manager.load_checkpoint(path, actor, critic, actor_opt=None, critic_opt=None, device='cpu', reward_stats=None, critic_obs_stats=None)[source]

Loads model and optimizer states, and optionally running stats states.

utils.checkpoint_manager.save_checkpoint(step, actor, critic, actor_opt, critic_opt, save_dir, is_best=False, filename=None, **kwargs)[source]

Saves model and optimizer states, plus optional extra data.

utils.config_loader.load_json(path)[source]
utils.config_loader.load_yaml(path)[source]
utils.config_logger.setup_logger(log_dir, enable_logger)[source]

This file is used to read the data center configuration from user inputs provided inside dc_config.json. It also performs some auxiliary steps to calculate the server power specifications based on the given parameters.

class utils.dc_config_reader.DC_Config(dc_config_file='dc_config.json', total_cores=0, total_gpus=0, total_mem_GB=0, datacenter_capacity_mw=0)[source]

Bases: object

This file contains the data center configurations that may be customized by the user for designing the data center. The use of this file has been deprecated. Any changes to this file will not be reflected in the actual data center design. Instead, modify utils/dc_config.json to design the data center.

utils.make_envs.make_bat_fwd_env(month, max_bat_cap_Mwh=2.0, charging_rate=0.5, max_dc_pw_MW=7.23, dcload_max=2.5, dcload_min=0.1, n_fwd_steps=4, init_day=0)[source]

Method to build the Battery environment.

Parameters:
  • month (int) – Month of the year in which the agent is training.

  • max_bat_cap_Mwh (float, optional) – Max battery capacity. Defaults to 2.0.

  • charging_rate (float, optional) – Charging rate of the battery. Defaults to 0.5.

  • reward_method (str, optional) – Method used to calculate the rewards. Defaults to ‘default_bat_reward’.

Returns:

Battery environment.

Return type:

battery_env_fwd

utils.make_envs.make_dc_env(month=1, location='NYIS', dc_config_file='dc_config_file.json', datacenter_capacity_mw=1, max_bat_cap_Mw=2.0, add_cpu_usage=True, add_gpu_usage=True, add_CI=True, episode_length_in_time=None, use_ls_cpu_load=False, use_ls_gpu_load=False, num_sin_cos_vars=4, total_cores=0, total_gpus=0, dc_memory_GB=0)[source]

Method that creates the data center environment with the timeline, location, proper data files, gym specifications, and auxiliary methods.

Parameters:
  • month (int, optional) – The month of the year for which the Environment uses the weather and Carbon Intensity data. Defaults to 1.

  • location (str, optional) – The geographical location in a standard format for which Carbon Intensity files are accessed. Supported options are ‘NYIS’, ‘AZPS’, ‘BPAT’. Defaults to ‘NYIS’.

  • dc_memory_GB (int, optional) – The total available memory in the data center.

  • datacenter_capacity_mw (int, optional) – Maximum capacity (MW) of the data center. This value will scale the number of servers installed in the data center.

  • max_bat_cap_Mw (float, optional) – The battery capacity in Megawatts for the installed battery. Defaults to 2.0.

  • add_cpu_usage (bool, optional) – Boolean Flag to indicate whether cpu usage is part of the environment statespace. Defaults to True.

  • add_gpu_usage (bool, optional) – Boolean Flag to indicate whether gpu usage is part of the environment statespace. Defaults to True.

  • add_CI (bool, optional) – Boolean Flag to indicate whether Carbon Intensity is part of the environment statespace. Defaults to True.

  • episode_length_in_time (pd.Timedelta, optional) – Length of an episode in terms of pandas time-delta object. Defaults to None.

  • use_ls_cpu_load (bool, optional) – Use the cpu workload value from a separate Load Shifting agent. This turns off reading the default cpu data. Defaults to False.

  • use_ls_gpu_load (bool, optional) – Use the gpu workload value from a separate Load Shifting agent. This turns off reading the default gpu data. Defaults to False.

  • num_sin_cos_vars (int, optional) – Number of sine and cosine variables that will be added externally from the centralized data source.

Returns:

The environment instantiated with the particular month.

Return type:

envs.dc_gym.dc_gymenv

utils.make_envs.make_ls_env(month, n_vars_ci=4, n_vars_energy=4, n_vars_battery=1, queue_max_len=500, test_mode=False)[source]

Method to build the Load shifting environment

Parameters:
  • month (int) – Month of the year in which the agent is training.

  • n_vars_energy (int, optional) – Number of variables from the Energy environment. Defaults to 4.

  • n_vars_battery (int, optional) – Number of variables from the Battery environment. Defaults to 1.

  • queue_max_len (int, optional) – The size of the queue where tasks are stored to be processed later. Defaults to 500.

Returns:

Load Shifting environment

Return type:

CarbonLoadEnv

class utils.managers.CI_Manager(location, simulation_year=2020, init_day=0, future_steps=4, weight=0.1, desired_std_dev=5, timezone_shift=0)[source]

Bases: object

Manager of the carbon intensity data.

Parameters:
  • filename (str, optional) – Filename of the carbon intensity data. Defaults to ‘’.

  • location (str, optional) – Location identifier. Defaults to ‘NYIS’.

  • init_day (int, optional) – Initial day of the episode. Defaults to 0.

  • future_steps (int, optional) – Number of steps of the CI forecast. Defaults to 4.

  • weight (float, optional) – Weight value for coherent noise. Defaults to 0.1.

  • desired_std_dev (float, optional) – Desired standard deviation for coherent noise. Defaults to 5.

  • timezone_shift (int, optional) – Shift for the timezone. Defaults to 0.

get_current_ci(norm=True)[source]
get_forecast_ci()[source]
get_n_past_ci(n)[source]
reset(init_day=None, init_hour=None, seed=None)[source]
step()[source]
class utils.managers.CoherentNoise(base, weight, desired_std_dev=0.1, scale=1)[source]

Bases: object

Class to add coherent noise to the data.

Parameters:
  • base (List[float]) – Base data

  • weight (float) – Weight of the noise to be added

  • desired_std_dev (float, optional) – Desired standard deviation. Defaults to 0.1.

  • scale (int, optional) – Scale. Defaults to 1.

generate(n_steps)[source]

Generate coherent noise

Parameters:

n_steps (int) – Length of the data to generate.

Returns:

Array of generated coherent noise.

Return type:

numpy.ndarray

seed(seed=None)[source]
class utils.managers.ElectricityPrice_Manager(location, simulation_year, timezone_shift=0)[source]

Bases: object

get_current_price()[source]
get_future_prices(n=1)[source]
get_past_prices(n=1)[source]
reset(init_day, init_hour, seed=None)[source]
step()[source]
class utils.managers.Time_Manager(init_day=0, timezone_shift=0, duration_days=None)[source]

Bases: object

Class to manage the time dimension over an episode and handle termination based on simulation duration.

Parameters:
  • init_day (int, optional) – Initial day of the year (0-364). Defaults to 0.

  • timezone_shift (int, optional) – Timezone shift in hours from UTC. Defaults to 0.

  • duration_days (int, optional) – Maximum duration of an episode in days. If None, the episode runs indefinitely (or until another termination condition is met). Defaults to None.

reset(init_day=None, init_hour=None, seed=None)[source]

Resets the time manager to a specific day and hour, and resets the episode step counter.

Parameters:
  • init_day (int, optional) – Day to start from (0-364). Defaults to the value provided during initialization.

  • init_hour (int, optional) – Hour to start from (0-23). Defaults to 0, adjusted by timezone_shift if not specified otherwise.

  • seed (int, optional) – Random seed (not used directly here but kept for consistency).

Returns:

Sine and cosine features for the initial hour and day.

Return type:

list

step()[source]

Advances the time by one timestep (15 minutes) and checks for episode termination based on duration.

Returns:

  • int – Current day of the year (0-364).

  • float – Current hour of the day (0.0 - 23.75).

  • list – Sine and cosine features for the current hour and day.

  • bool – Done flag (True if episode duration reached, False otherwise).

Return type:

tuple
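
The 15-minute stepping and duration check described above can be sketched as follows. This is a minimal illustration, not the Time_Manager implementation: the class name QuarterHourClock and its fields are hypothetical, the year is assumed to have 365 days, and the sine and cosine features are left out to keep the example short.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

STEPS_PER_DAY = 96  # 24 hours * 4 quarter-hour steps


@dataclass
class QuarterHourClock:
    """Hypothetical stand-in for Time_Manager; not the library's code."""

    init_day: int = 0
    duration_days: Optional[int] = None
    day: int = 0
    hour: float = 0.0
    steps_taken: int = 0

    def __post_init__(self) -> None:
        self.reset()

    def reset(self, init_day: Optional[int] = None, init_hour: int = 0) -> None:
        # Fall back to the day given at construction when none is passed.
        self.day = self.init_day if init_day is None else init_day
        self.hour = float(init_hour)
        self.steps_taken = 0

    def step(self) -> Tuple[int, float, bool]:
        # Advance by 15 minutes (0.25 h) and roll over at midnight.
        self.hour += 0.25
        if self.hour >= 24.0:
            self.hour -= 24.0
            self.day = (self.day + 1) % 365  # assumption: 365-day year
        self.steps_taken += 1
        # The episode ends once the configured number of days has elapsed.
        done = (
            self.duration_days is not None
            and self.steps_taken >= self.duration_days * STEPS_PER_DAY
        )
        return self.day, self.hour, done
```

With duration_days=None the done flag in this sketch never becomes True, which mirrors the "runs indefinitely" behaviour described for the constructor.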

class utils.managers.Weather_Manager(location='US-NY-NYIS', simulation_year=2023, init_day=0, weight=0.02, desired_std_dev=0.75, timezone_shift=0, elevation=27.0, debug=False)[source]

Bases: object

get_current_temperature()[source]
get_current_wet_bulb()[source]
get_n_next_temperature(n)[source]
get_next_temperature()[source]
reset(init_day=None, init_hour=None, seed=None)[source]

Reset Weather_Manager to a specific initial day and hour.

Parameters:
  • init_day (int, optional) – Day to start from. If None, defaults to the initial day set during initialization.

  • init_hour (int, optional) – Hour to start from. If None, defaults to 0.

Returns:

Temperature at current step, normalized temperature at current step, wet bulb temperature at current step, normalized wet bulb temperature at current step.

Return type:

tuple

step()[source]

Step on the Weather_Manager

Returns:

Temperature at current step, normalized temperature at current step.

Return type:

tuple

utils.managers.load_weather_data(weather_file)[source]

Reads weather data from a JSON file and converts it into a Pandas DataFrame.

Parameters:

weather_file (str) – Path to the weather JSON file.

Returns:

DataFrame containing hourly weather data.

Return type:

pd.DataFrame

utils.managers.normalize(v, min_v, max_v)[source]

Function to normalize values

Parameters:
  • v (float) – Value to be normalized

  • min_v (float) – Lower limit

  • max_v (float) – Upper limit

Returns:

Normalized value

Return type:

float
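
A minimal sketch of what such a normalization typically looks like, assuming a min-max rescaling to the range [0, 1]. The exact formula used by utils.managers.normalize is not stated here, so treat the function below (min_max_normalize, a hypothetical name) as an illustration only.

```python
def min_max_normalize(v: float, min_v: float, max_v: float) -> float:
    """Rescale v so that min_v maps to 0.0 and max_v maps to 1.0.

    Assumption: plain min-max scaling; the library may use a different formula.
    """
    if max_v == min_v:
        raise ValueError("max_v must differ from min_v")
    return (v - min_v) / (max_v - min_v)


# A value halfway between the limits maps to 0.5.
halfway = min_max_normalize(15.0, 10.0, 20.0)
```

Values outside the limits are not clipped in this sketch, so they map below 0 or above 1.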

utils.managers.sc_obs(current_hour, current_day)[source]

Generate sine and cosine of the hour and day

Parameters:
  • current_hour (int) – Current hour of the day

  • current_day (int) – Current day of the year

Returns:

Sine and cosine of the hour and day

Return type:

List[float]
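
The idea behind sine and cosine time features is that 23:45 and 00:00 end up close together, which a raw hour value does not achieve. The sketch below assumes a 24-hour and a 365-day period and an output order of hour sine, hour cosine, day sine, day cosine; the function name cyclic_time_features and that ordering are assumptions, not taken from sc_obs.

```python
import math
from typing import List


def cyclic_time_features(current_hour: float, current_day: int) -> List[float]:
    """Encode hour of day and day of year as points on the unit circle."""
    hour_angle = 2.0 * math.pi * current_hour / 24.0   # assumed 24 h period
    day_angle = 2.0 * math.pi * current_day / 365.0    # assumed 365 d period
    return [
        math.sin(hour_angle),
        math.cos(hour_angle),
        math.sin(day_angle),
        math.cos(day_angle),
    ]


midnight = cyclic_time_features(0, 0)
late_evening = cyclic_time_features(23.75, 0)
```

Here the hour features of late_evening sit next to those of midnight on the circle, even though the raw hours differ by 23.75.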

class utils.task_assignment_strategies.BaseRBCStrategy[source]

Bases: object

Base class for all Rule-Based Controller strategies.

reset()[source]

Resets any internal state of the strategy (optional).

class utils.task_assignment_strategies.DistributeLeastPending[source]

Bases: BaseRBCStrategy

Assigns the task to the datacenter with the fewest pending tasks.

class utils.task_assignment_strategies.DistributeLocalOnly[source]

Bases: BaseRBCStrategy

Assigns the task strictly to its origin datacenter.

class utils.task_assignment_strategies.DistributeLowestCarbon[source]

Bases: BaseRBCStrategy

Assigns the task to the available datacenter with the lowest current carbon intensity (gCO2/kWh).
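
The selection rule can be sketched as a filter followed by a minimum. The function pick_lowest_carbon and its two dictionary inputs are hypothetical; the real strategy works on datacenter objects whose interface is not shown in this reference.

```python
from typing import Dict, Optional


def pick_lowest_carbon(
    ci_by_dc: Dict[str, float], can_schedule: Dict[str, bool]
) -> Optional[str]:
    """Return the schedulable datacenter with the lowest carbon intensity.

    ci_by_dc maps a datacenter name to its current intensity in gCO2/kWh.
    Returns None when no datacenter can take the task.
    """
    candidates = [dc for dc in ci_by_dc if can_schedule.get(dc, False)]
    if not candidates:
        return None
    return min(candidates, key=lambda dc: ci_by_dc[dc])


# Illustrative values only: DC3 is cleanest but cannot schedule the task.
choice = pick_lowest_carbon(
    {"DC1": 420.0, "DC2": 180.0, "DC3": 95.0},
    {"DC1": True, "DC2": True, "DC3": False},
)
```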

class utils.task_assignment_strategies.DistributeLowestPrice[source]

Bases: BaseRBCStrategy

Assigns the task to the available datacenter with the lowest current electricity price ($/MWh).

class utils.task_assignment_strategies.DistributeLowestUtilization[source]

Bases: BaseRBCStrategy

Assigns the task to the datacenter with the highest overall average resource availability.

class utils.task_assignment_strategies.DistributeMostAvailable[source]

Bases: BaseRBCStrategy

Assigns the task to the datacenter with the most available CPU cores among those that can schedule the task.

class utils.task_assignment_strategies.DistributePriorityOrder(priority_order=['DC1', 'DC2', 'DC3', 'DC4', 'DC5'])[source]

Bases: BaseRBCStrategy

Assigns tasks following a fixed priority order of DC names, selecting the first one that can schedule the task.
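
A minimal sketch of the first-fit rule described above. The helper pick_by_priority and the can_schedule callback are hypothetical stand-ins for however the real strategy checks datacenter capacity.

```python
from typing import Callable, Optional, Sequence


def pick_by_priority(
    priority_order: Sequence[str], can_schedule: Callable[[str], bool]
) -> Optional[str]:
    """Return the first datacenter in priority_order that can take the task."""
    for name in priority_order:
        if can_schedule(name):
            return name
    return None  # nothing in the list has capacity


# Assumed situation for illustration: DC1 and DC2 are full.
full = {"DC1", "DC2"}
selected = pick_by_priority(
    ["DC1", "DC2", "DC3", "DC4", "DC5"], lambda name: name not in full
)
```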

class utils.task_assignment_strategies.DistributeRandom[source]

Bases: BaseRBCStrategy

Randomly assigns the task to one of the available datacenters.

class utils.task_assignment_strategies.DistributeRoundRobin[source]

Bases: BaseRBCStrategy

Assigns tasks in a round-robin fashion across datacenters.

reset()[source]

Resets the round-robin index.
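
Round-robin assignment needs one piece of state, the index of the next datacenter, which is why this strategy overrides reset(). The class below is a hypothetical sketch of that idea and does not reproduce the DistributeRoundRobin interface.

```python
from typing import Sequence


class RoundRobinPicker:
    """Cycle through datacenter names in order, one task at a time."""

    def __init__(self) -> None:
        self._index = 0

    def pick(self, dc_names: Sequence[str]) -> str:
        if not dc_names:
            raise ValueError("dc_names must not be empty")
        choice = dc_names[self._index % len(dc_names)]
        self._index += 1
        return choice

    def reset(self) -> None:
        # Start the next episode from the first datacenter again.
        self._index = 0


picker = RoundRobinPicker()
names = ["DC1", "DC2", "DC3"]
first_four = [picker.pick(names) for _ in range(4)]
```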

utils.transmission_cost_loader.load_transmission_matrix(cloud_provider)[source]

Loads and caches the transmission matrix CSV for a given cloud provider.

This module maps real-world datacenter location codes to cloud-specific transmission cost regions.

Supported providers: “gcp”, “aws”, “azure”, and “custom”

To define a custom cost matrix:

  1. Add your mapping in location_to_custom_region.

  2. Save your transmission cost CSV in data/transmission_costs/custom_transmission_cost_matrix.csv. Rows and columns must match your custom region names.

  3. Use cloud_provider='custom' when initializing DatacenterClusterManager.

Each row/col in the CSV must represent the cost per GB from origin -> destination.
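
To make the matrix layout concrete, the sketch below parses a small origin-by-destination CSV into nested dictionaries. The region names and costs are invented, the assumption that the first column holds the origin label is mine, and parse_cost_matrix is a hypothetical helper rather than part of utils.transmission_cost_loader.

```python
import csv
import io
from typing import Dict

# Invented example data: cost per GB from the row region to the column region.
EXAMPLE_CSV = (
    "origin,region-a,region-b\n"
    "region-a,0.00,0.02\n"
    "region-b,0.05,0.00\n"
)


def parse_cost_matrix(text: str) -> Dict[str, Dict[str, float]]:
    """Turn CSV text into matrix[origin][destination] = cost per GB."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    destinations = header[1:]  # assumption: first column is the origin label
    matrix: Dict[str, Dict[str, float]] = {}
    for row in reader:
        if not row:
            continue  # skip blank lines
        origin, costs = row[0], row[1:]
        matrix[origin] = {d: float(c) for d, c in zip(destinations, costs)}
    return matrix


matrix = parse_cost_matrix(EXAMPLE_CSV)
cost_a_to_b = matrix["region-a"]["region-b"]
```

The matrix does not have to be symmetric: in this example sending data from region-b to region-a costs more than the reverse direction.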

utils.transmission_region_mapper.map_location_to_region(location_code, provider)[source]
utils.utils_cf.get_energy_variables(state)[source]

Obtain energy variables from the energy observation

Parameters:

state (List[float]) – agent_dc observation

Returns:

Subset of the agent_dc observation

Return type:

List[float]

utils.utils_cf.get_init_day(weather_file, start_month=0)[source]

Obtain the initial day of the year to start the episode.

Parameters:
  • weather_file (str) – Path to the weather JSON file.

  • start_month (int, optional) – Starting month (0=Jan, 11=Dec). Defaults to 0.

Returns:

Day of the year corresponding to the first day of the month.

Return type:

int
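
The mapping from a zero-based month to a day of the year can be sketched with a cumulative sum of month lengths. This illustration assumes a non-leap year and a zero-based day index, and it ignores the weather_file argument; first_day_of_month is a hypothetical name, not the library function.

```python
DAYS_PER_MONTH = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]  # non-leap year


def first_day_of_month(start_month: int = 0) -> int:
    """Zero-based day of the year on which start_month (0=Jan) begins."""
    if not 0 <= start_month <= 11:
        raise ValueError("start_month must be in the range 0-11")
    return sum(DAYS_PER_MONTH[:start_month])


february_start = first_day_of_month(1)
december_start = first_day_of_month(11)
```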

utils.utils_cf.obtain_paths(location)[source]

Obtain the correct name for the data files

Parameters:

location (string) – Location identifier

Raises:

ValueError – If location identifier is not defined

Returns:

Naming for the data files

Return type:

List[string]

utils.workload_utils.assign_task_origins(tasks, datacenter_configs, current_time_utc, logger=None)[source]

Assigns each task an origin datacenter (DC), based on:

  • Population weight of the DC

  • Local time activity (higher weight during 8am–8pm local time)

Parameters:
  • tasks (List[Task]) – List of Task objects to assign.

  • datacenter_configs (List[dict]) – DC configuration including weights and timezone offsets.

  • current_time_utc (datetime) – Current simulation time in UTC.

  • logger (logging.Logger or None) – Optional logger for debug output.
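
One way to combine the two factors is to multiply each population weight by a boost during local business hours and then sample an origin in proportion to the result. The sketch below assumes config keys named population_weight and timezone_shift and a boost factor of 2.0; all three are hypothetical, as are the function names.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def origin_weights(
    datacenter_configs: List[Dict[str, float]],
    current_time_utc: datetime,
    busy_boost: float = 2.0,  # assumed value, for illustration only
) -> List[float]:
    """Population weight, boosted when local time is between 8am and 8pm."""
    weights = []
    for cfg in datacenter_configs:
        local_time = current_time_utc + timedelta(hours=cfg["timezone_shift"])
        is_busy = 8 <= local_time.hour < 20
        weights.append(cfg["population_weight"] * (busy_boost if is_busy else 1.0))
    return weights


def sample_origin(
    datacenter_configs: List[Dict[str, float]],
    current_time_utc: datetime,
    rng: random.Random,
) -> int:
    """Draw the index of one origin datacenter in proportion to its weight."""
    weights = origin_weights(datacenter_configs, current_time_utc)
    indices = list(range(len(datacenter_configs)))
    return rng.choices(indices, weights=weights, k=1)[0]


configs = [
    {"population_weight": 1.0, "timezone_shift": 0},   # 10:00 local, busy
    {"population_weight": 1.0, "timezone_shift": 12},  # 22:00 local, quiet
]
now = datetime(2023, 1, 1, 10, 0, tzinfo=timezone.utc)
weights_now = origin_weights(configs, now)
```

Passing an explicit random.Random instance keeps the sampling reproducible across runs.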

utils.workload_utils.extract_tasks_from_row(row, scale=1, datacenter_configs=None, current_time_utc=None, logger=None, task_scale=5, group_size=1)[source]

Convert a row from task_df into a list of Task objects, scaling the number of tasks if needed.

Parameters:
  • row (pd.Series) – A row from task_df containing ‘tasks_matrix’.

  • scale (int) – Scaling factor for task duplication.

  • datacenter_configs (List[dict]) – DC configurations for assigning task origins.

  • current_time_utc (datetime) – Current simulation time in UTC.

  • logger (logging.Logger or None) – Optional logger for debug statements.

  • group_size (int) – Number of consecutive tasks to group into one meta-task. Defaults to 1 (no grouping).

Returns:

A list of Task objects extracted and scaled from the row.

Return type:

List[Task]
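
The effect of group_size can be pictured as chunking the task list into runs of consecutive items. The sketch below shows only that chunking step on plain values; how the library merges each chunk into a single meta-task is not described here, and group_consecutive is a hypothetical helper.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def group_consecutive(items: Sequence[T], group_size: int = 1) -> List[List[T]]:
    """Split items into consecutive chunks of at most group_size elements."""
    if group_size < 1:
        raise ValueError("group_size must be at least 1")
    return [
        list(items[i:i + group_size]) for i in range(0, len(items), group_size)
    ]


# Five tasks grouped in pairs leave a final chunk with a single task.
chunks = group_consecutive(["t1", "t2", "t3", "t4", "t5"], group_size=2)
```

With the default group_size=1 every chunk holds exactly one item, which corresponds to the "no grouping" case.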