Skip to content

Commit bd8b1c3

Browse files
authored
feat(flowcontrol): Add ImmediateResponse abort mechanism for evicting in-flight requests (#2737)
* Add ImmediateResponse abort mechanism for evicting in-flight requests

  Integrate ext_proc ImmediateResponse as the abort mechanism for evicting dispatched requests from vLLM. When eviction triggers, closing the request's abort channel causes the Process() loop to send ImmediateResponse(503), making Envoy reset the upstream connection. vLLM detects the disconnect and frees KV blocks.

  - AbortRegistry bridges eviction plugin and ext_proc Process() loop
  - ImmediateResponseAborter closes abort channels via sync.Once
  - recvOrAbort wraps srv.Recv() to select on abort signals
  - Tests cover aborter, registry, recvOrAbort, plugin integration, and concurrent eviction+completion races

  Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>

* Rename Aborter to Evictor, Plugin to RequestEvictor for naming consistency

  Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>

* Use 429 (TooManyRequests) instead of 503 for eviction ImmediateResponse

  Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>

* Refactor Process() loop: single reader goroutine, state machine eviction, evictor cleanup

  - Replace per-message recvOrAbort goroutine with single reader goroutine for the stream lifetime, using nil channel select pattern
  - Remove errEvicted sentinel; eviction is a state transition (RequestEvicted) handled by updateStateAndSendIfNeeded, not an error
  - Move ImmediateResponse send from inline code into the state machine
  - Add EvictorWithCleanup interface and cleanupRequest helper to prevent ImmediateResponseEvictor.closeOnce map from growing unbounded

  Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>

* fix: fix data read/write race on ctx by capturing it

  Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>

---------

Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
1 parent 1edb604 commit bd8b1c3

10 files changed

Lines changed: 854 additions & 80 deletions

File tree

pkg/epp/flowcontrol/eviction/aborter.go

Lines changed: 0 additions & 45 deletions
This file was deleted.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package eviction
18+
19+
import "sync"
20+
21+
// EvictionRegistry maps request IDs to the channels used to signal eviction.
// It connects the eviction plugin, which picks the requests to evict, with the
// ext_proc Process() goroutine, which owns the stream on which the
// ImmediateResponse is sent.
//
// Intended lifecycle of an entry:
//   - PreRequest: the plugin creates an eviction channel and stores it with Register().
//   - Process(): once HandleRequest returns, the channel is fetched with Get() and selected on.
//   - EvictN: the evictor closes the channel through the EvictionItem.EvictCh reference.
//   - Process() defer: the entry is dropped with Deregister().
//
// The registry is meant to be shared between goroutines; access to channels is
// serialized through mu.
type EvictionRegistry struct {
	// mu guards channels.
	mu sync.RWMutex
	// channels holds the eviction channel of each registered request, keyed by request ID.
	channels map[string]chan struct{}
}
36+
37+
// NewEvictionRegistry creates a new EvictionRegistry.
38+
func NewEvictionRegistry() *EvictionRegistry {
39+
return &EvictionRegistry{
40+
channels: make(map[string]chan struct{}),
41+
}
42+
}
43+
44+
// Register stores an eviction channel for the given request ID.
45+
func (r *EvictionRegistry) Register(requestID string, ch chan struct{}) {
46+
r.mu.Lock()
47+
defer r.mu.Unlock()
48+
r.channels[requestID] = ch
49+
}
50+
51+
// Get returns the eviction channel for the given request ID, or nil if not found.
52+
func (r *EvictionRegistry) Get(requestID string) chan struct{} {
53+
r.mu.RLock()
54+
defer r.mu.RUnlock()
55+
return r.channels[requestID]
56+
}
57+
58+
// Deregister removes the eviction channel for the given request ID.
59+
func (r *EvictionRegistry) Deregister(requestID string) {
60+
r.mu.Lock()
61+
defer r.mu.Unlock()
62+
delete(r.channels, requestID)
63+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package eviction
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func TestEvictionRegistry_RegisterAndGet(t *testing.T) {
28+
t.Parallel()
29+
r := NewEvictionRegistry()
30+
31+
ch := make(chan struct{})
32+
r.Register("req-1", ch)
33+
34+
got := r.Get("req-1")
35+
assert.Equal(t, ch, got, "Get should return the registered channel")
36+
37+
assert.Nil(t, r.Get("non-existent"), "Get for unregistered ID should return nil")
38+
}
39+
40+
func TestEvictionRegistry_Deregister(t *testing.T) {
41+
t.Parallel()
42+
r := NewEvictionRegistry()
43+
44+
ch := make(chan struct{})
45+
r.Register("req-1", ch)
46+
r.Deregister("req-1")
47+
48+
assert.Nil(t, r.Get("req-1"), "Get after Deregister should return nil")
49+
50+
// Deregister non-existent should not panic.
51+
r.Deregister("non-existent")
52+
}
53+
54+
func TestEvictionRegistry_Concurrency(t *testing.T) {
55+
t.Parallel()
56+
r := NewEvictionRegistry()
57+
58+
const goroutines = 10
59+
const opsPerGoroutine = 100
60+
61+
var wg sync.WaitGroup
62+
wg.Add(goroutines)
63+
64+
for g := range goroutines {
65+
go func(id int) {
66+
defer wg.Done()
67+
for i := range opsPerGoroutine {
68+
reqID := fmt.Sprintf("req-%d-%d", id, i)
69+
ch := make(chan struct{})
70+
71+
switch i % 3 {
72+
case 0:
73+
r.Register(reqID, ch)
74+
case 1:
75+
r.Register(reqID, ch)
76+
r.Get(reqID)
77+
r.Deregister(reqID)
78+
case 2:
79+
r.Get(reqID)
80+
}
81+
}
82+
}(g)
83+
}
84+
85+
wg.Wait()
86+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package eviction
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
24+
"sigs.k8s.io/controller-runtime/pkg/log"
25+
26+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
28+
)
29+
30+
// Evictor handles evicting an in-flight request on a model server.
31+
type Evictor interface {
32+
Evict(ctx context.Context, item *flowcontrol.EvictionItem) error
33+
}
34+
35+
// NoOpEvictor logs the eviction but does not evict the request on the model server.
36+
type NoOpEvictor struct{}
37+
38+
func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
39+
log.FromContext(ctx).V(logutil.DEBUG).Info("Eviction selected request (no-op: eviction mechanism not available)",
40+
"requestID", item.RequestID,
41+
"priority", item.Priority,
42+
"targetURL", item.TargetURL)
43+
return nil
44+
}
45+
46+
// ImmediateResponseEvictor performs an eviction by closing the EvictCh of the
// EvictionItem. The ext_proc Process() goroutine selects on that channel and,
// once it is closed, sends an ImmediateResponse to Envoy, which makes Envoy
// reset the upstream connection to the model server.
type ImmediateResponseEvictor struct {
	// closeOnce maps a request ID to the *sync.Once that guards closing that
	// request's channel, so the channel is not closed twice (which would panic).
	closeOnce sync.Map
}
53+
54+
// NewImmediateResponseEvictor creates an ImmediateResponseEvictor.
55+
func NewImmediateResponseEvictor() *ImmediateResponseEvictor {
56+
return &ImmediateResponseEvictor{}
57+
}
58+
59+
func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
60+
if item.EvictCh == nil {
61+
return fmt.Errorf("eviction item %s has no eviction channel", item.RequestID)
62+
}
63+
64+
once, _ := e.closeOnce.LoadOrStore(item.RequestID, &sync.Once{})
65+
once.(*sync.Once).Do(func() {
66+
close(item.EvictCh)
67+
})
68+
69+
log.FromContext(ctx).Info("Eviction signal sent",
70+
"requestID", item.RequestID,
71+
"priority", item.Priority,
72+
"targetURL", item.TargetURL)
73+
return nil
74+
}
75+
76+
// Cleanup removes the sync.Once entry for a request ID to prevent unbounded map growth.
77+
// Called when a request completes or is untracked.
78+
func (e *ImmediateResponseEvictor) Cleanup(requestID string) {
79+
e.closeOnce.Delete(requestID)
80+
}

0 commit comments

Comments
 (0)