Source code for utils.managers

import os
import numpy as np
import pandas as pd
import psychrolib as psy

file_path = os.path.abspath(__file__)
PATH = os.path.split(os.path.dirname(file_path))[0]

# Set the unit system for psychrolib
psy.SetUnitSystem(psy.SI)

[docs] class CoherentNoise: """Class to add coherent noise to the data. Args: base (List[float]): Base data weight (float): Weight of the noise to be added desired_std_dev (float, optional): Desired standard deviation. Defaults to 0.1. scale (int, optional): Scale. Defaults to 1. """ def __init__(self, base, weight, desired_std_dev=0.1, scale=1): """Initialize CoherentNoise class Args: base (List[float]): Base data weight (float): Weight of the noise to be added desired_std_dev (float, optional): Desired standard deviation. Defaults to 0.1. scale (int, optional): Scale. Defaults to 1. """ self.base = base self.weight = weight self.desired_std_dev = desired_std_dev self.scale = scale
[docs] def generate(self, n_steps): """ Generate coherent noise Args: n_steps (int): Length of the data to generate. Returns: numpy.ndarray: Array of generated coherent noise. """ steps = np.random.normal(loc=0, scale=self.scale, size=n_steps) random_walk = np.cumsum(self.weight * steps) random_walk_scaled = self.base + (random_walk / np.std(random_walk)) * self.desired_std_dev return random_walk_scaled
# Function to normalize a value v given a minimum and a maximum
[docs] def normalize(v, min_v, max_v): """Function to normalize values Args: v (float): Value to be normalized min_v (float): Lower limit max_v (float): Upper limit Returns: float: Normalized value """ return (v - min_v)/(max_v - min_v)
[docs] def standarize(v): """Function to standarize a list of values Args: v (float): Values to be normalized Returns: float: Normalized values """ return (v - np.mean(v))/np.std(v)
# Function to generate cosine and sine values for a given hour and day
[docs] def sc_obs(current_hour, current_day): """Generate sine and cosine of the hour and day Args: current_hour (int): Current hour of the day current_day (int): Current day of the year Returns: List[float]: Sine and cosine of the hour and day """ # Normalize and round the current hour and day two_pi = np.pi * 2 norm_hour = round(current_hour/24, 3) * two_pi norm_day = round((current_day)/365, 3) * two_pi # Calculate cosine and sine values for the current hour and day cos_hour = np.cos(norm_hour)*0.5 + 0.5 sin_hour = np.sin(norm_hour)*0.5 + 0.5 cos_day = np.cos(norm_day)*0.5 + 0.5 sin_day = np.sin(norm_day)*0.5 + 0.5 return [cos_hour, sin_hour, cos_day, sin_day]
[docs] class Time_Manager(): """Class to manage the time dimenssion over an episode Args: init_day (int, optional): Day to start from. Defaults to 0. days_per_episode (int, optional): Number of days that an episode would last. Defaults to 30. """ def __init__(self, init_day=0, days_per_episode=30, timezone_shift=0): """Class to manage the time dimenssion over an episode Args: init_day (int, optional): Day to start from. Defaults to 0. days_per_episode (int, optional): Number of days that an episode would last. Defaults to 30. """ self.init_day = init_day self.timestep_per_hour = 4 self.days_per_episode = days_per_episode self.timezone_shift = timezone_shift
[docs] def reset(self): """Reset time manager to initial day Returns: List[float]: Hour and day in sine and cosine form """ self.day = self.init_day self.hour = self.timezone_shift return sc_obs(self.hour, self.day)
[docs] def step(self): """Step function for the time maneger Returns: List[float]: Current hour and day in sine and cosine form. bool: Signal if the episode has reach the end. """ if self.hour >= 24: self.hour = 0 self.day += 1 self.hour += 1/self.timestep_per_hour return self.day, self.hour, sc_obs(self.hour, self.day), self.isterminal()
[docs] def isterminal(self): """Function to identify terminal state Returns: bool: Signals if a state is terminal or not """ done = False if self.day > self.init_day+self.days_per_episode - 1: done = True return done
# Class to manage CPU workload data
[docs] class Workload_Manager(): def __init__(self, workload_filename='', init_day=0, future_steps=4, weight=0.01, desired_std_dev=0.025, timezone_shift=0): """Manager of the DC workload Args: workload_filename (str, optional): Filename of the CPU data. Defaults to ''. Should be a .csv file containing the CPU hourly normalized workload data between 0 and 1. Should contains 'cpu_load' column. init_day (int, optional): Initial day of the episode. Defaults to 0. future_steps (int, optional): Number of steps of the workload forecast. Defaults to 4. weight (float, optional): Weight value for coherent noise. Defaults to 0.001. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 0.025. flexible_workload_ratio (float, optional): Ratio of the flexible workload amount. Defaults to 0.1. """ # Load CPU data from a CSV file # One year data=24*365=8760 if workload_filename == '': cpu_data_list = pd.read_csv(PATH+'/data/Workload/Alibaba_CPU_Data_Hourly_1.csv')['cpu_load'].values[:8760] else: cpu_data_list = pd.read_csv(PATH+f'/data/Workload/{workload_filename}')['cpu_load'].values[:8760] assert len(cpu_data_list) == 8760, "The number of data points in the workload data is not one year data=24*365=8760." cpu_data_list = cpu_data_list.astype(float) self.time_step = 0 self.future_steps = future_steps self.timestep_per_hour = 4 self.time_steps_day = self.timestep_per_hour*24 self.init_day = init_day self.timezone_shift = timezone_shift # Interpolate the CPU data to increase the number of data points x = range(0, len(cpu_data_list)) xcpu_new = np.linspace(0, len(cpu_data_list), len(cpu_data_list)*self.timestep_per_hour) self.cpu_smooth = np.interp(xcpu_new, x, cpu_data_list) # Shift the data to match the timezone shift self.cpu_smooth = np.roll(self.cpu_smooth, -1*self.timezone_shift*self.timestep_per_hour) # Save a copy of the original data self.original_data = self.cpu_smooth.copy() # Initialize CoherentNoise process self.coherent_noise = CoherentNoise(base=self.original_data[0], weight=weight, desired_std_dev=desired_std_dev) # Function to return all workload data
[docs] def get_total_wkl(self): """Get current workload Returns: List[float]: CPU data """ return np.array(self.cpu_smooth[self.time_step:])
[docs] def scale_array(self, arr): """ Scales the input array so that approximately 90% of its values fall within the range of 0.2 to 0.8, based on the 5th and 95th percentiles. Parameters: arr (np.array): The input numpy array to be scaled. Returns: np.array: The scaled numpy array. """ # Calculate the 5th and 95th percentiles of the array p5 = np.percentile(arr, 5) p95 = np.percentile(arr, 95) # Scale the array based on the percentiles, without clipping # This ensures values outside the 5th to 95th percentile range naturally # fall outside the 0.2 to 0.8 range. scaled_arr = 0.2 + ((arr - p5) * (0.8 - 0.2) / (p95 - p5)) # Clip values to be within 0 to 1 scaled_arr = np.clip(scaled_arr, 0, 1) return scaled_arr
# Function to reset the time step and return the workload at the first time step
[docs] def reset(self): """Reset Workload_Manager Returns: float: CPU workload at current time step float: Amount of daily flexible workload """ self.time_step = self.init_day*self.time_steps_day self.init_time_step = self.time_step baseline = np.random.random()*0.5 - 0.25 # Add noise to the workload data using the CoherentNoise cpu_data = self.original_data * np.random.uniform(0.9, 1.1, len(self.original_data)) cpu_smooth = cpu_data * 0.7 + self.coherent_noise.generate(len(cpu_data)) * 0.3 + baseline self.cpu_smooth = self.scale_array(cpu_smooth) num_roll_weeks = np.random.randint(0, 52) # Random roll the workload because is independed on the month, so I am rolling across weeks (52 weeks in a year) self.cpu_smooth = np.roll(self.cpu_smooth, num_roll_weeks*self.timestep_per_hour*24*7) return self.cpu_smooth[self.time_step]
# Function to advance the time step and return the workload at the new time step
[docs] def step(self): """Step function for the Workload_Manager Returns: float: CPU workload at current time step float: Amount of daily flexible workload """ self.time_step += 1 # If it tries to read further, restart from the inital day if self.time_step - 1 >= len(self.cpu_smooth): self.time_step = self.init_time_step # assert self.time_step < len(self.cpu_smooth), f'Episode length: {self.time_step} is longer than the provide cpu_smooth: {len(self.cpu_smooth)}' return self.cpu_smooth[max(self.time_step - 1,0)] # to avoid logical error
[docs] def get_current_workload(self): return self.cpu_smooth[self.time_step]
[docs] def get_n_step_future_workload(self,n): if (self.time_step + n) >= len(self.cpu_smooth): return self.cpu_smooth[self.init_time_step + (self.time_step + (n-1) - len(self.cpu_smooth))] else: return self.cpu_smooth[self.time_step + n]
[docs] def set_n_step_future_workload(self,n,workload): if (self.time_step + n) >= len(self.cpu_smooth): self.cpu_smooth[self.init_time_step + (self.time_step + (n-1) - len(self.cpu_smooth))] = workload else: self.cpu_smooth[self.time_step + n] = workload
[docs] def set_current_workload(self, workload): self.cpu_smooth[self.time_step] = workload
# Class to manage carbon intensity data
[docs] class CI_Manager(): """Manager of the carbon intensity data Args: filename (str, optional): Filename of the CPU data. Defaults to ' '. location (str, optional): Location identifier. Defaults to 'NYIS'. init_day (int, optional): Initial day of the episode. Defaults to 0. future_steps (int, optional): Number of steps of the CI forecast. Defaults to 4. weight (float, optional): Weight value for coherent noise. Defaults to 0.001. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 0.025. """ def __init__(self, filename='', location='NYIS', init_day=0, future_steps=4, weight=0.1, desired_std_dev=5, timezone_shift=0): """Manager of the carbon intesity data Args: filename (str, optional): Filename of the CPU data. Defaults to ''. location (str, optional): Location identifier. Defaults to 'NYIS'. init_day (int, optional): Initial day of the episode. Defaults to 0. future_steps (int, optional): Number of steps of the CI forecast. Defaults to 4. weight (float, optional): Weight value for coherent noise. Defaults to 0.001. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 0.025. """ # Load carbon intensity data from a CSV file # One year data=24*365=8760 if not location == '': carbon_data_list = pd.read_csv(PATH+f"/data/CarbonIntensity/{location}_NG_&_avgCI.csv")['avg_CI'].values[:8760] else: carbon_data_list = pd.read_csv(PATH+f"/data/CarbonIntensity/{filename}")['avg_CI'].values[:8760] assert len(carbon_data_list) == 8760, "The number of data points in the carbon intensity data is not one year data=24*365=8760." carbon_data_list = carbon_data_list.astype(float) self.init_day = init_day self.timezone_shift = timezone_shift self.timestep_per_hour = 4 self.time_steps_day = self.timestep_per_hour*24 # Handle nan values just in case. Replace with average value if np.isnan(carbon_data_list).any(): avg_value = np.nanmean(carbon_data_list) carbon_data_list = np.nan_to_num(carbon_data_list, nan=avg_value) x = range(0, len(carbon_data_list)) xcarbon_new = np.linspace(0, len(carbon_data_list), len(carbon_data_list)*self.timestep_per_hour) # Interpolate the carbon data to increase the number of data points self.carbon_smooth = np.interp(xcarbon_new, x, carbon_data_list) # Shift the data to match the timezone shift self.carbon_smooth = np.roll(self.carbon_smooth, -1*self.timezone_shift*self.timestep_per_hour) # Save a copy of the original data self.original_data = self.carbon_smooth.copy() self.time_step = 0 # Initialize CoherentNoise process self.coherent_noise = CoherentNoise(base=self.original_data[0], weight=weight, desired_std_dev=desired_std_dev) self.future_steps = future_steps # Function to return all carbon intensity data
[docs] def get_total_ci(self): """Function to obtain the total carbon intensity Returns: List[float]: Total carbon intesity """ return self.carbon_smooth[self.time_step:]
[docs] def reset(self): """Reset CI_Manager Returns: float: Carbon intensity at current time step float: Normalized carbon intensity at current time step and it's forecast """ self.time_step = self.init_day*self.time_steps_day # Add noise to the carbon data using the CoherentNoise self.carbon_smooth = self.original_data + self.coherent_noise.generate(len(self.original_data)) self.carbon_smooth = np.clip(self.carbon_smooth, 0, None) num_roll_days = np.random.randint(0, 14) # Random roll the workload some days. I can roll the carbon intensity up to 14 days. self.carbon_smooth = np.roll(self.carbon_smooth, num_roll_days*self.timestep_per_hour*24) self.min_ci = min(self.carbon_smooth) self.max_ci = max(self.carbon_smooth) self.norm_carbon = normalize(self.carbon_smooth, self.min_ci, self.max_ci) # self.norm_carbon = standarize(self.carbon_smooth) # self.norm_carbon = (np.clip(self.norm_carbon, -1, 1) + 1) * 0.5 return self.carbon_smooth[self.time_step], self.norm_carbon[self.time_step:self.time_step+self.future_steps]
# Function to advance the time step and return the carbon intensity at the new time step
[docs] def step(self): """Step CI_Manager Returns: float: Carbon intensity at current time step float: Normalized carbon intensity at current time step and it's forecast """ self.time_step +=1 # If it tries to read further, restart from the initial index if self.time_step - 1 >= len(self.carbon_smooth): self.time_step = self.init_day*self.time_steps_day # assert self.time_step < len(self.carbon_smooth), 'Eposide length is longer than the provide CI_data' if self.time_step - 1 + self.future_steps > len(self.carbon_smooth): data = self.norm_carbon[self.time_step-1]*np.ones(shape=(self.future_steps)) else: data = self.norm_carbon[(self.time_step-1):self.time_step-1+self.future_steps] return self.carbon_smooth[self.time_step-1], data
[docs] def get_current_ci(self): # Normalize the carbon_smooth with min=250 and max=870 min_ci = 600 max_ci = 900 return (self.carbon_smooth[self.time_step] - min_ci)/(max_ci - min_ci)
# return self.carbon_smooth[self.time_step] # return self.norm_carbon[self.time_step]
[docs] def get_forecast_ci(self, steps=4): if self.time_step + steps > len(self.carbon_smooth): data = self.norm_carbon[self.time_step]*np.ones(shape=(steps)) else: data = self.norm_carbon[self.time_step:self.time_step+steps] return data
# Class to manage weather data # Where to obtain other weather files: # https://climate.onebuilding.org/
[docs] class Weather_Manager(): """Manager of the weather data. Where to obtain other weather files: https://climate.onebuilding.org/ Args: filename (str, optional): Filename of the weather data. Defaults to ''. location (str, optional): Location identifier. Defaults to 'NY'. init_day (int, optional): Initial day of the year. Defaults to 0. weight (float, optional): Weight value for coherent noise. Defaults to 0.001. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 0.025. temp_column (int, optional): Columng that contains the temperature data. Defaults to 6. """ def __init__(self, filename='', location='NY', init_day=0, weight=0.02, desired_std_dev=0.75, temp_column=6, rh_column=8, pres_column=9, timezone_shift=0): """Manager of the weather data. Args: filename (str, optional): Filename of the weather data. Defaults to ''. location (str, optional): Location identifier. Defaults to 'NY'. init_day (int, optional): Initial day of the year. Defaults to 0. weight (float, optional): Weight value for coherent noise. Defaults to 0.001. desired_std_dev (float, optional): Desired standard deviation for coherent noise. Defaults to 0.025. temp_column (int, optional): Columng that contains the temperature data. Defaults to 6. """ # Load weather data from a CSV file if not location == '': weather_data = pd.read_csv(PATH+f'/data/Weather/{location}', skiprows=8, header=None).values else: weather_data = pd.read_csv(PATH+f'/data/Weather/{filename}', skiprows=8, header=None).values temperature_data = weather_data[:,temp_column].astype(float) relative_humidity_data = weather_data[:,rh_column].astype(float) # Added for relative humidity pressure_data = weather_data[:,pres_column].astype(float) # Added for atmospheric pressure self.wet_bulb_data = [psy.GetTWetBulbFromRelHum(t, rh / 100, p) for t, rh, p in zip(temperature_data, relative_humidity_data, pressure_data)] # Normalize wet bulb temperature data self.min_wb_temp = 0 self.max_wb_temp = 40 self.init_day = init_day # One year data=24*365=8760 x = range(0, len(temperature_data)) self.timestep_per_hour = 4 xtemperature_new = np.linspace(0, len(temperature_data), len(temperature_data)*self.timestep_per_hour ) self.min_temp = 0 self.max_temp = 40 # Interpolate the data to increase the number of data points self.wet_bulb_data = np.interp(xtemperature_new, x, self.wet_bulb_data) self.norm_wet_bulb_data = normalize(self.wet_bulb_data, self.min_wb_temp, self.max_wb_temp) self.temperature_data = np.interp(xtemperature_new, x, temperature_data) self.norm_temp_data = normalize(self.temperature_data, self.min_temp, self.max_temp) self.time_step = 0 self.timezone_shift = timezone_shift # Shift the data to match the timezone shift self.temperature_data = np.roll(self.temperature_data, -1*self.timezone_shift*self.timestep_per_hour) self.wet_bulb_data = np.roll(self.wet_bulb_data, -1*self.timezone_shift*self.timestep_per_hour) # Save a copy of the original data self.original_temp_data = self.temperature_data.copy() self.original_wb_data = self.wet_bulb_data.copy() # Initialize CoherentNoise process self.coherent_noise = CoherentNoise(base=0, weight=weight, desired_std_dev=desired_std_dev) self.time_steps_day = self.timestep_per_hour*24 # Function to return all weather data
[docs] def get_total_weather(self): """Obtain the weather data in a List form Returns: List[form]: Total temperature data """ return self.temperature_data[self.time_step:]
# Function to reset the time step and return the weather at the first time step
[docs] def reset(self): """Reset Weather_Manager Returns: float: Temperature a current step float: Normalized temperature a current step """ self.time_step = self.init_day*self.time_steps_day # Add noise to the temperature data using the CoherentNoise coh_noise = self.coherent_noise.generate(len(self.original_temp_data)) self.temperature_data = self.original_temp_data + coh_noise self.wet_bulb_data = self.original_wb_data + coh_noise num_roll_days = np.random.randint(0, 14) # Random roll the workload some days. I can roll the carbon intensity up to 14 days. self.temperature_data = np.roll(self.temperature_data, num_roll_days*self.timestep_per_hour*24) self.wet_bulb_data = np.roll(self.wet_bulb_data, num_roll_days*self.timestep_per_hour*24) self.temperature_data = np.clip(self.temperature_data, self.min_temp, self.max_temp) self.norm_temp_data = normalize(self.temperature_data, self.min_temp, self.max_temp) self.wet_bulb_data = np.clip(self.wet_bulb_data, self.min_wb_temp, self.max_wb_temp) self.norm_wet_bulb_data = normalize(self.wet_bulb_data, self.min_wb_temp, self.max_wb_temp) # return self.temperature_data[self.time_step], self.norm_temp_data[self.time_step] return (self.temperature_data[self.time_step], self.norm_temp_data[self.time_step], self.wet_bulb_data[self.time_step], self.norm_wet_bulb_data[self.time_step]) # Added wet bulb temp
# Function to advance the time step and return the weather at the new time step
[docs] def step(self): """Step on the Weather_Manager Returns: float: Temperature a current step float: Normalized temperature a current step """ self.time_step += 1 # If it tries to read further, restart from the initial index if self.time_step - 1 >= len(self.temperature_data): self.time_step = self.init_day*self.time_steps_day # assert self.time_step < len(self.temperature_data), 'Episode length is longer than the provide Temperature_data' # return self.temperature_data[self.time_step], self.norm_temp_data[self.time_step] return (self.temperature_data[self.time_step - 1], self.norm_temp_data[self.time_step - 1], self.wet_bulb_data[self.time_step - 1], self.norm_wet_bulb_data[self.time_step - 1]) # Added wet bulb temp
[docs] def get_current_weather(self): # return self.temperature_data[self.time_step] return self.norm_temp_data[self.time_step]
[docs] class GeoLag_Workload_Manager(Workload_Manager): def __init__(self, workload_filename='', init_day=0, future_steps=4, weight=0.01, desired_std_dev=0.025, timezone_shift=0, sustained_duration : int = 4): super().__init__(workload_filename, init_day, future_steps, weight, desired_std_dev, timezone_shift) self.N = sustained_duration # period over which a non-base workload is executed in steps self.l_nonbase = 0 self.s_or_r = 0
[docs] def get_current_workload(self): self.s_or_r = super().get_current_workload() + self.l_nonbase/self.N return self.s_or_r
[docs] def set_current_workload(self, workload): # unlike previous workload manager class, # advance time here # we only update the non-base workload component if this dc is receiving load # we only update the smoothed workload if this dc is sending load self.time_step += 1 # If it tries to read further, restart from the inital day if self.time_step >= len(self.cpu_smooth): self.time_step = self.init_time_step self.l_nonbase = (1-1/self.N) * self.l_nonbase if workload > 0: self.l_nonbase += (workload >= self.s_or_r) * (workload - self.s_or_r) self.cpu_smooth[self.time_step] -= (workload < self.s_or_r) * (self.s_or_r - workload)
# Function to to provide the load to be serviced by the data center
[docs] def step(self): """Step function for the Workload_Manager Returns: float: CPU workload at current time step """ # assert self.time_step < len(self.cpu_smooth), f'Episode length: {self.time_step} is longer than the provide cpu_smooth: {len(self.cpu_smooth)}' return self.cpu_smooth[self.time_step] + self.l_nonbase/self.N