Skip to content

Commit 2761629

Browse files
committed
hotfix: run scenario endpoint
1 parent c69a07b commit 2761629

4 files changed

Lines changed: 193 additions & 185 deletions

File tree

nebula/controller/federation/controllers/docker_federation_controller.py

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -35,33 +35,33 @@ async def get_additionals_to_be_deployed(self, config) -> list:
3535
async with self.federation_deployment_lock:
3636
if not self.additionals_participants:
3737
return False
38-
38+
3939
participant_idx = int(config["device_args"]["idx"])
4040
participant_round = int(config["federation_args"]["round"])
4141
self.round_per_participant[participant_idx] = participant_round
4242
self.federation_round = min(self.round_per_participant.values())
43-
43+
4444
self.additionals_deployables = [
4545
idx
46-
for idx, round in self.additionals_participants.items()
46+
for idx, round in self.additionals_participants.items()
4747
if self.federation_round >= round
4848
]
49-
49+
5050
additionals_deployables = self.additionals_deployables.copy()
5151
for idx in additionals_deployables:
5252
self.additionals_participants.pop(idx)
5353
return additionals_deployables
54-
54+
5555
async def is_experiment_finish(self):
5656
async with self.participants_alive_lock:
5757
self.participants_alive -= 1
58-
if self.participants_alive <= 0:
59-
return True
60-
else:
61-
return False
58+
if self.participants_alive <= 0:
59+
return True
60+
else:
61+
return False
6262

