cp: Add SlurmRayClient for multi-node Ray cluster management on SLURM (1712) into r1.2.0 #1791
* Add SlurmRayClient for multi-node Ray cluster management on SLURM

  SlurmRayClient extends RayClient to handle the multi-process SLURM job model where srun launches one Python process per node.

  Key design:
  - Node 0 (SLURM_NODEID=0) starts the Ray head via `ray start --head` and writes the GCS port to a shared file (RAY_PORT_BROADCAST_DIR or /tmp) so all other nodes can discover it
  - Nodes 1+ wait for the port file, then run `ray start --block --address=<head>:<port>` as workers; start() returns immediately on worker nodes so user code only runs on the head
  - Head waits for all allocated nodes to appear in `ray.nodes()` before returning (configurable via worker_connect_timeout_s)
  - Graceful teardown: head calls `ray stop` and removes the port file; workers exit naturally once the head stops
  - Port collision avoidance with _pick_free_port() using socket SO_REUSEADDR
  - SIGALRM-based timeout on ray.init() to detect hung GCS connections

  Parameters:
  - ray_port: GCS port (default: random free port)
  - ray_temp_dir: shared temp dir for Ray logs/sockets
  - num_gpus / num_cpus: resource overrides passed to `ray start`
  - worker_connect_timeout_s: how long to wait for all nodes to join
  - gpu_enforcement_interval_s: if >0, periodically checks that the GPU count matches the expected value and raises if enforcement fails

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* Add SLURM tutorial with RayClient vs SlurmRayClient walkthrough

  New files under tutorials/slurm/:
  - pipeline.py: CPU-only word-count + node-tag pipeline; the only difference between a local run and a multi-node SLURM run is swapping RayClient() for SlurmRayClient()
  - submit.sh: sbatch script for bare-metal / virtualenv clusters
  - submit_container.sh: sbatch script using the NGC NeMo Curator container via Pyxis/enroot (--container-image flag)
  - README.md: end-to-end guide covering local dev, bare-metal SLURM, container SLURM (Pyxis + Singularity/Apptainer), SlurmRayClient config reference, and troubleshooting

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* Add GPU reporting to NodeTagStage and update submit scripts

  - pipeline.py: NodeTagStage now records nvidia-smi GPU info per worker node in a gpu_info column; the summary log shows each node's hostname and GPU count/model
  - submit.sh / submit_container.sh: add --gpus-per-node=2 default, print GPU inventory per node before the pipeline runs, bump --num-tasks to 80 for better distribution across workers, add the 4 sbatch override examples (1/2 nodes x 2/8 GPUs) to the header comment, add a RAY_PORT_BROADCAST_DIR note for node-local /tmp clusters

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
  Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* tutorials/slurm: use container+venv pattern matching PDF pipeline

  Updated submit scripts to activate the local Curator virtualenv inside the NGC container, mirroring the pattern used by submit_benchmark.sh. This ensures the local checkout (with SlurmRayClient) is used rather than the version bundled in the container image.

  - submit.sh: use shared venv via source .venv/bin/activate
  - submit_container.sh: same venv activation inside Pyxis container
  - README: document container+venv approach, add SlurmRayClient import error troubleshooting entry

  Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* tutorials/slurm: revert submit.sh to uv run for bare-metal nodes

  The .venv python symlink resolves to /usr/bin/python3.12, which exists inside the NGC container but not on bare-metal compute nodes. Keep the bare-metal submit.sh using uv run, which works without a system Python, and rely on submit_container.sh (container+venv) as the primary path.

  Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* tutorials/slurm: set RAY_PORT_BROADCAST_DIR to shared Lustre path

  On this cluster /tmp is node-local, so the worker cannot read the head's port-broadcast file from /tmp. Set RAY_PORT_BROADCAST_DIR to the shared logs/ directory so all nodes can discover the head's GCS port.

  Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>

* Address PR review: fix race conditions, port cleanup, ray.shutdown bug

  - Atomic write-then-rename in _write_head_port (Lustre race condition)
  - Retry on ValueError/OSError in _read_head_port (partial file read)
  - Delete port file in stop() to prevent a stale file on job ID wraparound
  - Remove finally: ray.shutdown() in _wait_for_workers — it was disconnecting the driver from Ray before the pipeline could run
  - Move _find_ray_binary() inside contextlib.suppress in _cleanup_local_ray
  - Add threading.main_thread() guard in _ray_init_with_timeout (SIGALRM)
  - Simplify dead if/else in stop() — both branches were identical
  - Add TestParseSlurmNodelist covering range, mixed range+list, zero-padding

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Propagate ray worker exit code to srun task status

  _run_as_worker now returns the ray start --block exit code, and the call site uses sys.exit(self._run_as_worker(...)) so that a failed worker shows as FAILED in sacct instead of silently COMPLETED.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix monkeypatch target in test_expands_with_scontrol

  nemo_curator.__init__ does not expose core, so the dotted string form "nemo_curator.core.client.subprocess.run" fails attribute traversal. Use the imported module object directly instead.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Add unit tests for SlurmRayClient helpers to improve codecov

  Cover previously-untested branches:
  - _head_port_file, _write_head_port, _read_head_port (roundtrip, timeout, partial-write retry)
  - _run_as_worker (exit code, GPU/CPU flags)
  - _cleanup_local_ray (success, error suppression)
  - _expand_slurm_nodelist (CalledProcessError fallback, empty scontrol output)
  - _ray_init_with_timeout (main-thread SIGALRM, non-main-thread fallback)
  - _wait_for_workers (success, timeout, partial join)
  - SlurmRayClient.stop with manages_cluster (port file cleanup)
  - SlurmRayClient.start: RAY_ADDRESS pre-set, head multi-node, worker sys.exit

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Fix test_worker_node_calls_sys_exit: mock socket.gethostbyname

  node-001 doesn't resolve in CI; patch socket.gethostbyname on nemo_curator.core.client to return a fixed IP.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>
Signed-off-by: NeMo Bot <nemo-bot@nvidia.com>
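To make the head-side coordination in the first commit message concrete, here is a minimal sketch of the SO_REUSEADDR free-port probe and the atomic write-then-rename port broadcast. The function names mirror the commit description, but the actual helpers in nemo_curator/core/client.py may differ in signature and detail.

```python
import os
import socket
import tempfile
from pathlib import Path


def pick_free_port() -> int:
    """Ask the OS for an ephemeral port; SO_REUSEADDR lets Ray rebind it right away."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", 0))
        return sock.getsockname()[1]


def write_head_port(broadcast_dir: Path, job_id: str, port: int) -> Path:
    """Publish the head's GCS port via atomic write-then-rename so workers on a
    shared filesystem (e.g. Lustre) never observe a partially written file."""
    broadcast_dir.mkdir(parents=True, exist_ok=True)
    final_path = broadcast_dir / f"ray_head_port_{job_id}"
    # Write to a temp file in the same directory, then rename into place.
    fd, tmp_name = tempfile.mkstemp(dir=broadcast_dir)
    with os.fdopen(fd, "w") as f:
        f.write(str(port))
    os.replace(tmp_name, final_path)  # atomic on POSIX filesystems
    return final_path
```

Workers would then poll for ray_head_port_<job_id> under RAY_PORT_BROADCAST_DIR and connect with `ray start --block --address=<head_ip>:<port>`, as described in the key-design bullets.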
/ok to test f061ff7
Greptile Summary

This cherry-pick from #1712 adds SlurmRayClient for multi-node Ray cluster management on SLURM, together with the SLURM tutorial. Prior review threads have already flagged the two most significant issues (the 600 s hard-coded port-read timeout and the no-op RAY_TMPDIR export).

Confidence Score: 5/5

Safe to merge; all remaining findings are P2 style/usability suggestions that do not block correctness on the default happy path. The core logic is well-implemented and comprehensively tested. Prior review threads already caught the two most impactful issues (the hardcoded 600 s port-read timeout and the no-op RAY_TMPDIR export). The two new findings are both P2: a minor timing asymmetry in _wait_for_workers that only matters when worker_connect_timeout_s < 120 s (not the default), and a tutorial-script usability gap where the logs/ directory must be pre-created before sbatch submission. See tutorials/slurm/submit.sh and submit_container.sh for the logs/ pre-creation note, and nemo_curator/core/client.py _wait_for_workers for the deadline placement.
Sequence Diagram

sequenceDiagram
participant SLURM as SLURM (srun)
participant Head as Node 0 (Head)
participant Worker as Node N (Worker)
participant FS as Shared FS (RAY_PORT_BROADCAST_DIR)
SLURM->>Head: launch python pipeline.py --slurm
SLURM->>Worker: launch python pipeline.py --slurm
Head->>Head: _cleanup_local_ray() [if cleanup_on_start]
Worker->>Worker: _cleanup_local_ray() [if cleanup_on_start]
Head->>Head: super().start() → ray start --head
Head->>Head: _write_head_port(job_id)
Head->>FS: write ray_head_port_<job_id>
Worker->>FS: _read_head_port(job_id) [polls until file appears]
FS-->>Worker: port number
Worker->>Worker: _run_as_worker(head_ip:port)
Note over Worker: ray start --block --address=head:port (blocks until cluster torn down)
Head->>Head: _wait_for_workers() [polls ray.nodes()]
Note over Head: ray.init → polls until all N nodes alive
Head->>Head: Pipeline runs (returns from start())
Head->>Head: pipeline.run() completes
Head->>Head: ray_client.stop() → ray stop + remove port file
Note over Worker: ray process exits (cluster torn down)
Worker->>SLURM: sys.exit(0)
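The flow above keeps the driver script identical between local and SLURM runs. A hedged sketch of what that swap might look like in tutorials/slurm/pipeline.py follows; the import path is taken from the review comments, while the pipeline stub and flag handling are illustrative, not copied from the tutorial.

```python
import argparse

# Import path assumed from the review comments (nemo_curator/core/client.py);
# the tutorial may import these classes differently.
from nemo_curator.core.client import RayClient, SlurmRayClient


def run_pipeline() -> None:
    """Placeholder for the tutorial's word-count + node-tag pipeline."""
    ...


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--slurm", action="store_true", help="Run on a multi-node SLURM allocation")
    args = parser.parse_args()

    # The only difference between a laptop run and an sbatch run is the client class.
    client = SlurmRayClient() if args.slurm else RayClient()
    client.start()  # on SLURM worker nodes start() never returns; only the head runs the pipeline
    try:
        run_pipeline()
    finally:
        client.stop()  # head: `ray stop` + remove port file; workers exit as the cluster tears down


if __name__ == "__main__":
    main()
```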
Reviews (3): Last reviewed commit: "Merge branch 'r1.2.0' into cherry-pick-1..."
else:
    # Worker node — read the port the head actually chose, then connect.
    head_ip = socket.gethostbyname(self._slurm_nodes[0])
    actual_port = self._read_head_port(slurm_job_id)
Worker port-read timeout not tied to worker_connect_timeout_s
_read_head_port is called here without a timeout_s argument, so it always uses its hardcoded default of 600 s, regardless of self.worker_connect_timeout_s (default 300 s). A user who sets worker_connect_timeout_s=60 expecting fast failure on both sides will find workers silently waiting up to 600 s for the port file before timing out.
Suggested change:

-    actual_port = self._read_head_port(slurm_job_id)
+    actual_port = self._read_head_port(slurm_job_id, timeout_s=self.worker_connect_timeout_s)
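To illustrate why the deadline matters, here is a minimal sketch of a polling port-file read with a caller-supplied timeout. The retry-on-partial-write behavior follows the commit description, but the helper name, signature, and polling interval are assumptions, not the actual implementation in nemo_curator/core/client.py.

```python
import time
from pathlib import Path


def read_head_port(port_file: Path, timeout_s: float, poll_interval_s: float = 2.0) -> int:
    """Poll a shared port-broadcast file until it appears, honoring the caller's deadline."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            return int(port_file.read_text().strip())
        except (ValueError, OSError):
            # File not written yet, or only partially written on a shared FS — retry.
            time.sleep(poll_interval_s)
    raise TimeoutError(f"Head port file {port_file} did not appear within {timeout_s}s")
```

With this shape, passing timeout_s=self.worker_connect_timeout_s at the call site keeps the head-side and worker-side failure windows aligned.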
# Shared directory for Ray port broadcast — must be visible to ALL nodes.
# On most clusters /tmp is node-local, so we use a path on the shared FS.
export RAY_PORT_BROADCAST_DIR="${CURATOR_DIR}/logs"
export RAY_TMPDIR="/tmp/ray_${SLURM_JOB_ID}"
RAY_TMPDIR export is not consumed by SlurmRayClient
SlurmRayClient (via RayClient) uses self.ray_temp_dir, which defaults to DEFAULT_RAY_TEMP_DIR (/tmp/ray). Neither _detect_slurm_resources nor any other method reads RAY_TMPDIR to set ray_temp_dir. The same pattern repeats in submit_container.sh line 75. As-is, Ray picks up the --temp-dir flag passed explicitly by the client, so this export is at best a no-op and may mislead users who expect it to control where Ray stores its files.
Consider passing the desired path as an explicit constructor argument instead:
SlurmRayClient(ray_temp_dir=f"/tmp/ray_{os.environ['SLURM_JOB_ID']}")

or remove the export RAY_TMPDIR lines if the intent was to rely on the client's default.
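A self-contained version of that suggestion, sketched under the assumption that the driver script constructs the client itself; the fallback value and variable names here are illustrative, not from the PR.

```python
import os

# Import path assumed from the review comment (nemo_curator/core/client.py).
from nemo_curator.core.client import SlurmRayClient

# Pass a job-scoped Ray temp dir explicitly, since the client does not read RAY_TMPDIR.
job_id = os.environ.get("SLURM_JOB_ID", "local")  # "local" fallback is illustrative
client = SlurmRayClient(ray_temp_dir=f"/tmp/ray_{job_id}")
```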
/ok to test 47e9341