Skip to content

Commit 4da3392

Browse files
CharlieTLeclaude
andauthored
Fix flaky TestDistributorQuerier_QueryIngestersWithinBoundary by injecting clock (#7419)
The test was flaky because it captured `time.Now()` at setup, but `distributorQuerier.Select()` called `time.Now()` again internally. On slow CI runners, the clock drift could exceed the 10-second margin in the "maxT well after lookback boundary" subtest, causing the query to short-circuit with an empty result. Inject a `nowFn` function into `distributorQuerier` (defaulting to `time.Now`) so tests can freeze time. Replace the `InDelta` assertion with an exact `Equal` now that the clock is deterministic. Closes #7415 Signed-off-by: Charlie Le <charlie_le@apple.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6fac9f4 commit 4da3392

File tree

4 files changed

+24
-17
lines changed

4 files changed

+24
-17
lines changed

pkg/querier/distributor_queryable.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ type Distributor interface {
4343
MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error)
4444
}
4545

46-
func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides) QueryableWithFilter {
46+
func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides, nowFn func() time.Time) QueryableWithFilter {
47+
if nowFn == nil {
48+
nowFn = time.Now
49+
}
4750
return distributorQueryable{
4851
distributor: distributor,
4952
streamingMetdata: streamingMetdata,
@@ -52,6 +55,7 @@ func newDistributorQueryable(distributor Distributor, streamingMetdata bool, lab
5255
isPartialDataEnabled: isPartialDataEnabled,
5356
ingesterQueryMaxAttempts: ingesterQueryMaxAttempts,
5457
limits: limits,
58+
nowFn: nowFn,
5559
}
5660
}
5761

@@ -63,6 +67,7 @@ type distributorQueryable struct {
6367
isPartialDataEnabled partialdata.IsCfgEnabledFunc
6468
ingesterQueryMaxAttempts int
6569
limits *validation.Overrides
70+
nowFn func() time.Time
6671
}
6772

6873
func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
@@ -76,6 +81,7 @@ func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error)
7681
isPartialDataEnabled: d.isPartialDataEnabled,
7782
ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts,
7883
limits: d.limits,
84+
nowFn: d.nowFn,
7985
}, nil
8086
}
8187
func (d distributorQueryable) UseQueryable(now time.Time, userID string, _, queryMaxT int64) bool {
@@ -93,6 +99,7 @@ type distributorQuerier struct {
9399
isPartialDataEnabled partialdata.IsCfgEnabledFunc
94100
ingesterQueryMaxAttempts int
95101
limits *validation.Overrides
102+
nowFn func() time.Time
96103
}
97104

98105
// Select implements storage.Querier interface.
@@ -116,7 +123,7 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
116123
// optimization is particularly important for the blocks storage where the blocks retention in the
117124
// ingesters could be way higher than queryIngestersWithin.
118125
if queryIngestersWithin > 0 {
119-
now := time.Now()
126+
now := q.nowFn()
120127
origMinT := minT
121128
minT = max(minT, util.TimeToMillis(now.Add(-queryIngestersWithin)))
122129

pkg/querier/distributor_queryable_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
9696
limits.QueryIngestersWithin = model.Duration(testData.queryIngestersWithin)
9797
overrides := validation.NewOverrides(limits, nil)
9898

99-
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides)
99+
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides, nil)
100100
querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
101101
require.NoError(t, err)
102102

@@ -136,7 +136,7 @@ func TestDistributorQueryableFilter(t *testing.T) {
136136
limits.QueryIngestersWithin = model.Duration(1 * time.Hour)
137137
overrides := validation.NewOverrides(limits, nil)
138138

139-
dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides)
139+
dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides, nil)
140140

141141
now := time.Now()
142142

@@ -192,7 +192,7 @@ func TestIngesterStreaming(t *testing.T) {
192192

193193
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
194194
return partialDataEnabled
195-
}, 1, overrides)
195+
}, 1, overrides, nil)
196196
querier, err := queryable.Querier(mint, maxt)
197197
require.NoError(t, err)
198198

