-
Notifications
You must be signed in to change notification settings - Fork 294
feat(flowcontrol): Add ImmediateResponse abort mechanism for evicting in-flight requests #2737
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
k8s-ci-robot
merged 5 commits into
kubernetes-sigs:main
from
RishabhSaini:evictImmediate
Apr 13, 2026
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
ee57d3b
Add ImmediateResponse abort mechanism for evicting in-flight requests
RishabhSaini b1b12b4
Rename Aborter to Evictor, Plugin to RequestEvictor
RishabhSaini dc45109
Use 429 (TooManyRequests) instead of 503 for eviction ImmediateResponse
RishabhSaini 7e6a8ce
Refactor Process() loop: single reader goroutine, state machine evict…
RishabhSaini a9cbf1c
fix: fix data read write race to ctx by capturing it
RishabhSaini File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| /* | ||
| Copyright 2026 The Kubernetes Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package eviction | ||
|
|
||
| import "sync" | ||
|
|
||
// EvictionRegistry is a shared registry that maps request IDs to eviction channels.
// It bridges the eviction plugin (which decides what to evict) and the ext_proc Process()
// goroutine (which owns the stream needed to send ImmediateResponse).
//
// Lifecycle:
//   - PreRequest: plugin creates an eviction channel and registers it via Register().
//   - Process(): after HandleRequest returns, looks up the channel via Get() and selects on it.
//   - EvictN: evictor closes the channel via the EvictionItem.EvictCh reference.
//   - Process() defer: removes the channel via Deregister().
//
// The zero value is an empty registry that is ready for use. All methods are
// goroutine-safe. Because the struct contains a mutex it must not be copied after
// first use; share it by pointer.
type EvictionRegistry struct {
	mu       sync.RWMutex
	channels map[string]chan struct{} // requestID → eviction channel; allocated lazily by Register
}

// NewEvictionRegistry creates a new, empty EvictionRegistry.
func NewEvictionRegistry() *EvictionRegistry {
	return &EvictionRegistry{
		channels: make(map[string]chan struct{}),
	}
}

// Register stores an eviction channel for the given request ID.
// If the ID is already registered, the previous channel is replaced.
func (r *EvictionRegistry) Register(requestID string, ch chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Writing to a nil map panics, so allocate on first use. This keeps a
	// zero-value EvictionRegistry (one not built by NewEvictionRegistry) usable.
	if r.channels == nil {
		r.channels = make(map[string]chan struct{})
	}
	r.channels[requestID] = ch
}

// Get returns the eviction channel for the given request ID, or nil if not found.
func (r *EvictionRegistry) Get(requestID string) chan struct{} {
	r.mu.RLock()
	defer r.mu.RUnlock()
	// Indexing a nil or empty map yields the zero value, i.e. a nil channel.
	return r.channels[requestID]
}

// Deregister removes the eviction channel for the given request ID.
// Removing an ID that is not registered is a no-op.
func (r *EvictionRegistry) Deregister(requestID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.channels, requestID)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| Copyright 2026 The Kubernetes Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package eviction | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "sync" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| ) | ||
|
|
||
| func TestEvictionRegistry_RegisterAndGet(t *testing.T) { | ||
| t.Parallel() | ||
| r := NewEvictionRegistry() | ||
|
|
||
| ch := make(chan struct{}) | ||
| r.Register("req-1", ch) | ||
|
|
||
| got := r.Get("req-1") | ||
| assert.Equal(t, ch, got, "Get should return the registered channel") | ||
|
|
||
| assert.Nil(t, r.Get("non-existent"), "Get for unregistered ID should return nil") | ||
| } | ||
|
|
||
| func TestEvictionRegistry_Deregister(t *testing.T) { | ||
| t.Parallel() | ||
| r := NewEvictionRegistry() | ||
|
|
||
| ch := make(chan struct{}) | ||
| r.Register("req-1", ch) | ||
| r.Deregister("req-1") | ||
|
|
||
| assert.Nil(t, r.Get("req-1"), "Get after Deregister should return nil") | ||
|
|
||
| // Deregister non-existent should not panic. | ||
| r.Deregister("non-existent") | ||
| } | ||
|
|
||
| func TestEvictionRegistry_Concurrency(t *testing.T) { | ||
| t.Parallel() | ||
| r := NewEvictionRegistry() | ||
|
|
||
| const goroutines = 10 | ||
| const opsPerGoroutine = 100 | ||
|
|
||
| var wg sync.WaitGroup | ||
| wg.Add(goroutines) | ||
|
|
||
| for g := range goroutines { | ||
| go func(id int) { | ||
| defer wg.Done() | ||
| for i := range opsPerGoroutine { | ||
| reqID := fmt.Sprintf("req-%d-%d", id, i) | ||
| ch := make(chan struct{}) | ||
|
|
||
| switch i % 3 { | ||
| case 0: | ||
| r.Register(reqID, ch) | ||
| case 1: | ||
| r.Register(reqID, ch) | ||
| r.Get(reqID) | ||
| r.Deregister(reqID) | ||
| case 2: | ||
| r.Get(reqID) | ||
| } | ||
| } | ||
| }(g) | ||
| } | ||
|
|
||
| wg.Wait() | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /* | ||
| Copyright 2026 The Kubernetes Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package eviction | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
|
|
||
| "sigs.k8s.io/controller-runtime/pkg/log" | ||
|
|
||
| logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging" | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol" | ||
| ) | ||
|
|
||
| // Evictor handles evicting an in-flight request on a model server. | ||
| type Evictor interface { | ||
| Evict(ctx context.Context, item *flowcontrol.EvictionItem) error | ||
| } | ||
|
|
||
| // NoOpEvictor logs the eviction but does not evict the request on the model server. | ||
| type NoOpEvictor struct{} | ||
|
|
||
| func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error { | ||
| log.FromContext(ctx).V(logutil.DEBUG).Info("Eviction selected request (no-op: eviction mechanism not available)", | ||
| "requestID", item.RequestID, | ||
| "priority", item.Priority, | ||
| "targetURL", item.TargetURL) | ||
| return nil | ||
| } | ||
|
|
||
// ImmediateResponseEvictor signals eviction by closing the EvictCh carried on the
// EvictionItem. The ext_proc Process() goroutine selects on that channel; once it is
// closed, Process() sends an ImmediateResponse to Envoy, which makes Envoy reset the
// upstream connection to the model server.
type ImmediateResponseEvictor struct {
	// closeOnce holds one *sync.Once per request ID so that each eviction
	// channel is closed at most once, avoiding a double-close panic.
	closeOnce sync.Map // requestID → *sync.Once
}
|
|
||
| // NewImmediateResponseEvictor creates an ImmediateResponseEvictor. | ||
| func NewImmediateResponseEvictor() *ImmediateResponseEvictor { | ||
| return &ImmediateResponseEvictor{} | ||
| } | ||
|
|
||
| func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error { | ||
| if item.EvictCh == nil { | ||
| return fmt.Errorf("eviction item %s has no eviction channel", item.RequestID) | ||
| } | ||
|
|
||
| once, _ := e.closeOnce.LoadOrStore(item.RequestID, &sync.Once{}) | ||
| once.(*sync.Once).Do(func() { | ||
| close(item.EvictCh) | ||
| }) | ||
|
|
||
| log.FromContext(ctx).Info("Eviction signal sent", | ||
| "requestID", item.RequestID, | ||
| "priority", item.Priority, | ||
| "targetURL", item.TargetURL) | ||
| return nil | ||
| } | ||
|
|
||
| // Cleanup removes the sync.Once entry for a request ID to prevent unbounded map growth. | ||
| // Called when a request completes or is untracked. | ||
| func (e *ImmediateResponseEvictor) Cleanup(requestID string) { | ||
| e.closeOnce.Delete(requestID) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.