-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathscheduler.py
More file actions
122 lines (97 loc) · 5.08 KB
/
scheduler.py
File metadata and controls
122 lines (97 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright (c) 2025. MLSysOps Consortium
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import json
import time
import traceback
from .tasks import ExecuteTask
from .logger_util import logger
from .data.plan import Plan
from .data.task_log import Status
# Scheduling category of each mechanism (asset) a plan can configure:
#   "singleton" - has a global effect, so at most one non-core plan may
#                 configure it per scheduler tick.
#   "multi"     - configures an independent part of the system.
# Assets missing from this table are treated as "multi" by the scheduler.
MECHANISM_TYPE = dict(
    cpu_freq="singleton",
    component_placement="multi",
)
class PlanScheduler:
    """Periodically drain the shared plan queue and launch execution tasks.

    On every tick (every ``period`` seconds) the scheduler pulls all plans
    currently waiting in ``state.plans``. For each asset (mechanism) a plan
    configures, it either discards the plan or starts an ``ExecuteTask`` for
    it. Plans are handled in the order the queue yields them.
    """

    def __init__(self, state):
        """Bind the scheduler to the shared ``state`` object.

        Args:
            state: Shared state. The scheduler uses its ``plans`` queue and
                its ``get_task_log`` / ``update_task_log`` /
                ``is_plan_app_active`` helpers.
        """
        self.state = state
        # Seconds to sleep between scheduler ticks.
        self.period = 1
        # Plans whose task log is still PENDING; pruned by update_pending_plans().
        # NOTE(review): nothing in this class appends to it - confirm where it
        # is populated.
        self.pending_plans = []
        # Strong references to in-flight execution tasks. The asyncio event
        # loop keeps only weak references to tasks, so an unreferenced task
        # may be garbage-collected before it finishes.
        self._background_tasks = set()

    def _is_pending(self, plan):
        """Return True if ``plan`` has a task log whose status is PENDING."""
        task_log = self.state.get_task_log(plan.uuid)
        if not task_log:
            # No task log for this plan, so nothing is pending.
            return False
        # Statuses are stored as Status values (see update_task_log calls below).
        return task_log['status'] == Status.PENDING.value

    async def update_pending_plans(self):
        """Keep only the plans in ``pending_plans`` that are still PENDING.

        Plans without a task log are dropped instead of raising ``TypeError``.
        """
        self.pending_plans = [plan for plan in self.pending_plans if self._is_pending(plan)]

    async def _drain_queue(self):
        """Remove and return every plan currently waiting in ``state.plans``."""
        plans: list[Plan] = []
        while not self.state.plans.empty():
            plans.append(await self.state.plans.get())
            self.state.plans.task_done()  # Mark the task as done
        return plans

    def _schedule_plan(self, plan, mechanisms_touched):
        """Discard or launch execution for each asset configured by ``plan``.

        FIFO policy: singleton mechanisms (e.g. CPU frequency) have a global
        effect and may be configured only once per planning/execution cycle.
        A later non-core plan that touches an already-touched singleton is
        therefore discarded. Multi-instance mechanisms (e.g. component
        placement) configure independent parts of the system. Core plans
        override every discard rule.

        Args:
            plan: The plan to process.
            mechanisms_touched: Assets already scheduled by non-core plans
                during the current tick; updated in place.
        """
        for asset, command in plan.asset_new_plan.items():
            logger.info(f"Processing {str(plan.uuid)} plan for mechanism {asset} for application {plan.application_id}")
            should_discard = False
            mechanism_type = MECHANISM_TYPE.get(asset, "multi")

            # A singleton already configured during this tick is not reconfigured.
            if mechanism_type == "singleton" and asset in mechanisms_touched:
                should_discard = True

            # Check if there is a pending task log from previous runs. A log
            # without an entry for this asset is treated as not pending
            # (previously this raised KeyError and aborted the whole tick).
            task_log = self.state.get_task_log(plan.uuid)
            if task_log and task_log['status'] == Status.PENDING.value:
                mechanism_status = (task_log.get('mechanism') or {}).get(asset)
                if mechanism_status == Status.PENDING.value:
                    should_discard = True

            # check if the application has been removed for this application scoped plan
            if not self.state.is_plan_app_active(plan.application_id):
                should_discard = True

            # Core plans override the discard decision.
            if not plan.core and should_discard:
                logger.debug(f"Plan {plan.uuid} discarded.")
                self.state.update_task_log(plan.uuid, updates={"status": Status.DISCARDED.value})
                continue

            self.state.update_task_log(plan.uuid, updates={"status": Status.SCHEDULED.value})
            logger.debug(f"Plan {plan.uuid} scheduled for execution.")

            # mark mechanism touched only for non-core
            if not plan.core:
                mechanisms_touched[asset] = {
                    "timestamp": time.time(),
                    "plan_uid": plan.uuid,
                    "plan": command,
                }

            # Start the execution task and hold a reference until it completes.
            plan_task = ExecuteTask(asset, command, self.state, plan.uuid)
            task = asyncio.create_task(plan_task.run())
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    async def run(self):
        """Run the scheduling loop forever.

        Each tick:
            1. Sleep ``period`` seconds.
            2. Prune ``pending_plans``.
            3. Drain every plan from ``state.plans``.
            4. Schedule the drained plans in order.

        Errors are logged with a traceback and the loop continues. An error
        while handling one plan no longer prevents the remaining plans of
        the same tick (already removed from the queue) from being scheduled.
        """
        logger.debug("Plan Scheduler started")

        while True:
            try:
                await asyncio.sleep(self.period)

                # check for previous plans
                await self.update_pending_plans()

                current_plan_list = await self._drain_queue()

                # Assets configured by non-core plans during this tick.
                mechanisms_touched = {}
                logger.info(f" --------------Scheduler Loop (Plans: {len(current_plan_list)}) ----------")

                for plan in current_plan_list:
                    try:
                        self._schedule_plan(plan, mechanisms_touched)
                    except Exception:
                        # Isolate failures per plan so the rest of the tick proceeds.
                        logger.error(
                            f"Scheduler error for plan {getattr(plan, 'uuid', None)}: {traceback.format_exc()}"
                        )
            except Exception:
                logger.error(f"Scheduler tick error: {traceback.format_exc()}")