Commit a8425c9

ko3n1g and claude authored
docs: update KubeflowExecutor guide and add e2e example (#496)
* docs: update KubeflowExecutor guide to TrainJob v2 only

  Remove the outdated PyTorchJob (v1) example and job_kind parameter references — KubeflowExecutor now only supports TrainJob (trainer.kubeflow.org/v1alpha1).

  Replace the two-snippet example with a single, comprehensive configuration derived from the real-world local/real_trainjob.py, covering env_list, tolerations, volumes, workdir_pvc, and image_pull_secrets.

  Add an advanced-options table for less-common parameters (nprocs_per_node, extra_resource_requests, pod_spec_overrides, container_kwargs, workdir_local_path).

  Signed-off-by: Oliver Koenig <okoenig@nvidia.com>
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* docs: add KubeflowExecutor e2e example and link from guide

  Add examples/kubeflow/hello_kubeflow.py — a self-contained script that shows the complete executor setup (Torchrun launcher, env_list, tolerations, dshm volume, PVC workdir sync) with CLI flags for namespace, image, node count, and PVC name.

  Update docs/guides/execution.md to link to the new example after the configuration snippet.

  Signed-off-by: Oliver Koenig <okoenig@nvidia.com>
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* docs(kubeflow): add log tailing and SIGINT/SIGTERM cancel handler to example

  Register SIGINT/SIGTERM handlers before job submission so Ctrl-C or pod eviction triggers executor.cancel(wait=True).

  Switch from run.run() to run.Experiment so tail_logs=True can be passed to exp.run(), streaming pod logs back to the terminal.

  Signed-off-by: Oliver Koenig <okoenig@nvidia.com>
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

* style: apply ruff formatting to hello_kubeflow.py

  Signed-off-by: Oliver Koenig <okoenig@nvidia.com>
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: oliver könig <okoenig@nvidia.com>

---------

Signed-off-by: Oliver Koenig <okoenig@nvidia.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b221881 commit a8425c9

2 files changed

Lines changed: 210 additions & 27 deletions

docs/guides/execution.md

Lines changed: 55 additions & 27 deletions
@@ -296,50 +296,78 @@ For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDI

 #### KubeflowExecutor

-The `KubeflowExecutor` integrates with the [Kubeflow Training Operator](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits CRDs directly via the Kubernetes API — no `kubectl` required.
-
-Two job kinds are supported via the `job_kind` parameter:
-- **`"PyTorchJob"`** (default) — Training Operator v1 (`kubeflow.org/v1`)
-- **`"TrainJob"`** — Training Operator v2 (`trainer.kubeflow.org/v1alpha1`)
+The `KubeflowExecutor` integrates with the [Kubeflow Training Operator v2](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits `TrainJob` CRDs (`trainer.kubeflow.org/v1alpha1`) directly via the Kubernetes API — no `kubectl` required.

 Kubernetes configuration is loaded automatically: local kubeconfig is tried first, falling back to in-cluster config when running inside a pod.

+A `ClusterTrainingRuntime` whose name matches `runtime_ref` must exist in the cluster (the resource is cluster-scoped, so it is not tied to the target namespace); `"torch-distributed"` is the conventional name for PyTorch distributed workloads.
+
 Here's an example configuration:

 ```python
-# PyTorchJob (default)
-executor = run.KubeflowExecutor(
-    namespace="runai-nemo-ci",
-    image="nvcr.io/nvidia/nemo:26.02",
-    num_nodes=3,  # total pods: 1 Master + (num_nodes-1) Workers
-    gpus_per_node=8,  # also sets nproc_per_node unless overridden explicitly
+import nemo_run as run
+from nemo_run.core.execution.kubeflow import KubeflowExecutor
+
+executor = KubeflowExecutor(
+    launcher=run.Torchrun(),
+    runtime_ref="torch-distributed",  # ClusterTrainingRuntime in your cluster
+    namespace="my-namespace",
+    image="nvcr.io/nvidia/nemo:25.04",
+    num_nodes=4,
+    gpus_per_node=8,
     cpu_requests="16",
     memory_requests="64Gi",
+    image_pull_secrets=["ngc-registry-secret"],
+    # Simple key=value env vars
+    env_vars={
+        "NCCL_DEBUG": "INFO",
+        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
+    },
+    # Full env var dicts — use for secretKeyRef, fieldRef, etc.
+    env_list=[
+        {
+            "name": "WANDB_API_KEY",
+            "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}},
+        },
+    ],
+    labels={"app": "my-training-job"},
+    tolerations=[
+        {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"},
+    ],
     volumes=[
-        {"name": "model-cache", "persistentVolumeClaim": {"claimName": "data-pvc"}}
+        {"name": "dshm", "emptyDir": {"medium": "Memory"}},
+        {"name": "model-cache", "persistentVolumeClaim": {"claimName": "model-cache"}},
     ],
-    volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
-    labels={"app": "nemo-ci-training"},
-    env_vars={"NCCL_DEBUG": "INFO"},
-)
-
-# TrainJob (Training Operator v2)
-executor = run.KubeflowExecutor(
-    job_kind="TrainJob",
-    runtime_ref="torch-distributed",  # name of the ClusterTrainingRuntime
-    namespace="runai-nemo-ci",
-    image="nvcr.io/nvidia/nemo:26.02",
-    num_nodes=3,
-    gpus_per_node=8,
+    volume_mounts=[
+        {"name": "dshm", "mountPath": "/dev/shm"},
+        {"name": "model-cache", "mountPath": "/nemo-workspace"},
+    ],
+    # Sync the generated launch script to the pod before launch,
+    # and pull results back after the job completes.
+    workdir_pvc="model-cache",
+    workdir_pvc_path="/nemo-workspace",
 )
 ```

 `cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning.

+A self-contained end-to-end example — including volume setup, secret injection, and workdir PVC sync — is available at [`examples/kubeflow/hello_kubeflow.py`](../../examples/kubeflow/hello_kubeflow.py).
+
+##### Advanced options
+
+| Parameter | Purpose |
+|-----------|---------|
+| `nprocs_per_node` | Override processes per node; defaults to `gpus_per_node` when unset |
+| `extra_resource_requests` / `extra_resource_limits` | Non-GPU extended resources, e.g. `{"vpc.amazonaws.com/efa": "32"}` for AWS EFA NICs |
+| `pod_spec_overrides` | Merge arbitrary fields into `podTemplateOverrides[].spec`, e.g. `{"nodeSelector": {...}}` |
+| `container_kwargs` | Extra container-level fields, e.g. `{"securityContext": {"privileged": True}}` |
+| `workdir_local_path` | Local directory merged into the job dir before PVC sync — useful for hand-written scripts not managed by the packager |
+| `annotations` | Kubernetes annotations added to the `TrainJob` CR |
+| `affinity` | Pod scheduling affinity rules |
+
 ##### Limitations

-Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) and must be injected in different ways, like by Mutating Webhooks.
+Attributes like `resourceClaims` are not [supported](https://github.com/kubeflow/trainer/issues/3264) natively and must be injected via Mutating Webhooks or `pod_spec_overrides`.

 #### LeptonExecutor

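The guide's note that local kubeconfig is tried first, with in-cluster config as the fallback, can be sketched as an ordered list of loaders. This is only an illustration of the fallback order, not the executor's actual code: `load_first_available` is a hypothetical helper, and the commented usage assumes the official `kubernetes` Python client is installed.

```python
from typing import Callable, Sequence, Tuple

# (label, zero-argument loader) pairs; both names are illustrative.
Loader = Tuple[str, Callable[[], None]]


def load_first_available(loaders: Sequence[Loader]) -> str:
    """Try each loader in order and return the label of the first that succeeds."""
    errors = []
    for name, load in loaders:
        try:
            load()
            return name
        except Exception as exc:  # real code would catch the client's config error
            errors.append(f"{name}: {exc}")
    raise RuntimeError("no Kubernetes configuration found: " + "; ".join(errors))


# Assumed usage with the `kubernetes` package:
#   from kubernetes import config
#   load_first_available([
#       ("kubeconfig", config.load_kube_config),
#       ("in-cluster", config.load_incluster_config),
#   ])
```

Keeping the loaders in a list makes the precedence explicit and lets the fallback be exercised without a cluster.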
examples/kubeflow/hello_kubeflow.py

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""End-to-end example: run a distributed training job via KubeflowExecutor.
+
+Prerequisites
+-------------
+* Kubeflow Training Operator v2 installed in your cluster with a
+  ``ClusterTrainingRuntime`` named ``"torch-distributed"``.
+* A kubeconfig pointing at the target cluster (or run from inside a pod).
+* An image that contains your training code and all dependencies.
+
+Usage
+-----
+    python examples/kubeflow/hello_kubeflow.py \\
+        --namespace my-namespace \\
+        --image nvcr.io/nvidia/nemo:25.04 \\
+        --pvc model-cache
+
+The job runs ``nvidia-smi`` and a quick PyTorch device check on every worker,
+streams logs back to your terminal, and cancels cleanly on SIGINT/SIGTERM.
+Swap the ``inline`` script for your real training command.
+"""
+
+import argparse
+import logging
+import signal
+import sys
+
+import nemo_run as run
+from nemo_run.core.execution.kubeflow import KubeflowExecutor
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+
+# ── CLI ───────────────────────────────────────────────────────────────────────
+
+parser = argparse.ArgumentParser(description="KubeflowExecutor hello-world example")
+parser.add_argument("--namespace", default="default", help="Kubernetes namespace")
+parser.add_argument("--image", required=True, help="Container image with your training env")
+parser.add_argument("--num-nodes", type=int, default=2, help="Number of worker pods")
+parser.add_argument("--gpus-per-node", type=int, default=8, help="GPUs per pod")
+parser.add_argument("--pvc", default=None, help="PVC name for workdir sync (optional)")
+parser.add_argument(
+    "--runtime-ref",
+    default="torch-distributed",
+    help="ClusterTrainingRuntime name in your cluster",
+)
+args = parser.parse_args()
+
+JOB_NAME = "hello-kubeflow"
+
+# ── Executor ──────────────────────────────────────────────────────────────────
+
+executor = KubeflowExecutor(
+    # Kubeflow TrainJob settings
+    launcher=run.Torchrun(),
+    runtime_ref=args.runtime_ref,
+    namespace=args.namespace,
+    image=args.image,
+    num_nodes=args.num_nodes,
+    gpus_per_node=args.gpus_per_node,
+    # Resource requests — tune these to your node type
+    cpu_requests="8",
+    memory_requests="32Gi",
+    # Simple key=value environment variables
+    env_vars={
+        "NCCL_DEBUG": "INFO",
+        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
+    },
+    # Full env var dicts — use for Kubernetes secrets, field references, etc.
+    # Example: inject a W&B API key from a Kubernetes Secret named "my-secrets"
+    env_list=[
+        # {
+        #     "name": "WANDB_API_KEY",
+        #     "valueFrom": {"secretKeyRef": {"name": "my-secrets", "key": "WANDB_API_KEY"}},
+        # },
+    ],
+    # Toleration that allows scheduling on GPU-tainted nodes
+    tolerations=[
+        {"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"},
+    ],
+    # Volumes: a memory-backed /dev/shm so PyTorch DataLoader workers have
+    # enough shared memory (the default Kubernetes limit is only 64 MiB).
+    volumes=[
+        {"name": "dshm", "emptyDir": {"medium": "Memory"}},
+        *(
+            [{"name": "workdir", "persistentVolumeClaim": {"claimName": args.pvc}}]
+            if args.pvc
+            else []
+        ),
+    ],
+    volume_mounts=[
+        {"name": "dshm", "mountPath": "/dev/shm"},
+        *([{"name": "workdir", "mountPath": "/nemo-workspace"}] if args.pvc else []),
+    ],
+    # Sync the generated launch script to the pod via PVC before launch.
+    # Required whenever you use a custom launcher (e.g. run.Torchrun()).
+    workdir_pvc=args.pvc,
+    workdir_pvc_path="/nemo-workspace",
+    labels={"app": JOB_NAME},
+)
+
+# ── Task ──────────────────────────────────────────────────────────────────────
+
+# Replace this inline script with your real training command.
+script = run.Script(
+    inline="""\
+nvidia-smi
+python - <<'PY'
+import os, torch
+rank = int(os.environ.get("RANK", 0))
+world = int(os.environ.get("WORLD_SIZE", 1))
+print(f"rank {rank}/{world} — cuda devices: {torch.cuda.device_count()}")
+PY
+"""
+)
+
+# ── Signal handling ───────────────────────────────────────────────────────────
+
+
+# Register SIGINT / SIGTERM handlers *before* submitting so that Ctrl-C or a
+# pod eviction during startup still triggers a clean TrainJob deletion.
+# executor.cancel() deletes the TrainJob CR and polls until all pods are gone.
+def _cancel(signum: int, frame: object) -> None:
+    log.info("Signal %d received — cancelling %s", signum, JOB_NAME)
+    try:
+        executor.cancel(JOB_NAME, wait=True)
+    except Exception as exc:
+        log.warning("Cancel failed: %s", exc)
+    sys.exit(0)
+
+
+signal.signal(signal.SIGINT, _cancel)
+signal.signal(signal.SIGTERM, _cancel)
+
+# ── Launch ────────────────────────────────────────────────────────────────────
+
+# run.Experiment gives direct control over log tailing.
+# detach=False blocks until the job finishes; tail_logs=True streams pod logs.
+with run.Experiment(JOB_NAME, executor=executor) as exp:
+    exp.add(script, name=JOB_NAME, tail_logs=False)
+    exp.run(detach=False, tail_logs=True)
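
The guide describes `cancel(wait=True)` as polling until both the CR and all associated pods are gone. A minimal sketch of such a wait loop follows. It is an assumption about the general shape, not the executor's implementation: `wait_until_gone`, its two callbacks, and the timeout defaults are all hypothetical.

```python
import time
from typing import Callable


def wait_until_gone(
    cr_exists: Callable[[], bool],      # hypothetical: True while the TrainJob CR exists
    pods_remaining: Callable[[], int],  # hypothetical: number of pods still present
    timeout_s: float = 300.0,
    interval_s: float = 2.0,
) -> bool:
    """Poll until the CR is deleted and no pods remain; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if not cr_exists() and pods_remaining() == 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

The deadline uses `time.monotonic()` so that wall-clock adjustments cannot shorten or extend the wait, and the condition is checked once before any sleep so an already-deleted job returns immediately.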

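Both the guide snippet and the example script pair every `volume_mounts` entry with a `volumes` entry of the same name, which Kubernetes requires. A small pre-submission check along these lines might catch a mismatch early. `unmatched_mounts` is a hypothetical helper and not part of `nemo_run`.

```python
from typing import Dict, List


def unmatched_mounts(volumes: List[Dict], volume_mounts: List[Dict]) -> List[str]:
    """Return the names of mounts that have no volume with the same name."""
    declared = {v["name"] for v in volumes}
    return [m["name"] for m in volume_mounts if m["name"] not in declared]


# Values copied from the guide's configuration snippet.
volumes = [
    {"name": "dshm", "emptyDir": {"medium": "Memory"}},
    {"name": "model-cache", "persistentVolumeClaim": {"claimName": "model-cache"}},
]
volume_mounts = [
    {"name": "dshm", "mountPath": "/dev/shm"},
    {"name": "model-cache", "mountPath": "/nemo-workspace"},
]

print(unmatched_mounts(volumes, volume_mounts))  # prints []
```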