From aa30ac7f880ed598e9939299fd79a1e890452b5b Mon Sep 17 00:00:00 2001 From: "Alejandro.A.S" Date: Sat, 24 May 2025 09:04:44 +0200 Subject: [PATCH 1/4] fix experiment wrong check --- nebula/core/engine.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nebula/core/engine.py b/nebula/core/engine.py index 9828a83b9..e5bb32988 100644 --- a/nebula/core/engine.py +++ b/nebula/core/engine.py @@ -605,10 +605,6 @@ async def _learning_cycle(self): else: logging.error("Error reporting scenario finished") - logging.info("Checking if all my connections reached the total rounds...") - while not self.cm.check_finished_experiment(): - await asyncio.sleep(1) - await asyncio.sleep(5) # Kill itself From d0c9ebb2eaadaef99cedbbbf720d204099464c5a Mon Sep 17 00:00:00 2001 From: "Alejandro.A.S" Date: Sat, 24 May 2025 09:09:21 +0200 Subject: [PATCH 2/4] fix container kill --- nebula/core/engine.py | 8 +- nebula/core/network/communications.py | 214 +++++++----------- .../awareness/sanetwork/sanetwork.py | 2 +- 3 files changed, 90 insertions(+), 134 deletions(-) diff --git a/nebula/core/engine.py b/nebula/core/engine.py index e5bb32988..2d13b2f3c 100644 --- a/nebula/core/engine.py +++ b/nebula/core/engine.py @@ -2,7 +2,7 @@ import logging import os import time - +import socket import docker from nebula.addons.attacks.attacks import create_attack @@ -601,7 +601,7 @@ async def _learning_cycle(self): if self.config.participant["scenario_args"]["controller"] != "nebula-test": result = await self.reporter.report_scenario_finished() if result: - pass + logging.info("Scenario finished reported succesfully") else: logging.error("Error reporting scenario finished") @@ -610,7 +610,9 @@ async def _learning_cycle(self): # Kill itself if self.config.participant["scenario_args"]["deployment"] == "docker": try: - self.client.containers.get(self.docker_id).stop() + self.docker_id = socket.gethostname() + logging.info(f"docker id: {self.docker_id} killing docker") + self.client.containers.get(self.docker_id).kill() except Exception as e: print(f"Error stopping Docker container with ID {self.docker_id}: {e}") diff --git a/nebula/core/network/communications.py b/nebula/core/network/communications.py index 4cd33b7a6..2f8a02080 100755 --- a/nebula/core/network/communications.py +++ b/nebula/core/network/communications.py @@ -25,6 +25,8 @@ "model", "offer_model" ] + + class CommunicationsManager: _instance = None @@ -139,8 +141,9 @@ async def check_federation_ready(self): logging.info( f"🔗 check_federation_ready | Ready connections: {self.ready_connections} | Connections: {self.connections.keys()}" ) - if set(self.connections.keys()) == self.ready_connections: - return True + async with self.connections_lock: + if set(self.connections.keys()) == self.ready_connections: + return True async def add_ready_connection(self, addr): self.ready_connections.add(addr) @@ -156,7 +159,7 @@ async def start_communications(self, initial_neighbors): addr = f"{i.split(':')[0]}:{i.split(':')[1]}" await self.connect(addr, direct=True) await asyncio.sleep(1) - while not self.verify_connections(initial_neighbors): + while not await self.verify_connections(initial_neighbors): await asyncio.sleep(1) current_connections = await self.get_addrs_current_connections() logging.info(f"Connections verified: {current_connections}") @@ -264,7 +267,6 @@ async def stablish_connection_to_federation(self, msg_type="discover_join", addr if neighbors: addrs.difference_update(neighbors) - # logging.info(f"neighbors: {neighbors} | addr filtered: {addrs}") discovers_sent = 0 connections_made = set() if addrs: @@ -275,7 +277,7 @@ async def stablish_connection_to_federation(self, msg_type="discover_join", addr connections_made.add(addr) await asyncio.sleep(1) for i in range(0, max_tries): - if self.verify_any_connections(addrs): + if await self.verify_any_connections(addrs): break await asyncio.sleep(1) current_connections = await self.get_addrs_current_connections(only_undirected=True) @@ -357,24 +359,25 @@ async def process_connection(reader, writer, priority="medium"): return async with self.connections_manager_lock: - if len(self.connections) >= self.max_connections: - logging.info("🔗 [incoming] Maximum number of connections reached") - logging.info(f"🔗 [incoming] Sending CONNECTION//CLOSE to {addr}") - writer.write(b"CONNECTION//CLOSE\n") - await writer.drain() - writer.close() - await writer.wait_closed() - return + async with self.connections_lock: + if len(self.connections) >= self.max_connections: + logging.info("🔗 [incoming] Maximum number of connections reached") + logging.info(f"🔗 [incoming] Sending CONNECTION//CLOSE to {addr}") + writer.write(b"CONNECTION//CLOSE\n") + await writer.drain() + writer.close() + await writer.wait_closed() + return - logging.info(f"🔗 [incoming] Connections: {self.connections}") - if connection_addr in self.connections: - logging.info(f"🔗 [incoming] Already connected with {self.connections[connection_addr]}") - logging.info(f"🔗 [incoming] Sending CONNECTION//EXISTS to {addr}") - writer.write(b"CONNECTION//EXISTS\n") - await writer.drain() - writer.close() - await writer.wait_closed() - return + logging.info(f"🔗 [incoming] Connections: {self.connections}") + if connection_addr in self.connections: + logging.info(f"🔗 [incoming] Already connected with {self.connections[connection_addr]}") + logging.info(f"🔗 [incoming] Sending CONNECTION//EXISTS to {addr}") + writer.write(b"CONNECTION//EXISTS\n") + await writer.drain() + writer.close() + await writer.wait_closed() + return if connection_addr in self.pending_connections: logging.info(f"🔗 [incoming] Connection with {connection_addr} is already pending") @@ -415,14 +418,15 @@ async def process_connection(reader, writer, priority="medium"): prio=priority, ) async with self.connections_manager_lock: - logging.info(f"🔗 [incoming] Including {connection_addr} in connections") - self.connections[connection_addr] = connection - logging.info(f"🔗 [incoming] Sending CONNECTION//NEW to {addr}") - writer.write(b"CONNECTION//NEW\n") - await writer.drain() - writer.write(f"{self.id}\n".encode()) - await writer.drain() - await connection.start() + async with self.connections_lock: + logging.info(f"🔗 [incoming] Including {connection_addr} in connections") + self.connections[connection_addr] = connection + logging.info(f"🔗 [incoming] Sending CONNECTION//NEW to {addr}") + writer.write(b"CONNECTION//NEW\n") + await writer.drain() + writer.write(f"{self.id}\n".encode()) + await writer.drain() + await connection.start() except Exception as e: logging.exception(f"❗️ [incoming] Error while handling connection with {addr}: {e}") @@ -447,13 +451,14 @@ async def terminate_failed_reconnection(self, conn: Connection): async def stop(self): logging.info("🌐 Stopping Communications Manager... [Removing connections and stopping network engine]") - connections = list(self.connections.values()) - for node in connections: - await node.stop() - if hasattr(self, "server"): - self.network_engine.close() - await self.network_engine.wait_closed() - self.network_task.cancel() + async with self.connections_lock: + connections = list(self.connections.values()) + for node in connections: + await node.stop() + if hasattr(self, "server"): + self.network_engine.close() + await self.network_engine.wait_closed() + self.network_task.cancel() async def run_reconnections(self): for connection in self.connections_reconnect: @@ -466,20 +471,22 @@ async def run_reconnections(self): async def clear_unused_undirect_connections(self): async with self.connections_lock: - for conn in self.connections.values(): - if not conn.direct and await conn.is_inactive(): - logging.info(f"Cleaning unused connection: {conn.addr}") - asyncio.create_task(self.disconnect(conn.addr, mutual_disconnection=False)) + inactive_connections = [conn for conn in self.connections.values() if await conn.is_inactive()] + for conn in inactive_connections: + logging.info(f"Cleaning unused connection: {conn.addr}") + asyncio.create_task(self.disconnect(conn.addr, mutual_disconnection=False)) - def verify_any_connections(self, neighbors): + async def verify_any_connections(self, neighbors): # Return True if any neighbors are connected - if any(neighbor in self.connections for neighbor in neighbors): - return True - return False + async with self.connections_lock: + if any(neighbor in self.connections for neighbor in neighbors): + return True + return False - def verify_connections(self, neighbors): + async def verify_connections(self, neighbors): # Return True if all neighbors are connected - return bool(all(neighbor in self.connections for neighbor in neighbors)) + async with self.connections_lock: + return bool(all(neighbor in self.connections for neighbor in neighbors)) async def network_wait(self): await self.stop_network_engine.wait() @@ -487,9 +494,6 @@ async def network_wait(self): async def deploy_additional_services(self): logging.info("🌐 Deploying additional services...") await self._forwarder.start() - - # await self._discoverer.start() - # await self._health.start() self._propagator.start() async def include_received_message_hash(self, hash_message): @@ -553,7 +557,7 @@ async def process_establish_connection(addr, direct, reconnect, priority): if host == self.host and port == self.port: logging.info("🔗 [outgoing] Connection with yourself is not allowed") return False - + blacklist = await self.bl.get_blacklist() if blacklist: logging.info(f"blacklist: {blacklist}, source trying to connect: {addr}") @@ -562,13 +566,14 @@ async def process_establish_connection(addr, direct, reconnect, priority): return async with self.connections_manager_lock: - if addr in self.connections: - logging.info(f"🔗 [outgoing] Already connected with {self.connections[addr]}") - if not self.connections[addr].get_direct() and (direct == True): - self.connections[addr].set_direct(direct) - return True - else: - return False + async with self.connections_lock: + if addr in self.connections: + logging.info(f"🔗 [outgoing] Already connected with {self.connections[addr]}") + if not self.connections[addr].get_direct() and (direct == True): + self.connections[addr].set_direct(direct) + return True + else: + return False if addr in self.pending_connections: logging.info(f"🔗 [outgoing] Connection with {addr} is already pending") if int(self.host.split(".")[3]) >= int(host.split(".")[3]): @@ -606,7 +611,8 @@ async def process_establish_connection(addr, direct, reconnect, priority): connection_status = connection_status.decode("utf-8").strip() logging.info(f"🔗 [outgoing] Received connection status {connection_status} (from {addr})") - logging.info(f"🔗 [outgoing] Connections: {self.connections}") + async with self.connections_lock: + logging.info(f"🔗 [outgoing] Connections: {self.connections}") if connection_status == "CONNECTION//CLOSE": logging.info(f"🔗 [outgoing] Connection with {addr} closed") @@ -634,7 +640,8 @@ async def process_establish_connection(addr, direct, reconnect, priority): await writer.wait_closed() return False elif connection_status == "CONNECTION//EXISTS": - logging.info(f"🔗 [outgoing] Already connected {self.connections[addr]}") + async with self.connections_lock: + logging.info(f"🔗 [outgoing] Already connected {self.connections[addr]}") writer.close() await writer.wait_closed() return True @@ -656,7 +663,8 @@ async def process_establish_connection(addr, direct, reconnect, priority): config=self.config, prio=priority, ) - self.connections[addr] = connection + async with self.connections_lock: + self.connections[addr] = connection await connection.start() else: logging.info(f"🔗 [outgoing] Unknown connection status {connection_status}") @@ -692,9 +700,8 @@ async def process_establish_connection(addr, direct, reconnect, priority): asyncio.create_task(process_establish_connection(addr, direct, reconnect, priority)) async def connect(self, addr, direct=True, priority="medium"): - await self.get_connections_lock().acquire_async() - duplicated = addr in self.connections - await self.get_connections_lock().release_async() + async with self.connections_lock: + duplicated = addr in self.connections if duplicated: if direct: # Upcoming direct connection if not self.connections[addr].get_direct(): @@ -739,14 +746,18 @@ async def disconnect(self, dest_addr, mutual_disconnection=True, forced=False): await self.add_to_blacklist(dest_addr) logging.info(f"Trying to disconnect {dest_addr}") - if dest_addr not in self.connections: - logging.info(f"Connection {dest_addr} not found") - return + async with self.connections_lock: + if dest_addr not in self.connections: + logging.info(f"Connection {dest_addr} not found") + return try: if mutual_disconnection: await self.connections[dest_addr].send(data=self.create_message("connection", "disconnect")) await asyncio.sleep(1) - await self.connections[dest_addr].stop() + async with self.connections_lock: + conn = self.connections.pop(dest_addr) + await conn.stop() + #await self.connections[dest_addr].stop() except Exception as e: logging.exception(f"❗️ Error while disconnecting {dest_addr}: {e!s}") if dest_addr in self.connections: @@ -754,8 +765,11 @@ async def disconnect(self, dest_addr, mutual_disconnection=True, forced=False): # del self.connections[dest_addr] try: removed = True - await self.connections[dest_addr].stop() - del self.connections[dest_addr] + async with self.connections_lock: + conn = self.connections.pop(dest_addr) + await conn.stop() + # await self.connections[dest_addr].stop() + # del self.connections[dest_addr] except Exception as e: logging.exception(f"❗️ Error while removing connection {dest_addr}: {e!s}") current_connections = await self.get_all_addrs_current_connections(only_direct=True) @@ -768,14 +782,6 @@ async def disconnect(self, dest_addr, mutual_disconnection=True, forced=False): if is_neighbor: await self.engine.update_neighbors(dest_addr, current_connections, remove=removed) - async def remove_temporary_connection(self, temp_addr): - logging.info(f"Removing temporary conneciton:{temp_addr}..") - try: - await self.get_connections_lock().acquire_async() - self.connections.pop(temp_addr, None) - finally: - await self.get_connections_lock().release_async() - async def get_all_addrs_current_connections(self, only_direct=False, only_undirected=False): try: await self.get_connections_lock().acquire_async() @@ -797,63 +803,11 @@ async def get_addrs_current_connections(self, only_direct=False, only_undirected current_connections.add(self.addr) return current_connections - async def get_connection_by_addr(self, addr): - try: - await self.get_connections_lock().acquire_async() - for key, conn in self.connections.items(): - if addr in key: - return conn - return None - except Exception as e: - logging.exception(f"Error getting connection by address: {e}") - return None - finally: - await self.get_connections_lock().release_async() - - async def get_direct_connections(self): - try: - await self.get_connections_lock().acquire_async() - return {conn for _, conn in self.connections.items() if conn.get_direct()} - finally: - await self.get_connections_lock().release_async() - - async def get_undirect_connections(self): - try: - await self.get_connections_lock().acquire_async() - return {conn for _, conn in self.connections.items() if not conn.get_direct()} - finally: - await self.get_connections_lock().release_async() - - async def get_nearest_connections(self, top: int = 1): - try: - await self.get_connections_lock().acquire_async() - sorted_connections = sorted( - self.connections.values(), - key=lambda conn: ( - conn.get_neighbor_distance() if conn.get_neighbor_distance() is not None else float("inf") - ), - ) - if sorted_connections: - if top == 1: - return sorted_connections[0] - else: - return sorted_connections[:top] - else: - return None - finally: - await self.get_connections_lock().release_async() - def get_ready_connections(self): return {addr for addr, conn in self.connections.items() if conn.get_ready()} def learning_finished(self): return self.engine.learning_cycle_finished() - def check_finished_experiment(self): - return all( - conn.get_federated_round() == self.config.participant["scenario_args"]["rounds"] - 1 - for conn in self.connections.values() - ) - def __str__(self): return f"Connections: {[str(conn) for conn in self.connections.values()]}" diff --git a/nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py b/nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py index b09904ba9..c8531c1d9 100644 --- a/nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py +++ b/nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py @@ -149,7 +149,7 @@ async def _check_external_connection_service_status(self): await EventManager.get_instance().subscribe_node_event(BeaconRecievedEvent, self.beacon_received) await self.cm.start_beacon() - async def experiment_finish(self): + async def experiment_finish(self, efe: ExperimentFinishEvent): await self.cm.stop_external_connection_service() async def beacon_received(self, beacon_recieved_event: BeaconRecievedEvent): From 40cad11c22f99591369750fde5879f53c1ca37ac Mon Sep 17 00:00:00 2001 From: enriquetomasmb Date: Sun, 25 May 2025 17:44:01 +0200 Subject: [PATCH 3/4] remove docker id from node config, remove unusued code --- nebula/core/engine.py | 16 ++++++++-------- nebula/core/network/communications.py | 12 ++---------- nebula/frontend/config/participant.json.example | 1 - 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/nebula/core/engine.py b/nebula/core/engine.py index 2d13b2f3c..59124a207 100644 --- a/nebula/core/engine.py +++ b/nebula/core/engine.py @@ -1,8 +1,9 @@ import asyncio import logging import os -import time import socket +import time + import docker from nebula.addons.attacks.attacks import create_attack @@ -86,7 +87,6 @@ def __init__( self.addr = config.participant["network_args"]["addr"] self.role = config.participant["device_args"]["role"] self.name = config.participant["device_args"]["name"] - self.docker_id = config.participant["device_args"]["docker_id"] self.client = docker.from_env() print_banner() @@ -601,20 +601,20 @@ async def _learning_cycle(self): if self.config.participant["scenario_args"]["controller"] != "nebula-test": result = await self.reporter.report_scenario_finished() if result: - logging.info("Scenario finished reported succesfully") + logging.info("📝 Scenario finished reported succesfully") else: - logging.error("Error reporting scenario finished") + logging.error("📝 Error reporting scenario finished") await asyncio.sleep(5) # Kill itself if self.config.participant["scenario_args"]["deployment"] == "docker": try: - self.docker_id = socket.gethostname() - logging.info(f"docker id: {self.docker_id} killing docker") - self.client.containers.get(self.docker_id).kill() + docker_id = socket.gethostname() + logging.info(f"📦 Killing docker container with ID {docker_id}") + self.client.containers.get(docker_id).kill() except Exception as e: - print(f"Error stopping Docker container with ID {self.docker_id}: {e}") + logging.exception(f"📦 Error stopping Docker container with ID {docker_id}: {e}") async def _extended_learning_cycle(self): """ diff --git a/nebula/core/network/communications.py b/nebula/core/network/communications.py index 2f8a02080..2f22c9069 100755 --- a/nebula/core/network/communications.py +++ b/nebula/core/network/communications.py @@ -21,11 +21,7 @@ BLACKLIST_EXPIRATION_TIME = 60 -_COMPRESSED_MESSAGES = [ - "model", - "offer_model" -] - +_COMPRESSED_MESSAGES = ["model", "offer_model"] class CommunicationsManager: @@ -557,7 +553,7 @@ async def process_establish_connection(addr, direct, reconnect, priority): if host == self.host and port == self.port: logging.info("🔗 [outgoing] Connection with yourself is not allowed") return False - + blacklist = await self.bl.get_blacklist() if blacklist: logging.info(f"blacklist: {blacklist}, source trying to connect: {addr}") @@ -757,19 +753,15 @@ async def disconnect(self, dest_addr, mutual_disconnection=True, forced=False): async with self.connections_lock: conn = self.connections.pop(dest_addr) await conn.stop() - #await self.connections[dest_addr].stop() except Exception as e: logging.exception(f"❗️ Error while disconnecting {dest_addr}: {e!s}") if dest_addr in self.connections: logging.info(f"Removing {dest_addr} from connections") - # del self.connections[dest_addr] try: removed = True async with self.connections_lock: conn = self.connections.pop(dest_addr) await conn.stop() - # await self.connections[dest_addr].stop() - # del self.connections[dest_addr] except Exception as e: logging.exception(f"❗️ Error while removing connection {dest_addr}: {e!s}") current_connections = await self.get_all_addrs_current_connections(only_direct=True) diff --git a/nebula/frontend/config/participant.json.example b/nebula/frontend/config/participant.json.example index 48446432d..14061de65 100755 --- a/nebula/frontend/config/participant.json.example +++ b/nebula/frontend/config/participant.json.example @@ -13,7 +13,6 @@ "device_args": { "uid": "", "idx": "", - "docker_id": "", "name": "", "username": "pi", "password": "pi", From 0b1b71b85ef1810b9173be557e51a9a75842b176 Mon Sep 17 00:00:00 2001 From: enriquetomasmb Date: Sun, 25 May 2025 18:00:26 +0200 Subject: [PATCH 4/4] remove stop button when scenario is finished --- nebula/frontend/static/js/monitor/monitor.js | 4 ++++ nebula/frontend/templates/monitor.html | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/nebula/frontend/static/js/monitor/monitor.js b/nebula/frontend/static/js/monitor/monitor.js index 8f4b7b338..a389a9ddb 100644 --- a/nebula/frontend/static/js/monitor/monitor.js +++ b/nebula/frontend/static/js/monitor/monitor.js @@ -1812,6 +1812,10 @@ class Monitor { } else if (allOffline) { statusBadge.className = 'badge bg-danger-subtle text-danger px-3 py-2 ms-3'; statusBadge.innerHTML = 'Finished'; + const stopButton = document.getElementById('stop_button'); + if (stopButton) { + stopButton.style.display = 'none'; + } } else { statusBadge.className = 'badge bg-warning-subtle text-warning px-3 py-2 ms-3'; statusBadge.innerHTML = 'Running'; diff --git a/nebula/frontend/templates/monitor.html b/nebula/frontend/templates/monitor.html index d942374e3..1e0534f9c 100755 --- a/nebula/frontend/templates/monitor.html +++ b/nebula/frontend/templates/monitor.html @@ -53,7 +53,7 @@
{% if scenario.status == "running" or scenario.status == "completed" %} - Stop scenario