-
Notifications
You must be signed in to change notification settings - Fork 2
feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2) #732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,11 @@ | ||
| package adapter | ||
|
|
||
| import ( | ||
| "bytes" | ||
|
|
||
| "github.com/cockroachdb/errors" | ||
| ) | ||
|
|
||
| // Per-key dispatch helpers that route to the legacy single-partition | ||
| // constructor or the partitioned-FIFO constructor based on | ||
| // meta.PartitionCount. Phase 3.D PR 5b's central abstraction: | ||
|
|
@@ -116,3 +122,75 @@ func effectivePartitionCount(meta *sqsQueueMeta) uint32 { | |
| } | ||
| return meta.PartitionCount | ||
| } | ||
|
|
||
| // sqsMsgVisScanBoundsDispatch returns the start/end byte ranges that | ||
| // ReceiveMessage's per-partition visibility-index scan iterates. | ||
| // Mirrors sqsMsgVisScanBounds (legacy keyspace) but parametrises the | ||
| // prefix on partition when the queue is partitioned. The bounds are | ||
| // always [prefix||u64(0), prefix||u64(maxVisibleAtMillis+1)) so | ||
| // messages with visible_at == maxVisibleAtMillis are included. | ||
| func sqsMsgVisScanBoundsDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, maxVisibleAtMillis int64) (start, end []byte) { | ||
| prefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, partition, gen) | ||
| start = append(bytes.Clone(prefix), zeroU64()...) | ||
| upper := uint64MaxZero(maxVisibleAtMillis) | ||
| if upper < ^uint64(0) { | ||
| upper++ | ||
| } | ||
| end = append(bytes.Clone(prefix), encodedU64(upper)...) | ||
| return start, end | ||
| } | ||
|
|
||
| // encodeReceiptHandleDispatch picks the receipt-handle wire format | ||
| // based on meta.PartitionCount: v1 on legacy / non-partitioned | ||
| // queues, v2 on partitioned ones. The partition argument is only | ||
| // consulted on the v2 branch — callers may pass 0 on the legacy | ||
| // branch. | ||
| // | ||
| // This is the single point where a fresh receipt handle commits to | ||
| // a wire version. Pairing the choice with the same meta.PartitionCount | ||
| // the dispatch helpers used to build keys keeps the handle's | ||
| // recorded partition consistent with the partition the message was | ||
| // stored under, so a later DeleteMessage / ChangeMessageVisibility | ||
| // routes to the right keyspace. | ||
| func encodeReceiptHandleDispatch(meta *sqsQueueMeta, partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) { | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| return encodeReceiptHandleV2(partition, queueGen, messageIDHex, receiptToken) | ||
| } | ||
| return encodeReceiptHandle(queueGen, messageIDHex, receiptToken) | ||
| } | ||
|
|
||
| // validateReceiptHandleVersion enforces the queue-aware version | ||
| // rule that replaced the dormancy gate from PR 5a: | ||
| // | ||
| // - meta.PartitionCount <= 1 (legacy / non-partitioned queue): | ||
| // handle MUST be v1. A v2 handle on a non-partitioned queue | ||
| // is structurally impossible (SendMessage would never have | ||
| // produced one) and accepting it would let a malicious caller | ||
| // re-encode a v1 handle as v2 to probe / corrupt the v2 layout | ||
| // before any partitioned queue exists. | ||
| // - meta.PartitionCount > 1 (partitioned queue): handle MUST be | ||
| // v2. A v1 handle on a partitioned queue carries no partition | ||
| // index, so dispatch would default to partition 0 and the | ||
| // delete / change-visibility would silently miss messages on | ||
| // other partitions. | ||
| // | ||
| // Mismatches surface as ReceiptHandleIsInvalid (the same AWS error | ||
| // shape used for malformed handles), so a misrouted client cannot | ||
| // distinguish "wrong version" from "garbled bytes" — preserving the | ||
| // PR 5a / PR 724 round 3 dormancy guarantee that the v2 wire format | ||
| // is not probeable from the public API. | ||
| func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { | ||
| if handle == nil { | ||
| return errors.New("receipt handle is nil") | ||
| } | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| if handle.Version != sqsReceiptHandleVersion2 { | ||
| return errors.New("receipt handle version mismatch for partitioned queue") | ||
| } | ||
| return nil | ||
| } | ||
| if handle.Version != sqsReceiptHandleVersion1 { | ||
| return errors.New("receipt handle version mismatch for non-partitioned queue") | ||
| } | ||
| return nil | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The queue-aware version check now accepts any v2 handle on partitioned queues but never validates Useful? React with 👍 / 👎. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reject out-of-range v2 partitions in the meta-aware validator.
handle.Partitionis now client-controlled oncedecodeClientReceiptHandleaccepts v2. This helper only checks the version, soPartition >= meta.PartitionCountfalls through tosqsMsg*KeyDispatchand depends on downstream routing failure semantics instead of returningReceiptHandleIsInvalid. This is the right choke point to bounds-check before PR 5b-3 lifts the gate.Suggested fix
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { if handle == nil { return errors.New("receipt handle is nil") } if meta != nil && meta.PartitionCount > 1 { if handle.Version != sqsReceiptHandleVersion2 { return errors.New("receipt handle version mismatch for partitioned queue") } + if handle.Partition >= meta.PartitionCount { + return errors.New("receipt handle partition out of range for queue") + } return nil } if handle.Version != sqsReceiptHandleVersion1 { return errors.New("receipt handle version mismatch for non-partitioned queue") }📝 Committable suggestion
🤖 Prompt for AI Agents