Skip to content

Commit c23cecf

Browse files
committed
fix(kubeflow): resolve rank-0 and last rank before forwarding logs
On first attach the GROUP_RANK pod map is empty until the torchrun workers finish rendezvous, so _forward_to_stdout fell back to rank-0-only and the last rank's early per-step loss/throughput lines (replayed via --tail=-1) were written to log-allranks but never forwarded to stdout — the CI log silently dropped the beginning of the run until a re-attach ~120s later, by which point --since-time skips the replayed history. Poll on the first attach until both rank 0 and the last rank resolve before forwarding, capped at 600s (then fall back). The wait is gated on a non-empty pod list, so it is a no-op when pods can't be listed (no kubectl / unit tests) and engages only for real runs. Signed-off-by: oliver könig <okoenig@nvidia.com>
1 parent 981e6f9 commit c23cecf

1 file changed

Lines changed: 30 additions & 0 deletions

File tree

nemo_run/core/execution/kubeflow.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,10 +609,40 @@ def _forward_to_stdout(log_line: str, pod_index: dict[str, int]) -> bool:
609609
# older than REORDER_HOLD_S — long enough to absorb cross-node clock
610610
# skew + flush jitter, short enough to keep the console near-live.
611611
reorder_hold_s = 2.0
612+
# First attach: resolve BOTH rank 0 and the last rank before forwarding
613+
# any line. GROUP_RANK is only readable once the torchrun workers have
614+
# rendezvoused, so the map is empty at first and _forward_to_stdout would
615+
# fall back to rank-0-only — the last rank's early per-step loss lines
616+
# (replayed via --tail=-1) would land in log-allranks but never reach
617+
# stdout, silently dropping the beginning of the run from the CI log.
618+
# Poll until both are resolved, capped so a run that never exposes
619+
# GROUP_RANK still streams (with the completion-index fallback).
620+
rank_resolve_timeout_s = 600.0
621+
rank_resolve_poll_s = 5.0
612622
since_time: Optional[str] = None
613623
while True:
614624
pod_index = _pod_index_map()
615625
_ensure_group_ranks(set(pod_index))
626+
if since_time is None:
627+
# First attach: wait until BOTH rank 0 and the last rank are
628+
# resolved before forwarding, so the last rank's early per-step
629+
# lines (replayed via --tail=-1) reach stdout instead of only
630+
# log-allranks. Only wait while pods are actually listable; an
631+
# empty list (no kubectl / unit tests) skips the wait and streams
632+
# with the existing completion-index fallback.
633+
resolve_deadline = time.time() + rank_resolve_timeout_s
634+
while pod_index and not {0, last_group_rank} <= set(group_rank_map.values()):
635+
if time.time() >= resolve_deadline:
636+
logger.warning(
637+
"rank 0 / last rank (%d) not both resolved within %.0fs; "
638+
"forwarding with completion-index fallback",
639+
last_group_rank,
640+
rank_resolve_timeout_s,
641+
)
642+
break
643+
time.sleep(rank_resolve_poll_s)
644+
pod_index = _pod_index_map()
645+
_ensure_group_ranks(set(pod_index))
616646
attempt_cmd = base_cmd + ["--timestamps", "-f"]
617647
# First attach replays history (--tail=-1); reconnects resume from
618648
# the last seen timestamp so re-attaching never re-emits old lines.

0 commit comments

Comments
 (0)