Skip to content

Commit 760deef

Browse files
Copilotbootjp
andcommitted
Fix divergenceEvent pattern propagation, flush close flag, and window boundary
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
1 parent a06f845 commit 760deef

5 files changed

Lines changed: 25 additions & 7 deletions

File tree

proxy/compare.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func (k DivergenceKind) String() string {
4343
type Divergence struct {
4444
Command string
4545
Key string
46+
Pattern string // non-empty for PSUBSCRIBE/pmessage divergences
4647
Kind DivergenceKind
4748
Primary any
4849
Secondary any

proxy/pubsub.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -720,12 +720,17 @@ func (s *pubsubSession) mirrorUnsub(names []string, isPattern bool) {
720720
// --- Helpers ---
721721

722722
// flushOrClose flushes the detached connection. On error it closes the
723-
// connection so that commandLoop will observe a read failure and shut down.
724-
// Caller must hold s.writeMu.
723+
// connection and marks the session as closed so that background forwarding
724+
// loops exit promptly. Caller must hold s.writeMu.
725725
func (s *pubsubSession) flushOrClose() {
726726
if err := s.dconn.Flush(); err != nil {
727727
s.logger.Warn("failed to flush to client; closing connection", "err", err)
728728
_ = s.dconn.Close()
729+
730+
// Mark the session as closed so background forwarding loops can exit promptly.
731+
s.mu.Lock()
732+
s.closed = true
733+
s.mu.Unlock()
729734
}
730735
}
731736

proxy/sentry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func (r *SentryReporter) CaptureDivergence(div Divergence) {
9191
// Omit raw key from Sentry tags to avoid leaking sensitive data;
9292
// only send a truncated form as an extra for debugging.
9393
scope.SetExtra("key", truncateValue(div.Key))
94+
if div.Pattern != "" {
95+
scope.SetExtra("pattern", truncateValue(div.Pattern))
96+
}
9497
scope.SetExtra("primary", truncateValue(div.Primary))
9598
scope.SetExtra("secondary", truncateValue(div.Secondary))
9699
scope.SetFingerprint([]string{"divergence", div.Kind.String(), div.Command})

proxy/shadow_pubsub.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type pendingMsg struct {
5050
type divergenceEvent struct {
5151
channel string
5252
payload string
53+
pattern string // non-empty for PSUBSCRIBE/pmessage deliveries
5354
kind DivergenceKind
5455
isPattern bool // true if this originated from a PSUBSCRIBE
5556
}
@@ -233,7 +234,7 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) {
233234
now := sp.nowFunc()
234235
if entries, ok := sp.pending[key]; ok && len(entries) > 0 {
235236
oldest := entries[0]
236-
if now.Sub(oldest.timestamp) < sp.window {
237+
if now.Sub(oldest.timestamp) <= sp.window {
237238
// Match found within window — remove the oldest pending primary message.
238239
if len(entries) == 1 {
239240
delete(sp.pending, key)
@@ -308,7 +309,7 @@ func (sp *shadowPubSub) reconcilePrimaries(now time.Time, secBuf map[msgKey][]se
308309
if now.Sub(e.timestamp) >= sp.window {
309310
// Primary has expired — report as divergence regardless of any buffered
310311
// secondaries. A late secondary must not suppress a window violation.
311-
out = append(out, divergenceEvent{channel: e.channel, payload: e.payload, kind: DivDataMismatch, isPattern: e.pattern != ""})
312+
out = append(out, divergenceEvent{channel: e.channel, payload: e.payload, pattern: e.pattern, kind: DivDataMismatch, isPattern: e.pattern != ""})
312313
continue
313314
}
314315
if secs := secBuf[key]; len(secs) > 0 {
@@ -337,7 +338,7 @@ func sweepExpiredSecondaries(now time.Time, window time.Duration, secBuf map[msg
337338
var remaining []secondaryPending
338339
for _, sec := range secs {
339340
if now.Sub(sec.timestamp) >= window {
340-
out = append(out, divergenceEvent{channel: sec.channel, payload: sec.payload, kind: DivExtraData, isPattern: key.Pattern != ""})
341+
out = append(out, divergenceEvent{channel: sec.channel, payload: sec.payload, pattern: key.Pattern, kind: DivExtraData, isPattern: key.Pattern != ""})
341342
} else {
342343
remaining = append(remaining, sec)
343344
}
@@ -362,6 +363,7 @@ func (sp *shadowPubSub) sweepAll() {
362363
divergences = append(divergences, divergenceEvent{
363364
channel: e.channel,
364365
payload: e.payload,
366+
pattern: e.pattern,
365367
kind: DivDataMismatch,
366368
isPattern: e.pattern != "",
367369
})
@@ -378,6 +380,7 @@ func (sp *shadowPubSub) sweepAll() {
378380
divergences = append(divergences, divergenceEvent{
379381
channel: sec.channel,
380382
payload: sec.payload,
383+
pattern: key.Pattern,
381384
kind: DivExtraData,
382385
isPattern: key.Pattern != "",
383386
})
@@ -395,11 +398,15 @@ func (sp *shadowPubSub) sweepAll() {
395398

396399
func (sp *shadowPubSub) reportDivergence(d divergenceEvent) {
397400
sp.metrics.PubSubShadowDivergences.WithLabelValues(d.kind.String()).Inc()
398-
sp.logger.Warn("pubsub shadow divergence",
401+
logAttrs := []any{
399402
"channel", truncateValue(d.channel),
400403
"payload", truncateValue(d.payload),
401404
"kind", d.kind.String(),
402-
)
405+
}
406+
if d.pattern != "" {
407+
logAttrs = append(logAttrs, "pattern", truncateValue(d.pattern))
408+
}
409+
sp.logger.Warn("pubsub shadow divergence", logAttrs...)
403410

404411
cmd := "SUBSCRIBE"
405412
if d.isPattern {
@@ -418,6 +425,7 @@ func (sp *shadowPubSub) reportDivergence(d divergenceEvent) {
418425
sp.sentry.CaptureDivergence(Divergence{
419426
Command: cmd,
420427
Key: d.channel,
428+
Pattern: d.pattern,
421429
Kind: d.kind,
422430
Primary: primary,
423431
Secondary: secondary,

proxy/shadow_pubsub_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ func TestShadowPubSub_SecondaryWithinWindowMatches(t *testing.T) {
315315
assert.Equal(t, float64(0), extra, "within-window secondary match must suppress extra_data")
316316
}
317317

318+
// TestShadowPubSub_ExpiredPrimaryThenLateSecondary verifies that a secondary
318319
// arriving after the comparison window does not suppress the already-expired
319320
// primary divergence. Both a DivDataMismatch (primary) and eventually a
320321
// DivExtraData (secondary) should be reported.

0 commit comments

Comments
 (0)