import gymnasium as gym
from gymnasium import spaces
import numpy as np
import math
from collections import deque
[docs]
class CarbonLoadEnv(gym.Env):
def __init__(
self,
env_config = {},
future=True,
n_vars_ci=4,
flexible_workload_ratio=0.2,
n_vars_energy=0,
n_vars_battery=1,
test_mode=False,
queue_max_len=500,
initialize_queue_at_reset=False
):
"""Creates load shifting envrionemnt
Args:
env_config (dict, optional): Customizable environment confing. Defaults to {}.
future (bool, optional): To include CI forecast to the observation. Defaults to True.
future_steps (int, optional): Number of time steps in the future in the forecast. Defaults to 4.
flexible_workload_ratio (float, optional): Percentage of flexible workload. Defaults to 0.1.
n_vars_energy (int, optional): Additional number of energy variables. Defaults to 0.
n_vars_battery (int, optional): Additional number of variables from the battery. Defaults to 1.
test_mode (bool, optional): Used for evaluation of the model. Defaults to False.
"""
assert flexible_workload_ratio < 0.9, "flexible_workload_ratio should be lower than 0.9"
self.flexible_workload_ratio = flexible_workload_ratio
self.shiftable_tasks_percentage = self.flexible_workload_ratio
self.non_shiftable_tasks_percentage = 1 - self.flexible_workload_ratio
# Define a discrete action space with 3 actions:
# 0: Defer all shiftable tasks, 1: Do nothing, 2: Process all deferred tasks
self.action_space = spaces.Discrete(3)
# State: [Sin(h), Cos(h), Sin(day_of_year), Cos(day_of_year), self.ls_state, ci_i_future (n_vars_ci), var_to_LS_energy (n_vars_energy), batSoC (n_vars_battery)],
# self.ls_state = [current_workload, queue status]
self.observation_space = spaces.Box(low=-2.0, high=2.0, shape=(26,), dtype=np.float32)
self.global_total_steps = 0
self.test_mode = test_mode
self.time_steps_day = 96
# self.load_to_assign = 3 * flexible_workload_ratio
# self.day_workload = 0
self.workload = 0
# Initialize the queue to manage individual delayed tasks
# self.tasks_queue = [] # A list to hold individual tasks
self.queue_max_len = queue_max_len
self.tasks_queue = deque(maxlen=self.queue_max_len)
self.initialize_queue_at_reset = False
# Calculate task age histogram
[docs]
def get_task_age_histogram(self, tasks_queue, current_day, current_hour):
age_bins = [0, 6, 12, 18, 24, np.inf] # Age bins in hours
task_ages = [
(current_day - task['day']) * 24 + (current_hour - task['hour'])
for task in self.tasks_queue
]
histogram, _ = np.histogram(task_ages, bins=age_bins)
normalized_histogram = histogram / max(len(self.tasks_queue), 1) # Avoid division by zero
normalized_histogram[-1] = 1 if normalized_histogram[-1] > 0 else 0
return normalized_histogram # Returns an array of proportions
[docs]
def reset(self, *, seed=None, options=None):
"""
Reset `CarbonLoadEnv` to initial state.
Returns:
observations (List[float]): Current state of the environmment
info (dict): A dictionary that containing additional information about the environment state
"""
self.global_total_steps = 0
self.tasks_queue.clear()
if self.initialize_queue_at_reset:
# Initialize the task queue with tasks of varying ages
initial_queue_length = np.random.randint(1, self.queue_max_len // 4)
# Generate random task ages between 0 and 24 hours
max_task_age = 24 # Maximum age in hours
task_ages = np.random.random_integers(0, max_task_age*4, initial_queue_length)/4
# Generate task ages using an exponential distribution
max_task_age = 24 # Maximum age in hours
# Set the rate parameter (lambda) for the exponential distribution
lambda_param = 1.0 / 4.0 # Mean age of 6 hours (adjust as needed)
task_ages = np.round(np.random.exponential(scale=1.0 / lambda_param, size=initial_queue_length) * 4)/4
# Cap the task ages at max_task_age
task_ages = np.clip(task_ages, 0, max_task_age)
# Sort the task ages in descending order
task_ages = np.sort(task_ages)[::-1]
for age in task_ages:
# Compute the day and hour when the task was added
task_day = self.current_day
task_hour = self.current_hour - age
# Adjust day and hour if task_hour is negative
while task_hour < 0:
task_hour += 24
task_day -= 1 # Task was added on a previous day
# Ensure task_day is non-negative
if task_day < 0:
task_day = 0
task_hour = 0 # Reset to the earliest possible time
# Create the task with its timestamp
task = {'day': task_day, 'hour': task_hour, 'utilization': 1}
self.tasks_queue.append(task)
else:
# Start with an empty task queue
pass
# Calculate queue_length, oldest_task_age, average_task_age
tasks_in_queue = len(self.tasks_queue)
if tasks_in_queue > 0:
task_ages = [
(self.current_day - task['day']) * 24 + (self.current_hour - task['hour'])
for task in self.tasks_queue
]
oldest_task_age = max(task_ages)
average_task_age = sum(task_ages) / len(task_ages)
else:
oldest_task_age = 0.0
average_task_age = 0.0
task_age_histogram = self.get_task_age_histogram(self.tasks_queue, self.current_day, self.current_hour)
# Compute state
current_workload = self.workload # Ensure self.workload is set appropriately
state = np.asarray(np.hstack(([current_workload,
tasks_in_queue/self.queue_max_len,
oldest_task_age/24,
average_task_age/24,
task_age_histogram])), dtype=np.float32)
info = {"ls_original_workload": self.workload,
"ls_shifted_workload": self.workload,
"action": 0,
"info_load_left": 0,
'ls_queue_max_len': self.queue_max_len,
"ls_tasks_dropped": 0,
"ls_tasks_in_queue": 0,
"ls_norm_tasks_in_queue": 0,
'ls_tasks_processed': 0,
'ls_enforced': False,
'ls_oldest_task_age': oldest_task_age,
'ls_average_task_age': average_task_age,
'ls_overdue_penalty': 0,
'ls_task_age_histogram': task_age_histogram,}
return state, info
[docs]
def step(self, action):
"""
Makes an environment step in `CarbonLoadEnv`.
Args:
action (float): Continuous action between -1 and 1.
-1: Defer all shiftable tasks.
1: Process tasks from the DTQ to maximize utilization.
Values between -1 and 0 defer a fraction of tasks, and values between 0 and 1 process a fraction of tasks in the DTQ.
Returns:
state (List[float]): Current state of the environment.
reward (float): Reward value.
done (bool): A boolean signaling if the episode has ended.
info (dict): A dictionary containing additional information about the environment state.
"""
# self.current_hour += 0.25
enforced = 0
# One task is equivalent to 1% of the workload
# Workload is a float between 0 and 1
non_shiftable_tasks = int(math.ceil(self.workload * self.non_shiftable_tasks_percentage * 100))
shiftable_tasks = int(math.floor(self.workload * self.shiftable_tasks_percentage * 100))
tasks_dropped = 0 # Track the number of dropped tasks
actual_tasks_to_process = 0 # Track the number of processed tasks
# action_value = -1.0
# print(f'Action value: {action_value}')
# Step 1: Identify tasks older than 24 hours
# overdue_tasks = []
# for task in self.tasks_queue:
# # Calculate task age based on both day and hour
# task_age = (self.current_day - task['day']) * 24 + (self.current_hour - task['hour'])
# if task_age > 24:
# overdue_tasks.append(task)
# Retrieve overdue tasks
overdue_tasks = [task for task in self.tasks_queue if (self.current_day - task['day']) * 24 + (self.current_hour - task['hour']) > 24]
overdue_penalty = len(overdue_tasks)
# Calculate initial available capacity (before accounting for overdue tasks)
available_capacity = 90 - (non_shiftable_tasks + shiftable_tasks) # Limit the available capacity to 90% to avoid overloading the system
# Step 2: If there is available capacity, add overdue tasks to non-shiftable tasks
overdue_tasks_to_process = 0
overdue_task_count = 0
if available_capacity > 0 and len(overdue_tasks) > 0:
overdue_task_count = len(overdue_tasks)
tasks_that_can_be_processed = min(overdue_task_count, available_capacity)
# non_shiftable_tasks += tasks_that_can_be_processed
overdue_task_count = tasks_that_can_be_processed
# print(f'Overdue tasks: {overdue_task_count}, tasks that can be processed: {tasks_that_can_be_processed}, available capacity: {available_capacity}, non_shiftable_tasks: {non_shiftable_tasks}')
# Remove those overdue tasks that can be processed
overdue_tasks_to_process = tasks_that_can_be_processed
for task in overdue_tasks[:tasks_that_can_be_processed]:
self.tasks_queue.remove(task)
# Update available capacity after adding overdue tasks
available_capacity = 90 - (non_shiftable_tasks + shiftable_tasks + overdue_tasks_to_process) # Limit the available capacity to 90% to avoid overloading the system
# action = action[0]
if action == 0: # Defer tasks
tasks_to_defer = shiftable_tasks
# Attempt to queue deferred tasks
tasks_to_add = min(tasks_to_defer, self.queue_max_len - len(self.tasks_queue))
tasks_dropped += tasks_to_defer - tasks_to_add
# Vectorized: Add multiple tasks at once
self.tasks_queue.extend([{'day': self.current_day, 'hour': self.current_hour, 'utilization': 1}] * tasks_to_add)
# Update utilization
self.current_utilization = (overdue_tasks_to_process + (shiftable_tasks - tasks_to_add)) / 100
elif action == 2: # Process tasks from the queue
# available_capacity = 100 - (non_shiftable_tasks + shiftable_tasks) # Remaining capacity
if available_capacity >= 1: # At least 1 task can be processed
# Cap the number of tasks to process by the available capacity
tasks_to_process = min(shiftable_tasks, available_capacity, len(self.tasks_queue))
actual_tasks_to_process = tasks_to_process # Track for info
# if we can process all of the tasks in the queue, make it faster with clear() instead of with a for loop
if actual_tasks_to_process == len(self.tasks_queue):
self.tasks_queue.clear()
else:
# Vectorized: Pop multiple tasks at once
for _ in range(actual_tasks_to_process):
self.tasks_queue.popleft()
# Update utilization to include processed DTQ tasks
self.current_utilization = (shiftable_tasks + actual_tasks_to_process + overdue_tasks_to_process) / 100
else:
self.current_utilization = (shiftable_tasks + overdue_tasks_to_process) / 100
else: # action == 1, Do nothing
# self.current_utilization = (non_shiftable_tasks + shiftable_tasks) / 100
self.current_utilization = (shiftable_tasks + overdue_tasks_to_process) / 100
self.global_total_steps += 1
original_workload = self.workload
tasks_in_queue = len(self.tasks_queue)
if not self.initialize_queue_at_reset: # That means that we are on eval mode
self.current_utilization += non_shiftable_tasks / 100
reward = 0
# Calculate the age of each task in the queue
if len(self.tasks_queue) > 0:
task_ages = [(self.current_day - task['day']) * 24 + (self.current_hour - task['hour']) for task in self.tasks_queue]
oldest_task_age = max(task_ages)
average_task_age = sum(task_ages) / len(task_ages)
else:
oldest_task_age = 0.0
average_task_age = 0.0
task_age_histogram = self.get_task_age_histogram(self.tasks_queue, self.current_day, self.current_hour)
info = {"ls_original_workload": original_workload,
"ls_shifted_workload": self.current_utilization,
"ls_action": action,
"ls_norm_load_left": 0,
"ls_unasigned_day_load_left": 0,
"ls_penalty_flag": 0,
'ls_queue_max_len': self.queue_max_len,
'ls_tasks_in_queue': tasks_in_queue,
'ls_norm_tasks_in_queue': tasks_in_queue/self.queue_max_len,
'ls_tasks_dropped': tasks_dropped,
'ls_current_hour': self.current_hour,
'ls_tasks_processed': actual_tasks_to_process,
'ls_enforced': enforced,
'ls_oldest_task_age': oldest_task_age/24,
'ls_average_task_age': average_task_age/24,
'ls_overdue_penalty': overdue_penalty,
'ls_computed_tasks': int(self.current_utilization*100),
'ls_task_age_histogram': task_age_histogram,}
#Done and truncated are managed by the main class, implement individual function if needed
truncated = False
done = False
if self.current_utilization > 1 or self.current_utilization < 0:
print('WARNING, the utilization is out of bounds')
state = np.asarray(np.hstack(([self.current_utilization,
tasks_in_queue/self.queue_max_len,
oldest_task_age/24,
average_task_age/24,
task_age_histogram])), dtype=np.float32)
return state, reward, done, truncated, info
[docs]
def update_workload(self, workload):
"""
Makes an environment step in`BatteryEnvFwd.
Workload should be a float between 0 and 1.
Args:
workload (float): Workload assigned at the current time step
"""
if workload < 0 or workload > 1:
print('WARNING, the workload is out of bounds')
# Raise an error if the workload is out of bounds
raise ValueError("The workload should be between 0 and 1")
self.workload = workload
[docs]
def update_current_date(self, current_day, current_hour):
"""
Update the current hour in the environment.
Args:
current_hour (float): Current hour in the environment.
"""
self.current_day = current_day
self.current_hour = current_hour