Skip to content

Commit f997ab7

Browse files
committed
Complete fix for goroutine lifecycle race condition in Subscribe
Address a goroutine lifecycle management issue in Subscribe.

Problem: The previous fix (a buffered channel) only prevented the goroutine from blocking; it did not prevent goroutine leaks. When Subscribe returns with an error or due to context cancellation BEFORE the goroutine enters its main loop, the goroutine keeps running until the original context expires, which could be much later.

Complete solution:
1. Added a stop channel (chan struct{}) to signal goroutine termination.
2. The goroutine checks the stop channel during initialization (both error and success paths).
3. The goroutine checks the stop channel as the first case in its main select loop.
4. Subscribe closes the stop channel when returning with an error or on ctx.Done().
5. Subscribe does NOT close the stop channel on success; the goroutine continues normally.

This ensures:
- No goroutine leaks when Subscribe returns early with an error.
- No goroutine leaks when the context is cancelled during initialization.
- Clean resource cleanup via defer statements.
- The goroutine runs normally when initialization succeeds.

Verified by running the race detector 3 consecutive times.
1 parent 3fe25be commit f997ab7

1 file changed

Lines changed: 10 additions & 2 deletions

File tree

pkg/syncer/pubsub/redis_streams.go

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,9 +100,11 @@ func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan stri
100100

101101
ch := make(chan string)
102102
ready := make(chan error, 1) // Signal when consumer group is ready
103+
stop := make(chan struct{}) // Signal to stop goroutine
103104

104105
go func() {
105106
defer close(ch)
107+
defer close(ready) // Ensure ready is closed
106108

107109
batchSize := r.getBatchSize()
108110
flushTicker := time.NewTicker(r.getFlushInterval())
@@ -123,20 +125,23 @@ func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan stri
123125
log.Error().Err(err).Str("stream", streamName).Str("group", consumerGroup).Msg("Failed to create consumer group")
124126
select {
125127
case ready <- err: // Signal initialization failure
126-
case <-ctx.Done(): // Main function already returned
128+
case <-stop: // Main function signaled stop
127129
}
128130
return
129131
}
130132

131133
// Signal that consumer group is ready
132134
select {
133135
case ready <- nil:
134-
case <-ctx.Done(): // Main function already returned
136+
case <-stop: // Main function signaled stop
135137
return
136138
}
137139

138140
for {
139141
select {
142+
case <-stop:
143+
// Main function requested stop
144+
return
140145
case <-ctx.Done():
141146
// Send any remaining batch before closing
142147
if len(batch) > 0 {
@@ -239,10 +244,13 @@ func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan stri
239244
select {
240245
case err := <-ready:
241246
if err != nil {
247+
close(stop) // Signal goroutine to stop on initialization error
242248
return nil, err
243249
}
250+
// Success - goroutine continues running
244251
return ch, nil
245252
case <-ctx.Done():
253+
close(stop) // Signal goroutine to stop on context cancellation
246254
return nil, ctx.Err()
247255
}
248256
}

0 commit comments

Comments
 (0)