Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/design/docker_job_launcher_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ default:
mode: ro
```

At launch time, `DockerJobLauncher` looks up the study name and bind-mounts each configured dataset into the SJ/CJ container at `/data/<study>/<dataset>`. In Docker mode, `source` is the host path passed to Docker and `mode` must be `ro` or `rw`. If the file doesn't exist or the study has no entry, no data volume is added.
At launch time, `DockerJobLauncher` looks up the study name and bind-mounts each configured dataset into the SJ/CJ container at `/data/<study>/<dataset>`. In Docker mode, `source` is the host path passed to Docker and `mode` must be `ro` or `rw`. If the file doesn't exist or the study has no entry, no data volume is added and the launcher logs a warning.

This YAML schema replaces the legacy flat `study -> path` map. A stale flat-format file now fails validation instead of being ignored. If a configured dataset host `source` path does not exist, Docker reports the bind-mount failure when the job container is created.

Expand Down Expand Up @@ -459,7 +459,7 @@ study_a:
mode: ro
```

Top-level keys are study names from `meta.json`; nested keys are dataset names. Each dataset entry defines the host `source` path and mount `mode`. Dataset names appear in the container path as `/data/<study>/<dataset>`. If the file is absent or the job's study has no entry, no data volume is added and the job runs without a data mount. Legacy flat `study -> path` entries are invalid in this schema.
Top-level keys are study names from `meta.json`; nested keys are dataset names. Each dataset entry defines the host `source` path and mount `mode`. Dataset names appear in the container path as `/data/<study>/<dataset>`. If the file is absent or the job's study has no entry, no data volume is added, the launcher logs a warning, and the job runs without a data mount. Legacy flat `study -> path` entries are invalid in this schema.

### Step 5 — Submit a job