6363
class DockerFederationController(FederationController):
64-
64+
6565
def __init__(self, hub_url, logger):
6666
super().__init__(hub_url, logger)
6767
self.root_path = ""
@@ -73,7 +73,7 @@ def __init__(self, hub_url, logger):
7373
self.url = ""
7474
self._nebula_federations_pool: dict[str, NebulaFederationDocker] = {}
7575
self._federations_dict_lock = Locker("federations_dict_lock", async_lock=True)
76-
76+
7777
@property
7878
def nfp(self):
7979
"""Nebula Federations Pool"""
@@ -87,24 +87,24 @@ def nfp(self):
8787
async def run_scenario(self, federation_id: str, scenario_data: Dict, user: str):
8888
#TODO maintain files on memory, not read them again
8989
federation = await self._add_nebula_federation_to_pool(federation_id, user)
90-
id = ""
90+
scenario_info = {}
9191
if federation:
9292
scenario_builder = ScenarioBuilder(federation_id)
9393
await self._initialize_scenario(scenario_builder, scenario_data, federation)
9494
generate_ca_certificate(dir_path=self.cert_dir)
9595
await self._load_configuration_and_start_nodes(scenario_builder, federation)
9696
self._start_initial_nodes(scenario_builder, federation)
97-
id = scenario_builder.get_scenario_name()
97+
scenario_info = scenario_builder.get_scenario_info()
9898
try:
9999
nebula_federation = self.nfp[federation_id]
100-
nebula_federation.scenario_name = id
100+
nebula_federation.scenario_name = scenario_builder.get_scenario_name()
101101
except Exception as e:
102102
self.logger.info(f"ERROR: federation ID: ({federation_id}) not found on pool..")
103103
return None
104104
else:
105105
self.logger.info(f"ERROR: federation ID: ({federation_id}) already exists..")
106-
return id
107-
106+
return scenario_info
107+
108108
async def stop_scenario(self, federation_id: str):
109109
"""
110110
Remove all participant containers and the scenario network.
@@ -114,7 +114,7 @@ async def stop_scenario(self, federation_id: str):
114114
federation_scenario_name = await self._remove_nebula_federation_from_pool(federation_id)
115115
if not federation_scenario_name:
116116
return False
117-
117+
118118
# Try multiple possible config directory locations. This depends on where the user called the function from.
119119
possible_config_dirs = [
120120
os.environ.get("NEBULA_CONFIG_DIR"),
@@ -154,7 +154,7 @@ async def stop_scenario(self, federation_id: str):
154154
for scenario_dir in scenario_dirs:
155155
if scenario_dir != federation_scenario_name:
156156
continue
157-
157+
158158
metadata_path = os.path.join(scenario_dir, "scenario.metadata")
159159
if not os.path.exists(metadata_path):
160160
self.logger.info(f"Skipping {scenario_dir} - no scenario.metadata found")
@@ -195,10 +195,10 @@ async def stop_scenario(self, federation_id: str):
195195
os.remove(metadata_path)
196196
except Exception as e:
197197
self.logger.info(f"Could not remove scenario.metadata: {e}")
198-
198+
199199
if scenario_dir == federation_scenario_name:
200200
break
201-
201+
202202
return True #TODO care about cases
203203

204204
async def update_nodes(self, federation_id: str, node_update_request: NodeUpdateRequest):
@@ -219,7 +219,7 @@ async def update_nodes(self, federation_id: str, node_update_request: NodeUpdate
219219
for index in additionals:
220220
if index in adds_deployed:
221221
continue
222-
222+
223223
for idx, node in enumerate(nebula_federation.config.participants):
224224
if index == idx:
225225
if index in additionals:
@@ -235,7 +235,7 @@ async def update_nodes(self, federation_id: str, node_update_request: NodeUpdate
235235
return {"message": "Node updated successfully in Federation Controller"}
236236
except Exception as e:
237237
self.logger.info(f"ERROR: federation ID: ({fed_id}), {e}")
238-
return {"message": "Node updated failed in Federation Controller"}
238+
return {"message": "Node updated failed in Federation Controller"}
239239

240240
async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest):
241241
nebula_federation = self.nfp[federation_id]
@@ -255,7 +255,7 @@ async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest
255255
# FUNCTIONALITIES #
256256
###############################
257257
"""
258-
258+
259259
async def _add_nebula_federation_to_pool(self, federation_id: str, user: str):
260260
fed = None
261261
async with self._federations_dict_lock:
@@ -265,8 +265,8 @@ async def _add_nebula_federation_to_pool(self, federation_id: str, user: str):
265265
self.logger.info(f"SUCCESS: new ID: ({federation_id}) added to the pool")
266266
else:
267267
self.logger.info(f"ERROR: trying to add ({federation_id}) to federations pool..")
268-
return fed
269-
268+
return fed
269+
270270
async def _remove_nebula_federation_from_pool(self, federation_id: str):
271271
async with self._federations_dict_lock:
272272
if federation_id in self.nfp:
@@ -286,8 +286,8 @@ async def _update_federation_on_pool(self, federation_id: str, user: str, nf: Ne
286286
self.logger.info(f"UPDATED: federation: ({federation_id}) successfully updated")
287287
else:
288288
self.logger.info(f"ERROR: trying to update ({federation_id}) on federations pool..")
289-
return updated
290-
289+
return updated
290+
291291
async def _send_to_hub(self, path, payload, scenario_name="", federation_id="" ):
292292
try:
293293
url_request = self._hub_url + factory_requests_path(path, scenario_name, federation_id)
@@ -302,7 +302,7 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
302302
self.logger.info("🔧 Initializing Scenario Builder using scenario data")
303303
sb.set_scenario_data(scenario_data)
304304
scenario_name = sb.get_scenario_name()
305-
305+
306306
self.root_path = os.environ.get("NEBULA_ROOT_HOST")
307307
self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM")
308308
self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
@@ -313,9 +313,9 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
313313
self.env_tag = os.environ.get("NEBULA_ENV_TAG", "dev")
314314
self.prefix_tag = os.environ.get("NEBULA_PREFIX_TAG", "dev")
315315
self.user_tag = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown"))
316-
316+
317317
self.url = f"{os.environ.get('NEBULA_CONTROLLER_HOST')}:{os.environ.get('NEBULA_FEDERATION_CONTROLLER_PORT')}"
318-
318+
319319
# Create Scenario management dirs
320320
os.makedirs(self.config_dir, exist_ok=True)
321321
os.makedirs(os.path.join(self.log_dir, scenario_name), exist_ok=True)
@@ -348,12 +348,12 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
348348
json.dump(settings, f, sort_keys=False, indent=2)
349349

350350
os.chmod(settings_file, 0o777)
351-
351+
352352
# Attacks assigment and mobility
353353
self.logger.info("🔧 Building general configuration")
354354
sb.build_general_configuration()
355355
self.logger.info("✅ Building general configuration done")
356-
356+
357357
# Create participant configs and .json
358358
for index, (_, node) in enumerate(sb.get_federation_nodes().items()):
359359
self.logger.info(f"Creating .json file for participant: {index}, Configuration: {node}")
@@ -364,8 +364,8 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
364364
os.makedirs(os.path.dirname(participant_file), exist_ok=True)
365365
except Exception as e:
366366
self.logger.info(f"ERROR while creating files: {e}")
367-
368-
try:
367+
368+
try:
369369
participant_config = sb.build_scenario_config_for_node(index, node)
370370
#self.logger.info(f"dictionary: {participant_config}")
371371
except Exception as e:
@@ -379,7 +379,7 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
379379
self.logger.info(f"ERROR while dumping configuration into files: {e}")
380380

381381
self.logger.info("✅ Initializing Scenario Builder done")
382-
382+
383383
async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federation: NebulaFederationDocker):
384384
self.logger.info("🔧 Loading Scenario configuration...")
385385
# Get participants configurations
@@ -391,19 +391,19 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
391391
federation.config.set_participants_config(participant_files)
392392
n_nodes = len(participant_files)
393393
#self.logger.info(f"Number of nodes: {n_nodes}")
394-
394+
395395
sb.create_topology_manager(federation.config)
396-
396+
397397
# Update participants configuration
398398
is_start_node = False
399399
config_participants = []
400-
400+
401401
additional_participants = sb.get_additional_nodes()
402402
additional_nodes = len(additional_participants) if additional_participants else 0
403403
#self.logger.info(f"######## nodes: {n_nodes} + additionals: {additional_nodes} ######")
404-
404+
405405
participant_files.sort(key=lambda x: int(x.split("_")[-1].split(".")[0]))
406-
406+
407407
# Initial participants
408408
self.logger.info("🔧 Building preload configuration for initial nodes...")
409409
for i in range(n_nodes):
@@ -437,12 +437,12 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
437437
raise ValueError("Only one node can be start node")
438438

