Skip to content

Commit 63708b6

Browse files
committed
Add support for evicting multiple queries per cycle
Signed-off-by: Essam Eldaly <eeldaly@amazon.com>
1 parent 5ddc12b commit 63708b6

8 files changed

Lines changed: 149 additions & 78 deletions

File tree

pkg/configs/query_protection.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ type Threshold struct {
3636

3737
// EvictionConfig configures the resource-based query evictor.
3838
type EvictionConfig struct {
39-
Threshold Threshold `yaml:"threshold"`
40-
CheckInterval time.Duration `yaml:"check_interval"`
41-
CooldownPeriod int `yaml:"cooldown_period"`
42-
EvictionMetric string `yaml:"eviction_metric"`
43-
MinQueryAge time.Duration `yaml:"min_query_age"`
39+
Threshold Threshold `yaml:"threshold"`
40+
CheckInterval time.Duration `yaml:"check_interval"`
41+
CooldownPeriod int `yaml:"cooldown_period"`
42+
EvictionMetric string `yaml:"eviction_metric"`
43+
MinQueryAge time.Duration `yaml:"min_query_age"`
44+
MaxEvictionsPerCycle int `yaml:"max_evictions_per_cycle"`
4445
}
4546

4647
// Enabled returns true when at least one eviction threshold is greater than 0.
@@ -60,6 +61,7 @@ func (cfg *QueryProtection) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix stri
6061
f.IntVar(&cfg.Eviction.CooldownPeriod, prefix+"query-protection.eviction.cooldown-period", 3, "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.")
6162
f.StringVar(&cfg.Eviction.EvictionMetric, prefix+"query-protection.eviction.eviction-metric", "fetched_samples", "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.")
6263
f.DurationVar(&cfg.Eviction.MinQueryAge, prefix+"query-protection.eviction.min-query-age", 10*time.Second, "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.")
64+
f.IntVar(&cfg.Eviction.MaxEvictionsPerCycle, prefix+"query-protection.eviction.max-evictions-per-cycle", 1, "EXPERIMENTAL: Maximum number of queries to evict in a single check cycle when resource thresholds are breached.")
6365
}
6466

6567
func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV) error {
@@ -104,6 +106,10 @@ func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV)
104106
return fmt.Errorf("unrecognized eviction_metric %q; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes", cfg.Eviction.EvictionMetric)
105107
}
106108

109+
if cfg.Eviction.MaxEvictionsPerCycle < 1 {
110+
return errors.New("eviction max_evictions_per_cycle must be >= 1")
111+
}
112+
107113
if evThreshold.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) {
108114
return errors.New("monitored_resources config must include \"cpu\" when eviction cpu threshold is set")
109115
}

