Skip to content

Commit b221881

Browse files
feat: Support for Lepton preemption configuration (#487)
Signed-off-by: Mike McKiernan <mmckiernan@nvidia.com>
1 parent 5289e5c commit b221881

3 files changed

Lines changed: 171 additions & 7 deletions

File tree

docs/guides/execution.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,12 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str):
375375
# pre_launch_commands=["nvidia-smi"],
376376
# Optional: Specify image pull secrets for authenticating with container registries
377377
# image_pull_secrets=["my-image-pull-secret"],
378-
# packager=run.GitArchivePackager() # Choose appropriate packager
378+
# Optional: Enable preemption scheduling
379+
# can_be_preempted=True, # job yields nodes to higher-priority jobs
380+
# can_preempt=True, # job can evict lower-priority jobs
381+
# queue_priority="mid-4000", # required when either preemption flag is set
382+
# Choose appropriate packager
383+
# packager=run.GitArchivePackager()
379384
)
380385
return executor
381386

nemo_run/core/execution/lepton.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
EnvValue,
3636
LeptonContainer,
3737
Mount,
38+
QueueConfig,
3839
)
3940
from leptonai.api.v1.types.job import (
4041
LeptonJob,
@@ -83,6 +84,9 @@ class LeptonExecutor(Executor):
8384
pre_launch_commands: list[str] = field(default_factory=list) # Custom commands before launch
8485
head_resource_shape: Optional[str] = "" # Only used for LeptonRayCluster
8586
ray_version: Optional[str] = None # Only used for LeptonRayCluster
87+
can_be_preempted: bool = False # job yields nodes to higher-priority jobs
88+
can_preempt: bool = False # job can evict lower-priority jobs
89+
queue_priority: Optional[str] = None # e.g. "mid-4000"; required when either flag is set
8690

8791
def stop_job(self, job_id: str):
8892
"""
@@ -285,7 +289,13 @@ def create_lepton_job(self, name: str):
285289
privileged=False,
286290
metrics=None,
287291
log=None,
288-
queue_config=None,
292+
queue_config=QueueConfig(
293+
priority_class=self.queue_priority or "mid-4000",
294+
can_be_preempted=self.can_be_preempted if self.can_be_preempted else None,
295+
can_preempt=self.can_preempt if self.can_preempt else None,
296+
)
297+
if (self.can_be_preempted or self.can_preempt)
298+
else None,
289299
stopped=None,
290300
)
291301

test/core/execution/test_lepton.py

Lines changed: 154 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@
2121
import pytest
2222
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
2323
from leptonai.api.v1.types.deployment import (
24+
EnvValue,
25+
EnvVar,
2426
LeptonContainer,
2527
LeptonResourceAffinity,
2628
Mount,
27-
EnvVar,
28-
EnvValue,
29+
QueueConfig,
2930
)
3031
from leptonai.api.v1.types.job import LeptonJob, LeptonJobUserSpec
3132

@@ -486,6 +487,145 @@ def test_create_lepton_job_with_empty_reservation_config(self, mock_APIClient_cl
486487
created_job = mock_client.job.create.call_args[0][0]
487488
assert created_job.spec.reservation_config is None
488489

490+
def test_init_preemption_defaults(self):
491+
"""Test that preemption fields default to off."""
492+
executor = LeptonExecutor(
493+
container_image="test-image",
494+
nemo_run_dir="/test/path",
495+
mounts=[{"path": "/test", "mount_path": "/test"}],
496+
)
497+
498+
assert executor.can_be_preempted is False
499+
assert executor.can_preempt is False
500+
assert executor.queue_priority is None
501+
502+
def test_init_with_can_be_preempted(self):
503+
executor = LeptonExecutor(
504+
container_image="test-image",
505+
nemo_run_dir="/test/path",
506+
mounts=[{"path": "/test", "mount_path": "/test"}],
507+
can_be_preempted=True,
508+
)
509+
510+
assert executor.can_be_preempted is True
511+
512+
def test_init_with_can_preempt(self):
513+
executor = LeptonExecutor(
514+
container_image="test-image",
515+
nemo_run_dir="/test/path",
516+
mounts=[{"path": "/test", "mount_path": "/test"}],
517+
can_preempt=True,
518+
)
519+
520+
assert executor.can_preempt is True
521+
522+
def test_init_with_queue_priority(self):
523+
executor = LeptonExecutor(
524+
container_image="test-image",
525+
nemo_run_dir="/test/path",
526+
mounts=[{"path": "/test", "mount_path": "/test"}],
527+
can_be_preempted=True,
528+
queue_priority="high-8000",
529+
)
530+
531+
assert executor.queue_priority == "high-8000"
532+
533+
@patch("nemo_run.core.execution.lepton.APIClient")
534+
def test_create_lepton_job_without_preemption(self, mock_APIClient_class):
535+
"""Test queue_config is None when neither preemption flag is set."""
536+
mock_client = mock_APIClient_class.return_value
537+
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
538+
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
539+
540+
executor = LeptonExecutor(
541+
container_image="test-image",
542+
nemo_run_dir="/test/path",
543+
node_group="123456",
544+
mounts=[{"path": "/test", "mount_path": "/test"}],
545+
)
546+
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
547+
executor._node_group_id = MagicMock(return_value=node_group)
548+
549+
executor.create_lepton_job("my-lepton-job")
550+
551+
created_job = mock_client.job.create.call_args[0][0]
552+
assert created_job.spec.queue_config is None
553+
554+
@patch("nemo_run.core.execution.lepton.APIClient")
555+
def test_create_lepton_job_with_can_be_preempted(self, mock_APIClient_class):
556+
"""Test queue_config is set correctly when can_be_preempted=True."""
557+
mock_client = mock_APIClient_class.return_value
558+
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
559+
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
560+
561+
executor = LeptonExecutor(
562+
container_image="test-image",
563+
nemo_run_dir="/test/path",
564+
node_group="123456",
565+
mounts=[{"path": "/test", "mount_path": "/test"}],
566+
can_be_preempted=True,
567+
)
568+
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
569+
executor._node_group_id = MagicMock(return_value=node_group)
570+
571+
executor.create_lepton_job("my-lepton-job")
572+
573+
created_job = mock_client.job.create.call_args[0][0]
574+
assert created_job.spec.queue_config == QueueConfig(
575+
priority_class="mid-4000",
576+
can_be_preempted=True,
577+
can_preempt=None,
578+
)
579+
580+
@patch("nemo_run.core.execution.lepton.APIClient")
581+
def test_create_lepton_job_with_can_preempt(self, mock_APIClient_class):
582+
"""Test queue_config is set correctly when can_preempt=True."""
583+
mock_client = mock_APIClient_class.return_value
584+
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
585+
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
586+
587+
executor = LeptonExecutor(
588+
container_image="test-image",
589+
nemo_run_dir="/test/path",
590+
node_group="123456",
591+
mounts=[{"path": "/test", "mount_path": "/test"}],
592+
can_preempt=True,
593+
)
594+
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
595+
executor._node_group_id = MagicMock(return_value=node_group)
596+
597+
executor.create_lepton_job("my-lepton-job")
598+
599+
created_job = mock_client.job.create.call_args[0][0]
600+
assert created_job.spec.queue_config == QueueConfig(
601+
priority_class="mid-4000",
602+
can_be_preempted=None,
603+
can_preempt=True,
604+
)
605+
606+
@patch("nemo_run.core.execution.lepton.APIClient")
607+
def test_create_lepton_job_with_custom_queue_priority(self, mock_APIClient_class):
608+
"""Test that a custom queue_priority overrides the 'mid-4000' default."""
609+
mock_client = mock_APIClient_class.return_value
610+
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
611+
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))
612+
613+
executor = LeptonExecutor(
614+
container_image="test-image",
615+
nemo_run_dir="/test/path",
616+
node_group="123456",
617+
mounts=[{"path": "/test", "mount_path": "/test"}],
618+
can_be_preempted=True,
619+
queue_priority="high-8000",
620+
)
621+
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
622+
executor._node_group_id = MagicMock(return_value=node_group)
623+
624+
executor.create_lepton_job("my-lepton-job")
625+
626+
created_job = mock_client.job.create.call_args[0][0]
627+
assert created_job.spec.queue_config.priority_class == "high-8000"
628+
489629
def test_nnodes(self):
490630
executor = LeptonExecutor(
491631
container_image="nvcr.io/nvidia/test:latest",
@@ -549,7 +689,11 @@ def test_valid_storage_mounts_with_mount_from(self):
549689
container_image="nvcr.io/nvidia/test:latest",
550690
nemo_run_dir="/workspace/nemo_run",
551691
mounts=[
552-
{"path": "/workspace", "mount_path": "/workspace", "from": "local-storage:nfs"}
692+
{
693+
"path": "/workspace",
694+
"mount_path": "/workspace",
695+
"from": "local-storage:nfs",
696+
}
553697
],
554698
)
555699

@@ -730,7 +874,10 @@ def test_package_configs(self, mock_file, mock_makedirs):
730874
mounts=[{"path": "/test", "mount_path": "/test"}],
731875
)
732876

733-
configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")]
877+
configs = [
878+
("config1.yaml", "key: value"),
879+
("subdir/config2.yaml", "another: config"),
880+
]
734881

735882
filenames = executor.package_configs(*configs)
736883

@@ -855,7 +1002,9 @@ def test_launch_method_comprehensive(
8551002
"""Test launch method name validation, pre_launch_commands, and script generation."""
8561003
# Setup
8571004
executor = LeptonExecutor(
858-
container_image="test-image", nemo_run_dir="/test", pre_launch_commands=["echo setup"]
1005+
container_image="test-image",
1006+
nemo_run_dir="/test",
1007+
pre_launch_commands=["echo setup"],
8591008
)
8601009
executor.job_dir = executor.lepton_job_dir = "/fake"
8611010
mock_join.return_value = "/fake/script.sh"

0 commit comments

Comments
 (0)