Skip to content

Commit 5cf880d

Browse files
committed
Implement Inline Script Execution in Kubeflow Executor
Add support for inline script execution in the KubeflowExecutor using the SDK's function-argument-injection style. This change allows users to pass scripts directly as inline parameters, enhancing flexibility in task execution.

Key changes include:
- Introduced the `_nemo_inline_entry_params` function for handling inline script execution.
- Updated the `create_trainjob` and `submit` methods to support inline scripts.
- Enhanced logging for better tracking of execution modes.
- Improved Kubernetes runtime management, enabling reuse of a ClusterTrainingRuntime across experiments with similar configurations.

Signed-off-by: Krishnaswamy Subramanian <subramk@thoughtworks.com>
1 parent 22a8e11 commit 5cf880d

File tree

8 files changed

+625
-314
lines changed

8 files changed

+625
-314
lines changed

nemo_run/core/execution/kubeflow.py

Lines changed: 266 additions & 157 deletions
Large diffs are not rendered by default.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
apiVersion: trainer.kubeflow.org/v1alpha1
2+
kind: ClusterTrainingRuntime
3+
metadata:
4+
name: {{ runtime_name }}
5+
namespace: {{ namespace }}
6+
spec:
7+
mlPolicy:
8+
numNodes: {{ nodes }}
9+
torch:
10+
numProcPerNode: "auto"
11+
template:
12+
spec:
13+
replicatedJobs:
14+
- name: node
15+
replicas: {{ nodes }}
16+
template:
17+
metadata:
18+
labels:
19+
trainer.kubeflow.org/trainjob-ancestor-step: trainer
20+
spec:
21+
template:
22+
spec:
23+
volumes:
24+
- name: workspace
25+
configMap:
26+
name: {{ configmap_name }}
27+
containers:
28+
- name: node
29+
image: {{ image }}
30+
volumeMounts:
31+
- name: workspace
32+
mountPath: {{ volume_mount_path }}
33+
resources:
34+
requests: {}
35+
limits:
36+
{% if cpu_limit %}cpu: {{ cpu_limit }}{% endif %}
37+
{% if memory_limit %}memory: {{ memory_limit }}{% endif %}
38+
{% if gpus %}"nvidia.com/gpu": {{ gpus }}{% endif %}

nemo_run/core/packaging/configmap.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class ConfigMapPackager(Packager):
4141
relative_path: str | List[str] = "."
4242
namespace: str = "default"
4343
configmap_prefix: str = "nemo-workspace"
44+
configmap_id: Optional[str] = None # Reusable configmap identifier
4445

