Skip to content

Commit 39523c4

Browse files
ko3n1gclaude
andcommitted
fix: make fetch_logs streaming resilient to flaky kubectl exits
Replace the brittle `lines_yielded > 0` and 10-minute deadline heuristics with `status()`-based termination: the retry loop now runs until the job reaches SUCCEEDED or FAILED, handling slow container pulls, mid-stream crashes, and transient network failures correctly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: oliver könig <okoenig@nvidia.com>
1 parent bc629c0 commit 39523c4

2 files changed

Lines changed: 29 additions & 18 deletions

File tree

nemo_run/core/execution/kubeflow.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,9 @@ def fetch_logs(
342342
]
343343
if stream:
344344
cmd.append("-f")
345-
# Pods may not be running yet when the log thread starts. Retry
346-
# kubectl logs -f until we get output (or 10 minutes pass).
347-
deadline = time.time() + 600
348-
while time.time() < deadline:
345+
# Retry kubectl logs -f until the job reaches a terminal state.
346+
# This handles both pods not yet running and transient mid-stream failures.
347+
while True:
349348
proc = subprocess.Popen(
350349
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, bufsize=1
351350
)
@@ -362,14 +361,20 @@ def fetch_logs(
362361
yield remaining
363362
break
364363
except Exception as e:
365-
logger.error("Error streaming logs: %s", e)
366-
break
364+
logger.warning("Error streaming logs: %s; retrying", e)
367365
finally:
368366
proc.terminate()
369367
proc.wait(timeout=2)
370-
if lines_yielded > 0:
371-
break # kubectl exited after producing output — job done
372-
time.sleep(5) # no pods running yet, retry
368+
state = self.status(job_name)
369+
if state in (KubeflowJobState.SUCCEEDED, KubeflowJobState.FAILED):
370+
break # job reached a terminal state, stop streaming
371+
logger.warning(
372+
"kubectl logs exited (rc=%d, lines=%d, state=%s); retrying",
373+
proc.returncode,
374+
lines_yielded,
375+
state,
376+
)
377+
time.sleep(5)
373378
else:
374379
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
375380
yield from result.stdout.splitlines()

test/core/execution/test_kubeflow.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -588,45 +588,51 @@ def test_launch_wait_exits_on_failed(self, executor, mock_k8s_clients):
588588
_, state = executor.launch("test-job", ["echo"], wait=True, timeout=30)
589589
assert state == KubeflowJobState.FAILED
590590

591-
# ── fetch_logs streaming: retry when no lines yielded ────────────────────
591+
# ── fetch_logs streaming: retry until terminal state ─────────────────────
592592

593-
def test_fetch_logs_stream_retries_when_no_output_then_succeeds(
593+
def test_fetch_logs_stream_retries_until_terminal_state(
594594
self, executor, mock_k8s_clients
595595
):
596-
"""First Popen yields nothing; second yields a line — loop exits after output."""
596+
"""First Popen yields nothing and job is RUNNING; second yields a line and job is
597+
SUCCEEDED — loop exits on terminal status."""
597598
import io
598599

599600
empty_proc = MagicMock()
600601
empty_proc.stdout = io.StringIO("")
601602
empty_proc.poll.return_value = None
603+
empty_proc.returncode = 1
602604

603605
output_proc = MagicMock()
604606
output_proc.stdout = io.StringIO("some output\n")
605607
output_proc.poll.return_value = None
606-
607-
procs = [empty_proc, output_proc]
608+
output_proc.returncode = 0
608609

609610
with (
610-
patch("subprocess.Popen", side_effect=procs),
611+
patch("subprocess.Popen", side_effect=[empty_proc, output_proc]),
611612
patch("time.sleep"),
613+
patch.object(
614+
executor,
615+
"status",
616+
side_effect=[KubeflowJobState.RUNNING, KubeflowJobState.SUCCEEDED],
617+
),
612618
):
613619
lines = list(executor.fetch_logs("my-job", stream=True))
614620

615621
assert "some output\n" in lines
616622

617623
def test_fetch_logs_stream_handles_exception(self, executor, mock_k8s_clients):
618-
"""Exception inside the readline loop is caught; generator terminates cleanly."""
624+
"""Exception inside the readline loop is caught; loop exits when job is terminal."""
619625

620626
mock_proc = MagicMock()
621-
622627
mock_proc.stdout.readline.side_effect = OSError("read error")
623628
mock_proc.poll.return_value = None
629+
mock_proc.returncode = 1
624630

625631
with (
626632
patch("subprocess.Popen", return_value=mock_proc),
627633
patch("time.sleep"),
634+
patch.object(executor, "status", return_value=KubeflowJobState.FAILED),
628635
):
629-
# Should not raise; returns empty (error path)
630636
lines = list(executor.fetch_logs("my-job", stream=True))
631637

632638
assert lines == []

0 commit comments

Comments
 (0)