From 08968f9361a518b2fe52c78c2a35bae4c46c7323 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 2 May 2026 19:24:27 +0900 Subject: [PATCH 1/2] feat(sqs): per-key dispatch helpers for partitioned-FIFO routing (Phase 3.D PR 5b-1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for PR 5b's Send/Receive partition fanout. Adds sqsMsg*KeyDispatch wrappers that route to the legacy or partitioned constructor based on meta.PartitionCount. Pure scaffold: helpers are added but no production call site uses them yet — those switch in PR 5b-2. What's added adapter/sqs_keys_dispatch.go (new file): - sqsMsgDataKeyDispatch - sqsMsgVisKeyDispatch - sqsMsgDedupKeyDispatch - sqsMsgGroupKeyDispatch - sqsMsgByAgeKeyDispatch - sqsMsgVisPrefixForQueueDispatch - effectivePartitionCount helper for fanout iteration Each dispatch helper takes (meta, queueName, partition, gen, …) and routes to the legacy single-partition constructor when meta.PartitionCount <= 1 (existing behaviour, byte-identical output) or to the partitioned constructor when > 1 (PR #715's keyspace). Why a separate scaffold PR PR 5b's full scope (Send/Receive fanout + receipt-handle v2 wiring + dormancy gate-lift + CreateQueue capability check) is ~1000 LOC across multiple files. The lessons-learned discipline from the recent rounds (PR #723 missed the raftadmin stub because the audit was function-scoped, not interface-scoped) favours smaller diffs. Splitting PR 5b into three: - PR 5b-1 (this PR): dispatch helpers + tests, zero behaviour change. - PR 5b-2: SendMessage / ReceiveMessage / DeleteMessage / ChangeMessageVisibility wire through the helpers. Receipt- handle v2 dispatch wired. Still gated by §11 PR 2 dormancy. - PR 5b-3 (atomic): gate-lift + CreateQueue capability check via PollSQSHTFIFOCapability. Small-but-critical. What does NOT change yet - 14 call sites in adapter/sqs_messages.go still call the legacy sqsMsg*Key constructors directly. PR 5b-2 swaps them. - encodeReceiptHandleV2 stays dormant. - decodeClientReceiptHandle still rejects v2 unconditionally. - CreateQueue still rejects PartitionCount > 1 with InvalidAttributeValue. Tests (5 top-level) - TestSQSKeysDispatch_LegacyMatchesLegacyConstructor: 8 sub-cases pinning byte-identical output for nil meta, PartitionCount=0, PartitionCount=1. - TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor: 6 sub-cases pinning byte-identical output for PartitionCount=4 partitioned constructors. - TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct: keyspace isolation invariant — same logical record on legacy vs partitioned never shares a byte sequence. - TestEffectivePartitionCount: iteration-count contract (nil/0/1 → 1; explicit count otherwise). - TestSQSKeysDispatch_PartitionIgnoredOnLegacy: stale- partition caller against a non-partitioned queue produces the same key — protects against silent strand of messages. Self-review (per CLAUDE.md) 1. Data loss — wrappers only; no FSM/Pebble/retention path. Byte- identical output to legacy on non-partitioned queues. No issue. 2. Concurrency — pure functions, no shared state. No issue. 3. Performance — one nil-check + one comparison per dispatch. The non-partitioned path (steady state today) goes through the same constructor it always did. No issue. 4. Data consistency — keyspace isolation is pinned by TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct. Stale- partition stranding is pinned by TestSQSKeysDispatch_PartitionIgnoredOnLegacy. No issue. 5. Test coverage — 19 sub-tests across the contract surface. Existing partitioned-constructor tests (PR #703) and legacy constructor tests continue to pass unchanged. --- adapter/sqs_keys_dispatch.go | 105 ++++++++++++++++++ adapter/sqs_keys_dispatch_test.go | 178 ++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+) create mode 100644 adapter/sqs_keys_dispatch.go create mode 100644 adapter/sqs_keys_dispatch_test.go diff --git a/adapter/sqs_keys_dispatch.go b/adapter/sqs_keys_dispatch.go new file mode 100644 index 00000000..3218e1eb --- /dev/null +++ b/adapter/sqs_keys_dispatch.go @@ -0,0 +1,105 @@ +package adapter + +// Per-key dispatch helpers that route to the legacy single-partition +// constructor or the partitioned-FIFO constructor based on +// meta.PartitionCount. Phase 3.D PR 5b's central abstraction: +// every send/receive/delete code path that constructs a message- +// keyspace key goes through one of these wrappers, so the +// PartitionCount > 1 → partitioned-prefix dispatch lives in one +// place instead of being scattered across 14 call sites. +// +// Contract +// +// - meta.PartitionCount <= 1: legacy single-partition layout. +// The partition argument is ignored. Existing data on disk +// stays byte-identical with pre-PR-5 deployments. +// - meta.PartitionCount > 1: partitioned layout, partition is +// the index in [0, PartitionCount) the caller resolved via +// partitionFor (for SendMessage) or extracted from a v2 +// receipt handle (for Delete/ChangeMessageVisibility). +// +// Caller responsibility +// +// The partition value MUST be valid for the queue's PartitionCount +// when meta.PartitionCount > 1. Out-of-range values produce a key +// the cluster's --sqsFifoPartitionMap doesn't have a route for — +// the partition resolver returns (0, false) and the request fails +// closed at the routing layer. parseSQSFifoPartitionMap + +// validatePartitionedFIFO + the v2 codec each enforce their +// piece, so the dispatch helpers don't re-validate. + +// sqsMsgDataKeyDispatch builds the data-record key for either the +// legacy or partitioned keyspace, depending on meta.PartitionCount. +func sqsMsgDataKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, messageID string) []byte { + if meta != nil && meta.PartitionCount > 1 { + return sqsPartitionedMsgDataKey(queueName, partition, gen, messageID) + } + return sqsMsgDataKey(queueName, gen, messageID) +} + +// sqsMsgVisKeyDispatch builds the visibility-index key for either +// keyspace. +func sqsMsgVisKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, visibleAtMillis int64, messageID string) []byte { + if meta != nil && meta.PartitionCount > 1 { + return sqsPartitionedMsgVisKey(queueName, partition, gen, visibleAtMillis, messageID) + } + return sqsMsgVisKey(queueName, gen, visibleAtMillis, messageID) +} + +// sqsMsgDedupKeyDispatch builds the FIFO dedup key for either +// keyspace. Dedup scope is per-partition on partitioned queues +// (DeduplicationScope = messageGroup is enforced by the validator +// on PartitionCount > 1). +func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, dedupID string) []byte { + if meta != nil && meta.PartitionCount > 1 { + return sqsPartitionedMsgDedupKey(queueName, partition, gen, dedupID) + } + return sqsMsgDedupKey(queueName, gen, dedupID) +} + +// sqsMsgGroupKeyDispatch builds the FIFO group-lock key for either +// keyspace. partitionFor maps a MessageGroupId to one partition, +// so a group lock for any given group lives on exactly one +// partition — there is no cross-partition group-lock invariant +// to maintain. +func sqsMsgGroupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, groupID string) []byte { + if meta != nil && meta.PartitionCount > 1 { + return sqsPartitionedMsgGroupKey(queueName, partition, gen, groupID) + } + return sqsMsgGroupKey(queueName, gen, groupID) +} + +// sqsMsgByAgeKeyDispatch builds the send-age index key for either +// keyspace. The reaper's enumeration helper +// (sqsMsgByAgePrefixesForQueue) already returns BOTH legacy and +// partitioned prefixes per queue, so a queue that was created +// legacy and later — hypothetically — gains partitions does not +// strand its old data. +func sqsMsgByAgeKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, sendTimestampMs int64, messageID string) []byte { + if meta != nil && meta.PartitionCount > 1 { + return sqsPartitionedMsgByAgeKey(queueName, partition, gen, sendTimestampMs, messageID) + } + return sqsMsgByAgeKey(queueName, gen, sendTimestampMs, messageID) +} + +// sqsMsgVisPrefixForQueueDispatch returns the vis-prefix used by +// ReceiveMessage's per-partition scan. Legacy queues have one +// per-(queue, gen) prefix; partitioned queues have one prefix per +// (queue, partition, gen) — the fanout reader iterates these. +func sqsMsgVisPrefixForQueueDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64) []byte { + if meta != nil && meta.PartitionCount > 1 { + return sqsPartitionedMsgVisPrefixForQueue(queueName, partition, gen) + } + return sqsMsgVisPrefixForQueue(queueName, gen) +} + +// effectivePartitionCount returns the number of partitions the +// fanout reader iterates. Treats meta.PartitionCount values 0 and +// 1 as the legacy single-partition layout (one iteration on +// partition 0). +func effectivePartitionCount(meta *sqsQueueMeta) uint32 { + if meta == nil || meta.PartitionCount <= 1 { + return 1 + } + return meta.PartitionCount +} diff --git a/adapter/sqs_keys_dispatch_test.go b/adapter/sqs_keys_dispatch_test.go new file mode 100644 index 00000000..5ab35885 --- /dev/null +++ b/adapter/sqs_keys_dispatch_test.go @@ -0,0 +1,178 @@ +package adapter + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestSQSKeysDispatch_LegacyMatchesLegacyConstructor pins that +// every dispatch helper, when meta.PartitionCount <= 1, produces +// byte-for-byte the same key as the existing legacy constructor. +// The stage-1 contract is "no behavior change for non-partitioned +// queues" — any drift here would corrupt every existing queue. +func TestSQSKeysDispatch_LegacyMatchesLegacyConstructor(t *testing.T) { + t.Parallel() + const ( + queue = "orders.fifo" + gen = uint64(7) + msgID = "0123456789abcdef" + groupID = "user-42" + dedupID = "dedup-token" + ts = int64(1700000000000) + ) + cases := []struct { + name string + dispatched []byte + legacy []byte + }{ + {"meta=nil data", sqsMsgDataKeyDispatch(nil, queue, 0, gen, msgID), + sqsMsgDataKey(queue, gen, msgID)}, + {"meta.PartitionCount=0 data", + sqsMsgDataKeyDispatch(&sqsQueueMeta{PartitionCount: 0}, queue, 0, gen, msgID), + sqsMsgDataKey(queue, gen, msgID)}, + {"meta.PartitionCount=1 data", + sqsMsgDataKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, msgID), + sqsMsgDataKey(queue, gen, msgID)}, + {"meta.PartitionCount=1 vis", + sqsMsgVisKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, ts, msgID), + sqsMsgVisKey(queue, gen, ts, msgID)}, + {"meta.PartitionCount=1 dedup", + sqsMsgDedupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, dedupID), + sqsMsgDedupKey(queue, gen, dedupID)}, + {"meta.PartitionCount=1 group", + sqsMsgGroupKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, groupID), + sqsMsgGroupKey(queue, gen, groupID)}, + {"meta.PartitionCount=1 byage", + sqsMsgByAgeKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, ts, msgID), + sqsMsgByAgeKey(queue, gen, ts, msgID)}, + {"meta.PartitionCount=1 vis prefix", + sqsMsgVisPrefixForQueueDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen), + sqsMsgVisPrefixForQueue(queue, gen)}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.legacy, tc.dispatched, + "dispatched key must be byte-identical to the legacy "+ + "constructor on a non-partitioned queue; otherwise "+ + "existing data on disk becomes unreadable") + }) + } +} + +// TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor +// pins the converse: for meta.PartitionCount > 1, every dispatch +// helper produces byte-for-byte the same key as the partitioned +// constructor. This is what makes the partitioned key family +// reachable for SendMessage / ReceiveMessage in stage 2 and +// stage 3. +func TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 4} + const ( + queue = "events.fifo" + gen = uint64(11) + partition = uint32(2) + msgID = "fedcba9876543210" + groupID = "tenant-9" + dedupID = "send-token" + ts = int64(1701234567890) + ) + cases := []struct { + name string + dispatched []byte + partitioned []byte + }{ + {"data", + sqsMsgDataKeyDispatch(meta, queue, partition, gen, msgID), + sqsPartitionedMsgDataKey(queue, partition, gen, msgID)}, + {"vis", + sqsMsgVisKeyDispatch(meta, queue, partition, gen, ts, msgID), + sqsPartitionedMsgVisKey(queue, partition, gen, ts, msgID)}, + {"dedup", + sqsMsgDedupKeyDispatch(meta, queue, partition, gen, dedupID), + sqsPartitionedMsgDedupKey(queue, partition, gen, dedupID)}, + {"group", + sqsMsgGroupKeyDispatch(meta, queue, partition, gen, groupID), + sqsPartitionedMsgGroupKey(queue, partition, gen, groupID)}, + {"byage", + sqsMsgByAgeKeyDispatch(meta, queue, partition, gen, ts, msgID), + sqsPartitionedMsgByAgeKey(queue, partition, gen, ts, msgID)}, + {"vis prefix", + sqsMsgVisPrefixForQueueDispatch(meta, queue, partition, gen), + sqsPartitionedMsgVisPrefixForQueue(queue, partition, gen)}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.partitioned, tc.dispatched, + "dispatched key must be byte-identical to the "+ + "partitioned constructor on a partitioned queue") + }) + } +} + +// TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct pins the +// keyspace-isolation invariant at the dispatch level: a legacy +// (PartitionCount=1) key and a partitioned (PartitionCount>1) key +// for the same conceptual record never share a byte sequence. +// This is what makes meta.PartitionCount the routing decision — +// without keyspace distinctness, a single-partition queue and a +// partitioned queue of the same name would collide. +func TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct(t *testing.T) { + t.Parallel() + legacyMeta := &sqsQueueMeta{PartitionCount: 1} + partitionedMeta := &sqsQueueMeta{PartitionCount: 4} + const ( + queue = "q.fifo" + gen = uint64(1) + msgID = "id" + ) + legacy := sqsMsgDataKeyDispatch(legacyMeta, queue, 0, gen, msgID) + partitioned := sqsMsgDataKeyDispatch(partitionedMeta, queue, 0, gen, msgID) + require.NotEqual(t, legacy, partitioned, + "legacy and partitioned keys must be byte-distinct") + require.False(t, bytes.HasPrefix(legacy, partitioned), + "legacy key must not start with partitioned key bytes") + require.False(t, bytes.HasPrefix(partitioned, legacy), + "partitioned key must not start with legacy key bytes") +} + +// TestEffectivePartitionCount pins the iteration-count helper +// that ReceiveMessage's fanout uses. Returns 1 for any +// PartitionCount that the rest of the system treats as +// non-partitioned (nil meta, 0, 1) and the explicit count +// otherwise. +func TestEffectivePartitionCount(t *testing.T) { + t.Parallel() + require.Equal(t, uint32(1), effectivePartitionCount(nil)) + require.Equal(t, uint32(1), effectivePartitionCount(&sqsQueueMeta{})) + require.Equal(t, uint32(1), effectivePartitionCount(&sqsQueueMeta{PartitionCount: 1})) + require.Equal(t, uint32(2), effectivePartitionCount(&sqsQueueMeta{PartitionCount: 2})) + require.Equal(t, uint32(8), effectivePartitionCount(&sqsQueueMeta{PartitionCount: 8})) + require.Equal(t, uint32(32), effectivePartitionCount(&sqsQueueMeta{PartitionCount: 32})) +} + +// TestSQSKeysDispatch_PartitionIgnoredOnLegacy pins the contract +// that the partition argument is ignored when meta.PartitionCount +// <= 1 — calling with partition=0 vs partition=999 against a +// legacy queue produces the same key. Without this, a buggy +// caller passing a stale partition value to a non-partitioned +// queue would corrupt the keyspace. +func TestSQSKeysDispatch_PartitionIgnoredOnLegacy(t *testing.T) { + t.Parallel() + legacyMeta := &sqsQueueMeta{PartitionCount: 1} + const ( + queue = "legacy.fifo" + gen = uint64(3) + msgID = "id" + ) + zero := sqsMsgDataKeyDispatch(legacyMeta, queue, 0, gen, msgID) + bogus := sqsMsgDataKeyDispatch(legacyMeta, queue, 999, gen, msgID) + require.Equal(t, zero, bogus, + "partition arg must be ignored on a non-partitioned queue; "+ + "otherwise a stale-partition caller could write to a "+ + "different keyspace and silently strand the message") +} From 379fc024d6fe309242295fa618342013df0dcfec Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 2 May 2026 19:33:11 +0900 Subject: [PATCH 2/2] fix(sqs): perQueue mode collapses fanout + expand dispatch test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #731 round 1 review findings: 1) Codex P2 — effectivePartitionCount must honor perQueue mode partitionFor (sqs_partitioning.go:76) forces every MessageGroupId to partition 0 when meta.FifoThroughputLimit == "perQueue", regardless of PartitionCount. effectivePartitionCount returned the literal PartitionCount in that mode, so ReceiveMessage's fanout would scan up to 31 guaranteed-empty partitions per poll for no correctness benefit — multiplying read / CPU work by the configured partition count. Fix: collapse to 1 when FifoThroughputLimit is the perQueue short-circuit. Mirrors partitionFor's routing decision. TestEffectivePartitionCount_PerQueueModeCollapsesToOne pins this with 4 sub-cases (perQueue + 4, perQueue + 32, perMessageGroupId + 4, empty + 4) so a future refactor that drops the branch is caught. 2) Claude finding 1 — expand legacy-match test coverage The PR description claimed nil and PartitionCount=0 sub-cases for all 6 helpers, but the actual table only had them for sqsMsgDataKeyDispatch. Expanded to cover all 6 helpers. Without this, a future edit that drops the `meta != nil` guard from one of the non-data helpers would compile and pass the existing test suite. TestSQSKeysDispatch_LegacyMatchesLegacyConstructor grew from 8 to 15 sub-cases. 3) Claude finding 2 — PartitionCount=2 boundary TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor used PartitionCount=4. An off-by-one in the dispatch threshold (>= 1 vs > 1) wouldn't be caught because 2 was never exercised. New TestSQSKeysDispatch_BoundaryAtPartitionCount2 pins the boundary cheaply. CodeRabbit nitpick on the eager-construction test pattern is acknowledged as "Low value" in their own classification — left as-is since the existing pattern is consistent with the project's other table-driven tests. go test -race ./adapter/... and golangci-lint clean. --- adapter/sqs_keys_dispatch.go | 13 ++++ adapter/sqs_keys_dispatch_test.go | 114 ++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/adapter/sqs_keys_dispatch.go b/adapter/sqs_keys_dispatch.go index 3218e1eb..0b57877b 100644 --- a/adapter/sqs_keys_dispatch.go +++ b/adapter/sqs_keys_dispatch.go @@ -97,9 +97,22 @@ func sqsMsgVisPrefixForQueueDispatch(meta *sqsQueueMeta, queueName string, parti // fanout reader iterates. Treats meta.PartitionCount values 0 and // 1 as the legacy single-partition layout (one iteration on // partition 0). +// +// Honors the §3.3 perQueue short-circuit: when +// meta.FifoThroughputLimit == "perQueue", partitionFor forces +// every MessageGroupId to partition 0 regardless of +// PartitionCount, so the only non-empty partition the fanout +// reader will ever find is 0. Returning the literal +// PartitionCount in that mode would have ReceiveMessage scan up +// to 31 guaranteed-empty partitions on every poll, multiplying +// read / CPU work for no correctness benefit (codex P2 round 1 +// on PR #731). Mirror the routing decision: collapse to 1. func effectivePartitionCount(meta *sqsQueueMeta) uint32 { if meta == nil || meta.PartitionCount <= 1 { return 1 } + if meta.FifoThroughputLimit == htfifoThroughputPerQueue { + return 1 + } return meta.PartitionCount } diff --git a/adapter/sqs_keys_dispatch_test.go b/adapter/sqs_keys_dispatch_test.go index 5ab35885..27de6399 100644 --- a/adapter/sqs_keys_dispatch_test.go +++ b/adapter/sqs_keys_dispatch_test.go @@ -27,11 +27,38 @@ func TestSQSKeysDispatch_LegacyMatchesLegacyConstructor(t *testing.T) { dispatched []byte legacy []byte }{ + // meta=nil sub-cases — ratchet against accidentally + // dropping the nil-guard from any helper. {"meta=nil data", sqsMsgDataKeyDispatch(nil, queue, 0, gen, msgID), sqsMsgDataKey(queue, gen, msgID)}, + {"meta=nil vis", + sqsMsgVisKeyDispatch(nil, queue, 0, gen, ts, msgID), + sqsMsgVisKey(queue, gen, ts, msgID)}, + {"meta=nil dedup", + sqsMsgDedupKeyDispatch(nil, queue, 0, gen, dedupID), + sqsMsgDedupKey(queue, gen, dedupID)}, + {"meta=nil group", + sqsMsgGroupKeyDispatch(nil, queue, 0, gen, groupID), + sqsMsgGroupKey(queue, gen, groupID)}, + {"meta=nil byage", + sqsMsgByAgeKeyDispatch(nil, queue, 0, gen, ts, msgID), + sqsMsgByAgeKey(queue, gen, ts, msgID)}, + {"meta=nil vis prefix", + sqsMsgVisPrefixForQueueDispatch(nil, queue, 0, gen), + sqsMsgVisPrefixForQueue(queue, gen)}, + // meta.PartitionCount=0 sub-cases — ratchet against + // accidentally dropping the > 1 guard. {"meta.PartitionCount=0 data", sqsMsgDataKeyDispatch(&sqsQueueMeta{PartitionCount: 0}, queue, 0, gen, msgID), sqsMsgDataKey(queue, gen, msgID)}, + {"meta.PartitionCount=0 vis", + sqsMsgVisKeyDispatch(&sqsQueueMeta{PartitionCount: 0}, queue, 0, gen, ts, msgID), + sqsMsgVisKey(queue, gen, ts, msgID)}, + {"meta.PartitionCount=0 vis prefix", + sqsMsgVisPrefixForQueueDispatch(&sqsQueueMeta{PartitionCount: 0}, queue, 0, gen), + sqsMsgVisPrefixForQueue(queue, gen)}, + // meta.PartitionCount=1 sub-cases — ratchet that the > 1 + // boundary is exclusive: 1 is still the legacy layout. {"meta.PartitionCount=1 data", sqsMsgDataKeyDispatch(&sqsQueueMeta{PartitionCount: 1}, queue, 0, gen, msgID), sqsMsgDataKey(queue, gen, msgID)}, @@ -114,6 +141,32 @@ func TestSQSKeysDispatch_PartitionedMatchesPartitionedConstructor(t *testing.T) } } +// TestSQSKeysDispatch_BoundaryAtPartitionCount2 pins the > 1 +// threshold: PartitionCount=2 is the smallest value that selects +// the partitioned keyspace. An off-by-one in the dispatch +// condition (e.g. >= 1 vs > 1) would not be caught by tests that +// only exercise PartitionCount=4. +func TestSQSKeysDispatch_BoundaryAtPartitionCount2(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 2} + const ( + queue = "boundary.fifo" + gen = uint64(1) + msgID = "id" + ) + got := sqsMsgDataKeyDispatch(meta, queue, 0, gen, msgID) + want := sqsPartitionedMsgDataKey(queue, 0, gen, msgID) + require.Equal(t, want, got, + "PartitionCount=2 must dispatch to the partitioned "+ + "keyspace; an off-by-one in the > 1 threshold would "+ + "silently put PR 5b's first-partitioned-queue traffic "+ + "on the legacy keyspace") + // And it must NOT match the legacy constructor. + legacy := sqsMsgDataKey(queue, gen, msgID) + require.NotEqual(t, legacy, got, + "PartitionCount=2 must NOT route to the legacy keyspace") +} + // TestSQSKeysDispatch_LegacyAndPartitionedAreDistinct pins the // keyspace-isolation invariant at the dispatch level: a legacy // (PartitionCount=1) key and a partitioned (PartitionCount>1) key @@ -155,6 +208,67 @@ func TestEffectivePartitionCount(t *testing.T) { require.Equal(t, uint32(32), effectivePartitionCount(&sqsQueueMeta{PartitionCount: 32})) } +// TestEffectivePartitionCount_PerQueueModeCollapsesToOne pins the +// codex P2 round-1 fix on PR #731: when a queue is configured +// with FifoThroughputLimit=perQueue, partitionFor's §3.3 short- +// circuit forces every MessageGroupId to partition 0 regardless +// of PartitionCount. The fanout helper MUST mirror that decision +// — returning the literal PartitionCount would have +// ReceiveMessage scan up to 31 guaranteed-empty partitions on +// every poll for no correctness benefit. +// +// Without this ratchet, a future refactor that drops the +// perQueue branch from effectivePartitionCount would silently +// regress receive performance to "scan 32 empty partitions per +// poll" with no test failure. +func TestEffectivePartitionCount_PerQueueModeCollapsesToOne(t *testing.T) { + t.Parallel() + cases := []struct { + name string + meta *sqsQueueMeta + want uint32 + }{ + { + name: "perQueue + PartitionCount=4 → 1", + meta: &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: htfifoThroughputPerQueue, + }, + want: 1, + }, + { + name: "perQueue + PartitionCount=32 (max) → 1", + meta: &sqsQueueMeta{ + PartitionCount: 32, + FifoThroughputLimit: htfifoThroughputPerQueue, + }, + want: 1, + }, + { + name: "perMessageGroupId + PartitionCount=4 → 4", + meta: &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: htfifoThroughputPerMessageGroupID, + }, + want: 4, + }, + { + name: "empty FifoThroughputLimit + PartitionCount=4 → 4", + meta: &sqsQueueMeta{ + PartitionCount: 4, + FifoThroughputLimit: "", + }, + want: 4, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, effectivePartitionCount(tc.meta)) + }) + } +} + // TestSQSKeysDispatch_PartitionIgnoredOnLegacy pins the contract // that the partition argument is ignored when meta.PartitionCount // <= 1 — calling with partition=0 vs partition=999 against a