Skip to content

Commit c153659

Browse files
authored
feat(sqs): HT-FIFO partitioned-keyspace constructors (Phase 3.D PR 3) (#703)
## Summary Phase 3.D PR 3 of the split-queue FIFO rollout (per §11 of `docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md`). Adds the partitioned-keyspace constructors without touching the legacy keyspace or any existing call site. The §11 PR 2 dormancy gate still rejects `PartitionCount > 1` at `CreateQueue`, so these helpers are dead code in production until PR 5 atomically lifts the gate and wires the data-plane fanout. ## Keyspace shape ``` legacy: !sqs|msg|<family>|<queue>|<gen>|<rest> partitioned: !sqs|msg|<family>|p|<queue>|<partition>|<gen>|<rest> ``` The `p|` discriminator after the family prefix is safe by construction: `validateQueueName` forbids `|` in queue names, and the queue segment is base32-raw-URL encoded (cannot start with the literal byte `'p'` followed by `'|'`). The partition is a 4-byte big-endian `uint32` so a prefix scan `!sqs|msg|<family>|p|<queue>|<partition>|` picks exactly one partition's keys. ## What's added - Constants: `sqsPartitionedDiscriminator`, `SqsPartitionedMsg{Data,Vis,Dedup,Group,ByAge}Prefix`. - Constructors: `sqsPartitionedMsg{Data,Vis,Dedup,Group,ByAge}Key`, `sqsPartitionedMsgVisPrefixForQueue`, `sqsPartitionedMsgByAgePrefixForQueueAllPartitions`. - Reaper helper: `sqsMsgByAgePrefixesForQueue` returns the `{legacy, partitioned}` pair so the reaper enumerates both keyspaces during cleanup. - Parser: `parseSqsPartitionedMsgByAgeKey` with the matching record type `sqsPartitionedMsgByAgeRecord` (legacy `parseSqsMsgByAgeKey` is unchanged). - Helper: `appendU32` for the 4-byte partition segment. ## What's NOT added - No existing call site changes — every current `sqsMsgDataKey` / `sqsMsgVisKey` / etc. invocation keeps using the legacy constructor, so existing queues stay byte-identical on disk. - No dispatch wrapper. PR 5 will add the if/else dispatch at each call site once `partitionFor` is wired into the send/receive paths (per the §3.1 design choice of "dispatch at the call site, no variadic, no silent argument loss"). ## Test plan `adapter/sqs_keys_test.go` covers: - [x] Byte-distinct legacy vs partitioned across all 5 families (data, vis, dedup, group, byage). - [x] Per-partition isolation: a partition-k vis prefix never matches a partition-(k+1) key. - [x] Determinism: same inputs yield byte-identical keys across calls. - [x] Round-trip parse for partitioned byage keys at corner partition values (0, 7, 31). - [x] Mutual rejection: legacy parser rejects partitioned keys and vice versa (the dual-parse contract). - [x] Reaper enumeration helper returns both prefixes in `{legacy, partitioned}` order. - [x] Discriminator constants all end with `p|`. - [x] Different queue names produce different keys at the same `(partition, gen)`. - [x] `go test -race ./adapter/...` clean. - [x] `golangci-lint ./adapter/...` clean. ## Self-review (per CLAUDE.md "Self-review of code changes") 1. **Data loss** — No FSM, Pebble, or retention path is touched. Legacy constructors are unchanged on every byte and call site, so existing queues are byte-identical on disk. The new partitioned helpers are dead code until PR 5 lifts the dormancy gate, so no production data lands under the partitioned prefix in this PR. No issue. 2. **Concurrency / distributed failures** — No new goroutine, lock, or Raft path. The constructors are pure byte-builders. The reaper helper returns a fresh slice per call. No issue. 3. **Performance** — New constructors are one allocation each, sized like the legacy ones. The reaper enumeration adds at most one extra `Range` iteration per queue (over the partitioned prefix, which is empty for legacy queues). Hot path (Send/Receive) is untouched. No issue. 4. **Data consistency** — The `p|` discriminator + base32 queue segment combination is verified non-overlapping. Legacy and partitioned keyspaces cannot collide. Mutual-rejection tests assert each parser refuses the other's keyspace. No issue. 5. **Test coverage** — 7 new test functions covering byte layout, per-partition isolation, determinism, round-trip parse, mutual rejection, reaper enumeration, and discriminator-constant invariants. PR 7 lands the HT-FIFO Jepsen suite per §11.
2 parents 579d193 + be7784a commit c153659

3 files changed

Lines changed: 577 additions & 12 deletions

File tree

adapter/sqs_keys.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,52 @@ const (
4949
SqsQueueTombstonePrefix = "!sqs|queue|tombstone|"
5050
)
5151

52+
// HT-FIFO partitioned-keyspace discriminator. Per the §3.1 design in
53+
// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md, partitioned
54+
// FIFO queues live in a separate keyspace so the legacy single-
55+
// partition layout can stay byte-identical on disk:
56+
//
57+
// legacy: !sqs|msg|<family>|<queue>|<gen>|<rest>
58+
// partitioned: !sqs|msg|<family>|p|<queue>|<partition>|<gen>|<rest>
59+
//
60+
// The literal "p|" segment is the discriminator. validateQueueName
61+
// rejects "|" in queue names, so a legacy "!sqs|msg|data|<queue>|..."
62+
// can never collide with a partitioned "!sqs|msg|data|p|<queue>|..."
63+
// — the queue-name segment is base64-raw-URL-encoded (see
64+
// encodeSQSSegment) and cannot start with the literal ASCII byte 'p'
65+
// followed by '|'.
66+
//
67+
// Each partitioned constructor terminates the variable-length
68+
// queue-name segment with a '|' before the fixed-width partition
69+
// uint32. Without that delimiter, a prefix scan for queue "q" would
70+
// also match queue "q1" because base64("q") is a strict byte prefix
71+
// of base64("q1"). The discriminator inserts the '|' into the prefix
72+
// itself; the per-constructor terminator inserts it after the queue.
73+
const sqsPartitionedDiscriminator = "p|"
74+
75+
// sqsPartitionedQueueTerminator is appended after the encoded queue
76+
// name in every partitioned key. It mirrors the role the fixed-width
77+
// generation suffix plays in tombstone keys: a hard end-of-segment
78+
// marker that prevents queue-name prefix collisions during scans.
79+
// '|' is safe by construction — validateQueueName rejects raw '|',
80+
// and base64.RawURLEncoding never emits '|' (it uses A-Z, a-z, 0-9,
81+
// '-', '_').
82+
const sqsPartitionedQueueTerminator = '|'
83+
84+
// SqsPartitionedMsg*Prefix mirrors each legacy SqsMsg*Prefix with the
85+
// partitioned-keyspace discriminator inserted. Defined as full string
86+
// constants (rather than runtime concatenation in each constructor)
87+
// so the byte-layout invariant is asserted by the type system: a
88+
// future rename of the discriminator must touch the constants here,
89+
// not 6+ scattered string concatenations.
90+
const (
91+
SqsPartitionedMsgDataPrefix = "!sqs|msg|data|" + sqsPartitionedDiscriminator
92+
SqsPartitionedMsgVisPrefix = "!sqs|msg|vis|" + sqsPartitionedDiscriminator
93+
SqsPartitionedMsgDedupPrefix = "!sqs|msg|dedup|" + sqsPartitionedDiscriminator
94+
SqsPartitionedMsgGroupPrefix = "!sqs|msg|group|" + sqsPartitionedDiscriminator
95+
SqsPartitionedMsgByAgePrefix = "!sqs|msg|byage|" + sqsPartitionedDiscriminator
96+
)
97+
5298
func sqsQueueMetaKey(queueName string) []byte {
5399
return []byte(SqsQueueMetaPrefix + encodeSQSSegment(queueName))
54100
}
@@ -209,3 +255,194 @@ func queueNameFromMetaKey(key []byte) (string, bool) {
209255
}
210256
return name, true
211257
}
258+
259+
// ---------------------- HT-FIFO partitioned keyspace ----------------------
260+
//
261+
// The constructors below mirror the legacy sqsMsg*Key family with a
262+
// partition uint32 inserted between the queue segment and the
263+
// generation. The legacy keyspace is unchanged on disk, so existing
264+
// queues and Standard queues stay byte-identical — these helpers are
265+
// reachable only when meta.PartitionCount > 1, and the §11 PR 2
266+
// dormancy gate currently rejects that at CreateQueue. The data plane
267+
// dispatch lands together with the gate-lift in PR 5.
268+
//
269+
// Each helper appends the partition as a fixed-width big-endian
270+
// uint32 so prefix scans `!sqs|msg|<family>|p|<queue>|<partition>|`
271+
// can pick exactly one partition's keys without touching its
272+
// neighbours.
273+
274+
// sqsPartitionedMsgDataKey builds the data-record key for a
275+
// partitioned FIFO queue.
276+
func sqsPartitionedMsgDataKey(queueName string, partition uint32, gen uint64, messageID string) []byte {
277+
buf := make([]byte, 0, len(SqsPartitionedMsgDataPrefix)+sqsKeyCapLarge)
278+
buf = append(buf, SqsPartitionedMsgDataPrefix...)
279+
buf = append(buf, encodeSQSSegment(queueName)...)
280+
buf = append(buf, sqsPartitionedQueueTerminator)
281+
buf = appendU32(buf, partition)
282+
buf = appendU64(buf, gen)
283+
buf = append(buf, encodeSQSSegment(messageID)...)
284+
return buf
285+
}
286+
287+
// sqsPartitionedMsgVisKey builds the visibility-index key for a
288+
// partitioned FIFO queue.
289+
func sqsPartitionedMsgVisKey(queueName string, partition uint32, gen uint64, visibleAtMillis int64, messageID string) []byte {
290+
buf := make([]byte, 0, len(SqsPartitionedMsgVisPrefix)+sqsKeyCapLarge)
291+
buf = append(buf, SqsPartitionedMsgVisPrefix...)
292+
buf = append(buf, encodeSQSSegment(queueName)...)
293+
buf = append(buf, sqsPartitionedQueueTerminator)
294+
buf = appendU32(buf, partition)
295+
buf = appendU64(buf, gen)
296+
buf = appendU64(buf, uint64MaxZero(visibleAtMillis))
297+
buf = append(buf, encodeSQSSegment(messageID)...)
298+
return buf
299+
}
300+
301+
// sqsPartitionedMsgVisPrefixForQueue returns the prefix of every
302+
// vis-index key for a single (queue, partition, gen) triple. The
303+
// partition fan-out scans this prefix on each partition independently.
304+
func sqsPartitionedMsgVisPrefixForQueue(queueName string, partition uint32, gen uint64) []byte {
305+
buf := make([]byte, 0, len(SqsPartitionedMsgVisPrefix)+sqsKeyCapSmall)
306+
buf = append(buf, SqsPartitionedMsgVisPrefix...)
307+
buf = append(buf, encodeSQSSegment(queueName)...)
308+
buf = append(buf, sqsPartitionedQueueTerminator)
309+
buf = appendU32(buf, partition)
310+
buf = appendU64(buf, gen)
311+
return buf
312+
}
313+
314+
// sqsPartitionedMsgDedupKey builds the FIFO dedup key for a
315+
// partitioned queue. The dedup window is per-partition by design
316+
// (DeduplicationScope=messageGroup with PartitionCount>1) — the
317+
// validator in adapter/sqs_partitioning.go rejects the queue-scoped
318+
// scope on partitioned queues, so this key shape is always reachable
319+
// from the same partition that ran the dedup check.
320+
func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, dedupID string) []byte {
321+
buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapLarge)
322+
buf = append(buf, SqsPartitionedMsgDedupPrefix...)
323+
buf = append(buf, encodeSQSSegment(queueName)...)
324+
buf = append(buf, sqsPartitionedQueueTerminator)
325+
buf = appendU32(buf, partition)
326+
buf = appendU64(buf, gen)
327+
buf = append(buf, encodeSQSSegment(dedupID)...)
328+
return buf
329+
}
330+
331+
// sqsPartitionedMsgGroupKey builds the FIFO group-lock key for a
332+
// partitioned queue. partitionFor maps a MessageGroupId to one
333+
// partition, so the group lock for any given group lives on exactly
334+
// one partition — there is no cross-partition group-lock invariant
335+
// to maintain.
336+
func sqsPartitionedMsgGroupKey(queueName string, partition uint32, gen uint64, groupID string) []byte {
337+
buf := make([]byte, 0, len(SqsPartitionedMsgGroupPrefix)+sqsKeyCapLarge)
338+
buf = append(buf, SqsPartitionedMsgGroupPrefix...)
339+
buf = append(buf, encodeSQSSegment(queueName)...)
340+
buf = append(buf, sqsPartitionedQueueTerminator)
341+
buf = appendU32(buf, partition)
342+
buf = appendU64(buf, gen)
343+
buf = append(buf, encodeSQSSegment(groupID)...)
344+
return buf
345+
}
346+
347+
// sqsPartitionedMsgByAgeKey builds the send-age index key for a
348+
// partitioned queue. The reaper enumerates both the legacy and
349+
// partitioned byage prefixes when reaping a queue (see
350+
// sqsMsgByAgePrefixesForQueue) so a queue that is partitioned today
351+
// — or, hypothetically, that gains partitions across a future
352+
// migration — does not strand its old data.
353+
func sqsPartitionedMsgByAgeKey(queueName string, partition uint32, gen uint64, sendTimestampMs int64, messageID string) []byte {
354+
buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapLarge)
355+
buf = append(buf, SqsPartitionedMsgByAgePrefix...)
356+
buf = append(buf, encodeSQSSegment(queueName)...)
357+
buf = append(buf, sqsPartitionedQueueTerminator)
358+
buf = appendU32(buf, partition)
359+
buf = appendU64(buf, gen)
360+
buf = appendU64(buf, uint64MaxZero(sendTimestampMs))
361+
buf = append(buf, encodeSQSSegment(messageID)...)
362+
return buf
363+
}
364+
365+
// sqsPartitionedMsgByAgePrefixForQueueAllPartitions returns the
366+
// prefix shared by every partitioned byage entry for one queue
367+
// (across all partitions and all generations). The reaper uses it
368+
// alongside the legacy prefix to enumerate orphan records on a
369+
// partitioned queue.
370+
func sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName string) []byte {
371+
buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapSmall)
372+
buf = append(buf, SqsPartitionedMsgByAgePrefix...)
373+
buf = append(buf, encodeSQSSegment(queueName)...)
374+
buf = append(buf, sqsPartitionedQueueTerminator)
375+
return buf
376+
}
377+
378+
// sqsMsgByAgePrefixesForQueue returns the {legacy, partitioned}
379+
// prefix pair for a queue's byage records. The reaper iterates both:
380+
// a queue created before HT-FIFO landed has only legacy entries; a
381+
// partitioned queue created after PR 5 has only partitioned entries;
382+
// no queue has both today, but enumerating both keeps the reaper
383+
// future-proof against an offline-rebuild migration that produces a
384+
// mixed-prefix queue.
385+
func sqsMsgByAgePrefixesForQueue(queueName string) [][]byte {
386+
return [][]byte{
387+
sqsMsgByAgePrefixAllGenerations(queueName),
388+
sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName),
389+
}
390+
}
391+
392+
// sqsPartitionedByAgeKeyHeaderLen is the byte length of the
393+
// (partition, gen, ts) header that follows the queue segment in a
394+
// partitioned byage key — one big-endian uint32 plus two big-endian
395+
// uint64s.
396+
const sqsPartitionedByAgeKeyHeaderLen = 4 + 8 + 8
397+
398+
// parseSqsPartitionedMsgByAgeKey reverses sqsPartitionedMsgByAgeKey.
399+
// Returns ok=false when the key does not match the expected partitioned
400+
// shape. The reaper tries this parser when parseSqsMsgByAgeKey fails,
401+
// so it can handle a queue with both legacy and partitioned entries
402+
// (today only one or the other applies, but the dual-parse keeps the
403+
// reaper safe against future migrations).
404+
func parseSqsPartitionedMsgByAgeKey(key []byte, queueName string) (sqsPartitionedMsgByAgeRecord, bool) {
405+
expected := sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName)
406+
if !bytes.HasPrefix(key, expected) {
407+
return sqsPartitionedMsgByAgeRecord{}, false
408+
}
409+
rest := key[len(expected):]
410+
if len(rest) < sqsPartitionedByAgeKeyHeaderLen {
411+
return sqsPartitionedMsgByAgeRecord{}, false
412+
}
413+
partition := binary.BigEndian.Uint32(rest[:4])
414+
gen := binary.BigEndian.Uint64(rest[4:12])
415+
tsRaw := binary.BigEndian.Uint64(rest[12:sqsPartitionedByAgeKeyHeaderLen])
416+
msgIDEnc := string(rest[sqsPartitionedByAgeKeyHeaderLen:])
417+
msgID, err := decodeSQSSegment(msgIDEnc)
418+
if err != nil {
419+
return sqsPartitionedMsgByAgeRecord{}, false
420+
}
421+
// Wall-clock millis fits in int63 — see parseSqsMsgByAgeKey for
422+
// the same bound.
423+
if tsRaw > 1<<63-1 {
424+
return sqsPartitionedMsgByAgeRecord{}, false
425+
}
426+
return sqsPartitionedMsgByAgeRecord{
427+
Partition: partition,
428+
Generation: gen,
429+
SendTimestampMs: int64(tsRaw),
430+
MessageID: msgID,
431+
}, true
432+
}
433+
434+
// sqsPartitionedMsgByAgeRecord is the parsed shape of a partitioned
435+
// byage key. Mirrors sqsMsgByAgeRecord with partition added.
436+
type sqsPartitionedMsgByAgeRecord struct {
437+
Partition uint32
438+
Generation uint64
439+
SendTimestampMs int64
440+
MessageID string
441+
}
442+
443+
// appendU32 mirrors appendU64 for the partition segment.
444+
func appendU32(dst []byte, v uint32) []byte {
445+
var buf [4]byte
446+
binary.BigEndian.PutUint32(buf[:], v)
447+
return append(dst, buf[:]...)
448+
}

0 commit comments

Comments
 (0)