From 6340bcbcf58e8e9f770df2731972da55c83492c9 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Mon, 19 May 2025 12:29:30 -0700 Subject: [PATCH] Add storage mount options to LeptonExecutor Allow users to specify where storage is mounted from on DGX Cloud Lepton jobs, such as on an attached NFS mounted on all of the nodes in the node group. This can be mounted in jobs for shared storage. Signed-Off-By: Robert Clark --- nemo_run/core/execution/lepton.py | 18 +++++++---- test/core/execution/test_lepton.py | 49 ++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 9609b952..eaf65810 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -99,9 +99,7 @@ def move_data(self, sleep: float = 10) -> None: image="busybox:1.37.0", # Use a very low resource container command=cmd, ), - mounts=[ - Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts - ], + mounts=[Mount(**mount) for mount in self.mounts], ) spec.resource_requirement = ResourceRequirement( resource_shape="cpu.small", @@ -161,6 +159,15 @@ def _valid_node_ids(self, node_group_id: DedicatedNodeGroup, client: APIClient) return valid_node_ids + def _validate_mounts(self): + """ + Ensure the required arguments are specified for mounts. + """ + for mount in self.mounts: + # Verify that 'path' and 'mount_path' are both present in the mounts list + if not all(key in mount for key in ["path", "mount_path"]): + raise RuntimeError("Must specify a 'path' and 'mount_path' for all mounts") + def create_lepton_job(self, name: str): """ Creates a distributed PyTorch job using the provided project/cluster IDs. @@ -192,9 +199,7 @@ def create_lepton_job(self, name: str): max_failure_retry=None, max_job_failure_retry=None, envs=envs, - mounts=[ - Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts - ], + mounts=[Mount(**mount) for mount in self.mounts], image_pull_secrets=[], ttl_seconds_after_finished=None, intra_job_communication=True, @@ -211,6 +216,7 @@ def create_lepton_job(self, name: str): return created_job def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: + self._validate_mounts() name = name.replace("_", "-").replace(".", "-") # to meet K8s requirements launch_script = f""" wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh diff --git a/test/core/execution/test_lepton.py b/test/core/execution/test_lepton.py index d6d6a5fe..cbe93a97 100644 --- a/test/core/execution/test_lepton.py +++ b/test/core/execution/test_lepton.py @@ -396,6 +396,55 @@ def test_nproc_per_node_default(self): assert executor.nproc_per_node() == 1 + def test_valid_storage_mounts(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor._validate_mounts() is None + + def test_valid_storage_mounts_with_mount_from(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[ + {"path": "/workspace", "mount_path": "/workspace", "from": "local-storage:nfs"} + ], + ) + + assert executor._validate_mounts() is None + + def test_missing_storage_mount_options(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace"}], + ) + + with pytest.raises(RuntimeError): + executor._validate_mounts() + + def test_missing_storage_mount_options_mount_path(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"mount_path": "/workspace"}], + ) + + with pytest.raises(RuntimeError): + executor._validate_mounts() + + def test_valid_storage_mounts_with_random_args(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace", "random": True}], + ) + + assert executor._validate_mounts() is None + @patch("nemo_run.core.execution.lepton.APIClient") def test_status_running_and_ready(self, mock_APIClient): mock_instance = MagicMock()