-
Notifications
You must be signed in to change notification settings - Fork 2
feat(sqs): per-key dispatch helpers for partitioned-FIFO routing (Phase 3.D PR 5b-1) #731
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bootjp
wants to merge
2
commits into
main
Choose a base branch
from
feat/sqs-htfifo-key-dispatch-helpers
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| package adapter | ||
|
|
||
| // Per-key dispatch helpers that route to the legacy single-partition | ||
| // constructor or the partitioned-FIFO constructor based on | ||
| // meta.PartitionCount. Phase 3.D PR 5b's central abstraction: | ||
| // every send/receive/delete code path that constructs a message- | ||
| // keyspace key goes through one of these wrappers, so the | ||
| // PartitionCount > 1 → partitioned-prefix dispatch lives in one | ||
| // place instead of being scattered across 14 call sites. | ||
| // | ||
| // Contract | ||
| // | ||
| // - meta.PartitionCount <= 1: legacy single-partition layout. | ||
| // The partition argument is ignored. Existing data on disk | ||
| // stays byte-identical with pre-PR-5 deployments. | ||
| // - meta.PartitionCount > 1: partitioned layout, partition is | ||
| // the index in [0, PartitionCount) the caller resolved via | ||
| // partitionFor (for SendMessage) or extracted from a v2 | ||
| // receipt handle (for Delete/ChangeMessageVisibility). | ||
| // | ||
| // Caller responsibility | ||
| // | ||
| // The partition value MUST be valid for the queue's PartitionCount | ||
| // when meta.PartitionCount > 1. Out-of-range values produce a key | ||
| // the cluster's --sqsFifoPartitionMap doesn't have a route for — | ||
| // the partition resolver returns (0, false) and the request fails | ||
| // closed at the routing layer. parseSQSFifoPartitionMap + | ||
| // validatePartitionedFIFO + the v2 codec each enforce their | ||
| // piece, so the dispatch helpers don't re-validate. | ||
|
|
||
| // sqsMsgDataKeyDispatch builds the data-record key for either the | ||
| // legacy or partitioned keyspace, depending on meta.PartitionCount. | ||
| func sqsMsgDataKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, messageID string) []byte { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return sqsPartitionedMsgDataKey(queueName, partition, gen, messageID) | ||
| } | ||
| return sqsMsgDataKey(queueName, gen, messageID) | ||
| } | ||
|
|
||
| // sqsMsgVisKeyDispatch builds the visibility-index key for either | ||
| // keyspace. | ||
| func sqsMsgVisKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, visibleAtMillis int64, messageID string) []byte { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return sqsPartitionedMsgVisKey(queueName, partition, gen, visibleAtMillis, messageID) | ||
| } | ||
| return sqsMsgVisKey(queueName, gen, visibleAtMillis, messageID) | ||
| } | ||
|
|
||
| // sqsMsgDedupKeyDispatch builds the FIFO dedup key for either | ||
| // keyspace. Dedup scope is per-partition on partitioned queues | ||
| // (DeduplicationScope = messageGroup is enforced by the validator | ||
| // on PartitionCount > 1). | ||
| func sqsMsgDedupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, dedupID string) []byte { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return sqsPartitionedMsgDedupKey(queueName, partition, gen, dedupID) | ||
| } | ||
| return sqsMsgDedupKey(queueName, gen, dedupID) | ||
| } | ||
|
|
||
| // sqsMsgGroupKeyDispatch builds the FIFO group-lock key for either | ||
| // keyspace. partitionFor maps a MessageGroupId to one partition, | ||
| // so a group lock for any given group lives on exactly one | ||
| // partition — there is no cross-partition group-lock invariant | ||
| // to maintain. | ||
| func sqsMsgGroupKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, groupID string) []byte { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return sqsPartitionedMsgGroupKey(queueName, partition, gen, groupID) | ||
| } | ||
| return sqsMsgGroupKey(queueName, gen, groupID) | ||
| } | ||
|
|
||
| // sqsMsgByAgeKeyDispatch builds the send-age index key for either | ||
| // keyspace. The reaper's enumeration helper | ||
| // (sqsMsgByAgePrefixesForQueue) already returns BOTH legacy and | ||
| // partitioned prefixes per queue, so a queue that was created | ||
| // legacy and later — hypothetically — gains partitions does not | ||
| // strand its old data. | ||
| func sqsMsgByAgeKeyDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, sendTimestampMs int64, messageID string) []byte { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return sqsPartitionedMsgByAgeKey(queueName, partition, gen, sendTimestampMs, messageID) | ||
| } | ||
| return sqsMsgByAgeKey(queueName, gen, sendTimestampMs, messageID) | ||
| } | ||
|
|
||
| // sqsMsgVisPrefixForQueueDispatch returns the vis-prefix used by | ||
| // ReceiveMessage's per-partition scan. Legacy queues have one | ||
| // per-(queue, gen) prefix; partitioned queues have one prefix per | ||
| // (queue, partition, gen) — the fanout reader iterates these. | ||
| func sqsMsgVisPrefixForQueueDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64) []byte { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return sqsPartitionedMsgVisPrefixForQueue(queueName, partition, gen) | ||
| } | ||
| return sqsMsgVisPrefixForQueue(queueName, gen) | ||
| } | ||
|
|
||
| // effectivePartitionCount returns the number of partitions the | ||
| // fanout reader iterates. Treats meta.PartitionCount values 0 and | ||
| // 1 as the legacy single-partition layout (one iteration on | ||
| // partition 0). | ||
| // | ||
| // Honors the §3.3 perQueue short-circuit: when | ||
| // meta.FifoThroughputLimit == "perQueue", partitionFor forces | ||
| // every MessageGroupId to partition 0 regardless of | ||
| // PartitionCount, so the only non-empty partition the fanout | ||
| // reader will ever find is 0. Returning the literal | ||
| // PartitionCount in that mode would have ReceiveMessage scan up | ||
| // to 31 guaranteed-empty partitions on every poll, multiplying | ||
| // read / CPU work for no correctness benefit (codex P2 round 1 | ||
| // on PR #731). Mirror the routing decision: collapse to 1. | ||
| func effectivePartitionCount(meta *sqsQueueMeta) uint32 { | ||
| if meta == nil || meta.PartitionCount <= 1 { | ||
| return 1 | ||
| } | ||
| if meta.FifoThroughputLimit == htfifoThroughputPerQueue { | ||
| return 1 | ||
| } | ||
| return meta.PartitionCount | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
effectivePartitionCountreturnsmeta.PartitionCountfor every queue withPartitionCount > 1, butpartitionForexplicitly forces all message groups to partition0whenFifoThroughputLimit == "perQueue"(adapter/sqs_partitioning.go). In that valid configuration, a Receive fanout built on this helper will scan empty partitions on every poll, multiplying read/CPU work by up to the configured partition count (max 32) with no correctness benefit. The helper should collapse to1partition inperQueuemode to match the routing behavior.Useful? React with 👍 / 👎.