Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions adapter/sqs_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func resolveFifoDedupID(meta *sqsQueueMeta, in sqsSendMessageInput) string {
// or (nil, nil) when there is no live record for this dedup-id.
// Expired records are surfaced as nil so a stale entry does not block
// a fresh send within the same FIFO queue.
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
key := sqsMsgDedupKey(queueName, gen, dedupID)
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, dedupID)
raw, err := s.store.GetAt(ctx, key, readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
Expand Down Expand Up @@ -130,11 +130,11 @@ func (s *SQSServer) loadFifoSequence(ctx context.Context, queueName string, read

// loadFifoGroupLock fetches the in-flight lock for a group, if any.
// Returns nil when no lock is held. Callers that also need the key
// can recompute it via sqsMsgGroupKey — the helper used to return it
// alongside the lock, but every caller already had the key in scope
// from a different code path.
func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) {
key := sqsMsgGroupKey(queueName, gen, groupID)
// can recompute it via sqsMsgGroupKeyDispatch — the helper used to
// return it alongside the lock, but every caller already had the
// key in scope from a different code path.
func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) {
key := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, groupID)
raw, err := s.store.GetAt(ctx, key, readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
Expand Down Expand Up @@ -172,7 +172,14 @@ func (s *SQSServer) sendFifoMessage(
delay int64,
readTS uint64,
) (map[string]string, bool, error) {
dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta.Generation, dedupID, readTS)
// HT-FIFO: hash the MessageGroupId once at the entry point so
// every key built in this transaction (data, vis, byage, dedup,
// group-lock, sequence) lands in the same partition. partitionFor
// returns 0 on legacy / non-partitioned queues and on the
// perQueue throughput short-circuit, so the dispatch helpers
// round-trip to legacy output for those cases.
partition := partitionFor(meta, in.MessageGroupId)
dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, dedupID, readTS)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -216,9 +223,9 @@ func (s *SQSServer) sendFifoMessage(
return nil, false, errors.WithStack(err)
}

dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
seqKey := sqsQueueSeqKey(queueName)
metaKey := sqsQueueMetaKey(queueName)
genKey := sqsQueueGenKey(queueName)
Expand Down Expand Up @@ -270,9 +277,9 @@ const (

// classifyFifoGroupLock decides whether a FIFO candidate is eligible
// for delivery. Standard queues bypass the function entirely.
func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) {
lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId)
lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS)
func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) {
lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId)
lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS)
if err != nil {
return fifoLockSkip, lockKey, err
}
Expand Down
78 changes: 78 additions & 0 deletions adapter/sqs_keys_dispatch.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package adapter

import (
"bytes"

"github.com/cockroachdb/errors"
)

// Per-key dispatch helpers that route to the legacy single-partition
// constructor or the partitioned-FIFO constructor based on
// meta.PartitionCount. Phase 3.D PR 5b's central abstraction:
Expand Down Expand Up @@ -116,3 +122,75 @@ func effectivePartitionCount(meta *sqsQueueMeta) uint32 {
}
return meta.PartitionCount
}

// sqsMsgVisScanBoundsDispatch computes the [start, end) key range that
// ReceiveMessage's per-partition visibility-index scan iterates.
// It mirrors the legacy sqsMsgVisScanBounds but derives the key prefix
// through the partition-aware dispatch helper, so partitioned queues
// get a per-partition prefix while legacy queues fall back to the
// single-partition layout. The range covers visible_at timestamps
// 0 .. maxVisibleAtMillis inclusive: the end bound is
// prefix||u64(maxVisibleAtMillis+1), saturating at the maximum uint64
// so the increment cannot wrap.
func sqsMsgVisScanBoundsDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, maxVisibleAtMillis int64) (start, end []byte) {
	keyPrefix := sqsMsgVisPrefixForQueueDispatch(meta, queueName, partition, gen)
	lower := append(bytes.Clone(keyPrefix), zeroU64()...)
	bound := uint64MaxZero(maxVisibleAtMillis)
	if bound != ^uint64(0) {
		bound++
	}
	upper := append(bytes.Clone(keyPrefix), encodedU64(bound)...)
	return lower, upper
}

