
fix: aggregate SSE upstream into chat.completion JSON for non-stream clients#4557

Open
lingozhi wants to merge 1 commit into QuantumNous:main from lingozhi:fix/responses-sse-aggregation

Conversation


lingozhi (Contributor) commented Apr 30, 2026

Problem

When a client requests a non-streaming /v1/chat/completions for a reasoning model (gpt-5.x, o1, etc.), relay/chat_completions_via_responses.go forwards the request to upstream /v1/responses. If upstream returns an SSE stream (which is the common case for these models), the existing dispatch logic at line 142 conflates upstream's SSE format with the client's stream preference:

info.IsStream = info.IsStream || strings.HasPrefix(httpResp.Header.Get("Content-Type"), "text/event-stream")

The result: IsStream becomes true, the request goes to OaiResponsesToChatStreamHandler, and raw SSE chunks are forwarded to the client even though the client wanted a single JSON response. The client ends up with a malformed payload like:

data: {"choices":[],"usage":{"prompt_tokens":12,"completion_tokens":0,...}}
data: [DONE]

The choices array is empty — the actual content never reaches the client.

Reproduction

  1. Configure a channel that proxies /v1/responses-style upstream (e.g. xixiapi.cc, longxiadev, luciferai) for a reasoning model like gpt-5.5.
  2. Call new-api with stream: false:
curl https://your-newapi/v1/chat/completions \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{"model":"gpt-5.5","messages":[{"role":"user","content":"reply OK"}],"max_tokens":20}'
  3. A direct curl to the same upstream returns proper JSON; via new-api you get the SSE stub above.

Fix

Preserve the client's original IsStream and route the new "client-JSON + upstream-SSE" case through a dedicated aggregator:

  • relay/chat_completions_via_responses.go: split dispatch into three cases — client wants stream; client wants JSON but upstream is SSE; and both non-stream.
  • relay/channel/openai/chat_via_responses.go: new OaiResponsesSSEToChatJSON reads the upstream SSE via helper.StreamScannerHandler, accumulates output_text.delta, function-call args, and the final response.completed.usage, then emits a single dto.OpenAITextResponse to the client. Claude / Gemini relay format passthrough is preserved.

The two existing handlers (OaiResponsesToChatHandler, OaiResponsesToChatStreamHandler) are unchanged.
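The corrected dispatch can be sketched as a pure routing function. This is a minimal sketch: routeResponses and its signature are illustrative, while the handler names and the text/event-stream prefix check come from the PR description.

```go
package main

import (
	"fmt"
	"strings"
)

// routeResponses picks a handler for the upstream /v1/responses reply.
// clientWantsStream is the client's original stream preference, captured
// before it can be clobbered; contentType is the upstream response header.
func routeResponses(clientWantsStream bool, contentType string) string {
	upstreamIsSSE := strings.HasPrefix(contentType, "text/event-stream")
	switch {
	case clientWantsStream:
		// Client asked for SSE: the stream handler relays chunks as before.
		return "OaiResponsesToChatStreamHandler"
	case upstreamIsSSE:
		// Client wants JSON but upstream streamed: aggregate into one response.
		return "OaiResponsesSSEToChatJSON"
	default:
		// Both sides non-stream: plain JSON-to-JSON conversion.
		return "OaiResponsesToChatHandler"
	}
}

func main() {
	// The bug-fix path: non-stream client, SSE upstream.
	fmt.Println(routeResponses(false, "text/event-stream"))
}
```

The key difference from the old line-142 logic is that the client's preference is read first and never OR-ed with the upstream Content-Type, so an SSE upstream can no longer flip a non-streaming request into the stream handler.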

Verification

  • go build ./relay/... and go vet ./relay/... pass.
  • Direct upstream call still returns the same JSON (no upstream change).
  • After the fix, non-stream gpt-5.5 returns {"choices":[{"message":{"content":"OK"}}], "usage":{...}} end-to-end.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of chat completion requests when upstream responses arrive in different formats than expected, ensuring proper conversion and delivery to clients.
    • Enhanced error handling for invalid upstream responses with appropriate API error responses instead of raw data passthrough.
  • Improvements

    • Refined response stream decision logic to better match client expectations with upstream capabilities.
    • Enhanced token usage tracking, including support for reasoning tokens when available.

…clients (Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>)

coderabbitai Bot commented Apr 30, 2026

Walkthrough

The PR adds a new code path OaiResponsesSSEToChatJSON to handle non-streaming chat requests where the upstream /v1/responses API emits Server-Sent Events (SSE). It also modifies stream decision logic to separately track client-requested streaming versus upstream SSE format, enabling format mismatch handling.

Changes

Cohort / File(s) Summary
SSE-to-Chat JSON Conversion
relay/channel/openai/chat_via_responses.go
Added OaiResponsesSSEToChatJSON function (+231 lines) to parse SSE events from upstream responses, accumulate output text into assistant messages, reconstruct function tool calls with ordering and argument aggregation, extract model/timestamps, populate usage fields including prompt token details and reasoning tokens, and handle error events with OpenAI-style error responses.
Stream Decision Logic
relay/chat_completions_via_responses.go
Modified stream routing to track clientWantsStream and upstreamIsSSE separately (+18/-2 lines); client's streaming preference determines handler selection, with fallback that converts SSE streams to complete JSON when client expects non-stream format.
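The aggregation the new handler performs (scan data: lines, accumulate output_text deltas, take usage from the terminal response.completed event) can be illustrated standalone. This is a simplified sketch, not the repo's code: the real handler uses helper.StreamScannerHandler and the dto types, and handles tool calls and error events as well.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// sseEvent is a pared-down view of a /v1/responses stream event.
type sseEvent struct {
	Type     string `json:"type"`
	Delta    string `json:"delta"`
	Response struct {
		Usage map[string]int `json:"usage"`
	} `json:"response"`
}

// aggregateSSE folds an upstream SSE body into final text plus usage.
func aggregateSSE(body string) (string, map[string]int, error) {
	var text strings.Builder
	var usage map[string]int
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, "data:") {
			continue // skip blank keep-alives and event: framing lines
		}
		data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if data == "" || data == "[DONE]" {
			continue
		}
		var ev sseEvent
		if err := json.Unmarshal([]byte(data), &ev); err != nil {
			return "", nil, err // a malformed event aborts the aggregation
		}
		switch ev.Type {
		case "response.output_text.delta":
			text.WriteString(ev.Delta)
		case "response.completed":
			usage = ev.Response.Usage
		}
	}
	return text.String(), usage, sc.Err()
}

func main() {
	body := "data: {\"type\":\"response.output_text.delta\",\"delta\":\"OK\"}\n" +
		"data: {\"type\":\"response.completed\",\"response\":{\"usage\":{\"total_tokens\":12}}}\n" +
		"data: [DONE]\n"
	text, usage, _ := aggregateSSE(body)
	fmt.Println(text, usage["total_tokens"]) // OK 12
}
```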

Sequence Diagram

sequenceDiagram
    participant Client
    participant Handler as chat_completions Handler
    participant UpstreamAPI as Upstream /v1/responses
    participant Parser as OaiResponsesSSEToChatJSON

    Client->>Handler: Chat completion request (streaming=false)
    Handler->>UpstreamAPI: Forward request
    UpstreamAPI-->>Handler: SSE event stream
    
    Handler->>Parser: Detect upstream is SSE, client wants JSON
    
    loop Process SSE Events
        UpstreamAPI-->>Parser: response.output_text delta
        Parser->>Parser: Accumulate output_text
        
        UpstreamAPI-->>Parser: response.tool_calls (item/argument deltas)
        Parser->>Parser: Reconstruct function calls<br/>(ordering, args, names)
        
        UpstreamAPI-->>Parser: response.completed (usage, timestamps)
        Parser->>Parser: Extract model, usage fields,<br/>finishReason
    end
    
    alt Error Event
        UpstreamAPI-->>Parser: response.error/response.failed
        Parser->>Handler: Stop stream, return API error
    else Success
        Parser->>Handler: Assembled chat.completions JSON
        Handler-->>Client: Complete JSON response
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • creamlike1024

Poem

🐰 SSE streams now dance and sway,
Converting responses the rabbit way,
Tool calls reconstructed with care so fine,
JSON and rabbits align, align!
From delta to message, a streaming delight,
The chat flows complete through the code tonight! 🌙

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title clearly and concisely describes the main change: handling SSE responses from upstream and converting them to JSON for non-streaming clients.
  • Docstring Coverage (✅ Passed): No functions found in the changed files to evaluate docstring coverage; check skipped.
  • Linked Issues Check (✅ Passed): Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check (✅ Passed): Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@relay/channel/openai/chat_via_responses.go`:
- Around line 598-602: The malformed SSE event handling currently logs the error
then continues; change it so a failed unmarshal of data into streamResp (via
common.UnmarshalJsonStr) aborts the whole aggregation rather than skipping:
after logger.LogError(c, "..."+err.Error()) propagate/return an error to the
caller or abort the request flow exactly like the stream handler does (e.g.,
call the same abort/cleanup routine or return a non-200 error), ensuring the
incomplete chat completion is not emitted.
- Around line 621-643: When handling
"response.output_item.added"/"response.output_item.done" in
chat_via_responses.go, avoid double-counting the function name and ensure args
from the item payload are used as a fallback when upstream omits usage: after
deriving itemID and callID and calling registerToolCall(callID), only set
toolCallNameByID[callID] and append to usageText if the name isn't already
recorded for that callID (prevent duplicate append on both "added" and "done"),
and if toolCallArgsByID[callID] is empty but streamResp.Item.ArgumentsString()
returns non-empty, populate toolCallArgsByID[callID] and append the args to
usageText as the fallback; use the same callID lookup (and
toolCallCanonicalIDByItemID mapping) to ensure consistent canonicalization.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 65bc683b-325c-43b1-b4d1-b50dbfd83198

📥 Commits

Reviewing files that changed from the base of the PR and between 5f86839 and 7e96c8e.

📒 Files selected for processing (2)
  • relay/channel/openai/chat_via_responses.go
  • relay/chat_completions_via_responses.go

Comment on lines +598 to +602
var streamResp dto.ResponsesStreamResponse
if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
	logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
	return
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stop aggregation on malformed SSE events.

Right now a bad event is only logged and skipped, so this path can still return 200 with a partial chat.completion assembled from incomplete upstream data. The stream handler above aborts on the same condition; this path should do the same.

Proposed fix
 		var streamResp dto.ResponsesStreamResponse
 		if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
 			logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
-			return
+			streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
+			sr.Stop(streamErr)
+			return
 		}

Comment on lines +621 to +643
case "response.output_item.added", "response.output_item.done":
	if streamResp.Item == nil || streamResp.Item.Type != "function_call" {
		break
	}
	itemID := strings.TrimSpace(streamResp.Item.ID)
	callID := strings.TrimSpace(streamResp.Item.CallId)
	if callID == "" {
		callID = itemID
	}
	if itemID != "" && callID != "" {
		toolCallCanonicalIDByItemID[itemID] = callID
	}
	if callID == "" {
		break
	}
	registerToolCall(callID)
	if name := strings.TrimSpace(streamResp.Item.Name); name != "" {
		toolCallNameByID[callID] = name
		usageText.WriteString(name)
	}
	if args := streamResp.Item.ArgumentsString(); args != "" {
		toolCallArgsByID[callID] = args
	}

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fallback usage accounting for tool calls is inconsistent here.

response.output_item.added and response.output_item.done can both carry the same function name, so usageText may count it twice. At the same time, arguments that only appear on the item payload are never appended at all. When upstream omits usage, the estimated Usage can end up inflated or undercounted.

Proposed fix
 	toolCallIndexByID := make(map[string]int)
 	toolCallNameByID := make(map[string]string)
 	toolCallArgsByID := make(map[string]string)
+	toolCallNameCounted := make(map[string]bool)
 	toolCallOrder := make([]string, 0)
 	toolCallCanonicalIDByItemID := make(map[string]string)
@@
 			registerToolCall(callID)
 			if name := strings.TrimSpace(streamResp.Item.Name); name != "" {
 				toolCallNameByID[callID] = name
-				usageText.WriteString(name)
+				if !toolCallNameCounted[callID] {
+					usageText.WriteString(name)
+					toolCallNameCounted[callID] = true
+				}
 			}
 			if args := streamResp.Item.ArgumentsString(); args != "" {
+				usageText.WriteString(stringDeltaFromPrefix(toolCallArgsByID[callID], args))
 				toolCallArgsByID[callID] = args
 			}
