Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 65 additions & 12 deletions nebula/addons/gps/nebulagps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ def __init__(self, config, addr, update_interval: float = 5.0, verbose=False):
self._config = config
self._addr = addr
self.update_interval = update_interval # Frequency
self.running = False
self._node_locations = {} # Dictionary for storing node locations
self._broadcast_socket = None
self._nodes_location_lock = Locker("nodes_location_lock", async_lock=True)
self._verbose = verbose
self._running = asyncio.Event()
self._background_tasks = [] # Track background tasks

async def start(self):
"""Starts the GPS service, sending and receiving locations."""
logging.info("Starting NebulaGPS service...")
self.running = True
self._running.set()

# Create broadcast socket
self._broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
Expand All @@ -38,20 +39,39 @@ async def start(self):
self._broadcast_socket.bind(("", self.BROADCAST_PORT))

# Start sending and receiving tasks
asyncio.create_task(self._send_location_loop())
asyncio.create_task(self._receive_location_loop())
asyncio.create_task(self._notify_geolocs())
self._background_tasks = [
asyncio.create_task(self._send_location_loop(), name="NebulaGPS_send_location"),
asyncio.create_task(self._receive_location_loop(), name="NebulaGPS_receive_location"),
asyncio.create_task(self._notify_geolocs(), name="NebulaGPS_notify_geolocs"),
]

async def stop(self):
"""Stops the GPS service."""
logging.info("Stopping NebulaGPS service...")
self.running = False
logging.info("🛑 Stopping NebulaGPS service...")
self._running.clear()
logging.info("🛑 NebulaGPS _running event cleared")

# Cancel all background tasks
if self._background_tasks:
logging.info(f"🛑 Cancelling {len(self._background_tasks)} background tasks...")
for task in self._background_tasks:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
self._background_tasks.clear()
logging.info("🛑 All background tasks cancelled")

if self._broadcast_socket:
self._broadcast_socket.close()
self._broadcast_socket = None
logging.info("🛑 NebulaGPS broadcast socket closed")
logging.info("✅ NebulaGPS service stopped successfully")

async def is_running(self):
return self.running
return self._running.is_set()

async def get_geoloc(self):
latitude = self._config.participant["mobility_args"]["latitude"]
Expand All @@ -64,7 +84,18 @@ async def calculate_distance(self, self_lat, self_long, other_lat, other_long):

async def _send_location_loop(self):
"""Send the geolocation periodically by broadcast."""
while self.running:
while await self.is_running():
# Check if learning cycle has finished
try:
from nebula.core.network.communications import CommunicationsManager

cm = CommunicationsManager.get_instance()
if cm.learning_finished():
logging.info("GPS: Learning cycle finished, stopping location broadcast")
break
except Exception:
pass # If we can't get the communications manager, continue

latitude, longitude = await self.get_geoloc() # Obtener ubicación actual
message = f"GPS-UPDATE {self._addr} {latitude} {longitude}"
self._broadcast_socket.sendto(message.encode(), (self.BROADCAST_IP, self.BROADCAST_PORT))
Expand All @@ -74,7 +105,18 @@ async def _send_location_loop(self):

async def _receive_location_loop(self):
"""Listens to and stores geolocations from other nodes."""
while self.running:
while await self.is_running():
# Check if learning cycle has finished
try:
from nebula.core.network.communications import CommunicationsManager

cm = CommunicationsManager.get_instance()
if cm.learning_finished():
logging.info("GPS: Learning cycle finished, stopping location reception")
break
except Exception:
pass # If we can't get the communications manager, continue

