Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 183 additions & 18 deletions adapter/sqs_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,23 @@ const (
// has <1% tail-latency overhead, large enough that an empty queue
// does not spin.
sqsLongPollInterval = 200 * time.Millisecond
// Version byte prefixed to encoded receipt handles. Bumped when the
// on-wire handle format changes so old handles fail to decode loudly.
sqsReceiptHandleVersion = byte(0x01)
// sqsReceiptHandleVersion1 is the version byte for the legacy
// (single-partition) receipt-handle layout — pre-PR-5 handles
// and post-PR-5 handles for queues with PartitionCount <= 1.
// Bumped when the on-wire handle format changes so old handles
// fail to decode loudly.
sqsReceiptHandleVersion1 = byte(0x01)
// sqsReceiptHandleVersion2 is the version byte for partitioned
// FIFO handles (PartitionCount > 1). v2 carries the partition
// index so DeleteMessage / ChangeMessageVisibility can dispatch
// to the right partition's keyspace without re-running the
// partitionFor hash. The version byte discriminates: a v1 handle
// against a partitioned queue is rejected, a v2 handle against
// a non-partitioned queue is rejected — both with
// ReceiptHandleIsInvalid. Enforcement of the cross-version
// rejection contract is wired by Phase 3.D PR 5b together with
// the gate-and-lift; this scaffold PR adds the codec only.
sqsReceiptHandleVersion2 = byte(0x02)
// Byte sizes used when pre-sizing key buffers. The exact value is not
// critical; it only avoids one append growth for typical queue/ID
// lengths.
Expand Down Expand Up @@ -213,16 +227,40 @@ func newReceiptToken() ([]byte, error) {
return buf, nil
}

// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into a
// single opaque blob. Format:
// Per-field byte offsets and total sizes for the v1 / v2 receipt
// handle layouts. Named constants so a future field-shape change
// touches the offsets in one place instead of scattering numeric
// arithmetic through encoders / decoders. Keep this aligned with
// the table in encodeReceiptHandle / encodeReceiptHandleV2 doc
// comments — the on-wire format is operator-visible.
const (
// v1 layout: [version][queue_gen][message_id][receipt_token].
sqsReceiptHandleV1VersionOffset = 0
sqsReceiptHandleV1GenOffset = sqsReceiptHandleV1VersionOffset + 1
sqsReceiptHandleV1IDOffset = sqsReceiptHandleV1GenOffset + 8
sqsReceiptHandleV1TokenOffset = sqsReceiptHandleV1IDOffset + sqsMessageIDBytes
sqsReceiptHandleV1Size = sqsReceiptHandleV1TokenOffset + sqsReceiptTokenBytes

// v2 layout: [version][partition][queue_gen][message_id][receipt_token].
sqsReceiptHandleV2VersionOffset = 0
sqsReceiptHandleV2PartitionOffset = sqsReceiptHandleV2VersionOffset + 1
sqsReceiptHandleV2GenOffset = sqsReceiptHandleV2PartitionOffset + 4
sqsReceiptHandleV2IDOffset = sqsReceiptHandleV2GenOffset + 8
sqsReceiptHandleV2TokenOffset = sqsReceiptHandleV2IDOffset + sqsMessageIDBytes
sqsReceiptHandleV2Size = sqsReceiptHandleV2TokenOffset + sqsReceiptTokenBytes
)

// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into
// a single opaque v1 blob. Used by SendMessage on a NON-partitioned
// FIFO queue and on Standard queues. Format:
//
// [ 0 ] byte version = 0x01
// [ 0 ] byte version = 0x01
// [ 1..9 ] uint64 queue_gen (BE)
// [ 9..25 ] 16 bytes message_id (raw bytes from hex decode)
// [ 25..41 ] 16 bytes receipt_token
//
// The result is base64-urlsafe (no padding) so it passes through JSON and
// HTTP query parameters untouched.
// The result is base64-urlsafe (no padding) so it passes through
// JSON and HTTP query parameters untouched.
func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
if len(receiptToken) != sqsReceiptTokenBytes {
return "", errors.New("receipt token has wrong length")
Expand All @@ -231,35 +269,162 @@ func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []by
if err != nil || len(idBytes) != sqsMessageIDBytes {
return "", errors.New("message id has wrong format")
}
buf := make([]byte, 0, 1+8+sqsMessageIDBytes+sqsReceiptTokenBytes)
buf = append(buf, sqsReceiptHandleVersion)
buf := make([]byte, 0, sqsReceiptHandleV1Size)
buf = append(buf, sqsReceiptHandleVersion1)
buf = appendU64(buf, queueGen)
buf = append(buf, idBytes...)
buf = append(buf, receiptToken...)
return base64.RawURLEncoding.EncodeToString(buf), nil
}

// encodeReceiptHandleV2 packs (partition, queue_gen, message_id,
// receipt_token) for partitioned FIFO queues. Used by SendMessage
// on a partitioned queue (PartitionCount > 1). Format:
//
// [ 0 ] byte version = 0x02
// [ 1..5 ] uint32 partition (BE)
// [ 5..13 ] uint64 queue_gen (BE)
// [ 13..29 ] 16 bytes message_id
// [ 29..45 ] 16 bytes receipt_token
//
// DeleteMessage / ChangeMessageVisibility use the partition field
// to route the operation to the right partition's keyspace
// without re-running partitionFor (the original MessageGroupId is
// not in the handle).
func encodeReceiptHandleV2(partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
if len(receiptToken) != sqsReceiptTokenBytes {
return "", errors.New("receipt token has wrong length")
}
idBytes, err := hex.DecodeString(messageIDHex)
if err != nil || len(idBytes) != sqsMessageIDBytes {
return "", errors.New("message id has wrong format")
}
buf := make([]byte, 0, sqsReceiptHandleV2Size)
buf = append(buf, sqsReceiptHandleVersion2)
buf = appendU32(buf, partition)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The function appendU32 is called here but it is not defined in this file or any other provided file in the package. This will cause a compilation error. You should add a helper function similar to appendU64 for uint32 values. Ensure that such utility functions are centralized if they are intended for use across the repository to maintain consistency and avoid duplication.

func appendU32(dst []byte, v uint32) []byte {
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], v)
	return append(dst, buf[:]...)
}

