Commit 680de68
* Add SlurmRayClient for multi-node Ray cluster management on SLURM
SlurmRayClient extends RayClient to handle the multi-process SLURM
job model where srun launches one Python process per node.
Key design (a simplified sketch follows the parameter list below):
- Node 0 (SLURM_NODEID=0) starts the Ray head via `ray start --head`
and writes the GCS port to a shared file (RAY_PORT_BROADCAST_DIR or
/tmp) so all other nodes can discover it
- Nodes 1+ wait for the port file, then run `ray start --block
  --address=<head>:<port>` as workers; start() never hands control back
  to user code on worker nodes, so the pipeline only runs on the head
- Head waits for all allocated nodes to appear in `ray.nodes()` before
returning (configurable via worker_connect_timeout_s)
- Graceful teardown: head calls `ray stop` and removes the port file;
workers exit naturally once the head stops
- Port collision avoidance with _pick_free_port() using socket SO_REUSEADDR
- SIGALRM-based timeout on ray.init() to detect hung GCS connections
Parameters:
- ray_port: GCS port (default: random free port)
- ray_temp_dir: shared temp dir for Ray logs/sockets
- num_gpus / num_cpus: resource overrides passed to `ray start`
- worker_connect_timeout_s: how long to wait for all nodes to join
- gpu_enforcement_interval_s: if >0, periodically checks that the GPU
  count matches the expected value and raises if the check fails
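
Roughly, each srun-launched process follows the flow sketched below. The helper names match this change, but the bodies are simplified stand-ins: error handling, the ray.nodes() join wait, and scontrol-based nodelist expansion are elided, and the port-file write shown here is the naive version that a later commit in this PR replaces with an atomic write-then-rename.

```python
import os
import socket
import subprocess
import time
from pathlib import Path

import ray


def _pick_free_port() -> int:
    # Bind to port 0 so the OS hands back an unused port; SO_REUSEADDR lets
    # the Ray head that starts a moment later claim it without EADDRINUSE.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", 0))
        return s.getsockname()[1]


def start(ray_port: int | None = None) -> bool:
    """Return True on the head node (user code continues); never returns on workers."""
    node_id = int(os.environ["SLURM_NODEID"])
    broadcast_dir = Path(os.environ.get("RAY_PORT_BROADCAST_DIR", "/tmp"))
    port_file = broadcast_dir / f"ray_head_port_{os.environ['SLURM_JOB_ID']}"

    if node_id == 0:
        port = ray_port or _pick_free_port()
        subprocess.run(["ray", "start", "--head", f"--port={port}"], check=True)
        port_file.write_text(str(port))  # naive broadcast; see the atomic version later
        ray.init(address="auto")
        # ... wait until every allocated node shows up in ray.nodes(),
        # bounded by worker_connect_timeout_s ...
        return True

    # Worker nodes: wait for the head's port, then block until the head stops.
    while not port_file.exists():
        time.sleep(1)
    head = os.environ["SLURM_JOB_NODELIST"].split(",")[0]  # real code expands ranges via scontrol
    rc = subprocess.run(
        ["ray", "start", "--block", f"--address={head}:{port_file.read_text().strip()}"],
        check=False,
    ).returncode
    raise SystemExit(rc)  # worker processes never return to user code
```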
* Add SLURM tutorial with RayClient vs SlurmRayClient walkthrough
New files under tutorials/slurm/:
- pipeline.py: CPU-only word-count + node-tag pipeline; the only
  difference between a local run and a multi-node SLURM run is
  swapping RayClient() for SlurmRayClient() (see the sketch after this list)
- submit.sh: sbatch script for bare-metal / virtualenv clusters
- submit_container.sh: sbatch script using the NGC NeMo Curator
container via Pyxis/enroot (--container-image flag)
- README.md: end-to-end guide covering local dev, bare-metal SLURM,
container SLURM (Pyxis + Singularity/Apptainer), SlurmRayClient
config reference, and troubleshooting
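
The advertised one-line difference looks roughly like this; the import location follows this PR's test patch targets, and the pipeline body is a placeholder rather than the tutorial's actual stages:

```python
from nemo_curator.core.client import RayClient, SlurmRayClient  # location assumed from this PR's tests


def run_pipeline() -> None:
    """Placeholder for the tutorial's word-count + node-tag stages."""
    ...


def main(on_slurm: bool) -> None:
    # Swapping the client class is the only change between a laptop run and
    # a multi-node SLURM run; everything after this line stays identical.
    client = SlurmRayClient() if on_slurm else RayClient()
    client.start()  # on SLURM, only the head node ever gets past this call
    try:
        run_pipeline()
    finally:
        client.stop()
```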
* Add GPU reporting to NodeTagStage and update submit scripts
- pipeline.py: NodeTagStage now records nvidia-smi GPU info per worker
  node in a gpu_info column; the summary log shows each node's hostname
  and GPU count/model (a stand-in query is sketched after this list)
- submit.sh / submit_container.sh: add a --gpus-per-node=2 default,
  print each node's GPU inventory before the pipeline runs, bump
  --num-tasks to 80 for better distribution across workers, add the four
  sbatch override examples (1 or 2 nodes x 2 or 8 GPUs) to the header
  comment, and add a RAY_PORT_BROADCAST_DIR note for clusters with
  node-local /tmp
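
A plausible shape for the per-node GPU query; the gpu_info column name comes from the commit, while the nvidia-smi invocation and formatting are assumptions:

```python
import socket
import subprocess


def gpu_info() -> str:
    """Return 'hostname: N x <model>', or note that no GPU is visible."""
    hostname = socket.gethostname()
    try:
        out = subprocess.run(
            ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
            capture_output=True, text=True, check=True,
        ).stdout
        names = [line.strip() for line in out.splitlines() if line.strip()]
    except (FileNotFoundError, subprocess.CalledProcessError):
        names = []
    if not names:
        return f"{hostname}: no visible GPUs"
    return f"{hostname}: {len(names)} x {names[0]}"
```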
* tutorials/slurm: use container+venv pattern matching PDF pipeline
Updated submit scripts to activate the local Curator virtualenv inside
the NGC container, mirroring the pattern used by submit_benchmark.sh.
This ensures the local checkout (with SlurmRayClient) is used rather
than the version bundled in the container image.
- submit.sh: use shared venv via source .venv/bin/activate
- submit_container.sh: same venv activation inside Pyxis container
- README: document container+venv approach, add SlurmRayClient import error
troubleshooting entry
* tutorials/slurm: revert submit.sh to uv run for bare-metal nodes
The .venv python symlink resolves to /usr/bin/python3.12, which exists
inside the NGC container but not on bare-metal compute nodes. Keep the
bare-metal submit.sh using uv run, which works without a system Python,
and rely on submit_container.sh (container+venv) as the primary path.
* tutorials/slurm: set RAY_PORT_BROADCAST_DIR to shared Lustre path
On this cluster /tmp is node-local, so the worker cannot read the head's
port-broadcast file from /tmp. Set RAY_PORT_BROADCAST_DIR to the shared
logs/ directory so all nodes can discover the head's GCS port.
* Address PR review: fix race conditions, port cleanup, ray.shutdown bug
- Atomic write-then-rename in _write_head_port (Lustre race condition;
  sketched after this list together with the SIGALRM guard)
- Retry on ValueError/OSError in _read_head_port (partial file read)
- Delete port file in stop() to prevent stale file on job ID wraparound
- Remove finally: ray.shutdown() in _wait_for_workers — was disconnecting
the driver from Ray before the pipeline could run
- Move _find_ray_binary() inside contextlib.suppress in _cleanup_local_ray
- Add threading.main_thread() guard in _ray_init_with_timeout (SIGALRM)
- Simplify dead if/else in stop() — both branches were identical
- Add TestParseSlurmNodelist covering range, mixed range+list, zero-padding
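
Two of these fixes sketched in isolation; the signatures are assumed and the bodies are illustrative rather than copied from the PR:

```python
import os
import signal
import threading
import time
from pathlib import Path

import ray


def _write_head_port(port_file: Path, port: int) -> None:
    # Write to a sibling temp file, then rename: os.replace is atomic on
    # POSIX filesystems (Lustre included), so readers never see a torn file.
    tmp = port_file.with_suffix(".tmp")
    tmp.write_text(str(port))
    os.replace(tmp, port_file)


def _read_head_port(port_file: Path, timeout_s: float = 300.0) -> int:
    # Poll for the file and retry on partial reads instead of failing fast.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            return int(port_file.read_text().strip())
        except (FileNotFoundError, ValueError, OSError):
            time.sleep(1.0)
    raise TimeoutError(f"head port file {port_file} never became readable")


def _ray_init_with_timeout(address: str, timeout_s: int = 120):
    # SIGALRM handlers can only be installed from the main thread; anywhere
    # else, fall back to a plain ray.init() rather than crashing.
    if threading.current_thread() is not threading.main_thread():
        return ray.init(address=address)

    def _on_timeout(signum, frame):
        raise TimeoutError("ray.init() hung while connecting to the GCS")

    previous = signal.signal(signal.SIGALRM, _on_timeout)
    signal.alarm(timeout_s)
    try:
        return ray.init(address=address)
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous)
```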
* Propagate ray worker exit code to srun task status
_run_as_worker now returns the ray start --block exit code, and the
call site uses sys.exit(self._run_as_worker(...)) so that a failed
worker shows as FAILED in sacct instead of silently COMPLETED.
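
In outline (the helper name is from the commit; the command and call-site plumbing are illustrative):

```python
import subprocess
import sys


def _run_as_worker(cmd: list[str]) -> int:
    # ray start --block stays in the foreground until the head goes away,
    # so its exit code is the worker's outcome.
    return subprocess.run(cmd, check=False).returncode


if __name__ == "__main__":
    # Propagating the code through sys.exit makes a failed worker show up
    # as FAILED in sacct instead of COMPLETED.
    sys.exit(_run_as_worker(["ray", "start", "--block", "--address=head:6379"]))
```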
* Fix monkeypatch target in test_expands_with_scontrol
nemo_curator.__init__ does not expose core, so the dotted string form
"nemo_curator.core.client.subprocess.run" fails attribute traversal.
Use the imported module object directly instead.
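
Roughly, in pytest terms (the fake is a stand-in):

```python
from nemo_curator.core import client


def test_expands_with_scontrol(monkeypatch):
    def fake_run(*args, **kwargs):
        ...  # stand-in for a fake scontrol invocation

    # Fails: the dotted form makes monkeypatch traverse attributes from the
    # top-level package, and nemo_curator/__init__.py does not expose core.
    # monkeypatch.setattr("nemo_curator.core.client.subprocess.run", fake_run)

    # Works: patch the attribute on the already-imported module object.
    monkeypatch.setattr(client.subprocess, "run", fake_run)
```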
* Add unit tests for SlurmRayClient helpers to improve codecov
Cover previously-untested branches (a representative test is sketched after this list):
- _head_port_file, _write_head_port, _read_head_port (roundtrip, timeout, partial-write retry)
- _run_as_worker (exit code, GPU/CPU flags)
- _cleanup_local_ray (success, error suppression)
- _expand_slurm_nodelist (CalledProcessError fallback, empty scontrol output)
- _ray_init_with_timeout (main-thread SIGALRM, non-main-thread fallback)
- _wait_for_workers (success, timeout, partial join)
- SlurmRayClient.stop with manages_cluster (port file cleanup)
- SlurmRayClient.start: RAY_ADDRESS pre-set, head multi-node, worker sys.exit
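
A representative test in the spirit of the new coverage. The parser below is a local stand-in for _expand_slurm_nodelist's scontrol-free fallback, so the expected values illustrate the covered cases (range, mixed range+list, zero-padding) rather than the real helper's exact behavior:

```python
import re

import pytest


def expand_nodelist(nodelist: str) -> list[str]:
    """Stand-in parser for SLURM nodelists like 'node[001-003],gpu07'."""
    hosts: list[str] = []
    for part in re.findall(r"[^,\[\]]+(?:\[[^\]]*\])?", nodelist):
        m = re.fullmatch(r"(.+)\[(\d+)-(\d+)\]", part)
        if m:
            prefix, lo, hi = m.groups()
            width = len(lo)
            hosts += [f"{prefix}{i:0{width}d}" for i in range(int(lo), int(hi) + 1)]
        elif part:
            hosts.append(part)
    return hosts


@pytest.mark.parametrize(
    ("nodelist", "expected"),
    [
        ("node[001-003]", ["node001", "node002", "node003"]),      # range
        ("node[001-002],gpu07", ["node001", "node002", "gpu07"]),  # range + list
        ("n[08-10]", ["n08", "n09", "n10"]),                       # zero-padding
    ],
)
def test_parse_slurm_nodelist(nodelist: str, expected: list[str]) -> None:
    assert expand_nodelist(nodelist) == expected
```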
* Fix test_worker_node_calls_sys_exit: mock socket.gethostbyname
node-001 doesn't resolve in CI; patch socket.gethostbyname on
nemo_curator.core.client to return a fixed IP.
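
The fix, roughly, assuming the client module imports socket at module level:

```python
from nemo_curator.core import client


def test_worker_node_calls_sys_exit(monkeypatch):
    # CI runners cannot resolve the test's fake hostname, so pin the lookup
    # on the module under test instead of relying on DNS.
    monkeypatch.setattr(client.socket, "gethostbyname", lambda _host: "10.0.0.1")
    ...  # rest of the worker-path assertions
```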
---------
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
Signed-off-by: NeMo Bot <nemo-bot@nvidia.com>
Co-authored-by: Abhinav Garg <abhinavg@stanford.edu>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>