Skip to content

Commit 996df49

Browse files
committed
sqs(reaper): polish per Claude review (PR #736, round 1)
Two findings from Claude on PR #736 plus a medium-severity Gemini suggestion in the same review pass: 1. (Claude must-fix) Stale forward-reference comment in reapPage was written in PR 6a pointing at PR 6b. PR 6b is now this branch, so the "is a follow-up to PR 6a" wording was misleading. Trimmed to a one-line note that reapPage covers the legacy keyspace and reapPartitionedPage is the partitioned twin. 2. (Claude should-fix) classifyPartitionedByAgeEntry had no direct coverage of its retention-cutoff branch (live gen, sendTs <= cutoff -> reapable) or its future-gen guard (parsed.gen > currentGen -> not reapable). The integration tests exercise the orphan-cohort and dedup-expiry paths only. Added TestClassifyPartitionedByAgeEntry: a table-driven unit test that pins every branch (within retention, past retention, exact-cutoff boundary, orphan generation, future generation, wrong partition, wrong queue prefix, legacy key) against deterministic key inputs built via sqsPartitionedMsgByAgeKey. No store / coordinator dependency, so the test is fast and immune to retention timing. 3. (Gemini medium) reapExpiredDedup used an if/else to pick between legacy and partitioned dedup reaping for partitioned queues, while reapQueue runs both for symmetry and defensive coverage of leaked legacy entries. Mirrored that policy: legacy scan now always runs (cheap on an empty prefix today), and the partitioned scan additionally runs for partitioned queues. Caller audit: reapExpiredDedup has a single caller (reapAllQueues) and the error contract is unchanged, so no semantic ripple. Refs: PR #736 review thread; Claude review at run 25327783108.
1 parent 80f1c86 commit 996df49

2 files changed

Lines changed: 103 additions & 8 deletions

File tree

adapter/sqs_partitioned_dispatch_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,3 +752,91 @@ func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueNam
752752
}
753753
return total
754754
}
755+
756+
// TestClassifyPartitionedByAgeEntry pins every branch of the
757+
// classify helper that decides which partitioned byage rows the
758+
// live-queue reaper sweeps. The integration tests above cover
759+
// the orphan-gen path (tombstoned cohort) and the dedup-expiry
760+
// path; this unit test fills in the two live-queue paths that
761+
// have no integration coverage today: the retention-cutoff branch
762+
// (live gen, sendTs <= cutoff → reapable) and the future-gen
763+
// guard (parsed.gen > currentGen → not reapable, defensive
764+
// against a meta-vs-byage race).
765+
func TestClassifyPartitionedByAgeEntry(t *testing.T) {
766+
t.Parallel()
767+
const (
768+
queueName = "live-byage.fifo"
769+
partition = uint32(2)
770+
currentGen = uint64(5)
771+
cutoff = int64(1_000_000)
772+
messageID = "msg-classify"
773+
)
774+
775+
type want struct {
776+
reapable bool
777+
expectedGen uint64
778+
expectedPartn uint32
779+
expectedSendTs int64
780+
expectedMsgID string
781+
assertParsedSet bool // expectedGen / Partition / SendTs / MsgID checked only when true
782+
}
783+
cases := []struct {
784+
name string
785+
key []byte
786+
want want
787+
}{
788+
{
789+
name: "live gen within retention window — not reapable",
790+
key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff+1, messageID),
791+
want: want{reapable: false, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff + 1, expectedMsgID: messageID, assertParsedSet: true},
792+
},
793+
{
794+
name: "live gen past retention window — reapable (cutoff branch)",
795+
key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff-1, messageID),
796+
want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true},
797+
},
798+
{
799+
name: "live gen exactly at cutoff — reapable (>, not >=)",
800+
key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff, messageID),
801+
want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff, expectedMsgID: messageID, assertParsedSet: true},
802+
},
803+
{
804+
name: "orphan generation (gen < currentGen) — reapable unconditionally",
805+
key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen-1, cutoff+1_000_000, messageID),
806+
want: want{reapable: true, expectedGen: currentGen - 1, expectedPartn: partition, expectedSendTs: cutoff + 1_000_000, expectedMsgID: messageID, assertParsedSet: true},
807+
},
808+
{
809+
name: "future generation (gen > currentGen) — not reapable (gen-race guard)",
810+
key: sqsPartitionedMsgByAgeKey(queueName, partition, currentGen+1, cutoff-1, messageID),
811+
want: want{reapable: false, expectedGen: currentGen + 1, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true},
812+
},
813+
{
814+
name: "wrong partition — not reapable (page bleed defense)",
815+
key: sqsPartitionedMsgByAgeKey(queueName, partition+1, currentGen, cutoff-1, messageID),
816+
want: want{reapable: false, assertParsedSet: false},
817+
},
818+
{
819+
name: "wrong queue prefix — not reapable (parse fails)",
820+
key: sqsPartitionedMsgByAgeKey("different.fifo", partition, currentGen, cutoff-1, messageID),
821+
want: want{reapable: false, assertParsedSet: false},
822+
},
823+
{
824+
name: "legacy byage key — not reapable (parse fails)",
825+
key: sqsMsgByAgeKey(queueName, currentGen, cutoff-1, messageID),
826+
want: want{reapable: false, assertParsedSet: false},
827+
},
828+
}
829+
for _, tc := range cases {
830+
t.Run(tc.name, func(t *testing.T) {
831+
t.Parallel()
832+
parsed, reapable := classifyPartitionedByAgeEntry(tc.key, queueName, partition, currentGen, cutoff)
833+
require.Equal(t, tc.want.reapable, reapable, "reapable")
834+
if tc.want.assertParsedSet {
835+
require.Equal(t, tc.want.expectedGen, parsed.Generation, "Generation")
836+
require.Equal(t, tc.want.expectedPartn, parsed.Partition, "Partition")
837+
require.Equal(t, tc.want.expectedSendTs, parsed.SendTimestampMs, "SendTimestampMs")
838+
require.Equal(t, tc.want.expectedMsgID, parsed.MessageID, "MessageID")
839+
}
840+
})
841+
}
842+
}

