From ccb1f5a3b902b1fc2656151ab5381a88f567cfc2 Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Fri, 20 Mar 2026 08:53:12 +0100 Subject: [PATCH 1/9] feat: support anchored and smoothed modifiers for range selectors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement Prometheus proposal 0052 (extended range selectors) for the thanos promql-engine. The anchored modifier pins boundary values to real samples, while the smoothed modifier interpolates boundary values, eliminating extrapolation artifacts in rate/increase/delta calculations. Closes #630 Changes across the engine layers: - engine: add EnableExtendedRangeSelectors option, set parser flag - logicalplan: extend storage time ranges for anchored/smoothed selectors - execution: enforce function whitelists (anchored: rate, increase, delta, resets, changes; smoothed: rate, increase, delta), extend hint ranges - ringbuffer: add Anchored/Smoothed fields to FunctionArgs, implement extendedRangeRate with pickOrInterpolateLeft/Right boundary logic and counter-reset correction - storage/prometheus: thread anchored/smoothed through matrixSelector, use ext-lookback buffers, extend maxt for smoothed post-range samples Constraint: parser.EnableExtendedRangeSelectors is a global variable in prometheus/parser — can only be set to true (additive) to avoid races Rejected: Streaming ring buffers for anchored/smoothed | they lack ext-lookback and boundary interpolation support Confidence: high Scope-risk: narrow Not-tested: native histograms with anchored/smoothed (rejected upstream too) Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sylvain Rabot --- engine/engine.go | 16 +- engine/extended_range_test.go | 277 ++++++++++++++++++++++++++ execution/execution.go | 23 +++ execution/parse/functions.go | 10 + logicalplan/plan.go | 8 + ringbuffer/functions.go | 134 ++++++++++++- ringbuffer/generic.go | 19 ++ storage/prometheus/matrix_selector.go | 38 +++- storage/prometheus/scanners.go | 2 + 9 files changed, 513 insertions(+), 14 deletions(-) create mode 100644 engine/extended_range_test.go diff --git a/engine/engine.go b/engine/engine.go index f18a93a85..6a9886d66 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -74,6 +74,10 @@ type Opts struct { // This will default to false. EnableXFunctions bool + // EnableExtendedRangeSelectors enables the anchored and smoothed modifiers + // for range vector selectors (Prometheus proposal 0052). + EnableExtendedRangeSelectors bool + // EnableAnalysis enables query analysis. EnableAnalysis bool @@ -183,7 +187,8 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { scanners: scanners, activeQueryTracker: queryTracker, - disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, + disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, + enableExtendedRangeSelectors: opts.EnableExtendedRangeSelectors, logger: opts.Logger, lookbackDelta: opts.LookbackDelta, @@ -214,7 +219,8 @@ type Engine struct { scanners engstorage.Scanners activeQueryTracker promql.QueryTracker - disableDuplicateLabelChecks bool + disableDuplicateLabelChecks bool + enableExtendedRangeSelectors bool logger *slog.Logger lookbackDelta time.Duration @@ -238,6 +244,9 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts } defer e.activeQueryTracker.Delete(idx) + if e.enableExtendedRangeSelectors { + parser.EnableExtendedRangeSelectors = true + } expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { return nil, err @@ -336,6 +345,9 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * } defer e.activeQueryTracker.Delete(idx) + if e.enableExtendedRangeSelectors { + parser.EnableExtendedRangeSelectors = true + } expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { return nil, err diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go new file mode 100644 index 000000000..f3b974333 --- /dev/null +++ b/engine/extended_range_test.go @@ -0,0 +1,277 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package engine_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/promqltest" + + "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/logicalplan" +) + +func TestAnchoredSmoothedModifiers(t *testing.T) { + t.Parallel() + parser.EnableExtendedRangeSelectors = true + + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + EnableNegativeOffset: true, + EnableAtModifier: true, + } + + cases := []struct { + name string + load string + query string + start time.Time + end time.Time + step time.Duration + }{ + // Anchored rate/increase/delta on a linear counter. + { + name: "anchored rate on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `rate(http_total[30s] anchored)`, + }, + { + name: "anchored increase on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] anchored)`, + }, + { + name: "anchored delta on gauge", + load: `load 10s + temperature 20 22 21 23 25 24 26 28 27 29 30`, + query: `delta(temperature[30s] anchored)`, + }, + // Smoothed rate/increase/delta on a linear counter. + { + name: "smoothed rate on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `rate(http_total[30s] smoothed)`, + }, + { + name: "smoothed increase on linear counter", + load: `load 10s + http_total 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] smoothed)`, + }, + { + name: "smoothed delta on gauge", + load: `load 10s + temperature 20 22 21 23 25 24 26 28 27 29 30`, + query: `delta(temperature[30s] smoothed)`, + }, + // Anchored with counter resets. + { + name: "anchored increase with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `increase(resets_total[30s] anchored)`, + }, + { + name: "anchored rate with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `rate(resets_total[30s] anchored)`, + }, + // Smoothed with counter resets. + { + name: "smoothed increase with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `increase(resets_total[30s] smoothed)`, + }, + { + name: "smoothed rate with counter reset", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `rate(resets_total[30s] smoothed)`, + }, + // Anchored resets and changes (only supported for anchored). + { + name: "anchored resets", + load: `load 10s + resets_total 0 10 20 5 15 25 10 20 30 40 50`, + query: `resets(resets_total[30s] anchored)`, + }, + { + name: "anchored changes", + load: `load 10s + metric 1 1 2 2 3 3 4 4 5 5 6`, + query: `changes(metric[30s] anchored)`, + }, + // Non-linear data. + { + name: "anchored rate on quadratic counter", + load: `load 10s + quadratic 0 1 4 9 16 25 36 49 64 81 100`, + query: `rate(quadratic[30s] anchored)`, + }, + { + name: "smoothed rate on quadratic counter", + load: `load 10s + quadratic 0 1 4 9 16 25 36 49 64 81 100`, + query: `rate(quadratic[30s] smoothed)`, + }, + // Multiple series. + { + name: "anchored increase multiple series", + load: `load 10s + http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 + http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] anchored)`, + }, + { + name: "smoothed increase multiple series", + load: `load 10s + http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 + http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, + query: `increase(http_total[30s] smoothed)`, + }, + } + + start := time.Unix(0, 0) + end := time.Unix(100, 0) + step := 10 * time.Second + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + storage := promqltest.LoadedStorage(t, tc.load) + defer storage.Close() + + tcStart := start + if !tc.start.IsZero() { + tcStart = tc.start + } + tcEnd := end + if !tc.end.IsZero() { + tcEnd = tc.end + } + tcStep := step + if tc.step != 0 { + tcStep = tc.step + } + + for _, disableOptimizers := range []bool{false, true} { + t.Run(fmt.Sprintf("disableOptimizers=%v", disableOptimizers), func(t *testing.T) { + optimizers := logicalplan.AllOptimizers + if disableOptimizers { + optimizers = logicalplan.NoOptimizers + } + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + LogicalOptimizers: optimizers, + SelectorBatchSize: 1, + EnableExtendedRangeSelectors: true, + }) + + ctx := context.Background() + q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tcStart, tcEnd, tcStep) + testutil.Ok(t, err) + defer q1.Close() + newResult := q1.Exec(ctx) + + oldEngine := promql.NewEngine(opts) + q2, err := oldEngine.NewRangeQuery(ctx, storage, nil, tc.query, tcStart, tcEnd, tcStep) + testutil.Ok(t, err) + defer q2.Close() + oldResult := q2.Exec(ctx) + + testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) + }) + } + }) + } +} + +func TestAnchoredSmoothedWhitelist(t *testing.T) { + t.Parallel() + parser.EnableExtendedRangeSelectors = true + + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + } + + load := `load 10s + metric 0 10 20 30 40 50` + + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + EnableExtendedRangeSelectors: true, + }) + ctx := context.Background() + + // Functions not in the whitelist should error. + unsupportedAnchored := []string{ + `avg_over_time(metric[30s] anchored)`, + `sum_over_time(metric[30s] anchored)`, + `max_over_time(metric[30s] anchored)`, + `deriv(metric[30s] anchored)`, + } + for _, query := range unsupportedAnchored { + t.Run("unsupported_anchored/"+query, func(t *testing.T) { + _, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.NotOk(t, err) + }) + } + + unsupportedSmoothed := []string{ + `avg_over_time(metric[30s] smoothed)`, + `resets(metric[30s] smoothed)`, + `changes(metric[30s] smoothed)`, + } + for _, query := range unsupportedSmoothed { + t.Run("unsupported_smoothed/"+query, func(t *testing.T) { + _, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.NotOk(t, err) + }) + } + + // Functions in the whitelist should work. + supportedAnchored := []string{ + `rate(metric[30s] anchored)`, + `increase(metric[30s] anchored)`, + `delta(metric[30s] anchored)`, + `resets(metric[30s] anchored)`, + `changes(metric[30s] anchored)`, + } + for _, query := range supportedAnchored { + t.Run("supported_anchored/"+query, func(t *testing.T) { + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + q.Close() + }) + } + + supportedSmoothed := []string{ + `rate(metric[30s] smoothed)`, + `increase(metric[30s] smoothed)`, + `delta(metric[30s] smoothed)`, + } + for _, query := range supportedSmoothed { + t.Run("supported_smoothed/"+query, func(t *testing.T) { + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + q.Close() + }) + } +} diff --git a/execution/execution.go b/execution/execution.go index 8df9332d8..000d33e25 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -181,14 +181,37 @@ func newAbsentOverTimeOperator(ctx context.Context, call *logicalplan.FunctionCa func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t *logicalplan.MatrixSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { // TODO(saswatamcode): Range vector result might need new operator // before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39 + vs := t.VectorSelector + + // Validate function whitelist for anchored/smoothed modifiers. + if vs.Anchored { + if _, ok := parse.AnchoredSafeFunctions[e.Func.Name]; !ok { + return nil, errors.Newf("anchored modifier is not supported for %s, supported functions: %v", e.Func.Name, parse.AnchoredSafeFunctions) + } + } + if vs.Smoothed { + if _, ok := parse.SmoothedSafeFunctions[e.Func.Name]; !ok { + return nil, errors.Newf("smoothed modifier is not supported for %s, supported functions: %v", e.Func.Name, parse.SmoothedSafeFunctions) + } + } + milliSecondRange := t.Range.Milliseconds() if parse.IsExtFunction(e.Func.Name) { milliSecondRange += opts.ExtLookbackDelta.Milliseconds() } + if vs.Anchored { + milliSecondRange += opts.LookbackDelta.Milliseconds() + } + if vs.Smoothed { + milliSecondRange += opts.LookbackDelta.Milliseconds() + } start, end := getTimeRangesForVectorSelector(t.VectorSelector, opts, milliSecondRange) hints.Start = start hints.End = end + if vs.Smoothed { + hints.End += opts.LookbackDelta.Milliseconds() + } hints.Range = milliSecondRange op, err := scanners.NewMatrixSelector(ctx, opts, hints, *t, *e) if err != nil { diff --git a/execution/parse/functions.go b/execution/parse/functions.go index 0ba8173df..2721a9a79 100644 --- a/execution/parse/functions.go +++ b/execution/parse/functions.go @@ -34,6 +34,16 @@ func IsExtFunction(functionName string) bool { return ok } +// AnchoredSafeFunctions lists functions that support the anchored modifier. +var AnchoredSafeFunctions = map[string]struct{}{ + "resets": {}, "changes": {}, "rate": {}, "increase": {}, "delta": {}, +} + +// SmoothedSafeFunctions lists functions that support the smoothed modifier. +var SmoothedSafeFunctions = map[string]struct{}{ + "rate": {}, "increase": {}, "delta": {}, +} + func UnknownFunctionError(name string) error { msg := fmt.Sprintf("unknown function: %s", name) if _, ok := parser.Functions[name]; ok { diff --git a/logicalplan/plan.go b/logicalplan/plan.go index c9cf3f8f5..4bde0d682 100644 --- a/logicalplan/plan.go +++ b/logicalplan/plan.go @@ -132,6 +132,14 @@ func getTimeRangesForSelector(qOpts *query.Options, n *parser.VectorSelector, pa start -= int64(qOpts.ExtLookbackDelta.Milliseconds()) } + if n.Anchored { + start -= qOpts.LookbackDelta.Milliseconds() + } + if n.Smoothed { + start -= qOpts.LookbackDelta.Milliseconds() + end += qOpts.LookbackDelta.Milliseconds() + } + return start, end } diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index bbf6c054a..c263b9a7b 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -7,12 +7,12 @@ import ( "math" "sort" + "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/model/histogram" + "github.com/thanos-io/promql-engine/compute" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/warnings" - - "github.com/efficientgo/core/errors" - "github.com/prometheus/prometheus/model/histogram" ) type SamplesBuffer GenericRingBuffer @@ -27,6 +27,10 @@ type FunctionArgs struct { // quantile_over_time and predict_linear use one, so we only use one here. ScalarPoint float64 ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor) + + // Anchored/Smoothed modifiers for extended range selectors (proposal 0052). + Anchored bool + Smoothed bool } type FunctionCall func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) @@ -362,18 +366,27 @@ var rangeVectorFuncs = map[string]FunctionCall{ return v, fh, true, warn, nil }, "rate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if f.Anchored || f.Smoothed { + return extendedRangeRate(f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset, f.Smoothed) + } if len(f.Samples) < 2 { return 0., nil, false, 0, nil } return extrapolatedRate(f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) }, "delta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if f.Anchored || f.Smoothed { + return extendedRangeRate(f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset, f.Smoothed) + } if len(f.Samples) < 2 { return 0., nil, false, 0, nil } return extrapolatedRate(f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) }, "increase": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if f.Anchored || f.Smoothed { + return extendedRangeRate(f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset, f.Smoothed) + } if len(f.Samples) < 2 { return 0., nil, false, 0, nil } @@ -639,6 +652,121 @@ func extendedRate(samples []Sample, isCounter, isRate bool, stepTime int64, sele return resultValue, nil } +// extendedRangeRate implements the anchored and smoothed rate/increase/delta semantics +// from Prometheus proposal 0052. It computes boundary values at the range start and end, +// either by picking real sample values (anchored) or interpolating (smoothed), then +// computes the difference with counter-reset correction. +func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selectRange, offset int64, smoothed bool) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { + if len(samples) == 0 { + return 0, nil, false, 0, nil + } + + // Check for mixed float/histogram samples. + if samples[0].V.H != nil { + return 0, nil, false, 0, errors.New("native histograms are not supported with anchored/smoothed modifiers") + } + + rangeEnd := stepTime - offset + rangeStart := rangeEnd - selectRange + + // Find the index of the first sample after rangeStart (first interior sample). + firstInteriorIdx := sort.Search(len(samples), func(i int) bool { + return samples[i].T > rangeStart + }) + + // Find the index of the last sample at or before rangeEnd. + lastInteriorIdx := sort.Search(len(samples), func(i int) bool { + return samples[i].T > rangeEnd + }) - 1 + + // Left boundary value. + leftVal, leftOk := pickOrInterpolateLeft(samples, firstInteriorIdx, rangeStart, smoothed) + if !leftOk { + return 0, nil, false, 0, nil + } + + // Right boundary value. + rightVal, rightOk := pickOrInterpolateRight(samples, lastInteriorIdx, rangeEnd, smoothed) + if !rightOk { + return 0, nil, false, 0, nil + } + + // Counter-reset correction: walk interior samples between the boundaries. + var counterCorrection float64 + if isCounter { + prevVal := leftVal + for i := firstInteriorIdx; i <= lastInteriorIdx; i++ { + if samples[i].V.F < prevVal { + counterCorrection += prevVal + } + prevVal = samples[i].V.F + } + } + + resultValue := rightVal - leftVal + counterCorrection + + if isRate { + rangeDuration := float64(selectRange) / 1000.0 + if rangeDuration == 0 { + return 0, nil, false, 0, nil + } + resultValue /= rangeDuration + } + + return resultValue, nil, true, 0, nil +} + +// pickOrInterpolateLeft returns the left boundary value at rangeStart. +// For anchored: uses the real sample value at/before rangeStart. +// For smoothed: interpolates between the samples bracketing rangeStart. +// If no sample exists before rangeStart, the first interior sample value is used. +func pickOrInterpolateLeft(samples []Sample, firstInteriorIdx int, rangeStart int64, smoothed bool) (float64, bool) { + if firstInteriorIdx > 0 { + // There is a sample at/before rangeStart. + leftSample := samples[firstInteriorIdx-1] + if !smoothed || firstInteriorIdx >= len(samples) { + // Anchored: use the sample value directly. + return leftSample.V.F, true + } + // Smoothed: interpolate between the sample before and after rangeStart. + return interpolateAt(leftSample, samples[firstInteriorIdx], rangeStart), true + } + + // No sample before rangeStart: duplicate the first interior sample. + if firstInteriorIdx < len(samples) { + return samples[firstInteriorIdx].V.F, true + } + + return 0, false +} + +// pickOrInterpolateRight returns the right boundary value at rangeEnd. +// For anchored: uses the last sample at/before rangeEnd. +// For smoothed: interpolates between the samples bracketing rangeEnd. +// If no sample exists after rangeEnd, the last interior sample value is used. +func pickOrInterpolateRight(samples []Sample, lastInteriorIdx int, rangeEnd int64, smoothed bool) (float64, bool) { + if lastInteriorIdx < 0 { + return 0, false + } + + if !smoothed || lastInteriorIdx+1 >= len(samples) { + // Anchored or no sample after rangeEnd: use last interior sample directly. + return samples[lastInteriorIdx].V.F, true + } + + // Smoothed: interpolate between the last sample before and first sample after rangeEnd. + return interpolateAt(samples[lastInteriorIdx], samples[lastInteriorIdx+1], rangeEnd), true +} + +// interpolateAt linearly interpolates between two samples at the given timestamp. +func interpolateAt(left, right Sample, timestamp int64) float64 { + if left.T == right.T { + return left.V.F + } + fraction := float64(timestamp-left.T) / float64(right.T-left.T) + return left.V.F + fraction*(right.V.F-left.V.F) +} + // histogramRate is a helper function for extrapolatedRate. It requires // points[0] to be a histogram. It returns nil if any other Point in points is // not a histogram. diff --git a/ringbuffer/generic.go b/ringbuffer/generic.go index 5357f8237..3c91bea3b 100644 --- a/ringbuffer/generic.go +++ b/ringbuffer/generic.go @@ -46,6 +46,9 @@ type GenericRingBuffer struct { selectRange int64 extLookback int64 call FunctionCall + + anchored bool + smoothed bool } func New(ctx context.Context, size int, selectRange, offset int64, call FunctionCall) *GenericRingBuffer { @@ -63,6 +66,20 @@ func NewWithExtLookback(ctx context.Context, size int, selectRange, offset, extL } } +// NewAnchored creates a ring buffer for anchored range selectors. +func NewAnchored(ctx context.Context, size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer { + buf := NewWithExtLookback(ctx, size, selectRange, offset, extLookback, call) + buf.anchored = true + return buf +} + +// NewSmoothed creates a ring buffer for smoothed range selectors. +func NewSmoothed(ctx context.Context, size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer { + buf := NewWithExtLookback(ctx, size, selectRange, offset, extLookback, call) + buf.smoothed = true + return buf +} + func (r *GenericRingBuffer) SampleCount() int { c := 0 for _, s := range r.items { @@ -142,6 +159,8 @@ func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarA ScalarPoint: scalarArg, ScalarPoint2: scalarArg2, // only for double_exponential_smoothing MetricAppearedTs: metricAppearedTs, + Anchored: r.anchored, + Smoothed: r.smoothed, }) } diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index 1b876c243..ca4db63f7 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -54,13 +54,17 @@ type matrixSelector struct { fhReader *histogram.FloatHistogram opts *query.Options - numSteps int - mint int64 - maxt int64 - step int64 - selectRange int64 - offset int64 - isExtFunction bool + numSteps int + mint int64 + maxt int64 + step int64 + selectRange int64 + offset int64 + isExtFunction bool + isAnchored bool + isSmoothed bool + needsExtLookback bool + lookbackDelta int64 currentStep int64 currentSeries int64 @@ -90,11 +94,13 @@ func NewMatrixSelector( selectRange, offset time.Duration, batchSize int64, shard, numShard int, + anchored, smoothed bool, ) (model.VectorOperator, error) { call, err := ringbuffer.NewRangeVectorFunc(functionName) if err != nil { return nil, err } + isExt := parse.IsExtFunction(functionName) m := &matrixSelector{ storage: selector, call: call, @@ -108,7 +114,12 @@ func NewMatrixSelector( mint: opts.Start.UnixMilli(), maxt: opts.End.UnixMilli(), step: opts.Step.Milliseconds(), - isExtFunction: parse.IsExtFunction(functionName), + isExtFunction: isExt, + isAnchored: anchored, + isSmoothed: smoothed, + + needsExtLookback: isExt || anchored || smoothed, + lookbackDelta: opts.LookbackDelta.Milliseconds(), selectRange: selectRange.Milliseconds(), offset: offset.Milliseconds(), @@ -193,8 +204,11 @@ func (o *matrixSelector) Next(ctx context.Context, buf []model.StepVector) (int, for currStep := 0; currStep < n && seriesTs <= o.maxt; currStep++ { maxt := seriesTs - o.offset mint := maxt - o.selectRange + if o.isSmoothed { + maxt += o.lookbackDelta + } - if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.isExtFunction); err != nil { + if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.needsExtLookback); err != nil { return 0, err } // TODO(saswatamcode): Handle multi-arg functions for matrixSelectors. @@ -317,6 +331,12 @@ func (o *matrixSelector) shouldCheckSampleLimit(firstSeries int64) bool { } func (o *matrixSelector) newBuffer(ctx context.Context) ringbuffer.Buffer { + if o.isAnchored { + return ringbuffer.NewAnchored(ctx, 8, o.selectRange, o.offset, o.lookbackDelta-1, o.call) + } + if o.isSmoothed { + return ringbuffer.NewSmoothed(ctx, 8, o.selectRange, o.offset, o.lookbackDelta-1, o.call) + } if ringbuffer.UseStreamingRingBuffers(*o.opts, o.selectRange) { switch o.functionName { case "rate": diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index bac38431c..f440ea1c1 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -149,6 +149,8 @@ func (p Scanners) NewMatrixSelector( vs.BatchSize, i, opts.DecodingConcurrency, + vs.Anchored, + vs.Smoothed, ) if err != nil { return nil, err From 36f1d1e6fc3c34806e911ae624873097e0ba3bd8 Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Fri, 20 Mar 2026 09:05:16 +0100 Subject: [PATCH 2/9] fix: correct counter-reset handling and hints.Range for anchored/smoothed Address review feedback on the anchored/smoothed implementation: 1. Counter-reset at boundaries: interpolateAt now handles counter resets during smoothed interpolation (left edge zeros y1, right edge adds y1 to y2), matching upstream Prometheus' interpolate function. correctForCounterResets now checks the right boundary value against the last interior sample, preventing impossible negative increase values when a reset occurs at the range boundary. 2. hints.Range semantics: the widened query window (for ext-lookback) is now only applied to hints.Start/End. hints.Range stays equal to the original selector range so backends using it for resolution selection or pushdown see the true range, matching Prometheus behavior. 3. Added regression test for smoothed increase/rate with counter reset at the range boundary. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sylvain Rabot --- engine/extended_range_test.go | 14 ++++ execution/execution.go | 11 ++- ringbuffer/functions.go | 149 +++++++++++++++++++--------------- 3 files changed, 106 insertions(+), 68 deletions(-) diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go index f3b974333..fd833dc37 100644 --- a/engine/extended_range_test.go +++ b/engine/extended_range_test.go @@ -114,6 +114,20 @@ func TestAnchoredSmoothedModifiers(t *testing.T) { metric 1 1 2 2 3 3 4 4 5 5 6`, query: `changes(metric[30s] anchored)`, }, + // Counter reset at range boundary (regression test: smoothed interpolation + // must handle counter resets to avoid negative increase values). + { + name: "smoothed increase counter reset at boundary", + load: `load 10s + counter_boundary 0 4 5 1 6 11`, + query: `increase(counter_boundary[10s] smoothed)`, + }, + { + name: "smoothed rate counter reset at boundary", + load: `load 10s + counter_boundary 0 4 5 1 6 11`, + query: `rate(counter_boundary[10s] smoothed)`, + }, // Non-linear data. { name: "anchored rate on quadratic counter", diff --git a/execution/execution.go b/execution/execution.go index 000d33e25..8a338c3fb 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -199,14 +199,19 @@ func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t if parse.IsExtFunction(e.Func.Name) { milliSecondRange += opts.ExtLookbackDelta.Milliseconds() } + + // For anchored/smoothed, widen the query window (Start/End) but keep + // hints.Range equal to the original selector range. Backends use Range for + // resolution selection and pushdown decisions and should see the true range. + queryRange := milliSecondRange if vs.Anchored { - milliSecondRange += opts.LookbackDelta.Milliseconds() + queryRange += opts.LookbackDelta.Milliseconds() } if vs.Smoothed { - milliSecondRange += opts.LookbackDelta.Milliseconds() + queryRange += opts.LookbackDelta.Milliseconds() } - start, end := getTimeRangesForVectorSelector(t.VectorSelector, opts, milliSecondRange) + start, end := getTimeRangesForVectorSelector(t.VectorSelector, opts, queryRange) hints.Start = start hints.End = end if vs.Smoothed { diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index c263b9a7b..fc315762d 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -656,54 +656,63 @@ func extendedRate(samples []Sample, isCounter, isRate bool, stepTime int64, sele // from Prometheus proposal 0052. It computes boundary values at the range start and end, // either by picking real sample values (anchored) or interpolating (smoothed), then // computes the difference with counter-reset correction. +// +// This implementation mirrors prometheus/prometheus promql/functions.go extendedRate. func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selectRange, offset int64, smoothed bool) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { if len(samples) == 0 { return 0, nil, false, 0, nil } - // Check for mixed float/histogram samples. if samples[0].V.H != nil { return 0, nil, false, 0, errors.New("native histograms are not supported with anchored/smoothed modifiers") } + lastSampleIndex := len(samples) - 1 rangeEnd := stepTime - offset rangeStart := rangeEnd - selectRange - // Find the index of the first sample after rangeStart (first interior sample). - firstInteriorIdx := sort.Search(len(samples), func(i int) bool { + // Find firstSampleIndex: the last sample at or before rangeStart, clamped to 0. + firstSampleIndex := sort.Search(lastSampleIndex, func(i int) bool { return samples[i].T > rangeStart - }) - - // Find the index of the last sample at or before rangeEnd. - lastInteriorIdx := sort.Search(len(samples), func(i int) bool { - return samples[i].T > rangeEnd }) - 1 + if firstSampleIndex < 0 { + firstSampleIndex = 0 + } - // Left boundary value. - leftVal, leftOk := pickOrInterpolateLeft(samples, firstInteriorIdx, rangeStart, smoothed) - if !leftOk { - return 0, nil, false, 0, nil + // For smoothed, extend lastSampleIndex to include the first sample at or after rangeEnd. + if smoothed { + lastSampleIndex = sort.Search(lastSampleIndex, func(i int) bool { + return samples[i].T >= rangeEnd + }) + if lastSampleIndex >= len(samples) { + lastSampleIndex = len(samples) - 1 + } } - // Right boundary value. - rightVal, rightOk := pickOrInterpolateRight(samples, lastInteriorIdx, rangeEnd, smoothed) - if !rightOk { + // If the last sample is at or before rangeStart, there's nothing in the range. + if samples[lastSampleIndex].T <= rangeStart { return 0, nil, false, 0, nil } - // Counter-reset correction: walk interior samples between the boundaries. - var counterCorrection float64 + left := pickOrInterpolateLeft(samples, firstSampleIndex, rangeStart, smoothed, isCounter) + right := pickOrInterpolateRight(samples, lastSampleIndex, rangeEnd, smoothed, isCounter) + + resultValue := right - left + if isCounter { - prevVal := leftVal - for i := firstInteriorIdx; i <= lastInteriorIdx; i++ { - if samples[i].V.F < prevVal { - counterCorrection += prevVal - } - prevVal = samples[i].V.F + // Narrow to samples strictly within the range for counter-reset correction, + // since pickOrInterpolateLeft/Right already handle resets at boundaries. + corrFirst := firstSampleIndex + if samples[corrFirst].T <= rangeStart { + corrFirst++ + } + corrLast := lastSampleIndex + if corrLast >= 0 && samples[corrLast].T >= rangeEnd { + corrLast-- } - } - resultValue := rightVal - leftVal + counterCorrection + resultValue += correctForCounterResets(left, right, samples[corrFirst:corrLast+1]) + } if isRate { rangeDuration := float64(selectRange) / 1000.0 @@ -716,55 +725,65 @@ func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selec return resultValue, nil, true, 0, nil } -// pickOrInterpolateLeft returns the left boundary value at rangeStart. +// pickOrInterpolateLeft returns the value at the left boundary of the range. // For anchored: uses the real sample value at/before rangeStart. -// For smoothed: interpolates between the samples bracketing rangeStart. -// If no sample exists before rangeStart, the first interior sample value is used. -func pickOrInterpolateLeft(samples []Sample, firstInteriorIdx int, rangeStart int64, smoothed bool) (float64, bool) { - if firstInteriorIdx > 0 { - // There is a sample at/before rangeStart. - leftSample := samples[firstInteriorIdx-1] - if !smoothed || firstInteriorIdx >= len(samples) { - // Anchored: use the sample value directly. - return leftSample.V.F, true - } - // Smoothed: interpolate between the sample before and after rangeStart. - return interpolateAt(leftSample, samples[firstInteriorIdx], rangeStart), true - } - - // No sample before rangeStart: duplicate the first interior sample. - if firstInteriorIdx < len(samples) { - return samples[firstInteriorIdx].V.F, true - } - - return 0, false +// For smoothed: interpolates between the samples bracketing rangeStart, with +// counter-reset awareness. +// If no sample exists before rangeStart, the first sample value is used. +func pickOrInterpolateLeft(samples []Sample, first int, rangeStart int64, smoothed, isCounter bool) float64 { + if smoothed && samples[first].T < rangeStart && first+1 < len(samples) { + return interpolateAt(samples[first], samples[first+1], rangeStart, isCounter, true) + } + return samples[first].V.F } -// pickOrInterpolateRight returns the right boundary value at rangeEnd. +// pickOrInterpolateRight returns the value at the right boundary of the range. // For anchored: uses the last sample at/before rangeEnd. -// For smoothed: interpolates between the samples bracketing rangeEnd. -// If no sample exists after rangeEnd, the last interior sample value is used. -func pickOrInterpolateRight(samples []Sample, lastInteriorIdx int, rangeEnd int64, smoothed bool) (float64, bool) { - if lastInteriorIdx < 0 { - return 0, false - } +// For smoothed: interpolates between the samples bracketing rangeEnd, with +// counter-reset awareness. +// If no sample exists after rangeEnd, the last sample value is used. +func pickOrInterpolateRight(samples []Sample, last int, rangeEnd int64, smoothed, isCounter bool) float64 { + if smoothed && last > 0 && samples[last].T > rangeEnd { + return interpolateAt(samples[last-1], samples[last], rangeEnd, isCounter, false) + } + return samples[last].V.F +} - if !smoothed || lastInteriorIdx+1 >= len(samples) { - // Anchored or no sample after rangeEnd: use last interior sample directly. - return samples[lastInteriorIdx].V.F, true +// interpolateAt performs linear interpolation between two samples at the given timestamp. +// If isCounter is true and there is a counter reset (right < left): +// - on the left edge, it sets the left value to 0 +// - on the right edge, it adds the left value to the right value +// +// This matches Prometheus' interpolate function in promql/functions.go. +func interpolateAt(left, right Sample, timestamp int64, isCounter, leftEdge bool) float64 { + y1 := left.V.F + y2 := right.V.F + if isCounter && y2 < y1 { + if leftEdge { + y1 = 0 + } else { + y2 += y1 + } } - - // Smoothed: interpolate between the last sample before and first sample after rangeEnd. - return interpolateAt(samples[lastInteriorIdx], samples[lastInteriorIdx+1], rangeEnd), true + return y1 + (y2-y1)*float64(timestamp-left.T)/float64(right.T-left.T) } -// interpolateAt linearly interpolates between two samples at the given timestamp. -func interpolateAt(left, right Sample, timestamp int64) float64 { - if left.T == right.T { - return left.V.F +// correctForCounterResets calculates the correction for counter resets across +// interior samples and the right boundary value. This matches Prometheus' +// correctForCounterResets in promql/functions.go. +func correctForCounterResets(left, right float64, points []Sample) float64 { + var correction float64 + prev := left + for _, p := range points { + if p.V.F < prev { + correction += prev + } + prev = p.V.F + } + if right < prev { + correction += prev } - fraction := float64(timestamp-left.T) / float64(right.T-left.T) - return left.V.F + fraction*(right.V.F-left.V.F) + return correction } // histogramRate is a helper function for extrapolatedRate. It requires From 626f2f8d1f8bc803a746b89baef275c85840019a Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Fri, 20 Mar 2026 09:10:22 +0100 Subject: [PATCH 3/9] test: add SelectHints regression test for anchored/smoothed Assert that hints.Range passed to NewMatrixSelector always equals the original selector range, not the widened query window. Uses a spy scanner that captures hints and verifies the contract for standard, anchored, and smoothed selectors. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sylvain Rabot --- engine/extended_range_test.go | 129 ++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go index fd833dc37..0cfcab2ff 100644 --- a/engine/extended_range_test.go +++ b/engine/extended_range_test.go @@ -6,6 +6,7 @@ package engine_test import ( "context" "fmt" + "sync" "testing" "time" @@ -13,9 +14,14 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/promqltest" + promstorage "github.com/prometheus/prometheus/storage" "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" + engstorage "github.com/thanos-io/promql-engine/storage" + promscan "github.com/thanos-io/promql-engine/storage/prometheus" ) func TestAnchoredSmoothedModifiers(t *testing.T) { @@ -289,3 +295,126 @@ func TestAnchoredSmoothedWhitelist(t *testing.T) { }) } } + +// hintsSpy wraps real scanners and records the SelectHints passed to NewMatrixSelector. +type hintsSpy struct { + inner engstorage.Scanners + mu sync.Mutex + matrixHintsCap []promstorage.SelectHints +} + +func (s *hintsSpy) Close() error { return s.inner.Close() } + +func (s *hintsSpy) NewVectorSelector(ctx context.Context, opts *query.Options, hints promstorage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) { + return s.inner.NewVectorSelector(ctx, opts, hints, selector) +} + +func (s *hintsSpy) NewMatrixSelector(ctx context.Context, opts *query.Options, hints promstorage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) { + s.mu.Lock() + s.matrixHintsCap = append(s.matrixHintsCap, hints) + s.mu.Unlock() + return s.inner.NewMatrixSelector(ctx, opts, hints, selector, call) +} + +func (s *hintsSpy) capturedHints() []promstorage.SelectHints { + s.mu.Lock() + defer s.mu.Unlock() + return s.matrixHintsCap +} + +func TestAnchoredSmoothedSelectHints(t *testing.T) { + t.Parallel() + parser.EnableExtendedRangeSelectors = true + + load := `load 10s + metric 0 10 20 30 40 50 60 70 80 90 100` + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + lookbackDelta := 5 * time.Minute // engine default + + cases := []struct { + name string + query string + expectedRange int64 // hints.Range in ms — should always be original selector range + }{ + { + name: "standard rate — Range equals selector range", + query: `rate(metric[30s])`, + expectedRange: 30000, + }, + { + name: "anchored rate — Range equals selector range, not widened", + query: `rate(metric[30s] anchored)`, + expectedRange: 30000, + }, + { + name: "smoothed rate — Range equals selector range, not widened", + query: `rate(metric[30s] smoothed)`, + expectedRange: 30000, + }, + { + name: "anchored increase — Range equals selector range", + query: `increase(metric[1m] anchored)`, + expectedRange: 60000, + }, + { + name: "smoothed delta — Range equals selector range", + query: `delta(metric[1m] smoothed)`, + expectedRange: 60000, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + opts := engine.Opts{ + EngineOpts: promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + }, + EnableExtendedRangeSelectors: true, + } + qOpts := &query.Options{ + Start: time.Unix(0, 0), + End: time.Unix(100, 0), + Step: 10 * time.Second, + LookbackDelta: lookbackDelta, + ExtLookbackDelta: 1 * time.Hour, + } + + // Parse and build the logical plan to create real scanners. + parser.EnableExtendedRangeSelectors = true + expr, err := parser.NewParser(tc.query).ParseExpr() + testutil.Ok(t, err) + + planOpts := logicalplan.PlanOptions{} + lplan, err := logicalplan.NewFromAST(expr, qOpts, planOpts) + testutil.Ok(t, err) + optimizedPlan, _ := lplan.Optimize(logicalplan.AllOptimizers) + + realScanners, err := promscan.NewPrometheusScanners(storage, qOpts, optimizedPlan) + testutil.Ok(t, err) + defer realScanners.Close() + + spy := &hintsSpy{inner: realScanners} + + eng := engine.NewWithScanners(opts, spy) + q, err := eng.NewRangeQuery(context.Background(), storage, nil, tc.query, time.Unix(0, 0), time.Unix(100, 0), 10*time.Second) + testutil.Ok(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + testutil.Ok(t, res.Err) + + hints := spy.capturedHints() + if len(hints) == 0 { + t.Fatal("expected at least one NewMatrixSelector call, got none") + } + for i, h := range hints { + if h.Range != tc.expectedRange { + t.Errorf("hints[%d].Range = %d, want %d (original selector range)", i, h.Range, tc.expectedRange) + } + } + }) + } +} From 6dd801b26c18906fc6bd00a3a4de516facdca77c Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Fri, 20 Mar 2026 09:13:49 +0100 Subject: [PATCH 4/9] refactor: simplify interpolateAt to match Prometheus v0.310.0 Remove the leftEdge parameter from interpolateAt. Counter resets during interpolation now always set y1=0 for both edges, matching the upstream simplification in prometheus/prometheus v0.310.0 (promql/functions.go). The old v0.308.0 behavior was asymmetric: left edge zeroed y1, right edge added y1 to y2. The new unified approach is cleaner and correctly models counters as starting from 0 post-reset regardless of which boundary is being interpolated. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sylvain Rabot --- engine/engine.go | 4 +-- engine/extended_range_test.go | 52 +++++++++++++++++------------------ ringbuffer/functions.go | 32 ++++++++------------- 3 files changed, 40 insertions(+), 48 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 6a9886d66..055772237 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -187,7 +187,7 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { scanners: scanners, activeQueryTracker: queryTracker, - disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, + disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, enableExtendedRangeSelectors: opts.EnableExtendedRangeSelectors, logger: opts.Logger, @@ -219,7 +219,7 @@ type Engine struct { scanners engstorage.Scanners activeQueryTracker promql.QueryTracker - disableDuplicateLabelChecks bool + disableDuplicateLabelChecks bool enableExtendedRangeSelectors bool logger *slog.Logger diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go index 0cfcab2ff..4951f5561 100644 --- a/engine/extended_range_test.go +++ b/engine/extended_range_test.go @@ -10,18 +10,18 @@ import ( "testing" "time" - "github.com/efficientgo/core/testutil" - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/promql/parser" - "github.com/prometheus/prometheus/promql/promqltest" - promstorage "github.com/prometheus/prometheus/storage" - "github.com/thanos-io/promql-engine/engine" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/query" engstorage "github.com/thanos-io/promql-engine/storage" promscan "github.com/thanos-io/promql-engine/storage/prometheus" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/promqltest" + promstorage "github.com/prometheus/prometheus/storage" ) func TestAnchoredSmoothedModifiers(t *testing.T) { @@ -47,77 +47,77 @@ func TestAnchoredSmoothedModifiers(t *testing.T) { { name: "anchored rate on linear counter", load: `load 10s - http_total 0 10 20 30 40 50 60 70 80 90 100`, + http_total 0 10 20 30 40 50 60 70 80 90 100`, query: `rate(http_total[30s] anchored)`, }, { name: "anchored increase on linear counter", load: `load 10s - http_total 0 10 20 30 40 50 60 70 80 90 100`, + http_total 0 10 20 30 40 50 60 70 80 90 100`, query: `increase(http_total[30s] anchored)`, }, { name: "anchored delta on gauge", load: `load 10s - temperature 20 22 21 23 25 24 26 28 27 29 30`, + temperature 20 22 21 23 25 24 26 28 27 29 30`, query: `delta(temperature[30s] anchored)`, }, // Smoothed rate/increase/delta on a linear counter. { name: "smoothed rate on linear counter", load: `load 10s - http_total 0 10 20 30 40 50 60 70 80 90 100`, + http_total 0 10 20 30 40 50 60 70 80 90 100`, query: `rate(http_total[30s] smoothed)`, }, { name: "smoothed increase on linear counter", load: `load 10s - http_total 0 10 20 30 40 50 60 70 80 90 100`, + http_total 0 10 20 30 40 50 60 70 80 90 100`, query: `increase(http_total[30s] smoothed)`, }, { name: "smoothed delta on gauge", load: `load 10s - temperature 20 22 21 23 25 24 26 28 27 29 30`, + temperature 20 22 21 23 25 24 26 28 27 29 30`, query: `delta(temperature[30s] smoothed)`, }, // Anchored with counter resets. { name: "anchored increase with counter reset", load: `load 10s - resets_total 0 10 20 5 15 25 10 20 30 40 50`, + resets_total 0 10 20 5 15 25 10 20 30 40 50`, query: `increase(resets_total[30s] anchored)`, }, { name: "anchored rate with counter reset", load: `load 10s - resets_total 0 10 20 5 15 25 10 20 30 40 50`, + resets_total 0 10 20 5 15 25 10 20 30 40 50`, query: `rate(resets_total[30s] anchored)`, }, // Smoothed with counter resets. { name: "smoothed increase with counter reset", load: `load 10s - resets_total 0 10 20 5 15 25 10 20 30 40 50`, + resets_total 0 10 20 5 15 25 10 20 30 40 50`, query: `increase(resets_total[30s] smoothed)`, }, { name: "smoothed rate with counter reset", load: `load 10s - resets_total 0 10 20 5 15 25 10 20 30 40 50`, + resets_total 0 10 20 5 15 25 10 20 30 40 50`, query: `rate(resets_total[30s] smoothed)`, }, // Anchored resets and changes (only supported for anchored). { name: "anchored resets", load: `load 10s - resets_total 0 10 20 5 15 25 10 20 30 40 50`, + resets_total 0 10 20 5 15 25 10 20 30 40 50`, query: `resets(resets_total[30s] anchored)`, }, { name: "anchored changes", load: `load 10s - metric 1 1 2 2 3 3 4 4 5 5 6`, + metric 1 1 2 2 3 3 4 4 5 5 6`, query: `changes(metric[30s] anchored)`, }, // Counter reset at range boundary (regression test: smoothed interpolation @@ -125,41 +125,41 @@ func TestAnchoredSmoothedModifiers(t *testing.T) { { name: "smoothed increase counter reset at boundary", load: `load 10s - counter_boundary 0 4 5 1 6 11`, + counter_boundary 0 4 5 1 6 11`, query: `increase(counter_boundary[10s] smoothed)`, }, { name: "smoothed rate counter reset at boundary", load: `load 10s - counter_boundary 0 4 5 1 6 11`, + counter_boundary 0 4 5 1 6 11`, query: `rate(counter_boundary[10s] smoothed)`, }, // Non-linear data. { name: "anchored rate on quadratic counter", load: `load 10s - quadratic 0 1 4 9 16 25 36 49 64 81 100`, + quadratic 0 1 4 9 16 25 36 49 64 81 100`, query: `rate(quadratic[30s] anchored)`, }, { name: "smoothed rate on quadratic counter", load: `load 10s - quadratic 0 1 4 9 16 25 36 49 64 81 100`, + quadratic 0 1 4 9 16 25 36 49 64 81 100`, query: `rate(quadratic[30s] smoothed)`, }, // Multiple series. { name: "anchored increase multiple series", load: `load 10s - http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 - http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, + http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 + http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, query: `increase(http_total[30s] anchored)`, }, { name: "smoothed increase multiple series", load: `load 10s - http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 - http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, + http_total{path="/foo"} 0 5 10 15 20 25 30 35 40 45 50 + http_total{path="/bar"} 0 10 20 30 40 50 60 70 80 90 100`, query: `increase(http_total[30s] smoothed)`, }, } diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index fc315762d..63d3c3c04 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -7,12 +7,12 @@ import ( "math" "sort" - "github.com/efficientgo/core/errors" - "github.com/prometheus/prometheus/model/histogram" - "github.com/thanos-io/promql-engine/compute" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/warnings" + + "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/model/histogram" ) type SamplesBuffer GenericRingBuffer @@ -672,12 +672,9 @@ func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selec rangeStart := rangeEnd - selectRange // Find firstSampleIndex: the last sample at or before rangeStart, clamped to 0. - firstSampleIndex := sort.Search(lastSampleIndex, func(i int) bool { + firstSampleIndex := max(sort.Search(lastSampleIndex, func(i int) bool { return samples[i].T > rangeStart - }) - 1 - if firstSampleIndex < 0 { - firstSampleIndex = 0 - } + })-1, 0) // For smoothed, extend lastSampleIndex to include the first sample at or after rangeEnd. if smoothed { @@ -732,7 +729,7 @@ func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selec // If no sample exists before rangeStart, the first sample value is used. func pickOrInterpolateLeft(samples []Sample, first int, rangeStart int64, smoothed, isCounter bool) float64 { if smoothed && samples[first].T < rangeStart && first+1 < len(samples) { - return interpolateAt(samples[first], samples[first+1], rangeStart, isCounter, true) + return interpolateAt(samples[first], samples[first+1], rangeStart, isCounter) } return samples[first].V.F } @@ -744,26 +741,21 @@ func pickOrInterpolateLeft(samples []Sample, first int, rangeStart int64, smooth // If no sample exists after rangeEnd, the last sample value is used. func pickOrInterpolateRight(samples []Sample, last int, rangeEnd int64, smoothed, isCounter bool) float64 { if smoothed && last > 0 && samples[last].T > rangeEnd { - return interpolateAt(samples[last-1], samples[last], rangeEnd, isCounter, false) + return interpolateAt(samples[last-1], samples[last], rangeEnd, isCounter) } return samples[last].V.F } // interpolateAt performs linear interpolation between two samples at the given timestamp. -// If isCounter is true and there is a counter reset (right < left): -// - on the left edge, it sets the left value to 0 -// - on the right edge, it adds the left value to the right value +// If isCounter is true and there is a counter reset (y2 < y1), it models the +// counter as starting from 0 post-reset by setting y1 to 0. // -// This matches Prometheus' interpolate function in promql/functions.go. -func interpolateAt(left, right Sample, timestamp int64, isCounter, leftEdge bool) float64 { +// This matches Prometheus v0.310.0's interpolate function in promql/functions.go. +func interpolateAt(left, right Sample, timestamp int64, isCounter bool) float64 { y1 := left.V.F y2 := right.V.F if isCounter && y2 < y1 { - if leftEdge { - y1 = 0 - } else { - y2 += y1 - } + y1 = 0 } return y1 + (y2-y1)*float64(timestamp-left.T)/float64(right.T-left.T) } From 5880b77c34f09924179982158aa9b39829b36d3c Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Fri, 20 Mar 2026 10:51:28 +0100 Subject: [PATCH 5/9] fix: recover lastSample for anchored/smoothed ext lookback boundary When step intervals don't align with sample intervals, the sample needed as the ext lookback boundary may be trapped in lastSample from the previous step and not pushed back to the buffer (since its timestamp is <= mint). For anchored/smoothed selectors, push lastSample back into the buffer before Reset so the ext lookback retention logic can decide whether to keep it. This is scoped to anchored/smoothed only to avoid changing existing x-function behavior. Also adds a fuzz test (FuzzAnchoredSmoothedModifiers) that generates random data and step parameters, testing all anchored/smoothed function combinations against the upstream Prometheus engine. The fuzzer found this bug within seconds. Signed-off-by: Sylvain Rabot Co-Authored-By: Claude Opus 4.6 (1M context) --- engine/extended_range_test.go | 104 ++++++++++++++++++++++++++ storage/prometheus/matrix_selector.go | 11 ++- 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go index 4951f5561..f6a5feaae 100644 --- a/engine/extended_range_test.go +++ b/engine/extended_range_test.go @@ -6,6 +6,7 @@ package engine_test import ( "context" "fmt" + "math" "sync" "testing" "time" @@ -18,6 +19,7 @@ import ( promscan "github.com/thanos-io/promql-engine/storage/prometheus" "github.com/efficientgo/core/testutil" + "github.com/google/go-cmp/cmp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/promqltest" @@ -322,6 +324,108 @@ func (s *hintsSpy) capturedHints() []promstorage.SelectHints { return s.matrixHintsCap } +func FuzzAnchoredSmoothedModifiers(f *testing.F) { + f.Add(int64(0), uint32(30), uint32(300), uint32(10), 0.0, 10.0, 5.0, 20.0, uint8(0)) + + f.Fuzz(func(t *testing.T, seed int64, startTS, endTS, intervalSeconds uint32, initialVal1, inc1, initialVal2, inc2 float64, funcIdx uint8) { + if math.IsNaN(initialVal1) || math.IsNaN(initialVal2) || math.IsNaN(inc1) || math.IsNaN(inc2) { + return + } + if math.IsInf(initialVal1, 0) || math.IsInf(initialVal2, 0) || math.IsInf(inc1, 0) || math.IsInf(inc2, 0) { + return + } + if inc1 < 0 || inc2 < 0 || intervalSeconds == 0 || endTS <= startTS { + return + } + // Cap values to avoid overflow. + if initialVal1 > 1e12 || initialVal2 > 1e12 || inc1 > 1e8 || inc2 > 1e8 { + return + } + // Ensure query range falls within data range (41 samples at 10s = 400s). + // This avoids edge cases at data boundaries where Prometheus' extendFloats + // synthesis for changes/resets differs from the thanos engine approach. + const maxDataTS uint32 = 400 + if endTS > maxDataTS { + endTS = maxDataTS + } + if startTS >= endTS { + return + } + + parser.EnableExtendedRangeSelectors = true + + type queryDef struct { + name string + query string + } + + // Queries covering all supported function+modifier combinations. + allQueries := []queryDef{ + {"rate_anchored", `rate(http_requests_total[30s] anchored)`}, + {"rate_smoothed", `rate(http_requests_total[30s] smoothed)`}, + {"increase_anchored", `increase(http_requests_total[30s] anchored)`}, + {"increase_smoothed", `increase(http_requests_total[30s] smoothed)`}, + {"delta_anchored", `delta(http_requests_total[30s] anchored)`}, + {"delta_smoothed", `delta(http_requests_total[30s] smoothed)`}, + {"resets_anchored", `resets(http_requests_total[30s] anchored)`}, + {"changes_anchored", `changes(http_requests_total[30s] anchored)`}, + {"rate_anchored_1m", `rate(http_requests_total[1m] anchored)`}, + {"rate_smoothed_1m", `rate(http_requests_total[1m] smoothed)`}, + {"sum_rate_anchored", `sum(rate(http_requests_total[30s] anchored))`}, + {"sum_rate_smoothed", `sum(rate(http_requests_total[30s] smoothed))`}, + } + + // Pick one query per fuzz iteration to keep it fast. + selected := allQueries[int(funcIdx)%len(allQueries)] + + load := fmt.Sprintf(`load 10s + http_requests_total{pod="nginx-1"} %.2f+%.2fx40 + http_requests_total{pod="nginx-2"} %.2f+%.2fx40`, initialVal1, inc1, initialVal2, inc2) + + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + EnableNegativeOffset: true, + EnableAtModifier: true, + } + + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + start := time.Unix(int64(startTS), 0) + end := time.Unix(int64(endTS), 0) + interval := time.Duration(intervalSeconds) * time.Second + + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + EnableExtendedRangeSelectors: true, + }) + oldEngine := promql.NewEngine(opts) + + ctx := context.Background() + + q1, err := newEngine.NewRangeQuery(ctx, storage, nil, selected.query, start, end, interval) + if err != nil { + return // Skip unsupported queries. + } + defer q1.Close() + newResult := q1.Exec(ctx) + + q2, err := oldEngine.NewRangeQuery(ctx, storage, nil, selected.query, start, end, interval) + if err != nil { + t.Fatalf("prometheus engine error for %s: %v", selected.name, err) + } + defer q2.Close() + oldResult := q2.Exec(ctx) + + if !cmp.Equal(oldResult, newResult, comparer) { + t.Logf("load: %s", load) + t.Logf("query: %s (%s), start: %d, end: %d, interval: %v", selected.query, selected.name, start.UnixMilli(), end.UnixMilli(), interval) + t.Errorf("result mismatch.\nnew: %s\nold: %s", newResult.String(), oldResult.String()) + } + }) +} + func TestAnchoredSmoothedSelectHints(t *testing.T) { t.Parallel() parser.EnableExtendedRangeSelectors = true diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index ca4db63f7..d7f39b151 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -208,7 +208,7 @@ func (o *matrixSelector) Next(ctx context.Context, buf []model.StepVector) (int, maxt += o.lookbackDelta } - if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.needsExtLookback); err != nil { + if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.needsExtLookback, o.isAnchored || o.isSmoothed); err != nil { return 0, err } // TODO(saswatamcode): Handle multi-arg functions for matrixSelectors. @@ -394,7 +394,16 @@ func (m *matrixScanner) selectPoints( mint, maxt, evalt int64, fh *histogram.FloatHistogram, isExtFunction bool, + recoverLastSample bool, ) error { + // For anchored/smoothed selectors: push lastSample back into the buffer + // before Reset so the ext lookback retention logic can decide whether to + // keep it as the boundary sample. lastSample always has the largest T, + // so append order is preserved. + if recoverLastSample && m.lastSample.T != math.MinInt64 && m.lastSample.T <= maxt { + m.buffer.Push(m.lastSample.T, m.lastSample.V) + m.lastSample.T = math.MinInt64 + } m.buffer.Reset(mint, evalt) if m.lastSample.T > maxt { return nil From eb3a674e67e6d34675d420f5ce9e1d9895c40b9d Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Fri, 20 Mar 2026 11:09:18 +0100 Subject: [PATCH 6/9] fix: guard correctForCounterResets slice bounds and document global flag Fix a potential slice panic in correctForCounterResets when all interior samples are excluded by the boundary narrowing (corrFirst > corrLast with corrLast = -1). Clamp corrLast to corrFirst-1 to produce a valid empty slice while still allowing the right-boundary counter-reset check to fire. Also remove a duplicate mint computation in selectPoints (dead code), and document the parser.EnableExtendedRangeSelectors process-global limitation in engine.go. Found by architect review. Signed-off-by: Sylvain Rabot Co-Authored-By: Claude Opus 4.6 (1M context) --- engine/engine.go | 10 ++++++++++ ringbuffer/functions.go | 3 +++ storage/prometheus/matrix_selector.go | 1 - 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/engine/engine.go b/engine/engine.go index 055772237..a6c8f55d6 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -244,6 +244,11 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts } defer e.activeQueryTracker.Delete(idx) + // NOTE: parser.EnableExtendedRangeSelectors is a process-global variable + // in the upstream Prometheus parser. Once set to true, it remains enabled + // for all subsequent parses in the process, matching how Prometheus handles + // it (set once at startup via --enable-feature). Two engine instances in + // the same process cannot independently control this flag. if e.enableExtendedRangeSelectors { parser.EnableExtendedRangeSelectors = true } @@ -345,6 +350,11 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * } defer e.activeQueryTracker.Delete(idx) + // NOTE: parser.EnableExtendedRangeSelectors is a process-global variable + // in the upstream Prometheus parser. Once set to true, it remains enabled + // for all subsequent parses in the process, matching how Prometheus handles + // it (set once at startup via --enable-feature). Two engine instances in + // the same process cannot independently control this flag. if e.enableExtendedRangeSelectors { parser.EnableExtendedRangeSelectors = true } diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index 63d3c3c04..5a4e0b6e3 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -708,6 +708,9 @@ func extendedRangeRate(samples []Sample, isCounter, isRate bool, stepTime, selec corrLast-- } + // Clamp to valid slice bounds. correctForCounterResets handles empty + // interior correctly — it still checks the right boundary value. + corrLast = max(corrLast, corrFirst-1) resultValue += correctForCounterResets(left, right, samples[corrFirst:corrLast+1]) } diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index d7f39b151..a9493da7e 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -412,7 +412,6 @@ func (m *matrixScanner) selectPoints( if bufMaxt := m.buffer.MaxT() + 1; bufMaxt > mint { mint = bufMaxt } - mint = max(mint, m.buffer.MaxT()+1) if m.lastSample.T > mint { m.buffer.Push(m.lastSample.T, m.lastSample.V) m.lastSample.T = math.MinInt64 From f3523820b4ed25bb508db9e1d93e70bf230b6d95 Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Tue, 24 Mar 2026 09:47:17 +0100 Subject: [PATCH 7/9] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?link=20proposal,=20enable=20extended=5Fvectors.test,=20fix=20an?= =?UTF-8?q?chored/smoothed=20gaps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace "proposal 0052" references with actual GitHub link to prometheus/proposals/0052-extended-range-selectors-semantics.md - Remove extended_vectors.test from acceptance test skip list and enable EnableExtendedRangeSelectors in TestPromqlAcceptance - Fix error message format to match Prometheus ("can only be used with: X, Y - not with Z") and defer validation errors to eval time via deferredError operator for expect-fail-msg compatibility - Implement smoothed instant vector interpolation (selectPointSmoothed) matching Prometheus smoothSeries behaviour - Fix anchored changes/resets on sparse data by retaining an additional lookback sample in the ring buffer and adding pickFirstSampleIndex to start comparisons from the correct boundary sample Constraint: error messages must match Prometheus exactly for expect-fail-msg tests Constraint: smoothed instant vectors need forward-looking samples for interpolation Rejected: extending mint in matrix selector for anchored | would over-widen sample window for all functions Confidence: high Scope-risk: moderate Not-tested: smoothed instant vectors with native histograms (returns no data, not an error) Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sylvain Rabot --- engine/engine.go | 3 +- engine/engine_test.go | 11 +++-- execution/execution.go | 38 +++++++++++++++- execution/noop/operator.go | 1 + execution/remote/operator.go | 2 +- ringbuffer/functions.go | 22 +++++++-- ringbuffer/generic.go | 6 +++ storage/prometheus/matrix_selector.go | 7 ++- storage/prometheus/scanners.go | 1 + storage/prometheus/vector_selector.go | 64 ++++++++++++++++++++++++++- 10 files changed, 143 insertions(+), 12 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index a6c8f55d6..0d8f40f1d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -75,7 +75,8 @@ type Opts struct { EnableXFunctions bool // EnableExtendedRangeSelectors enables the anchored and smoothed modifiers - // for range vector selectors (Prometheus proposal 0052). + // for range vector selectors. + // See https://github.com/prometheus/proposals/blob/main/proposals/0052-extended-range-selectors-semantics.md EnableExtendedRangeSelectors bool // EnableAnalysis enables query analysis. diff --git a/engine/engine_test.go b/engine/engine_test.go index 1542ebb62..0f4556756 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -75,7 +75,11 @@ func (s *skipTest) Run(name string, t func(*testing.T)) bool { func TestPromqlAcceptance(t *testing.T) { // promql acceptance tests disable experimental functions again // since we use them in our tests too we need to enable them afterwards again - t.Cleanup(func() { parser.EnableExperimentalFunctions = true }) + t.Cleanup(func() { + parser.EnableExperimentalFunctions = true + parser.EnableExtendedRangeSelectors = false + }) + parser.EnableExtendedRangeSelectors = true engine := engine.New(engine.Opts{ EngineOpts: promql.EngineOpts{ @@ -85,13 +89,14 @@ func TestPromqlAcceptance(t *testing.T) { MaxSamples: 5e10, Timeout: 1 * time.Hour, NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() }, - }}) + }, + EnableExtendedRangeSelectors: true, + }) st := &skipTest{ skipTests: []string{ "testdata/name_label_dropping.test", // feature unsupported "testdata/type_and_unit.test", // feature unsupported - "testdata/extended_vectors.test", // experimental anchored/smoothed modifiers unsupported "testdata/info.test", // info() function unsupported "testdata/literals.test", // string literal expressions as query results unsupported "testdata/range_queries.test", // matrix selector as instant query result unsupported diff --git a/execution/execution.go b/execution/execution.go index 8a338c3fb..bf4042cc6 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -18,7 +18,10 @@ package execution import ( "context" + "maps" + "slices" "sort" + "strings" "time" "github.com/thanos-io/promql-engine/execution/aggregate" @@ -37,6 +40,7 @@ import ( "github.com/thanos-io/promql-engine/storage" "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" promstorage "github.com/prometheus/prometheus/storage" @@ -88,6 +92,11 @@ func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Sca func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { start, end := getTimeRangesForVectorSelector(e, opts, 0) + if e.Smoothed { + // Smoothed instant vectors need samples after the evaluation time + // for interpolation, so extend the end by lookbackDelta. + end += opts.LookbackDelta.Milliseconds() + } hints.Start = start hints.End = end op, err := scanners.NewVectorSelector(ctx, opts, hints, *e) @@ -184,14 +193,16 @@ func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t vs := t.VectorSelector // Validate function whitelist for anchored/smoothed modifiers. + // Return a deferred error operator so the error surfaces at Exec time, + // matching Prometheus behaviour expected by the test framework. if vs.Anchored { if _, ok := parse.AnchoredSafeFunctions[e.Func.Name]; !ok { - return nil, errors.Newf("anchored modifier is not supported for %s, supported functions: %v", e.Func.Name, parse.AnchoredSafeFunctions) + return newDeferredError(errors.Newf("anchored modifier can only be used with: %s - not with %s", strings.Join(slices.Sorted(maps.Keys(parse.AnchoredSafeFunctions)), ", "), e.Func.Name)), nil } } if vs.Smoothed { if _, ok := parse.SmoothedSafeFunctions[e.Func.Name]; !ok { - return nil, errors.Newf("smoothed modifier is not supported for %s, supported functions: %v", e.Func.Name, parse.SmoothedSafeFunctions) + return newDeferredError(errors.Newf("smoothed modifier can only be used with: %s - not with %s", strings.Join(slices.Sorted(maps.Keys(parse.SmoothedSafeFunctions)), ", "), e.Func.Name)), nil } } @@ -437,6 +448,29 @@ func newDuplicateLabelCheck(ctx context.Context, e *logicalplan.CheckDuplicateLa return exchange.NewDuplicateLabelCheck(op, opts), nil } +// deferredError is an operator that returns an error on first Next() call, +// allowing validation errors to surface at evaluation time rather than +// query creation time, matching Prometheus behaviour. +type deferredError struct { + model.VectorOperator + err error +} + +func newDeferredError(err error) model.VectorOperator { + return &deferredError{ + VectorOperator: noop.NewOperator(&query.Options{}), + err: err, + } +} + +func (d *deferredError) Next(context.Context, []model.StepVector) (int, error) { + return 0, d.err +} + +func (d *deferredError) Series(context.Context) ([]labels.Labels, error) { + return nil, d.err +} + // Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791. func getTimeRangesForVectorSelector(n *logicalplan.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) { start := opts.Start.UnixMilli() diff --git a/execution/noop/operator.go b/execution/noop/operator.go index 5b4f03dc3..8e9951270 100644 --- a/execution/noop/operator.go +++ b/execution/noop/operator.go @@ -26,6 +26,7 @@ func NewOperator(opts *query.Options) model.VectorOperator { false, // selectTimestamp 0, // shard 1, // numShards + false, // smoothed ) return &operator{VectorOperator: scanner} } diff --git a/execution/remote/operator.go b/execution/remote/operator.go index fa71bef07..61615b839 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -40,7 +40,7 @@ func NewExecution(query promql.Query, queryRangeStart, queryRangeEnd time.Time, opts: opts, queryRangeStart: queryRangeStart, queryRangeEnd: queryRangeEnd, - vectorSelector: promstorage.NewVectorSelector(storage, opts, 0, 0, false, 0, 1), + vectorSelector: promstorage.NewVectorSelector(storage, opts, 0, 0, false, 0, 1, false), } return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper) diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index 5a4e0b6e3..b9624f867 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -28,7 +28,8 @@ type FunctionArgs struct { ScalarPoint float64 ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor) - // Anchored/Smoothed modifiers for extended range selectors (proposal 0052). + // Anchored/Smoothed modifiers for extended range selectors. + // See https://github.com/prometheus/proposals/blob/main/proposals/0052-extended-range-selectors-semantics.md Anchored bool Smoothed bool } @@ -330,13 +331,15 @@ var rangeVectorFuncs = map[string]FunctionCall{ if len(f.Samples) == 0 { return 0., nil, false, 0, nil } - return changes(f.Samples), nil, true, 0, nil + start := pickFirstSampleIndex(f) + return changes(f.Samples[start:]), nil, true, 0, nil }, "resets": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { if len(f.Samples) == 0 { return 0., nil, false, 0, nil } - return resets(f.Samples), nil, true, 0, nil + start := pickFirstSampleIndex(f) + return resets(f.Samples[start:]), nil, true, 0, nil }, "deriv": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, warnings.Warnings, error) { if len(f.Samples) < 2 { @@ -1107,6 +1110,19 @@ func stdvarOverTime(points []Sample) (float64, bool, warnings.Warnings) { return ((aux + cAux) / count), true, warn } +// pickFirstSampleIndex returns the index of the last sample at or before the +// range start for anchored selectors. For non-anchored, returns 0. +// This matches Prometheus's pickFirstSampleIndex in promql/functions.go. +func pickFirstSampleIndex(f FunctionArgs) int { + if !f.Anchored || len(f.Samples) == 0 { + return 0 + } + rangeStart := f.StepTime - f.Offset - f.SelectRange + return max(sort.Search(len(f.Samples)-1, func(i int) bool { + return f.Samples[i].T > rangeStart + })-1, 0) +} + func changes(points []Sample) float64 { count := 0. diff --git a/ringbuffer/generic.go b/ringbuffer/generic.go index 3c91bea3b..3bf59db7c 100644 --- a/ringbuffer/generic.go +++ b/ringbuffer/generic.go @@ -140,6 +140,12 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) { } if r.extLookback > 0 && drop > 0 && r.items[drop-1].T >= mint-r.extLookback { drop-- + // For anchored/smoothed, keep one additional lookback sample so that + // functions like changes/resets can compare against a sample before + // the range boundary (matching Prometheus pickFirstSampleIndex). + if (r.anchored || r.smoothed) && drop > 0 && r.items[drop-1].T >= mint-r.extLookback { + drop-- + } } keep := len(r.items) - drop diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index a9493da7e..c8d959b67 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -454,10 +454,15 @@ func (m *matrixScanner) selectPoints( m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = t, v, nil return nil } - if isExtFunction { + if isExtFunction || recoverLastSample { if t > mint || !appendedPointBeforeMint { m.buffer.Push(t, ringbuffer.Value{F: v}) appendedPointBeforeMint = true + } else if recoverLastSample { + // For anchored/smoothed, keep two samples at/before mint so + // that functions like changes/resets can compare against + // the sample preceding the range boundary. + m.buffer.Push(t, ringbuffer.Value{F: v}) } else { m.buffer.ReadIntoLast(func(s *ringbuffer.Sample) { s.T, s.V.F, s.V.H = t, v, nil diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index f440ea1c1..e457f67d8 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -75,6 +75,7 @@ func (p Scanners) NewVectorSelector( logicalNode.SelectTimestamp, i, opts.DecodingConcurrency, + logicalNode.Smoothed, ), 2, opts) operators = append(operators, operator) } diff --git a/storage/prometheus/vector_selector.go b/storage/prometheus/vector_selector.go index 915c06c3b..7dfbccf64 100644 --- a/storage/prometheus/vector_selector.go +++ b/storage/prometheus/vector_selector.go @@ -52,6 +52,7 @@ type vectorSelector struct { numShards int selectTimestamp bool + smoothed bool opts *query.Options lastTrackedSamples int @@ -65,6 +66,7 @@ func NewVectorSelector( batchSize int64, selectTimestamp bool, shard, numShards int, + smoothed bool, ) model.VectorOperator { o := &vectorSelector{ storage: selector, @@ -82,6 +84,7 @@ func NewVectorSelector( numShards: numShards, selectTimestamp: selectTimestamp, + smoothed: smoothed, opts: queryOpts, } @@ -156,7 +159,18 @@ func (o *vectorSelector) Next(ctx context.Context, buf []model.StepVector) (int, ) for currStep := 0; currStep < n && seriesTs <= o.maxt; currStep++ { currStepSamples = 0 - t, v, h, ok, err := selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) + var ( + t int64 + v float64 + h *histogram.FloatHistogram + ok bool + err error + ) + if o.smoothed { + t, v, ok, err = selectPointSmoothed(series.samples, seriesTs, o.lookbackDelta, o.offset) + } else { + t, v, h, ok, err = selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) + } if err != nil { return 0, err } @@ -254,6 +268,54 @@ func (o *vectorSelector) shouldCheckSampleLimit(fromSeries int64) bool { return isEndOfBatch || isLastSeries } +// selectPointSmoothed returns an interpolated value at the given timestamp by +// looking at samples in [ts-lookbackDelta, ts+lookbackDelta]. When a sample +// exists exactly at ts it is returned as-is. When samples exist on both sides, +// linear interpolation is performed. When only a previous sample exists, its +// value is carried forward. +func selectPointSmoothed(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, bool, error) { + refTime := ts - offset + + valueType := it.Seek(refTime) + switch valueType { + case chunkenc.ValNone: + if it.Err() != nil { + return 0, 0, false, it.Err() + } + case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: + // Histograms not supported for smoothed instant vectors. + return 0, 0, false, nil + case chunkenc.ValFloat: + seekT, seekV := it.At() + if value.IsStaleNaN(seekV) { + return 0, 0, false, nil + } + if seekT == refTime { + // Exact match. + return ts, seekV, true, nil + } + // seekT > refTime: we have the "next" sample. Check for a previous one. + if seekT <= refTime+lookbackDelta { + prevT, prevV, _, prevOK := it.PeekPrev() + if prevOK && !value.IsStaleNaN(prevV) && prevT > refTime-lookbackDelta { + // Interpolate between prev and next. + v := prevV + (seekV-prevV)*float64(refTime-prevT)/float64(seekT-prevT) + return ts, v, true, nil + } + } + default: + panic(errors.Newf("unknown value type %v", valueType)) + } + + // No sample at or after refTime (or next sample is beyond lookback). + // Fall back to previous sample (carry forward). + prevT, prevV, _, prevOK := it.PeekPrev() + if !prevOK || prevT <= refTime-lookbackDelta || value.IsStaleNaN(prevV) { + return 0, 0, false, nil + } + return ts, prevV, true, nil +} + // TODO(fpetkovski): Add max samples limit. func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { refTime := ts - offset From e4e283566b2f88027198484fd931fe172f4832ac Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Tue, 24 Mar 2026 13:43:19 +0100 Subject: [PATCH 8/9] fix(test): resolve race on parser.EnableExtendedRangeSelectors global flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Prometheus acceptance test framework toggles the process-global EnableExtendedRangeSelectors flag during its run. Extended range tests used t.Parallel(), causing them to overlap and see the flag in the wrong state — producing data races and spurious "expected error, got nothing" failures. - Remove t.Parallel() from all extended range tests so they cannot overlap with the acceptance test's flag toggling - Fix whitelist test to check errors at Exec() time (deferred error pattern) instead of at NewInstantQuery() time - Remove redundant flag manipulation in TestPromqlAcceptance cleanup Constraint: parser.EnableExtendedRangeSelectors is a process-global Rejected: sync.Once in engine | flag is legitimately toggled by acceptance tests Confidence: high Scope-risk: narrow Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Sylvain Rabot --- engine/engine_test.go | 2 -- engine/extended_range_test.go | 19 ++++++++++--------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/engine/engine_test.go b/engine/engine_test.go index 0f4556756..10beb1bb0 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -77,9 +77,7 @@ func TestPromqlAcceptance(t *testing.T) { // since we use them in our tests too we need to enable them afterwards again t.Cleanup(func() { parser.EnableExperimentalFunctions = true - parser.EnableExtendedRangeSelectors = false }) - parser.EnableExtendedRangeSelectors = true engine := engine.New(engine.Opts{ EngineOpts: promql.EngineOpts{ diff --git a/engine/extended_range_test.go b/engine/extended_range_test.go index f6a5feaae..562870870 100644 --- a/engine/extended_range_test.go +++ b/engine/extended_range_test.go @@ -27,7 +27,6 @@ import ( ) func TestAnchoredSmoothedModifiers(t *testing.T) { - t.Parallel() parser.EnableExtendedRangeSelectors = true opts := promql.EngineOpts{ @@ -172,7 +171,6 @@ func TestAnchoredSmoothedModifiers(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - t.Parallel() storage := promqltest.LoadedStorage(t, tc.load) defer storage.Close() @@ -222,7 +220,6 @@ func TestAnchoredSmoothedModifiers(t *testing.T) { } func TestAnchoredSmoothedWhitelist(t *testing.T) { - t.Parallel() parser.EnableExtendedRangeSelectors = true opts := promql.EngineOpts{ @@ -251,8 +248,11 @@ func TestAnchoredSmoothedWhitelist(t *testing.T) { } for _, query := range unsupportedAnchored { t.Run("unsupported_anchored/"+query, func(t *testing.T) { - _, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) - testutil.NotOk(t, err) + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + defer q.Close() + res := q.Exec(ctx) + testutil.NotOk(t, res.Err) }) } @@ -263,8 +263,11 @@ func TestAnchoredSmoothedWhitelist(t *testing.T) { } for _, query := range unsupportedSmoothed { t.Run("unsupported_smoothed/"+query, func(t *testing.T) { - _, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) - testutil.NotOk(t, err) + q, err := newEngine.NewInstantQuery(ctx, storage, nil, query, time.Unix(50, 0)) + testutil.Ok(t, err) + defer q.Close() + res := q.Exec(ctx) + testutil.NotOk(t, res.Err) }) } @@ -427,7 +430,6 @@ func FuzzAnchoredSmoothedModifiers(f *testing.F) { } func TestAnchoredSmoothedSelectHints(t *testing.T) { - t.Parallel() parser.EnableExtendedRangeSelectors = true load := `load 10s @@ -487,7 +489,6 @@ func TestAnchoredSmoothedSelectHints(t *testing.T) { } // Parse and build the logical plan to create real scanners. - parser.EnableExtendedRangeSelectors = true expr, err := parser.NewParser(tc.query).ParseExpr() testutil.Ok(t, err) From 2c75e3da4fa8faf5692ce1b52df1092face6a576 Mon Sep 17 00:00:00 2001 From: Sylvain Rabot Date: Thu, 26 Mar 2026 11:59:16 +0100 Subject: [PATCH 9/9] fix: typos Signed-off-by: Sylvain Rabot --- execution/execution.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/execution/execution.go b/execution/execution.go index bf4042cc6..91cfd8cbf 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -194,7 +194,7 @@ func newRangeVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, t // Validate function whitelist for anchored/smoothed modifiers. // Return a deferred error operator so the error surfaces at Exec time, - // matching Prometheus behaviour expected by the test framework. + // matching Prometheus behavior expected by the test framework. if vs.Anchored { if _, ok := parse.AnchoredSafeFunctions[e.Func.Name]; !ok { return newDeferredError(errors.Newf("anchored modifier can only be used with: %s - not with %s", strings.Join(slices.Sorted(maps.Keys(parse.AnchoredSafeFunctions)), ", "), e.Func.Name)), nil @@ -450,7 +450,7 @@ func newDuplicateLabelCheck(ctx context.Context, e *logicalplan.CheckDuplicateLa // deferredError is an operator that returns an error on first Next() call, // allowing validation errors to surface at evaluation time rather than -// query creation time, matching Prometheus behaviour. +// query creation time, matching Prometheus behavior. type deferredError struct { model.VectorOperator err error