From ee57d3bbd3e33aefe62d05b9e08035ebfda35aaf Mon Sep 17 00:00:00 2001 From: RishabhSaini Date: Mon, 30 Mar 2026 14:18:46 -0400 Subject: [PATCH 1/5] Add ImmediateResponse abort mechanism for evicting in-flight requests Integrate ext_proc ImmediateResponse as the abort mechanism for evicting dispatched requests from vLLM. When eviction triggers, closing the request's abort channel causes the Process() loop to send ImmediateResponse(503), making Envoy reset the upstream connection. vLLM detects the disconnect and frees KV blocks. - AbortRegistry bridges eviction plugin and ext_proc Process() loop - ImmediateResponseAborter closes abort channels via sync.Once - recvOrAbort wraps srv.Recv() to select on abort signals - Tests cover aborter, registry, recvOrAbort, plugin integration, and concurrent eviction+completion races Signed-off-by: RishabhSaini --- .../flowcontrol/eviction/abort_registry.go | 63 ++++++ .../eviction/abort_registry_test.go | 86 ++++++++ pkg/epp/flowcontrol/eviction/aborter.go | 36 +++- pkg/epp/flowcontrol/eviction/aborter_test.go | 93 +++++++++ pkg/epp/flowcontrol/eviction/plugin.go | 22 +- pkg/epp/flowcontrol/eviction/plugin_test.go | 188 ++++++++++++++++++ .../interface/flowcontrol/eviction.go | 3 + pkg/epp/handlers/server.go | 93 ++++++++- pkg/epp/handlers/server_abort_test.go | 167 ++++++++++++++++ 9 files changed, 740 insertions(+), 11 deletions(-) create mode 100644 pkg/epp/flowcontrol/eviction/abort_registry.go create mode 100644 pkg/epp/flowcontrol/eviction/abort_registry_test.go create mode 100644 pkg/epp/flowcontrol/eviction/aborter_test.go create mode 100644 pkg/epp/flowcontrol/eviction/plugin_test.go create mode 100644 pkg/epp/handlers/server_abort_test.go diff --git a/pkg/epp/flowcontrol/eviction/abort_registry.go b/pkg/epp/flowcontrol/eviction/abort_registry.go new file mode 100644 index 0000000000..1fc02b61fd --- /dev/null +++ b/pkg/epp/flowcontrol/eviction/abort_registry.go @@ -0,0 +1,63 @@ +/* +Copyright 2026 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import "sync" + +// AbortRegistry is a shared registry that maps request IDs to abort channels. +// It bridges the eviction plugin (which decides what to evict) and the ext_proc Process() +// goroutine (which owns the stream needed to send ImmediateResponse). +// +// Lifecycle: +// - PreRequest: plugin creates an abort channel and registers it via Register(). +// - Process(): after HandleRequest returns, looks up the channel via Get() and selects on it. +// - EvictN: aborter closes the channel via the EvictionItem.AbortCh reference. +// - Process() defer: removes the channel via Deregister(). +// +// All methods are goroutine-safe. +type AbortRegistry struct { + mu sync.RWMutex + channels map[string]chan struct{} // requestID → abort channel +} + +// NewAbortRegistry creates a new AbortRegistry. +func NewAbortRegistry() *AbortRegistry { + return &AbortRegistry{ + channels: make(map[string]chan struct{}), + } +} + +// Register stores an abort channel for the given request ID. +func (r *AbortRegistry) Register(requestID string, ch chan struct{}) { + r.mu.Lock() + defer r.mu.Unlock() + r.channels[requestID] = ch +} + +// Get returns the abort channel for the given request ID, or nil if not found. +func (r *AbortRegistry) Get(requestID string) chan struct{} { + r.mu.RLock() + defer r.mu.RUnlock() + return r.channels[requestID] +} + +// Deregister removes the abort channel for the given request ID. 
+func (r *AbortRegistry) Deregister(requestID string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.channels, requestID) +} diff --git a/pkg/epp/flowcontrol/eviction/abort_registry_test.go b/pkg/epp/flowcontrol/eviction/abort_registry_test.go new file mode 100644 index 0000000000..40ea09aa70 --- /dev/null +++ b/pkg/epp/flowcontrol/eviction/abort_registry_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAbortRegistry_RegisterAndGet(t *testing.T) { + t.Parallel() + r := NewAbortRegistry() + + ch := make(chan struct{}) + r.Register("req-1", ch) + + got := r.Get("req-1") + assert.Equal(t, ch, got, "Get should return the registered channel") + + assert.Nil(t, r.Get("non-existent"), "Get for unregistered ID should return nil") +} + +func TestAbortRegistry_Deregister(t *testing.T) { + t.Parallel() + r := NewAbortRegistry() + + ch := make(chan struct{}) + r.Register("req-1", ch) + r.Deregister("req-1") + + assert.Nil(t, r.Get("req-1"), "Get after Deregister should return nil") + + // Deregister non-existent should not panic. 
+ r.Deregister("non-existent") +} + +func TestAbortRegistry_Concurrency(t *testing.T) { + t.Parallel() + r := NewAbortRegistry() + + const goroutines = 10 + const opsPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := range goroutines { + go func(id int) { + defer wg.Done() + for i := range opsPerGoroutine { + reqID := fmt.Sprintf("req-%d-%d", id, i) + ch := make(chan struct{}) + + switch i % 3 { + case 0: + r.Register(reqID, ch) + case 1: + r.Register(reqID, ch) + r.Get(reqID) + r.Deregister(reqID) + case 2: + r.Get(reqID) + } + } + }(g) + } + + wg.Wait() +} diff --git a/pkg/epp/flowcontrol/eviction/aborter.go b/pkg/epp/flowcontrol/eviction/aborter.go index 31d3bb2ae2..84dc1c5649 100644 --- a/pkg/epp/flowcontrol/eviction/aborter.go +++ b/pkg/epp/flowcontrol/eviction/aborter.go @@ -18,6 +18,8 @@ package eviction import ( "context" + "fmt" + "sync" "sigs.k8s.io/controller-runtime/pkg/log" @@ -31,9 +33,6 @@ type Aborter interface { } // NoOpAborter logs the eviction but does not abort the request on the model server. -// This is the default aborter until a general-purpose abort mechanism is available -// (e.g., vLLM exposes /v1/abort_requests for all request types, or Envoy ext_proc -// supports downstream connection termination). type NoOpAborter struct{} func (a *NoOpAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) error { @@ -43,3 +42,34 @@ func (a *NoOpAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) "targetURL", item.TargetURL) return nil } + +// ImmediateResponseAborter aborts requests by closing the EvictionItem's AbortCh. +// The ext_proc Process() goroutine selects on this channel and sends an ImmediateResponse +// to Envoy when it is closed, causing Envoy to reset the upstream connection to the model server. +type ImmediateResponseAborter struct { + // closeOnce tracks which channels have been closed to prevent double-close panics. 
+ closeOnce sync.Map // requestID → *sync.Once +} + +// NewImmediateResponseAborter creates an ImmediateResponseAborter. +func NewImmediateResponseAborter() *ImmediateResponseAborter { + return &ImmediateResponseAborter{} +} + +func (a *ImmediateResponseAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) error { + if item.AbortCh == nil { + return fmt.Errorf("eviction item %s has no abort channel", item.RequestID) + } + + // Use sync.Once to safely close the channel exactly once. + once, _ := a.closeOnce.LoadOrStore(item.RequestID, &sync.Once{}) + once.(*sync.Once).Do(func() { + close(item.AbortCh) + }) + + log.FromContext(ctx).Info("Abort signal sent", + "requestID", item.RequestID, + "priority", item.Priority, + "targetURL", item.TargetURL) + return nil +} diff --git a/pkg/epp/flowcontrol/eviction/aborter_test.go b/pkg/epp/flowcontrol/eviction/aborter_test.go new file mode 100644 index 0000000000..d3244e364e --- /dev/null +++ b/pkg/epp/flowcontrol/eviction/aborter_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package eviction + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol" +) + +func TestImmediateResponseAborter_ClosesChannel(t *testing.T) { + t.Parallel() + aborter := NewImmediateResponseAborter() + + abortCh := make(chan struct{}) + item := &flowcontrol.EvictionItem{ + RequestID: "req-1", + AbortCh: abortCh, + } + + err := aborter.Abort(context.Background(), item) + require.NoError(t, err) + + // Channel should be closed. + select { + case <-abortCh: + // success — channel is closed + default: + t.Fatal("abort channel should be closed after Abort()") + } +} + +func TestImmediateResponseAborter_DoubleAbortSafe(t *testing.T) { + t.Parallel() + aborter := NewImmediateResponseAborter() + + abortCh := make(chan struct{}) + item := &flowcontrol.EvictionItem{ + RequestID: "req-1", + AbortCh: abortCh, + } + + // First abort should succeed. + err := aborter.Abort(context.Background(), item) + require.NoError(t, err) + + // Second abort on same request should not panic. 
+ err = aborter.Abort(context.Background(), item) + require.NoError(t, err) +} + +func TestImmediateResponseAborter_NilChannel(t *testing.T) { + t.Parallel() + aborter := NewImmediateResponseAborter() + + item := &flowcontrol.EvictionItem{ + RequestID: "req-1", + AbortCh: nil, + } + + err := aborter.Abort(context.Background(), item) + assert.Error(t, err, "Abort with nil channel should return error") +} + +func TestNoOpAborter(t *testing.T) { + t.Parallel() + aborter := &NoOpAborter{} + + item := &flowcontrol.EvictionItem{ + RequestID: "req-1", + } + + err := aborter.Abort(context.Background(), item) + assert.NoError(t, err, "NoOpAborter should always succeed") +} diff --git a/pkg/epp/flowcontrol/eviction/plugin.go b/pkg/epp/flowcontrol/eviction/plugin.go index 1857ccdca4..dcca42898e 100644 --- a/pkg/epp/flowcontrol/eviction/plugin.go +++ b/pkg/epp/flowcontrol/eviction/plugin.go @@ -37,8 +37,9 @@ var _ requestcontrol.ResponseBodyProcessor = &Plugin{} // Plugin tracks in-flight requests via RequestControl hooks and provides eviction capability. type Plugin struct { - queue *EvictionQueue - aborter Aborter + queue *EvictionQueue + aborter Aborter + abortRegistry *AbortRegistry } // NewPlugin creates an eviction plugin with the given policies and aborter. @@ -48,11 +49,18 @@ func NewPlugin( aborter Aborter, ) *Plugin { return &Plugin{ - queue: NewEvictionQueue(ordering, filter), - aborter: aborter, + queue: NewEvictionQueue(ordering, filter), + aborter: aborter, + abortRegistry: NewAbortRegistry(), } } +// AbortRegistry returns the shared abort registry. +// The ext_proc Process() goroutine uses this to look up abort channels for dispatched requests. 
+func (p *Plugin) AbortRegistry() *AbortRegistry { + return p.abortRegistry +} + func (p *Plugin) TypedName() plugin.TypedName { return plugin.TypedName{Type: "EvictionPlugin", Name: "eviction"} } @@ -80,6 +88,8 @@ func (p *Plugin) PreRequest( return } + abortCh := make(chan struct{}) + item := &flowcontrol.EvictionItem{ RequestID: requestID, Priority: request.Objectives.Priority, @@ -87,9 +97,11 @@ func (p *Plugin) PreRequest( TargetURL: "http://" + net.JoinHostPort(metadata.GetIPAddress(), metadata.GetPort()), Request: request, TargetEndpoint: metadata, + AbortCh: abortCh, } p.queue.Track(item) + p.abortRegistry.Register(requestID, abortCh) // Bind untrack to the request context's lifetime as a safety net. // If the client disconnects and ResponseBody(EndOfStream) never fires, @@ -97,6 +109,7 @@ func (p *Plugin) PreRequest( go func() { <-ctx.Done() p.queue.Untrack(requestID) + p.abortRegistry.Deregister(requestID) }() log.FromContext(ctx).V(logutil.DEBUG).Info("Tracked in-flight request", @@ -126,6 +139,7 @@ func (p *Plugin) ResponseBody( } p.queue.Untrack(requestID) + p.abortRegistry.Deregister(requestID) log.FromContext(ctx).V(logutil.DEBUG).Info("Untracked completed request", "requestID", requestID, diff --git a/pkg/epp/flowcontrol/eviction/plugin_test.go b/pkg/epp/flowcontrol/eviction/plugin_test.go new file mode 100644 index 0000000000..de9a976228 --- /dev/null +++ b/pkg/epp/flowcontrol/eviction/plugin_test.go @@ -0,0 +1,188 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + reqcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/request" + fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling" +) + +// --- Test helpers --- + +func makeSchedulingResult() *scheduling.SchedulingResult { + endpoint := scheduling.NewEndpoint( + &fwkdl.EndpointMetadata{Address: "10.0.0.1", Port: "8000"}, + nil, nil, + ) + return &scheduling.SchedulingResult{ + PrimaryProfileName: "default", + ProfileResults: map[string]*scheduling.ProfileRunResult{ + "default": { + TargetEndpoints: []scheduling.Endpoint{endpoint}, + }, + }, + } +} + +func makeLLMRequest(requestID string, priority int) *scheduling.LLMRequest { //nolint:unparam + return &scheduling.LLMRequest{ + RequestId: requestID, + Headers: map[string]string{ + reqcommon.RequestIdHeaderKey: requestID, + }, + Objectives: scheduling.RequestObjectives{Priority: priority}, + } +} + +// --- Tests --- + +func TestPlugin_PreRequest_CreatesAbortChannel(t *testing.T) { + t.Parallel() + p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, &NoOpAborter{}) + + ctx := context.Background() + p.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + + // Verify the abort channel is registered. + abortCh := p.AbortRegistry().Get("req-1") + require.NotNil(t, abortCh, "AbortCh should be registered after PreRequest") + + // Verify tracked in queue. 
+ assert.Equal(t, 1, p.queue.InFlightLen()) + assert.Equal(t, 1, p.queue.EvictableLen()) +} + +func TestPlugin_ResponseBody_DeregistersAbortChannel(t *testing.T) { + t.Parallel() + p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, &NoOpAborter{}) + + ctx := context.Background() + request := makeLLMRequest("req-1", -1) + p.PreRequest(ctx, request, makeSchedulingResult()) + require.NotNil(t, p.AbortRegistry().Get("req-1")) + + // Complete the request. + p.ResponseBody(ctx, request, &requestcontrol.Response{EndOfStream: true}, nil) + + assert.Nil(t, p.AbortRegistry().Get("req-1"), "AbortCh should be deregistered after completion") + assert.Equal(t, 0, p.queue.InFlightLen()) +} + +func TestPlugin_EvictN_ClosesAbortChannel(t *testing.T) { + t.Parallel() + aborter := NewImmediateResponseAborter() + p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, aborter) + + ctx := context.Background() + p.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + + // Grab the channel before eviction. + abortCh := p.AbortRegistry().Get("req-1") + require.NotNil(t, abortCh) + + // Evict. + aborted, err := p.EvictN(ctx, 1) + require.NoError(t, err) + require.Equal(t, []string{"req-1"}, aborted) + + // Channel should be closed. + select { + case <-abortCh: + // success + default: + t.Fatal("abort channel should be closed after EvictN") + } +} + +func TestPlugin_EvictN_ReTracksOnAbortFailure(t *testing.T) { + t.Parallel() + p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, &failingAborter{}) + + ctx := context.Background() + p.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + + aborted, err := p.EvictN(ctx, 1) + require.NoError(t, err) + assert.Empty(t, aborted) + + // Item should be re-tracked. 
+ assert.Equal(t, 1, p.queue.EvictableLen()) +} + +func TestPlugin_RaceBetweenEvictAndCompletion(t *testing.T) { + t.Parallel() + aborter := NewImmediateResponseAborter() + p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, aborter) + + ctx := context.Background() + + // Track multiple requests. + requests := make([]*scheduling.LLMRequest, 10) + for i := range requests { + requests[i] = makeLLMRequest( + "req-"+string(rune('a'+i)), + -1, + ) + p.PreRequest(ctx, requests[i], makeSchedulingResult()) + } + + // Concurrently evict and complete. + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for range 5 { + _, _ = p.EvictN(ctx, 1) + time.Sleep(time.Millisecond) + } + }() + + go func() { + defer wg.Done() + for _, req := range requests { + p.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: true}, nil) + time.Sleep(time.Millisecond) + } + }() + + wg.Wait() + + // No panics, no deadlocks. State should be consistent. + inFlight, evictable := p.Stats() + assert.GreaterOrEqual(t, inFlight, 0) + assert.GreaterOrEqual(t, evictable, 0) + assert.GreaterOrEqual(t, inFlight, evictable) +} + +// failingAborter always returns an error. +type failingAborter struct{} + +func (a *failingAborter) Abort(_ context.Context, _ *flowcontrol.EvictionItem) error { + return assert.AnError +} diff --git a/pkg/epp/framework/interface/flowcontrol/eviction.go b/pkg/epp/framework/interface/flowcontrol/eviction.go index 86a4329b29..6111a91020 100644 --- a/pkg/epp/framework/interface/flowcontrol/eviction.go +++ b/pkg/epp/framework/interface/flowcontrol/eviction.go @@ -39,6 +39,9 @@ type EvictionItem struct { Request *scheduling.InferenceRequest // TargetEndpoint is the metadata of the endpoint serving this request. TargetEndpoint *datalayer.EndpointMetadata + // AbortCh is closed by the aborter to signal the ext_proc Process() goroutine + // to send an ImmediateResponse and terminate the request. 
+ AbortCh chan struct{} } // EvictionOrderingPolicy determines which in-flight request gets evicted first. diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index a9d2b74565..97279db7c3 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -19,11 +19,13 @@ package handlers import ( "bytes" "context" + "errors" "io" "strings" "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/go-logr/logr" "github.com/google/uuid" "go.opentelemetry.io/otel" @@ -46,6 +48,14 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/version" ) +// AbortChannelLookup is an optional interface for looking up abort channels by request ID. +// When set on the StreamingServer, the Process() loop will select on the abort channel +// to support eviction of in-flight requests via ext_proc ImmediateResponse. +type AbortChannelLookup interface { + Get(requestID string) chan struct{} + Deregister(requestID string) +} + func NewStreamingServer(datastore Datastore, director Director, parser fwkrh.Parser) *StreamingServer { return &StreamingServer{ director: director, @@ -54,6 +64,11 @@ func NewStreamingServer(datastore Datastore, director Director, parser fwkrh.Par } } +// SetAbortChannelLookup sets the abort channel lookup for eviction support. +func (s *StreamingServer) SetAbortChannelLookup(lookup AbortChannelLookup) { + s.abortLookup = lookup +} + type Director interface { HandleRequest(ctx context.Context, reqCtx *RequestContext, inferenceRequestBody *fwkrh.InferenceRequestBody) (*RequestContext, error) HandleResponseHeader(ctx context.Context, reqCtx *RequestContext) *RequestContext @@ -68,9 +83,10 @@ type Datastore interface { // Server implements the Envoy external processing server. 
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto type StreamingServer struct { - datastore Datastore - director Director - parser fwkrh.Parser + datastore Datastore + director Director + parser fwkrh.Parser + abortLookup AbortChannelLookup // optional, set for eviction support } // RequestContext stores context information during the life time of an HTTP request. @@ -166,12 +182,18 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) } var body []byte + var abortCh chan struct{} // set after HandleRequest if eviction is enabled + var abortRequestID string // Create error handling var as each request should only report once for // error metrics. This doesn't cover the error "Cannot receive stream request" because // such errors might happen even though response is processed. var err error defer func() { + // Clean up abort channel registration on exit. + if s.abortLookup != nil && abortRequestID != "" { + s.abortLookup.Deregister(abortRequestID) + } if reqCtx.ResponseStatusCode != "" { metrics.RecordRequestErrCounter(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.ResponseStatusCode) } else if err != nil { @@ -198,7 +220,21 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) default: } - req, recvErr := srv.Recv() + // If an abort channel is set (request dispatched + eviction enabled), + // receive on a helper goroutine so the loop can also listen for eviction signals. + var req *extProcPb.ProcessingRequest + var recvErr error + + if abortCh != nil { + req, recvErr, err = s.recvOrAbort(srv, abortCh) + if err != nil { + // Eviction triggered — ImmediateResponse was sent, or sending it failed. 
+ return nil + } + } else { + req, recvErr = srv.Recv() + } + if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled { return nil } @@ -251,6 +287,12 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) break } + // After scheduling, look up the abort channel for eviction support. + if s.abortLookup != nil { + abortRequestID = reqCtx.Request.Headers[reqcommon.RequestIdHeaderKey] + abortCh = s.abortLookup.Get(abortRequestID) + } + if reqCtx.SchedulingRequest != nil && reqCtx.SchedulingRequest.Body != nil { reqCtx.modelServerStreaming = reqCtx.SchedulingRequest.Body.Stream } @@ -333,6 +375,49 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) } } +// recvResult carries the outcome of one srv.Recv() call made on a helper goroutine. +type recvResult struct { + req *extProcPb.ProcessingRequest + err error +} + +// recvOrAbort wraps srv.Recv() in a goroutine and selects between receiving the next +// ext_proc message and an eviction abort signal. If the abort channel is closed, +// it sends an ImmediateResponse(503) to Envoy and returns errEvicted, or the Send error if that fails. +func (s *StreamingServer) recvOrAbort( + srv extProcPb.ExternalProcessor_ProcessServer, + abortCh chan struct{}, +) (*extProcPb.ProcessingRequest, error, error) { + recvCh := make(chan recvResult, 1) + go func() { + req, err := srv.Recv() + recvCh <- recvResult{req: req, err: err} + }() + + select { + case result := <-recvCh: + return result.req, result.err, nil + case <-abortCh: + // Eviction triggered — send ImmediateResponse to reset the upstream connection. 
+ sendErr := srv.Send(&extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_ServiceUnavailable, + }, + Body: []byte("request evicted by flow control"), + }, + }, + }) + if sendErr != nil { + return nil, nil, sendErr + } + return nil, nil, errEvicted + } +} + +var errEvicted = errors.New("request evicted") + // finishResponse ensures all post-response logic, such as metric recording // and state updates, is executed exactly once for the request lifecycle. func (s *StreamingServer) finishResponse(ctx context.Context, reqCtx *RequestContext, body []byte, modelStreaming bool, setEos bool) { diff --git a/pkg/epp/handlers/server_abort_test.go b/pkg/epp/handlers/server_abort_test.go new file mode 100644 index 0000000000..3d2436b2df --- /dev/null +++ b/pkg/epp/handlers/server_abort_test.go @@ -0,0 +1,167 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package handlers + +import ( + "context" + "io" + "testing" + "time" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" +) + +// mockProcessServer implements ExternalProcessor_ProcessServer for testing recvOrAbort. 
+type mockProcessServer struct { + recvCh chan *extProcPb.ProcessingRequest + sentCh chan *extProcPb.ProcessingResponse + ctx context.Context +} + +func newMockProcessServer(ctx context.Context) *mockProcessServer { + return &mockProcessServer{ + recvCh: make(chan *extProcPb.ProcessingRequest, 1), + sentCh: make(chan *extProcPb.ProcessingResponse, 1), + ctx: ctx, + } +} + +func (m *mockProcessServer) Send(resp *extProcPb.ProcessingResponse) error { + m.sentCh <- resp + return nil +} + +func (m *mockProcessServer) Recv() (*extProcPb.ProcessingRequest, error) { + select { + case req := <-m.recvCh: + if req == nil { + return nil, io.EOF + } + return req, nil + case <-m.ctx.Done(): + return nil, m.ctx.Err() + } +} + +func (m *mockProcessServer) SetHeader(metadata.MD) error { return nil } +func (m *mockProcessServer) SendHeader(metadata.MD) error { return nil } +func (m *mockProcessServer) SetTrailer(metadata.MD) {} +func (m *mockProcessServer) Context() context.Context { return m.ctx } +func (m *mockProcessServer) SendMsg(any) error { return nil } +func (m *mockProcessServer) RecvMsg(any) error { return nil } + +func TestRecvOrAbort_NormalRecv(t *testing.T) { + t.Parallel() + s := &StreamingServer{} + ctx := context.Background() + srv := newMockProcessServer(ctx) + abortCh := make(chan struct{}) + + // Send a request before calling recvOrAbort. 
+ expectedReq := &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_ResponseBody{ + ResponseBody: &extProcPb.HttpBody{ + Body: []byte("token"), + }, + }, + } + srv.recvCh <- expectedReq + + req, recvErr, abortErr := s.recvOrAbort(srv, abortCh) + + assert.NoError(t, abortErr, "No abort should have occurred") + assert.NoError(t, recvErr, "Recv should succeed") + assert.Equal(t, expectedReq, req, "Should receive the expected request") +} + +func TestRecvOrAbort_AbortBeforeRecv(t *testing.T) { + t.Parallel() + s := &StreamingServer{} + ctx := context.Background() + srv := newMockProcessServer(ctx) + abortCh := make(chan struct{}) + + // Close the abort channel before sending any request. + close(abortCh) + + req, _, abortErr := s.recvOrAbort(srv, abortCh) + + assert.ErrorIs(t, abortErr, errEvicted, "Should return eviction error") + assert.Nil(t, req, "Request should be nil on abort") + + // Verify that ImmediateResponse was sent. + select { + case sent := <-srv.sentCh: + ir := sent.GetImmediateResponse() + require.NotNil(t, ir, "Should have sent ImmediateResponse") + assert.Equal(t, envoyTypePb.StatusCode_ServiceUnavailable, ir.Status.Code) + assert.Equal(t, []byte("request evicted by flow control"), ir.Body) + case <-time.After(time.Second): + t.Fatal("Timeout waiting for ImmediateResponse") + } +} + +func TestRecvOrAbort_AbortDuringRecvWait(t *testing.T) { + t.Parallel() + s := &StreamingServer{} + ctx := context.Background() + srv := newMockProcessServer(ctx) + abortCh := make(chan struct{}) + + // Don't send any request — Recv() will block. + // Close abort channel after a short delay. + go func() { + time.Sleep(50 * time.Millisecond) + close(abortCh) + }() + + req, _, abortErr := s.recvOrAbort(srv, abortCh) + + assert.ErrorIs(t, abortErr, errEvicted, "Should return eviction error") + assert.Nil(t, req, "Request should be nil on abort") + + // Verify ImmediateResponse was sent. 
+ select { + case sent := <-srv.sentCh: + ir := sent.GetImmediateResponse() + require.NotNil(t, ir) + assert.Equal(t, envoyTypePb.StatusCode_ServiceUnavailable, ir.Status.Code) + case <-time.After(time.Second): + t.Fatal("Timeout waiting for ImmediateResponse") + } +} + +func TestRecvOrAbort_RecvEOF(t *testing.T) { + t.Parallel() + s := &StreamingServer{} + ctx := context.Background() + srv := newMockProcessServer(ctx) + abortCh := make(chan struct{}) + + // Send nil to simulate EOF. + srv.recvCh <- nil + + req, recvErr, abortErr := s.recvOrAbort(srv, abortCh) + + assert.NoError(t, abortErr, "No abort should have occurred") + assert.ErrorIs(t, recvErr, io.EOF) + assert.Nil(t, req) +} From b1b12b4e5064f54b308d7d727fb28245219f7c70 Mon Sep 17 00:00:00 2001 From: RishabhSaini Date: Thu, 9 Apr 2026 20:27:51 -0400 Subject: [PATCH 2/5] Rename Aborter to Evictor, Plugin to RequestEvictor for naming consistency Signed-off-by: RishabhSaini --- pkg/epp/flowcontrol/eviction/aborter.go | 31 ++++--- pkg/epp/flowcontrol/eviction/aborter_test.go | 37 ++++---- pkg/epp/flowcontrol/eviction/plugin.go | 38 ++++----- pkg/epp/flowcontrol/eviction/plugin_test.go | 88 ++++++++------------ 4 files changed, 88 insertions(+), 106 deletions(-) diff --git a/pkg/epp/flowcontrol/eviction/aborter.go b/pkg/epp/flowcontrol/eviction/aborter.go index 84dc1c5649..6edf600bac 100644 --- a/pkg/epp/flowcontrol/eviction/aborter.go +++ b/pkg/epp/flowcontrol/eviction/aborter.go @@ -27,47 +27,46 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol" ) -// Aborter handles aborting an in-flight request on a model server. -type Aborter interface { - Abort(ctx context.Context, item *flowcontrol.EvictionItem) error +// Evictor handles evicting an in-flight request on a model server. +type Evictor interface { + Evict(ctx context.Context, item *flowcontrol.EvictionItem) error } -// NoOpAborter logs the eviction but does not abort the request on the model server. 
-type NoOpAborter struct{} +// NoOpEvictor logs the eviction but does not evict the request on the model server. +type NoOpEvictor struct{} -func (a *NoOpAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) error { - log.FromContext(ctx).V(logutil.DEBUG).Info("Eviction selected request for abort (no-op: abort mechanism not available)", +func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error { + log.FromContext(ctx).V(logutil.DEBUG).Info("Eviction selected request (no-op: eviction mechanism not available)", "requestID", item.RequestID, "priority", item.Priority, "targetURL", item.TargetURL) return nil } -// ImmediateResponseAborter aborts requests by closing the EvictionItem's AbortCh. +// ImmediateResponseEvictor evicts requests by closing the EvictionItem's AbortCh. // The ext_proc Process() goroutine selects on this channel and sends an ImmediateResponse // to Envoy when it is closed, causing Envoy to reset the upstream connection to the model server. -type ImmediateResponseAborter struct { +type ImmediateResponseEvictor struct { // closeOnce tracks which channels have been closed to prevent double-close panics. closeOnce sync.Map // requestID → *sync.Once } -// NewImmediateResponseAborter creates an ImmediateResponseAborter. -func NewImmediateResponseAborter() *ImmediateResponseAborter { - return &ImmediateResponseAborter{} +// NewImmediateResponseEvictor creates an ImmediateResponseEvictor. +func NewImmediateResponseEvictor() *ImmediateResponseEvictor { + return &ImmediateResponseEvictor{} } -func (a *ImmediateResponseAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) error { +func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error { if item.AbortCh == nil { return fmt.Errorf("eviction item %s has no abort channel", item.RequestID) } - // Use sync.Once to safely close the channel exactly once. 
- once, _ := a.closeOnce.LoadOrStore(item.RequestID, &sync.Once{}) + once, _ := e.closeOnce.LoadOrStore(item.RequestID, &sync.Once{}) once.(*sync.Once).Do(func() { close(item.AbortCh) }) - log.FromContext(ctx).Info("Abort signal sent", + log.FromContext(ctx).Info("Eviction signal sent", "requestID", item.RequestID, "priority", item.Priority, "targetURL", item.TargetURL) diff --git a/pkg/epp/flowcontrol/eviction/aborter_test.go b/pkg/epp/flowcontrol/eviction/aborter_test.go index d3244e364e..28d5651634 100644 --- a/pkg/epp/flowcontrol/eviction/aborter_test.go +++ b/pkg/epp/flowcontrol/eviction/aborter_test.go @@ -26,9 +26,9 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol" ) -func TestImmediateResponseAborter_ClosesChannel(t *testing.T) { +func TestImmediateResponseEvictor_ClosesChannel(t *testing.T) { t.Parallel() - aborter := NewImmediateResponseAborter() + evictor := NewImmediateResponseEvictor() abortCh := make(chan struct{}) item := &flowcontrol.EvictionItem{ @@ -36,21 +36,19 @@ func TestImmediateResponseAborter_ClosesChannel(t *testing.T) { AbortCh: abortCh, } - err := aborter.Abort(context.Background(), item) + err := evictor.Evict(context.Background(), item) require.NoError(t, err) - // Channel should be closed. select { case <-abortCh: - // success — channel is closed default: - t.Fatal("abort channel should be closed after Abort()") + t.Fatal("abort channel should be closed after Evict()") } } -func TestImmediateResponseAborter_DoubleAbortSafe(t *testing.T) { +func TestImmediateResponseEvictor_DoubleEvictSafe(t *testing.T) { t.Parallel() - aborter := NewImmediateResponseAborter() + evictor := NewImmediateResponseEvictor() abortCh := make(chan struct{}) item := &flowcontrol.EvictionItem{ @@ -58,36 +56,35 @@ func TestImmediateResponseAborter_DoubleAbortSafe(t *testing.T) { AbortCh: abortCh, } - // First abort should succeed. 
- err := aborter.Abort(context.Background(), item) + err := evictor.Evict(context.Background(), item) require.NoError(t, err) - // Second abort on same request should not panic. - err = aborter.Abort(context.Background(), item) + // Second evict on same request should not panic. + err = evictor.Evict(context.Background(), item) require.NoError(t, err) } -func TestImmediateResponseAborter_NilChannel(t *testing.T) { +func TestImmediateResponseEvictor_NilChannel(t *testing.T) { t.Parallel() - aborter := NewImmediateResponseAborter() + evictor := NewImmediateResponseEvictor() item := &flowcontrol.EvictionItem{ RequestID: "req-1", AbortCh: nil, } - err := aborter.Abort(context.Background(), item) - assert.Error(t, err, "Abort with nil channel should return error") + err := evictor.Evict(context.Background(), item) + assert.Error(t, err, "Evict with nil channel should return error") } -func TestNoOpAborter(t *testing.T) { +func TestNoOpEvictor(t *testing.T) { t.Parallel() - aborter := &NoOpAborter{} + evictor := &NoOpEvictor{} item := &flowcontrol.EvictionItem{ RequestID: "req-1", } - err := aborter.Abort(context.Background(), item) - assert.NoError(t, err, "NoOpAborter should always succeed") + err := evictor.Evict(context.Background(), item) + assert.NoError(t, err, "NoOpEvictor should always succeed") } diff --git a/pkg/epp/flowcontrol/eviction/plugin.go b/pkg/epp/flowcontrol/eviction/plugin.go index dcca42898e..4295a4d04b 100644 --- a/pkg/epp/flowcontrol/eviction/plugin.go +++ b/pkg/epp/flowcontrol/eviction/plugin.go @@ -32,42 +32,42 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling" ) -var _ requestcontrol.PreRequest = &Plugin{} -var _ requestcontrol.ResponseBodyProcessor = &Plugin{} +var _ requestcontrol.PreRequest = &RequestEvictor{} +var _ requestcontrol.ResponseBody = &RequestEvictor{} -// Plugin tracks in-flight requests via RequestControl hooks and provides eviction capability. 
-type Plugin struct { +// RequestEvictor tracks in-flight requests via RequestControl hooks and provides eviction capability. +type RequestEvictor struct { queue *EvictionQueue - aborter Aborter + evictor Evictor abortRegistry *AbortRegistry } -// NewPlugin creates an eviction plugin with the given policies and aborter. -func NewPlugin( +// NewRequestEvictor creates a RequestEvictor with the given policies and evictor. +func NewRequestEvictor( ordering flowcontrol.EvictionOrderingPolicy, filter flowcontrol.EvictionFilterPolicy, - aborter Aborter, -) *Plugin { - return &Plugin{ + evictor Evictor, +) *RequestEvictor { + return &RequestEvictor{ queue: NewEvictionQueue(ordering, filter), - aborter: aborter, + evictor: evictor, abortRegistry: NewAbortRegistry(), } } // AbortRegistry returns the shared abort registry. // The ext_proc Process() goroutine uses this to look up abort channels for dispatched requests. -func (p *Plugin) AbortRegistry() *AbortRegistry { +func (p *RequestEvictor) AbortRegistry() *AbortRegistry { return p.abortRegistry } -func (p *Plugin) TypedName() plugin.TypedName { +func (p *RequestEvictor) TypedName() plugin.TypedName { return plugin.TypedName{Type: "EvictionPlugin", Name: "eviction"} } // PreRequest is called after scheduling, before the request reaches the model server. // It tracks the request and, if the filter policy accepts it, adds it to the eviction queue. -func (p *Plugin) PreRequest( +func (p *RequestEvictor) PreRequest( ctx context.Context, request *scheduling.InferenceRequest, result *scheduling.SchedulingResult, @@ -121,7 +121,7 @@ func (p *Plugin) PreRequest( // ResponseBody is called for every response data chunk (streaming) or once (non-streaming). // On the final call (EndOfStream == true), it removes the request from tracking and the eviction queue. 
-func (p *Plugin) ResponseBody( +func (p *RequestEvictor) ResponseBody( ctx context.Context, request *scheduling.InferenceRequest, response *requestcontrol.Response, @@ -151,7 +151,7 @@ func (p *Plugin) ResponseBody( // Each request is only removed from tracking after a successful abort. If the abort fails, // the request remains in the queue for a future eviction attempt. // Returns the request IDs that were successfully aborted. -func (p *Plugin) EvictN(ctx context.Context, n int) ([]string, error) { +func (p *RequestEvictor) EvictN(ctx context.Context, n int) ([]string, error) { logger := log.FromContext(ctx) aborted := make([]string, 0, n) @@ -162,8 +162,8 @@ func (p *Plugin) EvictN(ctx context.Context, n int) ([]string, error) { } item := items[0] - if err := p.aborter.Abort(ctx, item); err != nil { - logger.Error(err, "Failed to abort request, re-tracking", "requestID", item.RequestID, "targetURL", item.TargetURL) + if err := p.evictor.Evict(ctx, item); err != nil { + logger.Error(err, "Failed to evict request, re-tracking", "requestID", item.RequestID, "targetURL", item.TargetURL) p.queue.Track(item) continue } @@ -177,6 +177,6 @@ func (p *Plugin) EvictN(ctx context.Context, n int) ([]string, error) { } // Stats returns the current in-flight and evictable request counts. 
-func (p *Plugin) Stats() (inFlight int, evictable int) { +func (p *RequestEvictor) Stats() (inFlight int, evictable int) { return p.queue.InFlightLen(), p.queue.EvictableLen() } diff --git a/pkg/epp/flowcontrol/eviction/plugin_test.go b/pkg/epp/flowcontrol/eviction/plugin_test.go index de9a976228..c50a53520e 100644 --- a/pkg/epp/flowcontrol/eviction/plugin_test.go +++ b/pkg/epp/flowcontrol/eviction/plugin_test.go @@ -61,104 +61,91 @@ func makeLLMRequest(requestID string, priority int) *scheduling.LLMRequest { //n // --- Tests --- -func TestPlugin_PreRequest_CreatesAbortChannel(t *testing.T) { +func TestRequestEvictor_PreRequest_CreatesAbortChannel(t *testing.T) { t.Parallel() - p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, &NoOpAborter{}) + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &NoOpEvictor{}) ctx := context.Background() - p.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + re.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) - // Verify the abort channel is registered. - abortCh := p.AbortRegistry().Get("req-1") + abortCh := re.AbortRegistry().Get("req-1") require.NotNil(t, abortCh, "AbortCh should be registered after PreRequest") - // Verify tracked in queue. 
- assert.Equal(t, 1, p.queue.InFlightLen()) - assert.Equal(t, 1, p.queue.EvictableLen()) + assert.Equal(t, 1, re.queue.InFlightLen()) + assert.Equal(t, 1, re.queue.EvictableLen()) } -func TestPlugin_ResponseBody_DeregistersAbortChannel(t *testing.T) { +func TestRequestEvictor_ResponseBody_DeregistersAbortChannel(t *testing.T) { t.Parallel() - p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, &NoOpAborter{}) + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &NoOpEvictor{}) ctx := context.Background() request := makeLLMRequest("req-1", -1) - p.PreRequest(ctx, request, makeSchedulingResult()) - require.NotNil(t, p.AbortRegistry().Get("req-1")) + re.PreRequest(ctx, request, makeSchedulingResult()) + require.NotNil(t, re.AbortRegistry().Get("req-1")) - // Complete the request. - p.ResponseBody(ctx, request, &requestcontrol.Response{EndOfStream: true}, nil) + re.ResponseBody(ctx, request, &requestcontrol.Response{EndOfStream: true}, nil) - assert.Nil(t, p.AbortRegistry().Get("req-1"), "AbortCh should be deregistered after completion") - assert.Equal(t, 0, p.queue.InFlightLen()) + assert.Nil(t, re.AbortRegistry().Get("req-1"), "AbortCh should be deregistered after completion") + assert.Equal(t, 0, re.queue.InFlightLen()) } -func TestPlugin_EvictN_ClosesAbortChannel(t *testing.T) { +func TestRequestEvictor_EvictN_ClosesAbortChannel(t *testing.T) { t.Parallel() - aborter := NewImmediateResponseAborter() - p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, aborter) + evictor := NewImmediateResponseEvictor() + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, evictor) ctx := context.Background() - p.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + re.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) - // Grab the channel before eviction. - abortCh := p.AbortRegistry().Get("req-1") + abortCh := re.AbortRegistry().Get("req-1") require.NotNil(t, abortCh) - // Evict. 
- aborted, err := p.EvictN(ctx, 1) + evicted, err := re.EvictN(ctx, 1) require.NoError(t, err) - require.Equal(t, []string{"req-1"}, aborted) + require.Equal(t, []string{"req-1"}, evicted) - // Channel should be closed. select { case <-abortCh: - // success default: t.Fatal("abort channel should be closed after EvictN") } } -func TestPlugin_EvictN_ReTracksOnAbortFailure(t *testing.T) { +func TestRequestEvictor_EvictN_ReTracksOnFailure(t *testing.T) { t.Parallel() - p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, &failingAborter{}) + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &failingEvictor{}) ctx := context.Background() - p.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + re.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) - aborted, err := p.EvictN(ctx, 1) + evicted, err := re.EvictN(ctx, 1) require.NoError(t, err) - assert.Empty(t, aborted) + assert.Empty(t, evicted) - // Item should be re-tracked. - assert.Equal(t, 1, p.queue.EvictableLen()) + assert.Equal(t, 1, re.queue.EvictableLen()) } -func TestPlugin_RaceBetweenEvictAndCompletion(t *testing.T) { +func TestRequestEvictor_RaceBetweenEvictAndCompletion(t *testing.T) { t.Parallel() - aborter := NewImmediateResponseAborter() - p := NewPlugin(&testOrdering{}, &acceptAllFilter{}, aborter) + evictor := NewImmediateResponseEvictor() + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, evictor) ctx := context.Background() - // Track multiple requests. requests := make([]*scheduling.LLMRequest, 10) for i := range requests { - requests[i] = makeLLMRequest( - "req-"+string(rune('a'+i)), - -1, - ) - p.PreRequest(ctx, requests[i], makeSchedulingResult()) + requests[i] = makeLLMRequest("req-"+string(rune('a'+i)), -1) + re.PreRequest(ctx, requests[i], makeSchedulingResult()) } - // Concurrently evict and complete. 
var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for range 5 { - _, _ = p.EvictN(ctx, 1) + _, _ = re.EvictN(ctx, 1) time.Sleep(time.Millisecond) } }() @@ -166,23 +153,22 @@ func TestPlugin_RaceBetweenEvictAndCompletion(t *testing.T) { go func() { defer wg.Done() for _, req := range requests { - p.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: true}, nil) + re.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: true}, nil) time.Sleep(time.Millisecond) } }() wg.Wait() - // No panics, no deadlocks. State should be consistent. - inFlight, evictable := p.Stats() + inFlight, evictable := re.Stats() assert.GreaterOrEqual(t, inFlight, 0) assert.GreaterOrEqual(t, evictable, 0) assert.GreaterOrEqual(t, inFlight, evictable) } -// failingAborter always returns an error. -type failingAborter struct{} +// failingEvictor always returns an error. +type failingEvictor struct{} -func (a *failingAborter) Abort(_ context.Context, _ *flowcontrol.EvictionItem) error { +func (e *failingEvictor) Evict(_ context.Context, _ *flowcontrol.EvictionItem) error { return assert.AnError } From dc45109bd36411b7dd0a651aa96a2f1897c5d6c8 Mon Sep 17 00:00:00 2001 From: RishabhSaini Date: Thu, 9 Apr 2026 20:28:18 -0400 Subject: [PATCH 3/5] Use 429 (TooManyRequests) instead of 503 for eviction ImmediateResponse Signed-off-by: RishabhSaini --- pkg/epp/handlers/server.go | 2 +- pkg/epp/handlers/server_abort_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 97279db7c3..f7d7560fa3 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -403,7 +403,7 @@ func (s *StreamingServer) recvOrAbort( Response: &extProcPb.ProcessingResponse_ImmediateResponse{ ImmediateResponse: &extProcPb.ImmediateResponse{ Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_ServiceUnavailable, + Code: envoyTypePb.StatusCode_TooManyRequests, }, Body: []byte("request evicted by flow 
control"), }, diff --git a/pkg/epp/handlers/server_abort_test.go b/pkg/epp/handlers/server_abort_test.go index 3d2436b2df..3c3de1fbd5 100644 --- a/pkg/epp/handlers/server_abort_test.go +++ b/pkg/epp/handlers/server_abort_test.go @@ -112,7 +112,7 @@ func TestRecvOrAbort_AbortBeforeRecv(t *testing.T) { case sent := <-srv.sentCh: ir := sent.GetImmediateResponse() require.NotNil(t, ir, "Should have sent ImmediateResponse") - assert.Equal(t, envoyTypePb.StatusCode_ServiceUnavailable, ir.Status.Code) + assert.Equal(t, envoyTypePb.StatusCode_TooManyRequests, ir.Status.Code) assert.Equal(t, []byte("request evicted by flow control"), ir.Body) case <-time.After(time.Second): t.Fatal("Timeout waiting for ImmediateResponse") @@ -143,7 +143,7 @@ func TestRecvOrAbort_AbortDuringRecvWait(t *testing.T) { case sent := <-srv.sentCh: ir := sent.GetImmediateResponse() require.NotNil(t, ir) - assert.Equal(t, envoyTypePb.StatusCode_ServiceUnavailable, ir.Status.Code) + assert.Equal(t, envoyTypePb.StatusCode_TooManyRequests, ir.Status.Code) case <-time.After(time.Second): t.Fatal("Timeout waiting for ImmediateResponse") } From 7e6a8ce1b053c09cc817dc73b67f75561a0b251b Mon Sep 17 00:00:00 2001 From: RishabhSaini Date: Thu, 9 Apr 2026 20:59:25 -0400 Subject: [PATCH 4/5] Refactor Process() loop: single reader goroutine, state machine eviction, evictor cleanup - Replace per-message recvOrAbort goroutine with single reader goroutine for the stream lifetime, using nil channel select pattern - Remove errEvicted sentinel; eviction is a state transition (RequestEvicted) handled by updateStateAndSendIfNeeded, not an error - Move ImmediateResponse send from inline code into the state machine - Add EvictorWithCleanup interface and cleanupRequest helper to prevent ImmediateResponseEvictor.closeOnce map from growing unbounded Signed-off-by: RishabhSaini --- ...abort_registry.go => eviction_registry.go} | 28 +-- ...stry_test.go => eviction_registry_test.go} | 12 +- .../eviction/{aborter.go => 
evictor.go} | 14 +- .../{aborter_test.go => evictor_test.go} | 49 ++++- pkg/epp/flowcontrol/eviction/plugin.go | 65 ++++--- pkg/epp/flowcontrol/eviction/plugin_test.go | 114 ++++++++++-- .../interface/flowcontrol/eviction.go | 4 +- pkg/epp/handlers/server.go | 169 ++++++++++-------- pkg/epp/handlers/server_abort_test.go | 153 ++++------------ 9 files changed, 334 insertions(+), 274 deletions(-) rename pkg/epp/flowcontrol/eviction/{abort_registry.go => eviction_registry.go} (58%) rename pkg/epp/flowcontrol/eviction/{abort_registry_test.go => eviction_registry_test.go} (86%) rename pkg/epp/flowcontrol/eviction/{aborter.go => evictor.go} (86%) rename pkg/epp/flowcontrol/eviction/{aborter_test.go => evictor_test.go} (64%) diff --git a/pkg/epp/flowcontrol/eviction/abort_registry.go b/pkg/epp/flowcontrol/eviction/eviction_registry.go similarity index 58% rename from pkg/epp/flowcontrol/eviction/abort_registry.go rename to pkg/epp/flowcontrol/eviction/eviction_registry.go index 1fc02b61fd..07c5b51769 100644 --- a/pkg/epp/flowcontrol/eviction/abort_registry.go +++ b/pkg/epp/flowcontrol/eviction/eviction_registry.go @@ -18,45 +18,45 @@ package eviction import "sync" -// AbortRegistry is a shared registry that maps request IDs to abort channels. +// EvictionRegistry is a shared registry that maps request IDs to eviction channels. // It bridges the eviction plugin (which decides what to evict) and the ext_proc Process() // goroutine (which owns the stream needed to send ImmediateResponse). // // Lifecycle: -// - PreRequest: plugin creates an abort channel and registers it via Register(). +// - PreRequest: plugin creates an eviction channel and registers it via Register(). // - Process(): after HandleRequest returns, looks up the channel via Get() and selects on it. -// - EvictN: aborter closes the channel via the EvictionItem.AbortCh reference. +// - EvictN: evictor closes the channel via the EvictionItem.EvictCh reference. 
// - Process() defer: removes the channel via Deregister(). // // All methods are goroutine-safe. -type AbortRegistry struct { +type EvictionRegistry struct { mu sync.RWMutex - channels map[string]chan struct{} // requestID → abort channel + channels map[string]chan struct{} // requestID → eviction channel } -// NewAbortRegistry creates a new AbortRegistry. -func NewAbortRegistry() *AbortRegistry { - return &AbortRegistry{ +// NewEvictionRegistry creates a new EvictionRegistry. +func NewEvictionRegistry() *EvictionRegistry { + return &EvictionRegistry{ channels: make(map[string]chan struct{}), } } -// Register stores an abort channel for the given request ID. -func (r *AbortRegistry) Register(requestID string, ch chan struct{}) { +// Register stores an eviction channel for the given request ID. +func (r *EvictionRegistry) Register(requestID string, ch chan struct{}) { r.mu.Lock() defer r.mu.Unlock() r.channels[requestID] = ch } -// Get returns the abort channel for the given request ID, or nil if not found. -func (r *AbortRegistry) Get(requestID string) chan struct{} { +// Get returns the eviction channel for the given request ID, or nil if not found. +func (r *EvictionRegistry) Get(requestID string) chan struct{} { r.mu.RLock() defer r.mu.RUnlock() return r.channels[requestID] } -// Deregister removes the abort channel for the given request ID. -func (r *AbortRegistry) Deregister(requestID string) { +// Deregister removes the eviction channel for the given request ID. 
+func (r *EvictionRegistry) Deregister(requestID string) { r.mu.Lock() defer r.mu.Unlock() delete(r.channels, requestID) diff --git a/pkg/epp/flowcontrol/eviction/abort_registry_test.go b/pkg/epp/flowcontrol/eviction/eviction_registry_test.go similarity index 86% rename from pkg/epp/flowcontrol/eviction/abort_registry_test.go rename to pkg/epp/flowcontrol/eviction/eviction_registry_test.go index 40ea09aa70..809e090b6f 100644 --- a/pkg/epp/flowcontrol/eviction/abort_registry_test.go +++ b/pkg/epp/flowcontrol/eviction/eviction_registry_test.go @@ -24,9 +24,9 @@ import ( "github.com/stretchr/testify/assert" ) -func TestAbortRegistry_RegisterAndGet(t *testing.T) { +func TestEvictionRegistry_RegisterAndGet(t *testing.T) { t.Parallel() - r := NewAbortRegistry() + r := NewEvictionRegistry() ch := make(chan struct{}) r.Register("req-1", ch) @@ -37,9 +37,9 @@ func TestAbortRegistry_RegisterAndGet(t *testing.T) { assert.Nil(t, r.Get("non-existent"), "Get for unregistered ID should return nil") } -func TestAbortRegistry_Deregister(t *testing.T) { +func TestEvictionRegistry_Deregister(t *testing.T) { t.Parallel() - r := NewAbortRegistry() + r := NewEvictionRegistry() ch := make(chan struct{}) r.Register("req-1", ch) @@ -51,9 +51,9 @@ func TestAbortRegistry_Deregister(t *testing.T) { r.Deregister("non-existent") } -func TestAbortRegistry_Concurrency(t *testing.T) { +func TestEvictionRegistry_Concurrency(t *testing.T) { t.Parallel() - r := NewAbortRegistry() + r := NewEvictionRegistry() const goroutines = 10 const opsPerGoroutine = 100 diff --git a/pkg/epp/flowcontrol/eviction/aborter.go b/pkg/epp/flowcontrol/eviction/evictor.go similarity index 86% rename from pkg/epp/flowcontrol/eviction/aborter.go rename to pkg/epp/flowcontrol/eviction/evictor.go index 6edf600bac..5160d97c40 100644 --- a/pkg/epp/flowcontrol/eviction/aborter.go +++ b/pkg/epp/flowcontrol/eviction/evictor.go @@ -43,7 +43,7 @@ func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) return 
nil } -// ImmediateResponseEvictor evicts requests by closing the EvictionItem's AbortCh. +// ImmediateResponseEvictor evicts requests by closing the EvictionItem's EvictCh. // The ext_proc Process() goroutine selects on this channel and sends an ImmediateResponse // to Envoy when it is closed, causing Envoy to reset the upstream connection to the model server. type ImmediateResponseEvictor struct { @@ -57,13 +57,13 @@ func NewImmediateResponseEvictor() *ImmediateResponseEvictor { } func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error { - if item.AbortCh == nil { - return fmt.Errorf("eviction item %s has no abort channel", item.RequestID) + if item.EvictCh == nil { + return fmt.Errorf("eviction item %s has no eviction channel", item.RequestID) } once, _ := e.closeOnce.LoadOrStore(item.RequestID, &sync.Once{}) once.(*sync.Once).Do(func() { - close(item.AbortCh) + close(item.EvictCh) }) log.FromContext(ctx).Info("Eviction signal sent", @@ -72,3 +72,9 @@ func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol. "targetURL", item.TargetURL) return nil } + +// Cleanup removes the sync.Once entry for a request ID to prevent unbounded map growth. +// Called when a request completes or is untracked. 
+func (e *ImmediateResponseEvictor) Cleanup(requestID string) { + e.closeOnce.Delete(requestID) +} diff --git a/pkg/epp/flowcontrol/eviction/aborter_test.go b/pkg/epp/flowcontrol/eviction/evictor_test.go similarity index 64% rename from pkg/epp/flowcontrol/eviction/aborter_test.go rename to pkg/epp/flowcontrol/eviction/evictor_test.go index 28d5651634..c7465d0803 100644 --- a/pkg/epp/flowcontrol/eviction/aborter_test.go +++ b/pkg/epp/flowcontrol/eviction/evictor_test.go @@ -30,19 +30,19 @@ func TestImmediateResponseEvictor_ClosesChannel(t *testing.T) { t.Parallel() evictor := NewImmediateResponseEvictor() - abortCh := make(chan struct{}) + evictCh := make(chan struct{}) item := &flowcontrol.EvictionItem{ RequestID: "req-1", - AbortCh: abortCh, + EvictCh: evictCh, } err := evictor.Evict(context.Background(), item) require.NoError(t, err) select { - case <-abortCh: + case <-evictCh: default: - t.Fatal("abort channel should be closed after Evict()") + t.Fatal("eviction channel should be closed after Evict()") } } @@ -50,10 +50,10 @@ func TestImmediateResponseEvictor_DoubleEvictSafe(t *testing.T) { t.Parallel() evictor := NewImmediateResponseEvictor() - abortCh := make(chan struct{}) + evictCh := make(chan struct{}) item := &flowcontrol.EvictionItem{ RequestID: "req-1", - AbortCh: abortCh, + EvictCh: evictCh, } err := evictor.Evict(context.Background(), item) @@ -70,13 +70,48 @@ func TestImmediateResponseEvictor_NilChannel(t *testing.T) { item := &flowcontrol.EvictionItem{ RequestID: "req-1", - AbortCh: nil, + EvictCh: nil, } err := evictor.Evict(context.Background(), item) assert.Error(t, err, "Evict with nil channel should return error") } +func TestImmediateResponseEvictor_Cleanup(t *testing.T) { + t.Parallel() + evictor := NewImmediateResponseEvictor() + + evictCh := make(chan struct{}) + item := &flowcontrol.EvictionItem{ + RequestID: "req-1", + EvictCh: evictCh, + } + + _ = evictor.Evict(context.Background(), item) + + // Cleanup should remove the sync.Once 
entry. + evictor.Cleanup("req-1") + + // After cleanup, a new Evict on the same requestID with a new channel should work + // (the old sync.Once is gone, so a new one will be created). + evictCh2 := make(chan struct{}) + item2 := &flowcontrol.EvictionItem{ + RequestID: "req-1", + EvictCh: evictCh2, + } + err := evictor.Evict(context.Background(), item2) + require.NoError(t, err) + + select { + case <-evictCh2: + default: + t.Fatal("new channel should be closed after Evict post-Cleanup") + } + + // Cleanup non-existent should not panic. + evictor.Cleanup("non-existent") +} + func TestNoOpEvictor(t *testing.T) { t.Parallel() evictor := &NoOpEvictor{} diff --git a/pkg/epp/flowcontrol/eviction/plugin.go b/pkg/epp/flowcontrol/eviction/plugin.go index 4295a4d04b..a76dfcc757 100644 --- a/pkg/epp/flowcontrol/eviction/plugin.go +++ b/pkg/epp/flowcontrol/eviction/plugin.go @@ -33,13 +33,13 @@ import ( ) var _ requestcontrol.PreRequest = &RequestEvictor{} -var _ requestcontrol.ResponseBody = &RequestEvictor{} +var _ requestcontrol.ResponseBodyProcessor = &RequestEvictor{} // RequestEvictor tracks in-flight requests via RequestControl hooks and provides eviction capability. type RequestEvictor struct { - queue *EvictionQueue - evictor Evictor - abortRegistry *AbortRegistry + queue *EvictionQueue + evictor Evictor + evictionRegistry *EvictionRegistry } // NewRequestEvictor creates a RequestEvictor with the given policies and evictor. @@ -49,16 +49,16 @@ func NewRequestEvictor( evictor Evictor, ) *RequestEvictor { return &RequestEvictor{ - queue: NewEvictionQueue(ordering, filter), - evictor: evictor, - abortRegistry: NewAbortRegistry(), + queue: NewEvictionQueue(ordering, filter), + evictor: evictor, + evictionRegistry: NewEvictionRegistry(), } } -// AbortRegistry returns the shared abort registry. -// The ext_proc Process() goroutine uses this to look up abort channels for dispatched requests. 
-func (p *RequestEvictor) AbortRegistry() *AbortRegistry { - return p.abortRegistry +// EvictionRegistry returns the shared eviction registry. +// The ext_proc Process() goroutine uses this to look up eviction channels for dispatched requests. +func (p *RequestEvictor) EvictionRegistry() *EvictionRegistry { + return p.evictionRegistry } func (p *RequestEvictor) TypedName() plugin.TypedName { @@ -88,7 +88,7 @@ func (p *RequestEvictor) PreRequest( return } - abortCh := make(chan struct{}) + evictCh := make(chan struct{}) item := &flowcontrol.EvictionItem{ RequestID: requestID, @@ -97,19 +97,18 @@ func (p *RequestEvictor) PreRequest( TargetURL: "http://" + net.JoinHostPort(metadata.GetIPAddress(), metadata.GetPort()), Request: request, TargetEndpoint: metadata, - AbortCh: abortCh, + EvictCh: evictCh, } p.queue.Track(item) - p.abortRegistry.Register(requestID, abortCh) + p.evictionRegistry.Register(requestID, evictCh) // Bind untrack to the request context's lifetime as a safety net. // If the client disconnects and ResponseBody(EndOfStream) never fires, // ctx.Done() ensures the request is still cleaned up. Untrack is idempotent. go func() { <-ctx.Done() - p.queue.Untrack(requestID) - p.abortRegistry.Deregister(requestID) + p.cleanupRequest(requestID) }() log.FromContext(ctx).V(logutil.DEBUG).Info("Tracked in-flight request", @@ -138,8 +137,7 @@ func (p *RequestEvictor) ResponseBody( return } - p.queue.Untrack(requestID) - p.abortRegistry.Deregister(requestID) + p.cleanupRequest(requestID) log.FromContext(ctx).V(logutil.DEBUG).Info("Untracked completed request", "requestID", requestID, @@ -148,12 +146,12 @@ func (p *RequestEvictor) ResponseBody( } // EvictN attempts to evict up to n requests from the eviction queue. -// Each request is only removed from tracking after a successful abort. If the abort fails, +// Each request is only removed from tracking after a successful eviction. 
If the eviction fails, // the request remains in the queue for a future eviction attempt. -// Returns the request IDs that were successfully aborted. +// Returns the request IDs that were successfully evicted. func (p *RequestEvictor) EvictN(ctx context.Context, n int) ([]string, error) { logger := log.FromContext(ctx) - aborted := make([]string, 0, n) + evicted := make([]string, 0, n) for range n { items := p.queue.PopN(1) @@ -167,16 +165,33 @@ func (p *RequestEvictor) EvictN(ctx context.Context, n int) ([]string, error) { p.queue.Track(item) continue } - aborted = append(aborted, item.RequestID) + evicted = append(evicted, item.RequestID) } - if len(aborted) > 0 { - logger.Info("Eviction complete", "requested", n, "aborted", len(aborted)) + if len(evicted) > 0 { + logger.Info("Eviction complete", "requested", n, "evicted", len(evicted)) } - return aborted, nil + return evicted, nil } // Stats returns the current in-flight and evictable request counts. func (p *RequestEvictor) Stats() (inFlight int, evictable int) { return p.queue.InFlightLen(), p.queue.EvictableLen() } + +// cleanupRequest removes a request from all tracking structures. +// If the evictor supports cleanup (e.g., ImmediateResponseEvictor), it also +// cleans up evictor-internal state to prevent unbounded map growth. +func (p *RequestEvictor) cleanupRequest(requestID string) { + p.queue.Untrack(requestID) + p.evictionRegistry.Deregister(requestID) + if c, ok := p.evictor.(EvictorWithCleanup); ok { + c.Cleanup(requestID) + } +} + +// EvictorWithCleanup is an optional interface for evictors that maintain per-request state +// that needs to be cleaned up when a request completes or is untracked. 
+type EvictorWithCleanup interface { + Cleanup(requestID string) +} diff --git a/pkg/epp/flowcontrol/eviction/plugin_test.go b/pkg/epp/flowcontrol/eviction/plugin_test.go index c50a53520e..843050b7f6 100644 --- a/pkg/epp/flowcontrol/eviction/plugin_test.go +++ b/pkg/epp/flowcontrol/eviction/plugin_test.go @@ -49,8 +49,8 @@ func makeSchedulingResult() *scheduling.SchedulingResult { } } -func makeLLMRequest(requestID string, priority int) *scheduling.LLMRequest { //nolint:unparam - return &scheduling.LLMRequest{ +func makeInferenceRequest(requestID string, priority int) *scheduling.InferenceRequest { //nolint:unparam + return &scheduling.InferenceRequest{ RequestId: requestID, Headers: map[string]string{ reqcommon.RequestIdHeaderKey: requestID, @@ -61,54 +61,54 @@ func makeLLMRequest(requestID string, priority int) *scheduling.LLMRequest { //n // --- Tests --- -func TestRequestEvictor_PreRequest_CreatesAbortChannel(t *testing.T) { +func TestRequestEvictor_PreRequest_CreatesEvictChannel(t *testing.T) { t.Parallel() re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &NoOpEvictor{}) ctx := context.Background() - re.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) - abortCh := re.AbortRegistry().Get("req-1") - require.NotNil(t, abortCh, "AbortCh should be registered after PreRequest") + evictCh := re.EvictionRegistry().Get("req-1") + require.NotNil(t, evictCh, "EvictCh should be registered after PreRequest") assert.Equal(t, 1, re.queue.InFlightLen()) assert.Equal(t, 1, re.queue.EvictableLen()) } -func TestRequestEvictor_ResponseBody_DeregistersAbortChannel(t *testing.T) { +func TestRequestEvictor_ResponseBody_DeregistersEvictChannel(t *testing.T) { t.Parallel() re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &NoOpEvictor{}) ctx := context.Background() - request := makeLLMRequest("req-1", -1) + request := makeInferenceRequest("req-1", -1) 
re.PreRequest(ctx, request, makeSchedulingResult()) - require.NotNil(t, re.AbortRegistry().Get("req-1")) + require.NotNil(t, re.EvictionRegistry().Get("req-1")) re.ResponseBody(ctx, request, &requestcontrol.Response{EndOfStream: true}, nil) - assert.Nil(t, re.AbortRegistry().Get("req-1"), "AbortCh should be deregistered after completion") + assert.Nil(t, re.EvictionRegistry().Get("req-1"), "EvictCh should be deregistered after completion") assert.Equal(t, 0, re.queue.InFlightLen()) } -func TestRequestEvictor_EvictN_ClosesAbortChannel(t *testing.T) { +func TestRequestEvictor_EvictN_ClosesEvictChannel(t *testing.T) { t.Parallel() evictor := NewImmediateResponseEvictor() re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, evictor) ctx := context.Background() - re.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) - abortCh := re.AbortRegistry().Get("req-1") - require.NotNil(t, abortCh) + evictCh := re.EvictionRegistry().Get("req-1") + require.NotNil(t, evictCh) evicted, err := re.EvictN(ctx, 1) require.NoError(t, err) require.Equal(t, []string{"req-1"}, evicted) select { - case <-abortCh: + case <-evictCh: default: - t.Fatal("abort channel should be closed after EvictN") + t.Fatal("eviction channel should be closed after EvictN") } } @@ -117,7 +117,7 @@ func TestRequestEvictor_EvictN_ReTracksOnFailure(t *testing.T) { re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &failingEvictor{}) ctx := context.Background() - re.PreRequest(ctx, makeLLMRequest("req-1", -1), makeSchedulingResult()) + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) evicted, err := re.EvictN(ctx, 1) require.NoError(t, err) @@ -133,9 +133,9 @@ func TestRequestEvictor_RaceBetweenEvictAndCompletion(t *testing.T) { ctx := context.Background() - requests := make([]*scheduling.LLMRequest, 10) + requests := make([]*scheduling.InferenceRequest, 10) for i := range 
requests { - requests[i] = makeLLMRequest("req-"+string(rune('a'+i)), -1) + requests[i] = makeInferenceRequest("req-"+string(rune('a'+i)), -1) re.PreRequest(ctx, requests[i], makeSchedulingResult()) } @@ -166,6 +166,82 @@ func TestRequestEvictor_RaceBetweenEvictAndCompletion(t *testing.T) { assert.GreaterOrEqual(t, inFlight, evictable) } +func TestRequestEvictor_CtxCancellationTriggersCleanup(t *testing.T) { + t.Parallel() + evictor := NewImmediateResponseEvictor() + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, evictor) + + ctx, cancel := context.WithCancel(context.Background()) + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) + + // Verify tracked. + assert.Equal(t, 1, re.queue.InFlightLen()) + assert.Equal(t, 1, re.queue.EvictableLen()) + assert.NotNil(t, re.EvictionRegistry().Get("req-1")) + + // Cancel the context — the goroutine in PreRequest should fire and call cleanupRequest. + cancel() + + // Wait briefly for the goroutine to execute. + assert.Eventually(t, func() bool { + return re.queue.InFlightLen() == 0 + }, time.Second, 10*time.Millisecond, "InFlightLen should be 0 after context cancellation") + + assert.Equal(t, 0, re.queue.EvictableLen(), "EvictableLen should be 0 after context cancellation") + assert.Nil(t, re.EvictionRegistry().Get("req-1"), "EvictionRegistry should be cleaned up after context cancellation") +} + +func TestRequestEvictor_CleanupCallsEvictorCleanup(t *testing.T) { + t.Parallel() + evictor := NewImmediateResponseEvictor() + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, evictor) + + ctx := context.Background() + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) + + // Evict to create a sync.Once entry in the evictor. + _, _ = re.EvictN(ctx, 1) + + // Complete the request — this should call cleanupRequest which calls evictor.Cleanup. + // After cleanup, the sync.Once entry for "req-1" should be removed. 
+ // We verify this indirectly: if Cleanup wasn't called, the sync.Once map would retain "req-1". + // Re-track and re-evict with a new channel — if the old sync.Once is still there, + // the new channel won't close (sync.Once already fired). + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) + + // The ResponseBody from the first eviction fires via defer in real code. + // Simulate it here. + re.ResponseBody(ctx, makeInferenceRequest("req-1", -1), &requestcontrol.Response{EndOfStream: true}, nil) + + // Re-track and evict again. + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) + evictCh := re.EvictionRegistry().Get("req-1") + require.NotNil(t, evictCh) + + evicted, err := re.EvictN(ctx, 1) + require.NoError(t, err) + require.Len(t, evicted, 1) + + // New channel should be closed (Cleanup removed old sync.Once). + select { + case <-evictCh: + default: + t.Fatal("eviction channel should be closed — Cleanup should have removed stale sync.Once") + } +} + +func TestRequestEvictor_CleanupWorksWithNoOpEvictor(t *testing.T) { + t.Parallel() + // NoOpEvictor does not implement EvictorWithCleanup. cleanupRequest should not panic. + re := NewRequestEvictor(&testOrdering{}, &acceptAllFilter{}, &NoOpEvictor{}) + + ctx := context.Background() + re.PreRequest(ctx, makeInferenceRequest("req-1", -1), makeSchedulingResult()) + re.ResponseBody(ctx, makeInferenceRequest("req-1", -1), &requestcontrol.Response{EndOfStream: true}, nil) + + assert.Equal(t, 0, re.queue.InFlightLen()) +} + // failingEvictor always returns an error. 
type failingEvictor struct{} diff --git a/pkg/epp/framework/interface/flowcontrol/eviction.go b/pkg/epp/framework/interface/flowcontrol/eviction.go index 6111a91020..9e2c83af38 100644 --- a/pkg/epp/framework/interface/flowcontrol/eviction.go +++ b/pkg/epp/framework/interface/flowcontrol/eviction.go @@ -39,9 +39,9 @@ type EvictionItem struct { Request *scheduling.InferenceRequest // TargetEndpoint is the metadata of the endpoint serving this request. TargetEndpoint *datalayer.EndpointMetadata - // AbortCh is closed by the aborter to signal the ext_proc Process() goroutine + // EvictCh is closed by the evictor to signal the ext_proc Process() goroutine // to send an ImmediateResponse and terminate the request. - AbortCh chan struct{} + EvictCh chan struct{} } // EvictionOrderingPolicy determines which in-flight request gets evicted first. diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index f7d7560fa3..ce31a7fcad 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -19,7 +19,6 @@ package handlers import ( "bytes" "context" - "errors" "io" "strings" "time" @@ -48,10 +47,10 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/version" ) -// AbortChannelLookup is an optional interface for looking up abort channels by request ID. -// When set on the StreamingServer, the Process() loop will select on the abort channel +// EvictChannelLookup is an optional interface for looking up eviction channels by request ID. +// When set on the StreamingServer, the Process() loop will select on the eviction channel // to support eviction of in-flight requests via ext_proc ImmediateResponse. -type AbortChannelLookup interface { +type EvictChannelLookup interface { Get(requestID string) chan struct{} Deregister(requestID string) } @@ -64,9 +63,9 @@ func NewStreamingServer(datastore Datastore, director Director, parser fwkrh.Par } } -// SetAbortChannelLookup sets the abort channel lookup for eviction support. 
-func (s *StreamingServer) SetAbortChannelLookup(lookup AbortChannelLookup) { - s.abortLookup = lookup +// SetEvictChannelLookup sets the eviction channel lookup for eviction support. +func (s *StreamingServer) SetEvictChannelLookup(lookup EvictChannelLookup) { + s.evictionLookup = lookup } type Director interface { @@ -83,10 +82,10 @@ type Datastore interface { // Server implements the Envoy external processing server. // https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto type StreamingServer struct { - datastore Datastore - director Director - parser fwkrh.Parser - abortLookup AbortChannelLookup // optional, set for eviction support + datastore Datastore + director Director + parser fwkrh.Parser + evictionLookup EvictChannelLookup // optional, set for eviction support } // RequestContext stores context information during the life time of an HTTP request. @@ -148,8 +147,17 @@ const ( HeaderResponseResponseComplete StreamRequestState = 5 BodyResponseResponsesComplete StreamRequestState = 6 TrailerResponseResponsesComplete StreamRequestState = 7 + // RequestEvicted indicates the request was evicted by flow control. + // The state machine sends an ImmediateResponse(429) to Envoy. + RequestEvicted StreamRequestState = 8 ) +// recvResult holds the result of a srv.Recv() call from the reader goroutine. +type recvResult struct { + req *extProcPb.ProcessingRequest + err error +} + func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { ctx := srv.Context() @@ -182,17 +190,39 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) } var body []byte - var abortCh chan struct{} // set after HandleRequest if eviction is enabled - var abortRequestID string + var evictionRequestID string + + // Start a single reader goroutine for the lifetime of the stream. 
+ // This avoids spawning a new goroutine per message and allows the main loop to + // select on both incoming messages and the eviction channel. + recvCh := make(chan recvResult, 1) + go func() { + for { + req, err := srv.Recv() + select { + case recvCh <- recvResult{req: req, err: err}: + case <-ctx.Done(): + return + } + if err != nil { + return + } + } + }() + + // evictCh starts nil — selecting on a nil channel blocks forever. + // After scheduling, it is set to the eviction channel, dynamically + // enabling eviction listening. + var evictCh chan struct{} // Create error handling var as each request should only report once for // error metrics. This doesn't cover the error "Cannot receive stream request" because // such errors might happen even though response is processed. var err error defer func() { - // Clean up abort channel registration on exit. - if s.abortLookup != nil && abortRequestID != "" { - s.abortLookup.Deregister(abortRequestID) + // Clean up eviction channel registration on exit. + if s.evictionLookup != nil && evictionRequestID != "" { + s.evictionLookup.Deregister(evictionRequestID) } if reqCtx.ResponseStatusCode != "" { metrics.RecordRequestErrCounter(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.ResponseStatusCode) @@ -214,25 +244,33 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) }() for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - // If an abort channel is set (request dispatched + eviction enabled), - // use non-blocking receive to simultaneously listen for eviction signals. var req *extProcPb.ProcessingRequest var recvErr error - if abortCh != nil { - req, recvErr, err = s.recvOrAbort(srv, abortCh) - if err != nil { - // Eviction triggered — ImmediateResponse already sent. - return nil + // Main select: listen for incoming messages, eviction signals, and context cancellation. 
+ // evictCh is nil until scheduling completes, so the eviction case blocks forever until then. + select { + case result := <-recvCh: + req = result.req + recvErr = result.err + case <-evictCh: + // Skip if the response already completed — sending ImmediateResponse + // after the final body chunk would be a protocol violation. + if reqCtx.ResponseComplete { + logger.V(logutil.DEBUG).Info("Eviction signal received but response already complete, ignoring", + "requestID", evictionRequestID) + evictCh = nil // prevent closed channel from firing repeatedly + continue } - } else { - req, recvErr = srv.Recv() + // Eviction triggered — transition to evicted state and let the state machine send the response. + logger.Info("Request evicted by flow control", "requestID", evictionRequestID) + reqCtx.RequestState = RequestEvicted + if sendErr := reqCtx.updateStateAndSendIfNeeded(srv, logger); sendErr != nil { + return sendErr + } + return nil + case <-ctx.Done(): + return ctx.Err() } if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled { @@ -287,10 +325,12 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) break } - // After scheduling, look up the abort channel for eviction support. - if s.abortLookup != nil { - abortRequestID = reqCtx.Request.Headers[reqcommon.RequestIdHeaderKey] - abortCh = s.abortLookup.Get(abortRequestID) + // After scheduling, look up the eviction channel for eviction support. + // Setting evictCh from nil to a real channel dynamically enables the + // eviction case in the main select. + if s.evictionLookup != nil { + evictionRequestID = reqCtx.Request.Headers[reqcommon.RequestIdHeaderKey] + evictCh = s.evictionLookup.Get(evictionRequestID) } if reqCtx.SchedulingRequest != nil && reqCtx.SchedulingRequest.Body != nil { @@ -375,49 +415,6 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) } } -// recvResult holds the result of a non-blocking srv.Recv() call. 
-type recvResult struct { - req *extProcPb.ProcessingRequest - err error -} - -// recvOrAbort wraps srv.Recv() in a goroutine and selects between receiving the next -// ext_proc message and an eviction abort signal. If the abort channel is closed, -// it sends an ImmediateResponse(503) to Envoy and returns a sentinel error. -func (s *StreamingServer) recvOrAbort( - srv extProcPb.ExternalProcessor_ProcessServer, - abortCh chan struct{}, -) (*extProcPb.ProcessingRequest, error, error) { - recvCh := make(chan recvResult, 1) - go func() { - req, err := srv.Recv() - recvCh <- recvResult{req: req, err: err} - }() - - select { - case result := <-recvCh: - return result.req, result.err, nil - case <-abortCh: - // Eviction triggered — send ImmediateResponse to reset the upstream connection. - sendErr := srv.Send(&extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_TooManyRequests, - }, - Body: []byte("request evicted by flow control"), - }, - }, - }) - if sendErr != nil { - return nil, nil, sendErr - } - return nil, nil, errEvicted - } -} - -var errEvicted = errors.New("request evicted") - // finishResponse ensures all post-response logic, such as metric recording // and state updates, is executed exactly once for the request lifecycle. func (s *StreamingServer) finishResponse(ctx context.Context, reqCtx *RequestContext, body []byte, modelStreaming bool, setEos bool) { @@ -462,6 +459,22 @@ func rewriteModelName(body []byte, targetModel, incomingModel string) []byte { // Order of requests matter in FULL_DUPLEX_STREAMING. For both request and response, the order of response sent back MUST be: Header->Body->Trailer, with trailer being optional. 
func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, logger logr.Logger) error { loggerTrace := logger.V(logutil.TRACE) + + // Handle eviction — send ImmediateResponse(429) to Envoy to reset the upstream connection. + if r.RequestState == RequestEvicted { + loggerTrace.Info("Sending ImmediateResponse for evicted request") + return srv.Send(&extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: envoyTypePb.StatusCode_TooManyRequests, + }, + Body: []byte("request evicted by flow control"), + }, + }, + }) + } + // No switch statement as we could send multiple responses in one pass. if r.RequestState == RequestReceived && r.reqHeaderResp != nil { loggerTrace.Info("Sending request header response", "obj", r.reqHeaderResp) diff --git a/pkg/epp/handlers/server_abort_test.go b/pkg/epp/handlers/server_abort_test.go index 3c3de1fbd5..009f468b00 100644 --- a/pkg/epp/handlers/server_abort_test.go +++ b/pkg/epp/handlers/server_abort_test.go @@ -18,150 +18,65 @@ package handlers import ( "context" - "io" "testing" - "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" ) -// mockProcessServer implements ExternalProcessor_ProcessServer for testing recvOrAbort. +// mockProcessServer implements ExternalProcessor_ProcessServer for testing. 
type mockProcessServer struct { - recvCh chan *extProcPb.ProcessingRequest - sentCh chan *extProcPb.ProcessingResponse - ctx context.Context -} - -func newMockProcessServer(ctx context.Context) *mockProcessServer { - return &mockProcessServer{ - recvCh: make(chan *extProcPb.ProcessingRequest, 1), - sentCh: make(chan *extProcPb.ProcessingResponse, 1), - ctx: ctx, - } + sentResponses []*extProcPb.ProcessingResponse } func (m *mockProcessServer) Send(resp *extProcPb.ProcessingResponse) error { - m.sentCh <- resp + m.sentResponses = append(m.sentResponses, resp) return nil } -func (m *mockProcessServer) Recv() (*extProcPb.ProcessingRequest, error) { - select { - case req := <-m.recvCh: - if req == nil { - return nil, io.EOF - } - return req, nil - case <-m.ctx.Done(): - return nil, m.ctx.Err() - } -} - -func (m *mockProcessServer) SetHeader(metadata.MD) error { return nil } -func (m *mockProcessServer) SendHeader(metadata.MD) error { return nil } -func (m *mockProcessServer) SetTrailer(metadata.MD) {} -func (m *mockProcessServer) Context() context.Context { return m.ctx } -func (m *mockProcessServer) SendMsg(any) error { return nil } -func (m *mockProcessServer) RecvMsg(any) error { return nil } +// Unused methods to satisfy the interface. 
+func (m *mockProcessServer) Recv() (*extProcPb.ProcessingRequest, error) { return nil, nil } +func (m *mockProcessServer) SetHeader(metadata.MD) error { return nil } +func (m *mockProcessServer) SendHeader(metadata.MD) error { return nil } +func (m *mockProcessServer) SetTrailer(metadata.MD) {} +func (m *mockProcessServer) Context() context.Context { return context.Background() } +func (m *mockProcessServer) SendMsg(any) error { return nil } +func (m *mockProcessServer) RecvMsg(any) error { return nil } -func TestRecvOrAbort_NormalRecv(t *testing.T) { +func TestUpdateStateAndSendIfNeeded_Evicted(t *testing.T) { t.Parallel() - s := &StreamingServer{} - ctx := context.Background() - srv := newMockProcessServer(ctx) - abortCh := make(chan struct{}) - - // Send a request before calling recvOrAbort. - expectedReq := &extProcPb.ProcessingRequest{ - Request: &extProcPb.ProcessingRequest_ResponseBody{ - ResponseBody: &extProcPb.HttpBody{ - Body: []byte("token"), - }, - }, + srv := &mockProcessServer{} + logger := logr.Discard() + + reqCtx := &RequestContext{ + RequestState: RequestEvicted, } - srv.recvCh <- expectedReq - req, recvErr, abortErr := s.recvOrAbort(srv, abortCh) + err := reqCtx.updateStateAndSendIfNeeded(srv, logger) + require.NoError(t, err) - assert.NoError(t, abortErr, "No abort should have occurred") - assert.NoError(t, recvErr, "Recv should succeed") - assert.Equal(t, expectedReq, req, "Should receive the expected request") + require.Len(t, srv.sentResponses, 1, "Should send exactly one response") + ir := srv.sentResponses[0].GetImmediateResponse() + require.NotNil(t, ir, "Response should be an ImmediateResponse") + assert.Equal(t, envoyTypePb.StatusCode_TooManyRequests, ir.Status.Code) + assert.Equal(t, []byte("request evicted by flow control"), ir.Body) } -func TestRecvOrAbort_AbortBeforeRecv(t *testing.T) { +func TestUpdateStateAndSendIfNeeded_NotEvicted(t *testing.T) { t.Parallel() - s := &StreamingServer{} - ctx := context.Background() - srv := 
newMockProcessServer(ctx) - abortCh := make(chan struct{}) - - // Close the abort channel before sending any request. - close(abortCh) - - req, _, abortErr := s.recvOrAbort(srv, abortCh) - - assert.ErrorIs(t, abortErr, errEvicted, "Should return eviction error") - assert.Nil(t, req, "Request should be nil on abort") - - // Verify that ImmediateResponse was sent. - select { - case sent := <-srv.sentCh: - ir := sent.GetImmediateResponse() - require.NotNil(t, ir, "Should have sent ImmediateResponse") - assert.Equal(t, envoyTypePb.StatusCode_TooManyRequests, ir.Status.Code) - assert.Equal(t, []byte("request evicted by flow control"), ir.Body) - case <-time.After(time.Second): - t.Fatal("Timeout waiting for ImmediateResponse") - } -} + srv := &mockProcessServer{} + logger := logr.Discard() -func TestRecvOrAbort_AbortDuringRecvWait(t *testing.T) { - t.Parallel() - s := &StreamingServer{} - ctx := context.Background() - srv := newMockProcessServer(ctx) - abortCh := make(chan struct{}) - - // Don't send any request — Recv() will block. - // Close abort channel after a short delay. - go func() { - time.Sleep(50 * time.Millisecond) - close(abortCh) - }() - - req, _, abortErr := s.recvOrAbort(srv, abortCh) - - assert.ErrorIs(t, abortErr, errEvicted, "Should return eviction error") - assert.Nil(t, req, "Request should be nil on abort") - - // Verify ImmediateResponse was sent. - select { - case sent := <-srv.sentCh: - ir := sent.GetImmediateResponse() - require.NotNil(t, ir) - assert.Equal(t, envoyTypePb.StatusCode_TooManyRequests, ir.Status.Code) - case <-time.After(time.Second): - t.Fatal("Timeout waiting for ImmediateResponse") + // Normal state — no responses queued, nothing should be sent. + reqCtx := &RequestContext{ + RequestState: RequestReceived, } -} - -func TestRecvOrAbort_RecvEOF(t *testing.T) { - t.Parallel() - s := &StreamingServer{} - ctx := context.Background() - srv := newMockProcessServer(ctx) - abortCh := make(chan struct{}) - - // Send nil to simulate EOF. 
- srv.recvCh <- nil - - req, recvErr, abortErr := s.recvOrAbort(srv, abortCh) - assert.NoError(t, abortErr, "No abort should have occurred") - assert.ErrorIs(t, recvErr, io.EOF) - assert.Nil(t, req) + err := reqCtx.updateStateAndSendIfNeeded(srv, logger) + require.NoError(t, err) + assert.Empty(t, srv.sentResponses, "Should not send any response for normal state without queued responses") } From a9cbf1c43b0a00bb3ca11834ace8ec9f7c83b4d3 Mon Sep 17 00:00:00 2001 From: RishabhSaini Date: Mon, 13 Apr 2026 15:36:30 -0400 Subject: [PATCH 5/5] fix: fix data read write race to ctx by capturing it Signed-off-by: RishabhSaini --- pkg/epp/handlers/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index ce31a7fcad..31e7e260b8 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -196,12 +196,15 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) // This avoids spawning a new goroutine per message and allows the main loop to // select on both incoming messages and the eviction channel. recvCh := make(chan recvResult, 1) + // Capture the stream context's Done channel before ctx is reassigned in the main loop. + // This avoids a data race between the reader goroutine reading ctx and the main loop writing it. + streamDone := srv.Context().Done() go func() { for { req, err := srv.Recv() select { case recvCh <- recvResult{req: req, err: err}: - case <-ctx.Done(): + case <-streamDone: return } if err != nil {