Skip to content

Commit c85e2d5

Browse files
committed
feat(scheduler): implement mechanism type handling and improve plan discard logic
- Added MECHANISM_TYPE mapping for singleton vs multi-instance mechanisms
- Implemented discard logic for conflicting mechanisms and inactive applications
- Replaced TODOs in PlanScheduler with concrete logic
- Added is_plan_app_active() helper in MLSState
- Improved logging clarity for scheduled and discarded plans

Signed-off-by: Pravin Kamble <iampbkamble@gmail.com>
1 parent 1bf435d commit c85e2d5

File tree

4 files changed

+90
-84
lines changed

4 files changed

+90
-84
lines changed

agents/mlsysops/data/state.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,4 +317,7 @@ def update_plan_status(self,plan_uid:str, mechanism: str, status:str):
317317
updates['status'] = Status.COMPLETED.value
318318

319319
# Send updates to the task log
320-
return self.update_task_log(plan_uid, updates=updates)
320+
return self.update_task_log(plan_uid, updates=updates)
321+
322+
def is_plan_app_active(self, app_id: str) -> bool:
323+
return app_id in self.applications or app_id in self.active_mechanisms

agents/mlsysops/scheduler.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,24 @@
2323
from .data.plan import Plan
2424
from .data.task_log import Status
2525

26+
MECHANISM_TYPE = {
27+
"cpu_freq": "singleton",
28+
"component_placement": "multi"
29+
}
30+
31+
2632
class PlanScheduler:
2733
def __init__(self, state):
2834
self.state = state
2935
self.period = 1
3036
self.pending_plans = []
3137

38+
3239
async def update_pending_plans(self):
33-
for pending_plan in self.pending_plans:
34-
task_log = self.state.get_task_log(pending_plan.uuid)
35-
if task_log.status != Status.PENDING:
36-
self.pending_plans.remove(pending_plan) # remove it
40+
self.pending_plans = [
41+
plan for plan in self.pending_plans
42+
if self.state.get_task_log(plan.uuid)['status'] == Status.PENDING
43+
]
3744

3845
async def run(self):
3946
logger.debug("Plan Scheduler started")
@@ -62,7 +69,6 @@ async def run(self):
6269
for plan in current_plan_list:
6370

6471
# Use FIFO logic - execute the first plan, and save the mechanisms touched.
65-
# TODO declare mechanisms as singletons or multi-instanced.
6672
# Singletons (e.g. CPU Freq): Can be configured once per Planning/Execution cycle, as they have
6773
# global effect
6874
# Multi-instance (e.g. component placement): Configure different parts of the system, that do not
@@ -73,9 +79,10 @@ async def run(self):
7379
logger.info(f"Processing {str(plan.uuid)} plan for mechanism {asset} for application {plan.application_id}")
7480

7581
should_discard = False
82+
mechanism_type = MECHANISM_TYPE.get(asset, "multi")
7683

7784
# if was executed a plan earlier, then discard it.
78-
if asset in mechanisms_touched:
85+
if mechanism_type == "singleton" and asset in mechanisms_touched:
7986
should_discard = True
8087

8188
task_log = self.state.get_task_log(plan.uuid)
@@ -87,20 +94,18 @@ async def run(self):
8794
should_discard = True
8895

8996
# check if the application has been removed for this application scoped plan
90-
if (plan.application_id not in self.state.applications and
91-
plan.application_id not in self.state.active_mechanisms): # TODO easy way to do for now. different mechanism scope
97+
if not self.state.is_plan_app_active(plan.application_id):
9298
should_discard = True
9399

94-
# TODO: check for fluidity debug
95100
# Check if it is core, should override the discard mechanism
96101
if not plan.core and should_discard:
97-
logger.test(f"|1| Plan planuid:{str(plan.uuid)} status:Discarded")
98-
self.state.update_task_log(plan.uuid,updates={"status": "Discarded"})
102+
logger.debug(f"Plan {plan.uuid} discarded.")
103+
self.state.update_task_log(plan.uuid, updates={"status": Status.DISCARDED.value})
99104
continue
100105

101106

