diff --git a/app/deployer.py b/app/deployer.py index 968ef62da..a0a5d83d7 100644 --- a/app/deployer.py +++ b/app/deployer.py @@ -17,7 +17,7 @@ from nebula.addons.env import check_environment from nebula.controller.controller import TermEscapeCodeFormatter from nebula.controller.scenarios import ScenarioManagement -from nebula.utils import DockerUtils, SocketUtils +from nebula.utils import DockerUtils, FileUtils, SocketUtils class NebulaEventHandler(PatternMatchingEventHandler): @@ -289,17 +289,17 @@ def run_script(self, script): def kill_script_processes(self, pids_file): """ Forcefully terminates processes listed in a given PID file, including their child processes. - + Args: pids_file (str): Path to the file containing PIDs, one per line. - + Behavior: - Reads the PIDs from the file. - For each PID, checks if the process exists. - If it exists, kills all child processes recursively before killing the main process. - Handles and logs exceptions such as missing processes or invalid PID entries. - Logs warnings and errors appropriately. - + Typical use case: Used to clean up running processes related to a scenario or script that has been deleted or stopped. """ @@ -344,7 +344,7 @@ def run_observer(): """ Starts a watchdog observer to monitor the configuration directory for changes. - This function is typically used to execute additional scripts or trigger events + This function is typically used to execute additional scripts or trigger events during the execution of a federated learning session by monitoring file system changes. Main functionalities: @@ -357,7 +357,7 @@ def run_observer(): - Trigger specific actions during a federation lifecycle. Note: - The observer runs in a blocking mode and will keep the process alive + The observer runs in a blocking mode and will keep the process alive until manually stopped or interrupted. """ # Watchdog for running additional scripts in the host machine (i.e. during the execution of a federation) @@ -373,8 +373,8 @@ class Deployer: """ Handles the configuration and initialization of deployment parameters for the NEBULA system. - This class reads and stores various deployment-related settings such as port assignments, - environment paths, logging configuration, and system mode (production, development, or simulation). + This class reads and stores various deployment-related settings such as port assignments, + environment paths, logging configuration, and system mode (production or development). Main functionalities: - Parses and validates input arguments for deployment. @@ -384,7 +384,7 @@ class Deployer: Typical use cases: - Used to deploy the NEBULA system components with the correct configuration. - - Enables deployment in different environments (e.g., local simulation, production, development). + - Enables deployment in different environments (e.g., production, development). Attributes: - controller_port (int): Port for the main controller service. @@ -395,9 +395,7 @@ class Deployer: - statistics_port (int): Port for the statistics service. - production (bool): Flag indicating if the system is in production mode. - dev (bool): Flag indicating if the system is in development mode. - - advanced_analytics (bool): Enables advanced analytics modules. - databases_dir (str): Path to the database directory. - - simulation (str): Simulation scenario path. - config_dir (str): Path to the configuration directory. - log_dir (str): Path to the logs directory. - env_path (str): Path to the Python environment. @@ -409,18 +407,145 @@ class Deployer: Note: This class does not launch any services directly; it only prepares and stores configuration. """ + + DEPLOYER_PID_FILE = os.path.join(os.path.dirname(__file__), "deployer.pid") + METADATA_FILE = os.path.join(os.path.dirname(__file__), "deployer.metadata") + + @staticmethod + def _read_metadata(): + try: + with open(Deployer.METADATA_FILE, "r") as f: + data = json.load(f) + # Backward compatibility: if it's a list, treat as containers only + if isinstance(data, list): + return {"containers": data, "networks": []} + return data + except (FileNotFoundError, json.JSONDecodeError): + return {"containers": [], "networks": []} + + @staticmethod + def _write_metadata(metadata): + with open(Deployer.METADATA_FILE, "w") as f: + json.dump(metadata, f, indent=2) + + @staticmethod + def _add_container_to_metadata(container_name): + metadata = Deployer._read_metadata() + if container_name not in metadata["containers"]: + metadata["containers"].append(container_name) + Deployer._write_metadata(metadata) + + @staticmethod + def _add_network_to_metadata(network_name): + metadata = Deployer._read_metadata() + if network_name not in metadata["networks"]: + metadata["networks"].append(network_name) + Deployer._write_metadata(metadata) + + @staticmethod + def _remove_all_containers_from_metadata(): + metadata = Deployer._read_metadata() + containers = metadata["containers"] + if containers: + try: + import docker + + client = docker.from_env() + for name in containers: + try: + container = client.containers.get(name) + container.remove(force=True) + logging.info(f"Container {name} removed via metadata.") + except Exception as e: + logging.warning(f"Could not remove container {name}: {e}") + except Exception as e: + logging.warning(f"Docker error during metadata removal: {e}") + metadata["containers"] = [] + Deployer._write_metadata(metadata) + + @staticmethod + def _remove_all_networks_from_metadata(): + metadata = Deployer._read_metadata() + networks = metadata["networks"] + if networks: + try: + import docker + + client = docker.from_env() + for name in networks: + try: + network = client.networks.get(name) + network.remove() + logging.info(f"Network {name} removed via metadata.") + except Exception as e: + logging.warning(f"Could not remove network {name}: {e}") + except Exception as e: + logging.warning(f"Docker error during network metadata removal: {e}") + metadata["networks"] = [] + Deployer._write_metadata(metadata) + def __init__(self, args): - self.controller_port = int(args.controllerport) if hasattr(args, "controllerport") else 5050 - self.waf_port = int(args.wafport) if hasattr(args, "wafport") else 6000 - self.frontend_port = int(args.webport) if hasattr(args, "webport") else 6060 - self.grafana_port = int(args.grafanaport) if hasattr(args, "grafanaport") else 6040 - self.loki_port = int(args.lokiport) if hasattr(args, "lokiport") else 6010 - self.statistics_port = int(args.statsport) if hasattr(args, "statsport") else 8080 - self.production = args.production if hasattr(args, "production") else False - self.dev = args.developement if hasattr(args, "developement") else False - self.advanced_analytics = args.advanced_analytics if hasattr(args, "advanced_analytics") else False + """ + Initializes the Deployer with robust handling of environment and prefix logic using tags only. + - Only sets NEBULA_ENV_TAG, NEBULA_PREFIX_TAG, and NEBULA_USER_TAG in the .env file. + - All logic and naming use tag helpers and tag variables. + - Defaults for prefix and production are consistent with main.py. + """ + # Prevent running NEBULA twice by checking metadata file + if os.path.exists(self.METADATA_FILE): + try: + with open(self.METADATA_FILE) as f: + data = json.load(f) + if (isinstance(data, dict) and (data.get("containers") or data.get("networks"))) or ( + isinstance(data, list) and data + ): + warning_msg = ( + "\n\033[91mERROR: NEBULA appears to be already running or was not cleanly shut down. " + "Please stop the existing instance or remove the metadata file before starting a new one.\033[0m\n" + "You can use 'docker ps -a --filter name={deployment_prefix}' to see the containers." + ) + logging.exception(warning_msg) + sys.exit(1) + except Exception: + warning_msg = ( + "\n\033[91mERROR: NEBULA metadata file is corrupt or unreadable. " + "Please remove or fix the file before starting a new instance.\033[0m\n" + ) + logging.exception(warning_msg) + sys.exit(1) + + # --- Tag logic: CLI args > environment > fallback --- + arg_production = getattr(args, "production", False) + arg_prefix = getattr(args, "prefix", "dev") + arg_user = os.environ.get("USER", "unknown") + + env_tag = os.environ.get("NEBULA_ENV_TAG") + prefix_tag = os.environ.get("NEBULA_PREFIX_TAG") + user_tag = os.environ.get("NEBULA_USER_TAG", arg_user) + + self.env_tag = ("prod" if arg_production else "dev") if env_tag is None else env_tag + self.prefix_tag = arg_prefix if arg_prefix else (prefix_tag if prefix_tag else "dev") + self.user_tag = user_tag + + FileUtils.update_env_file(getattr(args, "env", ".env"), "NEBULA_ENV_TAG", self.env_tag) + FileUtils.update_env_file(getattr(args, "env", ".env"), "NEBULA_PREFIX_TAG", self.prefix_tag) + FileUtils.update_env_file(getattr(args, "env", ".env"), "NEBULA_USER_TAG", self.user_tag) + + self.production = self.env_tag == "prod" + self.prefix = self.prefix_tag + + deployment_prefix = f"{self.env_tag}_{self.prefix_tag}_{self.user_tag}_" + if DockerUtils.check_docker_by_prefix(deployment_prefix): + warning_msg = ( + f"\n\033[91mERROR: Found existing Docker containers with prefix '{deployment_prefix}'. " + f"NEBULA cannot be deployed with the same prefix. " + f"Please stop/remove existing containers before starting a new deployment.\033[0m\n" + f"You can use 'docker ps -a --filter name={deployment_prefix}' to see the containers." + ) + logging.exception(warning_msg) + sys.exit(1) + self.databases_dir = args.databases if hasattr(args, "databases") else "/nebula/app/databases" - self.simulation = args.simulation self.config_dir = args.config self.log_dir = args.logs self.env_path = args.env @@ -430,15 +555,61 @@ def __init__(self, args): else os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) self.host_platform = "windows" if sys.platform == "win32" else "unix" - self.controller_host = f"{os.environ['USER']}_nebula-controller" self.gpu_available = False + + self.controller_host = self.get_container_name("nebula-controller") + self.controller_port = int(args.controllerport) if hasattr(args, "controllerport") else 5050 + self.waf_port = int(args.wafport) if hasattr(args, "wafport") else 6000 + self.frontend_port = int(args.webport) if hasattr(args, "webport") else 6060 + self.grafana_port = int(args.grafanaport) if hasattr(args, "grafanaport") else 6040 + self.loki_port = int(args.lokiport) if hasattr(args, "lokiport") else 6010 + self.statistics_port = int(args.statsport) if hasattr(args, "statsport") else 8080 + self.configure_logger() + def get_container_name(self, role_tag: str) -> str: + """ + Generate a standardized container name using tags. + Args: + role_tag (str): The component role (e.g., 'nebula-controller'). + Returns: + str: The composed container name. + """ + return f"{self.env_tag}_{self.prefix_tag}_{self.user_tag}_{role_tag}" + + def get_network_name(self, suffix: str) -> str: + """ + Generate a standardized network name using tags. + Args: + suffix (str): Suffix for the network (default: 'net-base'). + Returns: + str: The composed network name. + """ + return f"{self.env_tag}_{self.prefix_tag}_{self.user_tag}_{suffix}" + + @property + def deployment_prefix(self): + """ + Returns the deployment prefix for the current deployment. + + This property is used to prefix the names of the containers and networks + in the deployment. + + Returns: + str: The deployment prefix, either "production" or "dev". + + Typical use cases: + - Prefixing container and network names in the deployment. + - Ensuring consistent naming conventions across different environments. + + """ + return self.prefix + def configure_logger(self): """ Configures the logging system for the deployment controller. - This method sets up both console and file logging with a consistent format and appropriate log levels. + This method sets up both console and file logging with a consistent format and appropriate log levels. It also ensures that Uvicorn loggers are properly configured to avoid duplicate log outputs. Main functionalities: @@ -452,7 +623,7 @@ def configure_logger(self): - Ensures clean and consistent logging output during deployment. Note: - This method does not set up file logging directly, but prepares the base configuration + This method does not set up file logging directly, but prepares the base configuration and Uvicorn logger behavior for further logging use. """ log_console_format = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s" @@ -475,7 +646,7 @@ def ensure_directory_access(self, directory_path: str) -> str: """ Ensures that the specified directory exists and is writable. - This method attempts to create the directory if it does not exist and verifies + This method attempts to create the directory if it does not exist and verifies write access by writing and deleting a temporary metadata file. Args: @@ -514,15 +685,17 @@ def ensure_directory_access(self, directory_path: str) -> str: except Exception as e: logging.exception(f"Failed to create/access directory {directory_path}: {str(e)}") - logging.exception("Please check directory permissions or choose a different location using --database option") + logging.exception( + "Please check directory permissions or choose a different location using --database option" + ) raise SystemExit(1) from e def start(self): """ Starts the NEBULA deployment process and all associated services. - This method initializes the NEBULA platform by setting up the environment, - checking port availability, starting key services (controller, frontend, WAF), + This method initializes the NEBULA platform by setting up the environment, + checking port availability, starting key services (controller, frontend, WAF), and launching a filesystem observer to react to configuration changes. Main functionalities: @@ -535,14 +708,14 @@ def start(self): - Handles system signals for clean shutdown. Typical use cases: - - Used to launch NEBULA in production, development, or simulation environments. + - Used to launch NEBULA in production or development environments. - Central entry point for managing NEBULA components during deployment. Note: - The method blocks indefinitely until manually interrupted, + The method blocks indefinitely until manually interrupted, and ensures graceful shutdown upon receiving SIGINT or SIGTERM. """ - banner = """ + banner = f""" ███╗ ██╗███████╗██████╗ ██╗ ██╗██╗ █████╗ ████╗ ██║██╔════╝██╔══██╗██║ ██║██║ ██╔══██╗ ██╔██╗ ██║█████╗ ██████╔╝██║ ██║██║ ███████║ @@ -558,6 +731,8 @@ def start(self): • Fernando Torres Vega https://nebula-dfl.com / https://nebula-dfl.eu + + [{"Production" if self.production else "Development"} mode] [{self.deployment_prefix} prefix] """ print("\x1b[0;36m" + banner + "\x1b[0m") @@ -589,7 +764,8 @@ def start(self): logging.info(f"NEBULA Databases created in {self.databases_dir}") self.run_frontend() logging.info(f"NEBULA Frontend is running at http://localhost:{self.frontend_port}") - if self.production: + if self.production and self.prefix == "production": + logging.info("Deploying NEBULA WAF in production mode") self.run_waf() logging.info("NEBULA WAF is running") @@ -616,8 +792,8 @@ def signal_handler(self, sig, frame): """ Handles system termination signals to ensure a clean shutdown. - This method is triggered when the application receives SIGTERM or SIGINT signals - (e.g., via Ctrl+C or `kill`). It logs the event, performs cleanup actions, and + This method is triggered when the application receives SIGTERM or SIGINT signals + (e.g., via Ctrl+C or `kill`). It logs the event, performs cleanup actions, and terminates the process gracefully. Args: @@ -639,19 +815,19 @@ def signal_handler(self, sig, frame): def run_frontend(self): """ - Runs the Nebula controller within a Docker container, ensuring the required Docker environment is available. + Runs the NEBULA controller within a Docker container, ensuring the required Docker environment is available. This method: - Checks if Docker is running by verifying the Docker socket presence (platform-dependent). - - Creates a dedicated Docker network for the Nebula system. + - Creates a dedicated Docker network for the NEBULA system. - Configures environment variables, volume mounts, ports, and network settings for the container. - - Creates and starts the Nebula controller Docker container with the specified configuration. + - Creates and starts the NEBULA controller Docker container with the specified configuration. Raises: Exception: If Docker is not running or Docker Compose is not installed. Typical use cases: - - Launching the Nebula controller as part of the federated learning infrastructure. + - Launching the NEBULA controller as part of the federated learning infrastructure. - Ensuring proper Docker networking and environment setup for container execution. Note: @@ -668,17 +844,19 @@ def run_frontend(self): "/var/run/docker.sock not found, please check if Docker is running and Docker Compose is installed." ) - network_name = f"{os.environ['USER']}_nebula-net-base" + network_name = self.get_network_name("net-base") # Create the Docker network base = DockerUtils.create_docker_network(network_name) + Deployer._add_network_to_metadata(network_name) client = docker.from_env() environment = { - "NEBULA_CONTROLLER_NAME": os.environ["USER"], "NEBULA_PRODUCTION": self.production, - "NEBULA_ADVANCED_ANALYTICS": self.advanced_analytics, + "NEBULA_ENV_TAG": self.env_tag, + "NEBULA_PREFIX_TAG": self.prefix_tag, + "NEBULA_USER_TAG": self.user_tag, "NEBULA_FRONTEND_LOG": "/nebula/app/logs/frontend.log", "NEBULA_LOGS_DIR": "/nebula/app/logs/", "NEBULA_CONFIG_DIR": "/nebula/app/config/", @@ -709,9 +887,19 @@ def run_frontend(self): f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.100") }) + frontend_container_name = self.get_container_name("nebula-frontend") + + try: + existing = client.containers.get(frontend_container_name) + logging.warning( + f"Container {frontend_container_name} already exists. Deployment may fail or cause conflicts." + ) + except docker.errors.NotFound: + pass # No conflict, safe to proceed + container_id = client.api.create_container( image="nebula-frontend", - name=f"{os.environ['USER']}_nebula-frontend", + name=frontend_container_name, detach=True, environment=environment, volumes=volumes, @@ -721,20 +909,8 @@ def run_frontend(self): ) client.api.start(container_id) - - @staticmethod - def stop_frontend(): - """ - Stops and removes all NEBULA frontend Docker containers associated with the current user. - - Responsibilities: - - Detects running Docker containers with names starting with '_nebula-frontend'. - - Gracefully stops and removes these frontend containers. - - Typical use cases: - - Cleaning up frontend containers during shutdown or redeployment processes. - """ - DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}_nebula-frontend") + # Add to metadata + Deployer._add_container_to_metadata(frontend_container_name) def run_controller(self): if sys.platform == "win32": @@ -748,8 +924,8 @@ def run_controller(self): "/var/run/docker.sock not found, please check if Docker is running and Docker Compose is installed." ) - network_name = f"{os.environ['USER']}_nebula-net-base" - + network_name = self.get_network_name("net-base") + try: subprocess.check_call(["nvidia-smi"]) self.gpu_available = True @@ -758,14 +934,16 @@ def run_controller(self): # Create the Docker network base = DockerUtils.create_docker_network(network_name) + Deployer._add_network_to_metadata(network_name) client = docker.from_env() environment = { "USER": os.environ["USER"], - "NEBULA_PRODUCTION": self.production, + "NEBULA_ENV_TAG": self.env_tag, + "NEBULA_PREFIX_TAG": self.prefix_tag, + "NEBULA_USER_TAG": self.user_tag, "NEBULA_ROOT_HOST": self.root_path, - "NEBULA_ADVANCED_ANALYTICS": self.advanced_analytics, "NEBULA_DATABASES_DIR": "/nebula/app/databases", "NEBULA_CONTROLLER_LOG": "/nebula/app/logs/controller.log", "NEBULA_CONFIG_DIR": "/nebula/app/config/", @@ -785,24 +963,38 @@ def run_controller(self): binds=[ f"{self.root_path}:/nebula", "/var/run/docker.sock:/var/run/docker.sock", - f"{self.databases_dir}:/nebula/app/databases" + f"{self.databases_dir}:/nebula/app/databases", ], extra_hosts={"host.docker.internal": "host-gateway"}, port_bindings={self.controller_port: self.controller_port}, - device_requests=[{ - "Driver": "nvidia", - "Count": -1, - "Capabilities": [["gpu"]], - }] if self.gpu_available else None, + device_requests=[ + { + "Driver": "nvidia", + "Count": -1, + "Capabilities": [["gpu"]], + } + ] + if self.gpu_available + else None, ) networking_config = client.api.create_networking_config({ f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.150") }) + controller_container_name = self.get_container_name("nebula-controller") + + try: + existing = client.containers.get(controller_container_name) + logging.warning( + f"Container {controller_container_name} already exists. Deployment may fail or cause conflicts." + ) + except docker.errors.NotFound: + pass # No conflict, safe to proceed + container_id = client.api.create_container( image="nebula-controller", - name=f"{os.environ['USER']}_nebula-controller", + name=controller_container_name, detach=True, environment=environment, volumes=volumes, @@ -812,21 +1004,8 @@ def run_controller(self): ) client.api.start(container_id) - - @staticmethod - def stop_controller(): - """ - Stops all running Docker containers with names starting with '_nebula-controller'. - - Responsibilities: - - Initiates shutdown of all participant nodes related to the scenario. - - Gracefully stops and removes controller containers to ensure clean shutdown. - - Typical use cases: - - Used when stopping or restarting the Nebula controller service. - """ - ScenarioManagement.stop_participants() - DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}_nebula-controller") + # Add to metadata + Deployer._add_container_to_metadata(controller_container_name) def run_waf(self): """ @@ -841,11 +1020,12 @@ def run_waf(self): - Assigns static IP addresses to all containers within the created Docker network for consistent communication. Typical use cases: - - Deploying an integrated WAF solution alongside monitoring and logging components in the Nebula system. + - Deploying an integrated WAF solution alongside monitoring and logging components in the NEBULA system. - Ensuring comprehensive security monitoring and log management through containerized services. """ - network_name = f"{os.environ['USER']}_nebula-net-base" + network_name = self.get_network_name("net-base") base = DockerUtils.create_docker_network(network_name) + Deployer._add_network_to_metadata(network_name) client = docker.from_env() @@ -863,9 +1043,17 @@ def run_waf(self): f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.200") }) + waf_container_name = self.get_container_name("nebula-waf") + + try: + existing = client.containers.get(waf_container_name) + logging.warning(f"Container {waf_container_name} already exists. Deployment may fail or cause conflicts.") + except docker.errors.NotFound: + pass # No conflict, safe to proceed + container_id_waf = client.api.create_container( image="nebula-waf", - name=f"{os.environ['USER']}_nebula-waf", + name=waf_container_name, detach=True, volumes=volumes_waf, host_config=host_config_waf, @@ -874,6 +1062,7 @@ def run_waf(self): ) client.api.start(container_id_waf) + Deployer._add_container_to_metadata(waf_container_name) environment = { "GF_SECURITY_ADMIN_PASSWORD": "admin", @@ -897,9 +1086,19 @@ def run_waf(self): f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.201") }) + waf_grafana_container_name = self.get_container_name("nebula-waf-grafana") + + try: + existing = client.containers.get(waf_grafana_container_name) + logging.warning( + f"Container {waf_grafana_container_name} already exists. Deployment may fail or cause conflicts." + ) + except docker.errors.NotFound: + pass # No conflict, safe to proceed + container_id = client.api.create_container( image="nebula-waf-grafana", - name=f"{os.environ['USER']}_nebula-waf-grafana", + name=waf_grafana_container_name, detach=True, environment=environment, host_config=host_config, @@ -908,6 +1107,7 @@ def run_waf(self): ) client.api.start(container_id) + Deployer._add_container_to_metadata(waf_grafana_container_name) command = ["-config.file=/mnt/config/loki-config.yml"] @@ -921,9 +1121,19 @@ def run_waf(self): f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.202") }) + waf_loki_container_name = self.get_container_name("nebula-waf-loki") + + try: + existing = client.containers.get(waf_loki_container_name) + logging.warning( + f"Container {waf_loki_container_name} already exists. Deployment may fail or cause conflicts." + ) + except docker.errors.NotFound: + pass # No conflict, safe to proceed + container_id_loki = client.api.create_container( image="nebula-waf-loki", - name=f"{os.environ['USER']}_nebula-waf-loki", + name=waf_loki_container_name, detach=True, command=command, host_config=host_config_loki, @@ -932,6 +1142,7 @@ def run_waf(self): ) client.api.start(container_id_loki) + Deployer._add_container_to_metadata(waf_loki_container_name) volumes_promtail = ["/var/log/nginx"] @@ -945,9 +1156,19 @@ def run_waf(self): f"{network_name}": client.api.create_endpoint_config(ipv4_address=f"{base}.203") }) + waf_promtail_container_name = self.get_container_name("nebula-waf-promtail") + + try: + existing = client.containers.get(waf_promtail_container_name) + logging.warning( + f"Container {waf_promtail_container_name} already exists. Deployment may fail or cause conflicts." + ) + except docker.errors.NotFound: + pass # No conflict, safe to proceed + container_id_promtail = client.api.create_container( image="nebula-waf-promtail", - name=f"{os.environ['USER']}_nebula-waf-promtail", + name=waf_promtail_container_name, detach=True, volumes=volumes_promtail, host_config=host_config_promtail, @@ -955,48 +1176,79 @@ def run_waf(self): ) client.api.start(container_id_promtail) + Deployer._add_container_to_metadata(waf_promtail_container_name) @staticmethod - def stop_waf(): - """ - Stops all running Docker containers with names starting with '_nebula-waf'. - - Responsibilities: - - Gracefully shuts down and removes all WAF-related containers for the current user. - - Typical use cases: - - Cleaning up WAF containers during shutdown or redeployment of the Nebula system. - """ - DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}_nebula-waf") + def stop_deployer(): + if os.path.exists(Deployer.DEPLOYER_PID_FILE): + try: + with open(Deployer.DEPLOYER_PID_FILE) as f: + pid = int(f.read()) + os.remove(Deployer.DEPLOYER_PID_FILE) + # Check if process still exists before trying to kill it + if psutil.pid_exists(pid): + os.kill(pid, signal.SIGKILL) + logging.info(f"Deployer process {pid} terminated") + else: + logging.info(f"Deployer process {pid} already terminated") + except (ValueError, OSError) as e: + logging.warning(f"Error stopping deployer process: {e}") + except Exception as e: + logging.warning(f"Unexpected error stopping deployer process: {e}") @staticmethod def stop_all(): """ - Stops all running Nebula-related Docker containers and networks, then terminates the deployer process. + Stops all running NEBULA-related Docker containers and networks, then terminates the deployer process. Responsibilities: - Stops frontend, controller, and WAF containers for the current user. - - Removes all Docker containers and networks with names starting with the user's prefix. + - Removes all Docker containers tracked in the metadata file. - Reads and kills the deployer process using its PID file. - Exits the system cleanly, handling any exceptions during shutdown. Typical use cases: - - Full shutdown and cleanup of all Nebula components and resources on the host system. + - Full shutdown and cleanup of all NEBULA components and resources on the host system. """ print("Closing NEBULA (exiting from components)... Please wait") + errors = [] + try: - Deployer.stop_frontend() - Deployer.stop_controller() - Deployer.stop_waf() - DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}_") - DockerUtils.remove_docker_networks_by_prefix(f"{os.environ['USER']}_") - deployer_pid_file = os.path.join(os.path.dirname(__file__), "deployer.pid") - with open(deployer_pid_file) as f: - pid = int(f.read()) - os.remove(deployer_pid_file) - os.kill(pid, signal.SIGKILL) - sys.exit(0) + # Remove all scenario containers + ScenarioManagement.cleanup_scenario_containers() except Exception as e: - print(f"Nebula is closed with errors {e}") - finally: - sys.exit(0) + errors.append(f"Scenario cleanup error: {e}") + logging.warning(f"Error during scenario cleanup: {e}") + + try: + Deployer._remove_all_containers_from_metadata() + except Exception as e: + errors.append(f"Container cleanup error: {e}") + logging.warning(f"Error during container cleanup: {e}") + + try: + Deployer._remove_all_networks_from_metadata() + except Exception as e: + errors.append(f"Network cleanup error: {e}") + logging.warning(f"Error during network cleanup: {e}") + + try: + # Remove the metadata file after cleanup + if os.path.exists(Deployer.METADATA_FILE): + os.remove(Deployer.METADATA_FILE) + except Exception as e: + errors.append(f"Metadata file removal error: {e}") + logging.warning(f"Error removing metadata file: {e}") + + try: + Deployer.stop_deployer() + except Exception as e: + errors.append(f"Deployer stop error: {e}") + logging.warning(f"Error stopping deployer: {e}") + + if errors: + print(f"NEBULA is closed with errors: {'; '.join(errors)}") + else: + print("NEBULA closed successfully") + + sys.exit(0) diff --git a/app/main.py b/app/main.py index 4c4de4f70..e13d1641e 100755 --- a/app/main.py +++ b/app/main.py @@ -64,14 +64,12 @@ help="Stop NEBULA platform or nodes only (use '--stop nodes' to stop only the nodes)", ) -argparser.add_argument("-s", "--simulation", action="store_false", dest="simulation", help="Run simulation") - argparser.add_argument( "-c", "--config", dest="config", default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "config"), - help="Config directory path", + help="NEBULA config directory path", ) argparser.add_argument( @@ -79,7 +77,7 @@ "--database", dest="databases", default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "databases"), - help="Nebula databases path", + help="NEBULA databases directory path", ) argparser.add_argument( @@ -87,7 +85,7 @@ "--logs", dest="logs", default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs"), - help="Logs directory path", + help="NEBULA logs directory path", ) argparser.add_argument( @@ -95,7 +93,7 @@ "--certs", dest="certs", default=os.path.join(os.path.dirname(os.path.abspath(__file__)), "certs"), - help="Certs directory path", + help="NEBULA certs directory path", ) argparser.add_argument( @@ -106,24 +104,21 @@ help=".env file path", ) -argparser.add_argument("-dev", "--developement", dest="developement", default=True, help="Nebula for devs") - argparser.add_argument( "-p", "--production", dest="production", action="store_true", default=False, - help="Production mode", + help="Deploy NEBULA in production mode", ) argparser.add_argument( - "-ad", - "--advanced", - dest="advanced_analytics", - action="store_true", - default=False, - help="Advanced analytics", + "-pr", + "--prefix", + dest="prefix", + default="dev", + help="Deploy NEBULA components with a prefix", ) argparser.add_argument( diff --git a/nebula/controller/controller.py b/nebula/controller/controller.py index a00d142d1..acf03c19f 100755 --- a/nebula/controller/controller.py +++ b/nebula/controller/controller.py @@ -264,24 +264,24 @@ async def get_available_gpu(): def validate_physical_fields(data: dict): if data.get("deployment") != "physical": - return - + return + ips = data.get("physical_ips") if not ips: raise HTTPException( status_code=400, detail="physical deployment requires 'physical_ips'" ) - + if len(ips) != data.get("n_nodes"): raise HTTPException( status_code=400, detail="'physical_ips' must have the same length as 'n_nodes'" ) - + try: for ip in ips: - ipaddress.ip_address(ip) + ipaddress.ip_address(ip) print(ip) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -347,31 +347,27 @@ async def stop_scenario( ): """ Stops the execution of a federated learning scenario and performs cleanup operations. - + This endpoint: - Stops all participant containers associated with the specified scenario. - Removes Docker containers and network resources tied to the scenario and user. - Sets the scenario's status to "finished" in the database. - Optionally finalizes all active scenarios if the 'all' flag is set. - + Args: scenario_name (str): Name of the scenario to stop. username (str): User who initiated the stop operation. all (bool): Whether to stop all running scenarios instead of just one (default: False). - + Raises: HTTPException: Returns a 500 status code if any step fails. - + Note: This function does not currently trigger statistics generation. """ from nebula.controller.scenarios import ScenarioManagement - # ScenarioManagement.stop_participants(scenario_name) - DockerUtils.remove_containers_by_prefix(f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{username}-participant") - DockerUtils.remove_docker_network( - f"{(os.environ.get('NEBULA_CONTROLLER_NAME'))}_{str(username).lower()}-nebula-net-scenario" - ) + ScenarioManagement.cleanup_scenario_containers() try: if all: scenario_set_all_status_to_finished() @@ -847,27 +843,27 @@ async def discover_vpn(): stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - + # 2) Wait for it to finish and capture stdout/stderr out, err = await proc.communicate() if proc.returncode != 0: # If the CLI returned an error, raise to be caught below raise RuntimeError(err.decode()) - + # 3) Parse the JSON output data = json.loads(out.decode()) - + # 4) Collect only the IPv4 addresses from each peer ips = [] for peer in data.get("Peer", {}).values(): for ip in peer.get("TailscaleIPs", []): - if ":" not in ip: + if ":" not in ip: # Skip IPv6 entries (they contain colons) ips.append(ip) - + # 5) Return the list of IPv4s return {"ips": ips} - + except Exception as e: # 6) Log any failure and respond with HTTP 500 logging.error(f"Error discovering VPN devices: {e}") @@ -877,14 +873,14 @@ async def discover_vpn(): @app.get("/physical/run/{ip}", tags=["physical"]) async def physical_run(ip: str): status, data = await remote_get(ip, "/run/") - + if status == 200: return data if status is None: raise HTTPException(status_code=502, detail=f"Node unreachable: {data}") raise HTTPException(status_code=status, detail=data) - - + + @app.get("/physical/stop/{ip}", tags=["physical"]) async def physical_stop(ip: str): status, data = await remote_get(ip, "/stop/") @@ -893,8 +889,8 @@ async def physical_stop(ip: str): if status is None: raise HTTPException(status_code=502, detail=f"Node unreachable: {data}") raise HTTPException(status_code=status, detail=data) - - + + @app.put("/physical/setup/{ip}", tags=["physical"], status_code=status.HTTP_201_CREATED) async def physical_setup( @@ -903,7 +899,7 @@ async def physical_setup( global_test: UploadFile = File(..., description="Global Dataset*.h5*"), train_set: UploadFile = File(..., description="Training dataset*.h5*"), ): - + form = aiohttp.FormData() await config.seek(0) form.add_field("config", config.file, @@ -914,17 +910,17 @@ async def physical_setup( await train_set.seek(0) form.add_field("train_set", train_set.file, filename=train_set.filename, content_type="application/octet-stream") - + status_code, data = await remote_post_form( ip, "/setup/", form, method="PUT" ) - + if status_code == 201: return data if status_code is None: raise HTTPException(status_code=502, detail=f"Node unreachable: {data}") raise HTTPException(status_code=status_code, detail=data) - + # ────────────────────────────────────────────────────────────── # Physical · single-node state # ────────────────────────────────────────────────────────────── @@ -932,22 +928,22 @@ async def physical_setup( async def get_physical_node_state(ip: str): """ Query a single Raspberry Pi (or other node) for its training state. - + Parameters ---------- ip : str IP address or hostname of the node. - + Returns ------- dict - • running (bool) – True if a training process is active. + • running (bool) – True if a training process is active. • error (str) – Optional error message when the node is unreachable or returns a non-200 HTTP status. """ # Short global timeout so a dead node doesn't block the whole request timeout = aiohttp.ClientTimeout(total=3) # seconds - + try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(f"http://{ip}/state/") as resp: @@ -960,8 +956,8 @@ async def get_physical_node_state(ip: str): except Exception as exc: # Network errors, timeouts, DNS failures, … return {"running": False, "error": str(exc)} - - + + # ────────────────────────────────────────────────────────────── # Physical · aggregate state for an entire scenario # ────────────────────────────────────────────────────────────── @@ -969,12 +965,12 @@ async def get_physical_node_state(ip: str): async def get_physical_scenario_state(scenario_name: str): """ Check the training state of *every* physical node assigned to a scenario. - + Parameters ---------- scenario_name : str Scenario identifier. - + Returns ------- dict @@ -989,16 +985,16 @@ async def get_physical_scenario_state(scenario_name: str): scenario = await get_scenario_by_name(scenario_name) if not scenario: raise HTTPException(status_code=404, detail="Scenario not found") - + nodes = await list_nodes_by_scenario_name(scenario_name) if not nodes: raise HTTPException(status_code=404, detail="No nodes found for scenario") - + # 2) Probe all nodes concurrently ips = [n["ip"] for n in nodes] tasks = [get_physical_node_state(ip) for ip in ips] states = await asyncio.gather(*tasks) # parallel HTTP calls - + # 3) Aggregate results nodes_state = dict(zip(ips, states)) any_running = any(s.get("running") for s in states) @@ -1007,7 +1003,7 @@ async def get_physical_scenario_state(scenario_name: str): all_available = all( (not s.get("running")) and (not s.get("error")) for s in states ) - + return { "running": any_running, "nodes_state": nodes_state, diff --git a/nebula/controller/scenarios.py b/nebula/controller/scenarios.py index acf1e98e5..7276f3893 100644 --- a/nebula/controller/scenarios.py +++ b/nebula/controller/scenarios.py @@ -199,30 +199,30 @@ def __init__( self.mobile_participants_percent = mobile_participants_percent self.additional_participants = additional_participants self.with_trustworthiness = with_trustworthiness - self.robustness_pillar = robustness_pillar, - self.resilience_to_attacks = resilience_to_attacks, - self.algorithm_robustness = algorithm_robustness, - self.client_reliability = client_reliability, - self.privacy_pillar = privacy_pillar, - self.technique = technique, - self.uncertainty = uncertainty, - self.indistinguishability = indistinguishability, - self.fairness_pillar = fairness_pillar, - self.selection_fairness = selection_fairness, - self.performance_fairness = performance_fairness, - self.class_distribution = class_distribution, - self.explainability_pillar = explainability_pillar, - self.interpretability = interpretability, - self.post_hoc_methods = post_hoc_methods, - self.accountability_pillar = accountability_pillar, - self.factsheet_completeness = factsheet_completeness, - self.architectural_soundness_pillar = architectural_soundness_pillar, - self.client_management = client_management, - self.optimization = optimization, - self.sustainability_pillar = sustainability_pillar, - self.energy_source = energy_source, - self.hardware_efficiency = hardware_efficiency, - self.federation_complexity = federation_complexity, + self.robustness_pillar = (robustness_pillar,) + self.resilience_to_attacks = (resilience_to_attacks,) + self.algorithm_robustness = (algorithm_robustness,) + self.client_reliability = (client_reliability,) + self.privacy_pillar = (privacy_pillar,) + self.technique = (technique,) + self.uncertainty = (uncertainty,) + self.indistinguishability = (indistinguishability,) + self.fairness_pillar = (fairness_pillar,) + self.selection_fairness = (selection_fairness,) + self.performance_fairness = (performance_fairness,) + self.class_distribution = (class_distribution,) + self.explainability_pillar = (explainability_pillar,) + self.interpretability = (interpretability,) + self.post_hoc_methods = (post_hoc_methods,) + self.accountability_pillar = (accountability_pillar,) + self.factsheet_completeness = (factsheet_completeness,) + self.architectural_soundness_pillar = (architectural_soundness_pillar,) + self.client_management = (client_management,) + self.optimization = (optimization,) + self.sustainability_pillar = (sustainability_pillar,) + self.energy_source = (energy_source,) + self.hardware_efficiency = (hardware_efficiency,) + self.federation_complexity = (federation_complexity,) self.schema_additional_participants = schema_additional_participants self.random_topology_probability = random_topology_probability self.with_sa = with_sa @@ -579,12 +579,16 @@ def __init__(self, scenario, user=None): self.scenario_name = f"nebula_{self.scenario.federation}_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}" self.root_path = os.environ.get("NEBULA_ROOT_HOST") self.host_platform = os.environ.get("NEBULA_HOST_PLATFORM") - self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR"), self.scenario_name) - self.log_dir = os.environ.get("NEBULA_LOGS_DIR") - self.cert_dir = os.environ.get("NEBULA_CERTS_DIR") - self.advanced_analytics = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True" + self.config_dir = os.path.join(os.environ.get("NEBULA_CONFIG_DIR", "config"), self.scenario_name) + self.log_dir = os.environ.get("NEBULA_LOGS_DIR", "logs") + self.cert_dir = os.environ.get("NEBULA_CERTS_DIR", "certs") self.config = Config(entity="scenarioManagement") + # Tag-based naming for scenario resources + self.env_tag = os.environ.get("NEBULA_ENV_TAG", "dev") + self.prefix_tag = os.environ.get("NEBULA_PREFIX_TAG", "dev") + self.user_tag = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown")) + # If physical set the neighbours correctly if self.scenario.deployment == "physical" and self.scenario.physical_ips: for idx, ip in enumerate(self.scenario.physical_ips): @@ -761,6 +765,26 @@ def __init__(self, scenario, user=None): with open(participant_file, "w") as f: json.dump(participant_config, f, sort_keys=False, indent=2) + def get_network_name(self, suffix: str) -> str: + """ + Generate a standardized network name using tags. + Args: + suffix (str): Suffix for the network (default: 'net-base'). + Returns: + str: The composed network name. + """ + return f"{self.env_tag}_{self.prefix_tag}_{self.user_tag}_{suffix}" + + def get_participant_container_name(self, idx: int) -> str: + """ + Generate a standardized container name for a participant using tags. + Args: + idx (int): The participant index. + Returns: + str: The composed container name. + """ + return f"{self.env_tag}_{self.prefix_tag}_{self.user_tag}_{self.scenario_name}_participant{idx}" + @staticmethod def stop_participants(scenario_name=None): """ @@ -870,7 +894,6 @@ async def load_configurations_and_start_nodes( # Update participants configuration is_start_node = False config_participants = [] - # ap = len(additional_participants) if additional_participants else 0 additional_nodes = len(additional_participants) if additional_participants else 0 logging.info(f"######## nodes: {self.n_nodes} + additionals: {additional_nodes} ######") @@ -902,7 +925,7 @@ async def load_configurations_and_start_nodes( participant_config["mobility_args"]["latitude"] = self.scenario.latitude participant_config["mobility_args"]["longitude"] = self.scenario.longitude # If not, use the given coordinates in the frontend - participant_config["tracking_args"]["local_tracking"] = "advanced" if self.advanced_analytics else "basic" + participant_config["tracking_args"]["local_tracking"] = "default" participant_config["tracking_args"]["log_dir"] = self.log_dir participant_config["tracking_args"]["config_dir"] = self.config_dir @@ -953,7 +976,6 @@ async def load_configurations_and_start_nodes( logging.info(f"Configuration | additional nodes | participant: {self.n_nodes + i + 1}") last_ip = participant_config["network_args"]["ip"] - logging.info(f"Valores de la ultima ip: ({last_ip})") participant_config["scenario_args"]["n_nodes"] = self.n_nodes + additional_nodes # self.n_nodes + i + 1 participant_config["device_args"]["idx"] = last_participant_index + i participant_config["network_args"]["neighbors"] = "" @@ -1178,7 +1200,8 @@ def start_nodes_docker(self): logging.info("Starting nodes using Docker Compose...") logging.info(f"env path: {self.env_path}") - network_name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{str(self.user).lower()}-nebula-net-scenario" + network_name = self.get_network_name(f"{self.scenario_name}-net-scenario") + base_network_name = self.get_network_name("net-base") # Create the Docker network base = DockerUtils.create_docker_network(network_name) @@ -1188,9 +1211,10 @@ def start_nodes_docker(self): self.config.participants.sort(key=lambda x: x["device_args"]["idx"]) i = 2 container_ids = [] + container_names = [] # Track names for metadata for idx, node in enumerate(self.config.participants): image = "nebula-core" - name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{self.user}-participant{node['device_args']['idx']}" + name = self.get_participant_container_name(node["device_args"]["idx"]) if node["device_args"]["accelerator"] == "gpu": environment = { @@ -1223,10 +1247,10 @@ def start_nodes_docker(self): ] networking_config = client.api.create_networking_config({ - f"{network_name}": client.api.create_endpoint_config( + network_name: client.api.create_endpoint_config( ipv4_address=f"{base}.{i}", ), - f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_nebula-net-base": client.api.create_endpoint_config(), + base_network_name: client.api.create_endpoint_config(), }) node["tracking_args"]["log_dir"] = "/nebula/app/logs" @@ -1238,6 +1262,12 @@ def start_nodes_docker(self): node["security_args"]["cafile"] = "/nebula/app/certs/ca_cert.pem" node = json.loads(json.dumps(node).replace("192.168.50.", f"{base}.")) # TODO change this + try: + existing = client.containers.get(name) + logging.warning(f"Container {name} already exists. Deployment may fail or cause conflicts.") + except docker.errors.NotFound: + pass # No conflict, safe to proceed + # Write the config file in config directory with open(f"{self.config_dir}/participant_{node['device_args']['idx']}.json", "w") as f: json.dump(node, f, indent=4) @@ -1259,10 +1289,16 @@ def start_nodes_docker(self): try: client.api.start(container_id) container_ids.append(container_id) + container_names.append(name) except Exception as e: logging.exception(f"Starting participant {name} error: {e}") i += 1 + # Write scenario-level metadata for cleanup + scenario_metadata = {"containers": container_names, "network": network_name} + with open(os.path.join(self.config_dir, "scenario.metadata"), "w") as f: + json.dump(scenario_metadata, f, indent=2) + def start_nodes_process(self): """ Starts participant nodes as independent background processes on the host machine. @@ -1529,3 +1565,94 @@ def scenario_finished(self, timeout_seconds): return False time.sleep(5) + + @staticmethod + def cleanup_scenario_containers(): + """ + Remove all participant containers and the scenario network. + Reads ALL scenario.metadata and removes all listed containers and the network, then deletes the metadata file. + Also forcibly stops and removes any containers still attached to the network before removing it. + """ + import json + import logging + import os + + import docker + + # Try multiple possible config directory locations. This depends on where the user called the function from. + possible_config_dirs = [ + os.environ.get("NEBULA_CONFIG_DIR"), + "/nebula/app/config", + "./app/config", + os.path.join(os.getcwd(), "app", "config"), + os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "app", "config"), + ] + + config_dir = None + for dir_path in possible_config_dirs: + if dir_path and os.path.exists(dir_path): + config_dir = dir_path + break + + if not config_dir: + logging.warning("No valid config directory found, skipping cleanup") + return + + scenario_dirs = [] + logging.info(f"Config directory: {config_dir}") + if os.path.exists(config_dir): + for item in os.listdir(config_dir): + scenario_path = os.path.join(config_dir, item) + if os.path.isdir(scenario_path): + metadata_file = os.path.join(scenario_path, "scenario.metadata") + if os.path.exists(metadata_file): + scenario_dirs.append(scenario_path) + + logging.info(f"Removing scenario containers for {scenario_dirs}") + if not scenario_dirs: + logging.info("No active scenarios found to clean up") + return + + client = docker.from_env() + + for scenario_dir in scenario_dirs: + metadata_path = os.path.join(scenario_dir, "scenario.metadata") + if not os.path.exists(metadata_path): + logging.info(f"Skipping {scenario_dir} - no scenario.metadata found") + continue + + with open(metadata_path) as f: + meta = json.load(f) + + # Remove containers listed in metadata + for name in meta.get("containers", []): + try: + container = client.containers.get(name) + container.remove(force=True) + logging.info(f"Removed scenario container {name}") + except Exception as e: + logging.warning(f"Could not remove scenario container {name}: {e}") + + # Remove network, but first forcibly remove any containers still attached + network_name = meta.get("network") + if network_name: + try: + network = client.networks.get(network_name) + attached_containers = network.attrs.get("Containers") or {} + for container_id in attached_containers: + try: + c = client.containers.get(container_id) + c.remove(force=True) + logging.info(f"Force-removed container {c.name} attached to {network_name}") + except Exception as e: + logging.warning(f"Could not force-remove container {container_id}: {e}") + network.remove() + logging.info(f"Removed scenario network {network_name}") + except Exception as e: + logging.warning(f"Could not remove scenario network {network_name}: {e}") + + # Remove metadata file + try: + os.remove(metadata_path) + except Exception as e: + logging.warning(f"Could not remove scenario.metadata: {e}") diff --git a/nebula/controller/start_services.sh b/nebula/controller/start_services.sh index 98e53411c..fa0eaf031 100644 --- a/nebula/controller/start_services.sh +++ b/nebula/controller/start_services.sh @@ -11,13 +11,7 @@ echo "path $(pwd)" # Start Gunicorn NEBULA_SOCK=nebula.sock -echo "NEBULA_PRODUCTION: $NEBULA_PRODUCTION" -if [ "$NEBULA_PRODUCTION" = "False" ]; then - echo "Starting Gunicorn in dev mode..." - uvicorn nebula.controller.controller:app --host 0.0.0.0 --port $NEBULA_CONTROLLER_PORT --log-level debug --proxy-headers --forwarded-allow-ips "*" & -else - echo "Starting Gunicorn in production mode..." - uvicorn nebula.controller.controller:app --host 0.0.0.0 --port $NEBULA_CONTROLLER_PORT --log-level info --proxy-headers --forwarded-allow-ips "*" & -fi +echo "Starting Gunicorn..." +uvicorn nebula.controller.controller:app --host 0.0.0.0 --port $NEBULA_CONTROLLER_PORT --log-level debug --proxy-headers --forwarded-allow-ips "*" & tail -f /dev/null diff --git a/nebula/core/node.py b/nebula/core/node.py index 86a73cc2a..adc83fe1d 100755 --- a/nebula/core/node.py +++ b/nebula/core/node.py @@ -42,7 +42,6 @@ from nebula.core.models.mnist.mlp import MNISTModelMLP from nebula.core.engine import Engine from nebula.core.training.lightning import Lightning -from nebula.core.training.siamese import Siamese # os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1" # os.environ["TORCH_LOGS"] = "+dynamo" @@ -70,7 +69,6 @@ async def main(config: Config): Raises: ValueError: If an unsupported model, dataset, or device role is specified. - NotImplementedError: If an unsupported training strategy (e.g., "scikit") is requested. Returns: Coroutine that initializes and starts the NEBULA node. @@ -169,10 +167,6 @@ async def main(config: Config): trainer_str = config.participant["training_args"]["trainer"] if trainer_str == "lightning": trainer = Lightning - elif trainer_str == "scikit": - raise NotImplementedError - elif trainer_str == "siamese": - trainer = Siamese else: raise ValueError(f"Trainer {trainer_str} not supported") diff --git a/nebula/core/training/lightning.py b/nebula/core/training/lightning.py index d83975147..edaa7f1c5 100755 --- a/nebula/core/training/lightning.py +++ b/nebula/core/training/lightning.py @@ -152,9 +152,7 @@ def set_datamodule(self, datamodule): self.datamodule = datamodule def create_logger(self): - if self.config.participant["tracking_args"]["local_tracking"] == "csv": - nebulalogger = CSVLogger(f"{self.log_dir}", name="metrics", version=f"participant_{self.idx}") - elif self.config.participant["tracking_args"]["local_tracking"] == "basic": + if self.config.participant["tracking_args"]["local_tracking"] == "default": logger_config = None if self._logger is not None: logger_config = self._logger.get_logger_config() @@ -167,6 +165,8 @@ def create_logger(self): ) # Restore logger configuration nebulalogger.set_logger_config(logger_config) + elif self.config.participant["tracking_args"]["local_tracking"] == "csv": + nebulalogger = CSVLogger(f"{self.log_dir}", name="metrics", version=f"participant_{self.idx}") else: nebulalogger = None diff --git a/nebula/core/training/scikit.py b/nebula/core/training/scikit.py deleted file mode 100755 index 99cb02b9d..000000000 --- a/nebula/core/training/scikit.py +++ /dev/null @@ -1,79 +0,0 @@ -import logging -import pickle -import traceback - -from sklearn.metrics import accuracy_score - - -class Scikit: - def __init__(self, model, data, config=None, logger=None): - self.model = model - self.data = data - self.config = config - self.logger = logger - self.round = 0 - self.epochs = 1 - self.logger.log_data({"Round": self.round}, step=self.logger.global_step) - - def set_model(self, model): - self.model = model - - def get_round(self): - return self.round - - def set_data(self, data): - self.data = data - - def serialize_model(self, params=None): - if params is None: - params = self.model.get_params() - return pickle.dumps(params) - - def deserialize_model(self, data): - try: - params = pickle.loads(data) - return params - except: - raise Exception("Error decoding parameters") - - def set_model_parameters(self, params): - self.model.set_params(**params) - - def get_model_parameters(self): - return self.model.get_params() - - def set_epochs(self, epochs): - self.epochs = epochs - - def fit(self): - try: - X_train, y_train = self.data.train_dataloader() - self.model.fit(X_train, y_train) - except Exception as e: - logging.exception(f"Error with scikit-learn fit. {e}") - logging.exception(traceback.format_exc()) - - def interrupt_fit(self): - pass - - def evaluate(self): - try: - X_test, y_test = self.data.test_dataloader() - y_pred = self.model.predict(X_test) - accuracy = accuracy_score(y_test, y_pred) - logging.info(f"Accuracy: {accuracy}") - except Exception as e: - logging.exception(f"Error with scikit-learn evaluate. {e}") - logging.exception(traceback.format_exc()) - return None - - def get_train_size(self): - return ( - len(self.data.train_dataloader()), - len(self.data.test_dataloader()), - ) - - def finalize_round(self): - self.round += 1 - if self.logger: - self.logger.log_data({"Round": self.round}) diff --git a/nebula/core/training/siamese.py b/nebula/core/training/siamese.py deleted file mode 100755 index 21999c3a5..000000000 --- a/nebula/core/training/siamese.py +++ /dev/null @@ -1,193 +0,0 @@ -import hashlib -import io -import logging -import traceback -from collections import OrderedDict - -import torch -from lightning import Trainer -from lightning.pytorch.callbacks import RichModelSummary, RichProgressBar -from lightning.pytorch.callbacks.progress.rich_progress import RichProgressBarTheme - -from nebula.core.utils.deterministic import enable_deterministic - - -class Siamese: - def __init__(self, model, data, config=None, logger=None): - # self.model = torch.compile(model, mode="reduce-overhead") - self.model = model - self.data = data - self.config = config - self.logger = logger - self.__trainer = None - self.epochs = 1 - logging.getLogger("lightning.pytorch").setLevel(logging.INFO) - self.round = 0 - enable_deterministic(seed=self.config.participant["scenario_args"]["random_seed"]) - self.logger.log_data({"Round": self.round}, step=self.logger.global_step) - - @property - def logger(self): - return self._logger - - def get_round(self): - return self.round - - def set_model(self, model): - self.model = model - - def set_data(self, data): - self.data = data - - def create_trainer(self): - logging.info( - "[Trainer] Creating trainer with accelerator: {}".format( - self.config.participant["device_args"]["accelerator"] - ) - ) - progress_bar = RichProgressBar( - theme=RichProgressBarTheme( - description="green_yellow", - progress_bar="green1", - progress_bar_finished="green1", - progress_bar_pulse="#6206E0", - batch_progress="green_yellow", - time="grey82", - processing_speed="grey82", - metrics="grey82", - ), - leave=True, - ) - if self.config.participant["device_args"]["accelerator"] == "gpu": - # NEBULA uses 2 GPUs (max) to distribute the nodes. - if self.config.participant["device_args"]["devices"] > 1: - # If you have more than 2 GPUs, you should specify which ones to use. - gpu_id = ([1] if self.config.participant["device_args"]["idx"] % 2 == 0 else [2],) - else: - # If there is only one GPU, it will be used. - gpu_id = [1] - - self.__trainer = Trainer( - callbacks=[RichModelSummary(max_depth=1), progress_bar], - max_epochs=self.epochs, - accelerator=self.config.participant["device_args"]["accelerator"], - devices=gpu_id, - logger=self.logger, - log_every_n_steps=50, - enable_checkpointing=False, - enable_model_summary=False, - enable_progress_bar=True, - # deterministic=True - ) - else: - # NEBULA uses only CPU to distribute the nodes - self.__trainer = Trainer( - callbacks=[RichModelSummary(max_depth=1), progress_bar], - max_epochs=self.epochs, - accelerator=self.config.participant["device_args"]["accelerator"], - devices="auto", - logger=self.logger, - log_every_n_steps=50, - enable_checkpointing=False, - enable_model_summary=False, - enable_progress_bar=True, - # deterministic=True - ) - - def get_global_model_parameters(self): - return self.model.get_global_model_parameters() - - def set_parameter_second_aggregation(self, params): - try: - logging.info("Setting parameters in second aggregation...") - self.model.load_state_dict(params) - except: - raise Exception("Error setting parameters") - - def get_model_parameters(self, bytes=False): - if bytes: - return self.serialize_model(self.model.state_dict()) - else: - return self.model.state_dict() - - def get_hash_model(self): - """ - Returns: - str: SHA256 hash of model parameters - """ - return hashlib.sha256(self.serialize_model()).hexdigest() - - def set_epochs(self, epochs): - self.epochs = epochs - - #### - # Model parameters serialization/deserialization - # From https://pytorch.org/docs/stable/notes/serialization.html - #### - def serialize_model(self, model): - try: - buffer = io.BytesIO() - # with gzip.GzipFile(fileobj=buffer, mode='wb') as f: - # torch.save(params, f) - torch.save(model, buffer) - return buffer.getvalue() - except: - raise Exception("Error serializing model") - - def deserialize_model(self, data): - try: - buffer = io.BytesIO(data) - # with gzip.GzipFile(fileobj=buffer, mode='rb') as f: - # params_dict = torch.load(f, map_location='cpu') - params_dict = torch.load(buffer, map_location="cpu") - return OrderedDict(params_dict) - except: - raise Exception("Error decoding parameters") - - def set_model_parameters(self, params, initialize=False): - try: - if initialize: - self.model.load_state_dict(params) - self.model.global_load_state_dict(params) - self.model.historical_load_state_dict(params) - else: - # First aggregation - self.model.global_load_state_dict(params) - except: - raise Exception("Error setting parameters") - - def train(self): - try: - self.create_trainer() - # torch.autograd.set_detect_anomaly(True) - # TODO: It is necessary to train only the local model, save the history of the previous model and then load it, the global model is the aggregation of all the models. - self.__trainer.fit(self.model, self.data) - # Save local model as historical model (previous round) - # It will be compared the next round during training local model (constrantive loss) - # When aggregation in global model (first) and aggregation with similarities and weights (second), the historical model keeps inmutable - logging.info("Saving historical model...") - self.model.save_historical_model() - except Exception as e: - logging.exception(f"Error training model: {e}") - logging.exception(traceback.format_exc()) - - def test(self): - try: - self.create_trainer() - self.__trainer.test(self.model, self.data, verbose=True) - except Exception as e: - logging.exception(f"Error testing model: {e}") - logging.exception(traceback.format_exc()) - - def get_model_weight(self): - return ( - len(self.data.train_dataloader().dataset), - len(self.data.test_dataloader().dataset), - ) - - def finalize_round(self): - self.logger.global_step = self.logger.global_step + self.logger.local_step - self.logger.local_step = 0 - self.round += 1 - self.logger.log_data({"Round": self.round}, step=self.logger.global_step) - pass diff --git a/nebula/frontend/app.py b/nebula/frontend/app.py index 8cb38d9cb..830a87c85 100755 --- a/nebula/frontend/app.py +++ b/nebula/frontend/app.py @@ -30,8 +30,9 @@ class Settings: controller_port (int): Port on which the Nebula controller listens (default: 5050). resources_threshold (float): Threshold for resource usage alerts (default: 0.0). port (int): Port for the Nebula frontend service (default: 6060). - production (bool): Whether the application is running in production mode. - advanced_analytics (bool): Whether advanced analytics features are enabled. + env_tag (str): Tag for the environment (e.g., 'dev', 'prod'). + prefix_tag (str): Tag for the deployment prefix (e.g., 'dev', 'prod'). + user_tag (str): Tag for the user (e.g., 'admin', 'user'). host_platform (str): Underlying host operating platform (e.g., 'unix'). log_dir (str): Directory path where application logs are stored. config_dir (str): Directory path for general configuration files. @@ -49,8 +50,9 @@ class Settings: controller_port: int = os.environ.get("NEBULA_CONTROLLER_PORT", 5050) resources_threshold: float = 0.0 port: int = os.environ.get("NEBULA_FRONTEND_PORT", 6060) - production: bool = os.environ.get("NEBULA_PRODUCTION", "False") == "True" - advanced_analytics: bool = os.environ.get("NEBULA_ADVANCED_ANALYTICS", "False") == "True" + env_tag: str = os.environ.get("NEBULA_ENV_TAG", "dev") + prefix_tag: str = os.environ.get("NEBULA_PREFIX_TAG", "dev") + user_tag: str = os.environ.get("NEBULA_USER_TAG", os.environ.get("USER", "unknown")) host_platform: str = os.environ.get("NEBULA_HOST_PLATFORM", "unix") log_dir: str = os.environ.get("NEBULA_LOGS_DIR") config_dir: str = os.environ.get("NEBULA_CONFIG_DIR") @@ -116,7 +118,8 @@ class Settings: logging.info(f"🚀 Starting Nebula Frontend on port {settings.port}") -logging.info(f"NEBULA_PRODUCTION: {settings.production}") +logging.info(f"NEBULA_PRODUCTION: {settings.env_tag == 'prod'}") +logging.info(f"NEBULA_DEPLOYMENT_PREFIX: {settings.prefix_tag}") if "SECRET_KEY" not in os.environ: logging.info("Generating SECRET_KEY") @@ -289,9 +292,11 @@ def add_global_context(request: Request): Returns: dict[str, bool]: is_production: Flag indicating if the application is running in production mode. + prefix: The prefix of the application. """ return { - "is_production": settings.production, + "is_production": settings.env_tag == "prod", + "prefix": settings.prefix_tag, } @@ -1856,8 +1861,6 @@ async def remove_scenario(scenario_name=None, user=None): user_data = user_data_store[user] - if settings.advanced_analytics: - logging.info("Advanced analytics enabled") # Remove registered nodes and conditions user_data.nodes_registration.pop(scenario_name, None) await remove_nodes_by_scenario_name(scenario_name) @@ -1939,14 +1942,6 @@ async def nebula_remove_scenario(scenario_name: str, session: dict = Depends(get raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) -if settings.advanced_analytics: - logging.info("Advanced analytics enabled") -else: - logging.info("Advanced analytics disabled") - - # TENSORBOARD START - - @app.get("/platform/dashboard/statistics/", response_class=HTMLResponse) @app.get("/platform/dashboard/{scenario_name}/statistics/", response_class=HTMLResponse) async def nebula_dashboard_statistics(request: Request, scenario_name: str = None): diff --git a/nebula/frontend/config/participant.json.example b/nebula/frontend/config/participant.json.example index ca1d1cfd6..0bca1feb5 100755 --- a/nebula/frontend/config/participant.json.example +++ b/nebula/frontend/config/participant.json.example @@ -104,7 +104,7 @@ }, "tracking_args": { "enable_remote_tracking": false, - "local_tracking": "basic", + "local_tracking": "default", "log_dir": "/Users/enrique/Documents/nebula/app/logs", "config_dir": "/Users/enrique/Documents/nebula/app/config", "run_hash": "" diff --git a/nebula/frontend/start_services.sh b/nebula/frontend/start_services.sh index b2d26e8a2..facac33b1 100755 --- a/nebula/frontend/start_services.sh +++ b/nebula/frontend/start_services.sh @@ -27,11 +27,7 @@ else uvicorn app:app --uds /tmp/$NEBULA_SOCK --log-level info --proxy-headers --forwarded-allow-ips "*" & fi -if [ "$NEBULA_ADVANCED_ANALYTICS" = "False" ]; then - echo "Starting Tensorboard analytics" - tensorboard --host 0.0.0.0 --port 8080 --logdir $NEBULA_LOGS_DIR --window_title "NEBULA Statistics" --reload_interval 30 --max_reload_threads 10 --reload_multifile true & -else - echo "Advanced analytics are enabled" -fi +echo "Starting Tensorboard analytics" +tensorboard --host 0.0.0.0 --port 8080 --logdir $NEBULA_LOGS_DIR --window_title "NEBULA Statistics" --reload_interval 30 --max_reload_threads 10 --reload_multifile true & tail -f /dev/null diff --git a/nebula/frontend/static/css/style.css b/nebula/frontend/static/css/style.css index 262732d28..0d2644142 100755 --- a/nebula/frontend/static/css/style.css +++ b/nebula/frontend/static/css/style.css @@ -633,8 +633,8 @@ hr.styled { .container { max-width: 100%; - padding-right: 10px; - padding-left: 10px; + padding-right: 25px; + padding-left: 25px; } } diff --git a/nebula/utils.py b/nebula/utils.py index cfc7a558b..60819ed1a 100644 --- a/nebula/utils.py +++ b/nebula/utils.py @@ -33,6 +33,27 @@ def check_path(cls, base_path, relative_path): raise Exception("Not allowed") return full_path + @classmethod + def update_env_file(cls, env_file, key, value): + """ + Update or add a key-value pair in the .env file. + """ + import re + lines = [] + if os.path.exists(env_file): + with open(env_file, "r") as f: + lines = f.readlines() + key_found = False + for i, line in enumerate(lines): + if re.match(rf"^{key}=.*", line): + lines[i] = f"{key}={value}\n" + key_found = True + break + if not key_found: + lines.append(f"{key}={value}\n") + with open(env_file, "w") as f: + f.writelines(lines) + class SocketUtils: """ @@ -174,98 +195,8 @@ def check_docker_by_prefix(cls, prefix): for container in containers: if container.name.startswith(prefix): return True - - return False - - except docker.errors.APIError: - logging.exception("Error interacting with Docker") - except Exception: - logging.exception("Unexpected error") - - @classmethod - def remove_docker_network(cls, network_name): - """ - Removes a Docker network by name. - - Args: - network_name (str): Name of the Docker network to remove. - - Returns: - None - """ - try: - # Connect to Docker - client = docker.from_env() - - # Get the network by name - network = client.networks.get(network_name) - - # Remove the network - network.remove() - - logging.info(f"Network {network_name} removed successfully.") - except docker.errors.NotFound: - logging.exception(f"Network {network_name} not found.") - except docker.errors.APIError: - logging.exception("Error interacting with Docker") - except Exception: - logging.exception("Unexpected error") - - @classmethod - def remove_docker_networks_by_prefix(cls, prefix): - """ - Removes all Docker networks whose names start with the given prefix. - - Args: - prefix (str): Prefix string to match network names. - Returns: - None - """ - try: - # Connect to Docker - client = docker.from_env() - - # List all networks - networks = client.networks.list() - - # Filter and remove networks with names starting with the prefix - for network in networks: - if network.name.startswith(prefix): - network.remove() - logging.info(f"Network {network.name} removed successfully.") - - except docker.errors.NotFound: - logging.info(f"One or more networks with prefix {prefix} not found.") - except docker.errors.APIError: - logging.info("Error interacting with Docker") - except Exception: - logging.info("Unexpected error") - - @classmethod - def remove_containers_by_prefix(cls, prefix): - """ - Removes all Docker containers whose names start with the given prefix. - Containers are forcibly removed even if they are running. - - Args: - prefix (str): Prefix string to match container names. - - Returns: - None - """ - try: - # Connect to Docker client - client = docker.from_env() - - containers = client.containers.list(all=True) # `all=True` to include stopped containers - - # Iterate through containers and remove those with the matching prefix - for container in containers: - if container.name.startswith(prefix): - logging.info(f"Removing container: {container.name}") - container.remove(force=True) # force=True to stop and remove if running - logging.info(f"Container {container.name} removed successfully.") + return False except docker.errors.APIError: logging.exception("Error interacting with Docker")