Skip to content

Commit ef46008

Browse files
pedrofrxncxclaude
andcommitted
revert: undo feat(vm-events) from PR #3314
Reverts 76a9187 — enhanced VM event handling and sandbox eviction logic. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent be96a90 commit ef46008

10 files changed

Lines changed: 137 additions & 324 deletions

File tree

apps/mesh/src/api/routes/vm-events.ts

Lines changed: 68 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* VM_START posting a SandboxClaim and the daemon coming online.
99
* Agent-sandbox runner emits real K8s phases; other runners emit a
1010
* single synthetic `ready`.
11-
* 2. Daemon events (`event: log|status|scripts|tasks|intent|phases|branch-status|transition`)
11+
* 2. Daemon events (`event: log|status|scripts|processes|reload|branch-status`)
1212
* — proxied from the in-pod daemon's `/_decopilot_vm/events` SSE once
1313
* lifecycle reaches `ready`. Wire format is preserved verbatim by raw
1414
* byte-piping the upstream body, so daemon and client speak the same
@@ -36,12 +36,11 @@
3636

3737
import { Hono } from "hono";
3838
import { streamSSE } from "hono/streaming";
39-
import type { SSEStreamingApi } from "hono/streaming";
4039
import {
4140
composeSandboxRef,
4241
resolveRunnerKindFromEnv,
4342
} from "@decocms/sandbox/runner";
44-
import type { ClaimPhase, RunnerKind } from "@decocms/sandbox/runner";
43+
import type { ClaimPhase } from "@decocms/sandbox/runner";
4544
import { computeClaimHandle } from "../../sandbox/claim-handle";
4645
import {
4746
getOrInitSharedRunner,
@@ -67,31 +66,8 @@ import type { Env } from "../hono-env";
6766
*/
6867
const NO_CLAIM_MAX_MS = 90_000;
6968

70-
/**
71-
* Shorter stale-sandbox detection window used only when `expectingHandle` is
72-
* true (a prior sandbox was running). After this many ms stuck in `claiming`
73-
* with no pod activity, we conclude the underlying pod was deleted while the
74-
* SandboxClaim CRD still exists (runner.alive() returned true but the pod is
75-
* gone). 30s is long enough for the operator to emit at least one scheduling
76-
* event if it were actively creating a pod, yet short enough to surface a
77-
* clear error quickly instead of making the user wait the full 90s.
78-
*/
79-
const STALE_CLAIM_DETECTION_MS = 30_000;
80-
8169
const HEARTBEAT_MS = 15_000;
8270

83-
type VmSseEvent =
84-
| { event: "phase"; data: ClaimPhase }
85-
| { event: "gone" }
86-
| { event: "keepalive" };
87-
88-
function writeSse(stream: SSEStreamingApi, e: VmSseEvent): Promise<void> {
89-
return stream.writeSSE({
90-
event: e.event,
91-
data: "data" in e ? JSON.stringify(e.data) : "",
92-
});
93-
}
94-
9571
export const createVmEventsRoutes = () => {
9672
const app = new Hono<Env>();
9773
app.get("/", async (c) => {
@@ -119,6 +95,11 @@ export const createVmEventsRoutes = () => {
11995
return c.json({ error: "virtualMcpId and branch are required" }, 400);
12096
}
12197

98+
// Verify caller's org actually owns this virtualMcp. Without this check,
99+
// an authenticated user could probe arbitrary virtualMcpIds — the claim
100+
// hash includes their userId so they couldn't *observe* anyone else's
101+
// events, but the 404 vs not-yet-created surface would still leak
102+
// existence/identity information.
122103
const virtualMcp = await ctx.storage.virtualMcps.findById(virtualMcpId);
123104
if (!virtualMcp || virtualMcp.organization_id !== organization.id) {
124105
return c.json({ error: "Virtual MCP not found" }, 404);
@@ -132,6 +113,14 @@ export const createVmEventsRoutes = () => {
132113
const claimName = computeClaimHandle({ userId, projectRef }, branch);
133114
const runnerKind = resolveRunnerKindFromEnv();
134115

116+
// Snapshot vmMap from the same metadata read used for the org-ownership
117+
// check. Used below to gate the stale-handle probe: we only run it when
118+
// this user already had a vmMap entry pointing at *this exact* claim.
119+
// The vmId-match guard avoids racing VM_START's claim-creation window
120+
// (~250ms–1.2s for agent-sandbox before `createSandboxClaim` lands;
121+
// similar window for host/docker between `runner.ensure` returning and
122+
// `setVmMapEntry` writing the row). Without it, an SSE that opens during
123+
// that window would observe alive=false and emit a spurious `gone`.
135124
const existingVmEntry = resolveVm(
136125
readVmMap(virtualMcp.metadata as Record<string, unknown> | null),
137126
userId,
@@ -146,13 +135,13 @@ export const createVmEventsRoutes = () => {
146135
// phase rather than a silent close so the UI shows a meaningful error.
147136
if (!runner) {
148137
return streamSSE(c, async (stream) => {
149-
await writeSse(stream, {
138+
await stream.writeSSE({
150139
event: "phase",
151-
data: {
140+
data: JSON.stringify({
152141
kind: "failed",
153142
reason: "unknown",
154143
message: "No sandbox runner configured on this mesh.",
155-
},
144+
} satisfies ClaimPhase),
156145
});
157146
});
158147
}
@@ -163,7 +152,7 @@ export const createVmEventsRoutes = () => {
163152
return streamSSE(c, async (stream) => {
164153
const abortCtl = new AbortController();
165154
const heartbeat = setInterval(() => {
166-
writeSse(stream, { event: "keepalive" }).catch(() => {
155+
stream.writeSSE({ event: "keepalive", data: "" }).catch(() => {
167156
clearInterval(heartbeat);
168157
});
169158
}, HEARTBEAT_MS);
@@ -173,49 +162,25 @@ export const createVmEventsRoutes = () => {
173162
});
174163

175164
try {
176-
const priorStateRow = await ctx.db
177-
.selectFrom("sandbox_runner_state")
178-
.select(["handle", "runner_kind"])
179-
.where("user_id", "=", userId)
180-
.where("project_ref", "=", projectRef)
181-
.executeTakeFirst()
182-
.catch(() => null);
183-
const hasKnownClaim = expectingHandle || priorStateRow !== null;
184-
185-
if (hasKnownClaim) {
186-
const priorKind = (existingRunnerKind ??
187-
priorStateRow?.runner_kind ??
188-
null) as RunnerKind | null;
189-
190-
const runnerMismatch = priorKind !== null && priorKind !== runnerKind;
191-
const stale =
192-
runnerMismatch || (await isStaleHandle(runner, claimName));
193-
165+
// Same probe for every runner. `runner.alive` is honest across
166+
// host/docker/freestyle/agent-sandbox: each implementation queries
167+
// its respective source-of-truth (state-store + pid for host, docker
168+
// inspect, K8s API, freestyle daemon HTTP). When the prior vmMap
169+
// entry's runner kind differs from the env's current runner, we
170+
// route the stale-state cleanup through the *prior* kind so we
171+
// don't leave behind rows in the wrong table.
172+
if (expectingHandle) {
173+
const stale = await isStaleHandle(runner, claimName);
194174
if (stale) {
195175
await cleanupStaleEntry({
196176
ctx,
197177
runner,
198178
claimName,
199179
userId,
200180
projectRef,
201-
runnerKind: priorKind ?? runnerKind,
181+
runnerKind: existingRunnerKind ?? runnerKind,
202182
});
203-
if (expectingHandle) {
204-
// vmMap points at this claim → self-heal: browser triggers VM_START.
205-
await writeSse(stream, { event: "gone" }).catch(() => {});
206-
} else {
207-
// State-store only (no vmMap entry) → manual prompt; auto-heal
208-
// won't fire without a vmMap entry to give deadVmId a value.
209-
await writeSse(stream, {
210-
event: "phase",
211-
data: {
212-
kind: "failed",
213-
reason: "sandbox-evicted",
214-
message:
215-
"The sandbox was removed. Start a new one to continue.",
216-
},
217-
}).catch(() => {});
218-
}
183+
await stream.writeSSE({ event: "gone", data: "" }).catch(() => {});
219184
return;
220185
}
221186
}
@@ -226,19 +191,6 @@ export const createVmEventsRoutes = () => {
226191
claimName,
227192
runner,
228193
signal: abortCtl.signal,
229-
hasExistingClaim: hasKnownClaim,
230-
onStaleClaim: hasKnownClaim
231-
? async () => {
232-
await cleanupStaleEntry({
233-
ctx,
234-
runner,
235-
claimName,
236-
userId,
237-
projectRef,
238-
runnerKind: existingRunnerKind ?? runnerKind,
239-
});
240-
}
241-
: undefined,
242194
});
243195
if (!lifecycleOk || abortCtl.signal.aborted) return;
244196

@@ -311,7 +263,7 @@ async function cleanupStaleEntry(args: {
311263
claimName: string;
312264
userId: string;
313265
projectRef: string;
314-
runnerKind: RunnerKind;
266+
runnerKind: "host" | "docker" | "freestyle" | "agent-sandbox";
315267
}): Promise<void> {
316268
const { ctx, runner, claimName, userId, projectRef, runnerKind } = args;
317269
try {
@@ -346,39 +298,22 @@ async function cleanupStaleEntry(args: {
346298
* `ready` phase and ends immediately.
347299
*/
348300
async function emitLifecycle(args: {
349-
stream: SSEStreamingApi;
301+
stream: import("hono/streaming").SSEStreamingApi;
350302
claimName: string;
351303
runner: NonNullable<Awaited<ReturnType<typeof getOrInitSharedRunner>>>;
352304
signal: AbortSignal;
353-
/**
354-
* True when the caller had an existing vmMap entry pointing at this claim
355-
* (i.e. a prior sandbox was running). Enables the shorter
356-
* STALE_CLAIM_DETECTION_MS secondary timer: if the operator hasn't
357-
* produced any pod activity within that window, the pod was likely deleted
358-
* while the SandboxClaim CRD still exists.
359-
*/
360-
hasExistingClaim: boolean;
361-
/**
362-
* Called before emitting `sandbox-evicted` when the secondary timer fires.
363-
* Lets the caller drop the stale state-store row so VM_START's `ensure`
364-
* path doesn't chase a dead port-forward on the next attempt.
365-
*/
366-
onStaleClaim?: () => Promise<void>;
367305
}): Promise<boolean> {
368-
const { stream, claimName, runner, signal, hasExistingClaim, onStaleClaim } =
369-
args;
306+
const { stream, claimName, runner, signal } = args;
370307

371308
return new Promise<boolean>((resolve) => {
372309
let settled = false;
373310
let claimSeen = false;
374311
let handle: { unsubscribe(): void } | null = null;
375-
let staleClaimTimer: ReturnType<typeof setTimeout> | null = null;
376312

377313
const settle = (result: boolean) => {
378314
if (settled) return;
379315
settled = true;
380316
clearTimeout(watchdogTimer);
381-
if (staleClaimTimer !== null) clearTimeout(staleClaimTimer);
382317
signal.removeEventListener("abort", onAbort);
383318
handle?.unsubscribe();
384319
resolve(result);
@@ -392,55 +327,29 @@ async function emitLifecycle(args: {
392327
// never fires.
393328
const watchdogTimer = setTimeout(() => {
394329
if (claimSeen || settled) return;
395-
writeSse(stream, {
396-
event: "phase",
397-
data: {
398-
kind: "failed",
399-
reason: "claim-never-created",
400-
message:
401-
"Sandbox claim was never created. The VM_START call may have failed earlier — check the start error.",
402-
},
403-
}).catch(() => {});
404-
settle(false);
405-
}, NO_CLAIM_MAX_MS);
406-
407-
// Secondary stale-sandbox detection: only fires when the caller had a
408-
// prior running sandbox (hasExistingClaim). If we're still in `claiming`
409-
// after STALE_CLAIM_DETECTION_MS with no pod events, the SandboxClaim
410-
// exists in K8s but its pod was deleted — runner.alive() returned true
411-
// (claim present) but the operator isn't creating a new pod. Surface
412-
// `sandbox-evicted` so the UI shows a targeted "start new sandbox"
413-
// prompt rather than making the user wait the full 90s.
414-
if (hasExistingClaim) {
415-
staleClaimTimer = setTimeout(async () => {
416-
if (claimSeen || settled) return;
417-
try {
418-
await onStaleClaim?.();
419-
} catch {
420-
/* swallow — cleanup failure doesn't block the user-visible flow */
421-
}
422-
// Re-check after the async cleanup to guard against a concurrent settle.
423-
if (settled) return;
424-
await writeSse(stream, {
330+
stream
331+
.writeSSE({
425332
event: "phase",
426-
data: {
333+
data: JSON.stringify({
427334
kind: "failed",
428-
reason: "sandbox-evicted",
335+
reason: "claim-never-created",
429336
message:
430-
"The sandbox is no longer running. The underlying pod may have been removed.",
431-
},
432-
}).catch(() => {});
433-
settle(false);
434-
}, STALE_CLAIM_DETECTION_MS);
435-
}
337+
"Sandbox claim was never created. The VM_START call may have failed earlier — check the start error.",
338+
} satisfies ClaimPhase),
339+
})
340+
.catch(() => {});
341+
settle(false);
342+
}, NO_CLAIM_MAX_MS);
436343

437344
const onAbort = () => settle(false);
438345
signal.addEventListener("abort", onAbort, { once: true });
439346

440347
handle = subscribeLifecycle(runner, claimName, (phase) => {
441348
if (settled) return;
442349
if (phase.kind !== "claiming") claimSeen = true;
443-
writeSse(stream, { event: "phase", data: phase }).catch(() => {});
350+
stream
351+
.writeSSE({ event: "phase", data: JSON.stringify(phase) })
352+
.catch(() => {});
444353
if (phase.kind === "ready") settle(true);
445354
else if (phase.kind === "failed") settle(false);
446355
});
@@ -476,7 +385,7 @@ const PROXY_OPEN_RETRY_DELAY_MS = 500;
476385
* existing error state.
477386
*/
478387
async function proxyDaemonEvents(args: {
479-
stream: SSEStreamingApi;
388+
stream: import("hono/streaming").SSEStreamingApi;
480389
runner: NonNullable<Awaited<ReturnType<typeof getOrInitSharedRunner>>>;
481390
claimName: string;
482391
signal: AbortSignal;
@@ -509,14 +418,16 @@ async function proxyDaemonEvents(args: {
509418
continue;
510419
}
511420
const message = err instanceof Error ? err.message : String(err);
512-
await writeSse(stream, {
513-
event: "phase",
514-
data: {
515-
kind: "failed",
516-
reason: "unknown",
517-
message: `Upstream daemon SSE error: ${message}`,
518-
},
519-
}).catch(() => {});
421+
await stream
422+
.writeSSE({
423+
event: "phase",
424+
data: JSON.stringify({
425+
kind: "failed",
426+
reason: "unknown",
427+
message: `Upstream daemon SSE error: ${message}`,
428+
} satisfies ClaimPhase),
429+
})
430+
.catch(() => {});
520431
return;
521432
}
522433

@@ -532,7 +443,7 @@ async function proxyDaemonEvents(args: {
532443
}
533444
// Budget elapsed and handle still missing — genuine eviction. Emit
534445
// `gone` so the client's self-heal (VM_START) takes over.
535-
await writeSse(stream, { event: "gone" }).catch(() => {});
446+
await stream.writeSSE({ event: "gone", data: "" }).catch(() => {});
536447
return;
537448
}
538449

@@ -542,14 +453,16 @@ async function proxyDaemonEvents(args: {
542453
} catch {
543454
/* ignore */
544455
}
545-
await writeSse(stream, {
546-
event: "phase",
547-
data: {
548-
kind: "failed",
549-
reason: "unknown",
550-
message: `Upstream daemon SSE failed (${attempt.status}).`,
551-
},
552-
}).catch(() => {});
456+
await stream
457+
.writeSSE({
458+
event: "phase",
459+
data: JSON.stringify({
460+
kind: "failed",
461+
reason: "unknown",
462+
message: `Upstream daemon SSE failed (${attempt.status}).`,
463+
} satisfies ClaimPhase),
464+
})
465+
.catch(() => {});
553466
return;
554467
}
555468

0 commit comments

Comments
 (0)