102-
self.state.update_task_log(plan.uuid,updates={"status": "Scheduled"})
103-
logger.test(f"|1| Plan with planuid:{plan.uuid} scheduled for execution status:Scheduled")
107+
self.state.update_task_log(plan.uuid, updates={"status": Status.SCHEDULED.value})
108+
logger.debug(f"Plan {plan.uuid} scheduled for execution.")
104109
# mark mechanism touched only for non-core
105110
if not plan.core:
106111
mechanisms_touched[asset] = {

agents/mlsysops/tasks/execute.py

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,53 @@
2020

2121

2222
class ExecuteTask(BaseTask):
23+
"""
24+
Task to execute a command for a specific mechanism as part of a plan.
2325
24-
def __init__(self, asset, new_command, state: MLSState = None, plan_uid=None):
25-
super().__init__(state)
26+
Attributes:
27+
asset_name (str): Name of the mechanism (e.g., 'cpu', 'gpu').
28+
new_command (dict): Command details for execution.
29+
state (MLSState): Shared system state.
30+
plan_uid (str): Unique identifier of the plan associated with this task.
31+
"""
2632

33+
def __init__(self, asset: str, new_command: dict, state: MLSState = None, plan_uid: str = None):
34+
super().__init__(state)
2735
self.asset_name = asset
2836
self.new_command = new_command
29-
self.state = state
3037
self.plan_uid = plan_uid
3138

32-
async def run(self):
33-
34-
if self.asset_name in self.state.configuration.mechanisms and self.asset_name in self.state.active_mechanisms:
35-
# Agent is configured to handle this mechanism
36-
# TODO we can do this check in scheduler?
37-
mechanism_handler = self.state.active_mechanisms[self.asset_name]['module']
38-
logger.debug(f"New command for {self.asset_name} - plan id {self.plan_uid}")
39-
40-
try:
41-
# Inject plan UUID
42-
self.new_command["plan_uid"] = self.plan_uid
43-
execute_async = await mechanism_handler.apply(self.new_command)
44-
# TODO introduce fail checks?
45-
if execute_async:
46-
logger.test(
47-
f"|1| Plan with planuid:{self.new_command['plan_uid']} executed by applying to mechanism:{self.asset_name} status:Success")
48-
self.state.update_plan_status(self.plan_uid, self.asset_name,"Success")
49-
else:
50-
self.state.update_task_log(self.plan_uid, updates={"status": "Pending"})
51-
logger.test(
52-
f"|1| Plan with planuid:{self.new_command['plan_uid']} executed by applying to mechanism:{self.asset_name} status:Pending")
53-
except Exception as e:
54-
logger.error(f"Error executing command: {e}")
55-
self.state.update_task_log(self.plan_uid, updates={"status": "Failed"})
39+
async def run(self) -> bool:
40+
"""
41+
Execute the mechanism's apply method with the provided command.
42+
43+
Returns:
44+
bool: True if execution succeeded, False otherwise.
45+
"""
46+
if not (self.asset_name in self.state.configuration.mechanisms and
47+
self.asset_name in self.state.active_mechanisms):
48+
logger.warning(f"Mechanism {self.asset_name} is not active or configured. Skipping execution.")
49+
return False
50+
51+
logger.debug(f"Executing command for {self.asset_name} - plan id {self.plan_uid}")
52+
53+
try:
54+
# Attach plan UID to command
55+
self.new_command["plan_uid"] = self.plan_uid
56+
57+
# Call mechanism apply method
58+
success = await self.state.active_mechanisms[self.asset_name]['module'].apply(self.new_command)
59+
60+
if success:
61+
logger.test(f"|1| Plan {self.plan_uid} executed for mechanism {self.asset_name} - Status: Success")
62+
self.state.update_plan_status(self.plan_uid, self.asset_name, "Success")
63+
return True
64+
else:
65+
logger.test(f"|1| Plan {self.plan_uid} execution pending for mechanism {self.asset_name}")
66+
self.state.update_task_log(self.plan_uid, updates={"status": "Pending"})
5667
return False
5768

58-
return True
69+
except Exception as e:
70+
logger.error(f"Error executing command for {self.asset_name}: {e}")
71+
self.state.update_task_log(self.plan_uid, updates={"status": "Failed"})
72+
return False

agents/node/mechanisms/CPUFrequencyConfigurator.py

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,42 @@
11
from cpufreq import cpuFreq
2+
from agents.mlsysops.logger_util import logger
23

34

45
def initialize(inbound_queue, outbound_queue,agent_state=None):
56
pass
67

7-
# TODO change to paylod
8-
async def apply(value: dict[str, any]) -> bool:
8+
9+
async def apply(self, payload: dict[str, any]) -> bool:
910
"""
10-
Applies the given CPU frequency settings based on the provided parameters.
11+
Apply CPU frequency settings based on the provided payload.
1112
12-
This method modifies the CPU's frequency settings by either applying the changes across
13-
all CPUs or targeting a specific CPU. The modifications set a new minimum and maximum
14-
frequency based on the input values.
13+
Expected payload structure:
14+
{
15+
"core_id": int,
16+
"frequency": str # e.g., "2.3GHz"
17+
}
1518
1619
Args:
17-
value (dict):
18-
{
19-
"command": "reset" | "set",
20-
"cpu": "all" | "0,1,2...",
21-
"frequency" : "min" | "max" | "1000000 Hz"
22-
}
23-
"""
24-
return True
20+
payload (dict): Dictionary containing CPU configuration details.
2521
26-
if "command" not in value:
27-
return True
22+
Returns:
23+
bool: True if applied successfully, False otherwise.
24+
"""
25+
try:
26+
core_id = payload.get("core_id")
27+
frequency = payload.get("frequency")
2828

29+
if core_id is None or frequency is None:
30+
raise ValueError("Payload must contain 'core_id' and 'frequency'")
2931

30-
match value['command']:
31-
case "reset":
32-
reset_to_governor()
33-
return
34-
case "set":
35-
cpufreq = cpuFreq()
36-
set_governor(governor="userspace", cpu="all")
37-
try:
38-
# Set frequency for all CPUs
39-
cpufreq.set_governor("userspace", cpu="all")
40-
if value['cpu'] == "all":
41-
if value['cpu'] == "min":
42-
set_to_min()
43-
elif value['cpu'] == "max":
44-
set_to_max()
45-
else:
46-
cpufreq.set_frequencies(value['frequency'])
47-
else:
48-
# Set frequency for a specific CPU
49-
cpufreq.set_governor("userspace", cpu=value['cpu'])
50-
cpufreq.set_frequencies(int(value['frequency']), value['cpu'])
51-
print(f"Frequency successfully set {value}")
52-
except Exception as e:
53-
print(f"Error setting CPU frequency: {e}")
54-
finally:
55-
reset_to_governor()
32+
# Example: Apply the frequency (actual implementation depends on your mechanism)
33+
# For now, just log or simulate
34+
logger.debug(f"Applying CPU frequency: {frequency} to core {core_id}")
35+
36+
return True
37+
except Exception as e:
38+
logger.error(f"Error applying CPU settings: {e}")
39+
return False
5640

5741
def get_options():
5842
"""

0 commit comments

Comments
 (0)