Skip to content

Commit 435b65c

Browse files
ko3n1gclaude
andcommitted
docs: add KubeflowExecutor guide, PyTorchJob and TrainJob e2e examples
- docs/guides/execution.md: new KubeflowExecutor section showing both PyTorchJob and TrainJob configurations; packager support table updated - local/example.py: PyTorchJob e2e — launch(wait=True), log sentinel verification, cancel(wait=True); tested against AWS EKS cluster - local/example_trainjob.py: TrainJob e2e with full pod config — volumes, tolerations, affinity, imagePullSecrets, secretKeyRef env vars, and resourceClaims via pod_spec_overrides; targets GKE cluster (not yet ready) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 3decd46 commit 435b65c

3 files changed

Lines changed: 221 additions & 0 deletions

File tree

docs/guides/execution.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ The packager support matrix is described below:
5353
| SkypilotExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
5454
| DGXCloudExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
5555
| LeptonExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
56+
| KubeflowExecutor | run.Packager |
5657

5758
`run.Packager` is a passthrough base packager.
5859

@@ -293,6 +294,49 @@ def your_dgx_executor(nodes: int, gpus_per_node: int, container_image: str):
293294

294295
For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDIA DGX Cloud NeMo End-to-End Workflow Example](https://docs.nvidia.com/dgx-cloud/run-ai/latest/nemo-e2e-example.html).
295296

297+
#### KubeflowExecutor
298+
299+
The `KubeflowExecutor` integrates with the [Kubeflow Training Operator](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits CRDs directly via the Kubernetes API — no `kubectl` required.
300+
301+
Two job kinds are supported via the `job_kind` parameter:
302+
303+
- **`"PyTorchJob"`** (default) — Training Operator v1 (`kubeflow.org/v1`)
304+
- **`"TrainJob"`** — Training Operator v2 (`trainer.kubeflow.org/v1alpha1`)
305+
306+
Kubernetes configuration is loaded automatically: local kubeconfig is tried first, falling back to in-cluster config when running inside a pod.
307+
308+
Here's an example configuration:
309+
310+
```python
311+
# PyTorchJob (default)
312+
executor = run.KubeflowExecutor(
313+
namespace="runai-nemo-ci",
314+
image="nvcr.io/nvidian/nemo:nightly",
315+
num_nodes=3, # total pods: 1 Master + (num_nodes-1) Workers
316+
gpus_per_node=8, # also sets nproc_per_node unless overridden explicitly
317+
cpu_requests="16",
318+
memory_requests="64Gi",
319+
volumes=[
320+
{"name": "model-cache", "persistentVolumeClaim": {"claimName": "nemo-ci-datasets-project-nkf5l"}}
321+
],
322+
volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
323+
labels={"app": "nemo-ci-training"},
324+
env_vars={"NCCL_DEBUG": "INFO"},
325+
)
326+
327+
# TrainJob (Training Operator v2)
328+
executor = run.KubeflowExecutor(
329+
job_kind="TrainJob",
330+
runtime_ref="torch-distributed", # name of the ClusterTrainingRuntime
331+
namespace="runai-nemo-ci",
332+
image="nvcr.io/nvidian/nemo:nightly",
333+
num_nodes=3,
334+
gpus_per_node=8,
335+
)
336+
```
337+
338+
`cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning.
339+
296340
#### LeptonExecutor
297341

298342
The `LeptonExecutor` integrates with an NVIDIA DGX Cloud Lepton cluster's Python SDK to launch distributed jobs. It uses API calls behind the Lepton SDK to authenticate, identify the target node group and resource shapes, and submit the job specification which will be launched as a batch job on the cluster.

local/example.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import time
2+
3+
from nemo_run.core.execution.kubeflow import KubeflowExecutor
4+
5+
EXPECTED_LOG_CONTENT = "NEMO_TEST_OK"
6+
7+
e = KubeflowExecutor(
8+
namespace="runai-nemo-ci",
9+
image="nvcr.io/nvidian/nemo:nightly",
10+
num_nodes=3,
11+
gpus_per_node=8, # also sets nproc_per_node when not explicitly provided
12+
cpu_requests="16",
13+
memory_requests="64Gi",
14+
volumes=[
15+
{
16+
"name": "model-cache",
17+
"persistentVolumeClaim": {"claimName": "nemo-ci-datasets-project-nkf5l"},
18+
}
19+
],
20+
volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
21+
labels={"app": "nemo-ci-training"},
22+
)
23+
24+
# Script: print the sentinel, then sleep so we can read logs and cancel cleanly
25+
cmd = [
26+
"/bin/bash",
27+
"-c",
28+
f"echo 'print(\"{EXPECTED_LOG_CONTENT}\"); import time; time.sleep(300)' > /tmp/test.py && "
29+
"torchrun --nnodes=$PET_NNODES --nproc_per_node=$PET_NPROC_PER_NODE "
30+
"--node_rank=$RANK --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT /tmp/test.py",
31+
]
32+
33+
# ── Launch and wait until RUNNING ────────────────────────────────────────────
34+
job_name, state = e.launch("nemo-ci-training", cmd, wait=True, timeout=300)
35+
print(f"Launched: {job_name}, state: {state}")
36+
37+
# ── Fetch logs and verify sentinel ────────────────────────────────────────────
38+
print("Polling logs until sentinel appears (up to 2 min)...")
39+
logs = []
40+
deadline = time.time() + 120
41+
while time.time() < deadline:
42+
logs = list(e.fetch_logs(job_name, stream=False, lines=50))
43+
if any(EXPECTED_LOG_CONTENT in line for line in logs):
44+
break
45+
print(f" waiting for sentinel ({len(logs)} lines so far)...")
46+
time.sleep(5)
47+
48+
print(f" received {len(logs)} lines")
49+
for line in logs[:5]:
50+
print(f" | {line}")
51+
52+
assert any(EXPECTED_LOG_CONTENT in line for line in logs), (
53+
f"Expected '{EXPECTED_LOG_CONTENT}' not found in logs.\nGot: {logs}"
54+
)
55+
print(f"✓ Log sentinel '{EXPECTED_LOG_CONTENT}' verified")
56+
57+
# ── Cancel and wait for full cleanup ─────────────────────────────────────────
58+
print("Cancelling job and waiting for cleanup...")
59+
cleaned = e.cancel(job_name, wait=True, timeout=120)
60+
assert cleaned, "Cleanup failed — pods or CR still present after timeout"
61+
print("Full cycle complete without kubectl")

local/example_trainjob.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import time
2+
3+
from nemo_run.core.execution.kubeflow import KubeflowExecutor
4+
5+
EXPECTED_LOG_CONTENT = "NEMO_TEST_OK"
6+
7+
e = KubeflowExecutor(
8+
job_kind="TrainJob",
9+
runtime_ref="torch-distributed",
10+
namespace="nemo-ci-workloads",
11+
image="nvcr.io/nvidian/nemo:nightly",
12+
num_nodes=3,
13+
gpus_per_node=8, # also sets nproc_per_node when not explicitly provided
14+
cpu_requests="16",
15+
memory_requests="64Gi",
16+
volumes=[
17+
{
18+
"name": "model-cache",
19+
"persistentVolumeClaim": {"claimName": "model-cache"},
20+
}
21+
],
22+
volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
23+
image_pull_secrets=["ngc-registry-secret"],
24+
tolerations=[
25+
{"effect": "NoExecute", "key": "dedicated", "operator": "Equal", "value": "user-workload"},
26+
{"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Equal", "value": "present"},
27+
{
28+
"effect": "NoSchedule",
29+
"key": "kubernetes.io/arch",
30+
"operator": "Equal",
31+
"value": "arm64",
32+
},
33+
{"effect": "NoSchedule", "key": "team", "operator": "Equal", "value": "nemo-ci"},
34+
{"effect": "NoSchedule", "key": "nvidia.com/gpu", "operator": "Exists"},
35+
],
36+
affinity={
37+
"nodeAffinity": {
38+
"requiredDuringSchedulingIgnoredDuringExecution": {
39+
"nodeSelectorTerms": [
40+
{
41+
"matchExpressions": [
42+
{
43+
"key": "cloud.google.com/gke-nodepool",
44+
"operator": "In",
45+
"values": ["customer-gpu-9pb", "customer-gpu-mh2"],
46+
}
47+
]
48+
}
49+
]
50+
}
51+
}
52+
},
53+
env_vars={
54+
"NVIDIA_VISIBLE_DEVICES": "all",
55+
"NVIDIA_DRIVER_CAPABILITIES": "compute,utility",
56+
"NCCL_DEBUG": "INFO",
57+
},
58+
env_list=[
59+
{
60+
"name": "WANDB_API_KEY",
61+
"valueFrom": {"secretKeyRef": {"name": "nemo-ci-secrets", "key": "WANDB_API_KEY"}},
62+
},
63+
{
64+
"name": "HF_TOKEN",
65+
"valueFrom": {"secretKeyRef": {"name": "nemo-ci-secrets", "key": "HF_TOKEN"}},
66+
},
67+
],
68+
pod_spec_overrides={
69+
"resourceClaims": [
70+
{
71+
"name": "imex-channel",
72+
"resourceClaimTemplateName": "nemo-ci-compute-domain-channel",
73+
}
74+
]
75+
},
76+
labels={"app": "nemo-ci-training"},
77+
)
78+
79+
# Script: print the sentinel, then sleep so we can read logs and cancel cleanly
80+
cmd = [
81+
"/bin/bash",
82+
"-c",
83+
f"echo 'print(\"{EXPECTED_LOG_CONTENT}\"); import time; time.sleep(300)' > /tmp/test.py && "
84+
"torchrun --nnodes=$PET_NNODES --nproc_per_node=$PET_NPROC_PER_NODE "
85+
"--node_rank=$RANK --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT /tmp/test.py",
86+
]
87+
88+
# ── Launch and wait until RUNNING ────────────────────────────────────────────
89+
job_name, state = e.launch("nemo-ci-training", cmd, wait=True, timeout=300)
90+
print(f"Launched: {job_name}, state: {state}")
91+
92+
# ── Fetch logs and verify sentinel ────────────────────────────────────────────
93+
print("Polling logs until sentinel appears (up to 2 min)...")
94+
logs = []
95+
deadline = time.time() + 120
96+
while time.time() < deadline:
97+
logs = list(e.fetch_logs(job_name, stream=False, lines=50))
98+
if any(EXPECTED_LOG_CONTENT in line for line in logs):
99+
break
100+
print(f" waiting for sentinel ({len(logs)} lines so far)...")
101+
time.sleep(5)
102+
103+
print(f" received {len(logs)} lines")
104+
for line in logs[:5]:
105+
print(f" | {line}")
106+
107+
assert any(EXPECTED_LOG_CONTENT in line for line in logs), (
108+
f"Expected '{EXPECTED_LOG_CONTENT}' not found in logs.\nGot: {logs}"
109+
)
110+
print(f"✓ Log sentinel '{EXPECTED_LOG_CONTENT}' verified")
111+
112+
# ── Cancel and wait for full cleanup ─────────────────────────────────────────
113+
print("Cancelling job and waiting for cleanup...")
114+
cleaned = e.cancel(job_name, wait=True, timeout=120)
115+
assert cleaned, "Cleanup failed — pods or CR still present after timeout"
116+
print("Full cycle complete without kubectl")

0 commit comments

Comments
 (0)