Skip to content

Commit 2a3e524

Browse files
Feature create federation (#56)
* feature creation federation ID * feature refactor file paths * fix scenario_name on response to run scenario * fix log dir path * fix log dir path on proccesses * feature federation_id added in database * feature alias added in database * fix path concurrency isues * fix wrong directories names * feature federation_id added in database_api and hub endpoints --------- Co-authored-by: Alejandro.A.S <jandrosambasil@gmail.com>
1 parent 2761629 commit 2a3e524

12 files changed

Lines changed: 497 additions & 407 deletions

File tree

nebula/config/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def __default_config(self):
8787

8888
def __set_default_logging(self, mode="w"):
8989
experiment_name = self.participant["scenario_args"]["name"]
90-
self.log_dir = os.path.join(self.participant["tracking_args"]["log_dir"], experiment_name)
90+
self.log_dir =self.participant["tracking_args"]["log_dir"]
9191
if not os.path.exists(self.log_dir):
9292
os.makedirs(self.log_dir)
9393
self.log_filename = f"{self.log_dir}/participant_{self.participant['device_args']['idx']}"

nebula/controller/federation/controllers/docker_federation_controller.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ def __init__(self):
3030
self.federation_round: int = 0
3131
self.federation_deployment_lock = Locker("federation_deployment_lock", async_lock=True)
3232
self.participants_alive_lock = Locker("participants_alive_lock", async_lock=True)
33+
self.config_dir = ""
34+
self.log_dir = ""
3335

3436
async def get_additionals_to_be_deployed(self, config) -> list:
3537
async with self.federation_deployment_lock:
@@ -89,7 +91,7 @@ async def run_scenario(self, federation_id: str, scenario_data: Dict, user: str)
8991
federation = await self._add_nebula_federation_to_pool(federation_id, user)
9092
scenario_info = {}
9193
if federation:
92-
scenario_builder = ScenarioBuilder(federation_id)
94+
scenario_builder = ScenarioBuilder(federation_id, user=user)
9395
await self._initialize_scenario(scenario_builder, scenario_data, federation)
9496
generate_ca_certificate(dir_path=self.cert_dir)
9597
await self._load_configuration_and_start_nodes(scenario_builder, federation)
@@ -301,12 +303,14 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
301303
# Initialize Scenario builder using scenario_data from user
302304
self.logger.info("🔧 Initializing Scenario Builder using scenario data")
303305
sb.set_scenario_data(scenario_data)
304-
scenario_name = sb.get_scenario_name()
306+
scenario_name = sb.get_scenario_name(user_to=True)
305307

306308
self.root_path = os.environ.get("NEBULA_ROOT_HOST")
307309
self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM")
308-
self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
309-
self.log_dir = os.environ.get("NEBULA_LOGS_DIR")
310+
# self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
311+
# self.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name)
312+
federation.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), scenario_name)
313+
federation.log_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), scenario_name)
310314
self.cert_dir = os.environ.get("NEBULA_CERTS_DIR")
311315
self.advanced_analytics = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True"
312316
#self.config = Config(entity="FederationController")
@@ -317,17 +321,17 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
317321
self.url = f"{os.environ.get('NEBULA_CONTROLLER_HOST')}:{os.environ.get('NEBULA_FEDERATION_CONTROLLER_PORT')}"
318322

319323
# Create Scenario management dirs
320-
os.makedirs(self.config_dir, exist_ok=True)
321-
os.makedirs(os.path.join(self.log_dir, scenario_name), exist_ok=True)
324+
os.makedirs(federation.config_dir, exist_ok=True)
325+
os.makedirs(federation.log_dir, exist_ok=True)
322326
os.makedirs(self.cert_dir, exist_ok=True)
323327

324328
# Give permissions to the directories
325-
os.chmod(self.config_dir, 0o777)
326-
os.chmod(os.path.join(self.log_dir, scenario_name), 0o777)
329+
os.chmod(federation.config_dir, 0o777)
330+
os.chmod(federation.log_dir, 0o777)
327331
os.chmod(self.cert_dir, 0o777)
328332

