Skip to content

Commit dbc45ca

Browse files
Feature/new federation controller (#54)
* postgres docker created * fix postgres db endpoints * redis docker added * feature federation controller class en API endpoints * feature federation controller api running * feature federation API: - Scenario Builder - Dataset factories * feature scenario building * fix attack assigment on ScenarioBuilder * feature scenario building complete on federation controller * feature scenario building fix and complete. Fix references on backend * feature additionals late deployment logic * feature federation API request parameters using pydantic. Directories changed * fix participants deployment * feature eputation and attacks configuration * feature additionals participant deployment * feature docker federation controller integration * feature federationID on docker controller * feature process deployment and completion of comunication hub-controller * fix remove docker containers * feature process deployment * fix minor details * fix draw graph * remove comments * feature federation ID on all API requests * update API requests * feature node update-done format --------- Co-authored-by: FerTV <fernando.torres.vega@gmail.com>
1 parent 7478800 commit dbc45ca

33 files changed

Lines changed: 2712 additions & 151 deletions

app/deployer.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from watchdog.observers import Observer
1818

1919
from nebula.addons.env import check_environment
20-
from nebula.controller.controller import TermEscapeCodeFormatter
20+
from nebula.controller.web_app_controller import TermEscapeCodeFormatter
2121
from nebula.controller.scenarios import ScenarioManagement
2222
from nebula.utils import DockerUtils, FileUtils, SocketUtils
2323

@@ -637,6 +637,16 @@ def __init__(self, args):
637637
logging.exception(warning_msg)
638638
sys.exit(1)
639639

640+
self.controller_port = int(args.controllerport) if hasattr(args, "controllerport") else 5050
641+
self.federation_controller_port = int(args.federationcontrollerport) if hasattr(args, "federationcontrollerport") else 5051
642+
self.waf_port = int(args.wafport) if hasattr(args, "wafport") else 6000
643+
self.frontend_port = int(args.webport) if hasattr(args, "webport") else 6060
644+
self.grafana_port = int(args.grafanaport) if hasattr(args, "grafanaport") else 6040
645+
self.loki_port = int(args.lokiport) if hasattr(args, "lokiport") else 6010
646+
self.statistics_port = int(args.statsport) if hasattr(args, "statsport") else 8080
647+
self.production = args.production if hasattr(args, "production") else False
648+
self.dev = args.developement if hasattr(args, "developement") else False
649+
self.advanced_analytics = args.advanced_analytics if hasattr(args, "advanced_analytics") else False
640650
self.databases_dir = args.databases if hasattr(args, "databases") else "/nebula/app/databases"
641651
self.config_dir = args.config
642652
self.log_dir = args.logs
@@ -843,6 +853,9 @@ def start(self):
843853
# Check ports available
844854
if not SocketUtils.is_port_open(self.controller_port):
845855
self.controller_port = SocketUtils.find_free_port(start_port=self.controller_port)
856+
857+
if not SocketUtils.is_port_open(self.federation_controller_port):
858+
self.federation_controller_port = SocketUtils.find_free_port(start_port=self.federation_controller_port)
846859

847860
if not SocketUtils.is_port_open(self.frontend_port):
848861
self.frontend_port = SocketUtils.find_free_port(start_port=self.frontend_port)
@@ -1110,11 +1123,13 @@ def run_controller(self):
11101123
"NEBULA_ROOT_HOST": self.root_path,
11111124
"NEBULA_DATABASES_DIR": "/nebula/app/databases",
11121125
"NEBULA_CONTROLLER_LOG": "/nebula/app/logs/controller.log",
1126+
"NEBULA_FEDERATION_CONTROLLER_LOG": "/nebula/app/logs/federation.log",
11131127
"NEBULA_CONFIG_DIR": "/nebula/app/config/",
11141128
"NEBULA_LOGS_DIR": "/nebula/app/logs/",
11151129
"NEBULA_CERTS_DIR": "/nebula/app/certs/",
11161130
"NEBULA_HOST_PLATFORM": self.host_platform,
11171131
"NEBULA_CONTROLLER_PORT": self.controller_port,
1132+
"NEBULA_FEDERATION_CONTROLLER_PORT" : self.federation_controller_port,
11181133
"NEBULA_CONTROLLER_HOST": self.controller_host,
11191134
"NEBULA_FRONTEND_PORT": self.frontend_port,
11201135
"DB_HOST": self.get_container_name("nebula-database"),
@@ -1126,7 +1141,7 @@ def run_controller(self):
11261141

11271142
volumes = ["/nebula", "/var/run/docker.sock"]
11281143

1129-
ports = [self.controller_port]
1144+
ports = [self.controller_port, self.federation_controller_port]
11301145

11311146
host_config = client.api.create_host_config(
11321147
binds=[
@@ -1135,16 +1150,15 @@ def run_controller(self):
11351150
f"{self.databases_dir}:/nebula/app/databases",
11361151
],
11371152
extra_hosts={"host.docker.internal": "host-gateway"},
1138-
port_bindings={self.controller_port: self.controller_port},
1139-
device_requests=[
1140-
{
1141-
"Driver": "nvidia",
1142-
"Count": -1,
1143-
"Capabilities": [["gpu"]],
1144-
}
1145-
]
1146-
if self.gpu_available
1147-
else None,
1153+
port_bindings={
1154+
self.controller_port: self.controller_port,
1155+
self.federation_controller_port: self.federation_controller_port
1156+
},
1157+
device_requests=[{
1158+
"Driver": "nvidia",
1159+
"Count": -1,
1160+
"Capabilities": [["gpu"]],
1161+
}] if self.gpu_available else None,
11481162
)
11491163

11501164
networking_config = client.api.create_networking_config({

app/main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
help="Controller port (default: 5050)",
1818
)
1919

20+
argparser.add_argument(
21+
"-fcp",
22+
"--federationcontrollerport",
23+
dest="federationcontrollerport",
24+
default=5051,
25+
help="federation controller port port (default: 5051)",
26+
)
27+
2028
argparser.add_argument(
2129
"--grafanaport",
2230
dest="grafanaport",

nebula/addons/attacks/attacks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def create_attack(engine) -> Attack:
129129
}
130130

131131
# Get attack name and parameters from the engine configuration
132-
attack_params = engine.config.participant["adversarial_args"].get("attack_params", {})
132+
attack_params = engine.config.participant["addons"]["adversarial_args"].get("attack_params", {})
133133
attack_name = attack_params.get("attacks", None)
134134
if attack_name is None:
135135
raise AttackException("No attack specified")

nebula/addons/gps/nebulagps.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ async def is_running(self):
7474
return self._running.is_set()
7575

7676
async def get_geoloc(self):
77-
latitude = self._config.participant["mobility_args"]["latitude"]
78-
longitude = self._config.participant["mobility_args"]["longitude"]
77+
latitude = self._config.participant["addons"]["mobility"]["latitude"]
78+
longitude = self._config.participant["addons"]["mobility"]["longitude"]
7979
return (latitude, longitude)
8080

8181
async def calculate_distance(self, self_lat, self_long, other_lat, other_long):

nebula/addons/mobility.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,18 @@ def __init__(self, config, verbose=False):
5757
self._mobility_task = None # Track the background task
5858

5959
# Mobility configuration
60-
self.mobility = self.config.participant["mobility_args"]["mobility"]
61-
self.mobility_type = self.config.participant["mobility_args"]["mobility_type"]
62-
self.grace_time = self.config.participant["mobility_args"]["grace_time_mobility"]
63-
self.period = self.config.participant["mobility_args"]["change_geo_interval"]
60+
self.mobility = self.config.participant["addons"]["mobility"]["enabled"]
61+
self.mobility_type = self.config.participant["addons"]["mobility"]["mobility_type"]
62+
self.grace_time = self.config.participant["addons"]["mobility"]["grace_time_mobility"]
63+
self.period = self.config.participant["addons"]["mobility"]["change_geo_interval"]
6464
# INFO: These values may change according to the needs of the federation
6565
self.max_distance_with_direct_connections = 150 # meters
6666
self.max_movement_random_strategy = 50 # meters
6767
self.max_movement_nearest_strategy = 50 # meters
6868
self.max_initiate_approximation = self.max_distance_with_direct_connections * 1.2
69-
self.radius_federation = float(config.participant["mobility_args"]["radius_federation"])
70-
self.scheme_mobility = config.participant["mobility_args"]["scheme_mobility"]
71-
self.round_frequency = int(config.participant["mobility_args"]["round_frequency"])
69+
self.radius_federation = float(config.participant["addons"]["mobility"]["radius_federation"])
70+
self.scheme_mobility = config.participant["addons"]["mobility"]["scheme_mobility"]
71+
self.round_frequency = int(config.participant["addons"]["mobility"]["round_frequency"])
7272
# Logging box with mobility information
7373
mobility_msg = f"Mobility: {self.mobility}\nMobility type: {self.mobility_type}\nRadius federation: {self.radius_federation}\nScheme mobility: {self.scheme_mobility}\nEach {self.round_frequency} rounds"
7474
print_msg_box(msg=mobility_msg, indent=2, title="Mobility information")
@@ -267,11 +267,11 @@ async def set_geo_location(self, latitude, longitude):
267267

268268
if latitude < -90 or latitude > 90 or longitude < -180 or longitude > 180:
269269
# If the new location is out of bounds, we keep the old location
270-
latitude = self.config.participant["mobility_args"]["latitude"]
271-
longitude = self.config.participant["mobility_args"]["longitude"]
270+
latitude = self.config.participant["addons"]["mobility"]["latitude"]
271+
longitude = self.config.participant["addons"]["mobility"]["longitude"]
272272

273-
self.config.participant["mobility_args"]["latitude"] = latitude
274-
self.config.participant["mobility_args"]["longitude"] = longitude
273+
self.config.participant["addons"]["mobility"]["latitude"] = latitude
274+
self.config.participant["addons"]["mobility"]["longitude"] = longitude
275275
if self._verbose:
276276
logging.info(f"📍 New geo location: {latitude}, {longitude}")
277277
cle = ChangeLocationEvent(latitude, longitude)
@@ -301,8 +301,8 @@ async def change_geo_location(self):
301301
"""
302302
if self.mobility and (self.mobility_type == "topology" or self.mobility_type == "both"):
303303
random.seed(time.time() + self.config.participant["device_args"]["idx"])
304-
latitude = float(self.config.participant["mobility_args"]["latitude"])
305-
longitude = float(self.config.participant["mobility_args"]["longitude"])
304+
latitude = float(self.config.participant["addons"]["mobility"]["latitude"])
305+
longitude = float(self.config.participant["addons"]["mobility"]["longitude"])
306306
if True:
307307
# Get neighbor closer to me
308308
async with self._nodes_distances_lock:

nebula/addons/networksimulation/nebulanetworksimulator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def cm(self):
3535
async def start(self):
3636
logging.info("🌐 Nebula Network Simulator starting...")
3737
self._running.set()
38-
grace_time = self.cm.config.participant["mobility_args"]["grace_time_mobility"]
38+
grace_time = self.cm.config.participant["addons"]["mobility"]["grace_time_mobility"]
3939
# if self._verbose: logging.info(f"Waiting {grace_time}s to start applying network conditions based on distances between devices")
4040
# await asyncio.sleep(grace_time)
4141
await EventManager.get_instance().subscribe_addonevent(

nebula/addons/reporter.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import sys
7+
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
78
from typing import TYPE_CHECKING
89

910
import aiohttp
@@ -54,7 +55,7 @@ def __init__(self, config, trainer):
5455
self.frequency = self.config.participant["reporter_args"]["report_frequency"]
5556
self.grace_time = self.config.participant["reporter_args"]["grace_time_reporter"]
5657
self.data_queue = asyncio.Queue()
57-
self.url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['name']}/update"
58+
self.url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['federation_id']}/update"
5859
self.counter = 0
5960

6061
self.first_net_metrics = True
@@ -170,8 +171,18 @@ async def report_scenario_finished(self):
170171
might be temporarily overloaded.
171172
- Logs exceptions if the connection attempt to the controller fails.
172173
"""
173-
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['name']}/done"
174-
data = json.dumps({"idx": self.config.participant["device_args"]["idx"]})
174+
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['federation_id']}/done"
175+
node_done_req = NodeDoneRequest(idx=self.config.participant["device_args"]["idx"],
176+
deployment=self.config.participant["scenario_args"]["deployment"],
177+
name=self.config.participant["scenario_args"]["name"],
178+
federation_id=self.config.participant["scenario_args"]["federation_id"]
179+
)
180+
payload = node_done_req.model_dump()
181+
data = json.dumps(payload)
182+
# data = json.dumps({"idx": self.config.participant["device_args"]["idx"],
183+
# "deployment": self.config.participant["scenario_args"]["deployment"],
184+
# "name": self.config.participant["scenario_args"]["name"],
185+
# "federation_id": self.config.participant["scenario_args"]["federation_id"]})
175186
headers = {
176187
"Content-Type": "application/json",
177188
"User-Agent": f"NEBULA Participant {self.config.participant['device_args']['idx']}",
@@ -263,11 +274,13 @@ async def __report_status_to_controller(self):
263274
- Delays for 5 seconds upon general exceptions to avoid rapid retry loops.
264275
"""
265276
try:
277+
node_updt_req = NodeUpdateRequest(config=self.config.participant)
278+
payload = node_updt_req.model_dump()
266279
async with (
267280
aiohttp.ClientSession() as session,
268281
session.post(
269282
self.url,
270-
data=json.dumps(self.config.participant),
283+
data=json.dumps(payload),
271284
headers={
272285
"Content-Type": "application/json",
273286
"User-Agent": f"NEBULA Participant {self.config.participant['device_args']['idx']}",

nebula/addons/reputation/reputation.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import time
44
import numpy as np
55
import torch
6-
6+
from nebula.core.addonmanager import NebulaAddon
77
from datetime import datetime
88
from typing import TYPE_CHECKING
99
from nebula.addons.functions import print_msg_box
@@ -54,7 +54,7 @@ def __init__(
5454
self.similarity = []
5555

5656

57-
class Reputation:
57+
class Reputation(NebulaAddon):
5858
"""
5959
Class to define and manage the reputation of a participant in the network.
6060
@@ -114,7 +114,7 @@ def __init__(self, engine: "Engine", config: "Config"):
114114

115115
def _configure_constants(self):
116116
"""Configure system constants from config or use defaults."""
117-
reputation_config = self._config.participant.get("defense_args", {}).get("reputation", {})
117+
reputation_config = self._config.participant.get("addons", {}).get("reputation", {})
118118
constants_config = reputation_config.get("constants", {})
119119

120120
self.REPUTATION_THRESHOLD = constants_config.get("reputation_threshold", self.REPUTATION_THRESHOLD)
@@ -168,7 +168,7 @@ def _initialize_data_structures(self):
168168

169169
def _load_configuration(self):
170170
"""Load and validate reputation configuration."""
171-
reputation_config = self._config.participant["defense_args"]["reputation"]
171+
reputation_config = self._config.participant["addons"]["reputation"]
172172
self._enabled = reputation_config["enabled"]
173173
self._metrics = reputation_config["metrics"]
174174
self._initial_reputation = float(reputation_config["initial_reputation"])
@@ -316,7 +316,7 @@ def save_data(
316316
except Exception:
317317
logging.exception(f"Error saving data for type {type_data} and neighbor {nei}")
318318

319-
async def setup(self):
319+
async def start(self):
320320
"""Set up the reputation system by subscribing to relevant events."""
321321
if self._enabled:
322322
await EventManager.get_instance().subscribe_node_event(RoundStartEvent, self.on_round_start)
@@ -340,6 +340,9 @@ async def setup(self):
340340
)
341341
await EventManager.get_instance().subscribe_node_event(DuplicatedMessageEvent, self.recollect_duplicated_number_message)
342342

343+
async def stop():
344+
pass
345+
343346
async def init_reputation(
344347
self, federation_nodes=None, round_num=None, last_feedback_round=None, init_reputation=None
345348
):
@@ -1963,7 +1966,7 @@ async def recollect_similarity(self, ure: UpdateReceivedEvent):
19631966
if not (self._enabled and self._is_metric_enabled("model_similarity")):
19641967
return
19651968

1966-
if not self._engine.config.participant["adaptive_args"]["model_similarity"]:
1969+
if not self._engine.config.participant["addons"]["reputation"]["adaptive_args"] and not self._engine.config.participant["addons"]["reputation"]["adaptive_args"]["model_similarity"]:
19671970
return
19681971

19691972
if nei == self._addr:

nebula/config/config.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,14 @@ def add_neighbor_from_config(self, addr):
208208

209209
if not neighbors:
210210
self.participant["network_args"]["neighbors"] = [addr]
211-
self.participant["mobility_args"]["neighbors_distance"][addr] = None
211+
self.participant["addons"]["mobility"]["neighbors_distance"][addr] = None
212212
else:
213213
if addr not in neighbors:
214214
self.participant["network_args"]["neighbors"].append(addr)
215-
self.participant["mobility_args"]["neighbors_distance"][addr] = None
215+
self.participant["addons"]["mobility"]["neighbors_distance"][addr] = None
216216

217217
def update_nodes_distance(self, distances: dict):
218-
self.participant["mobility_args"]["neighbors_distance"] = {node: dist for node, (dist, _) in distances.items()}
218+
self.participant["addons"]["mobility"]["neighbors_distance"] = {node: dist for node, (dist, _) in distances.items()}
219219

220220
def update_neighbors_from_config(self, current_connections, dest_addr):
221221
final_neighbors = [n for n in current_connections if n != dest_addr]
@@ -224,10 +224,10 @@ def update_neighbors_from_config(self, current_connections, dest_addr):
224224
self.participant["network_args"]["neighbors"] = final_neighbors
225225

226226
# Update neighbors location
227-
self.participant["mobility_args"]["neighbors_distance"] = {
228-
n: self.participant["mobility_args"]["neighbors_distance"][n]
227+
self.participant["addons"]["mobility"]["neighbors_distance"] = {
228+
n: self.participant["addons"]["mobility"]["neighbors_distance"][n]
229229
for n in final_neighbors
230-
if n in self.participant["mobility_args"]["neighbors_distance"]
230+
if n in self.participant["addons"]["mobility"]["neighbors_distance"]
231231
}
232232

233233
logging.info(f"Final neighbors: {final_neighbors} (config updated)")
@@ -241,8 +241,8 @@ def remove_neighbor_from_config(self, addr):
241241
neighbors.remove(addr)
242242
self.participant["network_args"]["neighbors"] = neighbors
243243

244-
if addr in self.participant["mobility_args"]["neighbors_distance"]:
245-
del self.participant["mobility_args"]["neighbors_distance"][addr]
244+
if addr in self.participant["addons"]["mobility"]["neighbors_distance"]:
245+
del self.participant["addons"]["mobility"]["neighbors_distance"][addr]
246246

247247

248248
def reload_config_file(self):

nebula/controller/database.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ async def list_nodes_by_scenario_name(scenario_name):
194194
except Exception as e:
195195
logging.error(f"Error occurred while listing nodes by scenario name: {e}")
196196
return None
197+
finally:
198+
if conn:
199+
await conn.close()
197200

198201

199202
async def update_node_record(
@@ -321,7 +324,7 @@ async def get_all_scenarios_and_check_completed(username, role, sort_by="start_t
321324
if sort_by not in allowed_sort_fields:
322325
sort_by = "start_time" # Safe default value
323326

324-
# Building the ORDER BY clause
327+
# Building the ORDER BY clause (same as get_all_scenarios)
325328
if sort_by == "start_time":
326329
order_by_clause = """
327330
ORDER BY

0 commit comments

Comments
 (0)