Source code for utils.task_assignment_strategies

# utils/task_assignment_strategies.py

import random
import logging # Use standard logging

# --- Base Class ---
class BaseRBCStrategy:
    """Base class for all Rule-Based Controller strategies."""

    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        """
        Selects a destination datacenter ID for the given task.

        Args:
            task (Task): The task object to be assigned.
            datacenters (dict): A dictionary mapping DC names (e.g., "DC1")
                to SustainDC environment objects.
            logger (logging.Logger, optional): Logger instance. Defaults to None.

        Returns:
            int or None: The numerical dc_id of the selected datacenter,
                or None if no suitable datacenter is found.
        """
        raise NotImplementedError
    def reset(self):
        """Resets any internal state of the strategy (optional)."""
        pass

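# Example usage (illustrative sketch, not part of the module): a dispatch loop
# asks the strategy for a destination each time a task arrives. `pending_tasks`
# and `requeue` are hypothetical stand-ins; `datacenters` is the name->SustainDC
# dict described in the docstring above.
#
# strategy = DistributeRandom()        # any BaseRBCStrategy subclass
# logger = logging.getLogger("task_assignment")
# for task in pending_tasks:
#     dc_id = strategy(task, datacenters, logger)
#     if dc_id is None:
#         requeue(task)                # no suitable DC this step; retry later
# strategy.reset()                     # clear internal state between episodes
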
# --- Concrete Strategy Implementations ---
class DistributeMostAvailable(BaseRBCStrategy):
    """
    Assigns the task to the datacenter with the MOST available CPU cores
    AMONG THOSE THAT CAN SCHEDULE the task.
    """
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("MostAvailable: No datacenters provided.")
            return None

        # --- Filter DCs that can schedule the task ---
        schedulable_dcs = []
        for dc_name, dc in datacenters.items():
            # Keep the DC only if it exposes can_schedule() and it returns True
            if hasattr(dc, 'can_schedule') and dc.can_schedule(task):
                schedulable_dcs.append(dc)
            # else:
            #     if logger: logger.debug(f"MostAvailable: DC {dc.dc_id} cannot schedule task {task.job_name}. Skipping.")

        # --- If no DC can schedule the task ---
        if not schedulable_dcs:
            if logger: logger.warning(f"MostAvailable: No datacenter can schedule task {task.job_name}. Cannot assign.")
            return None  # Indicate no assignment possible

        # --- Find the best DC among the schedulable ones ---
        try:
            # DC with the maximum available cores among the filtered list
            best_dc = max(schedulable_dcs, key=lambda dc: getattr(dc, 'available_cores', -float('inf')))
            if logger: logger.info(f"MostAvailable choice for task {task.job_name}: DC{best_dc.dc_id} ({getattr(best_dc, 'available_cores', 0):.1f} cores avail)")
            return best_dc.dc_id
        except Exception as e:
            if logger: logger.error(f"MostAvailable error during max selection: {e}")
            # Fallback: return the first schedulable DC's ID if an error occurs
            return schedulable_dcs[0].dc_id if schedulable_dcs else None

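# The strategy above relies only on a small duck-typed surface: `can_schedule(task)`,
# `available_cores`, and `dc_id`. A minimal stub (hypothetical, for testing only) is
# enough to exercise it without a full SustainDC setup; with logger=None, no task
# attributes are read, so any object can stand in for the task:
#
# class _StubDC:
#     def __init__(self, dc_id, available_cores):
#         self.dc_id = dc_id
#         self.available_cores = available_cores
#     def can_schedule(self, task):
#         return self.available_cores > 0
#
# dcs = {"DC1": _StubDC(1, 4.0), "DC2": _StubDC(2, 16.0)}
# assert DistributeMostAvailable()(object(), dcs) == 2  # most free cores wins
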
class DistributeRandom(BaseRBCStrategy):
    """Randomly assigns the task to one of the available datacenters."""
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("Random: No datacenters provided.")
            return None
        # Select a random DC name, then look up the object
        random_dc_name = random.choice(list(datacenters.keys()))
        random_dc = datacenters[random_dc_name]
        if logger: logger.info(f"Random choice for task {task.job_name}: DC{random_dc.dc_id}")
        return random_dc.dc_id

class DistributePriorityOrder(BaseRBCStrategy):
    """
    Assigns tasks following a fixed priority order of DC names, selecting
    the first one that can schedule the task.
    """
    def __init__(self, priority_order=("DC1", "DC2", "DC3", "DC4", "DC5")):
        # Default order (a tuple to avoid a mutable default argument);
        # can be customized at instantiation if needed
        self.priority_order = list(priority_order)
        super().__init__()

    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("PriorityOrder: No datacenters provided.")
            return None
        for dc_name in self.priority_order:
            dc = datacenters.get(dc_name)
            # The DC must exist in the cluster and be able to schedule the task
            if dc and hasattr(dc, 'can_schedule') and dc.can_schedule(task):
                if logger: logger.info(f"PriorityOrder choice for task {task.job_name}: {dc_name} (DC{dc.dc_id})")
                return dc.dc_id  # Return numerical ID
        # No DC in the priority list can schedule the task
        if logger: logger.warning(f"PriorityOrder: Task {task.job_name} could not be assigned! No suitable DC found in priority list.")
        return None  # Indicate no suitable DC found

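# A custom order can be passed at construction; e.g., to prefer DC3 and fall
# back to DC1 only (names are illustrative and must match the keys of `datacenters`):
#
# strategy = DistributePriorityOrder(priority_order=["DC3", "DC1"])
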
class DistributeLowestPrice(BaseRBCStrategy):
    """
    Assigns the task to the available datacenter with the lowest current
    electricity price ($/MWh).
    """
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("LowestPrice: No datacenters provided.")
            return None
        candidates = []
        for dc_name, dc in datacenters.items():
            # The DC must be able to schedule the task and expose price info
            if hasattr(dc, 'can_schedule') and dc.can_schedule(task) and hasattr(dc, 'price_manager'):
                try:
                    price = dc.price_manager.get_current_price()
                    if price is not None:
                        candidates.append((price, dc))  # Store price and DC object
                    else:
                        if logger: logger.warning(f"LowestPrice: Could not get price for {dc_name}")
                except Exception as e:
                    if logger: logger.error(f"LowestPrice: Error getting price for {dc_name}: {e}")
            # else:
            #     if logger: logger.debug(f"LowestPrice: Skipping {dc_name} (cannot schedule or no price manager)")
        if not candidates:
            if logger: logger.warning(f"LowestPrice: Task {task.job_name} could not be assigned! No schedulable DC found with price info.")
            return None  # No fallback here: None signals no assignment
        # Pick the DC with the minimum price among candidates
        candidates.sort(key=lambda item: item[0])  # Sort by price (first element of tuple)
        best_price, best_dc = candidates[0]
        if logger: logger.info(f"LowestPrice choice for task {task.job_name}: DC{best_dc.dc_id} (Price: {best_price:.2f} $/MWh)")
        return best_dc.dc_id

class DistributeLeastPending(BaseRBCStrategy):
    """Assigns the task to the datacenter with the fewest pending tasks."""
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("LeastPending: No datacenters provided.")
            return None
        # Pick the DC object with the shortest pending-task queue
        try:
            best_dc = min(datacenters.values(), key=lambda dc: len(getattr(dc, 'pending_tasks', [])))
            pending_count = len(getattr(best_dc, 'pending_tasks', []))
            if logger: logger.info(f"LeastPending choice for task {task.job_name}: DC{best_dc.dc_id} ({pending_count} pending)")
            return best_dc.dc_id
        except Exception as e:
            if logger: logger.error(f"LeastPending error: {e}")
            # Fallback: first DC in the dict
            return list(datacenters.values())[0].dc_id if datacenters else None

class DistributeLowestCarbon(BaseRBCStrategy):
    """
    Assigns the task to the available datacenter with the lowest current
    carbon intensity (gCO2/kWh).
    """
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("LowestCarbon: No datacenters provided.")
            return None
        candidates = []
        for dc_name, dc in datacenters.items():
            # The DC must be able to schedule the task and expose a CI getter
            if hasattr(dc, 'can_schedule') and dc.can_schedule(task) and hasattr(dc, 'get_current_carbon_intensity'):
                try:
                    ci = dc.get_current_carbon_intensity(norm=False)  # Raw gCO2/kWh
                    if ci is not None:
                        candidates.append((ci, dc))
                    else:
                        if logger: logger.warning(f"LowestCarbon: Could not get CI for {dc_name}")
                except Exception as e:
                    if logger: logger.error(f"LowestCarbon: Error getting CI for {dc_name}: {e}")
            # else:
            #     if logger: logger.debug(f"LowestCarbon: Skipping {dc_name} (cannot schedule or no CI getter)")
        if not candidates:
            if logger: logger.warning(f"LowestCarbon: Task {task.job_name} could not be assigned! No schedulable DC found with CI info.")
            return None  # Indicate no suitable DC
        # Pick the DC with the minimum CI among candidates
        candidates.sort(key=lambda item: item[0])  # Sort by CI
        best_ci, best_dc = candidates[0]
        if logger: logger.info(f"LowestCarbon choice for task {task.job_name}: DC{best_dc.dc_id} (CI: {best_ci:.2f} gCO2/kWh)")
        return best_dc.dc_id

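# LowestPrice and LowestCarbon need two more pieces of the DC surface than the
# stub sketched earlier: a `price_manager` exposing get_current_price() and a
# get_current_carbon_intensity(norm=...) method. A hypothetical test stub
# satisfying both (sketch only; real SustainDC objects carry much more state):
#
# class _StubPriceManager:
#     def __init__(self, price):
#         self._price = price
#     def get_current_price(self):
#         return self._price  # $/MWh
#
# class _StubGreenDC:
#     def __init__(self, dc_id, price, ci):
#         self.dc_id = dc_id
#         self.price_manager = _StubPriceManager(price)
#         self._ci = ci  # gCO2/kWh
#     def can_schedule(self, task):
#         return True
#     def get_current_carbon_intensity(self, norm=False):
#         return self._ci
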
class DistributeRoundRobin(BaseRBCStrategy):
    """Assigns tasks in a round-robin fashion across datacenters."""
    def __init__(self):
        self.last_assigned_dc_index = -1
        # Consistent order based on sorted numerical IDs
        self._dc_order_ids = []
        super().__init__()

    def reset(self):
        """Resets the round-robin index."""
        self.last_assigned_dc_index = -1
        self._dc_order_ids = []  # Clear the cached DC order
        logging.debug("RoundRobin state reset.")  # logging itself skips this below DEBUG level

    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("RoundRobin: No datacenters provided.")
            return None
        # Ensure a consistent order by sorting numerical dc_ids
        # (sorting by name via sorted(datacenters.keys()) would also work)
        current_dc_ids = sorted(dc.dc_id for dc in datacenters.values())
        # Rebuild the ordered list if it is empty or the set of IDs has changed
        if not self._dc_order_ids or self._dc_order_ids != current_dc_ids:
            self._dc_order_ids = current_dc_ids
            # Optionally reset the index when the DC set changes, or just continue the cycle
            # self.last_assigned_dc_index = -1  # Uncomment to reset index on change
            if logger: logger.debug(f"RoundRobin order updated: {self._dc_order_ids}")
        if not self._dc_order_ids:  # Should not happen if datacenters is not empty
            if logger: logger.error("RoundRobin: Failed to establish DC order.")
            return list(datacenters.values())[0].dc_id  # Fallback
        # Increment the index and wrap around
        self.last_assigned_dc_index = (self.last_assigned_dc_index + 1) % len(self._dc_order_ids)
        selected_dc_id = self._dc_order_ids[self.last_assigned_dc_index]
        # Optional: check whether the selected DC can actually schedule the task.
        # Current policy is to assign anyway and let the DC queue handle it;
        # an alternative would be to advance to the next DC in the cycle.
        if logger: logger.info(f"RoundRobin choice for task {task.job_name}: DC{selected_dc_id} (index {self.last_assigned_dc_index})")
        return selected_dc_id  # Return numerical ID

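# Because the order is rebuilt from sorted dc_ids, the cycle is deterministic
# regardless of dict insertion order. A quick sketch (reusing the hypothetical
# _StubDC above; with logger=None no task attributes are read):
#
# rr = DistributeRoundRobin()
# dcs = {"DC2": _StubDC(2, 8.0), "DC1": _StubDC(1, 8.0)}
# assert [rr(object(), dcs) for _ in range(4)] == [1, 2, 1, 2]
# rr.reset()  # the next call starts again from the first DC in sorted order
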
class DistributeLocalOnly(BaseRBCStrategy):
    """Assigns the task strictly to its origin datacenter."""
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not hasattr(task, 'origin_dc_id') or task.origin_dc_id is None:
            if logger: logger.error(f"LocalOnly: Task {task.job_name} missing valid origin_dc_id.")
            return None  # No origin to assign to
        origin_id = task.origin_dc_id
        # Check that the origin DC actually exists in the current cluster setup
        origin_dc_exists = any(dc.dc_id == origin_id for dc in datacenters.values())
        if not origin_dc_exists:
            if logger: logger.error(f"LocalOnly: Origin DC {origin_id} for task {task.job_name} not found in current cluster configuration.")
            # Simple fallback: assign randomly among the existing DCs
            if not datacenters:
                return None
            fallback_dc = random.choice(list(datacenters.values()))
            if logger: logger.warning(f"LocalOnly: Assigning task {task.job_name} randomly to DC{fallback_dc.dc_id} as origin DC{origin_id} is missing.")
            return fallback_dc.dc_id
        if logger: logger.info(f"LocalOnly choice for task {task.job_name}: Assigning to origin DC{origin_id}")
        return origin_id  # Return the numerical origin ID

# --- Add other strategies similarly ---
class DistributeLowestUtilization(BaseRBCStrategy):
    """Assigns the task to the datacenter with the highest overall average resource availability."""
    def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
        if not datacenters:
            if logger: logger.error("LowestUtilization: No datacenters provided.")
            return None

        def calculate_availability_score(dc):
            cpu_total = getattr(dc, 'total_cores', 0)
            gpu_total = getattr(dc, 'total_gpus', 0)
            mem_total = getattr(dc, 'total_mem_GB', 0)
            # Use getattr with defaults for safety
            cpu_avail = getattr(dc, 'available_cores', 0) / cpu_total if cpu_total > 0 else 0
            gpu_avail = getattr(dc, 'available_gpus', 0) / gpu_total if gpu_total > 0 else 0
            mem_avail = getattr(dc, 'available_mem', 0) / mem_total if mem_total > 0 else 0
            # Unweighted average availability; weights could be added
            return (cpu_avail + gpu_avail + mem_avail) / 3.0

        try:
            # Highest availability score == lowest overall utilization
            best_dc = max(datacenters.values(), key=calculate_availability_score)
            if logger:
                score = calculate_availability_score(best_dc)
                logger.info(f"LowestUtilization (Max Avail) choice for task {task.job_name}: DC{best_dc.dc_id} (Score: {score:.3f})")
            return best_dc.dc_id
        except Exception as e:
            if logger: logger.error(f"LowestUtilization error: {e}")
            # Fallback: first DC in the dict
            return list(datacenters.values())[0].dc_id if datacenters else None

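# A common wiring pattern (a sketch, not part of this module; the key names are
# hypothetical) is a registry mapping config strings to strategy classes:
#
# RBC_STRATEGIES = {
#     "most_available": DistributeMostAvailable,
#     "random": DistributeRandom,
#     "priority_order": DistributePriorityOrder,
#     "lowest_price": DistributeLowestPrice,
#     "least_pending": DistributeLeastPending,
#     "lowest_carbon": DistributeLowestCarbon,
#     "round_robin": DistributeRoundRobin,
#     "local_only": DistributeLocalOnly,
#     "lowest_utilization": DistributeLowestUtilization,
# }
#
# def make_strategy(name: str) -> BaseRBCStrategy:
#     return RBC_STRATEGIES[name]()  # raises KeyError for unknown names
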
# Example of a Weighted Strategy (needs helper methods in SustainDC)
# class DistributeWeighted(BaseRBCStrategy):
#     """
#     Assigns the task based on a weighted combination of normalized cost,
#     carbon intensity, and resource availability. Lower score is better.
#     NOTE: Requires normalization and assumes getter methods exist.
#     """
#     def __init__(self, weights={'cost': 0.3, 'carbon': 0.5, 'utilization': 0.2}):
#         self.weights = weights
#         super().__init__()
#
#     def __call__(self, task, datacenters: dict, logger: logging.Logger = None):
#         if not datacenters:
#             return None
#         dc_scores = []
#         # --- Placeholder normalization over the current DC set; replace with running stats ---
#         all_prices = [dc.price_manager.get_current_price() for dc in datacenters.values() if hasattr(dc, 'price_manager')]
#         all_cis = [dc.get_current_carbon_intensity(norm=False) for dc in datacenters.values() if hasattr(dc, 'get_current_carbon_intensity')]
#         min_price, max_price = (min(all_prices), max(all_prices)) if all_prices else (0, 1)
#         min_ci, max_ci = (min(all_cis), max(all_cis)) if all_cis else (0, 1)
#         price_range = max(1e-6, max_price - min_price)
#         ci_range = max(1e-6, max_ci - min_ci)
#         # --- End placeholder normalization ---
#         for dc_name, dc in datacenters.items():
#             try:
#                 norm_cost = (dc.price_manager.get_current_price() - min_price) / price_range if hasattr(dc, 'price_manager') else 0.5
#                 norm_ci = (dc.get_current_carbon_intensity(norm=False) - min_ci) / ci_range if hasattr(dc, 'get_current_carbon_intensity') else 0.5
#                 cpu_util = 1.0 - (getattr(dc, 'available_cores', 0) / getattr(dc, 'total_cores', 1))
#                 gpu_util = 1.0 - (getattr(dc, 'available_gpus', 0) / getattr(dc, 'total_gpus', 1))
#                 mem_util = 1.0 - (getattr(dc, 'available_mem', 0) / getattr(dc, 'total_mem_GB', 1))
#                 avg_util = (cpu_util + gpu_util + mem_util) / 3.0
#                 # Lower score is better: lower cost, lower CI, and lower utilization
#                 # (i.e., higher availability) all reduce the score
#                 score = (norm_cost * self.weights['cost'] +
#                          norm_ci * self.weights['carbon'] +
#                          avg_util * self.weights['utilization'])
#                 dc_scores.append((score, dc))
#             except Exception as e:
#                 if logger: logger.error(f"Weighted scoring error for {dc_name}: {e}")
#         if not dc_scores:
#             if logger: logger.warning("Weighted: No DCs could be scored.")
#             return list(datacenters.values())[0].dc_id if datacenters else None
#         dc_scores.sort(key=lambda item: item[0])  # Sort by score (ascending)
#         best_score, best_dc = dc_scores[0]
#         if logger:
#             logger.info(f"Weighted choice for task {task.job_name}: DC{best_dc.dc_id} (Score: {best_score:.3f})")
#         return best_dc.dc_id