Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions backend/src/api/v1/completions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
type FailoverConfig,
} from "@/services/failover";
import {
acceptsEventStream,
extractUpstreamHeaders,
filterCandidates,
extractContentText,
Expand Down Expand Up @@ -569,6 +570,12 @@ export const completionsApi = new Elysia({
// Extract extra headers for passthrough
const extraHeaders = extractUpstreamHeaders(reqHeaders);

// Determine streaming mode. Body wins when explicit; otherwise honor
// the client's Accept: text/event-stream negotiation hint.
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
body.stream = true;
}

// Check ReqId for deduplication (if provided)
const isStream = body.stream === true;
const reqIdResult = await checkReqId(reqId, {
Expand Down Expand Up @@ -718,31 +725,51 @@ export const completionsApi = new Elysia({
extraHeaders,
);

// Return an async generator for streaming
// Return a native Response with proper SSE headers. Wrapping the
// pre-formatted SSE generator in a ReadableStream ensures Elysia
// skips its auto SSE-wrapping (which would double-prefix `data:`)
// and lets us set Content-Type: text/event-stream explicitly.
const streamResponse = result.response;
const streamSignal = request.signal;
return (async function* () {
try {
yield* processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
);
} catch (error) {
// Don't log error if it's due to client abort
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
// Note: HTTP status cannot be changed after streaming has started
// Use SSE format for error: data: {...}\n\n
yield `data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`;
const sseStream = new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
try {
for await (const chunk of processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
)) {
controller.enqueue(encoder.encode(chunk));
}
} catch (error) {
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ error: { message: "Stream processing error", type: "server_error", code: "stream_error" } })}\n\n`,
),
);
}
} finally {
controller.close();
}
}
})();
},
});

return new Response(sseStream, {
status: 200,
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} else {
// Non-streaming request - return JSON response directly
const result = await executeWithFailover(
Expand Down
70 changes: 49 additions & 21 deletions backend/src/api/v1/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
type FailoverConfig,
} from "@/services/failover";
import {
acceptsEventStream,
extractUpstreamHeaders,
filterCandidates,
extractContentText,
Expand Down Expand Up @@ -570,6 +571,12 @@ export const messagesApi = new Elysia({
// Extract extra headers for passthrough
const extraHeaders = extractUpstreamHeaders(reqHeaders);

// Determine streaming mode. Body wins when explicit; otherwise honor
// the client's Accept: text/event-stream negotiation hint.
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
body.stream = true;
}

// Check ReqId for deduplication (if provided)
const isStream = body.stream === true;
const reqIdResult = await checkReqId(reqId, {
Expand Down Expand Up @@ -722,30 +729,51 @@ export const messagesApi = new Elysia({
extraHeaders,
);

// Return an async generator for streaming
// Return a native Response with proper SSE headers. Wrapping the
// pre-formatted SSE generator in a ReadableStream ensures Elysia
// skips its auto SSE-wrapping (which would double-prefix `data:`)
// and lets us set Content-Type: text/event-stream explicitly.
const streamResponse = result.response;
const streamSignal = request.signal;
return (async function* () {
try {
yield* processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
);
} catch (error) {
// Don't log error if it's due to client abort
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
// Note: HTTP status cannot be changed after streaming has started
yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`;
const sseStream = new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
try {
for await (const chunk of processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
)) {
controller.enqueue(encoder.encode(chunk));
}
} catch (error) {
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
controller.enqueue(
encoder.encode(
`event: error\ndata: ${JSON.stringify({ type: "error", error: { type: "server_error", message: "Stream processing error" } })}\n\n`,
),
);
}
} finally {
controller.close();
}
}
})();
},
});

return new Response(sseStream, {
status: 200,
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} else {
// Non-streaming request - return JSON response directly
const result = await executeWithFailover(
Expand Down
69 changes: 48 additions & 21 deletions backend/src/api/v1/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type FailoverConfig,
} from "@/services/failover";
import {
acceptsEventStream,
extractUpstreamHeaders,
filterCandidates,
extractContentText,
Expand Down Expand Up @@ -596,6 +597,11 @@ export const responsesApi = new Elysia({
const extraHeaders = extractUpstreamHeaders(reqHeaders);

// Check ReqId for deduplication (if provided)
// Determine streaming mode. Body wins when explicit; otherwise honor
// the client's Accept: text/event-stream negotiation hint.
if (body.stream === undefined && acceptsEventStream(reqHeaders)) {
body.stream = true;
}
const isStream = body.stream === true;

// Convert input to messages format for storage
Expand Down Expand Up @@ -765,30 +771,51 @@ export const responsesApi = new Elysia({
extraHeaders,
);

// Return an async generator for streaming
// Return a native Response with proper SSE headers. Wrapping the
// pre-formatted SSE generator in a ReadableStream ensures Elysia
// skips its auto SSE-wrapping (which would double-prefix `data:`)
// and lets us set Content-Type: text/event-stream explicitly.
const streamResponse = result.response;
const streamSignal = request.signal;
return (async function* () {
try {
yield* processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
);
} catch (error) {
// Don't log error if it's due to client abort
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
// Note: HTTP status cannot be changed after streaming has started
yield `event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`;
const sseStream = new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
try {
for await (const chunk of processStreamingResponse(
streamResponse,
completion,
bearer,
providerType,
apiKeyRecord ?? null,
begin,
streamSignal,
reqIdContext ?? undefined,
)) {
controller.enqueue(encoder.encode(chunk));
}
} catch (error) {
if (!streamSignal.aborted) {
logger.error("Stream processing error", error);
controller.enqueue(
encoder.encode(
`event: error\ndata: ${JSON.stringify({ type: "error", error: { code: "internal_error", message: "Stream processing error", param: null, help_url: null } })}\n\n`,
),
);
}
} finally {
controller.close();
}
}
})();
},
});

return new Response(sseStream, {
status: 200,
headers: {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
} else {
// Non-streaming request - return JSON response directly
const result = await executeWithFailover(
Expand Down
15 changes: 15 additions & 0 deletions backend/src/utils/api-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ export const EXCLUDED_HEADERS = new Set([
// Header Extraction
// =============================================================================

/**
* Whether the client is requesting an SSE stream via the Accept header.
* Returns true for any Accept value that lists `text/event-stream`, including
* weighted lists like `text/event-stream;q=1, application/json;q=0.5`.
*/
export function acceptsEventStream(headers: Headers): boolean {
const accept = headers.get("accept");
if (!accept) {
return false;
}
return accept
.split(",")
.some((part) => part.trim().toLowerCase().startsWith("text/event-stream"));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
Comment thread
pescn marked this conversation as resolved.

/**
* Extract headers to be forwarded to upstream
* All headers are forwarded EXCEPT:
Expand Down
Loading