Source code for utils.managers

import os
import json
import numpy as np
import pandas as pd
import psychrolib as psy

# Set the unit system for psychrolib
psy.SetUnitSystem(psy.SI)

[docs] class CoherentNoise: """Class to add coherent noise to the data. Args: base (List[float]): Base data weight (float): Weight of the noise to be added desired_std_dev (float, optional): Desired standard deviation. Defaults to 0.1. scale (int, optional): Scale. Defaults to 1. """ def __init__(self, base, weight, desired_std_dev=0.1, scale=1): """Initialize CoherentNoise class Args: base (List[float]): Base data weight (float): Weight of the noise to be added desired_std_dev (float, optional): Desired standard deviation. Defaults to 0.1. scale (int, optional): Scale. Defaults to 1. """ self.base = base self.weight = weight self.desired_std_dev = desired_std_dev self.scale = scale
[docs] def seed(self, seed=None): self.rng = np.random.default_rng(seed)
[docs] def generate(self, n_steps): """ Generate coherent noise Args: n_steps (int): Length of the data to generate. Returns: numpy.ndarray: Array of generated coherent noise. """ steps = self.rng.normal(loc=0, scale=self.scale, size=n_steps) random_walk = np.cumsum(self.weight * steps) normalized_noise = (random_walk / np.std(random_walk)) * self.desired_std_dev return self.base + normalized_noise
# Function to normalize a value v given a minimum and a maximum
[docs] def normalize(v, min_v, max_v): """Function to normalize values Args: v (float): Value to be normalized min_v (float): Lower limit max_v (float): Upper limit Returns: float: Normalized value """ return (v - min_v)/(max_v - min_v)
# Function to generate cosine and sine values for a given hour and day
[docs] def sc_obs(current_hour, current_day): """Generate sine and cosine of the hour and day Args: current_hour (int): Current hour of the day current_day (int): Current day of the year Returns: List[float]: Sine and cosine of the hour and day """ # Normalize and round the current hour and day two_pi = np.pi * 2 norm_hour = round(current_hour/24, 3) * two_pi norm_day = round((current_day)/365, 3) * two_pi # Calculate cosine and sine values for the current hour and day cos_hour = np.cos(norm_hour)*0.5 + 0.5 sin_hour = np.sin(norm_hour)*0.5 + 0.5 cos_day = np.cos(norm_day)*0.5 + 0.5 sin_day = np.sin(norm_day)*0.5 + 0.5 return [cos_hour, sin_hour, cos_day, sin_day]
[docs] class Time_Manager(): """ Class to manage the time dimension over an episode and handle termination based on simulation duration. Args: init_day (int, optional): Initial day of the year (0-364). Defaults to 0. timezone_shift (int, optional): Timezone shift in hours from UTC. Defaults to 0. duration_days (int, optional): Maximum duration of an episode in days. If None, the episode runs indefinitely (or until another termination condition is met). Defaults to None. """ def __init__(self, init_day=0, timezone_shift=0, duration_days=None): """Initialize the Time_Manager class.""" self.start_day_config = init_day # Store the configured initial day self.timezone_shift = timezone_shift self.duration_days = duration_days self.timestep_per_hour = 4 # 15-minute timesteps # Internal state variables initialized in reset() self.day = 0 self.hour = 0 self.current_timestep_in_year = 0 # Overall timestep within the year self.episode_step_counter = 0 self.max_episode_timesteps = None
[docs] def reset(self, init_day=None, init_hour=None, seed=None): """ Resets the time manager to a specific day and hour, and resets the episode step counter. Args: init_day (int, optional): Day to start from (0-364). Defaults to the value provided during initialization. init_hour (int, optional): Hour to start from (0-23). Defaults to 0, adjusted by timezone_shift if not specified otherwise. seed (int, optional): Random seed (not used directly here but kept for consistency). Returns: list: Sine and cosine features for the initial hour and day. """ if seed is not None: # We don't use rng here directly, but good practice if needed later pass # Use provided init day/hour or default to configured start day / 0 hour self.day = init_day if init_day is not None else self.start_day_config # Default to 0 hour unless specified, timezone_shift is not applied here anymore # The user should provide the desired starting hour directly. self.hour = init_hour if init_hour is not None else 0 # Ensure day and hour are within valid ranges self.day = int(self.day) % 365 # Wrap around year if needed self.hour = int(self.hour) % 24 # Calculate the absolute timestep within the year self.current_timestep_in_year = int(self.day * 24 * self.timestep_per_hour + self.hour * self.timestep_per_hour) # Reset episode duration tracking self.episode_step_counter = 0 if self.duration_days is not None: self.max_episode_timesteps = self.duration_days * 24 * self.timestep_per_hour else: self.max_episode_timesteps = None # Run indefinitely # Return initial time features return sc_obs(self.hour, self.day)
[docs] def step(self): """ Advances the time by one timestep (15 minutes) and checks for episode termination based on duration. Returns: int: Current day of the year (0-364). float: Current hour of the day (0.0 - 23.75). list: Sine and cosine features for the current hour and day. bool: Done flag (True if episode duration reached, False otherwise). """ # Increment counters self.current_timestep_in_year += 1 self.episode_step_counter += 1 # Advance time self.hour += 1.0 / self.timestep_per_hour if self.hour >= 24.0: self.hour = 0.0 self.day += 1 if self.day >= 365: # Wrap day around the year self.day = 0 # Also wrap the absolute timestep if needed, although less critical usually self.current_timestep_in_year = 0 # Check for termination based on duration done = False if self.max_episode_timesteps is not None: if self.episode_step_counter >= self.max_episode_timesteps: done = True # Return current day, hour, time features, and done flag return self.day, self.hour, sc_obs(self.hour, self.day), done
# Class to manage carbon intensity data
[docs] class CI_Manager(): """Manager of the carbon intensity data. Args: filename (str, optional): Filename of the carbon intensity data. Defaults to ''. location (str, optional): Location identifier. Defaults to 'NYIS'. init_day (int, optional): Initial day of the episode. Defaults to 0. future_steps (int, optional): Number of steps of the CI forecast. Defaults to 4. weight (float, optional): Weight value for coherent noise. Defaults to 0.1. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 5. timezone_shift (int, optional): Shift for the timezone. Defaults to 0. """ def __init__(self, location, simulation_year=2020, init_day=0, future_steps=4, weight=0.1, desired_std_dev=5, timezone_shift=0): """Initialize the CI_Manager class. Args: filename (str, optional): Filename of the carbon intensity data. Defaults to ''. location (str, optional): Location identifier. Defaults to 'NYIS'. init_day (int, optional): Initial day of the episode. Defaults to 0. future_steps (int, optional): Number of steps of the CI forecast. Defaults to 4. weight (float, optional): Weight value for coherent noise. Defaults to 0.1. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 5. timezone_shift (int, optional): Shift for the timezone. Defaults to 0. """ self.location = location self.simulation_year = simulation_year self.init_day = init_day self.future_steps = future_steps self.weight = weight self.desired_std_dev = desired_std_dev self.timezone_shift = timezone_shift self.timestep_per_hour = 4 self.time_steps_day = 24 * self.timestep_per_hour script_dir = os.path.dirname(os.path.abspath(__file__)) # Obtain the parent directory of the current script parent_dir = os.path.dirname(script_dir) filename = f"data/carbon_intensity/{location}/{simulation_year}/{location}_{simulation_year}_hourly.csv" # Join the parent directory with the filename filename = os.path.join(parent_dir, filename) if not os.path.exists(filename): raise FileNotFoundError(f"CI file not found: {filename}") df = pd.read_csv(filename) df["Datetime (UTC)"] = pd.to_datetime(df["Datetime (UTC)"]) df.set_index("Datetime (UTC)", inplace=True) if timezone_shift != 0: df.index = df.index + pd.Timedelta(hours=timezone_shift) if "Carbon Intensity gCO₂eq/kWh (direct)" not in df.columns: raise ValueError("Expected column missing: 'Carbon Intensity gCO₂eq/kWh (direct)'") self.carbon_data = df["Carbon Intensity gCO₂eq/kWh (direct)"].values # assert len(self.carbon_data) == 8760, "Expected 8760 hourly values." self._interpolate() self.coherent_noise = CoherentNoise(base=0, weight=weight, desired_std_dev=desired_std_dev) def _interpolate(self): x = range(len(self.carbon_data)) x_interp = np.linspace(0, len(self.carbon_data), len(self.carbon_data) * self.timestep_per_hour) smooth = np.interp(x_interp, x, self.carbon_data) self.carbon_smooth = np.roll(smooth, -1 * self.timezone_shift * self.timestep_per_hour) self.original_data = self.carbon_smooth.copy()
[docs] def reset(self, init_day=None, init_hour=None, seed=None): if seed is not None: self.rng = np.random.default_rng(seed) self.coherent_noise.seed(seed) day = init_day if init_day is not None else self.init_day hour = init_hour if init_hour is not None else 0 self.time_step = day * self.time_steps_day + hour * self.timestep_per_hour self.carbon_smooth = self.original_data max_30d = np.max(self.carbon_smooth[self.time_step:self.time_step + 30 * self.time_steps_day]) min_30d = np.min(self.carbon_smooth[self.time_step:self.time_step + 30 * self.time_steps_day]) self.norm_carbon = (self.carbon_smooth - min_30d) / (max_30d - min_30d + 1e-8) return self._get_state()
[docs] def step(self): self.time_step += 1 if self.time_step >= len(self.carbon_smooth): self.time_step = self.init_day * self.time_steps_day return self._get_state()
def _get_state(self): c = self.carbon_smooth[self.time_step] norm = self.norm_carbon[self.time_step] forecast = self.norm_carbon[self.time_step + 1:self.time_step + 1 + self.future_steps] return norm, forecast, c
[docs] def get_current_ci(self, norm=True): if norm: return self.norm_carbon[self.time_step] else: return self.carbon_smooth[self.time_step]
[docs] def get_forecast_ci(self): return self.norm_carbon[self.time_step + 1:self.time_step + 1 + self.future_steps]
[docs] def get_n_past_ci(self, n): return self.norm_carbon[max(0, self.time_step - n):self.time_step]
[docs] def load_weather_data(weather_file): """ Reads weather data from a JSON file and converts it into a Pandas DataFrame. Args: weather_file (str): Path to the weather JSON file. Returns: pd.DataFrame: DataFrame containing hourly weather data. """ with open(weather_file, 'r') as f: weather_data = json.load(f) # Extract timestamps and data variables hourly_data = weather_data["hourly"] # Convert to Pandas DataFrame df = pd.DataFrame({ "time": pd.to_datetime(hourly_data["time"]), "temperature_2m": hourly_data["temperature_2m"], "relative_humidity_2m": hourly_data["relative_humidity_2m"], "cloudcover": hourly_data["cloudcover"], "windspeed_10m": hourly_data["windspeed_10m"] }) return df
# Class to manage weather data
[docs] class Weather_Manager(): def __init__(self, location='US-NY-NYIS', simulation_year=2023, init_day=0, weight=0.02, desired_std_dev=0.75, timezone_shift=0, elevation=27.0, debug=False): self.location = location self.simulation_year = simulation_year self.init_day = init_day self.weight = weight self.desired_std_dev = desired_std_dev self.timezone_shift = timezone_shift self.elevation = elevation self.debug = debug self.timestep_per_hour = 4 self.time_steps_day = self.timestep_per_hour * 24 script_dir = os.path.dirname(os.path.abspath(__file__)) # Obtain the parent directory of the current script parent_dir = os.path.dirname(script_dir) filename = f"data/weather/{self.location}/{self.simulation_year}.json" # Join the parent directory with the filename filename = os.path.join(parent_dir, filename) # Load weather data if not os.path.exists(filename): raise FileNotFoundError(f"Weather data file not found: {filename}") self.weather_df = self._load_weather_data(filename) # Interpolate to simulation timestep (15 min) self._interpolate_weather_data() # Prepare noise and base arrays self.coherent_noise = CoherentNoise(base=0, weight=self.weight, desired_std_dev=self.desired_std_dev) self.original_temp_data = self.temperature_data.copy() self.original_wb_data = self.wet_bulb_data.copy() def _load_weather_data(self, file_path): with open(file_path, 'r') as f: raw = json.load(f) df = pd.DataFrame({ "time": pd.to_datetime(raw["hourly"]["time"]), "temperature_2m": raw["hourly"]["temperature_2m"], "relative_humidity_2m": raw["hourly"]["relative_humidity_2m"], "cloudcover": raw["hourly"]["cloudcover"], "windspeed_10m": raw["hourly"]["windspeed_10m"], }) # Apply timezone shift (shift in hours) df["time"] = df["time"] + pd.Timedelta(hours=self.timezone_shift) df = df.sort_values("time").reset_index(drop=True) return df def _interpolate_weather_data(self): x = np.arange(len(self.weather_df)) new_x = np.linspace(0, len(x), len(x) * self.timestep_per_hour) self.temperature_data = np.interp(new_x, x, self.weather_df["temperature_2m"]) self.humidity_data = np.interp(new_x, x, self.weather_df["relative_humidity_2m"]) self.cloudcover_data = np.interp(new_x, x, self.weather_df["cloudcover"]) self.windspeed_data = np.interp(new_x, x, self.weather_df["windspeed_10m"]) # Wet bulb estimation (placeholder): here you can use psychrolib if needed self.wet_bulb_data = self.temperature_data * 0.9 # Placeholder # Normalize temperature and wet bulb for 30-day local window self.min_temp, self.max_temp = 0, 45 self.min_wb_temp, self.max_wb_temp = 0, 45 self.norm_temp_data = normalize(self.temperature_data, self.min_temp, self.max_temp) self.norm_wet_bulb_data = normalize(self.wet_bulb_data, self.min_wb_temp, self.max_wb_temp) # Function to reset the time step and return the weather at the first time step
[docs] def reset(self, init_day=None, init_hour=None, seed=None): """Reset Weather_Manager to a specific initial day and hour. Args: init_day (int, optional): Day to start from. If None, defaults to the initial day set during initialization. init_hour (int, optional): Hour to start from. If None, defaults to 0. Returns: tuple: Temperature at current step, normalized temperature at current step, wet bulb temperature at current step, normalized wet bulb temperature at current step. """ self.time_step = (init_day if init_day is not None else self.init_day) * self.time_steps_day + (init_hour if init_hour is not None else 0) * self.timestep_per_hour if not self.debug: # Add noise to the temperature data using the CoherentNoise rng = np.random.default_rng(seed) self.coherent_noise.seed(seed) coh_noise = self.coherent_noise.generate(len(self.original_temp_data)) # print(f'TODO: check the generated coherent noise: {coh_noise[:3]} and the original temperature data: {self.original_temp_data[:3]} and the wet bulb data: {self.original_wb_data[:3]}' ) self.temperature_data = self.original_temp_data + coh_noise self.wet_bulb_data = self.original_wb_data + coh_noise num_roll_days = rng.integers(0, 14) # Random roll the temperature some days. self.temperature_data = np.roll(self.temperature_data, num_roll_days*self.timestep_per_hour*24) self.wet_bulb_data = np.roll(self.wet_bulb_data, num_roll_days*self.timestep_per_hour*24) self.temperature_data = np.clip(self.temperature_data, self.min_temp, self.max_temp) max_30_days = np.max(self.temperature_data[self.time_step:30*self.time_steps_day + self.time_step]) min_30_days = np.min(self.temperature_data[self.time_step:30*self.time_steps_day + self.time_step]) self.norm_temp_data = (self.temperature_data - min_30_days) / (max_30_days - min_30_days) self.wet_bulb_data = np.clip(self.wet_bulb_data, self.min_wb_temp, self.max_wb_temp) max_30_days = np.max(self.wet_bulb_data[self.time_step:30*self.time_steps_day + self.time_step]) min_30_days = np.min(self.wet_bulb_data[self.time_step:30*self.time_steps_day + self.time_step]) self.norm_wet_bulb_data = self.wet_bulb_data else: # Use a fixed temperature for debugging and wet bulb temperature self.temperature_data = np.ones_like(self.temperature_data) * 30 self.norm_temp_data = np.ones_like(self.norm_temp_data) * 0.5 self.wet_bulb_data = np.ones_like(self.wet_bulb_data) * 25 self._current_temp = self.temperature_data[self.time_step] self._next_temp = self.temperature_data[self.time_step + 1] self._current_norm_temp = self.norm_temp_data[self.time_step] self._next_norm_temp = self.norm_temp_data[self.time_step + 1] self._current_wet_bulb = self.wet_bulb_data[self.time_step] self._current_norm_wet_bulb = self.norm_wet_bulb_data[self.time_step] return self._current_temp, self._current_norm_temp, self._current_wet_bulb, self._current_norm_wet_bulb
# Function to advance the time step and return the weather at the new time step
[docs] def step(self): """Step on the Weather_Manager Returns: float: Temperature a current step float: Normalized temperature a current step """ self.time_step += 1 # If it tries to read further, restart from the initial index if self.time_step >= len(self.temperature_data): self.time_step = self.init_day*self.time_steps_day self._current_temp = self.temperature_data[self.time_step] self._next_temp = self.temperature_data[self.time_step + 1] self._current_norm_temp = self.norm_temp_data[self.time_step] self._next_norm_temp = self.norm_temp_data[self.time_step + 1] self._current_wet_bulb = self.wet_bulb_data[self.time_step] self._current_norm_wet_bulb = self.norm_wet_bulb_data[self.time_step] return self._current_temp, self._current_norm_temp, self._current_wet_bulb, self._current_norm_wet_bulb
[docs] def get_current_temperature(self): return self._current_norm_temp
[docs] def get_next_temperature(self): return self._next_norm_temp
[docs] def get_n_next_temperature(self, n): return self.norm_temp_data[self.time_step+1:self.time_step+1+n]
[docs] def get_current_wet_bulb(self): return self._current_wet_bulb
[docs] class ElectricityPrice_Manager: def __init__(self, location, simulation_year, timezone_shift=0): self.location = location self.simulation_year = simulation_year self.timezone_shift = timezone_shift self.prices = None self.index = 0 self._load_data() def _load_data(self): script_dir = os.path.dirname(os.path.abspath(__file__)) # Obtain the parent directory of the current script parent_dir = os.path.dirname(script_dir) filename = f"data/electricity_prices/standardized/{self.location}/{self.simulation_year}/{self.location}_electricity_prices_{self.simulation_year}.csv" # Join the parent directory with the filename filename = os.path.join(parent_dir, filename) # print(f"Loading electricity prices from {file_path}") df = pd.read_csv(filename) # Convert UTC timestamp to datetime and sort to ensure order df["Datetime (UTC)"] = pd.to_datetime(df["Datetime (UTC)"]) # df = df.sort_values("Datetime (UTC)") # Store as numpy array of prices self.original_prices = df["Price (USD/MWh)"].values # Remove outliers using IQR Q1 = np.percentile(self.original_prices, 15) Q3 = np.percentile(self.original_prices, 85) IQR = Q3 - Q1 lower = Q1 - 1.5 * IQR upper = Q3 + 1.5 * IQR self.original_prices = np.clip(self.original_prices, lower, upper) # There should be 8760 entries (or 8784 in leap year) assert len(self.original_prices) in [8760, 8784], f"Unexpected number of rows: {len(self.original_prices)}" self._interpolate_prices() def _interpolate_prices(self): self.timestep_per_hour = 4 # 15 min steps x = np.arange(len(self.original_prices)) new_x = np.linspace(0, len(x) - 1, len(x) * self.timestep_per_hour) self.prices = np.interp(new_x, x, self.original_prices)
[docs] def reset(self, init_day, init_hour, seed=None): if seed is not None: np.random.seed(seed) self.index = init_day * 24 * self.timestep_per_hour + init_hour * self.timestep_per_hour return self.get_current_price()
[docs] def get_current_price(self): return self.prices[self.index]
[docs] def get_future_prices(self, n=1): end = min(self.index + n, len(self.prices)) return self.prices[self.index:end]
[docs] def get_past_prices(self, n=1): start = max(self.index - n, 0) return self.prices[start:self.index]
[docs] def step(self): self.index = (self.index + 1) % len(self.prices) return self.get_current_price()