diff --git a/internal/backup/sqs.go b/internal/backup/sqs.go index de31d7fa..58567b2d 100644 --- a/internal/backup/sqs.go +++ b/internal/backup/sqs.go @@ -9,6 +9,9 @@ import ( "os" "path/filepath" "sort" + "strconv" + "strings" + "unicode/utf8" "github.com/cockroachdb/errors" ) @@ -28,6 +31,22 @@ const ( SQSMsgByAgePrefix = "!sqs|msg|byage|" SQSMsgDedupPrefix = "!sqs|msg|dedup|" SQSMsgGroupPrefix = "!sqs|msg|group|" + + // HT-FIFO partitioned-keyspace discriminator. Kept in sync with + // adapter/sqs_keys.go sqsPartitionedDiscriminator. The literal + // "p|" segment is inserted between the family and the queue + // segment in every partitioned key: + // + // legacy: !sqs|msg|| + // partitioned: !sqs|msg||p|| + // + // validateQueueName rejects raw '|' in queue names, so a legacy + // queue name can never start with the literal byte 'p' followed + // by '|'; the discriminator unambiguously selects the parser + // variant. Codex P1 round 9. + sqsPartitionedDiscriminator = "p|" + // partitionBytes is the fixed BE-uint32 partition field width. + sqsPartitionBytes = 4 ) // Stored value magic prefixes (mirrors adapter/sqs_catalog.go and @@ -90,6 +109,18 @@ type sqsQueueState struct { name string // decoded queue name; populated on meta arrival meta *sqsQueueMetaPublic messages []sqsMessageRecord + // activeGen captures the queue's current generation, parsed + // from a !sqs|queue|gen| record. PurgeQueue and + // DeleteQueue bump the generation so the reaper can later + // drop residual !sqs|msg|data|<...> rows; if + // the snapshot is taken before reaping completes those stale + // rows are still in the keyspace and must NOT be emitted to + // messages.jsonl. Zero means "no gen record observed yet" and + // disables the filter (the live cluster always writes a gen + // record on CreateQueue, so a missing gen at backup time + // means we have an orphan-only queue and the orphan branch + // already drops messages). Codex P1 round 10. + activeGen uint64 // internalBuf accumulates side records in their on-disk shape if // includeSideRecords is on. Each line is the encoded prefix + // hex(rest-of-key) + value (b64) — implementation-grade detail @@ -106,6 +137,14 @@ type sqsInternalRecord struct { // sqsQueueMetaPublic is the dump-format projection of the live // adapter/sqs_catalog.go sqsQueueMeta. Field names match the AWS-style // vocabulary an external restore tool would use. +// +// PartitionCount, FifoThroughputLimit, and DeduplicationScope mirror +// the HT-FIFO attributes captured at CreateQueue time. The adapter +// rejects mutating these via SetQueueAttributes (they are immutable +// per AWS contract), so dropping them at backup would silently +// recreate single-partition / default-routing / queue-scoped-dedup +// queues on restore — non-fidelity-preserving for any partitioned +// FIFO workload. Codex P1 round 12. type sqsQueueMetaPublic struct { FormatVersion uint32 `json:"format_version"` Name string `json:"name"` @@ -117,6 +156,40 @@ type sqsQueueMetaPublic struct { ReceiveMessageWaitSeconds int64 `json:"receive_message_wait_seconds,omitempty"` MaximumMessageSize int64 `json:"maximum_message_size,omitempty"` RedrivePolicy string `json:"redrive_policy,omitempty"` + PartitionCount uint32 `json:"partition_count,omitempty"` + FifoThroughputLimit string `json:"fifo_throughput_limit,omitempty"` + DeduplicationScope string `json:"deduplication_scope,omitempty"` +} + +// sqsMessageBody is the dump-format projection's body field. It marshals +// as a JSON string when the bytes are valid UTF-8 (the AWS SQS contract +// — body is XML-text), so restorers can pipe each `body` straight into +// SendMessage. For non-UTF-8 bytes the encoder falls back to a +// `{"base64":"..."}` envelope so binary payloads still round-trip +// without lossy replacement-character rewrites. Codex P1 round 9. +type sqsMessageBody []byte + +// MarshalJSON implements json.Marshaler. +func (b sqsMessageBody) MarshalJSON() ([]byte, error) { + if utf8.Valid(b) { + // Emit as a plain JSON string. json.Marshal handles + // escaping (`"`, `\`, control chars) — the bytes that + // reach this path are valid UTF-8, so no information is + // lost. + out, err := json.Marshal(string(b)) + if err != nil { + return nil, errors.WithStack(err) + } + return out, nil + } + envelope := struct { + Base64 string `json:"base64"` + }{Base64: base64.RawURLEncoding.EncodeToString(b)} + out, err := json.Marshal(envelope) + if err != nil { + return nil, errors.WithStack(err) + } + return out, nil } // sqsMessageRecord is the dump-format projection. Mirrors the live @@ -126,7 +199,7 @@ type sqsQueueMetaPublic struct { // round-trip; the encoder zeroes the visibility-state fields by default. type sqsMessageRecord struct { MessageID string `json:"message_id"` - Body []byte `json:"body"` + Body sqsMessageBody `json:"body"` MD5OfBody string `json:"md5_of_body,omitempty"` MD5OfMessageAttributes string `json:"md5_of_message_attributes,omitempty"` MessageAttributes map[string]json.RawMessage `json:"message_attributes,omitempty"` @@ -205,6 +278,28 @@ func (s *SQSEncoder) HandleQueueMeta(key, value []byte) error { return nil } +// HandleQueueGen processes one !sqs|queue|gen| record. The +// value is a base-10 decimal string holding the queue's current +// generation (mirrors adapter/sqs_catalog.go's CreateQueue Put: the +// live cluster writes strconv.FormatUint(gen, 10)). Capturing +// activeGen lets flushQueue drop messages tagged with older +// generations — those are residual rows left by PurgeQueue / +// DeleteQueue that the reaper has not yet cleaned, and emitting +// them to messages.jsonl would resurrect purged messages on +// restore. Codex P1 round 10. +func (s *SQSEncoder) HandleQueueGen(key, value []byte) error { + encoded, err := stripPrefixSegment(key, []byte(SQSQueueGenPrefix)) + if err != nil { + return err + } + gen, err := strconv.ParseUint(strings.TrimSpace(string(value)), 10, 64) //nolint:mnd // 10 == decimal radix + if err != nil { + return errors.Wrap(ErrSQSMalformedKey, err.Error()) + } + s.queueState(encoded).activeGen = gen + return nil +} + // HandleMessageData processes one !sqs|msg|data| // record. The encoded queue segment is parsed out of the key and used as // the per-queue routing key; the message is buffered until Finalize so it @@ -252,18 +347,23 @@ func (s *SQSEncoder) HandleSideRecord(prefix string, key, value []byte) error { } // Finalize flushes every queue's _queue.json and messages.jsonl. Queues -// with buffered messages but no meta record (orphans) emit a warning and -// are skipped — restoring orphan messages without a queue config would -// silently create a queue with default settings, which is rarely what -// the operator wants. +// with buffered messages but no meta record (orphans) emit a warning +// and have their messages dropped — restoring orphan messages without +// a queue config would silently create a queue with default settings, +// which is rarely what the operator wants. However, if +// --include-sqs-side-records is on and this orphan queue has buffered +// side records (vis/byage/dedup/group/tombstone), those are still +// flushed under the encoded-prefix directory: the most common reason +// for a missing meta is a DeleteQueue that left tombstones, and +// dropping exactly those records is the opposite of what the operator +// asked for. Codex P2 round 8. func (s *SQSEncoder) Finalize() error { var firstErr error for _, st := range s.queues { if st.meta == nil { - s.emitWarn("sqs_orphan_messages", - "encoded_queue", st.encoded, - "buffered_messages", len(st.messages), - "hint", "no !sqs|queue|meta record matched this encoded prefix; messages dropped from the dump") + if err := s.flushOrphanQueueSideRecords(st); err != nil && firstErr == nil { + firstErr = err + } continue } if err := s.flushQueue(st); err != nil && firstErr == nil { @@ -273,6 +373,30 @@ func (s *SQSEncoder) Finalize() error { return firstErr } +// flushOrphanQueueSideRecords emits buffered side records for a queue +// whose !sqs|queue|meta row never arrived. Without this branch, +// --include-sqs-side-records silently drops the post-DeleteQueue +// tombstones and dedup-window history operators most often opt in +// for. The orphan dir is named by the encoded prefix because no +// decoded queue name is available; restore tools can join it with +// the messages-dropped warning to reconstruct context. +func (s *SQSEncoder) flushOrphanQueueSideRecords(st *sqsQueueState) error { + s.emitWarn("sqs_orphan_messages", + "encoded_queue", st.encoded, + "buffered_messages", len(st.messages), + "buffered_side_records", len(st.internalBuf), + "hint", "no !sqs|queue|meta record matched this encoded prefix; messages dropped from the dump") + if !s.includeSideRecords || len(st.internalBuf) == 0 { + return nil + } + // Use the encoded prefix as the directory name — it's the only + // stable identifier available when meta is missing. Suffix it + // with `.orphan` so a restore tool cannot mistake it for a real + // queue dir produced from a successful meta flush. + dir := filepath.Join(s.outRoot, "sqs", st.encoded+".orphan") + return s.flushInternals(dir, st.internalBuf) +} + func (s *SQSEncoder) flushQueue(st *sqsQueueState) error { dir := filepath.Join(s.outRoot, "sqs", EncodeSegment([]byte(st.name))) if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode @@ -281,23 +405,33 @@ func (s *SQSEncoder) flushQueue(st *sqsQueueState) error { if err := writeFileAtomic(filepath.Join(dir, "_queue.json"), mustMarshalIndent(st.meta)); err != nil { return err } - if len(st.messages) == 0 { - return nil - } - sortMessagesForEmit(st.messages) - jl, err := openJSONL(filepath.Join(dir, "messages.jsonl")) - if err != nil { - return err - } - for i := range st.messages { - if err := jl.enc.Encode(st.messages[i]); err != nil { - _ = closeJSONL(jl) - return errors.WithStack(err) + // Drop messages tagged with stale generations: PurgeQueue and + // DeleteQueue bump the queue's gen but the reaper deletes the + // affected !sqs|msg|data| rows asynchronously. A snapshot taken + // mid-reap would otherwise resurrect purged messages on restore. + // activeGen == 0 means we did not see a !sqs|queue|gen| record + // for this queue; preserving the legacy behaviour (no filter) + // is the safe fallback because every CreateQueue writes a gen. + // Codex P1 round 10. + visibleMessages, dropped := filterStaleGenMessages(st.messages, st.activeGen) + if dropped > 0 { + s.emitWarn("sqs_stale_generation_messages_dropped", + "queue", st.name, + "active_gen", st.activeGen, + "dropped", dropped, + "hint", "messages with mismatched queue_generation suppressed; reaper had not finished cleanup at snapshot time") + } + if len(visibleMessages) > 0 { + if err := writeMessagesJSONL(dir, visibleMessages); err != nil { + return err } } - if err := closeJSONL(jl); err != nil { - return err - } + // Side records ("--include-sqs-side-records") flush regardless of + // whether the queue has any current messages. A purged or + // metadata-only queue can legitimately have side records (e.g., + // dedup window history, vis/byage entries from in-flight reaper + // state) and dropping them when messages == 0 silently weakens + // the --include-sqs-side-records contract — flagged as Codex P2. if len(st.internalBuf) > 0 { if err := s.flushInternals(dir, st.internalBuf); err != nil { return err @@ -351,24 +485,35 @@ func stripPrefixSegment(key, prefix []byte) (string, error) { } // parseSQSMessageDataKey peels !sqs|msg|data| -// and returns encQueue. The gen and msgID are not surfaced because the -// dump format pulls QueueGeneration / MessageID out of the value record. +// (or its partitioned variant !sqs|msg|data|p||) +// and returns encQueue. The gen, partition, and msgID are not surfaced +// because the dump format pulls those fields out of the value record. // -// Boundary detection: encQueue is base64url-no-padding, alphabet +// Boundary detection (legacy): encQueue is base64url-no-padding, alphabet // [A-Za-z0-9-_]. The gen is 8 raw bytes. For any production gen value // (< 2^56), the first byte is 0x00, which is not in the base64url // alphabet, so the first non-alphabet byte is the gen-start. We document // this assumption rather than build a more elaborate prober — gens // approaching 2^56 would have already wrapped many other invariants. +// +// Boundary detection (partitioned): the queue segment is terminated by +// a literal '|' before the fixed-width partition u32. Codex P1 round 9. func parseSQSMessageDataKey(key []byte) (string, error) { rest, err := stripPrefixSegment(key, []byte(SQSMsgDataPrefix)) if err != nil { return "", err } + if isPartitionedRest(rest) { + return parseSQSPartitionedQueueAndTrailer(rest, true /*hasMsgID*/, key) + } idx := scanBase64URLBoundary(rest) - if idx == 0 || idx+genBytes > len(rest) { + // idx == 0 -> no queue segment; idx+genBytes >= len(rest) -> no + // room for any msg-id segment after the gen. Both are malformed. + // AWS SQS message IDs are non-empty by construction, so an empty + // msg-id segment can never be a legitimate snapshot record. + if idx == 0 || idx+genBytes >= len(rest) { return "", errors.Wrapf(ErrSQSMalformedKey, - "queue segment boundary not found in %q", key) + "queue segment or message-id segment not found in %q", key) } encQueue := rest[:idx] if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil { @@ -388,20 +533,81 @@ func parseSQSMessageDataKey(key []byte) (string, error) { // (vis/byage/dedup/group/tombstone). Callers in this PR only need to // know the encoded queue segment for routing; full structural parsing // of side-record keys is deferred until Phase 0a's reaper-aware mode -// lands. +// lands. Both the legacy and partitioned (`p||...`) shapes are +// recognised — Codex P2 round 9. func parseSQSGenericKey(key []byte, prefix string) (string, error) { rest, err := stripPrefixSegment(key, []byte(prefix)) if err != nil { return "", err } + if isPartitionedRest(rest) { + return parseSQSPartitionedQueueAndTrailer(rest, false /*hasMsgID*/, key) + } idx := scanBase64URLBoundary(rest) - if idx == 0 { + // All side-record key shapes (vis / byage / dedup / group / + // tombstone) terminate the encoded queue segment with at least + // one binary trailer (the gen u64), so idx must be strictly less + // than len(rest). idx == len(rest) means the trailer is missing — + // either a truncated key or the wrong prefix. + if idx == 0 || idx == len(rest) { return "", errors.Wrapf(ErrSQSMalformedKey, "queue segment not found after prefix %q", prefix) } return rest[:idx], nil } +// isPartitionedRest reports whether `rest` (the suffix after a +// !sqs|msg|| prefix has been stripped) starts with the +// HT-FIFO partitioned discriminator "p|". +func isPartitionedRest(rest string) bool { + return strings.HasPrefix(rest, sqsPartitionedDiscriminator) +} + +// parseSQSPartitionedQueueAndTrailer parses the partitioned suffix +// `p||[]`. Returns the +// encoded queue segment when the structural invariants hold: +// +// - the discriminator is followed by a non-empty queue segment +// - the queue segment is terminated by a literal '|' +// - the trailer carries at least partition u32 + gen u64 bytes +// - if hasMsgID == true, an additional non-empty base64url +// msg-id segment follows the trailer. +// +// Anything else surfaces ErrSQSMalformedKey rather than emitting +// records under a wrong queue. +func parseSQSPartitionedQueueAndTrailer(rest string, hasMsgID bool, originalKey []byte) (string, error) { + body := rest[len(sqsPartitionedDiscriminator):] + terminator := strings.IndexByte(body, '|') + if terminator <= 0 { + return "", errors.Wrapf(ErrSQSMalformedKey, + "partitioned key missing queue terminator in %q", originalKey) + } + encQueue := body[:terminator] + if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil { + return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + } + trailer := body[terminator+1:] + const fixedTrailerBytes = sqsPartitionBytes + genBytes + if hasMsgID { + // Need partition+gen plus at least 1 byte of msg-id. + if len(trailer) <= fixedTrailerBytes { + return "", errors.Wrapf(ErrSQSMalformedKey, + "partitioned msg-data key missing message-id in %q", originalKey) + } + encMsgID := trailer[fixedTrailerBytes:] + if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil { + return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + } + return encQueue, nil + } + // Side records: trailer must carry at least partition+gen. + if len(trailer) < fixedTrailerBytes { + return "", errors.Wrapf(ErrSQSMalformedKey, + "partitioned side-record key trailer truncated in %q", originalKey) + } + return encQueue, nil +} + // scanBase64URLBoundary returns the index of the first byte in s that is // NOT in the base64url alphabet [A-Za-z0-9-_]. Returns len(s) if every // byte is alphabet. @@ -448,6 +654,10 @@ func decodeSQSQueueMetaValue(value []byte) (*sqsQueueMetaPublic, error) { ReceiveMessageWaitSeconds int64 `json:"receive_message_wait_seconds"` MaximumMessageSize int64 `json:"maximum_message_size"` RedrivePolicy string `json:"redrive_policy"` + // HT-FIFO immutable attributes — see adapter/sqs_catalog.go. + PartitionCount uint32 `json:"partition_count"` + FifoThroughputLimit string `json:"fifo_throughput_limit"` + DeduplicationScope string `json:"deduplication_scope"` } if err := json.Unmarshal(body, &live); err != nil { return nil, errors.Wrap(ErrSQSInvalidQueueMeta, err.Error()) @@ -463,6 +673,9 @@ func decodeSQSQueueMetaValue(value []byte) (*sqsQueueMetaPublic, error) { ReceiveMessageWaitSeconds: live.ReceiveMessageWaitSeconds, MaximumMessageSize: live.MaximumMessageSize, RedrivePolicy: live.RedrivePolicy, + PartitionCount: live.PartitionCount, + FifoThroughputLimit: live.FifoThroughputLimit, + DeduplicationScope: live.DeduplicationScope, }, nil } @@ -522,6 +735,50 @@ func decodeSQSMessageValue(value []byte) (sqsMessageRecord, error) { }, nil } +// writeMessagesJSONL emits the buffered (visible) messages to +// messages.jsonl in send-order. Split out of flushQueue to keep that +// function's cyclomatic complexity under the project's linter ceiling. +func writeMessagesJSONL(dir string, msgs []sqsMessageRecord) error { + sortMessagesForEmit(msgs) + jl, err := openJSONL(filepath.Join(dir, "messages.jsonl")) + if err != nil { + return err + } + for i := range msgs { + if err := jl.enc.Encode(msgs[i]); err != nil { + _ = closeJSONL(jl) + return errors.WithStack(err) + } + } + if err := closeJSONL(jl); err != nil { + return err + } + return nil +} + +// filterStaleGenMessages partitions in into (visible, droppedCount). +// A message is visible if activeGen is zero (no gen record observed +// for this queue, which means we cannot make a confident call) OR +// its QueueGeneration matches activeGen. Anything else is residual +// state from a PurgeQueue / DeleteQueue that the reaper has not yet +// removed; emitting it would resurrect purged messages on restore. +// Codex P1 round 10. +func filterStaleGenMessages(in []sqsMessageRecord, activeGen uint64) ([]sqsMessageRecord, int) { + if activeGen == 0 { + return in, 0 + } + out := make([]sqsMessageRecord, 0, len(in)) + dropped := 0 + for _, m := range in { + if m.QueueGeneration == activeGen { + out = append(out, m) + continue + } + dropped++ + } + return out, dropped +} + func sortMessagesForEmit(msgs []sqsMessageRecord) { sort.SliceStable(msgs, func(i, j int) bool { a, b := msgs[i], msgs[j] diff --git a/internal/backup/sqs_test.go b/internal/backup/sqs_test.go index 9307c40a..c1a0bf90 100644 --- a/internal/backup/sqs_test.go +++ b/internal/backup/sqs_test.go @@ -2,9 +2,12 @@ package backup import ( "bufio" + "bytes" + "encoding/base64" "encoding/json" "os" "path/filepath" + "slices" "testing" "github.com/cockroachdb/errors" @@ -361,6 +364,383 @@ func TestSQS_IncludeSideRecordsBuffersBetweenFinalize(t *testing.T) { } } +func TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment(t *testing.T) { + t.Parallel() + // Synthesise a key whose msg-id segment is empty: prefix + + // base64url("q") + 8-byte gen, nothing after. AWS SQS message + // IDs are non-empty by construction; an empty trailer cannot be + // a legitimate snapshot record. + key := append([]byte(SQSMsgDataPrefix), []byte("cQ")...) + key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) + if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { + t.Fatalf("err=%v want ErrSQSMalformedKey for empty msg-id", err) + } +} + +// TestSQS_QueueMetaPreservesHTFIFOAttributes is the regression for +// Codex P1 round 12: PartitionCount, FifoThroughputLimit, and +// DeduplicationScope are immutable HT-FIFO attributes captured at +// CreateQueue and rejected by SetQueueAttributes. Dropping them at +// backup time would silently recreate single-partition / default- +// routing / queue-scoped-dedup queues on restore — non-fidelity +// preserving for any partitioned FIFO workload. +func TestSQS_QueueMetaPreservesHTFIFOAttributes(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + queue := "ht-fifo.fifo" + val := encodeQueueMetaValue(t, map[string]any{ + "name": queue, + "is_fifo": true, + "content_based_dedup": true, + "visibility_timeout_seconds": 30, + "message_retention_seconds": 345600, + "partition_count": 4, + "fifo_throughput_limit": "perMessageGroupId", + "deduplication_scope": "messageGroup", + }) + if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), val); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + got := readQueueJSON(t, filepath.Join(root, "sqs", queue, "_queue.json")) + if floatField(t, got, "partition_count") != 4 { + t.Fatalf("partition_count = %v want 4", got["partition_count"]) + } + if got["fifo_throughput_limit"] != "perMessageGroupId" { + t.Fatalf("fifo_throughput_limit = %v want perMessageGroupId", got["fifo_throughput_limit"]) + } + if got["deduplication_scope"] != "messageGroup" { + t.Fatalf("deduplication_scope = %v want messageGroup", got["deduplication_scope"]) + } +} + +// TestSQS_StaleGenerationMessagesDropped is the regression for Codex +// P1 round 10: PurgeQueue and DeleteQueue bump the queue's generation +// but the affected !sqs|msg|data||... rows are removed +// asynchronously by the reaper. A snapshot taken mid-cleanup would +// otherwise resurrect those purged messages on restore. The encoder +// now consults the !sqs|queue|gen| record and drops messages whose +// QueueGeneration does not match the active gen, emitting an +// `sqs_stale_generation_messages_dropped` warning for visibility. +func TestSQS_StaleGenerationMessagesDropped(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + var events []string + enc.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + queue := "q" + if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{ + "name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60, + })); err != nil { + t.Fatal(err) + } + encQueue := base64.RawURLEncoding.EncodeToString([]byte(queue)) + genKey := append([]byte(SQSQueueGenPrefix), []byte(encQueue)...) + if err := enc.HandleQueueGen(genKey, []byte("7")); err != nil { + t.Fatal(err) + } + live := encodeMessageValue(t, map[string]any{ + "message_id": "live", + "body": []byte("ok"), + "send_timestamp_millis": 100, + "queue_generation": 7, + }) + if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 7, "live"), live); err != nil { + t.Fatal(err) + } + stale := encodeMessageValue(t, map[string]any{ + "message_id": "ghost", + "body": []byte("from-prev-gen"), + "send_timestamp_millis": 50, + "queue_generation": 6, + }) + if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 6, "ghost"), stale); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + recs := readMessagesJSONL(t, filepath.Join(root, "sqs", queue, "messages.jsonl")) + if len(recs) != 1 { + t.Fatalf("messages = %d want 1 (stale gen must drop)", len(recs)) + } + if recs[0]["message_id"] != "live" { + t.Fatalf("survived msg = %v want live", recs[0]["message_id"]) + } + if !slices.Contains(events, "sqs_stale_generation_messages_dropped") { + t.Fatalf("expected sqs_stale_generation_messages_dropped warning, got %v", events) + } +} + +// TestSQS_StaleGenerationFilterDisabledWithoutGenRecord asserts the +// safe fallback: a queue with no !sqs|queue|gen| record observed +// keeps the legacy behavior (no filter), so a backup that lacks the +// gen record does not silently lose every message. +func TestSQS_StaleGenerationFilterDisabledWithoutGenRecord(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + queue := "q" + if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{ + "name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60, + })); err != nil { + t.Fatal(err) + } + val := encodeMessageValue(t, map[string]any{ + "message_id": "m1", + "body": []byte("payload"), + "send_timestamp_millis": 1000, + "queue_generation": 99, + }) + if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 99, "m1"), val); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + recs := readMessagesJSONL(t, filepath.Join(root, "sqs", queue, "messages.jsonl")) + if len(recs) != 1 { + t.Fatalf("messages = %d want 1 (no gen record => no filter)", len(recs)) + } +} + +// TestSQS_MessageBodyEmittedAsTextForUTF8 is the regression for Codex +// P1 round 9 on PR #714: `body` was a `[]byte` field, so json.Encoder +// rendered it as base64 in messages.jsonl. Replaying that JSONL via +// SendMessage would push the base64 string itself as the body +// (e.g., `hello` becoming `aGVsbG8=`), corrupting application +// payloads. The dump-format projection now emits valid UTF-8 bodies +// as plain strings so the JSONL is restoration-equivalent. +func TestSQS_MessageBodyEmittedAsTextForUTF8(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + queue := "q" + if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{ + "name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60, + })); err != nil { + t.Fatal(err) + } + val := encodeMessageValue(t, map[string]any{ + "message_id": "m1", + "body": []byte("hello"), + "send_timestamp_millis": 1, + "queue_generation": 1, + }) + if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 1, "m1"), val); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + raw, err := os.ReadFile(filepath.Join(root, "sqs", queue, "messages.jsonl")) //nolint:gosec // test path + if err != nil { + t.Fatal(err) + } + if !bytes.Contains(raw, []byte(`"body":"hello"`)) { + t.Fatalf("body must serialise as plain string, got %s", raw) + } + if bytes.Contains(raw, []byte("aGVsbG8")) { + t.Fatalf("body must NOT serialise as base64, got %s", raw) + } +} + +// TestSQS_MessageBodyFallsBackToBase64ForBinary covers the binary +// fallback: when the body is not valid UTF-8, the projection emits +// a typed `{"base64":"..."}` envelope so restore tools can detect +// the encoded form rather than receiving a lossy +// replacement-character rewrite. +func TestSQS_MessageBodyFallsBackToBase64ForBinary(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + queue := "qbin" + if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{ + "name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60, + })); err != nil { + t.Fatal(err) + } + // 0x80 is a continuation byte; a leading 0x80 makes the sequence + // invalid UTF-8. + val := encodeMessageValue(t, map[string]any{ + "message_id": "m1", + "body": []byte{0x80, 0xff, 0x01}, + "send_timestamp_millis": 1, + "queue_generation": 1, + }) + if err := enc.HandleMessageData(EncodeMsgDataKey(queue, 1, "m1"), val); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + raw, err := os.ReadFile(filepath.Join(root, "sqs", queue, "messages.jsonl")) //nolint:gosec // test path + if err != nil { + t.Fatal(err) + } + if !bytes.Contains(raw, []byte(`"body":{"base64":`)) { + t.Fatalf("binary body must use base64 envelope, got %s", raw) + } +} + +// TestSQS_ParsePartitionedMessageDataKey is the regression for Codex +// P1 round 9: HT-FIFO partitioned msg-data keys carry the literal "p|" +// discriminator before the queue segment and a fixed-width partition +// uint32 between the queue terminator and the gen. The parser must +// recognise this shape — the legacy heuristic would otherwise read "p" +// as the queue segment, fail base64 decode, and abort backup decoding +// for any cluster running partitioned FIFO queues. +func TestSQS_ParsePartitionedMessageDataKey(t *testing.T) { + t.Parallel() + encQueue := "cXVldWUx" // base64url("queue1") + encMsgID := "bXNnLTAwMQ" // base64url("msg-001") + // Layout: !sqs|msg|data|p|| + key := []byte(SQSMsgDataPrefix + sqsPartitionedDiscriminator + encQueue + "|") + key = append(key, 0x00, 0x00, 0x00, 0x07) // partition = 7 + key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) // gen + key = append(key, []byte(encMsgID)...) + got, err := parseSQSMessageDataKey(key) + if err != nil { + t.Fatalf("parseSQSMessageDataKey: %v", err) + } + if got != encQueue { + t.Fatalf("got %q want %q", got, encQueue) + } +} + +// TestSQS_ParsePartitionedSideRecordKey covers parseSQSGenericKey for +// every partitioned side-record family (Codex P2 round 9). +func TestSQS_ParsePartitionedSideRecordKey(t *testing.T) { + t.Parallel() + encQueue := "cXVldWUy" // base64url("queue2") + cases := []string{ + SQSMsgVisPrefix, + SQSMsgByAgePrefix, + SQSMsgDedupPrefix, + SQSMsgGroupPrefix, + } + for _, prefix := range cases { + t.Run(prefix, func(t *testing.T) { + t.Parallel() + key := []byte(prefix + sqsPartitionedDiscriminator + encQueue + "|") + key = append(key, 0x00, 0x00, 0x00, 0x03) // partition + key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) // gen + got, err := parseSQSGenericKey(key, prefix) + if err != nil { + t.Fatalf("parseSQSGenericKey(%q): %v", prefix, err) + } + if got != encQueue { + t.Fatalf("prefix %q: got %q want %q", prefix, got, encQueue) + } + }) + } +} + +// TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer +// guards against a partitioned key whose trailer is too short to +// carry partition + gen + msg-id. +func TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer(t *testing.T) { + t.Parallel() + encQueue := "cQ" + key := []byte(SQSMsgDataPrefix + sqsPartitionedDiscriminator + encQueue + "|") + // Only 4 partition bytes, no gen, no msg-id. + key = append(key, 0x00, 0x00, 0x00, 0x01) + if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { + t.Fatalf("err=%v want ErrSQSMalformedKey for truncated partitioned trailer", err) + } +} + +func TestSQS_ParseGenericKey_RejectsTrailerlessKey(t *testing.T) { + t.Parallel() + // Side-record key whose entire suffix is base64url-clean (no + // trailer bytes). Must surface as malformed rather than treating + // the whole tail as the queue segment. + key := append([]byte(SQSMsgVisPrefix), []byte("cQQQ")...) + if _, err := parseSQSGenericKey(key, SQSMsgVisPrefix); !errors.Is(err, ErrSQSMalformedKey) { + t.Fatalf("err=%v want ErrSQSMalformedKey for trailerless side-record key", err) + } +} + +func TestSQS_SideRecordsFlushedEvenWhenZeroMessages(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + enc.WithIncludeSideRecords(true) + queue := "purged" + if err := enc.HandleQueueMeta(EncodeQueueMetaKey(queue), encodeQueueMetaValue(t, map[string]any{ + "name": queue, "visibility_timeout_seconds": 30, "message_retention_seconds": 60, + })); err != nil { + t.Fatal(err) + } + // Side record only, no message-data records — purged queue scenario. + visKey := append([]byte(SQSMsgVisPrefix), []byte("cHVyZ2Vk")...) // base64url("purged") + visKey = append(visKey, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) + if err := enc.HandleSideRecord(SQSMsgVisPrefix, visKey, []byte("opaque")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + want := filepath.Join(root, "sqs", queue, "_internals", "side_records.jsonl") + if _, err := os.Stat(want); err != nil { + t.Fatalf("expected side-records file even with zero messages: %v", err) + } +} + +// TestSQS_OrphanQueueSideRecordsPreserved is the regression for Codex +// P2 round 8: when a queue's !sqs|queue|meta record is missing (e.g. +// after DeleteQueue left tombstones but the meta row was removed) and +// --include-sqs-side-records is on, side records were silently dropped +// alongside any orphan messages. The opt-in contract is the opposite: +// side records exist precisely so deletion-era state is recoverable. +// Now those records flush to a `.orphan` directory while the +// orphan-messages warning fires. +func TestSQS_OrphanQueueSideRecordsPreserved(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + enc.WithIncludeSideRecords(true) + var events []string + enc.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + // Side record arrives without a meta row first (deletion-era). + encQueue := "ZGVsZXRlZA" // base64url("deleted") + visKey := append([]byte(SQSMsgVisPrefix), []byte(encQueue)...) + visKey = append(visKey, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) + if err := enc.HandleSideRecord(SQSMsgVisPrefix, visKey, []byte("vis-record")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + if len(events) == 0 || events[0] != "sqs_orphan_messages" { + t.Fatalf("expected sqs_orphan_messages warning, got %v", events) + } + want := filepath.Join(root, "sqs", encQueue+".orphan", "_internals", "side_records.jsonl") + if _, err := os.Stat(want); err != nil { + t.Fatalf("expected orphan side-records file at %s: %v", want, err) + } +} + +// TestSQS_OrphanQueueSideRecordsSuppressedWhenOptOut asserts that the +// orphan-side-records branch is gated on --include-sqs-side-records: +// without the flag, the warning still fires but no .orphan dir is +// created (consistent with the default-off contract for side records). +func TestSQS_OrphanQueueSideRecordsSuppressedWhenOptOut(t *testing.T) { + t.Parallel() + enc, root := newSQSEncoder(t) + // includeSideRecords is off by default — HandleSideRecord drops + // the record at intake, so the buffer is empty by Finalize. We + // exercise the path anyway to confirm no .orphan dir is created. + encQueue := "b3B0LW91dA" // base64url("opt-out") + visKey := append([]byte(SQSMsgVisPrefix), []byte(encQueue)...) + visKey = append(visKey, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02) + if err := enc.HandleSideRecord(SQSMsgVisPrefix, visKey, []byte("vis")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(root, "sqs", encQueue+".orphan")); !os.IsNotExist(err) { + t.Fatalf("orphan dir created without --include-sqs-side-records: stat err=%v", err) + } +} + func TestSQS_DefaultDoesNotEmitInternals(t *testing.T) { t.Parallel() enc, root := newSQSEncoder(t)