-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathMLSClusterAgent.py
More file actions
383 lines (330 loc) · 22.4 KB
/
MLSClusterAgent.py
File metadata and controls
383 lines (330 loc) · 22.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# Copyright (c) 2025. MLSysOps Consortium
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import json
import traceback
from copy import deepcopy
import mlsysops
from mlsysops.agent import MLSAgent
from mlsysops.logger_util import logger
import copy
from mlsysops.events import MessageEvents
class MLSClusterAgent(MLSAgent):
def __init__(self):
super().__init__()
self.state.agent = self
self.nodes_state = {}
async def run(self):
"""
Main process of the MLSAgent.
"""
await super().run()
logger.info("Starting MLSAgent process...")
# Start the message queue listener task
message_queue_task = asyncio.create_task(self.message_queue_listener())
self.running_tasks.append(message_queue_task)
# start fluidity message queue listener
fluidity_listener = asyncio.create_task(self.fluidity_message_listener())
self.running_tasks.append(fluidity_listener)
await asyncio.gather(*self.running_tasks)
async def stop(self):
await super().stop()
for running_task in self.running_tasks:
running_task.cancel()
logger.debug("Finished cleaning up MLSClusterAgent resources...")
async def message_queue_listener(self):
"""
Waits for and processes messages from a message queue, handling different event types by executing corresponding
tasks such as delegating telemetry configurations, deploying/removing components, or syncing node states.
This method uses asynchronous operations to handle message processing and ensures all events are logged appropriately.
It plays a critical role in managing communication and operations between different components of the system.
Parameters
----------
self : object
Instance of the class that manages this method. May include attributes such as message queues, telemetry controllers,
mechanisms controllers, and node states for handling functionality defined within this method.
Returns
-------
None
This method does not return any value but processes incoming messages and performs respective actions asynchronously.
Raises
------
Exception
Catches and logs any exception that occurs during the processing of messages from the queue.
"""
logger.info("Started Message Queue Listener...")
while True:
try:
# Wait for a message from the queue
message = await self.message_queue.get()
# Extract event type and application details from the message
event = message.get("event")
data = message.get("payload")
# Act upon the event type
logger.debug(f"Received message from spade msg queue of event {event}")
match event:
case mlsysops.events.MessageEvents.COMPONENT_PLACED.value:
logger.debug(f"Sending to fluidity: {data}")
return
case mlsysops.events.MessageEvents.OTEL_DEPLOY.value:
logger.debug(f"Received OTEL_DEPLOY msg from spade")
await self.telemetry_controller.remote_apply_otel_configuration(data['node'],data['otel_config'], data['interval'])
case mlsysops.events.MessageEvents.OTEL_REMOVE.value:
logger.debug(f"Received OTEL_REMOVE msg from spade")
self.telemetry_controller.remove_otel_configuration(data['node'])
case mlsysops.events.MessageEvents.NODE_SYSTEM_DESCRIPTION_SUBMITTED.value:
logger.debug(f"Received {mlsysops.events.MessageEvents.NODE_SYSTEM_DESCRIPTION_SUBMITTED.value} from spade")
await self.state.active_mechanisms["fluidity"]['module'].send_message({
"event": mlsysops.events.MessageEvents.NODE_SYSTEM_DESCRIPTION_SUBMITTED.value,
"payload": data
})
case mlsysops.events.MessageEvents.MESSAGE_TO_FLUIDITY.value:
await self.mechanisms_controller.queues['fluidity']['outbound'].put({
"event": event,
"payload": data
})
case mlsysops.events.MessageEvents.NODE_EXPORTER_DEPLOY.value:
logger.debug(f"Received node exporter deploy msg from node")
await self.telemetry_controller.remote_apply_node_exporter(data)
case mlsysops.events.MessageEvents.NODE_EXPORTER_REMOVE.value:
logger.debug(f"Received node exporter remove msg from node")
self.telemetry_controller.remote_remove_node_exporter_pod(data['node'])
case mlsysops.events.MessageEvents.NODE_STATE_SYNC.value:
# logger.debug(f"Going to send {self.nodes_state[data['node']]} to node {data['node']}")
if data['node'] in self.nodes_state.keys():
await self.send_message_to_node(
data['node'],
MessageEvents.NODE_STATE_SYNC.value,
self.nodes_state[data['node']])
case _:
logger.error(f"Unhandled event type: {event}")
except Exception as e:
logger.error(f"Error processing message in message_queue_listener: {e}")
logger.error(traceback.format_exc())
async def fluidity_message_listener(self):
"""
An asynchronous function that listens for incoming fluidity messages and handles events accordingly. This function operates
continuously, processing messages from the inbound queue of the fluidity mechanisms controller. It performs operations such as
managing applications, updating nodes and components, and coordinating responses to various event types.
Attributes:
mechanisms_controller: Reference to the mechanisms controller that manages the message queues.
application_controller: Maintains the state and behavior of applications handled by the system.
policy_controller: Handles policies associated with applications and their execution.
nodes_state: Dictionary containing the state information of nodes in the system.
Raises:
Exceptions based on any failures during asynchronous message handling, specific to implementation.
"""
logger.debug(f"Starting fluidity message listener.... ")
while True:
try:
msg = await self.mechanisms_controller.queues['fluidity']['inbound'].get()
if not msg:
continue
event = msg.get("event")
data = msg.get("payload")
match event:
case MessageEvents.APP_CREATED.value:
logger.debug(f"Received APP_CREATED msg from fluidity")
await self.application_controller.on_application_received(data)
await self.policy_controller.start_application_policies(data['name'])
case MessageEvents.APP_DELETED.value:
app_name = data['name']
logger.debug(f"Application was removed {app_name}")
await self.application_controller.on_application_terminated(app_name)
await self.policy_controller.delete_application_policies(app_name)
# send messages to nodes
for node_name, node_state in self.nodes_state.items():
if app_name in node_state.keys():
for component_in_node in self.nodes_state[node_name][app_name]['components'].keys():
msg_to_send = {
'name': app_name,
'component_name': component_in_node
}
await self.send_message_to_node(node_name, MessageEvents.COMPONENT_REMOVED.value, msg_to_send)
del self.nodes_state[node_name][app_name]
case MessageEvents.APP_UPDATED.value:
logger.debug(f"Application was updated")
await self.application_controller.on_application_updated(data)
case MessageEvents.COMPONENT_PLACED.value: # DEPRACATED
# logger.debug(f"Application component placed")
# logger.debug(f"Data: {data}")
# Update internal structure
await self.application_controller.on_application_updated(data)
app_name = data['name']
comp_dict = data['comp_dict']
qos_metrics = data.get('qos_metrics', None)
for comp_name in comp_dict:
#logger.info(f'comp_name {comp_name}')
send_msg = {
'name': app_name,
'metrics' : qos_metrics,
'spec': None
}
component_list = {}
for comp_spec in comp_dict[comp_name]['specs']:
node = comp_spec['spec']['nodeName']
if node not in component_list.keys():
# create a new
component_list[node] = []
component_list[node].append(copy.deepcopy(comp_spec))
# send the appropriate message in every node
for node_name, comp_specs in component_list.items():
send_msg['spec'] = {
"components": comp_specs
}
logger.debug(f'Going to send {send_msg} to node {node}')
await self.send_message_to_node(node_name, event, send_msg)
case MessageEvents.COMPONENT_REMOVED.value:
# Not used
logger.debug(f"Application component removed")
case MessageEvents.KUBERNETES_NODE_ADDED.value:
logger.debug(f"Kubernetes node event {event}")
case MessageEvents.KUBERNETES_NODE_MODIFIED.value:
logger.debug(f"Kubernetes node event {event}")
case MessageEvents.KUBERNETES_NODE_REMOVED.value:
logger.debug(f"Kubernetes node event {event}")
case MessageEvents.CLUSTER_SYSTEM_DESCRIPTION_SUBMITTED.value:
logger.debug(f"Cluster description event {event}")
case MessageEvents.CLUSTER_SYSTEM_DESCRIPTION_UPDATED.value:
logger.debug(f"Cluster description event {event}")
case MessageEvents.CLUSTER_SYSTEM_DESCRIPTION_REMOVED.value:
logger.debug(f"Cluster description event {event}")
case MessageEvents.POD_ADDED.value:
logger.debug(f"Pod event {event}")
case MessageEvents.POD_MODIFIED.value:
logger.debug(f"Pod event {event}")
case MessageEvents.POD_DELETED.value:
logger.debug(f"Pod event {event}")
case MessageEvents.NODE_SYSTEM_DESCRIPTION_SUBMITTED.value:
logger.debug(f"Node description event {event}")
case MessageEvents.NODE_SYSTEM_DESCRIPTION_UPDATED.value:
logger.debug(f"Node description event {event}")
case MessageEvents.NODE_SYSTEM_DESCRIPTION_REMOVED.value:
logger.debug(f"Node description event {event}")
case MessageEvents.PLAN_EXECUTED.value:
update_status = await self.update_plan_status(data['plan_uid'],"fluidity", data['status'])
if not update_status:
logger.debug(f"Skipping from cluster :{data['plan_uid']} status:{data['status']}")
continue # this task is not present in this level
logger.debug(f"Plan executed event received id:{data['plan_uid']} status:{data['status']}")
logger.test(f"|1| Plan executed event received planuid:{data['plan_uid']} status:{data['status']}")
comp_dict = data['comp_dict']
if not comp_dict:
logger.error('Plan failed, comp dict is empty.')
continue
application_name = data['name']
# Check if plan kind - change_comp spec, do not change component dict
task_object = self.state.get_task_log(data['plan_uid'])
plan_json = json.loads(task_object['plan'])
del plan_json['fluidity']['deployment_plan']['initial_plan']
for component_name_in_plan, component_action_array in plan_json['fluidity']['deployment_plan'].items():
for component_action in component_action_array:
if component_action['action'] in ['change_spec']:
# update spec
node_name_spec_changed = component_action.get('host',None)
if data['status']: # Success
logger.debug(f"Pod {component_name_in_plan} component spec changed on node {node_name_spec_changed}")
for pod_spec in comp_dict[component_name_in_plan]['specs']: # TODO probably we can find better way to get pod_spec
self.nodes_state[node_name_spec_changed][application_name]['components'][component_name_in_plan] = pod_spec
# Remove component from application description that are not placed on the specific node
active_component_names = list(
self.nodes_state[node_name_spec_changed][application_name][
'components'].keys())
# Remove components from data['spec']['components'] if their name is not in component_names
application_description_spec = deepcopy(data['spec'])
application_description_spec['components'] = [
component for component in application_description_spec['components']
if component['metadata']['name'] in active_component_names
]
## send a message to each node
send_msg = {
'name': application_name,
'component_name': component_name_in_plan,
'spec': application_description_spec,
'pod_spec': self.nodes_state[node_name_spec_changed][application_name]['components'][component_name_in_plan]
}
# send the appropriate message in every node
logger.debug(f'Going to send {event} to node {node_name_spec_changed}')
await self.send_message_to_node(
node_name_spec_changed,
MessageEvents.COMPONENT_UPDATED.value,
send_msg)
# Failed do nothing
elif component_action['action'] in ['move', 'deploy', 'remove']:
for component_name, component_object in comp_dict.items():
for pod_name, pod_component in component_object['specs'].items():
if pod_component['event'] == MessageEvents.COMPONENT_REMOVED.value:
logger.debug(f"Pod {pod_name} component {component_name} removed skipping")
node_name_removed = pod_component.get('hostname',{})
send_msg = {
'name': application_name,
'component_name': component_name
}
# send the appropriate message in every node
logger.debug(f'Move plan. Sending {component_name} removal to node {node_name_removed}')
await self.send_message_to_node(
node_name_removed,
MessageEvents.COMPONENT_REMOVED.value,
send_msg)
# remove from local state
del self.nodes_state[node_name_removed][application_name]['components'][component_name]
continue
node_name_placed = pod_component.get('pod_spec',{}).get('pod_spec',{}).get('spec',{}).get('nodeName',{})
logger.debug(f"Pod {pod_name} component {component_name} placed on node {node_name_placed}")
# put in local state
self.nodes_state.setdefault(node_name_placed, {})
self.nodes_state[node_name_placed].setdefault(application_name, {})
self.nodes_state[node_name_placed][application_name]['name'] = application_name
self.nodes_state[node_name_placed][application_name].setdefault('components', {})
self.nodes_state[node_name_placed][application_name]['components'][component_name] = pod_component['pod_spec']
self.nodes_state[node_name_placed][application_name]['spec'] = data['spec']
# Remove component from application description that are not placed on the specific node
active_component_names = list(self.nodes_state[node_name_placed][application_name]['components'].keys())
# Remove components from data['spec']['components'] if their name is not in component_names
application_description_spec = deepcopy(data['spec'])
application_description_spec['components'] = [
component for component in application_description_spec['components']
if component['metadata']['name'] in active_component_names
]
## send a message to each node
send_msg = {
'name': application_name,
'component_name': component_name,
'spec': application_description_spec,
'pod_spec': pod_component.get('pod_spec',{}).get('comp_sec',{})
}
# send the appropriate message in every node
logger.debug(f'Going to send {event} to node {node_name_placed}')
await self.send_message_to_node(
node_name_placed,
MessageEvents.COMPONENT_PLACED.value,
send_msg)
case MessageEvents.MESSAGE_TO_FLUIDITY_PROXY.value:
# forward to node
logger.debug(f"Received {event} from fluidity proxy to node {msg['node']}")
await self.send_message_to_node(msg['node'],event,data)
case _:
logger.debug(f"Unknown event: Received payload from fluidity: {data}")
if data and 'hostname' in data:
node = data['hostname']
await self.send_message_to_node(node,event,data)
except asyncio.CancelledError:
logger.info(f"Stopping fluidity message listener.... ")
break
except Exception as e:
logger.error(f"fluidity_message_listener: Error processing msg: {e}")
logger.error(traceback.format_exc())
await asyncio.sleep(1)
logger.debug(f"MLSAGENT:::: stopping fluidity message listener.... ")