From 5f2053cc8dbfb24d52e8b3955721105afbff4cda Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Mon, 22 Jun 2026 08:36:49 -0400 Subject: [PATCH 1/6] feat: add ResponseChunkProcessor interface for streaming response processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split response processing into two interfaces per review feedback: - ResponseProcessor: processes the complete buffered response body (existing) - ResponseChunkProcessor: processes individual chunks without buffering (new) The Profile now carries both processor lists and a NeedsResponseBuffering flag. The config loader sorts response plugins into the right list based on which interface they implement. A plugin can implement both. When ResponseProcessor plugins are present → buffer the full response. When only ResponseChunkProcessors are present → stream chunks through. When neither is present → buffer (backward compatible). Signed-off-by: Noy Itzikowitz --- pkg/config/loader/configloader.go | 22 +++++--- .../interface/requesthandling/plugins.go | 13 ++++- .../interface/requesthandling/types.go | 9 +++- pkg/handlers/response.go | 50 +++++++++++++++++++ pkg/handlers/server.go | 40 ++++++++++++--- 5 files changed, 114 insertions(+), 20 deletions(-) diff --git a/pkg/config/loader/configloader.go b/pkg/config/loader/configloader.go index 25cef3a7..5697bffa 100644 --- a/pkg/config/loader/configloader.go +++ b/pkg/config/loader/configloader.go @@ -214,9 +214,7 @@ func buildProfiles(rawProfiles []configapi.Profile, handle plugin.Handle) (map[s return nil, fmt.Errorf("the profile %s must have one or both of the Request and Response sections", rawProfile.Name) } - theProfile := requesthandling.Profile{ - ResponsePlugins: make([]requesthandling.ResponseProcessor, len(rawProfile.Plugins.Response)), - } + theProfile := requesthandling.Profile{} for _, pluginRef := range rawProfile.Plugins.Request { rawPlugin := handle.Plugin(pluginRef.PluginRef) @@ -245,17 +243,25 @@ func buildProfiles(rawProfiles []configapi.Profile, handle plugin.Handle) (map[s } } - for idx, pluginRef := range rawProfile.Plugins.Response { + for _, pluginRef := range rawProfile.Plugins.Response { rawPlugin := handle.Plugin(pluginRef.PluginRef) if rawPlugin == nil { return nil, fmt.Errorf("there is no plugin named %s", pluginRef.PluginRef) } - thePlugin, ok := rawPlugin.(requesthandling.ResponseProcessor) - if !ok { - return nil, fmt.Errorf("the plugin named %s is not a ResponseProcessor", pluginRef.PluginRef) + matched := false + if bodyPlugin, ok := rawPlugin.(requesthandling.ResponseProcessor); ok { + theProfile.ResponsePlugins = append(theProfile.ResponsePlugins, bodyPlugin) + matched = true + } + if chunkPlugin, ok := rawPlugin.(requesthandling.ResponseChunkProcessor); ok { + theProfile.ResponseChunkProcessors = append(theProfile.ResponseChunkProcessors, chunkPlugin) + matched = true + } + if !matched { + return nil, fmt.Errorf("the plugin named %s is not a ResponseProcessor or ResponseChunkProcessor", pluginRef.PluginRef) } - theProfile.ResponsePlugins[idx] = thePlugin } + theProfile.NeedsResponseBuffering = len(theProfile.ResponsePlugins) > 0 profiles[rawProfile.Name] = &theProfile } diff --git a/pkg/framework/interface/requesthandling/plugins.go b/pkg/framework/interface/requesthandling/plugins.go index dad4ebb6..ae9b40d8 100644 --- a/pkg/framework/interface/requesthandling/plugins.go +++ b/pkg/framework/interface/requesthandling/plugins.go @@ -43,13 +43,22 @@ type RequestProcessor interface { ProcessRequest(ctx context.Context, cycleState *plugin.CycleState, request *InferenceRequest) error } +// ResponseProcessor processes the complete buffered response body. +// If any plugin in a profile implements this interface, the framework buffers +// the entire response before calling ProcessResponse on each such plugin. type ResponseProcessor interface { plugin.Plugin - // ProcessResponse runs the ResponseProcessor plugin. - // ResponseProcessor can mutate the headers and/or the body of the response. ProcessResponse(ctx context.Context, cycleState *plugin.CycleState, response *InferenceResponse) error } +// ResponseChunkProcessor processes individual response body chunks as they +// stream through without buffering. The framework calls ProcessResponseChunk +// for each chunk; the returned bytes are forwarded to the client. +type ResponseChunkProcessor interface { + plugin.Plugin + ProcessResponseChunk(ctx context.Context, cycleState *plugin.CycleState, chunk []byte, isFinal bool) ([]byte, error) +} + type PostProcessor interface { plugin.Plugin diff --git a/pkg/framework/interface/requesthandling/types.go b/pkg/framework/interface/requesthandling/types.go index 78afa58d..271c56e6 100644 --- a/pkg/framework/interface/requesthandling/types.go +++ b/pkg/framework/interface/requesthandling/types.go @@ -114,9 +114,14 @@ type Profile struct { // RequestPlugins are the request processing plugin instances executed by the request handler, // in the same order provided in the configuration file. RequestPlugins []RequestProcessor - // ResponsePlugins are the response processing plugin instances executed by the response handler, - // in the same order provided in the configuration file. + // ResponsePlugins process the complete buffered response body. ResponsePlugins []ResponseProcessor + // ResponseChunkProcessors process individual response chunks without buffering. + ResponseChunkProcessors []ResponseChunkProcessor + // NeedsResponseBuffering is true when any ResponsePlugin is present. + // The framework uses this to decide whether to buffer the full response body + // or stream chunks through ResponseChunkProcessors. + NeedsResponseBuffering bool // ModelSelectorPlugins are the Filter, Scorer (including WeightedScorer), and Picker plugin // instances to be wired into any model-selector plugin present in RequestPlugins. ModelSelectorPlugins []plugin.Plugin diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 03210f11..262c8423 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -131,6 +131,56 @@ func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []* return responses } +// buildChunkPassthroughResponse wraps a single response body chunk in the +// ext_proc streaming response format. Used when no ResponseProcessor (buffered) +// plugins are present — chunks flow through ResponseChunkProcessors and are +// forwarded immediately. +func (s *Server) buildChunkPassthroughResponse(chunk []byte, isFinal bool) []*eppb.ProcessingResponse { + if isFinal { + return []*eppb.ProcessingResponse{ + { + Response: &eppb.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &eppb.HeadersResponse{}, + }, + }, + { + Response: &eppb.ProcessingResponse_ResponseBody{ + ResponseBody: &eppb.BodyResponse{ + Response: &eppb.CommonResponse{ + BodyMutation: &eppb.BodyMutation{ + Mutation: &eppb.BodyMutation_StreamedResponse{ + StreamedResponse: &eppb.StreamedBodyResponse{ + Body: chunk, + EndOfStream: true, + }, + }, + }, + }, + }, + }, + }, + } + } + return []*eppb.ProcessingResponse{ + { + Response: &eppb.ProcessingResponse_ResponseBody{ + ResponseBody: &eppb.BodyResponse{ + Response: &eppb.CommonResponse{ + BodyMutation: &eppb.BodyMutation{ + Mutation: &eppb.BodyMutation_StreamedResponse{ + StreamedResponse: &eppb.StreamedBodyResponse{ + Body: chunk, + EndOfStream: false, + }, + }, + }, + }, + }, + }, + }, + } +} + // HandleResponseTrailers handles response trailers. func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.ProcessingResponse, error) { return []*eppb.ProcessingResponse{ diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index 3cab9c26..de2ef294 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -176,15 +176,39 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { if reqCtx.ResponseFirstChunkTimestamp.IsZero() { reqCtx.ResponseFirstChunkTimestamp = time.Now() } - responseBody = append(responseBody, v.ResponseBody.Body...) - if !v.ResponseBody.EndOfStream { - continue + + hasChunkProcessors := reqCtx.Profile != nil && len(reqCtx.Profile.ResponseChunkProcessors) > 0 + needsBuffering := !hasChunkProcessors + if reqCtx.Profile != nil && reqCtx.Profile.NeedsResponseBuffering { + needsBuffering = true + } + if needsBuffering { + responseBody = append(responseBody, v.ResponseBody.Body...) + if !v.ResponseBody.EndOfStream { + continue + } + reqCtx.ResponseCompleteTimestamp = time.Now() + model, _ := reqCtx.Request.Body["model"].(string) + metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp)) + responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) + loggerVerbose.Info("processing response body complete") + } else { + chunk := v.ResponseBody.Body + isFinal := v.ResponseBody.EndOfStream + if reqCtx.Profile != nil { + for _, cp := range reqCtx.Profile.ResponseChunkProcessors { + chunk, err = cp.ProcessResponseChunk(ctx, reqCtx.CycleState, chunk, isFinal) + if err != nil { + break + } + } + } + if isFinal { + reqCtx.ResponseCompleteTimestamp = time.Now() + } + responses = s.buildChunkPassthroughResponse(chunk, isFinal) + loggerVerbose.Info("response body streaming pass-through complete") } - reqCtx.ResponseCompleteTimestamp = time.Now() - model, _ := reqCtx.Request.Body["model"].(string) - metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp)) - responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) - loggerVerbose.Info("processing response body complete") case *extProcPb.ProcessingRequest_ResponseTrailers: responses, err = s.HandleResponseTrailers(v.ResponseTrailers) default: From 541110e21d0b9440a5100421e47e0b1ded081391 Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Wed, 24 Jun 2026 14:55:05 -0400 Subject: [PATCH 2/6] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20simpl?= =?UTF-8?q?ify=20buffering,=20move=20HandleResponseChunk,=20validate=20no?= =?UTF-8?q?=20mixing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Simplify server.go buffering check to Profile.NeedsResponseBuffering (comment 1) - Remove matched variable, use continue pattern in config loader (comment 2) - Move chunk processing from server.go to HandleResponseChunk in response.go (comment 3) - Add validation: reject profiles that mix ResponseProcessor and ResponseChunkProcessor plugins (comment 4) - HandleResponseChunk runs chunk processors with metrics and logging, following the same pattern as HandleResponseBody - Update streaming test to set NeedsResponseBuffering for buffered path Signed-off-by: Noy Itzikowitz --- pkg/config/loader/configloader.go | 12 +++--- pkg/handlers/response.go | 61 ++++++++++++++++--------------- pkg/handlers/server.go | 23 ++---------- pkg/handlers/server_test.go | 2 + 4 files changed, 43 insertions(+), 55 deletions(-) diff --git a/pkg/config/loader/configloader.go b/pkg/config/loader/configloader.go index 5697bffa..9edf78c5 100644 --- a/pkg/config/loader/configloader.go +++ b/pkg/config/loader/configloader.go @@ -248,18 +248,18 @@ func buildProfiles(rawProfiles []configapi.Profile, handle plugin.Handle) (map[s if rawPlugin == nil { return nil, fmt.Errorf("there is no plugin named %s", pluginRef.PluginRef) } - matched := false if bodyPlugin, ok := rawPlugin.(requesthandling.ResponseProcessor); ok { theProfile.ResponsePlugins = append(theProfile.ResponsePlugins, bodyPlugin) - matched = true + continue } if chunkPlugin, ok := rawPlugin.(requesthandling.ResponseChunkProcessor); ok { theProfile.ResponseChunkProcessors = append(theProfile.ResponseChunkProcessors, chunkPlugin) - matched = true - } - if !matched { - return nil, fmt.Errorf("the plugin named %s is not a ResponseProcessor or ResponseChunkProcessor", pluginRef.PluginRef) + continue } + return nil, fmt.Errorf("the plugin named %s is not a ResponseProcessor nor ResponseChunkProcessor", pluginRef.PluginRef) + } + if len(theProfile.ResponsePlugins) > 0 && len(theProfile.ResponseChunkProcessors) > 0 { + return nil, fmt.Errorf("profile %s mixes ResponseProcessor and ResponseChunkProcessor plugins — a profile must use one type exclusively", rawProfile.Name) } theProfile.NeedsResponseBuffering = len(theProfile.ResponsePlugins) > 0 diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 262c8423..eb09142e 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -131,37 +131,27 @@ func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []* return responses } -// buildChunkPassthroughResponse wraps a single response body chunk in the -// ext_proc streaming response format. Used when no ResponseProcessor (buffered) -// plugins are present — chunks flow through ResponseChunkProcessors and are -// forwarded immediately. -func (s *Server) buildChunkPassthroughResponse(chunk []byte, isFinal bool) []*eppb.ProcessingResponse { - if isFinal { - return []*eppb.ProcessingResponse{ - { - Response: &eppb.ProcessingResponse_ResponseHeaders{ - ResponseHeaders: &eppb.HeadersResponse{}, - }, - }, - { - Response: &eppb.ProcessingResponse_ResponseBody{ - ResponseBody: &eppb.BodyResponse{ - Response: &eppb.CommonResponse{ - BodyMutation: &eppb.BodyMutation{ - Mutation: &eppb.BodyMutation_StreamedResponse{ - StreamedResponse: &eppb.StreamedBodyResponse{ - Body: chunk, - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, +// HandleResponseChunk runs ResponseChunkProcessors on a single response body chunk +// and wraps the result in the ext_proc streaming response format. +func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunk []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) { + logger := log.FromContext(ctx).V(logutil.DEFAULT) + verboseLogger := logger.V(logutil.VERBOSE) + + for _, cp := range reqCtx.Profile.ResponseChunkProcessors { + if verboseLogger.Enabled() { + verboseLogger.Info("Executing response chunk plugin", "plugin", cp.TypedName()) + } + var err error + before := time.Now() + chunk, err = cp.ProcessResponseChunk(ctx, reqCtx.CycleState, chunk, endOfStream) + metrics.RecordPluginProcessingLatency(responsePluginExtensionPoint, cp.TypedName().Type, cp.TypedName().Name, time.Since(before)) + if err != nil { + logger.Error(err, "Failed to execute response chunk plugin", "plugin", cp.TypedName()) + return nil, err } } - return []*eppb.ProcessingResponse{ + + responses := []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_ResponseBody{ ResponseBody: &eppb.BodyResponse{ @@ -170,7 +160,7 @@ func (s *Server) buildChunkPassthroughResponse(chunk []byte, isFinal bool) []*ep Mutation: &eppb.BodyMutation_StreamedResponse{ StreamedResponse: &eppb.StreamedBodyResponse{ Body: chunk, - EndOfStream: false, + EndOfStream: endOfStream, }, }, }, @@ -179,6 +169,17 @@ func (s *Server) buildChunkPassthroughResponse(chunk []byte, isFinal bool) []*ep }, }, } + + if endOfStream { + headerResp := &eppb.ProcessingResponse{ + Response: &eppb.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &eppb.HeadersResponse{}, + }, + } + responses = append([]*eppb.ProcessingResponse{headerResp}, responses...) + } + + return responses, nil } // HandleResponseTrailers handles response trailers. diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index de2ef294..717828b5 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -177,12 +177,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { reqCtx.ResponseFirstChunkTimestamp = time.Now() } - hasChunkProcessors := reqCtx.Profile != nil && len(reqCtx.Profile.ResponseChunkProcessors) > 0 - needsBuffering := !hasChunkProcessors - if reqCtx.Profile != nil && reqCtx.Profile.NeedsResponseBuffering { - needsBuffering = true - } - if needsBuffering { + if reqCtx.Profile.NeedsResponseBuffering { responseBody = append(responseBody, v.ResponseBody.Body...) if !v.ResponseBody.EndOfStream { continue @@ -193,21 +188,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) loggerVerbose.Info("processing response body complete") } else { - chunk := v.ResponseBody.Body - isFinal := v.ResponseBody.EndOfStream - if reqCtx.Profile != nil { - for _, cp := range reqCtx.Profile.ResponseChunkProcessors { - chunk, err = cp.ProcessResponseChunk(ctx, reqCtx.CycleState, chunk, isFinal) - if err != nil { - break - } - } - } - if isFinal { + if v.ResponseBody.EndOfStream { reqCtx.ResponseCompleteTimestamp = time.Now() } - responses = s.buildChunkPassthroughResponse(chunk, isFinal) - loggerVerbose.Info("response body streaming pass-through complete") + responses, err = s.HandleResponseChunk(ctx, reqCtx, v.ResponseBody.Body, v.ResponseBody.EndOfStream) + loggerVerbose.Info("response chunk processing complete") } case *extProcPb.ProcessingRequest_ResponseTrailers: responses, err = s.HandleResponseTrailers(v.ResponseTrailers) diff --git a/pkg/handlers/server_test.go b/pkg/handlers/server_test.go index 332d71d4..75087c58 100644 --- a/pkg/handlers/server_test.go +++ b/pkg/handlers/server_test.go @@ -123,6 +123,7 @@ func TestHandleResponseBody_Streaming(t *testing.T) { wantFullBody := []byte(`{"choices":[{"text":"Hello!"}]}`) profiles := newTestProfiles() + profiles[testProfileName].NeedsResponseBuffering = true ref := newServerForTest(profiles) want, err := ref.HandleResponseBody(ctx, newTestRequestContext(profiles), wantFullBody) if err != nil { @@ -164,6 +165,7 @@ func TestHandleResponseBody_Streaming(t *testing.T) { t.Run(tc.name, func(t *testing.T) { streamCtx, cancel := context.WithCancel(logutil.NewTestLoggerIntoContext(context.Background())) profiles := newTestProfiles() + profiles[testProfileName].NeedsResponseBuffering = true srv := newServerForTest(profiles) testListener, errChan := utils.SetupTestStreamingServer(t, streamCtx, srv) process, conn := utils.GetStreamingServerClient(streamCtx, t) From c5b83afbb04b70e19db3a17aefbb4858861d3d7c Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Thu, 25 Jun 2026 10:53:29 -0700 Subject: [PATCH 3/6] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20TTFT?= =?UTF-8?q?=20metric,=20chunk=20interface,=20nil=20checks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move TTFT metric recording after if/else so it applies to both buffered and streaming paths (comment 3+4) - Change ResponseChunkProcessor interface: takes string (framework converts once) + InferenceResponse (for header mutation) (comment 5) - Add nil profile check and empty processor check in HandleResponseChunk for bodiless requests like GET /v1/models (comment 6) - Extract runResponseChunkProcessors and buildStreamedChunkResponse following the same patterns as the body processing path Signed-off-by: Noy Itzikowitz --- .../interface/requesthandling/plugins.go | 7 ++-- pkg/handlers/response.go | 35 +++++++++++++++---- pkg/handlers/server.go | 12 +++---- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/pkg/framework/interface/requesthandling/plugins.go b/pkg/framework/interface/requesthandling/plugins.go index ae9b40d8..da49b86b 100644 --- a/pkg/framework/interface/requesthandling/plugins.go +++ b/pkg/framework/interface/requesthandling/plugins.go @@ -52,11 +52,12 @@ type ResponseProcessor interface { } // ResponseChunkProcessor processes individual response body chunks as they -// stream through without buffering. The framework calls ProcessResponseChunk -// for each chunk; the returned bytes are forwarded to the client. +// stream through without buffering. The framework converts the raw chunk bytes +// to a string once and passes it to all chunk processors. Plugins receive the +// InferenceResponse to allow header mutation. type ResponseChunkProcessor interface { plugin.Plugin - ProcessResponseChunk(ctx context.Context, cycleState *plugin.CycleState, chunk []byte, isFinal bool) ([]byte, error) + ProcessResponseChunk(ctx context.Context, cycleState *plugin.CycleState, response *InferenceResponse, chunk string, isFinal bool) error } type PostProcessor interface { diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index eb09142e..9ab35f05 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -133,24 +133,45 @@ func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []* // HandleResponseChunk runs ResponseChunkProcessors on a single response body chunk // and wraps the result in the ext_proc streaming response format. -func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunk []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) { +func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunkBytes []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) { + // Bodiless requests (e.g., GET /v1/models) may not have a profile set. + if reqCtx.Profile == nil || len(reqCtx.Profile.ResponseChunkProcessors) == 0 { + return s.buildStreamedChunkResponse(chunkBytes, endOfStream), nil + } + + logger := log.FromContext(ctx).V(logutil.DEFAULT) + + chunk := string(chunkBytes) + + if err := s.runResponseChunkProcessors(ctx, reqCtx.CycleState, reqCtx.Response, chunk, endOfStream, reqCtx.Profile.ResponseChunkProcessors); err != nil { + logger.Error(err, "Failed to run response chunk processors") + return nil, err + } + + return s.buildStreamedChunkResponse(chunkBytes, endOfStream), nil +} + +// runResponseChunkProcessors executes chunk processors in the order they were registered. +func (s *Server) runResponseChunkProcessors(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse, chunk string, isFinal bool, processors []requesthandling.ResponseChunkProcessor) error { logger := log.FromContext(ctx).V(logutil.DEFAULT) verboseLogger := logger.V(logutil.VERBOSE) - for _, cp := range reqCtx.Profile.ResponseChunkProcessors { + for _, cp := range processors { if verboseLogger.Enabled() { verboseLogger.Info("Executing response chunk plugin", "plugin", cp.TypedName()) } - var err error before := time.Now() - chunk, err = cp.ProcessResponseChunk(ctx, reqCtx.CycleState, chunk, endOfStream) + err := cp.ProcessResponseChunk(ctx, cycleState, response, chunk, isFinal) metrics.RecordPluginProcessingLatency(responsePluginExtensionPoint, cp.TypedName().Type, cp.TypedName().Name, time.Since(before)) if err != nil { - logger.Error(err, "Failed to execute response chunk plugin", "plugin", cp.TypedName()) - return nil, err + return err } } + return nil +} +// buildStreamedChunkResponse wraps a chunk in the ext_proc streaming response format. +func (s *Server) buildStreamedChunkResponse(chunk []byte, endOfStream bool) []*eppb.ProcessingResponse { responses := []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_ResponseBody{ @@ -179,7 +200,7 @@ func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext responses = append([]*eppb.ProcessingResponse{headerResp}, responses...) } - return responses, nil + return responses } // HandleResponseTrailers handles response trailers. diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index 717828b5..4de95c16 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -182,18 +182,18 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { if !v.ResponseBody.EndOfStream { continue } - reqCtx.ResponseCompleteTimestamp = time.Now() - model, _ := reqCtx.Request.Body["model"].(string) - metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp)) responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) loggerVerbose.Info("processing response body complete") } else { - if v.ResponseBody.EndOfStream { - reqCtx.ResponseCompleteTimestamp = time.Now() - } responses, err = s.HandleResponseChunk(ctx, reqCtx, v.ResponseBody.Body, v.ResponseBody.EndOfStream) loggerVerbose.Info("response chunk processing complete") } + + if v.ResponseBody.EndOfStream { + reqCtx.ResponseCompleteTimestamp = time.Now() + model, _ := reqCtx.Request.Body["model"].(string) + metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp)) + } case *extProcPb.ProcessingRequest_ResponseTrailers: responses, err = s.HandleResponseTrailers(v.ResponseTrailers) default: From 1194b3a0cc3d8f6e15e90601c078a0f117f3038a Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Thu, 25 Jun 2026 13:39:51 -0700 Subject: [PATCH 4/6] fix: send HeadersResponse with first chunk, not last In FULL_DUPLEX_STREAMED mode, envoy needs the deferred HeadersResponse before it forwards response body chunks to the client. Previously, buildStreamedChunkResponse only sent HeadersResponse on endOfStream, which caused empty response bodies when EoS was delayed or missing. Track ResponseHeadersSent on RequestContext and send HeadersResponse with the first chunk instead. Signed-off-by: Noy Itzikowitz --- pkg/handlers/response.go | 11 +++++++---- pkg/handlers/server.go | 1 + 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 9ab35f05..1942abfb 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -136,7 +136,7 @@ func (s *Server) generateEmptyResponseBodyResponse(responseBodyBytes []byte) []* func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunkBytes []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) { // Bodiless requests (e.g., GET /v1/models) may not have a profile set. if reqCtx.Profile == nil || len(reqCtx.Profile.ResponseChunkProcessors) == 0 { - return s.buildStreamedChunkResponse(chunkBytes, endOfStream), nil + return s.buildStreamedChunkResponse(reqCtx, chunkBytes, endOfStream), nil } logger := log.FromContext(ctx).V(logutil.DEFAULT) @@ -148,7 +148,7 @@ func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext return nil, err } - return s.buildStreamedChunkResponse(chunkBytes, endOfStream), nil + return s.buildStreamedChunkResponse(reqCtx, chunkBytes, endOfStream), nil } // runResponseChunkProcessors executes chunk processors in the order they were registered. @@ -171,7 +171,9 @@ func (s *Server) runResponseChunkProcessors(ctx context.Context, cycleState *plu } // buildStreamedChunkResponse wraps a chunk in the ext_proc streaming response format. -func (s *Server) buildStreamedChunkResponse(chunk []byte, endOfStream bool) []*eppb.ProcessingResponse { +// On the first call (responseHeadersSent=false), it prepends a HeadersResponse to answer +// the deferred response headers — envoy requires this before it accepts body responses. +func (s *Server) buildStreamedChunkResponse(reqCtx *RequestContext, chunk []byte, endOfStream bool) []*eppb.ProcessingResponse { responses := []*eppb.ProcessingResponse{ { Response: &eppb.ProcessingResponse_ResponseBody{ @@ -191,13 +193,14 @@ func (s *Server) buildStreamedChunkResponse(chunk []byte, endOfStream bool) []*e }, } - if endOfStream { + if !reqCtx.ResponseHeadersSent { headerResp := &eppb.ProcessingResponse{ Response: &eppb.ProcessingResponse_ResponseHeaders{ ResponseHeaders: &eppb.HeadersResponse{}, }, } responses = append([]*eppb.ProcessingResponse{headerResp}, responses...) + reqCtx.ResponseHeadersSent = true } return responses diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index 4de95c16..2010ab2d 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -76,6 +76,7 @@ type RequestContext struct { RequestSentTimestamp time.Time ResponseFirstChunkTimestamp time.Time ResponseCompleteTimestamp time.Time + ResponseHeadersSent bool Profile *requesthandling.Profile CycleState *plugin.CycleState Request *requesthandling.InferenceRequest From 63991fd22deff1e53eb6b0c6f93ebe905ee45dea Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Thu, 25 Jun 2026 13:56:28 -0700 Subject: [PATCH 5/6] fix: chunk mutation via InferenceResponse, TTFT metric for both paths - Add CurrentChunk/SetChunk/ChunkMutated to InferenceResponse so chunk processors can read and mutate chunk content through the response object rather than a pass-by-value string parameter. - HandleResponseChunk sets ResetChunkState before plugins run and uses the (potentially mutated) chunk in buildStreamedChunkResponse. - Move TTFT metric recording after the if/else block so it fires for both buffered and streaming response paths. Change continue to break in the buffering accumulation branch so the metric code is reachable. Signed-off-by: Noy Itzikowitz --- .../interface/requesthandling/types.go | 24 +++++++++++++++++++ pkg/handlers/response.go | 8 ++++++- pkg/handlers/server.go | 3 ++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/pkg/framework/interface/requesthandling/types.go b/pkg/framework/interface/requesthandling/types.go index 271c56e6..43c70fbb 100644 --- a/pkg/framework/interface/requesthandling/types.go +++ b/pkg/framework/interface/requesthandling/types.go @@ -93,6 +93,30 @@ type InferenceRequest struct { type InferenceResponse struct { InferenceMessage + + // CurrentChunk holds the current response body chunk during streaming. + // Set by the framework before calling ResponseChunkProcessor plugins. + // Plugins can read or mutate this field; the framework uses the final + // value when building the ext_proc response. + CurrentChunk string + chunkMutated bool +} + +// SetChunk sets the current chunk content and marks it as mutated. +func (r *InferenceResponse) SetChunk(chunk string) { + r.CurrentChunk = chunk + r.chunkMutated = true +} + +// ChunkMutated reports whether any plugin modified the chunk via SetChunk. +func (r *InferenceResponse) ChunkMutated() bool { + return r.chunkMutated +} + +// ResetChunkState prepares the response for a new chunk processing cycle. +func (r *InferenceResponse) ResetChunkState(chunk string) { + r.CurrentChunk = chunk + r.chunkMutated = false } // NewInferenceRequest returns a new request with initialized Headers, Body, and mutatedHeaders. diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 1942abfb..9dc6056d 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -142,13 +142,19 @@ func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext logger := log.FromContext(ctx).V(logutil.DEFAULT) chunk := string(chunkBytes) + reqCtx.Response.ResetChunkState(chunk) if err := s.runResponseChunkProcessors(ctx, reqCtx.CycleState, reqCtx.Response, chunk, endOfStream, reqCtx.Profile.ResponseChunkProcessors); err != nil { logger.Error(err, "Failed to run response chunk processors") return nil, err } - return s.buildStreamedChunkResponse(reqCtx, chunkBytes, endOfStream), nil + outBytes := chunkBytes + if reqCtx.Response.ChunkMutated() { + outBytes = []byte(reqCtx.Response.CurrentChunk) + } + + return s.buildStreamedChunkResponse(reqCtx, outBytes, endOfStream), nil } // runResponseChunkProcessors executes chunk processors in the order they were registered. diff --git a/pkg/handlers/server.go b/pkg/handlers/server.go index 2010ab2d..b4288726 100644 --- a/pkg/handlers/server.go +++ b/pkg/handlers/server.go @@ -181,7 +181,8 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { if reqCtx.Profile.NeedsResponseBuffering { responseBody = append(responseBody, v.ResponseBody.Body...) if !v.ResponseBody.EndOfStream { - continue + // Keep accumulating — don't send responses or record metrics yet. + break } responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody) loggerVerbose.Info("processing response body complete") From d793ccbcf774047d66e6791e632b97d971ec9a4e Mon Sep 17 00:00:00 2001 From: Noy Itzikowitz Date: Thu, 25 Jun 2026 17:43:01 -0700 Subject: [PATCH 6/6] fix: chain chunk mutations across plugins, add chunk mutation tests - runResponseChunkProcessors passes response.CurrentChunk to each plugin so mutations from earlier plugins are visible to later ones. - Add unit tests for SetChunk, ChunkMutated, ResetChunkState. Signed-off-by: Noy Itzikowitz --- .../interface/requesthandling/types_test.go | 42 +++++++++++++++++++ pkg/handlers/response.go | 4 +- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pkg/framework/interface/requesthandling/types_test.go b/pkg/framework/interface/requesthandling/types_test.go index f9c4843d..49fcb1f7 100644 --- a/pkg/framework/interface/requesthandling/types_test.go +++ b/pkg/framework/interface/requesthandling/types_test.go @@ -90,6 +90,48 @@ func TestSetBody(t *testing.T) { } } +func TestChunkMutation(t *testing.T) { + resp := NewInferenceResponse() + + if resp.ChunkMutated() { + t.Error("new InferenceResponse should not be marked as chunk-mutated") + } + + resp.ResetChunkState("original chunk") + if resp.CurrentChunk != "original chunk" { + t.Errorf("CurrentChunk = %q; want %q", resp.CurrentChunk, "original chunk") + } + if resp.ChunkMutated() { + t.Error("ResetChunkState should not mark chunk as mutated") + } + + resp.SetChunk("modified chunk") + if resp.CurrentChunk != "modified chunk" { + t.Errorf("CurrentChunk = %q; want %q", resp.CurrentChunk, "modified chunk") + } + if !resp.ChunkMutated() { + t.Error("expected ChunkMutated() to return true after SetChunk") + } +} + +func TestChunkMutation_ResetClearsMutatedFlag(t *testing.T) { + resp := NewInferenceResponse() + resp.ResetChunkState("chunk 1") + resp.SetChunk("mutated chunk 1") + + if !resp.ChunkMutated() { + t.Error("expected ChunkMutated() true after SetChunk") + } + + resp.ResetChunkState("chunk 2") + if resp.ChunkMutated() { + t.Error("ResetChunkState should clear the mutated flag") + } + if resp.CurrentChunk != "chunk 2" { + t.Errorf("CurrentChunk = %q; want %q", resp.CurrentChunk, "chunk 2") + } +} + func TestBodyMutated_FalseByDefault(t *testing.T) { req := NewInferenceRequest() if req.BodyMutated() { diff --git a/pkg/handlers/response.go b/pkg/handlers/response.go index 9dc6056d..e3830cf2 100644 --- a/pkg/handlers/response.go +++ b/pkg/handlers/response.go @@ -158,6 +158,8 @@ func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext } // runResponseChunkProcessors executes chunk processors in the order they were registered. +// Each plugin receives response.CurrentChunk so mutations from earlier plugins are visible +// to later ones in the chain. func (s *Server) runResponseChunkProcessors(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse, chunk string, isFinal bool, processors []requesthandling.ResponseChunkProcessor) error { logger := log.FromContext(ctx).V(logutil.DEFAULT) verboseLogger := logger.V(logutil.VERBOSE) @@ -167,7 +169,7 @@ func (s *Server) runResponseChunkProcessors(ctx context.Context, cycleState *plu verboseLogger.Info("Executing response chunk plugin", "plugin", cp.TypedName()) } before := time.Now() - err := cp.ProcessResponseChunk(ctx, cycleState, response, chunk, isFinal) + err := cp.ProcessResponseChunk(ctx, cycleState, response, response.CurrentChunk, isFinal) metrics.RecordPluginProcessingLatency(responsePluginExtensionPoint, cp.TypedName().Type, cp.TypedName().Name, time.Since(before)) if err != nil { return err