diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go
index 7d2d7cc0..7d73f52b 100644
--- a/pkg/handlers/response.go
+++ b/pkg/handlers/response.go
@@ -17,10 +17,12 @@ limitations under the License.
 package handlers
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -44,9 +46,9 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestConte
 
 	if !headers.GetEndOfStream() {
-		log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, deferring response until body arrives...")
-		return nil
+		log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, body expected; acknowledging immediately so Envoy streams the body")
 	}
-	// EndOfStream means no body is expected, return HeadersResponse immediately
+	// Always respond to response headers so Envoy proceeds with body chunks.
+	// In STREAMED/FULL_DUPLEX_STREAMED mode, Envoy blocks until we respond.
 	return []*eppb.ProcessingResponse{
 		{
 			Response: &eppb.ProcessingResponse_ResponseHeaders{
@@ -63,9 +65,22 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
 		return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
 	}
 
-	if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil {
-		logger.Error(err, "Failed to parse response body as JSON, skipping response plugins")
-		return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
+	contentType := reqCtx.Response.Headers["content-type"]
+	isSSE := strings.Contains(strings.ToLower(contentType), "text/event-stream")
+
+	if isSSE {
+		sseBody, err := parseSSEResponseBody(responseBodyBytes)
+		if err != nil || sseBody == nil {
+			logger.Error(err, "failed to parse SSE response body, skipping response plugins")
+			return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
+		}
+		reqCtx.Response.Body = sseBody
+		logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins")
+	} else {
+		if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil {
+			logger.Error(err, "failed to parse response body as JSON, skipping response plugins")
+			return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
+		}
 	}
 
 	if err := s.runResponsePlugins(ctx, reqCtx.CycleState, reqCtx.Response); err != nil {
@@ -130,6 +145,77 @@ func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pr
 	}, nil
 }
 
+// parseSSEResponseBody extracts a composite response body from an SSE (Server-Sent Events)
+// stream. It scans all "data:" lines for JSON objects and merges usage/model fields into
+// a single map that response plugins can process. This enables usage-tracking and metering
+// plugins to work with streaming responses from providers like Anthropic and OpenAI.
+//
+// Model and usage are read from the top level of each event and, when an event carries no
+// top-level usage, from a nested "response" envelope (OpenAI Responses API) or "message"
+// envelope (Anthropic message_start). Usage keys from later events overwrite earlier ones.
+// An error is returned when no event contributed a model or usage field.
+func parseSSEResponseBody(body []byte) (map[string]any, error) {
+	// Envelope keys that may carry a nested model/usage, in lookup order.
+	envelopes := [...]string{"response", "message"}
+	result := map[string]any{}
+	lines := bytes.Split(body, []byte("\n"))
+
+	for _, line := range lines {
+		// TrimSpace also drops the "\r" of CRLF-terminated lines.
+		line = bytes.TrimSpace(line)
+		if !bytes.HasPrefix(line, []byte("data:")) {
+			continue
+		}
+		data := bytes.TrimSpace(line[5:])
+		if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) {
+			continue
+		}
+
+		var event map[string]any
+		if err := json.Unmarshal(data, &event); err != nil {
+			// Not a JSON object (e.g. a partial or non-JSON payload): skip it.
+			continue
+		}
+
+		if model, ok := event["model"].(string); ok && model != "" {
+			result["model"] = model
+		}
+
+		// Prefer top-level usage; otherwise search the known envelopes until one has usage.
+		usage, _ := event["usage"].(map[string]any)
+		if usage == nil {
+			for _, key := range envelopes {
+				nested, ok := event[key].(map[string]any)
+				if !ok {
+					continue
+				}
+				if m, ok := nested["model"].(string); ok && m != "" {
+					result["model"] = m
+				}
+				if usage, _ = nested["usage"].(map[string]any); usage != nil {
+					break
+				}
+			}
+		}
+		if usage != nil {
+			existing, _ := result["usage"].(map[string]any)
+			if existing == nil {
+				existing = map[string]any{}
+			}
+			for k, v := range usage {
+				existing[k] = v
+			}
+			result["usage"] = existing
+		}
+	}
+
+	if len(result) == 0 {
+		return nil, fmt.Errorf("no parseable SSE data events found")
+	}
+
+	return result, nil
+}
+
 // runResponsePlugins executes response plugins in the order they were registered.
 func (s *Server) runResponsePlugins(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse) error {
 	logger := log.FromContext(ctx).V(logutil.DEFAULT)
diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go
index 68a37656..77d1aa69 100644
--- a/pkg/handlers/server.go
+++ b/pkg/handlers/server.go
@@ -141,6 +141,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			loggerVerbose.Info("Incoming response body chunk", "EoS", v.ResponseBody.EndOfStream)
 			responseBody = append(responseBody, v.ResponseBody.Body...)
 			if !v.ResponseBody.EndOfStream {
+				// Send an immediate response for this chunk so Envoy continues
+				// streaming. Without this, Envoy blocks waiting for our response
+				// and stops forwarding subsequent chunks.
+				if sendErr := srv.Send(&extProcPb.ProcessingResponse{
+					Response: &extProcPb.ProcessingResponse_ResponseBody{
+						ResponseBody: &extProcPb.BodyResponse{},
+					},
+				}); sendErr != nil {
+					return status.Errorf(codes.Unknown, "failed to send streaming response ack: %v", sendErr)
+				}
 				continue
 			}
 			responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody)
diff --git a/pkg/handlers/server_test.go b/pkg/handlers/server_test.go
index 406195ad..bd044036 100644
--- a/pkg/handlers/server_test.go
+++ b/pkg/handlers/server_test.go
@@ -167,7 +167,7 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
 			respHeaders := utils.BuildEnvoyGRPCHeaders(map[string]string{
 				"x-test":       "body",
 				":method":      "POST",
-				"content-type": "text/event-stream",
+				"content-type": "application/json",
 			}, true)
 			request := &extProcPb.ProcessingRequest{
 				Request: &extProcPb.ProcessingRequest_ResponseHeaders{
@@ -178,6 +178,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
 				t.Fatalf("send response headers: %v", err)
 			}
 
+			// Receive the response header ack before sending body chunks.
+			// HandleResponseHeaders always responds immediately so Envoy proceeds.
+			if _, err := process.Recv(); err != nil {
+				t.Fatalf("recv response headers ack: %v", err)
+			}
+
 			for _, c := range tc.chunks {
 				request = &extProcPb.ProcessingRequest{
 					Request: &extProcPb.ProcessingRequest_ResponseBody{
@@ -190,6 +196,12 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
 				if err := process.Send(request); err != nil {
 					t.Fatalf("send response body chunk: %v", err)
 				}
+				// For non-final chunks, receive the streaming ack
+				if !c.endOfStream {
+					if _, err := process.Recv(); err != nil {
+						t.Fatalf("recv chunk ack: %v", err)
+					}
+				}
 			}
 
 			got := make([]*extProcPb.ProcessingResponse, 0, len(want))