Skip to content

Commit 15300f1

Browse files
authored
Merge pull request #82 from EM-GeekLab/fix/sse-content-type
fix(api): restore SSE Content-Type for streaming responses
2 parents a394871 + 53d6fae commit 15300f1

5 files changed

Lines changed: 255 additions & 64 deletions

File tree

backend/src/api/v1/completions.ts

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
type FailoverConfig,
3030
} from "@/services/failover";
3131
import {
32+
acceptsEventStream,
3233
extractUpstreamHeaders,
3334
filterCandidates,
3435
extractContentText,
@@ -569,6 +570,12 @@ export const completionsApi = new Elysia({
569570
// Extract extra headers for passthrough
570571
const extraHeaders = extractUpstreamHeaders(reqHeaders);
571572

573+
// Determine streaming mode. Body wins when explicit; otherwise honor
574+
// the client's Accept: text/event-stream negotiation hint.
575+
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
576+
body.stream = true;
577+
}
578+
572579
// Check ReqId for deduplication (if provided)
573580
const isStream = body.stream === true;
574581
const reqIdResult = await checkReqId(reqId, {
@@ -718,31 +725,51 @@ export const completionsApi = new Elysia({
718725
extraHeaders,
719726
);
720727

721-
// Return an async generator for streaming
728+
// Return a native Response with proper SSE headers. Wrapping the
729+
// pre-formatted SSE generator in a ReadableStream ensures Elysia
730+
// skips its auto SSE-wrapping (which would double-prefix `data:`)
731+
// and lets us set Content-Type: text/event-stream explicitly.
722732
const streamResponse = result.response;
723733
const streamSignal = request.signal;
724-
return (async function* () {
725-
try {
726-
yield* processStreamingResponse(
727-
streamResponse,
728-
completion,
729-
bearer,
730-
providerType,
731-
apiKeyRecord ?? null,
732-
begin,
733-
streamSignal,
734-
reqIdContext ?? undefined,
735-
);
736-
} catch (error) {
737-
// Don't log error if it's due to client abort
738-
if (!streamSignal.aborted) {
739-
logger.error("Stream processing error", error);
740-
// Note: HTTP status cannot be changed after streaming has started
741-
// Use SSE format for error: data: {...}\n\n
742-
yield `data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`;
734+
const sseStream = new ReadableStream<Uint8Array>({
735+
async start(controller) {
736+
const encoder = new TextEncoder();
737+
try {
738+
for await (const chunk of processStreamingResponse(
739+
streamResponse,
740+
completion,
741+
bearer,
742+
providerType,
743+
apiKeyRecord ?? null,
744+
begin,
745+
streamSignal,
746+
reqIdContext ?? undefined,
747+
)) {
748+
controller.enqueue(encoder.encode(chunk));
749+
}
750+
} catch (error) {
751+
if (!streamSignal.aborted) {
752+
logger.error("Stream processing error", error);
753+
controller.enqueue(
754+
encoder.encode(
755+
`data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`,
756+
),
757+
);
758+
}
759+
} finally {
760+
controller.close();
743761
}
744-
}
745-
})();
762+
},
763+
});
764+
765+
return new Response(sseStream, {
766+
status: 200,
767+
headers: {
768+
"Content-Type": "text/event-stream; charset=utf-8",
769+
"Cache-Control": "no-cache",
770+
Connection: "keep-alive",
771+
},
772+
});
746773
} else {
747774
// Non-streaming request - return JSON response directly
748775
const result = await executeWithFailover(

backend/src/api/v1/messages.ts

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
type FailoverConfig,
2626
} from "@/services/failover";
2727
import {
28+
acceptsEventStream,
2829
extractUpstreamHeaders,
2930
filterCandidates,
3031
extractContentText,
@@ -570,6 +571,12 @@ export const messagesApi = new Elysia({
570571
// Extract extra headers for passthrough
571572
const extraHeaders = extractUpstreamHeaders(reqHeaders);
572573

574+
// Determine streaming mode. Body wins when explicit; otherwise honor
575+
// the client's Accept: text/event-stream negotiation hint.
576+
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
577+
body.stream = true;
578+
}
579+
573580
// Check ReqId for deduplication (if provided)
574581
const isStream = body.stream === true;
575582
const reqIdResult = await checkReqId(reqId, {
@@ -722,30 +729,51 @@ export const messagesApi = new Elysia({
722729
extraHeaders,
723730
);
724731

725-
// Return an async generator for streaming
732+
// Return a native Response with proper SSE headers. Wrapping the
733+
// pre-formatted SSE generator in a ReadableStream ensures Elysia
734+
// skips its auto SSE-wrapping (which would double-prefix `data:`)
735+
// and lets us set Content-Type: text/event-stream explicitly.
726736
const streamResponse = result.response;
727737
const streamSignal = request.signal;
728-
return (async function* () {
729-
try {
730-
yield* processStreamingResponse(
731-
streamResponse,
732-
completion,
733-
bearer,
734-
providerType,
735-
apiKeyRecord ?? null,
736-
begin,
737-
streamSignal,
738-
reqIdContext ?? undefined,
739-
);
740-
} catch (error) {
741-
// Don't log error if it's due to client abort
742-
if (!streamSignal.aborted) {
743-
logger.error("Stream processing error", error);
744-
// Note: HTTP status cannot be changed after streaming has started
745-
yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`;
738+
const sseStream = new ReadableStream<Uint8Array>({
739+
async start(controller) {
740+
const encoder = new TextEncoder();
741+
try {
742+
for await (const chunk of processStreamingResponse(
743+
streamResponse,
744+
completion,
745+
bearer,
746+
providerType,
747+
apiKeyRecord ?? null,
748+
begin,
749+
streamSignal,
750+
reqIdContext ?? undefined,
751+
)) {
752+
controller.enqueue(encoder.encode(chunk));
753+
}
754+
} catch (error) {
755+
if (!streamSignal.aborted) {
756+
logger.error("Stream processing error", error);
757+
controller.enqueue(
758+
encoder.encode(
759+
`event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`,
760+
),
761+
);
762+
}
763+
} finally {
764+
controller.close();
746765
}
747-
}
748-
})();
766+
},
767+
});
768+
769+
return new Response(sseStream, {
770+
status: 200,
771+
headers: {
772+
"Content-Type": "text/event-stream; charset=utf-8",
773+
"Cache-Control": "no-cache",
774+
Connection: "keep-alive",
775+
},
776+
});
749777
} else {
750778
// Non-streaming request - return JSON response directly
751779
const result = await executeWithFailover(

backend/src/api/v1/responses.ts

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
type FailoverConfig,
2525
} from "@/services/failover";
2626
import {
27+
acceptsEventStream,
2728
extractUpstreamHeaders,
2829
filterCandidates,
2930
extractContentText,
@@ -596,6 +597,11 @@ export const responsesApi = new Elysia({
596597
const extraHeaders = extractUpstreamHeaders(reqHeaders);
597598

598599
// Check ReqId for deduplication (if provided)
600+
// Determine streaming mode. Body wins when explicit; otherwise honor
601+
// the client's Accept: text/event-stream negotiation hint.
602+
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
603+
body.stream = true;
604+
}
599605
const isStream = body.stream === true;
600606

601607
// Convert input to messages format for storage
@@ -765,30 +771,51 @@ export const responsesApi = new Elysia({
765771
extraHeaders,
766772
);
767773

768-
// Return an async generator for streaming
774+
// Return a native Response with proper SSE headers. Wrapping the
775+
// pre-formatted SSE generator in a ReadableStream ensures Elysia
776+
// skips its auto SSE-wrapping (which would double-prefix `data:`)
777+
// and lets us set Content-Type: text/event-stream explicitly.
769778
const streamResponse = result.response;
770779
const streamSignal = request.signal;
771-
return (async function* () {
772-
try {
773-
yield* processStreamingResponse(
774-
streamResponse,
775-
completion,
776-
bearer,
777-
providerType,
778-
apiKeyRecord ?? null,
779-
begin,
780-
streamSignal,
781-
reqIdContext ?? undefined,
782-
);
783-
} catch (error) {
784-
// Don't log error if it's due to client abort
785-
if (!streamSignal.aborted) {
786-
logger.error("Stream processing error", error);
787-
// Note: HTTP status cannot be changed after streaming has started
788-
yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`;
780+
const sseStream = new ReadableStream<Uint8Array>({
781+
async start(controller) {
782+
const encoder = new TextEncoder();
783+
try {
784+
for await (const chunk of processStreamingResponse(
785+
streamResponse,
786+
completion,
787+
bearer,
788+
providerType,
789+
apiKeyRecord ?? null,
790+
begin,
791+
streamSignal,
792+
reqIdContext ?? undefined,
793+
)) {
794+
controller.enqueue(encoder.encode(chunk));
795+
}
796+
} catch (error) {
797+
if (!streamSignal.aborted) {
798+
logger.error("Stream processing error", error);
799+
controller.enqueue(
800+
encoder.encode(
801+
`event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`,
802+
),
803+
);
804+
}
805+
} finally {
806+
controller.close();
789807
}
790-
}
791-
})();
808+
},
809+
});
810+
811+
return new Response(sseStream, {
812+
status: 200,
813+
headers: {
814+
"Content-Type": "text/event-stream; charset=utf-8",
815+
"Cache-Control": "no-cache",
816+
Connection: "keep-alive",
817+
},
818+
});
792819
} else {
793820
// Non-streaming request - return JSON response directly
794821
const result = await executeWithFailover(
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Unit tests for api-helpers utilities
3+
*/
4+
5+
import { describe, expect, test } from "bun:test";
6+
import { acceptsEventStream } from "./api-helpers";
7+
/** Build a Headers object carrying only the given Accept value (empty when omitted). */
const withAccept = (value?: string) =>
  new Headers(value ? { accept: value } : {});

/**
 * Table of scenarios: test title, Accept header value (undefined = header
 * absent), and whether acceptsEventStream should opt in to SSE.
 */
const scenarios: ReadonlyArray<
  readonly [title: string, accept: string | undefined, expected: boolean]
> = [
  ["plain text/event-stream", "text/event-stream", true],
  ["missing Accept header", undefined, false],
  ["unrelated media type", "application/json", false],
  ["wildcard does not opt in", "*/*", false],
  ["case-insensitive media type", "TEXT/EVENT-STREAM", true],
  ["tolerates internal whitespace", "text/event-stream ; q = 0.5 ", true],
  [
    "weighted list with positive q is accepted",
    "application/json, text/event-stream;q=0.5",
    true,
  ],
  [
    "explicit q=0 rejects SSE (RFC 7231 §5.3.1)",
    "text/event-stream;q=0",
    false,
  ],
  ["q=0.0 also rejects SSE", "text/event-stream;q=0.0", false],
  [
    "q = 0 with whitespace around = rejects SSE",
    "text/event-stream ; q = 0",
    false,
  ],
  [
    "q = 0.5 with whitespace around = is accepted",
    "text/event-stream ; q = 0.5",
    true,
  ],
  [
    "malformed empty q value is treated as not acceptable",
    "text/event-stream;q=",
    false,
  ],
  [
    "q=0 in a weighted list rejects SSE",
    "text/event-stream;q=0, application/json",
    false,
  ],
  ["structured-suffix match is rejected", "text/event-stream+json", false],
  [
    "malformed q value is treated as not acceptable",
    "text/event-stream;q=NaN",
    false,
  ],
];

describe("acceptsEventStream", () => {
  // Register one test per scenario, in table order, so titles and
  // expectations stay side by side.
  for (const [title, accept, expected] of scenarios) {
    test(title, () => {
      expect(acceptsEventStream(withAccept(accept))).toBe(expected);
    });
  }
});

backend/src/utils/api-helpers.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,41 @@ export const EXCLUDED_HEADERS = new Set([
4646
// Header Extraction
4747
// =============================================================================
4848

/**
 * Plain decimal form accepted for an Accept `q` parameter: digits with an
 * optional fractional part (a bare leading dot such as `.5` is tolerated).
 * Unlike `Number()`, this rejects empty, signed, hexadecimal (`0x1`) and
 * exponent (`1e3`) spellings, none of which are valid quality values.
 */
const Q_VALUE_PATTERN = /^(?:\d+\.?\d*|\.\d+)$/;

/**
 * Whether the client is requesting an SSE stream via the Accept header.
 *
 * Matches `text/event-stream` exactly (case-insensitively; wildcards and
 * structured-suffix types like `text/event-stream+json` do not match) and
 * respects the RFC 7231 §5.3.1 quality factor: the first `q` parameter of a
 * matching entry must be a plain decimal in the range (0, 1]. `q=0` means
 * "do not accept"; malformed or out-of-range values (`q=`, `q=NaN`, `q=2`,
 * `q=1e3`, `q=0x1`) are treated as not acceptable rather than as an opt-in.
 *
 * @param headers - Request headers to inspect.
 * @returns `true` when at least one Accept entry opts in to
 *   `text/event-stream`; `false` otherwise, including when the header is
 *   missing or empty.
 */
export function acceptsEventStream(headers: Headers): boolean {
  const accept = headers.get("accept");
  if (!accept) {
    return false;
  }
  return accept.split(",").some((part) => {
    const [mediaType, ...params] = part
      .split(";")
      .map((segment) => segment.trim().toLowerCase());
    if (mediaType !== "text/event-stream") {
      return false;
    }
    for (const param of params) {
      const eq = param.indexOf("=");
      if (eq === -1) {
        continue;
      }
      // Whitespace around "=" is tolerated, so trim both sides.
      if (param.slice(0, eq).trim() !== "q") {
        continue;
      }
      const value = param.slice(eq + 1).trim();
      // Validate the spelling first: Number() alone would accept "0x1",
      // "1e3" or "+0.5" and coerce "" to 0.
      if (!Q_VALUE_PATTERN.test(value)) {
        return false;
      }
      // The first q parameter decides; weights above 1 are invalid.
      const q = Number(value);
      return q > 0 && q <= 1;
    }
    // No q parameter: the default quality is 1, i.e. acceptable.
    return true;
  });
}
83+
4984
/**
5085
* Extract headers to be forwarded to upstream
5186
* All headers are forwarded EXCEPT:

0 commit comments

Comments
 (0)