Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions nemo_run/core/execution/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ def move_data(self, sleep: float = 10) -> None:
image="busybox:1.37.0", # Use a very low resource container
command=cmd,
),
mounts=[
Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts
],
mounts=[Mount(**mount) for mount in self.mounts],
)
spec.resource_requirement = ResourceRequirement(
resource_shape="cpu.small",
Expand Down Expand Up @@ -161,6 +159,15 @@ def _valid_node_ids(self, node_group_id: DedicatedNodeGroup, client: APIClient)

return valid_node_ids

def _validate_mounts(self):
    """
    Ensure the required arguments are specified for mounts.

    Every entry in ``self.mounts`` must contain both a 'path' and a
    'mount_path' key. Any additional keys on an entry are not inspected.

    Raises:
        RuntimeError: If an entry lacks 'path' or 'mount_path'. The message
            names the position of the offending entry and the keys it is
            missing so the caller does not have to search the whole list.
    """
    required_keys = ("path", "mount_path")
    for index, mount in enumerate(self.mounts):
        # Collect every missing key so a single error reports all of them.
        missing = [key for key in required_keys if key not in mount]
        if missing:
            raise RuntimeError(
                "Must specify a 'path' and 'mount_path' for all mounts "
                f"(mount at index {index} is missing: {', '.join(missing)})"
            )

def create_lepton_job(self, name: str):
"""
Creates a distributed PyTorch job using the provided project/cluster IDs.
Expand Down Expand Up @@ -192,9 +199,7 @@ def create_lepton_job(self, name: str):
max_failure_retry=None,
max_job_failure_retry=None,
envs=envs,
mounts=[
Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts
],
mounts=[Mount(**mount) for mount in self.mounts],
image_pull_secrets=[],
ttl_seconds_after_finished=None,
intra_job_communication=True,
Expand All @@ -211,6 +216,7 @@ def create_lepton_job(self, name: str):
return created_job

def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
self._validate_mounts()
name = name.replace("_", "-").replace(".", "-") # to meet K8s requirements
launch_script = f"""
wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh
Expand Down
49 changes: 49 additions & 0 deletions test/core/execution/test_lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,55 @@ def test_nproc_per_node_default(self):

assert executor.nproc_per_node() == 1

def test_valid_storage_mounts(self):
    """A mount supplying both 'path' and 'mount_path' passes validation."""
    complete_mount = {"path": "/workspace", "mount_path": "/workspace"}
    executor = LeptonExecutor(
        container_image="nvcr.io/nvidia/test:latest",
        nemo_run_dir="/workspace/nemo_run",
        mounts=[complete_mount],
    )

    outcome = executor._validate_mounts()

    assert outcome is None

def test_valid_storage_mounts_with_mount_from(self):
    """An extra 'from' key next to the required keys still validates."""
    mount_with_source = {
        "path": "/workspace",
        "mount_path": "/workspace",
        "from": "local-storage:nfs",
    }
    executor = LeptonExecutor(
        container_image="nvcr.io/nvidia/test:latest",
        nemo_run_dir="/workspace/nemo_run",
        mounts=[mount_with_source],
    )

    outcome = executor._validate_mounts()

    assert outcome is None

def test_missing_storage_mount_options(self):
    """A mount that omits 'mount_path' makes validation raise RuntimeError."""
    mount_without_target = {"path": "/workspace"}
    executor = LeptonExecutor(
        container_image="nvcr.io/nvidia/test:latest",
        nemo_run_dir="/workspace/nemo_run",
        mounts=[mount_without_target],
    )

    with pytest.raises(RuntimeError):
        executor._validate_mounts()

def test_missing_storage_mount_options_mount_path(self):
    """A mount that omits 'path' makes validation raise RuntimeError."""
    mount_without_source = {"mount_path": "/workspace"}
    executor = LeptonExecutor(
        container_image="nvcr.io/nvidia/test:latest",
        nemo_run_dir="/workspace/nemo_run",
        mounts=[mount_without_source],
    )

    with pytest.raises(RuntimeError):
        executor._validate_mounts()

def test_valid_storage_mounts_with_random_args(self):
    """An unrelated extra key does not cause mount validation to fail."""
    mount_with_extra_key = {
        "path": "/workspace",
        "mount_path": "/workspace",
        "random": True,
    }
    executor = LeptonExecutor(
        container_image="nvcr.io/nvidia/test:latest",
        nemo_run_dir="/workspace/nemo_run",
        mounts=[mount_with_extra_key],
    )

    outcome = executor._validate_mounts()

    assert outcome is None

@patch("nemo_run.core.execution.lepton.APIClient")
def test_status_running_and_ready(self, mock_APIClient):
mock_instance = MagicMock()
Expand Down
Loading