Skip to content

Commit 200b369

Browse files
committed
rename Aborter→Evictor, Plugin→RequestEvictor,
use 429 status code, add ImmediateResponseEvictor.Cleanup() Signed-off-by: RishabhSaini <rishabhsaini01@gmail.com>
1 parent e88a772 commit 200b369

6 files changed

Lines changed: 114 additions & 109 deletions

File tree

pkg/epp/flowcontrol/eviction/aborter.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,49 +27,54 @@ import (
2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
2828
)
2929

30-
// Aborter handles aborting an in-flight request on a model server.
31-
type Aborter interface {
32-
Abort(ctx context.Context, item *flowcontrol.EvictionItem) error
30+
// Evictor handles evicting an in-flight request on a model server.
31+
type Evictor interface {
32+
Evict(ctx context.Context, item *flowcontrol.EvictionItem) error
3333
}
3434

35-
// NoOpAborter logs the eviction but does not abort the request on the model server.
36-
type NoOpAborter struct{}
35+
// NoOpEvictor logs the eviction but does not evict the request on the model server.
36+
type NoOpEvictor struct{}
3737

38-
func (a *NoOpAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) error {
39-
log.FromContext(ctx).V(logutil.DEBUG).Info("Eviction selected request for abort (no-op: abort mechanism not available)",
38+
func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
39+
log.FromContext(ctx).V(logutil.DEBUG).Info("Eviction selected request (no-op: eviction mechanism not available)",
4040
"requestID", item.RequestID,
4141
"priority", item.Priority,
4242
"targetURL", item.TargetURL)
4343
return nil
4444
}
4545

46-
// ImmediateResponseAborter aborts requests by closing the EvictionItem's AbortCh.
46+
// ImmediateResponseEvictor evicts requests by closing the EvictionItem's AbortCh.
4747
// The ext_proc Process() goroutine selects on this channel and sends an ImmediateResponse
4848
// to Envoy when it is closed, causing Envoy to reset the upstream connection to the model server.
49-
type ImmediateResponseAborter struct {
49+
type ImmediateResponseEvictor struct {
5050
// closeOnce tracks which channels have been closed to prevent double-close panics.
5151
closeOnce sync.Map // requestID → *sync.Once
5252
}
5353

54-
// NewImmediateResponseAborter creates an ImmediateResponseAborter.
55-
func NewImmediateResponseAborter() *ImmediateResponseAborter {
56-
return &ImmediateResponseAborter{}
54+
// NewImmediateResponseEvictor creates an ImmediateResponseEvictor.
55+
func NewImmediateResponseEvictor() *ImmediateResponseEvictor {
56+
return &ImmediateResponseEvictor{}
5757
}
5858

59-
func (a *ImmediateResponseAborter) Abort(ctx context.Context, item *flowcontrol.EvictionItem) error {
59+
func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
6060
if item.AbortCh == nil {
6161
return fmt.Errorf("eviction item %s has no abort channel", item.RequestID)
6262
}
6363

64-
// Use sync.Once to safely close the channel exactly once.
65-
once, _ := a.closeOnce.LoadOrStore(item.RequestID, &sync.Once{})
64+
once, _ := e.closeOnce.LoadOrStore(item.RequestID, &sync.Once{})
6665
once.(*sync.Once).Do(func() {
6766
close(item.AbortCh)
6867
})
6968

70-
log.FromContext(ctx).Info("Abort signal sent",
69+
log.FromContext(ctx).Info("Eviction signal sent",
7170
"requestID", item.RequestID,
7271
"priority", item.Priority,
7372
"targetURL", item.TargetURL)
7473
return nil
7574
}
75+
76+
// Cleanup removes the sync.Once entry for a request ID to prevent unbounded map growth.
77+
// Called when a request completes or is untracked.
78+
func (e *ImmediateResponseEvictor) Cleanup(requestID string) {
79+
e.closeOnce.Delete(requestID)
80+
}

pkg/epp/flowcontrol/eviction/aborter_test.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,68 +26,82 @@ import (
2626
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
2727
)
2828

29-
func TestImmediateResponseAborter_ClosesChannel(t *testing.T) {
29+
func TestImmediateResponseEvictor_ClosesChannel(t *testing.T) {
3030
t.Parallel()
31-
aborter := NewImmediateResponseAborter()
31+
evictor := NewImmediateResponseEvictor()
3232

3333
abortCh := make(chan struct{})
3434
item := &flowcontrol.EvictionItem{
3535
RequestID: "req-1",
3636
AbortCh: abortCh,
3737
}
3838

39-
err := aborter.Abort(context.Background(), item)
39+
err := evictor.Evict(context.Background(), item)
4040
require.NoError(t, err)
4141

42-
// Channel should be closed.
4342
select {
4443
case <-abortCh:
45-
// success — channel is closed
4644
default:
47-
t.Fatal("abort channel should be closed after Abort()")
45+
t.Fatal("abort channel should be closed after Evict()")
4846
}
4947
}
5048

51-
func TestImmediateResponseAborter_DoubleAbortSafe(t *testing.T) {
49+
func TestImmediateResponseEvictor_DoubleEvictSafe(t *testing.T) {
5250
t.Parallel()
53-
aborter := NewImmediateResponseAborter()
51+
evictor := NewImmediateResponseEvictor()
5452

5553
abortCh := make(chan struct{})
5654
item := &flowcontrol.EvictionItem{
5755
RequestID: "req-1",
5856
AbortCh: abortCh,
5957
}
6058

61-
// First abort should succeed.
62-
err := aborter.Abort(context.Background(), item)
59+
err := evictor.Evict(context.Background(), item)
6360
require.NoError(t, err)
6461

65-
// Second abort on same request should not panic.
66-
err = aborter.Abort(context.Background(), item)
62+
// Second evict on same request should not panic.
63+
err = evictor.Evict(context.Background(), item)
6764
require.NoError(t, err)
6865
}
6966

70-
func TestImmediateResponseAborter_NilChannel(t *testing.T) {
67+
func TestImmediateResponseEvictor_NilChannel(t *testing.T) {
7168
t.Parallel()
72-
aborter := NewImmediateResponseAborter()
69+
evictor := NewImmediateResponseEvictor()
7370

7471
item := &flowcontrol.EvictionItem{
7572
RequestID: "req-1",
7673
AbortCh: nil,
7774
}
7875

79-
err := aborter.Abort(context.Background(), item)
80-
assert.Error(t, err, "Abort with nil channel should return error")
76+
err := evictor.Evict(context.Background(), item)
77+
assert.Error(t, err, "Evict with nil channel should return error")
8178
}
8279

83-
func TestNoOpAborter(t *testing.T) {
80+
func TestImmediateResponseEvictor_Cleanup(t *testing.T) {
8481
t.Parallel()
85-
aborter := &NoOpAborter{}
82+
evictor := NewImmediateResponseEvictor()
83+
84+
abortCh := make(chan struct{})
85+
item := &flowcontrol.EvictionItem{
86+
RequestID: "req-1",
87+
AbortCh: abortCh,
88+
}
89+
90+
_ = evictor.Evict(context.Background(), item)
91+
evictor.Cleanup("req-1")
92+
93+
// Cleanup non-existent should not panic.
94+
evictor.Cleanup("non-existent")
95+
}
96+
97+
func TestNoOpEvictor(t *testing.T) {
98+
t.Parallel()
99+
evictor := &NoOpEvictor{}
86100

87101
item := &flowcontrol.EvictionItem{
88102
RequestID: "req-1",
89103
}
90104

91-
err := aborter.Abort(context.Background(), item)
92-
assert.NoError(t, err, "NoOpAborter should always succeed")
105+
err := evictor.Evict(context.Background(), item)
106+
assert.NoError(t, err, "NoOpEvictor should always succeed")
93107
}

pkg/epp/flowcontrol/eviction/plugin.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,42 +32,42 @@ import (
3232
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
3333
)
3434

35-
var _ requestcontrol.PreRequest = &Plugin{}
36-
var _ requestcontrol.ResponseBody = &Plugin{}
35+
var _ requestcontrol.PreRequest = &RequestEvictor{}
36+
var _ requestcontrol.ResponseBody = &RequestEvictor{}
3737

38-
// Plugin tracks in-flight requests via RequestControl hooks and provides eviction capability.
39-
type Plugin struct {
38+
// RequestEvictor tracks in-flight requests via RequestControl hooks and provides eviction capability.
39+
type RequestEvictor struct {
4040
queue *EvictionQueue
41-
aborter Aborter
41+
evictor Evictor
4242
abortRegistry *AbortRegistry
4343
}
4444

45-
// NewPlugin creates an eviction plugin with the given policies and aborter.
46-
func NewPlugin(
45+
// NewRequestEvictor creates a RequestEvictor with the given policies and evictor.
46+
func NewRequestEvictor(
4747
ordering flowcontrol.EvictionOrderingPolicy,
4848
filter flowcontrol.EvictionFilterPolicy,
49-
aborter Aborter,
50-
) *Plugin {
51-
return &Plugin{
49+
evictor Evictor,
50+
) *RequestEvictor {
51+
return &RequestEvictor{
5252
queue: NewEvictionQueue(ordering, filter),
53-
aborter: aborter,
53+
evictor: evictor,
5454
abortRegistry: NewAbortRegistry(),
5555
}
5656
}
5757

5858
// AbortRegistry returns the shared abort registry.
5959
// The ext_proc Process() goroutine uses this to look up abort channels for dispatched requests.
60-
func (p *Plugin) AbortRegistry() *AbortRegistry {
60+
func (p *RequestEvictor) AbortRegistry() *AbortRegistry {
6161
return p.abortRegistry
6262
}
6363

64-
func (p *Plugin) TypedName() plugin.TypedName {
64+
func (p *RequestEvictor) TypedName() plugin.TypedName {
6565
return plugin.TypedName{Type: "EvictionPlugin", Name: "eviction"}
6666
}
6767

6868
// PreRequest is called after scheduling, before the request reaches the model server.
6969
// It tracks the request and, if the filter policy accepts it, adds it to the eviction queue.
70-
func (p *Plugin) PreRequest(
70+
func (p *RequestEvictor) PreRequest(
7171
ctx context.Context,
7272
request *scheduling.LLMRequest,
7373
result *scheduling.SchedulingResult,
@@ -121,7 +121,7 @@ func (p *Plugin) PreRequest(
121121

122122
// ResponseBody is called for every response data chunk (streaming) or once (non-streaming).
123123
// On the final call (EndOfStream == true), it removes the request from tracking and the eviction queue.
124-
func (p *Plugin) ResponseBody(
124+
func (p *RequestEvictor) ResponseBody(
125125
ctx context.Context,
126126
request *scheduling.LLMRequest,
127127
response *requestcontrol.Response,
@@ -151,7 +151,7 @@ func (p *Plugin) ResponseBody(
151151
// Each request is only removed from tracking after a successful abort. If the abort fails,
152152
// the request remains in the queue for a future eviction attempt.
153153
// Returns the request IDs that were successfully aborted.
154-
func (p *Plugin) EvictN(ctx context.Context, n int) ([]string, error) {
154+
func (p *RequestEvictor) EvictN(ctx context.Context, n int) ([]string, error) {
155155
logger := log.FromContext(ctx)
156156
aborted := make([]string, 0, n)
157157

@@ -162,8 +162,8 @@ func (p *Plugin) EvictN(ctx context.Context, n int) ([]string, error) {
162162
}
163163
item := items[0]
164164

165-
if err := p.aborter.Abort(ctx, item); err != nil {
166-
logger.Error(err, "Failed to abort request, re-tracking", "requestID", item.RequestID, "targetURL", item.TargetURL)
165+
if err := p.evictor.Evict(ctx, item); err != nil {
166+
logger.Error(err, "Failed to evict request, re-tracking", "requestID", item.RequestID, "targetURL", item.TargetURL)
167167
p.queue.Track(item)
168168
continue
169169
}
@@ -177,6 +177,6 @@ func (p *Plugin) EvictN(ctx context.Context, n int) ([]string, error) {
177177
}
178178

179179
// Stats returns the current in-flight and evictable request counts.
180-
func (p *Plugin) Stats() (inFlight int, evictable int) {
180+
func (p *RequestEvictor) Stats() (inFlight int, evictable int) {
181181
return p.queue.InFlightLen(), p.queue.EvictableLen()
182182
}

0 commit comments

Comments
 (0)