pkg/configs/query_protection_test.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ func Test_EvictionConfig_Validation(t *testing.T) {
9999
validBase := func() QueryProtection {
100100
return QueryProtection{
101101
Eviction: EvictionConfig{
102-
Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85},
103-
CheckInterval: 1 * time.Second,
104-
CooldownPeriod: 3,
105-
EvictionMetric: "fetched_samples",
102+
Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85},
103+
CheckInterval: 1 * time.Second,
104+
CooldownPeriod: 3,
105+
EvictionMetric: "fetched_samples",
106+
MaxEvictionsPerCycle: 1,
106107
},
107108
}
108109
}
@@ -112,17 +113,18 @@ func Test_EvictionConfig_Validation(t *testing.T) {
112113
monitoredResources []string
113114
err string
114115
}{
115-
"valid config passes": {func(qp *QueryProtection) {}, []string{"cpu", "heap"}, ""},
116-
"cpu > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 1.5 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"},
117-
"cpu < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = -0.1 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"},
118-
"heap > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = 2.0 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"},
119-
"heap < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = -0.5 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"},
120-
"check_interval 0 fails": {func(qp *QueryProtection) { qp.Eviction.CheckInterval = 0 }, []string{"cpu", "heap"}, "eviction check_interval must be greater than 0 when eviction is enabled"},
121-
"cooldown < 0 fails": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = -1 }, []string{"cpu", "heap"}, "eviction cooldown_period must be >= 0"},
122-
"unknown metric fails": {func(qp *QueryProtection) { qp.Eviction.EvictionMetric = "unknown" }, []string{"cpu", "heap"}, `unrecognized eviction_metric "unknown"; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes`},
123-
"cpu without monitored fails": {func(qp *QueryProtection) {}, []string{"heap"}, `monitored_resources config must include "cpu" when eviction cpu threshold is set`},
124-
"heap without monitored fails": {func(qp *QueryProtection) {}, []string{"cpu"}, `monitored_resources config must include "heap" when eviction heap threshold is set`},
125-
"cooldown 0 is valid": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = 0 }, []string{"cpu", "heap"}, ""},
116+
"valid config passes": {func(qp *QueryProtection) {}, []string{"cpu", "heap"}, ""},
117+
"cpu > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 1.5 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"},
118+
"cpu < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = -0.1 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"},
119+
"heap > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = 2.0 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"},
120+
"heap < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = -0.5 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"},
121+
"check_interval 0 fails": {func(qp *QueryProtection) { qp.Eviction.CheckInterval = 0 }, []string{"cpu", "heap"}, "eviction check_interval must be greater than 0 when eviction is enabled"},
122+
"cooldown < 0 fails": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = -1 }, []string{"cpu", "heap"}, "eviction cooldown_period must be >= 0"},
123+
"unknown metric fails": {func(qp *QueryProtection) { qp.Eviction.EvictionMetric = "unknown" }, []string{"cpu", "heap"}, `unrecognized eviction_metric "unknown"; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes`},
124+
"cpu without monitored fails": {func(qp *QueryProtection) {}, []string{"heap"}, `monitored_resources config must include "cpu" when eviction cpu threshold is set`},
125+
"heap without monitored fails": {func(qp *QueryProtection) {}, []string{"cpu"}, `monitored_resources config must include "heap" when eviction heap threshold is set`},
126+
"cooldown 0 is valid": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = 0 }, []string{"cpu", "heap"}, ""},
127+
"max_evictions_per_cycle < 1 fails": {func(qp *QueryProtection) { qp.Eviction.MaxEvictionsPerCycle = 0 }, []string{"cpu", "heap"}, "eviction max_evictions_per_cycle must be >= 1"},
126128
"disabled skips interval check": {func(qp *QueryProtection) {
127129
qp.Eviction.Threshold = Threshold{}
128130
qp.Eviction.CheckInterval = 0
@@ -154,5 +156,6 @@ func Test_RegisterFlagsWithPrefix_EvictionDefaults(t *testing.T) {
154156
assert.Equal(t, 1*time.Second, cfg.Eviction.CheckInterval)
155157
assert.Equal(t, 3, cfg.Eviction.CooldownPeriod)
156158
assert.Equal(t, "fetched_samples", cfg.Eviction.EvictionMetric)
159+
assert.Equal(t, 1, cfg.Eviction.MaxEvictionsPerCycle)
157160
assert.False(t, cfg.Eviction.Enabled())
158161
}

pkg/querier/querier_eviction_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,10 @@ func TestQuerier_EvictionIntegration(t *testing.T) {
8585
CPUUtilization: tc.evictionCPU,
8686
HeapUtilization: tc.evictionHeap,
8787
},
88-
CheckInterval: 1 * time.Second,
89-
CooldownPeriod: 3,
90-
EvictionMetric: "fetched_samples",
88+
CheckInterval: 1 * time.Second,
89+
CooldownPeriod: 3,
90+
EvictionMetric: "fetched_samples",
91+
MaxEvictionsPerCycle: 1,
9192
},
9293
}
9394