// encodeReceiptHandleDispatch selects the receipt-handle wire format
// from meta.PartitionCount: legacy / non-partitioned queues emit v1,
// partitioned queues (PartitionCount > 1) emit v2. Only the v2 branch
// reads the partition argument, so callers on the legacy path may
// pass 0.
//
// This is the single place where a fresh receipt handle commits to a
// wire version. Keying the decision off the same meta.PartitionCount
// that the key-dispatch helpers consult keeps the handle's recorded
// partition consistent with the partition the message was stored
// under, so a later DeleteMessage / ChangeMessageVisibility routes to
// the right keyspace.
func encodeReceiptHandleDispatch(meta *sqsQueueMeta, partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
	partitioned := meta != nil && meta.PartitionCount > 1
	if !partitioned {
		return encodeReceiptHandle(queueGen, messageIDHex, receiptToken)
	}
	return encodeReceiptHandleV2(partition, queueGen, messageIDHex, receiptToken)
}

// validateReceiptHandleVersion enforces the queue-aware version
// rule that replaced the dormancy gate from PR 5a:
//
// - meta.PartitionCount <= 1 (legacy / non-partitioned queue):
// handle MUST be v1. A v2 handle on a non-partitioned queue
// is structurally impossible (SendMessage would never have
// produced one) and accepting it would let a malicious caller
// re-encode a v1 handle as v2 to probe / corrupt the v2 layout
// before any partitioned queue exists.
// - meta.PartitionCount > 1 (partitioned queue): handle MUST be
// v2. A v1 handle on a partitioned queue carries no partition
// index, so dispatch would default to partition 0 and the
// delete / change-visibility would silently miss messages on
// other partitions.
//
// Mismatches surface as ReceiptHandleIsInvalid (the same AWS error
// shape used for malformed handles), so a misrouted client cannot
// distinguish "wrong version" from "garbled bytes" — preserving the
// PR 5a / PR 724 round 3 dormancy guarantee that the v2 wire format
// is not probeable from the public API.
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
if handle == nil {
return errors.New("receipt handle is nil")
}
if meta != nil && meta.PartitionCount > 1 {
if handle.Version != sqsReceiptHandleVersion2 {
return errors.New("receipt handle version mismatch for partitioned queue")
}
return nil
}
if handle.Version != sqsReceiptHandleVersion1 {
return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
Comment on lines +182 to +195
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject out-of-range v2 partitions in the meta-aware validator.

handle.Partition is now client-controlled once decodeClientReceiptHandle accepts v2. This helper only checks the version, so Partition >= meta.PartitionCount falls through to sqsMsg*KeyDispatch and depends on downstream routing failure semantics instead of returning ReceiptHandleIsInvalid. This is the right choke point to bounds-check before PR 5b-3 lifts the gate.

Suggested fix
 func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
 	if handle == nil {
 		return errors.New("receipt handle is nil")
 	}
 	if meta != nil && meta.PartitionCount > 1 {
 		if handle.Version != sqsReceiptHandleVersion2 {
 			return errors.New("receipt handle version mismatch for partitioned queue")
 		}
+		if handle.Partition >= meta.PartitionCount {
+			return errors.New("receipt handle partition out of range for queue")
+		}
 		return nil
 	}
 	if handle.Version != sqsReceiptHandleVersion1 {
 		return errors.New("receipt handle version mismatch for non-partitioned queue")
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
if handle == nil {
return errors.New("receipt handle is nil")
}
if meta != nil && meta.PartitionCount > 1 {
if handle.Version != sqsReceiptHandleVersion2 {
return errors.New("receipt handle version mismatch for partitioned queue")
}
return nil
}
if handle.Version != sqsReceiptHandleVersion1 {
return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
if handle == nil {
return errors.New("receipt handle is nil")
}
if meta != nil && meta.PartitionCount > 1 {
if handle.Version != sqsReceiptHandleVersion2 {
return errors.New("receipt handle version mismatch for partitioned queue")
}
if handle.Partition >= meta.PartitionCount {
return errors.New("receipt handle partition out of range for queue")
}
return nil
}
if handle.Version != sqsReceiptHandleVersion1 {
return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_keys_dispatch.go` around lines 182 - 195, In
validateReceiptHandleVersion, add a bounds check for v2 handles so that when
meta != nil && meta.PartitionCount > 1 and handle.Version ==
sqsReceiptHandleVersion2 you also verify handle.Partition < meta.PartitionCount;
if the partition is out of range return the ReceiptHandleIsInvalid error (or an
appropriate error indicating invalid receipt handle) instead of allowing it to
fall through to downstream routing; update the function logic around
validateReceiptHandleVersion to perform this check before returning nil.

}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject out-of-range v2 receipt-handle partitions

The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.

Useful? React with 👍 / 👎.

Loading
Loading