Skip to content

Commit 2f3d61a

Browse files
committed
fix: strip stream_options from request body before parsing
OpenAI-compatible SDKs add stream_options to streaming requests, but providers like Anthropic reject it with 400. In FULL_DUPLEX_STREAMED mode, body mutations via StreamedBodyResponse cannot reliably remove fields from chunks already forwarded by envoy. Strip stream_options from accumulated raw bytes before json.Unmarshal using JSON parse/delete/re-serialize — avoids false matches when the text "stream_options" appears inside message string values. Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
1 parent 5601f48 commit 2f3d61a

2 files changed

Lines changed: 182 additions & 0 deletions

File tree

pkg/handlers/request.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, reqCtx *RequestContex
6464
func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext, requestBodyBytes []byte) ([]*eppb.ProcessingResponse, error) {
6565
var ret []*eppb.ProcessingResponse
6666

67+
requestBodyBytes = stripStreamOptions(requestBodyBytes)
68+
6769
if err := json.Unmarshal(requestBodyBytes, &reqCtx.Request.Body); err != nil {
6870
return nil, errcommon.Error{Code: errcommon.BadRequest, Msg: fmt.Sprintf("failed to parse request body: %v", err)}
6971
}
@@ -161,6 +163,28 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy
161163
return responses
162164
}
163165

