Reviewer notes (text before the first "diff --git" header is ignored by git apply):
* Line structure and indentation were reconstructed from a whitespace-collapsed copy of
  this patch. NOTE(review): confirm context/removed lines against the target tree.
* Blob ids on the "index" lines come from the original patch; the post-image ids for
  response.go and response_test.go no longer match because added lines were revised.

diff --git a/config/charts/payload-processor/templates/istio.yaml b/config/charts/payload-processor/templates/istio.yaml
index 9e3b211..935d7a6 100644
--- a/config/charts/payload-processor/templates/istio.yaml
+++ b/config/charts/payload-processor/templates/istio.yaml
@@ -34,7 +34,7 @@ spec:
               request_header_mode: {{ if .Values.provider.supportedEvents.requestHeaders }}"SEND"{{ else }}"SKIP"{{ end }}
               response_header_mode: {{ if .Values.provider.supportedEvents.responseHeaders }}"SEND"{{ else }}"SKIP"{{ end }}
               request_body_mode: {{ if .Values.provider.supportedEvents.requestBody }}"FULL_DUPLEX_STREAMED"{{ else }}"NONE"{{ end }}
-              response_body_mode: {{ if .Values.provider.supportedEvents.responseBody }}"FULL_DUPLEX_STREAMED"{{ else }}"NONE"{{ end }}
+              response_body_mode: {{ if .Values.provider.supportedEvents.responseBody }}"STREAMED"{{ else }}"NONE"{{ end }}
               request_trailer_mode: {{ if .Values.provider.supportedEvents.requestTrailers }}"SEND"{{ else }}"SKIP"{{ end }}
               response_trailer_mode: {{ if .Values.provider.supportedEvents.responseTrailers }}"SEND"{{ else }}"SKIP"{{ end }}
             grpc_service:
diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go
index 03210f1..f0bfa9a 100644
--- a/pkg/handlers/response.go
+++ b/pkg/handlers/response.go
@@ -17,8 +17,10 @@ limitations under the License.
 package handlers
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strconv"
 	"time"
@@ -45,9 +47,9 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestConte
 
 	if !headers.GetEndOfStream() {
-		log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, deferring response until body arrives...")
-		return nil
+		log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, acknowledging now so Envoy streams the body...")
 	}
