fix: teardown TCPStore race #1244
Conversation
Code Review
This pull request implements a coordinated teardown mechanism to eliminate noisy TCPStore warnings during process termination. Key changes include adding a pre-destruction CPU barrier in FSDP and Megatron engines, introducing a reverse_order parameter to the scheduler's delete_workers API to ensure rank-0 is terminated last, and refactoring the local scheduler to perform worker cleanup in parallel using background threads. Feedback suggests reversing the iteration order of roles in the schedulers when reverse_order is enabled, implementing shorter timeouts for the new CPU barriers to prevent hangs on unresponsive ranks, and using a global deadline when joining cleanup threads in the local scheduler to avoid cumulative delays.
```diff
 colocated_roles = list(self._colocated_roles.keys())
 for r in colocated_roles:
-    self.delete_workers(r)
+    self.delete_workers(r, reverse_order=reverse_order)
```
When reverse_order is True, the roles themselves should also be iterated in reverse order (e.g., reversed(colocated_roles)). This ensures that roles created earlier, which typically contain the global TCPStore owner (rank-0), are processed last, maintaining the teardown guarantee across the entire cluster rather than just within each role.
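A minimal sketch of the suggested iteration, using the names from the diff above:

```python
# Sketch: delete later-created roles first so the earliest role, which
# typically hosts rank-0 (the TCPStore owner), is torn down last.
colocated_roles = list(self._colocated_roles.keys())
if reverse_order:
    colocated_roles = list(reversed(colocated_roles))
for r in colocated_roles:
    self.delete_workers(r, reverse_order=reverse_order)
```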
```diff
 roles = list(self._workers.keys())
 for r in roles:
-    self.delete_workers(r)
+    self.delete_workers(r, reverse_order=reverse_order)
```

```diff
 colocated_roles = list(self._colocated_roles.keys())
 for r in colocated_roles:
-    self.delete_workers(r)
+    self.delete_workers(r, reverse_order=reverse_order)
```
```python
# harmless but produces a noisy stderr backtrace at teardown.
if getattr(self, "_cpu_group", None) is not None:
    try:
        dist.barrier(group=self._cpu_group)
```
The CPU barrier uses the default process group timeout (typically 30 minutes). If a rank has crashed or is unresponsive, this will cause all other ranks to hang for a long time during teardown. Consider using a shorter timeout for this specific synchronization point to improve robustness during failure scenarios.
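One way to bound the wait, sketched below: `torch.distributed.monitored_barrier` is gloo-only (which matches the CPU group) and accepts a per-call timeout, raising instead of hanging. The 30-second value is an assumption, not a tuned number.

```python
import datetime

import torch.distributed as dist

if getattr(self, "_cpu_group", None) is not None:
    try:
        # monitored_barrier raises on timeout instead of blocking for the
        # default process-group timeout (often 30 minutes).
        dist.monitored_barrier(
            group=self._cpu_group,
            timeout=datetime.timedelta(seconds=30),  # assumed value
        )
    except RuntimeError:
        # A peer rank crashed or is unresponsive; continue local teardown.
        pass
```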
| # "recvValue failed" on the already-closed store. | ||
| if getattr(self, "_cpu_group", None) is not None: | ||
| try: | ||
| dist.barrier(group=self._cpu_group) |
```python
join_timeout = 10.0
for t in threads:
    t.join(timeout=join_timeout)
    if t.is_alive():
        logger.warning(
            f"Cleanup thread {t.name} did not finish within "
            f"{join_timeout}s; leaving it as daemon."
        )
```
Joining threads sequentially with a fixed timeout per thread can lead to significant delays if multiple workers hang (e.g., kill_process_tree has its own internal timeout). It is more efficient to use a global deadline for joining all threads.
```diff
-join_timeout = 10.0
-for t in threads:
-    t.join(timeout=join_timeout)
-    if t.is_alive():
-        logger.warning(
-            f"Cleanup thread {t.name} did not finish within "
-            f"{join_timeout}s; leaving it as daemon."
-        )
+join_deadline = time.time() + 10.0
+for t in threads:
+    t.join(timeout=max(0, join_deadline - time.time()))
+    if t.is_alive():
+        logger.warning(
+            f"Cleanup thread {t.name} did not finish within "
+            f"the timeout; leaving it as daemon."
+        )
```
garrett4wade left a comment
@HT-Yuan Thanks for the important fix! We'd like a few refinements to the current code before merging.
We may also want to apply the fix to areal/experimental/engine/archon_engine.py, in addition to the fsdp and megatron engines.
```python
try:
    self.scheduler.delete_workers(
        role=self._worker_role, reverse_order=True
    )
except TypeError:
    # Backward-compat path for custom schedulers that have not
    # been updated to accept `reverse_order`.
    logger.warning(
        "Scheduler.delete_workers does not accept reverse_order; "
        "falling back to legacy behaviour."
    )
    self.scheduler.delete_workers(role=self._worker_role)
```
This try-except block is unnecessary. Let's go with reverse_order=True.
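With all in-tree schedulers updated to accept the parameter, the call collapses to a single line:

```python
self.scheduler.delete_workers(role=self._worker_role, reverse_order=True)
```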
```python
if reverse_order:
    workers = list(reversed(workers))
self._cleanup_workers(workers)
```
Do we also need to separately manage the calls to engine.destroy and ray.actor.exit in Ray RPC workers?
Already handled. RayRPCServer.destroy() calls engine.destroy() on all engines first, then ray.actor.exit_actor(). The outer _cleanup_workers uses a 3-phase protocol: (1) dispatch actor.destroy.remote() concurrently, (2) ray.wait with bounded timeout, (3) remove_placement_group. So engine teardown and actor exit are already properly sequenced.
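A sketch of that three-phase sequencing, assuming each worker record exposes `actor` and `placement_group` attributes (the names and timeout are illustrative, not the exact ones in the PR):

```python
import logging

import ray

logger = logging.getLogger(__name__)

def _cleanup_workers(self, workers, destroy_timeout_s: float = 30.0):
    # Phase 1: dispatch engine teardown on every actor concurrently.
    refs = [w.actor.destroy.remote() for w in workers]
    # Phase 2: bounded wait so one hung rank cannot stall teardown forever.
    _, pending = ray.wait(refs, num_returns=len(refs), timeout=destroy_timeout_s)
    if pending:
        logger.warning("%d worker(s) did not finish destroy in time", len(pending))
    # Phase 3: release physical resources only after engines have exited.
    for w in workers:
        ray.util.remove_placement_group(w.placement_group)
```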
```python
        here: Slurm tears down the entire job step atomically via
        ``scancel``, so per-rank ordering cannot be enforced.
        """
        del reverse_order  # unused, see docstring
```
We can also support this in the Slurm scheduler. cancel_jobs is similar to kill_process_tree in the local scheduler: we should first call engine destroy, then cancel the jobs.
Already implemented. SlurmScheduler.delete_workers() calls _destroy_engines_on_workers() (concurrent HTTP calls to engine.destroy() on all ranks) before cancel_jobs(). Same two-phase pattern as Local and Ray schedulers.
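A sketch of the same two-phase pattern on the Slurm side; the `/destroy` endpoint and the worker attributes are illustrative assumptions, not the PR's exact API:

```python
import logging

import requests

logger = logging.getLogger(__name__)

def _destroy_engines_on_workers(self, workers, timeout_s: float = 30.0):
    # Phase 1: ask each rank's engine server to run its CPU barrier and
    # tear down the process group before any job step is cancelled.
    for w in workers:
        try:
            requests.post(f"http://{w.host}:{w.port}/destroy", timeout=timeout_s)
        except requests.RequestException as e:
            logger.warning("engine destroy on %s:%s failed: %s", w.host, w.port, e)
    # Phase 2 happens in delete_workers: cancel_jobs() via scancel, only
    # after every engine has left the collective.
```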
Problem: During teardown, rank-0 (TCPStore server owner) could exit before peer ranks finished their final NCCL abort, causing noisy "TCPStore.recvValue failed" / "Broken pipe" warnings on stderr.

Root cause: All ranks were killed simultaneously without first coordinating a distributed barrier on the CPU (gloo) group to safely tear down NCCL communicators and the TCPStore.

Solution: Implement a two-phase teardown protocol:

- Phase 1, engine destroy: call engine.destroy() on every worker concurrently. The engine-side destroy() now executes a CPU barrier (dist.barrier on a gloo process group) followed by dist.destroy_process_group(), ensuring all ranks leave the NCCL collective together.
- Phase 2, process kill: only after the barrier completes, kill the actual processes (Ray: remove placement groups; Slurm: scancel; Local: process tree cleanup).

Changes:
- engine (fsdp/megatron/archon): add _cpu_group + pre-destroy barrier
- train_controller: two-phase destroy (engines first, then workers)
- scheduler/ray: _cleanup_workers with ray.wait timeout + PG removal
- scheduler/slurm: _destroy_engines_on_workers via HTTP before scancel
- scheduler/local: graceful engine teardown before SIGKILL
- scheduler_api: add reverse_order param to delete_workers interface
- tests: updated test_train_controller, added test_local_scheduler

Tested: DPO 4xH20 Ray scheduler - clean teardown, no TCPStore warnings.
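A minimal sketch of the engine-side Phase 1 described above; the broad exception handling is an assumption about how a dead peer is tolerated:

```python
import torch.distributed as dist

def destroy(self):
    # Phase 1: synchronize all ranks on the gloo CPU group so nobody
    # abandons the TCPStore while peers are still aborting NCCL.
    if getattr(self, "_cpu_group", None) is not None:
        try:
            dist.barrier(group=self._cpu_group)
        except Exception:
            # Best effort: a dead peer must not block our own teardown.
            pass
    if dist.is_initialized():
        dist.destroy_process_group()
```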
Force-pushed from 29b8495 to 6a717bf.
Related Issue
Fixes #1245