Skip to content

Commit d84bd44

Browse files
Support STREAMED response body (#25)
* Support STREAMED response body in ext-proc. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Update tests. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Add constants. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> * Add parse sse test. Signed-off-by: Mohammad <mohammad.nassar@ibm.com> --------- Signed-off-by: Mohammad <mohammad.nassar@ibm.com>
1 parent 215caec commit d84bd44

5 files changed

Lines changed: 237 additions & 30 deletions

File tree

config/charts/payload-processor/templates/istio.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ spec:
3434
request_header_mode: {{ if .Values.provider.supportedEvents.requestHeaders }}"SEND"{{ else }}"SKIP"{{ end }}
3535
response_header_mode: {{ if .Values.provider.supportedEvents.responseHeaders }}"SEND"{{ else }}"SKIP"{{ end }}
3636
request_body_mode: {{ if .Values.provider.supportedEvents.requestBody }}"FULL_DUPLEX_STREAMED"{{ else }}"NONE"{{ end }}
37-
response_body_mode: {{ if .Values.provider.supportedEvents.responseBody }}"FULL_DUPLEX_STREAMED"{{ else }}"NONE"{{ end }}
37+
response_body_mode: {{ if .Values.provider.supportedEvents.responseBody }}"STREAMED"{{ else }}"NONE"{{ end }}
3838
request_trailer_mode: {{ if .Values.provider.supportedEvents.requestTrailers }}"SEND"{{ else }}"SKIP"{{ end }}
3939
response_trailer_mode: {{ if .Values.provider.supportedEvents.responseTrailers }}"SEND"{{ else }}"SKIP"{{ end }}
4040
grpc_service:

pkg/handlers/response.go

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ limitations under the License.
1717
package handlers
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/json"
23+
"errors"
2224
"fmt"
2325
"strconv"
2426
"time"
@@ -45,9 +47,9 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestConte
4547

4648
if !headers.GetEndOfStream() {
4749
log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, deferring response until body arrives...")
48-
return nil
4950
}
50-
// EndOfStream means no body is expected, return HeadersResponse immediately
51+
// Always respond to response headers so Envoy proceeds with body chunks.
52+
// In STREAMED/FULL_DUPLEX_STREAMED mode, Envoy blocks until we respond.
5153
return []*eppb.ProcessingResponse{
5254
{
5355
Response: &eppb.ProcessingResponse_ResponseHeaders{
@@ -76,8 +78,15 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
7678
}
7779

7880
if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil {
79-
logger.Error(err, "Failed to parse response body as JSON, skipping response plugins")
80-
return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
81+
// Try parsing as SSE (Server-Sent Events) — streaming responses from providers
82+
// like Anthropic use SSE format which isn't valid JSON.
83+
if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil {
84+
reqCtx.Response.Body = sseBody
85+
logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins")
86+
} else {
87+
logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins")
88+
return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
89+
}
8190
}
8291

8392
if err := s.runResponsePlugins(ctx, reqCtx.CycleState, reqCtx.Response, reqCtx.Profile.ResponsePlugins); err != nil {
@@ -117,18 +126,96 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
117126
return ret, nil
118127
}
119128

120-
// generateEmptyResponseBodyResponse builds a streaming response with an empty
121-
// ResponseHeaders followed by chunked body responses via AddStreamedResponseBody.
122-
func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []*eppb.ProcessingResponse {
123-
responses := []*eppb.ProcessingResponse{
129+
// generateEmptyResponseBodyResponse returns an empty BodyResponse ack for the
130+
// final (EndOfStream) response body chunk. In STREAMED mode, Envoy has already
131+
// forwarded all chunks downstream via per-chunk acks, so re-emitting the body
132+
// would cause a content-length / transfer-encoding mismatch on the client.
133+
func (s *Server) generateEmptyResponseBodyResponse(_ []byte) []*eppb.ProcessingResponse {
134+
return []*eppb.ProcessingResponse{
124135
{
125-
Response: &eppb.ProcessingResponse_ResponseHeaders{
126-
ResponseHeaders: &eppb.HeadersResponse{},
136+
Response: &eppb.ProcessingResponse_ResponseBody{
137+
ResponseBody: &eppb.BodyResponse{},
127138
},
128139
},
129140
}
130-
responses = envoy.AddStreamedResponseBody(responses, responseBodyBytes)
131-
return responses
141+
}
142+
143+
const (
144+
sseDataPrefix = "data:"
145+
sseDoneMarker = "[DONE]"
146+
bodyFieldModel = "model"
147+
bodyFieldUsage = "usage"
148+
bodyFieldResponse = "response"
149+
)
150+
151+
// parseSSEResponseBody extracts a composite response body from an SSE (Server-Sent Events)
152+
// stream. It parses by SSE event boundaries instead of individual lines because one logical
153+
// event may legally contain multiple consecutive `data:` lines that must be joined before JSON decoding.
154+
func parseSSEResponseBody(body []byte) (map[string]any, error) {
155+
result := map[string]any{}
156+
lines := bytes.Split(body, []byte("\n"))
157+
eventDataLines := make([][]byte, 0)
158+
159+
flushEvent := func() {
160+
if len(eventDataLines) == 0 {
161+
return
162+
}
163+
164+
data := bytes.Join(eventDataLines, []byte("\n"))
165+
eventDataLines = eventDataLines[:0]
166+
167+
data = bytes.TrimSpace(data)
168+
if len(data) == 0 || bytes.Equal(data, []byte(sseDoneMarker)) {
169+
return
170+
}
171+
172+
var event map[string]any
173+
if err := json.Unmarshal(data, &event); err != nil {
174+
return
175+
}
176+
177+
if model, ok := event[bodyFieldModel].(string); ok && model != "" {
178+
result[bodyFieldModel] = model
179+
}
180+
181+
usage, _ := event[bodyFieldUsage].(map[string]any)
182+
if usage == nil {
183+
if resp, ok := event[bodyFieldResponse].(map[string]any); ok {
184+
usage, _ = resp[bodyFieldUsage].(map[string]any)
185+
if m, ok := resp[bodyFieldModel].(string); ok && m != "" {
186+
result[bodyFieldModel] = m
187+
}
188+
}
189+
}
190+
if usage != nil {
191+
existing, _ := result[bodyFieldUsage].(map[string]any)
192+
if existing == nil {
193+
existing = map[string]any{}
194+
}
195+
for k, v := range usage {
196+
existing[k] = v
197+
}
198+
result[bodyFieldUsage] = existing
199+
}
200+
}
201+
202+
for _, line := range lines {
203+
trimmed := bytes.TrimRight(line, "\r")
204+
if len(trimmed) == 0 {
205+
flushEvent()
206+
continue
207+
}
208+
if bytes.HasPrefix(trimmed, []byte(sseDataPrefix)) {
209+
eventDataLines = append(eventDataLines, bytes.TrimSpace(trimmed[len(sseDataPrefix):]))
210+
}
211+
}
212+
flushEvent()
213+
214+
if len(result) == 0 {
215+
return nil, errors.New("no parseable SSE data events found")
216+
}
217+
218+
return result, nil
132219
}
133220

134221
// HandleResponseTrailers handles response trailers.

pkg/handlers/response_test.go

Lines changed: 117 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,26 +72,13 @@ func TestHandleResponseBody_NoPlugins(t *testing.T) {
7272
t.Fatalf("HandleResponseBody returned unexpected error: %v", err)
7373
}
7474

75+
// With STREAMED response_body_mode, the body has already been forwarded
76+
// downstream via the per-chunk acks issued in Process. The EoS reply is
77+
// just an empty BodyResponse ack.
7578
want := []*extProcPb.ProcessingResponse{
76-
{
77-
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
78-
ResponseHeaders: &extProcPb.HeadersResponse{},
79-
},
80-
},
8179
{
8280
Response: &extProcPb.ProcessingResponse_ResponseBody{
83-
ResponseBody: &extProcPb.BodyResponse{
84-
Response: &extProcPb.CommonResponse{
85-
BodyMutation: &extProcPb.BodyMutation{
86-
Mutation: &extProcPb.BodyMutation_StreamedResponse{
87-
StreamedResponse: &extProcPb.StreamedBodyResponse{
88-
Body: responseBody,
89-
EndOfStream: true,
90-
},
91-
},
92-
},
93-
},
94-
},
81+
ResponseBody: &extProcPb.BodyResponse{},
9582
},
9683
},
9784
}
@@ -338,3 +325,116 @@ func expectedStreamedResponseBodyMutation(bodyBytes []byte) []*extProcPb.Process
338325
},
339326
}
340327
}
328+
329+
func TestParseSSEResponseBody(t *testing.T) {
330+
tests := []struct {
331+
name string
332+
body string
333+
want map[string]any
334+
wantError bool
335+
}{
336+
{
337+
name: "empty input",
338+
body: "",
339+
wantError: true,
340+
},
341+
{
342+
name: "only [DONE] marker",
343+
body: "data: [DONE]\n\n",
344+
wantError: true,
345+
},
346+
{
347+
name: "single event with top-level model and usage (OpenAI / Anthropic chat style)",
348+
body: "data: {\"model\":\"gpt-4\",\"usage\":{\"completion_tokens\":12,\"prompt_tokens\":34}}\n\n",
349+
want: map[string]any{
350+
"model": "gpt-4",
351+
"usage": map[string]any{
352+
"completion_tokens": float64(12),
353+
"prompt_tokens": float64(34),
354+
},
355+
},
356+
},
357+
{
358+
name: "event nested under response (OpenAI Responses API style)",
359+
body: "data: {\"response\":{\"model\":\"gpt-4o\",\"usage\":{\"completion_tokens\":7}}}\n\n",
360+
want: map[string]any{
361+
"model": "gpt-4o",
362+
"usage": map[string]any{
363+
"completion_tokens": float64(7),
364+
},
365+
},
366+
},
367+
{
368+
name: "multiple events: later usage overrides earlier",
369+
body: "data: {\"model\":\"gpt-4\",\"usage\":{\"prompt_tokens\":10}}\n\n" +
370+
"data: {\"usage\":{\"completion_tokens\":20}}\n\n" +
371+
"data: [DONE]\n\n",
372+
want: map[string]any{
373+
"model": "gpt-4",
374+
"usage": map[string]any{
375+
"prompt_tokens": float64(10),
376+
"completion_tokens": float64(20),
377+
},
378+
},
379+
},
380+
{
381+
name: "multi-line data: joined before JSON decoding",
382+
body: "data: {\"model\":\"gpt-4\",\n" +
383+
"data: \"usage\":{\"completion_tokens\":5}}\n\n",
384+
want: map[string]any{
385+
"model": "gpt-4",
386+
"usage": map[string]any{
387+
"completion_tokens": float64(5),
388+
},
389+
},
390+
},
391+
{
392+
name: "malformed JSON event is skipped, valid event is kept",
393+
body: "data: not-json\n\n" +
394+
"data: {\"model\":\"gpt-4\"}\n\n",
395+
want: map[string]any{
396+
"model": "gpt-4",
397+
},
398+
},
399+
{
400+
name: "CRLF line endings",
401+
body: "data: {\"model\":\"gpt-4\"}\r\n\r\n",
402+
want: map[string]any{
403+
"model": "gpt-4",
404+
},
405+
},
406+
{
407+
name: "non-data lines (event:/id:/comments) are ignored",
408+
body: "event: message\n" +
409+
"id: 1\n" +
410+
": keep-alive\n" +
411+
"data: {\"model\":\"gpt-4\"}\n\n",
412+
want: map[string]any{
413+
"model": "gpt-4",
414+
},
415+
},
416+
{
417+
name: "events with no model or usage produce no result",
418+
body: "data: {\"foo\":\"bar\"}\n\n",
419+
wantError: true,
420+
},
421+
}
422+
423+
for _, tc := range tests {
424+
t.Run(tc.name, func(t *testing.T) {
425+
got, err := parseSSEResponseBody([]byte(tc.body))
426+
if tc.wantError {
427+
if err == nil {
428+
t.Fatalf("parseSSEResponseBody: expected error, got result %v", got)
429+
}
430+
return
431+
}
432+
if err != nil {
433+
t.Fatalf("parseSSEResponseBody: unexpected error: %v", err)
434+
}
435+
if diff := cmp.Diff(tc.want, got); diff != "" {
436+
t.Errorf("parseSSEResponseBody mismatch (-want +got):\n%s", diff)
437+
}
438+
})
439+
}
440+
}

pkg/handlers/server.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
156156
}
157157
responseBody = append(responseBody, v.ResponseBody.Body...)
158158
if !v.ResponseBody.EndOfStream {
159+
// Send an immediate response for this chunk so Envoy continues
160+
// streaming. Without this, Envoy blocks waiting for our response
161+
// and stops forwarding subsequent chunks.
162+
if sendErr := srv.Send(&extProcPb.ProcessingResponse{
163+
Response: &extProcPb.ProcessingResponse_ResponseBody{
164+
ResponseBody: &extProcPb.BodyResponse{},
165+
},
166+
}); sendErr != nil {
167+
return status.Errorf(codes.Unknown, "failed to send streaming response ack: %v", sendErr)
168+
}
159169
continue
160170
}
161171
reqCtx.ResponseCompleteTimestamp = time.Now()

pkg/handlers/server_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
220220
if err := process.Send(request); err != nil {
221221
t.Fatalf("send response headers: %v", err)
222222
}
223+
// Discard the immediate header ack (HandleResponseHeaders always responds now).
224+
if _, err := process.Recv(); err != nil {
225+
t.Fatalf("recv header ack: %v", err)
226+
}
223227

224228
for _, c := range tc.chunks {
225229
request = &extProcPb.ProcessingRequest{
@@ -233,6 +237,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
233237
if err := process.Send(request); err != nil {
234238
t.Fatalf("send response body chunk: %v", err)
235239
}
240+
// Discard the immediate ack for non-EoS chunks (server keeps Envoy unblocked).
241+
if !c.endOfStream {
242+
if _, err := process.Recv(); err != nil {
243+
t.Fatalf("recv chunk ack: %v", err)
244+
}
245+
}
236246
}
237247

238248
got := make([]*extProcPb.ProcessingResponse, 0, len(want))

0 commit comments

Comments
 (0)