Skip to content

Commit 68e6c72

Browse files
Merge pull request francescopepe#7 from francescopepe/FP/fix-context-cancellation-buffered-consumer
Fix context cancellation in buffered consumer
2 parents 96467d1 + 6cc7e52 commit 68e6c72

2 files changed

Lines changed: 16 additions & 4 deletions

File tree

consumers.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (c *multiMessageConsumer) consume(concurrency int, errorCh chan<- error, me
177177
}
178178

179179
select {
180-
case <-consumers: // Use an available comsumer
180+
case <-consumers: // Use an available consumer
181181
case <-buffer.CtxExpired():
182182
errorCh <- errBufferCtxExpired
183183

@@ -186,16 +186,19 @@ func (c *multiMessageConsumer) consume(concurrency int, errorCh chan<- error, me
186186
continue
187187
}
188188

189+
ctx, cancelCtx := buffer.PullContext()
190+
189191
wg.Add(1)
190-
go func(ctx context.Context, messages []messages.Message) {
192+
go func(ctx context.Context, ctxCancelFunc context.CancelFunc, msgs []messages.Message) {
191193
defer func() {
192194
wg.Done()
193195
consumers <- struct{}{} // Release consumer
196+
ctxCancelFunc() // Cancel context
194197
}()
195198

196199
// Process the messages
197-
c.processMessages(errorCh, deleteCh, ctx, messages)
198-
}(buffer.Context(), buffer.Messages())
200+
c.processMessages(errorCh, deleteCh, ctx, msgs)
201+
}(ctx, cancelCtx, buffer.Messages())
199202

200203
// Reset buffer
201204
buffer.Reset()

internal/messages/messages.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,15 @@ func (b *BufferWithContextTimeout) Context() context.Context {
156156
return b.ctx
157157
}
158158

159+
func (b *BufferWithContextTimeout) PullContext() (context.Context, context.CancelFunc) {
160+
ctx, cancelCtx := b.ctx, b.cancelCtx
161+
162+
b.ctx = context.Background() // Create a context that doesn't expire
163+
b.cancelCtx = func() {}
164+
165+
return ctx, cancelCtx
166+
}
167+
159168
func NewBufferWithContextTimeout(config BufferWithContextTimeoutConfiguration) *BufferWithContextTimeout {
160169
return &BufferWithContextTimeout{
161170
Buffer: NewMessageBuffer(BufferConfiguration{

0 commit comments

Comments
 (0)