diff --git a/engine/engine.go b/engine/engine.go index 3a4c47b7..9004e374 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -81,6 +81,18 @@ type Opts struct { // This check can produce false positives when querying time-series data which does not conform to the Prometheus data model, // and can be disabled if it leads to false positives. DisableDuplicateLabelChecks bool + + // EnableHighOverlapBatching reduces memory for queries with long lookback windows. + // Requires DisableDuplicateLabelChecks=true to exceed the 64-step limit. + EnableHighOverlapBatching bool + + // HighOverlapBatchSize is the series batch size for high-overlap queries. + // Defaults to 1000. + HighOverlapBatchSize int64 + + // HighOverlapThreshold is the overlap threshold that triggers the optimization. + // Defaults to 100. + HighOverlapThreshold int64 } // QueryOpts implements promql.QueryOpts but allows to override more engine default options. @@ -173,6 +185,17 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { } selectorBatchSize := opts.SelectorBatchSize + enableHighOverlapBatching := opts.EnableHighOverlapBatching + + highOverlapBatchSize := opts.HighOverlapBatchSize + if highOverlapBatchSize == 0 { + highOverlapBatchSize = 1000 + } + + highOverlapThreshold := opts.HighOverlapThreshold + if highOverlapThreshold == 0 { + highOverlapThreshold = 100 + } var queryTracker promql.QueryTracker = nopQueryTracker{} if opts.ActiveQueryTracker != nil { queryTracker = opts.ActiveQueryTracker @@ -198,6 +221,10 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { }, decodingConcurrency: decodingConcurrency, selectorBatchSize: selectorBatchSize, + enableHighOverlapBatching: enableHighOverlapBatching, + highOverlapBatchSize: highOverlapBatchSize, + highOverlapThreshold: highOverlapThreshold, + } } @@ -227,6 +254,10 @@ type Engine struct { selectorBatchSize int64 enableAnalysis bool noStepSubqueryIntervalFn func(time.Duration) time.Duration + + 
enableHighOverlapBatching bool + highOverlapBatchSize int64 + highOverlapThreshold int64 } func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) { @@ -246,7 +277,7 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts resultSort := newResultSort(expr) qOpts := e.makeQueryOpts(ts, ts, 0, opts) - if qOpts.StepsBatch > 64 { + if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } @@ -292,7 +323,7 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab defer e.activeQueryTracker.Delete(idx) qOpts := e.makeQueryOpts(ts, ts, 0, opts) - if qOpts.StepsBatch > 64 { + if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ @@ -344,7 +375,7 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } qOpts := e.makeQueryOpts(start, end, step, opts) - if qOpts.StepsBatch > 64 { + if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ @@ -389,7 +420,7 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable defer e.activeQueryTracker.Delete(idx) qOpts := e.makeQueryOpts(start, end, step, opts) - if qOpts.StepsBatch > 64 { + if !e.disableDuplicateLabelChecks && qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } planOpts := logicalplan.PlanOptions{ @@ -474,7 +505,12 @@ func (e *Engine) getLogicalOptimizers(opts *QueryOpts) []logicalplan.Optimizer { if opts.SelectorBatchSize != 0 { selectorBatchSize = opts.SelectorBatchSize } - return append(optimizers, logicalplan.SelectorBatchSize{Size: selectorBatchSize}) + return append(optimizers, 
logicalplan.SelectorBatchSize{ + DefaultBatchSize: selectorBatchSize, + EnableHighOverlapBatching: e.enableHighOverlapBatching, + HighOverlapBatchSize: e.highOverlapBatchSize, + HighOverlapThreshold: e.highOverlapThreshold, + }) } func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) { diff --git a/logicalplan/set_batch_size.go b/logicalplan/set_batch_size.go index ffc666f7..7e27b099 100644 --- a/logicalplan/set_batch_size.go +++ b/logicalplan/set_batch_size.go @@ -13,14 +13,39 @@ import ( // SelectorBatchSize configures the batch size of selector based on // aggregates present in the plan. type SelectorBatchSize struct { - Size int64 + // DefaultBatchSize is the series batch size for standard batching. + // Applied to vector selectors under aggregations. + DefaultBatchSize int64 + + // EnableHighOverlapBatching makes Optimize also apply high-overlap batching (meant to reduce memory for long range windows); it is ignored when the query options are nil. + EnableHighOverlapBatching bool + + // HighOverlapBatchSize is the series batch size set on the vector selector of each high-overlap matrix selector. Zero means 1000. + HighOverlapBatchSize int64 + + // HighOverlapThreshold is the number of steps one range window must exceed for its matrix selector to count as high overlap. Zero means 100. + HighOverlapThreshold int64 } // Optimize configures the batch size of selector based on the query plan. // If any aggregate is present in the plan, the batch size is set to the configured value. // The two exceptions where this cannot be done is if the aggregate is quantile, or // when a binary expression precedes the aggregate. +// +// If EnableHighOverlapBatching is true, this optimizer also detects high-overlap queries +// and switches to high-overlap batching by setting StepsBatch to TotalSteps and reducing +// the series batch size. 
+func (m SelectorBatchSize) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { + if m.EnableHighOverlapBatching && opts != nil { + m.applyHighOverlapBatching(plan, opts) + } + + m.applyStandardBatchSize(plan) + + return plan, nil +} + +func (m SelectorBatchSize) applyStandardBatchSize(plan Node) { canBatch := false Traverse(&plan, func(current *Node) { switch e := (*current).(type) { @@ -39,10 +64,56 @@ func (m SelectorBatchSize) Optimize(plan Node, _ *query.Options) (Node, annotati canBatch = true case *VectorSelector: if canBatch { - e.BatchSize = m.Size + e.BatchSize = m.DefaultBatchSize } canBatch = false } }) - return plan, nil +} + +// applyHighOverlapBatching sets HighOverlapBatchSize on the vector selector of every matrix +// selector whose range window covers more than HighOverlapThreshold steps and, if any was found, +// sets opts.StepsBatch to opts.TotalSteps(). Non-positive settings fall back to 100 and 1000. +func (m SelectorBatchSize) applyHighOverlapBatching(plan Node, opts *query.Options) { + overlapThreshold := m.HighOverlapThreshold + if overlapThreshold <= 0 { + overlapThreshold = 100 + } + + seriesBatchSize := m.HighOverlapBatchSize + if seriesBatchSize <= 0 { + seriesBatchSize = 1000 + } + + // Step and step count are query-wide: resolve them once; a zero step becomes 1ms to avoid dividing by zero. + stepMs := opts.Step.Milliseconds() + if stepMs == 0 { + stepMs = 1 + } + totalSteps := int64(opts.TotalSteps()) + + var selectors []*VectorSelector + Traverse(&plan, func(current *Node) { + ms, ok := (*current).(*MatrixSelector) + if !ok || ms.VectorSelector == nil { + return + } + + // Steps covered by one range window, capped at the number of steps in the query. + overlap := (ms.Range.Milliseconds()-1)/stepMs + 1 + if overlap > totalSteps { + overlap = totalSteps + } + if overlap > overlapThreshold { + selectors = append(selectors, ms.VectorSelector) + } + }) + + if len(selectors) == 0 { + return + } + opts.StepsBatch = opts.TotalSteps() + for _, vs := range selectors { + vs.BatchSize = seriesBatchSize + } } diff --git a/ringbuffer/generic.go b/ringbuffer/generic.go index 20315587..f7182d37 100644 --- a/ringbuffer/generic.go +++ b/ringbuffer/generic.go @@ -18,6 +18,7 @@ type Buffer interface { Reset(mint int64, evalt int64) Eval(ctx context.Context, _, _ float64, _ int64) (float64, *histogram.FloatHistogram, bool, error) SampleCount() int + 
Clear() // to handle extlookback properly, only used by buffers that implement xincrease or xrate ReadIntoLast(f func(*Sample)) @@ -145,6 +146,11 @@ func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarA }) } +func (r *GenericRingBuffer) Clear() { + r.items = r.items[:0] + r.currentStep = 0 +} + func resize(s []Sample, n int) []Sample { if cap(s) >= n { return s[:n] diff --git a/ringbuffer/overtime.go b/ringbuffer/overtime.go index 10a5f9f4..701ac09a 100644 --- a/ringbuffer/overtime.go +++ b/ringbuffer/overtime.go @@ -128,6 +128,23 @@ func (r *OverTimeBuffer) SampleCount() int { return r.stepRanges[0].sampleCount } +func (r *OverTimeBuffer) Clear() { + r.lastTimestamp = math.MinInt64 + + for i := range r.firstTimestamps { + r.firstTimestamps[i] = math.MaxInt64 + } + + for i := range r.stepStates { + r.stepStates[i].acc.Reset(0) + r.stepStates[i].warn = nil + } + + for i := range r.stepRanges { + r.stepRanges[i].numSamples = 0 + r.stepRanges[i].sampleCount = 0 + } +} func (r *OverTimeBuffer) MaxT() int64 { return r.lastTimestamp } func (r *OverTimeBuffer) Push(t int64, v Value) { diff --git a/ringbuffer/pool.go b/ringbuffer/pool.go new file mode 100644 index 00000000..465831c6 --- /dev/null +++ b/ringbuffer/pool.go @@ -0,0 +1,41 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package ringbuffer + +// BufferPool holds a fixed set of ring buffers that are created up front and reused. +// Indexes that are equal modulo the pool size share the same buffer. +type BufferPool struct { + // buffers holds the pre-allocated buffers; size is the number of buffers requested at construction. + buffers []Buffer + size int +} + +// NewBufferPool creates a pool holding size buffers; a non-positive size is treated as 1. +// The factory function is called once per slot to pre-allocate every buffer. 
+func NewBufferPool(size int, factory func() Buffer) *BufferPool { + if size <= 0 { + size = 1 + } + + buffers := make([]Buffer, size) + for i := range buffers { + buffers[i] = factory() + } + + return &BufferPool{ + buffers: buffers, + size: size, + } +} + +// GetBuffer returns the buffer assigned to the given index. +// The index is wrapped into [0, Size()), so a negative index is valid instead of panicking. +func (p *BufferPool) GetBuffer(index int) Buffer { + return p.buffers[(index%p.size+p.size)%p.size] +} + +// Size returns the number of buffers in the pool. +func (p *BufferPool) Size() int { + return p.size +} diff --git a/ringbuffer/rate.go b/ringbuffer/rate.go index c1612eb9..5e542079 100644 --- a/ringbuffer/rate.go +++ b/ringbuffer/rate.go @@ -194,6 +194,21 @@ func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64, func (r *RateBuffer) ReadIntoLast(func(*Sample)) {} +func (r *RateBuffer) Clear() { + r.resets = r.resets[:0] + r.lastSample = Sample{T: math.MinInt64} + r.currentMint = math.MaxInt64 + + for i := range r.firstSamples { + r.firstSamples[i] = Sample{T: math.MaxInt64} + } + + for i := range r.stepRanges { + r.stepRanges[i].numSamples = 0 + r.stepRanges[i].sampleCount = 0 + } +} + func querySteps(o query.Options) int64 { // Instant evaluation is executed as a range evaluation with one step. 
if o.Step.Milliseconds() == 0 { diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index aecf7921..42d3af61 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -32,7 +32,6 @@ type matrixScanner struct { labels labels.Labels signature uint64 - buffer ringbuffer.Buffer iterator chunkenc.Iterator lastSample ringbuffer.Sample metricAppearedTs int64 @@ -74,6 +73,9 @@ type matrixSelector struct { nonCounterMetric string hasFloats bool + + // Buffer pool for memory efficiency + bufferPool *ringbuffer.BufferPool } var ErrNativeHistogramsNotSupported = errors.New("native histograms are not supported in extended range functions") @@ -175,23 +177,26 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { ts = o.currentStep firstSeries := o.currentSeries for ; o.currentSeries-firstSeries < o.seriesBatchSize && o.currentSeries < int64(len(o.scanners)); o.currentSeries++ { - var ( - scanner = &o.scanners[o.currentSeries] - seriesTs = ts - ) + scanner := &o.scanners[o.currentSeries] + + // Get buffer from pool + buffer := o.bufferPool.GetBuffer(int(o.currentSeries)) + buffer.Clear() + + seriesTs := ts for currStep := 0; currStep < o.numSteps && seriesTs <= o.maxt; currStep++ { maxt := seriesTs - o.offset mint := maxt - o.selectRange - if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.isExtFunction); err != nil { + if err := scanner.selectPoints(buffer, mint, maxt, seriesTs, o.fhReader, o.isExtFunction); err != nil { return nil, err } // TODO(saswatamcode): Handle multi-arg functions for matrixSelectors. // Also, allow operator to exist independently without being nested // under parser.Call by implementing new data model. 
// https://github.com/thanos-io/promql-engine/issues/39 - f, h, ok, err := scanner.buffer.Eval(ctx, o.scalarArg, o.scalarArg2, scanner.metricAppearedTs) + f, h, ok, err := buffer.Eval(ctx, o.scalarArg, o.scalarArg2, scanner.metricAppearedTs) if err != nil { return nil, err } @@ -204,7 +209,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { o.hasFloats = true } } - o.telemetry.IncrementSamplesAtTimestamp(scanner.buffer.SampleCount(), seriesTs) + o.telemetry.IncrementSamplesAtTimestamp(buffer.SampleCount(), seriesTs) seriesTs += o.step } } @@ -228,6 +233,15 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { o.series = make([]labels.Labels, len(series)) var b labels.ScratchBuilder + numSeries := int64(len(series)) + if o.seriesBatchSize == 0 || numSeries < o.seriesBatchSize { + o.seriesBatchSize = numSeries + } + + o.bufferPool = ringbuffer.NewBufferPool(int(o.seriesBatchSize), func() ringbuffer.Buffer { + return o.newBuffer(ctx) + }) + for i, s := range series { lbls := s.Labels() if o.functionName != "last_over_time" { @@ -238,20 +252,19 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { // is reused between Select() calls? lbls = extlabels.DropReserved(lbls, b) } - o.scanners[i] = matrixScanner{ + + scanner := matrixScanner{ labels: lbls, signature: s.Signature, iterator: s.Iterator(nil), lastSample: ringbuffer.Sample{T: math.MinInt64}, - buffer: o.newBuffer(ctx), metricAppearedTs: math.MinInt64, } + + o.scanners[i] = scanner o.series[i] = lbls } - numSeries := int64(len(o.series)) - if o.seriesBatchSize == 0 || numSeries < o.seriesBatchSize { - o.seriesBatchSize = numSeries - } + o.vectorPool.SetStepSize(int(o.seriesBatchSize)) // Add a warning if rate or increase is applied on metrics which are not named like counters. @@ -325,26 +338,27 @@ func (o *matrixSelector) String() string { // are populated from the iterator. // TODO(fpetkovski): Add max samples limit. 
func (m *matrixScanner) selectPoints( + buffer ringbuffer.Buffer, mint, maxt, evalt int64, fh *histogram.FloatHistogram, isExtFunction bool, ) error { - m.buffer.Reset(mint, evalt) + buffer.Reset(mint, evalt) if m.lastSample.T > maxt { return nil } - if bufMaxt := m.buffer.MaxT() + 1; bufMaxt > mint { + if bufMaxt := buffer.MaxT() + 1; bufMaxt > mint { mint = bufMaxt } - mint = max(mint, m.buffer.MaxT()+1) + mint = max(mint, buffer.MaxT()+1) if m.lastSample.T > mint { - m.buffer.Push(m.lastSample.T, m.lastSample.V) + buffer.Push(m.lastSample.T, m.lastSample.V) m.lastSample.T = math.MinInt64 - mint = max(mint, m.buffer.MaxT()+1) + mint = max(mint, buffer.MaxT()+1) } - appendedPointBeforeMint := !ringbuffer.Empty(m.buffer) + appendedPointBeforeMint := !ringbuffer.Empty(buffer) for valType := m.iterator.Next(); valType != chunkenc.ValNone; valType = m.iterator.Next() { switch valType { case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: @@ -366,7 +380,7 @@ func (m *matrixScanner) selectPoints( return nil } if t > mint { - m.buffer.Push(t, ringbuffer.Value{H: fh}) + buffer.Push(t, ringbuffer.Value{H: fh}) } case chunkenc.ValFloat: t, v := m.iterator.At() @@ -382,16 +396,16 @@ func (m *matrixScanner) selectPoints( } if isExtFunction { if t > mint || !appendedPointBeforeMint { - m.buffer.Push(t, ringbuffer.Value{F: v}) + buffer.Push(t, ringbuffer.Value{F: v}) appendedPointBeforeMint = true } else { - m.buffer.ReadIntoLast(func(s *ringbuffer.Sample) { + buffer.ReadIntoLast(func(s *ringbuffer.Sample) { s.T, s.V.F, s.V.H = t, v, nil }) } } else { if t > mint { - m.buffer.Push(t, ringbuffer.Value{F: v}) + buffer.Push(t, ringbuffer.Value{F: v}) } } }