func encodeReceiptHandleV2(partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
References
  1. Centralize common logic into shared helper functions to avoid code duplication and improve maintainability.

buf = appendU64(buf, queueGen)
buf = append(buf, idBytes...)
buf = append(buf, receiptToken...)
return base64.RawURLEncoding.EncodeToString(buf), nil
}

// decodedReceiptHandle is the parsed shape of a receipt handle.
// Version reports which on-wire format the handle was in:
// - 0x01 (sqsReceiptHandleVersion1): legacy single-partition
// handle. Partition is 0 by definition.
// - 0x02 (sqsReceiptHandleVersion2): partitioned-FIFO handle.
// Partition is the partition index the message was sent to.
//
// Callers that route by partition (DeleteMessage,
// ChangeMessageVisibility on a partitioned queue) MUST inspect
// Version before consulting Partition — a v1 handle's zero
// Partition is the LEGACY meaning ("single partition"), not
// "partition 0 of N". A queue's PartitionCount at the moment the
// caller acts must agree with the handle's version, or the
// dispatch is rejected with ReceiptHandleIsInvalid.
type decodedReceiptHandle struct {
Version byte
Partition uint32
QueueGeneration uint64
MessageIDHex string
ReceiptToken []byte
}

// decodeReceiptHandle inspects the version byte to dispatch to
// the v1 or v2 layout. Length and version mismatches both surface
// as the same opaque error so a malicious caller cannot use the
// error message to probe the format.
func decodeReceiptHandle(raw string) (*decodedReceiptHandle, error) {
b, err := base64.RawURLEncoding.DecodeString(raw)
if err != nil {
return nil, errors.WithStack(err)
}
want := 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes
if len(b) != want || b[0] != sqsReceiptHandleVersion {
if len(b) == 0 {
return nil, errors.New("receipt handle length or version mismatch")
}
out := &decodedReceiptHandle{
QueueGeneration: binary.BigEndian.Uint64(b[1:9]),
MessageIDHex: hex.EncodeToString(b[9 : 9+sqsMessageIDBytes]),
ReceiptToken: bytes.Clone(b[9+sqsMessageIDBytes:]),
switch b[0] {
case sqsReceiptHandleVersion1:
return decodeReceiptHandleV1(b)
case sqsReceiptHandleVersion2:
return decodeReceiptHandleV2(b)
default:
return nil, errors.New("receipt handle length or version mismatch")
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return out, nil
}

func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) {
if len(b) != sqsReceiptHandleV1Size {
return nil, errors.New("receipt handle length or version mismatch")
}
return &decodedReceiptHandle{
Version: sqsReceiptHandleVersion1,
Partition: 0,
QueueGeneration: binary.BigEndian.Uint64(
b[sqsReceiptHandleV1GenOffset:sqsReceiptHandleV1IDOffset]),
MessageIDHex: hex.EncodeToString(
b[sqsReceiptHandleV1IDOffset:sqsReceiptHandleV1TokenOffset]),
ReceiptToken: bytes.Clone(
b[sqsReceiptHandleV1TokenOffset:sqsReceiptHandleV1Size]),
}, nil
}

// decodeClientReceiptHandle is the public-API entry point for
// decoding a client-supplied receipt handle. It wraps
// decodeReceiptHandle with the dormancy gate that keeps the v2
// codec inert until Phase 3.D PR 5b wires the partitioned-FIFO
// data plane.
//
// # Why the gate
//
// PR 5a adds the v2 codec to the binary but does NOT yet wire any
// production path that produces v2 handles — SendMessage on a
// partitioned queue is rejected by the §11 PR 2 dormancy gate
// (PartitionCount > 1 → InvalidAttributeValue). Without this
// helper, a client could craft a v2 handle (re-encoding a
// legitimately-issued v1 handle's queue_gen / message_id /
// receipt_token under the v2 layout) and DeleteMessage /
// ChangeMessageVisibility would accept it, since the downstream
// validation only checks queue_gen + receipt_token. The behaviour
// is technically correct (the v1 keyspace lookup still finds the
// message) but it leaks the new wire format before PR 5b lands —
// breaking the "no behavior change yet" guarantee of this PR
// (codex/coderabbit major on PR #724).
//
// PR 5b lifts this gate together with the rest of the data-plane
// fanout: it replaces the != v1 check with a queue-aware version
// (v1 required on non-partitioned queues, v2 required on
// partitioned ones), so neither version leaks into the wrong
// keyspace. Until then, any v2 handle on the public API surfaces
// as ReceiptHandleIsInvalid.
func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) {
handle, err := decodeReceiptHandle(raw)
if err != nil {
return nil, err
}
if handle.Version != sqsReceiptHandleVersion1 {
// v2 codec is added but dormant until PR 5b. Reject any
// non-v1 handle on the public API so the wire format
// does not leak.
return nil, errors.New("receipt handle version is not yet enabled on the public API")
}
return handle, nil
}

func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) {
if len(b) != sqsReceiptHandleV2Size {
return nil, errors.New("receipt handle length or version mismatch")
}
return &decodedReceiptHandle{
Version: sqsReceiptHandleVersion2,
Partition: binary.BigEndian.Uint32(
b[sqsReceiptHandleV2PartitionOffset:sqsReceiptHandleV2GenOffset]),
QueueGeneration: binary.BigEndian.Uint64(
b[sqsReceiptHandleV2GenOffset:sqsReceiptHandleV2IDOffset]),
MessageIDHex: hex.EncodeToString(
b[sqsReceiptHandleV2IDOffset:sqsReceiptHandleV2TokenOffset]),
ReceiptToken: bytes.Clone(
b[sqsReceiptHandleV2TokenOffset:sqsReceiptHandleV2Size]),
}, nil
}

