Skip to content

Commit 86c308e

Browse files
authored
fix: retry PTY drain on empty select() to prevent data loss on macOS (#826)
Add a `DRAIN_MAX_EMPTY_POLLS = 10` constant and track consecutive empty `select()` polls, only giving up after 10 in a row (~1s worst-case). This is well within the existing `DRAIN_TIMEOUT_SECONDS` (2s) deadline and gives ample headroom for heavily-loaded CI runners. Fixes #821
1 parent 4a5d98e commit 86c308e

3 files changed

Lines changed: 142 additions & 2 deletions

File tree

.github/workflows/python-tests.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ jobs:
5353
with:
5454
fetch-depth: 0
5555

56+
- name: Free up runner disk space
57+
uses: mathio/gha-cleanup@aca3d43a05bf564f5fc69acb929a65b01797f22b # v1.1.2
58+
with:
59+
remove-browsers: true
60+
verbose: true
61+
5662
- id: uv
5763
run: echo "version=$(cat .uv-version)" >> "$GITHUB_OUTPUT"
5864
- name: Install uv

python/packages/jumpstarter/jumpstarter/exporter/hooks.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
MAX_DRAIN_BYTES = 256 * 1024
2525
DRAIN_TIMEOUT_SECONDS = 2.0
26+
DRAIN_MAX_EMPTY_POLLS = 10
2627

2728
# Module-level reference to time.monotonic so tests can patch it without
2829
# affecting the asyncio event loop (which also uses time.monotonic).
@@ -391,6 +392,7 @@ async def read_pty_output() -> None: # noqa: C901
391392
try:
392393
drain_deadline = _monotonic() + DRAIN_TIMEOUT_SECONDS
393394
drained = 0
395+
consecutive_empty = 0
394396
while drained < MAX_DRAIN_BYTES and _monotonic() < drain_deadline:
395397
# Poll for readability with a short timeout.
396398
# This avoids the race where a non-blocking read
@@ -406,8 +408,16 @@ async def read_pty_output() -> None: # noqa: C901
406408
# fd closed or invalid
407409
break
408410
if not readable:
409-
# Timed out with no data — drain is complete
410-
break
411+
# On macOS, data may not be available on the
412+
# first select() call even though the subprocess
413+
# has already written and exited. Keep retrying
414+
# until we see several consecutive empty polls,
415+
# which indicates the buffer is truly drained.
416+
consecutive_empty += 1
417+
if consecutive_empty >= DRAIN_MAX_EMPTY_POLLS:
418+
break
419+
continue
420+
consecutive_empty = 0
411421
try:
412422
chunk = os.read(parent_fd, 4096)
413423
if not chunk:

python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus
88
from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1
99
from jumpstarter.exporter.hooks import (
10+
DRAIN_MAX_EMPTY_POLLS,
1011
DRAIN_TIMEOUT_SECONDS,
1112
MAX_DRAIN_BYTES,
1213
HookExecutionError,
@@ -927,9 +928,132 @@ def flush_lines_with_drain_error(buffer, output_lines):
927928
result = await executor.execute_before_lease_hook(lease_scope)
928929
assert result is None
929930

931+
async def test_drain_retries_empty_select_then_captures_data(self, lease_scope) -> None:
932+
"""Verify that the drain retries after empty select() calls and still
933+
captures data that arrives later.
934+
935+
Patches select.select to return empty for the first N calls (where
936+
N < DRAIN_MAX_EMPTY_POLLS), then reports the fd as readable. The
937+
hook output should still be captured despite the initial empty polls.
938+
"""
939+
import select as select_mod
940+
941+
original_select = select_mod.select
942+
state = _PtyTracker()
943+
empty_count = 0
944+
empties_before_data = DRAIN_MAX_EMPTY_POLLS - 2 # e.g. 8 empties then data
945+
946+
def select_with_delayed_ready(rlist, wlist, xlist, timeout=None):
947+
nonlocal empty_count
948+
if state.eof_seen and rlist and rlist[0] == state.parent_fd:
949+
empty_count += 1
950+
if empty_count <= empties_before_data:
951+
return ([], [], []) # simulate delayed data
952+
return original_select(rlist, wlist, xlist, timeout)
953+
954+
hook_config = HookConfigV1Alpha1(
955+
before_lease=HookInstanceConfigV1Alpha1(
956+
script="echo DELAYED_DRAIN_OK", timeout=10,
957+
),
958+
)
959+
executor = HookExecutor(config=hook_config)
960+
961+
with (
962+
patch("pty.openpty", side_effect=state.tracking_openpty),
963+
patch("os.read", side_effect=state.os_read_with_drain_data),
964+
patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_delayed_ready),
965+
patch("jumpstarter.exporter.hooks.logger") as mock_logger,
966+
):
967+
result = await executor.execute_before_lease_hook(lease_scope)
968+
assert result is None
969+
info_calls = [str(call) for call in mock_logger.info.call_args_list]
970+
assert any("DELAYED_DRAIN_OK" in call for call in info_calls)
971+
972+
async def test_drain_terminates_after_max_empty_polls(self, lease_scope) -> None:
973+
"""Verify the drain loop terminates after DRAIN_MAX_EMPTY_POLLS
974+
consecutive empty select() results.
975+
976+
Patches select.select to always return empty during the drain phase.
977+
The hook should still complete (no hang) and the drain data should
978+
not appear since it's never read.
979+
"""
980+
import select as select_mod
981+
982+
original_select = select_mod.select
983+
state = _PtyTracker(return_drain_data=False)
984+
985+
def select_always_empty(rlist, wlist, xlist, timeout=None):
986+
if state.eof_seen and rlist and rlist[0] == state.parent_fd:
987+
return ([], [], []) # always empty
988+
return original_select(rlist, wlist, xlist, timeout)
989+
990+
hook_config = HookConfigV1Alpha1(
991+
before_lease=HookInstanceConfigV1Alpha1(
992+
script="echo MAX_EMPTY_TEST", timeout=10,
993+
),
994+
)
995+
executor = HookExecutor(config=hook_config)
996+
997+
with (
998+
patch("pty.openpty", side_effect=state.tracking_openpty),
999+
patch("os.read", side_effect=state.os_read_with_drain_data),
1000+
patch("jumpstarter.exporter.hooks.select.select", side_effect=select_always_empty),
1001+
patch("jumpstarter.exporter.hooks.logger") as mock_logger,
1002+
):
1003+
result = await executor.execute_before_lease_hook(lease_scope)
1004+
assert result is None
1005+
# Main loop should have captured the output before drain
1006+
info_calls = [str(call) for call in mock_logger.info.call_args_list]
1007+
assert any("MAX_EMPTY_TEST" in call for call in info_calls)
1008+
1009+
async def test_drain_empty_counter_resets_on_data(self, lease_scope) -> None:
1010+
"""Verify the consecutive empty poll counter resets when data arrives.
1011+
1012+
Simulates an empty-data-empty pattern during drain: a few empty polls,
1013+
then data becomes readable, then more empty polls. The counter should
1014+
reset after data is read, so the drain should tolerate at least
1015+
DRAIN_MAX_EMPTY_POLLS total empties as long as they are not all consecutive.
1016+
"""
1017+
import select as select_mod
1018+
1019+
original_select = select_mod.select
1020+
state = _PtyTracker()
1021+
drain_select_call = 0
1022+
# Pattern: 5 empties, then ready, then 5 more empties, then ready
1023+
# Total empties (10) >= DRAIN_MAX_EMPTY_POLLS, but never more than 5 in a row
1024+
pattern = [False] * 5 + [True] + [False] * 5 + [True]
1025+
1026+
def select_with_interleaved_empties(rlist, wlist, xlist, timeout=None):
1027+
nonlocal drain_select_call
1028+
if state.eof_seen and rlist and rlist[0] == state.parent_fd:
1029+
idx = drain_select_call
1030+
drain_select_call += 1
1031+
if idx < len(pattern) and not pattern[idx]:
1032+
return ([], [], [])
1033+
return original_select(rlist, wlist, xlist, timeout)
1034+
1035+
hook_config = HookConfigV1Alpha1(
1036+
before_lease=HookInstanceConfigV1Alpha1(
1037+
script="echo INTERLEAVE_TEST", timeout=10,
1038+
),
1039+
)
1040+
executor = HookExecutor(config=hook_config)
1041+
1042+
with (
1043+
patch("pty.openpty", side_effect=state.tracking_openpty),
1044+
patch("os.read", side_effect=state.os_read_with_drain_data),
1045+
patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_interleaved_empties),
1046+
patch("jumpstarter.exporter.hooks.logger") as mock_logger,
1047+
):
1048+
result = await executor.execute_before_lease_hook(lease_scope)
1049+
assert result is None
1050+
info_calls = [str(call) for call in mock_logger.info.call_args_list]
1051+
assert any("INTERLEAVE_TEST" in call for call in info_calls)
1052+
9301053
async def test_drain_constants_are_reasonable(self) -> None:
9311054
assert MAX_DRAIN_BYTES == 256 * 1024
9321055
assert DRAIN_TIMEOUT_SECONDS == 2.0
1056+
assert DRAIN_MAX_EMPTY_POLLS == 10
9331057

9341058
async def test_exec_default_is_none(self) -> None:
9351059
"""Test that the default exec is None (auto-detect)."""

0 commit comments

Comments
 (0)