Skip to content

Commit d852c6f

Browse files
authored
Cache kubeconfigs on all raft members and fix race condition with re-engagement (#1439)
* Cache kubeconfigs on all raft members and fix race condition with re-engagement * fix pointer deref * fix more rebase issues
1 parent fbb8308 commit d852c6f

File tree

6 files changed

+726
-63
lines changed

6 files changed

+726
-63
lines changed

pkg/multicluster/broadcaster.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2026 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
package multicluster
11+
12+
import (
13+
"context"
14+
"sync"
15+
)
16+
17+
// restartBroadcaster delivers notifications to multiple concurrent consumers
// with two guarantees:
//
// 1. Fanout: every notify() wakes all goroutines currently waiting, not just
// one (close-and-replace gives channel-close broadcast semantics).
//
// 2. No missed notifications: if notify() fires while a consumer's fn is
// executing, fn will be called again after it returns.
//
// The zero value is not usable; construct with newRestartBroadcaster so that
// ch starts as a valid open channel.
type restartBroadcaster struct {
	mu sync.Mutex    // guards ch; held by notify() and channel()
	ch chan struct{} // closed to broadcast, then replaced with a fresh channel
}
29+
30+
func newRestartBroadcaster() *restartBroadcaster {
31+
return &restartBroadcaster{ch: make(chan struct{})}
32+
}
33+
34+
// notify wakes all goroutines currently blocked in drainNotifications and
35+
// replaces the channel so consumers can detect notifications that fired while
36+
// their fn was executing.
37+
func (b *restartBroadcaster) notify() {
38+
b.mu.Lock()
39+
defer b.mu.Unlock()
40+
close(b.ch)
41+
b.ch = make(chan struct{})
42+
}
43+
44+
// channel returns the current channel under the lock so callers get a
45+
// consistent snapshot.
46+
func (b *restartBroadcaster) channel() <-chan struct{} {
47+
b.mu.Lock()
48+
defer b.mu.Unlock()
49+
return b.ch
50+
}
51+
52+
// drainNotifications runs fn each time b fires. It handles two concerns:
53+
//
54+
// - Fanout: because notify() closes the channel, all goroutines running
55+
// drainNotifications for the same broadcaster wake simultaneously.
56+
//
57+
// - No missed notifications: the channel reference is snapshotted before fn
58+
// runs. After fn returns, if the reference changed then at least one
59+
// notify() fired during fn, so fn is called again immediately. Multiple
60+
// concurrent notifies during a single fn execution collapse into one extra
61+
// fn call.
62+
func drainNotifications(ctx context.Context, b *restartBroadcaster, fn func(context.Context)) {
63+
ch := b.channel()
64+
for {
65+
select {
66+
case <-ctx.Done():
67+
return
68+
case <-ch:
69+
// Snapshot the channel before fn runs. Any notify() during fn
70+
// replaces the channel, making the reference stale.
71+
ch = b.channel()
72+
fn(ctx)
73+
// Drain: if the channel changed while fn was running, a
74+
// notification was missed — call fn again and repeat.
75+
for next := b.channel(); next != ch; next = b.channel() {
76+
ch = next
77+
fn(ctx)
78+
}
79+
}
80+
}
81+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright 2026 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
package multicluster
11+
12+
import (
13+
"context"
14+
"sync/atomic"
15+
"testing"
16+
"testing/synctest"
17+
)
18+
19+
// TestDrainNotificationsNoConcurrentMiss verifies that drainNotifications
// never drops a notification that fires while fn is already executing.
//
// The scenario that previously caused a missed engage:
//
// 1. Goroutine is waiting on channel ch_A.
// 2. notify() fires → ch_A closes, ch_B created.
// 3. Goroutine wakes, snapshots ch = ch_B, calls fn (slow).
// 4. A second notify() fires while fn is running → ch_B closes, ch_C created.
// 5. fn returns.
// 6. Old code: goroutine calls channel() → gets ch_C (open), waits forever.
// New code: ch_C != ch_B → goroutine detects the missed notification and
// calls fn again before waiting on ch_C.
//
// synctest.Wait() is used to advance all goroutines to their next durable
// blocking point, making the steps above fully deterministic without sleeps.
func TestDrainNotificationsNoConcurrentMiss(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		b := newRestartBroadcaster()

		// Counts fn invocations; atomic because fn runs on the consumer
		// goroutine while the test goroutine reads it.
		var count atomic.Int32

		// gate controls when fn returns, simulating a slow doEngage.
		gate := make(chan struct{})
		fn := func(_ context.Context) {
			count.Add(1)
			<-gate
		}

		ctx, cancel := context.WithCancel(t.Context())
		defer cancel()

		go drainNotifications(ctx, b, fn)

		// Ensure the goroutine has started and is blocked in the select before
		// we fire any notification. Without this, the goroutine might start
		// AFTER notify() replaces the channel and miss the first notification.
		synctest.Wait()

		// ── Step 1: first notify ─────────────────────────────────────────────
		// The goroutine wakes, snapshots the new channel reference (ch_B), and
		// calls fn. fn increments count then blocks on <-gate.
		b.notify()
		synctest.Wait() // goroutine is now blocked inside fn on <-gate

		if count.Load() != 1 {
			t.Fatalf("after first notify: expected count=1, got %d", count.Load())
		}

		// ── Step 2: second notify while fn is still running ──────────────────
		// The broadcaster closes ch_B and creates ch_C. Because fn has not
		// returned yet, the goroutine cannot observe this change.
		b.notify()

		// ── Step 3: release fn ───────────────────────────────────────────────
		// fn returns. The drain check sees ch_C != ch_B and calls fn again.
		// fn increments count then blocks on <-gate again.
		gate <- struct{}{}
		synctest.Wait() // goroutine is blocked inside the second fn call

		if count.Load() != 2 {
			t.Fatalf("after second notify: expected count=2, got %d (notification was dropped)", count.Load())
		}

		// ── Step 4: release the second fn and shut down ──────────────────────
		gate <- struct{}{}
		cancel()
		synctest.Wait() // goroutine exits via ctx.Done()
	})
}
89+
90+
// TestDrainNotificationsSingleNotify is a basic sanity check: a single notify
91+
// calls fn exactly once and the goroutine then waits for the next notification.
92+
func TestDrainNotificationsSingleNotify(t *testing.T) {
93+
synctest.Test(t, func(t *testing.T) {
94+
b := newRestartBroadcaster()
95+
96+
var count atomic.Int32
97+
fn := func(_ context.Context) { count.Add(1) }
98+
99+
ctx, cancel := context.WithCancel(t.Context())
100+
defer cancel()
101+
102+
go drainNotifications(ctx, b, fn)
103+
synctest.Wait() // goroutine subscribed and blocking in select
104+
105+
b.notify()
106+
synctest.Wait() // fn ran and goroutine is back waiting for next notify
107+
108+
if count.Load() != 1 {
109+
t.Fatalf("expected count=1, got %d", count.Load())
110+
}
111+
112+
cancel()
113+
synctest.Wait()
114+
})
115+
}
116+
117+
// TestDrainNotificationsThreeRapidNotifies verifies that three back-to-back
// notify() calls issued while fn is blocked result in fn being called at
// least twice: once for the first notification and at least once more for the
// concurrent ones (which collapse into a single drain pass since each
// subsequent notify() replaces the previous replacement channel).
func TestDrainNotificationsThreeRapidNotifies(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		b := newRestartBroadcaster()

		// count tracks fn invocations; gate makes fn block so notifies can
		// race against an in-flight fn execution.
		var count atomic.Int32
		gate := make(chan struct{})
		fn := func(_ context.Context) {
			count.Add(1)
			<-gate
		}

		ctx, cancel := context.WithCancel(t.Context())
		defer cancel()

		go drainNotifications(ctx, b, fn)
		synctest.Wait() // goroutine subscribed

		// First notify starts fn.
		b.notify()
		synctest.Wait() // goroutine blocked on <-gate in fn

		if count.Load() != 1 {
			t.Fatalf("after first notify: expected count=1, got %d", count.Load())
		}

		// Two more notifies while fn is blocked. They replace the channel
		// twice; the third notify's channel is what the drain check will see.
		// A single drain pass is sufficient to call fn once more.
		b.notify()
		b.notify()

		// Release first fn; drain detects missed notifications and re-runs fn.
		gate <- struct{}{}
		synctest.Wait() // goroutine blocked on <-gate in drain-pass fn call

		if count.Load() < 2 {
			t.Fatalf("after two concurrent notifies: expected count>=2, got %d", count.Load())
		}

		// Release remaining fn calls until the goroutine is back in the select.
		// The non-blocking send distinguishes "fn is waiting on gate" from
		// "drain is complete and the goroutine has re-parked in the select".
		for {
			select {
			case gate <- struct{}{}:
				synctest.Wait()
			default:
				// goroutine is not blocked on gate — drain complete
				cancel()
				synctest.Wait()
				return
			}
		}
	})
}

0 commit comments

Comments
 (0)