Skip to content

Commit 9fb947d

Browse files
sandy2008claude
andauthored
[BUGFIX] Ring: fix DoBatch cleanup callback blocked forever on callback panic (#7559)
* Fix DoBatch cleanup callback blocked forever on callback panic DoBatch's submitted closures called wg.Done() inline after callback() and tracker.record(). If callback panicked, wg.Done() was skipped, causing wg.Wait() to block forever and the cleanup callback to never execute. This leaked context timers, request buffers, and any other resources owned by the cleanup function for all DoBatch callers (distributor, alertmanager). Move wg.Done() to a defer so it runs even during panic unwinding, ensuring the cleanup goroutine always completes. Fixes #7558 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com> * Fix errcheck lint: assign recover() return to blank identifier Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com> * Fix faillint: use go.uber.org/atomic instead of sync/atomic Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com> --------- Signed-off-by: Sandy Chen <Yuxuan.Chen@morganstanley.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b64d2a7 commit 9fb947d

2 files changed

Lines changed: 107 additions & 1 deletion

File tree

pkg/ring/batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, e util.AsyncExecutor
125125
wg.Add(len(instances))
126126
for _, i := range instances {
127127
e.Submit(func() {
128+
defer wg.Done()
128129
err := callback(i.desc, i.indexes)
129130
tracker.record(i, err)
130-
wg.Done()
131131
})
132132
}
133133

pkg/ring/batch_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package ring
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"go.uber.org/atomic"
10+
)
11+
12+
var _ ReadRing = (*mockReadRing)(nil)
13+
14+
// recoveringExecutor wraps each submitted function in a goroutine that
15+
// recovers from panics. This prevents a panic inside a DoBatch callback
16+
// from crashing the test process, while still allowing us to observe
17+
// whether cleanup is called (which depends on defer wg.Done()).
18+
type recoveringExecutor struct{}
19+
20+
func (e *recoveringExecutor) Submit(f func()) {
21+
go func() {
22+
defer func() { _ = recover() }()
23+
f()
24+
}()
25+
}
26+
27+
func (e *recoveringExecutor) Stop() {}
28+
29+
// mockReadRing is a minimal ReadRing implementation for testing DoBatch.
30+
// It returns a single healthy instance for any Get call.
31+
type mockReadRing struct {
32+
inst InstanceDesc
33+
}
34+
35+
func (m *mockReadRing) Get(_ uint32, _ Operation, _ []InstanceDesc, _ []string, _ map[string]int) (ReplicationSet, error) {
36+
return ReplicationSet{
37+
Instances: []InstanceDesc{m.inst},
38+
MaxErrors: 0,
39+
}, nil
40+
}
41+
42+
func (m *mockReadRing) GetAllHealthy(_ Operation) (ReplicationSet, error) {
43+
return ReplicationSet{}, nil
44+
}
45+
func (m *mockReadRing) GetAllInstanceDescs(_ Operation) ([]InstanceDesc, []InstanceDesc, error) {
46+
return nil, nil, nil
47+
}
48+
func (m *mockReadRing) GetInstanceDescsForOperation(_ Operation) (map[string]InstanceDesc, error) {
49+
return nil, nil
50+
}
51+
func (m *mockReadRing) GetReplicationSetForOperation(_ Operation) (ReplicationSet, error) {
52+
return ReplicationSet{}, nil
53+
}
54+
func (m *mockReadRing) ReplicationFactor() int { return 1 }
55+
func (m *mockReadRing) InstancesCount() int { return 1 }
56+
func (m *mockReadRing) ShuffleShard(_ string, _ int) ReadRing {
57+
return m
58+
}
59+
func (m *mockReadRing) ShuffleShardWithZoneStability(_ string, _ int) ReadRing {
60+
return m
61+
}
62+
func (m *mockReadRing) GetInstanceState(_ string) (InstanceState, error) {
63+
return ACTIVE, nil
64+
}
65+
func (m *mockReadRing) GetInstanceIdByAddr(_ string) (string, error) {
66+
return "", nil
67+
}
68+
func (m *mockReadRing) ShuffleShardWithLookback(_ string, _ int, _ time.Duration, _ time.Time) ReadRing {
69+
return m
70+
}
71+
func (m *mockReadRing) HasInstance(_ string) bool { return true }
72+
func (m *mockReadRing) CleanupShuffleShardCache(_ string) {}
73+
74+
func TestDoBatchCleanupCalledOnCallbackPanic(t *testing.T) {
75+
ring := &mockReadRing{
76+
inst: InstanceDesc{
77+
Addr: "addr-0",
78+
Timestamp: time.Now().Unix(),
79+
State: ACTIVE,
80+
Tokens: []uint32{0},
81+
},
82+
}
83+
84+
var cleanupCalled atomic.Bool
85+
cleanup := func() {
86+
cleanupCalled.Store(true)
87+
}
88+
89+
panicCallback := func(_ InstanceDesc, _ []int) error {
90+
panic("test panic in callback")
91+
}
92+
93+
// Use a context with timeout so DoBatch can return. When the callback
94+
// panics, tracker.record is never called, so neither tracker.done nor
95+
// tracker.err is signaled. DoBatch exits via ctx.Done(). The key
96+
// assertion is that cleanup still runs: with defer wg.Done(), the
97+
// WaitGroup completes despite the panic, unblocking the cleanup goroutine.
98+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
99+
defer cancel()
100+
101+
_ = DoBatch(ctx, Write, ring, &recoveringExecutor{}, []uint32{0}, panicCallback, cleanup)
102+
103+
assert.Eventually(t, func() bool {
104+
return cleanupCalled.Load()
105+
}, 5*time.Second, 10*time.Millisecond, "cleanup must be called even when callback panics")
106+
}

0 commit comments

Comments
 (0)