Skip to content

Commit c8b7e2b

Browse files
committed
fix(executor): ensure empty stream completions use output_item.done as fallback
Fixed: router-for-me#2583
1 parent cad45ff commit c8b7e2b

2 files changed

Lines changed: 92 additions & 4 deletions

File tree

internal/runtime/executor/codex_executor.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"sort"
1011
"strings"
1112
"time"
1213

@@ -167,22 +168,63 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
167168
helps.AppendAPIResponseChunk(ctx, e.cfg, data)
168169

169170
lines := bytes.Split(data, []byte("\n"))
171+
outputItemsByIndex := make(map[int64][]byte)
172+
var outputItemsFallback [][]byte
170173
for _, line := range lines {
171174
if !bytes.HasPrefix(line, dataTag) {
172175
continue
173176
}
174177

175-
line = bytes.TrimSpace(line[5:])
176-
if gjson.GetBytes(line, "type").String() != "response.completed" {
178+
eventData := bytes.TrimSpace(line[5:])
179+
eventType := gjson.GetBytes(eventData, "type").String()
180+
181+
if eventType == "response.output_item.done" {
182+
itemResult := gjson.GetBytes(eventData, "item")
183+
if !itemResult.Exists() || itemResult.Type != gjson.JSON {
184+
continue
185+
}
186+
outputIndexResult := gjson.GetBytes(eventData, "output_index")
187+
if outputIndexResult.Exists() {
188+
outputItemsByIndex[outputIndexResult.Int()] = []byte(itemResult.Raw)
189+
} else {
190+
outputItemsFallback = append(outputItemsFallback, []byte(itemResult.Raw))
191+
}
192+
continue
193+
}
194+
195+
if eventType != "response.completed" {
177196
continue
178197
}
179198

180-
if detail, ok := helps.ParseCodexUsage(line); ok {
199+
if detail, ok := helps.ParseCodexUsage(eventData); ok {
181200
reporter.Publish(ctx, detail)
182201
}
183202

203+
completedData := eventData
204+
outputResult := gjson.GetBytes(completedData, "response.output")
205+
shouldPatchOutput := (!outputResult.Exists() || !outputResult.IsArray() || len(outputResult.Array()) == 0) && (len(outputItemsByIndex) > 0 || len(outputItemsFallback) > 0)
206+
if shouldPatchOutput {
207+
completedDataPatched := completedData
208+
completedDataPatched, _ = sjson.SetRawBytes(completedDataPatched, "response.output", []byte(`[]`))
209+
210+
indexes := make([]int64, 0, len(outputItemsByIndex))
211+
for idx := range outputItemsByIndex {
212+
indexes = append(indexes, idx)
213+
}
214+
sort.Slice(indexes, func(i, j int) bool {
215+
return indexes[i] < indexes[j]
216+
})
217+
for _, idx := range indexes {
218+
completedDataPatched, _ = sjson.SetRawBytes(completedDataPatched, "response.output.-1", outputItemsByIndex[idx])
219+
}
220+
for _, item := range outputItemsFallback {
221+
completedDataPatched, _ = sjson.SetRawBytes(completedDataPatched, "response.output.-1", item)
222+
}
223+
completedData = completedDataPatched
224+
}
225+
184226
var param any
185-
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, line, &param)
227+
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, completedData, &param)
186228
resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}
187229
return resp, nil
188230
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package executor
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
9+
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
10+
_ "github.com/router-for-me/CLIProxyAPI/v6/internal/translator"
11+
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
12+
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
13+
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
14+
"github.com/tidwall/gjson"
15+
)
16+
17+
func TestCodexExecutorExecute_EmptyStreamCompletionOutputUsesOutputItemDone(t *testing.T) {
18+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
19+
w.Header().Set("Content-Type", "text/event-stream")
20+
_, _ = w.Write([]byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"ok\"}]},\"output_index\":0}\n"))
21+
_, _ = w.Write([]byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_1\",\"object\":\"response\",\"created_at\":1775555723,\"status\":\"completed\",\"model\":\"gpt-5.4-mini-2026-03-17\",\"output\":[],\"usage\":{\"input_tokens\":8,\"output_tokens\":28,\"total_tokens\":36}}}\n\n"))
22+
}))
23+
defer server.Close()
24+
25+
executor := NewCodexExecutor(&config.Config{})
26+
auth := &cliproxyauth.Auth{Attributes: map[string]string{
27+
"base_url": server.URL,
28+
"api_key": "test",
29+
}}
30+
31+
resp, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
32+
Model: "gpt-5.4-mini",
33+
Payload: []byte(`{"model":"gpt-5.4-mini","messages":[{"role":"user","content":"Say ok"}]}`),
34+
}, cliproxyexecutor.Options{
35+
SourceFormat: sdktranslator.FromString("openai"),
36+
Stream: false,
37+
})
38+
if err != nil {
39+
t.Fatalf("Execute error: %v", err)
40+
}
41+
42+
gotContent := gjson.GetBytes(resp.Payload, "choices.0.message.content").String()
43+
if gotContent != "ok" {
44+
t.Fatalf("choices.0.message.content = %q, want %q; payload=%s", gotContent, "ok", string(resp.Payload))
45+
}
46+
}

0 commit comments

Comments
 (0)