Skip to content

Commit 098e656

Browse files
committed
fix codex responses sse passthrough
1 parent 924e079 commit 098e656

4 files changed

Lines changed: 81 additions & 2 deletions

File tree

core/internal/buildinfo/buildinfo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import "strings"
44

55
// Set at link time via -ldflags (see .goreleaser.yaml).
66
var (
7-
Version = "dev0.1.99"
7+
Version = "dev0.1.100"
88
Commit = "none"
99
Date = "unknown"
1010
)

core/internal/protocol/stream_sse.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,32 @@ func streamFlushMaybe(w http.ResponseWriter) func() {
7171
return func() {}
7272
}
7373

74+
// CopyPlaintextSSEToDownstream relays an already-decoded SSE stream without
75+
// parsing it. This is used for same-protocol proxying, where the safest
76+
// behavior is to preserve provider-specific event types and lifecycle details.
77+
func CopyPlaintextSSEToDownstream(ctx context.Context, plaintext io.Reader, w http.ResponseWriter) error {
78+
flush := streamFlushMaybe(w)
79+
buf := make([]byte, 16*1024)
80+
for {
81+
if err := ctx.Err(); err != nil {
82+
return err
83+
}
84+
n, readErr := plaintext.Read(buf)
85+
if n > 0 {
86+
if _, err := w.Write(buf[:n]); err != nil {
87+
return err
88+
}
89+
flush()
90+
}
91+
if readErr != nil {
92+
if errors.Is(readErr, io.EOF) {
93+
return nil
94+
}
95+
return readErr
96+
}
97+
}
98+
}
99+
74100
// TranscodePlaintextSSEToIngress converts egress-shaped SSE plaintext into ingress-shaped SSE (decoder -> IR -> encoder), mirroring Electron transformResponse.
75101
//
76102
// prependModel emits an initial message_start event like Electron eventsWithModel.

core/internal/proxy/server.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,11 @@ func (s *Server) handleProxy(w http.ResponseWriter, r *http.Request) {
599599
capture := &cappedBuffer{limit: maxCallLogBodyBytes}
600600
tee := io.TeeReader(buf, capture)
601601
var streamErr error
602-
streamErr = protocol.TranscodePlaintextSSEToIngress(r.Context(), route.IngressStyle, route.EgressStyle, ir.Model, tee, w)
602+
if route.IngressStyle == apistyle.OpenAIResponses && route.EgressStyle == apistyle.OpenAIResponses {
603+
streamErr = protocol.CopyPlaintextSSEToDownstream(r.Context(), tee, w)
604+
} else {
605+
streamErr = protocol.TranscodePlaintextSSEToIngress(r.Context(), route.IngressStyle, route.EgressStyle, ir.Model, tee, w)
606+
}
603607
if shouldRecordStreamError(streamErr) {
604608
trace.setError(streamErr.Error())
605609
}

core/internal/proxy/server_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,55 @@ func wireResponsesUpstreamStore(base string) *profile.Store {
433433
}
434434
}
435435

436+
func TestSameProtocolOpenAIResponsesSSEPassthroughPreservesLifecycle(t *testing.T) {
437+
upstreamBody := strings.Join([]string{
438+
`event: response.created`,
439+
`data: {"type":"response.created","response":{"id":"resp_test","object":"response","model":"gpt-wire","status":"in_progress","output":[]},"sequence_number":0}`,
440+
``,
441+
`event: response.reasoning_summary_text.delta`,
442+
`data: {"type":"response.reasoning_summary_text.delta","delta":"thinking","sequence_number":1}`,
443+
``,
444+
`event: response.completed`,
445+
`data: {"type":"response.completed","response":{"id":"resp_test","object":"response","model":"gpt-wire","status":"completed","output":[],"usage":{"input_tokens":1,"output_tokens":1}},"sequence_number":2}`,
446+
``,
447+
}, "\n")
448+
449+
up := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
450+
if r.URL.Path != "/v1/responses" {
451+
t.Errorf("unexpected upstream path %q", r.URL.Path)
452+
}
453+
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
454+
_, _ = w.Write([]byte(upstreamBody))
455+
}))
456+
defer up.Close()
457+
458+
s := NewServer(profile.ProxyConfig{Host: "127.0.0.1", Port: 27483})
459+
s.ProfileLoader = func() (*profile.Store, error) { return wireResponsesUpstreamStore(up.URL + "/v1"), nil }
460+
ts := httptest.NewServer(s.Server.Handler)
461+
defer ts.Close()
462+
463+
payload := `{"model":"cross-model-id","input":"ping","stream":true}`
464+
resp, err := http.Post(ts.URL+"/custom-api/cross-model-id/openai-responses/v1/responses", "application/json", strings.NewReader(payload))
465+
if err != nil {
466+
t.Fatal(err)
467+
}
468+
defer resp.Body.Close()
469+
body, _ := io.ReadAll(resp.Body)
470+
if resp.StatusCode != http.StatusOK {
471+
t.Fatalf("status=%d body=%s", resp.StatusCode, body)
472+
}
473+
raw := string(body)
474+
for _, want := range []string{
475+
`event: response.reasoning_summary_text.delta`,
476+
`"type":"response.reasoning_summary_text.delta"`,
477+
`event: response.completed`,
478+
`"type":"response.completed"`,
479+
} {
480+
if !strings.Contains(raw, want) {
481+
t.Fatalf("same-protocol Responses stream lost %s in %s", want, raw)
482+
}
483+
}
484+
}
436485
func TestCrossProtocolOpenAIIngressWithClaudeUpstreamTranscodesJSON(t *testing.T) {
437486
var upstreamHits int
438487
up := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

0 commit comments

Comments
 (0)