Skip to content
35 changes: 21 additions & 14 deletions adapter/sqs_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func resolveFifoDedupID(meta *sqsQueueMeta, in sqsSendMessageInput) string {
// or (nil, nil) when there is no live record for this dedup-id.
// Expired records are surfaced as nil so a stale entry does not block
// a fresh send within the same FIFO queue.
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
key := sqsMsgDedupKey(queueName, gen, dedupID)
func (s *SQSServer) loadFifoDedupRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, dedupID string, readTS uint64) (*sqsFifoDedupRecord, []byte, error) {
key := sqsMsgDedupKeyDispatch(meta, queueName, partition, gen, dedupID)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Include MessageGroupId in partitioned dedup routing

DeduplicationScope=messageGroup requires dedup to be isolated per message group, but this key construction uses only (queue, partition, dedupID). When two different groups hash to the same partition and reuse a dedup ID within the window, the second send is incorrectly treated as a duplicate and dropped/acknowledged with the first message ID. That is a correctness/data-loss issue for partitioned FIFO once the dormancy gate is lifted (and it is already reachable in the partitioned-meta integration path).

Useful? React with 👍 / 👎.

raw, err := s.store.GetAt(ctx, key, readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
Expand Down Expand Up @@ -130,11 +130,11 @@ func (s *SQSServer) loadFifoSequence(ctx context.Context, queueName string, read

// loadFifoGroupLock fetches the in-flight lock for a group, if any.
// Returns nil when no lock is held. Callers that also need the key
// can recompute it via sqsMsgGroupKey — the helper used to return it
// alongside the lock, but every caller already had the key in scope
// from a different code path.
func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) {
key := sqsMsgGroupKey(queueName, gen, groupID)
// can recompute it via sqsMsgGroupKeyDispatch — the helper used to
// return it alongside the lock, but every caller already had the
// key in scope from a different code path.
func (s *SQSServer) loadFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, groupID string, readTS uint64) (*sqsFifoGroupLock, error) {
key := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, groupID)
raw, err := s.store.GetAt(ctx, key, readTS)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
Expand Down Expand Up @@ -172,7 +172,14 @@ func (s *SQSServer) sendFifoMessage(
delay int64,
readTS uint64,
) (map[string]string, bool, error) {
dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta.Generation, dedupID, readTS)
// HT-FIFO: hash the MessageGroupId once at the entry point so
// every key built in this transaction (data, vis, byage, dedup,
// group-lock, sequence) lands in the same partition. partitionFor
// returns 0 on legacy / non-partitioned queues and on the
// perQueue throughput short-circuit, so the dispatch helpers
// round-trip to legacy output for those cases.
partition := partitionFor(meta, in.MessageGroupId)
dedup, dedupKey, err := s.loadFifoDedupRecord(ctx, queueName, meta, partition, meta.Generation, dedupID, readTS)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -216,9 +223,9 @@ func (s *SQSServer) sendFifoMessage(
return nil, false, errors.WithStack(err)
}

dataKey := sqsMsgDataKey(queueName, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKey(queueName, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKey(queueName, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, meta.Generation, rec.MessageID)
visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, meta.Generation, rec.AvailableAtMillis, rec.MessageID)
byAgeKey := sqsMsgByAgeKeyDispatch(meta, queueName, partition, meta.Generation, rec.SendTimestampMillis, rec.MessageID)
seqKey := sqsQueueSeqKey(queueName)
metaKey := sqsQueueMetaKey(queueName)
genKey := sqsQueueGenKey(queueName)
Expand Down Expand Up @@ -270,9 +277,9 @@ const (

// classifyFifoGroupLock decides whether a FIFO candidate is eligible
// for delivery. Standard queues bypass the function entirely.
func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) {
lockKey := sqsMsgGroupKey(queueName, gen, rec.MessageGroupId)
lock, err := s.loadFifoGroupLock(ctx, queueName, gen, rec.MessageGroupId, readTS)
func (s *SQSServer) classifyFifoGroupLock(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, rec *sqsMessageRecord, readTS uint64) (fifoCandidateLockState, []byte, error) {
lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, rec.MessageGroupId)
lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, rec.MessageGroupId, readTS)
if err != nil {
return fifoLockSkip, lockKey, err
}
Expand Down
85 changes: 85 additions & 0 deletions adapter/sqs_keys_dispatch.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package adapter

import (
"bytes"

"github.com/cockroachdb/errors"
)

// Per-key dispatch helpers that route to the legacy single-partition
// constructor or the partitioned-FIFO constructor based on
// meta.PartitionCount. Phase 3.D PR 5b's central abstraction:
Expand Down Expand Up @@ -116,3 +122,82 @@ func effectivePartitionCount(meta *sqsQueueMeta) uint32 {
}
return meta.PartitionCount
}

// sqsMsgVisScanBoundsDispatch computes the half-open key range
// [start, end) walked by ReceiveMessage's per-partition
// visibility-index scan. It is the partition-aware counterpart of
// sqsMsgVisScanBounds: the prefix comes from
// sqsMsgVisPrefixForQueueDispatch, so the partition argument only
// shapes the keys to whatever extent that helper uses it.
//
// start is prefix||u64(0). end is prefix||u64(maxVisibleAtMillis+1),
// which makes the exclusive upper bound cover entries whose
// visible_at equals maxVisibleAtMillis. The +1 saturates at the
// largest uint64 instead of wrapping around to zero.
func sqsMsgVisScanBoundsDispatch(meta *sqsQueueMeta, queueName string, partition uint32, gen uint64, maxVisibleAtMillis int64) (start, end []byte) {
	const maxU64 = ^uint64(0)

	base := sqsMsgVisPrefixForQueueDispatch(meta, queueName, partition, gen)

	// Each bound is appended onto its own clone of the prefix so the
	// two results never share a backing array.
	start = append(bytes.Clone(base), zeroU64()...)

	// NOTE(review): uint64MaxZero presumably clamps a negative
	// timestamp to zero before the conversion — confirm against its
	// definition.
	exclusiveUpper := uint64MaxZero(maxVisibleAtMillis)
	if exclusiveUpper != maxU64 {
		exclusiveUpper++
	}
	end = append(bytes.Clone(base), encodedU64(exclusiveUpper)...)

	return start, end
}

// encodeReceiptHandleDispatch selects the wire format for a freshly
// minted receipt handle from meta.PartitionCount:
//
//   - nil meta or PartitionCount <= 1 (legacy / non-partitioned
//     queue): v1 via encodeReceiptHandle. The partition argument is
//     ignored on this branch, so callers may pass 0.
//   - PartitionCount > 1 (partitioned queue): v2 via
//     encodeReceiptHandleV2, which records the partition.
//
// This is the single point where a new handle commits to a wire
// version. Keying the choice on the same meta.PartitionCount that
// the key-dispatch helpers consult keeps the partition recorded in
// the handle aligned with the partition the message was stored
// under, so a later DeleteMessage / ChangeMessageVisibility routes
// to the right keyspace.
func encodeReceiptHandleDispatch(meta *sqsQueueMeta, partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
	legacyQueue := meta == nil || meta.PartitionCount <= 1
	if legacyQueue {
		return encodeReceiptHandle(queueGen, messageIDHex, receiptToken)
	}
	return encodeReceiptHandleV2(partition, queueGen, messageIDHex, receiptToken)
}

// validateReceiptHandleVersion enforces the queue-aware version
// rule that replaced the dormancy gate from PR 5a:
//
// - meta.PartitionCount <= 1 (legacy / non-partitioned queue):
// handle MUST be v1. A v2 handle on a non-partitioned queue
// is structurally impossible (SendMessage would never have
// produced one) and accepting it would let a malicious caller
// re-encode a v1 handle as v2 to probe / corrupt the v2 layout
// before any partitioned queue exists.
// - meta.PartitionCount > 1 (partitioned queue): handle MUST be
// v2. A v1 handle on a partitioned queue carries no partition
// index, so dispatch would default to partition 0 and the
// delete / change-visibility would silently miss messages on
// other partitions.
//
// Mismatches surface as ReceiptHandleIsInvalid (the same AWS error
// shape used for malformed handles), so a misrouted client cannot
// distinguish "wrong version" from "garbled bytes" — preserving the
// PR 5a / PR 724 round 3 dormancy guarantee that the v2 wire format
// is not probeable from the public API.
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
if handle == nil {
return errors.New("receipt handle is nil")
}
if meta != nil && meta.PartitionCount > 1 {
if handle.Version != sqsReceiptHandleVersion2 {
return errors.New("receipt handle version mismatch for partitioned queue")
}
// handle.Partition is client-controlled once decodeClientReceiptHandle
// accepts v2. Without this bound check, an out-of-range partition
// falls through to sqsMsg*KeyDispatch and depends on downstream
// routing failure semantics instead of returning ReceiptHandleIsInvalid.
if handle.Partition >= meta.PartitionCount {
return errors.New("receipt handle partition out of range for queue")
}
return nil
}
if handle.Version != sqsReceiptHandleVersion1 {
return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject out-of-range v2 receipt-handle partitions

The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.

Useful? React with 👍 / 👎.

Loading
Loading