diff --git a/docs/guides/execution.md b/docs/guides/execution.md index e1efc51f..77ef25e0 100644 --- a/docs/guides/execution.md +++ b/docs/guides/execution.md @@ -375,7 +375,12 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str): # pre_launch_commands=["nvidia-smi"], # Optional: Specify image pull secrets for authenticating with container registries # image_pull_secrets=["my-image-pull-secret"], - # packager=run.GitArchivePackager() # Choose appropriate packager + # Optional: Enable preemption scheduling + # can_be_preempted=True, # job yields nodes to higher-priority jobs + # can_preempt=True, # job can evict lower-priority jobs + # queue_priority="mid-4000", # required when either preemption flag is set + # Choose appropriate packager + # packager=run.GitArchivePackager() ) return executor diff --git a/nemo_run/core/execution/lepton.py b/nemo_run/core/execution/lepton.py index 3e3bb916..fbbaa575 100644 --- a/nemo_run/core/execution/lepton.py +++ b/nemo_run/core/execution/lepton.py @@ -35,6 +35,7 @@ EnvValue, LeptonContainer, Mount, + QueueConfig, ) from leptonai.api.v1.types.job import ( LeptonJob, @@ -83,6 +84,9 @@ class LeptonExecutor(Executor): pre_launch_commands: list[str] = field(default_factory=list) # Custom commands before launch head_resource_shape: Optional[str] = "" # Only used for LeptonRayCluster ray_version: Optional[str] = None # Only used for LeptonRayCluster + can_be_preempted: bool = False # job yields nodes to higher-priority jobs + can_preempt: bool = False # job can evict lower-priority jobs + queue_priority: Optional[str] = None # e.g. "mid-4000"; required when either flag is set def stop_job(self, job_id: str): """ @@ -285,7 +289,13 @@ def create_lepton_job(self, name: str): privileged=False, metrics=None, log=None, - queue_config=None, + queue_config=QueueConfig( + priority_class=self.queue_priority or "mid-4000", + can_be_preempted=self.can_be_preempted if self.can_be_preempted else None, + can_preempt=self.can_preempt if self.can_preempt else None, + ) + if (self.can_be_preempted or self.can_preempt) + else None, stopped=None, ) diff --git a/test/core/execution/test_lepton.py b/test/core/execution/test_lepton.py index 4d725133..5aa7e567 100644 --- a/test/core/execution/test_lepton.py +++ b/test/core/execution/test_lepton.py @@ -21,11 +21,12 @@ import pytest from leptonai.api.v1.types.common import LeptonVisibility, Metadata from leptonai.api.v1.types.deployment import ( + EnvValue, + EnvVar, LeptonContainer, LeptonResourceAffinity, Mount, - EnvVar, - EnvValue, + QueueConfig, ) from leptonai.api.v1.types.job import LeptonJob, LeptonJobUserSpec @@ -486,6 +487,145 @@ def test_create_lepton_job_with_empty_reservation_config(self, mock_APIClient_cl created_job = mock_client.job.create.call_args[0][0] assert created_job.spec.reservation_config is None + def test_init_preemption_defaults(self): + """Test that preemption fields default to off.""" + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + + assert executor.can_be_preempted is False + assert executor.can_preempt is False + assert executor.queue_priority is None + + def test_init_with_can_be_preempted(self): + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + can_be_preempted=True, + ) + + assert executor.can_be_preempted is True + + def test_init_with_can_preempt(self): + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + can_preempt=True, + ) + + assert executor.can_preempt is True + + def test_init_with_queue_priority(self): + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + mounts=[{"path": "/test", "mount_path": "/test"}], + can_be_preempted=True, + queue_priority="high-8000", + ) + + assert executor.queue_priority == "high-8000" + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_without_preemption(self, mock_APIClient_class): + """Test queue_config is None when neither preemption flag is set.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + ) + executor._valid_node_ids = MagicMock(return_value=["node-id-1"]) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.queue_config is None + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_with_can_be_preempted(self, mock_APIClient_class): + """Test queue_config is set correctly when can_be_preempted=True.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + can_be_preempted=True, + ) + executor._valid_node_ids = MagicMock(return_value=["node-id-1"]) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.queue_config == QueueConfig( + priority_class="mid-4000", + can_be_preempted=True, + can_preempt=None, + ) + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_with_can_preempt(self, mock_APIClient_class): + """Test queue_config is set correctly when can_preempt=True.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + can_preempt=True, + ) + executor._valid_node_ids = MagicMock(return_value=["node-id-1"]) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.queue_config == QueueConfig( + priority_class="mid-4000", + can_be_preempted=None, + can_preempt=True, + ) + + @patch("nemo_run.core.execution.lepton.APIClient") + def test_create_lepton_job_with_custom_queue_priority(self, mock_APIClient_class): + """Test that a custom queue_priority overrides the 'mid-4000' default.""" + mock_client = mock_APIClient_class.return_value + mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job")) + node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456")) + + executor = LeptonExecutor( + container_image="test-image", + nemo_run_dir="/test/path", + node_group="123456", + mounts=[{"path": "/test", "mount_path": "/test"}], + can_be_preempted=True, + queue_priority="high-8000", + ) + executor._valid_node_ids = MagicMock(return_value=["node-id-1"]) + executor._node_group_id = MagicMock(return_value=node_group) + + executor.create_lepton_job("my-lepton-job") + + created_job = mock_client.job.create.call_args[0][0] + assert created_job.spec.queue_config.priority_class == "high-8000" + def test_nnodes(self): executor = LeptonExecutor( container_image="nvcr.io/nvidia/test:latest", @@ -549,7 +689,11 @@ def test_valid_storage_mounts_with_mount_from(self): container_image="nvcr.io/nvidia/test:latest", nemo_run_dir="/workspace/nemo_run", mounts=[ - {"path": "/workspace", "mount_path": "/workspace", "from": "local-storage:nfs"} + { + "path": "/workspace", + "mount_path": "/workspace", + "from": "local-storage:nfs", + } ], ) @@ -730,7 +874,10 @@ def test_package_configs(self, mock_file, mock_makedirs): mounts=[{"path": "/test", "mount_path": "/test"}], ) - configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")] + configs = [ + ("config1.yaml", "key: value"), + ("subdir/config2.yaml", "another: config"), + ] filenames = executor.package_configs(*configs) @@ -855,7 +1002,9 @@ def test_launch_method_comprehensive( """Test launch method name validation, pre_launch_commands, and script generation.""" # Setup executor = LeptonExecutor( - container_image="test-image", nemo_run_dir="/test", pre_launch_commands=["echo setup"] + container_image="test-image", + nemo_run_dir="/test", + pre_launch_commands=["echo setup"], ) executor.job_dir = executor.lepton_job_dir = "/fake" mock_join.return_value = "/fake/script.sh"