Skip to content

Commit 3c16183

Browse files
fix(decopilot): keep chat streams alive and recoverable across disconnects (#3316)
* fix(decopilot): keep chat streams alive and recoverable across disconnects Adds 15s SSE keepalive comments to chat stream responses so reverse-proxy read timeouts (nginx ingress, Istio, ALB) don't kill long deep-research runs mid-flight. Fixes claimOrphanedRun excluding same-pod ownership, which pinned threads in_progress forever on single-pod self-hosted deploys when a run left the in-memory registry while run_owner_pod still pointed at the live pod. Bumps MAX_RUN_AGE_MS 30→90min so deep research isn't reaped while the provider job keeps burning credits, and clears the chat-side error banner on silent resume / new send so transient blips don't leave a sticky "network error" toast. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * revert(decopilot): keep MAX_RUN_AGE_MS at 30 minutes Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 831b708 commit 3c16183

7 files changed

Lines changed: 428 additions & 32 deletions

File tree

apps/mesh/src/api/routes/decopilot/routes.ts

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { StreamRequestSchema } from "./schemas";
3535
import type { ChatMessage, ModelsConfig } from "./types";
3636
import { streamCore } from "./stream-core";
3737
import { RunClaimError } from "./run-reactor";
38+
import { wrapWithSseKeepalive } from "./sse-keepalive";
3839
import type { SqlThreadStorage } from "@/storage/threads";
3940
import { getPodId } from "@/core/pod-identity";
4041

@@ -223,10 +224,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
223224
},
224225
});
225226

226-
return createUIMessageStreamResponse({
227-
stream: result.stream,
228-
consumeSseStream: consumeStream,
229-
});
227+
return wrapWithSseKeepalive(
228+
createUIMessageStreamResponse({
229+
stream: result.stream,
230+
consumeSseStream: consumeStream,
231+
}),
232+
);
230233
} catch (err) {
231234
console.error("[decopilot:stream] Error", err);
232235

@@ -321,10 +324,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
321324
{ runRegistry, streamBuffer, cancelBroadcast },
322325
);
323326

324-
return createUIMessageStreamResponse({
325-
stream: result.stream,
326-
consumeSseStream: consumeStream,
327-
});
327+
return wrapWithSseKeepalive(
328+
createUIMessageStreamResponse({
329+
stream: result.stream,
330+
consumeSseStream: consumeStream,
331+
}),
332+
);
328333
} catch (err) {
329334
console.error("[decopilot:stream] Error", err);
330335

@@ -429,10 +434,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
429434
},
430435
});
431436

432-
return createUIMessageStreamResponse({
433-
stream: replayStream,
434-
consumeSseStream: consumeStream,
435-
});
437+
return wrapWithSseKeepalive(
438+
createUIMessageStreamResponse({
439+
stream: replayStream,
440+
consumeSseStream: consumeStream,
441+
}),
442+
);
436443
}
437444

438445
// ── Orphan resume path ──
@@ -511,10 +518,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
511518
{ runRegistry, streamBuffer, cancelBroadcast },
512519
);
513520