try:
data, addr = await asyncio.get_running_loop().run_in_executor(
None, self._broadcast_socket.recvfrom, 1024
Expand All @@ -91,7 +133,18 @@ async def _receive_location_loop(self):
logging.exception(f"Error receiving GPS update: {e}")

async def _notify_geolocs(self):
while True:
while await self.is_running():
# Check if learning cycle has finished
try:
from nebula.core.network.communications import CommunicationsManager

cm = CommunicationsManager.get_instance()
if cm.learning_finished():
logging.info("GPS: Learning cycle finished, stopping geolocation notifications")
break
except Exception:
pass # If we can't get the communications manager, continue

await asyncio.sleep(self.update_interval)
await self._nodes_location_lock.acquire_async()
geolocs: dict = self._node_locations.copy()
Expand All @@ -102,7 +155,7 @@ async def _notify_geolocs(self):
for addr, (lat, long) in geolocs.items():
dist = await self.calculate_distance(self_lat, self_long, lat, long)
distances[addr] = (dist, (lat, long))

self._config.update_nodes_distance(distances)
gpsevent = GPSEvent(distances)
asyncio.create_task(EventManager.get_instance().publish_addonevent(gpsevent))
48 changes: 37 additions & 11 deletions nebula/addons/mobility.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,28 @@ def __init__(self, config, verbose=False):
"""
logging.info("Starting mobility module...")
self.config = config
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
self._verbose = verbose
self._running = asyncio.Event()
self._nodes_distances = {}
self._nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
self._mobility_task = None # Track the background task

# Mobility configuration
self.mobility = self.config.participant["mobility_args"]["mobility"]
self.mobility_type = self.config.participant["mobility_args"]["mobility_type"]
self.radius_federation = float(self.config.participant["mobility_args"]["radius_federation"])
self.scheme_mobility = self.config.participant["mobility_args"]["scheme_mobility"]
self.round_frequency = int(self.config.participant["mobility_args"]["round_frequency"])
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
# INFO: These values may change according to the needs of the federation
self.max_distance_with_direct_connections = 150 # meters
self.max_movement_random_strategy = 50 # meters
self.max_movement_nearest_strategy = 50 # meters
self.max_initiate_approximation = self.max_distance_with_direct_connections * 1.2
self.radius_federation = float(config.participant["mobility_args"]["radius_federation"])
self.scheme_mobility = config.participant["mobility_args"]["scheme_mobility"]
self.round_frequency = int(config.participant["mobility_args"]["round_frequency"])
# Logging box with mobility information
mobility_msg = f"Mobility: {self.mobility}\nMobility type: {self.mobility_type}\nRadius federation: {self.radius_federation}\nScheme mobility: {self.scheme_mobility}\nEach {self.round_frequency} rounds"
print_msg_box(msg=mobility_msg, indent=2, title="Mobility information")
self._nodes_distances = {}
self._nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
self._verbose = verbose

@cached_property
def cm(self):
Expand Down Expand Up @@ -103,8 +107,30 @@ async def start(self):
"""
await EventManager.get_instance().subscribe_addonevent(GPSEvent, self.update_nodes_distances)
await EventManager.get_instance().subscribe_addonevent(GPSEvent, self.update_nodes_distances)
task = asyncio.create_task(self.run_mobility())
return task
self._running.set()
self._mobility_task = asyncio.create_task(self.run_mobility(), name="Mobility_run_mobility")
return self._mobility_task

async def stop(self):
"""
Stops the mobility module.
"""
logging.info("Stopping Mobility module...")
self._running.clear()

# Cancel the background task
if self._mobility_task and not self._mobility_task.done():
logging.info("🛑 Cancelling Mobility background task...")
self._mobility_task.cancel()
try:
await self._mobility_task
except asyncio.CancelledError:
pass
self._mobility_task = None
logging.info("🛑 Mobility background task cancelled")

async def is_running(self):
return self._running.is_set()

async def update_nodes_distances(self, gpsevent: GPSEvent):
distances = await gpsevent.get_event_data()
Expand Down Expand Up @@ -138,7 +164,7 @@ async def run_mobility(self):
if not self.mobility:
return
# await asyncio.sleep(self.grace_time)
while True:
while await self.is_running():
await self.change_geo_location()
await asyncio.sleep(self.period)

Expand Down
72 changes: 42 additions & 30 deletions nebula/addons/networksimulation/nebulanetworksimulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ def __init__(self, changing_interval, interface, verbose=False):
self._network_conditions = self.NETWORK_CONDITIONS.copy()
self._network_conditions_lock = Locker("network_conditions_lock", async_lock=True)
self._current_network_conditions = {}
self._running = False
self._running = asyncio.Event()

@cached_property
def cm(self):
return CommunicationsManager.get_instance()

async def start(self):
logging.info("🌐 Nebula Network Simulator starting...")
self._running = True
self._running.set()
grace_time = self.cm.config.participant["mobility_args"]["grace_time_mobility"]
# if self._verbose: logging.info(f"Waiting {grace_time}s to start applying network conditions based on distances between devices")
# await asyncio.sleep(grace_time)
Expand All @@ -43,37 +43,49 @@ async def start(self):
)