adapter/sqs_reaper.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -619,10 +619,8 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u
619619
// see meta caught up.
620620
continue
621621
}
622-
// Live-queue retention reap currently iterates only the
623-
// legacy byage keyspace; partitioned-byage live-queue
624-
// retention is a follow-up to PR 6a (the tombstoned-cohort
625-
// path is what this PR addresses).
622+
// reapPage covers the legacy byage keyspace only; the
623+
// partitioned twin is reapPartitionedPage.
626624
if err := s.reapOneRecord(ctx, queueName, nil, 0, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil {
627625
return true, processed, err
628626
}
@@ -731,14 +729,23 @@ func (s *SQSServer) dispatchOrphanByAgeDrop(ctx context.Context, byAgeKey []byte
731729
//
732730
// On partitioned queues (meta.PartitionCount > 1), the dedup
733731
// records live under SqsPartitionedMsgDedupPrefix instead of
734-
// SqsMsgDedupPrefix, so the legacy scan would find zero records
735-
// and the leak would persist; reapExpiredDedupPartitioned takes
736-
// over for that case (PR 6b).
732+
// SqsMsgDedupPrefix, so the legacy scan alone would miss them;
733+
// reapExpiredDedupPartitioned covers that case (PR 6b).
734+
//
735+
// Mirrors reapQueue's both-scan policy: legacy always runs, and
736+
// the partitioned scan additionally runs for partitioned queues.
737+
// The data plane never writes legacy dedup on partitioned queues
738+
// today, but the legacy scan over an empty prefix is cheap, and
739+
// running it unconditionally keeps the two reaper paths symmetric
740+
// and defends against an unforeseen legacy-prefix leak.
737741
func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, meta *sqsQueueMeta, readTS uint64) error {
742+
if err := s.reapExpiredDedupLegacy(ctx, queueName, readTS); err != nil {
743+
return err
744+
}
738745
if meta != nil && meta.PartitionCount > 1 {
739746
return s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS)
740747
}
741-
return s.reapExpiredDedupLegacy(ctx, queueName, readTS)
748+
return nil
742749
}
743750

744751
// reapExpiredDedupLegacy is the legacy-keyspace dedup expiry walk,

0 commit comments

Comments
 (0)