Skip to content

Commit 76c8ade

Browse files
authored
Add maxSamples limit (#663)
* max samples limit Signed-off-by: Paurush Garg <paurushg@amazon.com> * Update max_samples tracking to selector operators Signed-off-by: Paurush Garg <paurushg@amazon.com> * Update to check maxSamples more frequently Signed-off-by: Paurush Garg <paurushg@amazon.com> * Updating maxSample check interval steps Signed-off-by: Paurush Garg <paurushg@amazon.com> * Add unit tests for max_samples Signed-off-by: Paurush Garg <paurushg@amazon.com> * Apply linter formatting Signed-off-by: Paurush Garg <paurushg@amazon.com> * Update SubQuery Sample tracker to inner loop and remove update matrix_selector maxSamples logic Signed-off-by: Paurush Garg <paurushg@amazon.com> * Remove Checking when delta is negative in matrix_selector Signed-off-by: Paurush Garg <paurushg@amazon.com> * convert SampleTracker to interface with nop implementation Signed-off-by: Paurush Garg <paurushg@amazon.com> --------- Signed-off-by: Paurush Garg <paurushg@amazon.com>
1 parent da5fdce commit 76c8ade

8 files changed

Lines changed: 404 additions & 0 deletions

File tree

engine/engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
198198
},
199199
decodingConcurrency: decodingConcurrency,
200200
selectorBatchSize: selectorBatchSize,
201+
maxSamplesPerQuery: opts.MaxSamples,
201202
}
202203
}
203204

@@ -227,6 +228,7 @@ type Engine struct {
227228
selectorBatchSize int64
228229
enableAnalysis bool
229230
noStepSubqueryIntervalFn func(time.Duration) time.Duration
231+
maxSamplesPerQuery int
230232
}
231233

232234
func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
@@ -444,7 +446,9 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
444446
EnableAnalysis: e.enableAnalysis,
445447
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
446448
DecodingConcurrency: e.decodingConcurrency,
449+
SampleTracker: query.NewSampleTracker(e.maxSamplesPerQuery),
447450
}
451+
448452
if opts == nil {
449453
return res
450454
}

engine/engine_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4794,6 +4794,156 @@ func TestQueryTimeout(t *testing.T) {
47944794
testutil.Equals(t, context.DeadlineExceeded, res.Err)
47954795
}
47964796

