Skip to content

Commit a564c60

Browse files
authored
feat(sqs): receipt-handle v2 codec for partitioned FIFO queues (Phase 3.D PR 5a) (#724)
## Summary Phase 3.D PR 5a — adds the v2 receipt-handle layout that **PR 5b's send/receive partitioned fanout will consume**. Pure scaffold: the v2 encoder is added but never called by any production path until PR 5b lifts the §11 PR 2 dormancy gate. Splitting the codec out as a small isolated PR keeps PR 5b's review focused on the gate-and-lift atomic sequence — codec correctness and gate-lift correctness are independent concerns. ## Wire format | | v1 (legacy, single partition) | v2 (partitioned) | |---|---|---| | `[ 0 ]` | byte version = `0x01` | byte version = `0x02` | | `[ 1..5 ]` | _(none)_ | `uint32 partition (BE)` ← NEW | | `[ ?..? ]` | `uint64 queue_gen (BE)` | `uint64 queue_gen (BE)` | | `[ ?..? ]` | 16 bytes message_id | 16 bytes message_id | | `[ ?..? ]` | 16 bytes receipt_token | 16 bytes receipt_token | | **Total** | **41 bytes** | **45 bytes** | `decodedReceiptHandle` gains `Version` (byte) and `Partition` (uint32) fields. `decodeReceiptHandle` dispatches on the version byte; v1 keeps `Partition=0` by definition. ## What does NOT change yet - `encodeReceiptHandleV2` is **dormant** until PR 5b's SendMessage fanout dispatch wires it (gated by `meta.PartitionCount > 1`). Production traffic continues to use the v1 encoder verbatim — no behaviour change. - DeleteMessage / ChangeMessageVisibility still consume `decodedReceiptHandle` without inspecting `Version` or `Partition`. PR 5b adds the cross-version rejection contract (a v1 handle against a partitioned queue → `ReceiptHandleIsInvalid`, and vice-versa). ## Test plan - [x] `TestEncodeReceiptHandleV2_RoundTrip` — every `(partition, queue_gen, message_id, token)` tuple round-trips. - [x] `TestEncodeReceiptHandleV1_StillReportsV1` — regression: v1 encoder still produces v1-decodable handles with `Partition=0`. - [x] `TestDecodeReceiptHandle_VersionDispatch` — v1 and v2 produce distinct on-wire bytes (different version + 4-byte size delta); decoder picks the right layout. - [x] `TestDecodeReceiptHandle_RejectsLengthMismatch` — 5 cases (v1 byte / v2 length, v2 byte / v1 length, v1 truncated, v2 truncated, empty) all fail with the same opaque error. - [x] `TestDecodeReceiptHandle_RejectsUnknownVersion` — `0x00`, `0x03`, `0x42`, `0xFF` all reject. - [x] `TestEncodeReceiptHandleV2_RejectsBadInputs` — short/long token, non-hex/short/empty id all error. - [x] `TestReceiptHandleVersionConstants_Distinct` — pins `v1 != v2`, `v1 == 0x01`, `v2 == 0x02`, legacy alias points at v1, on-wire size constants match. - [x] `TestDecodeReceiptHandle_RejectsBase64Garbage` — base64 error surfaces correctly. - [x] Existing `TestSQSServer_ReceiptHandleCodecRoundTrip` continues to pass (v1 path). - [x] `go test -race ./adapter/...` pass. - [x] `golangci-lint ./adapter/...` clean. ## Self-review (per CLAUDE.md) 1. **Data loss** — codec only; no FSM/Pebble path. No issue. 2. **Concurrency / distributed failures** — no shared state; pure encode/decode. No issue. 3. **Performance** — encode/decode are O(handle size) ≤ 45 bytes. Version-byte dispatch is one switch. No issue. 4. **Data consistency** — v1 and v2 are length-distinct, version-byte-distinct, decoder rejects wrong-length blobs. A v1 client parsing a v2 handle (or vice versa) fails closed at the length check — pinned by the 5-case rejection test. No issue. 5. **Test coverage** — 8 new tests covering the codec contract surface; existing v1 round-trip test continues to pass. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * SQS message receipt handles now support versioned encoding format, enabling compatibility with partitioned FIFO queues while maintaining backward compatibility. * **Tests** * Added validation tests for receipt handle encoding and decoding wire-format contract across format versions. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents ebb7cc6 + c15d801 commit a564c60

3 files changed

Lines changed: 515 additions & 20 deletions

File tree

adapter/sqs_messages.go

Lines changed: 183 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,23 @@ const (
4343
// has <1% tail-latency overhead, large enough that an empty queue
4444
// does not spin.
4545
sqsLongPollInterval = 200 * time.Millisecond
46-
// Version byte prefixed to encoded receipt handles. Bumped when the
47-
// on-wire handle format changes so old handles fail to decode loudly.
48-
sqsReceiptHandleVersion = byte(0x01)
46+
// sqsReceiptHandleVersion1 is the version byte for the legacy
47+
// (single-partition) receipt-handle layout — pre-PR-5 handles
48+
// and post-PR-5 handles for queues with PartitionCount <= 1.
49+
// Bumped when the on-wire handle format changes so old handles
50+
// fail to decode loudly.
51+
sqsReceiptHandleVersion1 = byte(0x01)
52+
// sqsReceiptHandleVersion2 is the version byte for partitioned
53+
// FIFO handles (PartitionCount > 1). v2 carries the partition
54+
// index so DeleteMessage / ChangeMessageVisibility can dispatch
55+
// to the right partition's keyspace without re-running the
56+
// partitionFor hash. The version byte discriminates: a v1 handle
57+
// against a partitioned queue is rejected, a v2 handle against
58+
// a non-partitioned queue is rejected — both with
59+
// ReceiptHandleIsInvalid. Enforcement of the cross-version
60+
// rejection contract is wired by Phase 3.D PR 5b together with
61+
// the gate-and-lift; this scaffold PR adds the codec only.
62+
sqsReceiptHandleVersion2 = byte(0x02)
4963
// Byte sizes used when pre-sizing key buffers. The exact value is not
5064
// critical; it only avoids one append growth for typical queue/ID
5165
// lengths.
@@ -213,16 +227,40 @@ func newReceiptToken() ([]byte, error) {
213227
return buf, nil
214228
}
215229

216-
// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into a
217-
// single opaque blob. Format:
230+
// Per-field byte offsets and total sizes for the v1 / v2 receipt
231+
// handle layouts. Named constants so a future field-shape change
232+
// touches the offsets in one place instead of scattering numeric
233+
// arithmetic through encoders / decoders. Keep this aligned with
234+
// the table in encodeReceiptHandle / encodeReceiptHandleV2 doc
235+
// comments — the on-wire format is operator-visible.
236+
const (
237+
// v1 layout: [version][queue_gen][message_id][receipt_token].
238+
sqsReceiptHandleV1VersionOffset = 0
239+
sqsReceiptHandleV1GenOffset = sqsReceiptHandleV1VersionOffset + 1
240+
sqsReceiptHandleV1IDOffset = sqsReceiptHandleV1GenOffset + 8
241+
sqsReceiptHandleV1TokenOffset = sqsReceiptHandleV1IDOffset + sqsMessageIDBytes
242+
sqsReceiptHandleV1Size = sqsReceiptHandleV1TokenOffset + sqsReceiptTokenBytes
243+
244+
// v2 layout: [version][partition][queue_gen][message_id][receipt_token].
245+
sqsReceiptHandleV2VersionOffset = 0
246+
sqsReceiptHandleV2PartitionOffset = sqsReceiptHandleV2VersionOffset + 1
247+
sqsReceiptHandleV2GenOffset = sqsReceiptHandleV2PartitionOffset + 4
248+
sqsReceiptHandleV2IDOffset = sqsReceiptHandleV2GenOffset + 8
249+
sqsReceiptHandleV2TokenOffset = sqsReceiptHandleV2IDOffset + sqsMessageIDBytes
250+
sqsReceiptHandleV2Size = sqsReceiptHandleV2TokenOffset + sqsReceiptTokenBytes
251+
)
252+
253+
// encodeReceiptHandle packs (queue_gen, message_id, receipt_token) into
254+
// a single opaque v1 blob. Used by SendMessage on a NON-partitioned
255+
// FIFO queue and on Standard queues. Format:
218256
//
219-
// [ 0 ] byte version = 0x01
257+
// [ 0 ] byte version = 0x01
220258
// [ 1..9 ] uint64 queue_gen (BE)
221259
// [ 9..25 ] 16 bytes message_id (raw bytes from hex decode)
222260
// [ 25..41 ] 16 bytes receipt_token
223261
//
224-
// The result is base64-urlsafe (no padding) so it passes through JSON and
225-
// HTTP query parameters untouched.
262+
// The result is base64-urlsafe (no padding) so it passes through
263+
// JSON and HTTP query parameters untouched.
226264
func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
227265
if len(receiptToken) != sqsReceiptTokenBytes {
228266
return "", errors.New("receipt token has wrong length")
@@ -231,35 +269,162 @@ func encodeReceiptHandle(queueGen uint64, messageIDHex string, receiptToken []by
231269
if err != nil || len(idBytes) != sqsMessageIDBytes {
232270
return "", errors.New("message id has wrong format")
233271
}
234-
buf := make([]byte, 0, 1+8+sqsMessageIDBytes+sqsReceiptTokenBytes)
235-
buf = append(buf, sqsReceiptHandleVersion)
272+
buf := make([]byte, 0, sqsReceiptHandleV1Size)
273+
buf = append(buf, sqsReceiptHandleVersion1)
236274
buf = appendU64(buf, queueGen)
237275
buf = append(buf, idBytes...)
238276
buf = append(buf, receiptToken...)
239277
return base64.RawURLEncoding.EncodeToString(buf), nil
240278
}
241279

280+
// encodeReceiptHandleV2 packs (partition, queue_gen, message_id,
281+
// receipt_token) for partitioned FIFO queues. Used by SendMessage
282+
// on a partitioned queue (PartitionCount > 1). Format:
283+
//
284+
// [ 0 ] byte version = 0x02
285+
// [ 1..5 ] uint32 partition (BE)
286+
// [ 5..13 ] uint64 queue_gen (BE)
287+
// [ 13..29 ] 16 bytes message_id
288+
// [ 29..45 ] 16 bytes receipt_token
289+
//
290+
// DeleteMessage / ChangeMessageVisibility use the partition field
291+
// to route the operation to the right partition's keyspace
292+
// without re-running partitionFor (the original MessageGroupId is
293+
// not in the handle).
294+
func encodeReceiptHandleV2(partition uint32, queueGen uint64, messageIDHex string, receiptToken []byte) (string, error) {
295+
if len(receiptToken) != sqsReceiptTokenBytes {
296+
return "", errors.New("receipt token has wrong length")
297+
}
298+
idBytes, err := hex.DecodeString(messageIDHex)
299+
if err != nil || len(idBytes) != sqsMessageIDBytes {
300+
return "", errors.New("message id has wrong format")
301+
}
302+
buf := make([]byte, 0, sqsReceiptHandleV2Size)
303+
buf = append(buf, sqsReceiptHandleVersion2)
304+
buf = appendU32(buf, partition)
305+
buf = appendU64(buf, queueGen)
306+
buf = append(buf, idBytes...)
307+
buf = append(buf, receiptToken...)
308+
return base64.RawURLEncoding.EncodeToString(buf), nil
309+
}
310+
311+
// decodedReceiptHandle is the parsed shape of a receipt handle.
312+
// Version reports which on-wire format the handle was in:
313+
// - 0x01 (sqsReceiptHandleVersion1): legacy single-partition
314+
// handle. Partition is 0 by definition.
315+
// - 0x02 (sqsReceiptHandleVersion2): partitioned-FIFO handle.
316+
// Partition is the partition index the message was sent to.
317+
//
318+
// Callers that route by partition (DeleteMessage,
319+
// ChangeMessageVisibility on a partitioned queue) MUST inspect
320+
// Version before consulting Partition — a v1 handle's zero
321+
// Partition is the LEGACY meaning ("single partition"), not
322+
// "partition 0 of N". A queue's PartitionCount at the moment the
323+
// caller acts must agree with the handle's version, or the
324+
// dispatch is rejected with ReceiptHandleIsInvalid.
242325
type decodedReceiptHandle struct {
326+
Version byte
327+
Partition uint32
243328
QueueGeneration uint64
244329
MessageIDHex string
245330
ReceiptToken []byte
246331
}
247332

333+
// decodeReceiptHandle inspects the version byte to dispatch to
334+
// the v1 or v2 layout. Length and version mismatches both surface
335+
// as the same opaque error so a malicious caller cannot use the
336+
// error message to probe the format.
248337
func decodeReceiptHandle(raw string) (*decodedReceiptHandle, error) {
249338
b, err := base64.RawURLEncoding.DecodeString(raw)
250339
if err != nil {
251340
return nil, errors.WithStack(err)
252341
}
253-
want := 1 + 8 + sqsMessageIDBytes + sqsReceiptTokenBytes
254-
if len(b) != want || b[0] != sqsReceiptHandleVersion {
342+
if len(b) == 0 {
255343
return nil, errors.New("receipt handle length or version mismatch")
256344
}
257-
out := &decodedReceiptHandle{
258-
QueueGeneration: binary.BigEndian.Uint64(b[1:9]),
259-
MessageIDHex: hex.EncodeToString(b[9 : 9+sqsMessageIDBytes]),
260-
ReceiptToken: bytes.Clone(b[9+sqsMessageIDBytes:]),
345+
switch b[0] {
346+
case sqsReceiptHandleVersion1:
347+
return decodeReceiptHandleV1(b)
348+
case sqsReceiptHandleVersion2:
349+
return decodeReceiptHandleV2(b)
350+
default:
351+
return nil, errors.New("receipt handle length or version mismatch")
261352
}
262-
return out, nil
353+
}
354+
355+
func decodeReceiptHandleV1(b []byte) (*decodedReceiptHandle, error) {
356+
if len(b) != sqsReceiptHandleV1Size {
357+
return nil, errors.New("receipt handle length or version mismatch")
358+
}
359+
return &decodedReceiptHandle{
360+
Version: sqsReceiptHandleVersion1,
361+
Partition: 0,
362+
QueueGeneration: binary.BigEndian.Uint64(
363+
b[sqsReceiptHandleV1GenOffset:sqsReceiptHandleV1IDOffset]),
364+
MessageIDHex: hex.EncodeToString(
365+
b[sqsReceiptHandleV1IDOffset:sqsReceiptHandleV1TokenOffset]),
366+
ReceiptToken: bytes.Clone(
367+
b[sqsReceiptHandleV1TokenOffset:sqsReceiptHandleV1Size]),
368+
}, nil
369+
}
370+
371+
// decodeClientReceiptHandle is the public-API entry point for
372+
// decoding a client-supplied receipt handle. It wraps
373+
// decodeReceiptHandle with the dormancy gate that keeps the v2
374+
// codec inert until Phase 3.D PR 5b wires the partitioned-FIFO
375+
// data plane.
376+
//
377+
// # Why the gate
378+
//
379+
// PR 5a adds the v2 codec to the binary but does NOT yet wire any
380+
// production path that produces v2 handles — SendMessage on a
381+
// partitioned queue is rejected by the §11 PR 2 dormancy gate
382+
// (PartitionCount > 1 → InvalidAttributeValue). Without this
383+
// helper, a client could craft a v2 handle (re-encoding a
384+
// legitimately-issued v1 handle's queue_gen / message_id /
385+
// receipt_token under the v2 layout) and DeleteMessage /
386+
// ChangeMessageVisibility would accept it, since the downstream
387+
// validation only checks queue_gen + receipt_token. The behaviour
388+
// is technically correct (the v1 keyspace lookup still finds the
389+
// message) but it leaks the new wire format before PR 5b lands —
390+
// breaking the "no behavior change yet" guarantee of this PR
391+
// (codex/coderabbit major on PR #724).
392+
//
393+
// PR 5b lifts this gate together with the rest of the data-plane
394+
// fanout: it replaces the != v1 check with a queue-aware version
395+
// (v1 required on non-partitioned queues, v2 required on
396+
// partitioned ones), so neither version leaks into the wrong
397+
// keyspace. Until then, any v2 handle on the public API surfaces
398+
// as ReceiptHandleIsInvalid.
399+
func decodeClientReceiptHandle(raw string) (*decodedReceiptHandle, error) {
400+
handle, err := decodeReceiptHandle(raw)
401+
if err != nil {
402+
return nil, err
403+
}
404+
if handle.Version != sqsReceiptHandleVersion1 {
405+
// v2 codec is added but dormant until PR 5b. Reject any
406+
// non-v1 handle on the public API so the wire format
407+
// does not leak.
408+
return nil, errors.New("receipt handle version is not yet enabled on the public API")
409+
}
410+
return handle, nil
411+
}
412+
413+
func decodeReceiptHandleV2(b []byte) (*decodedReceiptHandle, error) {
414+
if len(b) != sqsReceiptHandleV2Size {
415+
return nil, errors.New("receipt handle length or version mismatch")
416+
}
417+
return &decodedReceiptHandle{
418+
Version: sqsReceiptHandleVersion2,
419+
Partition: binary.BigEndian.Uint32(
420+
b[sqsReceiptHandleV2PartitionOffset:sqsReceiptHandleV2GenOffset]),
421+
QueueGeneration: binary.BigEndian.Uint64(
422+
b[sqsReceiptHandleV2GenOffset:sqsReceiptHandleV2IDOffset]),
423+
MessageIDHex: hex.EncodeToString(
424+
b[sqsReceiptHandleV2IDOffset:sqsReceiptHandleV2TokenOffset]),
425+
ReceiptToken: bytes.Clone(
426+
b[sqsReceiptHandleV2TokenOffset:sqsReceiptHandleV2Size]),
427+
}, nil
263428
}
264429

265430
// ------------------------ input decoding ------------------------
@@ -1371,7 +1536,7 @@ func (s *SQSServer) parseQueueAndReceipt(queueUrl, receiptHandle string) (string
13711536
if err != nil {
13721537
return "", nil, err
13731538
}
1374-
handle, err := decodeReceiptHandle(receiptHandle)
1539+
handle, err := decodeClientReceiptHandle(receiptHandle)
13751540
if err != nil {
13761541
return "", nil, newSQSAPIError(http.StatusBadRequest, sqsErrReceiptHandleInvalid, "receipt handle is not parseable")
13771542
}

adapter/sqs_messages_batch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) {
467467
// retry-bound stale-is-success delete that single DeleteMessage
468468
// uses. Per-entry isolation matches AWS, where a malformed
469469
// handle in slot 3 must not poison slot 4.
470-
handle, decodeErr := decodeReceiptHandle(entry.ReceiptHandle)
470+
handle, decodeErr := decodeClientReceiptHandle(entry.ReceiptHandle)
471471
if decodeErr != nil {
472472
failed = append(failed, sqsBatchResultErrorEntry{
473473
Id: entry.Id,
@@ -567,7 +567,7 @@ func (s *SQSServer) applyChangeVisibilityBatchEntry(ctx context.Context, queueNa
567567
SenderFault: true,
568568
}
569569
}
570-
handle, decodeErr := decodeReceiptHandle(entry.ReceiptHandle)
570+
handle, decodeErr := decodeClientReceiptHandle(entry.ReceiptHandle)
571571
if decodeErr != nil {
572572
return false, sqsBatchResultErrorEntry{
573573
Id: entry.Id,

0 commit comments

Comments
 (0)