async def stop(self):
self._running = False
logging.info("🌐 Nebula Network Simulator stopping...")
self._running.clear()

async def is_running(self):
return self._running.is_set()

async def _change_network_conditions_based_on_distances(self, gpsevent: GPSEvent):
distances = await gpsevent.get_event_data()
await asyncio.sleep(self._refresh_interval)
if self._verbose:
logging.info("Refresh | conditions based on distances...")
try:
for addr, (distance, _) in distances.items():
if distance is None:
# If the distance is not found, we skip the node
continue
conditions = await self._calculate_network_conditions(distance)
# Only update the network conditions if they have changed
if addr not in self._current_network_conditions or self._current_network_conditions[addr] != conditions:
addr_ip = addr.split(":")[0]
self._set_network_condition_for_addr(
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
)
self._set_network_condition_for_multicast(
self._node_interface, addr_ip, self.IP_MULTICAST, conditions["bandwidth"], conditions["delay"]
)
async with self._network_conditions_lock:
self._current_network_conditions[addr] = conditions
else:
if self._verbose:
logging.info("network conditions havent changed since last time")
except KeyError:
logging.exception(f"📍 Connection {addr} not found")
except Exception:
logging.exception("📍 Error changing connections based on distance")
while await self.is_running():
if self._verbose:
logging.info("Refresh | conditions based on distances...")
try:
for addr, (distance, _) in distances.items():
if distance is None:
# If the distance is not found, we skip the node
continue
conditions = await self._calculate_network_conditions(distance)
# Only update the network conditions if they have changed
if (
addr not in self._current_network_conditions
or self._current_network_conditions[addr] != conditions
):
addr_ip = addr.split(":")[0]
self._set_network_condition_for_addr(
self._node_interface, addr_ip, conditions["bandwidth"], conditions["delay"]
)
self._set_network_condition_for_multicast(
self._node_interface,
addr_ip,
self.IP_MULTICAST,
conditions["bandwidth"],
conditions["delay"],
)
async with self._network_conditions_lock:
self._current_network_conditions[addr] = conditions
else:
if self._verbose:
logging.info("network conditions havent changed since last time")
except KeyError:
logging.exception(f"📍 Connection {addr} not found")
except Exception:
logging.exception("📍 Error changing connections based on distance")
await asyncio.sleep(self._refresh_interval)

async def set_thresholds(self, thresholds: dict):
async with self._network_conditions_lock:
Expand Down
24 changes: 21 additions & 3 deletions nebula/addons/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def __init__(self, config, trainer):
self.acc_bytes_recv = 0
self.acc_packets_sent = 0
self.acc_packets_recv = 0
self._running = asyncio.Event()
self._reporter_task = None # Track the background task

@property
def cm(self):
Expand Down Expand Up @@ -113,9 +115,10 @@ async def start(self):
- The grace period allows for a delay before the first reporting cycle.
- The reporter loop runs in the background, ensuring continuous data updates.
"""
self._running.set()
await asyncio.sleep(self.grace_time)
task = asyncio.create_task(self.run_reporter())
return task
self._reporter_task = asyncio.create_task(self.run_reporter(), name="Reporter_run_reporter")
return self._reporter_task

async def run_reporter(self):
"""
Expand All @@ -132,7 +135,7 @@ async def run_reporter(self):
Notes:
- The reporting frequency is determined by the 'report_frequency' setting in the config file.
"""
while True:
while self._running.is_set():
if self.config.participant["reporter_args"]["report_status_data_queue"]:
if self.config.participant["scenario_args"]["controller"] != "nebula-test":
await self.__report_status_to_controller()
Expand Down Expand Up @@ -194,6 +197,21 @@ async def report_scenario_finished(self):
logging.exception(f"Error connecting to the controller at {url}")
return False

async def stop(self):
logging.info("🔍 Stopping reporter module...")
self._running.clear()

# Cancel the background task
if self._reporter_task and not self._reporter_task.done():
logging.info("🛑 Cancelling Reporter background task...")
self._reporter_task.cancel()
try:
await self._reporter_task
except asyncio.CancelledError:
pass
self._reporter_task = None
logging.info("🛑 Reporter background task cancelled")

async def __report_data_queue(self):
"""
Processes and reports queued data entries.
Expand Down
Loading
Loading