diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 9609b952..eaf65810 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -99,9 +99,7 @@ def move_data(self, sleep: float = 10) -> None: image="busybox:1.37.0", # Use a very low resource container command=cmd, ), - mounts=[ - Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts - ], + mounts=[Mount(**mount) for mount in self.mounts], ) spec.resource_requirement = ResourceRequirement( resource_shape="cpu.small", @@ -161,6 +159,15 @@ def _valid_node_ids(self, node_group_id: DedicatedNodeGroup, client: APIClient) return valid_node_ids + def _validate_mounts(self): + """ + Ensure the required arguments are specified for mounts. + """ + for mount in self.mounts: + # Verify that 'path' and 'mount_path' are both present in the mounts list + if not all(key in mount for key in ["path", "mount_path"]): + raise RuntimeError("Must specify a 'path' and 'mount_path' for all mounts") + def create_lepton_job(self, name: str): """ Creates a distributed PyTorch job using the provided project/cluster IDs. @@ -192,9 +199,7 @@ def create_lepton_job(self, name: str): max_failure_retry=None, max_job_failure_retry=None, envs=envs, - mounts=[ - Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts - ], + mounts=[Mount(**mount) for mount in self.mounts], image_pull_secrets=[], ttl_seconds_after_finished=None, intra_job_communication=True, @@ -211,6 +216,7 @@ def create_lepton_job(self, name: str): return created_job def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: + self._validate_mounts() name = name.replace("_", "-").replace(".", "-") # to meet K8s requirements launch_script = f""" wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh diff --git a/test/core/execution/test_lepton.py b/test/core/execution/test_lepton.py index d6d6a5fe..cbe93a97 100644 --- a/test/core/execution/test_lepton.py +++ b/test/core/execution/test_lepton.py @@ -396,6 +396,55 @@ def test_nproc_per_node_default(self): assert executor.nproc_per_node() == 1 + def test_valid_storage_mounts(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor._validate_mounts() is None + + def test_valid_storage_mounts_with_mount_from(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[ + {"path": "/workspace", "mount_path": "/workspace", "from": "local-storage:nfs"} + ], + ) + + assert executor._validate_mounts() is None + + def test_missing_storage_mount_options(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace"}], + ) + + with pytest.raises(RuntimeError): + executor._validate_mounts() + + def test_missing_storage_mount_options_mount_path(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"mount_path": "/workspace"}], + ) + + with pytest.raises(RuntimeError): + executor._validate_mounts() + + def test_valid_storage_mounts_with_random_args(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace", "random": True}], + ) + + assert executor._validate_mounts() is None + @patch("nemo_run.core.execution.lepton.APIClient") def test_status_running_and_ready(self, mock_APIClient): mock_instance = MagicMock()