329333
# Save the scenario configuration
330-
scenario_file = os.path.join(self.config_dir, "scenario.json")
334+
scenario_file = os.path.join(federation.config_dir, "scenario.json")
331335
with open(scenario_file, "w") as f:
332336
json.dump(scenario_data, f, sort_keys=False, indent=2)
333337

@@ -337,13 +341,13 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
337341
settings = {
338342
"scenario_name": scenario_name,
339343
"root_path": self.root_path,
340-
"config_dir": self.config_dir,
341-
"log_dir": self.log_dir,
344+
"config_dir": federation.config_dir,
345+
"log_dir": federation.log_dir,
342346
"cert_dir": self.cert_dir,
343347
"env": None,
344348
}
345349

346-
settings_file = os.path.join(self.config_dir, "settings.json")
350+
settings_file = os.path.join(federation.config_dir, "settings.json")
347351
with open(settings_file, "w") as f:
348352
json.dump(settings, f, sort_keys=False, indent=2)
349353

@@ -359,7 +363,7 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
359363
self.logger.info(f"Creating .json file for participant: {index}, Configuration: {node}")
360364
node_config = node
361365
try:
362-
participant_file = os.path.join(self.config_dir, f"participant_{node_config['id']}.json")
366+
participant_file = os.path.join(federation.config_dir, f"participant_{node_config['id']}.json")
363367
self.logger.info(f"Filename: {participant_file}")
364368
os.makedirs(os.path.dirname(participant_file), exist_ok=True)
365369
except Exception as e:
@@ -383,7 +387,7 @@ async def _initialize_scenario(self, sb: ScenarioBuilder, scenario_data, federat
383387
async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federation: NebulaFederationDocker):
384388
self.logger.info("🔧 Loading Scenario configuration...")
385389
# Get participants configurations
386-
participant_files = glob.glob(f"{self.config_dir}/participant_*.json")
390+
participant_files = glob.glob(f"{federation.config_dir}/participant_*.json")
387391
participant_files.sort()
388392
if len(participant_files) == 0:
389393
raise ValueError("No participant files found in config folder")
@@ -408,19 +412,19 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
408412
self.logger.info("🔧 Building preload configuration for initial nodes...")
409413
for i in range(n_nodes):
410414
try:
411-
with open(f"{self.config_dir}/participant_" + str(i) + ".json") as f:
415+
with open(f"{federation.config_dir}/participant_" + str(i) + ".json") as f:
412416
participant_config = json.load(f)
413417
except Exception as e:
414418
self.logger.info(f"ERROR: open/load participant .json")
415419

416420
self.logger.info(f"Building preload conf for participant {i}")
417421
try:
418-
sb.build_preload_initial_node_configuration(i, participant_config, self.log_dir, self.config_dir, self.cert_dir, self.advanced_analytics)
422+
sb.build_preload_initial_node_configuration(i, participant_config, federation.log_dir, federation.config_dir, self.cert_dir, self.advanced_analytics)
419423
except Exception as e:
420424
self.logger.info(f"ERROR: cannot build preload configuration")
421425

422426
try:
423-
with open(f"{self.config_dir}/participant_" + str(i) + ".json", "w") as f:
427+
with open(f"{federation.config_dir}/participant_" + str(i) + ".json", "w") as f:
424428
json.dump(participant_config, f, sort_keys=False, indent=2)
425429
except Exception as e:
426430
self.logger.info(f"ERROR: cannot dump preload configuration into participant .json file")
@@ -441,7 +445,7 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
441445
federation.config.set_participants_config(participant_files)
442446

443447
# Add role to the topology (visualization purposes)
444-
sb.visualize_topology(config_participants, path=f"{self.config_dir}/topology.png", plot=False)
448+
sb.visualize_topology(config_participants, path=f"{federation.config_dir}/topology.png", plot=False)
445449

446450
# Additional participants
447451
self.logger.info("🔧 Building preload configuration for additional nodes...")
@@ -451,7 +455,7 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
451455
last_participant_index = len(participant_files)
452456

453457
for i, _ in enumerate(additional_participants):
454-
additional_participant_file = f"{self.config_dir}/participant_{last_participant_index + i}.json"
458+
additional_participant_file = f"{federation.config_dir}/participant_{last_participant_index + i}.json"
455459
shutil.copy(last_participant_file, additional_participant_file)
456460

