Skip to content

Commit 0206a1c

Browse files
committed
feat(sqs): receipt-handle v2 codec for partitioned FIFO queues (Phase 3.D PR 5a)
Adds the v2 receipt-handle layout that PR 5b's send / receive fanout will consume. Pure scaffold: the v2 encoder is added but not yet called by any production path (PartitionCount > 1 is still rejected by the §11 PR 2 dormancy gate, so SendMessage never reaches a partitioned queue, so encodeReceiptHandleV2 stays unreachable until PR 5b lifts the gate). Splitting the codec out as a small isolated PR keeps PR 5b's review focused on the gate-and-lift atomic sequence — codec correctness and the gate-lift correctness are independent concerns and a single mega-PR would conflate them. Wire format v1 (legacy, single partition): [ 0 ] byte version = 0x01 [ 1..9 ] uint64 queue_gen (BE) [ 9..25 ] 16 bytes message_id [ 25..41 ] 16 bytes receipt_token Total: 41 bytes v2 (partitioned, PartitionCount > 1): [ 0 ] byte version = 0x02 [ 1..5 ] uint32 partition (BE) ← NEW [ 5..13 ] uint64 queue_gen (BE) [ 13..29 ] 16 bytes message_id [ 29..45 ] 16 bytes receipt_token Total: 45 bytes decodedReceiptHandle gains Version (byte) and Partition (uint32) fields. decodeReceiptHandle dispatches by the version byte; v1 keeps Partition=0 by definition. What does NOT change yet - encodeReceiptHandleV2 is dormant until PR 5b's SendMessage partitioned-fanout dispatch wires it (gated by meta.PartitionCount > 1). Production traffic continues to use the v1 encoder verbatim — no behaviour change. - DeleteMessage / ChangeMessageVisibility still consume decodedReceiptHandle without inspecting Version or Partition. PR 5b adds the cross-version rejection contract (a v1 handle against a partitioned queue → ReceiptHandleIsInvalid, and vice-versa). Tests - TestEncodeReceiptHandleV2_RoundTrip — every (partition, queue_gen, message_id, token) tuple round-trips with Version=v2 and the encoder's partition. - TestEncodeReceiptHandleV1_StillReportsV1 — regression: v1 encoder still produces v1-decodable handles with Partition=0. - TestDecodeReceiptHandle_VersionDispatch — v1 and v2 produce distinct on-wire bytes (different version + 4-byte size delta); decoder picks the right layout for each. - TestDecodeReceiptHandle_RejectsLengthMismatch — 5 cases (v1 byte / v2 length, v2 byte / v1 length, v1 truncated, v2 truncated, empty) all fail with the same opaque error. - TestDecodeReceiptHandle_RejectsUnknownVersion — 0x00, 0x03, 0x42, 0xFF all reject. - TestEncodeReceiptHandleV2_RejectsBadInputs — short / long token, non-hex / short / empty id all surface as encoder errors. - TestReceiptHandleVersionConstants_Distinct — pins the invariants v1 != v2, v1 == 0x01, v2 == 0x02, legacy alias points at v1, on-wire size constants match what encoders write. - TestDecodeReceiptHandle_RejectsBase64Garbage — base64 errors surface at the right step. - Existing TestSQSServer_ReceiptHandleCodecRoundTrip continues to pass unchanged (v1 path). Self-review (per CLAUDE.md) 1. Data loss — codec only; no FSM/Pebble path. No issue. 2. Concurrency — no shared state; pure encode/decode functions. No issue. 3. Performance — encode/decode are O(handle size) ≤ 45 bytes. The version-byte dispatch is one switch. No issue. 4. Data consistency — v1 and v2 are length-distinct, version-byte- distinct, and decoder rejects wrong-length blobs. A v1 client parsing a v2 handle (or vice versa) fails closed at the length check — pinned by the 5-case rejection test. No issue. 5. Test coverage — 8 new tests across the codec contract surface; existing v1 round-trip test continues to pass.
1 parent e9f33eb commit 0206a1c

2 files changed

Lines changed: 374 additions & 14 deletions

File tree

adapter/sqs_messages.go

