Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 0 additions & 45 deletions pkg/epp/flowcontrol/eviction/aborter.go

This file was deleted.

63 changes: 63 additions & 0 deletions pkg/epp/flowcontrol/eviction/eviction_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eviction

import "sync"

// EvictionRegistry associates request IDs with the channels used to signal their eviction.
// It is the hand-off point between the eviction plugin, which picks the requests to evict,
// and the ext_proc Process() goroutine, which owns the stream on which the
// ImmediateResponse is sent.
//
// Intended usage over the life of a request:
//   - PreRequest: the plugin creates an eviction channel and stores it with Register().
//   - Process(): once HandleRequest has returned, it fetches the channel with Get() and
//     selects on it.
//   - EvictN: the evictor closes the channel through the EvictionItem.EvictCh reference.
//   - Process() defer: the entry is dropped with Deregister().
//
// Every method takes the internal lock, so the registry may be shared between goroutines.
type EvictionRegistry struct {
	mu       sync.RWMutex             // guards channels
	channels map[string]chan struct{} // requestID → eviction channel
}

// NewEvictionRegistry returns an empty registry whose map is already allocated.
func NewEvictionRegistry() *EvictionRegistry {
	reg := new(EvictionRegistry)
	reg.channels = map[string]chan struct{}{}
	return reg
}

// Register records ch as the eviction channel for requestID, replacing any channel
// previously stored under the same ID.
func (reg *EvictionRegistry) Register(requestID string, ch chan struct{}) {
	reg.mu.Lock()
	// Deferred so the lock is released even if the map write panics (nil map on a
	// zero-value registry).
	defer reg.mu.Unlock()

	reg.channels[requestID] = ch
}

// Get looks up the eviction channel stored for requestID. The result is nil when no
// channel is registered under that ID.
func (reg *EvictionRegistry) Get(requestID string) chan struct{} {
	reg.mu.RLock()
	ch := reg.channels[requestID]
	reg.mu.RUnlock()

	return ch
}

// Deregister drops the entry for requestID. Unknown IDs are ignored.
func (reg *EvictionRegistry) Deregister(requestID string) {
	reg.mu.Lock()
	delete(reg.channels, requestID)
	reg.mu.Unlock()
}
86 changes: 86 additions & 0 deletions pkg/epp/flowcontrol/eviction/eviction_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eviction

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

// TestEvictionRegistry_RegisterAndGet checks that a registered channel is returned by Get
// and that looking up an ID that was never registered yields nil.
func TestEvictionRegistry_RegisterAndGet(t *testing.T) {
	t.Parallel()

	registry := NewEvictionRegistry()
	evictCh := make(chan struct{})
	registry.Register("req-1", evictCh)

	// Known ID: the exact channel that was stored comes back.
	assert.Equal(t, evictCh, registry.Get("req-1"), "Get should return the registered channel")

	// Unknown ID: nothing is stored, so the lookup is nil.
	assert.Nil(t, registry.Get("non-existent"), "Get for unregistered ID should return nil")
}

// TestEvictionRegistry_Deregister checks that Deregister removes a stored channel and
// that deregistering an ID with no entry is harmless.
func TestEvictionRegistry_Deregister(t *testing.T) {
	t.Parallel()

	registry := NewEvictionRegistry()
	registry.Register("req-1", make(chan struct{}))
	registry.Deregister("req-1")

	lookup := registry.Get("req-1")
	assert.Nil(t, lookup, "Get after Deregister should return nil")

	// Removing an ID that was never registered must not panic.
	registry.Deregister("non-existent")
}

// TestEvictionRegistry_Concurrency drives Register, Get and Deregister from several
// goroutines at once. It makes no assertions of its own; its value comes from running
// without a panic (and without reports when the race detector is enabled).
func TestEvictionRegistry_Concurrency(t *testing.T) {
	t.Parallel()

	const (
		goroutines      = 10
		opsPerGoroutine = 100
	)

	registry := NewEvictionRegistry()

	var workers sync.WaitGroup
	for worker := range goroutines {
		workers.Add(1)
		go func() {
			defer workers.Done()

			for op := range opsPerGoroutine {
				// Each (worker, op) pair gets its own request ID and channel.
				reqID := fmt.Sprintf("req-%d-%d", worker, op)
				evictCh := make(chan struct{})

				// Rotate through three access patterns: register only,
				// full register/get/deregister cycle, and lookup of an absent ID.
				switch op % 3 {
				case 0:
					registry.Register(reqID, evictCh)
				case 1:
					registry.Register(reqID, evictCh)
					registry.Get(reqID)
					registry.Deregister(reqID)
				case 2:
					registry.Get(reqID)
				}
			}
		}()
	}

	workers.Wait()
}
80 changes: 80 additions & 0 deletions pkg/epp/flowcontrol/eviction/evictor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eviction

