|
| 1 | +package querier |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "io" |
| 6 | + "sync" |
| 7 | + "testing" |
| 8 | + "time" |
| 9 | + |
| 10 | + "github.com/oklog/ulid/v2" |
| 11 | + "github.com/prometheus/prometheus/model/labels" |
| 12 | + "github.com/stretchr/testify/require" |
| 13 | + "github.com/thanos-io/thanos/pkg/store/storepb" |
| 14 | + "google.golang.org/grpc" |
| 15 | + "google.golang.org/grpc/codes" |
| 16 | + "google.golang.org/grpc/status" |
| 17 | + |
| 18 | + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" |
| 19 | +) |
| 20 | + |
| 21 | +// seriesStreamLifetimeClient is a BlocksStoreClient whose Series implementation is |
| 22 | +// supplied by the test, so it can capture the per-stream context passed to Series and |
| 23 | +// control the returned stream. LabelNames/LabelValues are inherited from the embedded |
| 24 | +// storeGatewayClientMock and are never exercised by fetchSeriesFromStores. |
| 25 | +type seriesStreamLifetimeClient struct { |
| 26 | + storeGatewayClientMock |
| 27 | + seriesFn func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) |
| 28 | +} |
| 29 | + |
| 30 | +func (c *seriesStreamLifetimeClient) Series(ctx context.Context, _ *storepb.SeriesRequest, _ ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) { |
| 31 | + return c.seriesFn(ctx) |
| 32 | +} |
| 33 | + |
| 34 | +// ctxAwareSeriesClient is a StoreGateway_SeriesClient that streams the queued responses |
| 35 | +// (then io.EOF), but fails fast if its stream context is already cancelled. This lets a |
| 36 | +// test detect a production change that cancels the per-stream context too early (before |
| 37 | +// the response has been fully received). |
| 38 | +type ctxAwareSeriesClient struct { |
| 39 | + grpc.ClientStream |
| 40 | + ctx context.Context |
| 41 | + responses []*storepb.SeriesResponse |
| 42 | +} |
| 43 | + |
| 44 | +func (s *ctxAwareSeriesClient) Recv() (*storepb.SeriesResponse, error) { |
| 45 | + if err := s.ctx.Err(); err != nil { |
| 46 | + return nil, err |
| 47 | + } |
| 48 | + if len(s.responses) == 0 { |
| 49 | + return nil, io.EOF |
| 50 | + } |
| 51 | + resp := s.responses[0] |
| 52 | + s.responses = s.responses[1:] |
| 53 | + return resp, nil |
| 54 | +} |
| 55 | + |
| 56 | +// blockingSeriesClient is a StoreGateway_SeriesClient whose Recv blocks until either its |
| 57 | +// stream context is cancelled or the test releases it. Blocking keeps one |
| 58 | +// fetchSeriesFromStores goroutine (and thus the shared errgroup context) alive while a |
| 59 | +// sibling request runs; honoring ctx cancellation lets a test assert that the stream is |
| 60 | +// torn down when the errgroup context is cancelled. Only Recv is exercised by the code |
| 61 | +// under test; the embedded nil grpc.ClientStream satisfies the rest of the interface. |
| 62 | +type blockingSeriesClient struct { |
| 63 | + grpc.ClientStream |
| 64 | + ctx context.Context |
| 65 | + started chan struct{} |
| 66 | + release chan struct{} |
| 67 | +} |
| 68 | + |
| 69 | +func (s *blockingSeriesClient) Recv() (*storepb.SeriesResponse, error) { |
| 70 | + select { |
| 71 | + case s.started <- struct{}{}: |
| 72 | + default: |
| 73 | + } |
| 74 | + select { |
| 75 | + case <-s.ctx.Done(): |
| 76 | + return nil, s.ctx.Err() |
| 77 | + case <-s.release: |
| 78 | + return nil, io.EOF |
| 79 | + } |
| 80 | +} |
| 81 | + |
| 82 | +// TestBlocksStoreQuerier_FetchSeriesFromStores_CancelsStreamPerRequest verifies that |
| 83 | +// each store-gateway Series stream's context is cancelled as soon as its own fetch |
| 84 | +// goroutine returns, instead of lingering until the shared errgroup context is cancelled |
| 85 | +// when the slowest concurrent request finishes. See issue #7575. |
| 86 | +func TestBlocksStoreQuerier_FetchSeriesFromStores_CancelsStreamPerRequest(t *testing.T) { |
| 87 | + minT, maxT := int64(10), int64(20) |
| 88 | + block1 := ulid.MustNew(1, nil) |
| 89 | + block2 := ulid.MustNew(2, nil) |
| 90 | + |
| 91 | + // Fast store-gateway: capture the context handed to its Series stream, then return a |
| 92 | + // single hints frame followed by io.EOF so its goroutine returns quickly. The stream |
| 93 | + // is context-aware, so if the production code cancelled the context before the |
| 94 | + // response was received the fast request would fail (caught by require.NoError below). |
| 95 | + var fastStreamCtx context.Context |
| 96 | + fastCtxCaptured := make(chan struct{}) |
| 97 | + fastClient := &seriesStreamLifetimeClient{ |
| 98 | + storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "fast"}, |
| 99 | + seriesFn: func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) { |
| 100 | + fastStreamCtx = ctx |
| 101 | + close(fastCtxCaptured) |
| 102 | + return &ctxAwareSeriesClient{ |
| 103 | + ctx: ctx, |
| 104 | + responses: []*storepb.SeriesResponse{mockHintsResponse(block1)}, |
| 105 | + }, nil |
| 106 | + }, |
| 107 | + } |
| 108 | + |
| 109 | + // Slow store-gateway: block in Recv until released, keeping the errgroup alive. |
| 110 | + slowStarted := make(chan struct{}, 1) |
| 111 | + release := make(chan struct{}) |
| 112 | + var releaseOnce sync.Once |
| 113 | + doRelease := func() { releaseOnce.Do(func() { close(release) }) } |
| 114 | + defer doRelease() |
| 115 | + slowClient := &seriesStreamLifetimeClient{ |
| 116 | + storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "slow"}, |
| 117 | + seriesFn: func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) { |
| 118 | + return &blockingSeriesClient{ctx: ctx, started: slowStarted, release: release}, nil |
| 119 | + }, |
| 120 | + } |
| 121 | + |
| 122 | + q := &blocksStoreQuerier{} |
| 123 | + clients := map[BlocksStoreClient][]ulid.ULID{ |
| 124 | + fastClient: {block1}, |
| 125 | + slowClient: {block2}, |
| 126 | + } |
| 127 | + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")} |
| 128 | + |
| 129 | + // Capture the returned errors so we can assert the fast request took the success |
| 130 | + // path. This rules out a false pass where the fast goroutine errored and cancelled |
| 131 | + // the shared errgroup context (which would also fire fastStreamCtx.Done(), but for |
| 132 | + // the wrong reason). |
| 133 | + var gotErr, gotRetryErr error |
| 134 | + done := make(chan struct{}) |
| 135 | + go func() { |
| 136 | + defer close(done) |
| 137 | + _, _, _, _, gotErr, gotRetryErr = q.fetchSeriesFromStores(context.Background(), nil, "user-1", clients, minT, maxT, 0, matchers, 0, 0) |
| 138 | + }() |
| 139 | + |
| 140 | + // Wait until the slow request is blocked in Recv and the fast request's stream has |
| 141 | + // been created (so its goroutine is on its way to returning). |
| 142 | + select { |
| 143 | + case <-slowStarted: |
| 144 | + case <-time.After(5 * time.Second): |
| 145 | + doRelease() |
| 146 | + <-done |
| 147 | + t.Fatal("slow store-gateway Series stream never started") |
| 148 | + } |
| 149 | + select { |
| 150 | + case <-fastCtxCaptured: |
| 151 | + case <-time.After(5 * time.Second): |
| 152 | + doRelease() |
| 153 | + <-done |
| 154 | + t.Fatal("fast store-gateway Series stream was never created") |
| 155 | + } |
| 156 | + |
| 157 | + require.NotNil(t, fastStreamCtx) |
| 158 | + |
| 159 | + // The fast request has finished; its per-stream context must be cancelled even though |
| 160 | + // the slow request is still blocked in Recv. On the buggy code the fast stream shares |
| 161 | + // the errgroup context, which stays alive until the slow request returns, so this |
| 162 | + // never fires within the timeout. |
| 163 | + select { |
| 164 | + case <-fastStreamCtx.Done(): |
| 165 | + // Expected: the fast stream's context was cancelled independently. |
| 166 | + case <-time.After(2 * time.Second): |
| 167 | + doRelease() |
| 168 | + <-done |
| 169 | + t.Fatal("fast store-gateway Series stream context was not cancelled while a concurrent request was still in-flight") |
| 170 | + } |
| 171 | + |
| 172 | + // Release the slow request and make sure fetchSeriesFromStores returns. Use t.Error |
| 173 | + // (not t.Fatal) so we don't runtime.Goexit the test goroutine while the worker may |
| 174 | + // still be running; release is already closed, so there is nothing left to unblock. |
| 175 | + doRelease() |
| 176 | + select { |
| 177 | + case <-done: |
| 178 | + // Both store-gateway requests succeeded, confirming the cancellation observed |
| 179 | + // above came from the fast request's own per-stream cancel, not from an error |
| 180 | + // cancelling the shared errgroup context. |
| 181 | + require.NoError(t, gotErr) |
| 182 | + require.NoError(t, gotRetryErr) |
| 183 | + case <-time.After(5 * time.Second): |
| 184 | + t.Error("fetchSeriesFromStores did not return after releasing the slow request") |
| 185 | + } |
| 186 | +} |
| 187 | + |
| 188 | +// TestBlocksStoreQuerier_FetchSeriesFromStores_SiblingErrorCancelsStreamContext verifies |
| 189 | +// that each Series stream's context is derived from the errgroup context (gCtx): when one |
| 190 | +// store-gateway request fails, the errgroup cancels gCtx, which must tear down the other, |
| 191 | +// still-in-flight Series streams. A version that derived the per-stream context from a |
| 192 | +// non-errgroup parent (e.g. the incoming ctx) would leave the blocked stream running and |
| 193 | +// fetchSeriesFromStores would hang. See issue #7575. |
| 194 | +func TestBlocksStoreQuerier_FetchSeriesFromStores_SiblingErrorCancelsStreamContext(t *testing.T) { |
| 195 | + minT, maxT := int64(10), int64(20) |
| 196 | + block1 := ulid.MustNew(1, nil) |
| 197 | + block2 := ulid.MustNew(2, nil) |
| 198 | + |
| 199 | + // Blocked store-gateway: its Recv blocks until its stream context is cancelled (or the |
| 200 | + // deferred release fires as a safety net). The test never releases it on the happy |
| 201 | + // path — cancellation must come from the errgroup. It signals blockedStarted once it |
| 202 | + // is actually parked inside Recv. |
| 203 | + release := make(chan struct{}) |
| 204 | + var releaseOnce sync.Once |
| 205 | + doRelease := func() { releaseOnce.Do(func() { close(release) }) } |
| 206 | + defer doRelease() |
| 207 | + blockedStarted := make(chan struct{}, 1) |
| 208 | + blockedClient := &seriesStreamLifetimeClient{ |
| 209 | + storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "blocked"}, |
| 210 | + seriesFn: func(ctx context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) { |
| 211 | + return &blockingSeriesClient{ctx: ctx, started: blockedStarted, release: release}, nil |
| 212 | + }, |
| 213 | + } |
| 214 | + |
| 215 | + // Failing store-gateway: waits until the blocked stream is parked inside Recv, then |
| 216 | + // returns a non-retryable error so the errgroup cancels gCtx (and therefore every |
| 217 | + // per-stream child context). Sequencing the error after the block guarantees the |
| 218 | + // blocked goroutine is committed to Recv (past the loop's gCtx.Err() guard), so the |
| 219 | + // only thing that can unblock it is cancellation of its own stream context — which |
| 220 | + // happens iff that context is derived from gCtx. This makes the wrong-parent |
| 221 | + // regression deterministic rather than scheduling-dependent. |
| 222 | + failingClient := &seriesStreamLifetimeClient{ |
| 223 | + storeGatewayClientMock: storeGatewayClientMock{remoteAddr: "failing"}, |
| 224 | + seriesFn: func(_ context.Context) (storegatewaypb.StoreGateway_SeriesClient, error) { |
| 225 | + select { |
| 226 | + case <-blockedStarted: |
| 227 | + case <-time.After(5 * time.Second): |
| 228 | + } |
| 229 | + return nil, status.Error(codes.Internal, "boom") |
| 230 | + }, |
| 231 | + } |
| 232 | + |
| 233 | + q := &blocksStoreQuerier{} |
| 234 | + clients := map[BlocksStoreClient][]ulid.ULID{ |
| 235 | + blockedClient: {block1}, |
| 236 | + failingClient: {block2}, |
| 237 | + } |
| 238 | + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")} |
| 239 | + |
| 240 | + done := make(chan struct{}) |
| 241 | + go func() { |
| 242 | + defer close(done) |
| 243 | + _, _, _, _, _, _ = q.fetchSeriesFromStores(context.Background(), nil, "user-1", clients, minT, maxT, 0, matchers, 0, 0) |
| 244 | + }() |
| 245 | + |
| 246 | + // fetchSeriesFromStores must return promptly: the failing sibling cancels gCtx, which |
| 247 | + // cancels the blocked stream's context and unblocks its Recv. The test does not |
| 248 | + // release the blocked stream itself. On a version that derived the per-stream context |
| 249 | + // from a non-errgroup parent, the blocked Recv would never be cancelled and this hangs. |
| 250 | + select { |
| 251 | + case <-done: |
| 252 | + case <-time.After(2 * time.Second): |
| 253 | + doRelease() |
| 254 | + <-done |
| 255 | + t.Fatal("fetchSeriesFromStores did not return after a sibling store-gateway error; per-stream context is not derived from the errgroup context") |
| 256 | + } |
| 257 | +} |
0 commit comments