diff --git a/api/remote.go b/api/remote.go index f1d18c4fc..21b7c2724 100644 --- a/api/remote.go +++ b/api/remote.go @@ -6,6 +6,7 @@ package api import ( "context" "fmt" + "sync" "time" "github.com/prometheus/prometheus/model/labels" @@ -16,8 +17,20 @@ type RemoteQuery interface { fmt.Stringer } +// RemoteEndpoints returns remote engines. +// +// Implementations should use mint and maxt to prune engine metadata +// (e.g., filter TSDBInfos to only those overlapping the time range), +// reducing unnecessary computations in subsequent calls to methods like +// RemoteEngine.LabelSets(). +// +// All available engines should be returned regardless of pruning. type RemoteEndpoints interface { - Engines() []RemoteEngine + // Engines returns remote engines. + // + // If mint and/or maxt of the query is unknown, the caller must pass + // math.MinInt64 and math.MaxInt64 respectively to retrieve unpruned engines. + Engines(mint, maxt int64) []RemoteEngine } type RemoteEngine interface { @@ -40,10 +53,41 @@ type staticEndpoints struct { engines []RemoteEngine } -func (m staticEndpoints) Engines() []RemoteEngine { +func (m staticEndpoints) Engines(mint, maxt int64) []RemoteEngine { return m.engines } func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints { return &staticEndpoints{engines: engines} } + +type cachedEndpoints struct { + endpoints RemoteEndpoints + + enginesOnce sync.Once + engines []RemoteEngine +} + +func (l *cachedEndpoints) Engines(mint, maxt int64) []RemoteEngine { + l.enginesOnce.Do(func() { + l.engines = l.endpoints.Engines(mint, maxt) + }) + return l.engines +} + +// NewCachedEndpoints returns an endpoints wrapper that +// resolves and caches engines on first access. +// +// All subsequent Engines calls return cached engines, ignoring any query +// parameters. +func NewCachedEndpoints(endpoints RemoteEndpoints) RemoteEndpoints { + if endpoints == nil { + panic("api.NewCachedEndpoints: endpoints is nil") + } + + if le, ok := endpoints.(*cachedEndpoints); ok { + return le + } + + return &cachedEndpoints{endpoints: endpoints} +} diff --git a/api/remote_test.go b/api/remote_test.go new file mode 100644 index 000000000..b4bd4502d --- /dev/null +++ b/api/remote_test.go @@ -0,0 +1,72 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package api + +import ( + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" +) + +func TestCachedEndpoints(t *testing.T) { + engines := remoteEndpointsFunc(func(mint, maxt int64) []RemoteEngine { + testutil.Equals(t, int64(10), mint) + testutil.Equals(t, int64(20), maxt) + return []RemoteEngine{newEngineMock(0, 1, nil)} + }) + endpoints := NewCachedEndpoints(engines) + + es := endpoints.Engines(10, 20) + testutil.Equals(t, 1, len(es)) +} + +func TestCachedEndpointsCachesEngines(t *testing.T) { + var calls int + engines := remoteEndpointsFunc(func(mint, maxt int64) []RemoteEngine { + calls++ + return []RemoteEngine{ + newEngineMock(100*int64(calls), 1000*int64(calls), nil), + newEngineMock(200*int64(calls), 2000*int64(calls), nil), + } + }) + endpoints := NewCachedEndpoints(engines) + + es1 := endpoints.Engines(10, 10000) + testutil.Equals(t, 2, len(es1)) + + es2 := endpoints.Engines(20, 20000) + testutil.Equals(t, 2, len(es2)) + + testutil.Equals(t, 1, calls) + testutil.Equals(t, es1, es2) + + // Engines must be mutable. + es1[0].(*engineMock).maxT = 1337 + testutil.Equals(t, int64(1337), es1[0].MaxT()) + testutil.Equals(t, int64(1337), es2[0].MaxT()) +} + +type remoteEndpointsFunc func(mint, maxt int64) []RemoteEngine + +func (f remoteEndpointsFunc) Engines(mint, maxt int64) []RemoteEngine { + return f(mint, maxt) +} + +type engineMock struct { + RemoteEngine + minT int64 + maxT int64 + labelSets []labels.Labels + partitionLabelSets []labels.Labels +} + +func (e engineMock) MaxT() int64 { return e.maxT } +func (e engineMock) MinT() int64 { return e.minT } +func (e engineMock) LabelSets() []labels.Labels { return e.labelSets } +func (e engineMock) PartitionLabelSets() []labels.Labels { return e.partitionLabelSets } + +func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock { + return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets, partitionLabelSets: labelSets} +} diff --git a/engine/distributed.go b/engine/distributed.go index a5f12ab75..c9fa1c691 100644 --- a/engine/distributed.go +++ b/engine/distributed.go @@ -68,6 +68,12 @@ func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q stora // Some clients might only support second precision when executing queries. ts = ts.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, @@ -84,6 +90,12 @@ func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage end = end.Truncate(time.Second) interval = interval.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, @@ -98,6 +110,12 @@ func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Query // Some clients might only support second precision when executing queries. ts = ts.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, @@ -114,6 +132,12 @@ func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryab end = end.Truncate(time.Second) interval = interval.Truncate(time.Second) + // Cache engines to give optimizers a consistent view of Engines(). + // Some RemoteEndpoints implementations also compute and cache + // MinT() / MaxT() / LabelSets() on the fly, so the cache prevents + // recomputing those fields in each optimizer. + e = api.NewCachedEndpoints(e) + qOpts := fromPromQLOpts(opts) qOpts.LogicalOptimizers = []logicalplan.Optimizer{ logicalplan.PassthroughOptimizer{Endpoints: e}, diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 446ce5d9a..d8e2d37ac 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -157,7 +157,7 @@ type DistributedExecutionOptimizer struct { } func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { - engines := m.Endpoints.Engines() + engines := m.Endpoints.Engines(MinMaxTime(plan, opts)) sort.Slice(engines, func(i, j int) bool { return engines[i].MinT() < engines[j].MinT() }) diff --git a/logicalplan/passthrough.go b/logicalplan/passthrough.go index c256e80ca..e75aa1e43 100644 --- a/logicalplan/passthrough.go +++ b/logicalplan/passthrough.go @@ -43,40 +43,45 @@ func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool { } func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) { - engines := m.Endpoints.Engines() - if len(engines) == 1 { - if !matchingEngineTime(engines[0], opts) { - return plan, nil - } - return RemoteExecution{ - Engine: engines[0], - Query: plan.Clone(), - QueryRangeStart: opts.Start, - QueryRangeEnd: opts.End, - }, nil - } - + engines := m.Endpoints.Engines(MinMaxTime(plan, opts)) if len(engines) == 0 { return plan, nil } - matchingLabelsEngines := make([]api.RemoteEngine, 0, len(engines)) + var ( + hasSelector bool + matchingEngines int + firstMatchingEngine api.RemoteEngine + ) TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) { if vs, ok := (*current).(*VectorSelector); ok { + hasSelector = true + for _, e := range engines { if !labelSetsMatch(vs.LabelMatchers, e.LabelSets()...) { continue } - matchingLabelsEngines = append(matchingLabelsEngines, e) + matchingEngines++ + if matchingEngines > 1 { + return true + } + + firstMatchingEngine = e } } return false }) - if len(matchingLabelsEngines) == 1 && matchingEngineTime(matchingLabelsEngines[0], opts) { + // Fallback to all engines. + if !hasSelector && matchingEngines == 0 { + matchingEngines = len(engines) + firstMatchingEngine = engines[0] + } + + if matchingEngines == 1 && matchingEngineTime(firstMatchingEngine, opts) { return RemoteExecution{ - Engine: matchingLabelsEngines[0], + Engine: firstMatchingEngine, Query: plan.Clone(), QueryRangeStart: opts.Start, QueryRangeEnd: opts.End, diff --git a/logicalplan/plan.go b/logicalplan/plan.go index c9cf3f8f5..3f6753301 100644 --- a/logicalplan/plan.go +++ b/logicalplan/plan.go @@ -30,7 +30,6 @@ var DefaultOptimizers = []Optimizer{ type Plan interface { Optimize([]Optimizer) (Plan, annotations.Annotations) Root() Node - MinMaxTime(*query.Options) (int64, int64) } type Optimizer interface { @@ -152,15 +151,19 @@ func extractFuncFromPath(p []*Node) string { return extractFuncFromPath(p[:len(p)-1]) } -func (p *plan) MinMaxTime(qOpts *query.Options) (int64, int64) { +func (p *plan) Root() Node { + return p.expr +} + +// MinMaxTime returns the min and max timestamp that any selector in the query +// can read. +func MinMaxTime(root Node, qOpts *query.Options) (int64, int64) { var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64 // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. var evalRange time.Duration - root := p.Root() - TraverseWithParents(nil, &root, func(parents []*Node, node *Node) { switch n := (*node).(type) { case *VectorSelector: @@ -205,10 +208,6 @@ func (p *plan) Optimize(optimizers []Optimizer) (Plan, annotations.Annotations) return &plan{expr: expr, opts: p.opts}, *annos } -func (p *plan) Root() Node { - return p.expr -} - func Traverse(expr *Node, transform func(*Node)) { children := (*expr).Children() transform(expr) @@ -230,10 +229,7 @@ func TraverseBottomUp(parent *Node, current *Node, transform func(parent *Node, for _, c := range (*current).Children() { stop = TraverseBottomUp(current, c, transform) || stop } - if stop { - return stop - } - return transform(parent, current) + return stop || transform(parent, current) } func replacePrometheusNodes(plan parser.Expr) Node { diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index dc7888f57..24831c322 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -35,7 +35,7 @@ func (s *Scanners) Close() error { func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (*Scanners, error) { var min, max int64 if lplan != nil { - min, max = lplan.MinMaxTime(qOpts) + min, max = logicalplan.MinMaxTime(lplan.Root(), qOpts) } else { min, max = qOpts.Start.UnixMilli(), qOpts.End.UnixMilli() } diff --git a/storage/prometheus/scanners_test.go b/storage/prometheus/scanners_test.go index f96cd7698..127f762db 100644 --- a/storage/prometheus/scanners_test.go +++ b/storage/prometheus/scanners_test.go @@ -80,7 +80,7 @@ func TestScannersMinMaxTime(t *testing.T) { plan, _ := logicalplan.NewFromAST(p, qOpts, logicalplan.PlanOptions{}) - min, max := plan.MinMaxTime(qOpts) + min, max := logicalplan.MinMaxTime(plan.Root(), qOpts) require.Equal(t, tcase.min, min) require.Equal(t, tcase.max, max)