From a5d8ac6a113c0f3752291ff4b73b05e415ca0c41 Mon Sep 17 00:00:00 2001 From: Your Name <3094777083@qq.com> Date: Wed, 17 Jun 2026 09:29:05 +0800 Subject: [PATCH] fix(apicompat): harden Responses and Anthropic tool bridging --- .../handler/gateway_handler_responses.go | 7 +- .../pkg/apicompat/anthropic_responses_test.go | 233 +++++++++++++- .../pkg/apicompat/anthropic_to_responses.go | 4 +- .../anthropic_to_responses_response.go | 111 ++++++- .../chatcompletions_empty_delta_test.go | 201 ++++++++++++ .../chatcompletions_responses_bridge.go | 97 +++++- .../chatcompletions_responses_test.go | 4 +- .../apicompat/chatcompletions_to_responses.go | 6 +- .../responses_input_item_polymorphic_test.go | 253 +++++++++++++++ .../responses_to_anthropic_request.go | 205 ++++++++++--- ...esponses_to_anthropic_tool_pairing_test.go | 2 +- ...esponses_to_anthropic_tools_system_test.go | 219 +++++++++++++ backend/internal/pkg/apicompat/types.go | 28 +- .../service/anthropic_empty_output.go | 288 ++++++++++++++++++ ...teway_anthropic_apikey_passthrough_test.go | 93 ++++++ .../service/gateway_forward_as_responses.go | 175 +++++++++-- .../gateway_forward_as_responses_test.go | 89 ++++++ .../service/gateway_messages_cache.go | 17 ++ backend/internal/service/gateway_service.go | 142 +++++++-- .../service/gateway_tool_rewrite_test.go | 16 + .../internal/service/ops_upstream_context.go | 4 + backend/internal/service/stream_failover.go | 26 ++ 22 files changed, 2111 insertions(+), 109 deletions(-) create mode 100644 backend/internal/pkg/apicompat/chatcompletions_empty_delta_test.go create mode 100644 backend/internal/pkg/apicompat/responses_input_item_polymorphic_test.go create mode 100644 backend/internal/pkg/apicompat/responses_to_anthropic_tools_system_test.go create mode 100644 backend/internal/service/anthropic_empty_output.go create mode 100644 backend/internal/service/stream_failover.go diff --git a/backend/internal/handler/gateway_handler_responses.go b/backend/internal/handler/gateway_handler_responses.go index 3edecd77270..4bd970befa2 100644 --- a/backend/internal/handler/gateway_handler_responses.go +++ b/backend/internal/handler/gateway_handler_responses.go @@ -169,6 +169,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { if parsedReq == nil { parsedReq = &service.ParsedRequest{Model: reqModel, Stream: reqStream, Body: bodyRef} } + setOpsRequestContext(c, reqModel, reqStream, body, parsedReq.EstimatedInputTokens) parsedReq.SessionContext = &service.SessionContext{ ClientIP: ip.GetClientIP(c), UserAgent: c.GetHeader("User-Agent"), @@ -223,6 +224,9 @@ func (h *GatewayHandler) Responses(c *gin.Context) { ) if err != nil { reqLog.Warn("gateway.responses.account_slot_acquire_failed", zap.Int64("account_id", account.ID), zap.Error(err)) + if fs.HandleAccountSlotExhausted(c.Request.Context(), account.ID) == FailoverContinue { + continue + } h.handleConcurrencyError(c, err, "account", streamStarted) return } @@ -231,6 +235,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { // 5. Forward request writerSizeBeforeForward := c.Writer.Size() + writerWrittenBeforeForward := c.Writer.Written() forwardBody := body if channelMapping.Mapped { forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel) @@ -245,7 +250,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { // Can't failover if streaming content already sent - if c.Writer.Size() != writerSizeBeforeForward { + if c.Writer.Written() != writerWrittenBeforeForward || c.Writer.Size() != writerSizeBeforeForward { h.handleResponsesFailoverExhausted(c, failoverErr, true) return } diff --git a/backend/internal/pkg/apicompat/anthropic_responses_test.go b/backend/internal/pkg/apicompat/anthropic_responses_test.go index 8997835c2aa..c4238951fe0 100644 --- a/backend/internal/pkg/apicompat/anthropic_responses_test.go +++ b/backend/internal/pkg/apicompat/anthropic_responses_test.go @@ -12,6 +12,8 @@ import ( // AnthropicToResponses tests // --------------------------------------------------------------------------- +func intPtr(v int) *int { return &v } + func TestAnthropicToResponses_BasicText(t *testing.T) { req := &AnthropicRequest{ Model: "gpt-5.2", @@ -143,7 +145,7 @@ func TestAnthropicToResponses_ToolUse(t *testing.T) { assert.Empty(t, items[2].ID) assert.Equal(t, "function_call_output", items[3].Type) assert.Equal(t, "call_1", items[3].CallID) - assert.Equal(t, "Sunny, 72°F", items[3].Output) + assert.Equal(t, `"Sunny, 72°F"`, string(items[3].Output)) } func TestAnthropicToResponses_ThinkingIgnored(t *testing.T) { @@ -794,6 +796,70 @@ func TestStreamingReasoning(t *testing.T) { assert.Equal(t, "content_block_stop", events[0].Type) } +func TestAnthropicEventToResponses_ReasoningDoneCarriesFullSummary(t *testing.T) { + state := NewAnthropicEventToResponsesState() + + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "message_start", + Message: &AnthropicResponse{ + ID: "msg_reasoning_done", + Model: "claude-sonnet-4-5-20250929", + }, + }, state) + + idx := 0 + events := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_start", + Index: &idx, + ContentBlock: &AnthropicContentBlock{ + Type: "thinking", + ID: "think_1", + }, + }, state) + require.Len(t, events, 1) + assert.Equal(t, "response.output_item.added", events[0].Type) + + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Index: &idx, + Delta: &AnthropicDelta{ + Type: "thinking_delta", + Thinking: "step one", + }, + }, state) + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Index: &idx, + Delta: &AnthropicDelta{ + Type: "thinking_delta", + Thinking: " and step two", + }, + }, state) + + events = AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_stop", + Index: &idx, + }, state) + + var reasoningDone, itemDone *ResponsesStreamEvent + for i := range events { + switch events[i].Type { + case "response.reasoning_summary_text.done": + reasoningDone = &events[i] + case "response.output_item.done": + itemDone = &events[i] + } + } + require.NotNil(t, reasoningDone, "reasoning_summary_text.done missing") + assert.Equal(t, "step one and step two", reasoningDone.Text) + require.NotNil(t, itemDone, "output_item.done missing") + require.NotNil(t, itemDone.Item) + assert.Equal(t, "reasoning", itemDone.Item.Type) + require.Len(t, itemDone.Item.Summary, 1) + assert.Equal(t, "summary_text", itemDone.Item.Summary[0].Type) + assert.Equal(t, "step one and step two", itemDone.Item.Summary[0].Text) +} + func TestStreamingIncomplete(t *testing.T) { state := NewResponsesEventToAnthropicState() @@ -1256,6 +1322,37 @@ func TestResponsesToAnthropicRequest_ToolChoiceLegacyFunctionName(t *testing.T) assert.Equal(t, "get_weather", tc["name"]) } +func TestResponsesToAnthropicRequest_ReasoningUsesAdaptiveThinking(t *testing.T) { + req := &ResponsesRequest{ + Model: "gpt-5.5", + Input: json.RawMessage(`[{"role":"user","content":"Hello"}]`), + Reasoning: &ResponsesReasoning{ + Effort: "medium", + }, + } + + resp, err := ResponsesToAnthropicRequest(req) + require.NoError(t, err) + require.NotNil(t, resp.OutputConfig) + assert.Equal(t, "medium", resp.OutputConfig.Effort) + require.NotNil(t, resp.Thinking) + assert.Equal(t, "adaptive", resp.Thinking.Type) + assert.Zero(t, resp.Thinking.BudgetTokens) +} + +func TestResponsesToAnthropicRequest_EmptyUserContentGetsPlaceholder(t *testing.T) { + req := &ResponsesRequest{ + Model: "gpt-5.5", + Input: json.RawMessage(`[{"role":"user","content":[]}]`), + } + + resp, err := ResponsesToAnthropicRequest(req) + require.NoError(t, err) + require.Len(t, resp.Messages, 1) + assert.Equal(t, "user", resp.Messages[0].Role) + assert.JSONEq(t, `"(empty)"`, string(resp.Messages[0].Content)) +} + // --------------------------------------------------------------------------- // Image content block conversion tests // --------------------------------------------------------------------------- @@ -1340,7 +1437,7 @@ func TestAnthropicToResponses_ToolResultWithImage(t *testing.T) { // function_call_output should have text-only output (no image). assert.Equal(t, "function_call_output", items[2].Type) assert.Equal(t, "toolu_1", items[2].CallID) - assert.Equal(t, "(empty)", items[2].Output) + assert.Equal(t, `"(empty)"`, string(items[2].Output)) // Image should be in a separate user message. assert.Equal(t, "user", items[3].Role) @@ -1377,7 +1474,7 @@ func TestAnthropicToResponses_ToolResultMixed(t *testing.T) { // function_call_output should have text-only output. assert.Equal(t, "function_call_output", items[2].Type) - assert.Equal(t, "File metadata: 800x600 PNG", items[2].Output) + assert.Equal(t, `"File metadata: 800x600 PNG"`, string(items[2].Output)) // Image should be in a separate user message. assert.Equal(t, "user", items[3].Role) @@ -1412,7 +1509,7 @@ func TestAnthropicToResponses_TextOnlyToolResultBackwardCompat(t *testing.T) { require.Len(t, items, 3) // Text-only tool_result should produce a plain string. - assert.Equal(t, "Sunny, 72°F", items[2].Output) + assert.Equal(t, `"Sunny, 72°F"`, string(items[2].Output)) } func TestAnthropicToResponses_ImageEmptyMediaType(t *testing.T) { @@ -1733,3 +1830,131 @@ func TestAnthropicEventToResponses_CacheTokensFromMessageDelta(t *testing.T) { require.NotNil(t, completed.Response.Usage.InputTokensDetails) assert.Equal(t, 11, completed.Response.Usage.InputTokensDetails.CachedTokens) } + +func TestAnthropicEventToResponses_ToolCallDoneCarriesFullArguments(t *testing.T) { + state := NewAnthropicEventToResponsesState() + + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "message_start", + Message: &AnthropicResponse{ + ID: "msg_tool", + Model: "claude-sonnet-4-5-20250929", + }, + }, state) + + idx := 0 + events := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_start", + Index: &idx, + ContentBlock: &AnthropicContentBlock{ + Type: "tool_use", + ID: "toolu_123", + Name: "run_shell", + }, + }, state) + require.Len(t, events, 1) + require.Equal(t, "response.output_item.added", events[0].Type) + + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Index: &idx, + Delta: &AnthropicDelta{ + Type: "input_json_delta", + PartialJSON: `{"cmd":`, + }, + }, state) + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Index: &idx, + Delta: &AnthropicDelta{ + Type: "input_json_delta", + PartialJSON: `"ls"}`, + }, + }, state) + + events = AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_stop", + Index: &idx, + }, state) + + var argsDone, itemDone *ResponsesStreamEvent + for i := range events { + switch events[i].Type { + case "response.function_call_arguments.done": + argsDone = &events[i] + case "response.output_item.done": + itemDone = &events[i] + } + } + require.NotNil(t, argsDone, "function_call_arguments.done missing") + assert.Equal(t, `{"cmd":"ls"}`, argsDone.Arguments) + require.NotNil(t, itemDone, "function_call output_item.done missing") + require.NotNil(t, itemDone.Item) + assert.Equal(t, "function_call", itemDone.Item.Type) + assert.Equal(t, "toolu_123", itemDone.Item.CallID) + assert.Equal(t, "run_shell", itemDone.Item.Name) + assert.Equal(t, `{"cmd":"ls"}`, itemDone.Item.Arguments) +} + +func TestAnthropicEventToResponses_ToolUseStartEmptyInputDoesNotPrefixDeltaArguments(t *testing.T) { + state := NewAnthropicEventToResponsesState() + + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "message_start", + Message: &AnthropicResponse{ + ID: "msg_tool_empty_start", + Model: "claude-sonnet-4-5-20250929", + }, + }, state) + + idx := 0 + events := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_start", + Index: &idx, + ContentBlock: &AnthropicContentBlock{ + Type: "tool_use", + ID: "toolu_empty_start", + Name: "get_weather", + Input: json.RawMessage(`{}`), + }, + }, state) + require.Len(t, events, 1) + require.Equal(t, "response.output_item.added", events[0].Type) + + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Index: &idx, + Delta: &AnthropicDelta{ + Type: "input_json_delta", + PartialJSON: `{"city":"Beijing"`, + }, + }, state) + AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Index: &idx, + Delta: &AnthropicDelta{ + Type: "input_json_delta", + PartialJSON: `,"unit":"celsius"}`, + }, + }, state) + + events = AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_stop", + Index: &idx, + }, state) + + var argsDone, itemDone *ResponsesStreamEvent + for i := range events { + switch events[i].Type { + case "response.function_call_arguments.done": + argsDone = &events[i] + case "response.output_item.done": + itemDone = &events[i] + } + } + require.NotNil(t, argsDone, "function_call_arguments.done missing") + require.JSONEq(t, `{"city":"Beijing","unit":"celsius"}`, argsDone.Arguments) + require.NotNil(t, itemDone, "function_call output_item.done missing") + require.NotNil(t, itemDone.Item) + require.JSONEq(t, `{"city":"Beijing","unit":"celsius"}`, itemDone.Item.Arguments) +} diff --git a/backend/internal/pkg/apicompat/anthropic_to_responses.go b/backend/internal/pkg/apicompat/anthropic_to_responses.go index e2011bee0bf..bc29da07dd5 100644 --- a/backend/internal/pkg/apicompat/anthropic_to_responses.go +++ b/backend/internal/pkg/apicompat/anthropic_to_responses.go @@ -221,7 +221,7 @@ func anthropicUserToResponses(raw json.RawMessage) ([]ResponsesInputItem, error) out = append(out, ResponsesInputItem{ Type: "function_call_output", CallID: toResponsesCallID(b.ToolUseID), - Output: outputText, + Output: jsonRawString(outputText), }) toolResultImageParts = append(toolResultImageParts, imageParts...) } @@ -302,7 +302,7 @@ func anthropicAssistantToResponses(raw json.RawMessage) ([]ResponsesInputItem, e Type: "function_call", CallID: fcID, Name: b.Name, - Arguments: args, + Arguments: jsonRawString(args), }) } diff --git a/backend/internal/pkg/apicompat/anthropic_to_responses_response.go b/backend/internal/pkg/apicompat/anthropic_to_responses_response.go index de8ab78df89..d457ec775f3 100644 --- a/backend/internal/pkg/apicompat/anthropic_to_responses_response.go +++ b/backend/internal/pkg/apicompat/anthropic_to_responses_response.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "strings" "time" ) @@ -151,10 +152,23 @@ type AnthropicEventToResponsesState struct { // For message output: accumulate text parts ContentIndex int + // CurrentText accumulates the message's output_text so the terminal + // output_item.done can carry the full content. codex collects final text + // from OutputItemDone items, not from output_text.delta events, so the + // message item MUST include content:[{type:output_text,text:...}]. + CurrentText string // For function_call: track per-output info CurrentCallID string CurrentName string + // CurrentArguments accumulates the function_call's argument JSON so the + // terminal output_item.done (and arguments.done) can carry the full args. + // codex reads the tool call from the OutputItemDone item; without + // call_id/name/arguments it cannot execute the tool and stalls. + CurrentArguments string + // CurrentReasoning accumulates thinking_delta so the terminal reasoning + // events carry the same summary text that was streamed in deltas. + CurrentReasoning string // Usage from message_start / message_delta. InputTokens here follows // Anthropic semantics (excludes cached tokens); they are added back when @@ -263,6 +277,7 @@ func anthToResHandleContentBlockStart(evt *AnthropicStreamEvent, state *Anthropi state.CurrentItemID = generateItemID() state.CurrentItemType = "reasoning" state.ContentIndex = 0 + state.CurrentReasoning = "" events = append(events, makeResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, @@ -278,6 +293,7 @@ func anthToResHandleContentBlockStart(evt *AnthropicStreamEvent, state *Anthropi state.CurrentItemID = generateItemID() state.CurrentItemType = "message" state.ContentIndex = 0 + state.CurrentText = "" events = append(events, makeResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, @@ -288,6 +304,21 @@ func anthToResHandleContentBlockStart(evt *AnthropicStreamEvent, state *Anthropi Status: "in_progress", }, })) + + // Emit response.content_part.added so clients (e.g. codex) know a + // text content part is starting. Without it the subsequent + // output_text.delta events have no part to attach to and the client + // renders nothing. Reverse of anthToResHandleContentBlockStop's + // content_part.done. + events = append(events, makeResponsesEvent(state, "response.content_part.added", &ResponsesStreamEvent{ + OutputIndex: state.OutputIndex, + ContentIndex: state.ContentIndex, + ItemID: state.CurrentItemID, + Part: &ResponsesContentPart{ + Type: "output_text", + Text: "", + }, + })) } case "tool_use": @@ -298,6 +329,7 @@ func anthToResHandleContentBlockStart(evt *AnthropicStreamEvent, state *Anthropi state.CurrentItemType = "function_call" state.CurrentCallID = toResponsesCallID(evt.ContentBlock.ID) state.CurrentName = evt.ContentBlock.Name + state.CurrentArguments = initialToolInputArguments(evt.ContentBlock.Input) events = append(events, makeResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, @@ -324,6 +356,7 @@ func anthToResHandleContentBlockDelta(evt *AnthropicStreamEvent, state *Anthropi if evt.Delta.Text == "" { return nil } + state.CurrentText += evt.Delta.Text return []ResponsesStreamEvent{makeResponsesEvent(state, "response.output_text.delta", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, ContentIndex: state.ContentIndex, @@ -335,6 +368,7 @@ func anthToResHandleContentBlockDelta(evt *AnthropicStreamEvent, state *Anthropi if evt.Delta.Thinking == "" { return nil } + state.CurrentReasoning += evt.Delta.Thinking return []ResponsesStreamEvent{makeResponsesEvent(state, "response.reasoning_summary_text.delta", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, SummaryIndex: 0, @@ -346,6 +380,7 @@ func anthToResHandleContentBlockDelta(evt *AnthropicStreamEvent, state *Anthropi if evt.Delta.PartialJSON == "" { return nil } + state.CurrentArguments += evt.Delta.PartialJSON return []ResponsesStreamEvent{makeResponsesEvent(state, "response.function_call_arguments.delta", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, Delta: evt.Delta.PartialJSON, @@ -371,6 +406,7 @@ func anthToResHandleContentBlockStop(evt *AnthropicStreamEvent, state *Anthropic OutputIndex: state.OutputIndex, SummaryIndex: 0, ItemID: state.CurrentItemID, + Text: state.CurrentReasoning, }), } events = append(events, closeCurrentResponsesItem(state)...) @@ -384,18 +420,32 @@ func anthToResHandleContentBlockStop(evt *AnthropicStreamEvent, state *Anthropic ItemID: state.CurrentItemID, CallID: state.CurrentCallID, Name: state.CurrentName, + Arguments: nonEmptyArguments(state.CurrentArguments), }), } events = append(events, closeCurrentResponsesItem(state)...) return events case "message": - // Emit output_text.done (text block is done, but message item stays open for potential more blocks) + // Text block done: emit output_text.done then content_part.done. + // The message item stays open for potential more blocks; it is closed + // later by closeCurrentResponsesItem. content_part.done mirrors the + // content_part.added emitted in anthToResHandleContentBlockStart. return []ResponsesStreamEvent{ makeResponsesEvent(state, "response.output_text.done", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, ContentIndex: state.ContentIndex, ItemID: state.CurrentItemID, + Text: state.CurrentText, + }), + makeResponsesEvent(state, "response.content_part.done", &ResponsesStreamEvent{ + OutputIndex: state.OutputIndex, + ContentIndex: state.ContentIndex, + ItemID: state.CurrentItemID, + Part: &ResponsesContentPart{ + Type: "output_text", + Text: state.CurrentText, + }, }), } } @@ -450,25 +500,74 @@ func closeCurrentResponsesItem(state *AnthropicEventToResponsesState) []Response itemType := state.CurrentItemType itemID := state.CurrentItemID + currentText := state.CurrentText + currentCallID := state.CurrentCallID + currentName := state.CurrentName + currentArgs := state.CurrentArguments + currentReasoning := state.CurrentReasoning // Reset state.CurrentItemType = "" state.CurrentItemID = "" state.CurrentCallID = "" state.CurrentName = "" + state.CurrentText = "" + state.CurrentArguments = "" + state.CurrentReasoning = "" state.OutputIndex++ state.ContentIndex = 0 + // The terminal item carries its full content. codex collects final output + // from OutputItemDone items (not from the delta events), so an item missing + // its content/arguments renders blank or cannot be executed as a tool call. + doneItem := &ResponsesOutput{ + Type: itemType, + ID: itemID, + Status: "completed", + } + switch itemType { + case "message": + doneItem.Role = "assistant" + doneItem.Content = []ResponsesContentPart{{ + Type: "output_text", + Text: currentText, + }} + case "function_call": + doneItem.CallID = currentCallID + doneItem.Name = currentName + doneItem.Arguments = nonEmptyArguments(currentArgs) + case "reasoning": + if currentReasoning != "" { + doneItem.Summary = []ResponsesSummary{{ + Type: "summary_text", + Text: currentReasoning, + }} + } + } + return []ResponsesStreamEvent{makeResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex - 1, // Use the index before increment - Item: &ResponsesOutput{ - Type: itemType, - ID: itemID, - Status: "completed", - }, + Item: doneItem, })} } +// nonEmptyArguments ensures function_call arguments are valid JSON. Anthropic +// tool_use with no input produces an empty string; codex expects at least "{}". +func nonEmptyArguments(args string) string { + if strings.TrimSpace(args) == "" { + return "{}" + } + return args +} + +func initialToolInputArguments(input json.RawMessage) string { + trimmed := strings.TrimSpace(string(input)) + if trimmed == "" || trimmed == "{}" || trimmed == "null" { + return "" + } + return trimmed +} + func makeResponsesCreatedEvent(state *AnthropicEventToResponsesState) ResponsesStreamEvent { seq := state.SequenceNumber state.SequenceNumber++ diff --git a/backend/internal/pkg/apicompat/chatcompletions_empty_delta_test.go b/backend/internal/pkg/apicompat/chatcompletions_empty_delta_test.go new file mode 100644 index 00000000000..d2aa04bbafa --- /dev/null +++ b/backend/internal/pkg/apicompat/chatcompletions_empty_delta_test.go @@ -0,0 +1,201 @@ +package apicompat + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// strptr is a local helper for *string fields. +func strptr(s string) *string { return &s } + +// Reproduces the mimo "thinking done, nothing shown" bug: the upstream emits a +// leading {"content":""} chunk (non-nil, empty). The bridge must NOT emit a +// response.output_text.delta for it (the delta would serialize empty and a +// premature message item would be created), and must still stream the real +// content that follows. +func TestChatChunkToResponses_SkipsEmptyContentDelta(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("mimo-v2.5") + + // chunk 1: empty content (some upstreams send a leading empty chunk) — no text delta + c1 := &ChatCompletionsChunk{ + ID: "c1", + Choices: []ChatChunkChoice{{Delta: ChatDelta{Role: "assistant", Content: strptr("")}}}, + } + ev1 := ChatCompletionsChunkToResponsesEvents(c1, state) + for _, e := range ev1 { + assert.NotEqual(t, "response.output_text.delta", e.Type, + "empty content must not emit an output_text delta") + } + + // chunk 2: real content — must emit a delta carrying the text + c2 := &ChatCompletionsChunk{ + ID: "c1", + Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: strptr("Hello")}}}, + } + ev2 := ChatCompletionsChunkToResponsesEvents(c2, state) + var sawDelta bool + for _, e := range ev2 { + if e.Type == "response.output_text.delta" { + sawDelta = true + assert.Equal(t, "Hello", e.Delta) + } + } + assert.True(t, sawDelta, "real content must emit an output_text delta") +} + +func TestChatChunkToResponses_SkipsEmptyReasoningDelta(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("mimo-v2.5") + c := &ChatCompletionsChunk{ + ID: "c1", + Choices: []ChatChunkChoice{{Delta: ChatDelta{ReasoningContent: strptr("")}}}, + } + ev := ChatCompletionsChunkToResponsesEvents(c, state) + for _, e := range ev { + assert.NotEqual(t, "response.reasoning_summary_text.delta", e.Type, + "empty reasoning_content must not emit a reasoning delta") + } +} + +// Full mimo-shaped stream: empty content → reasoning → real content. The final +// visible text must be exactly the real content, and at least one non-empty +// output_text delta must reach the client. +func TestChatChunkToResponses_MimoShapedStream(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("mimo-v2.5") + chunks := []*ChatCompletionsChunk{ + {ID: "x", Choices: []ChatChunkChoice{{Delta: ChatDelta{Role: "assistant", Content: strptr("")}}}}, + {ID: "x", Choices: []ChatChunkChoice{{Delta: ChatDelta{ReasoningContent: strptr("thinking...")}}}}, + {ID: "x", Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: strptr("Hi")}}}}, + {ID: "x", Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: strptr("!")}}}}, + } + var textDeltas []string + for _, c := range chunks { + for _, e := range ChatCompletionsChunkToResponsesEvents(c, state) { + if e.Type == "response.output_text.delta" { + textDeltas = append(textDeltas, e.Delta) + } + } + } + // every emitted text delta is non-empty + for _, d := range textDeltas { + assert.NotEqual(t, "", d) + } + assert.Equal(t, "Hi!", strings.Join(textDeltas, "")) +} + +// codex requires response.content_part.added before output_text deltas and +// content_part.done at the end; without them it renders nothing. +func TestChatChunkToResponses_EmitsContentPartEvents(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("mimo-v2.5") + var types []string + for _, c := range []*ChatCompletionsChunk{ + {ID: "x", Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: strptr("Hi")}}}}, + } { + for _, e := range ChatCompletionsChunkToResponsesEvents(c, state) { + types = append(types, e.Type) + } + } + for _, e := range FinalizeChatCompletionsResponsesStream(state) { + types = append(types, e.Type) + } + assert.Contains(t, types, "response.content_part.added") + assert.Contains(t, types, "response.content_part.done") + // content_part.added must come before the first output_text.delta + iAdded, iDelta := -1, -1 + for i, ty := range types { + if ty == "response.content_part.added" && iAdded < 0 { + iAdded = i + } + if ty == "response.output_text.delta" && iDelta < 0 { + iDelta = i + } + } + assert.GreaterOrEqual(t, iDelta, 0) + assert.GreaterOrEqual(t, iAdded, 0) + assert.Less(t, iAdded, iDelta, "content_part.added must precede output_text.delta") +} + +// codex collects final text from OutputItemDone items, so the message item in +// response.output_item.done must carry content with the accumulated text. +func TestChatChunkToResponses_OutputItemDoneCarriesContent(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("mimo-v2.5") + for _, c := range []*ChatCompletionsChunk{ + {ID: "x", Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: strptr("Hello world")}}}}, + } { + ChatCompletionsChunkToResponsesEvents(c, state) + } + var found bool + for _, e := range FinalizeChatCompletionsResponsesStream(state) { + if e.Type == "response.output_item.done" && e.Item != nil && e.Item.Type == "message" { + found = true + require.Len(t, e.Item.Content, 1) + assert.Equal(t, "output_text", e.Item.Content[0].Type) + assert.Equal(t, "Hello world", e.Item.Content[0].Text) + } + } + assert.True(t, found, "must emit message output_item.done with content") +} + +// Some chat/completions upstreams reject reasoning_effort "xhigh" +// (only low/medium/high allowed). It must be normalized to high. +func TestResponsesToChatCompletions_XhighReasoningNormalized(t *testing.T) { + body := []byte(`{"model":"gpt-5.5","reasoning":{"effort":"xhigh"},"input":[{"role":"user","content":[{"type":"input_text","text":"hi"}]}]}`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + cc, err := ResponsesToChatCompletionsRequest(&req) + require.NoError(t, err) + assert.Equal(t, "high", cc.ReasoningEffort, "xhigh must be normalized to high for chat/completions") +} + +func TestNormalizeChatReasoningEffort(t *testing.T) { + assert.Equal(t, "high", normalizeChatReasoningEffort("xhigh")) + assert.Equal(t, "high", normalizeChatReasoningEffort("high")) + assert.Equal(t, "high", normalizeChatReasoningEffort("max")) + assert.Equal(t, "medium", normalizeChatReasoningEffort("medium")) + assert.Equal(t, "low", normalizeChatReasoningEffort("low")) + assert.Equal(t, "low", normalizeChatReasoningEffort("minimal")) + assert.Equal(t, "", normalizeChatReasoningEffort("")) + assert.Equal(t, "", normalizeChatReasoningEffort("bogus")) +} + +// mimo and other chat/completions upstreams stream tool calls; the bridge must +// emit terminal function_call_arguments.done + output_item.done (with +// call_id/name/arguments) at stream end, or codex receives an unterminated +// tool call and stalls/renders blank. +func TestChatChunkToResponses_StreamedToolCallFinalized(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("test-reasoning-model") + idx := 0 + chunk := &ChatCompletionsChunk{ + ID: "x", + Choices: []ChatChunkChoice{{Delta: ChatDelta{ToolCalls: []ChatToolCall{{ + Index: &idx, + ID: "call_abc", + Type: "function", + Function: ChatFunctionCall{Name: "open_browser", Arguments: `{"url":"google.com"}`}, + }}}}}, + } + ChatCompletionsChunkToResponsesEvents(chunk, state) + final := FinalizeChatCompletionsResponsesStream(state) + + var argsDone, itemDone *ResponsesStreamEvent + for i := range final { + switch final[i].Type { + case "response.function_call_arguments.done": + argsDone = &final[i] + case "response.output_item.done": + if final[i].Item != nil && final[i].Item.Type == "function_call" { + itemDone = &final[i] + } + } + } + require.NotNil(t, argsDone, "must emit function_call_arguments.done") + assert.Equal(t, "call_abc", argsDone.CallID) + assert.JSONEq(t, `{"url":"google.com"}`, argsDone.Arguments) + require.NotNil(t, itemDone, "must emit function_call output_item.done") + assert.Equal(t, "call_abc", itemDone.Item.CallID) + assert.Equal(t, "open_browser", itemDone.Item.Name) + assert.JSONEq(t, `{"url":"google.com"}`, itemDone.Item.Arguments) +} diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go index f03e0bdce32..bac7f72c312 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go @@ -30,7 +30,7 @@ func ResponsesToChatCompletionsRequest(req *ResponsesRequest) (*ChatCompletionsR ServiceTier: req.ServiceTier, } if req.Reasoning != nil { - out.ReasoningEffort = req.Reasoning.Effort + out.ReasoningEffort = normalizeChatReasoningEffort(req.Reasoning.Effort) } if len(req.Tools) > 0 { out.Tools = responsesToolsToChatTools(req.Tools) @@ -125,7 +125,7 @@ func buildChatMessagesFromItems(messages []ChatMessage, rawItems []json.RawMessa } continue case "function_call": - arguments := rawString(item["arguments"]) + arguments := responsesArgumentsToChatString(item["arguments"]) if strings.TrimSpace(arguments) == "" { arguments = "{}" } @@ -155,7 +155,7 @@ func buildChatMessagesFromItems(messages []ChatMessage, rawItems []json.RawMessa pendingReasoning = "" continue case "function_call_output": - content, _ := json.Marshal(rawString(item["output"])) + content, _ := json.Marshal(extractResponsesOutputText(item["output"])) messages = append(messages, ChatMessage{ Role: "tool", ToolCallID: rawString(item["call_id"]), @@ -737,6 +737,10 @@ func ChatCompletionsChunkToResponsesEvents( copyCall.ID = generateItemID() } copyCall.Type = "function" + // Arguments are accumulated below (line: stored.Function.Arguments + // += ...). Clear them here so the first chunk's arguments are not + // counted twice (which produced duplicated JSON like `{...}{...}`). + copyCall.Function.Arguments = "" state.ToolCalls[idx] = ©Call stored = ©Call itemID := generateItemID() @@ -810,11 +814,16 @@ func FinalizeChatCompletionsResponsesStream(state *ChatCompletionsToResponsesStr events = append(events, chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ OutputIndex: state.MessageIndex, Item: &ResponsesOutput{ - Type: "message", - ID: state.MessageItemID, - Role: "assistant", - Content: []ResponsesContentPart{{Type: "output_text", Text: state.Text.String()}}, - Status: "completed", + Type: "message", + ID: state.MessageItemID, + Role: "assistant", + Status: "completed", + // codex collects final text from OutputItemDone items, so the + // message item must carry its full content, not just status. + Content: []ResponsesContentPart{{ + Type: "output_text", + Text: state.Text.String(), + }}, }, })) } @@ -832,6 +841,39 @@ func FinalizeChatCompletionsResponsesStream(state *ChatCompletionsToResponsesStr incompleteDetails = &ResponsesIncompleteDetails{Reason: "max_output_tokens"} } + // Finalize streamed tool calls. The streaming loop emits + // output_item.added + function_call_arguments.delta per tool call but never + // their terminal events; without function_call_arguments.done and + // output_item.done (carrying call_id/name/arguments) codex receives an + // unterminated tool call, cannot execute it, and renders nothing. + for i := 0; i < len(state.ToolCalls); i++ { + toolCall, ok := state.ToolCalls[i] + if !ok || toolCall == nil { + continue + } + arguments := toolCall.Function.Arguments + if strings.TrimSpace(arguments) == "" { + arguments = "{}" + } + events = append(events, chatToResponsesEvent(state, "response.function_call_arguments.done", &ResponsesStreamEvent{ + OutputIndex: i + 1, + CallID: toolCall.ID, + Name: toolCall.Function.Name, + Arguments: arguments, + })) + events = append(events, chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ + OutputIndex: i + 1, + Item: &ResponsesOutput{ + Type: "function_call", + ID: generateItemID(), + CallID: toolCall.ID, + Name: toolCall.Function.Name, + Arguments: arguments, + Status: "completed", + }, + })) + } + state.CompletedSent = true events = append(events, chatToResponsesEvent(state, "response.completed", &ResponsesStreamEvent{ Response: &ResponsesResponse{ @@ -1084,6 +1126,45 @@ func chatToResponsesEvent( return evt } +// normalizeChatReasoningEffort maps a Responses reasoning effort to a value the +// Chat Completions protocol accepts. The Responses API allows "xhigh" (codex's +// highest tier for gpt-5.5 etc.), but chat/completions upstreams (and the +// OpenAI chat/completions schema) only accept low/medium/high and 400 on +// "xhigh". Map xhigh→high; pass through known values; drop unknown/empty. +func normalizeChatReasoningEffort(effort string) string { + switch strings.ToLower(strings.TrimSpace(effort)) { + case "xhigh", "extrahigh", "max", "high": + return "high" + case "medium": + return "medium" + case "low", "minimal", "none": + return "low" + default: + return "" // omit unknown/empty so the upstream uses its default + } +} + +// responsesArgumentsToChatString converts a Responses function_call.arguments +// field into the stringified-JSON form required by Chat Completions +// (ChatFunctionCall.Arguments is a string). +// +// - stringified JSON: "{\"x\":1}" → use the inner string as-is +// - raw JSON object: {"x":1} → serialize to its string form +// - empty/absent → "" +func responsesArgumentsToChatString(raw json.RawMessage) string { + trimmed := json.RawMessage(strings.TrimSpace(string(raw))) + if len(trimmed) == 0 || string(trimmed) == "null" { + return "" + } + // Already a JSON string — return the inner value verbatim. + var s string + if err := json.Unmarshal(trimmed, &s); err == nil { + return s + } + // Object/array/other JSON — serialize to its compact string form. + return string(trimmed) +} + func rawString(raw json.RawMessage) string { raw = bytesTrimSpace(raw) if len(raw) == 0 || string(raw) == "null" { diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go index c12715e1126..013e59325bf 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go @@ -105,7 +105,7 @@ func TestChatCompletionsToResponses_ToolCalls(t *testing.T) { // Check function_call_output item assert.Equal(t, "function_call_output", items[2].Type) assert.Equal(t, "call_1", items[2].CallID) - assert.Equal(t, "pong", items[2].Output) + assert.Equal(t, `"pong"`, string(items[2].Output)) // Check tools require.Len(t, resp.Tools, 1) @@ -673,7 +673,7 @@ func TestChatCompletionsToResponses_ToolArrayContent(t *testing.T) { require.Len(t, items, 3) assert.Equal(t, "function_call_output", items[2].Type) assert.Equal(t, "call_1", items[2].CallID) - assert.Equal(t, "image width: 100; image height: 200", items[2].Output) + assert.Equal(t, `"image width: 100; image height: 200"`, string(items[2].Output)) } func TestResponsesToChatCompletions_Incomplete(t *testing.T) { diff --git a/backend/internal/pkg/apicompat/chatcompletions_to_responses.go b/backend/internal/pkg/apicompat/chatcompletions_to_responses.go index 463bdd0d15d..a7459bdeb45 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_to_responses.go +++ b/backend/internal/pkg/apicompat/chatcompletions_to_responses.go @@ -194,7 +194,7 @@ func chatAssistantToResponses(m ChatMessage) ([]ResponsesInputItem, error) { Type: "function_call", CallID: tc.ID, Name: tc.Function.Name, - Arguments: args, + Arguments: jsonRawString(args), }) } @@ -284,7 +284,7 @@ func chatToolToResponses(m ChatMessage) ([]ResponsesInputItem, error) { return []ResponsesInputItem{{ Type: "function_call_output", CallID: m.ToolCallID, - Output: output, + Output: jsonRawString(output), }}, nil } @@ -302,7 +302,7 @@ func chatFunctionToResponses(m ChatMessage) ([]ResponsesInputItem, error) { return []ResponsesInputItem{{ Type: "function_call_output", CallID: m.Name, - Output: output, + Output: jsonRawString(output), }}, nil } diff --git a/backend/internal/pkg/apicompat/responses_input_item_polymorphic_test.go b/backend/internal/pkg/apicompat/responses_input_item_polymorphic_test.go new file mode 100644 index 00000000000..dda106bc9c8 --- /dev/null +++ b/backend/internal/pkg/apicompat/responses_input_item_polymorphic_test.go @@ -0,0 +1,253 @@ +package apicompat + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// These tests cover the fix for codex (and newer Responses clients) sending +// function_call.arguments as a JSON object and function_call_output.output as +// a JSON array. Before the fix, ResponsesInputItem.Arguments / .Output were +// typed `string`, so json.Unmarshal failed: +// - Responses→Anthropic path (ResponsesToAnthropicRequest): HTTP 502 +// - Responses→ChatCompletions path (ResponsesToChatCompletionsRequest): +// silent data loss (rawString returned "" for non-string values) + +// --- helper-level tests --------------------------------------------------- + +func TestNormalizeResponsesArguments(t *testing.T) { + cases := []struct { + name string + in string + want string + }{ + {"object", `{"x":1}`, `{"x":1}`}, + {"stringified", `"{\"x\":1}"`, `{"x":1}`}, + {"empty string", `""`, `{}`}, + {"empty raw", ``, `{}`}, + {"null", `null`, `{}`}, + {"non-json string", `"not json"`, `{}`}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := normalizeResponsesArguments(json.RawMessage(tc.in)) + assert.JSONEq(t, tc.want, string(got)) + }) + } +} + +func TestExtractResponsesOutputText(t *testing.T) { + cases := []struct { + name string + in string + want string + }{ + {"plain string", `"result"`, "result"}, + {"array one part", `[{"type":"output_text","text":"result"}]`, "result"}, + {"array two parts", `[{"type":"output_text","text":"a"},{"type":"output_text","text":"b"}]`, "a\n\nb"}, + {"empty raw", ``, ""}, + {"null", `null`, ""}, + {"empty array", `[]`, ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := extractResponsesOutputText(json.RawMessage(tc.in)) + assert.Equal(t, tc.want, got) + }) + } +} + +// --- Responses→Anthropic path: must not 502 ---------------------------- + +func TestResponsesToAnthropicRequest_FunctionCallObjectArguments(t *testing.T) { + body := []byte(`{ + "model": "claude-opus-4-8", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": {"x": 1}}, + {"type": "function_call_output", "call_id": "c1", "output": "ok"} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + anth, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) // before fix: "cannot unmarshal object ... arguments of type string" + require.NotNil(t, anth) + + require.Len(t, anth.Messages, 2) + var blocks []AnthropicContentBlock + require.NoError(t, json.Unmarshal(anth.Messages[0].Content, &blocks)) + require.Len(t, blocks, 1) + assert.Equal(t, "tool_use", blocks[0].Type) + assert.Equal(t, "foo", blocks[0].Name) + assert.JSONEq(t, `{"x":1}`, string(blocks[0].Input)) +} + +func TestResponsesToAnthropicRequest_FunctionCallStringifiedArguments(t *testing.T) { + body := []byte(`{ + "model": "claude-opus-4-8", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": "{\"x\":1}"}, + {"type": "function_call_output", "call_id": "c1", "output": "ok"} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + anth, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) + + require.Len(t, anth.Messages, 2) + var blocks []AnthropicContentBlock + require.NoError(t, json.Unmarshal(anth.Messages[0].Content, &blocks)) + require.Len(t, blocks, 1) + assert.JSONEq(t, `{"x":1}`, string(blocks[0].Input)) +} + +func TestResponsesToAnthropicRequest_FunctionCallNonObjectArgumentsFallback(t *testing.T) { + cases := []string{ + `"not-json-object"`, + `[{"x":1}]`, + `123`, + } + for _, arguments := range cases { + t.Run(arguments, func(t *testing.T) { + body := []byte(`{ + "model": "claude-opus-4-8", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": ` + arguments + `}, + {"type": "function_call_output", "call_id": "c1", "output": "ok"} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + anth, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) + + require.Len(t, anth.Messages, 2) + var blocks []AnthropicContentBlock + require.NoError(t, json.Unmarshal(anth.Messages[0].Content, &blocks)) + require.Len(t, blocks, 1) + assert.JSONEq(t, `{}`, string(blocks[0].Input)) + }) + } +} + +func TestResponsesToAnthropicRequest_FunctionCallOutputArray(t *testing.T) { + body := []byte(`{ + "model": "claude-opus-4-8", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": "{}"}, + {"type": "function_call_output", "call_id": "c1", + "output": [{"type": "output_text", "text": "result"}]} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + anth, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) // before fix: "cannot unmarshal array ... output of type string" + require.NotNil(t, anth) + + require.Len(t, anth.Messages, 2) + var blocks []AnthropicContentBlock + require.NoError(t, json.Unmarshal(anth.Messages[1].Content, &blocks)) + require.Len(t, blocks, 1) + assert.Equal(t, "tool_result", blocks[0].Type) + assert.Equal(t, "toolu_c1", blocks[0].ToolUseID) // call_id is namespaced for Anthropic + assert.JSONEq(t, `"result"`, string(blocks[0].Content)) +} + +func TestResponsesToAnthropicRequest_FunctionCallOutputString(t *testing.T) { + // Backward compatibility: older clients send output as a plain string. + body := []byte(`{ + "model": "claude-opus-4-8", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": "{}"}, + {"type": "function_call_output", "call_id": "c1", "output": "result"} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + anth, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) + + require.Len(t, anth.Messages, 2) + var blocks []AnthropicContentBlock + require.NoError(t, json.Unmarshal(anth.Messages[1].Content, &blocks)) + require.Len(t, blocks, 1) + assert.JSONEq(t, `"result"`, string(blocks[0].Content)) +} + +func TestResponsesToAnthropicRequest_SkipsUnknownResponsesItems(t *testing.T) { + body := []byte(`{ + "model": "claude-opus-4-8", + "input": [ + {"role": "user", "content": [{"type": "input_text", "text": "search and summarize"}]}, + {"type": "web_search_call", "id": "ws_1", "content": [{"type": "output_text", "text": "internal search marker"}]}, + {"type": "function_call", "call_id": "call_1", "name": "foo", "arguments": "{}"}, + {"type": "function_call_output", "call_id": "call_1", "output": "ok"} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + anth, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) + assertAnthropicPairing(t, anth.Messages) + + for _, msg := range anth.Messages { + for _, block := range parseContentBlocks(msg.Content) { + assert.NotEqual(t, "input_text", block.Type) + assert.NotEqual(t, "output_text", block.Type) + assert.NotContains(t, block.Text, "internal search marker") + } + } +} + +// --- Responses→ChatCompletions path: must not drop data ---------------- + +func TestResponsesToChatCompletionsRequest_FunctionCallObjectArguments(t *testing.T) { + body := []byte(`{ + "model": "gpt-5.4", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": {"x": 1}}, + {"type": "function_call_output", "call_id": "c1", "output": "ok"} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + cc, err := ResponsesToChatCompletionsRequest(&req) + require.NoError(t, err) + require.Len(t, cc.Messages, 2) + require.Len(t, cc.Messages[0].ToolCalls, 1) + // Chat Completions requires arguments to be a stringified JSON object; + // before the fix rawString returned "" and it degraded to "{}". + assert.JSONEq(t, `{"x":1}`, cc.Messages[0].ToolCalls[0].Function.Arguments) +} + +func TestResponsesToChatCompletionsRequest_FunctionCallOutputArray(t *testing.T) { + body := []byte(`{ + "model": "gpt-5.4", + "input": [ + {"type": "function_call", "call_id": "c1", "name": "foo", "arguments": "{}"}, + {"type": "function_call_output", "call_id": "c1", + "output": [{"type": "output_text", "text": "result"}]} + ] + }`) + var req ResponsesRequest + require.NoError(t, json.Unmarshal(body, &req)) + + cc, err := ResponsesToChatCompletionsRequest(&req) + require.NoError(t, err) + require.Len(t, cc.Messages, 2) + assert.Equal(t, "tool", cc.Messages[1].Role) + // before the fix rawString returned "" → tool result content lost. + assert.JSONEq(t, `"result"`, string(cc.Messages[1].Content)) +} diff --git a/backend/internal/pkg/apicompat/responses_to_anthropic_request.go b/backend/internal/pkg/apicompat/responses_to_anthropic_request.go index 672ad80c449..84c92b7ccdc 100644 --- a/backend/internal/pkg/apicompat/responses_to_anthropic_request.go +++ b/backend/internal/pkg/apicompat/responses_to_anthropic_request.go @@ -11,7 +11,7 @@ import ( // enables Anthropic platform groups to accept OpenAI Responses API requests // by converting them to the native /v1/messages format before forwarding upstream. func ResponsesToAnthropicRequest(req *ResponsesRequest) (*AnthropicRequest, error) { - system, messages, err := convertResponsesInputToAnthropic(req.Input) + system, messages, err := convertResponsesInputToAnthropic(req.Instructions, req.Input) if err != nil { return nil, err } @@ -58,8 +58,7 @@ func ResponsesToAnthropicRequest(req *ResponsesRequest) (*AnthropicRequest, erro // Enable thinking for non-low efforts if effort != "low" { out.Thinking = &AnthropicThinking{ - Type: "enabled", - BudgetTokens: defaultThinkingBudget(effort), + Type: "adaptive", } } } @@ -98,14 +97,27 @@ func mapResponsesEffortToAnthropic(effort string) string { } // convertResponsesInputToAnthropic extracts system prompt and messages from -// a Responses API input array. Returns the system as raw JSON (for Anthropic's -// polymorphic system field) and a list of Anthropic messages. -func convertResponsesInputToAnthropic(inputRaw json.RawMessage) (json.RawMessage, []AnthropicMessage, error) { +// a Responses API request. The system prompt is sourced from (in priority +// order, concatenated): the top-level `instructions` field (codex's primary +// system prompt) and any system/developer role items in the input array. +// Returns the system as raw JSON (for Anthropic's polymorphic system field) +// and a list of Anthropic messages. +// +// codex sends its ~20KB system prompt in `instructions` and additional context +// in `developer` role items; both must map to Anthropic's system field, not be +// dropped (the old code ignored both, leaving claude without instructions) nor +// leaked into a user message as raw input_text blocks (which caused 422). +func convertResponsesInputToAnthropic(instructions string, inputRaw json.RawMessage) (json.RawMessage, []AnthropicMessage, error) { + var systemParts []string + if s := strings.TrimSpace(instructions); s != "" { + systemParts = append(systemParts, s) + } + // Try as plain string input. var inputStr string if err := json.Unmarshal(inputRaw, &inputStr); err == nil { content, _ := json.Marshal(inputStr) - return nil, []AnthropicMessage{{Role: "user", Content: content}}, nil + return buildSystemJSON(systemParts), []AnthropicMessage{{Role: "user", Content: content}}, nil } var items []ResponsesInputItem @@ -113,29 +125,23 @@ func convertResponsesInputToAnthropic(inputRaw json.RawMessage) (json.RawMessage return nil, nil, fmt.Errorf("parse responses input: %w", err) } - var system json.RawMessage var messages []AnthropicMessage for _, item := range items { switch { - case item.Role == "system": - // System prompt → Anthropic system field - text := extractTextFromContent(item.Content) - if text != "" { - system, _ = json.Marshal(text) + case item.Role == "system" || item.Role == "developer": + // system / developer → Anthropic system field + if text := strings.TrimSpace(extractTextFromContent(item.Content)); text != "" { + systemParts = append(systemParts, text) } case item.Type == "function_call": // function_call → assistant message with tool_use block - input := json.RawMessage("{}") - if item.Arguments != "" { - input = json.RawMessage(item.Arguments) - } block := AnthropicContentBlock{ Type: "tool_use", ID: fromResponsesCallIDToAnthropic(item.CallID), Name: item.Name, - Input: input, + Input: normalizeResponsesArguments(item.Arguments), } blockJSON, _ := json.Marshal([]AnthropicContentBlock{block}) messages = append(messages, AnthropicMessage{ @@ -145,7 +151,7 @@ func convertResponsesInputToAnthropic(inputRaw json.RawMessage) (json.RawMessage case item.Type == "function_call_output": // function_call_output → user message with tool_result block - outputContent := item.Output + outputContent := extractResponsesOutputText(item.Output) if outputContent == "" { outputContent = "(empty)" } @@ -182,7 +188,13 @@ func convertResponsesInputToAnthropic(inputRaw json.RawMessage) (json.RawMessage }) default: - // Unknown role/type — attempt as user message + // Skip unknown OpenAI-only Responses item types instead of leaking their + // raw content into Anthropic messages, which can break tool adjacency or + // introduce invalid block shapes upstream. + if item.Type != "" { + continue + } + // Unknown role-only item with content: fall back to a user message. if item.Content != nil { messages = append(messages, AnthropicMessage{ Role: "user", @@ -201,7 +213,31 @@ func convertResponsesInputToAnthropic(inputRaw json.RawMessage) (json.RawMessage messages = normalizeAnthropicToolPairing(messages) messages = mergeConsecutiveMessages(messages) - return system, messages, nil + return buildSystemJSON(systemParts), messages, nil +} + +// buildSystemJSON joins collected system prompt fragments into Anthropic's +// system field. Returns nil when there is no non-empty content, so the system +// field is omitted entirely — Anthropic returns 422 for an empty or +// whitespace-only system. +// +// The system is emitted in ARRAY form ([{"type":"text","text":...}]), not as a +// bare JSON string. Both are valid per the Anthropic spec and the official +// Claude Code client uses the array form, but some third-party Anthropic- +// compatible upstreams return 422 when a string-form system is +// combined with tools. The array form works in every case. +func buildSystemJSON(parts []string) json.RawMessage { + joined := strings.TrimSpace(strings.Join(parts, "\n\n")) + if joined == "" { + return nil + } + out, err := json.Marshal([]map[string]string{ + {"type": "text", "text": joined}, + }) + if err != nil { + return nil + } + return out } // normalizeAnthropicToolPairing rebuilds the message sequence so it satisfies @@ -348,7 +384,7 @@ func extractTextFromContent(raw json.RawMessage) string { // content field into Anthropic content blocks JSON. func convertResponsesUserToAnthropicContent(raw json.RawMessage) (json.RawMessage, error) { if len(raw) == 0 { - return json.Marshal("") // empty string content + return json.Marshal("(empty)") } // Try plain string. @@ -386,7 +422,7 @@ func convertResponsesUserToAnthropicContent(raw json.RawMessage) (json.RawMessag } if len(blocks) == 0 { - return json.Marshal("") + return json.Marshal("(empty)") } return json.Marshal(blocks) } @@ -509,30 +545,33 @@ func parseContentBlocks(raw json.RawMessage) []AnthropicContentBlock { // convertResponsesToAnthropicTools maps Responses API tools to Anthropic format. // Reverse of convertAnthropicToolsToResponses. +// +// Every emitted tool must carry a valid input_schema: Anthropic rejects the +// whole request with 422 if any tool has a null/missing schema. Responses tools +// of type "namespace" (codex MCP/agent tools) and bare "web_search" carry no +// `parameters`, so they must be backfilled with an empty object schema. +// +// web_search is intentionally NOT translated to the Anthropic server-side +// web_search_20250305 tool: some third-party Anthropic-compatible upstreams do +// not implement server tools and return 422. Emitting it as a regular function +// tool keeps the request valid; the upstream model simply sees a callable +// named web_search. func convertResponsesToAnthropicTools(tools []ResponsesTool) []AnthropicTool { var out []AnthropicTool for _, t := range tools { - switch t.Type { - case "web_search", "google_search", "web_search_20250305": - out = append(out, AnthropicTool{ - Type: "web_search_20250305", - Name: "web_search", - }) - case "function": - out = append(out, AnthropicTool{ - Name: t.Name, - Description: t.Description, - InputSchema: normalizeAnthropicInputSchema(t.Parameters), - }) - default: - // Pass through unknown tool types - out = append(out, AnthropicTool{ - Type: t.Type, - Name: t.Name, - Description: t.Description, - InputSchema: t.Parameters, - }) + name := strings.TrimSpace(t.Name) + if name == "" && t.Type == "web_search" { + name = "web_search" } + if name == "" { + // Anthropic rejects the whole request when any tool has an empty name. + continue + } + out = append(out, AnthropicTool{ + Name: name, + Description: t.Description, + InputSchema: normalizeAnthropicInputSchema(t.Parameters), + }) } return out } @@ -594,3 +633,83 @@ func convertResponsesToAnthropicToolChoice(raw json.RawMessage) (json.RawMessage // Pass through unknown return raw, nil } + +// normalizeResponsesArguments converts a Responses function_call.arguments +// field into a JSON object suitable for Anthropic's tool_use.input. +// +// The arguments field has three observed shapes: +// - stringified JSON: "{\"x\":1}" → unwrap one layer → {"x":1} +// - raw JSON object: {"x":1} → use as-is +// - empty/absent → {} +// +// Anything that does not resolve to a JSON object falls back to {} so the +// upstream always receives a valid tool_use.input. +func normalizeResponsesArguments(raw json.RawMessage) json.RawMessage { + trimmed := json.RawMessage(strings.TrimSpace(string(raw))) + if len(trimmed) == 0 || string(trimmed) == "null" { + return json.RawMessage("{}") + } + + // Case 1: stringified JSON — unwrap one layer. + var s string + if err := json.Unmarshal(trimmed, &s); err == nil { + inner := strings.TrimSpace(s) + if inner == "" { + return json.RawMessage("{}") + } + if json.Valid([]byte(inner)) { + innerRaw := json.RawMessage(inner) + if looksLikeJSONObject(innerRaw) { + return innerRaw + } + } + return json.RawMessage("{}") + } + + // Case 2: already a JSON object — use as-is; all other JSON shapes fall back + // to {} because Anthropic tool_use.input must be an object. + if looksLikeJSONObject(trimmed) { + return trimmed + } + return json.RawMessage("{}") +} + +// extractResponsesOutputText converts a Responses function_call_output.output +// field into a plain string for Anthropic's tool_result.content. +// +// The output field has three observed shapes: +// - plain string: "result" → use as-is +// - array of content parts: [{"type":"output_text",...}] → join the text +// - empty/absent → "" +func extractResponsesOutputText(raw json.RawMessage) string { + trimmed := json.RawMessage(strings.TrimSpace(string(raw))) + if len(trimmed) == 0 || string(trimmed) == "null" { + return "" + } + + // Case 1: plain string. + var s string + if err := json.Unmarshal(trimmed, &s); err == nil { + return s + } + + // Case 2: array of content parts. + var parts []ResponsesContentPart + if err := json.Unmarshal(trimmed, &parts); err == nil { + var texts []string + for _, p := range parts { + if p.Text != "" { + texts = append(texts, p.Text) + } + } + return strings.Join(texts, "\n\n") + } + + // Case 3: unknown structure — pass through raw JSON so content is not lost. + return string(trimmed) +} + +func looksLikeJSONObject(raw json.RawMessage) bool { + raw = json.RawMessage(strings.TrimSpace(string(raw))) + return len(raw) > 1 && raw[0] == '{' && raw[len(raw)-1] == '}' +} diff --git a/backend/internal/pkg/apicompat/responses_to_anthropic_tool_pairing_test.go b/backend/internal/pkg/apicompat/responses_to_anthropic_tool_pairing_test.go index b2522f274b2..1a51b5478d1 100644 --- a/backend/internal/pkg/apicompat/responses_to_anthropic_tool_pairing_test.go +++ b/backend/internal/pkg/apicompat/responses_to_anthropic_tool_pairing_test.go @@ -58,7 +58,7 @@ func hasToolResult(blocks []AnthropicContentBlock, toolUseID string) bool { func convertAnthropic(t *testing.T, input string) []AnthropicMessage { t.Helper() - _, messages, err := convertResponsesInputToAnthropic(json.RawMessage(input)) + _, messages, err := convertResponsesInputToAnthropic("", json.RawMessage(input)) require.NoError(t, err) assertAnthropicPairing(t, messages) return messages diff --git a/backend/internal/pkg/apicompat/responses_to_anthropic_tools_system_test.go b/backend/internal/pkg/apicompat/responses_to_anthropic_tools_system_test.go new file mode 100644 index 00000000000..fa0e68dacde --- /dev/null +++ b/backend/internal/pkg/apicompat/responses_to_anthropic_tools_system_test.go @@ -0,0 +1,219 @@ +package apicompat + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// These tests cover the codex → Responses → Anthropic conversion fixes that +// eliminated upstream 422s: +// 1. tools with no parameters (type "namespace"/"web_search") must get a +// valid input_schema, never null +// 2. web_search must be a regular function tool, not an Anthropic server tool +// (some third-party upstreams do not implement server tools → 422) +// 3. codex's top-level `instructions` must map to the Anthropic system field +// 4. `developer` role items must map to system, not leak as user input_text +// 5. an empty/whitespace system must be omitted (Anthropic 422s on empty system) + +func anthReqFrom(t *testing.T, body string) *AnthropicRequest { + t.Helper() + var req ResponsesRequest + require.NoError(t, json.Unmarshal([]byte(body), &req)) + out, err := ResponsesToAnthropicRequest(&req) + require.NoError(t, err) + return out +} + +// systemText extracts the concatenated text from an Anthropic system field, +// which buildSystemJSON emits in array form ([{"type":"text","text":...}]). +func systemText(t *testing.T, raw json.RawMessage) string { + t.Helper() + if len(raw) == 0 { + return "" + } + // array form + var parts []struct { + Type string `json:"type"` + Text string `json:"text"` + } + if err := json.Unmarshal(raw, &parts); err == nil { + var sb []string + for _, p := range parts { + sb = append(sb, p.Text) + } + return strings.Join(sb, "\n\n") + } + // string form (fallback) + var s string + require.NoError(t, json.Unmarshal(raw, &s)) + return s +} + +func TestResponsesToAnthropic_ToolWithoutParametersGetsSchema(t *testing.T) { + // codex namespace tools (mcp__*, multi_agent_v1, codex_app) carry no parameters. + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "input": [{"role":"user","content":[{"type":"input_text","text":"hi"}]}], + "tools": [ + {"type":"namespace","name":"mcp__codegraph","description":"graph"}, + {"type":"namespace","name":"codex_app"} + ] + }`) + require.Len(t, out.Tools, 2) + for _, tool := range out.Tools { + require.NotEmpty(t, tool.InputSchema, "tool %s must have non-null input_schema", tool.Name) + assert.NotEqual(t, "null", string(tool.InputSchema)) + // must be a valid object schema + var sch map[string]any + require.NoError(t, json.Unmarshal(tool.InputSchema, &sch)) + assert.Equal(t, "object", sch["type"]) + } +} + +func TestResponsesToAnthropic_WebSearchIsFunctionToolNotServerTool(t *testing.T) { + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "input": [{"role":"user","content":[{"type":"input_text","text":"hi"}]}], + "tools": [{"type":"web_search"}] + }`) + require.Len(t, out.Tools, 1) + tool := out.Tools[0] + assert.Equal(t, "web_search", tool.Name) + // must NOT be emitted as Anthropic server tool web_search_20250305 + assert.NotEqual(t, "web_search_20250305", tool.Type) + assert.Empty(t, tool.Type, "web_search must be a plain function tool, not a server tool") + require.NotEmpty(t, tool.InputSchema) + assert.NotEqual(t, "null", string(tool.InputSchema)) +} + +func TestResponsesToAnthropic_SkipsToolWithoutName(t *testing.T) { + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "input": [{"role":"user","content":[{"type":"input_text","text":"hi"}]}], + "tools": [ + {"type":"namespace","description":"tool discovery"}, + {"type":"namespace","name":"codex_app"}, + {"type":"web_search"} + ] + }`) + require.Len(t, out.Tools, 2) + assert.Equal(t, "codex_app", out.Tools[0].Name) + assert.Equal(t, "web_search", out.Tools[1].Name) + for _, tool := range out.Tools { + assert.NotEmpty(t, tool.Name) + } +} + +func TestResponsesToAnthropic_FunctionToolSchemaPreserved(t *testing.T) { + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "input": [{"role":"user","content":[{"type":"input_text","text":"hi"}]}], + "tools": [{"type":"function","name":"exec","description":"run","parameters":{"type":"object","properties":{"cmd":{"type":"string"}}}}] + }`) + require.Len(t, out.Tools, 1) + assert.Equal(t, "exec", out.Tools[0].Name) + var sch map[string]any + require.NoError(t, json.Unmarshal(out.Tools[0].InputSchema, &sch)) + props, _ := sch["properties"].(map[string]any) + assert.Contains(t, props, "cmd") +} + +func TestResponsesToAnthropic_InstructionsBecomeSystem(t *testing.T) { + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "instructions": "You are a coding agent.", + "input": [{"role":"user","content":[{"type":"input_text","text":"hi"}]}] + }`) + require.NotEmpty(t, out.System) + sys := systemText(t, out.System) + assert.Contains(t, sys, "You are a coding agent.") +} + +func TestResponsesToAnthropic_DeveloperRoleBecomesSystem(t *testing.T) { + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "input": [ + {"role":"developer","content":[{"type":"input_text","text":"Follow the rules."}]}, + {"role":"user","content":[{"type":"input_text","text":"hi"}]} + ] + }`) + // developer content must be in system, not leaked into a user message + require.NotEmpty(t, out.System) + sys := systemText(t, out.System) + assert.Contains(t, sys, "Follow the rules.") + + // no message content may carry input_text (Anthropic only knows "text") + for _, m := range out.Messages { + assert.NotContains(t, string(m.Content), "input_text", + "input_text must not leak into Anthropic messages") + } +} + +func TestResponsesToAnthropic_InstructionsAndDeveloperConcatenated(t *testing.T) { + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "instructions": "Primary prompt.", + "input": [ + {"role":"developer","content":[{"type":"input_text","text":"Extra context."}]}, + {"role":"user","content":[{"type":"input_text","text":"hi"}]} + ] + }`) + sys := systemText(t, out.System) + assert.Contains(t, sys, "Primary prompt.") + assert.Contains(t, sys, "Extra context.") +} + +func TestResponsesToAnthropic_EmptySystemOmitted(t *testing.T) { + // No instructions, no system/developer items → System must be nil/absent, + // never an empty or whitespace string (Anthropic 422s on empty system). + out := anthReqFrom(t, `{ + "model": "claude-opus-4-8", + "instructions": " ", + "input": [ + {"role":"developer","content":[{"type":"input_text","text":" "}]}, + {"role":"user","content":[{"type":"input_text","text":"hi"}]} + ] + }`) + if len(out.System) > 0 { + sys := systemText(t, out.System) + assert.NotEqual(t, "", strings.TrimSpace(sys), "system must never be empty/whitespace") + } +} + +// codex reads the tool call from the OutputItemDone item, so a streamed +// function_call's output_item.done must carry call_id, name and arguments — +// without them codex cannot execute the tool and stalls. +func TestAnthropicStream_FunctionCallDoneCarriesCallFields(t *testing.T) { + state := &AnthropicEventToResponsesState{} + idx := 0 + var all []ResponsesStreamEvent + all = append(all, AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "message_start", Message: &AnthropicResponse{ID: "msg_1", Model: "claude-opus-4-8"}, + }, state)...) + all = append(all, AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_start", Index: &idx, + ContentBlock: &AnthropicContentBlock{Type: "tool_use", ID: "tu_1", Name: "exec"}, + }, state)...) + all = append(all, AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", Index: &idx, + Delta: &AnthropicDelta{Type: "input_json_delta", PartialJSON: `{"cmd":"ls"}`}, + }, state)...) + all = append(all, AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_stop", Index: &idx, + }, state)...) + + var fcDone *ResponsesOutput + for _, e := range all { + if e.Type == "response.output_item.done" && e.Item != nil && e.Item.Type == "function_call" { + fcDone = e.Item + } + } + require.NotNil(t, fcDone, "must emit function_call output_item.done") + assert.NotEmpty(t, fcDone.CallID, "call_id required") + assert.Equal(t, "exec", fcDone.Name) + assert.JSONEq(t, `{"cmd":"ls"}`, fcDone.Arguments) +} diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index d2937802789..cec129d7cc3 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -230,13 +230,31 @@ type ResponsesInputItem struct { Content json.RawMessage `json:"content,omitempty"` // string or []ResponsesContentPart // type=function_call - CallID string `json:"call_id,omitempty"` - Name string `json:"name,omitempty"` - Arguments string `json:"arguments,omitempty"` - ID string `json:"id,omitempty"` + CallID string `json:"call_id,omitempty"` + Name string `json:"name,omitempty"` + // Arguments is stringified JSON per the OpenAI spec, but codex / newer + // clients may send a raw JSON object. RawMessage accepts both; callers + // normalize via normalizeResponsesArguments. + Arguments json.RawMessage `json:"arguments,omitempty"` + ID string `json:"id,omitempty"` // type=function_call_output - Output string `json:"output,omitempty"` + // Output is a plain string in older clients, but newer Responses clients + // (codex) send an array like [{"type":"output_text","text":"..."}]. + // RawMessage accepts both; callers normalize via extractResponsesOutputText. + Output json.RawMessage `json:"output,omitempty"` +} + +// jsonRawString marshals a Go string into a JSON-string RawMessage (i.e. a +// quoted value). Used when building ResponsesInputItem.Arguments / .Output from +// a string source, preserving the OpenAI wire format where these fields are +// emitted as JSON strings. +func jsonRawString(s string) json.RawMessage { + b, err := json.Marshal(s) + if err != nil { + return json.RawMessage(`""`) + } + return json.RawMessage(b) } // ResponsesContentPart is a typed content part in a Responses message. diff --git a/backend/internal/service/anthropic_empty_output.go b/backend/internal/service/anthropic_empty_output.go new file mode 100644 index 00000000000..3baa11c1b2c --- /dev/null +++ b/backend/internal/service/anthropic_empty_output.go @@ -0,0 +1,288 @@ +package service + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" + "github.com/gin-gonic/gin" +) + +const emptyAnthropicCompletionMessage = "upstream returned empty Anthropic completion in HTTP 200" + +func appendRawSSEPair(dst *strings.Builder, eventLine, dataLine string) { + if dst == nil { + return + } + dst.WriteString(eventLine) + dst.WriteByte('\n') + dst.WriteString(dataLine) + dst.WriteByte('\n') + dst.WriteByte('\n') +} + +func logEmptyAnthropicHTTP200Response(scope string, c *gin.Context, resp *http.Response, originalModel, mappedModel, reason string, upstreamBody string) { + if strings.TrimSpace(upstreamBody) == "" { + upstreamBody = "(empty)" + } + requestID := "" + statusCode := 0 + contentType := "" + upstreamRequestID := "" + if resp != nil { + statusCode = resp.StatusCode + contentType = resp.Header.Get("Content-Type") + upstreamRequestID = resp.Header.Get("x-request-id") + requestID = upstreamRequestID + } + method := "" + path := "" + if c != nil && c.Request != nil { + method = c.Request.Method + path = c.Request.URL.Path + if v, ok := c.Get("request_id"); ok && v != nil { + switch typed := v.(type) { + case string: + requestID = strings.TrimSpace(typed) + case fmt.Stringer: + requestID = strings.TrimSpace(typed.String()) + } + } + } + slog.Warn("anthropic.http_200_empty_response_body", + "scope", scope, + "reason", reason, + "request_id", requestID, + "upstream_request_id", upstreamRequestID, + "status_code", statusCode, + "content_type", contentType, + "method", method, + "path", path, + "model", originalModel, + "upstream_model", mappedModel, + "upstream_response_body", upstreamBody, + ) +} + +func anthropicResponseHasOutput(resp *apicompat.AnthropicResponse) bool { + if resp == nil { + return false + } + for _, block := range resp.Content { + if anthropicContentBlockHasOutput(block) { + return true + } + } + return false +} + +func anthropicStreamEventHasOutput(event *apicompat.AnthropicStreamEvent) bool { + if event == nil { + return false + } + if event.Message != nil && anthropicResponseHasOutput(event.Message) { + return true + } + if event.ContentBlock != nil && anthropicContentBlockHasOutput(*event.ContentBlock) { + return true + } + if event.Delta != nil { + switch event.Delta.Type { + case "text_delta": + return strings.TrimSpace(event.Delta.Text) != "" + case "thinking_delta": + return strings.TrimSpace(event.Delta.Thinking) != "" + case "input_json_delta": + return rawJSONHasObjectContent(json.RawMessage(event.Delta.PartialJSON)) + } + } + return false +} + +func anthropicStreamEventHasVisibleCompletionOutput(event *apicompat.AnthropicStreamEvent) bool { + if event == nil { + return false + } + if event.ContentBlock != nil { + switch event.ContentBlock.Type { + case "text": + return strings.TrimSpace(event.ContentBlock.Text) != "" + case "tool_use", "server_tool_use": + return strings.TrimSpace(event.ContentBlock.Name) != "" || strings.TrimSpace(event.ContentBlock.ID) != "" || rawJSONHasObjectContent(event.ContentBlock.Input) + } + } + if event.Delta != nil { + switch event.Delta.Type { + case "text_delta": + return strings.TrimSpace(event.Delta.Text) != "" + case "input_json_delta": + return rawJSONHasObjectContent(json.RawMessage(event.Delta.PartialJSON)) + } + } + return false +} + +func anthropicContentBlockHasOutput(block apicompat.AnthropicContentBlock) bool { + switch block.Type { + case "text": + return strings.TrimSpace(block.Text) != "" + case "thinking": + return strings.TrimSpace(block.Thinking) != "" + case "tool_use", "server_tool_use": + return strings.TrimSpace(block.Name) != "" || strings.TrimSpace(block.ID) != "" || rawJSONHasObjectContent(block.Input) + default: + return strings.TrimSpace(block.Text) != "" || strings.TrimSpace(block.Thinking) != "" || rawJSONHasObjectContent(block.Input) + } +} + +func responsesResponseHasOutput(resp *apicompat.ResponsesResponse) bool { + if resp == nil { + return false + } + for _, item := range resp.Output { + if responsesOutputHasOutput(item) { + return true + } + } + return false +} + +func ensureResponsesResponseHasOutput(resp *apicompat.ResponsesResponse, acc *apicompat.BufferedResponseAccumulator) error { + if acc != nil && acc.HasContent() { + return nil + } + if responsesResponseHasOutput(resp) { + return nil + } + return newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) +} + +func responsesStreamEventHasOutput(event *apicompat.ResponsesStreamEvent) bool { + if event == nil { + return false + } + if responsesResponseHasOutput(event.Response) { + return true + } + if event.Item != nil && responsesOutputHasOutput(*event.Item) { + return true + } + + switch strings.TrimSpace(event.Type) { + case "response.output_text.delta": + return strings.TrimSpace(event.Delta) != "" + case "response.function_call_arguments.delta": + return strings.TrimSpace(event.Delta) != "" || strings.TrimSpace(event.Arguments) != "" + case "response.reasoning_summary_text.delta": + return strings.TrimSpace(event.Delta) != "" || strings.TrimSpace(event.Text) != "" + } + return false +} + +func responsesOutputHasOutput(item apicompat.ResponsesOutput) bool { + switch strings.TrimSpace(item.Type) { + case "message": + for _, part := range item.Content { + if responsesContentPartHasOutput(part) { + return true + } + } + case "function_call": + return strings.TrimSpace(item.CallID) != "" || + strings.TrimSpace(item.Name) != "" || + rawJSONHasObjectContent(json.RawMessage(item.Arguments)) + case "reasoning": + if strings.TrimSpace(item.EncryptedContent) != "" { + return true + } + for _, summary := range item.Summary { + if strings.TrimSpace(summary.Text) != "" { + return true + } + } + case "web_search_call": + if item.Action != nil { + return strings.TrimSpace(item.Action.Query) != "" + } + } + return false +} + +func responsesContentPartHasOutput(part apicompat.ResponsesContentPart) bool { + switch strings.TrimSpace(part.Type) { + case "output_text", "text": + return strings.TrimSpace(part.Text) != "" + default: + return strings.TrimSpace(part.Text) != "" + } +} + +func chatChunkHasOutput(chunk apicompat.ChatCompletionsChunk) bool { + for _, choice := range chunk.Choices { + delta := choice.Delta + if delta.Content != nil && strings.TrimSpace(*delta.Content) != "" { + return true + } + if delta.ReasoningContent != nil && strings.TrimSpace(*delta.ReasoningContent) != "" { + return true + } + if len(delta.ToolCalls) > 0 { + return true + } + } + return false +} + +func chatResponseHasOutput(resp *apicompat.ChatCompletionsResponse) bool { + if resp == nil { + return false + } + for _, choice := range resp.Choices { + if chatMessageHasOutput(choice.Message) { + return true + } + } + return false +} + +func chatMessageHasOutput(message apicompat.ChatMessage) bool { + if rawJSONHasTextContent(message.Content) { + return true + } + if strings.TrimSpace(message.ReasoningContent) != "" { + return true + } + if len(message.ToolCalls) > 0 || message.FunctionCall != nil { + return true + } + return false +} + +func rawJSONHasTextContent(raw json.RawMessage) bool { + trimmed := strings.TrimSpace(string(raw)) + if trimmed == "" || trimmed == "null" || trimmed == `""` || trimmed == "[]" || trimmed == "{}" { + return false + } + var text string + if err := json.Unmarshal(raw, &text); err == nil { + return strings.TrimSpace(text) != "" + } + var parts []apicompat.ChatContentPart + if err := json.Unmarshal(raw, &parts); err == nil { + for _, part := range parts { + if strings.TrimSpace(part.Text) != "" { + return true + } + } + return false + } + return rawJSONHasObjectContent(raw) +} + +func rawJSONHasObjectContent(raw json.RawMessage) bool { + trimmed := strings.TrimSpace(string(raw)) + return trimmed != "" && trimmed != "null" && trimmed != "{}" && trimmed != "[]" +} diff --git a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go index c0bc0ef15f1..c07a7040a0a 100644 --- a/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go +++ b/backend/internal/service/gateway_anthropic_apikey_passthrough_test.go @@ -915,6 +915,47 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_MissingTerminalEventReturnsEr require.NotNil(t, result) } +func TestGatewayService_AnthropicAPIKeyPassthrough_EmptyHTTP200StreamTriggersFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + svc := &GatewayService{ + cfg: &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + }, + rateLimitService: &RateLimitService{}, + } + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(strings.Join([]string{ + `event: message_start`, + `data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"model","usage":{"input_tokens":0,"output_tokens":0}}}`, + "", + `event: message_delta`, + `data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":0}}`, + "", + `event: message_stop`, + `data: {"type":"message_stop"}`, + "", + }, "\n"))), + } + + result, err := svc.handleStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 1}, time.Now(), "claude-3-7-sonnet-20250219") + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.True(t, errors.As(err, &failoverErr)) + require.Contains(t, string(failoverErr.ResponseBody), emptyAnthropicCompletionMessage) + require.Empty(t, rec.Body.String(), "empty passthrough stream must not be returned as HTTP 200") +} + func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_NonStreamingSuccess(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() @@ -949,6 +990,58 @@ func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_NonStreamingSuc require.Equal(t, upstreamJSON, rec.Body.String()) } +func TestGatewayService_AnthropicAPIKeyPassthrough_NonStreamingEmptyHTTP200TriggersFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + body := `{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"model","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0}}` + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(body)), + } + svc := &GatewayService{ + cfg: &config.Config{}, + rateLimitService: &RateLimitService{}, + } + + usage, err := svc.handleNonStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 1}) + require.Error(t, err) + require.Nil(t, usage) + var failoverErr *UpstreamFailoverError + require.True(t, errors.As(err, &failoverErr)) + require.Contains(t, string(failoverErr.ResponseBody), emptyAnthropicCompletionMessage) + require.Empty(t, rec.Body.String(), "empty passthrough response must not be returned as HTTP 200") +} + +func TestGatewayService_AnthropicAPIKeyPassthrough_NonStreamingHTTP200ErrorObjectTriggersFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + body := `{"type":"error","error":{"type":"overloaded_error","message":"upstream overloaded"}}` + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(body)), + } + svc := &GatewayService{ + cfg: &config.Config{}, + rateLimitService: &RateLimitService{}, + } + + usage, err := svc.handleNonStreamingResponseAnthropicAPIKeyPassthrough(context.Background(), resp, c, &Account{ID: 1}) + require.Error(t, err) + require.Nil(t, usage) + var failoverErr *UpstreamFailoverError + require.True(t, errors.As(err, &failoverErr)) + require.Contains(t, string(failoverErr.ResponseBody), "upstream overloaded") + require.Empty(t, rec.Body.String(), "HTTP 200 error object must not be returned as success") +} + func TestGatewayService_AnthropicAPIKeyPassthrough_ForwardDirect_InvalidTokenType(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() diff --git a/backend/internal/service/gateway_forward_as_responses.go b/backend/internal/service/gateway_forward_as_responses.go index 22951b88b1e..d2c68d7cf71 100644 --- a/backend/internal/service/gateway_forward_as_responses.go +++ b/backend/internal/service/gateway_forward_as_responses.go @@ -97,6 +97,8 @@ func (s *GatewayService) ForwardAsResponses( if shouldMimicClaudeCode { anthropicBody = s.applyClaudeCodeOAuthMimicryToBody(ctx, c, account, anthropicBody, anthropicReq.System, mappedModel) + } else { + anthropicBody = s.applyAnthropicMessageCacheTransforms(ctx, c, anthropicBody) } // 7. Enforce cache_control block limit @@ -151,16 +153,23 @@ func (s *GatewayService) ForwardAsResponses( upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + diagnosticDetail := "" + if resp.StatusCode == http.StatusBadRequest { + diagnosticDetail = summarizeAnthropicRequestBodyForOps(anthropicBody) + } if s.shouldFailoverUpstreamError(resp.StatusCode) { appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ - Platform: account.Platform, - AccountID: account.ID, - AccountName: account.Name, - UpstreamStatusCode: resp.StatusCode, - UpstreamRequestID: resp.Header.Get("x-request-id"), - Kind: "failover", - Message: upstreamMsg, + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: diagnosticDetail, + UpstreamRequestBody: string(anthropicBody), + UpstreamResponseBody: string(respBody), }) if s.rateLimitService != nil { s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody, mappedModel) @@ -202,6 +211,102 @@ func ExtractResponsesReasoningEffortFromBody(body []byte) *string { return &normalized } +func summarizeAnthropicRequestBodyForOps(body []byte) string { + model := gjson.GetBytes(body, "model").String() + maxTokens := gjson.GetBytes(body, "max_tokens").Int() + stream := gjson.GetBytes(body, "stream").Bool() + thinkingType := gjson.GetBytes(body, "thinking.type").String() + outputEffort := gjson.GetBytes(body, "output_config.effort").String() + + systemCount := 0 + systemTextBytes := 0 + if sys := gjson.GetBytes(body, "system"); sys.Exists() { + if sys.IsArray() { + sys.ForEach(func(_, item gjson.Result) bool { + systemCount++ + systemTextBytes += len(item.Get("text").String()) + return true + }) + } else { + systemCount = 1 + systemTextBytes = len(sys.String()) + } + } + + messageSummaries := make([]string, 0, 64) + emptyContentCount := 0 + emptyTextCount := 0 + toolUseCount := 0 + toolResultCount := 0 + messages := gjson.GetBytes(body, "messages") + if messages.IsArray() { + idx := 0 + messages.ForEach(func(_, msg gjson.Result) bool { + role := msg.Get("role").String() + content := msg.Get("content") + contentKind := content.Type.String() + blocks := 0 + blockSummaries := make([]string, 0, 8) + if content.IsArray() { + content.ForEach(func(_, block gjson.Result) bool { + blocks++ + blockType := block.Get("type").String() + switch blockType { + case "text": + textLen := len(block.Get("text").String()) + if textLen == 0 { + emptyTextCount++ + } + blockSummaries = append(blockSummaries, fmt.Sprintf("text:%d", textLen)) + case "tool_use": + toolUseCount++ + blockSummaries = append(blockSummaries, fmt.Sprintf("tool_use:%s:%s", block.Get("id").String(), block.Get("name").String())) + case "tool_result": + toolResultCount++ + blockSummaries = append(blockSummaries, fmt.Sprintf("tool_result:%s", block.Get("tool_use_id").String())) + default: + if blockType == "" { + blockType = "" + } + blockSummaries = append(blockSummaries, blockType) + } + return len(blockSummaries) < 8 + }) + } else if content.Type == gjson.String { + textLen := len(content.String()) + if textLen == 0 { + emptyTextCount++ + } + blocks = 1 + blockSummaries = append(blockSummaries, fmt.Sprintf("string:%d", textLen)) + } + if blocks == 0 { + emptyContentCount++ + } + messageSummaries = append(messageSummaries, fmt.Sprintf("#%d role=%s content=%s blocks=%d [%s]", idx, role, contentKind, blocks, strings.Join(blockSummaries, ","))) + idx++ + return len(messageSummaries) < 80 + }) + } + + return fmt.Sprintf( + "anthropic_request_summary model=%s max_tokens=%d stream=%v thinking.type=%s output_config.effort=%s system_blocks=%d system_text_bytes=%d messages=%d empty_contents=%d empty_text_blocks=%d tool_use=%d tool_result=%d first_messages=%s", + model, + maxTokens, + stream, + thinkingType, + outputEffort, + systemCount, + systemTextBytes, + int(messages.Get("#").Int()), + emptyContentCount, + emptyTextCount, + toolUseCount, + toolResultCount, + strings.Join(messageSummaries, " | "), + ) +} + func mergeAnthropicUsage(dst *ClaudeUsage, src apicompat.AnthropicUsage) { if dst == nil { return @@ -243,6 +348,7 @@ func (s *GatewayService) handleResponsesBufferedStreamingResponse( // Accumulate the final Anthropic response from streaming events var finalResp *apicompat.AnthropicResponse var usage ClaudeUsage + var upstreamBody strings.Builder for scanner.Scan() { line := scanner.Text() @@ -259,6 +365,7 @@ func (s *GatewayService) handleResponsesBufferedStreamingResponse( if !strings.HasPrefix(dataLine, "data: ") { continue } + appendRawSSEPair(&upstreamBody, line, dataLine) payload := dataLine[6:] var event apicompat.AnthropicStreamEvent @@ -316,8 +423,12 @@ func (s *GatewayService) handleResponsesBufferedStreamingResponse( } if finalResp == nil { - writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response") - return nil, fmt.Errorf("upstream stream ended without response") + logEmptyAnthropicHTTP200Response("forward_as_responses_buffered", c, resp, originalModel, mappedModel, "missing_final_response", upstreamBody.String()) + return nil, newUpstreamStreamEndedFailoverError("Upstream stream ended without a response") + } + if !anthropicResponseHasOutput(finalResp) { + logEmptyAnthropicHTTP200Response("forward_as_responses_buffered", c, resp, originalModel, mappedModel, "empty_output", upstreamBody.String()) + return nil, newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) } // Update usage from accumulated delta @@ -367,20 +478,30 @@ func (s *GatewayService) handleResponsesStreamingResponse( ) (*ForwardResult, error) { requestID := resp.Header.Get("x-request-id") - if s.responseHeaderFilter != nil { - responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + headersWritten := false + writeStreamHeaders := func() { + if headersWritten { + return + } + headersWritten = true + if s.responseHeaderFilter != nil { + responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + } + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("X-Accel-Buffering", "no") + c.Writer.WriteHeader(http.StatusOK) } - c.Writer.Header().Set("Content-Type", "text/event-stream") - c.Writer.Header().Set("Cache-Control", "no-cache") - c.Writer.Header().Set("Connection", "keep-alive") - c.Writer.Header().Set("X-Accel-Buffering", "no") - c.Writer.WriteHeader(http.StatusOK) state := apicompat.NewAnthropicEventToResponsesState() state.Model = originalModel var usage ClaudeUsage var firstTokenMs *int firstChunk := true + sawOutput := false + var pendingEvents []string + var upstreamBody strings.Builder scanner := bufio.NewScanner(resp.Body) maxLineSize := defaultMaxLineSize @@ -404,6 +525,9 @@ func (s *GatewayService) handleResponsesStreamingResponse( // processEvent handles a single parsed Anthropic SSE event. processEvent := func(event *apicompat.AnthropicStreamEvent) bool { + if anthropicStreamEventHasVisibleCompletionOutput(event) { + sawOutput = true + } if firstChunk { firstChunk = false ms := int(time.Since(startTime).Milliseconds()) @@ -431,6 +555,17 @@ func (s *GatewayService) handleResponsesStreamingResponse( continue } out := string(reverseToolNamesIfPresent(c, []byte(sse))) + if !sawOutput { + pendingEvents = append(pendingEvents, out) + continue + } + writeStreamHeaders() + for _, pending := range pendingEvents { + if _, err := fmt.Fprint(c.Writer, pending); err != nil { + return true + } + } + pendingEvents = nil if _, err := fmt.Fprint(c.Writer, out); err != nil { logger.L().Info("forward_as_responses stream: client disconnected", zap.String("request_id", requestID), @@ -438,7 +573,7 @@ func (s *GatewayService) handleResponsesStreamingResponse( return true // client disconnected } } - if len(events) > 0 { + if len(events) > 0 && sawOutput { c.Writer.Flush() } return false @@ -452,6 +587,7 @@ func (s *GatewayService) handleResponsesStreamingResponse( continue } out := string(reverseToolNamesIfPresent(c, []byte(sse))) + writeStreamHeaders() fmt.Fprint(c.Writer, out) //nolint:errcheck } c.Writer.Flush() @@ -475,6 +611,7 @@ func (s *GatewayService) handleResponsesStreamingResponse( if !strings.HasPrefix(dataLine, "data: ") { continue } + appendRawSSEPair(&upstreamBody, line, dataLine) payload := dataLine[6:] var event apicompat.AnthropicStreamEvent @@ -501,6 +638,10 @@ func (s *GatewayService) handleResponsesStreamingResponse( } } + if firstChunk || !sawOutput { + logEmptyAnthropicHTTP200Response("forward_as_responses_stream", c, resp, originalModel, mappedModel, "empty_stream", upstreamBody.String()) + return nil, newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) + } return finalizeStream() } diff --git a/backend/internal/service/gateway_forward_as_responses_test.go b/backend/internal/service/gateway_forward_as_responses_test.go index e48d8b22fa8..d6de142a1af 100644 --- a/backend/internal/service/gateway_forward_as_responses_test.go +++ b/backend/internal/service/gateway_forward_as_responses_test.go @@ -92,3 +92,92 @@ func TestHandleResponsesStreamingResponse_PreservesMessageStartCacheUsage(t *tes require.Equal(t, 4, result.Usage.CacheCreationInputTokens) require.Contains(t, rec.Body.String(), `response.completed`) } + +func TestHandleResponsesBufferedStreamingResponse_EmptyCompletionReturnsFailoverError(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + resp := &http.Response{ + Header: http.Header{"x-request-id": []string{"rid_responses_empty_buffered"}}, + Body: io.NopCloser(strings.NewReader(strings.Join([]string{ + `event: message_start`, + `data: {"type":"message_start","message":{"id":"msg_empty","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4.5","stop_reason":"","usage":{"input_tokens":12}}}`, + ``, + `event: content_block_start`, + `data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}`, + ``, + `event: message_delta`, + `data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":0}}`, + ``, + }, "\n"))), + } + + svc := &GatewayService{} + result, err := svc.handleResponsesBufferedStreamingResponse(resp, c, "claude-sonnet-4.5", "claude-sonnet-4.5", nil, time.Now()) + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.Empty(t, rec.Body.String()) +} + +func TestHandleResponsesStreamingResponse_EmptyHTTP200ReturnsFailoverError(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + resp := &http.Response{ + Header: http.Header{"x-request-id": []string{"rid_responses_empty_stream"}}, + Body: io.NopCloser(strings.NewReader("")), + } + + svc := &GatewayService{} + result, err := svc.handleResponsesStreamingResponse(resp, c, "claude-sonnet-4.5", "claude-sonnet-4.5", nil, time.Now()) + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.Empty(t, rec.Body.String()) +} + +func TestHandleResponsesStreamingResponse_EmptyCompletionDoesNotWriteStream(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + resp := &http.Response{ + Header: http.Header{"x-request-id": []string{"rid_responses_empty_completion_stream"}}, + Body: io.NopCloser(strings.NewReader(strings.Join([]string{ + `event: message_start`, + `data: {"type":"message_start","message":{"id":"msg_empty","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4.5","stop_reason":"","usage":{"input_tokens":20}}}`, + ``, + `event: content_block_start`, + `data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}`, + ``, + `event: message_delta`, + `data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":0}}`, + ``, + `event: message_stop`, + `data: {"type":"message_stop"}`, + ``, + }, "\n"))), + } + + svc := &GatewayService{} + result, err := svc.handleResponsesStreamingResponse(resp, c, "claude-sonnet-4.5", "claude-sonnet-4.5", nil, time.Now()) + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.Empty(t, rec.Body.String()) +} diff --git a/backend/internal/service/gateway_messages_cache.go b/backend/internal/service/gateway_messages_cache.go index 8f7e6b5b7db..18c780fde47 100644 --- a/backend/internal/service/gateway_messages_cache.go +++ b/backend/internal/service/gateway_messages_cache.go @@ -95,6 +95,23 @@ func (s *GatewayService) rewriteMessageCacheControlIfEnabled(ctx context.Context return addMessageCacheBreakpoints(body) } +type toolNameRewriteSetter interface { + Set(string, any) +} + +func (s *GatewayService) applyAnthropicMessageCacheTransforms(ctx context.Context, c toolNameRewriteSetter, body []byte) []byte { + body = s.rewriteMessageCacheControlIfEnabled(ctx, body) + if rw := buildToolNameRewriteFromBody(body); rw != nil { + body = applyToolNameRewriteToBody(body, rw) + if c != nil { + c.Set(toolNameRewriteKey, rw) + } + } else { + body = applyToolsLastCacheBreakpoint(body) + } + return body +} + func (s *GatewayService) isRewriteMessageCacheControlEnabled(ctx context.Context) bool { if s == nil { return false diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 531c93e4e75..24bebb74d1d 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -26,6 +26,7 @@ import ( "unsafe" "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" "github.com/Wei-Shaw/sub2api/internal/pkg/claude" "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" @@ -548,6 +549,49 @@ type ClaudeUsage struct { ImageOutputTokens int `json:"image_output_tokens,omitempty"` } +func (u ClaudeUsage) totalTokenLikeCount() int { + return u.InputTokens + u.OutputTokens + u.CacheCreationInputTokens + u.CacheReadInputTokens + u.CacheCreation5mTokens + u.CacheCreation1hTokens + u.ImageOutputTokens +} + +func logHTTP200UsageAnomaly(scope string, account *Account, body []byte, reason string) { + bodyText := string(body) + if len(body) == 0 { + bodyText = "(empty)" + } + accountID := int64(0) + accountName := "" + platform := "" + if account != nil { + accountID = account.ID + accountName = account.Name + platform = account.Platform + } + slog.Warn("upstream returned HTTP 200 with abnormal usage/body", + "scope", scope, + "reason", reason, + "platform", platform, + "account_id", accountID, + "account_name", accountName, + "body", bodyText, + ) +} + +func responseBodyHasErrorObject(body []byte) bool { + if len(body) == 0 || !gjson.ValidBytes(body) { + return false + } + if gjson.GetBytes(body, "error").Exists() { + return true + } + if gjson.GetBytes(body, "error_code").Exists() { + return true + } + if strings.EqualFold(gjson.GetBytes(body, "type").String(), "error") { + return true + } + return false +} + // ForwardResult 转发结果 type ForwardResult struct { RequestID string @@ -5523,6 +5567,9 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( var firstTokenMs *int clientDisconnected := false sawTerminalEvent := false + sawOutput := false + var pendingLines []string + var upstreamBody strings.Builder scanner := bufio.NewScanner(resp.Body) maxLineSize := defaultMaxLineSize @@ -5592,6 +5639,38 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( } lastDataAt := time.Now() inPartialEvent := false + writeLine := func(line string) { + if clientDisconnected { + return + } + restored := string(reverseToolNamesIfPresent(c, []byte(line))) + if _, err := io.WriteString(w, restored); err != nil { + clientDisconnected = true + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during streaming, continue draining upstream for usage: account=%d", account.ID) + return + } + if _, err := io.WriteString(w, "\n"); err != nil { + clientDisconnected = true + logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during streaming, continue draining upstream for usage: account=%d", account.ID) + return + } + if line == "" { + flusher.Flush() + lastDataAt = time.Now() + inPartialEvent = false + } else { + inPartialEvent = true + } + } + flushPending := func() { + for _, pendingLine := range pendingLines { + writeLine(pendingLine) + if clientDisconnected { + break + } + } + pendingLines = nil + } for { select { @@ -5599,7 +5678,13 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( if !ok { if !clientDisconnected { // 兜底补刷,确保最后一个未以空行结尾的事件也能及时送达客户端。 - flusher.Flush() + if sawOutput { + flusher.Flush() + } + } + if !sawOutput { + logEmptyAnthropicHTTP200Response("anthropic_passthrough_stream", c, resp, model, model, "empty_stream", upstreamBody.String()) + return nil, newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) } if !sawTerminalEvent { if clientDisconnected && streamInterval > 0 { @@ -5614,6 +5699,10 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( } if ev.err != nil { if sawTerminalEvent { + if !sawOutput { + logEmptyAnthropicHTTP200Response("anthropic_passthrough_stream", c, resp, model, model, "empty_stream_after_terminal", upstreamBody.String()) + return nil, newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) + } return &streamingResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: clientDisconnected}, nil } if clientDisconnected { @@ -5630,12 +5719,26 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( } line := ev.line + upstreamBody.WriteString(line) + upstreamBody.WriteByte('\n') + hasOutput := false if data, ok := extractAnthropicSSEDataLine(line); ok { trimmed := strings.TrimSpace(data) if anthropicStreamEventIsTerminal("", trimmed) { sawTerminalEvent = true } - if firstTokenMs == nil && trimmed != "" && trimmed != "[DONE]" { + if trimmed != "" && trimmed != "[DONE]" { + var streamEvent apicompat.AnthropicStreamEvent + if err := json.Unmarshal([]byte(trimmed), &streamEvent); err == nil { + hasOutput = anthropicStreamEventHasVisibleCompletionOutput(&streamEvent) + } + if !hasOutput { + probeUsage := &ClaudeUsage{} + s.parseSSEUsagePassthrough(trimmed, probeUsage) + hasOutput = probeUsage.totalTokenLikeCount() > 0 + } + } + if firstTokenMs == nil && hasOutput { ms := int(time.Since(startTime).Milliseconds()) firstTokenMs = &ms } @@ -5647,23 +5750,15 @@ func (s *GatewayService) handleStreamingResponseAnthropicAPIKeyPassthrough( } } - if !clientDisconnected { - restored := string(reverseToolNamesIfPresent(c, []byte(line))) - if _, err := io.WriteString(w, restored); err != nil { - clientDisconnected = true - logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during streaming, continue draining upstream for usage: account=%d", account.ID) - } else if _, err := io.WriteString(w, "\n"); err != nil { - clientDisconnected = true - logger.LegacyPrintf("service.gateway", "[Anthropic passthrough] Client disconnected during streaming, continue draining upstream for usage: account=%d", account.ID) - } else if line == "" { - // 按 SSE 事件边界刷出,减少每行 flush 带来的 syscall 开销。 - flusher.Flush() - lastDataAt = time.Now() - inPartialEvent = false - } else { - inPartialEvent = true + if !sawOutput { + pendingLines = append(pendingLines, line) + if hasOutput { + sawOutput = true + flushPending() } + continue } + writeLine(line) case <-intervalCh: lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt)) @@ -5832,6 +5927,19 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough( } usage := parseClaudeUsageFromResponseBody(body) + if resp.StatusCode == http.StatusOK { + switch { + case responseBodyHasErrorObject(body): + logHTTP200UsageAnomaly("anthropic_passthrough_non_stream", account, body, "error_body") + return nil, newUpstreamStreamEndedFailoverError(ExtractUpstreamErrorMessage(body)) + case !gjson.GetBytes(body, "usage").Exists(): + logHTTP200UsageAnomaly("anthropic_passthrough_non_stream", account, body, "usage_missing") + return nil, newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) + case usage.totalTokenLikeCount() == 0: + logHTTP200UsageAnomaly("anthropic_passthrough_non_stream", account, body, "usage_zero") + return nil, newUpstreamStreamEndedFailoverError(emptyAnthropicCompletionMessage) + } + } writeAnthropicPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) contentType := strings.TrimSpace(resp.Header.Get("Content-Type")) diff --git a/backend/internal/service/gateway_tool_rewrite_test.go b/backend/internal/service/gateway_tool_rewrite_test.go index 9e6f6806da5..14bb528d97f 100644 --- a/backend/internal/service/gateway_tool_rewrite_test.go +++ b/backend/internal/service/gateway_tool_rewrite_test.go @@ -10,6 +10,22 @@ import ( "github.com/tidwall/gjson" ) +func TestApplyAnthropicMessageCacheTransforms_RewritesResponsesBridgeBody(t *testing.T) { + body := []byte(`{"model":"claude-sonnet-4.5","system":[{"type":"text","text":"system prompt"}],"messages":[{"role":"user","content":[{"type":"text","text":"stable prompt"}]},{"role":"assistant","content":[{"type":"text","text":"ok"}]},{"role":"user","content":[{"type":"text","text":"latest prompt"}]}],"tools":[{"name":"search","input_schema":{"type":"object"}}]}`) + repo := &gatewayTTLSettingRepo{data: map[string]string{ + SettingKeyRewriteMessageCacheControl: "true", + }} + gatewayForwardingCache.Store(&cachedGatewayForwardingSettings{}) + svc := &GatewayService{settingService: NewSettingService(repo, &config.Config{})} + + out := svc.applyAnthropicMessageCacheTransforms(context.Background(), nil, body) + + require.Equal(t, "ephemeral", gjson.GetBytes(out, "messages.2.content.0.cache_control.type").String()) + require.Equal(t, "5m", gjson.GetBytes(out, "messages.2.content.0.cache_control.ttl").String()) + require.Equal(t, "ephemeral", gjson.GetBytes(out, "tools.0.cache_control.type").String()) + require.Equal(t, "5m", gjson.GetBytes(out, "tools.0.cache_control.ttl").String()) +} + func TestBuildDynamicToolMap_BelowThreshold(t *testing.T) { // Parrot 行为:tools 数量 ≤ 5 时不做动态映射。 names := []string{"bash", "edit", "read", "write", "search"} diff --git a/backend/internal/service/ops_upstream_context.go b/backend/internal/service/ops_upstream_context.go index 2405f306fee..5a5b57ed74a 100644 --- a/backend/internal/service/ops_upstream_context.go +++ b/backend/internal/service/ops_upstream_context.go @@ -119,6 +119,9 @@ type OpsUpstreamErrorEvent struct { // Best-effort upstream response capture (sanitized+trimmed). UpstreamResponseBody string `json:"upstream_response_body,omitempty"` + // Best-effort upstream request capture for diagnostics. + UpstreamRequestBody string `json:"upstream_request_body,omitempty"` + // Kind: http_error | request_error | retry_exhausted | failover Kind string `json:"kind,omitempty"` @@ -136,6 +139,7 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) { ev.Platform = strings.TrimSpace(ev.Platform) ev.UpstreamRequestID = strings.TrimSpace(ev.UpstreamRequestID) ev.UpstreamResponseBody = strings.TrimSpace(ev.UpstreamResponseBody) + ev.UpstreamRequestBody = strings.TrimSpace(ev.UpstreamRequestBody) ev.Kind = strings.TrimSpace(ev.Kind) ev.UpstreamURL = strings.TrimSpace(ev.UpstreamURL) ev.Message = strings.TrimSpace(ev.Message) diff --git a/backend/internal/service/stream_failover.go b/backend/internal/service/stream_failover.go new file mode 100644 index 00000000000..c2537458f74 --- /dev/null +++ b/backend/internal/service/stream_failover.go @@ -0,0 +1,26 @@ +package service + +import ( + "encoding/json" + "net/http" +) + +func newUpstreamStreamEndedFailoverError(message string) *UpstreamFailoverError { + if message == "" { + message = "Upstream stream ended without a response" + } + body, err := json.Marshal(map[string]any{ + "error": map[string]any{ + "type": "upstream_error", + "message": message, + }, + }) + if err != nil { + body = []byte(`{"error":{"type":"upstream_error","message":"Upstream stream ended without a response"}}`) + } + return &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: body, + RetryableOnSameAccount: true, + } +}