457461
with open(additional_participant_file) as f:
@@ -475,7 +479,7 @@ async def _load_configuration_and_start_nodes(self, sb: ScenarioBuilder, federat
475479
self.logger.info("✅ Loading Scenario configuration done")
476480

477481
# Build dataset
478-
dataset = sb.configure_dataset(self.config_dir)
482+
dataset = sb.configure_dataset(federation.config_dir)
479483
self.logger.info(f"🔧 Splitting {sb.get_dataset_name()} dataset...")
480484
dataset.initialize_dataset()
481485
self.logger.info(f"✅ Splitting {sb.get_dataset_name()} dataset... Done")
@@ -502,7 +506,7 @@ def _get_participant_container_name(self, scenario_name, idx: int) -> str:
502506

503507
def _start_initial_nodes(self, sb: ScenarioBuilder, federation: NebulaFederationDocker):
504508
self.logger.info("Starting nodes using Docker Compose...")
505-
federation.network_name = self._get_network_name(f"{sb.get_scenario_name()}-net-scenario")
509+
federation.network_name = self._get_network_name(f"{sb.get_scenario_name(user_to=True)}-net-scenario")
506510
federation.base_network_name = self._get_network_name("net-base")
507511

508512
# Create the Docker network
@@ -520,7 +524,7 @@ def _start_initial_nodes(self, sb: ScenarioBuilder, federation: NebulaFederation
520524
# deploy initial nodes
521525
self.logger.info(f"Deployment starting for participant {idx}")
522526
federation.round_per_participant[idx] = 0
523-
deployed_successfully = self._start_node(sb.get_scenario_name(), node, federation.network_name, federation.base_network_name, federation.base, federation.last_index_deployed, federation)
527+
deployed_successfully = self._start_node(sb.get_scenario_name(user_to=True), node, federation.network_name, federation.base_network_name, federation.base, federation.last_index_deployed, federation)
524528
if deployed_successfully:
525529
federation.last_index_deployed += 1
526530
federation.participants_alive += 1
@@ -568,8 +572,8 @@ def _start_node(self, scenario_name, node, network_name, base_network_name, base
568572
),
569573
base_network_name: client.api.create_endpoint_config(),
570574
})
571-
node["tracking_args"]["log_dir"] = "/nebula/app/logs"
572-
node["tracking_args"]["config_dir"] = f"/nebula/app/config/{scenario_name}"
575+
node["tracking_args"]["log_dir"] = federation.log_dir
576+
node["tracking_args"]["config_dir"] = federation.config_dir
573577
node["scenario_args"]["controller"] = self.url
574578
node["scenario_args"]["deployment"] = "docker"
575579
node["security_args"]["certfile"] = f"/nebula/app/certs/participant_{node['device_args']['idx']}_cert.pem"
@@ -583,7 +587,7 @@ def _start_node(self, scenario_name, node, network_name, base_network_name, base
583587
except docker.errors.NotFound:
584588
pass # No conflict, safe to proceed
585589
# Write the config file in config directory
586-
with open(f"{self.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f:
590+
with open(f"{federation.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f:
587591
json.dump(node, f, indent=4)
588592
try:
589593
container_id = client.api.create_container(
@@ -610,14 +614,14 @@ def _start_node(self, scenario_name, node, network_name, base_network_name, base
610614

611615
# Write scenario-level metadata for cleanup
612616
scenario_metadata = {"containers": container_names, "network": network_name}
613-
with open(os.path.join(self.config_dir, "scenario.metadata"), "a") as f:
617+
with open(os.path.join(federation.config_dir, "scenario.metadata"), "a") as f:
614618
if i == 2:
615619
json.dump(scenario_metadata, f, indent=2)
616620
else:
617-
with open(os.path.join(self.config_dir, "scenario.metadata"), "r") as f:
621+
with open(os.path.join(federation.config_dir, "scenario.metadata"), "r") as f:
618622
metadata = json.load(f)
619623
metadata["containers"].extend(container_names)
620-
with open(os.path.join(self.config_dir, "scenario.metadata"), "w") as f:
624+
with open(os.path.join(federation.config_dir, "scenario.metadata"), "w") as f:
621625
json.dump(metadata, f, indent=2)
622626

623627
return success

0 commit comments

Comments
 (0)