Skip to content

Commit 5277cef

Browse files
committed
feat: enhance Codex response handling with standardized event stream and logging
1 parent 5dad77d commit 5277cef

4 files changed

Lines changed: 162 additions & 31 deletions

File tree

src/routes/provider/responses/handler.ts

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ export async function handleProviderResponsesForProvider(
3838
},
3939
): Promise<Response> {
4040
const { payload, provider } = options
41+
debugJson(logger, "Responses request payload:", {
42+
payload,
43+
provider,
44+
})
4145
const providerConfig = await resolveProviderConfig(provider)
4246
if (providerConfig?.type !== "openai-responses") {
4347
return c.json(
@@ -51,15 +55,19 @@ export async function handleProviderResponsesForProvider(
5155
)
5256
}
5357

54-
const selectedModel =
58+
const model =
5559
providerConfig.name === "codex" ?
5660
getCodexModels().data.find((model) => model.id === payload.model)
5761
: undefined
5862

59-
applyResponsesApiContextManagement(
60-
payload,
61-
selectedModel?.capabilities.limits.max_prompt_tokens,
62-
)
63+
const maxPromptTokens = model?.capabilities.limits.max_prompt_tokens ?? 0
64+
applyResponsesApiContextManagement(payload, maxPromptTokens)
65+
66+
const contextManagement = payload.context_management
67+
debugJson(logger, "Translated Responses request payload:", {
68+
contextManagement,
69+
provider,
70+
})
6371

6472
const upstreamResponse =
6573
providerConfig.name === "codex" ?
@@ -79,6 +87,31 @@ export async function handleProviderResponsesForProvider(
7987

8088
const recordUsage = createProviderResponsesUsageRecorder(payload, provider)
8189

90+
if (providerConfig.name === "codex" && payload.stream) {
91+
let usage: UsageTokens = {}
92+
93+
return createProviderProxyResponse(
94+
upstreamResponse,
95+
createStandardizedCodexResponsesEventStream(
96+
getResponsesEvents(upstreamResponse),
97+
{
98+
onClose: () => {
99+
recordUsage(usage)
100+
},
101+
onChunk: (chunk) => {
102+
debugJson(logger, "Responses stream chunk:", chunk)
103+
},
104+
onEvent: (event) => {
105+
const nextUsage = getResponsesStreamEventUsage(event)
106+
if (nextUsage) {
107+
usage = nextUsage
108+
}
109+
},
110+
},
111+
),
112+
)
113+
}
114+
82115
if (payload.stream) {
83116
void recordProviderResponsesStreamUsage(upstreamResponse.clone(), {
84117
normalizeCodex: providerConfig.name === "codex",
@@ -97,15 +130,6 @@ export async function handleProviderResponsesForProvider(
97130
recordUsage(normalizeResponsesUsage(responseBody.usage))
98131
}
99132

100-
if (providerConfig.name === "codex" && payload.stream) {
101-
return createProviderProxyResponse(
102-
upstreamResponse,
103-
createStandardizedCodexResponsesEventStream(
104-
getResponsesEvents(upstreamResponse),
105-
),
106-
)
107-
}
108-
109133
return createProviderProxyResponse(upstreamResponse)
110134
}
111135

@@ -153,12 +177,11 @@ const recordProviderResponsesStreamUsage = async (
153177
normalizeCodex: options.normalizeCodex,
154178
provider: options.provider,
155179
})
156-
if (
157-
parsed?.type === "response.completed"
158-
|| parsed?.type === "response.failed"
159-
|| parsed?.type === "response.incomplete"
160-
) {
161-
usage = normalizeResponsesUsage(parsed.response.usage)
180+
if (parsed) {
181+
const nextUsage = getResponsesStreamEventUsage(parsed)
182+
if (nextUsage) {
183+
usage = nextUsage
184+
}
162185
}
163186
}
164187
} finally {
@@ -188,6 +211,20 @@ const parseProviderResponsesStreamEvent = (
188211
}
189212
}
190213

214+
const getResponsesStreamEventUsage = (
215+
event: ResponseStreamEvent,
216+
): UsageTokens | null => {
217+
if (
218+
event.type === "response.completed"
219+
|| event.type === "response.failed"
220+
|| event.type === "response.incomplete"
221+
) {
222+
return normalizeResponsesUsage(event.response.usage)
223+
}
224+
225+
return null
226+
}
227+
191228
const getResponsesEvents = (response: unknown) => {
192229
return events(response as Parameters<typeof events>[0])
193230
}

src/routes/responses/handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ export const responsesHandlerDependencies = {
4040

4141
export const handleResponses = async (c: Context) => {
4242
const payload = await c.req.json<ResponsesPayload>()
43-
debugJson(logger, "Responses request payload:", payload)
4443

4544
const providerModelAlias = parseProviderModelAlias(payload.model)
4645
if (providerModelAlias) {
@@ -51,6 +50,7 @@ export const handleResponses = async (c: Context) => {
5150
})
5251
}
5352

53+
debugJson(logger, "Responses request payload:", payload)
5454
await responsesHandlerDependencies.checkRateLimit(state)
5555

5656
// not support subagent marker for now , set sessionId = getUUID(requestId)

src/services/codex/create-responses.ts

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@ type ServerSentEventChunk = {
3232

3333
type CodexResponsesWebSocketChunk = ServerSentEventChunk
3434

35+
type StandardizedCodexResponsesChunk = {
36+
chunk: ServerSentEventChunk
37+
event: ResponseStreamEvent | null
38+
}
39+
40+
interface CodexResponsesStandardStreamOptions {
41+
onClose?: () => void | Promise<void>
42+
onChunk?: (chunk: ServerSentEventChunk) => void | Promise<void>
43+
onEvent?: (event: ResponseStreamEvent) => void | Promise<void>
44+
}
45+
3546
type CodexResponsesWebSocketRequest =
3647
PooledWebSocketRequest<CodexResponsesWebSocketPayload>
3748

@@ -415,9 +426,10 @@ const createCodexResponsesWebSocketProxyResponse = (
415426

416427
export const createStandardizedCodexResponsesEventStream = (
417428
source: AsyncIterable<ServerSentEventChunk>,
429+
options: CodexResponsesStandardStreamOptions = {},
418430
): ReadableStream<Uint8Array> => {
419431
return createServerSentEventStream(
420-
normalizeCodexResponsesStandardStream(source),
432+
normalizeCodexResponsesStandardStream(source, options),
421433
)
422434
}
423435

@@ -431,9 +443,20 @@ const normalizeCodexResponsesWebSocketStream = async function* (
431443

432444
const normalizeCodexResponsesStandardStream = async function* (
433445
stream: AsyncIterable<ServerSentEventChunk>,
446+
options: CodexResponsesStandardStreamOptions,
434447
): AsyncIterable<ServerSentEventChunk> {
435-
for await (const chunk of stream) {
436-
yield normalizeCodexResponsesStandardChunk(chunk)
448+
try {
449+
for await (const chunk of stream) {
450+
const normalized = normalizeCodexResponsesStandardChunk(chunk)
451+
await options.onChunk?.(normalized.chunk)
452+
if (normalized.event) {
453+
await options.onEvent?.(normalized.event)
454+
}
455+
456+
yield normalized.chunk
457+
}
458+
} finally {
459+
await options.onClose?.()
437460
}
438461
}
439462

@@ -465,26 +488,38 @@ const normalizeCodexResponsesProxyChunk = (
465488

466489
const normalizeCodexResponsesStandardChunk = (
467490
chunk: ServerSentEventChunk,
468-
): ServerSentEventChunk => {
491+
): StandardizedCodexResponsesChunk => {
469492
if (!chunk.data || chunk.data === "[DONE]") {
470-
return chunk
493+
return {
494+
chunk,
495+
event: null,
496+
}
471497
}
472498

473499
try {
474500
const parsed = JSON.parse(chunk.data) as Record<string, unknown>
475501
logCodexRateLimitsEvent(parsed)
476502
const normalized = normalizeCodexResponsesEvent(parsed)
477503
if (!normalized) {
478-
return chunk
504+
return {
505+
chunk,
506+
event: null,
507+
}
479508
}
480509

481510
return {
482-
...chunk,
483-
data: JSON.stringify(normalized),
484-
event: normalized.type,
511+
chunk: {
512+
...chunk,
513+
data: JSON.stringify(normalized),
514+
event: normalized.type,
515+
},
516+
event: normalized,
485517
}
486518
} catch {
487-
return chunk
519+
return {
520+
chunk,
521+
event: null,
522+
}
488523
}
489524
}
490525

tests/codex-create-responses.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,65 @@ describe("codex api helpers", () => {
128128
expect(body).toContain("data: [DONE]")
129129
})
130130

131+
test("reports normalized terminal events while standardizing Codex SSE", async () => {
132+
const seenChunkEvents: Array<string | undefined> = []
133+
const seenEvents: Array<string> = []
134+
let streamClosed = false
135+
const stream = createStandardizedCodexResponsesEventStream(
136+
streamChunks([
137+
{
138+
data: JSON.stringify({
139+
response: {
140+
created_at: 0,
141+
error: null,
142+
id: "resp_123",
143+
incomplete_details: null,
144+
instructions: null,
145+
metadata: null,
146+
model: "gpt-5.4",
147+
object: "response",
148+
output: [],
149+
output_text: "hello",
150+
parallel_tool_calls: true,
151+
status: "completed",
152+
temperature: null,
153+
tool_choice: "auto",
154+
tools: [],
155+
top_p: null,
156+
usage: {
157+
input_tokens: 5,
158+
output_tokens: 2,
159+
total_tokens: 7,
160+
},
161+
},
162+
sequence_number: 1,
163+
type: "response.done",
164+
}),
165+
},
166+
{
167+
data: "[DONE]",
168+
},
169+
]),
170+
{
171+
onClose: () => {
172+
streamClosed = true
173+
},
174+
onChunk: (chunk) => {
175+
seenChunkEvents.push(chunk.event)
176+
},
177+
onEvent: (event) => {
178+
seenEvents.push(event.type)
179+
},
180+
},
181+
)
182+
183+
await new Response(stream).text()
184+
185+
expect(seenChunkEvents).toEqual(["response.completed", undefined])
186+
expect(seenEvents).toEqual(["response.completed"])
187+
expect(streamClosed).toBe(true)
188+
})
189+
131190
test("builds the ChatGPT Codex websocket responses path", () => {
132191
expect(buildCodexResponsesWebSocketUrl()).toBe(
133192
"wss://chatgpt.com/backend-api/codex/responses",

0 commit comments

Comments
 (0)