@@ -363,7 +363,7 @@ func TestDistributorQuerier_Retry(t *testing.T) {
363363

364364
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
365365
return true
366-
}, ingesterQueryMaxAttempts, overrides)
366+
}, ingesterQueryMaxAttempts, overrides, nil)
367367
querier, err := queryable.Querier(mint, maxt)
368368
require.NoError(t, err)
369369

@@ -421,7 +421,7 @@ func TestDistributorQuerier_Select_CancelledContext_NoRetry(t *testing.T) {
421421
overrides := validation.NewOverrides(limits, nil)
422422
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
423423
return true
424-
}, ingesterQueryMaxAttempts, overrides)
424+
}, ingesterQueryMaxAttempts, overrides, nil)
425425
querier, err := queryable.Querier(mint, maxt)
426426
require.NoError(t, err)
427427

@@ -455,7 +455,7 @@ func TestDistributorQuerier_Select_CancelledContext(t *testing.T) {
455455
overrides := validation.NewOverrides(limits, nil)
456456
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
457457
return true
458-
}, ingesterQueryMaxAttempts, overrides)
458+
}, ingesterQueryMaxAttempts, overrides, nil)
459459
querier, err := queryable.Querier(mint, maxt)
460460
require.NoError(t, err)
461461

@@ -480,7 +480,7 @@ func TestDistributorQuerier_Labels_CancelledContext(t *testing.T) {
480480
overrides := validation.NewOverrides(limits, nil)
481481
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
482482
return true
483-
}, ingesterQueryMaxAttempts, overrides)
483+
}, ingesterQueryMaxAttempts, overrides, nil)
484484
querier, err := queryable.Querier(mint, maxt)
485485
require.NoError(t, err)
486486

@@ -537,7 +537,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
537537

538538
queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, func(string) bool {
539539
return partialDataEnabled
540-
}, 1, overrides)
540+
}, 1, overrides, nil)
541541
querier, err := queryable.Querier(mint, maxt)
542542
require.NoError(t, err)
543543

@@ -625,7 +625,7 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) {
625625
limits.QueryIngestersWithin = model.Duration(lookback)
626626
overrides := validation.NewOverrides(limits, nil)
627627

628-
queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides)
628+
queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides, func() time.Time { return now })
629629
querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
630630
require.NoError(t, err)
631631

@@ -636,7 +636,7 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) {
636636
assert.Len(t, distributor.Calls, 0, testData.description)
637637
} else {
638638
require.Len(t, distributor.Calls, 1, testData.description)
639-
assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(15*time.Second.Milliseconds()), testData.description)
639+
assert.Equal(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), testData.description)
640640
assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), testData.description)
641641
}
642642
})

pkg/querier/querier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc {
198198
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) {
199199
iteratorFunc := getChunksIteratorFunction(cfg)
200200

201-
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits)
201+
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil)
202202

203203
ns := make([]QueryableWithFilter, len(stores))
204204
for ix, s := range stores {

pkg/querier/querier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
301301
limits := DefaultLimitsConfig()
302302
testOverrides := validation.NewOverrides(limits, nil)
303303

304-
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides)
304+
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil)
305305

306306
tCases := []struct {
307307
name string
@@ -450,7 +450,7 @@ func TestLimits(t *testing.T) {
450450
limits := DefaultLimitsConfig()
451451
testOverrides := validation.NewOverrides(limits, nil)
452452

453-
distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides)
453+
distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil)
454454

455455
tCases := []struct {
456456
name string
@@ -1824,11 +1824,11 @@ func TestQuerier_ProjectionHints(t *testing.T) {
18241824
var distributorQueryable QueryableWithFilter
18251825
if testData.queryIngesters {
18261826
// Ingesters will be queried
1827-
distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides)
1827+
distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil)
18281828
} else {
18291829
// Ingesters will not be queried (time range is too old)
18301830
distributorQueryable = UseBeforeTimestampQueryable(
1831-
newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides),
1831+
newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil),
18321832
start.Add(-1*time.Hour),
18331833
)
18341834
}

0 commit comments

Comments
 (0)