514-
return createUIMessageStreamResponse({
515-
stream: result.stream,
516-
consumeSseStream: consumeStream,
517-
});
521+
return wrapWithSseKeepalive(
522+
createUIMessageStreamResponse({
523+
stream: result.stream,
524+
consumeSseStream: consumeStream,
525+
}),
526+
);
518527
} catch (err) {
519528
if (err instanceof HTTPException) throw err;
520529
console.error("[decopilot:attach] Error", err);
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import { describe, it, expect } from "bun:test";
2+
import { wrapStreamWithKeepalive } from "./sse-keepalive";
3+
4+
// Shared UTF-8 codecs: `enc` builds byte chunks for scripted sources,
// `dec` turns drained stream output back into text for assertions.
const enc = new TextEncoder();
const dec = new TextDecoder();
6+
7+
function makeSource(
8+
chunks: Array<{ delayMs: number; bytes: Uint8Array | string } | "close">,
9+
): ReadableStream<Uint8Array> {
10+
return new ReadableStream<Uint8Array>({
11+
async start(controller) {
12+
for (const item of chunks) {
13+
if (item === "close") {
14+
controller.close();
15+
return;
16+
}
17+
if (item.delayMs > 0) {
18+
await new Promise((r) => setTimeout(r, item.delayMs));
19+
}
20+
const value =
21+
typeof item.bytes === "string" ? enc.encode(item.bytes) : item.bytes;
22+
controller.enqueue(value);
23+
}
24+
controller.close();
25+
},
26+
});
27+
}
28+
29+
async function readAll(stream: ReadableStream<Uint8Array>): Promise<string> {
30+
const reader = stream.getReader();
31+
let out = "";
32+
while (true) {
33+
const { done, value } = await reader.read();
34+
if (done) break;
35+
out += dec.decode(value, { stream: true });
36+
}
37+
out += dec.decode();
38+
return out;
39+
}
40+
41+
// End-to-end coverage of the keepalive wrapper against scripted sources:
// pass-through fidelity, heartbeat injection during silence, event-boundary
// suppression, timer cleanup, downstream-cancel propagation, and source
// error propagation. Timings are deliberately coarse (interval much smaller
// or much larger than the scripted delays) to avoid flakiness.
describe("wrapStreamWithKeepalive", () => {
  it("passes through chunks unchanged when source ends quickly", async () => {
    const source = makeSource([
      { delayMs: 0, bytes: 'data: {"x":1}\n\n' },
      { delayMs: 0, bytes: "data: [DONE]\n\n" },
    ]);
    // Interval far larger than the test runtime — no heartbeat should fire,
    // so the output must be byte-identical to the input.
    const wrapped = wrapStreamWithKeepalive(source, 10_000);
    const out = await readAll(wrapped);
    expect(out).toBe('data: {"x":1}\n\ndata: [DONE]\n\n');
  });

  it("injects keepalive comments during silent periods", async () => {
    const source = makeSource([
      { delayMs: 0, bytes: 'data: {"x":1}\n\n' },
      { delayMs: 120, bytes: "data: [DONE]\n\n" },
    ]);
    // 30ms interval — should fire ~3-4 times during the 120ms gap.
    const wrapped = wrapStreamWithKeepalive(source, 30);
    const out = await readAll(wrapped);
    const matches = out.match(/: keepalive\n\n/g) ?? [];
    // Lower bound of 2 (not 4) tolerates timer jitter on loaded CI hosts.
    expect(matches.length).toBeGreaterThanOrEqual(2);
    // Real data must still be present and uncorrupted.
    expect(out).toContain('data: {"x":1}\n\n');
    expect(out).toContain("data: [DONE]\n\n");
  });

  it("does NOT inject mid-event when chunk ends without \\n\\n boundary", async () => {
    // Simulate a (hypothetical) future where one event spans two chunks.
    const source = makeSource([
      { delayMs: 0, bytes: 'data: {"par' }, // partial — no \n\n yet
      { delayMs: 80, bytes: 'tial":true}\n\n' }, // completes the event
    ]);
    // 20ms interval inside an 80ms gap: several ticks occur while the
    // event is incomplete, and every one of them must be suppressed.
    const wrapped = wrapStreamWithKeepalive(source, 20);
    const out = await readAll(wrapped);

    // No keepalive should appear inside `data: {"...}` payload.
    // It's OK if heartbeats arrive after the event completes.
    const firstNewlinePair = out.indexOf("\n\n");
    expect(firstNewlinePair).toBeGreaterThan(0);
    const firstEvent = out.slice(0, firstNewlinePair);
    expect(firstEvent).not.toContain(": keepalive");
    expect(firstEvent).toBe('data: {"partial":true}');
  });

  it("clears the interval when source ends", async () => {
    const source = makeSource([{ delayMs: 0, bytes: "data: end\n\n" }]);
    const wrapped = wrapStreamWithKeepalive(source, 10);
    await readAll(wrapped);
    // If the interval leaked, this test process would hang on shutdown. Bun
    // test would surface that. We also assert no spurious keepalives after
    // close by waiting briefly and confirming the stream remains drained.
    await new Promise((r) => setTimeout(r, 30));
    // No assertion needed — the absence of an open handle is the test.
  });

  it("propagates downstream cancel to the source", async () => {
    let sourceCancelled = false;
    const source = new ReadableStream<Uint8Array>({
      async start(controller) {
        controller.enqueue(enc.encode("data: hi\n\n"));
        // Hold the stream open indefinitely; rely on cancel propagation.
        await new Promise(() => {});
      },
      cancel() {
        sourceCancelled = true;
      },
    });

    const wrapped = wrapStreamWithKeepalive(source, 10_000);
    const reader = wrapped.getReader();
    await reader.read(); // first chunk
    await reader.cancel("client gone");

    // Give the cancel a tick to propagate.
    await new Promise((r) => setTimeout(r, 5));
    expect(sourceCancelled).toBe(true);
  });

  it("propagates source errors", async () => {
    const source = new ReadableStream<Uint8Array>({
      start(controller) {
        controller.enqueue(enc.encode("data: ok\n\n"));
        // Defer the error so the wrapper has a tick to consume the first
        // chunk before the rejection arrives. Modeling a real mid-stream
        // failure is what we care about, not a synchronous error in start().
        queueMicrotask(() => controller.error(new Error("boom")));
      },
    });

    const wrapped = wrapStreamWithKeepalive(source, 10_000);
    const reader = wrapped.getReader();
    const first = await reader.read();
    expect(first.done).toBe(false);
    let caught: unknown = null;
    try {
      await reader.read();
    } catch (e) {
      caught = e;
    }
    expect(caught instanceof Error && (caught as Error).message).toBe("boom");
  });
});
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/**
2+
* SSE keepalive wrapper.
3+
*
4+
* Wraps a streaming Response so the body interleaves `: keepalive\n\n` SSE
5+
* comment lines on a fixed interval. Per the SSE spec (and confirmed in the
6+
* `eventsource-parser` package the AI SDK uses), lines starting with `:` are
7+
* comments and are silently ignored by the parser — so injecting them is
8+
* transparent to the client.
9+
*
10+
* Why this exists: the AI SDK's `JsonToSseTransformStream` only emits bytes
11+
* when there's a real chunk to send. During long tool calls or deep-research
12+
* polling phases the stream can be byte-silent for tens of seconds, which
13+
* trips the read timeouts of common reverse proxies (nginx ingress 60s,
14+
* Istio 15s, ALB 60s). Heartbeats keep the socket warm.
15+
*
16+
* Defensive design (this is critical-path streaming code):
17+
*
18+
* - Heartbeats are ONLY injected on event boundaries (chunk ending `\n\n`).
19+
* Mid-event injection would corrupt the JSON payload of a `data:` line,
20+
* so a partial-tail chunk suppresses the next heartbeat until the next
21+
* chunk completes the event. Today the AI SDK enqueues one full event per
22+
* chunk so this is moot, but the boundary check guards against future
23+
* chunk-fragmentation upstream.
24+
*
25+
* - The interval is cleared on stream end, source error, and downstream
26+
* cancel — anything else would leak a Node timer per request.
27+
*
28+
* - `controller.enqueue` is wrapped in try/catch so a closed-controller race
29+
* can't surface as an unhandled exception.
30+
*
31+
* - Downstream cancel (client closed connection) is forwarded to the source
32+
* so the relay/JetStream chain unwinds correctly.
33+
*
34+
* - The wrapper never originates errors; source errors propagate unchanged.
35+
*/
36+
37+
// Pre-encoded SSE comment frame. Lines beginning with ":" are comments per
// the SSE format and are ignored by clients, so injecting this is invisible.
const KEEPALIVE_BYTES = new TextEncoder().encode(": keepalive\n\n");
// Line-feed byte; used to detect the `\n\n` event boundary at a chunk tail.
const LF = 0x0a;
// Default heartbeat period, chosen to stay at or under the proxy read
// timeouts named in the header comment above.
const DEFAULT_INTERVAL_MS = 15_000;
40+
41+
/**
42+
* Wrap an SSE response body with periodic keepalive comments.
43+
*
44+
* Returns a new Response with the same status/statusText/headers but a body
45+
* stream that intersperses `: keepalive\n\n` every `intervalMs` between real
46+
* chunks. If `response.body` is null (already-consumed or empty response),
47+
* the original response is returned unchanged.
48+
*/
49+
export function wrapWithSseKeepalive(
50+
response: Response,
51+
intervalMs: number = DEFAULT_INTERVAL_MS,
52+
): Response {
53+
if (!response.body) return response;
54+
55+
const wrapped = wrapStreamWithKeepalive(response.body, intervalMs);
56+
57+
return new Response(wrapped, {
58+
status: response.status,
59+
statusText: response.statusText,
60+
headers: response.headers,
61+
});
62+
}
63+
64+
/**
 * Lower-level: wrap a `ReadableStream<Uint8Array>` of SSE bytes with
 * periodic keepalive comments. Exposed separately so tests can drive it
 * without going through Response/fetch plumbing.
 *
 * @param source     - the SSE byte stream to relay.
 * @param intervalMs - heartbeat period between injected comment frames.
 * @returns a new byte stream: source chunks pass through unchanged, with
 *          `: keepalive\n\n` frames enqueued on timer ticks during silence.
 */
export function wrapStreamWithKeepalive(
  source: ReadableStream<Uint8Array>,
  intervalMs: number = DEFAULT_INTERVAL_MS,
): ReadableStream<Uint8Array> {
  // These vars are captured by both `start` and `cancel`. They live across
  // the constructor's two method scopes so cancel can reach the active reader.
  let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  let timer: ReturnType<typeof setInterval> | null = null;
  // Once true, heartbeat ticks no-op; set on every teardown path (done,
  // error, cancel) before the timer is cleared.
  let cleaned = false;
  // Tracks whether the most recent chunk left the byte stream mid-event (no
  // trailing `\n\n`). When true, heartbeats are suppressed until the next
  // chunk completes the event boundary.
  let pendingPartial = false;

  // Idempotent timer teardown — safe to call from any of the exit paths.
  const clearTimer = () => {
    if (timer !== null) {
      clearInterval(timer);
      timer = null;
    }
  };

  return new ReadableStream<Uint8Array>({
    async start(controller) {
      reader = source.getReader();

      const tryHeartbeat = () => {
        if (cleaned || pendingPartial) return;
        try {
          controller.enqueue(KEEPALIVE_BYTES);
        } catch {
          // Controller already closed — let the read loop's teardown handle
          // the rest. Mark cleaned so subsequent ticks no-op.
          cleaned = true;
          clearTimer();
        }
      };

      timer = setInterval(tryHeartbeat, intervalMs);

      // NOTE(review): this eager read loop does not consult
      // controller.desiredSize, so downstream backpressure is not applied;
      // presumably fine for SSE responses that are consumed continuously —
      // confirm if this wrapper is ever reused for bulk transfers.
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            cleaned = true;
            clearTimer();
            controller.close();
            return;
          }

          // Decide whether the next heartbeat tick is allowed to fire by
          // checking if this chunk ends on an SSE event boundary (`\n\n`).
          // Empty chunks (zero-length) inherit the previous boundary state.
          // Timer callbacks are macrotasks, so a heartbeat cannot interleave
          // between this flag update and the enqueue below.
          const len = value.byteLength;
          if (len > 0) {
            pendingPartial = !(
              len >= 2 &&
              value[len - 2] === LF &&
              value[len - 1] === LF
            );
          }

          controller.enqueue(value);
        }
      } catch (err) {
        // Source errored (or close() raced a cancelled controller): stop
        // heartbeats first, then forward the error downstream unchanged.
        cleaned = true;
        clearTimer();
        try {
          controller.error(err);
        } catch {
          // Already errored/closed — nothing to do.
        }
      } finally {
        cleaned = true;
        clearTimer();
        try {
          reader.releaseLock();
        } catch {
          // Lock may already be released (e.g. cancel path); releaseLock
          // throws on an unlocked reader.
        }
      }
    },
    cancel(reason) {
      // Downstream cancelled (e.g. client disconnected). Cancel via the
      // active reader (which holds the lock) so propagation reaches the
      // source's cancel hook. Falls back to the source if no reader yet.
      cleaned = true;
      clearTimer();
      const cancelTarget = reader ?? source;
      cancelTarget.cancel(reason).catch(() => {
        // Cancel may reject if already errored — best-effort.
      });
    },
  });
}

0 commit comments

Comments
 (0)