4797+
func TestMaxSamples(t *testing.T) {
4798+
t.Parallel()
4799+
4800+
t.Run("max_samples with rate function", func(t *testing.T) {
4801+
t.Parallel()
4802+
storage := teststorage.New(t)
4803+
defer storage.Close()
4804+
4805+
app := storage.Appender(context.Background())
4806+
// Create 1000 series with samples every 15s for 5 minutes
4807+
for i := range 1000 {
4808+
for ts := int64(0); ts <= 300; ts += 15 {
4809+
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
4810+
require.NoError(t, err)
4811+
}
4812+
}
4813+
require.NoError(t, app.Commit())
4814+
4815+
// With 1000 series and a 2m window, rate() will keep ~8 samples per series in memory
4816+
// = ~8000 samples total
4817+
query := `rate(test_metric[2m])`
4818+
start := time.Unix(120, 0)
4819+
end := time.Unix(300, 0)
4820+
step := 30 * time.Second
4821+
4822+
t.Run("exceeds limit", func(t *testing.T) {
4823+
ng := engine.New(engine.Opts{
4824+
EngineOpts: promql.EngineOpts{
4825+
Timeout: 1 * time.Hour,
4826+
MaxSamples: 5000, // Lower than ~8000 expected
4827+
},
4828+
})
4829+
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
4830+
require.NoError(t, err)
4831+
res := q.Exec(context.Background())
4832+
require.Error(t, res.Err, "expected max_samples error")
4833+
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
4834+
})
4835+
4836+
t.Run("within limit", func(t *testing.T) {
4837+
ng := engine.New(engine.Opts{
4838+
EngineOpts: promql.EngineOpts{
4839+
Timeout: 1 * time.Hour,
4840+
MaxSamples: 50000, // Higher than ~8000 expected
4841+
},
4842+
})
4843+
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
4844+
require.NoError(t, err)
4845+
res := q.Exec(context.Background())
4846+
require.NoError(t, res.Err)
4847+
})
4848+
})
4849+
4850+
t.Run("max_samples with vector selector", func(t *testing.T) {
4851+
t.Parallel()
4852+
storage := teststorage.New(t)
4853+
defer storage.Close()
4854+
4855+
app := storage.Appender(context.Background())
4856+
// 10000 series, each step will have 10000 samples in memory
4857+
for i := range 10000 {
4858+
for ts := int64(0); ts <= 300; ts += 30 {
4859+
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
4860+
require.NoError(t, err)
4861+
}
4862+
}
4863+
require.NoError(t, app.Commit())
4864+
4865+
query := `test_metric`
4866+
start := time.Unix(0, 0)
4867+
end := time.Unix(60, 0)
4868+
step := 30 * time.Second
4869+
4870+
ng := engine.New(engine.Opts{
4871+
EngineOpts: promql.EngineOpts{
4872+
Timeout: 1 * time.Hour,
4873+
MaxSamples: 5000, // Lower than 10000 series per step
4874+
},
4875+
})
4876+
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
4877+
require.NoError(t, err)
4878+
res := q.Exec(context.Background())
4879+
require.Error(t, res.Err)
4880+
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
4881+
})
4882+
4883+
t.Run("max_samples with subquery", func(t *testing.T) {
4884+
t.Parallel()
4885+
storage := teststorage.New(t)
4886+
defer storage.Close()
4887+
4888+
app := storage.Appender(context.Background())
4889+
// 1000 series with subquery that accumulates samples
4890+
for i := range 1000 {
4891+
for ts := int64(0); ts <= 600; ts += 15 {
4892+
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
4893+
require.NoError(t, err)
4894+
}
4895+
}
4896+
require.NoError(t, app.Commit())
4897+
4898+
// Subquery with 2m range and 30s step = 5 steps per evaluation
4899+
// With 1000 series, that's ~5000 samples in ring buffer
4900+
query := `sum_over_time(test_metric[2m:30s])`
4901+
start := time.Unix(120, 0)
4902+
end := time.Unix(300, 0)
4903+
step := 60 * time.Second
4904+
4905+
ng := engine.New(engine.Opts{
4906+
EngineOpts: promql.EngineOpts{
4907+
Timeout: 1 * time.Hour,
4908+
MaxSamples: 1000, // Lower than expected
4909+
},
4910+
})
4911+
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
4912+
require.NoError(t, err)
4913+
res := q.Exec(context.Background())
4914+
require.Error(t, res.Err)
4915+
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
4916+
})
4917+
4918+
t.Run("max_samples disabled by default", func(t *testing.T) {
4919+
t.Parallel()
4920+
storage := teststorage.New(t)
4921+
defer storage.Close()
4922+
4923+
app := storage.Appender(context.Background())
4924+
for i := range 100 {
4925+
for ts := int64(0); ts < 300; ts += 30 {
4926+
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
4927+
require.NoError(t, err)
4928+
}
4929+
}
4930+
require.NoError(t, app.Commit())
4931+
4932+
query := `rate(test_metric[1m])`
4933+
start := time.Unix(0, 0)
4934+
end := time.Unix(300, 0)
4935+
step := 30 * time.Second
4936+
4937+
ng := engine.New(engine.Opts{
4938+
EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour},
4939+
})
4940+
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
4941+
require.NoError(t, err)
4942+
res := q.Exec(context.Background())
4943+
require.NoError(t, res.Err)
4944+
})
4945+
}
4946+
47974947
type hintRecordingQuerier struct {
47984948
storage.Querier
47994949
mux sync.Mutex

execution/scan/subquery.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"github.com/prometheus/prometheus/model/labels"
2121
)
2222

