Merged
apps/mesh/src/api/routes/vm-events.ts (223 changes: 155 additions & 68 deletions)
@@ -8,7 +8,7 @@
* VM_START posting a SandboxClaim and the daemon coming online.
* Agent-sandbox runner emits real K8s phases; other runners emit a
* single synthetic `ready`.
- * 2. Daemon events (`event: log|status|scripts|processes|reload|branch-status`)
+ * 2. Daemon events (`event: log|status|scripts|tasks|intent|phases|branch-status|transition`)
* — proxied from the in-pod daemon's `/_decopilot_vm/events` SSE once
* lifecycle reaches `ready`. Wire format is preserved verbatim by raw
* byte-piping the upstream body, so daemon and client speak the same
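
A minimal sketch of how a browser client might consume this stream. The event names (`phase`, `gone`, `keepalive`) and the self-heal contract come from this diff; the endpoint URL and the handler names are hypothetical:

```ts
// Hypothetical client consumer. `vmEventsUrl`, `showError`, and
// `triggerVmStart` are illustrative names, not part of this PR.
const source = new EventSource(vmEventsUrl);

// Lifecycle layer: each `phase` event carries a JSON-encoded ClaimPhase.
source.addEventListener("phase", (e) => {
  const phase = JSON.parse((e as MessageEvent).data);
  if (phase.kind === "failed") showError(phase.reason, phase.message);
});

// A prior sandbox vanished: self-heal by re-issuing VM_START.
source.addEventListener("gone", () => triggerVmStart());

// `keepalive` exists only to defeat idle proxy timeouts; ignore it.
```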
@@ -36,11 +36,12 @@

import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
+import type { SSEStreamingApi } from "hono/streaming";
import {
composeSandboxRef,
resolveRunnerKindFromEnv,
} from "@decocms/sandbox/runner";
-import type { ClaimPhase } from "@decocms/sandbox/runner";
+import type { ClaimPhase, RunnerKind } from "@decocms/sandbox/runner";
import { computeClaimHandle } from "../../sandbox/claim-handle";
import {
getOrInitSharedRunner,
@@ -66,8 +67,31 @@ import type { Env } from "../hono-env";
*/
const NO_CLAIM_MAX_MS = 90_000;

+/**
+ * Shorter stale-sandbox detection window used only when `expectingHandle` is
+ * true (a prior sandbox was running). After this many ms stuck in `claiming`
+ * with no pod activity, we conclude the underlying pod was deleted while the
+ * SandboxClaim CRD still exists (runner.alive() returned true but the pod is
+ * gone). 30s is long enough for the operator to emit at least one scheduling
+ * event if it were actively creating a pod, yet short enough to surface a
+ * clear error quickly instead of making the user wait the full 90s.
+ */
+const STALE_CLAIM_DETECTION_MS = 30_000;

const HEARTBEAT_MS = 15_000;

+type VmSseEvent =
+  | { event: "phase"; data: ClaimPhase }
+  | { event: "gone" }
+  | { event: "keepalive" };
+
+function writeSse(stream: SSEStreamingApi, e: VmSseEvent): Promise<void> {
+  return stream.writeSSE({
+    event: e.event,
+    data: "data" in e ? JSON.stringify(e.data) : "",
+  });
+}

export const createVmEventsRoutes = () => {
const app = new Hono<Env>();
app.get("/", async (c) => {
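
The `VmSseEvent` union added above lets the compiler enforce which events carry a payload. A quick illustration (the `stream` variable is assumed to be an `SSEStreamingApi` in scope; the failed-phase shape is the one used throughout this file):

```ts
// Only `phase` accepts `data`; the other variants have no payload field.
await writeSse(stream, {
  event: "phase",
  data: { kind: "failed", reason: "unknown", message: "boom" },
});
await writeSse(stream, { event: "keepalive" }); // ok: no data
// @ts-expect-error: `gone` carries no payload in VmSseEvent
await writeSse(stream, { event: "gone", data: "x" });
```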
Expand Down Expand Up @@ -95,11 +119,6 @@ export const createVmEventsRoutes = () => {
return c.json({ error: "virtualMcpId and branch are required" }, 400);
}

-    // Verify caller's org actually owns this virtualMcp. Without this check,
-    // an authenticated user could probe arbitrary virtualMcpIds — the claim
-    // hash includes their userId so they couldn't *observe* anyone else's
-    // events, but the 404 vs not-yet-created surface would still leak
-    // existence/identity information.
const virtualMcp = await ctx.storage.virtualMcps.findById(virtualMcpId);
if (!virtualMcp || virtualMcp.organization_id !== organization.id) {
return c.json({ error: "Virtual MCP not found" }, 404);
@@ -113,14 +132,6 @@ export const createVmEventsRoutes = () => {
const claimName = computeClaimHandle({ userId, projectRef }, branch);
const runnerKind = resolveRunnerKindFromEnv();

-    // Snapshot vmMap from the same metadata read used for the org-ownership
-    // check. Used below to gate the stale-handle probe: we only run it when
-    // this user already had a vmMap entry pointing at *this exact* claim.
-    // The vmId-match guard avoids racing VM_START's claim-creation window
-    // (~250ms–1.2s for agent-sandbox before `createSandboxClaim` lands;
-    // similar window for host/docker between `runner.ensure` returning and
-    // `setVmMapEntry` writing the row). Without it, an SSE that opens during
-    // that window would observe alive=false and emit a spurious `gone`.
const existingVmEntry = resolveVm(
readVmMap(virtualMcp.metadata as Record<string, unknown> | null),
userId,
@@ -135,13 +146,13 @@
// phase rather than a silent close so the UI shows a meaningful error.
if (!runner) {
return streamSSE(c, async (stream) => {
-        await stream.writeSSE({
+        await writeSse(stream, {
          event: "phase",
-          data: JSON.stringify({
+          data: {
            kind: "failed",
            reason: "unknown",
            message: "No sandbox runner configured on this mesh.",
-          } satisfies ClaimPhase),
+          },
        });
});
});
}
@@ -152,7 +163,7 @@
return streamSSE(c, async (stream) => {
const abortCtl = new AbortController();
const heartbeat = setInterval(() => {
-        stream.writeSSE({ event: "keepalive", data: "" }).catch(() => {
+        writeSse(stream, { event: "keepalive" }).catch(() => {
clearInterval(heartbeat);
});
}, HEARTBEAT_MS);
@@ -162,25 +173,49 @@
});

try {
-        // Same probe for every runner. `runner.alive` is honest across
-        // host/docker/freestyle/agent-sandbox: each implementation queries
-        // its respective source-of-truth (state-store + pid for host, docker
-        // inspect, K8s API, freestyle daemon HTTP). When the prior vmMap
-        // entry's runner kind differs from the env's current runner, we
-        // route the stale-state cleanup through the *prior* kind so we
-        // don't leave behind rows in the wrong table.
-        if (expectingHandle) {
-          const stale = await isStaleHandle(runner, claimName);
+        const priorStateRow = await ctx.db
+          .selectFrom("sandbox_runner_state")
+          .select(["handle", "runner_kind"])
+          .where("user_id", "=", userId)
+          .where("project_ref", "=", projectRef)
+          .executeTakeFirst()
+          .catch(() => null);
+        const hasKnownClaim = expectingHandle || priorStateRow !== null;
+
+        if (hasKnownClaim) {
+          const priorKind = (existingRunnerKind ??
+            priorStateRow?.runner_kind ??
+            null) as RunnerKind | null;
+
+          const runnerMismatch = priorKind !== null && priorKind !== runnerKind;
+          const stale =
+            runnerMismatch || (await isStaleHandle(runner, claimName));
+
if (stale) {
await cleanupStaleEntry({
ctx,
runner,
claimName,
userId,
projectRef,
-              runnerKind: existingRunnerKind ?? runnerKind,
+              runnerKind: priorKind ?? runnerKind,
});
await stream.writeSSE({ event: "gone", data: "" }).catch(() => {});
if (expectingHandle) {
// vmMap points at this claim → self-heal: browser triggers VM_START.
await writeSse(stream, { event: "gone" }).catch(() => {});
} else {
// State-store only (no vmMap entry) → manual prompt; auto-heal
// won't fire without a vmMap entry to give deadVmId a value.
await writeSse(stream, {
event: "phase",
data: {
kind: "failed",
reason: "sandbox-evicted",
message:
"The sandbox was removed. Start a new one to continue.",
},
}).catch(() => {});
}
return;
}
}
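
Distilled, the staleness decision above is a two-part predicate: a runner-kind mismatch short-circuits to stale (presumably because the current runner cannot vouch for a claim created under a different kind), otherwise the runner's own probe decides. A hedged sketch using the diff's names; `isStaleHandle` internals are not shown in this diff:

```ts
// Mismatch wins; otherwise defer to the runner's liveness probe.
async function isPriorClaimStale(
  priorKind: RunnerKind | null,
  currentKind: RunnerKind,
  probeHandle: () => Promise<boolean>, // e.g. () => isStaleHandle(runner, claimName)
): Promise<boolean> {
  if (priorKind !== null && priorKind !== currentKind) return true;
  return probeHandle();
}
```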
@@ -191,6 +226,19 @@
claimName,
runner,
signal: abortCtl.signal,
+        hasExistingClaim: hasKnownClaim,
+        onStaleClaim: hasKnownClaim
+          ? async () => {
+              await cleanupStaleEntry({
+                ctx,
+                runner,
+                claimName,
+                userId,
+                projectRef,
+                runnerKind: existingRunnerKind ?? runnerKind,
+              });
+            }
+          : undefined,
});
if (!lifecycleOk || abortCtl.signal.aborted) return;

@@ -263,7 +311,7 @@ async function cleanupStaleEntry(args: {
claimName: string;
userId: string;
projectRef: string;
runnerKind: "host" | "docker" | "freestyle" | "agent-sandbox";
runnerKind: RunnerKind;
}): Promise<void> {
const { ctx, runner, claimName, userId, projectRef, runnerKind } = args;
try {
@@ -298,22 +346,39 @@
* `ready` phase and ends immediately.
*/
async function emitLifecycle(args: {
-  stream: import("hono/streaming").SSEStreamingApi;
+  stream: SSEStreamingApi;
claimName: string;
runner: NonNullable<Awaited<ReturnType<typeof getOrInitSharedRunner>>>;
signal: AbortSignal;
+  /**
+   * True when the caller had an existing vmMap entry pointing at this claim
+   * (i.e. a prior sandbox was running). Enables the shorter
+   * STALE_CLAIM_DETECTION_MS secondary timer: if the operator hasn't
+   * produced any pod activity within that window, the pod was likely deleted
+   * while the SandboxClaim CRD still exists.
+   */
+  hasExistingClaim: boolean;
+  /**
+   * Called before emitting `sandbox-evicted` when the secondary timer fires.
+   * Lets the caller drop the stale state-store row so VM_START's `ensure`
+   * path doesn't chase a dead port-forward on the next attempt.
+   */
+  onStaleClaim?: () => Promise<void>;
}): Promise<boolean> {
-  const { stream, claimName, runner, signal } = args;
+  const { stream, claimName, runner, signal, hasExistingClaim, onStaleClaim } =
+    args;

return new Promise<boolean>((resolve) => {
let settled = false;
let claimSeen = false;
let handle: { unsubscribe(): void } | null = null;
+    let staleClaimTimer: ReturnType<typeof setTimeout> | null = null;

const settle = (result: boolean) => {
if (settled) return;
settled = true;
clearTimeout(watchdogTimer);
+      if (staleClaimTimer !== null) clearTimeout(staleClaimTimer);
signal.removeEventListener("abort", onAbort);
handle?.unsubscribe();
resolve(result);
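
`emitLifecycle` now has four racing completion sources (watchdog, stale-claim timer, abort signal, lifecycle subscription), all funnelled through one idempotent `settle`. The idiom in isolation, as a standalone sketch with hypothetical names:

```ts
// One-shot settle: the first source to fire wins; every other source is
// torn down so it can neither fire again nor leak a timer or listener.
function raceToSettle(signal: AbortSignal, budgetMs: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    let settled = false;
    const timer = setTimeout(() => settle(false), budgetMs);
    const onAbort = () => settle(false);

    function settle(result: boolean): void {
      if (settled) return; // later callers become no-ops
      settled = true;
      clearTimeout(timer);
      signal.removeEventListener("abort", onAbort);
      resolve(result);
    }

    signal.addEventListener("abort", onAbort, { once: true });
    // A real caller would also subscribe to its event source here and
    // call settle(true) on success, as emitLifecycle does below.
  });
}
```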
@@ -327,29 +392,55 @@
// never fires.
const watchdogTimer = setTimeout(() => {
if (claimSeen || settled) return;
-      stream
-        .writeSSE({
+      writeSse(stream, {
+        event: "phase",
+        data: {
+          kind: "failed",
+          reason: "claim-never-created",
+          message:
+            "Sandbox claim was never created. The VM_START call may have failed earlier — check the start error.",
+        },
+      }).catch(() => {});
+      settle(false);
+    }, NO_CLAIM_MAX_MS);

+    // Secondary stale-sandbox detection: only fires when the caller had a
+    // prior running sandbox (hasExistingClaim). If we're still in `claiming`
+    // after STALE_CLAIM_DETECTION_MS with no pod events, the SandboxClaim
+    // exists in K8s but its pod was deleted — runner.alive() returned true
+    // (claim present) but the operator isn't creating a new pod. Surface
+    // `sandbox-evicted` so the UI shows a targeted "start new sandbox"
+    // prompt rather than making the user wait the full 90s.
+    if (hasExistingClaim) {
+      staleClaimTimer = setTimeout(async () => {
+        if (claimSeen || settled) return;
+        try {
+          await onStaleClaim?.();
+        } catch {
+          /* swallow — cleanup failure doesn't block the user-visible flow */
+        }
+        // Re-check after the async cleanup to guard against a concurrent settle.
+        if (settled) return;
+        await writeSse(stream, {
event: "phase",
-          data: JSON.stringify({
+          data: {
            kind: "failed",
-            reason: "claim-never-created",
+            reason: "sandbox-evicted",
            message:
-              "Sandbox claim was never created. The VM_START call may have failed earlier — check the start error.",
-          } satisfies ClaimPhase),
-        })
-        .catch(() => {});
-      settle(false);
-    }, NO_CLAIM_MAX_MS);
+            "The sandbox is no longer running. The underlying pod may have been removed.",
+          },
+        }).catch(() => {});
+        settle(false);
+      }, STALE_CLAIM_DETECTION_MS);
+    }

const onAbort = () => settle(false);
signal.addEventListener("abort", onAbort, { once: true });

handle = subscribeLifecycle(runner, claimName, (phase) => {
if (settled) return;
if (phase.kind !== "claiming") claimSeen = true;
-      stream
-        .writeSSE({ event: "phase", data: JSON.stringify(phase) })
-        .catch(() => {});
+      writeSse(stream, { event: "phase", data: phase }).catch(() => {});
if (phase.kind === "ready") settle(true);
else if (phase.kind === "failed") settle(false);
});
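
The two timers compose so that callers with a prior sandbox fail fast while first-run callers keep the full budget. A summary of the behavior when no phase beyond `claiming` ever arrives, using the constants defined above:

```ts
// Assuming subscribeLifecycle only ever reports `claiming`:
//
// hasExistingClaim === false:
//   t = 90_000 (NO_CLAIM_MAX_MS)
//     → phase { kind: "failed", reason: "claim-never-created" }, settle(false)
//
// hasExistingClaim === true:
//   t = 30_000 (STALE_CLAIM_DETECTION_MS)
//     → onStaleClaim() cleanup, then
//       phase { kind: "failed", reason: "sandbox-evicted" }, settle(false)
//   (settle clears the 90s watchdog, so it never also fires)
//
// Any phase other than `claiming` sets claimSeen, turning both timer
// callbacks into no-ops even if they fire later.
```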
@@ -385,7 +476,7 @@ const PROXY_OPEN_RETRY_DELAY_MS = 500;
* existing error state.
*/
async function proxyDaemonEvents(args: {
-  stream: import("hono/streaming").SSEStreamingApi;
+  stream: SSEStreamingApi;
runner: NonNullable<Awaited<ReturnType<typeof getOrInitSharedRunner>>>;
claimName: string;
signal: AbortSignal;
@@ -418,16 +509,14 @@ async function proxyDaemonEvents(args: {
continue;
}
const message = err instanceof Error ? err.message : String(err);
-      await stream
-        .writeSSE({
-          event: "phase",
-          data: JSON.stringify({
-            kind: "failed",
-            reason: "unknown",
-            message: `Upstream daemon SSE error: ${message}`,
-          } satisfies ClaimPhase),
-        })
-        .catch(() => {});
+      await writeSse(stream, {
+        event: "phase",
+        data: {
+          kind: "failed",
+          reason: "unknown",
+          message: `Upstream daemon SSE error: ${message}`,
+        },
+      }).catch(() => {});
return;
}

@@ -443,7 +532,7 @@
}
// Budget elapsed and handle still missing — genuine eviction. Emit
// `gone` so the client's self-heal (VM_START) takes over.
await stream.writeSSE({ event: "gone", data: "" }).catch(() => {});
await writeSse(stream, { event: "gone" }).catch(() => {});
return;
}

@@ -453,16 +542,14 @@
} catch {
/* ignore */
}
-    await stream
-      .writeSSE({
-        event: "phase",
-        data: JSON.stringify({
-          kind: "failed",
-          reason: "unknown",
-          message: `Upstream daemon SSE failed (${attempt.status}).`,
-        } satisfies ClaimPhase),
-      })
-      .catch(() => {});
+    await writeSse(stream, {
+      event: "phase",
+      data: {
+        kind: "failed",
+        reason: "unknown",
+        message: `Upstream daemon SSE failed (${attempt.status}).`,
+      },
+    }).catch(() => {});
return;
}
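
The successful-open path is elided from this diff, but the file header comment says daemon events are forwarded by raw byte-piping the upstream body. A minimal sketch of that idea, assuming Hono's `StreamingApi#write` accepts raw text and `upstream` is the fetch `Response` from the daemon's `/_decopilot_vm/events`:

```ts
// Hedged sketch: forward upstream SSE bytes verbatim instead of re-encoding
// them through writeSSE, so event names, ids, and retry hints survive as-is.
async function pipeVerbatim(
  stream: SSEStreamingApi,
  upstream: Response,
  signal: AbortSignal,
): Promise<void> {
  const reader = upstream.body!.getReader();
  const decoder = new TextDecoder();
  while (!signal.aborted) {
    const { done, value } = await reader.read();
    if (done) break;
    await stream.write(decoder.decode(value, { stream: true }));
  }
}
```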
