From 72b5454928fdb7ec677933f69251673a8ceea663 Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Thu, 28 May 2026 12:51:50 -0700 Subject: [PATCH 1/4] feat: parse SSE streaming responses for response plugins When the response body is not valid JSON (e.g., SSE/Server-Sent Events from streaming providers like Anthropic), parse the SSE data lines to extract usage and model information. This enables response plugins (usage-tracking, metering) to process streaming responses that were previously skipped with "Failed to parse response body as JSON". Also fixes two issues with streaming response handling: 1. Always respond to response headers so Envoy proceeds with body chunks (previously returned nil, causing per-message timeout) 2. Send an immediate ack for each non-EoS response body chunk so Envoy continues forwarding subsequent chunks instead of blocking Signed-off-by: Noy Itzikowitz --- pkg/handlers/response.go | 72 +++++++++++++++++++++++++++++++++++++--- pkg/handlers/server.go | 10 ++++++ 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 7d2d7cc..fd199a0 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -17,6 +17,7 @@ limitations under the License. package handlers import ( + "bytes" "context" "encoding/json" "fmt" @@ -44,9 +45,9 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestConte if !headers.GetEndOfStream() { log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, deferring response until body arrives...") - return nil } - // EndOfStream means no body is expected, return HeadersResponse immediately + // Always respond to response headers so Envoy proceeds with body chunks. + // In STREAMED/FULL_DUPLEX_STREAMED mode, Envoy blocks until we respond. 
return []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_ResponseHeaders{ @@ -64,8 +65,15 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext, } if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil { - logger.Error(err, "Failed to parse response body as JSON, skipping response plugins") - return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil + // Try parsing as SSE (Server-Sent Events) — streaming responses from providers + // like Anthropic use SSE format which isn't valid JSON. + if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil { + reqCtx.Response.Body = sseBody + logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins") + } else { + logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins") + return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil + } } if err := s.runResponsePlugins(ctx, reqCtx.CycleState, reqCtx.Response); err != nil { @@ -130,6 +138,62 @@ func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pr }, nil } +// parseSSEResponseBody extracts a composite response body from an SSE (Server-Sent Events) +// stream. It scans all "data:" lines for JSON objects and merges usage/model fields into +// a single map that response plugins can process. This enables usage-tracking and metering +// plugins to work with streaming responses from providers like Anthropic and OpenAI. 
+func parseSSEResponseBody(body []byte) (map[string]any, error) { + result := map[string]any{} + lines := bytes.Split(body, []byte("\n")) + + for _, line := range lines { + line = bytes.TrimSpace(line) + if !bytes.HasPrefix(line, []byte("data:")) { + continue + } + data := bytes.TrimSpace(line[5:]) + if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) { + continue + } + + var event map[string]any + if err := json.Unmarshal(data, &event); err != nil { + continue + } + + if model, ok := event["model"].(string); ok && model != "" { + result["model"] = model + } + + // Check for usage at top level (Anthropic) or nested in response (OpenAI Responses API) + usage, _ := event["usage"].(map[string]any) + if usage == nil { + if resp, ok := event["response"].(map[string]any); ok { + usage, _ = resp["usage"].(map[string]any) + if m, ok := resp["model"].(string); ok && m != "" { + result["model"] = m + } + } + } + if usage != nil { + existing, _ := result["usage"].(map[string]any) + if existing == nil { + existing = map[string]any{} + } + for k, v := range usage { + existing[k] = v + } + result["usage"] = existing + } + } + + if len(result) == 0 { + return nil, fmt.Errorf("no parseable SSE data events found") + } + + return result, nil +} + // runResponsePlugins executes response plugins in the order they were registered. func (s *Server) runResponsePlugins(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse) error { logger := log.FromContext(ctx).V(logutil.DEFAULT) diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index 68a3765..77d1aa6 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -141,6 +141,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { loggerVerbose.Info("Incoming response body chunk", "EoS", v.ResponseBody.EndOfStream) responseBody = append(responseBody, v.ResponseBody.Body...) 
if !v.ResponseBody.EndOfStream { + // Send an immediate response for this chunk so Envoy continues + // streaming. Without this, Envoy blocks waiting for our response + // and stops forwarding subsequent chunks. + if sendErr := srv.Send(&extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseBody{ + ResponseBody: &extProcPb.BodyResponse{}, + }, + }); sendErr != nil { + return status.Errorf(codes.Unknown, "failed to send streaming response ack: %v", sendErr) + } continue } responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) From e08bfae7e10d8c87efd6a6401976e071a60431f5 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Tue, 2 Jun 2026 15:51:54 +0300 Subject: [PATCH 2/4] Fix lint. Signed-off-by: Mohammad --- pkg/handlers/response.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index fd199a0..5bbea55 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "strconv" "time" @@ -188,7 +189,7 @@ func parseSSEResponseBody(body []byte) (map[string]any, error) { } if len(result) == 0 { - return nil, fmt.Errorf("no parseable SSE data events found") + return nil, errors.New("no parseable SSE data events found") } return result, nil From 3918d6624a6612417da073a294f390345591edc7 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Wed, 3 Jun 2026 16:55:37 +0300 Subject: [PATCH 3/4] Fix test. 
Signed-off-by: Mohammad --- pkg/handlers/server_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/handlers/server_test.go b/pkg/handlers/server_test.go index 406195a..421b12c 100644 --- a/pkg/handlers/server_test.go +++ b/pkg/handlers/server_test.go @@ -177,6 +177,10 @@ func TestHandleResponseBody_Streaming(t *testing.T) { if err := process.Send(request); err != nil { t.Fatalf("send response headers: %v", err) } + // Discard the immediate header ack (HandleResponseHeaders always responds now). + if _, err := process.Recv(); err != nil { + t.Fatalf("recv header ack: %v", err) + } for _, c := range tc.chunks { request = &extProcPb.ProcessingRequest{ @@ -190,6 +194,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) { if err := process.Send(request); err != nil { t.Fatalf("send response body chunk: %v", err) } + // Discard the immediate ack for non-EoS chunks. + if !c.endOfStream { + if _, err := process.Recv(); err != nil { + t.Fatalf("recv chunk ack: %v", err) + } + } } got := make([]*extProcPb.ProcessingResponse, 0, len(want)) From c077d4d6af3e8ba051abd6ef948ac15020ab7e57 Mon Sep 17 00:00:00 2001 From: David Breitgand Date: Thu, 4 Jun 2026 15:44:15 +0300 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: David Breitgand Signed-off-by: David Breitgand --- pkg/handlers/response.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 5bbea55..7e35eb4 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -143,23 +143,32 @@ func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pr // stream. It scans all "data:" lines for JSON objects and merges usage/model fields into // a single map that response plugins can process. This enables usage-tracking and metering // plugins to work with streaming responses from providers like Anthropic and OpenAI. 
+// +// Events are parsed at SSE event boundaries rather than line by line, because one logical +// event may legally contain multiple consecutive `data:` lines that must be joined before JSON decoding. func parseSSEResponseBody(body []byte) (map[string]any, error) { result := map[string]any{} lines := bytes.Split(body, []byte("\n")) + eventDataLines := make([][]byte, 0) - for _, line := range lines { - line = bytes.TrimSpace(line) - if !bytes.HasPrefix(line, []byte("data:")) { - continue + // flushEvent decodes the event accumulated so far. One SSE event may span several + // `data:` lines, so they are joined with newlines and parsed as a single JSON document. + flushEvent := func() { + if len(eventDataLines) == 0 { + return } - data := bytes.TrimSpace(line[5:]) + + data := bytes.Join(eventDataLines, []byte("\n")) + eventDataLines = eventDataLines[:0] + + data = bytes.TrimSpace(data) if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) { - continue + return } var event map[string]any if err := json.Unmarshal(data, &event); err != nil { - continue + return } if model, ok := event["model"].(string); ok && model != "" {