Skip to content

Commit b491a5c

Browse files
alexanpatrananos
authored andcommitted
feat: automated telemetry deployments. dynamic policy changes. fluidity updates. and more.
Signed-off-by: Alexandros Patras <patras@uth.gr>
1 parent f07cebd commit b491a5c

112 files changed

Lines changed: 4926 additions & 920 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

agents/cluster/MLSClusterAgent.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#
1515

1616
import asyncio
17+
import traceback
1718

1819
import mlsysops
1920
from mlsysops.application import MLSApplication
@@ -67,7 +68,7 @@ async def message_queue_listener(self):
6768
"""
6869
Task to listen for messages from the message queue and act upon them.
6970
"""
70-
print("MLSAGENT:::: Starting Message Queue Listener...")
71+
logger.debug("MLSAGENT:::: Starting Message Queue Listener...")
7172
while True:
7273
try:
7374
# Wait for a message from the queue
@@ -80,6 +81,7 @@ async def message_queue_listener(self):
8081
#node = data.get("hostname")
8182
# Act upon the event type
8283
logger.debug(f"Received message from spade msg queue of event {event}")
84+
logger.debug(f"Payload: {data}")
8385

8486
match event:
8587
case mlsysops.events.MessageEvents.COMPONENT_PLACED.value:
@@ -105,6 +107,12 @@ async def message_queue_listener(self):
105107
"event": event,
106108
"payload": data
107109
})
110+
case mlsysops.events.MessageEvents.NODE_EXPORTER_DEPLOY.value:
111+
logger.debug(f"Received node exporter deploy msg from node")
112+
await self.telemetry_controller.remote_apply_node_exporter(data)
113+
case mlsysops.events.MessageEvents.NODE_EXPORTER_REMOVE.value:
114+
logger.debug(f"Received node exporter remove msg from node")
115+
self.telemetry_controller.remote_remove_node_exporter_pod(data['node'])
108116
case _:
109117
print(f"Unhandled event type: {event}")
110118
print("Going to sleep for 1 second...")
@@ -123,6 +131,8 @@ async def fluidity_message_listener(self):
123131
event = msg.get("event") # Expected event field
124132
logger.debug(f'Event {event}')
125133
data = msg.get("payload") # Additional application-specific data
134+
logger.debug(f"Payload: {data}")
135+
126136
match event:
127137
case MessageEvents.APP_CREATED.value:
128138
logger.debug(f"Received APP_CREATED msg from fluidity")
@@ -185,5 +195,6 @@ async def fluidity_message_listener(self):
185195
break
186196
except Exception as e:
187197
logger.error(f"fluidity_message_listener: Error processing msg: {e}")
198+
logger.debug(traceback.format_exc())
188199
await asyncio.sleep(1)
189-
print(f"MLSAGENT:::: stopping fluidity message listener.... ")
200+
logger.debug(f"MLSAGENT:::: stopping fluidity message listener.... ")

0 commit comments

Comments
 (0)