439439
self.logger.info("✅ Building preload configuration for initial nodes done")
440-
440+
441441
federation.config.set_participants_config(participant_files)
442-
442+
443443
# Add role to the topology (visualization purposes)
444444
sb.visualize_topology(config_participants, path=f"{self.config_dir}/topology.png", plot=False)
445-
445+
446446
# Additional participants
447447
self.logger.info("🔧 Building preload configuration for additional nodes...")
448448
additional_participants_files = []
@@ -459,7 +459,7 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
459459

460460
self.logger.info(f"Configuration | additional nodes | participant: {n_nodes + i}")
461461
sb.build_preload_additional_node_configuration(last_participant_index, i, participant_config)
462-
462+
463463
with open(additional_participant_file, "w") as f:
464464
json.dump(participant_config, f, sort_keys=False, indent=2)
465465

@@ -473,8 +473,8 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
473473

474474
self.logger.info("✅ Building preload configuration for additional nodes done")
475475
self.logger.info("✅ Loading Scenario configuration done")
476-
477-
# Build dataset
476+
477+
# Build dataset
478478
dataset = sb.configure_dataset(self.config_dir)
479479
self.logger.info(f"🔧 Splitting {sb.get_dataset_name()} dataset...")
480480
dataset.initialize_dataset()
@@ -489,7 +489,7 @@ def _get_network_name(self, suffix: str) -> str:
489489
str: The composed network name.
490490
"""
491491
return f"{self.env_tag}_{self.prefix_tag}_{self.user_tag}_{suffix}"
492-
492+
493493
def _get_participant_container_name(self, scenario_name, idx: int) -> str:
494494
"""
495495
Generate a standardized container name for a participant using tags.
@@ -507,11 +507,11 @@ def _start_initial_nodes(self, sb: ScenarioBuilder, federation: NebulaFederation
507507

508508
# Create the Docker network
509509
federation.base = DockerUtils.create_docker_network(federation.network_name)
510-
510+
511511
federation.config.participants.sort(key=lambda x: x["device_args"]["idx"])
512512
federation.last_index_deployed = 2
513513
for idx, node in enumerate(federation.config.participants):
514-
514+
515515
if node["deployment_args"]["additional"]:
516516
federation.additionals_participants[idx] = int(node["deployment_args"]["deployment_round"])
517517
federation.participants_alive += 1
@@ -524,7 +524,7 @@ def _start_initial_nodes(self, sb: ScenarioBuilder, federation: NebulaFederation
524524
if deployed_successfully:
525525
federation.last_index_deployed += 1
526526
federation.participants_alive += 1
527-
527+
528528
def _start_node(self, scenario_name, node, network_name, base_network_name, base, i, federation: NebulaFederationDocker):
529529
success = True
530530
client = docker.from_env()
@@ -607,7 +607,7 @@ def _start_node(self, scenario_name, node, network_name, base_network_name, base
607607
except Exception as e:
608608
success = False
609609
self.logger.info(f"Starting participant {name} error: {e}")
610-
610+
611611
# Write scenario-level metadata for cleanup
612612
scenario_metadata = {"containers": container_names, "network": network_name}
613613
with open(os.path.join(self.config_dir, "scenario.metadata"), "a") as f:
@@ -619,5 +619,5 @@ def _start_node(self, scenario_name, node, network_name, base_network_name, base
619619
metadata["containers"].extend(container_names)
620620
with open(os.path.join(self.config_dir, "scenario.metadata"), "w") as f:
621621
json.dump(metadata, f, indent=2)
622-
623-
return success
622+
623+
return success

0 commit comments

Comments
 (0)