Expand Down
6 changes: 3 additions & 3 deletions docs/design/job_launcher_and_job_handle.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ Constructor parameters:
|-----------|---------|---------|
| `config_file_path` | required | Path to kubeconfig. Loaded lazily on first `launch_job`. |
| `workspace_pvc` | required | PVC claim name for workspace volume. |
| `study_data_pvc_file_path` | required | YAML file mapping study/dataset names to PVC claim names. Validated lazily; missing study entries skip data PVC mounts. |
| `study_data_pvc_file_path` | required | YAML file mapping study/dataset names to PVC claim names. Validated lazily; missing study entries skip data PVC mounts and log a warning. |
| `timeout` | `None` | Wall-clock seconds for `enter_states([RUNNING])`; also `_max_stuck_count`. |
| `namespace` | `"default"` | Kubernetes namespace. |
| `pending_timeout` | `120` | Stuck-detection threshold (poll iterations) when `timeout` is `None`. |
Expand All @@ -367,7 +367,7 @@ Launch sequence:
| 0 | Lazy init: load kubeconfig and create `CoreV1Api`. |
| 1 | Sanitize job ID via `uuid4_to_rfc1123`. Extract `site_name`, `job_image` from `get_job_launcher_spec(job_meta, site_name, "k8s")`. Raise if `WORKSPACE_OBJECT` missing. |
| 2 | Read `JOB_PROCESS_ARGS`; raise if absent or `EXE_MODULE` missing. Resolve dataset PVC mounts from `study_data_pvc_file_path` when the YAML file contains entries for the job study. |
| 3 | Build `job_config`: name, image, args from `get_module_args()`. Use `launcher_spec[site][k8s].python_path` for the pod command when present, falling back to `default_python_path`. Mount the job workspace at `workspace_mount_path`, mount the startup-kit Secret at `<workspace_mount_path>/startup`, and set custom-code `PYTHONPATH` under `workspace_mount_path`. Add the workspace `emptyDir.sizeLimit` and `resources.requests/limits["ephemeral-storage"]` from `launcher_spec[site][k8s].ephemeral_storage` when present, falling back to the launcher default. Add K8s `resources.limits` from `launcher_spec` `num_of_gpus`, `cpu`, and `memory` when present; `num_of_gpus` falls back to flat `resource_spec[site]` for backward compatibility. Missing study entries skip data PVC mounts. |
| 3 | Build `job_config`: name, image, args from `get_module_args()`. Use `launcher_spec[site][k8s].python_path` for the pod command when present, falling back to `default_python_path`. Mount the job workspace at `workspace_mount_path`, mount the startup-kit Secret at `<workspace_mount_path>/startup`, and set custom-code `PYTHONPATH` under `workspace_mount_path`. Add the workspace `emptyDir.sizeLimit` and `resources.requests/limits["ephemeral-storage"]` from `launcher_spec[site][k8s].ephemeral_storage` when present, falling back to the launcher default. Add K8s `resources.limits` from `launcher_spec` `num_of_gpus`, `cpu`, and `memory` when present; `num_of_gpus` falls back to flat `resource_spec[site]` for backward compatibility. Missing study entries skip data PVC mounts and log a warning. |
| 4 | Create `K8sJobHandle`. |
| 5 | `core_v1.create_namespaced_pod()`. On any exception: set `terminal_state = TERMINATED`, return handle. |
| 6 | `job_handle.enter_states([RUNNING])`. On any `BaseException`: `terminate()` then re-raise. |
Expand Down Expand Up @@ -544,7 +544,7 @@ See [docker_job_launcher_design.md](docker_job_launcher_design.md) for the full
}
```

The `study_data_pvc_file_path` YAML maps study and dataset names to PVC claim names. Missing study entries mean no data PVC is mounted:
The `study_data_pvc_file_path` YAML maps study and dataset names to PVC claim names. Missing study entries mean no data PVC is mounted, and the launcher logs a warning:

```yaml
default:
Expand Down
4 changes: 2 additions & 2 deletions nvflare/app_opt/job_launcher/docker_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,8 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
study_data_file = os.path.join(self.WORKSPACE_MOUNT, self.STUDY_DATA_PATH_FILE)
study = job_meta.get(JobMetaKey.STUDY.value)
if should_mount_study_data(study):
study_data_map = load_study_data_file(study_data_file)
data_mounts = resolve_study_dataset_mounts(study_data_map, study, study_data_file)
study_data_map = load_study_data_file(study_data_file, logger=self.logger)
data_mounts = resolve_study_dataset_mounts(study_data_map, study, study_data_file, logger=self.logger)
for dataset_mount in data_mounts:
self.logger.info(
"mounting study '%s' dataset '%s' from %s -> %s",
Expand Down
6 changes: 4 additions & 2 deletions nvflare/app_opt/job_launcher/k8s_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,10 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
data_mounts = []
if should_mount_study_data(study):
if self.study_data_pvc_dict is None:
self.study_data_pvc_dict = load_study_data_file(self.study_data_pvc_file_path)
data_mounts = resolve_study_dataset_mounts(self.study_data_pvc_dict, study, self.study_data_pvc_file_path)
self.study_data_pvc_dict = load_study_data_file(self.study_data_pvc_file_path, logger=self.logger)
data_mounts = resolve_study_dataset_mounts(
self.study_data_pvc_dict, study, self.study_data_pvc_file_path, logger=self.logger
)
site_resources = (job_meta.get(JobMetaKey.RESOURCE_SPEC.value) or {}).get(site_name) or {}
flat_gpu_count = (
0
Expand Down
32 changes: 30 additions & 2 deletions nvflare/app_opt/job_launcher/study_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Optional
Expand Down Expand Up @@ -48,11 +49,17 @@ def _validate_path_component(value: str, label: str, file_path: str) -> None:
raise ValueError(f"{label} {value!r} in '{file_path}' is not a valid study-data path component.")


def load_study_data_file(file_path: str) -> dict:
def _log_warning(logger: Optional[logging.Logger], message: str, *args) -> None:
if logger:
logger.warning(message, *args)
Comment thread
YuanTingHsieh marked this conversation as resolved.


def load_study_data_file(file_path: str, logger: Optional[logging.Logger] = None) -> dict:
try:
with open(file_path, "rt") as f:
study_data = yaml.safe_load(f)
except FileNotFoundError:
_log_warning(logger, "study data file '%s' was not found; no study data mounts will be configured", file_path)
return {}
except OSError as e:
raise ValueError(f"Could not read study data file '{file_path}': {e}") from e
Expand All @@ -65,6 +72,11 @@ def load_study_data_file(file_path: str) -> dict:
if not isinstance(study_data, dict):
raise ValueError(f"file at study_data_file_path '{file_path}' does not contain a dictionary.")

if not study_data:
_log_warning(
logger, "study data file '%s' has no study entries; no study data mounts will be configured", file_path
)

for study, datasets in study_data.items():
_validate_path_component(study, "study name", file_path)
if not isinstance(datasets, dict):
Expand Down Expand Up @@ -92,11 +104,27 @@ def should_mount_study_data(study: Optional[str]) -> bool:
return bool(study)


def resolve_study_dataset_mounts(study_data: dict, study: str, file_path: str) -> list[StudyDatasetMount]:
def resolve_study_dataset_mounts(
study_data: dict, study: str, file_path: str, logger: Optional[logging.Logger] = None
) -> list[StudyDatasetMount]:
"""Resolve mounts for a study. Empty-file warnings are emitted by load_study_data_file()."""
datasets = study_data.get(study)
if datasets is None:
if study_data:
_log_warning(
logger,
"study data file '%s' has no entry for study '%s'; no study data mounts will be configured",
file_path,
study,
)
return []
if not datasets:
_log_warning(
logger,
"study data file '%s' entry for study '%s' has no datasets; no study data mounts will be configured",
file_path,
study,
)
return []

_validate_path_component(study, "study name", file_path)
Expand Down
13 changes: 8 additions & 5 deletions tests/unit_test/app_opt/job_launcher/docker_launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -600,7 +601,7 @@ def test_launch_study_data_mounts_nested_datasets(self):
with patch("nvflare.app_opt.job_launcher.docker_launcher.os.path.exists", return_value=True):
launcher.launch_job(_make_job_meta(study="study-a"), fl_ctx)

mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml")
mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml", logger=launcher.logger)
call_kwargs = dc.containers.run.call_args[1]
assert call_kwargs["command"] == [
"/usr/local/bin/python",
Expand Down Expand Up @@ -689,7 +690,7 @@ def test_launch_default_study_without_mapping_does_not_mount_data(self):
with patch("nvflare.app_opt.job_launcher.docker_launcher.load_study_data_file", return_value={}) as mock_load:
launcher.launch_job(_make_job_meta(study="default"), fl_ctx)

mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml")
mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml", logger=launcher.logger)
mounts_by_target = _mounts_by_target(dc.containers.run.call_args[1]["mounts"])
assert set(mounts_by_target) == {
"/var/tmp/nvflare/workspace",
Expand Down Expand Up @@ -720,7 +721,7 @@ def test_launch_default_study_mounts_default_mapping_when_present(self):
"ReadOnly": True,
}

def test_launch_omits_data_mount_when_study_mapping_is_missing(self):
def test_launch_omits_data_mount_when_study_mapping_is_missing(self, caplog):
launcher = _make_launcher()
dc = launcher._docker_client
container = MagicMock()
Expand All @@ -730,8 +731,9 @@ def test_launch_omits_data_mount_when_study_mapping_is_missing(self):
study_data = {"other-study": {"training": {"source": "/data/train", "mode": "ro"}}}

fl_ctx, _ = _make_fl_ctx()
with patch("nvflare.app_opt.job_launcher.docker_launcher.load_study_data_file", return_value=study_data):
launcher.launch_job(_make_job_meta(study="study-a"), fl_ctx)
with caplog.at_level(logging.WARNING):
with patch("nvflare.app_opt.job_launcher.docker_launcher.load_study_data_file", return_value=study_data):
launcher.launch_job(_make_job_meta(study="study-a"), fl_ctx)

mounts_by_target = _mounts_by_target(dc.containers.run.call_args[1]["mounts"])
assert set(mounts_by_target) == {
Expand All @@ -740,6 +742,7 @@ def test_launch_omits_data_mount_when_study_mapping_is_missing(self):
"/var/tmp/nvflare/workspace/local",
"/var/tmp/nvflare/workspace/job-1",
}
assert "has no entry for study 'study-a'" in caplog.text
Comment thread
YuanTingHsieh marked this conversation as resolved.

def test_launch_no_docker_socket_in_job_container(self):
"""Job containers must never receive the Docker socket."""
Expand Down
7 changes: 5 additions & 2 deletions tests/unit_test/app_opt/job_launcher/k8s_launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
from types import ModuleType
from unittest.mock import MagicMock, Mock, patch
Expand Down Expand Up @@ -1507,17 +1508,19 @@ def test_pod_manifest_uses_default_study_mapping_when_present(self):
finally:
_exit_patches(patches)

def test_pod_manifest_omits_data_pvc_when_study_mapping_is_missing(self):
def test_pod_manifest_omits_data_pvc_when_study_mapping_is_missing(self, caplog):
patches = _make_k8s_launcher_patches()
launcher, mock_api = self._setup(patches, study_data_pvc_dict={"other-study": {}})
self._prime_running(mock_api)
try:
launcher.launch_job(_make_launch_job_meta(study="study-a"), _make_launch_fl_ctx())
with caplog.at_level(logging.WARNING):
launcher.launch_job(_make_launch_job_meta(study="study-a"), _make_launch_fl_ctx())
manifest = mock_api.create_namespaced_pod.call_args.kwargs["body"]
vol_names = {v["name"] for v in manifest["spec"]["volumes"]}
mount_names = {m["name"] for m in manifest["spec"]["containers"][0]["volumeMounts"]}
assert vol_names == {"workspace-job", "startup-kit"}
assert mount_names == {"workspace-job", "startup-kit"}
assert "has no entry for study 'study-a'" in caplog.text
finally:
_exit_patches(patches)

Expand Down
41 changes: 41 additions & 0 deletions tests/unit_test/app_opt/job_launcher/study_data_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import pytest
import yaml

Expand All @@ -30,6 +32,26 @@ def test_load_missing_file_returns_empty_mapping(tmp_path):
assert load_study_data_file(str(tmp_path / "missing.yaml")) == {}


def test_load_missing_file_logs_warning_when_logger_supplied(tmp_path, caplog):
logger = logging.getLogger("study-data-test")
with caplog.at_level(logging.WARNING, logger="study-data-test"):
assert load_study_data_file(str(tmp_path / "missing.yaml"), logger=logger) == {}

assert "was not found" in caplog.text
assert "no study data mounts will be configured" in caplog.text


def test_load_empty_file_logs_warning_when_logger_supplied(tmp_path, caplog):
path = tmp_path / "study_data.yaml"
path.write_text("")
logger = logging.getLogger("study-data-test")

with caplog.at_level(logging.WARNING, logger="study-data-test"):
assert load_study_data_file(str(path), logger=logger) == {}

assert "has no study entries" in caplog.text


def test_load_unreadable_file_raises_value_error(monkeypatch, tmp_path):
path = tmp_path / "study_data.yaml"
path.write_text("{}")
Expand Down Expand Up @@ -110,6 +132,16 @@ def test_resolve_returns_empty_when_study_mapping_is_missing():
assert resolve_study_dataset_mounts({}, "study-a", "study_data.yaml") == []


def test_resolve_logs_warning_when_study_mapping_is_missing(caplog):
logger = logging.getLogger("study-data-test")
study_data = {"other-study": {"training": {"source": "/data/train", "mode": "ro"}}}

with caplog.at_level(logging.WARNING, logger="study-data-test"):
assert resolve_study_dataset_mounts(study_data, "study-a", "study_data.yaml", logger=logger) == []

assert "has no entry for study 'study-a'" in caplog.text


def test_resolve_returns_empty_for_unmapped_invalid_study_name():
assert resolve_study_dataset_mounts({}, "Study_A", "study_data.yaml") == []

Expand All @@ -118,6 +150,15 @@ def test_resolve_returns_empty_when_study_mapping_is_empty():
assert resolve_study_dataset_mounts({"study-a": {}}, "study-a", "study_data.yaml") == []


def test_resolve_logs_warning_when_study_mapping_is_empty(caplog):
logger = logging.getLogger("study-data-test")

with caplog.at_level(logging.WARNING, logger="study-data-test"):
assert resolve_study_dataset_mounts({"study-a": {}}, "study-a", "study_data.yaml", logger=logger) == []

assert "entry for study 'study-a' has no datasets" in caplog.text


def test_resolve_default_study_mapping_when_present():
study_data = {"default": {"training": {"source": "/data/train", "mode": "ro"}}}

Expand Down
Loading