Skip to content

Commit 9cf12ca

Browse files
enriquetomasmb, AlejandroAvilesSerrano, FerTV
authored
[Fix] Include a management system for the NEBULA components and async tasks (#45)
* fix deadlock during disconnection
* remove unnecessary reconnection attempts after the experiment finishes
* remove debug code
* fix race conditions and some issues
* fix aggregation skipped
* fix issues in blacklist
* remove unnecessary condition
* improve cleaning when the federation ends
* fix stop from frontend
* add kill tasks
* create a shutdown system, improve readability and management
* improve error management in async functions

---------

Co-authored-by: Alejandro.A.S <jandrosambasil@gmail.com>
Co-authored-by: FerTV <fernando.torres.vega@gmail.com>
1 parent 6f81d6e commit 9cf12ca

27 files changed

Lines changed: 1068 additions & 424 deletions

nebula/addons/gps/nebulagps.py

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@ def __init__(self, config, addr, update_interval: float = 5.0, verbose=False):
1919
self._config = config
2020
self._addr = addr
2121
self.update_interval = update_interval # Frequency
22-
self.running = False
2322
self._node_locations = {} # Dictionary for storing node locations
2423
self._broadcast_socket = None
2524
self._nodes_location_lock = Locker("nodes_location_lock", async_lock=True)
2625
self._verbose = verbose
26+
self._running = asyncio.Event()
27+
self._background_tasks = [] # Track background tasks
2728

2829
async def start(self):
2930
"""Starts the GPS service, sending and receiving locations."""
3031
logging.info("Starting NebulaGPS service...")
31-
self.running = True
32+
self._running.set()
3233

3334
# Create broadcast socket
3435
self._broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -38,20 +39,39 @@ async def start(self):
3839
self._broadcast_socket.bind(("", self.BROADCAST_PORT))
3940

4041
# Start sending and receiving tasks
41-
asyncio.create_task(self._send_location_loop())
42-
asyncio.create_task(self._receive_location_loop())
43-
asyncio.create_task(self._notify_geolocs())
42+
self._background_tasks = [
43+
asyncio.create_task(self._send_location_loop(), name="NebulaGPS_send_location"),
44+
asyncio.create_task(self._receive_location_loop(), name="NebulaGPS_receive_location"),
45+
asyncio.create_task(self._notify_geolocs(), name="NebulaGPS_notify_geolocs"),
46+
]
4447

4548
async def stop(self):
4649
"""Stops the GPS service."""
47-
logging.info("Stopping NebulaGPS service...")
48-
self.running = False
50+
logging.info("🛑 Stopping NebulaGPS service...")
51+
self._running.clear()
52+
logging.info("🛑 NebulaGPS _running event cleared")
53+
54+
# Cancel all background tasks
55+
if self._background_tasks:
56+
logging.info(f"🛑 Cancelling {len(self._background_tasks)} background tasks...")
57+
for task in self._background_tasks:
58+
if not task.done():
59+
task.cancel()
60+
try:
61+
await task
62+
except asyncio.CancelledError:
63+
pass
64+
self._background_tasks.clear()
65+
logging.info("🛑 All background tasks cancelled")
66+
4967
if self._broadcast_socket:
5068
self._broadcast_socket.close()
5169
self._broadcast_socket = None
70+
logging.info("🛑 NebulaGPS broadcast socket closed")
71+
logging.info("✅ NebulaGPS service stopped successfully")
5272

5373
async def is_running(self):
54-
return self.running
74+
return self._running.is_set()
5575

5676
async def get_geoloc(self):
5777
latitude = self._config.participant["mobility_args"]["latitude"]
@@ -64,7 +84,18 @@ async def calculate_distance(self, self_lat, self_long, other_lat, other_long):
6484

6585
async def _send_location_loop(self):
6686
"""Send the geolocation periodically by broadcast."""
67-
while self.running:
87+
while await self.is_running():
88+
# Check if learning cycle has finished
89+
try:
90+
from nebula.core.network.communications import CommunicationsManager
91+
92+
cm = CommunicationsManager.get_instance()
93+
if cm.learning_finished():
94+
logging.info("GPS: Learning cycle finished, stopping location broadcast")
95+
break
96+
except Exception:
97+
pass # If we can't get the communications manager, continue
98+
6899
latitude, longitude = await self.get_geoloc() # Obtener ubicación actual
69100
message = f"GPS-UPDATE {self._addr} {latitude} {longitude}"
70101
self._broadcast_socket.sendto(message.encode(), (self.BROADCAST_IP, self.BROADCAST_PORT))
@@ -74,7 +105,18 @@ async def _send_location_loop(self):
74105

75106
async def _receive_location_loop(self):
76107
"""Listens to and stores geolocations from other nodes."""
77-
while self.running:
108+
while await self.is_running():
109+
# Check if learning cycle has finished
110+
try:
111+
from nebula.core.network.communications import CommunicationsManager
112+
113+
cm = CommunicationsManager.get_instance()
114+
if cm.learning_finished():
115+
logging.info("GPS: Learning cycle finished, stopping location reception")
116+
break
117+
except Exception:
118+
pass # If we can't get the communications manager, continue
119+
78120
try:
79121
data, addr = await asyncio.get_running_loop().run_in_executor(
80122
None, self._broadcast_socket.recvfrom, 1024
@@ -91,7 +133,18 @@ async def _receive_location_loop(self):
91133
logging.exception(f"Error receiving GPS update: {e}")
92134

93135
async def _notify_geolocs(self):
94-
while True:
136+
while await self.is_running():
137+
# Check if learning cycle has finished
138+
try:
139+
from nebula.core.network.communications import CommunicationsManager
140+
141+
cm = CommunicationsManager.get_instance()
142+
if cm.learning_finished():
143+
logging.info("GPS: Learning cycle finished, stopping geolocation notifications")
144+
break
145+
except Exception:
146+
pass # If we can't get the communications manager, continue
147+
95148
await asyncio.sleep(self.update_interval)
96149
await self._nodes_location_lock.acquire_async()
97150
geolocs: dict = self._node_locations.copy()
@@ -102,7 +155,7 @@ async def _notify_geolocs(self):
102155
for addr, (lat, long) in geolocs.items():
103156
dist = await self.calculate_distance(self_lat, self_long, lat, long)
104157
distances[addr] = (dist, (lat, long))
105-
158+
106159
self._config.update_nodes_distance(distances)
107160
gpsevent = GPSEvent(distances)
108161
asyncio.create_task(EventManager.get_instance().publish_addonevent(gpsevent))

nebula/addons/mobility.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,28 @@ def __init__(self, config, verbose=False):
5050
"""
5151
logging.info("Starting mobility module...")
5252
self.config = config
53-
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
54-
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
53+
self._verbose = verbose
54+
self._running = asyncio.Event()
55+
self._nodes_distances = {}
56+
self._nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
57+
self._mobility_task = None # Track the background task
58+
59+
# Mobility configuration
5560
self.mobility = self.config.participant["mobility_args"]["mobility"]
5661
self.mobility_type = self.config.participant["mobility_args"]["mobility_type"]
57-
self.radius_federation = float(self.config.participant["mobility_args"]["radius_federation"])
58-
self.scheme_mobility = self.config.participant["mobility_args"]["scheme_mobility"]
59-
self.round_frequency = int(self.config.participant["mobility_args"]["round_frequency"])
62+
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
63+
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
6064
# INFO: These values may change according to the needs of the federation
6165
self.max_distance_with_direct_connections = 150 # meters
6266
self.max_movement_random_strategy = 50 # meters
6367
self.max_movement_nearest_strategy = 50 # meters
6468
self.max_initiate_approximation = self.max_distance_with_direct_connections * 1.2
69+
self.radius_federation = float(config.participant["mobility_args"]["radius_federation"])
70+
self.scheme_mobility = config.participant["mobility_args"]["scheme_mobility"]
71+
self.round_frequency = int(config.participant["mobility_args"]["round_frequency"])
6572
# Logging box with mobility information
6673
mobility_msg = f"Mobility: {self.mobility}\nMobility type: {self.mobility_type}\nRadius federation: {self.radius_federation}\nScheme mobility: {self.scheme_mobility}\nEach {self.round_frequency} rounds"
6774
print_msg_box(msg=mobility_msg, indent=2, title="Mobility information")
68-
self._nodes_distances = {}
69-
self._nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
70-
self._verbose = verbose
7175

7276
@cached_property
7377
def cm(self):
@@ -103,8 +107,30 @@ async def start(self):
103107
"""
104108
await EventManager.get_instance().subscribe_addonevent(GPSEvent, self.update_nodes_distances)
105109
await EventManager.get_instance().subscribe_addonevent(GPSEvent, self.update_nodes_distances)
106-
task = asyncio.create_task(self.run_mobility())
107-
return task
110+
self._running.set()
111+
self._mobility_task = asyncio.create_task(self.run_mobility(), name="Mobility_run_mobility")
112+
return self._mobility_task
113+
114+
async def stop(self):
115+
"""
116+
Stops the mobility module.
117+
"""
118+
logging.info("Stopping Mobility module...")
119+
self._running.clear()
120+
121+
# Cancel the background task
122+
if self._mobility_task and not self._mobility_task.done():
123+
logging.info("🛑 Cancelling Mobility background task...")
124+
self._mobility_task.cancel()
125+
try:
126+
await self._mobility_task
127+
except asyncio.CancelledError:
128+
pass
129+
self._mobility_task = None
130+
logging.info("🛑 Mobility background task cancelled")
131+
132+
async def is_running(self):
133+
return self._running.is_set()
108134

109135
async def update_nodes_distances(self, gpsevent: GPSEvent):
110136
distances = await gpsevent.get_event_data()
@@ -138,7 +164,7 @@ async def run_mobility(self):
138164
if not self.mobility:
139165
return
140166
# await asyncio.sleep(self.grace_time)
141-
while True:
167+
while await self.is_running():
142168
await self.change_geo_location()
143169
await asyncio.sleep(self.period)
144170

nebula/addons/networksimulation/nebulanetworksimulator.py

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ def __init__(self, changing_interval, interface, verbose=False):
2626
self._network_conditions = self.NETWORK_CONDITIONS.copy()
2727
self._network_conditions_lock = Locker("network_conditions_lock", async_lock=True)
2828
self._current_network_conditions = {}
29-
self._running = False
29+
self._running = asyncio.Event()
3030

3131
@cached_property
3232
def cm(self):
3333
return CommunicationsManager.get_instance()
3434

3535
async def start(self):
3636
logging.info("🌐 Nebula Network Simulator starting...")
37-
self._running = True
37+
self._running.set()
3838
grace_time = self.cm.config.participant["mobility_args"]["grace_time_mobility"]
3939
# if self._verbose: logging.info(f"Waiting {grace_time}s to start applying network conditions based on distances between devices")
4040
# await asyncio.sleep(grace_time)
@@ -43,37 +43,49 @@ async def start(self):
4343
)
4444

4545
async def stop(self):
46-
self._running = False
46+
logging.info("🌐 Nebula Network Simulator stopping...")
47+
self._running.clear()
48+
49+
async def is_running(self):
50+
return self._running.is_set()
4751

4852
async def _change_network_conditions_based_on_distances(self, gpsevent: GPSEvent):
4953
distances = await gpsevent.get_event_data()
50-
await asyncio.sleep(self._refresh_interval)
51-
if self._verbose:
52-
logging.info("Refresh | conditions based on distances...")
53-
try:
54-
for addr, (distance, _) in distances.items():
55-
if distance is None:
56-
# If the distance is not found, we skip the node
57-
continue
58-
conditions = await self._calculate_network_conditions(distance)
59-
# Only update the network conditions if they have changed
60-
if addr not in self._current_network_conditions or self._current_network_conditions[addr] != conditions:
61-
addr_ip = addr.split(":")[0]
62-
self._set_network_condition_for_addr(
63-
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
64-
)
65-
self._set_network_condition_for_multicast(
66-
self._node_interface, addr_ip, self.IP_MULTICAST, conditions["bandwidth"], conditions["delay"]
67-
)
68-
async with self._network_conditions_lock:
69-
self._current_network_conditions[addr] = conditions
70-
else:
71-
if self._verbose:
72-
logging.info("network conditions havent changed since last time")
73-
except KeyError:
74-
logging.exception(f"📍 Connection {addr} not found")
75-
except Exception:
76-
logging.exception("📍 Error changing connections based on distance")
54+
while await self.is_running():
55+
if self._verbose:
56+
logging.info("Refresh | conditions based on distances...")
57+
try:
58+
for addr, (distance, _) in distances.items():
59+
if distance is None:
60+
# If the distance is not found, we skip the node
61+
continue
62+
conditions = await self._calculate_network_conditions(distance)
63+
# Only update the network conditions if they have changed
64+
if (
65+
addr not in self._current_network_conditions
66+
or self._current_network_conditions[addr] != conditions
67+
):
68+
addr_ip = addr.split(":")[0]
69+
self._set_network_condition_for_addr(
70+
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
71+
)
72+
self._set_network_condition_for_multicast(
73+
self._node_interface,
74+
addr_ip,
75+
self.IP_MULTICAST,
76+
conditions["bandwidth"],
77+
conditions["delay"],
78+
)
79+
async with self._network_conditions_lock:
80+
self._current_network_conditions[addr] = conditions
81+
else:
82+
if self._verbose:
83+
logging.info("network conditions havent changed since last time")
84+
except KeyError:
85+
logging.exception(f"📍 Connection {addr} not found")
86+
except Exception:
87+
logging.exception("📍 Error changing connections based on distance")
88+
await asyncio.sleep(self._refresh_interval)
7789

7890
async def set_thresholds(self, thresholds: dict):
7991
async with self._network_conditions_lock:

nebula/addons/reporter.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ def __init__(self, config, trainer):
6767
self.acc_bytes_recv = 0
6868
self.acc_packets_sent = 0
6969
self.acc_packets_recv = 0
70+
self._running = asyncio.Event()
71+
self._reporter_task = None # Track the background task
7072

7173
@property
7274
def cm(self):
@@ -113,9 +115,10 @@ async def start(self):
113115
- The grace period allows for a delay before the first reporting cycle.
114116
- The reporter loop runs in the background, ensuring continuous data updates.
115117
"""
118+
self._running.set()
116119
await asyncio.sleep(self.grace_time)
117-
task = asyncio.create_task(self.run_reporter())
118-
return task
120+
self._reporter_task = asyncio.create_task(self.run_reporter(), name="Reporter_run_reporter")
121+
return self._reporter_task
119122

120123
async def run_reporter(self):
121124
"""
@@ -132,7 +135,7 @@ async def run_reporter(self):
132135
Notes:
133136
- The reporting frequency is determined by the 'report_frequency' setting in the config file.
134137
"""
135-
while True:
138+
while self._running.is_set():
136139
if self.config.participant["reporter_args"]["report_status_data_queue"]:
137140
if self.config.participant["scenario_args"]["controller"] != "nebula-test":
138141
await self.__report_status_to_controller()
@@ -194,6 +197,21 @@ async def report_scenario_finished(self):
194197
logging.exception(f"Error connecting to the controller at {url}")
195198
return False
196199

200+
async def stop(self):
201+
logging.info("🔍 Stopping reporter module...")
202+
self._running.clear()
203+
204+
# Cancel the background task
205+
if self._reporter_task and not self._reporter_task.done():
206+
logging.info("🛑 Cancelling Reporter background task...")
207+
self._reporter_task.cancel()
208+
try:
209+
await self._reporter_task
210+
except asyncio.CancelledError:
211+
pass
212+
self._reporter_task = None
213+
logging.info("🛑 Reporter background task cancelled")
214+
197215
async def __report_data_queue(self):
198216
"""
199217
Processes and reports queued data entries.

0 commit comments

Comments
 (0)