From 72b5454928fdb7ec677933f69251673a8ceea663 Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Thu, 28 May 2026 12:51:50 -0700 Subject: [PATCH 1/2] feat: parse SSE streaming responses for response plugins When the response body is not valid JSON (e.g., SSE/Server-Sent Events from streaming providers like Anthropic), parse the SSE data lines to extract usage and model information. This enables response plugins (usage-tracking, metering) to process streaming responses that were previously skipped with "Failed to parse response body as JSON". Also fixes two issues with streaming response handling: 1. Always respond to response headers so Envoy proceeds with body chunks (previously returned nil, causing per-message timeout) 2. Send an immediate ack for each non-EoS response body chunk so Envoy continues forwarding subsequent chunks instead of blocking Signed-off-by: Noy Itzikowitz --- pkg/handlers/response.go | 72 +++++++++++++++++++++++++++++++++++++--- pkg/handlers/server.go | 10 ++++++ 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 7d2d7cc0..fd199a09 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -17,6 +17,7 @@ limitations under the License. package handlers import ( + "bytes" "context" "encoding/json" "fmt" @@ -44,9 +45,9 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestConte if !headers.GetEndOfStream() { log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, deferring response until body arrives...") - return nil } - // EndOfStream means no body is expected, return HeadersResponse immediately + // Always respond to response headers so Envoy proceeds with body chunks. + // In STREAMED/FULL_DUPLEX_STREAMED mode, Envoy blocks until we respond. return []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_ResponseHeaders{ @@ -64,8 +65,15 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext, } if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil { - logger.Error(err, "Failed to parse response body as JSON, skipping response plugins") - return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil + // Try parsing as SSE (Server-Sent Events) — streaming responses from providers + // like Anthropic use SSE format which isn't valid JSON. + if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil { + reqCtx.Response.Body = sseBody + logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins") + } else { + logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins") + return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil + } } if err := s.runResponsePlugins(ctx, reqCtx.CycleState, reqCtx.Response); err != nil { @@ -130,6 +138,62 @@ func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pr }, nil } +// parseSSEResponseBody extracts a composite response body from an SSE (Server-Sent Events) +// stream. It scans all "data:" lines for JSON objects and merges usage/model fields into +// a single map that response plugins can process. This enables usage-tracking and metering +// plugins to work with streaming responses from providers like Anthropic and OpenAI. +func parseSSEResponseBody(body []byte) (map[string]any, error) { + result := map[string]any{} + lines := bytes.Split(body, []byte("\n")) + + for _, line := range lines { + line = bytes.TrimSpace(line) + if !bytes.HasPrefix(line, []byte("data:")) { + continue + } + data := bytes.TrimSpace(line[5:]) + if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) { + continue + } + + var event map[string]any + if err := json.Unmarshal(data, &event); err != nil { + continue + } + + if model, ok := event["model"].(string); ok && model != "" { + result["model"] = model + } + + // Check for usage at top level (Anthropic) or nested in response (OpenAI Responses API) + usage, _ := event["usage"].(map[string]any) + if usage == nil { + if resp, ok := event["response"].(map[string]any); ok { + usage, _ = resp["usage"].(map[string]any) + if m, ok := resp["model"].(string); ok && m != "" { + result["model"] = m + } + } + } + if usage != nil { + existing, _ := result["usage"].(map[string]any) + if existing == nil { + existing = map[string]any{} + } + for k, v := range usage { + existing[k] = v + } + result["usage"] = existing + } + } + + if len(result) == 0 { + return nil, fmt.Errorf("no parseable SSE data events found") + } + + return result, nil +} + // runResponsePlugins executes response plugins in the order they were registered. func (s *Server) runResponsePlugins(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse) error { logger := log.FromContext(ctx).V(logutil.DEFAULT) diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index 68a37656..77d1aa69 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -141,6 +141,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { loggerVerbose.Info("Incoming response body chunk", "EoS", v.ResponseBody.EndOfStream) responseBody = append(responseBody, v.ResponseBody.Body...) if !v.ResponseBody.EndOfStream { + // Send an immediate response for this chunk so Envoy continues + // streaming. Without this, Envoy blocks waiting for our response + // and stops forwarding subsequent chunks. + if sendErr := srv.Send(&extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseBody{ + ResponseBody: &extProcPb.BodyResponse{}, + }, + }); sendErr != nil { + return status.Errorf(codes.Unknown, "failed to send streaming response ack: %v", sendErr) + } continue } responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) From 8cd97475d7c56d561f4e4f1b32a8befff22feb4a Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Mon, 15 Jun 2026 11:15:40 -0700 Subject: [PATCH 2/2] fix: use Content-Type header to select response parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback from @shmuelk: instead of trying JSON parse and falling back to SSE on failure, check the Content-Type response header upfront to select the correct parser. - text/event-stream → SSE parser (parseSSEResponseBody) - anything else → JSON parser (json.Unmarshal) Also fix streaming tests: - JSON body tests now use content-type: application/json (not text/event-stream) - Tests receive response header ack before sending body chunks - Tests receive chunk acks for non-final streaming chunks --- pkg/handlers/response.go | 23 +++++++++++++++-------- pkg/handlers/server_test.go | 14 +++++++++++++- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index fd199a09..7d73f52b 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "time" eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -64,14 +65,20 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext, return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil } - if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil { - // Try parsing as SSE (Server-Sent Events) — streaming responses from providers - // like Anthropic use SSE format which isn't valid JSON. - if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil { - reqCtx.Response.Body = sseBody - logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins") - } else { - logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins") + contentType := reqCtx.Response.Headers["content-type"] + isSSE := strings.Contains(contentType, "text/event-stream") + + if isSSE { + sseBody, err := parseSSEResponseBody(responseBodyBytes) + if err != nil || sseBody == nil { + logger.Error(err, "failed to parse SSE response body, skipping response plugins") + return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil + } + reqCtx.Response.Body = sseBody + logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins") + } else { + if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil { + logger.Error(err, "failed to parse response body as JSON, skipping response plugins") return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil } } diff --git a/pkg/handlers/server_test.go b/pkg/handlers/server_test.go index 406195ad..bd044036 100644 --- a/pkg/handlers/server_test.go +++ b/pkg/handlers/server_test.go @@ -167,7 +167,7 @@ func TestHandleResponseBody_Streaming(t *testing.T) { respHeaders := utils.BuildEnvoyGRPCHeaders(map[string]string{ "x-test": "body", ":method": "POST", - "content-type": "text/event-stream", + "content-type": "application/json", }, true) request := &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_ResponseHeaders{ @@ -178,6 +178,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) { t.Fatalf("send response headers: %v", err) } + // Receive the response header ack before sending body chunks. + // HandleResponseHeaders always responds immediately so Envoy proceeds. + if _, err := process.Recv(); err != nil { + t.Fatalf("recv response headers ack: %v", err) + } + for _, c := range tc.chunks { request = &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_ResponseBody{ @@ -190,6 +196,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) { if err := process.Send(request); err != nil { t.Fatalf("send response body chunk: %v", err) } + // For non-final chunks, receive the streaming ack + if !c.endOfStream { + if _, err := process.Recv(); err != nil { + t.Fatalf("recv chunk ack: %v", err) + } + } } got := make([]*extProcPb.ProcessingResponse, 0, len(want))