diff --git a/docs/design/docker_job_launcher_design.md b/docs/design/docker_job_launcher_design.md index 3198399e0e..cf94f43cc8 100644 --- a/docs/design/docker_job_launcher_design.md +++ b/docs/design/docker_job_launcher_design.md @@ -296,7 +296,7 @@ default: mode: ro ``` -At launch time, `DockerJobLauncher` looks up the study name and bind-mounts each configured dataset into the SJ/CJ container at `/data//`. In Docker mode, `source` is the host path passed to Docker and `mode` must be `ro` or `rw`. If the file doesn't exist or the study has no entry, no data volume is added. +At launch time, `DockerJobLauncher` looks up the study name and bind-mounts each configured dataset into the SJ/CJ container at `/data//`. In Docker mode, `source` is the host path passed to Docker and `mode` must be `ro` or `rw`. If the file doesn't exist or the study has no entry, no data volume is added and the launcher logs a warning. This YAML schema replaces the legacy flat `study -> path` map. A stale flat-format file now fails validation instead of being ignored. If a configured dataset host `source` path does not exist, Docker reports the bind-mount failure when the job container is created. @@ -459,7 +459,7 @@ study_a: mode: ro ``` -Top-level keys are study names from `meta.json`; nested keys are dataset names. Each dataset entry defines the host `source` path and mount `mode`. Dataset names appear in the container path as `/data//`. If the file is absent or the job's study has no entry, no data volume is added and the job runs without a data mount. Legacy flat `study -> path` entries are invalid in this schema. +Top-level keys are study names from `meta.json`; nested keys are dataset names. Each dataset entry defines the host `source` path and mount `mode`. Dataset names appear in the container path as `/data//`. If the file is absent or the job's study has no entry, no data volume is added, the launcher logs a warning, and the job runs without a data mount. Legacy flat `study -> path` entries are invalid in this schema. ### Step 5 — Submit a job diff --git a/docs/design/job_launcher_and_job_handle.md b/docs/design/job_launcher_and_job_handle.md index cecceac161..97c4ebe248 100644 --- a/docs/design/job_launcher_and_job_handle.md +++ b/docs/design/job_launcher_and_job_handle.md @@ -352,7 +352,7 @@ Constructor parameters: |-----------|---------|---------| | `config_file_path` | required | Path to kubeconfig. Loaded lazily on first `launch_job`. | | `workspace_pvc` | required | PVC claim name for workspace volume. | -| `study_data_pvc_file_path` | required | YAML file mapping study/dataset names to PVC claim names. Validated lazily; missing study entries skip data PVC mounts. | +| `study_data_pvc_file_path` | required | YAML file mapping study/dataset names to PVC claim names. Validated lazily; missing study entries skip data PVC mounts and log a warning. | | `timeout` | `None` | Wall-clock seconds for `enter_states([RUNNING])`; also `_max_stuck_count`. | | `namespace` | `"default"` | Kubernetes namespace. | | `pending_timeout` | `120` | Stuck-detection threshold (poll iterations) when `timeout` is `None`. | @@ -367,7 +367,7 @@ Launch sequence: | 0 | Lazy init: load kubeconfig and create `CoreV1Api`. | | 1 | Sanitize job ID via `uuid4_to_rfc1123`. Extract `site_name`, `job_image` from `get_job_launcher_spec(job_meta, site_name, "k8s")`. Raise if `WORKSPACE_OBJECT` missing. | | 2 | Read `JOB_PROCESS_ARGS`; raise if absent or `EXE_MODULE` missing. Resolve dataset PVC mounts from `study_data_pvc_file_path` when the YAML file contains entries for the job study. | -| 3 | Build `job_config`: name, image, args from `get_module_args()`. Use `launcher_spec[site][k8s].python_path` for the pod command when present, falling back to `default_python_path`. Mount the job workspace at `workspace_mount_path`, mount the startup-kit Secret at `/startup`, and set custom-code `PYTHONPATH` under `workspace_mount_path`. Add the workspace `emptyDir.sizeLimit` and `resources.requests/limits["ephemeral-storage"]` from `launcher_spec[site][k8s].ephemeral_storage` when present, falling back to the launcher default. Add K8s `resources.limits` from `launcher_spec` `num_of_gpus`, `cpu`, and `memory` when present; `num_of_gpus` falls back to flat `resource_spec[site]` for backward compatibility. Missing study entries skip data PVC mounts. | +| 3 | Build `job_config`: name, image, args from `get_module_args()`. Use `launcher_spec[site][k8s].python_path` for the pod command when present, falling back to `default_python_path`. Mount the job workspace at `workspace_mount_path`, mount the startup-kit Secret at `/startup`, and set custom-code `PYTHONPATH` under `workspace_mount_path`. Add the workspace `emptyDir.sizeLimit` and `resources.requests/limits["ephemeral-storage"]` from `launcher_spec[site][k8s].ephemeral_storage` when present, falling back to the launcher default. Add K8s `resources.limits` from `launcher_spec` `num_of_gpus`, `cpu`, and `memory` when present; `num_of_gpus` falls back to flat `resource_spec[site]` for backward compatibility. Missing study entries skip data PVC mounts and log a warning. | | 4 | Create `K8sJobHandle`. | | 5 | `core_v1.create_namespaced_pod()`. On any exception: set `terminal_state = TERMINATED`, return handle. | | 6 | `job_handle.enter_states([RUNNING])`. On any `BaseException`: `terminate()` then re-raise. | @@ -544,7 +544,7 @@ See [docker_job_launcher_design.md](docker_job_launcher_design.md) for the full } ``` -The `study_data_pvc_file_path` YAML maps study and dataset names to PVC claim names. Missing study entries mean no data PVC is mounted: +The `study_data_pvc_file_path` YAML maps study and dataset names to PVC claim names. Missing study entries mean no data PVC is mounted, and the launcher logs a warning: ```yaml default: diff --git a/nvflare/app_opt/job_launcher/docker_launcher.py b/nvflare/app_opt/job_launcher/docker_launcher.py index 1a491852ec..e9f93ee4ef 100644 --- a/nvflare/app_opt/job_launcher/docker_launcher.py +++ b/nvflare/app_opt/job_launcher/docker_launcher.py @@ -517,8 +517,8 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec: study_data_file = os.path.join(self.WORKSPACE_MOUNT, self.STUDY_DATA_PATH_FILE) study = job_meta.get(JobMetaKey.STUDY.value) if should_mount_study_data(study): - study_data_map = load_study_data_file(study_data_file) - data_mounts = resolve_study_dataset_mounts(study_data_map, study, study_data_file) + study_data_map = load_study_data_file(study_data_file, logger=self.logger) + data_mounts = resolve_study_dataset_mounts(study_data_map, study, study_data_file, logger=self.logger) for dataset_mount in data_mounts: self.logger.info( "mounting study '%s' dataset '%s' from %s -> %s", diff --git a/nvflare/app_opt/job_launcher/k8s_launcher.py b/nvflare/app_opt/job_launcher/k8s_launcher.py index 00f05eef8c..a17da9b757 100644 --- a/nvflare/app_opt/job_launcher/k8s_launcher.py +++ b/nvflare/app_opt/job_launcher/k8s_launcher.py @@ -464,8 +464,10 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec: data_mounts = [] if should_mount_study_data(study): if self.study_data_pvc_dict is None: - self.study_data_pvc_dict = load_study_data_file(self.study_data_pvc_file_path) - data_mounts = resolve_study_dataset_mounts(self.study_data_pvc_dict, study, self.study_data_pvc_file_path) + self.study_data_pvc_dict = load_study_data_file(self.study_data_pvc_file_path, logger=self.logger) + data_mounts = resolve_study_dataset_mounts( + self.study_data_pvc_dict, study, self.study_data_pvc_file_path, logger=self.logger + ) site_resources = (job_meta.get(JobMetaKey.RESOURCE_SPEC.value) or {}).get(site_name) or {} flat_gpu_count = ( 0 diff --git a/nvflare/app_opt/job_launcher/study_data.py b/nvflare/app_opt/job_launcher/study_data.py index f02750abe3..09a5d69fdf 100644 --- a/nvflare/app_opt/job_launcher/study_data.py +++ b/nvflare/app_opt/job_launcher/study_data.py @@ -14,6 +14,7 @@ from __future__ import annotations +import logging import re from dataclasses import dataclass from typing import Optional @@ -48,11 +49,17 @@ def _validate_path_component(value: str, label: str, file_path: str) -> None: raise ValueError(f"{label} {value!r} in '{file_path}' is not a valid study-data path component.") -def load_study_data_file(file_path: str) -> dict: +def _log_warning(logger: Optional[logging.Logger], message: str, *args) -> None: + if logger: + logger.warning(message, *args) + + +def load_study_data_file(file_path: str, logger: Optional[logging.Logger] = None) -> dict: try: with open(file_path, "rt") as f: study_data = yaml.safe_load(f) except FileNotFoundError: + _log_warning(logger, "study data file '%s' was not found; no study data mounts will be configured", file_path) return {} except OSError as e: raise ValueError(f"Could not read study data file '{file_path}': {e}") from e @@ -65,6 +72,11 @@ def load_study_data_file(file_path: str) -> dict: if not isinstance(study_data, dict): raise ValueError(f"file at study_data_file_path '{file_path}' does not contain a dictionary.") + if not study_data: + _log_warning( + logger, "study data file '%s' has no study entries; no study data mounts will be configured", file_path + ) + for study, datasets in study_data.items(): _validate_path_component(study, "study name", file_path) if not isinstance(datasets, dict): @@ -92,11 +104,27 @@ def should_mount_study_data(study: Optional[str]) -> bool: return bool(study) -def resolve_study_dataset_mounts(study_data: dict, study: str, file_path: str) -> list[StudyDatasetMount]: +def resolve_study_dataset_mounts( + study_data: dict, study: str, file_path: str, logger: Optional[logging.Logger] = None +) -> list[StudyDatasetMount]: + """Resolve mounts for a study. Empty-file warnings are emitted by load_study_data_file().""" datasets = study_data.get(study) if datasets is None: + if study_data: + _log_warning( + logger, + "study data file '%s' has no entry for study '%s'; no study data mounts will be configured", + file_path, + study, + ) return [] if not datasets: + _log_warning( + logger, + "study data file '%s' entry for study '%s' has no datasets; no study data mounts will be configured", + file_path, + study, + ) return [] _validate_path_component(study, "study name", file_path) diff --git a/tests/unit_test/app_opt/job_launcher/docker_launcher_test.py b/tests/unit_test/app_opt/job_launcher/docker_launcher_test.py index 70730fdc7d..6c2d9c4040 100644 --- a/tests/unit_test/app_opt/job_launcher/docker_launcher_test.py +++ b/tests/unit_test/app_opt/job_launcher/docker_launcher_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import sys from types import ModuleType from unittest.mock import MagicMock, patch @@ -600,7 +601,7 @@ def test_launch_study_data_mounts_nested_datasets(self): with patch("nvflare.app_opt.job_launcher.docker_launcher.os.path.exists", return_value=True): launcher.launch_job(_make_job_meta(study="study-a"), fl_ctx) - mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml") + mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml", logger=launcher.logger) call_kwargs = dc.containers.run.call_args[1] assert call_kwargs["command"] == [ "/usr/local/bin/python", @@ -689,7 +690,7 @@ def test_launch_default_study_without_mapping_does_not_mount_data(self): with patch("nvflare.app_opt.job_launcher.docker_launcher.load_study_data_file", return_value={}) as mock_load: launcher.launch_job(_make_job_meta(study="default"), fl_ctx) - mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml") + mock_load.assert_called_once_with("/var/tmp/nvflare/workspace/local/study_data.yaml", logger=launcher.logger) mounts_by_target = _mounts_by_target(dc.containers.run.call_args[1]["mounts"]) assert set(mounts_by_target) == { "/var/tmp/nvflare/workspace", @@ -720,7 +721,7 @@ def test_launch_default_study_mounts_default_mapping_when_present(self): "ReadOnly": True, } - def test_launch_omits_data_mount_when_study_mapping_is_missing(self): + def test_launch_omits_data_mount_when_study_mapping_is_missing(self, caplog): launcher = _make_launcher() dc = launcher._docker_client container = MagicMock() @@ -730,8 +731,9 @@ def test_launch_omits_data_mount_when_study_mapping_is_missing(self): study_data = {"other-study": {"training": {"source": "/data/train", "mode": "ro"}}} fl_ctx, _ = _make_fl_ctx() - with patch("nvflare.app_opt.job_launcher.docker_launcher.load_study_data_file", return_value=study_data): - launcher.launch_job(_make_job_meta(study="study-a"), fl_ctx) + with caplog.at_level(logging.WARNING): + with patch("nvflare.app_opt.job_launcher.docker_launcher.load_study_data_file", return_value=study_data): + launcher.launch_job(_make_job_meta(study="study-a"), fl_ctx) mounts_by_target = _mounts_by_target(dc.containers.run.call_args[1]["mounts"]) assert set(mounts_by_target) == { @@ -740,6 +742,7 @@ def test_launch_omits_data_mount_when_study_mapping_is_missing(self): "/var/tmp/nvflare/workspace/local", "/var/tmp/nvflare/workspace/job-1", } + assert "has no entry for study 'study-a'" in caplog.text def test_launch_no_docker_socket_in_job_container(self): """Job containers must never receive the Docker socket.""" diff --git a/tests/unit_test/app_opt/job_launcher/k8s_launcher_test.py b/tests/unit_test/app_opt/job_launcher/k8s_launcher_test.py index 383faab41f..57a3ed98a1 100644 --- a/tests/unit_test/app_opt/job_launcher/k8s_launcher_test.py +++ b/tests/unit_test/app_opt/job_launcher/k8s_launcher_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import sys from types import ModuleType from unittest.mock import MagicMock, Mock, patch @@ -1507,17 +1508,19 @@ def test_pod_manifest_uses_default_study_mapping_when_present(self): finally: _exit_patches(patches) - def test_pod_manifest_omits_data_pvc_when_study_mapping_is_missing(self): + def test_pod_manifest_omits_data_pvc_when_study_mapping_is_missing(self, caplog): patches = _make_k8s_launcher_patches() launcher, mock_api = self._setup(patches, study_data_pvc_dict={"other-study": {}}) self._prime_running(mock_api) try: - launcher.launch_job(_make_launch_job_meta(study="study-a"), _make_launch_fl_ctx()) + with caplog.at_level(logging.WARNING): + launcher.launch_job(_make_launch_job_meta(study="study-a"), _make_launch_fl_ctx()) manifest = mock_api.create_namespaced_pod.call_args.kwargs["body"] vol_names = {v["name"] for v in manifest["spec"]["volumes"]} mount_names = {m["name"] for m in manifest["spec"]["containers"][0]["volumeMounts"]} assert vol_names == {"workspace-job", "startup-kit"} assert mount_names == {"workspace-job", "startup-kit"} + assert "has no entry for study 'study-a'" in caplog.text finally: _exit_patches(patches) diff --git a/tests/unit_test/app_opt/job_launcher/study_data_test.py b/tests/unit_test/app_opt/job_launcher/study_data_test.py index fbba813a1d..82e9e4074a 100644 --- a/tests/unit_test/app_opt/job_launcher/study_data_test.py +++ b/tests/unit_test/app_opt/job_launcher/study_data_test.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + import pytest import yaml @@ -30,6 +32,26 @@ def test_load_missing_file_returns_empty_mapping(tmp_path): assert load_study_data_file(str(tmp_path / "missing.yaml")) == {} +def test_load_missing_file_logs_warning_when_logger_supplied(tmp_path, caplog): + logger = logging.getLogger("study-data-test") + with caplog.at_level(logging.WARNING, logger="study-data-test"): + assert load_study_data_file(str(tmp_path / "missing.yaml"), logger=logger) == {} + + assert "was not found" in caplog.text + assert "no study data mounts will be configured" in caplog.text + + +def test_load_empty_file_logs_warning_when_logger_supplied(tmp_path, caplog): + path = tmp_path / "study_data.yaml" + path.write_text("") + logger = logging.getLogger("study-data-test") + + with caplog.at_level(logging.WARNING, logger="study-data-test"): + assert load_study_data_file(str(path), logger=logger) == {} + + assert "has no study entries" in caplog.text + + def test_load_unreadable_file_raises_value_error(monkeypatch, tmp_path): path = tmp_path / "study_data.yaml" path.write_text("{}") @@ -110,6 +132,16 @@ def test_resolve_returns_empty_when_study_mapping_is_missing(): assert resolve_study_dataset_mounts({}, "study-a", "study_data.yaml") == [] +def test_resolve_logs_warning_when_study_mapping_is_missing(caplog): + logger = logging.getLogger("study-data-test") + study_data = {"other-study": {"training": {"source": "/data/train", "mode": "ro"}}} + + with caplog.at_level(logging.WARNING, logger="study-data-test"): + assert resolve_study_dataset_mounts(study_data, "study-a", "study_data.yaml", logger=logger) == [] + + assert "has no entry for study 'study-a'" in caplog.text + + def test_resolve_returns_empty_for_unmapped_invalid_study_name(): assert resolve_study_dataset_mounts({}, "Study_A", "study_data.yaml") == [] @@ -118,6 +150,15 @@ def test_resolve_returns_empty_when_study_mapping_is_empty(): assert resolve_study_dataset_mounts({"study-a": {}}, "study-a", "study_data.yaml") == [] +def test_resolve_logs_warning_when_study_mapping_is_empty(caplog): + logger = logging.getLogger("study-data-test") + + with caplog.at_level(logging.WARNING, logger="study-data-test"): + assert resolve_study_dataset_mounts({"study-a": {}}, "study-a", "study_data.yaml", logger=logger) == [] + + assert "entry for study 'study-a' has no datasets" in caplog.text + + def test_resolve_default_study_mapping_when_present(): study_data = {"default": {"training": {"source": "/data/train", "mode": "ro"}}}