166+
// stripStreamOptions removes the top-level "stream_options" key from JSON
167+
// request bodies. OpenAI SDKs add this field for streaming requests, but
168+
// providers like Anthropic reject it with 400. Body mutations via
169+
// StreamedBodyResponse cannot reliably remove fields already forwarded in
170+
// FULL_DUPLEX_STREAMED mode, so the strip runs on accumulated raw bytes
171+
// before json.Unmarshal.
172+
func stripStreamOptions(body []byte) []byte {
173+
var parsed map[string]interface{}
174+
if err := json.Unmarshal(body, &parsed); err != nil {
175+
return body
176+
}
177+
if _, exists := parsed["stream_options"]; !exists {
178+
return body
179+
}
180+
delete(parsed, "stream_options")
181+
result, err := json.Marshal(parsed)
182+
if err != nil {
183+
return body
184+
}
185+
return result
186+
}
187+
164188
// HandleRequestTrailers handles request trailers.
165189
func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) ([]*eppb.ProcessingResponse, error) {
166190
return []*eppb.ProcessingResponse{

pkg/handlers/request_test.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,164 @@ func buildStreamingResponse(bodyBytes []byte, setHeaders map[string]string, remo
886886
}
887887
}
888888

889+
// === stripStreamOptions Tests ===
890+
891+
func TestStripStreamOptions(t *testing.T) {
892+
tests := []struct {
893+
name string
894+
input string
895+
wantBody map[string]any
896+
}{
897+
{
898+
name: "removes top-level stream_options key",
899+
input: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"hi"}],"max_tokens":5,"stream":true,"stream_options":{"include_usage":true}}`,
900+
wantBody: map[string]any{
901+
"model": "claude-opus-4-8",
902+
"messages": []any{map[string]any{"role": "user", "content": "hi"}},
903+
"max_tokens": float64(5),
904+
"stream": true,
905+
},
906+
},
907+
{
908+
name: "no stream_options — body unchanged",
909+
input: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"hi"}],"max_tokens":5}`,
910+
wantBody: map[string]any{
911+
"model": "claude-opus-4-8",
912+
"messages": []any{map[string]any{"role": "user", "content": "hi"}},
913+
"max_tokens": float64(5),
914+
},
915+
},
916+
{
917+
name: "stream_options text inside message string is preserved",
918+
input: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"please fix stream_options handling"}],"max_tokens":5,"stream_options":{"include_usage":true}}`,
919+
wantBody: map[string]any{
920+
"model": "claude-opus-4-8",
921+
"messages": []any{map[string]any{"role": "user", "content": "please fix stream_options handling"}},
922+
"max_tokens": float64(5),
923+
},
924+
},
925+
{
926+
name: "only stream_options text inside string — no top-level key to remove",
927+
input: `{"model":"test","messages":[{"role":"user","content":"the stream_options field should not appear"}],"max_tokens":5}`,
928+
wantBody: map[string]any{
929+
"model": "test",
930+
"messages": []any{map[string]any{"role": "user", "content": "the stream_options field should not appear"}},
931+
"max_tokens": float64(5),
932+
},
933+
},
934+
{
935+
name: "large body with stream_options",
936+
input: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"` + strings.Repeat("A", 50000) + `"}],"max_tokens":5,"stream":true,"stream_options":{"include_usage":true}}`,
937+
wantBody: map[string]any{
938+
"model": "claude-opus-4-8",
939+
"messages": []any{map[string]any{"role": "user", "content": strings.Repeat("A", 50000)}},
940+
"max_tokens": float64(5),
941+
"stream": true,
942+
},
943+
},
944+
}
945+
946+
for _, tc := range tests {
947+
t.Run(tc.name, func(t *testing.T) {
948+
result := stripStreamOptions([]byte(tc.input))
949+
950+
var got map[string]any
951+
if err := json.Unmarshal(result, &got); err != nil {
952+
t.Fatalf("failed to unmarshal result: %v", err)
953+
}
954+
955+
if _, exists := got["stream_options"]; exists {
956+
t.Error("stream_options key still present after stripping")
957+
}
958+
959+
wantBytes, _ := json.Marshal(tc.wantBody)
960+
gotBytes, _ := json.Marshal(got)
961+
if string(wantBytes) != string(gotBytes) {
962+
t.Errorf("body mismatch:\nwant: %s\ngot: %s", wantBytes, gotBytes)
963+
}
964+
})
965+
}
966+
967+
t.Run("invalid JSON returns original bytes", func(t *testing.T) {
968+
input := []byte(`not json at all`)
969+
result := stripStreamOptions(input)
970+
if string(result) != string(input) {
971+
t.Errorf("expected original bytes for invalid JSON, got: %s", result)
972+
}
973+
})
974+
975+
t.Run("empty body returns original bytes", func(t *testing.T) {
976+
result := stripStreamOptions([]byte{})
977+
if len(result) != 0 {
978+
t.Errorf("expected empty result for empty input, got: %s", result)
979+
}
980+
})
981+
}
982+
983+
func TestHandleRequestBody_StripsStreamOptions(t *testing.T) {
984+
metrics.Register()
985+
ctx := logutil.NewTestLoggerIntoContext(context.Background())
986+
987+
tests := []struct {
988+
name string
989+
body string
990+
}{
991+
{
992+
name: "small body with stream_options is stripped before plugin processing",
993+
body: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"hi"}],"max_tokens":5,"stream":true,"stream_options":{"include_usage":true}}`,
994+
},
995+
{
996+
name: "large body with stream_options (simulates multi-chunk request)",
997+
body: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"` + strings.Repeat("X", 100000) + `"}],"max_tokens":5,"stream":true,"stream_options":{"include_usage":true}}`,
998+
},
999+
{
1000+
name: "body without stream_options passes through unchanged",
1001+
body: `{"model":"claude-opus-4-8","messages":[{"role":"user","content":"hi"}],"max_tokens":5}`,
1002+
},
1003+
}
1004+
1005+
for _, tc := range tests {
1006+
t.Run(tc.name, func(t *testing.T) {
1007+
profiles := newTestProfiles()
1008+
server := newServerForTest(profiles)
1009+
reqCtx := &RequestContext{
1010+
CycleState: plugin.NewCycleState(),
1011+
Request: requesthandling.NewInferenceRequest(),
1012+
}
1013+
1014+
resp, err := server.HandleRequestBody(ctx, reqCtx, []byte(tc.body))
1015+
if err != nil {
1016+
t.Fatalf("HandleRequestBody returned unexpected error: %v", err)
1017+
}
1018+
1019+
if _, exists := reqCtx.Request.Body["stream_options"]; exists {
1020+
t.Error("stream_options still present in parsed request body after HandleRequestBody")
1021+
}
1022+
1023+
for _, r := range resp {
1024+
if rb, ok := r.Response.(*extProcPb.ProcessingResponse_RequestBody); ok {
1025+
if rb.RequestBody != nil && rb.RequestBody.Response != nil {
1026+
if bm := rb.RequestBody.Response.BodyMutation; bm != nil {
1027+
if sr, ok := bm.Mutation.(*extProcPb.BodyMutation_StreamedResponse); ok {
1028+
var bodyMap map[string]any
1029+
if err := json.Unmarshal(sr.StreamedResponse.Body, &bodyMap); err == nil {
1030+
if _, exists := bodyMap["stream_options"]; exists {
1031+
t.Errorf("stream_options found in StreamedBodyResponse body chunk")
1032+
}
1033+
}
1034+
}
1035+
}
1036+
}
1037+
}
1038+
}
1039+
1040+
if len(resp) < 2 {
1041+
t.Fatalf("expected at least 2 responses (headers + body), got %d", len(resp))
1042+
}
1043+
})
1044+
}
1045+
}
1046+
8891047
// === Request Body Tests (body mutations) ===
8901048

8911049
type bodyMutatingPlugin struct {

0 commit comments

Comments
 (0)