
cp: Add SlurmRayClient for multi-node Ray cluster management on SLURM (1712) into r1.2.0 #1791

Merged

abhinavg4 merged 3 commits into r1.2.0 from cherry-pick-1712-r1.2.0 on Apr 17, 2026


Conversation

@svcnvidia-nemo-ci
Contributor

beep boop [🤖]: Hi @abhinavg4 👋,

we've cherry-picked #1712 into r1.2.0 for you! 🚀

Please review and approve this cherry-pick at your convenience!


* Add SlurmRayClient for multi-node Ray cluster management on SLURM

SlurmRayClient extends RayClient to handle the multi-process SLURM
job model where srun launches one Python process per node.

Key design:
- Node 0 (SLURM_NODEID=0) starts the Ray head via `ray start --head`
  and writes the GCS port to a shared file (RAY_PORT_BROADCAST_DIR or
  /tmp) so all other nodes can discover it
- Nodes 1+ wait for the port file, then run `ray start --block
  --address=<head>:<port>` as workers; start() returns immediately on
  worker nodes so user code only runs on the head
- Head waits for all allocated nodes to appear in `ray.nodes()` before
  returning (configurable via worker_connect_timeout_s)
- Graceful teardown: head calls `ray stop` and removes the port file;
  workers exit naturally once the head stops
- Port collision avoidance with _pick_free_port() using socket SO_REUSEADDR
- SIGALRM-based timeout on ray.init() to detect hung GCS connections
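
A minimal sketch of this head/worker dispatch, assuming the SLURM_NODEID check and `ray start` invocations described above (helper names, defaults, and the head_host argument are illustrative, not the actual implementation):

    import os
    import socket
    import subprocess
    import sys
    import time

    def start_slurm_ray(port_file: str, head_host: str, ray_port: int = 6379) -> None:
        # Illustrative only: the real SlurmRayClient derives the head host from the
        # SLURM node list and picks a free port via _pick_free_port().
        if int(os.environ.get("SLURM_NODEID", "0")) == 0:
            # Head node: start Ray, then broadcast the GCS port via the shared file.
            subprocess.run(["ray", "start", "--head", f"--port={ray_port}"], check=True)
            with open(port_file, "w") as f:
                f.write(str(ray_port))
            # ...then poll ray.nodes() until all allocated nodes have joined.
        else:
            # Worker node: wait for the port file, join the cluster, block until teardown.
            while not os.path.exists(port_file):
                time.sleep(1)
            with open(port_file) as f:
                port = int(f.read().strip())
            head_ip = socket.gethostbyname(head_host)
            rc = subprocess.run(["ray", "start", "--block", f"--address={head_ip}:{port}"]).returncode
            sys.exit(rc)  # propagate the worker's exit code to the srun task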

Parameters:
- ray_port: GCS port (default: random free port)
- ray_temp_dir: shared temp dir for Ray logs/sockets
- num_gpus / num_cpus: resource overrides passed to `ray start`
- worker_connect_timeout_s: how long to wait for all nodes to join
- gpu_enforcement_interval_s: if >0, periodically checks GPU count
  matches expected and raises if enforcement fails
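
For context, a hedged usage sketch built from this parameter list (the constructor call is inferred from the bullets above; paths and values are placeholders):

    from nemo_curator.core.client import SlurmRayClient

    client = SlurmRayClient(
        ray_temp_dir="/lustre/scratch/ray_tmp",  # shared temp dir for Ray logs/sockets
        num_gpus=2,                              # resource override passed to `ray start`
        worker_connect_timeout_s=300,            # how long the head waits for all nodes
        gpu_enforcement_interval_s=0,            # >0 enables periodic GPU-count checks
    )
    client.start()      # only the head node proceeds past this point
    try:
        ...             # build and run the pipeline here
    finally:
        client.stop()   # `ray stop` plus removal of the port-broadcast file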

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* Add SLURM tutorial with RayClient vs SlurmRayClient walkthrough

New files under tutorials/slurm/:
- pipeline.py: CPU-only word-count + node-tag pipeline; the only
  difference between a local run and a multi-node SLURM run is
  swapping RayClient() for SlurmRayClient()
- submit.sh: sbatch script for bare-metal / virtualenv clusters
- submit_container.sh: sbatch script using the NGC NeMo Curator
  container via Pyxis/enroot (--container-image flag)
- README.md: end-to-end guide covering local dev, bare-metal SLURM,
  container SLURM (Pyxis + Singularity/Apptainer), SlurmRayClient
  config reference, and troubleshooting
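
A hypothetical sketch of that swap (import path and --slurm flag name are assumptions; the real pipeline.py may parse arguments differently):

    import sys

    from nemo_curator.core.client import RayClient, SlurmRayClient

    # A local dev run and a multi-node SLURM run differ only in the client class.
    client = SlurmRayClient() if "--slurm" in sys.argv else RayClient()
    client.start()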

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* Add GPU reporting to NodeTagStage and update submit scripts

- pipeline.py: NodeTagStage now records nvidia-smi GPU info per worker
  node in a gpu_info column; summary log shows each node's hostname
  and GPU count/model
- submit.sh / submit_container.sh: add --gpus-per-node=2 default,
  print GPU inventory per node before pipeline runs, bump --num-tasks
  to 80 for better distribution across workers, add the 4 sbatch
  override examples (1/2 nodes x 2/8 GPUs) to the header comment,
  add RAY_PORT_BROADCAST_DIR note for node-local /tmp clusters
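
As an illustration only, a sketch of the kind of per-node record the gpu_info column could hold (the actual format used by NodeTagStage is not shown in this PR):

    import socket
    import subprocess

    def gpu_info_record() -> str:
        # Query GPU models via nvidia-smi; fall back gracefully on CPU-only nodes.
        try:
            out = subprocess.run(
                ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
                capture_output=True, text=True, check=True,
            ).stdout
            gpus = [line.strip() for line in out.splitlines() if line.strip()]
        except (FileNotFoundError, subprocess.CalledProcessError):
            gpus = []
        return f"{socket.gethostname()}: {len(gpus)} GPU(s) ({gpus[0] if gpus else 'none'})"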

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* tutorials/slurm: use container+venv pattern matching PDF pipeline

Updated submit scripts to activate the local Curator virtualenv inside
the NGC container, mirroring the pattern used by submit_benchmark.sh.
This ensures the local checkout (with SlurmRayClient) is used rather
than the version bundled in the container image.

- submit.sh: use shared venv via source .venv/bin/activate
- submit_container.sh: same venv activation inside Pyxis container
- README: document container+venv approach, add SlurmRayClient import error
  troubleshooting entry

Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* tutorials/slurm: revert submit.sh to uv run for bare-metal nodes

The .venv python symlink resolves to /usr/bin/python3.12, which exists
inside the NGC container but not on bare-metal compute nodes. Keep the
bare-metal submit.sh on uv run, which works without a system Python,
and rely on submit_container.sh (container+venv) as the primary path.

Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* tutorials/slurm: set RAY_PORT_BROADCAST_DIR to shared Lustre path

On this cluster /tmp is node-local, so the worker cannot read the head's
port-broadcast file from /tmp. Set RAY_PORT_BROADCAST_DIR to the shared
logs/ directory so all nodes can discover the head's GCS port.

Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* Address PR review: fix race conditions, port cleanup, ray.shutdown bug

- Atomic write-then-rename in _write_head_port (Lustre race condition)
- Retry on ValueError/OSError in _read_head_port (partial file read)
- Delete port file in stop() to prevent stale file on job ID wraparound
- Remove finally: ray.shutdown() in _wait_for_workers — was disconnecting
  the driver from Ray before the pipeline could run
- Move _find_ray_binary() inside contextlib.suppress in _cleanup_local_ray
- Add threading.main_thread() guard in _ray_init_with_timeout (SIGALRM)
- Simplify dead if/else in stop() — both branches were identical
- Add TestParseSlurmNodelist covering range, mixed range+list, zero-padding
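
For the first bullet, a minimal sketch of the write-then-rename pattern, assuming a POSIX shared filesystem (function and variable names are illustrative):

    import os
    import tempfile

    def write_port_atomically(port_file: str, port: int) -> None:
        # Write to a temp file in the same directory, then os.replace() it into
        # place so readers on Lustre never observe a partially written port file.
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(port_file) or ".")
        try:
            with os.fdopen(fd, "w") as f:
                f.write(str(port))
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, port_file)  # atomic rename on POSIX filesystems
        except BaseException:
            os.unlink(tmp_path)
            raise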

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Propagate ray worker exit code to srun task status

_run_as_worker now returns the ray start --block exit code, and the
call site uses sys.exit(self._run_as_worker(...)) so that a failed
worker shows as FAILED in sacct instead of silently COMPLETED.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix monkeypatch target in test_expands_with_scontrol

nemo_curator.__init__ does not expose core, so the dotted string form
"nemo_curator.core.client.subprocess.run" fails attribute traversal.
Use the imported module object directly instead.
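
A hedged sketch of that pattern (the fake scontrol output is illustrative, not the real test body):

    import types

    import nemo_curator.core.client as client_module

    def test_expands_with_scontrol(monkeypatch):
        # Patch via the imported module object; the dotted-string form
        # "nemo_curator.core.client.subprocess.run" fails attribute traversal
        # because nemo_curator.__init__ does not expose core.
        def fake_run(*args, **kwargs):
            # Stand-in for `scontrol show hostnames` output.
            return types.SimpleNamespace(stdout="node001\nnode002\n", returncode=0)

        monkeypatch.setattr(client_module.subprocess, "run", fake_run)
        # ...then call _expand_slurm_nodelist and assert on the expanded host list.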

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Add unit tests for SlurmRayClient helpers to improve codecov

Cover previously-untested branches:
- _head_port_file, _write_head_port, _read_head_port (roundtrip, timeout, partial-write retry)
- _run_as_worker (exit code, GPU/CPU flags)
- _cleanup_local_ray (success, error suppression)
- _expand_slurm_nodelist (CalledProcessError fallback, empty scontrol output)
- _ray_init_with_timeout (main-thread SIGALRM, non-main-thread fallback)
- _wait_for_workers (success, timeout, partial join)
- SlurmRayClient.stop with manages_cluster (port file cleanup)
- SlurmRayClient.start: RAY_ADDRESS pre-set, head multi-node, worker sys.exit

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix test_worker_node_calls_sys_exit: mock socket.gethostbyname

node-001 doesn't resolve in CI; patch socket.gethostbyname on
nemo_curator.core.client to return a fixed IP.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>
Signed-off-by: NeMo Bot <nemo-bot@nvidia.com>
@svcnvidia-nemo-ci
Contributor Author

/ok to test f061ff7

@copy-pr-bot

copy-pr-bot Bot commented Apr 10, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Contributor

greptile-apps Bot commented Apr 10, 2026

Greptile Summary

This cherry-pick from #1712 adds SlurmRayClient, a RayClient subclass that handles multi-node Ray cluster management on SLURM by auto-detecting node roles (SLURM_NODEID), coordinating the head/worker startup via a shared port-broadcast file, and blocking worker processes until the cluster is torn down. It also adds tutorial scripts (pipeline.py, submit.sh, submit_container.sh, README.md) demonstrating the feature end-to-end.

Prior review threads have already flagged the two most significant issues (the 600 s hard-coded _read_head_port timeout not tied to worker_connect_timeout_s, and RAY_TMPDIR being exported but never consumed by SlurmRayClient). One additional usability concern remains in the tutorial submit scripts.

Confidence Score: 5/5

Safe to merge; all remaining findings are P2 style/usability suggestions that do not block correctness on the default happy path.

The core logic is well-implemented and comprehensively tested. Prior review threads already caught the two most impactful issues (hardcoded 600 s port-read timeout and the no-op RAY_TMPDIR export). The two new findings are both P2: a minor timing asymmetry in _wait_for_workers that only matters when worker_connect_timeout_s < 120 s (not the default), and a tutorial script usability gap where the logs/ directory must be pre-created before sbatch submission.

See tutorials/slurm/submit.sh and submit_container.sh for the logs/ pre-creation note, and nemo_curator/core/client.py _wait_for_workers for the deadline placement.

Important Files Changed

- nemo_curator/core/client.py: Adds SlurmRayClient (head/worker role detection, port-broadcast via shared FS, worker connect timeout); well-structured, but the worker-join deadline starts before ray.init and _read_head_port uses a hardcoded 600 s timeout independent of worker_connect_timeout_s (flagged in prior review).
- tests/core/test_slurm_ray_client.py: Comprehensive unit tests covering helper functions, head/worker lifecycle branches, port-file roundtrip, timeout cases, and cleanup; good use of monkeypatching with a fake Ray module.
- tutorials/slurm/submit.sh: Working sbatch script; #SBATCH --output references a logs/ subdirectory that must be pre-created before submission — the in-script mkdir -p comes too late. RAY_TMPDIR is exported but not consumed by SlurmRayClient (flagged in prior review).
- tutorials/slurm/submit_container.sh: Container-based sbatch script with the same logs/ pre-creation requirement as submit.sh; otherwise a correct Pyxis/enroot pattern.
- tutorials/slurm/pipeline.py: Clean demo pipeline illustrating RayClient vs SlurmRayClient; CPU-only stages with correct try/finally cleanup.
- tutorials/slurm/README.md: Good documentation of the head/worker model and environment variables, but the RAY_TMPDIR table entry implies the env var is read by SlurmRayClient when it is not (prior review thread); missing a note about pre-creating the logs/ directory.

Sequence Diagram

sequenceDiagram
    participant SLURM as SLURM (srun)
    participant Head as Node 0 (Head)
    participant Worker as Node N (Worker)
    participant FS as Shared FS (RAY_PORT_BROADCAST_DIR)

    SLURM->>Head: launch python pipeline.py --slurm
    SLURM->>Worker: launch python pipeline.py --slurm

    Head->>Head: _cleanup_local_ray() [if cleanup_on_start]
    Worker->>Worker: _cleanup_local_ray() [if cleanup_on_start]

    Head->>Head: super().start() → ray start --head
    Head->>Head: _write_head_port(job_id)
    Head->>FS: write ray_head_port_<job_id>

    Worker->>FS: _read_head_port(job_id) [polls until file appears]
    FS-->>Worker: port number

    Worker->>Worker: _run_as_worker(head_ip:port)
    Note over Worker: ray start --block --address=head:port (blocks until cluster torn down)

    Head->>Head: _wait_for_workers() [polls ray.nodes()]
    Note over Head: ray.init → polls until all N nodes alive

    Head->>Head: Pipeline runs (returns from start())
    Head->>Head: pipeline.run() completes
    Head->>Head: ray_client.stop() → ray stop + remove port file
    Note over Worker: ray process exits (cluster torn down)
    Worker->>SLURM: sys.exit(0)

Reviews (3): Last reviewed commit: "Merge branch 'r1.2.0' into cherry-pick-1..."

Comment thread nemo_curator/core/client.py

else:
    # Worker node — read the port the head actually chose, then connect.
    head_ip = socket.gethostbyname(self._slurm_nodes[0])
    actual_port = self._read_head_port(slurm_job_id)

P2 Worker port-read timeout not tied to worker_connect_timeout_s

_read_head_port is called here without a timeout_s argument, so it always uses its hardcoded default of 600 s, regardless of self.worker_connect_timeout_s (default 300 s). A user who sets worker_connect_timeout_s=60 expecting fast failure on both sides will find workers silently waiting up to 600 s for the port file before timing out.

Suggested change:
-   actual_port = self._read_head_port(slurm_job_id)
+   actual_port = self._read_head_port(slurm_job_id, timeout_s=self.worker_connect_timeout_s)

Comment thread tutorials/slurm/submit.sh
# Shared directory for Ray port broadcast — must be visible to ALL nodes.
# On most clusters /tmp is node-local, so we use a path on the shared FS.
export RAY_PORT_BROADCAST_DIR="${CURATOR_DIR}/logs"
export RAY_TMPDIR="/tmp/ray_${SLURM_JOB_ID}"

P2 RAY_TMPDIR export is not consumed by SlurmRayClient

SlurmRayClient (via RayClient) uses self.ray_temp_dir, which defaults to DEFAULT_RAY_TEMP_DIR (/tmp/ray). Neither _detect_slurm_resources nor any other method reads RAY_TMPDIR to set ray_temp_dir. The same pattern repeats in submit_container.sh line 75. As-is, Ray picks up the --temp-dir flag passed explicitly by the client, so this export is at best a no-op and may mislead users who expect it to control where Ray stores its files.

Consider passing the desired path as an explicit constructor argument instead:

SlurmRayClient(ray_temp_dir=f"/tmp/ray_{os.environ['SLURM_JOB_ID']}")

or remove the export RAY_TMPDIR lines if the intent was to rely on the client's default.

@abhinavg4 abhinavg4 enabled auto-merge (squash) April 16, 2026 17:37
@abhinavg4
Contributor

/ok to test 47e9341

@abhinavg4 abhinavg4 merged commit 680de68 into r1.2.0 Apr 17, 2026
45 checks passed