Skip to content

Commit 12d7fea

Browse files
authored
fix(client): route OpenCode Go through dual-protocol client with fallback (#32)
Use OpenCodeGoClient for deployment adapters so MiniMax/Qwen models hit Anthropic /v1/messages and transparently fall back to OpenAI chat when the stream is reasoning-only.
1 parent 90da60e commit 12d7fea

6 files changed

Lines changed: 134 additions & 13 deletions

File tree

client/opencodego.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ type OpenCodeGoClient struct {
1313
anthropic *AnthropicClient
1414
}
1515

16+
type openCodeGoStreamOpener func(context.Context, []EyrieMessage, ChatOptions) (*StreamResult, error)
17+
1618
// NewOpenCodeGoClient builds a dual-protocol OpenCode Go provider client.
1719
func NewOpenCodeGoClient(apiKey, baseURL string, opts ...ClientOption) *OpenCodeGoClient {
1820
openBase := strings.TrimRight(strings.TrimSpace(baseURL), "/")
@@ -30,7 +32,21 @@ func (c *OpenCodeGoClient) Name() string { return "opencodego" }
3032

3133
func (c *OpenCodeGoClient) Chat(ctx context.Context, messages []EyrieMessage, opts ChatOptions) (*EyrieResponse, error) {
3234
if openCodeGoUsesAnthropicMessages(opts.Model) {
33-
return c.anthropic.Chat(ctx, messages, opts)
35+
resp, err := c.anthropic.Chat(ctx, messages, opts)
36+
if err != nil {
37+
return nil, err
38+
}
39+
if resp != nil && strings.TrimSpace(resp.Content) != "" {
40+
return resp, nil
41+
}
42+
if c.openAI == nil {
43+
return resp, err
44+
}
45+
openResp, openErr := c.openAI.Chat(ctx, messages, opts)
46+
if openErr == nil && openResp != nil && strings.TrimSpace(openResp.Content) != "" {
47+
return openResp, nil
48+
}
49+
return resp, err
3450
}
3551
resp, err := c.openAI.Chat(ctx, messages, opts)
3652
if err != nil || c.anthropic == nil || !openCodeGoMightNeedAnthropicFallback(opts.Model) {
@@ -48,13 +64,17 @@ func (c *OpenCodeGoClient) Chat(ctx context.Context, messages []EyrieMessage, op
4864

4965
func (c *OpenCodeGoClient) StreamChat(ctx context.Context, messages []EyrieMessage, opts ChatOptions) (*StreamResult, error) {
5066
if openCodeGoUsesAnthropicMessages(opts.Model) {
51-
return c.anthropic.StreamChat(ctx, messages, opts)
67+
primary, err := c.anthropic.StreamChat(ctx, messages, opts)
68+
if err != nil || c.openAI == nil {
69+
return primary, err
70+
}
71+
return newOpenCodeGoStreamWithFallback(ctx, messages, opts, primary, c.openAI.StreamChat), nil
5272
}
5373
primary, err := c.openAI.StreamChat(ctx, messages, opts)
5474
if err != nil || c.anthropic == nil || !openCodeGoMightNeedAnthropicFallback(opts.Model) {
5575
return primary, err
5676
}
57-
return newOpenCodeGoFallbackStream(ctx, c.anthropic, messages, opts, primary), nil
77+
return newOpenCodeGoStreamWithFallback(ctx, messages, opts, primary, c.anthropic.StreamChat), nil
5878
}
5979

6080
func (c *OpenCodeGoClient) Ping(ctx context.Context) error {
@@ -64,9 +84,10 @@ func (c *OpenCodeGoClient) Ping(ctx context.Context) error {
6484
return c.anthropic.Ping(ctx)
6585
}
6686

67-
// newOpenCodeGoFallbackStream watches an OpenAI-format stream; if it ends with
68-
// reasoning tokens but no answer, transparently retries via /v1/messages.
69-
func newOpenCodeGoFallbackStream(ctx context.Context, anthropic *AnthropicClient, messages []EyrieMessage, opts ChatOptions, primary *StreamResult) *StreamResult {
87+
// newOpenCodeGoStreamWithFallback watches a primary stream; if it ends with
88+
// reasoning tokens but no answer, transparently retries via the fallback opener.
89+
// Used for OpenAI→Anthropic and Anthropic→OpenAI on MiniMax/Qwen models.
90+
func newOpenCodeGoStreamWithFallback(ctx context.Context, messages []EyrieMessage, opts ChatOptions, primary *StreamResult, fallback openCodeGoStreamOpener) *StreamResult {
7091
out := make(chan EyrieStreamEvent, streamChannelBuffer)
7192
cancelCtx, cancel := context.WithCancel(ctx)
7293
go func() {
@@ -126,7 +147,7 @@ func newOpenCodeGoFallbackStream(ctx context.Context, anthropic *AnthropicClient
126147
return
127148
}
128149

129-
fallback, err := anthropic.StreamChat(cancelCtx, messages, opts)
150+
fallbackResult, err := fallback(cancelCtx, messages, opts)
130151
if err != nil {
131152
flush()
132153
select {
@@ -135,8 +156,8 @@ func newOpenCodeGoFallbackStream(ctx context.Context, anthropic *AnthropicClient
135156
}
136157
return
137158
}
138-
defer fallback.Close()
139-
for ev := range fallback.Events {
159+
defer fallbackResult.Close()
160+
for ev := range fallbackResult.Events {
140161
select {
141162
case out <- ev:
142163
case <-cancelCtx.Done():

client/opencodego_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56
"net/http"
7+
"net/http/httptest"
68
"strings"
79
"testing"
810
)
@@ -101,3 +103,51 @@ func TestOpenCodeGoClient_RoutesKimiToOpenAI(t *testing.T) {
101103
t.Fatalf("path = %q, want suffix /chat/completions", gotPath)
102104
}
103105
}
106+
107+
func TestOpenCodeGoClient_AnthropicReasoningOnlyFallsBackToOpenAI(t *testing.T) {
108+
var paths []string
109+
transport := roundTripFunc(func(r *http.Request) (*http.Response, error) {
110+
paths = append(paths, r.URL.Path)
111+
if strings.HasSuffix(r.URL.Path, "/messages") {
112+
w := httptest.NewRecorder()
113+
w.Header().Set("Content-Type", "text/event-stream")
114+
_, _ = fmt.Fprintf(w, "event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\"}}\n\n")
115+
_, _ = fmt.Fprintf(w, "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"text\":\"hmm\"}}\n\n")
116+
_, _ = fmt.Fprintf(w, "event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n")
117+
_, _ = fmt.Fprintf(w, "event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")
118+
return w.Result(), nil
119+
}
120+
w := httptest.NewRecorder()
121+
w.Header().Set("Content-Type", "text/event-stream")
122+
_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{\"content\":\"Hello!\"},\"finish_reason\":null}]}\n\n")
123+
_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n")
124+
_, _ = fmt.Fprintf(w, "data: [DONE]\n\n")
125+
return w.Result(), nil
126+
})
127+
128+
c := NewOpenCodeGoClient("ocg-test-key", "https://opencode.example/zen/go/v1")
129+
c.anthropic.httpClient = &http.Client{Transport: transport}
130+
c.openAI.httpClient = &http.Client{Transport: transport}
131+
132+
sr, err := c.StreamChat(context.Background(), []EyrieMessage{{Role: "user", Content: "Hi"}}, ChatOptions{
133+
Model: "minimax-m3",
134+
MaxTokens: 256,
135+
})
136+
if err != nil {
137+
t.Fatalf("StreamChat: %v", err)
138+
}
139+
defer sr.Close()
140+
141+
var content string
142+
for ev := range sr.Events {
143+
if ev.Type == "content" {
144+
content += ev.Content
145+
}
146+
}
147+
if content != "Hello!" {
148+
t.Fatalf("content = %q, want Hello!; paths=%v", content, paths)
149+
}
150+
if len(paths) < 2 {
151+
t.Fatalf("expected anthropic then openai paths, got %v", paths)
152+
}
153+
}

client/stream.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"context"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"io"
910
"log/slog"
@@ -68,6 +69,14 @@ func parseSSEStream(ctx context.Context, body io.ReadCloser, logger *slog.Logger
6869
}
6970
}
7071
if err := scanner.Err(); err != nil {
72+
// Context cancellation produces "context canceled" from the
73+
// scanner when the body is closed; that is the expected
74+
// shutdown path, not a stream error. Skip the warning and
75+
// the synthetic error event so callers (and operators) don't
76+
// see noise for every normal cancel/close.
77+
if ctxErr := ctx.Err(); ctxErr != nil || errors.Is(err, context.Canceled) {
78+
return
79+
}
7180
logger.Warn("SSE stream read error", "error", err)
7281
select {
7382
case ch <- SSEEvent{Event: "error", Data: fmt.Sprintf("stream read error: %v", err)}:
@@ -109,6 +118,7 @@ func processAnthropicStream(ctx context.Context, sseEvents <-chan SSEEvent, logg
109118
jsonBuf strings.Builder
110119
}
111120
var currentTool *toolAccum
121+
blockTypes := make(map[int]string)
112122
var stopReason string
113123

114124
for {
@@ -145,6 +155,7 @@ func processAnthropicStream(ctx context.Context, sseEvents <-chan SSEEvent, logg
145155
switch ae.Type {
146156
case "content_block_start":
147157
if ae.ContentBlock != nil {
158+
blockTypes[ae.Index] = ae.ContentBlock.Type
148159
switch ae.ContentBlock.Type {
149160
case "tool_use":
150161
currentTool = &toolAccum{id: ae.ContentBlock.ID, name: ae.ContentBlock.Name}
@@ -160,7 +171,11 @@ func processAnthropicStream(ctx context.Context, sseEvents <-chan SSEEvent, logg
160171
switch ae.Delta.Type {
161172
case "text_delta":
162173
if ae.Delta.Text != "" {
163-
emit(ctx, ch, EyrieStreamEvent{Type: "content", Content: ae.Delta.Text})
174+
if strings.Contains(blockTypes[ae.Index], "thinking") {
175+
emit(ctx, ch, EyrieStreamEvent{Type: "thinking", Thinking: ae.Delta.Text})
176+
} else {
177+
emit(ctx, ch, EyrieStreamEvent{Type: "content", Content: ae.Delta.Text})
178+
}
164179
}
165180
case "input_json_delta":
166181
if currentTool != nil && ae.Delta.PartialJSON != "" {
@@ -173,6 +188,7 @@ func processAnthropicStream(ctx context.Context, sseEvents <-chan SSEEvent, logg
173188
}
174189

175190
case "content_block_stop":
191+
delete(blockTypes, ae.Index)
176192
if currentTool != nil {
177193
rawJSON := currentTool.jsonBuf.String()
178194
var args map[string]interface{}

client/stream_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,36 @@ func TestSSEAnthropicThinkingDelta(t *testing.T) {
217217
}
218218
}
219219

220+
func TestSSEAnthropicThinkingTextDeltaHidden(t *testing.T) {
221+
events := make(chan SSEEvent, 10)
222+
events <- SSEEvent{Event: "content_block_start", Data: `{"type":"content_block_start","index":0,"content_block":{"type":"thinking"}}`}
223+
events <- SSEEvent{Event: "content_block_delta", Data: `{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"private reasoning"}}`}
224+
events <- SSEEvent{Event: "content_block_stop", Data: `{"type":"content_block_stop","index":0}`}
225+
events <- SSEEvent{Event: "message_stop", Data: `{"type":"message_stop"}`}
226+
close(events)
227+
228+
ctx := context.Background()
229+
ch := processAnthropicStream(ctx, events, testLogger())
230+
231+
var sawThinking, sawContent bool
232+
for evt := range ch {
233+
switch evt.Type {
234+
case "thinking":
235+
if evt.Thinking == "private reasoning" {
236+
sawThinking = true
237+
}
238+
case "content":
239+
sawContent = true
240+
}
241+
}
242+
if !sawThinking {
243+
t.Fatal("expected thinking event for text_delta inside thinking block")
244+
}
245+
if sawContent {
246+
t.Fatal("did not expect visible content from thinking block text_delta")
247+
}
248+
}
249+
220250
func TestSSEAnthropicStopReason(t *testing.T) {
221251
events := make(chan SSEEvent, 10)
222252
events <- SSEEvent{Event: "message_delta", Data: `{"type":"message_delta","delta":{"stop_reason":"max_tokens"}}`}

setup/deployment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func ProviderForDeployment(id string, deployment config.DeploymentConfig) (clien
232232
if apiKey == "" {
233233
return nil, false
234234
}
235-
return client.NewOpenAIClient(apiKey, FirstNonEmpty(deployment.BaseURL, config.DefaultOpenCodeGoBaseURL), &client.OpenCodeGoCompat), true
235+
return client.NewOpenCodeGoClient(apiKey, FirstNonEmpty(deployment.BaseURL, config.DefaultOpenCodeGoBaseURL)), true
236236
case "kimi-direct":
237237
apiKey := FirstNonEmpty(deployment.APIKey, storeSecret("MOONSHOT_API_KEY"))
238238
if apiKey == "" {

setup/deployment_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/GrayCodeAI/eyrie/client"
78
"github.com/GrayCodeAI/eyrie/config"
89
"github.com/GrayCodeAI/eyrie/credentials"
910
)
@@ -593,7 +594,10 @@ func TestProviderForDeployment_OpenCodeGo(t *testing.T) {
593594
if !ok {
594595
t.Fatal("expected opencodego to be configured")
595596
}
596-
if p.Name() != "openai" {
597-
t.Fatalf("provider name = %q, want openai", p.Name())
597+
if p.Name() != "opencodego" {
598+
t.Fatalf("provider name = %q, want opencodego", p.Name())
599+
}
600+
if _, ok := p.(*client.OpenCodeGoClient); !ok {
601+
t.Fatalf("provider type = %T, want *client.OpenCodeGoClient", p)
598602
}
599603
}

0 commit comments

Comments
 (0)