From 059db3a637941402eff212c4fd36232df5c472a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Fri, 8 May 2026 02:17:53 -0300 Subject: [PATCH] feat(vm-events): enhance VM event handling and introduce sandbox eviction logic - Updated event types in `vm-events.ts` to include new daemon events. - Added `STALE_CLAIM_DETECTION_MS` for improved stale sandbox detection. - Implemented `writeSse` function for consistent SSE event writing. - Enhanced UI to handle "sandbox-evicted" phase, prompting users to start a new sandbox. - Updated `EnvContent` and `PreviewContent` components to reflect new eviction logic and reset SSE connection on retries. - Refactored `VmEventsProvider` to manage SSE connection resets after terminal failures. --- apps/mesh/src/api/routes/vm-events.ts | 223 ++++++++++++------ apps/mesh/src/web/components/vm/env/env.tsx | 33 ++- .../components/vm/hooks/vm-events-context.tsx | 73 +++--- .../components/vm/preview/preview-state.ts | 9 +- .../src/web/components/vm/preview/preview.tsx | 5 +- .../src/web/components/vm/upstream-status.ts | 6 +- .../web/components/vm/vm-booting-state.tsx | 29 ++- packages/sandbox/daemon/event-types.ts | 79 +++++++ packages/sandbox/package.json | 3 +- .../sandbox/server/runner/lifecycle-types.ts | 1 + 10 files changed, 324 insertions(+), 137 deletions(-) create mode 100644 packages/sandbox/daemon/event-types.ts diff --git a/apps/mesh/src/api/routes/vm-events.ts b/apps/mesh/src/api/routes/vm-events.ts index a737e4f5d0..e2a2d4d9e9 100644 --- a/apps/mesh/src/api/routes/vm-events.ts +++ b/apps/mesh/src/api/routes/vm-events.ts @@ -8,7 +8,7 @@ * VM_START posting a SandboxClaim and the daemon coming online. * Agent-sandbox runner emits real K8s phases; other runners emit a * single synthetic `ready`. - * 2. Daemon events (`event: log|status|scripts|processes|reload|branch-status`) + * 2. Daemon events (`event: log|status|scripts|tasks|intent|phases|branch-status|transition`) * — proxied from the in-pod daemon's `/_decopilot_vm/events` SSE once * lifecycle reaches `ready`. Wire format is preserved verbatim by raw * byte-piping the upstream body, so daemon and client speak the same @@ -36,11 +36,12 @@ import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; +import type { SSEStreamingApi } from "hono/streaming"; import { composeSandboxRef, resolveRunnerKindFromEnv, } from "@decocms/sandbox/runner"; -import type { ClaimPhase } from "@decocms/sandbox/runner"; +import type { ClaimPhase, RunnerKind } from "@decocms/sandbox/runner"; import { computeClaimHandle } from "../../sandbox/claim-handle"; import { getOrInitSharedRunner, @@ -66,8 +67,31 @@ import type { Env } from "../hono-env"; */ const NO_CLAIM_MAX_MS = 90_000; +/** + * Shorter stale-sandbox detection window used only when `expectingHandle` is + * true (a prior sandbox was running). After this many ms stuck in `claiming` + * with no pod activity, we conclude the underlying pod was deleted while the + * SandboxClaim CRD still exists (runner.alive() returned true but the pod is + * gone). 30s is long enough for the operator to emit at least one scheduling + * event if it were actively creating a pod, yet short enough to surface a + * clear error quickly instead of making the user wait the full 90s. 
+ */ +const STALE_CLAIM_DETECTION_MS = 30_000; + const HEARTBEAT_MS = 15_000; +type VmSseEvent = + | { event: "phase"; data: ClaimPhase } + | { event: "gone" } + | { event: "keepalive" }; + +function writeSse(stream: SSEStreamingApi, e: VmSseEvent): Promise { + return stream.writeSSE({ + event: e.event, + data: "data" in e ? JSON.stringify(e.data) : "", + }); +} + export const createVmEventsRoutes = () => { const app = new Hono(); app.get("/", async (c) => { @@ -95,11 +119,6 @@ export const createVmEventsRoutes = () => { return c.json({ error: "virtualMcpId and branch are required" }, 400); } - // Verify caller's org actually owns this virtualMcp. Without this check, - // an authenticated user could probe arbitrary virtualMcpIds — the claim - // hash includes their userId so they couldn't *observe* anyone else's - // events, but the 404 vs not-yet-created surface would still leak - // existence/identity information. const virtualMcp = await ctx.storage.virtualMcps.findById(virtualMcpId); if (!virtualMcp || virtualMcp.organization_id !== organization.id) { return c.json({ error: "Virtual MCP not found" }, 404); @@ -113,14 +132,6 @@ export const createVmEventsRoutes = () => { const claimName = computeClaimHandle({ userId, projectRef }, branch); const runnerKind = resolveRunnerKindFromEnv(); - // Snapshot vmMap from the same metadata read used for the org-ownership - // check. Used below to gate the stale-handle probe: we only run it when - // this user already had a vmMap entry pointing at *this exact* claim. - // The vmId-match guard avoids racing VM_START's claim-creation window - // (~250ms–1.2s for agent-sandbox before `createSandboxClaim` lands; - // similar window for host/docker between `runner.ensure` returning and - // `setVmMapEntry` writing the row). Without it, an SSE that opens during - // that window would observe alive=false and emit a spurious `gone`. const existingVmEntry = resolveVm( readVmMap(virtualMcp.metadata as Record | null), userId, @@ -135,13 +146,13 @@ export const createVmEventsRoutes = () => { // phase rather than a silent close so the UI shows a meaningful error. if (!runner) { return streamSSE(c, async (stream) => { - await stream.writeSSE({ + await writeSse(stream, { event: "phase", - data: JSON.stringify({ + data: { kind: "failed", reason: "unknown", message: "No sandbox runner configured on this mesh.", - } satisfies ClaimPhase), + }, }); }); } @@ -152,7 +163,7 @@ export const createVmEventsRoutes = () => { return streamSSE(c, async (stream) => { const abortCtl = new AbortController(); const heartbeat = setInterval(() => { - stream.writeSSE({ event: "keepalive", data: "" }).catch(() => { + writeSse(stream, { event: "keepalive" }).catch(() => { clearInterval(heartbeat); }); }, HEARTBEAT_MS); @@ -162,15 +173,24 @@ export const createVmEventsRoutes = () => { }); try { - // Same probe for every runner. `runner.alive` is honest across - // host/docker/freestyle/agent-sandbox: each implementation queries - // its respective source-of-truth (state-store + pid for host, docker - // inspect, K8s API, freestyle daemon HTTP). When the prior vmMap - // entry's runner kind differs from the env's current runner, we - // route the stale-state cleanup through the *prior* kind so we - // don't leave behind rows in the wrong table. 
- if (expectingHandle) { - const stale = await isStaleHandle(runner, claimName); + const priorStateRow = await ctx.db + .selectFrom("sandbox_runner_state") + .select(["handle", "runner_kind"]) + .where("user_id", "=", userId) + .where("project_ref", "=", projectRef) + .executeTakeFirst() + .catch(() => null); + const hasKnownClaim = expectingHandle || priorStateRow !== null; + + if (hasKnownClaim) { + const priorKind = (existingRunnerKind ?? + priorStateRow?.runner_kind ?? + null) as RunnerKind | null; + + const runnerMismatch = priorKind !== null && priorKind !== runnerKind; + const stale = + runnerMismatch || (await isStaleHandle(runner, claimName)); + if (stale) { await cleanupStaleEntry({ ctx, @@ -178,9 +198,24 @@ export const createVmEventsRoutes = () => { claimName, userId, projectRef, - runnerKind: existingRunnerKind ?? runnerKind, + runnerKind: priorKind ?? runnerKind, }); - await stream.writeSSE({ event: "gone", data: "" }).catch(() => {}); + if (expectingHandle) { + // vmMap points at this claim → self-heal: browser triggers VM_START. + await writeSse(stream, { event: "gone" }).catch(() => {}); + } else { + // State-store only (no vmMap entry) → manual prompt; auto-heal + // won't fire without a vmMap entry to give deadVmId a value. + await writeSse(stream, { + event: "phase", + data: { + kind: "failed", + reason: "sandbox-evicted", + message: + "The sandbox was removed. Start a new one to continue.", + }, + }).catch(() => {}); + } return; } } @@ -191,6 +226,19 @@ export const createVmEventsRoutes = () => { claimName, runner, signal: abortCtl.signal, + hasExistingClaim: hasKnownClaim, + onStaleClaim: hasKnownClaim + ? async () => { + await cleanupStaleEntry({ + ctx, + runner, + claimName, + userId, + projectRef, + runnerKind: existingRunnerKind ?? runnerKind, + }); + } + : undefined, }); if (!lifecycleOk || abortCtl.signal.aborted) return; @@ -263,7 +311,7 @@ async function cleanupStaleEntry(args: { claimName: string; userId: string; projectRef: string; - runnerKind: "host" | "docker" | "freestyle" | "agent-sandbox"; + runnerKind: RunnerKind; }): Promise { const { ctx, runner, claimName, userId, projectRef, runnerKind } = args; try { @@ -298,22 +346,39 @@ async function cleanupStaleEntry(args: { * `ready` phase and ends immediately. */ async function emitLifecycle(args: { - stream: import("hono/streaming").SSEStreamingApi; + stream: SSEStreamingApi; claimName: string; runner: NonNullable>>; signal: AbortSignal; + /** + * True when the caller had an existing vmMap entry pointing at this claim + * (i.e. a prior sandbox was running). Enables the shorter + * STALE_CLAIM_DETECTION_MS secondary timer: if the operator hasn't + * produced any pod activity within that window, the pod was likely deleted + * while the SandboxClaim CRD still exists. + */ + hasExistingClaim: boolean; + /** + * Called before emitting `sandbox-evicted` when the secondary timer fires. + * Lets the caller drop the stale state-store row so VM_START's `ensure` + * path doesn't chase a dead port-forward on the next attempt. 
+ */ + onStaleClaim?: () => Promise; }): Promise { - const { stream, claimName, runner, signal } = args; + const { stream, claimName, runner, signal, hasExistingClaim, onStaleClaim } = + args; return new Promise((resolve) => { let settled = false; let claimSeen = false; let handle: { unsubscribe(): void } | null = null; + let staleClaimTimer: ReturnType | null = null; const settle = (result: boolean) => { if (settled) return; settled = true; clearTimeout(watchdogTimer); + if (staleClaimTimer !== null) clearTimeout(staleClaimTimer); signal.removeEventListener("abort", onAbort); handle?.unsubscribe(); resolve(result); @@ -327,19 +392,47 @@ async function emitLifecycle(args: { // never fires. const watchdogTimer = setTimeout(() => { if (claimSeen || settled) return; - stream - .writeSSE({ + writeSse(stream, { + event: "phase", + data: { + kind: "failed", + reason: "claim-never-created", + message: + "Sandbox claim was never created. The VM_START call may have failed earlier — check the start error.", + }, + }).catch(() => {}); + settle(false); + }, NO_CLAIM_MAX_MS); + + // Secondary stale-sandbox detection: only fires when the caller had a + // prior running sandbox (hasExistingClaim). If we're still in `claiming` + // after STALE_CLAIM_DETECTION_MS with no pod events, the SandboxClaim + // exists in K8s but its pod was deleted — runner.alive() returned true + // (claim present) but the operator isn't creating a new pod. Surface + // `sandbox-evicted` so the UI shows a targeted "start new sandbox" + // prompt rather than making the user wait the full 90s. + if (hasExistingClaim) { + staleClaimTimer = setTimeout(async () => { + if (claimSeen || settled) return; + try { + await onStaleClaim?.(); + } catch { + /* swallow — cleanup failure doesn't block the user-visible flow */ + } + // Re-check after the async cleanup to guard against a concurrent settle. + if (settled) return; + await writeSse(stream, { event: "phase", - data: JSON.stringify({ + data: { kind: "failed", - reason: "claim-never-created", + reason: "sandbox-evicted", message: - "Sandbox claim was never created. The VM_START call may have failed earlier — check the start error.", - } satisfies ClaimPhase), - }) - .catch(() => {}); - settle(false); - }, NO_CLAIM_MAX_MS); + "The sandbox is no longer running. The underlying pod may have been removed.", + }, + }).catch(() => {}); + settle(false); + }, STALE_CLAIM_DETECTION_MS); + } const onAbort = () => settle(false); signal.addEventListener("abort", onAbort, { once: true }); @@ -347,9 +440,7 @@ async function emitLifecycle(args: { handle = subscribeLifecycle(runner, claimName, (phase) => { if (settled) return; if (phase.kind !== "claiming") claimSeen = true; - stream - .writeSSE({ event: "phase", data: JSON.stringify(phase) }) - .catch(() => {}); + writeSse(stream, { event: "phase", data: phase }).catch(() => {}); if (phase.kind === "ready") settle(true); else if (phase.kind === "failed") settle(false); }); @@ -385,7 +476,7 @@ const PROXY_OPEN_RETRY_DELAY_MS = 500; * existing error state. */ async function proxyDaemonEvents(args: { - stream: import("hono/streaming").SSEStreamingApi; + stream: SSEStreamingApi; runner: NonNullable>>; claimName: string; signal: AbortSignal; @@ -418,16 +509,14 @@ async function proxyDaemonEvents(args: { continue; } const message = err instanceof Error ? 
err.message : String(err); - await stream - .writeSSE({ - event: "phase", - data: JSON.stringify({ - kind: "failed", - reason: "unknown", - message: `Upstream daemon SSE error: ${message}`, - } satisfies ClaimPhase), - }) - .catch(() => {}); + await writeSse(stream, { + event: "phase", + data: { + kind: "failed", + reason: "unknown", + message: `Upstream daemon SSE error: ${message}`, + }, + }).catch(() => {}); return; } @@ -443,7 +532,7 @@ async function proxyDaemonEvents(args: { } // Budget elapsed and handle still missing — genuine eviction. Emit // `gone` so the client's self-heal (VM_START) takes over. - await stream.writeSSE({ event: "gone", data: "" }).catch(() => {}); + await writeSse(stream, { event: "gone" }).catch(() => {}); return; } @@ -453,16 +542,14 @@ async function proxyDaemonEvents(args: { } catch { /* ignore */ } - await stream - .writeSSE({ - event: "phase", - data: JSON.stringify({ - kind: "failed", - reason: "unknown", - message: `Upstream daemon SSE failed (${attempt.status}).`, - } satisfies ClaimPhase), - }) - .catch(() => {}); + await writeSse(stream, { + event: "phase", + data: { + kind: "failed", + reason: "unknown", + message: `Upstream daemon SSE failed (${attempt.status}).`, + }, + }).catch(() => {}); return; } diff --git a/apps/mesh/src/web/components/vm/env/env.tsx b/apps/mesh/src/web/components/vm/env/env.tsx index c1c64ae694..71a29fed59 100644 --- a/apps/mesh/src/web/components/vm/env/env.tsx +++ b/apps/mesh/src/web/components/vm/env/env.tsx @@ -170,19 +170,28 @@ export function EnvContent({ daemonOpen = false }: { daemonOpen?: boolean }) { const vmEvents = useVmEvents(); + // A terminal `sandbox-evicted` phase means the sandbox is gone and the user + // must manually start a new one. Treat it as idle so the Run button appears, + // regardless of whether vmData still has a stale entry. + const isEvicted = + vmEvents.phase?.kind === "failed" && + vmEvents.phase.reason === "sandbox-evicted"; + const vmStartPending = useIsVmStartPending( inset?.entity?.id, currentBranch ?? undefined, ); const derivedStatus: ViewStatus = vmEvents.suspended ? "suspended" - : vmEvents.notFound - ? "creating" - : vmData - ? "running" - : vmStartPending - ? "creating" - : "idle"; + : isEvicted + ? "idle" + : vmEvents.notFound + ? "creating" + : vmData + ? "running" + : vmStartPending + ? "creating" + : "idle"; const status: ViewStatus = override ?? derivedStatus; // Clear the override when the derived state catches up. @@ -330,6 +339,10 @@ export function EnvContent({ daemonOpen = false }: { daemonOpen?: boolean }) { scriptsAppliedRef.current = false; setOpenScriptTabs([]); setActiveTab("setup"); + // Reconnect the SSE so lifecycle events from the new sandbox are received. + // Required after a terminal `failed` phase (e.g. sandbox-evicted) which + // permanently closes the stream until resetConnection() is called. + vmEvents.resetConnection(); try { if (!inset?.entity) throw new Error("No virtual MCP context"); @@ -454,6 +467,12 @@ export function EnvContent({ daemonOpen = false }: { daemonOpen?: boolean }) { )} + {isEvicted && ( +

+        <div>
+          Sandbox was removed. Start a new one to continue.
+        </div>
+      )}
+
diff --git a/apps/mesh/src/web/components/vm/upstream-status.ts b/apps/mesh/src/web/components/vm/upstream-status.ts index 9e638c472f..45ec1eff9e 100644 --- a/apps/mesh/src/web/components/vm/upstream-status.ts +++ b/apps/mesh/src/web/components/vm/upstream-status.ts @@ -1,5 +1 @@ -/** - * Upstream HTTP probe status emitted by the daemon over SSE. - * Mirrors the daemon's `UpstreamStatus` (packages/sandbox/daemon/probe.ts). - */ -export type UpstreamStatus = "booting" | "online" | "offline"; +export type { UpstreamStatus } from "@decocms/sandbox/daemon"; diff --git a/apps/mesh/src/web/components/vm/vm-booting-state.tsx b/apps/mesh/src/web/components/vm/vm-booting-state.tsx index 67a791342d..84292c629e 100644 --- a/apps/mesh/src/web/components/vm/vm-booting-state.tsx +++ b/apps/mesh/src/web/components/vm/vm-booting-state.tsx @@ -391,6 +391,8 @@ const LIFECYCLE_COPY: Record< function failureCopy(phase: Extract): { headline: string; body: string; + retryLabel?: string; + hideViewLogs?: boolean; } { switch (phase.reason) { case "image-pull-backoff": @@ -413,6 +415,13 @@ function failureCopy(phase: Extract): { headline: "Sandbox claim was never posted", body: phase.message, }; + case "sandbox-evicted": + return { + headline: "Sandbox was removed", + body: "The sandbox is no longer running. Start a new one to continue.", + retryLabel: "Start new sandbox", + hideViewLogs: true, + }; case "reconciler-error": return { headline: "Sandbox controller reported an error", @@ -452,17 +461,19 @@ function ClaimLifecycleView({ onClick={onRetry} className="rounded-md border border-foreground/15 bg-background px-3 py-1.5 text-xs font-medium hover:bg-foreground/[0.04] transition-colors" > - Try again + {copy.retryLabel ?? "Try again"} + + )} + {!copy.hideViewLogs && ( + )} -
); } diff --git a/packages/sandbox/daemon/event-types.ts b/packages/sandbox/daemon/event-types.ts new file mode 100644 index 0000000000..3b83147e0b --- /dev/null +++ b/packages/sandbox/daemon/event-types.ts @@ -0,0 +1,79 @@ +/** + * Wire-format types for the daemon SSE stream at `/_decopilot_vm/events`. + * + * Pure types — no runtime imports — so type-only consumers (notably the + * studio web bundle) can pull them in without dragging daemon runtime deps + * through the dependency graph. Follows the same pattern as + * `server/runner/lifecycle-types.ts`. + * + * The daemon byte-pipes these events through mesh's `/api/:org/vm-events` SSE + * proxy verbatim, so the same types describe what the browser receives. + */ + +// ---- Shared value types ------------------------------------------------------ + +export type UpstreamStatus = "booting" | "online" | "offline"; + +export type BranchStatusReady = { + readonly kind: "ready"; + readonly branch: string; + readonly base: string; + readonly workingTreeDirty: boolean; + readonly unpushed: number; + readonly aheadOfBase: number; + readonly behindBase: number; + /** HEAD sha (falls back to origin/). Empty if the daemon couldn't compute it. */ + readonly headSha: string; +}; + +export type BranchStatus = + | { readonly kind: "initializing" } + | { readonly kind: "cloning" } + | { readonly kind: "clone-failed"; readonly error: string } + | { readonly kind: "checking-out"; readonly to: string } + | { readonly kind: "checkout-failed"; readonly error: string } + | BranchStatusReady; + +export type PhaseStatus = "running" | "done" | "failed"; + +export interface DaemonPhase { + id: string; + name: string; + status: PhaseStatus; + startedAt: number; + doneAt: number | null; + error?: string; +} + +export interface DaemonTask { + id: string; + command: string; + logName?: string; +} + +// ---- Event payload map (SSE event name → JSON payload shape) ----------------- + +export interface DaemonEventMap { + /** Raw terminal output from a named source (script name, "setup", "daemon"). */ + log: { source: string; data: string }; + /** Upstream HTTP probe state — sent on connect and every 15 s as keepalive. */ + status: { status: UpstreamStatus; port: number | null; htmlSupport: boolean }; + /** npm/bun/etc scripts discovered in the repo's package.json. */ + scripts: { scripts: string[] }; + /** Running task-manager entries (supersedes the legacy `processes` event). */ + tasks: { active: DaemonTask[] }; + /** Operator intent: running (normal) or paused (manually stopped). */ + intent: { state: "running" | "paused"; reason?: string }; + /** Setup phase transitions (clone, install, config-driven transitions). */ + phases: { phases: DaemonPhase[] }; + /** Git branch state from the in-pod watcher. */ + "branch-status": BranchStatus; + /** Config-driven orchestrator transition lifecycle. 
*/ + transition: { + kind: string; + phase: "start" | "done" | "failed"; + error?: string; + }; +} + +export type DaemonEventType = keyof DaemonEventMap; diff --git a/packages/sandbox/package.json b/packages/sandbox/package.json index ca391c087a..d3ed5efeb5 100644 --- a/packages/sandbox/package.json +++ b/packages/sandbox/package.json @@ -1,6 +1,6 @@ { "name": "@decocms/sandbox", - "version": "0.4.2", + "version": "0.4.3", "type": "module", "description": "Sandbox runner for isolated per-user containerised tool execution", "scripts": { @@ -11,6 +11,7 @@ }, "exports": { "./shared": "./shared.ts", + "./daemon": "./daemon/event-types.ts", "./runner": "./server/runner/index.ts", "./runner/agent-sandbox": "./server/runner/agent-sandbox/index.ts", "./runner/freestyle": "./server/runner/freestyle/index.ts" diff --git a/packages/sandbox/server/runner/lifecycle-types.ts b/packages/sandbox/server/runner/lifecycle-types.ts index 825f5ffc03..8ed2468e8e 100644 --- a/packages/sandbox/server/runner/lifecycle-types.ts +++ b/packages/sandbox/server/runner/lifecycle-types.ts @@ -18,6 +18,7 @@ export type ClaimFailureReason = | "crash-loop-backoff" | "scheduling-timeout" | "claim-never-created" + | "sandbox-evicted" | "reconciler-error" | "unknown";
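
A minimal sketch of a browser-side consumer of the mesh-level stream, assuming an `EventSource` client. The event names (`phase`, `gone`, `keepalive`), the `sandbox-evicted` failure reason, and the rule that a terminal `failed` phase ends the stream until `resetConnection()` re-opens it come from this patch; the exact URL shape and the `openVmEvents` helper are illustrative assumptions, not the actual `VmEventsProvider` code.

```ts
import type { ClaimPhase } from "@decocms/sandbox/runner";

// Sketch only — the path and query params are assumed from the route's
// comments ("/api/:org/vm-events", "virtualMcpId and branch are required").
function openVmEvents(
  org: string,
  virtualMcpId: string,
  branch: string,
  onPhase: (phase: ClaimPhase) => void,
  onGone: () => void,
): () => void {
  const url =
    `/api/${org}/vm-events` +
    `?virtualMcpId=${encodeURIComponent(virtualMcpId)}` +
    `&branch=${encodeURIComponent(branch)}`;
  const es = new EventSource(url);

  es.addEventListener("phase", (e) => {
    const phase = JSON.parse((e as MessageEvent).data) as ClaimPhase;
    onPhase(phase);
    // Terminal failure (e.g. reason === "sandbox-evicted"): the server stops
    // emitting, so the provider must reset the connection before the next
    // sandbox's lifecycle events can be observed.
    if (phase.kind === "failed") es.close();
  });

  // `gone`: the vmMap pointed at a dead claim — the client self-heals by
  // re-issuing VM_START.
  es.addEventListener("gone", () => onGone());

  // `keepalive` carries no payload; it only keeps the socket warm.
  return () => es.close();
}
```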
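The type-only `@decocms/sandbox/daemon` export also lets the web bundle dispatch the proxied daemon events with payload types checked against `DaemonEventMap`. A sketch under that assumption — `attachDaemonHandlers` and the handler bodies are illustrative, not part of the patch:

```ts
import type {
  DaemonEventMap,
  DaemonEventType,
} from "@decocms/sandbox/daemon";

// Illustrative helper: one optional handler per daemon SSE event name, with
// the payload type taken from DaemonEventMap so a mis-typed handler fails to
// compile rather than at runtime.
type DaemonHandlers = {
  [K in DaemonEventType]?: (payload: DaemonEventMap[K]) => void;
};

function attachDaemonHandlers(
  es: EventSource,
  handlers: DaemonHandlers,
): void {
  const attach = <K extends DaemonEventType>(name: K): void => {
    es.addEventListener(name, (e) => {
      const payload = JSON.parse(
        (e as MessageEvent).data,
      ) as DaemonEventMap[K];
      handlers[name]?.(payload);
    });
  };
  for (const name of Object.keys(handlers) as DaemonEventType[]) attach(name);
}

// Usage sketch: the mesh proxy byte-pipes the daemon's wire format verbatim,
// so these are the same payload shapes the browser receives.
declare const daemonEvents: EventSource;
attachDaemonHandlers(daemonEvents, {
  status: ({ status, port }) => console.log("upstream", status, "port", port),
  tasks: ({ active }) => console.log("tasks", active.map((t) => t.command)),
  "branch-status": (s) => {
    if (s.kind === "ready") console.log("on", s.branch, "at", s.headSha);
  },
});
```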