fix: aggregate SSE upstream into chat.completion JSON for non-stream clients #4557

lingozhi wants to merge 1 commit into QuantumNous:main
Conversation
…clients (Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>)
Walkthrough
The PR adds a new code path that aggregates an upstream SSE stream into a single chat.completion JSON response for non-streaming clients.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Handler as chat_completions Handler
    participant UpstreamAPI as Upstream /v1/responses
    participant Parser as OaiResponsesSSEToChatJSON
    Client->>Handler: Chat completion request (streaming=false)
    Handler->>UpstreamAPI: Forward request
    UpstreamAPI-->>Handler: SSE event stream
    Handler->>Parser: Detect upstream is SSE, client wants JSON
    loop Process SSE Events
        UpstreamAPI-->>Parser: response.output_text delta
        Parser->>Parser: Accumulate output_text
        UpstreamAPI-->>Parser: response.tool_calls (item/argument deltas)
        Parser->>Parser: Reconstruct function calls<br/>(ordering, args, names)
        UpstreamAPI-->>Parser: response.completed (usage, timestamps)
        Parser->>Parser: Extract model, usage fields,<br/>finishReason
    end
    alt Error Event
        UpstreamAPI-->>Parser: response.error/response.failed
        Parser->>Handler: Stop stream, return API error
    else Success
        Parser->>Handler: Assembled chat.completions JSON
        Handler-->>Client: Complete JSON response
    end
```
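The aggregation loop in the diagram can be sketched in Go. This is a simplified, self-contained illustration only; `aggregate` and `sseEvent` are hypothetical stand-ins, not the actual `OaiResponsesSSEToChatJSON` implementation or the repository's `dto` types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// sseEvent is a simplified stand-in for an upstream Responses-API SSE event.
type sseEvent struct {
	Type  string `json:"type"`
	Delta string `json:"delta,omitempty"`
}

// aggregate replays upstream SSE events and assembles the final non-stream
// content, mirroring the loop in the diagram: accumulate text deltas, abort
// on an error event, and finish on response.completed.
func aggregate(events []sseEvent) (content string, finishReason string, err error) {
	var sb strings.Builder
	for _, ev := range events {
		switch ev.Type {
		case "response.output_text.delta":
			sb.WriteString(ev.Delta)
		case "response.completed":
			return sb.String(), "stop", nil
		case "response.error", "response.failed":
			return "", "", fmt.Errorf("upstream error event: %s", ev.Type)
		}
	}
	return sb.String(), "stop", nil
}

func main() {
	events := []sseEvent{
		{Type: "response.output_text.delta", Delta: "Hello, "},
		{Type: "response.output_text.delta", Delta: "world"},
		{Type: "response.completed"},
	}
	content, finish, _ := aggregate(events)
	out, _ := json.Marshal(map[string]string{"content": content, "finish_reason": finish})
	// prints {"content":"Hello, world","finish_reason":"stop"}
	fmt.Println(string(out))
}
```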
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@relay/channel/openai/chat_via_responses.go`:
- Around line 598-602: The malformed SSE event handling currently logs the error
then continues; change it so a failed unmarshal of data into streamResp (via
common.UnmarshalJsonStr) aborts the whole aggregation rather than skipping:
after logger.LogError(c, "..."+err.Error()) propagate/return an error to the
caller or abort the request flow exactly like the stream handler does (e.g.,
call the same abort/cleanup routine or return a non-200 error), ensuring the
incomplete chat completion is not emitted.
- Around line 621-643: When handling
"response.output_item.added"/"response.output_item.done" in
chat_via_responses.go, avoid double-counting the function name and ensure args
from the item payload are used as a fallback when upstream omits usage: after
deriving itemID and callID and calling registerToolCall(callID), only set
toolCallNameByID[callID] and append to usageText if the name isn't already
recorded for that callID (prevent duplicate append on both "added" and "done"),
and if toolCallArgsByID[callID] is empty but streamResp.Item.ArgumentsString()
returns non-empty, populate toolCallArgsByID[callID] and append the args to
usageText as the fallback; use the same callID lookup (and
toolCallCanonicalIDByItemID mapping) to ensure consistent canonicalization.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 65bc683b-325c-43b1-b4d1-b50dbfd83198
📒 Files selected for processing (2)
- relay/channel/openai/chat_via_responses.go
- relay/chat_completions_via_responses.go
```go
var streamResp dto.ResponsesStreamResponse
if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
	logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
	return
}
```
Stop aggregation on malformed SSE events.
Right now a bad event is only logged and skipped, so this path can still return 200 with a partial chat.completion assembled from incomplete upstream data. The stream handler above aborts on the same condition; this path should do the same.
Proposed fix
```diff
 var streamResp dto.ResponsesStreamResponse
 if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
 	logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
-	return
+	streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
+	sr.Stop(streamErr)
+	return
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
var streamResp dto.ResponsesStreamResponse
if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
	logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
	streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
	sr.Stop(streamErr)
	return
}
```
```go
case "response.output_item.added", "response.output_item.done":
	if streamResp.Item == nil || streamResp.Item.Type != "function_call" {
		break
	}
	itemID := strings.TrimSpace(streamResp.Item.ID)
	callID := strings.TrimSpace(streamResp.Item.CallId)
	if callID == "" {
		callID = itemID
	}
	if itemID != "" && callID != "" {
		toolCallCanonicalIDByItemID[itemID] = callID
	}
	if callID == "" {
		break
	}
	registerToolCall(callID)
	if name := strings.TrimSpace(streamResp.Item.Name); name != "" {
		toolCallNameByID[callID] = name
		usageText.WriteString(name)
	}
	if args := streamResp.Item.ArgumentsString(); args != "" {
		toolCallArgsByID[callID] = args
	}
```
Fallback usage accounting for tool calls is inconsistent here.
response.output_item.added and response.output_item.done can both carry the same function name, so usageText may count it twice. At the same time, arguments that only appear on the item payload are never appended at all. When upstream omits usage, the estimated Usage can end up inflated or undercounted.
Proposed fix
toolCallIndexByID := make(map[string]int)
toolCallNameByID := make(map[string]string)
toolCallArgsByID := make(map[string]string)
+ toolCallNameCounted := make(map[string]bool)
toolCallOrder := make([]string, 0)
toolCallCanonicalIDByItemID := make(map[string]string)
@@
registerToolCall(callID)
if name := strings.TrimSpace(streamResp.Item.Name); name != "" {
toolCallNameByID[callID] = name
- usageText.WriteString(name)
+ if !toolCallNameCounted[callID] {
+ usageText.WriteString(name)
+ toolCallNameCounted[callID] = true
+ }
}
if args := streamResp.Item.ArgumentsString(); args != "" {
+ usageText.WriteString(stringDeltaFromPrefix(toolCallArgsByID[callID], args))
toolCallArgsByID[callID] = args
}🤖 Prompt for AI Agents
Problem
When a client requests a non-streaming `/v1/chat/completions` for a reasoning model (gpt-5.x, o1, etc.), `relay/chat_completions_via_responses.go` forwards the request to upstream `/v1/responses`. If upstream returns an SSE stream (the common case for these models), the existing dispatch logic at line 142 conflates upstream's SSE format with the client's stream preference. As a result, `IsStream` becomes `true`, the request goes to `OaiResponsesToChatStreamHandler`, and raw SSE chunks are forwarded to the client even though the client wanted a single JSON response. The client ends up with a malformed payload in which `choices: []` is empty; the actual content never reaches the client.
Reproduction
1. Configure a `/v1/responses`-style upstream (e.g. xixiapi.cc, longxiadev, luciferai) for a reasoning model like `gpt-5.5`.
2. Send a chat completion request with `stream: false`.
Fix
Preserve the client's original `IsStream` and route the new "client JSON + upstream SSE" case through a dedicated aggregator:
- `relay/chat_completions_via_responses.go`: split dispatch into three cases: client wants stream; client wants JSON but upstream is SSE; both non-stream.
- `relay/channel/openai/chat_via_responses.go`: new `OaiResponsesSSEToChatJSON` reads the upstream SSE via `helper.StreamScannerHandler`, accumulates `output_text.delta`, function-call args, and the final `response.completed.usage`, then emits a single `dto.OpenAITextResponse` to the client. Claude / Gemini relay format passthrough is preserved.

The two existing handlers (`OaiResponsesToChatHandler`, `OaiResponsesToChatStreamHandler`) are unchanged.
Verification
- `go build ./relay/...` and `go vet ./relay/...` pass.
- `gpt-5.5` returns `{"choices":[{"message":{"content":"OK"}}], "usage":{...}}` end-to-end.

🤖 Generated with Claude Code
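The three-way dispatch described under Fix can be sketched as follows. This is an illustration of the routing logic only; `route`, `clientWantsStream`, and `upstreamIsSSE` are hypothetical names, not the actual signatures in `relay/chat_completions_via_responses.go`.

```go
package main

import "fmt"

// route picks a handler for the chat-completions-via-responses path. Unlike
// the old logic, which overwrote the stream flag whenever upstream replied
// with SSE, the client's preference and the upstream wire format are kept
// as independent inputs.
func route(clientWantsStream, upstreamIsSSE bool) string {
	switch {
	case clientWantsStream:
		return "OaiResponsesToChatStreamHandler" // forward SSE chunks to client
	case upstreamIsSSE:
		return "OaiResponsesSSEToChatJSON" // aggregate SSE into one JSON body
	default:
		return "OaiResponsesToChatHandler" // plain JSON passthrough
	}
}

func main() {
	// The buggy case from the PR: client wants JSON, upstream streams SSE.
	fmt.Println(route(false, true)) // prints OaiResponsesSSEToChatJSON
}
```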
Summary by CodeRabbit
Bug Fixes
Improvements