Commit 675adc3
feat(sqs): live-queue reaper enumerates partitioned keyspace (Phase 3.D PR 6b) (#736)
## Summary

Phase 3.D PR 6b: live-queue reaper enumerates partitioned keyspace. Follow-up to PR #735 (6a) — the tombstone-driven sweep already walks partitioned data on DeleteQueue / PurgeQueue, but the live-queue retention reaper still only saw the legacy keyspace, so:

- retention-expired messages on partitioned queues leaked their data / vis / byage / group rows forever (`reapQueue` walked `sqsMsgByAgePrefixAllGenerations` only),
- expired dedup records on partitioned FIFO queues leaked forever (`reapExpiredDedup` scanned `SqsMsgDedupPrefix` only — empty for partitioned queues since `sqsMsgDedupKeyDispatch` routes their writes under `SqsPartitionedMsgDedupPrefix`).

Closes the live-queue half of the Codex P2 from PR #732 round 0; PR 6a covered the tombstoned-cohort half.

## What changes

- **`reapQueue`**: legacy byage walk extracted as `reapQueueLegacy` (byte-identical to pre-PR-6b for non-partitioned queues). Adds a `reapQueuePartition` step that runs once per partition for `PartitionCount > 1` queues. Per-partition budget per the §6 design ("partitions × budget per cycle"); the 30s tick interval comfortably absorbs it.
- **`reapPartitionedPage`**: partitioned twin of `reapPage`. Same live-vs-orphan classification, but parses each entry with `parseSqsPartitionedMsgByAgeKey` and routes the dispatch through `reapOneRecordPartitioned`.
- **`classifyPartitionedByAgeEntry`**: helper extracted from `reapPartitionedPage` so the loop body stays under the cyclop ceiling. Returns `(parsedKey, reapable bool)`.
- **`reapExpiredDedup`** (signature changed): now takes `*sqsQueueMeta` and routes by `PartitionCount`. Legacy meta → `reapExpiredDedupLegacy` (byte-identical). Partitioned meta → `reapExpiredDedupPartitioned` (new), which iterates each partition's dedup prefix under its own per-partition budget.

## Caller audit

- `reapQueue` — one production caller (`reapAllQueues`); signature unchanged. Non-partitioned queues are byte-identical; partitioned queues get the extra per-partition pass.
- `reapExpiredDedup` — signature changed to take `*sqsQueueMeta`; one production caller (`reapAllQueues`), updated. No tests called it directly.
- New helpers (`reapQueueLegacy` / `reapQueuePartition` / `reapPartitionedPage` / `reapExpiredDedupLegacy` / `reapExpiredDedupPartitioned` / `classifyPartitionedByAgeEntry`) each have exactly one production caller in the new live-queue reap path.
- `reapOneRecordPartitioned` (existing PR 6a helper): previously called from `reapDeadByAgePartitionPage` (tombstone path); now also from `reapPartitionedPage` (live-queue path). Same dispatch semantics.

## Tests

- New `TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions`: 4-partition queue, send across 6 distinct groups, backdate every partitioned dedup record's `ExpiresAtMillis`, run `reapAllQueues`, assert every partitioned dedup row across `[0, 4)` is gone. The pre-PR-6b reaper would leave every row in place.

## Self-review (CLAUDE.md)

1. **Data loss** — Closes the live-queue dedup leak and the partitioned retention-expired-message leak. Legacy queues unchanged.
2. **Concurrency / distributed failures** — The reaper still runs only on the leader. The per-partition pass is sequential, and the per-partition budget bounds each pass. OCC semantics on each record reap are unchanged.
3. **Performance** — Per-tick partitioned-queue cost grows from O(1) walks to O(partition_count) walks on both byage and dedup. Each partition is bounded by `sqsReaperPerQueueBudget`. The 30s tick interval comfortably absorbs 32 partitions × per-queue budget per the design.
4. **Data consistency** — Live-vs-orphan classification on partitioned byage mirrors the legacy branch exactly (`reapPage` / `reapPartitionedPage` share the rules through `classifyPartitionedByAgeEntry`). `PartitionCount` immutability means the meta-driven iteration bound matches the on-disk keys.
5. **Test coverage** — One new wire-level integration test for the partitioned dedup walk; the partitioned byage walk reuses parsing / dispatch helpers already covered by PR 6a's tombstone-reap integration test.

## Test plan

- [x] `make lint` — 0 issues
- [x] Targeted reaper / retention / dedup / HTFIFO / PartitionedFIFO suites (-race, clean)
- [x] Wider regression on Send/Receive/Delete + CreateQueue/DeleteQueue/PurgeQueue (-race, clean)
- [ ] CI: full Jepsen + race

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Bug Fixes**
  * Fixed deduplication record cleanup in partitioned FIFO queues to properly remove expired records across all partitions.
  * Enhanced cleanup mechanism to correctly handle partition-aware behavior and prevent memory leaks in partitioned queue scenarios.
* **Tests**
  * Added comprehensive test for deduplication record cleanup across partitioned FIFO queue partitions.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
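The "partitions × budget per cycle" routing described above can be sketched as a minimal, self-contained toy — not the adapter's real implementation: `queueMeta`, `perPartitionBudget`, and the row-count map are hypothetical stand-ins for `*sqsQueueMeta`, `sqsReaperPerQueueBudget`, and the budgeted KV walks, and they exist only to show the legacy-vs-partitioned dispatch shape.

```go
package main

import "fmt"

// queueMeta is a hypothetical stand-in for *sqsQueueMeta; only the
// PartitionCount routing field matters for this sketch.
type queueMeta struct {
	Name           string
	PartitionCount uint32
}

// perPartitionBudget stands in for sqsReaperPerQueueBudget: each
// partition's walk reaps at most this many rows per reaper cycle.
const perPartitionBudget = 3

// reapQueue routes by PartitionCount: legacy queues (count <= 1) get
// the single legacy walk; partitioned queues get one budgeted pass per
// partition, so a cycle costs at most partitions × budget.
func reapQueue(meta queueMeta, rows map[uint32]int) (reaped int) {
	if meta.PartitionCount <= 1 {
		return reapPartition(rows, 0) // legacy keyspace walk
	}
	for p := uint32(0); p < meta.PartitionCount; p++ {
		reaped += reapPartition(rows, p) // per-partition budget
	}
	return reaped
}

// reapPartition removes up to perPartitionBudget expired rows from one
// partition's (toy) keyspace and reports how many it reaped.
func reapPartition(rows map[uint32]int, partition uint32) int {
	n := rows[partition]
	if n > perPartitionBudget {
		n = perPartitionBudget
	}
	rows[partition] -= n
	return n
}

func main() {
	// Four partitions with 5, 1, 0, and 4 expired rows: one cycle
	// reaps min(budget, rows) from each partition.
	rows := map[uint32]int{0: 5, 1: 1, 2: 0, 3: 4}
	meta := queueMeta{Name: "live-dedup-reap.fifo", PartitionCount: 4}
	fmt.Println(reapQueue(meta, rows)) // 3 + 1 + 0 + 3 = 7
}
```

Leftover rows (partition 0 still holds 2 here) are picked up on a later tick, which is why the design leans on the 30s tick interval absorbing the per-cycle bound.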
2 parents 04a997a + 996df49 commit 675adc3

2 files changed

Lines changed: 408 additions & 7 deletions

File tree

adapter/sqs_partitioned_dispatch_test.go

Lines changed: 192 additions & 0 deletions
@@ -1,6 +1,7 @@
 package adapter
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"net/http"
@@ -648,3 +649,194 @@ func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64)
 	}
 	return total
 }
+
+// TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions
+// pins the PR 6b contract for live queues: reapExpiredDedup must
+// walk the partitioned dedup keyspace too, otherwise expired
+// dedup records on partitioned FIFO queues leak forever (the
+// pre-PR-6b reaper only scanned SqsMsgDedupPrefix, which is empty
+// for partitioned queues). Closes the live-queue half of the
+// Codex P2 deferred from PR 5b-2 round 0.
+func TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+
+	const queueName = "live-dedup-reap.fifo"
+	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  queueName,
+		"Attributes": map[string]string{"FifoQueue": "true"},
+	})
+	require.Equal(t, http.StatusOK, status, "create queue: %v", out)
+	queueURL, _ := out["QueueUrl"].(string)
+	require.NotEmpty(t, queueURL)
+
+	const partitions uint32 = 4
+	installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID)
+
+	// Send across distinct groups so dedup records land on more
+	// than one partition; partitionFor (FNV-1a) gives reasonable
+	// spread for these inputs.
+	groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}
+	for _, g := range groups {
+		status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
+			"QueueUrl":               queueURL,
+			"MessageBody":            "body-" + g,
+			"MessageGroupId":         g,
+			"MessageDeduplicationId": "dedup-" + g,
+		})
+		require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out)
+	}
+
+	srv := node.sqsServer
+	ctx := t.Context()
+
+	// Sanity: at least one partitioned dedup row exists pre-reap.
+	preReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
+	require.Positive(t, preReap, "expected partitioned dedup rows after sends")
+
+	// Backdate every partitioned dedup record's ExpiresAtMillis
+	// so the reaper's value-based expiry filter trips on every
+	// row. Same approach as TestSQSServer_RetentionReaperDropsExpiredFifoDedup
+	// for the legacy keyspace; here applied per-partition prefix.
+	readTS := srv.nextTxnReadTS(ctx)
+	now := time.Now().UnixMilli()
+	for partition := uint32(0); partition < partitions; partition++ {
+		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1)
+		rows, err := srv.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
+		require.NoError(t, err)
+		for _, row := range rows {
+			rec, err := decodeFifoDedupRecord(row.Value)
+			require.NoError(t, err)
+			rec.ExpiresAtMillis = now - 1000
+			body, err := encodeFifoDedupRecord(rec)
+			require.NoError(t, err)
+			req := &kv.OperationGroup[kv.OP]{
+				IsTxn:   true,
+				StartTS: readTS,
+				Elems: []*kv.Elem[kv.OP]{
+					{Op: kv.Put, Key: bytes.Clone(row.Key), Value: body},
+				},
+			}
+			_, err = srv.coordinator.Dispatch(ctx, req)
+			require.NoError(t, err)
+		}
+	}
+
+	// Drive one reaper pass — the live-queue dedup walk now
+	// dispatches reapExpiredDedupPartitioned for partitioned
+	// queues (PR 6b).
+	require.NoError(t, srv.reapAllQueues(ctx), "reapAllQueues")
+
+	postReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
+	require.Zero(t, postReap,
+		"expired partitioned dedup records must be reaped (got %d remaining)",
+		postReap)
+}
+
+// countPartitionedDedupRowsAcrossPartitions sums the rows across
+// every partition's partitioned dedup prefix at gen=1 (the
+// generation a freshly-created queue lands on). Test-helper
+// twin of countPartitionedRows scoped to dedup.
+func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueName string, partitions uint32) int {
+	t.Helper()
+	ctx := context.Background()
+	readTS := node.sqsServer.nextTxnReadTS(ctx)
+	total := 0
+	for partition := uint32(0); partition < partitions; partition++ {
+		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1)
+		rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
+		require.NoError(t, err)
+		total += len(rows)
+	}
+	return total
+}
+
+// TestClassifyPartitionedByAgeEntry pins every branch of the
+// classify helper that decides which partitioned byage rows the
+// live-queue reaper sweeps. The integration tests above cover
+// the orphan-gen path (tombstoned cohort) and the dedup-expiry
+// path; this unit test fills in the two live-queue paths that
+// have no integration coverage today: the retention-cutoff branch
+// (live gen, sendTs <= cutoff → reapable) and the future-gen
+// guard (parsed.gen > currentGen → not reapable, defensive
+// against a meta-vs-byage race).
+func TestClassifyPartitionedByAgeEntry(t *testing.T) {
+	t.Parallel()
+	const (
+		queueName  = "live-byage.fifo"
+		partition  = uint32(2)
+		currentGen = uint64(5)
+		cutoff     = int64(1_000_000)
+		messageID  = "msg-classify"
+	)
+
+	type want struct {
+		reapable        bool
+		expectedGen     uint64
+		expectedPartn   uint32
+		expectedSendTs  int64
+		expectedMsgID   string
+		assertParsedSet bool // expectedGen / Partition / SendTs / MsgID checked only when true
+	}
+	cases := []struct {
+		name string
+		key  []byte
+		want want
+	}{
+		{
+			name: "live gen within retention window — not reapable",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff+1, messageID),
+			want: want{reapable: false, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff + 1, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "live gen past retention window — reapable (cutoff branch)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff-1, messageID),
+			want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "live gen exactly at cutoff — reapable (>, not >=)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen, cutoff, messageID),
+			want: want{reapable: true, expectedGen: currentGen, expectedPartn: partition, expectedSendTs: cutoff, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "orphan generation (gen < currentGen) — reapable unconditionally",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen-1, cutoff+1_000_000, messageID),
+			want: want{reapable: true, expectedGen: currentGen - 1, expectedPartn: partition, expectedSendTs: cutoff + 1_000_000, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "future generation (gen > currentGen) — not reapable (gen-race guard)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition, currentGen+1, cutoff-1, messageID),
+			want: want{reapable: false, expectedGen: currentGen + 1, expectedPartn: partition, expectedSendTs: cutoff - 1, expectedMsgID: messageID, assertParsedSet: true},
+		},
+		{
+			name: "wrong partition — not reapable (page bleed defense)",
+			key:  sqsPartitionedMsgByAgeKey(queueName, partition+1, currentGen, cutoff-1, messageID),
+			want: want{reapable: false, assertParsedSet: false},
+		},
+		{
+			name: "wrong queue prefix — not reapable (parse fails)",
+			key:  sqsPartitionedMsgByAgeKey("different.fifo", partition, currentGen, cutoff-1, messageID),
+			want: want{reapable: false, assertParsedSet: false},
+		},
+		{
+			name: "legacy byage key — not reapable (parse fails)",
+			key:  sqsMsgByAgeKey(queueName, currentGen, cutoff-1, messageID),
+			want: want{reapable: false, assertParsedSet: false},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			parsed, reapable := classifyPartitionedByAgeEntry(tc.key, queueName, partition, currentGen, cutoff)
+			require.Equal(t, tc.want.reapable, reapable, "reapable")
+			if tc.want.assertParsedSet {
+				require.Equal(t, tc.want.expectedGen, parsed.Generation, "Generation")
+				require.Equal(t, tc.want.expectedPartn, parsed.Partition, "Partition")
+				require.Equal(t, tc.want.expectedSendTs, parsed.SendTimestampMs, "SendTimestampMs")
+				require.Equal(t, tc.want.expectedMsgID, parsed.MessageID, "MessageID")
+			}
+		})
+	}
+}
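The branch rules that table pins can be sketched as a self-contained toy. Everything here is a hypothetical stand-in: the string key layout, `byAgeKey`, and `parseByAgeKey` replace the real binary byage encoding and `parseSqsPartitionedMsgByAgeKey`, and the test does not pin whether the real helper zeroes the parsed key on a partition mismatch, so this sketch simply returns the zero value in every not-parsed case.

```go
package main

import (
	"fmt"
	"strings"
)

// parsedByAgeKey mirrors the fields the test asserts on; the real
// parser yields them from a binary key, not this toy string layout.
type parsedByAgeKey struct {
	Queue           string
	Partition       uint32
	Generation      uint64
	SendTimestampMs int64
	MessageID       string
}

// byAgeKey builds a toy "byage/<queue>/<partition>/<gen>/<sendTs>/<id>" key.
func byAgeKey(q string, p uint32, gen uint64, ts int64, id string) string {
	return fmt.Sprintf("byage/%s/%d/%d/%d/%s", q, p, gen, ts, id)
}

// parseByAgeKey parses the toy layout; any malformed key fails to parse.
func parseByAgeKey(k string) (parsedByAgeKey, bool) {
	parts := strings.Split(k, "/")
	if len(parts) != 6 || parts[0] != "byage" {
		return parsedByAgeKey{}, false
	}
	out := parsedByAgeKey{Queue: parts[1], MessageID: parts[5]}
	if _, err := fmt.Sscanf(strings.Join(parts[2:5], " "), "%d %d %d",
		&out.Partition, &out.Generation, &out.SendTimestampMs); err != nil {
		return parsedByAgeKey{}, false
	}
	return out, true
}

// classifyPartitionedByAgeEntry applies the rules the unit test pins:
//   - parse failure, wrong queue, or wrong partition → not reapable;
//   - gen < currentGen (orphaned cohort)            → reapable unconditionally;
//   - gen > currentGen (meta-vs-byage race guard)   → not reapable;
//   - gen == currentGen                             → reapable iff sendTs <= cutoff.
func classifyPartitionedByAgeEntry(key, queue string, partition uint32, currentGen uint64, cutoff int64) (parsedByAgeKey, bool) {
	p, ok := parseByAgeKey(key)
	if !ok || p.Queue != queue || p.Partition != partition {
		return parsedByAgeKey{}, false
	}
	switch {
	case p.Generation < currentGen:
		return p, true
	case p.Generation > currentGen:
		return p, false
	default:
		return p, p.SendTimestampMs <= cutoff
	}
}

func main() {
	// Live generation, sent at or before the retention cutoff → reapable.
	_, reap := classifyPartitionedByAgeEntry(
		byAgeKey("q.fifo", 2, 5, 999, "m"), "q.fifo", 2, 5, 1000)
	fmt.Println(reap) // true
}
```

The key property the PR leans on is that the orphan-gen branch ignores the cutoff entirely: once a cohort's generation falls behind the queue meta, every row in it is garbage regardless of age.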
