@@ -148,30 +148,10 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) {
148148 now := sp .nowFunc ()
149149
150150 // Attempt immediate reconciliation with any buffered unmatched secondary.
151- unmatchedSecondaries .Lock ()
152- if secBySP , ok := unmatchedSecondaries .data [sp ]; ok {
153- if secs , ok := secBySP [key ]; ok && len (secs ) > 0 {
154- for i , sec := range secs {
155- dt := now .Sub (sec .timestamp )
156- if dt <= sp .window && dt >= - sp .window {
157- // Consume this secondary — remove it from the slice.
158- secs = append (secs [:i ], secs [i + 1 :]... )
159- if len (secs ) == 0 {
160- delete (secBySP , key )
161- if len (secBySP ) == 0 {
162- delete (unmatchedSecondaries .data , sp )
163- }
164- } else {
165- secBySP [key ] = secs
166- }
167- unmatchedSecondaries .Unlock ()
168- // Primary and secondary matched; no need to queue.
169- return
170- }
171- }
172- }
151+ if sp .reconcileWithBufferedSecondary (key , now ) {
152+ // Primary and secondary matched; no need to queue.
153+ return
173154 }
174- unmatchedSecondaries .Unlock ()
175155
176156 // No suitable secondary was buffered; queue this primary for later comparison.
177157 sp .pending [key ] = append (sp .pending [key ], pendingMsg {
@@ -182,6 +162,41 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) {
182162 })
183163}
184164
165+ // reconcileWithBufferedSecondary checks whether an unmatched secondary message
166+ // exists for the given key and is still within the comparison window. If one is
167+ // found it is consumed and true is returned. Caller must hold sp.mu.
168+ func (sp * shadowPubSub ) reconcileWithBufferedSecondary (key msgKey , now time.Time ) bool {
169+ unmatchedSecondaries .Lock ()
170+ defer unmatchedSecondaries .Unlock ()
171+
172+ secBySP , ok := unmatchedSecondaries .data [sp ]
173+ if ! ok {
174+ return false
175+ }
176+ secs , ok := secBySP [key ]
177+ if ! ok || len (secs ) == 0 {
178+ return false
179+ }
180+ for i , sec := range secs {
181+ dt := now .Sub (sec .timestamp )
182+ if dt > sp .window || dt < - sp .window {
183+ continue
184+ }
185+ // Consume this secondary — remove it from the slice.
186+ secs = append (secs [:i ], secs [i + 1 :]... )
187+ if len (secs ) == 0 {
188+ delete (secBySP , key )
189+ if len (secBySP ) == 0 {
190+ delete (unmatchedSecondaries .data , sp )
191+ }
192+ } else {
193+ secBySP [key ] = secs
194+ }
195+ return true
196+ }
197+ return false
198+ }
199+
185200// Close stops the shadow comparison and closes the secondary pub/sub.
186201// Safe to call even if Start was never called.
187202func (sp * shadowPubSub ) Close () {
0 commit comments