Skip to content

Commit d6daab4

Browse files
committed
execution: push limits into operator telemetry
Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent 76c8ade commit d6daab4

30 files changed

Lines changed: 164 additions & 336 deletions

engine/engine.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
446446
EnableAnalysis: e.enableAnalysis,
447447
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
448448
DecodingConcurrency: e.decodingConcurrency,
449-
SampleTracker: query.NewSampleTracker(e.maxSamplesPerQuery),
449+
SampleLimiter: telemetry.NewSampleLimiter(e.maxSamplesPerQuery, start.UnixMilli(), end.UnixMilli(), telemetry.StepTrackingInterval(step)),
450450
}
451451

452452
if opts == nil {
@@ -459,7 +459,6 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
459459
if opts.EnablePerStepStats() {
460460
res.EnablePerStepStats = opts.EnablePerStepStats()
461461
}
462-
463462
if opts.DecodingConcurrency != 0 {
464463
res.DecodingConcurrency = opts.DecodingConcurrency
465464
}

engine/explain.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/thanos-io/promql-engine/execution/model"
1010
"github.com/thanos-io/promql-engine/execution/telemetry"
11-
"github.com/thanos-io/promql-engine/logicalplan"
1211

1312
"github.com/prometheus/prometheus/promql"
1413
)
@@ -64,10 +63,10 @@ func (a *AnalyzeOutputNode) aggregateSamples() {
6463
childPeak := child.PeakSamples()
6564
a.peakSamples = max(a.peakSamples, childPeak)
6665

67-
switch a.OperatorTelemetry.LogicalNode().(type) {
68-
case *logicalplan.Subquery:
66+
switch {
67+
case a.OperatorTelemetry.IsSubquery():
6968
// Skip aggregating samples for subquery
70-
case *logicalplan.StepInvariantExpr:
69+
case a.OperatorTelemetry.IsStepInvariant():
7170
childSamples := child.TotalSamples()
7271
for i := range a.totalSamplesPerStep {
7372
a.totalSamples += childSamples

execution/aggregate/count_values.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func NewCountValues(next model.VectorOperator, param string, by bool, grouping [
4949
by: by,
5050
grouping: grouping,
5151
}
52-
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op)
52+
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op)
5353
}
5454

5555
func (c *countValuesOperator) Explain() []model.VectorOperator {

execution/aggregate/hashaggregate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func NewHashAggregate(
6969
params: make([]float64, opts.StepsBatch),
7070
}
7171

72-
return telemetry.NewOperator(telemetry.NewTelemetry(a, opts), a), nil
72+
return telemetry.NewOperator(telemetry.NewTelemetry(a, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), a), nil
7373
}
7474

7575
func (a *aggregate) String() string {

execution/aggregate/khashaggregate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func NewKHashAggregate(
8080
stepsBatch: opts.StepsBatch,
8181
}
8282

83-
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
83+
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
8484
}
8585

8686
func (a *kAggregate) Next(ctx context.Context, buf []model.StepVector) (int, error) {

execution/binary/scalar.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func NewScalar(
5656
stepsBatch: opts.StepsBatch,
5757
}
5858

59-
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
59+
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
6060
}
6161

6262
func (o *scalarOperator) Explain() (next []model.VectorOperator) {

execution/binary/vector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func NewVectorOperator(
7070
stepsBatch: opts.StepsBatch,
7171
}
7272

73-
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts), op), nil
73+
return telemetry.NewOperator(telemetry.NewTelemetry(op, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), op), nil
7474
}
7575

7676
func (o *vectorOperator) String() string {

execution/exchange/coalesce.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func NewCoalesce(opts *query.Options, batchSize int64, operators ...model.Vector
6363
batchSize: batchSize,
6464
}
6565

66-
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
66+
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
6767
}
6868

6969
func (c *coalesce) Explain() (next []model.VectorOperator) {

execution/exchange/concurrent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Option
4747
returnChan: make(chan []model.StepVector, bufferSize+2),
4848
}
4949

50-
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
50+
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
5151
}
5252

5353
func (c *concurrencyOperator) Explain() (next []model.VectorOperator) {

execution/exchange/dedup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func NewDedupOperator(next model.VectorOperator, opts *query.Options) model.Vect
4444
oper := &dedupOperator{
4545
next: next,
4646
}
47-
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
47+
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts.EnableAnalysis, opts.EnablePerStepStats, opts.Start.UnixMilli(), opts.End.UnixMilli(), opts.Step, opts.SampleLimiter), oper)
4848
}
4949

5050
func (d *dedupOperator) Next(ctx context.Context, buf []model.StepVector) (int, error) {

0 commit comments

Comments
 (0)