Lines changed: 121 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,27 @@ const (
4343
// has <1% tail-latency overhead, large enough that an empty queue
4444
// does not spin.
4545
sqsLongPollInterval = 200 * time.Millisecond
46-
// Version byte prefixed to encoded receipt handles. Bumped when the
47-
// on-wire handle format changes so old handles fail to decode loudly.
48-
sqsReceiptHandleVersion = byte(0x01)
46+
// sqsReceiptHandleVersion1 is the version byte for the legacy
47+
// (single-partition) receipt-handle layout — pre-PR-5 handles
48+
// and post-PR-5 handles for queues with PartitionCount <= 1.
49+
// Bumped when the on-wire handle format changes so old handles
50+
// fail to decode loudly.
51+
sqsReceiptHandleVersion1 = byte(0x01)
52+
// sqsReceiptHandleVersion2 is the version byte for partitioned
53+
// FIFO handles (PartitionCount > 1). v2 carries the partition
54+
// index so DeleteMessage / ChangeMessageVisibility can dispatch
55+
// to the right partition's keyspace without re-running the
56+
// partitionFor hash. The version byte discriminates: a v1 handle
57+
// against a partitioned queue is rejected, a v2 handle against
58+
// a non-partitioned queue is rejected — both with
59+
// ReceiptHandleIsInvalid (codex P1 round-2 on PR #715 documents
60+
// the cross-version rejection contract).
61+
sqsReceiptHandleVersion2 = byte(0x02)
62+
// sqsReceiptHandleVersion is the legacy const name preserved as
63+
// an alias of v1 so existing call sites in this file keep
64+
// compiling. New call sites SHOULD use sqsReceiptHandleVersion1
65+
// (or sqsReceiptHandleVersion2) explicitly.
66+
sqsReceiptHandleVersion = sqsReceiptHandleVersion1
4967
// Byte sizes used when pre-sizing key buffers. The exact value is not
5068
// critical; it only avoids one append growth for typical queue/ID
5169
// lengths.
@@ -213,16 +231,27 @@ func newReceiptToken() ([]byte, error) {
213231
return buf, nil
214232
}
215233

216-
// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into a
217-
// single opaque blob. Format:
234+
// sqsReceiptHandleV1Size is the on-wire byte length of a v1
235+
// receipt handle: 1 byte version + 8 byte queue_gen + 16 byte
236+
// message_id + 16 byte receipt_token = 41 bytes.
237+
const sqsReceiptHandleV1Size = 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes
238+
239+
// sqsReceiptHandleV2Size is the on-wire byte length of a v2
240+
// (partitioned) receipt handle: v1 + 4 bytes for the partition
241+
// uint32 inserted between version and queue_gen = 45 bytes.
242+
const sqsReceiptHandleV2Size = 1 + 4 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes
243+
244+
// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into
245+
// a single opaque v1 blob. Used by SendMessage on a NON-partitioned
246+
// FIFO queue and on Standard queues. Format:
218247
//
219-
// [ 0 ] byte version = 0x01
248+
// [ 0 ] byte version = 0x01
220249
// [ 1..9 ] uint64 queue_gen (BE)
221250
// [ 9..25 ] 16 bytes message_id (raw bytes from hex decode)
222251
// [ 25..41 ] 16 bytes receipt_token
223252
//
224-
// The result is base64-urlsafe (no padding) so it passes through JSON and
225-
// HTTP query parameters untouched.
253+
// The result is base64-urlsafe (no padding) so it passes through
254+
// JSON and HTTP query parameters untouched.
226255
func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
227256
if len(receiptToken) != sqsReceiptTokenBytes {
228257
return "", errors.New("receipt token has wrong length")
@@ -231,35 +260,113 @@ func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []by
231260
if err != nil || len(idBytes) != sqsMessageIDBytes {
232261
return "", errors.New("message id has wrong format")
233262
}
234-
buf := make([]byte, 0, 1+8+sqsMessageIDBytes+sqsReceiptTokenBytes)
235-
buf = append(buf, sqsReceiptHandleVersion)
263+
buf := make([]byte, 0, sqsReceiptHandleV1Size)
264+
buf = append(buf, sqsReceiptHandleVersion1)
236265
buf = appendU64(buf, queueGen)
237266
buf = append(buf, idBytes...)
238267
buf = append(buf, receiptToken...)
239268
return base64.RawURLEncoding.EncodeToString(buf), nil
240269
}
241270

271+
// encodeReceiptHandleV2 packs (partition, queue_gen, message_id,
272+
// receipt_token) for partitioned FIFO queues. Used by SendMessage
273+
// on a partitioned queue (PartitionCount > 1). Format:
274+
//
275+
// [ 0 ] byte version = 0x02
276+
// [ 1..5 ] uint32 partition (BE)
277+
// [ 5..13 ] uint64 queue_gen (BE)
278+
// [ 13..29 ] 16 bytes message_id
279+
// [ 29..45 ] 16 bytes receipt_token
280+
//
281+
// DeleteMessage / ChangeMessageVisibility use the partition field
282+
// to route the operation to the right partition's keyspace
283+
// without re-running partitionFor (the original MessageGroupId is
284+
// not in the handle).
285+
func encodeReceiptHandleV2(partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
286+
if len(receiptToken) != sqsReceiptTokenBytes {
287+
return "", errors.New("receipt token has wrong length")
288+
}
289+
idBytes, err := hex.DecodeString(messageIDHex)
290+
if err != nil || len(idBytes) != sqsMessageIDBytes {
291+
return "", errors.New("message id has wrong format")
292+
}
293+
buf := make([]byte, 0, sqsReceiptHandleV2Size)
294+
buf = append(buf, sqsReceiptHandleVersion2)
295+
buf = appendU32(buf, partition)
296+
buf = appendU64(buf, queueGen)
297+
buf = append(buf, idBytes...)
298+
buf = append(buf, receiptToken...)
299+
return base64.RawURLEncoding.EncodeToString(buf), nil
300+
}
301+
302+
// decodedReceiptHandle is the parsed shape of a receipt handle.
303+
// Version reports which on-wire format the handle was in:
304+
// - 0x01 (sqsReceiptHandleVersion1): legacy single-partition
305+
// handle. Partition is 0 by definition.
306+
// - 0x02 (sqsReceiptHandleVersion2): partitioned-FIFO handle.
307+
// Partition is the partition index the message was sent to.
308+
//
309+
// Callers that route by partition (DeleteMessage,
310+
// ChangeMessageVisibility on a partitioned queue) MUST inspect
311+
// Version before consulting Partition — a v1 handle's zero
312+
// Partition is the LEGACY meaning ("single partition"), not
313+
// "partition 0 of N". A queue's PartitionCount at the moment the
314+
// caller acts must agree with the handle's version, or the
315+
// dispatch is rejected with ReceiptHandleIsInvalid.
242316
type decodedReceiptHandle struct {
317+
Version byte
318+
Partition uint32
243319
QueueGeneration uint64
244320
MessageIDHex string
245321
ReceiptToken []byte
246322
}
247323

324+
// decodeReceiptHandle inspects the version byte to dispatch to
325+
// the v1 or v2 layout. Length and version mismatches both surface
326+
// as the same opaque error so a malicious caller cannot use the
327+
// error message to probe the format.
248328
func decodeReceiptHandle(raw string) (*decodedReceiptHandle, error) {
249329
b, err := base64.RawURLEncoding.DecodeString(raw)
250330
if err != nil {
251331
return nil, errors.WithStack(err)
252332
}
253-
want := 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes
254-
if len(b) != want || b[0] != sqsReceiptHandleVersion {
333+
if len(b) == 0 {
334+
return nil, errors.New("receipt handle length or version mismatch")
335+
}
336+
switch b[0] {
337+
case sqsReceiptHandleVersion1:
338+
return decodeReceiptHandleV1(b)
339+
case sqsReceiptHandleVersion2:
340+
return decodeReceiptHandleV2(b)
341+
default:
255342
return nil, errors.New("receipt handle length or version mismatch")
256343
}
257-
out := &decodedReceiptHandle{
344+
}
345+
346+
func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) {
347+
if len(b) != sqsReceiptHandleV1Size {
348+
return nil, errors.New("receipt handle length or version mismatch")
349+
}
350+
return &decodedReceiptHandle{
351+
Version: sqsReceiptHandleVersion1,
352+
Partition: 0,
258353
QueueGeneration: binary.BigEndian.Uint64(b[1:9]),
259354
MessageIDHex: hex.EncodeToString(b[9 : 9+sqsMessageIDBytes]),
260355
ReceiptToken: bytes.Clone(b[9+sqsMessageIDBytes:]),
356+
}, nil
357+
}
358+
359+
func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) {
360+
if len(b) != sqsReceiptHandleV2Size {
361+
return nil, errors.New("receipt handle length or version mismatch")
261362
}
262-
return out, nil
363+
return &decodedReceiptHandle{
364+
Version: sqsReceiptHandleVersion2,
365+
Partition: binary.BigEndian.Uint32(b[1:5]),
366+
QueueGeneration: binary.BigEndian.Uint64(b[5:13]),
367+
MessageIDHex: hex.EncodeToString(b[13 : 13+sqsMessageIDBytes]),
368+
ReceiptToken: bytes.Clone(b[13+sqsMessageIDBytes:]),
369+
}, nil
263370
}
264371

265372
// ------------------------ input decoding ------------------------

0 commit comments

Comments
 (0)