Skip to content

Commit 5085aaa

Browse files
authored
Merge pull request #362 from bootjp/copilot/sub-pr-351
Fix pattern propagation in pubsub divergence events, flush closed flag, and window boundary
2 parents b61d0d0 + e42e4ba commit 5085aaa

5 files changed

Lines changed: 63 additions & 30 deletions

File tree

proxy/compare.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func (k DivergenceKind) String() string {
4343
type Divergence struct {
4444
Command string
4545
Key string
46+
Pattern string // non-empty for PSUBSCRIBE/pmessage divergences
4647
Kind DivergenceKind
4748
Primary any
4849
Secondary any

proxy/pubsub.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -720,12 +720,17 @@ func (s *pubsubSession) mirrorUnsub(names []string, isPattern bool) {
720720
// --- Helpers ---
721721

722722
// flushOrClose flushes the detached connection. On error it closes the
723-
// connection so that commandLoop will observe a read failure and shut down.
724-
// Caller must hold s.writeMu.
723+
// connection and marks the session as closed so that background forwarding
724+
// loops exit promptly. Caller must hold s.writeMu.
725725
func (s *pubsubSession) flushOrClose() {
726726
if err := s.dconn.Flush(); err != nil {
727727
s.logger.Warn("failed to flush to client; closing connection", "err", err)
728728
_ = s.dconn.Close()
729+
730+
// Mark the session as closed so background forwarding loops can exit promptly.
731+
s.mu.Lock()
732+
s.closed = true
733+
s.mu.Unlock()
729734
}
730735
}
731736

proxy/sentry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func (r *SentryReporter) CaptureDivergence(div Divergence) {
9191
// Omit raw key from Sentry tags to avoid leaking sensitive data;
9292
// only send a truncated form as an extra for debugging.
9393
scope.SetExtra("key", truncateValue(div.Key))
94+
if div.Pattern != "" {
95+
scope.SetExtra("pattern", truncateValue(div.Pattern))
96+
}
9497
scope.SetExtra("primary", truncateValue(div.Primary))
9598
scope.SetExtra("secondary", truncateValue(div.Secondary))
9699
scope.SetFingerprint([]string{"divergence", div.Kind.String(), div.Command})

proxy/shadow_pubsub.go

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type pendingMsg struct {
5050
type divergenceEvent struct {
5151
channel string
5252
payload string
53+
pattern string // non-empty for PSUBSCRIBE/pmessage deliveries
5354
kind DivergenceKind
5455
isPattern bool // true if this originated from a PSUBSCRIBE
5556
}
@@ -147,30 +148,10 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) {
147148
now := sp.nowFunc()
148149

149150
// Attempt immediate reconciliation with any buffered unmatched secondary.
150-
unmatchedSecondaries.Lock()
151-
if secBySP, ok := unmatchedSecondaries.data[sp]; ok {
152-
if secs, ok := secBySP[key]; ok && len(secs) > 0 {
153-
for i, sec := range secs {
154-
dt := now.Sub(sec.timestamp)
155-
if dt <= sp.window && dt >= -sp.window {
156-
// Consume this secondary — remove it from the slice.
157-
secs = append(secs[:i], secs[i+1:]...)
158-
if len(secs) == 0 {
159-
delete(secBySP, key)
160-
if len(secBySP) == 0 {
161-
delete(unmatchedSecondaries.data, sp)
162-
}
163-
} else {
164-
secBySP[key] = secs
165-
}
166-
unmatchedSecondaries.Unlock()
167-
// Primary and secondary matched; no need to queue.
168-
return
169-
}
170-
}
171-
}
151+
if sp.reconcileWithBufferedSecondary(key, now) {
152+
// Primary and secondary matched; no need to queue.
153+
return
172154
}
173-
unmatchedSecondaries.Unlock()
174155

175156
// No suitable secondary was buffered; queue this primary for later comparison.
176157
sp.pending[key] = append(sp.pending[key], pendingMsg{
@@ -181,6 +162,41 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) {
181162
})
182163
}
183164

165+
// reconcileWithBufferedSecondary checks whether an unmatched secondary message
166+
// exists for the given key and is still within the comparison window. If one is
167+
// found it is consumed and true is returned. Caller must hold sp.mu.
168+
func (sp *shadowPubSub) reconcileWithBufferedSecondary(key msgKey, now time.Time) bool {
169+
unmatchedSecondaries.Lock()
170+
defer unmatchedSecondaries.Unlock()
171+
172+
secBySP, ok := unmatchedSecondaries.data[sp]
173+
if !ok {
174+
return false
175+
}
176+
secs, ok := secBySP[key]
177+
if !ok || len(secs) == 0 {
178+
return false
179+
}
180+
for i, sec := range secs {
181+
dt := now.Sub(sec.timestamp)
182+
if dt > sp.window || dt < -sp.window {
183+
continue
184+
}
185+
// Consume this secondary — remove it from the slice.
186+
secs = append(secs[:i], secs[i+1:]...)
187+
if len(secs) == 0 {
188+
delete(secBySP, key)
189+
if len(secBySP) == 0 {
190+
delete(unmatchedSecondaries.data, sp)
191+
}
192+
} else {
193+
secBySP[key] = secs
194+
}
195+
return true
196+
}
197+
return false
198+
}
199+
184200
// Close stops the shadow comparison and closes the secondary pub/sub.
185201
// Safe to call even if Start was never called.
186202
func (sp *shadowPubSub) Close() {
@@ -233,7 +249,7 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) {
233249
now := sp.nowFunc()
234250
if entries, ok := sp.pending[key]; ok && len(entries) > 0 {
235251
oldest := entries[0]
236-
if now.Sub(oldest.timestamp) < sp.window {
252+
if now.Sub(oldest.timestamp) <= sp.window {
237253
// Match found within window — remove the oldest pending primary message.
238254
if len(entries) == 1 {
239255
delete(sp.pending, key)
@@ -308,7 +324,7 @@ func (sp *shadowPubSub) reconcilePrimaries(now time.Time, secBuf map[msgKey][]se
308324
if now.Sub(e.timestamp) >= sp.window {
309325
// Primary has expired — report as divergence regardless of any buffered
310326
// secondaries. A late secondary must not suppress a window violation.
311-
out = append(out, divergenceEvent{channel: e.channel, payload: e.payload, kind: DivDataMismatch, isPattern: e.pattern != ""})
327+
out = append(out, divergenceEvent{channel: e.channel, payload: e.payload, pattern: e.pattern, kind: DivDataMismatch, isPattern: e.pattern != ""})
312328
continue
313329
}
314330
if secs := secBuf[key]; len(secs) > 0 {
@@ -337,7 +353,7 @@ func sweepExpiredSecondaries(now time.Time, window time.Duration, secBuf map[msg
337353
var remaining []secondaryPending
338354
for _, sec := range secs {
339355
if now.Sub(sec.timestamp) >= window {
340-
out = append(out, divergenceEvent{channel: sec.channel, payload: sec.payload, kind: DivExtraData, isPattern: key.Pattern != ""})
356+
out = append(out, divergenceEvent{channel: sec.channel, payload: sec.payload, pattern: key.Pattern, kind: DivExtraData, isPattern: key.Pattern != ""})
341357
} else {
342358
remaining = append(remaining, sec)
343359
}
@@ -362,6 +378,7 @@ func (sp *shadowPubSub) sweepAll() {
362378
divergences = append(divergences, divergenceEvent{
363379
channel: e.channel,
364380
payload: e.payload,
381+
pattern: e.pattern,
365382
kind: DivDataMismatch,
366383
isPattern: e.pattern != "",
367384
})
@@ -378,6 +395,7 @@ func (sp *shadowPubSub) sweepAll() {
378395
divergences = append(divergences, divergenceEvent{
379396
channel: sec.channel,
380397
payload: sec.payload,
398+
pattern: key.Pattern,
381399
kind: DivExtraData,
382400
isPattern: key.Pattern != "",
383401
})
@@ -395,11 +413,15 @@ func (sp *shadowPubSub) sweepAll() {
395413

396414
func (sp *shadowPubSub) reportDivergence(d divergenceEvent) {
397415
sp.metrics.PubSubShadowDivergences.WithLabelValues(d.kind.String()).Inc()
398-
sp.logger.Warn("pubsub shadow divergence",
416+
logAttrs := []any{
399417
"channel", truncateValue(d.channel),
400418
"payload", truncateValue(d.payload),
401419
"kind", d.kind.String(),
402-
)
420+
}
421+
if d.pattern != "" {
422+
logAttrs = append(logAttrs, "pattern", truncateValue(d.pattern))
423+
}
424+
sp.logger.Warn("pubsub shadow divergence", logAttrs...)
403425

404426
cmd := "SUBSCRIBE"
405427
if d.isPattern {
@@ -418,6 +440,7 @@ func (sp *shadowPubSub) reportDivergence(d divergenceEvent) {
418440
sp.sentry.CaptureDivergence(Divergence{
419441
Command: cmd,
420442
Key: d.channel,
443+
Pattern: d.pattern,
421444
Kind: d.kind,
422445
Primary: primary,
423446
Secondary: secondary,

proxy/shadow_pubsub_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ func TestShadowPubSub_SecondaryWithinWindowMatches(t *testing.T) {
315315
assert.Equal(t, float64(0), extra, "within-window secondary match must suppress extra_data")
316316
}
317317

318+
// TestShadowPubSub_ExpiredPrimaryThenLateSecondary verifies that a secondary
318319
// arriving after the comparison window does not suppress the already-expired
319320
// primary divergence. Both a DivDataMismatch (primary) and eventually a
320321
// DivExtraData (secondary) should be reported.

0 commit comments

Comments
 (0)