fix: teardown tcpstore race #1244

Merged

garrett4wade merged 1 commit into inclusionAI:main from HT-Yuan:fix/teardown-tcpstore-race on Apr 27, 2026
Conversation

@HT-Yuan (Contributor) commented Apr 23, 2026

Description

Related Issue

Fixes #1245

Type of Change

  • 🐛 Bug fix
  • ✨ New feature
  • 💥 Breaking change
  • 📝 Documentation update
  • ♻️ Refactoring
  • ⚡ Performance improvement
  • ✅ Test coverage improvement

Checklist

  • I have read the Contributing Guide
  • Pre-commit hooks pass (pre-commit run --all-files)
  • Relevant tests pass; new tests added for new functionality
  • Documentation updated (if applicable; built with ./docs/build_all.sh)
  • Branch is up to date with main
  • Self-reviewed via /review-pr command
  • This PR was created by a coding agent via /create-pr
  • This PR is a breaking change

Breaking Change Details (if applicable):

Additional Context


Need help? Check the Contributing Guide or ask in
GitHub Discussions!

@gemini-code-assist (Bot) left a comment

Code Review

This pull request implements a coordinated teardown mechanism to eliminate noisy TCPStore warnings during process termination. Key changes:

  • a pre-destruction CPU barrier in the FSDP and Megatron engines,
  • a reverse_order parameter on the scheduler's delete_workers API, ensuring rank-0 is terminated last, and
  • a refactored local scheduler that performs worker cleanup in parallel using background threads.

Feedback suggests reversing the iteration order of roles in the schedulers when reverse_order is enabled, using shorter timeouts for the new CPU barriers to prevent hangs on unresponsive ranks, and joining cleanup threads in the local scheduler against a global deadline to avoid cumulative delays.

```diff
 colocated_roles = list(self._colocated_roles.keys())
 for r in colocated_roles:
-    self.delete_workers(r)
+    self.delete_workers(r, reverse_order=reverse_order)
```
Severity: high

When reverse_order is True, the roles themselves should also be iterated in reverse order (e.g., reversed(colocated_roles)). This ensures that roles created earlier, which typically contain the global TCPStore owner (rank-0), are processed last, maintaining the teardown guarantee across the entire cluster rather than just within each role.
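
A minimal sketch of the suggested fix, reusing the names from the diff above (this is the reviewer's proposal, not necessarily the merged code):

```python
colocated_roles = list(self._colocated_roles.keys())
if reverse_order:
    # Roles created earlier typically host the global TCPStore owner
    # (rank-0), so process them last: the store then outlives its clients.
    colocated_roles = list(reversed(colocated_roles))
for r in colocated_roles:
    self.delete_workers(r, reverse_order=reverse_order)
```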

```diff
 roles = list(self._workers.keys())
 for r in roles:
-    self.delete_workers(r)
+    self.delete_workers(r, reverse_order=reverse_order)
```
Severity: high

Similar to colocated roles, the actual worker roles should be iterated in reverse order when reverse_order is True to ensure the role containing rank-0 is the last one to be terminated.

```diff
 colocated_roles = list(self._colocated_roles.keys())
 for r in colocated_roles:
-    self.delete_workers(r)
+    self.delete_workers(r, reverse_order=reverse_order)
```
Severity: high

When reverse_order is True, the roles should be iterated in reverse order to ensure the TCPStore owner (usually in the first role) is signalled last across all roles.

```python
# harmless but produces a noisy stderr backtrace at teardown.
if getattr(self, "_cpu_group", None) is not None:
    try:
        dist.barrier(group=self._cpu_group)
```
Severity: medium

The CPU barrier uses the default process group timeout (typically 30 minutes). If a rank has crashed or is unresponsive, this will cause all other ranks to hang for a long time during teardown. Consider using a shorter timeout for this specific synchronization point to improve robustness during failure scenarios.
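
One way to bound the wait, sketched under the assumption that `_cpu_group` is a gloo group (`torch.distributed.monitored_barrier` is gloo-only and raises instead of blocking for the full group timeout):

```python
from datetime import timedelta

import torch.distributed as dist

if getattr(self, "_cpu_group", None) is not None:
    try:
        # Fails fast with RuntimeError if any rank misses the barrier,
        # instead of hanging for the default ~30-minute group timeout.
        dist.monitored_barrier(
            group=self._cpu_group, timeout=timedelta(seconds=30)
        )
    except RuntimeError:
        # A rank crashed or stalled; proceed with local teardown anyway.
        pass
```

Alternatively, the gloo group itself can be created with a short timeout (dist.new_group(backend="gloo", timeout=timedelta(seconds=30))), so the plain dist.barrier call inherits it.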

# "recvValue failed" on the already-closed store.
if getattr(self, "_cpu_group", None) is not None:
try:
dist.barrier(group=self._cpu_group)
Severity: medium

Similar to the FSDP engine, this CPU barrier uses the default process group timeout. An unresponsive rank could cause a significant hang during teardown. A shorter timeout for this barrier would be safer.

Comment on lines +1226 to +1233
```python
join_timeout = 10.0
for t in threads:
    t.join(timeout=join_timeout)
    if t.is_alive():
        logger.warning(
            f"Cleanup thread {t.name} did not finish within "
            f"{join_timeout}s; leaving it as daemon."
        )
```
Severity: medium

Joining threads sequentially with a fixed timeout per thread can lead to significant delays if multiple workers hang (e.g., N × 10 seconds). Since the cleanup threads are already running in parallel and kill_process_tree has its own internal timeout, it is more efficient to use a global deadline for joining all threads.

Suggested change

```diff
-join_timeout = 10.0
-for t in threads:
-    t.join(timeout=join_timeout)
-    if t.is_alive():
-        logger.warning(
-            f"Cleanup thread {t.name} did not finish within "
-            f"{join_timeout}s; leaving it as daemon."
-        )
+join_deadline = time.time() + 10.0
+for t in threads:
+    t.join(timeout=max(0, join_deadline - time.time()))
+    if t.is_alive():
+        logger.warning(
+            f"Cleanup thread {t.name} did not finish within "
+            f"the timeout; leaving it as daemon."
+        )
```

@HT-Yuan changed the title from "[WIP] Fix/teardown tcpstore race" to "Fix/teardown tcpstore race" on Apr 23, 2026
@HT-Yuan changed the title from "Fix/teardown tcpstore race" to "fix: teardown tcpstore race" on Apr 23, 2026
@garrett4wade (Collaborator) left a comment

@HT-Yuan Thanks for the important fix! We'd have some refinement over the current code before merging.

Collaborator

We may also want to fix areal/experimental/engine/archon_engine.py, in addition to fsdp and megatron.

Contributor Author

done

Comment on lines +457 to +468
```python
try:
    self.scheduler.delete_workers(
        role=self._worker_role, reverse_order=True
    )
except TypeError:
    # Backward-compat path for custom schedulers that have not
    # been updated to accept `reverse_order`.
    logger.warning(
        "Scheduler.delete_workers does not accept reverse_order; "
        "falling back to legacy behaviour."
    )
    self.scheduler.delete_workers(role=self._worker_role)
```
Collaborator

This try-except block is unnecessary. Let's go with reverse_order=True.

Contributor Author

Done.


```python
if reverse_order:
    workers = list(reversed(workers))
self._cleanup_workers(workers)
```
Collaborator

Do we also need to separately manage the call of engine.destroy and ray.actor.exit in Ray RPC workers?

Contributor Author

Already handled. RayRPCServer.destroy() calls engine.destroy() on all engines first, then ray.actor.exit_actor(). The outer _cleanup_workers uses a 3-phase protocol: (1) dispatch actor.destroy.remote() concurrently, (2) ray.wait with bounded timeout, (3) remove_placement_group. So engine teardown and actor exit are already properly sequenced.
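
For readers unfamiliar with that flow, a minimal sketch of the three-phase shape described above (the function and variable names here are illustrative, not the repo's exact API):

```python
import ray
from ray.util import remove_placement_group


def cleanup_workers(actors, placement_group, timeout_s=30.0):
    # Phase 1: dispatch destroy() on every actor concurrently, so the
    # engine-side gloo barrier can complete across all ranks.
    refs = [actor.destroy.remote() for actor in actors]
    # Phase 2: bounded wait -- one hung rank cannot stall teardown forever.
    done, pending = ray.wait(refs, num_returns=len(refs), timeout=timeout_s)
    if pending:
        print(f"{len(pending)} worker(s) did not finish destroy() in time")
    # Phase 3: only now release the placement group, which kills the actors.
    remove_placement_group(placement_group)
```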

```python
    here: Slurm tears down the entire job step atomically via
    ``scancel``, so per-rank ordering cannot be enforced.
    """
    del reverse_order  # unused, see docstring
```
Collaborator

We can also support this in the slurm scheduler. cancel_jobs is similar to kill_process_tree in the local scheduler: we should first call engine destroy, then cancel these jobs.

Contributor Author

Already implemented. SlurmScheduler.delete_workers() calls _destroy_engines_on_workers() (concurrent HTTP calls to engine.destroy() on all ranks) before cancel_jobs(). Same two-phase pattern as Local and Ray schedulers.
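
As a rough illustration of that two-phase shape (the /destroy endpoint and helper name below are assumptions for the sketch, not the repo's actual HTTP API):

```python
from concurrent.futures import ThreadPoolExecutor

import requests


def destroy_engines_on_workers(worker_urls, timeout_s=30.0):
    # Phase 1: call engine.destroy() on every rank concurrently over HTTP,
    # letting the gloo teardown barrier complete across the whole job step.
    def _destroy(url):
        try:
            requests.post(f"{url}/destroy", timeout=timeout_s)
        except requests.RequestException as exc:
            print(f"destroy() failed for {url}: {exc}")

    with ThreadPoolExecutor(max_workers=max(1, len(worker_urls))) as pool:
        list(pool.map(_destroy, worker_urls))
    # Phase 2 happens in the caller: scancel the Slurm job step.
```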

Problem:
During teardown, rank-0 (TCPStore server owner) could exit before peer
ranks finished their final NCCL abort, causing noisy "TCPStore.recvValue
failed" / "Broken pipe" warnings on stderr.

Root cause:
All ranks were killed simultaneously without first coordinating a
distributed barrier on the CPU (gloo) group to safely tear down NCCL
communicators and the TCPStore.

Solution:
Implement a two-phase teardown protocol:

Phase 1 - Engine destroy: call engine.destroy() on every worker
concurrently. The engine-side destroy() now executes a CPU barrier
(dist.barrier on a gloo process group) followed by
dist.destroy_process_group(), ensuring all ranks leave the NCCL
collective together.

Phase 2 - Process kill: only after the barrier completes, kill the
actual processes (Ray: remove placement groups; Slurm: scancel;
Local: process tree cleanup).

Changes:
- engine (fsdp/megatron/archon): add _cpu_group + pre-destroy barrier
- train_controller: two-phase destroy (engines first, then workers)
- scheduler/ray: _cleanup_workers with ray.wait timeout + PG removal
- scheduler/slurm: _destroy_engines_on_workers via HTTP before scancel
- scheduler/local: graceful engine teardown before SIGKILL
- scheduler_api: add reverse_order param to delete_workers interface
- tests: updated test_train_controller, added test_local_scheduler

Tested: DPO 4xH20 Ray scheduler - clean teardown, no TCPStore warnings.
@HT-Yuan force-pushed the fix/teardown-tcpstore-race branch from 29b8495 to 6a717bf on April 24, 2026 10:11
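
The engine-side half of that protocol, as described in the commit message above, boils down to roughly the following shape (a sketch of the described behaviour, not the exact merged code):

```python
import torch.distributed as dist


def destroy(self):
    # Phase 1: synchronize every rank on the CPU (gloo) group before any
    # process exits, so rank-0's TCPStore outlives all NCCL teardown traffic.
    if getattr(self, "_cpu_group", None) is not None:
        try:
            dist.barrier(group=self._cpu_group)
        except Exception:
            # Never let a failed barrier block teardown.
            pass
    # All ranks leave the collective together; the scheduler may now
    # safely kill the process (Phase 2).
    if dist.is_initialized():
        dist.destroy_process_group()
```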
@garrett4wade (Collaborator) left a comment

LGTM

@garrett4wade merged commit 3ed3e81 into inclusionAI:main on Apr 27, 2026
6 checks passed


Successfully merging this pull request may close these issues:

[BUG] TCPStore.recvValue failed / HeartbeatMonitor warnings on clean shutdown with scheduler.type=local
