41 changes: 25 additions & 16 deletions apps/mesh/src/api/routes/decopilot/routes.ts
@@ -35,6 +35,7 @@ import { StreamRequestSchema } from "./schemas";
import type { ChatMessage, ModelsConfig } from "./types";
import { streamCore } from "./stream-core";
import { RunClaimError } from "./run-reactor";
+import { wrapWithSseKeepalive } from "./sse-keepalive";
import type { SqlThreadStorage } from "@/storage/threads";
import { getPodId } from "@/core/pod-identity";

@@ -223,10 +224,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
        },
      });

-      return createUIMessageStreamResponse({
-        stream: result.stream,
-        consumeSseStream: consumeStream,
-      });
+      return wrapWithSseKeepalive(
+        createUIMessageStreamResponse({
+          stream: result.stream,
+          consumeSseStream: consumeStream,
+        }),
+      );
    } catch (err) {
      console.error("[decopilot:stream] Error", err);

@@ -321,10 +324,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
        { runRegistry, streamBuffer, cancelBroadcast },
      );

-      return createUIMessageStreamResponse({
-        stream: result.stream,
-        consumeSseStream: consumeStream,
-      });
+      return wrapWithSseKeepalive(
+        createUIMessageStreamResponse({
+          stream: result.stream,
+          consumeSseStream: consumeStream,
+        }),
+      );
    } catch (err) {
      console.error("[decopilot:stream] Error", err);

@@ -429,10 +434,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
          },
        });

-        return createUIMessageStreamResponse({
-          stream: replayStream,
-          consumeSseStream: consumeStream,
-        });
+        return wrapWithSseKeepalive(
+          createUIMessageStreamResponse({
+            stream: replayStream,
+            consumeSseStream: consumeStream,
+          }),
+        );
      }

      // ── Orphan resume path ──
@@ -511,10 +518,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
        { runRegistry, streamBuffer, cancelBroadcast },
      );

-      return createUIMessageStreamResponse({
-        stream: result.stream,
-        consumeSseStream: consumeStream,
-      });
+      return wrapWithSseKeepalive(
+        createUIMessageStreamResponse({
+          stream: result.stream,
+          consumeSseStream: consumeStream,
+        }),
+      );
    } catch (err) {
      if (err instanceof HTTPException) throw err;
      console.error("[decopilot:attach] Error", err);
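For context, this is the only call-site change: each route's existing createUIMessageStreamResponse return value is passed through wrapWithSseKeepalive, which preserves status and headers and only swaps the body stream. Below is a minimal sketch of the same pattern outside these routes; the handler shape and headers are illustrative assumptions, not code from this PR. Only wrapWithSseKeepalive and its default 15-second interval come from the new module.

import { wrapWithSseKeepalive } from "./sse-keepalive";

// Hypothetical standalone handler (not part of this PR): any SSE Response can
// be wrapped the same way. On the wire, a silent stretch then looks like
//   data: {...}\n\n  : keepalive\n\n  : keepalive\n\n  data: {...}\n\n
function sseResponseWithKeepalive(body: ReadableStream<Uint8Array>): Response {
  return wrapWithSseKeepalive(
    new Response(body, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
      },
    }),
    // Second argument is optional; it overrides the default 15_000 ms interval.
    15_000,
  );
}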
142 changes: 142 additions & 0 deletions apps/mesh/src/api/routes/decopilot/sse-keepalive.test.ts
@@ -0,0 +1,142 @@
import { describe, it, expect } from "bun:test";
import { wrapStreamWithKeepalive } from "./sse-keepalive";

const enc = new TextEncoder();
const dec = new TextDecoder();

function makeSource(
  chunks: Array<{ delayMs: number; bytes: Uint8Array | string } | "close">,
): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    async start(controller) {
      for (const item of chunks) {
        if (item === "close") {
          controller.close();
          return;
        }
        if (item.delayMs > 0) {
          await new Promise((r) => setTimeout(r, item.delayMs));
        }
        const value =
          typeof item.bytes === "string" ? enc.encode(item.bytes) : item.bytes;
        controller.enqueue(value);
      }
      controller.close();
    },
  });
}

async function readAll(stream: ReadableStream<Uint8Array>): Promise<string> {
  const reader = stream.getReader();
  let out = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    out += dec.decode(value, { stream: true });
  }
  out += dec.decode();
  return out;
}

describe("wrapStreamWithKeepalive", () => {
it("passes through chunks unchanged when source ends quickly", async () => {
const source = makeSource([
{ delayMs: 0, bytes: 'data: {"x":1}\n\n' },
{ delayMs: 0, bytes: "data: [DONE]\n\n" },
]);
const wrapped = wrapStreamWithKeepalive(source, 10_000);
const out = await readAll(wrapped);
expect(out).toBe('data: {"x":1}\n\ndata: [DONE]\n\n');
});

it("injects keepalive comments during silent periods", async () => {
const source = makeSource([
{ delayMs: 0, bytes: 'data: {"x":1}\n\n' },
{ delayMs: 120, bytes: "data: [DONE]\n\n" },
]);
// 30ms interval — should fire ~3-4 times during the 120ms gap.
const wrapped = wrapStreamWithKeepalive(source, 30);
const out = await readAll(wrapped);
const matches = out.match(/: keepalive\n\n/g) ?? [];
expect(matches.length).toBeGreaterThanOrEqual(2);
// Real data must still be present and uncorrupted.
expect(out).toContain('data: {"x":1}\n\n');
expect(out).toContain("data: [DONE]\n\n");
});

it("does NOT inject mid-event when chunk ends without \\n\\n boundary", async () => {
// Simulate a (hypothetical) future where one event spans two chunks.
const source = makeSource([
{ delayMs: 0, bytes: 'data: {"par' }, // partial — no \n\n yet
{ delayMs: 80, bytes: 'tial":true}\n\n' }, // completes the event
]);
const wrapped = wrapStreamWithKeepalive(source, 20);
const out = await readAll(wrapped);

// No keepalive should appear inside `data: {"...}` payload.
// It's OK if heartbeats arrive after the event completes.
const firstNewlinePair = out.indexOf("\n\n");
expect(firstNewlinePair).toBeGreaterThan(0);
const firstEvent = out.slice(0, firstNewlinePair);
expect(firstEvent).not.toContain(": keepalive");
expect(firstEvent).toBe('data: {"partial":true}');
});

it("clears the interval when source ends", async () => {
const source = makeSource([{ delayMs: 0, bytes: "data: end\n\n" }]);
const wrapped = wrapStreamWithKeepalive(source, 10);
await readAll(wrapped);
// If the interval leaked, this test process would hang on shutdown. Bun
// test would surface that. We also assert no spurious keepalives after
// close by waiting briefly and confirming the stream remains drained.
await new Promise((r) => setTimeout(r, 30));
// No assertion needed — the absence of an open handle is the test.
});

it("propagates downstream cancel to the source", async () => {
let sourceCancelled = false;
const source = new ReadableStream<Uint8Array>({
async start(controller) {
controller.enqueue(enc.encode("data: hi\n\n"));
// Hold the stream open indefinitely; rely on cancel propagation.
await new Promise(() => {});
},
cancel() {
sourceCancelled = true;
},
});

const wrapped = wrapStreamWithKeepalive(source, 10_000);
const reader = wrapped.getReader();
await reader.read(); // first chunk
await reader.cancel("client gone");

// Give the cancel a tick to propagate.
await new Promise((r) => setTimeout(r, 5));
expect(sourceCancelled).toBe(true);
});

it("propagates source errors", async () => {
const source = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(enc.encode("data: ok\n\n"));
// Defer the error so the wrapper has a tick to consume the first
// chunk before the rejection arrives. Modeling a real mid-stream
// failure is what we care about, not a synchronous error in start().
queueMicrotask(() => controller.error(new Error("boom")));
},
});

const wrapped = wrapStreamWithKeepalive(source, 10_000);
const reader = wrapped.getReader();
const first = await reader.read();
expect(first.done).toBe(false);
let caught: unknown = null;
try {
await reader.read();
} catch (e) {
caught = e;
}
expect(caught instanceof Error && (caught as Error).message).toBe("boom");
});
});
163 changes: 163 additions & 0 deletions apps/mesh/src/api/routes/decopilot/sse-keepalive.ts
@@ -0,0 +1,163 @@
/**
 * SSE keepalive wrapper.
 *
 * Wraps a streaming Response so the body interleaves `: keepalive\n\n` SSE
 * comment lines on a fixed interval. Per the SSE spec (and confirmed in the
 * `eventsource-parser` package the AI SDK uses), lines starting with `:` are
 * comments and are silently ignored by the parser — so injecting them is
 * transparent to the client.
 *
 * Why this exists: the AI SDK's `JsonToSseTransformStream` only emits bytes
 * when there's a real chunk to send. During long tool calls or deep-research
 * polling phases the stream can be byte-silent for tens of seconds, which
 * trips the read timeouts of common reverse proxies (nginx ingress 60s,
 * Istio 15s, ALB 60s). Heartbeats keep the socket warm.
 *
 * Defensive design (this is critical-path streaming code):
 *
 * - Heartbeats are ONLY injected on event boundaries (chunk ending `\n\n`).
 *   Mid-event injection would corrupt the JSON payload of a `data:` line,
 *   so a partial-tail chunk suppresses the next heartbeat until the next
 *   chunk completes the event. Today the AI SDK enqueues one full event per
 *   chunk so this is moot, but the boundary check guards against future
 *   chunk-fragmentation upstream.
 *
 * - The interval is cleared on stream end, source error, and downstream
 *   cancel — anything else would leak a Node timer per request.
 *
 * - `controller.enqueue` is wrapped in try/catch so a closed-controller race
 *   can't surface as an unhandled exception.
 *
 * - Downstream cancel (client closed connection) is forwarded to the source
 *   so the relay/JetStream chain unwinds correctly.
 *
 * - The wrapper never originates errors; source errors propagate unchanged.
 */

const KEEPALIVE_BYTES = new TextEncoder().encode(": keepalive\n\n");
const LF = 0x0a;
const DEFAULT_INTERVAL_MS = 15_000;

/**
 * Wrap an SSE response body with periodic keepalive comments.
 *
 * Returns a new Response with the same status/statusText/headers but a body
 * stream that intersperses `: keepalive\n\n` every `intervalMs` between real
 * chunks. If `response.body` is null (already-consumed or empty response),
 * the original response is returned unchanged.
 */
export function wrapWithSseKeepalive(
  response: Response,
  intervalMs: number = DEFAULT_INTERVAL_MS,
): Response {
  if (!response.body) return response;

  const wrapped = wrapStreamWithKeepalive(response.body, intervalMs);

  return new Response(wrapped, {
    status: response.status,
    statusText: response.statusText,
    headers: response.headers,
  });
}

/**
 * Lower-level: wrap a `ReadableStream<Uint8Array>` of SSE bytes with
 * periodic keepalive comments. Exposed separately so tests can drive it
 * without going through Response/fetch plumbing.
 */
export function wrapStreamWithKeepalive(
  source: ReadableStream<Uint8Array>,
  intervalMs: number = DEFAULT_INTERVAL_MS,
): ReadableStream<Uint8Array> {
  // These vars are captured by both `start` and `cancel`. They live across
  // the constructor's two method scopes so cancel can reach the active reader.
  let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  let timer: ReturnType<typeof setInterval> | null = null;
  let cleaned = false;
  // Tracks whether the most recent chunk left the byte stream mid-event (no
  // trailing `\n\n`). When true, heartbeats are suppressed until the next
  // chunk completes the event boundary.
  let pendingPartial = false;

  const clearTimer = () => {
    if (timer !== null) {
      clearInterval(timer);
      timer = null;
    }
  };

  return new ReadableStream<Uint8Array>({
    async start(controller) {
      reader = source.getReader();

      const tryHeartbeat = () => {
        if (cleaned || pendingPartial) return;
        try {
          controller.enqueue(KEEPALIVE_BYTES);
        } catch {
          // Controller already closed — let the read loop's teardown handle
          // the rest. Mark cleaned so subsequent ticks no-op.
          cleaned = true;
          clearTimer();
        }
      };

      timer = setInterval(tryHeartbeat, intervalMs);

      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            cleaned = true;
            clearTimer();
            controller.close();
            return;
          }

          // Decide whether the next heartbeat tick is allowed to fire by
          // checking if this chunk ends on an SSE event boundary (`\n\n`).
          // Empty chunks (zero-length) inherit the previous boundary state.
          const len = value.byteLength;
          if (len > 0) {
            pendingPartial = !(
              len >= 2 &&
              value[len - 2] === LF &&
              value[len - 1] === LF
            );
          }

          controller.enqueue(value);
        }
      } catch (err) {
        cleaned = true;
        clearTimer();
        try {
          controller.error(err);
        } catch {
          // Already errored/closed — nothing to do.
        }
      } finally {
        cleaned = true;
        clearTimer();
        try {
          reader.releaseLock();
        } catch {
          // Lock may already be released (e.g. cancel path); releaseLock
          // throws on an unlocked reader.
        }
      }
    },
    cancel(reason) {
      // Downstream cancelled (e.g. client disconnected). Cancel via the
      // active reader (which holds the lock) so propagation reaches the
      // source's cancel hook. Falls back to the source if no reader yet.
      cleaned = true;
      clearTimer();
      const cancelTarget = reader ?? source;
      cancelTarget.cancel(reason).catch(() => {
        // Cancel may reject if already errored — best-effort.
      });
    },
  });
}
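To make the header comment's transparency claim concrete, here is a hand-rolled sketch of how an SSE consumer sees the wrapped bytes. It is an illustrative parser written for this note, not the eventsource-parser implementation the AI SDK uses; it only demonstrates that the injected ':'-prefixed comment lines drop out before application code sees any events.

// Illustrative only: a minimal SSE data extractor, not the real client parser.
function parseSseData(text: string): string[] {
  const events: string[] = [];
  for (const block of text.split("\n\n")) {
    const dataLines = block
      .split("\n")
      // Lines starting with ":" are SSE comments (our ": keepalive"); they are dropped here.
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice("data:".length).trimStart());
    if (dataLines.length > 0) events.push(dataLines.join("\n"));
  }
  return events;
}

// 'data: {"x":1}\n\n: keepalive\n\n: keepalive\n\ndata: [DONE]\n\n'
// yields ['{"x":1}', '[DONE]'], exactly what the unwrapped stream would yield.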