Commit e7230e2

fix(engine): synchronize on CPU group before destroying NCCL PG
Before calling ``dist.destroy_process_group()``, FSDPEngine and MegatronEngine now perform a barrier on the gloo CPU subgroup. This ensures every rank finishes its final NCCL abort together, instead of rank-0 tearing down the global TCPStore while the HeartbeatMonitor threads on non-zero ranks are still polling it.

Symptom this fixes: a noisy stderr backtrace at the very end of a successful training run, e.g.

    [W TCPStore.cpp] recvValue failed on SocketImpl ... no error
    [W ProcessGroupNCCL.cpp] ... HeartbeatMonitor::runLoop()

emitted from the NCCL HeartbeatMonitor C++ thread on non-zero ranks after rank-0's TCPStore server has already been shut down.

Also make ``FSDPEngine.destroy()`` idempotent by flipping ``own_global_group`` to False after tearing the group down, matching ``MegatronEngine.destroy()``. This protects against double-destroy from future cleanup hooks.

The barrier is wrapped in try/except so a half-dead process group at teardown never turns a warning into a hard failure.
1 parent ae8c792 commit e7230e2

2 files changed: 31 additions & 0 deletions


areal/engine/fsdp_engine.py

Lines changed: 18 additions & 0 deletions

@@ -419,7 +419,25 @@ def destroy(self):
         # handles still exist and we expect another engine to
         # clean up these groups.
         if dist.is_initialized() and self.own_global_group:
+            # Pre-destroy synchronization on a CPU (gloo) group so that all
+            # ranks leave the NCCL collective phase together. Without this
+            # barrier, rank-0 (which owns the TCPStore server) may exit
+            # before peers finish their final NCCL abort, causing
+            # HeartbeatMonitor background threads on other ranks to observe
+            # "recvValue failed" on the already-closed store. This is
+            # harmless but produces a noisy stderr backtrace at teardown.
+            if getattr(self, "_cpu_group", None) is not None:
+                try:
+                    dist.barrier(group=self._cpu_group)
+                except Exception as e:  # pragma: no cover - best-effort
+                    self.logger.warning(
+                        f"pre-destroy CPU barrier failed (ignored): {e}"
+                    )
             dist.destroy_process_group()
+            # Make destroy() idempotent: if the controller calls destroy
+            # more than once (e.g. via cleanup hooks), the second call
+            # must not try to destroy already-destroyed groups.
+            self.own_global_group = False

     @property
     def initialized(self) -> bool:

areal/engine/megatron_engine.py

Lines changed: 13 additions & 0 deletions

@@ -510,6 +510,19 @@ def destroy(self):
         # handles still exist and we expect another engine to
         # clean up these groups.
         if dist.is_initialized() and self.own_global_group:
+            # Pre-destroy synchronization on a CPU (gloo) group so that all
+            # ranks leave the NCCL collective phase together. Without this
+            # barrier, rank-0 (which owns the TCPStore server) may exit
+            # before peers finish their final NCCL abort, causing
+            # HeartbeatMonitor background threads on other ranks to observe
+            # "recvValue failed" on the already-closed store.
+            if getattr(self, "_cpu_group", None) is not None:
+                try:
+                    dist.barrier(group=self._cpu_group)
+                except Exception as e:  # pragma: no cover - best-effort
+                    self.logger.warning(
+                        f"pre-destroy CPU barrier failed (ignored): {e}"
+                    )
             mpu.destroy_model_parallel()
             dist.destroy_process_group()
             self.own_global_group = False
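The teardown pattern shared by both engines (best-effort CPU barrier, destroy, then flip the ownership flag so a second call is a no-op) can be sketched standalone. This is a minimal illustration, not the project's real classes: ``FakeDist`` stands in for ``torch.distributed`` so the sketch runs without a cluster, and ``Engine`` condenses the two ``destroy()`` methods above.

```python
class FakeDist:
    """Stand-in for torch.distributed, just enough to show the pattern."""

    def __init__(self):
        self.destroy_calls = 0
        self.initialized = True

    def is_initialized(self):
        return self.initialized

    def barrier(self, group=None):
        pass  # real code blocks here until every rank arrives

    def destroy_process_group(self):
        self.destroy_calls += 1
        self.initialized = False


class Engine:
    def __init__(self, dist):
        self.dist = dist
        self.own_global_group = True
        self._cpu_group = object()  # stands in for the gloo subgroup

    def destroy(self):
        if self.dist.is_initialized() and self.own_global_group:
            if self._cpu_group is not None:
                try:
                    # Sync on the CPU group so no rank tears down the
                    # store while peers are still finishing NCCL work.
                    self.dist.barrier(group=self._cpu_group)
                except Exception:
                    pass  # best-effort: never turn a warning into a failure
            self.dist.destroy_process_group()
            # Flip the flag so a second destroy() is a no-op.
            self.own_global_group = False


dist = FakeDist()
engine = Engine(dist)
engine.destroy()
engine.destroy()  # idempotent: second call does nothing
print(dist.destroy_calls)  # 1
```

The key design point is ordering: the barrier runs while the group is still alive, and the ownership flag flips only after the group is actually gone, so cleanup hooks can call ``destroy()`` repeatedly without hitting an already-destroyed group.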
