Skip to content

Commit f619e36

Browse files
snimu and claude authored
fix(ckpt): master-only mkdir + barrier to avoid concurrent-mkdir race on parallel FS (#2699)
Path.mkdir(exist_ok=True) re-checks self.is_dir() after EEXIST and re-raises if it returns False. On a parallel filesystem (beegfs) with concurrent mkdir from every trainer rank, a non-master rank can hit EEXIST while its metadata cache hasn't seen the new inode yet -> is_dir() returns False -> FileExistsError, crashing the trainer mid-save (observed at weights/step_180).

Guard the three all-ranks mkdir sites behind world.is_master + a barrier so only the master creates the dir and the others wait until it exists:

- CheckpointManager.save_to_path: dataloader_dir
- CheckpointManager.save: ckpt_path.parent
- WeightCheckpointManager.save: step_path

Mirrors the existing master-only guard in WeightCheckpointManager.save_to_path.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 563b37a commit f619e36

1 file changed

Lines changed: 16 additions & 3 deletions

File tree

src/prime_rl/trainer/ckpt.py

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -171,7 +171,12 @@ def save_to_path(
171171
# Checkpoint the local dataloader
172172
if dataloader is not None:
173173
dataloader_dir = path / "dataloader"
174-
dataloader_dir.mkdir(parents=True, exist_ok=True)
174+
# Only the master creates the dir; the rest wait at a barrier. On a
175+
# parallel FS (beegfs), concurrent mkdir from every rank can re-raise
176+
# FileExistsError (EEXIST + stale is_dir() metadata).
177+
if self.world.is_master:
178+
dataloader_dir.mkdir(parents=True, exist_ok=True)
179+
torch.distributed.barrier()
175180
torch.save(dataloader.state_dict(), dataloader_dir / f"rank_{self.world.rank}.pt")
176181

177182
# Save sharded state
@@ -239,7 +244,11 @@ def save(
239244
) -> None:
240245
"""Save the full checkpoint state for a specified step."""
241246
ckpt_path = self.get_ckpt_path(step)
242-
ckpt_path.parent.mkdir(parents=True, exist_ok=True)
247+
# Master-only mkdir + barrier: concurrent mkdir from every rank can
248+
# re-raise FileExistsError on a parallel FS (see save_to_path).
249+
if self.world.is_master:
250+
ckpt_path.parent.mkdir(parents=True, exist_ok=True)
251+
torch.distributed.barrier()
243252

244253
self.save_to_path(ckpt_path, model, optimizers, scheduler, progress, dataloader)
245254
bisect.insort(self.ckpt_steps, step)
@@ -390,7 +399,11 @@ def save(
390399
):
391400
"""Save a HF-compatible weight-only checkpoint for a given step."""
392401
step_path = self.get_step_path(step)
393-
step_path.mkdir(parents=True, exist_ok=True)
402+
# Master-only mkdir + barrier: concurrent mkdir from every rank can
403+
# re-raise FileExistsError on a parallel FS (EEXIST + stale is_dir()).
404+
if self.world.is_master:
405+
step_path.mkdir(parents=True, exist_ok=True)
406+
torch.distributed.barrier()
394407

395408
# Gather all weights on master rank
396409
self.logger.debug("Gathering weights on master rank for weight checkpoint")

0 commit comments

Comments
 (0)