Skip to content

Commit a21b994

Browse files
sandy2008 and claude committed
Querier: release store-gateway Series stream resources per-request
In fetchSeriesFromStores, each store-gateway Series server-streaming RPC was created with the shared errgroup context (gCtx). errgroup cancels that context only on the first goroutine error or when g.Wait() returns, so a store-gateway request that finished early kept its per-stream gRPC resources alive until the slowest concurrent request in the same query completed.

Give each Series stream its own cancellable child of gCtx and cancel it when the goroutine returns via any path, so resources are released promptly and independently. gCtx remains the parent to preserve sibling-failure teardown.

CloseSend() is not a fix here: the generated client already calls it, and for a server-streaming RPC it only half-closes the send side. LabelNames and LabelValues are unary and unaffected.

Adds regression tests covering both per-stream cancellation on early return and errgroup-context-derived teardown on a sibling error.

Fixes #7575

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com>
1 parent 29b6167 commit a21b994

3 files changed

Lines changed: 266 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485
3838
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
3939
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
40+
* [BUGFIX] Querier: Release each store-gateway `Series` gRPC stream's resources as soon as its goroutine returns, instead of holding them until the slowest concurrent store-gateway request in the same query finishes. #7576
4041

4142
## 1.21.0 2026-04-24
4243

pkg/querier/blocks_store_queryable.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,14 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
658658
}
659659

