
Commit 80f1c86

feat(sqs): live-queue reaper enumerates partitioned keyspace (Phase 3.D PR 6b)
Follow-up to PR 6a (#735). The tombstone-driven reaper now sweeps partitioned data / vis / byage / dedup / group records on DeleteQueue / PurgeQueue, but the live-queue retention reaper still walked only the legacy keyspace, so:

- retention-expired messages on partitioned queues leaked their data / vis / byage / group rows forever (reapQueue's byage walk used sqsMsgByAgePrefixAllGenerations only),
- expired dedup records on partitioned FIFO queues leaked forever (reapExpiredDedup's prefix scan used SqsMsgDedupPrefix only — empty for partitioned queues, since sqsMsgDedupKeyDispatch routes their writes under SqsPartitionedMsgDedupPrefix).

Closes the live-queue half of the Codex P2 from PR #732 round 0 ("Reap partitioned dedup records to prevent key growth"); PR 6a covered the tombstoned-cohort half.

Scope (the second slice of §11 PR 6 from the split-queue-FIFO design doc):

- reapQueue: legacy byage walk extracted as reapQueueLegacy, behaviour byte-identical to pre-PR-6b for non-partitioned queues. Adds a new reapQueuePartition step that runs once per partition for PartitionCount > 1 queues. Each partition gets its own per-partition budget per the §6 design ("partitions × budget per cycle"); a 32-partition queue thus allows up to 32 × sqsReaperPerQueueBudget records per tick, comfortably within the 30s reaper interval.
- reapPartitionedPage: partitioned twin of reapPage. Same live-vs-orphan classification (parsed.SendTimestampMs > cutoff under the live gen, unconditional reap under older gens, defensive skip on parsed.Generation > currentGen) but parses each entry with parseSqsPartitionedMsgByAgeKey and routes the dispatch through reapOneRecordPartitioned.
- classifyPartitionedByAgeEntry: helper extracted from reapPartitionedPage so the loop body stays under the cyclop ceiling. Returns (parsedKey, reapable bool).
- reapExpiredDedup: now takes *sqsQueueMeta and routes by PartitionCount. Legacy meta (PartitionCount <= 1) → reapExpiredDedupLegacy (byte-identical to the pre-PR-6b walk). Partitioned meta (PartitionCount > 1) → reapExpiredDedupPartitioned (NEW), which iterates each partition's partitioned dedup prefix under its own per-partition budget and uses the existing reapDedupPage to apply the value-based ExpiresAtMillis filter.

Caller audit:

- reapQueue: one production caller (reapAllQueues line 85). No signature change. Behaviour for non-partitioned queues is byte-identical; partitioned queues get the additional per-partition pass.
- reapExpiredDedup: signature changed to take *sqsQueueMeta. One production caller (reapAllQueues line 88), updated. No test files invoked this helper directly. Legacy meta routes to the byte-identical legacy walk; partitioned meta routes to the new walk.
- reapQueueLegacy / reapQueuePartition / reapExpiredDedupLegacy / reapExpiredDedupPartitioned / classifyPartitionedByAgeEntry: each has exactly one caller in the new live-queue reap path.
- reapOneRecordPartitioned: existing helper from PR 6a. Previous caller was reapDeadByAgePartitionPage (tombstone path); now also called from reapPartitionedPage (live-queue path). Same dispatch semantics — synthetic meta carrying PartitionCount > 1 to flip the dispatch helper branch.

Tests:

- New TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions (wire-level): create a 4-partition FIFO queue, send across 6 distinct groups, backdate every partitioned dedup record's ExpiresAtMillis, run reapAllQueues, assert every partitioned dedup row across [0, 4) is gone. The pre-PR-6b reaper would have left every row in place — the test fails on the legacy code path.

Self-review (CLAUDE.md):

1. Data loss — Closes the live-queue dedup leak; closes the partitioned retention-expired-message leak. Legacy queues unchanged: reapQueue, reapExpiredDedup, byage walks, and dispatch helpers all keep their byte-identical pre-PR-6b paths for PartitionCount <= 1.
2. Concurrency / distributed failures — Reaper still runs only on the leader. Each partition's pass is sequential; the per-partition budget bounds the pass. Existing OCC dispatch semantics on each per-record reap are unchanged.
3. Performance — Per-tick partitioned-queue cost grows from O(1 walk) to O(partition_count walks) on both byage and dedup. Each partition is bounded by sqsReaperPerQueueBudget. The 30s tick interval comfortably absorbs 32 partitions × the per-queue budget, per the design.
4. Data consistency — Live-vs-orphan classification on partitioned byage mirrors the legacy branch exactly (reapPage / reapPartitionedPage share the rules through classifyPartitionedByAgeEntry). PartitionCount immutability means the meta-driven iteration bound matches the on-disk keys for any cohort.
5. Test coverage — One new wire-level integration test for the partitioned dedup walk; the partitioned byage walk reuses parsing / dispatch helpers already tested by PR 6a's tombstone-reap integration test.
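The live-vs-orphan rules the commit message states (keep live-gen entries inside retention, reap older-gen orphans unconditionally, skip future gens defensively) can be restated as a small self-contained sketch. The byAgeEntry type and reapable helper below are illustrative stand-ins, not the adapter's actual types:

```go
package main

import "fmt"

// byAgeEntry is a hypothetical, simplified view of a parsed
// partitioned byage key; the real struct lives in the adapter.
type byAgeEntry struct {
	Generation      uint64
	SendTimestampMs int64
}

// reapable applies the three rules from the commit message:
// skip future generations (meta read hasn't caught up), keep
// live-gen entries still inside retention, reap everything else.
func reapable(e byAgeEntry, currentGen uint64, cutoff int64) bool {
	if e.Generation > currentGen {
		return false // defensive skip: future generation
	}
	if e.Generation == currentGen && e.SendTimestampMs > cutoff {
		return false // live and inside its retention window
	}
	return true // retention-expired, or orphan from an older gen
}

func main() {
	const gen, cutoff = 3, 1_000
	fmt.Println(reapable(byAgeEntry{3, 2_000}, gen, cutoff)) // false: live, kept
	fmt.Println(reapable(byAgeEntry{3, 500}, gen, cutoff))   // true: expired
	fmt.Println(reapable(byAgeEntry{2, 2_000}, gen, cutoff)) // true: orphan gen
	fmt.Println(reapable(byAgeEntry{4, 500}, gen, cutoff))   // false: future gen
}
```

The cutoff = 0 special case in the diff below falls out of the same rule: with cutoff 0, every live-gen entry has SendTimestampMs > 0 and is kept.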
1 parent f489669 commit 80f1c86

2 files changed

Lines changed: 309 additions & 3 deletions


adapter/sqs_partitioned_dispatch_test.go

Lines changed: 104 additions & 0 deletions
@@ -1,6 +1,7 @@
 package adapter

 import (
+	"bytes"
 	"context"
 	"fmt"
 	"net/http"
@@ -648,3 +649,106 @@ func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64)
 	}
 	return total
 }
+
+// TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions
+// pins the PR 6b contract for live queues: reapExpiredDedup must
+// walk the partitioned dedup keyspace too, otherwise expired
+// dedup records on partitioned FIFO queues leak forever (the
+// pre-PR-6b reaper only scanned SqsMsgDedupPrefix, which is empty
+// for partitioned queues). Closes the live-queue half of the
+// Codex P2 deferred from PR 5b-2 round 0.
+func TestSQSServer_PartitionedFIFO_LiveQueueDedupReaperPartitions(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 1)
+	defer shutdown(nodes)
+	node := sqsLeaderNode(t, nodes)
+
+	const queueName = "live-dedup-reap.fifo"
+	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
+		"QueueName":  queueName,
+		"Attributes": map[string]string{"FifoQueue": "true"},
+	})
+	require.Equal(t, http.StatusOK, status, "create queue: %v", out)
+	queueURL, _ := out["QueueUrl"].(string)
+	require.NotEmpty(t, queueURL)
+
+	const partitions uint32 = 4
+	installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID)
+
+	// Send across distinct groups so dedup records land on more
+	// than one partition; partitionFor (FNV-1a) gives reasonable
+	// spread for these inputs.
+	groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}
+	for _, g := range groups {
+		status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
+			"QueueUrl":               queueURL,
+			"MessageBody":            "body-" + g,
+			"MessageGroupId":         g,
+			"MessageDeduplicationId": "dedup-" + g,
+		})
+		require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out)
+	}
+
+	srv := node.sqsServer
+	ctx := t.Context()
+
+	// Sanity: at least one partitioned dedup row exists pre-reap.
+	preReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
+	require.Positive(t, preReap, "expected partitioned dedup rows after sends")
+
+	// Backdate every partitioned dedup record's ExpiresAtMillis
+	// so the reaper's value-based expiry filter trips on every
+	// row. Same approach as TestSQSServer_RetentionReaperDropsExpiredFifoDedup
+	// for the legacy keyspace; here applied per-partition prefix.
+	readTS := srv.nextTxnReadTS(ctx)
+	now := time.Now().UnixMilli()
+	for partition := uint32(0); partition < partitions; partition++ {
+		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1)
+		rows, err := srv.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
+		require.NoError(t, err)
+		for _, row := range rows {
+			rec, err := decodeFifoDedupRecord(row.Value)
+			require.NoError(t, err)
+			rec.ExpiresAtMillis = now - 1000
+			body, err := encodeFifoDedupRecord(rec)
+			require.NoError(t, err)
+			req := &kv.OperationGroup[kv.OP]{
+				IsTxn:   true,
+				StartTS: readTS,
+				Elems: []*kv.Elem[kv.OP]{
+					{Op: kv.Put, Key: bytes.Clone(row.Key), Value: body},
+				},
+			}
+			_, err = srv.coordinator.Dispatch(ctx, req)
+			require.NoError(t, err)
+		}
+	}
+
+	// Drive one reaper pass — the live-queue dedup walk now
+	// dispatches reapExpiredDedupPartitioned for partitioned
+	// queues (PR 6b).
+	require.NoError(t, srv.reapAllQueues(ctx), "reapAllQueues")
+
+	postReap := countPartitionedDedupRowsAcrossPartitions(t, node, queueName, partitions)
+	require.Zero(t, postReap,
+		"expired partitioned dedup records must be reaped (got %d remaining)",
+		postReap)
+}
+
+// countPartitionedDedupRowsAcrossPartitions sums the rows across
+// every partition's partitioned dedup prefix at gen=1 (the
+// generation a freshly-created queue lands on). Test-helper
+// twin of countPartitionedRows scoped to dedup.
+func countPartitionedDedupRowsAcrossPartitions(t *testing.T, node Node, queueName string, partitions uint32) int {
+	t.Helper()
+	ctx := context.Background()
+	readTS := node.sqsServer.nextTxnReadTS(ctx)
+	total := 0
+	for partition := uint32(0); partition < partitions; partition++ {
+		prefix := sqsPartitionedMsgDedupKeyPrefix(queueName, partition, 1)
+		rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
+		require.NoError(t, err)
+		total += len(rows)
+	}
+	return total
+}
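The test above relies on partitionFor's FNV-1a hashing to spread the six group IDs across partitions. A minimal sketch of such a mapping using Go's standard hash/fnv (the name, signature, and modulo step here are assumptions for illustration; the adapter's real helper may differ):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message group ID onto one of `partitions`
// buckets via a 32-bit FNV-1a hash. Illustrative stand-in for
// the adapter's group-to-partition routing.
func partitionFor(groupID string, partitions uint32) uint32 {
	h := fnv.New32a() // FNV-1a, 32-bit
	h.Write([]byte(groupID))
	return h.Sum32() % partitions
}

func main() {
	const partitions uint32 = 4
	// The same six group IDs the test sends across.
	for _, g := range []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} {
		fmt.Printf("%s -> partition %d\n", g, partitionFor(g, partitions))
	}
}
```

Because the mapping is a pure function of the group ID, the test can deterministically find each group's dedup row again by scanning every partition's prefix, as countPartitionedDedupRowsAcrossPartitions does.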

adapter/sqs_reaper.go

Lines changed: 205 additions & 3 deletions
@@ -85,7 +85,7 @@ func (s *SQSServer) reapAllQueues(ctx context.Context) error {
 		if err := s.reapQueue(ctx, name, meta, readTS); err != nil {
 			slog.Warn("sqs reaper queue pass failed", "queue", name, "err", err)
 		}
-		if err := s.reapExpiredDedup(ctx, name, readTS); err != nil {
+		if err := s.reapExpiredDedup(ctx, name, meta, readTS); err != nil {
 			slog.Warn("sqs dedup reaper pass failed", "queue", name, "err", err)
 		}
 	}
@@ -425,6 +425,39 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu
 		// for the live generation so we never delete live records.
 		cutoff = 0
 	}
+	// Legacy byage scan — always runs. For non-partitioned queues
+	// this is the only path; for partitioned queues this also
+	// catches any defensive legacy entry that might have leaked
+	// in (the data plane never writes here on partitioned queues
+	// today, but the sweep is idempotent and cheap).
+	if err := s.reapQueueLegacy(ctx, queueName, meta.Generation, cutoff, readTS); err != nil {
+		return err
+	}
+	// Partitioned byage scan — one per partition under its own
+	// per-partition budget. Per the §6 split-queue-FIFO design,
+	// the per-queue budget becomes a per-partition budget so a
+	// 32-partition queue cannot starve other queues; instead its
+	// reap completes in partitions × budget time per cycle, which
+	// at the 30s reaper interval is well within budget.
+	if meta.PartitionCount > 1 {
+		for partition := uint32(0); partition < meta.PartitionCount; partition++ {
+			if err := ctx.Err(); err != nil {
+				return errors.WithStack(err)
+			}
+			if err := s.reapQueuePartition(ctx, queueName, partition, meta.Generation, cutoff, readTS); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// reapQueueLegacy is the existing legacy-keyspace byage walk for
+// one queue, factored out of reapQueue so the partitioned twin
+// (reapQueuePartition) can sit alongside it under the same
+// per-budget contract. Behaviour for non-partitioned queues is
+// byte-identical to pre-PR-6b reapQueue.
+func (s *SQSServer) reapQueueLegacy(ctx context.Context, queueName string, currentGen uint64, cutoff int64, readTS uint64) error {
 	prefix := sqsMsgByAgePrefixAllGenerations(queueName)
 	upper := prefixScanEnd(prefix)
 	start := bytes.Clone(prefix)
@@ -438,7 +471,58 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu
 		if len(page) == 0 {
 			return nil
 		}
-		done, newProcessed, err := s.reapPage(ctx, queueName, meta.Generation, cutoff, page, readTS, processed)
+		done, newProcessed, err := s.reapPage(ctx, queueName, currentGen, cutoff, page, readTS, processed)
+		if err != nil {
+			return err
+		}
+		processed = newProcessed
+		if done {
+			return nil
+		}
+		start = nextScanCursorAfter(page[len(page)-1].Key)
+		if bytes.Compare(start, upper) >= 0 {
+			return nil
+		}
+	}
+	return nil
+}
+
+// reapQueuePartition is the partitioned-keyspace twin of
+// reapQueueLegacy. Walks one partition's byage prefix family
+// across all generations the partitioned-byage prefix matches
+// (parseSqsPartitionedMsgByAgeKey returns the gen embedded in
+// each key) and reaps each entry past the retention cutoff (live
+// gen) or unconditionally (any older gen — orphan from a prior
+// PurgeQueue).
+//
+// Per-partition budget rather than per-queue: a 32-partition
+// queue therefore allows up to 32 × sqsReaperPerQueueBudget
+// records per tick, which the 30s tick interval comfortably
+// absorbs (§6 design contract).
+func (s *SQSServer) reapQueuePartition(ctx context.Context, queueName string, partition uint32, currentGen uint64, cutoff int64, readTS uint64) error {
+	// Note: the partitioned-byage prefix embeds (queue, partition)
+	// but not the generation, so this scan walks every generation
+	// for that partition. reapPartitionedPage filters per-entry by
+	// the (currentGen, cutoff) live-vs-orphan rules, mirroring
+	// reapPage on the legacy path.
+	prefix := []byte{}
+	prefix = append(prefix, SqsPartitionedMsgByAgePrefix...)
+	prefix = append(prefix, encodeSQSSegment(queueName)...)
+	prefix = append(prefix, sqsPartitionedQueueTerminator)
+	prefix = appendU32(prefix, partition)
+	upper := prefixScanEnd(prefix)
+	start := bytes.Clone(prefix)
+
+	processed := 0
+	for processed < sqsReaperPerQueueBudget {
+		page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		if len(page) == 0 {
+			return nil
+		}
+		done, newProcessed, err := s.reapPartitionedPage(ctx, queueName, partition, currentGen, cutoff, page, readTS, processed)
 		if err != nil {
 			return err
 		}
@@ -454,6 +538,57 @@ func (s *SQSServer) reapQueue(ctx context.Context, queueName string, meta *sqsQu
 	return nil
 }

+// reapPartitionedPage is the partitioned twin of reapPage. Same
+// live-vs-orphan classification as reapPage, but parses each entry
+// as a partitioned byage key and routes the dispatch through
+// reapOneRecordPartitioned so the dispatch helpers build
+// partitioned data / vis / group keys instead of legacy ones.
+func (s *SQSServer) reapPartitionedPage(ctx context.Context, queueName string, partition uint32, currentGen uint64, cutoff int64, page []*store.KVPair, readTS uint64, processed int) (bool, int, error) {
+	for _, kvp := range page {
+		if err := ctx.Err(); err != nil {
+			return true, processed, errors.WithStack(err)
+		}
+		parsed, reapable := classifyPartitionedByAgeEntry(kvp.Key, queueName, partition, currentGen, cutoff)
+		if !reapable {
+			continue
+		}
+		if err := s.reapOneRecordPartitioned(ctx, queueName, partition, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil {
+			return true, processed, err
+		}
+		processed++
+		if processed >= sqsReaperPerQueueBudget {
+			return true, processed, nil
+		}
+	}
+	if len(page) < sqsReaperPageLimit {
+		return true, processed, nil
+	}
+	return false, processed, nil
+}
+
+// classifyPartitionedByAgeEntry parses a candidate partitioned
+// byage key and decides whether it should be reaped this pass.
+// Returns reapable=false for entries that do not match the
+// partition (page bleed across partitions, defensive), live
+// entries inside their retention window, or future-generation
+// rows from a meta read that hasn't caught up yet. Pulled out of
+// reapPartitionedPage so the loop body stays under the cyclop
+// ceiling.
+func classifyPartitionedByAgeEntry(key []byte, queueName string, partition uint32, currentGen uint64, cutoff int64) (sqsPartitionedMsgByAgeRecord, bool) {
+	parsed, ok := parseSqsPartitionedMsgByAgeKey(key, queueName)
+	if !ok || parsed.Partition != partition {
+		return sqsPartitionedMsgByAgeRecord{}, false
+	}
+	if parsed.Generation == currentGen && parsed.SendTimestampMs > cutoff {
+		return parsed, false
+	}
+	if parsed.Generation > currentGen {
+		// Defensive against gen-counter races; mirrors reapPage.
+		return parsed, false
+	}
+	return parsed, true
+}
+
 // reapPage walks one ScanAt page, dispatching a per-record reap
 // transaction. currentGen is the queue's *live* generation; entries
 // under any earlier generation are unconditionally reaped, while
@@ -593,7 +728,24 @@ func (s *SQSServer) dispatchOrphanByAgeDrop(ctx context.Context, byAgeKey []byte
 // unique MessageDeduplicationIds would accumulate permanent
 // dedup-row leaks because the send path treats expired records as
 // misses but never removes them.
-func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, readTS uint64) error {
+//
+// On partitioned queues (meta.PartitionCount > 1), the dedup
+// records live under SqsPartitionedMsgDedupPrefix instead of
+// SqsMsgDedupPrefix, so the legacy scan would find zero records
+// and the leak would persist; reapExpiredDedupPartitioned takes
+// over for that case (PR 6b).
+func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, meta *sqsQueueMeta, readTS uint64) error {
+	if meta != nil && meta.PartitionCount > 1 {
+		return s.reapExpiredDedupPartitioned(ctx, queueName, meta.PartitionCount, readTS)
+	}
+	return s.reapExpiredDedupLegacy(ctx, queueName, readTS)
+}
+
+// reapExpiredDedupLegacy is the legacy-keyspace dedup expiry walk,
+// factored out of reapExpiredDedup so the partitioned twin can sit
+// alongside it. Behaviour for non-partitioned queues is byte-
+// identical to pre-PR-6b reapExpiredDedup.
+func (s *SQSServer) reapExpiredDedupLegacy(ctx context.Context, queueName string, readTS uint64) error {
 	prefix := []byte(SqsMsgDedupPrefix)
 	prefix = append(prefix, []byte(encodeSQSSegment(queueName))...)
 	upper := prefixScanEnd(prefix)
@@ -625,6 +777,56 @@ func (s *SQSServer) reapExpiredDedup(ctx context.Context, queueName string, read
 	return nil
 }

+// reapExpiredDedupPartitioned is the partitioned-keyspace twin of
+// reapExpiredDedupLegacy. Walks every partition's dedup prefix
+// across all generations and removes records whose
+// ExpiresAtMillis has passed. Each partition gets its own
+// per-partition budget (per the §6 design contract — same as
+// reapQueuePartition).
+func (s *SQSServer) reapExpiredDedupPartitioned(ctx context.Context, queueName string, partitionCount uint32, readTS uint64) error {
+	now := time.Now().UnixMilli()
+	for partition := uint32(0); partition < partitionCount; partition++ {
+		if err := ctx.Err(); err != nil {
+			return errors.WithStack(err)
+		}
+		// Per-partition prefix (across all gens) — the partitioned
+		// dedup key embeds gen after partition, so a partition-only
+		// prefix walks every gen for that partition. The per-entry
+		// expiry check is unchanged from the legacy walk.
+		prefix := []byte{}
+		prefix = append(prefix, SqsPartitionedMsgDedupPrefix...)
+		prefix = append(prefix, encodeSQSSegment(queueName)...)
+		prefix = append(prefix, sqsPartitionedQueueTerminator)
+		prefix = appendU32(prefix, partition)
+		upper := prefixScanEnd(prefix)
+		start := bytes.Clone(prefix)
+
+		processed := 0
+		for processed < sqsReaperPerQueueBudget {
+			page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+			if len(page) == 0 {
+				break
+			}
+			done, newProcessed, err := s.reapDedupPage(ctx, page, now, readTS, processed)
+			if err != nil {
+				return err
+			}
+			processed = newProcessed
+			if done {
+				break
+			}
+			start = nextScanCursorAfter(page[len(page)-1].Key)
+			if bytes.Compare(start, upper) >= 0 {
+				break
+			}
+		}
+	}
+	return nil
+}
+
 // reapDedupPage walks one ScanAt page of dedup records and removes
 // any whose ExpiresAtMillis is in the past. Returns done=true when
 // the per-queue budget runs out or the page was short.
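All of the new walks share one budget-bounded pagination shape: scan a page, process entries until the per-partition budget is exhausted, and treat a short page as the end of the prefix. A minimal sketch of that shape with illustrative names (reapWithBudget and the fake page slice are not from the adapter; the real loops scan a store and dispatch per-record transactions):

```go
package main

import "fmt"

// reapWithBudget walks pre-fetched pages, processing entries until
// the budget is exhausted or a short page signals the keyspace is
// drained. In the reaper, a budget-capped pass simply resumes on
// the next 30s tick, so a 32-partition queue does at most
// 32 × budget records per tick.
func reapWithBudget(pages [][]string, budget, pageLimit int) (processed int) {
	for _, page := range pages {
		if len(page) == 0 {
			return processed // empty page: nothing left under the prefix
		}
		for range page {
			processed++
			if processed >= budget {
				return processed // budget exhausted; next tick continues
			}
		}
		if len(page) < pageLimit {
			return processed // short page: keyspace drained
		}
	}
	return processed
}

func main() {
	pages := [][]string{
		{"a", "b", "c", "d"}, // full page (pageLimit = 4), keep going
		{"e", "f"},           // short page would end the walk
	}
	fmt.Println(reapWithBudget(pages, 5, 4)) // prints 5: budget caps the pass
}
```

The real helpers add what this sketch omits: ctx.Err() checks between entries, cursor advancement via nextScanCursorAfter, and the per-record OCC dispatch.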
