@@ -26,18 +26,6 @@ type secondaryPending struct {
2626 payload string
2727}
2828
29- // unmatchedSecondaries buffers unmatched secondary messages per shadowPubSub
30- // instance. This allows us to avoid reporting DivExtraData immediately when a
31- // secondary arrives before the corresponding primary (e.g. due to network
32- // jitter) and instead only report once the comparison window has elapsed.
33- // Each key maps to a slice to handle duplicate secondary messages correctly.
34- var unmatchedSecondaries = struct {
35- sync.Mutex
36- data map [* shadowPubSub ]map [msgKey ][]secondaryPending
37- }{
38- data : make (map [* shadowPubSub ]map [msgKey ][]secondaryPending ),
39- }
40-
4129// pendingMsg records a message awaiting its counterpart from the other source.
4230type pendingMsg struct {
4331 pattern string
@@ -65,24 +53,29 @@ type shadowPubSub struct {
6553 window time.Duration
6654 nowFunc func () time.Time // injectable clock; defaults to time.Now
6755
68- mu sync.Mutex
69- pending map [msgKey ][]pendingMsg // primary messages awaiting secondary match
70- closed bool
71- started bool
72- startOnce sync.Once
73- done chan struct {}
56+ mu sync.Mutex
57+ pending map [msgKey ][]pendingMsg // primary messages awaiting secondary match
58+ // unmatchedSecondaries buffers secondary messages that arrived before a
59+ // corresponding primary. Protected by mu. Each key maps to a slice to
60+ // handle duplicate messages correctly.
61+ unmatchedSecondaries map [msgKey ][]secondaryPending
62+ closed bool
63+ started bool
64+ startOnce sync.Once
65+ done chan struct {}
7466}
7567
7668func newShadowPubSub (backend PubSubBackend , metrics * ProxyMetrics , sentry * SentryReporter , logger * slog.Logger , window time.Duration ) * shadowPubSub {
7769 return & shadowPubSub {
78- secondary : backend .NewPubSub (context .Background ()),
79- metrics : metrics ,
80- sentry : sentry ,
81- logger : logger ,
82- window : window ,
83- nowFunc : time .Now ,
84- pending : make (map [msgKey ][]pendingMsg ),
85- done : make (chan struct {}),
70+ secondary : backend .NewPubSub (context .Background ()),
71+ metrics : metrics ,
72+ sentry : sentry ,
73+ logger : logger ,
74+ window : window ,
75+ nowFunc : time .Now ,
76+ pending : make (map [msgKey ][]pendingMsg ),
77+ unmatchedSecondaries : make (map [msgKey ][]secondaryPending ),
78+ done : make (chan struct {}),
8679 }
8780}
8881
@@ -166,14 +159,7 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) {
166159// exists for the given key and is still within the comparison window. If one is
167160// found it is consumed and true is returned. Caller must hold sp.mu.
168161func (sp * shadowPubSub ) reconcileWithBufferedSecondary (key msgKey , now time.Time ) bool {
169- unmatchedSecondaries .Lock ()
170- defer unmatchedSecondaries .Unlock ()
171-
172- secBySP , ok := unmatchedSecondaries .data [sp ]
173- if ! ok {
174- return false
175- }
176- secs , ok := secBySP [key ]
162+ secs , ok := sp .unmatchedSecondaries [key ]
177163 if ! ok || len (secs ) == 0 {
178164 return false
179165 }
@@ -185,12 +171,9 @@ func (sp *shadowPubSub) reconcileWithBufferedSecondary(key msgKey, now time.Time
185171 // Consume this secondary — remove it from the slice.
186172 secs = append (secs [:i ], secs [i + 1 :]... )
187173 if len (secs ) == 0 {
188- delete (secBySP , key )
189- if len (secBySP ) == 0 {
190- delete (unmatchedSecondaries .data , sp )
191- }
174+ delete (sp .unmatchedSecondaries , key )
192175 } else {
193- secBySP [key ] = secs
176+ sp . unmatchedSecondaries [key ] = secs
194177 }
195178 return true
196179 }
@@ -210,10 +193,6 @@ func (sp *shadowPubSub) Close() {
210193 if started {
211194 <- sp .done
212195 }
213- // Clean up buffered unmatched secondaries to prevent memory leak.
214- unmatchedSecondaries .Lock ()
215- delete (unmatchedSecondaries .data , sp )
216- unmatchedSecondaries .Unlock ()
217196}
218197
219198// compareLoop reads from the secondary channel and matches messages.
@@ -244,6 +223,7 @@ func (sp *shadowPubSub) compareLoop(ch <-chan *redis.Message) {
244223// Collects divergence info under lock and reports after releasing it.
245224func (sp * shadowPubSub ) matchSecondary (msg * redis.Message ) {
246225 sp .mu .Lock ()
226+ defer sp .mu .Unlock ()
247227
248228 key := msgKeyFromMessage (msg )
249229 now := sp .nowFunc ()
@@ -256,28 +236,16 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) {
256236 } else {
257237 sp .pending [key ] = entries [1 :]
258238 }
259- sp .mu .Unlock ()
260239 return
261240 }
262241 // The oldest pending primary is already past the window. Let the periodic
263242 // sweep report it as DivDataMismatch; fall through to buffer this secondary
264243 // so it can be reported as DivExtraData if it also remains unmatched.
265244 }
266245
267- sp .mu .Unlock ()
268-
269246 // No matching primary message within the window. Buffer the secondary and only
270247 // report DivExtraData if it remains unmatched after the comparison window.
271- unmatchedSecondaries .Lock ()
272- defer unmatchedSecondaries .Unlock ()
273-
274- perInstance , ok := unmatchedSecondaries .data [sp ]
275- if ! ok {
276- perInstance = make (map [msgKey ][]secondaryPending )
277- unmatchedSecondaries .data [sp ] = perInstance
278- }
279-
280- perInstance [key ] = append (perInstance [key ], secondaryPending {
248+ sp .unmatchedSecondaries [key ] = append (sp .unmatchedSecondaries [key ], secondaryPending {
281249 timestamp : now ,
282250 channel : msg .Channel ,
283251 payload : msg .Payload ,
@@ -291,22 +259,11 @@ func (sp *shadowPubSub) sweepExpired() {
291259 sp .mu .Lock ()
292260 now := sp .nowFunc ()
293261
294- unmatchedSecondaries .Lock ()
295- // Fetch without creating to avoid unnecessary allocation on every tick for idle instances.
296- perInstance := unmatchedSecondaries .data [sp ]
297-
298262 // Expire old buffered secondaries first so they cannot be consumed as a
299263 // "match" during reconciliation (prevents bypassing the comparison window).
300- if perInstance != nil {
301- divergences = sweepExpiredSecondaries (now , sp .window , perInstance , divergences )
302- }
303- divergences = sp .reconcilePrimaries (now , perInstance , divergences )
304-
305- if len (perInstance ) == 0 {
306- delete (unmatchedSecondaries .data , sp )
307- }
264+ divergences = sweepExpiredSecondaries (now , sp .window , sp .unmatchedSecondaries , divergences )
265+ divergences = sp .reconcilePrimaries (now , sp .unmatchedSecondaries , divergences )
308266
309- unmatchedSecondaries .Unlock ()
310267 sp .mu .Unlock ()
311268
312269 for _ , d := range divergences {
@@ -316,7 +273,7 @@ func (sp *shadowPubSub) sweepExpired() {
316273
317274// reconcilePrimaries matches pending primaries against buffered secondaries,
318275// reporting expired unmatched primaries as divergences.
319- // Caller must hold sp.mu and unmatchedSecondaries.Lock() .
276+ // Caller must hold sp.mu.
320277func (sp * shadowPubSub ) reconcilePrimaries (now time.Time , secBuf map [msgKey ][]secondaryPending , out []divergenceEvent ) []divergenceEvent {
321278 for key , entries := range sp .pending {
322279 var remaining []pendingMsg
@@ -385,26 +342,20 @@ func (sp *shadowPubSub) sweepAll() {
385342 }
386343 delete (sp .pending , key )
387344 }
388- sp .mu .Unlock ()
389-
390345 // Also drain any buffered unmatched secondaries for this instance.
391- unmatchedSecondaries .Lock ()
392- if perInstance , ok := unmatchedSecondaries .data [sp ]; ok {
393- for key , secs := range perInstance {
394- for _ , sec := range secs {
395- divergences = append (divergences , divergenceEvent {
396- channel : sec .channel ,
397- payload : sec .payload ,
398- pattern : key .Pattern ,
399- kind : DivExtraData ,
400- isPattern : key .Pattern != "" ,
401- })
402- }
403- delete (perInstance , key )
346+ for key , secs := range sp .unmatchedSecondaries {
347+ for _ , sec := range secs {
348+ divergences = append (divergences , divergenceEvent {
349+ channel : sec .channel ,
350+ payload : sec .payload ,
351+ pattern : key .Pattern ,
352+ kind : DivExtraData ,
353+ isPattern : key .Pattern != "" ,
354+ })
404355 }
405- delete (unmatchedSecondaries . data , sp )
356+ delete (sp . unmatchedSecondaries , key )
406357 }
407- unmatchedSecondaries .Unlock ()
358+ sp . mu .Unlock ()
408359
409360 for _ , d := range divergences {
410361 sp .reportDivergence (d )
@@ -429,11 +380,10 @@ func (sp *shadowPubSub) reportDivergence(d divergenceEvent) {
429380 }
430381
431382 var primary , secondary any
432- switch d .kind { //nolint:exhaustive // only two kinds apply to pub/sub shadow
433- case DivExtraData :
383+ if d .kind == DivExtraData {
434384 primary = nil
435385 secondary = d .payload
436- default :
386+ } else {
437387 primary = d .payload
438388 secondary = nil
439389 }
0 commit comments