From ddfc4ae00362270bbafdc5b6492e063fd8048f54 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Fri, 25 Apr 2025 09:17:19 -0700 Subject: [PATCH 1/5] Add support for NVIDIA DGX Cloud Lepton NVIDIA DGX Cloud Lepton is another platform available for launching distributed jobs using NeMo-Run. The new LeptonExecutor leverages the Lepton Python SDK to authenticate with a DGX Cloud Lepton cluster and launch jobs on available resources. Signed-Off-By: Robert Clark --- docs/source/guides/execution.md | 34 ++ docs/source/guides/why-use-nemo-run.md | 1 + docs/source/index.rst | 6 + nemo_run/__init__.py | 4 +- nemo_run/core/execution/__init__.py | 11 +- nemo_run/core/execution/lepton.py | 352 ++++++++++++++++++ nemo_run/run/experiment.py | 12 +- .../run/torchx_backend/components/torchrun.py | 15 + nemo_run/run/torchx_backend/packaging.py | 2 + nemo_run/run/torchx_backend/schedulers/api.py | 3 + .../run/torchx_backend/schedulers/lepton.py | 233 ++++++++++++ pyproject.toml | 2 + 12 files changed, 670 insertions(+), 5 deletions(-) create mode 100644 nemo_run/core/execution/lepton.py create mode 100644 nemo_run/run/torchx_backend/schedulers/lepton.py diff --git a/docs/source/guides/execution.md b/docs/source/guides/execution.md index 8761271f..690093de 100644 --- a/docs/source/guides/execution.md +++ b/docs/source/guides/execution.md @@ -7,6 +7,7 @@ Each execution of a single configured task requires an executor. Nemo-Run provid - `run.DockerExecutor` - `run.SlurmExecutor` with an optional `SSHTunnel` for executing on Slurm clusters from your local machine - `run.SkypilotExecutor` (available under the optional feature `skypilot` in the python package). +- `run.LeptonExecutor` A tuple of task and executor form an execution unit. A key goal of NeMo-Run is to allow you to mix and match tasks and executors to arbitrarily define execution units. @@ -41,6 +42,7 @@ The packager support matrix is described below: | SlurmExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager | | SkypilotExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager | | DGXCloudExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager | +| LeptonExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager | `run.Packager` is a passthrough base packager. @@ -264,3 +266,35 @@ def your_dgx_executor(nodes: int, gpus_per_node: int, container_image: str): ``` For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDIA DGX Cloud NeMo End-to-End Workflow Example](https://docs.nvidia.com/dgx-cloud/run-ai/latest/nemo-e2e-example.html). + +#### LeptonExecutor + +The `LeptonExecutor` integrates with an NVIDIA DGX Cloud Lepton cluster's Python SDK to launch distributed jobs. It uses API calls behind the Lepton SDK to authenticate, identify the target node group and resource shapes, and submit the job specification which will be launched as a batch job on the cluster. + +Here's an example configuration: + +```python +def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str): + # Ensure these are set correctly for your DGX Cloud environment + # You might fetch these from environment variables or a config file + resource_shape = "gpu.8xh100-80gb" # Replace with your desired resource shape representing the number of GPUs in a pod + node_group = "my-node-group" # The node group to run the job in + nemo_run_dir = "/nemo-workspace/nemo-run" # The NeMo-Run directory where experiments are saved + + executor = run.LeptonExecutor( + resource_shape=resource_shape, + node_group=node_group, + container_image=container_image, + nodes=nodes, + nemo_run_dir=nemo_run_dir, + gpus_per_node=gpus_per_node, + # Optional: Add custom environment variables or PyTorch specs if needed + env_vars=common_envs(), + # packager=run.GitArchivePackager() # Choose appropriate packager + ) + return executor + +# Example usage: +executor = your_lepton_executor(nodes=4, gpus_per_node=8, container_image="your-nemo-image") + +``` diff --git a/docs/source/guides/why-use-nemo-run.md b/docs/source/guides/why-use-nemo-run.md index 539df0b1..fa85cca6 100644 --- a/docs/source/guides/why-use-nemo-run.md +++ b/docs/source/guides/why-use-nemo-run.md @@ -29,6 +29,7 @@ But once defined, it is seamless to launch your tasks. Currently, we support the - LocalExecutor - SlurmExecutor - SkypilotExecutor +- LeptonExecutor This means that you can launch your configured task on one slurm cluster or the other, on a Kubernetes cluster, on one cloud or the other, or on all of them at the same time. diff --git a/docs/source/index.rst b/docs/source/index.rst index f7eadb41..ed201772 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -38,6 +38,12 @@ will install Skypilot w all clouds You can also manually install Skypilot from https://skypilot.readthedocs.io/en/latest/getting-started/installation.html +If using DGX Cloud Lepton, use the following command to install the Lepton CLI: + +``pip install leptonai`` + +To authenticate with the DGX Cloud Lepton cluster, navigate to the **Settings > Tokens** page in the DGX Cloud Lepton UI and copy the ``lep login`` command shown on the page and run it in the terminal. + Make sure you have `pip` installed and configured properly. diff --git a/nemo_run/__init__.py b/nemo_run/__init__.py index 78b73630..07755c2c 100644 --- a/nemo_run/__init__.py +++ b/nemo_run/__init__.py @@ -19,11 +19,13 @@ from nemo_run import cli from nemo_run.api import autoconvert, dryrun_fn +from nemo_run.cli.lazy import LazyEntrypoint, lazy_imports from nemo_run.config import Config, ConfigurableMixin, Partial, Script from nemo_run.core.execution.base import Executor, ExecutorMacros, import_executor from nemo_run.core.execution.dgxcloud import DGXCloudExecutor from nemo_run.core.execution.docker import DockerExecutor from nemo_run.core.execution.launcher import FaultTolerance, SlurmRay, SlurmTemplate, Torchrun +from nemo_run.core.execution.lepton import LeptonExecutor from nemo_run.core.execution.local import LocalExecutor from nemo_run.core.execution.skypilot import SkypilotExecutor from nemo_run.core.execution.slurm import SlurmExecutor @@ -31,7 +33,6 @@ from nemo_run.core.tunnel.client import LocalTunnel, SSHTunnel from nemo_run.devspace.base import DevSpace from nemo_run.help import help -from nemo_run.cli.lazy import LazyEntrypoint, lazy_imports from nemo_run.package_info import __package_name__, __version__ from nemo_run.run.api import run from nemo_run.run.experiment import Experiment @@ -58,6 +59,7 @@ "GitArchivePackager", "PatternPackager", "help", + "LeptonExecutor", "LocalExecutor", "LocalTunnel", "Packager", diff --git a/nemo_run/core/execution/__init__.py b/nemo_run/core/execution/__init__.py index 6dc35078..7c787a16 100644 --- a/nemo_run/core/execution/__init__.py +++ b/nemo_run/core/execution/__init__.py @@ -13,9 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +from nemo_run.core.execution.dgxcloud import DGXCloudExecutor +from nemo_run.core.execution.lepton import LeptonExecutor from nemo_run.core.execution.local import LocalExecutor from nemo_run.core.execution.skypilot import SkypilotExecutor from nemo_run.core.execution.slurm import SlurmExecutor -from nemo_run.core.execution.dgxcloud import DGXCloudExecutor -__all__ = ["LocalExecutor", "SlurmExecutor", "SkypilotExecutor", "DGXCloudExecutor"] +__all__ = [ + "LocalExecutor", + "SlurmExecutor", + "SkypilotExecutor", + "DGXCloudExecutor", + "LeptonExecutor", +] diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py new file mode 100644 index 00000000..936e8e90 --- /dev/null +++ b/nemo_run/core/execution/lepton.py @@ -0,0 +1,352 @@ +import logging +import os +import re +import subprocess +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional, Type + +from invoke.context import Context +from leptonai.api.v1.client import APIClient +from leptonai.api.v1.types.affinity import LeptonResourceAffinity +from leptonai.api.v1.types.common import Metadata +from leptonai.api.v1.types.deployment import EnvVar, LeptonContainer, Mount +from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec +from leptonai.api.v1.types.replica import Replica + +from nemo_run.config import get_nemorun_home +from nemo_run.core.execution.base import Executor, ExecutorMacros +from nemo_run.core.packaging.base import Packager +from nemo_run.core.packaging.git import GitArchivePackager + +logger = logging.getLogger(__name__) + + +@dataclass(kw_only=True) +class LeptonExecutor(Executor): + """ + Dataclass to configure a Lepton Executor. + + This executor integrates with a Lepton endpoint for launching jobs + via a REST API. It acquires an auth token, identifies the project/cluster, + and launches jobs with a specified command. It can be adapted to meet user + authentication and job-submission requirements on Lepton. + """ + + container_image: str + nemo_run_dir: str + launched_from_cluster: bool = False + nodes: int = 1 + gpus_per_node: int = 0 + nprocs_per_node: int = 1 + shared_memory_size: int = 65536 + resource_shape: str = "" + node_group: str = "" + lepton_job_dir: str = field(init=False, default="") + custom_spec: dict[str, Any] = field(default_factory=dict) + + def stop_job(self, job_id: str): + """ + Send a stop signal to the requested job + """ + client = APIClient() + + if not self.status(job_id) == LeptonJobState.Running: + logger.info(f"Job {job_id} is not currently running. No action taken.") + return + + # Send a "Stopped" signal to the job + client.job.update(job_id, spec={"spec": {"stopped": True}}) + logger.info(f"Job {job_id} stopped successfully.") + + def move_data(self, sleep: float = 10) -> None: + """ + Moves job directory into PVC and deletes the workload after completion + """ + client = APIClient() + client.storage.create_dir(additional_path=self.lepton_job_dir) + + # Create all sub-directories in the directory tree + # Then, copy all files to the storage + for root, dirs, files in os.walk(self.job_dir): + # Create the sub-directories + for dir in dirs: + abs_path = os.path.join(root, dir) + relative_path = os.path.join(self.lepton_job_dir, abs_path.replace(self.job_dir, "").lstrip("/")) + client.storage.create_dir(additional_path=relative_path) + # Copy the files in each sub-directory to the remote filesystem + for file in files: + abs_path = os.path.join(root, file) + relative_path = os.path.join(self.lepton_job_dir, abs_path.replace(self.job_dir, "").lstrip("/")) + client.storage.create_file( + local_path=abs_path, + remote_path=relative_path + ) + + def setup_distributed_pytorch(self) -> str: + """ + Runs a custom script from Lepton to setup the distributed PyTorch + environment variables required for distributed PyTorch jobs. + """ + distributed_command = ( + "wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh && " + "chmod +x init.sh && " + "source init.sh" + ) + return distributed_command + + def create_lepton_job(self, name: str): + """ + Creates a distributed PyTorch job using the provided project/cluster IDs. + """ + client = APIClient() + + envs = [EnvVar(name=key, value=value) for key, value in self.env_vars.items()] + + cmd = [ + "/bin/bash", + "-c", + f"chmod +x {self.lepton_job_dir}/launch_script.sh && bash {self.lepton_job_dir}/launch_script.sh" + ] + + # Get node groups + node_groups = client.nodegroup.list_all() + node_group_map = {ng.metadata.name: ng for ng in node_groups} + node_group_id = node_group_map[self.node_group] + + # Get node IDs + valid_node_ids = set() + node_ids = client.nodegroup.list_nodes(node_group_id) + for node in node_ids: + valid_node_ids.add(node.metadata.id_) + + job_spec = LeptonJobUserSpec( + resource_shape=self.resource_shape, + affinity=LeptonResourceAffinity( + allowed_dedicated_node_groups=[node_group_id.metadata.id_], + allowed_nodes_in_node_group=list(valid_node_ids), + ), + container=LeptonContainer(image=self.container_image, command=cmd), + shared_memory_size=self.shared_memory_size, + completions=self.nodes, + parallelism=self.nodes, + max_failure_retry=None, + max_job_failure_retry=None, + envs=envs, + mounts=[ + Mount(path="/nemo-workspace", mount_path="/nemo-workspace") + ], + image_pull_secrets=[], + ttl_seconds_after_finished=None, + intra_job_communication=True, + privileged=False, + metrics=None, + log=None, + queue_config=None, + stopped=None, + reservation_config=None, + ) + job = LeptonJob(spec=job_spec, metadata=Metadata(id=name)) + + created_job = client.job.create(job) + return created_job + + def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: + name = name.replace("_", "-").replace(".", "-") # to meet K8s requirements + launch_script = f""" +wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh +chmod +x init.sh +source init.sh +ln -s {self.lepton_job_dir}/ /nemo_run +cd /nemo_run/code +{" ".join(cmd)} +""" + + with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: + f.write(launch_script) + + logger.info("Copying experiment directory to remote filesystem") + self.move_data() + + logger.info("Creating distributed workload") + job = self.create_lepton_job(name) + if not job: + raise RuntimeError(f"Failed to create Lepton job") + + job_id = job.metadata.id_ + status = self.status(job_id) + return job_id, status + + def nnodes(self) -> int: + return self.nodes + + def nproc_per_node(self) -> int: + # Default to the number of GPUs specified per node + # If user doesn't want GPUs, can run multiple processes with CPU only + if self.gpus_per_node: + return self.gpus_per_node + elif self.nprocs_per_node: + return self.nprocs_per_node + return 1 + + def status(self, job_id: str) -> Optional[LeptonJobState]: + client = APIClient() + job = client.job.get(job_id) + + if not job: + return LeptonJobState.Unknown + + # Lepton marks a job as Running when at least one pod is running + # which can cause issues as all pods need to be running in order + # to query it. Override the job state to check if all pods are + # actually running. If not, set the status to Starting and wait + # until all pods are ready. + if job.status.state == LeptonJobState.Running: + if job.status.ready < job.status.active: + return LeptonJobState.Starting + return job.status.state + + def cancel(self, job_id: str): + client = APIClient() + client.job.delete(job_id) + logger.info(f"Successfully cancelled job {job_id} on Lepton") + + @classmethod + def logs(cls: Type["LeptonExecutor"], app_id: str, fallback_path: Optional[str]): + client = APIClient() + + # Get the first replica from the job which contains the job logs + def _first_replica(job_id: str) -> Replica: + client = APIClient() + first_replica = None + + replicas = client.job.get_replicas(job_id) + + for replica in replicas: + replica_id = replica.metadata.id_ + # The first replica has the pattern -0-xxxxx + # where xxxxx is a unique ID for each worker. Subsequent + # workers increase the number between and the + # unique ID. For example, if a job-id is "my-nemo-job" + # the first replica would be "my-nemo-job-0-xxxxx", + # the second would be "my-nemo-job-1-yyyyy", and so on. + if replica_id.replace(job_id, "").startswith("-0"): + first_replica = replica + if not first_replica: + raise RuntimeError(f"Unable to retrieve workers for job {job_id}") + return first_replica + + def _status(job_id: str): + client = APIClient() + job = client.job.get(job_id) + + if not job: + return LeptonJobState.Unknown + + # Lepton marks a job as Running when at least one pod is running + # which can cause issues as all pods need to be running in order + # to query it. Override the job state to check if all pods are + # actually running. If not, set the status to Starting and wait + # until all pods are ready. + if job.status.state == LeptonJobState.Running: + if job.status.ready < job.status.active: + return LeptonJobState.Starting + return job.status.state + + # Regex pattern to remove everything up to and including the second '___' + job_id = re.sub(r"^(?:.*?___){2}", "", app_id) + + # Wait for the job to be in the Running state prior to reading the logs + while _status(job_id) != LeptonJobState.Running: + time.sleep(1) + replica = _first_replica(job_id) + logs = client.job.get_log(id_or_job=job_id, replica=replica) + + for line in logs: + print(line) + + def cleanup(self, handle: str): ... + + def assign( + self, + exp_id: str, + exp_dir: str, + task_id: str, + task_dir: str, + ): + self.job_name = task_id + self.experiment_dir = exp_dir + self.job_dir = os.path.join(exp_dir, task_dir) + + # setting linked PVC job directory + nemo_run_home = get_nemorun_home() + job_subdir = self.job_dir[len(nemo_run_home) + 1 :] # +1 to remove the initial backslash + self.lepton_job_dir = os.path.join(self.nemo_run_dir, job_subdir) + + logger.info( + "Lepton job directory set as: %s", + self.lepton_job_dir, + ) + self.experiment_id = exp_id + + def get_launcher_prefix(self) -> Optional[list[str]]: + launcher = self.get_launcher() + if launcher.nsys_profile: + return launcher.get_nsys_prefix(profile_dir="/nemo_run") + + def package_configs(self, *cfgs: tuple[str, str]) -> list[str]: + filenames = [] + basepath = os.path.join(self.job_dir, "configs") + for name, cfg in cfgs: + filename = os.path.join(basepath, name) + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, "w") as f: + f.write(cfg) + filenames.append( + os.path.join( + "/nemo_run/configs", + name, + ) + ) + return filenames + + def package(self, packager: Packager, job_name: str): + assert self.experiment_id, "Executor not assigned to an experiment." + if isinstance(packager, GitArchivePackager): + output = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + check=True, + stdout=subprocess.PIPE, + ) + path = output.stdout.splitlines()[0].decode() + base_path = Path(path).absolute() + else: + base_path = Path(os.getcwd()).absolute() + + local_pkg = packager.package(base_path, self.job_dir, job_name) + local_code_extraction_path = os.path.join(self.job_dir, "code") + ctx = Context() + ctx.run(f"mkdir -p {local_code_extraction_path}") + + if self.get_launcher().nsys_profile: + remote_nsys_extraction_path = os.path.join( + self.job_dir, self.get_launcher().nsys_folder + ) + ctx.run(f"mkdir -p {remote_nsys_extraction_path}") + if local_pkg: + ctx.run( + f"tar -xvzf {local_pkg} -C {local_code_extraction_path} --ignore-zeros", hide=True + ) + + def macro_values(self) -> Optional[ExecutorMacros]: + return None + + def _default_headers(self, token: Optional[str] = None) -> dict: + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } + if token: + headers["Authorization"] = f"Bearer {token}" + return headers diff --git a/nemo_run/run/experiment.py b/nemo_run/run/experiment.py index 93b6b60f..ddc7b00e 100644 --- a/nemo_run/run/experiment.py +++ b/nemo_run/run/experiment.py @@ -35,8 +35,9 @@ from rich.console import Group from rich.live import Live from rich.panel import Panel -from rich.progress import BarColumn, Progress, SpinnerColumn, TaskID, TimeElapsedColumn +from rich.progress import BarColumn, Progress, SpinnerColumn from rich.progress import Task as RichTask +from rich.progress import TaskID, TimeElapsedColumn from rich.syntax import Syntax from torchx.specs.api import AppState @@ -52,6 +53,7 @@ from nemo_run.core.execution.base import Executor from nemo_run.core.execution.dgxcloud import DGXCloudExecutor from nemo_run.core.execution.docker import DockerExecutor +from nemo_run.core.execution.lepton import LeptonExecutor from nemo_run.core.execution.local import LocalExecutor from nemo_run.core.execution.skypilot import SkypilotExecutor from nemo_run.core.execution.slurm import SlurmExecutor @@ -202,8 +204,14 @@ class Experiment(ConfigurableMixin): SkypilotExecutor, DockerExecutor, DGXCloudExecutor, + LeptonExecutor, + ) + _DETACH_SUPPORTED_EXECUTORS = ( + SlurmExecutor, + SkypilotExecutor, + DGXCloudExecutor, + LeptonExecutor, ) - _DETACH_SUPPORTED_EXECUTORS = (SlurmExecutor, SkypilotExecutor, DGXCloudExecutor) _DEPENDENCY_SUPPORTED_EXECUTORS = (SlurmExecutor,) _RUNNER_DEPENDENT_EXECUTORS = (LocalExecutor,) _CONFIG_FILE = "_CONFIG" diff --git a/nemo_run/run/torchx_backend/components/torchrun.py b/nemo_run/run/torchx_backend/components/torchrun.py index 8dc9e498..f1ea4d4f 100644 --- a/nemo_run/run/torchx_backend/components/torchrun.py +++ b/nemo_run/run/torchx_backend/components/torchrun.py @@ -61,6 +61,7 @@ def torchrun( mounts: Optional[list[str]] = None, debug: bool = False, dgxc: bool = False, + lepton: bool = False, use_env: bool = False, ) -> specs.AppDef: """ @@ -96,6 +97,7 @@ def torchrun( See scheduler documentation for more info. debug: whether to run with preset debug flags enabled dgxc: whether to use a subset of settings for DGX Cloud + lepton: whether the experiment is running on Lepton AI """ if (script is None) == (m is None): raise ValueError("exactly one of --script and -m must be specified") @@ -140,6 +142,19 @@ def torchrun( if dgxc: cmd = ["--nnodes", nnodes_rep, "--nproc-per-node", nproc_per_node] + elif lepton: + cmd = [ + "--nnodes", + nnodes_rep, + "--nproc-per-node", + nproc_per_node, + "--node-rank", + torchx_dist._noquote("$NODE_RANK"), + "--master-addr", + torchx_dist._noquote("$MASTER_ADDR"), + "--master-port", + torchx_dist._noquote("$MASTER_PORT"), + ] else: cmd = [ "--rdzv-backend", diff --git a/nemo_run/run/torchx_backend/packaging.py b/nemo_run/run/torchx_backend/packaging.py index 49857c90..1de83ba2 100644 --- a/nemo_run/run/torchx_backend/packaging.py +++ b/nemo_run/run/torchx_backend/packaging.py @@ -25,6 +25,7 @@ from nemo_run.core.execution.base import Executor from nemo_run.core.execution.dgxcloud import DGXCloudExecutor from nemo_run.core.execution.launcher import FaultTolerance, Torchrun +from nemo_run.core.execution.lepton import LeptonExecutor from nemo_run.core.execution.local import LocalExecutor from nemo_run.core.serialization.yaml import YamlSerializer from nemo_run.core.serialization.zlib_json import ZlibJSONSerializer @@ -167,6 +168,7 @@ def _get_details_from_script(fn_or_script: Script, serialize_configs: bool): debug=executor.packager.debug, max_retries=executor.retries, dgxc=isinstance(executor, DGXCloudExecutor), + lepton=isinstance(executor, LeptonExecutor), use_env=use_env, ) elif launcher and isinstance(launcher, FaultTolerance): diff --git a/nemo_run/run/torchx_backend/schedulers/api.py b/nemo_run/run/torchx_backend/schedulers/api.py index 853b335a..5ade157d 100644 --- a/nemo_run/run/torchx_backend/schedulers/api.py +++ b/nemo_run/run/torchx_backend/schedulers/api.py @@ -20,6 +20,7 @@ from nemo_run.core.execution.base import Executor from nemo_run.core.execution.dgxcloud import DGXCloudExecutor from nemo_run.core.execution.docker import DockerExecutor +from nemo_run.core.execution.lepton import LeptonExecutor from nemo_run.core.execution.local import LocalExecutor from nemo_run.core.execution.skypilot import SkypilotExecutor from nemo_run.core.execution.slurm import SlurmExecutor @@ -30,6 +31,7 @@ LocalExecutor: "local_persistent", DockerExecutor: "docker_persistent", DGXCloudExecutor: "dgx_cloud", + LeptonExecutor: "lepton", } REVERSE_EXECUTOR_MAPPING: dict[str, Type[Executor]] = { @@ -38,6 +40,7 @@ "local_persistent": LocalExecutor, "docker_persistent": DockerExecutor, "dgx_cloud": DGXCloudExecutor, + "lepton": LeptonExecutor, } diff --git a/nemo_run/run/torchx_backend/schedulers/lepton.py b/nemo_run/run/torchx_backend/schedulers/lepton.py new file mode 100644 index 00000000..f68e86a2 --- /dev/null +++ b/nemo_run/run/torchx_backend/schedulers/lepton.py @@ -0,0 +1,233 @@ +import json +import logging +import os +import shutil +import tempfile +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Optional + +import fiddle as fdl +import fiddle._src.experimental.dataclasses as fdl_dc +from leptonai.api.v1.types.job import LeptonJobState +from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, ListAppResponse, Scheduler +from torchx.specs import AppDef, AppState, ReplicaStatus, Role, RoleStatus, runopts + +from nemo_run.config import get_nemorun_home +from nemo_run.core.execution.base import Executor +from nemo_run.core.execution.lepton import LeptonExecutor +from nemo_run.core.serialization.zlib_json import ZlibJSONSerializer +from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin + +# Local placeholder for storing Lepton job states +LEPTON_JOB_DIRS = os.path.join(get_nemorun_home(), ".lepton_jobs.json") + +# Example mapping from some Lepton statuses to the TorchX AppState +LEPTON_STATES: dict[LeptonJobState, AppState] = { + LeptonJobState.Starting: AppState.PENDING, + LeptonJobState.Running: AppState.RUNNING, + LeptonJobState.Failed: AppState.FAILED, + LeptonJobState.Completed: AppState.SUCCEEDED, + LeptonJobState.Deleting: AppState.RUNNING, + LeptonJobState.Restarting: AppState.PENDING, + LeptonJobState.Archived: AppState.SUCCEEDED, + LeptonJobState.Stopped: AppState.CANCELLED, + LeptonJobState.Stopping: AppState.CANCELLED, + LeptonJobState.Unknown: AppState.FAILED, +} + +log = logging.getLogger(__name__) + + +@dataclass +class LeptonRequest: + """ + Wrapper around the torchx AppDef and the Lepton executor. + This object is used to store job submission info for the scheduler. + """ + + app: AppDef + executor: LeptonExecutor + cmd: list[str] + name: str + + +class LeptonScheduler(SchedulerMixin, Scheduler[dict[str, str]]): # type: ignore + def __init__(self, session_name: str) -> None: + super().__init__("lepton", session_name) + + def _run_opts(self) -> runopts: + opts = runopts() + opts.add( + "job_dir", + type_=str, + help="The directory to place the job code and outputs." + " The directory must not exist and will be created.", + ) + return opts + + def _submit_dryrun( # type: ignore + self, + app: AppDef, + cfg: Executor, + ) -> AppDryRunInfo[LeptonRequest]: + assert isinstance(cfg, LeptonExecutor), ( + f"{cfg.__class__} not supported for Lepton scheduler." + ) + executor = cfg + + assert len(app.roles) == 1, "Only single-role apps are supported." + role = app.roles[0] + values = cfg.macro_values() + if values: + role = values.apply(role) + + cmd = [role.entrypoint] + role.args + return AppDryRunInfo( + LeptonRequest(app=app, executor=executor, cmd=cmd, name=role.name), + # Minimal function to show the config, if any + lambda req: f"Lepton job for app: {req.app.name}, cmd: {' '.join(cmd)}, executor: {executor}", + ) + + def schedule(self, dryrun_info: AppDryRunInfo[LeptonRequest]) -> str: + """ + Launches a job on Lepton using the LeptonExecutor. Returns an app_id + used by TorchX for subsequent queries/cancellations. + """ + req = dryrun_info.request + executor = req.executor + + # If needed, package or prepare code (executor may no-op). + executor.package(executor.packager, job_name=executor.job_name) + + # The LeptonExecutor's launch call typically returns (job_id, handle). + # We'll call it without additional parameters here. + job_id, status = executor.launch(name=req.name, cmd=req.cmd) + if not job_id: + raise RuntimeError("Failed scheduling run on Lepton: no job_id returned") + + # Example app_id format: + # ______ + # If we have no explicit role name, we fall back to the app name. + role_name = req.app.roles[0].name + # If experiment_id is not set, fake one for demonstration: + experiment_id = getattr(executor, "experiment_id", "lepton_experiment") + app_id = f"{experiment_id}___{role_name}___{job_id}" + + # Store a status entry or logs path if available + # Currently, the LeptonExecutor status is placeholder, but we keep the pattern + _save_job_dir(app_id, job_status=status, executor=executor) + + return app_id + + def describe(self, app_id: str) -> Optional[DescribeAppResponse]: + """ + Returns information about the job. If there's no recognized job, + returns None. + """ + # We split out the stored values from the JSON file + stored_data = _get_job_dirs() + job_info = stored_data.get(app_id) + _, role_name, job_id = app_id.split("___") + roles = [Role(name=role_name, image="", num_replicas=1)] + roles_statuses = [ + RoleStatus( + role_name, + replicas=[ + ReplicaStatus(id=0, role=role_name, state=AppState.SUBMITTED, hostname="") + ], + ) + ] + + if not job_info: + return None + + executor: LeptonExecutor = job_info.get("executor", None) # type: ignore + if not executor: + return None + + lepton_state = executor.status(job_id) or LeptonJobState.Unknown + app_state = LEPTON_STATES.get(lepton_state, AppState.UNKNOWN) + roles_statuses[0].replicas[0].state = app_state + + return DescribeAppResponse( + app_id=app_id, + roles=roles, + roles_statuses=roles_statuses, + state=app_state, + msg="", + ) + + def _cancel_existing(self, app_id: str) -> None: + """ + Cancels the job by calling the LeptonExecutor's cancel method. + """ + stored_data = _get_job_dirs() + job_info = stored_data.get(app_id) + _, _, job_id = app_id.split("___") + executor: LeptonExecutor = job_info.get("executor", None) # type: ignore + if not executor: + return None + executor.cancel(job_id) + + def list(self) -> list[ListAppResponse]: ... + + def _validate(self, app: AppDef, scheduler: str) -> None: + # For demonstration, skip validation + pass + + +def create_scheduler(session_name: str, **kwargs: Any) -> LeptonScheduler: + return LeptonScheduler(session_name=session_name) + + +def _save_job_dir(app_id: str, job_status: str, executor: LeptonExecutor) -> None: + """ + Saves or updates local record of job status in JSON for demonstration. + """ + original_apps = {} + os.makedirs(os.path.dirname(LEPTON_JOB_DIRS), exist_ok=True) + if not os.path.isfile(LEPTON_JOB_DIRS): + Path(LEPTON_JOB_DIRS).touch() + + serializer = ZlibJSONSerializer() + with open(LEPTON_JOB_DIRS, "r+") as f: + try: + original_apps = json.load(f) + except Exception: + original_apps = {} + + app = { + "job_status": job_status, + "executor": serializer.serialize( + fdl_dc.convert_dataclasses_to_configs(executor, allow_post_init=True) + ), + } + original_apps[app_id] = app + + with tempfile.NamedTemporaryFile(mode="w+", delete=False) as fp: + json.dump(original_apps, fp) + temp_path = fp.name + + f.close() + shutil.move(temp_path, LEPTON_JOB_DIRS) + + +def _get_job_dirs() -> dict[str, dict[str, str]]: + """ + Retrieves local record of job status in JSON for demonstration. + """ + if not os.path.isfile(LEPTON_JOB_DIRS): + return {} + with open(LEPTON_JOB_DIRS, "r") as f: + data = json.load(f) + + serializer = ZlibJSONSerializer() + for app in data.values(): + try: + app["executor"] = fdl.build(serializer.deserialize(app["executor"])) + except Exception as e: + log.debug(f"Failed to deserialize executor: {e}") + continue + + return data diff --git a/pyproject.toml b/pyproject.toml index 458df193..92db7bd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "cryptography < 43.0.0", "networkx >= 3.3", "omegaconf>=2.3.0", + "leptonai>=0.23.1", "packaging", "toml" ] @@ -46,6 +47,7 @@ skypilot = "nemo_run.run.torchx_backend.schedulers.skypilot:create_scheduler" local_persistent = "nemo_run.run.torchx_backend.schedulers.local:create_scheduler" docker_persistent = "nemo_run.run.torchx_backend.schedulers.docker:create_scheduler" dgx_cloud = "nemo_run.run.torchx_backend.schedulers.dgxcloud:create_scheduler" +lepton = "nemo_run.run.torchx_backend.schedulers.lepton:create_scheduler" [project.optional-dependencies] skypilot = [ From 100933cd60a3b22040a34b330a0feacd17a276d9 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Thu, 1 May 2025 14:33:56 -0700 Subject: [PATCH 2/5] Add custom mounts to Lepton batch jobs Allow users to specify custom mounts using Lepton's Filesystem functionality. Signed-Off-By: Robert Clark --- docs/source/guides/execution.md | 5 +++++ nemo_run/core/execution/lepton.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/source/guides/execution.md b/docs/source/guides/execution.md index 690093de..c976797c 100644 --- a/docs/source/guides/execution.md +++ b/docs/source/guides/execution.md @@ -280,6 +280,10 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str): resource_shape = "gpu.8xh100-80gb" # Replace with your desired resource shape representing the number of GPUs in a pod node_group = "my-node-group" # The node group to run the job in nemo_run_dir = "/nemo-workspace/nemo-run" # The NeMo-Run directory where experiments are saved + # Define the remote storage directory that will be mounted in the job pods + # Ensure the path specified here contains your NEMORUN_HOME + storage_path = "/nemo-workspace" # The remote storage directory to mount in jobs + mount_path = "/nemo-workspace" # The path where the remote storage directory will be mounted inside the container executor = run.LeptonExecutor( resource_shape=resource_shape, @@ -288,6 +292,7 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str): nodes=nodes, nemo_run_dir=nemo_run_dir, gpus_per_node=gpus_per_node, + mounts=[{"path": storage_path, "mount_path": mount_path}], # Optional: Add custom environment variables or PyTorch specs if needed env_vars=common_envs(), # packager=run.GitArchivePackager() # Choose appropriate packager diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 936e8e90..b396f885 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -43,6 +43,7 @@ class LeptonExecutor(Executor): shared_memory_size: int = 65536 resource_shape: str = "" node_group: str = "" + mounts: list[dict[str, Any]] = field(default_factory=list) lepton_job_dir: str = field(init=False, default="") custom_spec: dict[str, Any] = field(default_factory=dict) @@ -135,7 +136,7 @@ def create_lepton_job(self, name: str): max_job_failure_retry=None, envs=envs, mounts=[ - Mount(path="/nemo-workspace", mount_path="/nemo-workspace") + Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts ], image_pull_secrets=[], ttl_seconds_after_finished=None, From f66819f589a8087d701420d59f76ce3ab10db802 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Tue, 6 May 2025 14:31:26 -0700 Subject: [PATCH 3/5] Add error handling to LeptonExecutor Handle more possible failure scenarios for the LeptonExecutor where the code could run into a bad state and the user should be alerted with helpful debug info. Signed-Off-By: Robert Clark --- nemo_run/core/execution/lepton.py | 63 ++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index b396f885..e297b9a2 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -5,12 +5,13 @@ import time from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Optional, Type +from typing import Any, Optional, Set, Type from invoke.context import Context from leptonai.api.v1.client import APIClient from leptonai.api.v1.types.affinity import LeptonResourceAffinity from leptonai.api.v1.types.common import Metadata +from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup from leptonai.api.v1.types.deployment import EnvVar, LeptonContainer, Mount from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec from leptonai.api.v1.types.replica import Replica @@ -85,17 +86,32 @@ def move_data(self, sleep: float = 10) -> None: remote_path=relative_path ) - def setup_distributed_pytorch(self) -> str: + def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup: """ - Runs a custom script from Lepton to setup the distributed PyTorch - environment variables required for distributed PyTorch jobs. + Find the node group ID for the passed node group. + + Lists all node groups available to the user and matches the node group requested + from the user with the list of node groups. Assumes there are no duplicate node groups. """ - distributed_command = ( - "wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh && " - "chmod +x init.sh && " - "source init.sh" - ) - return distributed_command + node_groups = client.nodegroup.list_all() + node_group_map = {ng.metadata.name: ng for ng in node_groups} + node_group_id = node_group_map[self.node_group] + return node_group_id + + def _valid_node_ids(self, node_group_id: DedicatedNodeGroup, client: APIClient) -> Set: + """ + Find all of the node IDs that are available within the requested node group. + + Lepton will only schedule jobs on nodes that are part of the requested node + group that match the user-specified resource shape. List all of the node IDs + within the node group and set them as available nodes. + """ + valid_node_ids = set() + node_ids = client.nodegroup.list_nodes(node_group_id) + for node in node_ids: + valid_node_ids.add(node.metadata.id_) + + return valid_node_ids def create_lepton_job(self, name: str): """ @@ -111,16 +127,13 @@ def create_lepton_job(self, name: str): f"chmod +x {self.lepton_job_dir}/launch_script.sh && bash {self.lepton_job_dir}/launch_script.sh" ] - # Get node groups - node_groups = client.nodegroup.list_all() - node_group_map = {ng.metadata.name: ng for ng in node_groups} - node_group_id = node_group_map[self.node_group] + # Get ID of requested node group + node_group_id = self._node_group_id(client) + if not node_group_id.metadata.id_: + raise RuntimeError(f"Unable to find node group ID for node group {self.node_group}") # Get node IDs - valid_node_ids = set() - node_ids = client.nodegroup.list_nodes(node_group_id) - for node in node_ids: - valid_node_ids.add(node.metadata.id_) + valid_node_ids = self._valid_node_ids(node_group_id, client) job_spec = LeptonJobUserSpec( resource_shape=self.resource_shape, @@ -173,10 +186,16 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: logger.info("Creating distributed workload") job = self.create_lepton_job(name) if not job: - raise RuntimeError(f"Failed to create Lepton job") + raise RuntimeError("Failed to create Lepton job") job_id = job.metadata.id_ + + if not job_id: + raise RuntimeError("Failed to retrieve job information") status = self.status(job_id) + + if not status: + raise RuntimeError("Failed to retrieve job status") return job_id, status def nnodes(self) -> int: @@ -195,7 +214,7 @@ def status(self, job_id: str) -> Optional[LeptonJobState]: client = APIClient() job = client.job.get(job_id) - if not job: + if not job or not job.status: return LeptonJobState.Unknown # Lepton marks a job as Running when at least one pod is running @@ -226,6 +245,8 @@ def _first_replica(job_id: str) -> Replica: for replica in replicas: replica_id = replica.metadata.id_ + if not replica_id: + continue # The first replica has the pattern -0-xxxxx # where xxxxx is a unique ID for each worker. Subsequent # workers increase the number between and the @@ -242,7 +263,7 @@ def _status(job_id: str): client = APIClient() job = client.job.get(job_id) - if not job: + if not job or not job.status: return LeptonJobState.Unknown # Lepton marks a job as Running when at least one pod is running From 2071ad1fcef9769b8cabb1ad6ae05b3ed3f688c4 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Wed, 7 May 2025 10:09:35 -0700 Subject: [PATCH 4/5] Use a low-resource pod to move data to Lepton Running a low-resource pod to copy experiment data to the Lepton cluster is more reliable and broadly compatible with various cluster types versus the storage API. Signed-Off-By: Robert Clark --- nemo_run/core/execution/lepton.py | 89 +++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index e297b9a2..9168d3c8 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -1,18 +1,28 @@ +import base64 import logging import os import re import subprocess +import tempfile import time from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path -from typing import Any, Optional, Set, Type +from typing import Any, List, Optional, Set, Type from invoke.context import Context from leptonai.api.v1.client import APIClient from leptonai.api.v1.types.affinity import LeptonResourceAffinity -from leptonai.api.v1.types.common import Metadata +from leptonai.api.v1.types.common import LeptonVisibility, Metadata from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup -from leptonai.api.v1.types.deployment import EnvVar, LeptonContainer, Mount +from leptonai.api.v1.types.deployment import ( + EnvVar, + LeptonContainer, + LeptonDeployment, + LeptonDeploymentUserSpec, + Mount, + ResourceRequirement, +) from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec from leptonai.api.v1.types.replica import Replica @@ -62,29 +72,58 @@ def stop_job(self, job_id: str): client.job.update(job_id, spec={"spec": {"stopped": True}}) logger.info(f"Job {job_id} stopped successfully.") + def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> List: + with tempfile.TemporaryDirectory() as temp_dir: + tarball_path = os.path.join(temp_dir, "archive.tar.gz") + subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True) + with open(tarball_path, "rb") as file: + file_data = file.read() + encoded_data = base64.b64encode(file_data).decode("utf-8") + + # Delete and recreate directory if it already exists, command to decode base64 data, save to a file, and extract inside the pod + cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz" + full_command = ["sh", "-c", cmd] + return full_command + def move_data(self, sleep: float = 10) -> None: """ - Moves job directory into PVC and deletes the workload after completion + Moves job directory into remote storage and deletes the workload after completion. """ client = APIClient() - client.storage.create_dir(additional_path=self.lepton_job_dir) - - # Create all sub-directories in the directory tree - # Then, copy all files to the storage - for root, dirs, files in os.walk(self.job_dir): - # Create the sub-directories - for dir in dirs: - abs_path = os.path.join(root, dir) - relative_path = os.path.join(self.lepton_job_dir, abs_path.replace(self.job_dir, "").lstrip("/")) - client.storage.create_dir(additional_path=relative_path) - # Copy the files in each sub-directory to the remote filesystem - for file in files: - abs_path = os.path.join(root, file) - relative_path = os.path.join(self.lepton_job_dir, abs_path.replace(self.job_dir, "").lstrip("/")) - client.storage.create_file( - local_path=abs_path, - remote_path=relative_path - ) + cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir) + node_group_id = self._node_group_id(client) + valid_node_ids = self._valid_node_ids(node_group_id, client) + + spec = LeptonDeploymentUserSpec( + container=LeptonContainer( + image="busybox:1.37.0", # Use a very low resource container + command=cmd, + ), + mounts=[ + Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts + ], + ) + spec.resource_requirement = ResourceRequirement( + resource_shape="cpu.small", + affinity=LeptonResourceAffinity( + allowed_dedicated_node_groups=[node_group_id.metadata.id_], + allowed_nodes_in_node_group=valid_node_ids, + ), + min_replicas=1, + max_replicas=1, + ) + custom_name = f"data-mover-{int(datetime.now().timestamp())}" + + deployment = LeptonDeployment( + metadata=Metadata( + id=custom_name, + name=custom_name, + visibility=LeptonVisibility("private"), + ), + spec=spec, + ) + + client.deployment.create(deployment) def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup: """ @@ -121,11 +160,7 @@ def create_lepton_job(self, name: str): envs = [EnvVar(name=key, value=value) for key, value in self.env_vars.items()] - cmd = [ - "/bin/bash", - "-c", - f"chmod +x {self.lepton_job_dir}/launch_script.sh && bash {self.lepton_job_dir}/launch_script.sh" - ] + cmd = ["/bin/bash", "-c", f"bash {self.lepton_job_dir}/launch_script.sh"] # Get ID of requested node group node_group_id = self._node_group_id(client) From 94bce5b10649d977d46223cb6d6442dee18aff05 Mon Sep 17 00:00:00 2001 From: Robert Clark Date: Mon, 12 May 2025 13:32:54 -0700 Subject: [PATCH 5/5] Add unit tests to LeptonExecutor Signed-Off-By: Robert Clark --- nemo_run/core/execution/lepton.py | 11 +- test/core/execution/test_lepton.py | 597 +++++++++++++++++++++++++++++ 2 files changed, 607 insertions(+), 1 deletion(-) create mode 100644 test/core/execution/test_lepton.py diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 9168d3c8..9609b952 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -133,8 +133,17 @@ def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup: from the user with the list of node groups. Assumes there are no duplicate node groups. """ node_groups = client.nodegroup.list_all() + if len(node_groups) < 1: + raise RuntimeError( + "No node groups found in cluster. Ensure Lepton workspace has at least one node group." + ) node_group_map = {ng.metadata.name: ng for ng in node_groups} - node_group_id = node_group_map[self.node_group] + try: + node_group_id = node_group_map[self.node_group] + except KeyError: + raise RuntimeError( + "Could not find node group that matches requested ID in the Lepton workspace. Ensure your requested node group exists." + ) return node_group_id def _valid_node_ids(self, node_group_id: DedicatedNodeGroup, client: APIClient) -> Set: diff --git a/test/core/execution/test_lepton.py b/test/core/execution/test_lepton.py new file mode 100644 index 00000000..d6d6a5fe --- /dev/null +++ b/test/core/execution/test_lepton.py @@ -0,0 +1,597 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import tempfile +from types import SimpleNamespace +from unittest.mock import MagicMock, mock_open, patch + +import pytest +from leptonai.api.v1.types.common import LeptonVisibility, Metadata +from leptonai.api.v1.types.deployment import ( + LeptonContainer, + LeptonDeployment, + LeptonDeploymentUserSpec, + LeptonResourceAffinity, + Mount, + ResourceRequirement, +) +from leptonai.api.v1.types.job import LeptonJob + +from nemo_run.core.execution.lepton import LeptonExecutor, LeptonJobState +from nemo_run.core.packaging.git import GitArchivePackager + + +class MockLeptonJob: + def __init__(self, state, ready=0, active=0): + self.status = MagicMock() + self.status.state = state + self.status.ready = ready + self.status.active = active + + +class TestLeptonExecutor: + def test_init(self): + executor = LeptonExecutor( + resource_shape="gpu.8xh100-80gb", + node_group="my-node-group", + container_image="nvcr.io/nvidia/test:latest", + nodes=2, + gpus_per_node=8, + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor.resource_shape == "gpu.8xh100-80gb" + assert executor.node_group == "my-node-group" + assert executor.container_image == "nvcr.io/nvidia/test:latest" + assert executor.nodes == 2 + assert executor.gpus_per_node == 8 + assert executor.nemo_run_dir == "/workspace/nemo_run" + assert executor.mounts == [{"path": "/workspace", "mount_path": "/workspace"}] + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_stop_job(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob(LeptonJobState.Running, ready=2, active=2) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + executor.stop_job("job123") + + mock_job_api.update.assert_called_once_with("job123", spec={"spec": {"stopped": True}}) + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_stop_job_not_running(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob( + LeptonJobState.Completed, + ) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + executor.stop_job("job123") + + mock_job_api.update.assert_not_called() + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_stop_job_not_found(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock(return_value=None) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + executor.stop_job("job123") + + mock_job_api.update.assert_not_called() + + @patch("subprocess.run") + @patch("builtins.open", new_callable=mock_open, read_data=b"mock tarball") + def test_copy_directory_data_command_success(self, mock_file, mock_subprocess): + local_dir_path = "/mock/local/dir" + dest_path = "/mock/destination/path" + + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + response = executor.copy_directory_data_command(local_dir_path, dest_path) + + # The response is in the format ["sh", "-c", ""] + # The actual command is in the final index of the response + command = response[-1] + mock_subprocess.assert_called_once() + assert mock_file.call_count == 1 + + assert "rm -rf /mock/destination/path && mkdir -p /mock/destination/path && echo" in command + assert ( + "base64 -d > /mock/destination/path/archive.tar.gz && tar -xzf /mock/destination/path/archive.tar.gz -C /mock/destination/path && rm /mock/destination/path/archive.tar.gz" + in command + ) + + @patch("tempfile.TemporaryDirectory") + def test_copy_directory_data_command_fails(self, mock_tempdir): + local_dir_path = "/mock/local/dir" + dest_path = "/mock/destination/path" + + mock_tempdir.side_effect = OSError("Temporary directory creation failed") + + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + with pytest.raises(OSError, match="Temporary directory creation failed"): + executor.copy_directory_data_command(local_dir_path, dest_path) + + @patch.object(LeptonExecutor, "copy_directory_data_command") + @patch("nemo_run.core.execution.lepton.datetime") + @patch("nemo_run.core.execution.lepton.APIClient") + def test_move_data_success(self, mock_APIClient, mock_datetime, mock_copy): + mock_instance = MagicMock() + mock_deployment_api = MagicMock() + mock_instance.deployment = mock_deployment_api + mock_deployment_api.create = MagicMock() + mock_copy.return_value = ["sh", "-c", "echo 'hello world'"] + mock_APIClient.return_value = mock_instance + mock_client = mock_APIClient.return_value + mock_nodegroup = MagicMock() + mock_datetime = MagicMock() + mock_datetime_now = MagicMock() + mock_datetime.now = mock_datetime_now + mock_timestamp = MagicMock() + mock_datetime_now.timestamp = MagicMock() + mock_timestamp.return_value = "1" + mock_client.nodegroup = mock_nodegroup + mock_nodegroup.list_all.return_value = [ + SimpleNamespace(metadata=SimpleNamespace(name="123456", id_="my-node-id")) + ] + mock_nodegroup.list_nodes.return_value = [ + SimpleNamespace(metadata=SimpleNamespace(id_="10-10-10-10")) + ] + + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + node_group="123456", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + executor.move_data() + + spec = LeptonDeploymentUserSpec( + container=LeptonContainer( + image="busybox:1.37.0", + command=["sh", "-c", "echo 'hello world'"], + ), + mounts=[Mount(path="/workspace", mount_path="/workspace")], + ) + spec.resource_requirement = ResourceRequirement( + resource_shape="cpu.small", + affinity=LeptonResourceAffinity( + allowed_dedicated_node_groups=["my-node-id"], + allowed_nodes_in_node_group=["10-10-10-10"], + ), + min_replicas=1, + max_replicas=1, + ) + custom_name = "data-mover-1" + deployment = LeptonDeployment( + metadata=Metadata( + id=custom_name, name=custom_name, visibility=LeptonVisibility("private") + ), + spec=spec, + ) + + mock_copy.assert_called_once_with("", "") + mock_deployment_api.create.assert_called_once_with(deployment) + + def test_node_group_id(self): + mock_client = MagicMock( + nodegroup=MagicMock( + list_all=MagicMock( + return_value=[ + SimpleNamespace(metadata=SimpleNamespace(name="123456")), + SimpleNamespace(metadata=SimpleNamespace(name="abcdef")), + ] + ) + ) + ) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + node_group_id = executor._node_group_id(mock_client) + + assert node_group_id == SimpleNamespace(metadata=SimpleNamespace(name="123456")) + + def test_node_group_id_no_groups(self): + mock_client = MagicMock(nodegroup=MagicMock(list_all=MagicMock(return_value=[]))) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + with pytest.raises(RuntimeError): + executor._node_group_id(mock_client) + + def test_node_group_id_unmatched_node_id(self): + mock_client = MagicMock( + nodegroup=MagicMock( + list_all=MagicMock( + return_value=[ + SimpleNamespace(metadata=SimpleNamespace(name="123456")), + SimpleNamespace(metadata=SimpleNamespace(name="abcdef")), + ] + ) + ) + ) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="zzzzz", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + with pytest.raises(RuntimeError): + executor._node_group_id(mock_client) + + def test_valid_node_id(self): + mock_client = MagicMock( + nodegroup=MagicMock( + list_nodes=MagicMock( + return_value=[ + SimpleNamespace(metadata=SimpleNamespace(id_="10-10-10-10")), + SimpleNamespace(metadata=SimpleNamespace(id_="20-20-20-20")), + ] + ) + ) + ) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + node_ids = executor._valid_node_ids(None, mock_client) + + assert node_ids == set(["10-10-10-10", "20-20-20-20"]) + + def test_valid_node_id_no_ids(self): + mock_client = MagicMock(nodegroup=MagicMock(list_nodes=MagicMock(return_value=[]))) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + node_ids = executor._valid_node_ids(None, mock_client) + + assert node_ids == set([]) + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job(self, mock_APIClient_class): + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + mock_client.nodegroup.list_all.return_value = [] + valid_node_ids = ["node-id-1", "node-id-2"] + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + executor._valid_node_ids = MagicMock(return_value=valid_node_ids) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + mock_client.job.create.assert_called_once() + + def test_nnodes(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + nodes=3, + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor.nnodes() == 3 + + def test_nnodes_default(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor.nnodes() == 1 + + def test_nproc_per_node_with_gpus(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + gpus_per_node=4, + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor.nproc_per_node() == 4 + + def test_nproc_per_node_with_nprocs(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + gpus_per_node=0, + nprocs_per_node=3, + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor.nproc_per_node() == 3 + + def test_nproc_per_node_default(self): + executor = LeptonExecutor( + container_image="nvcr.io/nvidia/test:latest", + nemo_run_dir="/workspace/nemo_run", + mounts=[{"path": "/workspace", "mount_path": "/workspace"}], + ) + + assert executor.nproc_per_node() == 1 + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_status_running_and_ready(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob(LeptonJobState.Running, ready=2, active=2) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + status = executor.status("job123") + assert status == LeptonJobState.Running + + mock_job_api.get.assert_called_once_with("job123") + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_status_running_and_not_all_ready(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob(LeptonJobState.Running, ready=1, active=2) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + status = executor.status("job123") + assert status == LeptonJobState.Starting + + mock_job_api.get.assert_called_once_with("job123") + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_status_starting(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob(LeptonJobState.Starting, ready=0, active=2) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + status = executor.status("job123") + assert status == LeptonJobState.Starting + + mock_job_api.get.assert_called_once_with("job123") + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_status_unknown(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob( + LeptonJobState.Unknown, + ) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + status = executor.status("job123") + assert status == LeptonJobState.Unknown + + mock_job_api.get.assert_called_once_with("job123") + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_status_no_job(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock(return_value=None) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + status = executor.status("job123") + assert status == LeptonJobState.Unknown + + mock_job_api.get.assert_called_once_with("job123") + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_cancel_job(self, mock_APIClient): + mock_instance = MagicMock() + mock_job_api = MagicMock() + mock_instance.job = mock_job_api + + mock_job_api.get = MagicMock( + return_value=MockLeptonJob(LeptonJobState.Running, ready=2, active=2) + ) + + mock_APIClient.return_value = mock_instance + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + executor.cancel("job123") + + mock_job_api.delete.assert_called_once_with("job123") + + @patch("os.makedirs") + @patch("builtins.open", new_callable=mock_open) + def test_package_configs(self, mock_file, mock_makedirs): + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")] + + filenames = executor.package_configs(*configs) + + assert len(filenames) == 2 + assert filenames[0] == "/nemo_run/configs/config1.yaml" + assert filenames[1] == "/nemo_run/configs/subdir/config2.yaml" + mock_makedirs.assert_called() + assert mock_file.call_count == 2 + + @patch("invoke.context.Context.run") + @patch("subprocess.run") + def test_package_git_packager(self, mock_subprocess_run, mock_context_run): + # Mock subprocess.run which is used to get the git repo path + mock_process = MagicMock() + mock_process.stdout = b"/path/to/repo\n" + mock_subprocess_run.return_value = mock_process + + # Mock the Context.run to avoid actually running commands + mock_context_run.return_value = MagicMock() + + with tempfile.TemporaryDirectory() as tmp_dir: + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + executor.experiment_id = "test_exp" + executor.job_dir = tmp_dir + + packager = GitArchivePackager() + # Mock the package method to avoid real git operations + with patch.object(packager, "package", return_value="/mocked/package.tar.gz"): + executor.package(packager, "test_job") + + # Check that the right methods were called + mock_subprocess_run.assert_called_once_with( + ["git", "rev-parse", "--show-toplevel"], + check=True, + stdout=subprocess.PIPE, + ) + assert mock_context_run.called + + def test_macro_values(self): + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + result = executor.macro_values() + + assert result is None