Skip to content

Commit 1dc6884

Browse files
committed
Merge remote-tracking branch 'origin/feat/backup-phase0a-sqs' into feat/backup-phase0a-s3
2 parents b196bf7 + f923d99 commit 1dc6884

2 files changed

Lines changed: 151 additions & 4 deletions

File tree

internal/backup/sqs.go

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"path/filepath"
1111
"sort"
12+
"strings"
1213

1314
"github.com/cockroachdb/errors"
1415
)
@@ -28,6 +29,22 @@ const (
2829
SQSMsgByAgePrefix = "!sqs|msg|byage|"
2930
SQSMsgDedupPrefix = "!sqs|msg|dedup|"
3031
SQSMsgGroupPrefix = "!sqs|msg|group|"
32+
33+
// HT-FIFO partitioned-keyspace discriminator. Kept in sync with
34+
// adapter/sqs_keys.go sqsPartitionedDiscriminator. The literal
35+
// "p|" segment is inserted between the family and the queue
36+
// segment in every partitioned key:
37+
//
38+
// legacy: !sqs|msg|<family>|<encQueue><gen><rest>
39+
// partitioned: !sqs|msg|<family>|p|<encQueue>|<partition u32><gen u64><rest>
40+
//
41+
// validateQueueName rejects raw '|' in queue names, so a legacy
42+
// queue name can never start with the literal byte 'p' followed
43+
// by '|'; the discriminator unambiguously selects the parser
44+
// variant. Codex P1 round 9.
45+
sqsPartitionedDiscriminator = "p|"
46+
// partitionBytes is the fixed BE-uint32 partition field width.
47+
sqsPartitionBytes = 4
3148
)
3249

3350
// Stored value magic prefixes (mirrors adapter/sqs_catalog.go and
@@ -385,20 +402,27 @@ func stripPrefixSegment(key, prefix []byte) (string, error) {
385402
}
386403

