Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 79 additions & 7 deletions nvflare/tool/deploy/deploy_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID

from nvflare.tool.cli_output import output_error_message, output_ok
from nvflare.tool.cli_output import output_error_message, output_ok, print_human

RUNTIME_DOCKER = "docker"
RUNTIME_K8S = "k8s"
Expand Down Expand Up @@ -65,12 +65,26 @@
PASSTHROUGH_RESOURCE_MANAGER = (
"nvflare.app_common.resource_managers.passthrough_resource_manager.PassthroughResourceManager"
)
GPU_RESOURCE_MANAGER = "nvflare.app_common.resource_managers.gpu_resource_manager.GPUResourceManager"
DOCKER_CLIENT_LAUNCHER = "nvflare.app_opt.job_launcher.docker_launcher.ClientDockerJobLauncher"
DOCKER_SERVER_LAUNCHER = "nvflare.app_opt.job_launcher.docker_launcher.ServerDockerJobLauncher"
K8S_CLIENT_LAUNCHER = "nvflare.app_opt.job_launcher.k8s_launcher.ClientK8sJobLauncher"
K8S_SERVER_LAUNCHER = "nvflare.app_opt.job_launcher.k8s_launcher.ServerK8sJobLauncher"
PROCESS_CLIENT_LAUNCHER = "nvflare.app_common.job_launcher.client_process_launcher.ClientProcessJobLauncher"
PROCESS_SERVER_LAUNCHER = "nvflare.app_common.job_launcher.server_process_launcher.ServerProcessJobLauncher"
GPU_RESOURCE_CONSUMER = "nvflare.app_common.resource_consumers.gpu_resource_consumer.GPUResourceConsumer"