// ------------------------ input decoding ------------------------
Expand Down Expand Up @@ -1371,7 +1536,7 @@ func (s *SQSServer) parseQueueAndReceipt(queueUrl, receiptHandle string) (string
if err != nil {
return "", nil, err
}
handle, err := decodeReceiptHandle(receiptHandle)
handle, err := decodeClientReceiptHandle(receiptHandle)
if err != nil {
return "", nil, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not parseable")
}
Expand Down
4 changes: 2 additions & 2 deletions adapter/sqs_messages_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) {
// retry-bound stale-is-success delete that single DeleteMessage
// uses. Per-entry isolation matches AWS, where a malformed
// handle in slot 3 must not poison slot 4.
handle, decodeErr := decodeReceiptHandle(entry.ReceiptHandle)
handle, decodeErr := decodeClientReceiptHandle(entry.ReceiptHandle)
if decodeErr != nil {
failed = append(failed, sqsBatchResultErrorEntry{
Id: entry.Id,
Expand Down Expand Up @@ -567,7 +567,7 @@ func (s *SQSServer) applyChangeVisibilityBatchEntry(ctx context.Context, queueNa
SenderFault: true,
}
}
handle, decodeErr := decodeReceiptHandle(entry.ReceiptHandle)
handle, decodeErr := decodeClientReceiptHandle(entry.ReceiptHandle)
if decodeErr != nil {
return false, sqsBatchResultErrorEntry{
Id: entry.Id,
Expand Down
Loading
Loading