From 0206a1cb7350701ff1da9b24701a1532106dcdd2 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 1 May 2026 18:52:54 +0900 Subject: [PATCH 1/3] feat(sqs): receipt-handle v2 codec for partitioned FIFO queues (Phase 3.D PR 5a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the v2 receipt-handle layout that PR 5b's send / receive fanout will consume. Pure scaffold: the v2 encoder is added but not yet called by any production path (PartitionCount > 1 is still rejected by the §11 PR 2 dormancy gate, so SendMessage never reaches a partitioned queue, so encodeReceiptHandleV2 stays unreachable until PR 5b lifts the gate). Splitting the codec out as a small isolated PR keeps PR 5b's review focused on the gate-and-lift atomic sequence — codec correctness and the gate-lift correctness are independent concerns and a single mega-PR would conflate them. Wire format v1 (legacy, single partition): [ 0 ] byte version = 0x01 [ 1..9 ] uint64 queue_gen (BE) [ 9..25 ] 16 bytes message_id [ 25..41 ] 16 bytes receipt_token Total: 41 bytes v2 (partitioned, PartitionCount > 1): [ 0 ] byte version = 0x02 [ 1..5 ] uint32 partition (BE) ← NEW [ 5..13 ] uint64 queue_gen (BE) [ 13..29 ] 16 bytes message_id [ 29..45 ] 16 bytes receipt_token Total: 45 bytes decodedReceiptHandle gains Version (byte) and Partition (uint32) fields. decodeReceiptHandle dispatches by the version byte; v1 keeps Partition=0 by definition. What does NOT change yet - encodeReceiptHandleV2 is dormant until PR 5b's SendMessage partitioned-fanout dispatch wires it (gated by meta.PartitionCount > 1). Production traffic continues to use the v1 encoder verbatim — no behaviour change. - DeleteMessage / ChangeMessageVisibility still consume decodedReceiptHandle without inspecting Version or Partition. PR 5b adds the cross-version rejection contract (a v1 handle against a partitioned queue → ReceiptHandleIsInvalid, and vice-versa). Tests - TestEncodeReceiptHandleV2_RoundTrip — every (partition, queue_gen, message_id, token) tuple round-trips with Version=v2 and the encoder's partition. - TestEncodeReceiptHandleV1_StillReportsV1 — regression: v1 encoder still produces v1-decodable handles with Partition=0. - TestDecodeReceiptHandle_VersionDispatch — v1 and v2 produce distinct on-wire bytes (different version + 4-byte size delta); decoder picks the right layout for each. - TestDecodeReceiptHandle_RejectsLengthMismatch — 5 cases (v1 byte / v2 length, v2 byte / v1 length, v1 truncated, v2 truncated, empty) all fail with the same opaque error. - TestDecodeReceiptHandle_RejectsUnknownVersion — 0x00, 0x03, 0x42, 0xFF all reject. - TestEncodeReceiptHandleV2_RejectsBadInputs — short / long token, non-hex / short / empty id all surface as encoder errors. - TestReceiptHandleVersionConstants_Distinct — pins the invariants v1 != v2, v1 == 0x01, v2 == 0x02, legacy alias points at v1, on-wire size constants match what encoders write. - TestDecodeReceiptHandle_RejectsBase64Garbage — base64 errors surface at the right step. - Existing TestSQSServer_ReceiptHandleCodecRoundTrip continues to pass unchanged (v1 path). Self-review (per CLAUDE.md) 1. Data loss — codec only; no FSM/Pebble path. No issue. 2. Concurrency — no shared state; pure encode/decode functions. No issue. 3. Performance — encode/decode are O(handle size) ≤ 45 bytes. The version-byte dispatch is one switch. No issue. 4. Data consistency — v1 and v2 are length-distinct, version-byte- distinct, and decoder rejects wrong-length blobs. A v1 client parsing a v2 handle (or vice versa) fails closed at the length check — pinned by the 5-case rejection test. No issue. 5. Test coverage — 8 new tests across the codec contract surface; existing v1 round-trip test continues to pass. --- adapter/sqs_messages.go | 135 ++++++++++++-- adapter/sqs_receipt_handle_v2_test.go | 253 ++++++++++++++++++++++++++ 2 files changed, 374 insertions(+), 14 deletions(-) create mode 100644 adapter/sqs_receipt_handle_v2_test.go diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index dcf8cfd6..7a8c07ed 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -43,9 +43,27 @@ const ( // has <1% tail-latency overhead, large enough that an empty queue // does not spin. sqsLongPollInterval = 200 * time.Millisecond - // Version byte prefixed to encoded receipt handles. Bumped when the - // on-wire handle format changes so old handles fail to decode loudly. - sqsReceiptHandleVersion = byte(0x01) + // sqsReceiptHandleVersion1 is the version byte for the legacy + // (single-partition) receipt-handle layout — pre-PR-5 handles + // and post-PR-5 handles for queues with PartitionCount <= 1. + // Bumped when the on-wire handle format changes so old handles + // fail to decode loudly. + sqsReceiptHandleVersion1 = byte(0x01) + // sqsReceiptHandleVersion2 is the version byte for partitioned + // FIFO handles (PartitionCount > 1). v2 carries the partition + // index so DeleteMessage / ChangeMessageVisibility can dispatch + // to the right partition's keyspace without re-running the + // partitionFor hash. The version byte discriminates: a v1 handle + // against a partitioned queue is rejected, a v2 handle against + // a non-partitioned queue is rejected — both with + // ReceiptHandleIsInvalid (codex P1 round-2 on PR #715 documents + // the cross-version rejection contract). + sqsReceiptHandleVersion2 = byte(0x02) + // sqsReceiptHandleVersion is the legacy const name preserved as + // an alias of v1 so existing call sites in this file keep + // compiling. New call sites SHOULD use sqsReceiptHandleVersion1 + // (or sqsReceiptHandleVersion2) explicitly. + sqsReceiptHandleVersion = sqsReceiptHandleVersion1 // Byte sizes used when pre-sizing key buffers. The exact value is not // critical; it only avoids one append growth for typical queue/ID // lengths. @@ -213,16 +231,27 @@ func newReceiptToken() ([]byte, error) { return buf, nil } -// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into a -// single opaque blob. Format: +// sqsReceiptHandleV1Size is the on-wire byte length of a v1 +// receipt handle: 1 byte version + 8 byte queue_gen + 16 byte +// message_id + 16 byte receipt_token = 41 bytes. +const sqsReceiptHandleV1Size = 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes + +// sqsReceiptHandleV2Size is the on-wire byte length of a v2 +// (partitioned) receipt handle: v1 + 4 bytes for the partition +// uint32 inserted between version and queue_gen = 45 bytes. +const sqsReceiptHandleV2Size = 1 + 4 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes + +// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into +// a single opaque v1 blob. Used by SendMessage on a NON-partitioned +// FIFO queue and on Standard queues. Format: // -// [ 0 ] byte version = 0x01 +// [ 0 ] byte version = 0x01 // [ 1..9 ] uint64 queue_gen (BE) // [ 9..25 ] 16 bytes message_id (raw bytes from hex decode) // [ 25..41 ] 16 bytes receipt_token // -// The result is base64-urlsafe (no padding) so it passes through JSON and -// HTTP query parameters untouched. +// The result is base64-urlsafe (no padding) so it passes through +// JSON and HTTP query parameters untouched. func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) { if len(receiptToken) != sqsReceiptTokenBytes { return "", errors.New("receipt token has wrong length") @@ -231,35 +260,113 @@ func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []by if err != nil || len(idBytes) != sqsMessageIDBytes { return "", errors.New("message id has wrong format") } - buf := make([]byte, 0, 1+8+sqsMessageIDBytes+sqsReceiptTokenBytes) - buf = append(buf, sqsReceiptHandleVersion) + buf := make([]byte, 0, sqsReceiptHandleV1Size) + buf = append(buf, sqsReceiptHandleVersion1) buf = appendU64(buf, queueGen) buf = append(buf, idBytes...) buf = append(buf, receiptToken...) return base64.RawURLEncoding.EncodeToString(buf), nil } +// encodeReceiptHandleV2 packs (partition, queue_gen, message_id, +// receipt_token) for partitioned FIFO queues. Used by SendMessage +// on a partitioned queue (PartitionCount > 1). Format: +// +// [ 0 ] byte version = 0x02 +// [ 1..5 ] uint32 partition (BE) +// [ 5..13 ] uint64 queue_gen (BE) +// [ 13..29 ] 16 bytes message_id +// [ 29..45 ] 16 bytes receipt_token +// +// DeleteMessage / ChangeMessageVisibility use the partition field +// to route the operation to the right partition's keyspace +// without re-running partitionFor (the original MessageGroupId is +// not in the handle). +func encodeReceiptHandleV2(partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) { + if len(receiptToken) != sqsReceiptTokenBytes { + return "", errors.New("receipt token has wrong length") + } + idBytes, err := hex.DecodeString(messageIDHex) + if err != nil || len(idBytes) != sqsMessageIDBytes { + return "", errors.New("message id has wrong format") + } + buf := make([]byte, 0, sqsReceiptHandleV2Size) + buf = append(buf, sqsReceiptHandleVersion2) + buf = appendU32(buf, partition) + buf = appendU64(buf, queueGen) + buf = append(buf, idBytes...) + buf = append(buf, receiptToken...) + return base64.RawURLEncoding.EncodeToString(buf), nil +} + +// decodedReceiptHandle is the parsed shape of a receipt handle. +// Version reports which on-wire format the handle was in: +// - 0x01 (sqsReceiptHandleVersion1): legacy single-partition +// handle. Partition is 0 by definition. +// - 0x02 (sqsReceiptHandleVersion2): partitioned-FIFO handle. +// Partition is the partition index the message was sent to. +// +// Callers that route by partition (DeleteMessage, +// ChangeMessageVisibility on a partitioned queue) MUST inspect +// Version before consulting Partition — a v1 handle's zero +// Partition is the LEGACY meaning ("single partition"), not +// "partition 0 of N". A queue's PartitionCount at the moment the +// caller acts must agree with the handle's version, or the +// dispatch is rejected with ReceiptHandleIsInvalid. type decodedReceiptHandle struct { + Version byte + Partition uint32 QueueGeneration uint64 MessageIDHex string ReceiptToken []byte } +// decodeReceiptHandle inspects the version byte to dispatch to +// the v1 or v2 layout. Length and version mismatches both surface +// as the same opaque error so a malicious caller cannot use the +// error message to probe the format. func decodeReceiptHandle(raw string) (*decodedReceiptHandle, error) { b, err := base64.RawURLEncoding.DecodeString(raw) if err != nil { return nil, errors.WithStack(err) } - want := 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes - if len(b) != want || b[0] != sqsReceiptHandleVersion { + if len(b) == 0 { + return nil, errors.New("receipt handle length or version mismatch") + } + switch b[0] { + case sqsReceiptHandleVersion1: + return decodeReceiptHandleV1(b) + case sqsReceiptHandleVersion2: + return decodeReceiptHandleV2(b) + default: return nil, errors.New("receipt handle length or version mismatch") } - out := &decodedReceiptHandle{ +} + +func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) { + if len(b) != sqsReceiptHandleV1Size { + return nil, errors.New("receipt handle length or version mismatch") + } + return &decodedReceiptHandle{ + Version: sqsReceiptHandleVersion1, + Partition: 0, QueueGeneration: binary.BigEndian.Uint64(b[1:9]), MessageIDHex: hex.EncodeToString(b[9 : 9+sqsMessageIDBytes]), ReceiptToken: bytes.Clone(b[9+sqsMessageIDBytes:]), + }, nil +} + +func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) { + if len(b) != sqsReceiptHandleV2Size { + return nil, errors.New("receipt handle length or version mismatch") } - return out, nil + return &decodedReceiptHandle{ + Version: sqsReceiptHandleVersion2, + Partition: binary.BigEndian.Uint32(b[1:5]), + QueueGeneration: binary.BigEndian.Uint64(b[5:13]), + MessageIDHex: hex.EncodeToString(b[13 : 13+sqsMessageIDBytes]), + ReceiptToken: bytes.Clone(b[13+sqsMessageIDBytes:]), + }, nil } // ------------------------ input decoding ------------------------ diff --git a/adapter/sqs_receipt_handle_v2_test.go b/adapter/sqs_receipt_handle_v2_test.go new file mode 100644 index 00000000..13a029d1 --- /dev/null +++ b/adapter/sqs_receipt_handle_v2_test.go @@ -0,0 +1,253 @@ +package adapter + +import ( + "encoding/base64" + "encoding/hex" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestEncodeReceiptHandleV2_RoundTrip pins the v2 codec contract: +// every (partition, queue_gen, message_id, receipt_token) tuple +// round-trips through encodeReceiptHandleV2 → decodeReceiptHandle +// without loss, and the decoded Version is sqsReceiptHandleVersion2 +// while Partition matches the encoder's input. +func TestEncodeReceiptHandleV2_RoundTrip(t *testing.T) { + t.Parallel() + cases := []struct { + name string + partition uint32 + gen uint64 + id string + }{ + {"partition 0 / gen 1", 0, 1, "00000000000000000000000000000000"}, + {"partition 7 / large gen", 7, 0xFFFFFFFFFFFF, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, + {"partition 31 / typical id", 31, 42, "deadbeefdeadbeefdeadbeefdeadbeef"}, + // PartitionCount cap is 32 per §3.1, so the largest valid + // partition index is 31. The codec itself is uint32 wide + // so an out-of-cap value still encodes — caller validation + // is responsible for cap enforcement. + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + for i := range token { + token[i] = byte(i + int(tc.partition)) + } + h, err := encodeReceiptHandleV2(tc.partition, tc.gen, tc.id, token) + require.NoError(t, err) + + back, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, back.Version, + "Version must report v2 so callers can dispatch on it") + require.Equal(t, tc.partition, back.Partition) + require.Equal(t, tc.gen, back.QueueGeneration) + require.Equal(t, tc.id, back.MessageIDHex) + require.Equal(t, hex.EncodeToString(token), + hex.EncodeToString(back.ReceiptToken)) + }) + } +} + +// TestEncodeReceiptHandleV1_StillReportsV1 pins the v1 round-trip: +// a v1-encoded handle decodes with Version=v1, Partition=0. This +// guards against a refactor that accidentally upgrades v1 +// encoders to v2 (or vice versa) — the wire-format compatibility +// is the operator's contract. +func TestEncodeReceiptHandleV1_StillReportsV1(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + for i := range token { + token[i] = byte(0xAA) + } + h, err := encodeReceiptHandle(99, "deadbeefdeadbeefdeadbeefdeadbeef", token) + require.NoError(t, err) + + back, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, back.Version, + "v1 encoder must produce v1-decodable handle") + require.Equal(t, uint32(0), back.Partition, + "v1 handle has no partition field; Partition must be 0") + require.Equal(t, uint64(99), back.QueueGeneration) + require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeef", back.MessageIDHex) +} + +// TestDecodeReceiptHandle_VersionDispatch pins that decode picks +// the right layout based on the version byte. Critical because +// v1 and v2 have different sizes (41 vs 45 bytes); reading a v2 +// blob with v1 offsets would silently misinterpret the queue_gen +// field. +func TestDecodeReceiptHandle_VersionDispatch(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + id := "0123456789abcdef0123456789abcdef" + + v1, err := encodeReceiptHandle(7, id, token) + require.NoError(t, err) + v2, err := encodeReceiptHandleV2(3, 7, id, token) + require.NoError(t, err) + + // v1 and v2 must produce distinct on-wire bytes — version + // byte differs AND v2 carries 4 extra bytes for the partition. + v1Raw, err := base64.RawURLEncoding.DecodeString(v1) + require.NoError(t, err) + v2Raw, err := base64.RawURLEncoding.DecodeString(v2) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleV1Size, len(v1Raw)) + require.Equal(t, sqsReceiptHandleV2Size, len(v2Raw)) + require.Equal(t, sqsReceiptHandleVersion1, v1Raw[0]) + require.Equal(t, sqsReceiptHandleVersion2, v2Raw[0]) + + // Decode dispatches by version: same queue_gen=7, same id, + // same token, distinct Version + Partition. + v1Back, err := decodeReceiptHandle(v1) + require.NoError(t, err) + v2Back, err := decodeReceiptHandle(v2) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, v1Back.Version) + require.Equal(t, uint32(0), v1Back.Partition) + require.Equal(t, sqsReceiptHandleVersion2, v2Back.Version) + require.Equal(t, uint32(3), v2Back.Partition) + require.Equal(t, v1Back.QueueGeneration, v2Back.QueueGeneration) + require.Equal(t, v1Back.MessageIDHex, v2Back.MessageIDHex) +} + +// TestDecodeReceiptHandle_RejectsLengthMismatch pins that a blob +// with a known version byte but wrong length fails decode. The +// failure mode the test would catch: a refactor that adds / +// removes a field but forgets to bump the version byte. +func TestDecodeReceiptHandle_RejectsLengthMismatch(t *testing.T) { + t.Parallel() + cases := []struct { + name string + make func() []byte + }{ + { + name: "v1 byte but v2 length", + make: func() []byte { + out := make([]byte, sqsReceiptHandleV2Size) + out[0] = sqsReceiptHandleVersion1 + return out + }, + }, + { + name: "v2 byte but v1 length", + make: func() []byte { + out := make([]byte, sqsReceiptHandleV1Size) + out[0] = sqsReceiptHandleVersion2 + return out + }, + }, + { + name: "v1 truncated", + make: func() []byte { + out := make([]byte, sqsReceiptHandleV1Size-1) + out[0] = sqsReceiptHandleVersion1 + return out + }, + }, + { + name: "v2 truncated", + make: func() []byte { + out := make([]byte, sqsReceiptHandleV2Size-1) + out[0] = sqsReceiptHandleVersion2 + return out + }, + }, + { + name: "empty", + make: func() []byte { return nil }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + h := base64.RawURLEncoding.EncodeToString(tc.make()) + _, err := decodeReceiptHandle(h) + require.Error(t, err) + require.Contains(t, err.Error(), "length or version mismatch") + }) + } +} + +// TestDecodeReceiptHandle_RejectsUnknownVersion pins that an +// unknown version byte fails decode rather than falling through +// to one of the known layouts. +func TestDecodeReceiptHandle_RejectsUnknownVersion(t *testing.T) { + t.Parallel() + for _, version := range []byte{0x00, 0x03, 0x42, 0xFF} { + t.Run("version 0x"+hex.EncodeToString([]byte{version}), func(t *testing.T) { + t.Parallel() + out := make([]byte, sqsReceiptHandleV1Size) + out[0] = version + h := base64.RawURLEncoding.EncodeToString(out) + _, err := decodeReceiptHandle(h) + require.Error(t, err, + "unknown version 0x%02x must fail decode", version) + }) + } +} + +// TestEncodeReceiptHandleV2_RejectsBadInputs pins the encoder's +// input validation: a token of the wrong length, or a hex id +// that doesn't decode to 16 bytes, surfaces as an error rather +// than producing a malformed handle. +func TestEncodeReceiptHandleV2_RejectsBadInputs(t *testing.T) { + t.Parallel() + cases := []struct { + name string + token []byte + id string + }{ + {"short token", make([]byte, sqsReceiptTokenBytes-1), "deadbeefdeadbeefdeadbeefdeadbeef"}, + {"long token", make([]byte, sqsReceiptTokenBytes+1), "deadbeefdeadbeefdeadbeefdeadbeef"}, + {"non-hex id", make([]byte, sqsReceiptTokenBytes), "not-hex-not-hex-not-hex-not-hex0"}, + {"short id", make([]byte, sqsReceiptTokenBytes), "deadbeef"}, + {"empty id", make([]byte, sqsReceiptTokenBytes), ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + _, err := encodeReceiptHandleV2(0, 1, tc.id, tc.token) + require.Error(t, err) + }) + } +} + +// TestReceiptHandleVersionConstants_Distinct pins the assertion +// that v1 and v2 version bytes differ. A refactor that +// accidentally collapses them (e.g. both → 0x01) would cause v2 +// handles to decode as v1 with garbled data. +func TestReceiptHandleVersionConstants_Distinct(t *testing.T) { + t.Parallel() + require.NotEqual(t, sqsReceiptHandleVersion1, sqsReceiptHandleVersion2, + "v1 and v2 version bytes must differ for the dispatch to work") + require.Equal(t, byte(0x01), sqsReceiptHandleVersion1) + require.Equal(t, byte(0x02), sqsReceiptHandleVersion2) + require.Equal(t, sqsReceiptHandleVersion1, sqsReceiptHandleVersion, + "the legacy alias must continue to point at v1 so existing "+ + "call sites stay on the v1 path") + // On-wire size constants must equal what the encoders write — + // pinning them keeps a future struct change from silently + // changing the wire format. + require.Equal(t, 1+8+sqsMessageIDBytes+sqsReceiptTokenBytes, + sqsReceiptHandleV1Size) + require.Equal(t, 1+4+8+sqsMessageIDBytes+sqsReceiptTokenBytes, + sqsReceiptHandleV2Size) +} + +// TestDecodeReceiptHandle_RejectsBase64Garbage pins that +// non-base64 input fails decode at the base64 step rather than +// the version-byte step. The error wrap chain matters for +// operators triaging client-side encoding bugs. +func TestDecodeReceiptHandle_RejectsBase64Garbage(t *testing.T) { + t.Parallel() + _, err := decodeReceiptHandle("!!!" + strings.Repeat("?", 50)) + require.Error(t, err, + "non-base64 input must fail at the base64 decode step") +} From 42c9894a3027efd77b8313a46e15f67df9dd089b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 1 May 2026 19:01:42 +0900 Subject: [PATCH 2/3] refactor(sqs): drop unused alias + name decoder offsets + extra tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 1 Claude review nits on PR #724: 1) Dropped sqsReceiptHandleVersion alias Round 1 added sqsReceiptHandleVersion as an alias of v1 "for existing call sites". In practice every call site now uses sqsReceiptHandleVersion1 explicitly, so the alias's only reference was a self-test. Dropped both the alias and the test assertion. The two version-byte constants (sqsReceiptHandleVersion1 / sqsReceiptHandleVersion2) are now the single source of truth. 2) Named per-field offset constants decodeReceiptHandleV1 / decodeReceiptHandleV2 used numeric offsets (1, 9, 13, 1+sqsMessageIDBytes, …) derived from layout arithmetic. Replaced with named constants: v1: V1VersionOffset, V1GenOffset, V1IDOffset, V1TokenOffset, V1Size v2: V2VersionOffset, V2PartitionOffset, V2GenOffset, V2IDOffset, V2TokenOffset, V2Size A future field-shape change touches the constants in one place instead of scattering numeric arithmetic through encoders / decoders. 3) Explicit upper bound on token slice bytes.Clone(b[start:]) → bytes.Clone(b[start:end]). The length check above guarantees the open-ended form would have read the right bytes, but the explicit upper bound is self-documenting and protects against a future refactor that forgets the length precondition. 4) Doc note on cross-version rejection decodedReceiptHandle's doc described the cross-version rejection contract ("a v1 handle against a partitioned queue → ReceiptHandleIsInvalid") that this scaffold PR does NOT enforce — that's PR 5b's gate-and-lift. Added an explicit note that enforcement is wired by PR 5b so future review of that PR doesn't go looking for the contract here. 5) New test case: v2 / v1 oversized TestDecodeReceiptHandle_RejectsLengthMismatch grew two cases (v2 + 1 byte, v1 + 1 byte) so the exact-length check on each decoder branch is independently exercised. The previous table covered "byte mismatch" and "truncated" but not "too long for the version". Audit per the lessons-learned discipline The semantic change here is the alias removal. grep -rn sqsReceiptHandleVersion across the repo confirmed zero remaining references after the test assertion was dropped. The named-offset refactor preserves byte-for-byte the same on-wire format; the constants ARE the layout. go test -race ./adapter/... and golangci-lint clean. --- adapter/sqs_messages.go | 68 +++++++++++++++++---------- adapter/sqs_receipt_handle_v2_test.go | 19 ++++++-- 2 files changed, 58 insertions(+), 29 deletions(-) diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 7a8c07ed..4094da7e 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -56,14 +56,10 @@ const ( // partitionFor hash. The version byte discriminates: a v1 handle // against a partitioned queue is rejected, a v2 handle against // a non-partitioned queue is rejected — both with - // ReceiptHandleIsInvalid (codex P1 round-2 on PR #715 documents - // the cross-version rejection contract). + // ReceiptHandleIsInvalid. Enforcement of the cross-version + // rejection contract is wired by Phase 3.D PR 5b together with + // the gate-and-lift; this scaffold PR adds the codec only. sqsReceiptHandleVersion2 = byte(0x02) - // sqsReceiptHandleVersion is the legacy const name preserved as - // an alias of v1 so existing call sites in this file keep - // compiling. New call sites SHOULD use sqsReceiptHandleVersion1 - // (or sqsReceiptHandleVersion2) explicitly. - sqsReceiptHandleVersion = sqsReceiptHandleVersion1 // Byte sizes used when pre-sizing key buffers. The exact value is not // critical; it only avoids one append growth for typical queue/ID // lengths. @@ -231,15 +227,28 @@ func newReceiptToken() ([]byte, error) { return buf, nil } -// sqsReceiptHandleV1Size is the on-wire byte length of a v1 -// receipt handle: 1 byte version + 8 byte queue_gen + 16 byte -// message_id + 16 byte receipt_token = 41 bytes. -const sqsReceiptHandleV1Size = 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes - -// sqsReceiptHandleV2Size is the on-wire byte length of a v2 -// (partitioned) receipt handle: v1 + 4 bytes for the partition -// uint32 inserted between version and queue_gen = 45 bytes. -const sqsReceiptHandleV2Size = 1 + 4 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes +// Per-field byte offsets and total sizes for the v1 / v2 receipt +// handle layouts. Named constants so a future field-shape change +// touches the offsets in one place instead of scattering numeric +// arithmetic through encoders / decoders. Keep this aligned with +// the table in encodeReceiptHandle / encodeReceiptHandleV2 doc +// comments — the on-wire format is operator-visible. +const ( + // v1 layout: [version][queue_gen][message_id][receipt_token]. + sqsReceiptHandleV1VersionOffset = 0 + sqsReceiptHandleV1GenOffset = sqsReceiptHandleV1VersionOffset + 1 + sqsReceiptHandleV1IDOffset = sqsReceiptHandleV1GenOffset + 8 + sqsReceiptHandleV1TokenOffset = sqsReceiptHandleV1IDOffset + sqsMessageIDBytes + sqsReceiptHandleV1Size = sqsReceiptHandleV1TokenOffset + sqsReceiptTokenBytes + + // v2 layout: [version][partition][queue_gen][message_id][receipt_token]. + sqsReceiptHandleV2VersionOffset = 0 + sqsReceiptHandleV2PartitionOffset = sqsReceiptHandleV2VersionOffset + 1 + sqsReceiptHandleV2GenOffset = sqsReceiptHandleV2PartitionOffset + 4 + sqsReceiptHandleV2IDOffset = sqsReceiptHandleV2GenOffset + 8 + sqsReceiptHandleV2TokenOffset = sqsReceiptHandleV2IDOffset + sqsMessageIDBytes + sqsReceiptHandleV2Size = sqsReceiptHandleV2TokenOffset + sqsReceiptTokenBytes +) // encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into // a single opaque v1 blob. Used by SendMessage on a NON-partitioned @@ -348,11 +357,14 @@ func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) { return nil, errors.New("receipt handle length or version mismatch") } return &decodedReceiptHandle{ - Version: sqsReceiptHandleVersion1, - Partition: 0, - QueueGeneration: binary.BigEndian.Uint64(b[1:9]), - MessageIDHex: hex.EncodeToString(b[9 : 9+sqsMessageIDBytes]), - ReceiptToken: bytes.Clone(b[9+sqsMessageIDBytes:]), + Version: sqsReceiptHandleVersion1, + Partition: 0, + QueueGeneration: binary.BigEndian.Uint64( + b[sqsReceiptHandleV1GenOffset:sqsReceiptHandleV1IDOffset]), + MessageIDHex: hex.EncodeToString( + b[sqsReceiptHandleV1IDOffset:sqsReceiptHandleV1TokenOffset]), + ReceiptToken: bytes.Clone( + b[sqsReceiptHandleV1TokenOffset:sqsReceiptHandleV1Size]), }, nil } @@ -361,11 +373,15 @@ func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) { return nil, errors.New("receipt handle length or version mismatch") } return &decodedReceiptHandle{ - Version: sqsReceiptHandleVersion2, - Partition: binary.BigEndian.Uint32(b[1:5]), - QueueGeneration: binary.BigEndian.Uint64(b[5:13]), - MessageIDHex: hex.EncodeToString(b[13 : 13+sqsMessageIDBytes]), - ReceiptToken: bytes.Clone(b[13+sqsMessageIDBytes:]), + Version: sqsReceiptHandleVersion2, + Partition: binary.BigEndian.Uint32( + b[sqsReceiptHandleV2PartitionOffset:sqsReceiptHandleV2GenOffset]), + QueueGeneration: binary.BigEndian.Uint64( + b[sqsReceiptHandleV2GenOffset:sqsReceiptHandleV2IDOffset]), + MessageIDHex: hex.EncodeToString( + b[sqsReceiptHandleV2IDOffset:sqsReceiptHandleV2TokenOffset]), + ReceiptToken: bytes.Clone( + b[sqsReceiptHandleV2TokenOffset:sqsReceiptHandleV2Size]), }, nil } diff --git a/adapter/sqs_receipt_handle_v2_test.go b/adapter/sqs_receipt_handle_v2_test.go index 13a029d1..6513a273 100644 --- a/adapter/sqs_receipt_handle_v2_test.go +++ b/adapter/sqs_receipt_handle_v2_test.go @@ -159,6 +159,22 @@ func TestDecodeReceiptHandle_RejectsLengthMismatch(t *testing.T) { return out }, }, + { + name: "v2 oversized", + make: func() []byte { + out := make([]byte, sqsReceiptHandleV2Size+1) + out[0] = sqsReceiptHandleVersion2 + return out + }, + }, + { + name: "v1 oversized", + make: func() []byte { + out := make([]byte, sqsReceiptHandleV1Size+1) + out[0] = sqsReceiptHandleVersion1 + return out + }, + }, { name: "empty", make: func() []byte { return nil }, @@ -229,9 +245,6 @@ func TestReceiptHandleVersionConstants_Distinct(t *testing.T) { "v1 and v2 version bytes must differ for the dispatch to work") require.Equal(t, byte(0x01), sqsReceiptHandleVersion1) require.Equal(t, byte(0x02), sqsReceiptHandleVersion2) - require.Equal(t, sqsReceiptHandleVersion1, sqsReceiptHandleVersion, - "the legacy alias must continue to point at v1 so existing "+ - "call sites stay on the v1 path") // On-wire size constants must equal what the encoders write — // pinning them keeps a future struct change from silently // changing the wire format. From c15d801e75c5cd181d9fc9513fb82d110dfa959a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 1 May 2026 19:07:11 +0900 Subject: [PATCH 3/3] fix(sqs): gate v2 receipt handles at the public API boundary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit major on PR #724 round 2: the v2 codec is in the binary AND reachable on the public API. A client can re-encode a legitimately-issued v1 handle's (queue_gen, message_id, receipt_token) under the v2 layout, and DeleteMessage / ChangeMessageVisibility will accept it (downstream validation only checks queue_gen + receipt_token). The behaviour is technically correct (the v1 keyspace lookup still finds the message) but it leaks the new wire format before PR 5b lands — breaking the "no behavior change yet" guarantee. Fix decodeClientReceiptHandle wraps decodeReceiptHandle with the dormancy gate: any v2 handle on the public API surfaces as ReceiptHandleIsInvalid until PR 5b's queue-aware version replaces the gate (v1 required on non-partitioned queues, v2 required on partitioned ones). Changed call sites Three public-API decode points switch from decodeReceiptHandle to decodeClientReceiptHandle: - parseQueueAndReceipt (sqs_messages.go) — DeleteMessage and ChangeMessageVisibility entry point. - DeleteMessageBatch entry decode (sqs_messages_batch.go). - ChangeMessageVisibilityBatch entry decode (sqs_messages_batch.go). The low-level decodeReceiptHandle keeps working as a pure codec so the existing v1 + v2 round-trip tests keep passing. Tests - TestDecodeClientReceiptHandle_AcceptsV1: v1 handles flow through unchanged. - TestDecodeClientReceiptHandle_RejectsV2: a v2 handle on the public API surfaces as the dormancy-gate error. The low-level decoder still accepts it (the gate is at the API boundary, not in the codec). - TestDecodeClientReceiptHandle_PassesThroughDecodeErrors: a decode-step error (e.g. base64 garbage) is NOT masked by the dormancy-gate message — operators see the underlying error. Audit per the lessons-learned discipline The semantic change here is the rejection contract for v2 handles on the public API. grep -rn "decodeReceiptHandle" across adapter/ showed 3 production call sites (parseQueueAndReceipt, DeleteMessageBatch, ChangeMessageVisibilityBatch). All 3 updated. The v2 codec round-trip tests intentionally keep using decodeReceiptHandle directly so they exercise the codec, not the public API. Note on Gemini's "appendU32 is undefined" finding (round 2) False positive. appendU32 is defined in adapter/sqs_keys.go:444 and has been since PR #703 merged. Gemini did not search across files in the package; CI build is green and the round-1 v2 round-trip tests exercise the function. No code change needed. --- adapter/sqs_messages.go | 44 +++++++++++++++++- adapter/sqs_messages_batch.go | 4 +- adapter/sqs_receipt_handle_v2_test.go | 64 +++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 3 deletions(-) diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 4094da7e..4f3ee6d0 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -368,6 +368,48 @@ func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) { }, nil } +// decodeClientReceiptHandle is the public-API entry point for +// decoding a client-supplied receipt handle. It wraps +// decodeReceiptHandle with the dormancy gate that keeps the v2 +// codec inert until Phase 3.D PR 5b wires the partitioned-FIFO +// data plane. +// +// # Why the gate +// +// PR 5a adds the v2 codec to the binary but does NOT yet wire any +// production path that produces v2 handles — SendMessage on a +// partitioned queue is rejected by the §11 PR 2 dormancy gate +// (PartitionCount > 1 → InvalidAttributeValue). Without this +// helper, a client could craft a v2 handle (re-encoding a +// legitimately-issued v1 handle's queue_gen / message_id / +// receipt_token under the v2 layout) and DeleteMessage / +// ChangeMessageVisibility would accept it, since the downstream +// validation only checks queue_gen + receipt_token. The behaviour +// is technically correct (the v1 keyspace lookup still finds the +// message) but it leaks the new wire format before PR 5b lands — +// breaking the "no behavior change yet" guarantee of this PR +// (codex/coderabbit major on PR #724). +// +// PR 5b lifts this gate together with the rest of the data-plane +// fanout: it replaces the != v1 check with a queue-aware version +// (v1 required on non-partitioned queues, v2 required on +// partitioned ones), so neither version leaks into the wrong +// keyspace. Until then, any v2 handle on the public API surfaces +// as ReceiptHandleIsInvalid. +func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) { + handle, err := decodeReceiptHandle(raw) + if err != nil { + return nil, err + } + if handle.Version != sqsReceiptHandleVersion1 { + // v2 codec is added but dormant until PR 5b. Reject any + // non-v1 handle on the public API so the wire format + // does not leak. + return nil, errors.New("receipt handle version is not yet enabled on the public API") + } + return handle, nil +} + func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) { if len(b) != sqsReceiptHandleV2Size { return nil, errors.New("receipt handle length or version mismatch") @@ -1494,7 +1536,7 @@ func (s *SQSServer) parseQueueAndReceipt(queueUrl, receiptHandle string) (string if err != nil { return "", nil, err } - handle, err := decodeReceiptHandle(receiptHandle) + handle, err := decodeClientReceiptHandle(receiptHandle) if err != nil { return "", nil, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not parseable") } diff --git a/adapter/sqs_messages_batch.go b/adapter/sqs_messages_batch.go index 68d47b7b..e837750c 100644 --- a/adapter/sqs_messages_batch.go +++ b/adapter/sqs_messages_batch.go @@ -467,7 +467,7 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) { // retry-bound stale-is-success delete that single DeleteMessage // uses. Per-entry isolation matches AWS, where a malformed // handle in slot 3 must not poison slot 4. - handle, decodeErr := decodeReceiptHandle(entry.ReceiptHandle) + handle, decodeErr := decodeClientReceiptHandle(entry.ReceiptHandle) if decodeErr != nil { failed = append(failed, sqsBatchResultErrorEntry{ Id: entry.Id, @@ -567,7 +567,7 @@ func (s *SQSServer) applyChangeVisibilityBatchEntry(ctx context.Context, queueNa SenderFault: true, } } - handle, decodeErr := decodeReceiptHandle(entry.ReceiptHandle) + handle, decodeErr := decodeClientReceiptHandle(entry.ReceiptHandle) if decodeErr != nil { return false, sqsBatchResultErrorEntry{ Id: entry.Id, diff --git a/adapter/sqs_receipt_handle_v2_test.go b/adapter/sqs_receipt_handle_v2_test.go index 6513a273..aa02da4a 100644 --- a/adapter/sqs_receipt_handle_v2_test.go +++ b/adapter/sqs_receipt_handle_v2_test.go @@ -264,3 +264,67 @@ func TestDecodeReceiptHandle_RejectsBase64Garbage(t *testing.T) { require.Error(t, err, "non-base64 input must fail at the base64 decode step") } + +// TestDecodeClientReceiptHandle_AcceptsV1 pins the public-API +// wrapper's happy path — v1 handles flow through unchanged. +func TestDecodeClientReceiptHandle_AcceptsV1(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + h, err := encodeReceiptHandle(7, "deadbeefdeadbeefdeadbeefdeadbeef", token) + require.NoError(t, err) + back, err := decodeClientReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion1, back.Version) + require.Equal(t, uint64(7), back.QueueGeneration) +} + +// TestDecodeClientReceiptHandle_RejectsV2 pins the codex/coderabbit +// major fix on PR #724 round 1: the v2 codec is added but +// dormant. A client-supplied v2 handle MUST be rejected at the +// public API boundary so the wire format does not leak before +// PR 5b wires the partitioned-FIFO data plane. +// +// Without this gate, a malicious / curious client could re-encode +// a legitimately-issued v1 handle's (queue_gen, message_id, +// receipt_token) under the v2 layout, and DeleteMessage / +// ChangeMessageVisibility would accept it (since downstream +// validation only checks queue_gen + receipt_token). PR 5b +// replaces this gate with a queue-aware version (v1 required on +// non-partitioned queues, v2 required on partitioned ones), so +// the gate-removal lands together with the partitioned data plane. +func TestDecodeClientReceiptHandle_RejectsV2(t *testing.T) { + t.Parallel() + token := make([]byte, sqsReceiptTokenBytes) + h, err := encodeReceiptHandleV2(3, 7, "deadbeefdeadbeefdeadbeefdeadbeef", token) + require.NoError(t, err, + "v2 encoder must succeed even though the public API "+ + "wrapper rejects the result — the codec is dormant, "+ + "not absent") + + // Low-level decoder still accepts v2 (it's pure codec). + back, err := decodeReceiptHandle(h) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, back.Version, + "low-level decodeReceiptHandle must keep working — the "+ + "gate is at the public API boundary, not in the codec") + + // Public API wrapper rejects v2. + _, err = decodeClientReceiptHandle(h) + require.Error(t, err, + "v2 handle on the public API must fail until PR 5b lifts "+ + "the dormancy gate") + require.Contains(t, err.Error(), "not yet enabled") +} + +// TestDecodeClientReceiptHandle_PassesThroughDecodeErrors pins +// that decode-error propagation is unchanged — a malformed blob +// still surfaces the underlying base64 / length error rather than +// being masked by the dormancy-gate message. +func TestDecodeClientReceiptHandle_PassesThroughDecodeErrors(t *testing.T) { + t.Parallel() + _, err := decodeClientReceiptHandle("!!!" + strings.Repeat("?", 50)) + require.Error(t, err) + require.NotContains(t, err.Error(), "not yet enabled", + "a decode-step error must NOT be reported as the "+ + "dormancy-gate message") +}