Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions e2e/test/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() {
startHooksExporter("exporter-hooks-before-fail-endLease.yaml")

out, err := Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")
Expect(err).To(HaveOccurred())
Expect(out).To(MatchRegexp(`(beforeLease hook fail|Exporter shutting down|Connection to exporter lost)`))
Expand All @@ -171,6 +172,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() {

// First lease: shell should fail because beforeLease hook fails
out, err := Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")
Expect(err).To(HaveOccurred())
Expect(out).To(MatchRegexp(`(beforeLease hook fail|Connection to exporter lost)`))
Expand All @@ -181,6 +183,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() {
// Second lease: should also fail (hook still configured to fail),
// but this proves the exporter accepted a new lease after recovery
out2, err2 := Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")
Expect(err2).To(HaveOccurred())
Expect(out2).To(MatchRegexp(`(beforeLease hook fail|Connection to exporter lost)`))
Expand All @@ -193,6 +196,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() {
startHooksExporter("exporter-hooks-before-fail-endLease-with-after.yaml")

out, err := Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")
Expect(err).To(HaveOccurred())
Expect(out).NotTo(ContainSubstring("AFTER_SHOULD_NOT_RUN"))
Expand All @@ -205,6 +209,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() {
startHooksExporterSingle("exporter-hooks-before-fail-exit.yaml")

out, err := Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")
Expect(err).To(HaveOccurred())
Expect(out).To(MatchRegexp(`(beforeLease hook fail|Exporter shutting down|Connection to exporter lost)`))
Expand Down Expand Up @@ -239,6 +244,7 @@ var _ = Describe("Hooks E2E Tests", Label("hooks"), Ordered, func() {

// Shell may succeed or fail; the key is the exporter exits
_, _ = Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")

Eventually(func() bool {
Expand Down Expand Up @@ -360,6 +366,7 @@ print("PYTHON_HOOK: complete")
startHooksExporter("exporter-hooks-none.yaml")

out, err := RunCmd("timeout", "60", "jmp", "shell",
"--retry-timeout", "75",
"--client", "test-client-hooks",
"--selector", "example.com/board=hooks",
"--duration", "10s", "--", "sleep", "30")
Expand All @@ -372,6 +379,7 @@ print("PYTHON_HOOK: complete")
startHooksExporter("exporter-hooks-slow-before.yaml")

out, err := RunCmd("timeout", "60", "jmp", "shell",
"--retry-timeout", "75",
"--client", "test-client-hooks",
"--selector", "example.com/board=hooks",
"--duration", "5s", "--", "sleep", "30")
Expand All @@ -383,6 +391,7 @@ print("PYTHON_HOOK: complete")
startHooksExporter("exporter-hooks-slow-before.yaml")

out, err := RunCmd("timeout", "60", "jmp", "shell",
"--retry-timeout", "75",
"--client", "test-client-hooks",
"--selector", "example.com/board=hooks",
"--duration", "12s", "--", "sleep", "30")
Expand Down Expand Up @@ -411,6 +420,7 @@ print("PYTHON_HOOK: complete")
startHooksExporterSingle("exporter-hooks-before-fail-exit-with-after.yaml")

out, err := Jmp("shell", "--client", "test-client-hooks",
"--retry-timeout", "0",
"--selector", "example.com/board=hooks", "j", "power", "on")
Expect(err).To(HaveOccurred())
Expect(out).NotTo(ContainSubstring("AFTER_SHOULD_NOT_RUN"))
Expand Down
14 changes: 14 additions & 0 deletions python/packages/jumpstarter-cli/jumpstarter_cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ def convert(self, value, param, ctx):
),
)

# Click parameter type used by --retry-timeout: a duration constructed with a
# minimum of zero seconds.
RETRY_TIMEOUT = DurationParamType(minimum=timedelta(seconds=0))

# Pre-bound click.option factory for the --retry-timeout flag. Commands apply
# it as a decorator by calling it, e.g. @opt_retry_timeout().
opt_retry_timeout = partial(
click.option,
"--retry-timeout",
"retry_timeout",
type=RETRY_TIMEOUT,
# None is what the command receives when the flag is not passed.
# NOTE(review): the JMP_RETRY_TIMEOUT env var and the 5m default named in the
# help text are not applied here; presumably whoever consumes retry_timeout
# resolves them. Confirm.
default=None,
help=(
"Override retry timeout for unreachable exporters (e.g., '5m', '30s', "
"'0' to disable). Env: JMP_RETRY_TIMEOUT. Default: 5m."
),
)

opt_begin_time = click.option(
"--begin-time",
"begin_time",
Expand Down
86 changes: 63 additions & 23 deletions python/packages/jumpstarter-cli/jumpstarter_cli/shell.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import sys
import time
from contextlib import ExitStack
from datetime import timedelta
from types import SimpleNamespace
Expand All @@ -23,12 +24,16 @@
)
from jumpstarter_cli_common.signal import signal_handler

from .common import opt_acquisition_timeout, opt_duration_partial, opt_exporter_name, opt_selector
from .common import opt_acquisition_timeout, opt_duration_partial, opt_exporter_name, opt_retry_timeout, opt_selector
from .login import relogin_client
from jumpstarter.client import DirectLease
from jumpstarter.client.client import client_from_path
from jumpstarter.common import HOOK_WARNING_PREFIX, ExporterStatus
from jumpstarter.common.exceptions import ConnectionError, ExporterOfflineError
from jumpstarter.common.exceptions import (
ConnectionError,
ExporterOfflineError,
ExporterUnreachableError,
)
from jumpstarter.common.utils import launch_shell
from jumpstarter.config.client import ClientConfigV1Alpha1
from jumpstarter.config.env import JMP_LEASE
Expand All @@ -41,7 +46,6 @@
_TOKEN_REFRESH_THRESHOLD_SECONDS = 120



def _run_shell_only(lease, config, command, path: str) -> int:
"""Run just the shell command without log streaming."""
allow = config.drivers.allow if config is not None else getattr(lease, "allow", [])
Expand Down Expand Up @@ -288,9 +292,14 @@ async def _run_shell_with_lease_async(lease, exporter_logs, config, command, can
insecure=getattr(lease, "insecure", False),
passphrase=getattr(lease, "passphrase", None),
) as client:
# Probe GetStatus before log stream so the server-side error
# from unsupported exporters is not streamed to the terminal.
await client.get_status_async()
try:
await client.get_status_async()
except grpc.aio.AioRpcError as e:
if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
raise ExporterUnreachableError(
f"Exporter {lease.exporter_name} did not respond to initial status check"
) from e
raise

# Start log streaming and status monitor together
# The status monitor polls in the background for reliable status tracking
Expand Down Expand Up @@ -424,7 +433,8 @@ async def _run_shell_with_lease_async(lease, exporter_logs, config, command, can


async def _shell_with_signal_handling( # noqa: C901
config, selector, exporter_name, lease_name, duration, exporter_logs, command, acquisition_timeout
config, selector, exporter_name, lease_name, duration, exporter_logs, command, acquisition_timeout,
retry_timeout=None,
):
"""Handle lease acquisition and shell execution with signal handling."""
exit_code = 0
Expand All @@ -447,23 +457,52 @@ async def _shell_with_signal_handling( # noqa: C901
try:
try:
async with anyio.from_thread.BlockingPortal() as portal:
async with config.lease_async(
selector, exporter_name, lease_name, duration, portal, acquisition_timeout
) as lease:
lease_used = lease

# Start token monitoring only once we're in the shell
tg.start_soon(_monitor_token_expiry, config, lease, tg.cancel_scope, token_state)

exit_code = await _run_shell_with_lease_async(
lease, exporter_logs, config, command, tg.cancel_scope
)
if lease.release and lease.name and token_state["expired_unrecovered"]:
_warn_about_expired_token(lease.name, selector)
connect_deadline = None
while True:
async with config.lease_async(
selector, exporter_name, lease_name, duration, portal, acquisition_timeout,
retry_timeout=retry_timeout,
) as lease:
lease_used = lease

# Start token monitoring only once we're in the shell
tg.start_soon(_monitor_token_expiry, config, lease, tg.cancel_scope, token_state)

unreachable = None
try:
exit_code = await _run_shell_with_lease_async(
lease, exporter_logs, config, command, tg.cancel_scope
)
except BaseExceptionGroup as eg:
unreachable = find_exception_in_group(eg, ExporterUnreachableError)
if unreachable is None:
raise
except ExporterUnreachableError as exc:
unreachable = exc
if unreachable is not None:
if connect_deadline is None:
connect_deadline = time.monotonic() + lease.retry_timeout
if time.monotonic() >= connect_deadline:
raise ExporterUnreachableError(
f"Exporter {lease.exporter_name} unreachable after "
f"{lease.retry_timeout:.0f}s of retrying"
) from unreachable
logger.warning(
"Exporter %s is unreachable, releasing lease and retrying...",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a non-named lease it does not release the lease; it simply retries.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's cosmetic but definitely worth fixing. I'll probably do a follow-up PR for this one and for better test cases.

lease.exporter_name,
)
logger.debug("Unreachable cause: %s", unreachable)
continue # lease released by __aexit__, loop re-acquires
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if lease.release and lease.name and token_state["expired_unrecovered"]:
_warn_about_expired_token(lease.name, selector)
break
except BaseExceptionGroup as eg:
for exc in eg.exceptions:
if isinstance(exc, TimeoutError):
raise exc from None
unreachable_exc = find_exception_in_group(eg, ExporterUnreachableError)
if unreachable_exc:
raise unreachable_exc from None
offline_exc = find_exception_in_group(eg, ExporterOfflineError)
if offline_exc:
raise offline_exc from None
Expand Down Expand Up @@ -580,9 +619,7 @@ async def _shell_direct_async(
async with create_task_group() as tg:
tg.start_soon(signal_handler, tg.cancel_scope)
try:
exit_code = await _run_shell_with_lease_async(
lease, exporter_logs, config, command, tg.cancel_scope
)
exit_code = await _run_shell_with_lease_async(lease, exporter_logs, config, command, tg.cancel_scope)
except grpc.aio.AioRpcError as e:
if e.code() == grpc.StatusCode.UNAUTHENTICATED:
raise click.ClickException("Authentication failed: invalid or missing passphrase") from None
Expand All @@ -607,6 +644,7 @@ async def _shell_direct_async(
@opt_duration_partial(default=timedelta(minutes=30), show_default="00:30:00")
@click.option("--exporter-logs", is_flag=True, help="Enable exporter log streaming")
@opt_acquisition_timeout()
@opt_retry_timeout()
# direct connection (no controller)
@click.option(
"--tls-grpc",
Expand Down Expand Up @@ -637,6 +675,7 @@ def shell(
duration,
exporter_logs,
acquisition_timeout,
retry_timeout,
tls_grpc_address,
tls_grpc_insecure,
passphrase,
Expand Down Expand Up @@ -685,6 +724,7 @@ def shell(
exporter_logs,
command,
acquisition_timeout,
retry_timeout,
)
sys.exit(exit_code)

Expand Down
Loading
Loading