diff --git a/nvflare/tool/deploy/deploy_commands.py b/nvflare/tool/deploy/deploy_commands.py index a2f01fc77f..b6cee50863 100644 --- a/nvflare/tool/deploy/deploy_commands.py +++ b/nvflare/tool/deploy/deploy_commands.py @@ -32,7 +32,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.x509.oid import NameOID -from nvflare.tool.cli_output import output_error_message, output_ok +from nvflare.tool.cli_output import output_error_message, output_ok, print_human RUNTIME_DOCKER = "docker" RUNTIME_K8S = "k8s" @@ -65,12 +65,26 @@ PASSTHROUGH_RESOURCE_MANAGER = ( "nvflare.app_common.resource_managers.passthrough_resource_manager.PassthroughResourceManager" ) +GPU_RESOURCE_MANAGER = "nvflare.app_common.resource_managers.gpu_resource_manager.GPUResourceManager" DOCKER_CLIENT_LAUNCHER = "nvflare.app_opt.job_launcher.docker_launcher.ClientDockerJobLauncher" DOCKER_SERVER_LAUNCHER = "nvflare.app_opt.job_launcher.docker_launcher.ServerDockerJobLauncher" K8S_CLIENT_LAUNCHER = "nvflare.app_opt.job_launcher.k8s_launcher.ClientK8sJobLauncher" K8S_SERVER_LAUNCHER = "nvflare.app_opt.job_launcher.k8s_launcher.ServerK8sJobLauncher" +PROCESS_CLIENT_LAUNCHER = "nvflare.app_common.job_launcher.client_process_launcher.ClientProcessJobLauncher" +PROCESS_SERVER_LAUNCHER = "nvflare.app_common.job_launcher.server_process_launcher.ServerProcessJobLauncher" +GPU_RESOURCE_CONSUMER = "nvflare.app_common.resource_consumers.gpu_resource_consumer.GPUResourceConsumer" LAUNCHER_IDS = {"process_launcher", "docker_launcher", "k8s_launcher"} +BUILTIN_LAUNCHER_PATHS = { + DOCKER_CLIENT_LAUNCHER, + DOCKER_SERVER_LAUNCHER, + K8S_CLIENT_LAUNCHER, + K8S_SERVER_LAUNCHER, + PROCESS_CLIENT_LAUNCHER, + PROCESS_SERVER_LAUNCHER, +} +BUILTIN_RESOURCE_MANAGER_PATHS = {GPU_RESOURCE_MANAGER, PASSTHROUGH_RESOURCE_MANAGER} +BUILTIN_RESOURCE_CONSUMER_PATHS = {GPU_RESOURCE_CONSUMER} RESOURCE_CONSUMER_IDS = {"resource_consumer"} DOCKER_RESERVED_KWARGS = { "volumes", @@ -228,6 +242,8 @@ def 
def _warn_for_replaced_components(components: list[dict[str, Any]], launcher_id: str, launcher_path: str) -> None:
    """Warn about existing components whose configuration will not survive prepare.

    The check is read-only: ``components`` is inspected but never modified.

    Args:
        components: component entries from ``resources.json.default``.
        launcher_id: id of the launcher component that prepare generates.
        launcher_path: class path of the launcher component that prepare generates.
    """
    for component in components:
        component_id = component.get("id")
        if component_id in RESOURCE_CONSUMER_IDS and _component_has_custom_config(
            component, BUILTIN_RESOURCE_CONSUMER_PATHS
        ):
            _warn(
                f"deploy prepare removes component '{component_id}' from resources.json.default; "
                "existing resource consumer configuration will not be used by the prepared runtime."
            )
        elif component_id == "resource_manager" and _component_has_custom_config(
            component, BUILTIN_RESOURCE_MANAGER_PATHS
        ):
            _warn(
                "deploy prepare replaces component 'resource_manager' with PassthroughResourceManager; "
                "existing resource manager path/args will not be used by the prepared runtime."
            )
        elif component_id in LAUNCHER_IDS and _launcher_replacement_discards_config(
            component, launcher_id, launcher_path
        ):
            _warn(
                f"deploy prepare replaces component '{component_id}' with generated '{launcher_id}' configuration; "
                "existing launcher path/args will not be used by the prepared runtime."
            )


def _component_has_custom_config(component: dict[str, Any], builtin_paths: set[str]) -> bool:
    """Return True when ``component`` carries non-empty args or a non-builtin path.

    Args:
        component: a single component entry (expects optional ``args`` and ``path`` keys).
        builtin_paths: class paths that count as stock configuration.
    """
    # Empty args are the canonical "no arguments" shape in generated resources files.
    if component.get("args"):
        return True
    path = component.get("path")
    if not path:
        return False
    if not isinstance(path, str):
        # A non-string path (e.g. a JSON list/dict) can never be a builtin class path, and
        # unhashable values would raise TypeError in the set lookup below. This helper only
        # decides whether to warn, so report it as custom instead of aborting prepare.
        return True
    return path not in builtin_paths


def _launcher_replacement_discards_config(component: dict[str, Any], launcher_id: str, launcher_path: str) -> bool:
    """Return True when replacing launcher ``component`` drops user-visible configuration.

    A launcher counts as customized when it has non-empty args, when it shares the generated
    launcher's id but points at a different path, or when its path is not one of the builtin
    launcher paths.

    Args:
        component: an existing launcher component entry.
        launcher_id: id of the launcher component that prepare generates.
        launcher_path: class path of the launcher component that prepare generates.
    """
    if component.get("args"):
        return True
    path = component.get("path")
    if not path:
        return False
    if not isinstance(path, str):
        # Same reasoning as _component_has_custom_config: never builtin, and possibly unhashable.
        return True
    if component.get("id") == launcher_id and path != launcher_path:
        return True
    return path not in BUILTIN_LAUNCHER_PATHS


def _warn(message: str) -> None:
    """Emit ``message`` through ``print_human`` with a ``Warning: `` prefix."""
    print_human(f"Warning: {message}")
def _add_server_storage(resources_path, snapshot_persistor=None):
    """Add server storage configuration to a kit's resources file, in place.

    Sets the top-level ``snapshot_persistor`` entry and appends a ``job_manager`` component,
    then rewrites the file with ``_write_json``.

    Args:
        resources_path: path object of the resources JSON file to update.
        snapshot_persistor: persistor entry to store. Only ``None`` selects the default
            filesystem-backed persistor; any other value, including an empty dict, is
            stored as given so tests can inject unusual shapes.
    """
    resources = json.loads(resources_path.read_text())
    if snapshot_persistor is None:
        # Compare against None rather than truthiness so a deliberately empty override
        # is not silently swapped for the default.
        snapshot_persistor = {
            "path": "nvflare.app_common.state_persistors.storage_state_persistor.StorageStatePersistor",
            "args": {
                "uri_root": "/",
                "storage": {
                    "path": "nvflare.app_common.storages.filesystem_storage.FilesystemStorage",
                    "args": {"root_dir": "/tmp/nvflare/snapshot-storage", "uri_root": "/"},
                },
            },
        }
    resources["snapshot_persistor"] = snapshot_persistor
    resources["components"].append(
        {
            "id": "job_manager",
            "path": "nvflare.apis.impl.job_def_manager.SimpleJobDefManager",
            "args": {"uri_root": "/tmp/nvflare/jobs-storage", "job_store_id": "job_store"},
        }
    )
    _write_json(resources_path, resources)
def test_prepare_docker_server_relocates_storage_to_mounted_workspace(tmp_path, capsys):
    """Docker server prepare points snapshot and job storage at the workspace paths, without warning."""
    server_kit = _make_server_kit(tmp_path)
    _add_server_storage(server_kit / "local" / "resources.json.default")
    prepared_dir = tmp_path / "server-docker"
    config = {
        "runtime": "docker",
        "parent": {"docker_image": "repo/nvflare:dev"},
    }

    _run_prepare(server_kit, prepared_dir, config)
    streams = capsys.readouterr()

    assert "snapshot_persistor is present" not in streams.out + streams.err
    prepared_resources_file = prepared_dir / "local" / "resources.json.default"
    resources = json.loads(prepared_resources_file.read_text())
    storage_args = resources["snapshot_persistor"]["args"]["storage"]["args"]
    assert storage_args["root_dir"] == "/var/tmp/nvflare/workspace/snapshot-storage"
    job_manager = _component(resources, "job_manager")
    assert job_manager["args"]["uri_root"] == "/var/tmp/nvflare/workspace/jobs-storage"


@pytest.mark.parametrize("runtime", ["docker", "k8s"])
def test_prepare_server_without_snapshot_persistor_is_silent(tmp_path, capsys, runtime):
    """No snapshot_persistor warning is printed for a server kit prepared without added storage config."""
    server_kit = _make_server_kit(tmp_path)
    prepared_dir = tmp_path / f"server-{runtime}"
    config = {
        "runtime": runtime,
        "parent": {"docker_image": "repo/nvflare:dev"},
    }

    _run_prepare(server_kit, prepared_dir, config)
    streams = capsys.readouterr()

    printed = streams.out + streams.err
    assert "snapshot_persistor is present" not in printed


@pytest.mark.parametrize("runtime", ["docker", "k8s"])
def test_prepare_server_warns_when_snapshot_persistor_shape_is_unexpected(tmp_path, capsys, runtime):
    """A persistor whose storage has no args triggers the warning; job storage is still relocated."""
    server_kit = _make_server_kit(tmp_path)
    persistor_without_storage_args = {
        "path": "custom.SnapshotPersistor",
        "args": {
            "storage": {
                "path": "custom.Storage",
            }
        },
    }
    _add_server_storage(
        server_kit / "local" / "resources.json.default",
        snapshot_persistor=persistor_without_storage_args,
    )
    prepared_dir = tmp_path / f"server-{runtime}"
    config = {
        "runtime": runtime,
        "parent": {"docker_image": "repo/nvflare:dev"},
    }

    _run_prepare(server_kit, prepared_dir, config)
    streams = capsys.readouterr()

    printed = streams.out + streams.err
    assert "snapshot_persistor is present" in printed
    assert "snapshot_persistor.args.storage.args.root_dir" in printed
    resources = json.loads((prepared_dir / "local" / "resources.json.default").read_text())
    # The storage entry must be left without an "args" key rather than having one created.
    storage_entry = resources["snapshot_persistor"]["args"]["storage"]
    assert "args" not in storage_entry
    job_manager = _component(resources, "job_manager")
    assert job_manager["args"]["uri_root"] == "/var/tmp/nvflare/workspace/jobs-storage"


def test_prepare_warns_when_replacing_custom_resource_and_launcher_config(tmp_path, capsys):
    """Each customized resource manager, consumer and launcher yields its own warning."""
    client_kit = _make_client_kit(tmp_path)
    resources_path = client_kit / "local" / "resources.json.default"
    resources = json.loads(resources_path.read_text())
    custom_resource_manager = {
        "id": "resource_manager",
        "path": "nvflare.app_common.resource_managers.list_resource_manager.ListResourceManager",
        "args": {"resources": [{"gpu": 1}]},
    }
    custom_resource_consumer = {
        "id": "resource_consumer",
        "path": "custom.AuditResourceConsumer",
        "args": {},
    }
    custom_process_launcher = {
        "id": "process_launcher",
        "path": "custom.ProcessLauncher",
        "args": {},
    }
    custom_k8s_launcher = {
        "id": "k8s_launcher",
        "path": "custom.K8sLauncher",
        "args": {"timeout": 99},
    }
    resources["components"] = [
        custom_resource_manager,
        custom_resource_consumer,
        custom_process_launcher,
        custom_k8s_launcher,
    ]
    _write_json(resources_path, resources)
    prepared_dir = tmp_path / "site-1-k8s"
    config = {
        "runtime": "k8s",
        "parent": {"docker_image": "repo/nvflare:dev"},
    }

    _run_prepare(client_kit, prepared_dir, config)
    streams = capsys.readouterr()

    printed = streams.out + streams.err
    expected_fragments = (
        "replaces component 'resource_manager'",
        "removes component 'resource_consumer'",
        "replaces component 'process_launcher'",
        "replaces component 'k8s_launcher'",
    )
    for fragment in expected_fragments:
        assert fragment in printed


def test_prepare_does_not_warn_for_default_components_with_empty_args(tmp_path, capsys):
    """Stock component paths with empty args produce no warning output."""
    client_kit = _make_client_kit(tmp_path)
    resources_path = client_kit / "local" / "resources.json.default"
    resources = json.loads(resources_path.read_text())
    stock_components = (
        ("resource_manager", GPU_RESOURCE_MANAGER),
        ("resource_consumer", GPU_RESOURCE_CONSUMER),
        ("process_launcher", PROCESS_CLIENT_LAUNCHER),
    )
    resources["components"] = [
        {"id": component_id, "path": component_path, "args": {}}
        for component_id, component_path in stock_components
    ]
    _write_json(resources_path, resources)
    prepared_dir = tmp_path / "site-1-k8s"
    config = {
        "runtime": "k8s",
        "parent": {"docker_image": "repo/nvflare:dev"},
    }

    _run_prepare(client_kit, prepared_dir, config)
    streams = capsys.readouterr()

    printed = streams.out + streams.err
    assert "Warning:" not in printed