23+
// sampleLimitCheckPercentage is the fraction of the sample limit used as the
// target sample budget between limit checks in shouldCheckSampleLimit.
// Despite the name it is a fraction, not a percentage: 0.05 means 5%.
const sampleLimitCheckPercentage = 0.05
24+
2325
type subqueryOperator struct {
2426
next model.VectorOperator
2527
paramOp model.VectorOperator
@@ -52,6 +54,9 @@ type subqueryOperator struct {
5254
paramBuf []model.StepVector
5355
param2Buf []model.StepVector
5456
tempBuf []model.StepVector
57+
58+
currentTrackedSamples int
59+
lastTrackedSamples int
5560
}
5661

5762
func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
@@ -150,6 +155,9 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
150155
for _, b := range o.buffers {
151156
b.Reset(mint, maxt+o.subQuery.Offset.Milliseconds())
152157
}
158+
o.currentTrackedSamples = 0
159+
o.lastTrackedSamples = 0
160+
checkSampleLimitCounter := 0
153161
if len(o.lastVectors) > 0 {
154162
for _, v := range o.lastVectors[o.lastCollected+1:] {
155163
if v.T > maxt {
@@ -184,6 +192,18 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
184192
o.collect(vector, mint)
185193
}
186194

195+
checkSampleLimitCounter++
196+
if o.shouldCheckSampleLimit(checkSampleLimitCounter) {
197+
if err := o.checkSampleLimit(); err != nil {
198+
return 0, err
199+
}
200+
checkSampleLimitCounter = 0
201+
}
202+
}
203+
if checkSampleLimitCounter > 0 {
204+
if err := o.checkSampleLimit(); err != nil {
205+
return 0, err
206+
}
187207
}
188208

189209
buf[n].Reset(o.currentStep)
@@ -210,6 +230,15 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
210230
return n, nil
211231
}
212232

233+
func (o *subqueryOperator) checkSampleLimit() error {
234+
delta := o.currentTrackedSamples - o.lastTrackedSamples
235+
if delta > 0 {
236+
o.opts.SampleTracker.Add(delta)
237+
}
238+
o.lastTrackedSamples = o.currentTrackedSamples
239+
return o.opts.SampleTracker.CheckLimit()
240+
}
241+
213242
func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
214243
if v.T < mint {
215244
return
@@ -220,6 +249,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
220249
continue
221250
}
222251
buffer.Push(v.T, ringbuffer.Value{F: s})
252+
o.currentTrackedSamples++
223253
}
224254
for i, s := range v.Histograms {
225255
buffer := o.buffers[v.HistogramIDs[i]]
@@ -245,6 +275,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
245275
s.CounterResetHint = histogram.UnknownCounterReset
246276
}
247277
buffer.Push(v.T, ringbuffer.Value{H: s})
278+
o.currentTrackedSamples += telemetry.CalculateHistogramSampleCount(s)
248279
}
249280

250281
}
@@ -291,3 +322,21 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error {
291322
})
292323
return err
293324
}
325+
326+
func (o *subqueryOperator) shouldCheckSampleLimit(checkSampleLimitCounter int) bool {
327+
if len(o.series) == 0 {
328+
return checkSampleLimitCounter >= 1
329+
}
330+
331+
limit := o.opts.SampleTracker.Limit()
332+
targetSamplesPerCheck := int(float64(limit) * sampleLimitCheckPercentage)
333+
334+
maxSamplesPerCall := len(o.series) * o.stepsBatch
335+
if maxSamplesPerCall == 0 {
336+
return checkSampleLimitCounter >= 1
337+
}
338+
339+
interval := max(targetSamplesPerCheck/maxSamplesPerCall, 1)
340+
341+
return checkSampleLimitCounter >= interval
342+
}

query/options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Options struct {
1818
NoStepSubqueryIntervalFn func(time.Duration) time.Duration
1919
EnableAnalysis bool
2020
DecodingConcurrency int
21+
SampleTracker SampleTracker // Tracks current samples in memory
2122
}
2223

