@@ -30,11 +30,12 @@ type secondaryPending struct {
3030// instance. This allows us to avoid reporting DivExtraData immediately when a
3131// secondary arrives before the corresponding primary (e.g. due to network
3232// jitter) and instead only report once the comparison window has elapsed.
33+ // Each key maps to a FIFO slice so that duplicate secondary messages sharing a key are buffered individually instead of overwriting one another.
3334var unmatchedSecondaries = struct {
3435 sync.Mutex
35- data map [* shadowPubSub ]map [msgKey ]secondaryPending
36+ data map [* shadowPubSub ]map [msgKey ][] secondaryPending
3637}{
37- data : make (map [* shadowPubSub ]map [msgKey ]secondaryPending ),
38+ data : make (map [* shadowPubSub ]map [msgKey ][] secondaryPending ),
3839}
3940
4041// pendingMsg records a message awaiting its counterpart from the other source.
@@ -47,9 +48,10 @@ type pendingMsg struct {
4748
4849// divergenceEvent holds divergence info collected under lock for deferred reporting.
4950type divergenceEvent struct {
50- channel string
51- payload string
52- kind DivergenceKind
51+ channel string
52+ payload string
53+ kind DivergenceKind
54+ isPattern bool // true if this originated from a PSUBSCRIBE
5355}
5456
5557// shadowPubSub subscribes to the secondary backend for the same channels
@@ -152,6 +154,10 @@ func (sp *shadowPubSub) Close() {
152154 if started {
153155 <- sp .done
154156 }
157+ // Drop this instance's buffered unmatched secondaries (they are discarded, not reported) so the global map does not leak them.
158+ unmatchedSecondaries .Lock ()
159+ delete (unmatchedSecondaries .data , sp )
160+ unmatchedSecondaries .Unlock ()
155161}
156162
157163// compareLoop reads from the secondary channel and matches messages.
@@ -201,15 +207,15 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) {
201207
202208 perInstance , ok := unmatchedSecondaries .data [sp ]
203209 if ! ok {
204- perInstance = make (map [msgKey ]secondaryPending )
210+ perInstance = make (map [msgKey ][] secondaryPending )
205211 unmatchedSecondaries .data [sp ] = perInstance
206212 }
207213
208- perInstance [key ] = secondaryPending {
214+ perInstance [key ] = append ( perInstance [ key ], secondaryPending {
209215 timestamp : now ,
210216 channel : msg .Channel ,
211217 payload : msg .Payload ,
212- }
218+ })
213219}
214220
215221// sweepExpired reports primary messages that were not matched within the window.
@@ -219,32 +225,53 @@ func (sp *shadowPubSub) sweepExpired() {
219225 sp .mu .Lock ()
220226 now := time .Now ()
221227
222- // Lock the unmatched secondary buffer while we reconcile primaries and
223- // secondaries and age out any expired entries.
224228 unmatchedSecondaries .Lock ()
229+ perInstance := sp .getOrCreateSecondaryBuffer ()
230+
231+ divergences = sp .reconcilePrimaries (now , perInstance , divergences )
232+ divergences = sweepExpiredSecondaries (now , sp .window , perInstance , divergences )
233+
234+ if len (perInstance ) == 0 {
235+ delete (unmatchedSecondaries .data , sp )
236+ }
237+
238+ unmatchedSecondaries .Unlock ()
239+ sp .mu .Unlock ()
240+
241+ for _ , d := range divergences {
242+ sp .reportDivergence (d )
243+ }
244+ }
245+
246+ // getOrCreateSecondaryBuffer returns the per-instance unmatched secondary buffer.
247+ // Caller must hold unmatchedSecondaries.Lock().
248+ func (sp * shadowPubSub ) getOrCreateSecondaryBuffer () map [msgKey ][]secondaryPending {
225249 perInstance , ok := unmatchedSecondaries .data [sp ]
226250 if ! ok {
227- perInstance = make (map [msgKey ]secondaryPending )
251+ perInstance = make (map [msgKey ][] secondaryPending )
228252 unmatchedSecondaries .data [sp ] = perInstance
229253 }
254+ return perInstance
255+ }
230256
231- // First, reconcile pending primaries against buffered secondaries. If a
232- // primary has a matching buffered secondary within the window, treat them
233- // as matched and drop both without reporting a divergence.
257+ // reconcilePrimaries matches pending primaries against buffered secondaries,
258+ // reporting expired unmatched primaries as divergences.
259+ // Caller must hold sp.mu and unmatchedSecondaries.Lock().
260+ func (sp * shadowPubSub ) reconcilePrimaries (now time.Time , secBuf map [msgKey ][]secondaryPending , out []divergenceEvent ) []divergenceEvent {
234261 for key , entries := range sp .pending {
235262 var remaining []pendingMsg
236263 for _ , e := range entries {
237- if sec , ok := perInstance [key ]; ok && now .Sub (sec .timestamp ) < sp .window {
238- // Matched with a buffered secondary; drop both.
239- delete (perInstance , key )
264+ if secs := secBuf [key ]; len (secs ) > 0 {
265+ // Matched — consume the oldest buffered secondary.
266+ if len (secs ) == 1 {
267+ delete (secBuf , key )
268+ } else {
269+ secBuf [key ] = secs [1 :]
270+ }
240271 continue
241272 }
242273 if now .Sub (e .timestamp ) >= sp .window {
243- divergences = append (divergences , divergenceEvent {
244- channel : e .channel ,
245- payload : e .payload ,
246- kind : DivDataMismatch ,
247- })
274+ out = append (out , divergenceEvent {channel : e .channel , payload : e .payload , kind : DivDataMismatch , isPattern : e .pattern != "" })
248275 } else {
249276 remaining = append (remaining , e )
250277 }
@@ -255,31 +282,27 @@ func (sp *shadowPubSub) sweepExpired() {
255282 sp .pending [key ] = remaining
256283 }
257284 }
285+ return out
286+ }
258287
259- // Next, age out any remaining buffered secondaries that have exceeded the
260- // comparison window and report them as extra_data divergences.
261- for key , sec := range perInstance {
262- if now .Sub (sec .timestamp ) >= sp .window {
263- divergences = append (divergences , divergenceEvent {
264- channel : sec .channel ,
265- payload : sec .payload ,
266- kind : DivExtraData ,
267- })
268- delete (perInstance , key )
288+ // sweepExpiredSecondaries ages out buffered secondaries past the window.
289+ func sweepExpiredSecondaries (now time.Time , window time.Duration , secBuf map [msgKey ][]secondaryPending , out []divergenceEvent ) []divergenceEvent {
290+ for key , secs := range secBuf {
291+ var remaining []secondaryPending
292+ for _ , sec := range secs {
293+ if now .Sub (sec .timestamp ) >= window {
294+ out = append (out , divergenceEvent {channel : sec .channel , payload : sec .payload , kind : DivExtraData })
295+ } else {
296+ remaining = append (remaining , sec )
297+ }
298+ }
299+ if len (remaining ) == 0 {
300+ delete (secBuf , key )
301+ } else {
302+ secBuf [key ] = remaining
269303 }
270304 }
271-
272- // Clean up empty per-instance maps.
273- if len (perInstance ) == 0 {
274- delete (unmatchedSecondaries .data , sp )
275- }
276-
277- unmatchedSecondaries .Unlock ()
278- sp .mu .Unlock ()
279-
280- for _ , d := range divergences {
281- sp .reportDivergence (d .channel , d .payload , d .kind )
282- }
305+ return out
283306}
284307
285308// sweepAll reports all remaining pending messages as divergences (used on shutdown).
@@ -290,41 +313,47 @@ func (sp *shadowPubSub) sweepAll() {
290313 for key , entries := range sp .pending {
291314 for _ , e := range entries {
292315 divergences = append (divergences , divergenceEvent {
293- channel : e .channel ,
294- payload : e .payload ,
295- kind : DivDataMismatch ,
316+ channel : e .channel ,
317+ payload : e .payload ,
318+ kind : DivDataMismatch ,
319+ isPattern : e .pattern != "" ,
296320 })
297321 }
298322 delete (sp .pending , key )
299323 }
300324 sp .mu .Unlock ()
301325
302326 for _ , d := range divergences {
303- sp .reportDivergence (d . channel , d . payload , d . kind )
327+ sp .reportDivergence (d )
304328 }
305329}
306330
307- func (sp * shadowPubSub ) reportDivergence (channel , payload string , kind DivergenceKind ) {
308- sp .metrics .PubSubShadowDivergences .WithLabelValues (kind .String ()).Inc ()
331+ func (sp * shadowPubSub ) reportDivergence (d divergenceEvent ) {
332+ sp .metrics .PubSubShadowDivergences .WithLabelValues (d . kind .String ()).Inc ()
309333 sp .logger .Warn ("pubsub shadow divergence" ,
310- "channel" , truncateValue (channel ),
311- "payload" , truncateValue (payload ),
312- "kind" , kind .String (),
334+ "channel" , truncateValue (d . channel ),
335+ "payload" , truncateValue (d . payload ),
336+ "kind" , d . kind .String (),
313337 )
314338
339+ cmd := "SUBSCRIBE"
340+ if d .isPattern {
341+ cmd = "PSUBSCRIBE"
342+ }
343+
315344 var primary , secondary any
316- switch kind { //nolint:exhaustive // only two kinds apply to pub/sub shadow
345+ switch d . kind { //nolint:exhaustive // only two kinds apply to pub/sub shadow
317346 case DivExtraData :
318347 primary = nil
319- secondary = payload
348+ secondary = d . payload
320349 default :
321- primary = payload
350+ primary = d . payload
322351 secondary = nil
323352 }
324353 sp .sentry .CaptureDivergence (Divergence {
325- Command : "SUBSCRIBE" ,
326- Key : channel ,
327- Kind : kind ,
354+ Command : cmd ,
355+ Key : d . channel ,
356+ Kind : d . kind ,
328357 Primary : primary ,
329358 Secondary : secondary ,
330359 DetectedAt : time .Now (),
0 commit comments