387404
// parseSQSMessageDataKey peels !sqs|msg|data|<encQueue><gen 8B><encMsgID>
388-
// and returns encQueue. The gen and msgID are not surfaced because the
389-
// dump format pulls QueueGeneration / MessageID out of the value record.
405+
// (or its partitioned variant !sqs|msg|data|p|<encQueue>|<part 4B><gen 8B><encMsgID>)
406+
// and returns encQueue. The gen, partition, and msgID are not surfaced
407+
// because the dump format pulls those fields out of the value record.
390408
//
391-
// Boundary detection: encQueue is base64url-no-padding, alphabet
409+
// Boundary detection (legacy): encQueue is base64url-no-padding, alphabet
392410
// [A-Za-z0-9-_]. The gen is 8 raw bytes. For any production gen value
393411
// (< 2^56), the first byte is 0x00, which is not in the base64url
394412
// alphabet, so the first non-alphabet byte is the gen-start. We document
395413
// this assumption rather than build a more elaborate prober — gens
396414
// approaching 2^56 would have already wrapped many other invariants.
415+
//
416+
// Boundary detection (partitioned): the queue segment is terminated by
417+
// a literal '|' before the fixed-width partition u32. Codex P1 round 9.
397418
func parseSQSMessageDataKey(key []byte) (string, error) {
398419
rest, err := stripPrefixSegment(key, []byte(SQSMsgDataPrefix))
399420
if err != nil {
400421
return "", err
401422
}
423+
if isPartitionedRest(rest) {
424+
return parseSQSPartitionedQueueAndTrailer(rest, true /*hasMsgID*/, key)
425+
}
402426
idx := scanBase64URLBoundary(rest)
403427
// idx == 0 -> no queue segment; idx+genBytes >= len(rest) -> no
404428
// room for any msg-id segment after the gen. Both are malformed.
@@ -426,12 +450,16 @@ func parseSQSMessageDataKey(key []byte) (string, error) {
426450
// (vis/byage/dedup/group/tombstone). Callers in this PR only need to
427451
// know the encoded queue segment for routing; full structural parsing
428452
// of side-record keys is deferred until Phase 0a's reaper-aware mode
429-
// lands.
453+
// lands. Both the legacy and partitioned (`p|<queue>|...`) shapes are
454+
// recognised — Codex P2 round 9.
430455
func parseSQSGenericKey(key []byte, prefix string) (string, error) {
431456
rest, err := stripPrefixSegment(key, []byte(prefix))
432457
if err != nil {
433458
return "", err
434459
}
460+
if isPartitionedRest(rest) {
461+
return parseSQSPartitionedQueueAndTrailer(rest, false /*hasMsgID*/, key)
462+
}
435463
idx := scanBase64URLBoundary(rest)
436464
// All side-record key shapes (vis / byage / dedup / group /
437465
// tombstone) terminate the encoded queue segment with at least
@@ -445,6 +473,58 @@ func parseSQSGenericKey(key []byte, prefix string) (string, error) {
445473
return rest[:idx], nil
446474
}
447475

476+
// isPartitionedRest reports whether `rest` (the suffix after a
477+
// !sqs|msg|<family>| prefix has been stripped) starts with the
478+
// HT-FIFO partitioned discriminator "p|".
479+
func isPartitionedRest(rest string) bool {
480+
return strings.HasPrefix(rest, sqsPartitionedDiscriminator)
481+
}
482+
483+
// parseSQSPartitionedQueueAndTrailer parses the partitioned suffix
484+
// `p|<encQueue>|<partition 4B><gen 8B>[<encMsgID>]`. Returns the
485+
// encoded queue segment when the structural invariants hold:
486+
//
487+
// - the discriminator is followed by a non-empty queue segment
488+
// - the queue segment is terminated by a literal '|'
489+
// - the trailer carries at least partition u32 + gen u64 bytes
490+
// - if hasMsgID == true, an additional non-empty base64url
491+
// msg-id segment follows the trailer.
492+
//
493+
// Anything else surfaces ErrSQSMalformedKey rather than emitting
494+
// records under a wrong queue.
495+
func parseSQSPartitionedQueueAndTrailer(rest string, hasMsgID bool, originalKey []byte) (string, error) {
496+
body := rest[len(sqsPartitionedDiscriminator):]
497+
terminator := strings.IndexByte(body, '|')
498+
if terminator <= 0 {
499+
return "", errors.Wrapf(ErrSQSMalformedKey,
500+
"partitioned key missing queue terminator in %q", originalKey)
501+
}
502+
encQueue := body[:terminator]
503+
if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil {
504+
return "", errors.Wrap(ErrSQSMalformedKey, err.Error())
505+
}
506+
trailer := body[terminator+1:]
507+
const fixedTrailerBytes = sqsPartitionBytes + genBytes
508+
if hasMsgID {
509+
// Need partition+gen plus at least 1 byte of msg-id.
510+
if len(trailer) <= fixedTrailerBytes {
511+
return "", errors.Wrapf(ErrSQSMalformedKey,
512+
"partitioned msg-data key missing message-id in %q", originalKey)
513+
}
514+
encMsgID := trailer[fixedTrailerBytes:]
515+
if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil {
516+
return "", errors.Wrap(ErrSQSMalformedKey, err.Error())
517+
}
518+
return encQueue, nil
519+
}
520+
// Side records: trailer must carry at least partition+gen.
521+
if len(trailer) < fixedTrailerBytes {
522+
return "", errors.Wrapf(ErrSQSMalformedKey,
523+
"partitioned side-record key trailer truncated in %q", originalKey)
524+
}
525+
return encQueue, nil
526+
}
527+
448528
// scanBase64URLBoundary returns the index of the first byte in s that is
449529
// NOT in the base64url alphabet [A-Za-z0-9-_]. Returns len(s) if every
450530
// byte is alphabet.

internal/backup/sqs_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,73 @@ func TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment(t *testing.T) {
374374
}
375375
}
376376

377+
// TestSQS_ParsePartitionedMessageDataKey is the regression for Codex
378+
// P1 round 9: HT-FIFO partitioned msg-data keys carry the literal "p|"
379+
// discriminator before the queue segment and a fixed-width partition
380+
// uint32 between the queue terminator and the gen. The parser must
381+
// recognise this shape — the legacy heuristic would otherwise read "p"
382+
// as the queue segment, fail base64 decode, and abort backup decoding
383+
// for any cluster running partitioned FIFO queues.
384+
func TestSQS_ParsePartitionedMessageDataKey(t *testing.T) {
385+
t.Parallel()
386+
encQueue := "cXVldWUx" // base64url("queue1")
387+
encMsgID := "bXNnLTAwMQ" // base64url("msg-001")
388+
// Layout: !sqs|msg|data|p|<encQueue>|<part 4B><gen 8B><encMsgID>
389+
key := []byte(SQSMsgDataPrefix + sqsPartitionedDiscriminator + encQueue + "|")
390+
key = append(key, 0x00, 0x00, 0x00, 0x07) // partition = 7
391+
key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) // gen
392+
key = append(key, []byte(encMsgID)...)
393+
got, err := parseSQSMessageDataKey(key)
394+
if err != nil {
395+
t.Fatalf("parseSQSMessageDataKey: %v", err)
396+
}
397+
if got != encQueue {
398+
t.Fatalf("got %q want %q", got, encQueue)
399+
}
400+
}
401+
402+
// TestSQS_ParsePartitionedSideRecordKey covers parseSQSGenericKey for
403+
// every partitioned side-record family (Codex P2 round 9).
404+
func TestSQS_ParsePartitionedSideRecordKey(t *testing.T) {
405+
t.Parallel()
406+
encQueue := "cXVldWUy" // base64url("queue2")
407+
cases := []string{
408+
SQSMsgVisPrefix,
409+
SQSMsgByAgePrefix,
410+
SQSMsgDedupPrefix,
411+
SQSMsgGroupPrefix,
412+
}
413+
for _, prefix := range cases {
414+
t.Run(prefix, func(t *testing.T) {
415+
t.Parallel()
416+
key := []byte(prefix + sqsPartitionedDiscriminator + encQueue + "|")
417+
key = append(key, 0x00, 0x00, 0x00, 0x03) // partition
418+
key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) // gen
419+
got, err := parseSQSGenericKey(key, prefix)
420+
if err != nil {
421+
t.Fatalf("parseSQSGenericKey(%q): %v", prefix, err)
422+
}
423+
if got != encQueue {
424+
t.Fatalf("prefix %q: got %q want %q", prefix, got, encQueue)
425+
}
426+
})
427+
}
428+
}
429+
430+
// TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer
431+
// guards against a partitioned key whose trailer is too short to
432+
// carry partition + gen + msg-id.
433+
func TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer(t *testing.T) {
434+
t.Parallel()
435+
encQueue := "cQ"
436+
key := []byte(SQSMsgDataPrefix + sqsPartitionedDiscriminator + encQueue + "|")
437+
// Only 4 partition bytes, no gen, no msg-id.
438+
key = append(key, 0x00, 0x00, 0x00, 0x01)
439+
if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) {
440+
t.Fatalf("err=%v want ErrSQSMalformedKey for truncated partitioned trailer", err)
441+
}
442+
}
443+
377444
func TestSQS_ParseGenericKey_RejectsTrailerlessKey(t *testing.T) {
378445
t.Parallel()
379446
// Side-record key whose entire suffix is base64url-clean (no

0 commit comments

Comments
 (0)