diff --git a/backend/internal/pkg/apicompat/anthropic_chatcompletions.go b/backend/internal/pkg/apicompat/anthropic_chatcompletions.go new file mode 100644 index 00000000000..c1827a71f90 --- /dev/null +++ b/backend/internal/pkg/apicompat/anthropic_chatcompletions.go @@ -0,0 +1,743 @@ +package apicompat + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "strings" +) + +// Direct Anthropic Messages <-> OpenAI Chat Completions bridge. This bypasses +// the Responses API hub entirely: +// +// request : /v1/messages --AnthropicToChatCompletions--> /v1/chat/completions +// response : chat SSE --ChatCompletionsChunkToAnthropicEvents--> Anthropic SSE +// +// The streaming response side is a single state machine that opens/closes +// Anthropic content blocks straight from chat deltas (reasoning_content -> +// thinking block, content -> text block, tool_calls -> tool_use block), so +// reasoning can never become an "orphan" the way it can when routed through the +// Responses item lifecycle (chat -> responses -> anthropic). + +// =========================================================================== +// Request side: Anthropic request -> Chat Completions request +// =========================================================================== + +// AnthropicToChatCompletions converts a /v1/messages request body into a +// /v1/chat/completions request for upstreams that only speak Chat Completions. +func AnthropicToChatCompletions(req *AnthropicRequest) (*ChatCompletionsRequest, error) { + if req == nil { + return nil, fmt.Errorf("nil request") + } + + var messages []ChatMessage + if sys := anthropicSystemText(req.System); sys != "" { + messages = append(messages, ChatMessage{Role: "system", Content: mustJSONString(sys)}) + } + for _, m := range req.Messages { + msgs, err := anthropicMessageToChat(m) + if err != nil { + return nil, err + } + messages = append(messages, msgs...) + } + + out := &ChatCompletionsRequest{ + Model: req.Model, + Messages: messages, + Stream: req.Stream, + Temperature: req.Temperature, + TopP: req.TopP, + } + if req.MaxTokens > 0 { + v := req.MaxTokens + out.MaxCompletionTokens = &v + } + if len(req.StopSeqs) > 0 { + if raw, err := json.Marshal(req.StopSeqs); err == nil { + out.Stop = raw + } + } + if len(req.Tools) > 0 { + out.Tools = anthropicToolsToChat(req.Tools) + } + if len(req.ToolChoice) > 0 { + if tc, err := anthropicToolChoiceToChat(req.ToolChoice); err == nil { + out.ToolChoice = tc + } + } + // When the client explicitly disables thinking, forward {type:"disabled"} to + // the upstream (GLM/DeepSeek/Qwen/... honor it natively) and drop + // reasoning_effort, so we never send a "disable thinking" signal together with + // an effort hint that strict upstreams may reject. All other cases keep the + // existing reasoning_effort mapping (enabled / output_config.effort). + if req.Thinking != nil && req.Thinking.Type == "disabled" { + out.Thinking = &ChatThinking{Type: "disabled"} + } else if effort := anthropicReasoningEffort(req); effort != "" { + out.ReasoningEffort = effort + } + return out, nil +} + +// anthropicSystemText flattens the Anthropic system field (string or block +// array) into a single string. +func anthropicSystemText(raw json.RawMessage) string { + if len(raw) == 0 { + return "" + } + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return s + } + var blocks []AnthropicContentBlock + if err := json.Unmarshal(raw, &blocks); err != nil { + return "" + } + var parts []string + for _, b := range blocks { + if b.Type == "text" && b.Text != "" { + parts = append(parts, b.Text) + } + } + return strings.Join(parts, "\n\n") +} + +// anthropicMessageToChat converts one Anthropic message into one or more chat +// messages. A user turn carrying tool_result blocks fans out into separate +// role:"tool" messages (which must follow the assistant tool_calls message). +func anthropicMessageToChat(m AnthropicMessage) ([]ChatMessage, error) { + // Plain string content. + var s string + if err := json.Unmarshal(m.Content, &s); err == nil { + role := m.Role + if role != "user" && role != "assistant" { + role = "user" + } + return []ChatMessage{{Role: role, Content: mustJSONString(s)}}, nil + } + + var blocks []AnthropicContentBlock + if err := json.Unmarshal(m.Content, &blocks); err != nil { + return nil, err + } + + if m.Role == "assistant" { + return anthropicAssistantBlocksToChat(blocks), nil + } + return anthropicUserBlocksToChat(blocks), nil +} + +func anthropicUserBlocksToChat(blocks []AnthropicContentBlock) []ChatMessage { + var out []ChatMessage + var toolMsgs []ChatMessage + var parts []ChatContentPart + + for _, b := range blocks { + switch b.Type { + case "text": + if b.Text != "" { + parts = append(parts, ChatContentPart{Type: "text", Text: b.Text}) + } + case "image": + if uri := anthropicImageDataURI(b.Source); uri != "" { + parts = append(parts, ChatContentPart{Type: "image_url", ImageURL: &ChatImageURL{URL: uri}}) + } + case "tool_result": + toolMsgs = append(toolMsgs, ChatMessage{ + Role: "tool", + ToolCallID: b.ToolUseID, + Content: mustJSONString(anthropicToolResultText(b)), + }) + } + } + + // tool_result replies must come first: Chat Completions requires the tool + // messages to immediately follow the assistant's tool_calls message. Any new + // user text/image in the same Anthropic turn is a fresh user turn and follows. + out = append(out, toolMsgs...) + if len(parts) > 0 { + out = append(out, ChatMessage{Role: "user", Content: chatContentFromParts(parts)}) + } + return out +} + +func anthropicAssistantBlocksToChat(blocks []AnthropicContentBlock) []ChatMessage { + msg := ChatMessage{Role: "assistant"} + var textParts []string + for _, b := range blocks { + switch b.Type { + case "text": + if b.Text != "" { + textParts = append(textParts, b.Text) + } + case "tool_use": + args := "{}" + if len(b.Input) > 0 { + args = string(b.Input) + } + msg.ToolCalls = append(msg.ToolCalls, ChatToolCall{ + ID: b.ID, + Type: "function", + Function: ChatFunctionCall{Name: b.Name, Arguments: args}, + }) + // thinking blocks: dropped (chat upstreams reject them as input) + } + } + if text := strings.Join(textParts, "\n\n"); text != "" { + msg.Content = mustJSONString(text) + } + return []ChatMessage{msg} +} + +func anthropicToolResultText(b AnthropicContentBlock) string { + if len(b.Content) == 0 { + return "(empty)" + } + var s string + if err := json.Unmarshal(b.Content, &s); err == nil { + if s == "" { + return "(empty)" + } + return s + } + var inner []AnthropicContentBlock + if err := json.Unmarshal(b.Content, &inner); err != nil { + return "(empty)" + } + var parts []string + for _, ib := range inner { + if ib.Type == "text" && ib.Text != "" { + parts = append(parts, ib.Text) + } + } + if text := strings.Join(parts, "\n\n"); text != "" { + return text + } + return "(empty)" +} + +func anthropicImageDataURI(src *AnthropicImageSource) string { + if src == nil || src.Data == "" { + return "" + } + mediaType := src.MediaType + if mediaType == "" { + mediaType = "image/png" + } + return "data:" + mediaType + ";base64," + src.Data +} + +// chatContentFromParts emits a plain string when the content is a single text +// part (the common case), else the multi-modal parts array. +func chatContentFromParts(parts []ChatContentPart) json.RawMessage { + if len(parts) == 1 && parts[0].Type == "text" { + return mustJSONString(parts[0].Text) + } + raw, _ := json.Marshal(parts) + return raw +} + +func anthropicToolsToChat(tools []AnthropicTool) []ChatTool { + var out []ChatTool + for _, t := range tools { + // keep only custom tools (type empty); any typed tool is an Anthropic built-in the chat upstream can't use + if t.Type != "" { + continue + } + strict := false + out = append(out, ChatTool{ + Type: "function", + Function: &ChatFunction{ + Name: t.Name, + Description: t.Description, + Parameters: normalizeToolParams(t.InputSchema), + Strict: &strict, + }, + }) + } + return out +} + +func anthropicToolChoiceToChat(raw json.RawMessage) (json.RawMessage, error) { + var tc struct { + Type string `json:"type"` + Name string `json:"name"` + } + if err := json.Unmarshal(raw, &tc); err != nil { + return nil, err + } + switch tc.Type { + case "auto": + return json.Marshal("auto") + case "any": + return json.Marshal("required") + case "none": + return json.Marshal("none") + case "tool": + return json.Marshal(map[string]any{ + "type": "function", + "function": map[string]any{"name": tc.Name}, + }) + default: + return raw, nil + } +} + +// anthropicReasoningEffort mirrors the validated chain: output_config.effort +// wins; otherwise enabled thinking defaults to "medium"; else omitted. +func anthropicReasoningEffort(req *AnthropicRequest) string { + effort := "" + if req.OutputConfig != nil && req.OutputConfig.Effort != "" { + effort = req.OutputConfig.Effort + } else if req.Thinking != nil && req.Thinking.Type == "enabled" { + effort = "medium" + } + if effort == "max" { + return "xhigh" + } + return effort +} + +func normalizeToolParams(schema json.RawMessage) json.RawMessage { + if len(schema) == 0 || string(schema) == "null" { + return json.RawMessage(`{"type":"object","properties":{}}`) + } + var m map[string]json.RawMessage + if err := json.Unmarshal(schema, &m); err != nil { + return schema + } + if string(m["type"]) != `"object"` { + return schema + } + if _, ok := m["properties"]; ok { + return schema + } + m["properties"] = json.RawMessage(`{}`) + if out, err := json.Marshal(m); err == nil { + return out + } + return schema +} + +func mustJSONString(s string) json.RawMessage { + raw, _ := json.Marshal(s) + return raw +} + +// =========================================================================== +// Response side: streaming Chat Completions -> Anthropic SSE +// =========================================================================== + +// ChatCompletionsToAnthropicStreamState carries the cross-chunk state needed to +// reconstruct Anthropic content blocks from a Chat Completions SSE stream. +type ChatCompletionsToAnthropicStreamState struct { + MessageID string + Model string + + started bool + stopped bool + + blockOpen bool + blockType string // "thinking" | "text" | "tool_use" + blockIndex int + nextIndex int + + // Current tool_use block. Blocks are serialized (one open at a time), so a + // new tool_calls index closes the previous tool block before opening. + curToolChatIdx int // -1 when no tool block is open + curToolName string + curToolArgs strings.Builder + curToolBuffered bool // "Read": buffer args, emit one sanitized delta at close + curToolStreamed bool + + reasoning strings.Builder // accumulated for the reasoning-only fallback + textEmitted bool + + hasTool bool + finish string + usage *ChatUsage +} + +// NewChatCompletionsToAnthropicStreamState builds an empty stream state. +func NewChatCompletionsToAnthropicStreamState(model string) *ChatCompletionsToAnthropicStreamState { + return &ChatCompletionsToAnthropicStreamState{Model: model, curToolChatIdx: -1} +} + +// ChatCompletionsChunkToAnthropicEvents converts one Chat Completions stream +// chunk into zero or more Anthropic SSE events, mutating state. +func ChatCompletionsChunkToAnthropicEvents(chunk *ChatCompletionsChunk, s *ChatCompletionsToAnthropicStreamState) []AnthropicStreamEvent { + if chunk == nil || s == nil { + return nil + } + if chunk.ID != "" { + s.MessageID = chunk.ID + } + if s.Model == "" && chunk.Model != "" { + s.Model = chunk.Model + } + if chunk.Usage != nil { + s.usage = chunk.Usage + } + + var events []AnthropicStreamEvent + events = append(events, s.ensureStart()...) + + for _, choice := range chunk.Choices { + d := choice.Delta + + if d.ReasoningContent != nil && *d.ReasoningContent != "" { + events = append(events, s.openBlock("thinking")...) + _, _ = s.reasoning.WriteString(*d.ReasoningContent) + events = append(events, contentBlockDelta(s.blockIndex, AnthropicDelta{ + Type: "thinking_delta", Thinking: *d.ReasoningContent, + })) + } + + if d.Content != nil && *d.Content != "" { + events = append(events, s.openBlock("text")...) + s.textEmitted = true + events = append(events, contentBlockDelta(s.blockIndex, AnthropicDelta{ + Type: "text_delta", Text: *d.Content, + })) + } + + for _, tc := range d.ToolCalls { + idx := 0 + if tc.Index != nil { + idx = *tc.Index + } + if !s.blockOpen || s.blockType != "tool_use" || s.curToolChatIdx != idx { + toolID := tc.ID + if toolID == "" { + toolID = generateToolUseID() + } + events = append(events, s.openToolBlock(idx, toolID, tc.Function.Name)...) + } else if tc.Function.Name != "" && s.curToolName == "" { + s.curToolName = tc.Function.Name + s.curToolBuffered = tc.Function.Name == "Read" + } + if tc.Function.Arguments != "" { + _, _ = s.curToolArgs.WriteString(tc.Function.Arguments) + if !s.curToolBuffered { + s.curToolStreamed = true + events = append(events, contentBlockDelta(s.blockIndex, AnthropicDelta{ + Type: "input_json_delta", PartialJSON: tc.Function.Arguments, + })) + } + } + } + + if choice.FinishReason != nil && *choice.FinishReason != "" { + s.finish = *choice.FinishReason + } + } + return events +} + +// FinalizeChatCompletionsAnthropicStream closes any open block and emits the +// terminal message_delta + message_stop events. Safe to call once. +func FinalizeChatCompletionsAnthropicStream(s *ChatCompletionsToAnthropicStreamState) []AnthropicStreamEvent { + if s == nil || !s.started || s.stopped { + return nil + } + var events []AnthropicStreamEvent + events = append(events, s.closeBlock()...) + + // Reasoning-only completion (reasoning present, no text, no tools): echo the + // reasoning as the answer so the client never gets a thinking block with no + // message. Mirrors sub2api's synthesizeChatReasoningFallbackMessage. + if !s.textEmitted && !s.hasTool { + if r := s.reasoning.String(); strings.TrimSpace(r) != "" { + events = append(events, s.openBlock("text")...) + s.textEmitted = true + events = append(events, contentBlockDelta(s.blockIndex, AnthropicDelta{ + Type: "text_delta", Text: r, + })) + events = append(events, s.closeBlock()...) + } + } + + usage := anthropicUsageFromChatUsage(s.usage) + events = append(events, + AnthropicStreamEvent{ + Type: "message_delta", + Delta: &AnthropicDelta{StopReason: chatFinishToAnthropicStopReason(s.finish, s.hasTool)}, + Usage: &usage, + }, + AnthropicStreamEvent{Type: "message_stop"}, + ) + s.stopped = true + return events +} + +func (s *ChatCompletionsToAnthropicStreamState) ensureStart() []AnthropicStreamEvent { + if s.started { + return nil + } + s.started = true + return []AnthropicStreamEvent{{ + Type: "message_start", + Message: &AnthropicResponse{ + ID: s.MessageID, + Type: "message", + Role: "assistant", + Content: []AnthropicContentBlock{}, + Model: s.Model, + Usage: AnthropicUsage{}, + }, + }} +} + +// openBlock ensures a thinking/text block of the given type is open, closing +// any other open block first. A no-op if that exact block is already open. +func (s *ChatCompletionsToAnthropicStreamState) openBlock(blockType string) []AnthropicStreamEvent { + if s.blockOpen && s.blockType == blockType { + return nil + } + var events []AnthropicStreamEvent + events = append(events, s.closeBlock()...) + + idx := s.nextIndex + s.nextIndex++ + s.blockOpen = true + s.blockType = blockType + s.blockIndex = idx + + bi := idx + events = append(events, AnthropicStreamEvent{ + Type: "content_block_start", + Index: &bi, + ContentBlock: &AnthropicContentBlock{Type: blockType}, + }) + return events +} + +// openToolBlock closes any open block and opens a tool_use block for a new +// chat-completions tool_calls index. +func (s *ChatCompletionsToAnthropicStreamState) openToolBlock(chatIdx int, id, name string) []AnthropicStreamEvent { + var events []AnthropicStreamEvent + events = append(events, s.closeBlock()...) + + idx := s.nextIndex + s.nextIndex++ + s.blockOpen = true + s.blockType = "tool_use" + s.blockIndex = idx + s.hasTool = true + s.curToolChatIdx = chatIdx + s.curToolName = name + s.curToolBuffered = name == "Read" + s.curToolStreamed = false + s.curToolArgs.Reset() + + bi := idx + events = append(events, AnthropicStreamEvent{ + Type: "content_block_start", + Index: &bi, + ContentBlock: &AnthropicContentBlock{ + Type: "tool_use", + ID: id, + Name: name, + Input: json.RawMessage("{}"), + }, + }) + return events +} + +func (s *ChatCompletionsToAnthropicStreamState) closeBlock() []AnthropicStreamEvent { + if !s.blockOpen { + return nil + } + var events []AnthropicStreamEvent + idx := s.blockIndex + + // A tool whose arguments were never streamed (a buffered "Read" tool, or a + // call that produced no argument fragments) emits them as a single, sanitized + // input_json_delta right before the block closes. Mirrors sub2api's + // resToAnthHandleFuncArgsDone / closeChatToolItems. + if s.blockType == "tool_use" && !s.curToolStreamed { + args := s.curToolArgs.String() + if strings.TrimSpace(args) == "" { + args = "{}" + } + events = append(events, contentBlockDelta(idx, AnthropicDelta{ + Type: "input_json_delta", PartialJSON: string(sanitizeAnthropicToolUseInput(s.curToolName, args)), + })) + } + + s.blockOpen = false + s.blockType = "" + s.curToolChatIdx = -1 + s.curToolName = "" + s.curToolBuffered = false + s.curToolStreamed = false + s.curToolArgs.Reset() + + events = append(events, AnthropicStreamEvent{Type: "content_block_stop", Index: &idx}) + return events +} + +func contentBlockDelta(index int, delta AnthropicDelta) AnthropicStreamEvent { + i := index + return AnthropicStreamEvent{Type: "content_block_delta", Index: &i, Delta: &delta} +} + +func chatFinishToAnthropicStopReason(finish string, hasTool bool) string { + switch finish { + case "stop": + return "end_turn" + case "length": + return "max_tokens" + case "tool_calls": + return "tool_use" + case "content_filter": + return "end_turn" + default: + if hasTool { + return "tool_use" + } + return "end_turn" + } +} + +// anthropicUsageFromChatUsage maps Chat Completions usage to Anthropic usage. +// Anthropic's input_tokens excludes cached tokens (reported separately as +// cache_read_input_tokens), so cached is subtracted out. Mirrors sub2api's +// anthropicUsageFromResponsesUsage. +func anthropicUsageFromChatUsage(u *ChatUsage) AnthropicUsage { + if u == nil { + return AnthropicUsage{} + } + cached := 0 + if u.PromptTokensDetails != nil { + cached = u.PromptTokensDetails.CachedTokens + } + input := u.PromptTokens - cached + if input < 0 { + input = 0 + } + return AnthropicUsage{ + InputTokens: input, + OutputTokens: u.CompletionTokens, + CacheReadInputTokens: cached, + } +} + +// ChatCompletionsStreamToAnthropicResponse is the non-streaming (sync) path: it +// buffers the whole chat stream and assembles a single Anthropic Messages JSON +// response, matching what the gateway does when the client requested +// stream=false (upstream is always streamed, then collapsed). Block order +// mirrors ResponsesToAnthropic: thinking, then text, then tool_use. +func ChatCompletionsStreamToAnthropicResponse(chunks []*ChatCompletionsChunk, model string) *AnthropicResponse { + id := "" + var reasoning, text strings.Builder + type toolAgg struct { + id, name string + args strings.Builder + } + tools := map[int]*toolAgg{} + maxToolIdx := -1 + finish := "" + var usage *ChatUsage + + for _, chunk := range chunks { + if chunk == nil { + continue + } + if chunk.ID != "" { + id = chunk.ID + } + if model == "" && chunk.Model != "" { + model = chunk.Model + } + if chunk.Usage != nil { + usage = chunk.Usage + } + for _, choice := range chunk.Choices { + d := choice.Delta + if d.ReasoningContent != nil { + _, _ = reasoning.WriteString(*d.ReasoningContent) + } + if d.Content != nil { + _, _ = text.WriteString(*d.Content) + } + for _, tc := range d.ToolCalls { + idx := 0 + if tc.Index != nil { + idx = *tc.Index + } + agg, ok := tools[idx] + if !ok { + agg = &toolAgg{} + tools[idx] = agg + if idx > maxToolIdx { + maxToolIdx = idx + } + } + if tc.ID != "" { + agg.id = tc.ID + } + if tc.Function.Name != "" { + agg.name = tc.Function.Name + } + _, _ = agg.args.WriteString(tc.Function.Arguments) + } + if choice.FinishReason != nil && *choice.FinishReason != "" { + finish = *choice.FinishReason + } + } + } + + hasTool := maxToolIdx >= 0 + var blocks []AnthropicContentBlock + if reasoning.Len() > 0 { + blocks = append(blocks, AnthropicContentBlock{Type: "thinking", Thinking: reasoning.String()}) + } + finalText := text.String() + if finalText == "" && !hasTool && strings.TrimSpace(reasoning.String()) != "" { + finalText = reasoning.String() // reasoning-only fallback + } + if finalText != "" { + blocks = append(blocks, AnthropicContentBlock{Type: "text", Text: finalText}) + } + for i := 0; i <= maxToolIdx; i++ { + agg, ok := tools[i] + if !ok { + continue + } + args := agg.args.String() + if strings.TrimSpace(args) == "" { + args = "{}" + } + toolID := agg.id + if toolID == "" { + toolID = generateToolUseID() + } + blocks = append(blocks, AnthropicContentBlock{ + Type: "tool_use", + ID: toolID, + Name: agg.name, + Input: sanitizeAnthropicToolUseInput(agg.name, args), + }) + } + if len(blocks) == 0 { + blocks = append(blocks, AnthropicContentBlock{Type: "text", Text: ""}) + } + + return &AnthropicResponse{ + ID: id, + Type: "message", + Role: "assistant", + Model: model, + Content: blocks, + StopReason: chatFinishToAnthropicStopReason(finish, hasTool), + Usage: anthropicUsageFromChatUsage(usage), + } +} + +func generateToolUseID() string { + b := make([]byte, 12) + _, _ = rand.Read(b) + return "toolu_" + hex.EncodeToString(b) +} diff --git a/backend/internal/pkg/apicompat/anthropic_chatcompletions_test.go b/backend/internal/pkg/apicompat/anthropic_chatcompletions_test.go new file mode 100644 index 00000000000..230d8b4f264 --- /dev/null +++ b/backend/internal/pkg/apicompat/anthropic_chatcompletions_test.go @@ -0,0 +1,354 @@ +package apicompat + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// test helpers +// --------------------------------------------------------------------------- + +func intPtr(i int) *int { return &i } + +func anthChatTextChunk(s string) *ChatCompletionsChunk { + return &ChatCompletionsChunk{Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: stringPtr(s)}}}} +} + +func anthChatReasoningChunk(s string) *ChatCompletionsChunk { + return &ChatCompletionsChunk{Choices: []ChatChunkChoice{{Delta: ChatDelta{ReasoningContent: stringPtr(s)}}}} +} + +func anthChatFinishChunk(reason string) *ChatCompletionsChunk { + return &ChatCompletionsChunk{Choices: []ChatChunkChoice{{FinishReason: stringPtr(reason)}}} +} + +func anthChatConcatDelta(events []AnthropicStreamEvent, deltaType string) string { + var b strings.Builder + for _, e := range events { + if e.Type != "content_block_delta" || e.Delta == nil || e.Delta.Type != deltaType { + continue + } + switch deltaType { + case "text_delta": + _, _ = b.WriteString(e.Delta.Text) + case "thinking_delta": + _, _ = b.WriteString(e.Delta.Thinking) + case "input_json_delta": + _, _ = b.WriteString(e.Delta.PartialJSON) + } + } + return b.String() +} + +// anthChatInputJSONByIndex concatenates input_json_delta partials per block index. +func anthChatInputJSONByIndex(events []AnthropicStreamEvent) map[int]string { + m := map[int]string{} + for _, e := range events { + if e.Type == "content_block_delta" && e.Index != nil && e.Delta != nil && e.Delta.Type == "input_json_delta" { + m[*e.Index] += e.Delta.PartialJSON + } + } + return m +} + +// anthChatStartedBlocks maps each started content block index to its type. +func anthChatStartedBlocks(events []AnthropicStreamEvent) map[int]string { + m := map[int]string{} + for _, e := range events { + if e.Type == "content_block_start" && e.Index != nil && e.ContentBlock != nil { + m[*e.Index] = e.ContentBlock.Type + } + } + return m +} + +func anthChatMessageDelta(events []AnthropicStreamEvent) *AnthropicStreamEvent { + for i := range events { + if events[i].Type == "message_delta" { + return &events[i] + } + } + return nil +} + +// --------------------------------------------------------------------------- +// AnthropicToChatCompletions (request) tests +// --------------------------------------------------------------------------- + +func TestAnthropicToChatCompletions_SystemAndToolResultOrdering(t *testing.T) { + req := &AnthropicRequest{ + Model: "claude", + System: json.RawMessage(`"You are helpful"`), + Messages: []AnthropicMessage{ + {Role: "user", Content: json.RawMessage(`"hi"`)}, + {Role: "assistant", Content: json.RawMessage(`[{"type":"tool_use","id":"call_1","name":"get_weather","input":{"location":"Paris"}}]`)}, + {Role: "user", Content: json.RawMessage(`[{"type":"tool_result","tool_use_id":"call_1","content":"sunny"},{"type":"text","text":"thanks"}]`)}, + }, + } + + out, err := AnthropicToChatCompletions(req) + require.NoError(t, err) + require.Len(t, out.Messages, 5) + + assert.Equal(t, "system", out.Messages[0].Role) + assert.JSONEq(t, `"You are helpful"`, string(out.Messages[0].Content)) + + assert.Equal(t, "user", out.Messages[1].Role) + assert.JSONEq(t, `"hi"`, string(out.Messages[1].Content)) + + assert.Equal(t, "assistant", out.Messages[2].Role) + require.Len(t, out.Messages[2].ToolCalls, 1) + assert.Equal(t, "call_1", out.Messages[2].ToolCalls[0].ID) + assert.Equal(t, "get_weather", out.Messages[2].ToolCalls[0].Function.Name) + assert.JSONEq(t, `{"location":"Paris"}`, out.Messages[2].ToolCalls[0].Function.Arguments) + + // The tool reply must come before any new user text from the same turn. + assert.Equal(t, "tool", out.Messages[3].Role) + assert.Equal(t, "call_1", out.Messages[3].ToolCallID) + assert.JSONEq(t, `"sunny"`, string(out.Messages[3].Content)) + + assert.Equal(t, "user", out.Messages[4].Role) + assert.JSONEq(t, `"thanks"`, string(out.Messages[4].Content)) +} + +func TestAnthropicToChatCompletions_ToolsAndToolChoice(t *testing.T) { + req := &AnthropicRequest{ + Model: "claude", + Tools: []AnthropicTool{ + {Name: "get_weather", Description: "Get weather", InputSchema: json.RawMessage(`{"type":"object","properties":{"location":{"type":"string"}}}`)}, + {Type: "web_search_20250305", Name: "web_search", InputSchema: json.RawMessage(`{}`)}, + {Type: "bash_20250124", Name: "bash"}, + }, + ToolChoice: json.RawMessage(`{"type":"any"}`), + } + + out, err := AnthropicToChatCompletions(req) + require.NoError(t, err) + + require.Len(t, out.Tools, 1, "all typed built-in tools (web_search, bash) should be skipped; only the custom tool survives") + assert.Equal(t, "function", out.Tools[0].Type) + require.NotNil(t, out.Tools[0].Function) + assert.Equal(t, "get_weather", out.Tools[0].Function.Name) + require.NotNil(t, out.Tools[0].Function.Strict) + assert.False(t, *out.Tools[0].Function.Strict) + assert.Equal(t, `"required"`, string(out.ToolChoice)) +} + +func TestAnthropicToChatCompletions_ReasoningEffort(t *testing.T) { + enabled, err := AnthropicToChatCompletions(&AnthropicRequest{Model: "c", Thinking: &AnthropicThinking{Type: "enabled"}}) + require.NoError(t, err) + assert.Equal(t, "medium", enabled.ReasoningEffort) + + maxed, err := AnthropicToChatCompletions(&AnthropicRequest{Model: "c", OutputConfig: &AnthropicOutputConfig{Effort: "max"}}) + require.NoError(t, err) + assert.Equal(t, "xhigh", maxed.ReasoningEffort) +} + +func TestAnthropicToChatCompletions_ThinkingDisabled(t *testing.T) { + // thinking:disabled must forward {type:"disabled"} to the upstream and drop + // reasoning_effort — even when output_config.effort would otherwise set one — + // so reasoning models (GLM/...) stop thinking instead of burning the token + // budget, and strict upstreams never see a disable+effort conflict. + out, err := AnthropicToChatCompletions(&AnthropicRequest{ + Model: "c", + Thinking: &AnthropicThinking{Type: "disabled"}, + OutputConfig: &AnthropicOutputConfig{Effort: "high"}, + }) + require.NoError(t, err) + require.NotNil(t, out.Thinking) + assert.Equal(t, "disabled", out.Thinking.Type) + assert.Equal(t, "", out.ReasoningEffort, "reasoning_effort must be dropped when thinking is disabled") + + // Anthropic-only budget_tokens must never leak into the chat request. + b, err := json.Marshal(out) + require.NoError(t, err) + assert.Contains(t, string(b), `"thinking":{"type":"disabled"}`) + assert.NotContains(t, string(b), "budget_tokens") + + // enabled keeps the existing reasoning_effort mapping and emits no thinking field. + enabled, err := AnthropicToChatCompletions(&AnthropicRequest{Model: "c", Thinking: &AnthropicThinking{Type: "enabled"}}) + require.NoError(t, err) + assert.Nil(t, enabled.Thinking) + assert.Equal(t, "medium", enabled.ReasoningEffort) +} + +// --------------------------------------------------------------------------- +// Streaming chat -> Anthropic SSE tests +// --------------------------------------------------------------------------- + +func TestChatCompletionsChunkToAnthropicEvents_TextAndUsage(t *testing.T) { + st := NewChatCompletionsToAnthropicStreamState("deepseek") + var all []AnthropicStreamEvent + all = append(all, ChatCompletionsChunkToAnthropicEvents(&ChatCompletionsChunk{ID: "id1", Choices: []ChatChunkChoice{{Delta: ChatDelta{Content: stringPtr("Hello")}}}}, st)...) + all = append(all, ChatCompletionsChunkToAnthropicEvents(anthChatTextChunk(" world"), st)...) + all = append(all, ChatCompletionsChunkToAnthropicEvents(&ChatCompletionsChunk{ + Choices: []ChatChunkChoice{{FinishReason: stringPtr("stop")}}, + Usage: &ChatUsage{PromptTokens: 100, CompletionTokens: 20, PromptTokensDetails: &ChatTokenDetails{CachedTokens: 30}}, + }, st)...) + all = append(all, FinalizeChatCompletionsAnthropicStream(st)...) + + require.Equal(t, "message_start", all[0].Type) + require.NotNil(t, all[0].Message) + assert.Equal(t, "assistant", all[0].Message.Role) + assert.Equal(t, "text", anthChatStartedBlocks(all)[0]) + assert.Equal(t, "Hello world", anthChatConcatDelta(all, "text_delta")) + assert.Equal(t, "message_stop", all[len(all)-1].Type) + + md := anthChatMessageDelta(all) + require.NotNil(t, md) + assert.Equal(t, "end_turn", md.Delta.StopReason) + require.NotNil(t, md.Usage) + // Anthropic input_tokens excludes cached; cached surfaces as cache_read. + assert.Equal(t, 70, md.Usage.InputTokens) + assert.Equal(t, 30, md.Usage.CacheReadInputTokens) + assert.Equal(t, 20, md.Usage.OutputTokens) +} + +func TestChatCompletionsChunkToAnthropicEvents_ReasoningThenText(t *testing.T) { + st := NewChatCompletionsToAnthropicStreamState("m") + var all []AnthropicStreamEvent + all = append(all, ChatCompletionsChunkToAnthropicEvents(anthChatReasoningChunk("thinking "), st)...) + all = append(all, ChatCompletionsChunkToAnthropicEvents(anthChatReasoningChunk("more"), st)...) + all = append(all, ChatCompletionsChunkToAnthropicEvents(anthChatTextChunk("answer"), st)...) + all = append(all, FinalizeChatCompletionsAnthropicStream(st)...) + + started := anthChatStartedBlocks(all) + assert.Equal(t, "thinking", started[0]) + assert.Equal(t, "text", started[1]) + assert.Equal(t, "thinking more", anthChatConcatDelta(all, "thinking_delta")) + assert.Equal(t, "answer", anthChatConcatDelta(all, "text_delta")) +} + +// TestChatCompletionsChunkToAnthropicEvents_ParallelToolsNoOrphan is the key +// regression guard: routing chat -> responses -> anthropic produced an orphan +// input_json_delta at a phantom block index for parallel tools. The direct path +// must open every block it deltas into, and never double an argument fragment. +func TestChatCompletionsChunkToAnthropicEvents_ParallelToolsNoOrphan(t *testing.T) { + st := NewChatCompletionsToAnthropicStreamState("m") + var all []AnthropicStreamEvent + all = append(all, ChatCompletionsChunkToAnthropicEvents(&ChatCompletionsChunk{ + ID: "id", + Choices: []ChatChunkChoice{{Delta: ChatDelta{ToolCalls: []ChatToolCall{ + {Index: intPtr(0), ID: "call_a", Type: "function", Function: ChatFunctionCall{Name: "get_weather", Arguments: `{"location":"Paris"}`}}, + {Index: intPtr(1), ID: "call_b", Type: "function", Function: ChatFunctionCall{Name: "get_time", Arguments: `{"tz":"UTC"}`}}, + }}}}, + }, st)...) + all = append(all, ChatCompletionsChunkToAnthropicEvents(anthChatFinishChunk("tool_calls"), st)...) + all = append(all, FinalizeChatCompletionsAnthropicStream(st)...) + + started := anthChatStartedBlocks(all) + require.Len(t, started, 2) + assert.Equal(t, "tool_use", started[0]) + assert.Equal(t, "tool_use", started[1]) + + ij := anthChatInputJSONByIndex(all) + require.Len(t, ij, 2) + // Every block that receives input_json_delta must have been opened (no orphan). + for idx := range ij { + _, ok := started[idx] + assert.Truef(t, ok, "input_json_delta at unopened block index %d (orphan)", idx) + } + // Args appear exactly once each (no doubling). + assert.JSONEq(t, `{"location":"Paris"}`, ij[0]) + assert.JSONEq(t, `{"tz":"UTC"}`, ij[1]) + + md := anthChatMessageDelta(all) + require.NotNil(t, md) + assert.Equal(t, "tool_use", md.Delta.StopReason) +} + +func TestChatCompletionsChunkToAnthropicEvents_ReadToolBuffered(t *testing.T) { + st := NewChatCompletionsToAnthropicStreamState("m") + chunk1 := &ChatCompletionsChunk{ID: "id", Choices: []ChatChunkChoice{{Delta: ChatDelta{ToolCalls: []ChatToolCall{ + {Index: intPtr(0), ID: "call_r", Type: "function", Function: ChatFunctionCall{Name: "Read", Arguments: `{"file_path":"/x",`}}, + }}}}} + chunk2 := &ChatCompletionsChunk{Choices: []ChatChunkChoice{{Delta: ChatDelta{ToolCalls: []ChatToolCall{ + {Index: intPtr(0), Function: ChatFunctionCall{Arguments: `"pages":""}`}}, + }}}}} + + e1 := ChatCompletionsChunkToAnthropicEvents(chunk1, st) + e2 := ChatCompletionsChunkToAnthropicEvents(chunk2, st) + streamPhase := append(append([]AnthropicStreamEvent{}, e1...), e2...) + fin := FinalizeChatCompletionsAnthropicStream(st) + all := append(append([]AnthropicStreamEvent{}, streamPhase...), fin...) + + // "Read" args are buffered: nothing is streamed mid-flight. + assert.Empty(t, anthChatInputJSONByIndex(streamPhase), "Read tool args must be buffered until close") + + // At close, a single sanitized delta with pages:"" stripped. + ij := anthChatInputJSONByIndex(all) + require.Len(t, ij, 1) + assert.JSONEq(t, `{"file_path":"/x"}`, ij[0]) +} + +func TestFinalizeChatCompletionsAnthropicStream_ReasoningOnlyFallback(t *testing.T) { + st := NewChatCompletionsToAnthropicStreamState("m") + _ = ChatCompletionsChunkToAnthropicEvents(anthChatReasoningChunk("the reasoning"), st) + _ = ChatCompletionsChunkToAnthropicEvents(anthChatFinishChunk("stop"), st) + fin := FinalizeChatCompletionsAnthropicStream(st) + + // Reasoning-only completion echoes the reasoning as a text block so the + // client never receives a thinking block with no message. + assert.Equal(t, "text", anthChatStartedBlocks(fin)[1]) + assert.Equal(t, "the reasoning", anthChatConcatDelta(fin, "text_delta")) + + md := anthChatMessageDelta(fin) + require.NotNil(t, md) + assert.Equal(t, "end_turn", md.Delta.StopReason) +} + +// --------------------------------------------------------------------------- +// ChatCompletionsStreamToAnthropicResponse (sync collapse) tests +// --------------------------------------------------------------------------- + +func TestChatCompletionsStreamToAnthropicResponse_BlockOrderAndUsage(t *testing.T) { + chunks := []*ChatCompletionsChunk{ + {ID: "id", Choices: []ChatChunkChoice{{Delta: ChatDelta{ReasoningContent: stringPtr("r")}}}}, + anthChatTextChunk("t"), + {Choices: []ChatChunkChoice{{Delta: ChatDelta{ToolCalls: []ChatToolCall{ + {Index: intPtr(0), ID: "call_a", Type: "function", Function: ChatFunctionCall{Name: "foo", Arguments: `{"a":1}`}}, + }}}}}, + {Choices: []ChatChunkChoice{{FinishReason: stringPtr("tool_calls")}}, Usage: &ChatUsage{PromptTokens: 100, CompletionTokens: 20, PromptTokensDetails: &ChatTokenDetails{CachedTokens: 30}}}, + } + + resp := ChatCompletionsStreamToAnthropicResponse(chunks, "deepseek") + require.NotNil(t, resp) + assert.Equal(t, "id", resp.ID) + assert.Equal(t, "message", resp.Type) + assert.Equal(t, "assistant", resp.Role) + + // Block order mirrors ResponsesToAnthropic: thinking, text, tool_use. + require.Len(t, resp.Content, 3) + assert.Equal(t, "thinking", resp.Content[0].Type) + assert.Equal(t, "r", resp.Content[0].Thinking) + assert.Equal(t, "text", resp.Content[1].Type) + assert.Equal(t, "t", resp.Content[1].Text) + assert.Equal(t, "tool_use", resp.Content[2].Type) + assert.Equal(t, "call_a", resp.Content[2].ID) + assert.Equal(t, "foo", resp.Content[2].Name) + assert.JSONEq(t, `{"a":1}`, string(resp.Content[2].Input)) + + assert.Equal(t, "tool_use", resp.StopReason) + assert.Equal(t, 70, resp.Usage.InputTokens) + assert.Equal(t, 30, resp.Usage.CacheReadInputTokens) + assert.Equal(t, 20, resp.Usage.OutputTokens) +} + +func TestChatCompletionsStreamToAnthropicResponse_ReasoningOnlyFallback(t *testing.T) { + chunks := []*ChatCompletionsChunk{ + {ID: "id", Choices: []ChatChunkChoice{{Delta: ChatDelta{ReasoningContent: stringPtr("just thinking")}}}}, + anthChatFinishChunk("stop"), + } + + resp := ChatCompletionsStreamToAnthropicResponse(chunks, "m") + require.Len(t, resp.Content, 2) + assert.Equal(t, "thinking", resp.Content[0].Type) + assert.Equal(t, "text", resp.Content[1].Type) + assert.Equal(t, "just thinking", resp.Content[1].Text) + assert.Equal(t, "end_turn", resp.StopReason) +} diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index d2937802789..ef6b0a39e9b 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -436,6 +436,7 @@ type ChatCompletionsRequest struct { Tools []ChatTool `json:"tools,omitempty"` ToolChoice json.RawMessage `json:"tool_choice,omitempty"` ReasoningEffort string `json:"reasoning_effort,omitempty"` // "low" | "medium" | "high" | "xhigh" + Thinking *ChatThinking `json:"thinking,omitempty"` // GLM/DeepSeek/Qwen-style thinking toggle ServiceTier string `json:"service_tier,omitempty"` Stop json.RawMessage `json:"stop,omitempty"` // string or []string @@ -444,6 +445,14 @@ type ChatCompletionsRequest struct { FunctionCall json.RawMessage `json:"function_call,omitempty"` } +// ChatThinking toggles extended thinking on chat upstreams that follow the +// Anthropic-style {"type":"enabled"|"disabled"} shape (GLM, DeepSeek, Qwen, +// Kimi, MiniMax, ...). Only "type" is carried — never Anthropic-only fields +// such as budget_tokens, which strict upstreams may reject. +type ChatThinking struct { + Type string `json:"type"` +} + // ChatStreamOptions configures streaming behavior. type ChatStreamOptions struct { IncludeUsage bool `json:"include_usage,omitempty"` diff --git a/backend/internal/service/openai_gateway_messages.go b/backend/internal/service/openai_gateway_messages.go index 7d07fcc808d..aabe845a29e 100644 --- a/backend/internal/service/openai_gateway_messages.go +++ b/backend/internal/service/openai_gateway_messages.go @@ -16,6 +16,7 @@ import ( "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" "github.com/Wei-Shaw/sub2api/internal/pkg/claude" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/Wei-Shaw/sub2api/internal/pkg/openai_compat" "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" "github.com/gin-gonic/gin" "go.uber.org/zap" @@ -33,6 +34,12 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic( promptCacheKey string, defaultMappedModel string, ) (*OpenAIForwardResult, error) { + // API-key accounts whose upstream only speaks /v1/chat/completions are served + // through a direct Anthropic<->Chat bridge that bypasses the Responses hub. + if account.Type == AccountTypeAPIKey && !openai_compat.ShouldUseResponsesAPI(account.Extra) { + return s.forwardMessagesViaRawChatCompletions(ctx, c, account, body, defaultMappedModel) + } + startTime := time.Now() // 1. Parse Anthropic request diff --git a/backend/internal/service/openai_messages_chat_fallback.go b/backend/internal/service/openai_messages_chat_fallback.go new file mode 100644 index 00000000000..8bb4a6ba91d --- /dev/null +++ b/backend/internal/service/openai_messages_chat_fallback.go @@ -0,0 +1,426 @@ +package service + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" + "github.com/Wei-Shaw/sub2api/internal/pkg/claude" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// forwardMessagesViaRawChatCompletions serves /v1/messages (Anthropic) clients +// through an upstream that only supports /v1/chat/completions, using the direct +// Anthropic<->Chat bridge (no Responses hub). The upstream is always streamed +// (some compat upstreams don't support a sync response), with usage requested in +// the terminal chunk; the client's original stream preference then selects +// whether we relay Anthropic SSE or collapse the stream into a single Anthropic +// JSON response. +// +// The upstream send block below is intentionally duplicated from +// forwardResponsesViaRawChatCompletions rather than shared, to keep the two +// fallback paths independently evolvable. +func (s *OpenAIGatewayService) forwardMessagesViaRawChatCompletions( + ctx context.Context, + c *gin.Context, + account *Account, + body []byte, + defaultMappedModel string, +) (*OpenAIForwardResult, error) { + startTime := time.Now() + + var anthropicReq apicompat.AnthropicRequest + if err := json.Unmarshal(body, &anthropicReq); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": gin.H{ + "type": "invalid_request_error", + "message": "Failed to parse request body", + }, + }) + return nil, fmt.Errorf("parse anthropic request: %w", err) + } + originalModel := strings.TrimSpace(anthropicReq.Model) + if originalModel == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "error": gin.H{ + "type": "invalid_request_error", + "message": "model is required", + }, + }) + return nil, fmt.Errorf("missing model in request") + } + applyOpenAICompatModelNormalization(&anthropicReq) + clientStream := anthropicReq.Stream + + chatReq, err := apicompat.AnthropicToChatCompletions(&anthropicReq) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": gin.H{ + "type": "invalid_request_error", + "message": err.Error(), + }, + }) + return nil, fmt.Errorf("convert anthropic to chat completions: %w", err) + } + + billingModel := resolveOpenAIForwardModel(account, anthropicReq.Model, defaultMappedModel) + upstreamModel := normalizeOpenAIModelForUpstream(account, billingModel) + chatReq.Model = upstreamModel + + // Always stream the upstream and request usage in the terminal chunk; the + // client's preference is honored by the response writer, not the upstream. + chatReq.Stream = true + chatReq.StreamOptions = &apicompat.ChatStreamOptions{IncludeUsage: true} + + // BetaFastMode -> service_tier: "priority" (parity with the Responses path). + if containsBetaToken(c.GetHeader("anthropic-beta"), claude.BetaFastMode) { + chatReq.ServiceTier = "priority" + } + + var reasoningEffort *string + if e := strings.TrimSpace(chatReq.ReasoningEffort); e != "" { + reasoningEffort = &e + } + + chatBody, err := json.Marshal(chatReq) + if err != nil { + return nil, fmt.Errorf("marshal chat completions request: %w", err) + } + chatBody, err = s.applyOpenAIFastPolicyToBody(ctx, account, upstreamModel, chatBody) + if err != nil { + var blocked *OpenAIFastBlockedError + if errors.As(err, &blocked) { + writeOpenAIFastPolicyBlockedResponse(c, blocked) + } + return nil, err + } + serviceTier := extractOpenAIServiceTierFromBody(chatBody) + + logger.L().Debug("openai messages: forwarding via raw chat completions", + zap.Int64("account_id", account.ID), + zap.String("original_model", originalModel), + zap.String("billing_model", billingModel), + zap.String("upstream_model", upstreamModel), + zap.Bool("stream", clientStream), + ) + + apiKey := account.GetOpenAIApiKey() + if apiKey == "" { + return nil, fmt.Errorf("account %d missing api_key", account.ID) + } + baseURL := account.GetOpenAIBaseURL() + if baseURL == "" { + baseURL = "https://api.openai.com" + } + validatedURL, err := s.validateUpstreamBaseURL(baseURL) + if err != nil { + return nil, fmt.Errorf("invalid base_url: %w", err) + } + targetURL := buildOpenAIChatCompletionsURL(validatedURL) + + upstreamCtx, releaseUpstreamCtx := detachUpstreamContext(ctx) + upstreamReq, err := http.NewRequestWithContext(upstreamCtx, http.MethodPost, targetURL, bytes.NewReader(chatBody)) + releaseUpstreamCtx() + if err != nil { + return nil, fmt.Errorf("build upstream request: %w", err) + } + upstreamReq = upstreamReq.WithContext(WithHTTPUpstreamProfile(upstreamReq.Context(), HTTPUpstreamProfileOpenAI)) + upstreamReq.Header.Set("Content-Type", "application/json") + upstreamReq.Header.Set("Authorization", "Bearer "+apiKey) + upstreamReq.Header.Set("Accept", "text/event-stream") + for key, values := range c.Request.Header { + lowerKey := strings.ToLower(key) + if openaiCCRawAllowedHeaders[lowerKey] { + for _, v := range values { + upstreamReq.Header.Add(key, v) + } + } + } + if customUA := account.GetOpenAIUserAgent(); customUA != "" { + upstreamReq.Header.Set("user-agent", customUA) + } + + proxyURL := "" + if account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) + if err != nil { + return nil, s.handleOpenAIUpstreamTransportError(ctx, c, account, err, false) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode >= 400 { + respBody := s.readUpstreamErrorBody(resp) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + if s.shouldFailoverOpenAIUpstreamResponse(resp.StatusCode, upstreamMsg, respBody) { + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + s.handleOpenAIAccountUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody, upstreamModel) + return nil, &UpstreamFailoverError{ + StatusCode: resp.StatusCode, + ResponseBody: respBody, + RetryableOnSameAccount: account.IsPoolMode() && (account.IsPoolModeRetryableStatus(resp.StatusCode) || isOpenAITransientProcessingError(resp.StatusCode, upstreamMsg, respBody)), + } + } + return s.handleErrorResponse(ctx, resp, c, account, chatBody, billingModel) + } + + if clientStream { + return s.streamChatCompletionsAsAnthropic(c, resp, originalModel, billingModel, upstreamModel, reasoningEffort, serviceTier, startTime) + } + return s.bufferChatCompletionsAsAnthropic(c, resp, originalModel, billingModel, upstreamModel, reasoningEffort, serviceTier, startTime) +} + +// streamChatCompletionsAsAnthropic relays an always-streamed upstream Chat +// Completions response to a streaming Anthropic Messages client. +func (s *OpenAIGatewayService) streamChatCompletionsAsAnthropic( + c *gin.Context, + resp *http.Response, + originalModel string, + billingModel string, + upstreamModel string, + reasoningEffort *string, + serviceTier *string, + startTime time.Time, +) (*OpenAIForwardResult, error) { + requestID := resp.Header.Get("x-request-id") + headersWritten := false + writeStreamHeaders := func() { + if headersWritten { + return + } + headersWritten = true + if s.responseHeaderFilter != nil { + responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + } + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("X-Accel-Buffering", "no") + c.Writer.WriteHeader(http.StatusOK) + } + + state := apicompat.NewChatCompletionsToAnthropicStreamState(originalModel) + var usage OpenAIUsage + var firstTokenMs *int + clientDisconnected := false + + writeEvents := func(events []apicompat.AnthropicStreamEvent) { + if clientDisconnected || len(events) == 0 { + return + } + writeStreamHeaders() + for _, event := range events { + sse, err := apicompat.ResponsesAnthropicEventToSSE(event) + if err != nil { + logger.L().Warn("openai messages chat fallback: failed to marshal stream event", + zap.Error(err), + zap.String("request_id", requestID), + ) + continue + } + if _, err := fmt.Fprint(c.Writer, sse); err != nil { + clientDisconnected = true + logger.L().Debug("openai messages chat fallback: client disconnected, continuing to drain upstream for billing", + zap.Error(err), + zap.String("request_id", requestID), + ) + return + } + } + c.Writer.Flush() + } + + scanner := bufio.NewScanner(resp.Body) + maxLineSize := defaultMaxLineSize + if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 { + maxLineSize = s.cfg.Gateway.MaxLineSize + } + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) + + for scanner.Scan() { + payload, ok := extractOpenAISSEDataLine(scanner.Text()) + if !ok { + continue + } + payload = strings.TrimSpace(payload) + if payload == "" { + continue + } + if payload == "[DONE]" { + break + } + + if u := extractCCStreamUsage(payload); u != nil { + usage = *u + } + + var chunk apicompat.ChatCompletionsChunk + if err := json.Unmarshal([]byte(payload), &chunk); err != nil { + logger.L().Warn("openai messages chat fallback: failed to parse chat stream chunk", + zap.Error(err), + zap.String("request_id", requestID), + ) + continue + } + if firstTokenMs == nil && !isOpenAIChatUsageOnlyStreamChunk(payload) && chatChunkStartsResponsesOutput(&chunk) { + ms := int(time.Since(startTime).Milliseconds()) + firstTokenMs = &ms + } + writeEvents(apicompat.ChatCompletionsChunkToAnthropicEvents(&chunk, state)) + } + + if err := scanner.Err(); err != nil { + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logger.L().Warn("openai messages chat fallback: stream read error", + zap.Error(err), + zap.String("request_id", requestID), + ) + } + return &OpenAIForwardResult{ + RequestID: requestID, + Usage: usage, + Model: originalModel, + BillingModel: billingModel, + UpstreamModel: upstreamModel, + ReasoningEffort: reasoningEffort, + ServiceTier: serviceTier, + Stream: true, + Duration: time.Since(startTime), + FirstTokenMs: firstTokenMs, + }, fmt.Errorf("stream usage incomplete: %w", err) + } + + writeEvents(apicompat.FinalizeChatCompletionsAnthropicStream(state)) + if !clientDisconnected { + c.Writer.Flush() + } + + return &OpenAIForwardResult{ + RequestID: requestID, + Usage: usage, + Model: originalModel, + BillingModel: billingModel, + UpstreamModel: upstreamModel, + ReasoningEffort: reasoningEffort, + ServiceTier: serviceTier, + Stream: true, + Duration: time.Since(startTime), + FirstTokenMs: firstTokenMs, + }, nil +} + +// bufferChatCompletionsAsAnthropic drains an always-streamed upstream Chat +// Completions response and collapses it into a single Anthropic Messages JSON +// response for a non-streaming client. +func (s *OpenAIGatewayService) bufferChatCompletionsAsAnthropic( + c *gin.Context, + resp *http.Response, + originalModel string, + billingModel string, + upstreamModel string, + reasoningEffort *string, + serviceTier *string, + startTime time.Time, +) (*OpenAIForwardResult, error) { + requestID := resp.Header.Get("x-request-id") + var usage OpenAIUsage + var chunks []*apicompat.ChatCompletionsChunk + + scanner := bufio.NewScanner(resp.Body) + maxLineSize := defaultMaxLineSize + if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 { + maxLineSize = s.cfg.Gateway.MaxLineSize + } + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) + + for scanner.Scan() { + payload, ok := extractOpenAISSEDataLine(scanner.Text()) + if !ok { + continue + } + payload = strings.TrimSpace(payload) + if payload == "" { + continue + } + if payload == "[DONE]" { + break + } + + if u := extractCCStreamUsage(payload); u != nil { + usage = *u + } + + var chunk apicompat.ChatCompletionsChunk + if err := json.Unmarshal([]byte(payload), &chunk); err != nil { + logger.L().Warn("openai messages chat fallback: failed to parse chat stream chunk", + zap.Error(err), + zap.String("request_id", requestID), + ) + continue + } + chunks = append(chunks, &chunk) + } + if err := scanner.Err(); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logger.L().Warn("openai messages chat fallback: stream read error", + zap.Error(err), + zap.String("request_id", requestID), + ) + } + + anthropicResp := apicompat.ChatCompletionsStreamToAnthropicResponse(chunks, originalModel) + + if s.responseHeaderFilter != nil { + responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + } + // 上游被强制流式,其响应头 Content-Type 为 text/event-stream,会经 + // WriteFilteredHeaders 透传进来;gin 的 c.JSON 仅在头不存在时才设置、无法覆盖已有头。 + // 非流式响应必须是 application/json,否则按 Content-Type 判流式的客户端(如 Claude + // 在 auto 模式做安全分类时发起的非流式请求)会把 JSON 当 SSE 解析而失败。 + c.Writer.Header().Set("Content-Type", "application/json; charset=utf-8") + c.JSON(http.StatusOK, anthropicResp) + + return &OpenAIForwardResult{ + RequestID: requestID, + Usage: usage, + Model: originalModel, + BillingModel: billingModel, + UpstreamModel: upstreamModel, + ReasoningEffort: reasoningEffort, + ServiceTier: serviceTier, + Stream: false, + Duration: time.Since(startTime), + }, nil +} diff --git a/backend/internal/service/openai_messages_chat_fallback_test.go b/backend/internal/service/openai_messages_chat_fallback_test.go new file mode 100644 index 00000000000..e4b8e555e32 --- /dev/null +++ b/backend/internal/service/openai_messages_chat_fallback_test.go @@ -0,0 +1,61 @@ +package service + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" +) + +// A non-streaming Anthropic /v1/messages client served through the +// chat-completions bridge must receive an application/json response. The bridge +// always streams the upstream, so the upstream's text/event-stream Content-Type +// is forwarded by WriteFilteredHeaders; gin's c.JSON only sets Content-Type when +// absent and cannot override it, so the buffered path must reset it explicitly. +func TestBufferChatCompletionsAsAnthropic_NonStreamUsesJSONContentType(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + + const upstreamSSE = "data: {\"id\":\"chatcmpl-1\",\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"hi\"}}]}\n" + + "data: {\"id\":\"chatcmpl-1\",\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\n" + + "data: [DONE]\n" + + cfg := &config.Config{Gateway: config.GatewayConfig{MaxLineSize: defaultMaxLineSize}} + s := &OpenAIGatewayService{ + cfg: cfg, + responseHeaderFilter: compileResponseHeaderFilter(cfg), + } + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(upstreamSSE)), + } + + _, err := s.bufferChatCompletionsAsAnthropic( + c, resp, + "claude-3-5-sonnet-20241022", + "gpt-4o", + "gpt-4o", + nil, + nil, + time.Now(), + ) + require.NoError(t, err) + + require.Truef(t, + strings.HasPrefix(rec.Header().Get("Content-Type"), "application/json"), + "non-stream response must be application/json, got %q", rec.Header().Get("Content-Type"), + ) + require.Truef(t, + strings.HasPrefix(strings.TrimSpace(rec.Body.String()), "{"), + "expected JSON object body, got %q", rec.Body.String(), + ) +}