import (
"context"
"fmt"
"sync"

"sigs.k8s.io/controller-runtime/pkg/log"

logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
)

// Evictor handles evicting an in-flight request on a model server.
//
// Two implementations are declared alongside this interface: NoOpEvictor, which only
// logs the selected request, and ImmediateResponseEvictor, which closes the item's
// eviction channel.
type Evictor interface {
// Evict acts on the request described by item. Both implementations declared
// alongside this interface return nil on success; ImmediateResponseEvictor returns
// an error when the item carries no eviction channel.
Evict(ctx context.Context, item *flowcontrol.EvictionItem) error
}

// NoOpEvictor logs the eviction but does not evict the request on the model server.
// It carries no state, so the zero value is ready to use.
type NoOpEvictor struct{}

// Evict records, at debug verbosity, which request was selected for eviction and then
// reports success. Nothing is signalled or cancelled; the returned error is always nil.
func (e *NoOpEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
	debugLog := log.FromContext(ctx).V(logutil.DEBUG)
	debugLog.Info(
		"Eviction selected request (no-op: eviction mechanism not available)",
		"requestID", item.RequestID,
		"priority", item.Priority,
		"targetURL", item.TargetURL,
	)

	return nil
}

// ImmediateResponseEvictor evicts requests by closing the EvictionItem's EvictCh.
// The ext_proc Process() goroutine selects on this channel and sends an ImmediateResponse
// to Envoy when it is closed, causing Envoy to reset the upstream connection to the model server.
//
// The zero value is ready to use.
type ImmediateResponseEvictor struct {
	// closeOnce holds one *sync.Once per request ID. Evict runs the channel close
	// through that Once so that evicting the same request ID repeatedly closes the
	// channel only once instead of panicking on a double close.
	closeOnce sync.Map // requestID → *sync.Once
}

// NewImmediateResponseEvictor returns a fresh ImmediateResponseEvictor with no
// request IDs tracked yet.
func NewImmediateResponseEvictor() *ImmediateResponseEvictor {
	return new(ImmediateResponseEvictor)
}

// Evict signals eviction of the request described by item by closing item.EvictCh.
//
// An item without an eviction channel is rejected with an error. Otherwise the close is
// executed through a per-request-ID sync.Once, so calling Evict again for the same ID
// (while its entry is still tracked) does not close the channel a second time. The
// "Eviction signal sent" log line is written on every successful return, including
// repeat calls.
func (e *ImmediateResponseEvictor) Evict(ctx context.Context, item *flowcontrol.EvictionItem) error {
	if item.EvictCh == nil {
		return fmt.Errorf("eviction item %s has no eviction channel", item.RequestID)
	}

	// Reuse the guard already stored for this request ID, or install a new one.
	entry, _ := e.closeOnce.LoadOrStore(item.RequestID, new(sync.Once))
	guard := entry.(*sync.Once)
	guard.Do(func() { close(item.EvictCh) })

	logger := log.FromContext(ctx)
	logger.Info(
		"Eviction signal sent",
		"requestID", item.RequestID,
		"priority", item.Priority,
		"targetURL", item.TargetURL,
	)

	return nil
}

// Cleanup removes the sync.Once entry for a request ID to prevent unbounded map growth.
// Called when a request completes or is untracked.
//
// Calling Cleanup for an ID that has no entry is a no-op.
//
// NOTE(review): once the entry is deleted, a later Evict for the same request ID
// installs a fresh sync.Once and will call close on the item's channel again. If that
// is the same, already-closed channel, the close panics — confirm callers never evict
// an item after its Cleanup.
func (e *ImmediateResponseEvictor) Cleanup(requestID string) {
// Drop the per-request guard; sync.Map.Delete is safe for concurrent use.
e.closeOnce.Delete(requestID)
}
Loading
Loading