Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions pkg/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ func buildProfiles(rawProfiles []configapi.Profile, handle plugin.Handle) (map[s
return nil, fmt.Errorf("the profile %s must have one or both of the Request and Response sections", rawProfile.Name)
}

theProfile := requesthandling.Profile{
ResponsePlugins: make([]requesthandling.ResponseProcessor, len(rawProfile.Plugins.Response)),
}
theProfile := requesthandling.Profile{}

for _, pluginRef := range rawProfile.Plugins.Request {
rawPlugin := handle.Plugin(pluginRef.PluginRef)
Expand Down Expand Up @@ -245,17 +243,25 @@ func buildProfiles(rawProfiles []configapi.Profile, handle plugin.Handle) (map[s
}
}

for idx, pluginRef := range rawProfile.Plugins.Response {
for _, pluginRef := range rawProfile.Plugins.Response {
rawPlugin := handle.Plugin(pluginRef.PluginRef)
if rawPlugin == nil {
return nil, fmt.Errorf("there is no plugin named %s", pluginRef.PluginRef)
}
thePlugin, ok := rawPlugin.(requesthandling.ResponseProcessor)
if !ok {
return nil, fmt.Errorf("the plugin named %s is not a ResponseProcessor", pluginRef.PluginRef)
if bodyPlugin, ok := rawPlugin.(requesthandling.ResponseProcessor); ok {
theProfile.ResponsePlugins = append(theProfile.ResponsePlugins, bodyPlugin)
continue
}
theProfile.ResponsePlugins[idx] = thePlugin
if chunkPlugin, ok := rawPlugin.(requesthandling.ResponseChunkProcessor); ok {
theProfile.ResponseChunkProcessors = append(theProfile.ResponseChunkProcessors, chunkPlugin)
continue
}
return nil, fmt.Errorf("the plugin named %s is not a ResponseProcessor nor ResponseChunkProcessor", pluginRef.PluginRef)
}
if len(theProfile.ResponsePlugins) > 0 && len(theProfile.ResponseChunkProcessors) > 0 {
return nil, fmt.Errorf("profile %s mixes ResponseProcessor and ResponseChunkProcessor plugins — a profile must use one type exclusively", rawProfile.Name)
}
theProfile.NeedsResponseBuffering = len(theProfile.ResponsePlugins) > 0

profiles[rawProfile.Name] = &theProfile
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/framework/interface/requesthandling/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@ type RequestProcessor interface {
ProcessRequest(ctx context.Context, cycleState *plugin.CycleState, request *InferenceRequest) error
}

// ResponseProcessor is implemented by plugins that operate on the complete,
// buffered response body.
// If any plugin in a profile implements this interface, the framework buffers
// the entire response before calling ProcessResponse on each such plugin.
// A profile cannot combine ResponseProcessor plugins with
// ResponseChunkProcessor plugins; the config loader rejects such a profile.
type ResponseProcessor interface {
plugin.Plugin
// ProcessResponse runs the ResponseProcessor plugin on the buffered response.
// ResponseProcessor can mutate the headers and/or the body of the response.
ProcessResponse(ctx context.Context, cycleState *plugin.CycleState, response *InferenceResponse) error
}

// ResponseChunkProcessor processes individual response body chunks as they
// stream through without buffering. The framework converts the raw chunk bytes
// to a string once, stores it in InferenceResponse.CurrentChunk, and then calls
// the profile's chunk processors one after another. Plugins receive the
// InferenceResponse to allow header mutation.
//
// A profile cannot combine ResponseChunkProcessor plugins with
// ResponseProcessor plugins; the config loader rejects such a profile. A plugin
// that implements both interfaces is registered as a ResponseProcessor only.
type ResponseChunkProcessor interface {
plugin.Plugin
// ProcessResponseChunk handles one chunk of the streamed response body.
//
// chunk is the value of response.CurrentChunk at the time of the call, so it
// reflects any replacement an earlier processor made through SetChunk.
// isFinal is the end-of-stream flag the framework received with this chunk.
//
// To replace the chunk, call response.SetChunk: the framework forwards the
// original bytes unless response.ChunkMutated reports true.
ProcessResponseChunk(ctx context.Context, cycleState *plugin.CycleState, response *InferenceResponse, chunk string, isFinal bool) error
}

type PostProcessor interface {
plugin.Plugin

Expand Down
33 changes: 31 additions & 2 deletions pkg/framework/interface/requesthandling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,30 @@ type InferenceRequest struct {

// InferenceResponse is the response message handed to response plugins.
// It embeds InferenceMessage and adds the per-chunk state used while a response
// body is streamed through ResponseChunkProcessor plugins.
type InferenceResponse struct {
InferenceMessage

// CurrentChunk holds the current response body chunk during streaming.
// Set by the framework (through ResetChunkState) before calling
// ResponseChunkProcessor plugins.
// Plugins may read this field, but must call SetChunk to replace the chunk:
// the framework forwards CurrentChunk only when ChunkMutated reports true, so
// a direct assignment to this field is not picked up when the ext_proc
// response is built.
CurrentChunk string
// chunkMutated records whether SetChunk has been called since the last
// ResetChunkState.
chunkMutated bool
}

// SetChunk replaces the current chunk with chunk and flags the response as
// chunk-mutated, so that ChunkMutated reports true until the next
// ResetChunkState call.
func (r *InferenceResponse) SetChunk(chunk string) {
	r.chunkMutated = true
	r.CurrentChunk = chunk
}

// ChunkMutated reports whether any plugin modified the chunk via SetChunk
// since the last ResetChunkState call. Direct writes to CurrentChunk are not
// tracked and do not affect the result.
func (r *InferenceResponse) ChunkMutated() bool {
return r.chunkMutated
}

// ResetChunkState starts a new chunk processing cycle: it stores chunk as the
// current chunk and clears the mutated flag, so ChunkMutated reports false
// until a plugin calls SetChunk.
func (r *InferenceResponse) ResetChunkState(chunk string) {
	r.CurrentChunk, r.chunkMutated = chunk, false
}

// NewInferenceRequest returns a new request with initialized Headers, Body, and mutatedHeaders.
Expand All @@ -114,9 +138,14 @@ type Profile struct {
// RequestPlugins are the request processing plugin instances executed by the request handler,
// in the same order provided in the configuration file.
RequestPlugins []RequestProcessor
// ResponsePlugins are the response processing plugin instances executed by the response handler,
// in the same order provided in the configuration file.
// ResponsePlugins process the complete buffered response body.
ResponsePlugins []ResponseProcessor
// ResponseChunkProcessors process individual response chunks without buffering.
ResponseChunkProcessors []ResponseChunkProcessor
// NeedsResponseBuffering is true when any ResponsePlugin is present.
// The framework uses this to decide whether to buffer the full response body
// or stream chunks through ResponseChunkProcessors.
NeedsResponseBuffering bool
// ModelSelectorPlugins are the Filter, Scorer (including WeightedScorer), and Picker plugin
// instances to be wired into any model-selector plugin present in RequestPlugins.
ModelSelectorPlugins []plugin.Plugin
Expand Down
42 changes: 42 additions & 0 deletions pkg/framework/interface/requesthandling/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,48 @@ func TestSetBody(t *testing.T) {
}
}

// TestChunkMutation walks a response through one chunk cycle: a new response is
// unmutated, ResetChunkState stores the chunk without marking it mutated, and
// SetChunk both replaces the chunk and sets the mutated flag.
func TestChunkMutation(t *testing.T) {
	const (
		initial = "original chunk"
		updated = "modified chunk"
	)
	response := NewInferenceResponse()

	// A freshly constructed response has not been touched by any plugin.
	if mutated := response.ChunkMutated(); mutated {
		t.Error("new InferenceResponse should not be marked as chunk-mutated")
	}

	// Seeding a chunk is framework bookkeeping, not a mutation.
	response.ResetChunkState(initial)
	if got := response.CurrentChunk; got != initial {
		t.Errorf("CurrentChunk = %q; want %q", got, initial)
	}
	if mutated := response.ChunkMutated(); mutated {
		t.Error("ResetChunkState should not mark chunk as mutated")
	}

	// SetChunk is the plugin-facing path and must flip the flag.
	response.SetChunk(updated)
	if got := response.CurrentChunk; got != updated {
		t.Errorf("CurrentChunk = %q; want %q", got, updated)
	}
	if mutated := response.ChunkMutated(); !mutated {
		t.Error("expected ChunkMutated() to return true after SetChunk")
	}
}

// TestChunkMutation_ResetClearsMutatedFlag checks that a mutation recorded for
// one chunk does not leak into the next: ResetChunkState must clear the flag
// and install the new chunk.
func TestChunkMutation_ResetClearsMutatedFlag(t *testing.T) {
	const nextChunk = "chunk 2"

	response := NewInferenceResponse()
	response.ResetChunkState("chunk 1")
	response.SetChunk("mutated chunk 1")
	if mutated := response.ChunkMutated(); !mutated {
		t.Error("expected ChunkMutated() true after SetChunk")
	}

	// Starting the next cycle must discard the previous chunk's mutation state.
	response.ResetChunkState(nextChunk)
	if mutated := response.ChunkMutated(); mutated {
		t.Error("ResetChunkState should clear the mutated flag")
	}
	if got := response.CurrentChunk; got != nextChunk {
		t.Errorf("CurrentChunk = %q; want %q", got, nextChunk)
	}
}

func TestBodyMutated_FalseByDefault(t *testing.T) {
req := NewInferenceRequest()
if req.BodyMutated() {
Expand Down
83 changes: 83 additions & 0 deletions pkg/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,89 @@
return responses
}

// HandleResponseChunk runs ResponseChunkProcessors on a single response body chunk
// and wraps the result in the ext_proc streaming response format.
func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunkBytes []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) {

@nirrozenbaum nirrozenbaum Jun 25, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something is still wrong in this function..
chunk processor plugins cannot mutate the chunk content, and even if they do it's ignored in L151.
also, I've noticed that BodyMutation is always used even if chunk hasn't changed, which is an expensive operation in envoy..

we should also probably distinguish between the case when chunk was mutated and the case when it was passed through.
we can improve that last point on a follow up PR, leaving mutation always works for now.
but the first point is functionality wise wrong.

// Bodiless requests (e.g., GET /v1/models) may not have a profile set.
if reqCtx.Profile == nil || len(reqCtx.Profile.ResponseChunkProcessors) == 0 {
return s.buildStreamedChunkResponse(reqCtx, chunkBytes, endOfStream), nil
}

logger := log.FromContext(ctx).V(logutil.DEFAULT)

chunk := string(chunkBytes)
reqCtx.Response.ResetChunkState(chunk)

if err := s.runResponseChunkProcessors(ctx, reqCtx.CycleState, reqCtx.Response, chunk, endOfStream, reqCtx.Profile.ResponseChunkProcessors); err != nil {
logger.Error(err, "Failed to run response chunk processors")
return nil, err
}

outBytes := chunkBytes
if reqCtx.Response.ChunkMutated() {
outBytes = []byte(reqCtx.Response.CurrentChunk)
}

return s.buildStreamedChunkResponse(reqCtx, outBytes, endOfStream), nil
}

// runResponseChunkProcessors executes chunk processors in the order they were registered.
// Each plugin receives response.CurrentChunk so mutations from earlier plugins are visible
// to later ones in the chain.
func (s *Server) runResponseChunkProcessors(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse, chunk string, isFinal bool, processors []requesthandling.ResponseChunkProcessor) error {
logger := log.FromContext(ctx).V(logutil.DEFAULT)

Check failure on line 164 in pkg/handlers/response.go

View workflow job for this annotation

GitHub Actions / lint-and-test

(*Server).runResponseChunkProcessors - chunk is unused (unparam)
verboseLogger := logger.V(logutil.VERBOSE)

Comment thread
nirrozenbaum marked this conversation as resolved.
for _, cp := range processors {
if verboseLogger.Enabled() {
verboseLogger.Info("Executing response chunk plugin", "plugin", cp.TypedName())
}
before := time.Now()
err := cp.ProcessResponseChunk(ctx, cycleState, response, response.CurrentChunk, isFinal)
metrics.RecordPluginProcessingLatency(responsePluginExtensionPoint, cp.TypedName().Type, cp.TypedName().Name, time.Since(before))
if err != nil {
return err
}
}
return nil
}

// buildStreamedChunkResponse packages one body chunk as an ext_proc streamed
// body mutation carrying the chunk bytes and the end-of-stream flag.
// While reqCtx.ResponseHeadersSent is still false, an empty HeadersResponse is
// placed in front of the body response to answer the deferred response
// headers — envoy requires this before it accepts body responses — and the
// flag is set so later chunks return the body response alone.
func (s *Server) buildStreamedChunkResponse(reqCtx *RequestContext, chunk []byte, endOfStream bool) []*eppb.ProcessingResponse {
	streamed := &eppb.StreamedBodyResponse{
		Body:        chunk,
		EndOfStream: endOfStream,
	}
	mutation := &eppb.BodyMutation{
		Mutation: &eppb.BodyMutation_StreamedResponse{StreamedResponse: streamed},
	}
	bodyResp := &eppb.ProcessingResponse{
		Response: &eppb.ProcessingResponse_ResponseBody{
			ResponseBody: &eppb.BodyResponse{
				Response: &eppb.CommonResponse{BodyMutation: mutation},
			},
		},
	}

	// Headers were already answered on an earlier chunk: body only.
	if reqCtx.ResponseHeadersSent {
		return []*eppb.ProcessingResponse{bodyResp}
	}

	// First chunk of the response: answer the pending headers first.
	reqCtx.ResponseHeadersSent = true
	headerResp := &eppb.ProcessingResponse{
		Response: &eppb.ProcessingResponse_ResponseHeaders{
			ResponseHeaders: &eppb.HeadersResponse{},
		},
	}
	return []*eppb.ProcessingResponse{headerResp, bodyResp}
}

// HandleResponseTrailers handles response trailers.
func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.ProcessingResponse, error) {
return []*eppb.ProcessingResponse{
Expand Down
27 changes: 19 additions & 8 deletions pkg/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type RequestContext struct {
RequestSentTimestamp time.Time
ResponseFirstChunkTimestamp time.Time
ResponseCompleteTimestamp time.Time
ResponseHeadersSent bool
Profile *requesthandling.Profile
CycleState *plugin.CycleState
Request *requesthandling.InferenceRequest
Expand Down Expand Up @@ -176,15 +177,25 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
if reqCtx.ResponseFirstChunkTimestamp.IsZero() {
reqCtx.ResponseFirstChunkTimestamp = time.Now()
}
responseBody = append(responseBody, v.ResponseBody.Body...)
if !v.ResponseBody.EndOfStream {
continue

if reqCtx.Profile.NeedsResponseBuffering {
responseBody = append(responseBody, v.ResponseBody.Body...)
if !v.ResponseBody.EndOfStream {
// Keep accumulating — don't send responses or record metrics yet.
break
}
responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody)
loggerVerbose.Info("processing response body complete")
} else {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now this looks nice and clean :)

responses, err = s.HandleResponseChunk(ctx, reqCtx, v.ResponseBody.Body, v.ResponseBody.EndOfStream)
loggerVerbose.Info("response chunk processing complete")
}

if v.ResponseBody.EndOfStream {
reqCtx.ResponseCompleteTimestamp = time.Now()
model, _ := reqCtx.Request.Body["model"].(string)
metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp))
}
reqCtx.ResponseCompleteTimestamp = time.Now()
model, _ := reqCtx.Request.Body["model"].(string)
metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp))
responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody)
loggerVerbose.Info("processing response body complete")
case *extProcPb.ProcessingRequest_ResponseTrailers:
responses, err = s.HandleResponseTrailers(v.ResponseTrailers)
default:
Expand Down
2 changes: 2 additions & 0 deletions pkg/handlers/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
wantFullBody := []byte(`{"choices":[{"text":"Hello!"}]}`)

profiles := newTestProfiles()
profiles[testProfileName].NeedsResponseBuffering = true

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add unit tests in a follow-up to cover the new functionality?

  • a profile mixing full-body response plugins with chunk response plugins fails to load.
  • a profile with only chunk plugins streams the response.
  • a profile with only full-body plugins buffers the response.
  • a profile with no response plugins streams the response.

not a blocker for this PR (can be pushed in follow up).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — will add in a follow-up PR.

ref := newServerForTest(profiles)
want, err := ref.HandleResponseBody(ctx, newTestRequestContext(profiles), wantFullBody)
if err != nil {
Expand Down Expand Up @@ -164,6 +165,7 @@ func TestHandleResponseBody_Streaming(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
streamCtx, cancel := context.WithCancel(logutil.NewTestLoggerIntoContext(context.Background()))
profiles := newTestProfiles()
profiles[testProfileName].NeedsResponseBuffering = true
srv := newServerForTest(profiles)
testListener, errChan := utils.SetupTestStreamingServer(t, streamCtx, srv)
process, conn := utils.GetStreamingServerClient(streamCtx, t)
Expand Down
Loading