Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/python-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ jobs:
with:
fetch-depth: 0

- name: Free up runner disk space
uses: mathio/gha-cleanup@aca3d43a05bf564f5fc69acb929a65b01797f22b # v1.1.2
with:
remove-browsers: true
verbose: true

- id: uv
run: echo "version=$(cat .uv-version)" >> "$GITHUB_OUTPUT"
- name: Install uv
Expand Down
14 changes: 12 additions & 2 deletions python/packages/jumpstarter/jumpstarter/exporter/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

MAX_DRAIN_BYTES = 256 * 1024
DRAIN_TIMEOUT_SECONDS = 2.0
DRAIN_MAX_EMPTY_POLLS = 10

# Module-level reference to time.monotonic so tests can patch it without
# affecting the asyncio event loop (which also uses time.monotonic).
Expand Down Expand Up @@ -390,6 +391,7 @@ async def read_pty_output() -> None: # noqa: C901
try:
drain_deadline = _monotonic() + DRAIN_TIMEOUT_SECONDS
drained = 0
consecutive_empty = 0
while drained < MAX_DRAIN_BYTES and _monotonic() < drain_deadline:
# Poll for readability with a short timeout.
# This avoids the race where a non-blocking read
Expand All @@ -405,8 +407,16 @@ async def read_pty_output() -> None: # noqa: C901
# fd closed or invalid
break
if not readable:
# Timed out with no data — drain is complete
break
# On macOS, data may not be available on the
# first select() call even though the subprocess
# has already written and exited. Keep retrying
# until we see several consecutive empty polls,
# which indicates the buffer is truly drained.
consecutive_empty += 1
if consecutive_empty >= DRAIN_MAX_EMPTY_POLLS:
break
continue
consecutive_empty = 0
try:
chunk = os.read(parent_fd, 4096)
if not chunk:
Expand Down
124 changes: 124 additions & 0 deletions python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus
from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1
from jumpstarter.exporter.hooks import (
DRAIN_MAX_EMPTY_POLLS,
DRAIN_TIMEOUT_SECONDS,
MAX_DRAIN_BYTES,
HookExecutionError,
Expand Down Expand Up @@ -901,9 +902,132 @@ def flush_lines_with_drain_error(buffer, output_lines):
result = await executor.execute_before_lease_hook(lease_scope)
assert result is None

async def test_drain_retries_empty_select_then_captures_data(self, lease_scope) -> None:
"""Verify that the drain retries after empty select() calls and still
captures data that arrives later.

Patches select.select to return empty for the first N calls (where
N < DRAIN_MAX_EMPTY_POLLS), then reports the fd as readable. The
hook output should still be captured despite the initial empty polls.
"""
import select as select_mod

original_select = select_mod.select
state = _PtyTracker()
empty_count = 0
empties_before_data = DRAIN_MAX_EMPTY_POLLS - 2 # e.g. 8 empties then data

def select_with_delayed_ready(rlist, wlist, xlist, timeout=None):
nonlocal empty_count
if state.eof_seen and rlist and rlist[0] == state.parent_fd:
empty_count += 1
if empty_count <= empties_before_data:
return ([], [], []) # simulate delayed data
return original_select(rlist, wlist, xlist, timeout)

hook_config = HookConfigV1Alpha1(
before_lease=HookInstanceConfigV1Alpha1(
script="echo DELAYED_DRAIN_OK", timeout=10,
),
)
executor = HookExecutor(config=hook_config)

with (
patch("pty.openpty", side_effect=state.tracking_openpty),
patch("os.read", side_effect=state.os_read_with_drain_data),
patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_delayed_ready),
patch("jumpstarter.exporter.hooks.logger") as mock_logger,
):
result = await executor.execute_before_lease_hook(lease_scope)
assert result is None
info_calls = [str(call) for call in mock_logger.info.call_args_list]
assert any("DELAYED_DRAIN_OK" in call for call in info_calls)

async def test_drain_terminates_after_max_empty_polls(self, lease_scope) -> None:
"""Verify the drain loop terminates after DRAIN_MAX_EMPTY_POLLS
consecutive empty select() results.

Patches select.select to always return empty during the drain phase.
The hook should still complete (no hang) and the drain data should
not appear since it's never read.
"""
import select as select_mod

original_select = select_mod.select
state = _PtyTracker(return_drain_data=False)

def select_always_empty(rlist, wlist, xlist, timeout=None):
if state.eof_seen and rlist and rlist[0] == state.parent_fd:
return ([], [], []) # always empty
return original_select(rlist, wlist, xlist, timeout)

hook_config = HookConfigV1Alpha1(
before_lease=HookInstanceConfigV1Alpha1(
script="echo MAX_EMPTY_TEST", timeout=10,
),
)
executor = HookExecutor(config=hook_config)

with (
patch("pty.openpty", side_effect=state.tracking_openpty),
patch("os.read", side_effect=state.os_read_with_drain_data),
patch("jumpstarter.exporter.hooks.select.select", side_effect=select_always_empty),
patch("jumpstarter.exporter.hooks.logger") as mock_logger,
):
result = await executor.execute_before_lease_hook(lease_scope)
assert result is None
# Main loop should have captured the output before drain
info_calls = [str(call) for call in mock_logger.info.call_args_list]
assert any("MAX_EMPTY_TEST" in call for call in info_calls)

async def test_drain_empty_counter_resets_on_data(self, lease_scope) -> None:
"""Verify the consecutive empty poll counter resets when data arrives.

Simulates an empty-data-empty pattern during drain: a few empty polls,
then data becomes readable, then more empty polls. The counter should
reset after data is read, so the drain should tolerate more than
DRAIN_MAX_EMPTY_POLLS total empties as long as they are not consecutive.
"""
import select as select_mod

original_select = select_mod.select
state = _PtyTracker()
drain_select_call = 0
# Pattern: 5 empties, then ready, then 5 more empties, then ready
# Total empties (10) >= DRAIN_MAX_EMPTY_POLLS but never consecutive
pattern = [False] * 5 + [True] + [False] * 5 + [True]

def select_with_interleaved_empties(rlist, wlist, xlist, timeout=None):
nonlocal drain_select_call
if state.eof_seen and rlist and rlist[0] == state.parent_fd:
idx = drain_select_call
drain_select_call += 1
if idx < len(pattern) and not pattern[idx]:
return ([], [], [])
return original_select(rlist, wlist, xlist, timeout)

hook_config = HookConfigV1Alpha1(
before_lease=HookInstanceConfigV1Alpha1(
script="echo INTERLEAVE_TEST", timeout=10,
),
)
executor = HookExecutor(config=hook_config)

with (
patch("pty.openpty", side_effect=state.tracking_openpty),
patch("os.read", side_effect=state.os_read_with_drain_data),
patch("jumpstarter.exporter.hooks.select.select", side_effect=select_with_interleaved_empties),
patch("jumpstarter.exporter.hooks.logger") as mock_logger,
):
result = await executor.execute_before_lease_hook(lease_scope)
assert result is None
info_calls = [str(call) for call in mock_logger.info.call_args_list]
assert any("INTERLEAVE_TEST" in call for call in info_calls)

async def test_drain_constants_are_reasonable(self) -> None:
assert MAX_DRAIN_BYTES == 256 * 1024
assert DRAIN_TIMEOUT_SECONDS == 2.0
Comment thread
raballew marked this conversation as resolved.
assert DRAIN_MAX_EMPTY_POLLS == 10

async def test_exec_default_is_none(self) -> None:
"""Test that the default exec is None (auto-detect)."""
Expand Down
Loading