Source code for utils.workload_utils
import math
import logging
import numpy as np
import pandas as pd
from rl_components.task import Task
def assign_task_origins(tasks, datacenter_configs, current_time_utc, logger=None):
"""
    Assigns each task an origin datacenter (DC) based on:
        - Population weight of the DC
        - Local-time activity (higher weight during 8am–8pm local time)

    Args:
tasks (List[Task]): List of Task objects to assign.
datacenter_configs (List[dict]): DC configuration including weights and timezone offsets.
current_time_utc (datetime): Current simulation time in UTC.
logger (logging.Logger or None): Optional logger for debug output.
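
    Example (illustrative; the config values are hypothetical)::

        configs = [
            {"dc_id": 0, "population_weight": 0.5, "timezone_shift": -5},
            {"dc_id": 1, "population_weight": 0.3, "timezone_shift": 1},
        ]
        assign_task_origins(tasks, configs, pd.Timestamp.utcnow())
        # Each task now has an origin_dc_id sampled from the normalized
        # population × activity distribution.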
"""
def compute_activity_score(local_hour):
# Simulate typical activity pattern: peak between 8am and 8pm local time
return 1.0 if 8 <= local_hour < 20 else 0.3
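    # e.g., local hour 14 -> score 1.0 (peak); local hour 23 -> score 0.3 (off-peak)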
# Step 1: Calculate scores based on population × activity
scores = {}
for config in datacenter_configs:
dc_id = config['dc_id']
pop_weight = config.get('population_weight', 0.1)
tz_shift = config.get('timezone_shift', 0)
local_hour = (current_time_utc + pd.Timedelta(hours=tz_shift)).hour
scores[dc_id] = pop_weight * compute_activity_score(local_hour)
# Step 2: Normalize scores into probabilities
    total_score = sum(scores.values())
    if total_score == 0:
        # Guard against division by zero (e.g., all population weights set to 0):
        # fall back to a uniform distribution over DCs.
        probabilities = {dc_id: 1.0 / len(scores) for dc_id in scores}
    else:
        probabilities = {dc_id: score / total_score for dc_id, score in scores.items()}
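    # e.g., scores {0: 0.5, 1: 0.15} normalize to probabilities {0: ~0.77, 1: ~0.23}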
# Step 3: Assign origin DC to each task
dc_ids = list(probabilities.keys())
probs = list(probabilities.values())
for task in tasks:
origin_dc_id = int(np.random.choice(dc_ids, p=probs))
task.origin_dc_id = origin_dc_id
if logger:
logger.debug(f"Task {task.job_name} assigned origin DC{origin_dc_id}.")
def extract_tasks_from_row(row, scale=1, datacenter_configs=None,
                           current_time_utc=None, logger=None,
                           task_scale: int = 5,
                           group_size: int = 1):
"""
Convert a row from task_df into a list of Task objects, scaling the number of tasks if needed.

    Args:
        row (pd.Series): A row from task_df containing 'tasks_matrix'.
        scale (int): Scaling factor for task duplication.
        datacenter_configs (List[dict]): DC configurations for assigning task origins.
        current_time_utc (datetime): Current simulation time in UTC.
        logger (logging.Logger or None): Optional logger for debug statements.
        task_scale (int): Multiplier applied to each task's CPU, GPU, and memory
            requirements during extraction. Defaults to 5.
        group_size (int): Number of consecutive tasks to group into one meta-task.
            Defaults to 1 (no grouping).

    Returns:
        List[Task]: A list of Task objects extracted and scaled from the row.
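
    Example (illustrative call; 'row' and 'configs' come from the caller)::

        tasks = extract_tasks_from_row(
            row, scale=2,
            datacenter_configs=configs,
            current_time_utc=pd.Timestamp.utcnow(),
            task_scale=5, group_size=4,
        )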
"""
    if group_size < 1:
        group_size = 1  # Ensure group size is at least 1
    # task_scale simulates tasks that are task_scale times larger than the originals (default 5).
individual_tasks = []
# --- Step 1: Extract and scale individual tasks FIRST ---
for task_data in row['tasks_matrix']:
job_name = task_data[0]
arrival_time = current_time_utc # Task arrival time
duration = float(task_data[4])
# Apply task_scale during initial extraction
cores_req = task_scale * float(task_data[5]) / 100.0
gpu_req = task_scale * float(task_data[6]) / 100.0
mem_req = task_scale * float(task_data[7])
bandwidth_gb = float(task_data[8]) # Bandwidth isn't scaled by task_scale here
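        # e.g., task_scale=5 with a CPU field of "50.0" gives cores_req = 5 * 50 / 100 = 2.5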
# Create the original task object (will get origin assigned later)
task = Task(job_name, arrival_time, duration, cores_req, gpu_req, mem_req, bandwidth_gb)
individual_tasks.append(task)
# Create scaled/augmented versions (if scale > 1)
# Note: These scaled tasks will also be part of the grouping later
for i in range(scale - 1):
varied_cpu = max(0.5, cores_req * np.random.uniform(0.8, 1.2))
varied_gpu = max(0.0, gpu_req * np.random.uniform(0.8, 1.2))
varied_mem = max(0.5, mem_req * np.random.uniform(0.8, 1.2))
varied_bw = max(0.1, bandwidth_gb * np.random.uniform(0.8, 1.2))
new_task = Task(
f"{job_name}_scaled_{i}", arrival_time, duration,
varied_cpu, varied_gpu, varied_mem, varied_bw
)
individual_tasks.append(new_task)
# --- Step 2: Assign Origins to ALL individual tasks ---
# This is crucial BEFORE grouping, so we know the 'first' origin
if datacenter_configs and current_time_utc and individual_tasks:
assign_task_origins(individual_tasks, datacenter_configs, current_time_utc, logger=logger)
# --- Step 3: Group tasks if group_size > 1 ---
final_tasks_list = []
if group_size == 1:
final_tasks_list = individual_tasks # No grouping needed
elif individual_tasks: # Only group if there are tasks
if logger:
logger.info(f"Grouping {len(individual_tasks)} tasks into groups of {group_size}")
num_groups = math.ceil(len(individual_tasks) / group_size)
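        # e.g., 10 tasks with group_size=4 -> ceil(10 / 4) = 3 groups of sizes 4, 4, and 2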
for i in range(num_groups):
start_idx = i * group_size
end_idx = start_idx + group_size
current_group = individual_tasks[start_idx:end_idx]
if not current_group:
continue # Should not happen, but safety check
# Aggregate properties
agg_cores_req = sum(t.cores_req for t in current_group)
agg_gpu_req = sum(t.gpu_req for t in current_group)
agg_mem_req = sum(t.mem_req for t in current_group)
agg_bandwidth_gb = sum(t.bandwidth_gb for t in current_group) # Sum total data
# Duration: Use the maximum duration within the group
agg_duration = max(t.duration for t in current_group)
            # SLA Deadline: Use the *earliest* deadline within the group, so the
            # meta-task cannot silently violate any member's deadline
            agg_sla_deadline = min(t.sla_deadline for t in current_group)
# Origin: Use the origin of the *first* task in the group
group_origin_dc_id = current_group[0].origin_dc_id
# Job Name: Create a composite name
group_job_name = f"Group_{i+1}_({current_group[0].job_name})"
# Arrival time is the same for all in this implementation
group_arrival_time = current_group[0].arrival_time
# Create the aggregated meta-task
meta_task = Task(
job_name=group_job_name,
arrival_time=group_arrival_time,
duration=agg_duration,
cores_req=agg_cores_req,
gpu_req=agg_gpu_req,
mem_req=agg_mem_req,
bandwidth_gb=agg_bandwidth_gb
)
# Assign the aggregated/chosen properties
meta_task.origin_dc_id = group_origin_dc_id
meta_task.sla_deadline = agg_sla_deadline # Set the earliest deadline
final_tasks_list.append(meta_task)
if logger:
logger.debug(f" Group[{i}]: {meta_task.job_name} | origin=DC{meta_task.origin_dc_id} | "
f"CPU={meta_task.cores_req:.2f}, GPU={meta_task.gpu_req:.2f}, MEM={meta_task.mem_req:.2f}, "
f"BW={meta_task.bandwidth_gb:.2f}, duration={meta_task.duration:.2f}, "
f"SLA={meta_task.sla_deadline}")
# --- Logging ---
if logger:
        log_level = logging.INFO if group_size == 1 else logging.DEBUG  # Demote the summary when per-group details were already logged
logger.log(log_level, f"extract_tasks_from_row: Returning {len(final_tasks_list)} tasks/groups (group_size={group_size}) at {current_time_utc}.")
# Log details of the final list if debugging grouping
if group_size > 1:
for idx, t in enumerate(final_tasks_list):
logger.debug(
f" FinalTask[{idx}]: {t.job_name} | origin=DC{t.origin_dc_id} | "
f"CPU={t.cores_req:.2f}, GPU={t.gpu_req:.2f}, MEM={t.mem_req:.2f}, "
f"BW={t.bandwidth_gb:.2f}, duration={t.duration:.2f}, SLA={t.sla_deadline}"
)
return final_tasks_list
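

# A minimal, self-contained usage sketch (illustrative only, not part of the
# module's public API). The 'tasks_matrix' layout below is inferred from the
# indices parsed above (0=job_name, 4=duration, 5=CPU%, 6=GPU%, 7=memory,
# 8=bandwidth in GB); columns 1-3 are unused placeholders here.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    demo_row = pd.Series({
        "tasks_matrix": [
            ["job_a", 0, 0, 0, "30.0", "50.0", "0.0", "8.0", "1.5"],
            ["job_b", 0, 0, 0, "45.0", "25.0", "100.0", "4.0", "0.5"],
        ]
    })
    demo_configs = [
        {"dc_id": 0, "population_weight": 0.6, "timezone_shift": -5},
        {"dc_id": 1, "population_weight": 0.4, "timezone_shift": 1},
    ]
    tasks = extract_tasks_from_row(
        demo_row,
        scale=2,  # one varied copy per original task
        datacenter_configs=demo_configs,
        current_time_utc=pd.Timestamp.utcnow(),
        task_scale=5,
        group_size=2,  # merge pairs of tasks into meta-tasks
        logger=logging.getLogger(__name__),
    )
    for t in tasks:
        print(t.job_name, t.origin_dc_id, round(t.cores_req, 2))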