-	// EndOfStream means no body is expected, return HeadersResponse immediately
+	// Always respond to response headers so Envoy proceeds with body chunks.
+	// In STREAMED/FULL_DUPLEX_STREAMED mode, Envoy blocks until we respond.
 	return []*eppb.ProcessingResponse{
 		{
 			Response: &eppb.ProcessingResponse_ResponseHeaders{
@@ -76,8 +78,15 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
 	}
 
 	if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil {
-		logger.Error(err, "Failed to parse response body as JSON, skipping response plugins")
-		return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
+		// Try parsing as SSE (Server-Sent Events) — streaming responses from providers
+		// like Anthropic use SSE format which isn't valid JSON.
+		if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil {
+			reqCtx.Response.Body = sseBody
+			logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins")
+		} else {
+			logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins")
+			return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
+		}
 	}
 
 	if err := s.runResponsePlugins(ctx, reqCtx.CycleState, reqCtx.Response, reqCtx.Profile.ResponsePlugins); err != nil {
@@ -117,18 +126,114 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
 	return ret, nil
 }
 
-// generateEmptyResponseBodyResponse builds a streaming response with an empty
-// ResponseHeaders followed by chunked body responses via AddStreamedResponseBody.
-func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []*eppb.ProcessingResponse {
-	responses := []*eppb.ProcessingResponse{
+// generateEmptyResponseBodyResponse returns an empty BodyResponse ack for the
+// final (EndOfStream) response body chunk. In STREAMED mode, Envoy has already
+// forwarded all chunks downstream via per-chunk acks, so re-emitting the body
+// would cause a content-length / transfer-encoding mismatch on the client.
+func (s *Server) generateEmptyResponseBodyResponse(_ []byte) []*eppb.ProcessingResponse {
+	return []*eppb.ProcessingResponse{
 		{
-			Response: &eppb.ProcessingResponse_ResponseHeaders{
-				ResponseHeaders: &eppb.HeadersResponse{},
+			Response: &eppb.ProcessingResponse_ResponseBody{
+				ResponseBody: &eppb.BodyResponse{},
 			},
 		},
 	}
-	responses = envoy.AddStreamedResponseBody(responses, responseBodyBytes)
-	return responses
+}
+
+const (
+	sseDataPrefix     = "data:"
+	sseDoneMarker     = "[DONE]"
+	bodyFieldModel    = "model"
+	bodyFieldUsage    = "usage"
+	bodyFieldResponse = "response"
+	bodyFieldMessage  = "message"
+)
+
+// parseSSEResponseBody extracts a composite response body from an SSE (Server-Sent Events)
+// stream. It parses by SSE event boundaries instead of individual lines because one logical
+// event may legally contain multiple consecutive `data:` lines that must be joined before JSON decoding.
+//
+// The returned map holds the last non-empty "model" seen and a "usage" map merged
+// key-by-key across all events (later events win on conflicting keys). Each event
+// contributes its top-level model/usage; when it has no top-level usage, model and
+// usage are also read from the first nested "response" (OpenAI Responses API) or
+// "message" (Anthropic message_start) object. Events whose data is empty, is the
+// [DONE] marker, or is not a JSON object are skipped. An error is returned when no
+// event yields a model or usage.
+func parseSSEResponseBody(body []byte) (map[string]any, error) {
+	result := map[string]any{}
+	lines := bytes.Split(body, []byte("\n"))
+	eventDataLines := make([][]byte, 0)
+
+	flushEvent := func() {
+		if len(eventDataLines) == 0 {
+			return
+		}
+
+		data := bytes.Join(eventDataLines, []byte("\n"))
+		eventDataLines = eventDataLines[:0]
+
+		data = bytes.TrimSpace(data)
+		if len(data) == 0 || bytes.Equal(data, []byte(sseDoneMarker)) {
+			return
+		}
+
+		var event map[string]any
+		if err := json.Unmarshal(data, &event); err != nil {
+			// Best effort: skip malformed events; others may still carry model/usage.
+			return
+		}
+
+		if model, ok := event[bodyFieldModel].(string); ok && model != "" {
+			result[bodyFieldModel] = model
+		}
+
+		usage, _ := event[bodyFieldUsage].(map[string]any)
+		if usage == nil {
+			// Some providers nest model/usage one level down: the OpenAI
+			// Responses API under "response", Anthropic's message_start
+			// event under "message".
+			for _, key := range []string{bodyFieldResponse, bodyFieldMessage} {
+				nested, ok := event[key].(map[string]any)
+				if !ok {
+					continue
+				}
+				usage, _ = nested[bodyFieldUsage].(map[string]any)
+				if m, ok := nested[bodyFieldModel].(string); ok && m != "" {
+					result[bodyFieldModel] = m
+				}
+				break
+			}
+		}
+		if usage != nil {
+			existing, _ := result[bodyFieldUsage].(map[string]any)
+			if existing == nil {
+				existing = map[string]any{}
+			}
+			for k, v := range usage {
+				existing[k] = v
+			}
+			result[bodyFieldUsage] = existing
+		}
+	}
+
+	for _, line := range lines {
+		trimmed := bytes.TrimRight(line, "\r")
+		if len(trimmed) == 0 {
+			flushEvent()
+			continue
+		}
+		if bytes.HasPrefix(trimmed, []byte(sseDataPrefix)) {
+			eventDataLines = append(eventDataLines, bytes.TrimSpace(trimmed[len(sseDataPrefix):]))
+		}
+	}
+	flushEvent()
+
+	if len(result) == 0 {
+		return nil, errors.New("no parseable SSE data events found")
+	}
+
+	return result, nil
 }
 
 // HandleResponseTrailers handles response trailers.
diff --git a/pkg/handlers/response_test.go b/pkg/handlers/response_test.go
index 42e35e4..0708f6d 100644
--- a/pkg/handlers/response_test.go
+++ b/pkg/handlers/response_test.go
@@ -72,26 +72,13 @@ func TestHandleResponseBody_NoPlugins(t *testing.T) {
 		t.Fatalf("HandleResponseBody returned unexpected error: %v", err)
 	}
 
+	// With STREAMED response_body_mode, the body has already been forwarded
+	// downstream via the per-chunk acks issued in Process. The EoS reply is
+	// just an empty BodyResponse ack.
 	want := []*extProcPb.ProcessingResponse{
-		{
-			Response: &extProcPb.ProcessingResponse_ResponseHeaders{
-				ResponseHeaders: &extProcPb.HeadersResponse{},
-			},
-		},
 		{
 			Response: &extProcPb.ProcessingResponse_ResponseBody{
-				ResponseBody: &extProcPb.BodyResponse{
-					Response: &extProcPb.CommonResponse{
-						BodyMutation: &extProcPb.BodyMutation{
-							Mutation: &extProcPb.BodyMutation_StreamedResponse{
-								StreamedResponse: &extProcPb.StreamedBodyResponse{
-									Body:        responseBody,
-									EndOfStream: true,
-								},
-							},
-						},
-					},
-				},
+				ResponseBody: &extProcPb.BodyResponse{},
 			},
 		},
 	}
@@ -338,3 +325,130 @@ func expectedStreamedResponseBodyMutation(bodyBytes []byte) []*extProcPb.Process
 		},
 	}
 }
+
+func TestParseSSEResponseBody(t *testing.T) {
+	tests := []struct {
+		name      string
+		body      string
+		want      map[string]any
+		wantError bool
+	}{
+		{
+			name:      "empty input",
+			body:      "",
+			wantError: true,
+		},
+		{
+			name:      "only [DONE] marker",
+			body:      "data: [DONE]\n\n",
+			wantError: true,
+		},
+		{
+			name: "single event with top-level model and usage (OpenAI chat completions style)",
+			body: "data: {\"model\":\"gpt-4\",\"usage\":{\"completion_tokens\":12,\"prompt_tokens\":34}}\n\n",
+			want: map[string]any{
+				"model": "gpt-4",
+				"usage": map[string]any{
+					"completion_tokens": float64(12),
+					"prompt_tokens":     float64(34),
+				},
+			},
+		},
+		{
+			name: "event nested under response (OpenAI Responses API style)",
+			body: "data: {\"response\":{\"model\":\"gpt-4o\",\"usage\":{\"completion_tokens\":7}}}\n\n",
+			want: map[string]any{
+				"model": "gpt-4o",
+				"usage": map[string]any{
+					"completion_tokens": float64(7),
+				},
+			},
+		},
+		{
+			name: "event nested under message (Anthropic message_start) merged with message_delta usage",
+			body: "event: message_start\n" +
+				"data: {\"type\":\"message_start\",\"message\":{\"model\":\"claude-3\",\"usage\":{\"input_tokens\":25,\"output_tokens\":1}}}\n\n" +
+				"event: message_delta\n" +
+				"data: {\"type\":\"message_delta\",\"usage\":{\"output_tokens\":15}}\n\n",
+			want: map[string]any{
+				"model": "claude-3",
+				"usage": map[string]any{
+					"input_tokens":  float64(25),
+					"output_tokens": float64(15),
+				},
+			},
+		},
+		{
+			name: "multiple events: usage keys are merged across events",
+			body: "data: {\"model\":\"gpt-4\",\"usage\":{\"prompt_tokens\":10}}\n\n" +
+				"data: {\"usage\":{\"completion_tokens\":20}}\n\n" +
+				"data: [DONE]\n\n",
+			want: map[string]any{
+				"model": "gpt-4",
+				"usage": map[string]any{
+					"prompt_tokens":     float64(10),
+					"completion_tokens": float64(20),
+				},
+			},
+		},
+		{
+			name: "multi-line data: joined before JSON decoding",
+			body: "data: {\"model\":\"gpt-4\",\n" +
+				"data: \"usage\":{\"completion_tokens\":5}}\n\n",
+			want: map[string]any{
+				"model": "gpt-4",
+				"usage": map[string]any{
+					"completion_tokens": float64(5),
+				},
+			},
+		},
+		{
+			name: "malformed JSON event is skipped, valid event is kept",
+			body: "data: not-json\n\n" +
+				"data: {\"model\":\"gpt-4\"}\n\n",
+			want: map[string]any{
+				"model": "gpt-4",
+			},
+		},
+		{
+			name: "CRLF line endings",
+			body: "data: {\"model\":\"gpt-4\"}\r\n\r\n",
+			want: map[string]any{
+				"model": "gpt-4",
+			},
+		},
+		{
+			name: "non-data lines (event:/id:/comments) are ignored",
+			body: "event: message\n" +
+				"id: 1\n" +
+				": keep-alive\n" +
+				"data: {\"model\":\"gpt-4\"}\n\n",
+			want: map[string]any{
+				"model": "gpt-4",
+			},
+		},
+		{
+			name:      "events with no model or usage produce no result",
+			body:      "data: {\"foo\":\"bar\"}\n\n",
+			wantError: true,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			got, err := parseSSEResponseBody([]byte(tc.body))
+			if tc.wantError {
+				if err == nil {
+					t.Fatalf("parseSSEResponseBody: expected error, got result %v", got)
+				}
+				return
+			}
+			if err != nil {
+				t.Fatalf("parseSSEResponseBody: unexpected error: %v", err)
+			}
+			if diff := cmp.Diff(tc.want, got); diff != "" {
+				t.Errorf("parseSSEResponseBody mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go
index 38e32c0..2b39ba5 100644
--- a/pkg/handlers/server.go
+++ b/pkg/handlers/server.go
@@ -156,6 +156,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			}
 			responseBody = append(responseBody, v.ResponseBody.Body...)
 			if !v.ResponseBody.EndOfStream {
+				// Send an immediate response for this chunk so Envoy continues
+				// streaming. Without this, Envoy blocks waiting for our response
+				// and stops forwarding subsequent chunks.
+				if sendErr := srv.Send(&extProcPb.ProcessingResponse{
+					Response: &extProcPb.ProcessingResponse_ResponseBody{
+						ResponseBody: &extProcPb.BodyResponse{},
+					},
+				}); sendErr != nil {
+					return status.Errorf(codes.Unknown, "failed to send streaming response ack: %v", sendErr)
+				}
 				continue
 			}
 			reqCtx.ResponseCompleteTimestamp = time.Now()
diff --git a/pkg/handlers/server_test.go b/pkg/handlers/server_test.go
index f014348..c2fac01 100644
--- a/pkg/handlers/server_test.go
+++ b/pkg/handlers/server_test.go
@@ -220,6 +220,10 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
 			if err := process.Send(request); err != nil {
 				t.Fatalf("send response headers: %v", err)
 			}
+			// Discard the immediate header ack (HandleResponseHeaders always responds now).
+			if _, err := process.Recv(); err != nil {
+				t.Fatalf("recv header ack: %v", err)
+			}
 
 			for _, c := range tc.chunks {
 				request = &extProcPb.ProcessingRequest{
@@ -233,6 +237,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
 				if err := process.Send(request); err != nil {
 					t.Fatalf("send response body chunk: %v", err)
 				}
+				// Discard the immediate ack for non-EoS chunks (server keeps Envoy unblocked).
+				if !c.endOfStream {
+					if _, err := process.Recv(); err != nil {
+						t.Fatalf("recv chunk ack: %v", err)
+					}
+				}
 			}
 
 			got := make([]*extProcPb.ProcessingResponse, 0, len(want))