LAUNCHER_IDS = {"process_launcher", "docker_launcher", "k8s_launcher"}
BUILTIN_LAUNCHER_PATHS = {
DOCKER_CLIENT_LAUNCHER,
DOCKER_SERVER_LAUNCHER,
K8S_CLIENT_LAUNCHER,
K8S_SERVER_LAUNCHER,
PROCESS_CLIENT_LAUNCHER,
PROCESS_SERVER_LAUNCHER,
}
BUILTIN_RESOURCE_MANAGER_PATHS = {GPU_RESOURCE_MANAGER, PASSTHROUGH_RESOURCE_MANAGER}
BUILTIN_RESOURCE_CONSUMER_PATHS = {GPU_RESOURCE_CONSUMER}
RESOURCE_CONSUMER_IDS = {"resource_consumer"}
DOCKER_RESERVED_KWARGS = {
"volumes",
Expand Down Expand Up @@ -228,6 +242,8 @@ def _prepare_docker(kit_info: KitInfo, final_output: Path, config: dict[str, Any
"default_job_env": job_launcher.get("default_job_env") or {},
}
_patch_resources(kit_info.kit_dir, "docker_launcher", launcher_path, launcher_args)
if kit_info.role == ROLE_SERVER:
_relocate_server_storage_to_workspace(kit_info.kit_dir, WORKSPACE_MOUNT_PATH)
_patch_comm_config_for_docker(kit_info.kit_dir)
_ensure_study_data_template(kit_info.kit_dir)
_remove_start_scripts(kit_info.kit_dir, keep={DOCKER_START_SH})
Expand Down Expand Up @@ -468,6 +484,7 @@ def _patch_resources(kit_dir: Path, launcher_id: str, launcher_path: str, launch
if not isinstance(components, list):
_fail("INVALID_KIT", "resources.json.default components must be a list.", "Fix the startup kit resources file.")

_warn_for_replaced_components(components, launcher_id, launcher_path)
components[:] = [
c for c in components if c.get("id") not in LAUNCHER_IDS and c.get("id") not in RESOURCE_CONSUMER_IDS
]
Expand All @@ -483,6 +500,56 @@ def _patch_resources(kit_dir: Path, launcher_id: str, launcher_path: str, launch
_write_resources(local_dir, resources)


def _warn_for_replaced_components(components: list[dict[str, Any]], launcher_id: str, launcher_path: str) -> None:
    """Warn about user-supplied component configuration that deploy prepare will drop or replace.

    The component ids checked here (resource consumer, resource manager,
    launchers) are mutually exclusive, so each entry triggers at most one warning.
    """
    for entry in components:
        entry_id = entry.get("id")
        if entry_id in RESOURCE_CONSUMER_IDS:
            if _component_has_custom_config(entry, BUILTIN_RESOURCE_CONSUMER_PATHS):
                _warn(
                    f"deploy prepare removes component '{entry_id}' from resources.json.default; "
                    "existing resource consumer configuration will not be used by the prepared runtime."
                )
        elif entry_id == "resource_manager":
            if _component_has_custom_config(entry, BUILTIN_RESOURCE_MANAGER_PATHS):
                _warn(
                    "deploy prepare replaces component 'resource_manager' with PassthroughResourceManager; "
                    "existing resource manager path/args will not be used by the prepared runtime."
                )
        elif entry_id in LAUNCHER_IDS:
            if _launcher_replacement_discards_config(entry, launcher_id, launcher_path):
                _warn(
                    f"deploy prepare replaces component '{entry_id}' with generated '{launcher_id}' configuration; "
                    "existing launcher path/args will not be used by the prepared runtime."
                )

Comment thread
YuanTingHsieh marked this conversation as resolved.

def _component_has_custom_config(component: dict[str, Any], builtin_paths: set[str]) -> bool:
args = component.get("args")
# Empty args are the canonical "no arguments" shape in generated resources files.
if args:
return True
path = component.get("path")
return bool(path) and path not in builtin_paths


def _launcher_replacement_discards_config(component: dict[str, Any], launcher_id: str, launcher_path: str) -> bool:
if component.get("args"):
return True
path = component.get("path")
if not path:
return False
if component.get("id") == launcher_id and path != launcher_path:
return True
return path not in BUILTIN_LAUNCHER_PATHS


def _warn(message: str) -> None:
    """Surface a non-fatal deploy-prepare warning to the user via the CLI output channel."""
    print_human("Warning: " + message)


def _write_resources(local_dir: Path, resources: dict[str, Any]) -> None:
    """Serialize *resources* to the default resources file with a trailing newline."""
    target = local_dir / RESOURCES_JSON_DEFAULT
    serialized = json.dumps(resources, indent=4)
    target.write_text(f"{serialized}\n", encoding="utf-8")
Expand Down Expand Up @@ -874,12 +941,17 @@ def _helm_src(role: str, filename: str) -> Path:
def _relocate_server_storage_to_workspace(kit_dir: Path, workspace_mount_path: str) -> None:
local_dir = kit_dir / "local"
resources = _load_json_file(local_dir / RESOURCES_JSON_DEFAULT, RESOURCES_JSON_DEFAULT)
try:
resources["snapshot_persistor"]["args"]["storage"]["args"][
"root_dir"
] = f"{workspace_mount_path}/snapshot-storage"
except KeyError:
pass
if "snapshot_persistor" in resources:
try:
resources["snapshot_persistor"]["args"]["storage"]["args"][
"root_dir"
] = f"{workspace_mount_path}/snapshot-storage"
except (KeyError, TypeError):
_warn(
"snapshot_persistor is present, but deploy prepare could not relocate snapshot storage to the "
f"workspace at {workspace_mount_path}/snapshot-storage. Expected nested key: "
"snapshot_persistor.args.storage.args.root_dir."
)
for component in resources.get("components", []):
if component.get("id") == "job_manager":
component.setdefault("args", {})["uri_root"] = f"{workspace_mount_path}/jobs-storage"
Expand Down
172 changes: 172 additions & 0 deletions tests/unit_test/tool/deploy/deploy_commands_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
from cryptography.x509.oid import NameOID

from nvflare.tool.deploy.deploy_commands import (
GPU_RESOURCE_CONSUMER,
GPU_RESOURCE_MANAGER,
HELM_RELEASE_NAME_MAX_LENGTH,
K8S_PARENT_PYTHON_PATH,
PROCESS_CLIENT_LAUNCHER,
_k8s_release_name,
prepare_deployment,
)
Expand Down Expand Up @@ -180,6 +183,28 @@ def _component(resources, component_id):
return next(c for c in resources["components"] if c["id"] == component_id)


def _add_server_storage(resources_path, snapshot_persistor=None):
    """Inject server storage config (snapshot persistor + job_manager component) into a resources file.

    When *snapshot_persistor* is None, a filesystem-backed default persistor is used.
    """
    default_persistor = {
        "path": "nvflare.app_common.state_persistors.storage_state_persistor.StorageStatePersistor",
        "args": {
            "uri_root": "/",
            "storage": {
                "path": "nvflare.app_common.storages.filesystem_storage.FilesystemStorage",
                "args": {"root_dir": "/tmp/nvflare/snapshot-storage", "uri_root": "/"},
            },
        },
    }
    job_manager_component = {
        "id": "job_manager",
        "path": "nvflare.apis.impl.job_def_manager.SimpleJobDefManager",
        "args": {"uri_root": "/tmp/nvflare/jobs-storage", "job_store_id": "job_store"},
    }
    resources = json.loads(resources_path.read_text())
    resources["snapshot_persistor"] = snapshot_persistor or default_persistor
    resources["components"].append(job_manager_component)
    _write_json(resources_path, resources)


def test_prepare_docker_client_copies_and_patches_runtime_files(tmp_path, capsys):
kit = _make_client_kit(tmp_path)
output = tmp_path / "site-1-docker"
Expand Down Expand Up @@ -286,6 +311,30 @@ def test_prepare_docker_server_adds_logical_server_network_alias(tmp_path, capsy
assert "--network-alias server" in script


def test_prepare_docker_server_relocates_storage_to_mounted_workspace(tmp_path, capsys):
    """Docker server prepare rewrites snapshot and job storage roots to the mounted workspace."""
    kit = _make_server_kit(tmp_path)
    _add_server_storage(kit / "local" / "resources.json.default")
    target = tmp_path / "server-docker"

    _run_prepare(
        kit,
        target,
        {"runtime": "docker", "parent": {"docker_image": "repo/nvflare:dev"}},
    )
    captured = capsys.readouterr()
    combined = captured.out + captured.err

    # A well-formed persistor must be relocated silently.
    assert "snapshot_persistor is present" not in combined
    resources = json.loads((target / "local" / "resources.json.default").read_text())
    storage_args = resources["snapshot_persistor"]["args"]["storage"]["args"]
    assert storage_args["root_dir"] == "/var/tmp/nvflare/workspace/snapshot-storage"
    assert _component(resources, "job_manager")["args"]["uri_root"] == "/var/tmp/nvflare/workspace/jobs-storage"


@pytest.mark.parametrize(
"admin_port, expected_admin_port",
[
Expand Down Expand Up @@ -376,6 +425,58 @@ def test_prepare_k8s_launcher_default_python_path_matches_parent_default(tmp_pat
assert values["command"] == [K8S_PARENT_PYTHON_PATH]


@pytest.mark.parametrize("runtime", ["docker", "k8s"])
def test_prepare_server_without_snapshot_persistor_is_silent(tmp_path, capsys, runtime):
    """No relocation warning is printed when the kit has no snapshot_persistor entry."""
    kit = _make_server_kit(tmp_path)
    config = {"runtime": runtime, "parent": {"docker_image": "repo/nvflare:dev"}}

    _run_prepare(kit, tmp_path / f"server-{runtime}", config)
    captured = capsys.readouterr()

    assert "snapshot_persistor is present" not in captured.out + captured.err


@pytest.mark.parametrize("runtime", ["docker", "k8s"])
def test_prepare_server_warns_when_snapshot_persistor_shape_is_unexpected(tmp_path, capsys, runtime):
    """A persistor missing the nested root_dir key warns instead of crashing or mutating it."""
    kit = _make_server_kit(tmp_path)
    malformed_persistor = {
        "path": "custom.SnapshotPersistor",
        "args": {
            "storage": {
                "path": "custom.Storage",
            }
        },
    }
    _add_server_storage(kit / "local" / "resources.json.default", snapshot_persistor=malformed_persistor)
    target = tmp_path / f"server-{runtime}"

    _run_prepare(
        kit,
        target,
        {"runtime": runtime, "parent": {"docker_image": "repo/nvflare:dev"}},
    )
    captured = capsys.readouterr()
    combined = captured.out + captured.err

    assert "snapshot_persistor is present" in combined
    assert "snapshot_persistor.args.storage.args.root_dir" in combined
    resources = json.loads((target / "local" / "resources.json.default").read_text())
    # The malformed persistor must be left untouched (no args injected)...
    assert "args" not in resources["snapshot_persistor"]["args"]["storage"]
    # ...while the job_manager relocation still happens.
    assert _component(resources, "job_manager")["args"]["uri_root"] == "/var/tmp/nvflare/workspace/jobs-storage"


def test_prepare_docker_reads_org_from_cert_without_sub_start(tmp_path, capsys):
kit = _make_client_kit(tmp_path)
(kit / "startup" / "sub_start.sh").unlink()
Expand Down Expand Up @@ -572,6 +673,77 @@ def test_prepare_k8s_rejects_invalid_namespace(tmp_path, capsys, namespace):
assert not output.exists()


def test_prepare_warns_when_replacing_custom_resource_and_launcher_config(tmp_path, capsys):
    """Each custom resource manager/consumer and launcher entry gets its own replacement warning."""
    kit = _make_client_kit(tmp_path)
    resources_path = kit / "local" / "resources.json.default"
    resources = json.loads(resources_path.read_text())
    custom_components = [
        {
            "id": "resource_manager",
            "path": "nvflare.app_common.resource_managers.list_resource_manager.ListResourceManager",
            "args": {"resources": [{"gpu": 1}]},
        },
        {
            "id": "resource_consumer",
            "path": "custom.AuditResourceConsumer",
            "args": {},
        },
        {
            "id": "process_launcher",
            "path": "custom.ProcessLauncher",
            "args": {},
        },
        {
            "id": "k8s_launcher",
            "path": "custom.K8sLauncher",
            "args": {"timeout": 99},
        },
    ]
    resources["components"] = custom_components
    _write_json(resources_path, resources)

    _run_prepare(
        kit,
        tmp_path / "site-1-k8s",
        {"runtime": "k8s", "parent": {"docker_image": "repo/nvflare:dev"}},
    )
    captured = capsys.readouterr()
    combined = captured.out + captured.err

    for expected_fragment in (
        "replaces component 'resource_manager'",
        "removes component 'resource_consumer'",
        "replaces component 'process_launcher'",
        "replaces component 'k8s_launcher'",
    ):
        assert expected_fragment in combined


def test_prepare_does_not_warn_for_default_components_with_empty_args(tmp_path, capsys):
    """Builtin component paths with empty args are recognized as defaults and stay silent."""
    kit = _make_client_kit(tmp_path)
    resources_path = kit / "local" / "resources.json.default"
    resources = json.loads(resources_path.read_text())
    resources["components"] = [
        {"id": "resource_manager", "path": GPU_RESOURCE_MANAGER, "args": {}},
        {"id": "resource_consumer", "path": GPU_RESOURCE_CONSUMER, "args": {}},
        {"id": "process_launcher", "path": PROCESS_CLIENT_LAUNCHER, "args": {}},
    ]
    _write_json(resources_path, resources)

    _run_prepare(
        kit,
        tmp_path / "site-1-k8s",
        {"runtime": "k8s", "parent": {"docker_image": "repo/nvflare:dev"}},
    )
    captured = capsys.readouterr()

    assert "Warning:" not in captured.out + captured.err


def test_prepare_k8s_client_sanitizes_service_name_without_changing_site_identity(tmp_path, capsys):
kit = _make_client_kit(tmp_path, name="Site_1")
output = tmp_path / "site-1-k8s"
Expand Down
Loading