Skip to content

Commit e912988

Browse files
authored
fix panic in stream push when context deadline expires during request marshalling (#7541)
* fix panic in stream push when context deadline expires during request marshalling
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
* fix error handling
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
* extract job processing
  Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
---------
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 0c146ef commit e912988

3 files changed

Lines changed: 187 additions & 56 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534
4343
* [BUGFIX] Ingester: Release the TSDB appender on every early-return path in `Push` (e.g. out-of-order label set) by deferring `Rollback`. Previously such requests leaked TSDB head series references, mmap'd chunks and pending state per request, causing the `cortex_ingester_tsdb_head_active_appenders` gauge to grow unbounded. #7528
4444
* [BUGFIX] Ring: Fix token conflict resolution being applied only to the updated instance, and check for token conflicts continuously during the instance observe period.
45+
* [BUGFIX] Distributor: Fix a panic (`slice bounds out of range`) in the stream push path when the context deadline expires while the worker goroutine is still marshalling a `WriteRequest`. #7541
4546

4647
## 1.21.0 2026-04-24
4748

pkg/ingester/client/client.go

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,10 @@ type HealthAndIngesterClient interface {
5757
}
5858

5959
type streamWriteJob struct {
60-
req *cortexpb.StreamWriteRequest
61-
resp *cortexpb.WriteResponse
62-
ctx context.Context
63-
cancel context.CancelFunc
64-
err error
60+
req *cortexpb.StreamWriteRequest
61+
resp *cortexpb.WriteResponse
62+
err error
63+
sendDone chan struct{}
6564
}
6665

6766
type closableHealthAndIngesterClient struct {
@@ -112,17 +111,18 @@ func (c *closableHealthAndIngesterClient) PushStreamConnection(ctx context.Conte
112111
Request: in,
113112
}
114113

115-
reqCtx, reqCancel := context.WithCancel(ctx)
116-
defer reqCancel()
117-
118114
job := &streamWriteJob{
119-
req: streamReq,
120-
ctx: reqCtx,
121-
cancel: reqCancel,
115+
req: streamReq,
116+
sendDone: make(chan struct{}),
122117
}
123118
c.streamPushChan <- job
124-
<-reqCtx.Done()
125-
return job.resp, job.err
119+
select {
120+
case <-job.sendDone:
121+
return job.resp, job.err
122+
case <-ctx.Done():
123+
<-job.sendDone
124+
return nil, ctx.Err()
125+
}
126126
})
127127
}
128128

@@ -185,9 +185,11 @@ func (c *closableHealthAndIngesterClient) Close() error {
185185
if !ok {
186186
break drainingLoop
187187
}
188-
if job != nil && job.cancel != nil {
188+
if job != nil {
189189
job.err = errors.New("stream connection ingester client closing")
190-
job.cancel()
190+
if job.sendDone != nil {
191+
close(job.sendDone)
192+
}
191193
}
192194
default:
193195
close(c.streamPushChan)
@@ -236,35 +238,41 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error {
236238
if !ok {
237239
return
238240
}
239-
err = stream.Send(job.req)
240-
if err == io.EOF {
241-
job.resp = &cortexpb.WriteResponse{}
242-
job.cancel()
243-
return
244-
}
245-
if err != nil {
246-
job.err = err
247-
job.cancel()
248-
continue
249-
}
250-
resp, err := stream.Recv()
251-
if err == io.EOF {
252-
job.resp = &cortexpb.WriteResponse{}
253-
job.cancel()
241+
if done := c.processJob(stream, job); done {
254242
return
255243
}
256-
job.resp = resp
257-
job.err = err
258-
if err == nil && job.resp.Code != http.StatusOK {
259-
job.err = httpgrpc.Errorf(int(job.resp.Code), "%s", job.resp.Message)
260-
}
261-
job.cancel()
262244
}
263245
}
264246
}()
265247
return nil
266248
}
267249

250+
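// processJob sends a single job on the stream, waits for its response, and always closes job.sendDone before returning. It returns true if Send or Recv reported io.EOF, in which case the worker stops processing jobs.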
251+
func (c *closableHealthAndIngesterClient) processJob(stream Ingester_PushStreamClient, job *streamWriteJob) (done bool) {
252+
defer close(job.sendDone)
253+
254+
err := stream.Send(job.req)
255+
if err == io.EOF {
256+
job.resp = &cortexpb.WriteResponse{}
257+
return true
258+
}
259+
if err != nil {
260+
job.err = err
261+
return false
262+
}
263+
resp, err := stream.Recv()
264+
if err == io.EOF {
265+
job.resp = &cortexpb.WriteResponse{}
266+
return true
267+
}
268+
job.resp = resp
269+
job.err = err
270+
if err == nil && job.resp.Code != http.StatusOK {
271+
job.err = httpgrpc.Errorf(int(job.resp.Code), "%s", job.resp.Message)
272+
}
273+
return false
274+
}
275+
268276
// Config is the configuration struct for the ingester client
269277
type Config struct {
270278
GRPCClientConfig grpcclient.ConfigWithHealthCheck `yaml:"grpc_client_config"`

pkg/ingester/client/client_test.go

Lines changed: 142 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56
"net/http/httptest"
67
"strconv"
78
"testing"
@@ -11,6 +12,7 @@ import (
1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/mock"
1314
"github.com/stretchr/testify/require"
15+
"github.com/weaveworks/common/user"
1416
"google.golang.org/grpc"
1517

1618
"github.com/cortexproject/cortex/pkg/cortexpb"
@@ -155,10 +157,8 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) {
155157
ctx, cancel := context.WithCancel(context.Background())
156158
streamChan := make(chan *streamWriteJob, 1)
157159

158-
jobCtx, jobCancel := context.WithCancel(context.Background())
159160
job := &streamWriteJob{
160-
ctx: jobCtx,
161-
cancel: jobCancel,
161+
sendDone: make(chan struct{}),
162162
}
163163
streamChan <- job
164164

@@ -178,6 +178,14 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) {
178178
_, ok := <-client.streamPushChan
179179
assert.False(t, ok, "stream channel should be closed")
180180

181+
// Verify job.sendDone was closed by Close()
182+
select {
183+
case <-job.sendDone:
184+
// Success - sendDone was closed
185+
case <-time.After(100 * time.Millisecond):
186+
t.Error("job.sendDone was not closed")
187+
}
188+
181189
// Verify context is cancelled
182190
select {
183191
case <-client.streamCtx.Done():
@@ -191,21 +199,11 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
191199
ctx, cancel := context.WithCancel(context.Background())
192200
streamChan := make(chan *streamWriteJob, 2)
193201

194-
job1Cancelled := false
195-
job2Cancelled := false
202+
job1Done := make(chan struct{})
203+
job2Done := make(chan struct{})
196204

197-
job1 := &streamWriteJob{
198-
ctx: context.Background(),
199-
cancel: func() {
200-
job1Cancelled = true
201-
},
202-
}
203-
job2 := &streamWriteJob{
204-
ctx: context.Background(),
205-
cancel: func() {
206-
job2Cancelled = true
207-
},
208-
}
205+
job1 := &streamWriteJob{sendDone: job1Done}
206+
job2 := &streamWriteJob{sendDone: job2Done}
209207
streamChan <- job1
210208
streamChan <- job2
211209

@@ -230,9 +228,17 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
230228
t.Error("stream context was not cancelled")
231229
}
232230

233-
// Verify jobs were cancelled
234-
assert.True(t, job1Cancelled, "job1 should have been cancelled")
235-
assert.True(t, job2Cancelled, "job2 should have been cancelled")
231+
// Verify jobs were closed (sendDone channels closed)
232+
select {
233+
case <-job1Done:
234+
case <-time.After(500 * time.Millisecond):
235+
t.Error("job1.sendDone was not closed")
236+
}
237+
select {
238+
case <-job2Done:
239+
case <-time.After(500 * time.Millisecond):
240+
t.Error("job2.sendDone was not closed")
241+
}
236242
}
237243

238244
type mockClientStream struct {
@@ -249,6 +255,122 @@ func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
249255
return &cortexpb.WriteResponse{}, nil
250256
}
251257

258+
// slowSendStream simulates a slow gRPC stream.
259+
// Send() pre-computes the buffer size (mirroring gRPC codec step 1),
260+
// sleeps for sendDelay so the caller's context deadline can fire first,
261+
// then calls MarshalToSizedBuffer (gRPC codec step 2).
262+
type slowSendStream struct {
263+
grpc.ClientStream
264+
sendDelay time.Duration
265+
panicCh chan any
266+
}
267+
268+
func (s *slowSendStream) Send(req *cortexpb.StreamWriteRequest) (retErr error) {
269+
defer func() {
270+
if r := recover(); r != nil {
271+
s.panicCh <- r // forward the panic value to the test
272+
} else {
273+
s.panicCh <- nil
274+
}
275+
}()
276+
277+
// gRPC codec pre-computes buffer size.
278+
size := req.Size()
279+
buf := make([]byte, size)
280+
281+
// Sleep so the caller's ctx deadline fires and PushStreamConnection returns.
282+
// After the sleep the caller may have grown the timeseries.
283+
time.Sleep(s.sendDelay)
284+
285+
// marshal into the pre-allocated buffer.
286+
// Panics when actual data > size (the bug).
287+
_, err := req.MarshalToSizedBuffer(buf)
288+
return err
289+
}
290+
291+
func (s *slowSendStream) Recv() (*cortexpb.WriteResponse, error) {
292+
return &cortexpb.WriteResponse{}, nil
293+
}
294+
295+
// TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows is an
296+
// end-to-end regression test for the distributor panic.
297+
func TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows(t *testing.T) {
298+
const (
299+
// ctxDeadline mirrors distributor.remote-timeout.
300+
// Kept short here so the test completes quickly.
301+
ctxDeadline = 20 * time.Millisecond
302+
// sendDelay must exceed ctxDeadline so the deadline fires while Send()
303+
// is still sleeping between Size() and MarshalToSizedBuffer().
304+
sendDelay = 200 * time.Millisecond
305+
)
306+
307+
ts := cortexpb.TimeseriesFromPool()
308+
ts.Labels = append(ts.Labels,
309+
cortexpb.LabelAdapter{Name: "__name__", Value: "test_metric"},
310+
cortexpb.LabelAdapter{Name: "job", Value: "test"},
311+
)
312+
ts.Samples = append(ts.Samples, cortexpb.Sample{Value: 1.0, TimestampMs: 1000})
313+
314+
timeseries := cortexpb.PreallocTimeseriesSliceFromPool()
315+
timeseries = append(timeseries, cortexpb.PreallocTimeseries{TimeSeries: ts})
316+
317+
writeReq := &cortexpb.WriteRequest{Timeseries: timeseries}
318+
319+
panicCh := make(chan any, 1)
320+
stream := &slowSendStream{
321+
sendDelay: sendDelay,
322+
panicCh: panicCh,
323+
}
324+
325+
mockIng := &mockIngester{}
326+
mockIng.On("PushStream", mock.Anything, mock.Anything).Return(stream, nil)
327+
328+
streamCtx, streamCancel := context.WithCancel(context.Background())
329+
defer streamCancel()
330+
331+
client := &closableHealthAndIngesterClient{
332+
IngesterClient: mockIng,
333+
conn: &mockClientConn{},
334+
addr: "test-addr",
335+
inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
336+
streamCtx: streamCtx,
337+
streamCancel: streamCancel,
338+
streamPushChan: make(chan *streamWriteJob, 1),
339+
}
340+
341+
workerCtx := user.InjectOrgID(streamCtx, "test-worker")
342+
require.NoError(t, client.worker(workerCtx))
343+
344+
// Call PushStreamConnection with a context that expires before Send() finishes.
345+
pushCtx, pushCancel := context.WithTimeout(
346+
user.InjectOrgID(context.Background(), "test-tenant"),
347+
ctxDeadline,
348+
)
349+
defer pushCancel()
350+
351+
// PushStreamConnection blocks until Send()+Recv() complete.
352+
_, pushErr := client.PushStreamConnection(pushCtx, writeReq)
353+
require.ErrorIs(t, pushErr, context.DeadlineExceeded,
354+
"caller should observe its own context deadline")
355+
356+
for i := range 100 {
357+
ts.Labels = append(ts.Labels, cortexpb.LabelAdapter{
358+
Name: fmt.Sprintf("extra_label_%d", i),
359+
Value: fmt.Sprintf("extra_value_%d", i),
360+
})
361+
}
362+
363+
// No panic expected: Send() already completed before labels were appended.
364+
select {
365+
case panicVal := <-panicCh:
366+
require.Nil(t, panicVal,
367+
"unexpected panic in MarshalToSizedBuffer: the fix should prevent "+
368+
"timeseries from being reused while Send() is still marshalling")
369+
case <-time.After(sendDelay + time.Second):
370+
t.Fatal("timed out waiting for Send() to complete")
371+
}
372+
}
373+
252374
func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) {
253375
ctx, cancel := context.WithCancel(context.Background())
254376
streamChan := make(chan *streamWriteJob)

0 commit comments

Comments
 (0)