pkg/util/queryeviction/engine_wrapper_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ func TestEngineWrapper_EvictedQueryReturnsErrQueryEvicted(t *testing.T) {
118118
mq := &mockQuery{
119119
execFunc: func(ctx context.Context) *promql.Result {
120120
// Simulate eviction: find the registered query and cancel it.
121-
heaviest := registry.FindHeaviest(0)
122-
require.NotNil(t, heaviest, "query should be registered during Exec")
123-
heaviest.Cancel() // This cancels the child context, simulating evictor behavior.
121+
victims := registry.FindHeaviest(1, 0)
122+
require.Len(t, victims, 1, "query should be registered during Exec")
123+
victims[0].Cancel() // This cancels the child context, simulating evictor behavior.
124124

125125
// The inner query would see a cancelled context and return an error.
126126
return &promql.Result{Err: context.Canceled}

pkg/util/queryeviction/evictor.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -82,31 +82,31 @@ func (e *QueryEvictor) running(ctx context.Context) error {
8282
continue // no breach
8383
}
8484

85-
// Find the heaviest running query.
86-
heaviest := e.registry.FindHeaviest(e.cfg.MinQueryAge)
87-
if heaviest == nil {
85+
// Find the heaviest running queries (up to MaxEvictionsPerCycle).
86+
victims := e.registry.FindHeaviest(e.cfg.MaxEvictionsPerCycle, e.cfg.MinQueryAge)
87+
if len(victims) == 0 {
8888
continue // no running queries to evict
8989
}
9090

91-
// Evict the heaviest query.
92-
metricValue := e.registry.metric(heaviest.Stats)
93-
heaviest.Cancel()
94-
95-
// Log the eviction.
96-
level.Warn(e.logger).Log(
97-
"msg", "evicting heaviest query due to resource pressure",
98-
"resource", breachedResource,
99-
"utilization", utilization,
100-
"threshold", threshold,
101-
"request_id", heaviest.RequestID,
102-
"query", heaviest.QueryExpr,
103-
"user", heaviest.UserID,
104-
"metric", e.cfg.EvictionMetric,
105-
"metric_value", metricValue,
106-
)
107-
108-
// Increment metrics.
109-
e.evictionsTotal.WithLabelValues(string(breachedResource)).Inc()
91+
// Evict each victim.
92+
for _, victim := range victims {
93+
metricValue := e.registry.metric(victim.Stats)
94+
victim.Cancel()
95+
96+
level.Warn(e.logger).Log(
97+
"msg", "evicting query due to resource pressure",
98+
"resource", breachedResource,
99+
"utilization", utilization,
100+
"threshold", threshold,
101+
"request_id", victim.RequestID,
102+
"query", victim.QueryExpr,
103+
"user", victim.UserID,
104+
"metric", e.cfg.EvictionMetric,
105+
"metric_value", metricValue,
106+
)
107+
108+
e.evictionsTotal.WithLabelValues(string(breachedResource)).Inc()
109+
}
110110

111111
// Enter cooldown.
112112
cooldownRemaining = e.cfg.CooldownPeriod

pkg/util/queryeviction/evictor_test.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ func testEvictorConfig(cpu, heap float64, cooldown int) configs.EvictionConfig {
3939
CPUUtilization: cpu,
4040
HeapUtilization: heap,
4141
},
42-
CheckInterval: 10 * time.Millisecond,
43-
CooldownPeriod: cooldown,
44-
EvictionMetric: "fetched_samples",
42+
CheckInterval: 10 * time.Millisecond,
43+
CooldownPeriod: cooldown,
44+
EvictionMetric: "fetched_samples",
45+
MaxEvictionsPerCycle: 1,
4546
}
4647
}
4748

@@ -178,8 +179,9 @@ func TestCheckThresholds(t *testing.T) {
178179
CPUUtilization: tc.cpuThresh,
179180
HeapUtilization: tc.heapThresh,
180181
},
181-
CheckInterval: time.Second,
182-
EvictionMetric: "fetched_samples",
182+
CheckInterval: time.Second,
183+
EvictionMetric: "fetched_samples",
184+
MaxEvictionsPerCycle: 1,
183185
}
184186

185187
var evictor *QueryEvictor
@@ -212,7 +214,7 @@ func TestPrometheusMetrics_IncrementedCorrectly(t *testing.T) {
212214
}
213215

214216
func TestNewQueryEvictor_ReturnsNilWhenDisabled(t *testing.T) {
215-
cfg := configs.EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples"}
217+
cfg := configs.EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples", MaxEvictionsPerCycle: 1}
216218
evictor, err := NewQueryEvictor(newMockMonitor(0, 0), NewQueryRegistry(testMetricFunc), cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test")
217219
assert.NoError(t, err)
218220
assert.Nil(t, evictor)
@@ -236,3 +238,29 @@ func TestEviction_HeaviestQueryIsEvicted(t *testing.T) {
236238
t.Fatal("expected heaviest query to be evicted")
237239
}
238240
}
241+
242+
func TestEviction_MultipleQueriesPerCycle(t *testing.T) {
243+
reg := NewQueryRegistry(testMetricFunc)
244+
_, evictedSmall := registerTestQuery(reg, 100, "small", "user1")
245+
_, evictedLarge := registerTestQuery(reg, 10000, "large", "user2")
246+
_, evictedMedium := registerTestQuery(reg, 500, "medium", "user3")
247+
248+
cfg := configs.EvictionConfig{
249+
Threshold: configs.Threshold{
250+
CPUUtilization: 0.9,
251+
},
252+
CheckInterval: 10 * time.Millisecond,
253+
CooldownPeriod: 300,
254+
EvictionMetric: "fetched_samples",
255+
MaxEvictionsPerCycle: 2,
256+
}
257+
258+
startEvictor(t, newMockMonitor(0.95, 0.0), reg, cfg)
259+
260+
// The two heaviest (large=10000, medium=500) should be evicted in the same cycle.
261+
waitEvicted(t, evictedLarge)
262+
waitEvicted(t, evictedMedium)
263+
264+
// The smallest should not be evicted because max per cycle is 2 and cooldown prevents another cycle.
265+
assertNotEvicted(t, evictedSmall, 50*time.Millisecond)
266+
}

pkg/util/queryeviction/registry.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queryeviction
22

33
import (
4+
"container/heap"
45
"context"
56
"fmt"
67
"sync"
@@ -80,29 +81,61 @@ func (r *QueryRegistry) Deregister(id uint64) {
8081
delete(r.queries, id)
8182
}
8283

83-
// FindHeaviest returns the entry with the highest metric value
84+
// FindHeaviest returns up to n entries with the highest metric values
8485
// among queries that have been running for at least minAge,
85-
// or nil if no eligible queries exist.
86-
func (r *QueryRegistry) FindHeaviest(minAge time.Duration) *QueryEntry {
86+
// sorted heaviest first. Returns nil if no eligible queries exist.
87+
func (r *QueryRegistry) FindHeaviest(n int, minAge time.Duration) []*QueryEntry {
8788
r.mu.RLock()
8889
defer r.mu.RUnlock()
8990

90-
var heaviest *QueryEntry
91-
var maxWeight uint64
9291
now := time.Now()
9392

93+
// Use a min-heap of size n to find the top-n queries
94+
h := &weightedHeap{}
95+
9496
for _, entry := range r.queries {
9597
if now.Sub(entry.RegisteredAt) < minAge {
9698
continue
9799
}
98-
weight := r.metric(entry.Stats)
99-
if heaviest == nil || weight > maxWeight {
100-
heaviest = entry
101-
maxWeight = weight
100+
w := weighted{entry: entry, weight: r.metric(entry.Stats)}
101+
102+
if h.Len() < n {
103+
heap.Push(h, w)
104+
} else if w.weight > (*h)[0].weight {
105+
(*h)[0] = w
106+
heap.Fix(h, 0)
102107
}
103108
}
104109

105-
return heaviest
110+
if h.Len() == 0 {
111+
return nil
112+
}
113+
114+
result := make([]*QueryEntry, h.Len())
115+
for i := len(result) - 1; i >= 0; i-- {
116+
result[i] = heap.Pop(h).(weighted).entry
117+
}
118+
return result
119+
}
120+
121+
// weightedHeap is a min-heap of weighted entries (smallest weight at root).
122+
type weightedHeap []weighted
123+
124+
type weighted struct {
125+
entry *QueryEntry
126+
weight uint64
127+
}
128+
129+
func (h weightedHeap) Len() int { return len(h) }
130+
func (h weightedHeap) Less(i, j int) bool { return h[i].weight < h[j].weight }
131+
func (h weightedHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
132+
func (h *weightedHeap) Push(x interface{}) { *h = append(*h, x.(weighted)) }
133+
func (h *weightedHeap) Pop() interface{} {
134+
old := *h
135+
n := len(old)
136+
x := old[n-1]
137+
*h = old[:n-1]
138+
return x
106139
}
107140

108141
// Len returns the number of currently registered queries.

pkg/util/queryeviction/registry_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ func TestDeregister_RemovesEntry(t *testing.T) {
5454
require.Equal(t, 1, reg.Len())
5555

5656
// FindHeaviest should return the registered entry.
57-
heaviest := reg.FindHeaviest(0)
58-
require.NotNil(t, heaviest)
59-
assert.Equal(t, id, heaviest.QueryID)
57+
results := reg.FindHeaviest(1, 0)
58+
require.Len(t, results, 1)
59+
assert.Equal(t, id, results[0].QueryID)
6060

6161
// Deregister and verify it's gone.
6262
reg.Deregister(id)
6363
assert.Equal(t, 0, reg.Len())
64-
assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil after deregistering the only entry")
64+
assert.Nil(t, reg.FindHeaviest(1, 0), "FindHeaviest should return nil after deregistering the only entry")
6565
}
6666

6767
func TestDeregister_UnknownID_IsNoOp(t *testing.T) {
@@ -76,9 +76,9 @@ func TestDeregister_UnknownID_IsNoOp(t *testing.T) {
7676

7777
// Original entry should still be present.
7878
assert.Equal(t, 1, reg.Len())
79-
heaviest := reg.FindHeaviest(0)
80-
require.NotNil(t, heaviest)
81-
assert.Equal(t, id, heaviest.QueryID)
79+
results := reg.FindHeaviest(1, 0)
80+
require.Len(t, results, 1)
81+
assert.Equal(t, id, results[0].QueryID)
8282
}
8383

8484
func TestFindHeaviest_ReturnsHighestMetricValue(t *testing.T) {
@@ -91,16 +91,16 @@ func TestFindHeaviest_ReturnsHighestMetricValue(t *testing.T) {
9191
heaviestID := reg.Register(cancel, newTestStats(1000), "large-query", "user3", "")
9292
reg.Register(cancel, newTestStats(200), "another-query", "user4", "")
9393

94-
heaviest := reg.FindHeaviest(0)
95-
require.NotNil(t, heaviest)
96-
assert.Equal(t, heaviestID, heaviest.QueryID)
97-
assert.Equal(t, "large-query", heaviest.QueryExpr)
98-
assert.Equal(t, uint64(1000), heaviest.Stats.LoadFetchedSamples())
94+
results := reg.FindHeaviest(1, 0)
95+
require.Len(t, results, 1)
96+
assert.Equal(t, heaviestID, results[0].QueryID)
97+
assert.Equal(t, "large-query", results[0].QueryExpr)
98+
assert.Equal(t, uint64(1000), results[0].Stats.LoadFetchedSamples())
9999
}
100100

101101
func TestFindHeaviest_EmptyRegistry(t *testing.T) {
102102
reg := NewQueryRegistry(testMetricFunc)
103-
assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil for empty registry")
103+
assert.Nil(t, reg.FindHeaviest(1, 0), "FindHeaviest should return nil for empty registry")
104104
}
105105

106106
func TestLen_ReflectsCurrentCount(t *testing.T) {
@@ -147,7 +147,7 @@ func TestConcurrent_RegisterDeregisterFindHeaviest(t *testing.T) {
147147
id := reg.Register(cancel, stats, "concurrent-query", "user", "")
148148

149149
// Interleave FindHeaviest and Len calls.
150-
_ = reg.FindHeaviest(0)
150+
_ = reg.FindHeaviest(1, 0)
151151
_ = reg.Len()
152152

153153
reg.Deregister(id)

0 commit comments

Comments
 (0)