4546
def __post_init__(self):
4647
"""Initialize the Kubernetes client."""
@@ -111,11 +112,12 @@ def package(self, path: Path, job_dir: str, name: str) -> str:
111112
Returns:
112113
The name of the created ConfigMap (or intended name if not created)
113114
"""
115+
# Resolve the final ConfigMap name centrally
116+
configmap_name = self.resolve_configmap_name(name)
117+
114118
if self.v1 is None:
115119
logger.warning("Kubernetes client not available, skipping ConfigMap creation")
116-
return f"{self.configmap_prefix}-{name}"
117-
118-
configmap_name = f"{self.configmap_prefix}-{name}"
120+
return configmap_name
119121
files_to_stage = self._find_files_to_package(path)
120122
if not files_to_stage:
121123
logger.warning("No files found to package into ConfigMap")
@@ -165,6 +167,18 @@ def package(self, path: Path, job_dir: str, name: str) -> str:
165167
logger.error(f"Failed to create ConfigMap {configmap_name}: {e}")
166168
return configmap_name
167169

170+
def resolve_configmap_name(self, name: str) -> str:
171+
"""
172+
Resolve the full ConfigMap name from a caller-provided suffix.
173+
174+
Centralizes naming logic so callers never assemble full names.
175+
If configmap_id is set, it takes precedence and is sanitized.
176+
Otherwise, returns "{configmap_prefix}-{name}".
177+
"""
178+
if self.configmap_id:
179+
return f"{self.configmap_prefix}-{sanitize_kubernetes_name(self.configmap_id)}"
180+
return f"{self.configmap_prefix}-{name}"
181+
168182
def _find_files_to_package(self, base_path: Path) -> List[Path]:
169183
"""
170184
Find files to package based on include_pattern and relative_path.
@@ -198,7 +212,8 @@ def cleanup(self, name: str) -> None:
198212
"""
199213
if self.v1 is None:
200214
return
201-
configmap_name = f"{self.configmap_prefix}-{name}"
215+
# Use the same resolution logic as in package()
216+
configmap_name = self.resolve_configmap_name(name)
202217
try:
203218
self.v1.delete_namespaced_config_map(name=configmap_name, namespace=self.namespace)
204219
logger.info(f"Cleaned up ConfigMap: {configmap_name}")

nemo_run/run/experiment.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ def __init__(
342342

343343
self.log_level = log_level
344344
self._runner = get_runner(component_defaults=None, experiment=self)
345+
self._detach_mode = False # Will be set in _run_dag
345346

346347
if not _reconstruct:
347348
self.executor = executor if executor else LocalExecutor()
@@ -471,6 +472,23 @@ def _add_single_job(
471472
task_dir=name if reuse_job_dir else task_id,
472473
)
473474

475+
# Set detach mode on executor if supported
476+
if hasattr(self, "detach") and hasattr(executor, "set_detach_mode"):
477+
set_detach_mode = getattr(executor, "set_detach_mode", None)
478+
if set_detach_mode:
479+
self.console.log(
480+
f"Setting detach mode to {self.detach} on executor {type(executor).__name__}"
481+
)
482+
set_detach_mode(self.detach)
483+
else:
484+
self.console.log(
485+
f"Executor {type(executor).__name__} doesn't support set_detach_mode"
486+
)
487+
else:
488+
self.console.log(
489+
f"Experiment detach mode: {getattr(self, 'detach', 'not set')}, Executor has set_detach_mode: {hasattr(executor, 'set_detach_mode')}"
490+
)
491+
474492
cloned = copy.deepcopy(task) if isinstance(task, Script) else task.clone()
475493
job = Job(
476494
id=task_id,
@@ -783,6 +801,12 @@ def _run_dag(self, detach: bool, tail_logs: bool, executors: set[Executor]):
783801
)
784802
wait = False
785803
self.detach = detach
804+
self._detach_mode = detach
805+
806+
# Create a new runner with detach mode for this execution
807+
from nemo_run.run.torchx_backend.runner import get_runner
808+
809+
self._runner = get_runner(component_defaults=None, detach_mode=detach)
786810

787811
for level in order:
788812
# Launch jobs in this level concurrently since they are independent

nemo_run/run/torchx_backend/runner.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def schedule(self, dryrun_info: AppDryRunInfo) -> AppHandle:
112112

113113
def get_runner(
114114
component_defaults: Optional[dict[str, dict[str, str]]] = None,
115+
detach_mode: bool = False,
115116
**scheduler_params: Any,
116117
) -> Runner:
117118
"""
@@ -144,5 +145,9 @@ def get_runner(
144145
"""
145146
name = "nemo_run"
146147

148+
# Add detach_mode to scheduler_params for kubeflow scheduler
149+
if detach_mode:
150+
scheduler_params["detach_mode"] = detach_mode
151+
147152
scheduler_factories = get_scheduler_factories()
148153
return Runner(name, scheduler_factories, component_defaults, scheduler_params=scheduler_params)

nemo_run/run/torchx_backend/schedulers/kubeflow.py

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from nemo_run.core.execution.base import Executor
2828
from nemo_run.core.execution.kubeflow import KubeflowExecutor
29+
from nemo_run.core.packaging.configmap import ConfigMapPackager
2930
from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin
3031

3132
logger = logging.getLogger(__name__)
@@ -43,11 +44,13 @@ def __init__(
4344
self,
4445
session_name: str,
4546
namespace: str = "default",
47+
detach_mode: bool = False,
4648
**kwargs: Any,
4749
) -> None:
4850
self.backend = "kubeflow"
4951
self.session_name = session_name
5052
self.namespace = namespace
53+
self.detach_mode = detach_mode
5154
self._apps: dict[str, dict[str, Any]] = {}
5255

5356
def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[dict[str, Any]]:
@@ -60,23 +63,47 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[dict[str,
6063
job_config = self._appdef_to_kubeflow_config(app, cfg)
6164

6265
return AppDryRunInfo(
63-
app_id=f"kubeflow://{self.session_name}/{app.name}",
64-
app=app,
65-
request=job_config,
66-
repr=f"Kubeflow job: {app.name}",
66+
job_config,
67+
lambda _: f"Kubeflow job: {app.name}",
6768
)
6869

6970
def schedule(self, dryrun_info: AppDryRunInfo[dict[str, Any]]) -> str:
7071
"""Submit the job to Kubeflow."""
71-
app = dryrun_info.app
72-
cfg = dryrun_info.request["executor"]
72+
job_config = dryrun_info.request
73+
cfg = job_config["executor"]
7374

7475
# Create the TrainJob using KubeflowExecutor
75-
job_id = cfg.create_trainjob(app.name)
76+
# Extract the task from the app definition
77+
app = job_config["app"]
78+
task = None
79+
80+
# Try to extract task from the app roles
81+
if app.roles and len(app.roles) > 0:
82+
main_role = app.roles[0]
83+
if main_role.args:
84+
# Create a simple task object for the executor
85+
from nemo_run.config import Script
86+
87+
task = Script(inline=" ".join(main_role.args))
88+
89+
if task is None:
90+
# Create a default task if none found
91+
from nemo_run.config import Script
92+
93+
task = Script(inline="echo 'No task specified'")
94+
95+
# Stage files via ConfigMap if configured
96+
try:
97+
if isinstance(cfg.packager, ConfigMapPackager):
98+
cfg.stage_files(cfg.default_task_dir, task)
99+
except Exception as e:
100+
logger.error(f"Failed to stage files via ConfigMapPackager: {e}")
101+
102+
job_id = cfg.create_trainjob(job_config["app"].name, task)
76103

77104
# Store job info for later reference
78105
self._apps[job_id] = {
79-
"app": app,
106+
"app": job_config["app"],
80107
"executor": cfg,
81108
"job_id": job_id,
82109
"state": AppState.SUBMITTED,
@@ -103,7 +130,7 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
103130
state=app_state,
104131
num_restarts=0, # Kubeflow handles restarts internally
105132
msg=f"Kubeflow job status: {status}",
106-
structured_error_msg=None,
133+
structured_error_msg="",
107134
roles_statuses=[],
108135
)
109136
except Exception as e:
@@ -166,12 +193,10 @@ def _appdef_to_kubeflow_config(self, app: AppDef, cfg: KubeflowExecutor) -> dict
166193
# If we have a script with inline content, extract it
167194
if len(main_role.args) >= 2 and main_role.args[0] == "python":
168195
# This is a file-based execution
169-
cfg.python_file = main_role.args[1]
196+
logger.info(f"File-based execution: {main_role.args[1]}")
170197
elif len(main_role.args) >= 2 and main_role.args[0] == "-c":
171198
# This is inline script execution
172-
script_content = main_role.args[1]
173-
# For now, we'll create a temporary file or use a default
174-
cfg.python_file = "inline_script.py"
199+
logger.info("Inline script execution detected")
175200
logger.warning("Inline script execution not fully implemented yet")
176201

177202
return {
@@ -195,15 +220,39 @@ def _map_kubeflow_status_to_torchx(self, kubeflow_status: str) -> AppState:
195220
else:
196221
return AppState.UNKNOWN
197222

223+
def _validate(self, app: AppDef, scheduler: str) -> None:
224+
"""Validate the app definition for Kubeflow."""
225+
# For now, skip validation as Kubeflow handles this internally
226+
pass
227+
228+
def close(self) -> None:
229+
"""Clean up resources when the scheduler is closed."""
230+
# Cancel all running jobs unless in detach mode
231+
for app_id in list(self._apps.keys()):
232+
try:
233+
# Check if scheduler is in detach mode
234+
if self.detach_mode:
235+
logger.info(f"Skipping cleanup for job {app_id} in detach mode")
236+
continue
237+
238+
self.cancel(app_id)
239+
except Exception as e:
240+
logger.error(f"Failed to cancel job {app_id} during close: {e}")
241+
242+
# Clear the apps dictionary
243+
self._apps.clear()
244+
198245

199246
def create_scheduler(
200247
session_name: str,
201248
namespace: str = "default",
249+
detach_mode: bool = False,
202250
**kwargs: Any,
203251
) -> KubeflowScheduler:
204252
"""Create a Kubeflow scheduler instance."""
205253
return KubeflowScheduler(
206254
session_name=session_name,
207255
namespace=namespace,
256+
detach_mode=detach_mode,
208257
**kwargs,
209258
)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ skypilot = "nemo_run.run.torchx_backend.schedulers.skypilot:create_scheduler"
4949
local_persistent = "nemo_run.run.torchx_backend.schedulers.local:create_scheduler"
5050
docker_persistent = "nemo_run.run.torchx_backend.schedulers.docker:create_scheduler"
5151
dgx_cloud = "nemo_run.run.torchx_backend.schedulers.dgxcloud:create_scheduler"
52-
lepton = "nemo_run.run.torchx_backend.schedulers.lepton:create_scheduler"
5352
kubeflow = "nemo_run.run.torchx_backend.schedulers.kubeflow:create_scheduler"
53+
lepton = "nemo_run.run.torchx_backend.schedulers.lepton:create_scheduler"
5454

5555
[project.optional-dependencies]
5656
skypilot = ["skypilot[kubernetes]>=0.10.0"]

0 commit comments

Comments (0)