2324
// TotalSteps returns the total number of steps in the query, regardless of batching.
@@ -57,6 +58,10 @@ func NestedOptionsForSubquery(opts *Options, step, queryRange, offset time.Durat
5758
NoStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
5859
EnableAnalysis: opts.EnableAnalysis,
5960
DecodingConcurrency: opts.DecodingConcurrency,
61+
SampleTracker: opts.SampleTracker,
62+
}
63+
if nOpts.SampleTracker == nil {
64+
nOpts.SampleTracker = NewSampleTracker(0)
6065
}
6166
if step != 0 {
6267
nOpts.Step = step

query/sample_tracker.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (c) The Thanos Community Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package query
5+
6+
import (
7+
"fmt"
8+
"math"
9+
"sync/atomic"
10+
)
11+
12+
// SampleTracker keeps a running count of samples and reports when that count
// goes over a configured limit.
type SampleTracker interface {
	// Add increases the tracked sample count by count.
	Add(count int)
	// Remove decreases the tracked sample count by count.
	Remove(count int)
	// CheckLimit returns a non-nil error when the tracked count exceeds the limit.
	CheckLimit() error
	// Limit returns the maximum number of samples the tracker allows.
	Limit() int64
}

// Compile-time checks that both implementations satisfy SampleTracker.
var (
	_ SampleTracker = (*sampleTracker)(nil)
	_ SampleTracker = nopSampleTracker{}
)

// sampleTracker is the limit-enforcing SampleTracker. The running count is
// held in an atomic.Int64; the limit is fixed at construction.
type sampleTracker struct {
	current atomic.Int64
	limit   int64
}

// NewSampleTracker returns a SampleTracker enforcing maxSamples.
// A maxSamples of zero or less disables tracking: the returned tracker
// ignores Add/Remove and its CheckLimit never fails.
func NewSampleTracker(maxSamples int) SampleTracker {
	if maxSamples <= 0 {
		return nopSampleTracker{}
	}
	return &sampleTracker{
		limit: int64(maxSamples),
	}
}

// Add increases the tracked sample count by count.
func (st *sampleTracker) Add(count int) {
	st.current.Add(int64(count))
}

// Remove decreases the tracked sample count by count.
func (st *sampleTracker) Remove(count int) {
	st.current.Add(-int64(count))
}

// CheckLimit returns ErrMaxSamplesExceeded when the tracked count is strictly
// greater than the limit, and nil otherwise.
func (st *sampleTracker) CheckLimit() error {
	current := st.current.Load()
	if current > st.limit {
		return ErrMaxSamplesExceeded{Current: current, Limit: st.limit}
	}
	return nil
}

// Limit returns the limit the tracker was created with.
func (st *sampleTracker) Limit() int64 {
	return st.limit
}

// nopSampleTracker is the SampleTracker used when no limit is configured.
// It records nothing and never reports the limit as exceeded.
type nopSampleTracker struct{}

// Add is a no-op.
func (nopSampleTracker) Add(int) {}

// Remove is a no-op.
func (nopSampleTracker) Remove(int) {}

// CheckLimit always returns nil.
func (nopSampleTracker) CheckLimit() error { return nil }

// Limit returns math.MaxInt64, standing in for "no limit".
func (nopSampleTracker) Limit() int64 { return math.MaxInt64 }

// ErrMaxSamplesExceeded is returned by CheckLimit when the tracked sample
// count is above the configured limit.
type ErrMaxSamplesExceeded struct {
	// Current is the tracked sample count observed by the failing check.
	Current int64
	// Limit is the configured maximum.
	Limit int64
}

// Error implements the error interface.
func (e ErrMaxSamplesExceeded) Error() string {
	return fmt.Sprintf("query processing would load too many samples into memory: current=%d, limit=%d", e.Current, e.Limit)
}

0 commit comments

Comments
 (0)