660660
begin := time.Now()
661-
stream, err := c.Series(gCtx, req)
661+
// Use a dedicated cancellable context per stream so the gRPC Series stream's
662+
// context is cancelled (triggering gRPC to tear down the receive side) as soon
663+
// as this goroutine returns via any path, instead of lingering until the shared
664+
// errgroup context is cancelled when the slowest concurrent request finishes.
665+
// See https://github.com/cortexproject/cortex/issues/7575.
666+
streamCtx, cancel := context.WithCancel(gCtx)
667+
defer cancel()
668+
stream, err := c.Series(streamCtx, req)
662669
if err != nil {
663670
if isRetryableError(err) {
664671
level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress()))
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package querier
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/oklog/ulid/v2"
11+
"github.com/prometheus/prometheus/model/labels"
12+
"github.com/stretchr/testify/require"
13+
"github.com/thanos-io/thanos/pkg/store/storepb"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/codes"
16+
"google.golang.org/grpc/status"
17+
18+
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
19+
)
20+
21+
// seriesStreamLifetimeClient is a BlocksStoreClient whose Series implementation is
22+
// supplied by the test, so it can capture the per-stream context passed to Series and
23+
// control the returned stream. LabelNames/LabelValues are inherited from the embedded
24+
// storeGatewayClientMock and are never exercised by fetchSeriesFromStores.
25+
type seriesStreamLifetimeClient struct {
26+
storeGatewayClientMock
27+
seriesFn func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error)
28+
}
29+
30+
func (c *seriesStreamLifetimeClient) Series(ctx context.Context, _ *storepb.SeriesRequest, _ ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) {
31+
return c.seriesFn(ctx)
32+
}
33+
34+
// ctxAwareSeriesClient is a StoreGateway_SeriesClient that streams the queued responses
35+
// (then io.EOF), but fails fast if its stream context is already cancelled. This lets a
36+
// test detect a production change that cancels the per-stream context too early (before
37+
// the response has been fully received).
38+
type ctxAwareSeriesClient struct {
39+
grpc.ClientStream
40+
ctx context.Context
41+
responses []*storepb.SeriesResponse
42+
}
43+
44+
func (s *ctxAwareSeriesClient) Recv() (*storepb.SeriesResponse, error) {
45+
if err := s.ctx.Err(); err != nil {
46+
return nil, err
47+
}
48+
if len(s.responses) == 0 {
49+
return nil, io.EOF
50+
}
51+
resp := s.responses[0]
52+
s.responses = s.responses[1:]
53+
return resp, nil
54+
}
55+
56+
// blockingSeriesClient is a StoreGateway_SeriesClient whose Recv blocks until either its
57+
// stream context is cancelled or the test releases it. Blocking keeps one
58+
// fetchSeriesFromStores goroutine (and thus the shared errgroup context) alive while a
59+
// sibling request runs; honoring ctx cancellation lets a test assert that the stream is
60+
// torn down when the errgroup context is cancelled. Only Recv is exercised by the code
61+
// under test; the embedded nil grpc.ClientStream satisfies the rest of the interface.
62+
type blockingSeriesClient struct {
63+
grpc.ClientStream
64+
ctx context.Context
65+
started chan struct{}
66+
release chan struct{}
67+
}
68+
69+
func (s *blockingSeriesClient) Recv() (*storepb.SeriesResponse, error) {
70+
select {
71+
case s.started <- struct{}{}:
72+
default:
73+
}
74+
select {
75+
case <-s.ctx.Done():
76+
return nil, s.ctx.Err()
77+
case <-s.release:
78+
return nil, io.EOF
79+
}
80+
}
81+
82+
// TestBlocksStoreQuerier_FetchSeriesFromStores_CancelsStreamPerRequest verifies that
83+
// each store-gateway Series stream's context is cancelled as soon as its own fetch
84+
// goroutine returns, instead of lingering until the shared errgroup context is cancelled
85+
// when the slowest concurrent request finishes. See issue #7575.
86+
func TestBlocksStoreQuerier_FetchSeriesFromStores_CancelsStreamPerRequest(t *testing.T) {
87+
minT, maxT := int64(10), int64(20)
88+
block1 := ulid.MustNew(1, nil)
89+
block2 := ulid.MustNew(2, nil)
90+
91+
// Fast store-gateway: capture the context handed to its Series stream, then return a
92+
// single hints frame followed by io.EOF so its goroutine returns quickly. The stream
93+
// is context-aware, so if the production code cancelled the context before the
94+
// response was received the fast request would fail (caught by require.NoError below).
95+
var fastStreamCtx context.Context
96+
fastCtxCaptured := make(chan struct{})
97+
fastClient := &seriesStreamLifetimeClient{
98+
storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "fast"},
99+
seriesFn: func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) {
100+
fastStreamCtx = ctx
101+
close(fastCtxCaptured)
102+
return &ctxAwareSeriesClient{
103+
ctx: ctx,
104+
responses: []*storepb.SeriesResponse{mockHintsResponse(block1)},
105+
}, nil
106+
},
107+
}
108+
109+
// Slow store-gateway: block in Recv until released, keeping the errgroup alive.
110+
slowStarted := make(chan struct{}, 1)
111+
release := make(chan struct{})
112+
var releaseOnce sync.Once
113+
doRelease := func() { releaseOnce.Do(func() { close(release) }) }
114+
defer doRelease()
115+
slowClient := &seriesStreamLifetimeClient{
116+
storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "slow"},
117+
seriesFn: func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) {
118+
return &blockingSeriesClient{ctx: ctx, started: slowStarted, release: release}, nil
119+
},
120+
}
121+
122+
q := &blocksStoreQuerier{}
123+
clients := map[BlocksStoreClient][]ulid.ULID{
124+
fastClient: {block1},
125+
slowClient: {block2},
126+
}
127+
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")}
128+
129+
// Capture the returned errors so we can assert the fast request took the success
130+
// path. This rules out a false pass where the fast goroutine errored and cancelled
131+
// the shared errgroup context (which would also fire fastStreamCtx.Done(), but for
132+
// the wrong reason).
133+
var gotErr, gotRetryErr error
134+
done := make(chan struct{})
135+
go func() {
136+
defer close(done)
137+
_, _, _, _, gotErr, gotRetryErr = q.fetchSeriesFromStores(context.Background(), nil, "user-1", clients, minT, maxT, 0, matchers, 0, 0)
138+
}()
139+
140+
// Wait until the slow request is blocked in Recv and the fast request's stream has
141+
// been created (so its goroutine is on its way to returning).
142+
select {
143+
case <-slowStarted:
144+
case <-time.After(5 * time.Second):
145+
doRelease()
146+
<-done
147+
t.Fatal("slow store-gateway Series stream never started")
148+
}
149+
select {
150+
case <-fastCtxCaptured:
151+
case <-time.After(5 * time.Second):
152+
doRelease()
153+
<-done
154+
t.Fatal("fast store-gateway Series stream was never created")
155+
}
156+
157+
require.NotNil(t, fastStreamCtx)
158+
159+
// The fast request has finished; its per-stream context must be cancelled even though
160+
// the slow request is still blocked in Recv. On the buggy code the fast stream shares
161+
// the errgroup context, which stays alive until the slow request returns, so this
162+
// never fires within the timeout.
163+
select {
164+
case <-fastStreamCtx.Done():
165+
// Expected: the fast stream's context was cancelled independently.
166+
case <-time.After(2 * time.Second):
167+
doRelease()
168+
<-done
169+
t.Fatal("fast store-gateway Series stream context was not cancelled while a concurrent request was still in-flight")
170+
}
171+
172+
// Release the slow request and make sure fetchSeriesFromStores returns. Use t.Error
173+
// (not t.Fatal) so we don't runtime.Goexit the test goroutine while the worker may
174+
// still be running; release is already closed, so there is nothing left to unblock.
175+
doRelease()
176+
select {
177+
case <-done:
178+
// Both store-gateway requests succeeded, confirming the cancellation observed
179+
// above came from the fast request's own per-stream cancel, not from an error
180+
// cancelling the shared errgroup context.
181+
require.NoError(t, gotErr)
182+
require.NoError(t, gotRetryErr)
183+
case <-time.After(5 * time.Second):
184+
t.Error("fetchSeriesFromStores did not return after releasing the slow request")
185+
}
186+
}
187+
188+
// TestBlocksStoreQuerier_FetchSeriesFromStores_SiblingErrorCancelsStreamContext verifies
189+
// that each Series stream's context is derived from the errgroup context (gCtx): when one
190+
// store-gateway request fails, the errgroup cancels gCtx, which must tear down the other,
191+
// still-in-flight Series streams. A version that derived the per-stream context from a
192+
// non-errgroup parent (e.g. the incoming ctx) would leave the blocked stream running and
193+
// fetchSeriesFromStores would hang. See issue #7575.
194+
func TestBlocksStoreQuerier_FetchSeriesFromStores_SiblingErrorCancelsStreamContext(t *testing.T) {
195+
minT, maxT := int64(10), int64(20)
196+
block1 := ulid.MustNew(1, nil)
197+
block2 := ulid.MustNew(2, nil)
198+
199+
// Blocked store-gateway: its Recv blocks until its stream context is cancelled (or the
200+
// deferred release fires as a safety net). The test never releases it on the happy
201+
// path — cancellation must come from the errgroup. It signals blockedStarted once it
202+
// is actually parked inside Recv.
203+
release := make(chan struct{})
204+
var releaseOnce sync.Once
205+
doRelease := func() { releaseOnce.Do(func() { close(release) }) }
206+
defer doRelease()
207+
blockedStarted := make(chan struct{}, 1)
208+
blockedClient := &seriesStreamLifetimeClient{
209+
storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "blocked"},
210+
seriesFn: func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) {
211+
return &blockingSeriesClient{ctx: ctx, started: blockedStarted, release: release}, nil
212+
},
213+
}
214+
215+
// Failing store-gateway: waits until the blocked stream is parked inside Recv, then
216+
// returns a non-retryable error so the errgroup cancels gCtx (and therefore every
217+
// per-stream child context). Sequencing the error after the block guarantees the
218+
// blocked goroutine is committed to Recv (past the loop's gCtx.Err() guard), so the
219+
// only thing that can unblock it is cancellation of its own stream context — which
220+
// happens iff that context is derived from gCtx. This makes the wrong-parent
221+
// regression deterministic rather than scheduling-dependent.
222+
failingClient := &seriesStreamLifetimeClient{
223+
storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "failing"},
224+
seriesFn: func(_ context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) {
225+
select {
226+
case <-blockedStarted:
227+
case <-time.After(5 * time.Second):
228+
}
229+
return nil, status.Error(codes.Internal, "boom")
230+
},
231+
}
232+
233+
q := &blocksStoreQuerier{}
234+
clients := map[BlocksStoreClient][]ulid.ULID{
235+
blockedClient: {block1},
236+
failingClient: {block2},
237+
}
238+
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")}
239+
240+
done := make(chan struct{})
241+
go func() {
242+
defer close(done)
243+
_, _, _, _, _, _ = q.fetchSeriesFromStores(context.Background(), nil, "user-1", clients, minT, maxT, 0, matchers, 0, 0)
244+
}()
245+
246+
// fetchSeriesFromStores must return promptly: the failing sibling cancels gCtx, which
247+
// cancels the blocked stream's context and unblocks its Recv. The test does not
248+
// release the blocked stream itself. On a version that derived the per-stream context
249+
// from a non-errgroup parent, the blocked Recv would never be cancelled and this hangs.
250+
select {
251+
case <-done:
252+
case <-time.After(2 * time.Second):
253+
doRelease()
254+
<-done
255+
t.Fatal("fetchSeriesFromStores did not return after a sibling store-gateway error; per-stream context is not derived from the errgroup context")
256+
}
257+
}

0 commit comments

Comments
 (0)