package backup

import (
	"bufio"
	"bytes"
	"encoding/base64"
	"encoding/json"
	"io"

	"github.com/cockroachdb/errors"
)

// jsonNullLiteral is the byte-for-byte JSON null token. We compare raw
// json.RawMessage values against this rather than relying on
// post-Unmarshal string emptiness, because `null` and `""` collapse to
// the same Go-side value once Unmarshal'd into a typed field.
var jsonNullLiteral = []byte("null")

// KEYMAP.jsonl shape (one record per line):
//
//	{"encoded":"<segment>","original":"<base64url>","kind":"sha-fallback"}
//
// (NOTE(review): the bracketed placeholders above and below were lost in
// extraction; reconstructed — confirm against the dump-format spec.)
//
// Records are written in encounter order (the order the encoder produced
// them) and never modified after write. The file is append-only; if the same
// encoded segment is written twice the reader keeps the last entry, but the
// encoder is expected not to emit duplicates within a single dump.
//
// Records exist only for entries whose original bytes are NOT recoverable
// from the encoded filename alone:
//
//   - KindSHAFallback — segment is `<sha256-hex>__<truncated-key>`
//     (filename length exceeded EncodeSegment's 240-byte ceiling).
//   - KindS3LeafData — S3 object renamed to `<key>.elastickv-leaf-data`
//     because both `<key>` and `<key>/...` existed in the same bucket.
//   - KindMetaCollision — user S3 object key happened to end in
//     `.elastickv-meta.json`; renamed under --rename-collisions.
//
// A consumer that does not care about reversing these to original bytes can
// ignore KEYMAP.jsonl entirely.
const (
	KindSHAFallback   = "sha-fallback"
	KindS3LeafData    = "s3-leaf-data"
	KindMetaCollision = "meta-suffix-rename"
)

// keymapBufSizeWriter is the bufio.Writer buffer size for the JSONL writer.
// 64 KiB amortises the per-syscall cost across hundreds of small records
// without holding pathological amounts of memory.
const keymapBufSizeWriter = 64 << 10

// keymapBufSizeReader bounds bufio.Scanner's per-line buffer. KEYMAP
// records carry a ~240-byte encoded segment plus a base64url-encoded
// original key. The source store (store/mvcc_store.go
// maxSnapshotKeySize) caps a single key at 1 MiB; base64url expansion
// is ~4/3 (1 MiB → ~1.33 MiB), and the surrounding JSON object adds a
// fixed ~80 bytes of field names / brackets / commas. A 1 MiB cap was
// therefore not enough to cover a maximum-sized valid key — Codex P1
// round 6 (commit 2cd58a93). 4 MiB carries 2× margin over the
// theoretical worst case while still bounding pathological lines, and
// matches the doubling cadence we'd want if the upstream key cap were
// ever raised.
const keymapBufSizeReader = 4 << 20

// ErrInvalidKeymapRecord is returned by Reader.Next when a line does not
// parse as a KeymapRecord (malformed JSON, missing field, malformed
// base64, etc.).
var ErrInvalidKeymapRecord = errors.New("backup: invalid KEYMAP.jsonl record")

// KeymapRecord is a single mapping from encoded filename component back to
// the original key bytes. Original bytes are arbitrary (binary safe), so
// they are encoded as base64url-no-padding for transport in JSON.
type KeymapRecord struct {
	// Encoded is the filename segment as it appears in the dump tree.
	Encoded string `json:"encoded"`
	// OriginalB64 is base64url-no-padding of the original key bytes.
	OriginalB64 string `json:"original"`
	// Kind classifies why this record exists; see Kind* constants.
	Kind string `json:"kind"`
}

// Original returns the decoded original key bytes from r.OriginalB64.
// A malformed base64 payload surfaces as ErrInvalidKeymapRecord.
func (r KeymapRecord) Original() ([]byte, error) {
	out, err := base64.RawURLEncoding.DecodeString(r.OriginalB64)
	if err != nil {
		return nil, errors.Wrap(ErrInvalidKeymapRecord, err.Error())
	}
	return out, nil
}
// KeymapWriter appends records to a KEYMAP.jsonl stream. It is NOT safe
// for concurrent use: bufio.Writer performs no locking, so concurrent
// Write calls could interleave partial lines. The caller is expected to
// use a single writer per scope and serialise Write calls externally.
// (NOTE(review): an earlier comment claimed concurrent Writes are
// "serialised through the underlying bufio.Writer" — bufio.Writer
// provides no such guarantee.)
type KeymapWriter struct {
	bw  *bufio.Writer
	enc *json.Encoder
	// count tracks how many records have been written; exposed so the caller
	// can decide to omit an empty KEYMAP.jsonl file (per the spec, the file
	// is omitted when no entries exist).
	count int
}

// NewKeymapWriter returns a writer that appends JSONL records to w. Close
// must be called to flush.
func NewKeymapWriter(w io.Writer) *KeymapWriter {
	bw := bufio.NewWriterSize(w, keymapBufSizeWriter)
	enc := json.NewEncoder(bw)
	enc.SetEscapeHTML(false) // we never embed user keys in HTML; preserve `<>&`
	return &KeymapWriter{bw: bw, enc: enc}
}

// Write appends one KeymapRecord. The record is JSON-serialised with a
// trailing newline (json.Encoder behavior), giving the JSONL contract.
// Records with an empty Encoded or Kind are rejected before any bytes
// are buffered, so count only reflects well-formed records.
func (w *KeymapWriter) Write(rec KeymapRecord) error {
	if rec.Encoded == "" {
		return errors.WithStack(errors.New("backup: KEYMAP record encoded must be non-empty"))
	}
	if rec.Kind == "" {
		return errors.WithStack(errors.New("backup: KEYMAP record kind must be non-empty"))
	}
	if err := w.enc.Encode(rec); err != nil {
		return errors.WithStack(err)
	}
	w.count++
	return nil
}

// WriteOriginal is a convenience wrapper that base64-encodes raw original
// bytes for the caller.
func (w *KeymapWriter) WriteOriginal(encoded string, original []byte, kind string) error {
	return w.Write(KeymapRecord{
		Encoded:     encoded,
		OriginalB64: base64.RawURLEncoding.EncodeToString(original),
		Kind:        kind,
	})
}

// Count returns the number of records written so far. Useful for the
// "omit empty KEYMAP file" decision after the dump completes.
func (w *KeymapWriter) Count() int { return w.count }
// Close flushes any buffered records to the underlying writer. It is a
// no-op on a zero-value KeymapWriter (nil buffer), so Close is always
// safe to defer.
func (w *KeymapWriter) Close() error {
	if w.bw == nil {
		return nil
	}
	if err := w.bw.Flush(); err != nil {
		return errors.WithStack(err)
	}
	return nil
}

// KeymapReader iterates JSONL records line-by-line. Memory footprint is
// bounded by keymapBufSizeReader regardless of file size.
type KeymapReader struct {
	sc *bufio.Scanner
	// err is the sticky first failure; every subsequent Next returns it.
	err error
}

// NewKeymapReader wraps r so the caller can iterate records via Next.
func NewKeymapReader(r io.Reader) *KeymapReader {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, keymapBufSizeReader), keymapBufSizeReader)
	return &KeymapReader{sc: sc}
}

// Next decodes the next record. It returns (rec, true, nil) on success,
// (zero, false, nil) at end of stream, and (zero, false, err) on parse
// failure or I/O error. Once an error is returned the reader is sticky:
// subsequent calls return the same error.
//
// The base64-encoded `original` field is validated at parse time rather
// than lazily: a malformed dump must surface on the first read of the
// affected line, not propagate silently until a much later
// rec.Original() call. Same error class either way.
func (r *KeymapReader) Next() (KeymapRecord, bool, error) {
	if r.err != nil {
		return KeymapRecord{}, false, r.err
	}
	if !r.sc.Scan() {
		// Scan returning false is either clean EOF (Err == nil) or a
		// scanner/IO failure (e.g. token too long); only the latter is
		// recorded as sticky.
		if err := r.sc.Err(); err != nil {
			r.err = errors.WithStack(err)
			return KeymapRecord{}, false, r.err
		}
		return KeymapRecord{}, false, nil
	}
	line := r.sc.Bytes()
	rec, err := decodeKeymapLine(line)
	if err != nil {
		r.err = err
		return KeymapRecord{}, false, r.err
	}
	return rec, true, nil
}
// decodeKeymapLine parses one JSONL record. It enforces three properties:
//
//  1. The record must contain `encoded`, `original`, and `kind` fields,
//     and none of them may be the JSON literal `null` — Go unmarshals
//     a null string field into "", and base64.DecodeString("") would
//     silently accept it as an empty original key, rewriting the
//     mapping. Codex P2 round 5 + P1 round 7-follow-up.
//  2. `encoded` and `kind` must be non-empty strings.
//  3. `original` (the base64) must be parseable at parse time so a
//     corrupted dump fails on first read rather than at later
//     Original() call. Codex P1 #179.
func decodeKeymapLine(line []byte) (KeymapRecord, error) {
	// Two-phase decode: first into a presence-aware map so we can
	// distinguish "field absent" from "field present and empty
	// string"; then into the typed struct for value extraction.
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(line, &fields); err != nil {
		return KeymapRecord{}, errors.Wrap(ErrInvalidKeymapRecord, err.Error())
	}
	for _, name := range [...]string{"encoded", "original", "kind"} {
		raw, ok := fields[name]
		if !ok {
			return KeymapRecord{}, errors.Wrapf(ErrInvalidKeymapRecord, "missing field %q", name)
		}
		// `"original": null` round-trips to "" through json.Unmarshal
		// into a `string` target, and base64.DecodeString("") would
		// then silently accept it. Reject the JSON null literal
		// explicitly so corrupted/truncated records don't slip
		// through with empty-bytes mappings.
		if bytes.Equal(raw, jsonNullLiteral) {
			return KeymapRecord{}, errors.Wrapf(ErrInvalidKeymapRecord, "field %q is null", name)
		}
	}
	var rec KeymapRecord
	if err := json.Unmarshal(line, &rec); err != nil {
		return KeymapRecord{}, errors.Wrap(ErrInvalidKeymapRecord, err.Error())
	}
	if rec.Encoded == "" || rec.Kind == "" {
		return KeymapRecord{}, errors.Wrap(ErrInvalidKeymapRecord, "missing encoded or kind")
	}
	if _, err := base64.RawURLEncoding.DecodeString(rec.OriginalB64); err != nil {
		return KeymapRecord{}, errors.Wrap(ErrInvalidKeymapRecord, err.Error())
	}
	return rec, nil
}
// LoadKeymap reads every record from r into an in-memory map keyed by
// encoded segment. The last record wins on duplicates. Suitable for
// scopes where the keymap fits comfortably in memory; for large scopes
// callers should use KeymapReader directly.
func LoadKeymap(r io.Reader) (map[string]KeymapRecord, error) {
	out := make(map[string]KeymapRecord)
	rd := NewKeymapReader(r)
	for {
		rec, ok, err := rd.Next()
		if err != nil {
			return nil, err
		}
		if !ok {
			return out, nil
		}
		out[rec.Encoded] = rec
	}
}

// ---------------------------------------------------------------------------
// file: internal/backup/keymap_test.go (separate file in the original diff)
// ---------------------------------------------------------------------------

package backup

import (
	"bytes"
	"strings"
	"testing"

	"github.com/cockroachdb/errors"
)

// keymapCase is one encoded<->original fixture shared by the round-trip
// tests below.
type keymapCase struct {
	encoded  string
	original []byte
	kind     string
}

// keymapRoundTripCases returns one fixture per record kind, plus binary
// and empty-original edge cases.
func keymapRoundTripCases() []keymapCase {
	return []keymapCase{
		{"abcdef0123456789abcdef0123456789__hello", []byte("hello-but-much-longer-than-fits"), KindSHAFallback},
		{"path%2Fto.elastickv-leaf-data", []byte("path/to"), KindS3LeafData},
		{"foo.elastickv-meta.json.user-data", []byte("foo.elastickv-meta.json"), KindMetaCollision},
		{"binary-key", []byte{0x00, 0xff, 0x01, 0xfe}, KindSHAFallback},
		{"empty-original", []byte{}, KindSHAFallback},
	}
}

// writeKeymapCases writes every fixture through w, failing the test on
// the first error.
func writeKeymapCases(t *testing.T, w *KeymapWriter, cases []keymapCase) {
	t.Helper()
	for _, c := range cases {
		if err := w.WriteOriginal(c.encoded, c.original, c.kind); err != nil {
			t.Fatalf("Write(%q): %v", c.encoded, err)
		}
	}
}

// assertKeymapRecord asserts got contains c's record with matching kind
// and original bytes.
func assertKeymapRecord(t *testing.T, got map[string]KeymapRecord, c keymapCase) {
	t.Helper()
	rec, ok := got[c.encoded]
	if !ok {
		t.Fatalf("missing record for %q", c.encoded)
	}
	if rec.Kind != c.kind {
		t.Fatalf("%q kind = %q, want %q", c.encoded, rec.Kind, c.kind)
	}
	orig, err := rec.Original()
	if err != nil {
		t.Fatalf("%q Original: %v", c.encoded, err)
	}
	if !bytes.Equal(orig, c.original) {
		t.Fatalf("%q original = %x, want %x", c.encoded, orig, c.original)
	}
}
TestKeymapWriter_RoundTrip(t *testing.T) { + t.Parallel() + cases := keymapRoundTripCases() + var buf bytes.Buffer + w := NewKeymapWriter(&buf) + writeKeymapCases(t, w, cases) + if w.Count() != len(cases) { + t.Fatalf("Count = %d, want %d", w.Count(), len(cases)) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + got, err := LoadKeymap(&buf) + if err != nil { + t.Fatalf("LoadKeymap: %v", err) + } + if len(got) != len(cases) { + t.Fatalf("loaded len = %d, want %d", len(got), len(cases)) + } + for _, c := range cases { + assertKeymapRecord(t, got, c) + } +} + +func TestKeymapWriter_RejectsEmptyEncoded(t *testing.T) { + t.Parallel() + w := NewKeymapWriter(&bytes.Buffer{}) + if err := w.Write(KeymapRecord{Encoded: "", Kind: KindSHAFallback}); err == nil { + t.Fatalf("expected error for empty encoded, got nil") + } + if err := w.Write(KeymapRecord{Encoded: "x", Kind: ""}); err == nil { + t.Fatalf("expected error for empty kind, got nil") + } +} + +func TestKeymapWriter_DoesNotEscapeHTML(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + w := NewKeymapWriter(&buf) + // json.Encoder escapes `<`, `>`, `&` by default; we disable that so + // keys containing these bytes encode/decode without surprise. + if err := w.WriteOriginal("a%3Cb%3Ec", []byte("ac&d"), KindSHAFallback); err != nil { + t.Fatalf("WriteOriginal: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + out := buf.String() + if strings.Contains(out, `<`) || strings.Contains(out, `>`) || strings.Contains(out, `&`) { + t.Fatalf("unwanted HTML escape in output: %q", out) + } + // And the base64 of "ac&d" appears intact: + if !strings.Contains(out, "YTxiPmMmZA") { + t.Fatalf("missing base64 of original in output: %q", out) + } +} + +func TestKeymapWriter_OmitEmpty(t *testing.T) { + t.Parallel() + // The "omit when empty" decision is the caller's; the writer just + // reports whether any records were written. 
// TestKeymapReader_RejectsMalformedJSON asserts a non-JSON line surfaces as
// ErrInvalidKeymapRecord and that the error is sticky across calls.
func TestKeymapReader_RejectsMalformedJSON(t *testing.T) {
	t.Parallel()
	r := NewKeymapReader(strings.NewReader("not-json\n"))
	_, _, err := r.Next()
	if !errors.Is(err, ErrInvalidKeymapRecord) {
		t.Fatalf("err = %v, want ErrInvalidKeymapRecord", err)
	}
	// Sticky: subsequent calls return the same wrapped error class.
	_, _, err2 := r.Next()
	if !errors.Is(err2, ErrInvalidKeymapRecord) {
		t.Fatalf("non-sticky error: %v", err2)
	}
}

// TestKeymapReader_RejectsRecordWithoutEncodedOrKind covers the four ways a
// record can be structurally present but missing/empty on `encoded`/`kind`.
func TestKeymapReader_RejectsRecordWithoutEncodedOrKind(t *testing.T) {
	t.Parallel()
	cases := []string{
		`{"original":"AA"}`,
		`{"encoded":"","kind":"sha-fallback"}`,
		`{"encoded":"x"}`,
		`{"encoded":"x","kind":""}`,
	}
	for _, line := range cases {
		r := NewKeymapReader(strings.NewReader(line + "\n"))
		_, _, err := r.Next()
		if !errors.Is(err, ErrInvalidKeymapRecord) {
			t.Fatalf("input %q: err = %v, want ErrInvalidKeymapRecord", line, err)
		}
	}
}

// TestKeymapReader_RejectsBlankLines asserts strict JSONL: an interior
// blank line is an error, not a skip.
func TestKeymapReader_RejectsBlankLines(t *testing.T) {
	t.Parallel()
	// bufio.Scanner skips trailing newline but emits an empty line when one
	// is in the middle of the stream. We require strict JSONL — every
	// non-empty line must be a record. An empty line in the middle must
	// surface as ErrInvalidKeymapRecord rather than be silently skipped,
	// so truncated dumps are recognised.
	input := `{"encoded":"x","original":"AA","kind":"sha-fallback"}` + "\n\n" +
		`{"encoded":"y","original":"AA","kind":"sha-fallback"}` + "\n"
	r := NewKeymapReader(strings.NewReader(input))
	if _, ok, err := r.Next(); !ok || err != nil {
		t.Fatalf("first record: ok=%v err=%v", ok, err)
	}
	if _, _, err := r.Next(); !errors.Is(err, ErrInvalidKeymapRecord) {
		t.Fatalf("blank line: err=%v want ErrInvalidKeymapRecord", err)
	}
}
// TestLoadKeymap_LastRecordWins asserts the documented duplicate policy:
// the last record for an encoded segment replaces earlier ones.
func TestLoadKeymap_LastRecordWins(t *testing.T) {
	t.Parallel()
	input := `{"encoded":"x","original":"YQ","kind":"sha-fallback"}` + "\n" +
		`{"encoded":"x","original":"Yg","kind":"sha-fallback"}` + "\n"
	got, err := LoadKeymap(strings.NewReader(input))
	if err != nil {
		t.Fatalf("LoadKeymap: %v", err)
	}
	rec, ok := got["x"]
	if !ok {
		t.Fatalf("missing record")
	}
	orig, err := rec.Original()
	if err != nil {
		t.Fatalf("Original: %v", err)
	}
	// "YQ" is base64url for "a", "Yg" for "b"; last-wins must yield "b".
	if string(orig) != "b" {
		t.Fatalf("last-wins broken: got %q want %q", orig, "b")
	}
}

// TestKeymapRecord_OriginalRejectsBadBase64 asserts lazy decoding still
// classifies malformed base64 as ErrInvalidKeymapRecord.
func TestKeymapRecord_OriginalRejectsBadBase64(t *testing.T) {
	t.Parallel()
	rec := KeymapRecord{Encoded: "x", OriginalB64: "!!!", Kind: KindSHAFallback}
	if _, err := rec.Original(); !errors.Is(err, ErrInvalidKeymapRecord) {
		t.Fatalf("err = %v, want ErrInvalidKeymapRecord", err)
	}
}

// TestKeymapReader_RejectsMalformedBase64AtParseTime asserts base64 is
// validated eagerly by Next rather than deferred to Original().
func TestKeymapReader_RejectsMalformedBase64AtParseTime(t *testing.T) {
	t.Parallel()
	// JSON parses fine; the structural fields are present; only the
	// `original` base64 is malformed. The reader must catch this on
	// the first Next() rather than defer it to a later Original()
	// call — Codex P1 #179.
	input := `{"encoded":"x","original":"!!!","kind":"sha-fallback"}` + "\n"
	r := NewKeymapReader(strings.NewReader(input))
	_, _, err := r.Next()
	if !errors.Is(err, ErrInvalidKeymapRecord) {
		t.Fatalf("err=%v want ErrInvalidKeymapRecord on parse-time base64 validation", err)
	}
}
// TestKeymapReader_RejectsExplicitNullField is the regression for
// Codex P1 round 7-follow-up: `"original": null` round-trips through
// json.Unmarshal into rec.OriginalB64 == "", which base64.DecodeString
// then accepts as empty bytes — silently rewriting the mapping. The
// presence-aware decode must also reject the JSON `null` literal for
// each required field.
func TestKeymapReader_RejectsExplicitNullField(t *testing.T) {
	t.Parallel()
	cases := []struct {
		name string
		body string
	}{
		{"null original", `{"encoded":"x","original":null,"kind":"sha-fallback"}`},
		{"null encoded", `{"encoded":null,"original":"AA","kind":"sha-fallback"}`},
		{"null kind", `{"encoded":"x","original":"AA","kind":null}`},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			r := NewKeymapReader(strings.NewReader(tc.body + "\n"))
			_, _, err := r.Next()
			if !errors.Is(err, ErrInvalidKeymapRecord) {
				t.Fatalf("err=%v want ErrInvalidKeymapRecord on null field", err)
			}
		})
	}
}
// TestKeymapReader_RejectsMissingOriginalField exercises Codex P2 round 5:
// a record that omits `original` entirely must not be accepted as if the
// original key were empty bytes, because base64.DecodeString("") succeeds
// silently. A truncated dump that drops `original` would otherwise rewrite
// the encoded->original mapping to empty bytes and break exact key recovery
// for SHA-fallback or collision-renamed entries.
func TestKeymapReader_RejectsMissingOriginalField(t *testing.T) {
	t.Parallel()
	// All structural keys present except `original`. Without the
	// presence check this passes, because rec.OriginalB64 defaults to
	// "" and base64 decode of "" succeeds.
	input := `{"encoded":"x","kind":"sha-fallback"}` + "\n"
	r := NewKeymapReader(strings.NewReader(input))
	_, _, err := r.Next()
	if !errors.Is(err, ErrInvalidKeymapRecord) {
		t.Fatalf("err=%v want ErrInvalidKeymapRecord on missing `original` field", err)
	}
	// Sticky: a subsequent Next must keep returning the same error class.
	_, _, err2 := r.Next()
	if !errors.Is(err2, ErrInvalidKeymapRecord) {
		t.Fatalf("non-sticky error: %v", err2)
	}
}
// TestKeymapReader_AcceptsMaxSizedOriginal is the regression for Codex
// P1 round 6: a record whose `original` is the source store's maximum
// allowed key (1 MiB, per store/mvcc_store.go maxSnapshotKeySize) must
// round-trip cleanly. Before the bump the scanner cap was 1 MiB, but
// base64url expands the value to ~1.33 MiB; KeymapReader.Next failed
// with `bufio.Scanner: token too long` and the dump could not be
// loaded back. Test reads the largest legitimate KEYMAP line we will
// ever produce.
func TestKeymapReader_AcceptsMaxSizedOriginal(t *testing.T) {
	t.Parallel()
	const maxSnapshotKeyBytes = 1 << 20
	original := make([]byte, maxSnapshotKeyBytes)
	for i := range original {
		original[i] = byte(i % 251) //nolint:mnd // arbitrary byte spread
	}
	var buf bytes.Buffer
	w := NewKeymapWriter(&buf)
	if err := w.WriteOriginal("encoded-x", original, KindSHAFallback); err != nil {
		t.Fatalf("WriteOriginal: %v", err)
	}
	if err := w.Close(); err != nil {
		t.Fatalf("Close: %v", err)
	}
	rd := NewKeymapReader(&buf)
	rec, ok, err := rd.Next()
	if err != nil || !ok {
		t.Fatalf("Next: ok=%v err=%v", ok, err)
	}
	got, err := rec.Original()
	if err != nil {
		t.Fatalf("Original: %v", err)
	}
	if !bytes.Equal(got, original) {
		t.Fatalf("Original round-trip lost data: len got=%d want=%d", len(got), len(original))
	}
}

// TestKeymapReader_AcceptsExplicitEmptyOriginal sanity-checks that an
// explicitly-empty `original` (the field is present, value is "") still
// parses. The contract is that absence is rejected, not emptiness.
func TestKeymapReader_AcceptsExplicitEmptyOriginal(t *testing.T) {
	t.Parallel()
	input := `{"encoded":"x","original":"","kind":"sha-fallback"}` + "\n"
	r := NewKeymapReader(strings.NewReader(input))
	rec, ok, err := r.Next()
	if err != nil || !ok {
		t.Fatalf("err=%v ok=%v want a record", err, ok)
	}
	got, err := rec.Original()
	if err != nil {
		t.Fatalf("Original(): %v", err)
	}
	if len(got) != 0 {
		t.Fatalf("Original = %q, want empty", got)
	}
}

// ---------------------------------------------------------------------------
// file: internal/backup/manifest.go (separate file in the original diff)
// ---------------------------------------------------------------------------

package backup

import (
	"bytes"
	"encoding/json"
	"io"
	"time"

	"github.com/cockroachdb/errors"
)
// MANIFEST.json is the only file a restore tool must read first. All other
// files in a dump are decoded from their on-disk path and contents. The
// manifest records:
//
//   - format_version (the only field a restore tool MUST consult before
//     trusting anything else)
//   - phase ("phase0-snapshot-decode" or "phase1-live-pinned") so a
//     consumer that cares about cross-shard PIT consistency can warn or
//     refuse on Phase 0 inputs
//   - source/origin metadata so a restore is auditable
//   - exclusion flags + format-policy fields so the producer's rendering
//     choices are explicit at restore time

// CurrentFormatVersion is the format major-version this code emits and
// accepts. Restore-side code MUST refuse `format_version > current`. A
// minor-version bump (e.g., adding optional fields) does not change this
// constant.
const CurrentFormatVersion uint32 = 1

const (
	// PhasePhase0SnapshotDecode marks dumps produced by Phase 0a (offline
	// snapshot decoder).
	PhasePhase0SnapshotDecode = "phase0-snapshot-decode"
	// PhasePhase1LivePinned marks dumps produced by Phase 1 (live PIT
	// extraction with cluster-wide read_ts pinning).
	PhasePhase1LivePinned = "phase1-live-pinned"
)

const (
	// ChecksumAlgorithmSHA256 is the only checksum algorithm Phase 0a writes.
	// Phase 1 may add others later (e.g. blake3) under the same field.
	ChecksumAlgorithmSHA256 = "sha256"
	// ChecksumFormatSha256sum identifies the line-oriented sha256sum(1)
	// format used by the CHECKSUMS file. Operators verify with
	// `sha256sum -c CHECKSUMS` from the dump root.
	ChecksumFormatSha256sum = "sha256sum"
	// EncodedFilenameCharsetRFC3986 is the EncodeSegment charset used for
	// every non-S3-object filename in the dump.
	EncodedFilenameCharsetRFC3986 = "rfc3986-unreserved-plus-percent"
	// S3MetaSuffixDefault is the reserved suffix for the S3 sidecar
	// metadata file (`.elastickv-meta.json`).
	S3MetaSuffixDefault = ".elastickv-meta.json"
	// S3CollisionStrategyLeafDataSuffix renames the shorter of two
	// colliding S3 keys to `<key>.elastickv-leaf-data` and records the
	// rename in KEYMAP.jsonl. (NOTE(review): the `<key>` placeholder was
	// lost in extraction; reconstructed — confirm against the spec.)
	S3CollisionStrategyLeafDataSuffix = "leaf-data-suffix"
	// DynamoDBLayoutPerItem emits one item per file
	// (`items/<table>/<key>.json`; placeholders reconstructed — confirm
	// against the layout spec); the user's stated default.
	DynamoDBLayoutPerItem = "per-item"
	// DynamoDBLayoutJSONL bundles items into `items/data-<n>.jsonl`
	// (opt-in via --dynamodb-bundle-mode jsonl).
	DynamoDBLayoutJSONL = "jsonl"
	// KeySegmentMaxBytesDefault matches EncodeSegment's maxSegmentBytes.
	KeySegmentMaxBytesDefault uint32 = 240
)

// Source records where a Phase 0 dump came from. Phase 1 dumps leave Source
// nil and populate Live instead.
type Source struct {
	// FSMPath is the absolute or relative path of the .fsm file the
	// decoder consumed.
	FSMPath string `json:"fsm_path"`
	// FSMCRC32C is the CRC32C value the decoder verified against the
	// .fsm file's footer (lowercase hex).
	FSMCRC32C string `json:"fsm_crc32c,omitempty"`
}

// Live records the cluster-wide pinning information that produced a Phase 1
// dump. Phase 0 dumps leave this nil.
type Live struct {
	// ReadTS is the pinned read_ts at which BackupScanner traversed the
	// keyspace.
	ReadTS uint64 `json:"read_ts"`
	// PinTokenSHA256 is the hex SHA-256 of the pin_token issued by
	// BeginBackup. Stored as a hash rather than the raw token so the
	// manifest carries no auth-sensitive material.
	PinTokenSHA256 string `json:"pin_token_sha256,omitempty"`
}
// Adapters lists which scopes were dumped per adapter. The pointer
// values express two distinguishable on-disk states:
//
//   - nil -> the adapter was excluded from this dump (e.g.
//     `--adapter dynamodb,s3` filtered it out). The corresponding
//     JSON key is absent.
//   - non-nil pointer to Adapter{} -> the adapter was in scope but
//     no scopes for it were emitted (no tables, no buckets, etc.).
//     The JSON key is present with an empty object.
//   - non-nil pointer to a populated Adapter -> the listed scopes
//     were emitted.
//
// Storing pointers (rather than zero-value Adapter structs) is what
// keeps "excluded by filter" distinguishable from "included but
// empty" through json.Marshal — non-pointer fields would collapse
// both states into the same on-disk shape.
type Adapters struct {
	DynamoDB *Adapter `json:"dynamodb,omitempty"`
	S3       *Adapter `json:"s3,omitempty"`
	Redis    *Adapter `json:"redis,omitempty"`
	SQS      *Adapter `json:"sqs,omitempty"`
}

// Adapter holds the scope identifiers for one adapter. Field names are
// per-adapter to match the protocol's natural vocabulary.
type Adapter struct {
	Tables    []string `json:"tables,omitempty"`
	Buckets   []string `json:"buckets,omitempty"`
	Databases []uint32 `json:"databases,omitempty"`
	Queues    []string `json:"queues,omitempty"`
}

// Exclusions records the producer-side flags that affected which records
// were emitted. Restore tools log these so an operator can correlate a
// surprising dump shape with the producer invocation. All four flags are
// emitted unconditionally (no omitempty) so a reader can verify presence.
type Exclusions struct {
	IncludeIncompleteUploads bool `json:"include_incomplete_uploads"`
	IncludeOrphans           bool `json:"include_orphans"`
	PreserveSQSVisibility    bool `json:"preserve_sqs_visibility"`
	IncludeSQSSideRecords    bool `json:"include_sqs_side_records"`
}
// Manifest is the on-disk MANIFEST.json structure. Field tags match the
// spec in docs/design/2026_04_29_proposed_snapshot_logical_decoder.md.
type Manifest struct {
	FormatVersion    uint32 `json:"format_version"`
	Phase            string `json:"phase"`
	ElastickvVersion string `json:"elastickv_version,omitempty"`
	ClusterID        string `json:"cluster_id,omitempty"`
	SnapshotIndex    uint64 `json:"snapshot_index,omitempty"`
	LastCommitTS     uint64 `json:"last_commit_ts,omitempty"`
	WallTimeISO      string `json:"wall_time_iso"`
	Source           *Source `json:"source,omitempty"`
	Live             *Live   `json:"live,omitempty"`
	// Adapters and Exclusions are pointer types so ReadManifest can
	// distinguish "section omitted entirely" (a corrupted or
	// truncated dump that should fail validation) from "section
	// present but populated with default values" (legitimate
	// scope-everything-excluded). Codex P2 #146 (round 3).
	Adapters          *Adapters   `json:"adapters"`
	Exclusions        *Exclusions `json:"exclusions"`
	ChecksumAlgorithm string      `json:"checksum_algorithm"`
	ChecksumFormat    string      `json:"checksum_format"`

	EncodedFilenameCharset string `json:"encoded_filename_charset"`
	KeySegmentMaxBytes     uint32 `json:"key_segment_max_bytes"`
	S3MetaSuffix           string `json:"s3_meta_suffix"`
	S3CollisionStrategy    string `json:"s3_collision_strategy"`
	DynamoDBLayout         string `json:"dynamodb_layout"`
}

// ErrUnsupportedFormatVersion is returned by ReadManifest when the on-disk
// format_version is greater than CurrentFormatVersion or zero.
var ErrUnsupportedFormatVersion = errors.New("backup: manifest format_version unsupported")

// ErrInvalidManifest is returned by ReadManifest when the JSON parses but
// fails structural validation (missing required field, unknown phase, etc.).
var ErrInvalidManifest = errors.New("backup: manifest invalid")
Adapters and Exclusions are seeded to +// non-nil zero values so the resulting manifest passes the +// "section-present" validation; callers populating individual scopes +// reach in via the now-non-nil pointer. +func NewPhase0SnapshotManifest(now time.Time) Manifest { + return Manifest{ + FormatVersion: CurrentFormatVersion, + Phase: PhasePhase0SnapshotDecode, + WallTimeISO: now.UTC().Format(time.RFC3339Nano), + Adapters: &Adapters{}, + Exclusions: &Exclusions{}, + ChecksumAlgorithm: ChecksumAlgorithmSHA256, + ChecksumFormat: ChecksumFormatSha256sum, + EncodedFilenameCharset: EncodedFilenameCharsetRFC3986, + KeySegmentMaxBytes: KeySegmentMaxBytesDefault, + S3MetaSuffix: S3MetaSuffixDefault, + S3CollisionStrategy: S3CollisionStrategyLeafDataSuffix, + DynamoDBLayout: DynamoDBLayoutPerItem, + } +} + +// WriteManifest serialises m as pretty-printed JSON to w. +// +// Pretty-printing is deliberate — MANIFEST.json is operator-facing and is +// expected to be `cat`-ed and `jq`-ed during incident response. +func WriteManifest(w io.Writer, m Manifest) error { + if err := m.validate(); err != nil { + return err + } + enc := json.NewEncoder(w) + enc.SetIndent("", " ") //nolint:mnd // 2-space indent matches `jq -.` default + enc.SetEscapeHTML(false) + if err := enc.Encode(m); err != nil { + return errors.WithStack(err) + } + return nil +} + +// ReadManifest decodes and validates a MANIFEST.json from r. The returned +// error is wrapped as ErrUnsupportedFormatVersion or ErrInvalidManifest so +// callers can branch on errors.Is. +func ReadManifest(r io.Reader) (Manifest, error) { + // Read the entire payload once so we can pre-decode just the + // format_version before strict struct decoding. Without this + // two-phase approach, a manifest produced by a newer major version + // that also changed the JSON type of a known field (e.g. 
// ReadManifest decodes and validates a MANIFEST.json from r. The returned
// error is wrapped as ErrUnsupportedFormatVersion or ErrInvalidManifest so
// callers can branch on errors.Is.
func ReadManifest(r io.Reader) (Manifest, error) {
	// Read the entire payload once so we can pre-decode just the
	// format_version before strict struct decoding. Without this
	// two-phase approach, a manifest produced by a newer major version
	// that also changed the JSON type of a known field (e.g. `phase`
	// switched from string to int) would surface as
	// ErrInvalidManifest instead of ErrUnsupportedFormatVersion,
	// breaking the documented version-branching contract for callers
	// that key off errors.Is(err, ErrUnsupportedFormatVersion). See
	// Codex P2, round 5.
	payload, err := io.ReadAll(r)
	if err != nil {
		return Manifest{}, errors.Wrap(ErrInvalidManifest, err.Error())
	}
	if err := probeManifestFormatVersion(payload); err != nil {
		return Manifest{}, err
	}
	// Phase 2: strict struct decode on a known-supported version.
	var m Manifest
	dec := json.NewDecoder(bytes.NewReader(payload))
	// We intentionally do NOT call DisallowUnknownFields here.
	// The format-version contract (Codex P1, follow-up) is:
	//   - format_version > CurrentFormatVersion -> hard refuse
	//     (the major break signal)
	//   - format_version == CurrentFormatVersion AND extra unknown
	//     fields appear -> a newer minor version added them; the
	//     older reader silently ignores. That's the documented
	//     same-major minor-evolution path.
	// Rejecting unknown fields outright would turn every minor
	// optional-field addition into a hard read failure during
	// mixed-version operation.
	if err := dec.Decode(&m); err != nil {
		return Manifest{}, errors.Wrap(ErrInvalidManifest, err.Error())
	}
	// MANIFEST.json is exactly one JSON object. Trailing tokens (a
	// second object, junk) point at concatenation bugs or partial-write
	// corruption — both of which must surface here rather than be
	// silently discarded. dec.More() reports whether another JSON value
	// follows; note that it skips insignificant whitespace, so
	// whitespace-only trailing padding IS tolerated. (NOTE(review): an
	// earlier comment claimed whitespace padding was rejected and that
	// io.Discard was used — both described a previous implementation.)
	if dec.More() {
		return Manifest{}, errors.Wrap(ErrInvalidManifest,
			"trailing bytes after manifest JSON object")
	}
	if err := validateExclusionsFieldsPresent(payload); err != nil {
		return Manifest{}, err
	}
	if err := m.validate(); err != nil {
		return Manifest{}, err
	}
	return m, nil
}
func probeManifestFormatVersion(payload []byte) error {
	// Relaxed decode: a map of raw messages tolerates any type change
	// in fields other than the one being probed.
	var top map[string]json.RawMessage
	if err := json.Unmarshal(payload, &top); err != nil {
		return errors.Wrap(ErrInvalidManifest, err.Error())
	}
	rawFV, hasFV := top["format_version"]
	if !hasFV {
		return errors.Wrap(ErrInvalidManifest, "format_version missing")
	}
	// Compare the raw token against `null` explicitly: after
	// Unmarshal, null and absent both collapse to uint32(0).
	if bytes.Equal(rawFV, jsonNullLiteral) {
		return errors.Wrap(ErrInvalidManifest, "format_version is null")
	}
	var probe struct {
		FormatVersion uint32 `json:"format_version"`
	}
	if err := json.Unmarshal(payload, &probe); err != nil {
		return errors.Wrap(ErrInvalidManifest, err.Error())
	}
	if probe.FormatVersion == 0 {
		return errors.Wrap(ErrUnsupportedFormatVersion, "format_version is zero")
	}
	if probe.FormatVersion > CurrentFormatVersion {
		return errors.Wrapf(ErrUnsupportedFormatVersion,
			"format_version %d > current %d (newer producer)", probe.FormatVersion, CurrentFormatVersion)
	}
	return nil
}

// validateExclusionsFieldsPresent rejects manifests whose `exclusions`
// section omits any of the required boolean flags. Go's
// json.Unmarshal silently fills missing booleans with `false`, so a
// truncated or partially-corrupted manifest would otherwise pass with
// altered exclusion semantics — losing the producer-side provenance
// the section is meant to capture (Codex P2 round 7). Each flag must
// be present and not the JSON `null` literal; type validation already
// runs as part of the strict struct decode.
func validateExclusionsFieldsPresent(payload []byte) error {
	var top map[string]json.RawMessage
	if err := json.Unmarshal(payload, &top); err != nil {
		return errors.Wrap(ErrInvalidManifest, err.Error())
	}
	rawExcl, ok := top["exclusions"]
	if !ok {
		// validateRequiredFields surfaces the absent-section error
		// with a clearer message; defer to it.
		return nil
	}
	var excl map[string]json.RawMessage
	if err := json.Unmarshal(rawExcl, &excl); err != nil {
		return errors.Wrap(ErrInvalidManifest, err.Error())
	}
	for _, name := range exclusionsRequiredFields {
		raw, present := excl[name]
		if !present {
			return errors.Wrapf(ErrInvalidManifest,
				"exclusions.%s missing (cannot infer producer-side default)", name)
		}
		if bytes.Equal(raw, jsonNullLiteral) {
			return errors.Wrapf(ErrInvalidManifest,
				"exclusions.%s is null", name)
		}
	}
	return nil
}

// exclusionsRequiredFields lists the JSON tag names of every
// Exclusions field that must be explicitly present in the manifest.
// Kept in sync with the struct definition above; a missing entry
// here would silently re-introduce the omitted-flag bug.
var exclusionsRequiredFields = [...]string{
	"include_incomplete_uploads",
	"include_orphans",
	"preserve_sqs_visibility",
	"include_sqs_side_records",
}

// validate runs the structural, policy, and phase-specific checks in
// that order; the first failure wins.
func (m Manifest) validate() error {
	if err := m.validateRequiredFields(); err != nil {
		return err
	}
	if err := m.validatePolicyFields(); err != nil {
		return err
	}
	return m.validatePhaseSpecific()
}

// validateRequiredFields checks the version gate, phase enum,
// wall-clock timestamp, and the two mandatory structural sections.
func (m Manifest) validateRequiredFields() error {
	if m.FormatVersion == 0 {
		return errors.Wrap(ErrInvalidManifest, "format_version is zero")
	}
	// WriteManifest must refuse manifests advertising a version this
	// build cannot produce — without this gate, a caller mutating
	// `m.FormatVersion = CurrentFormatVersion + 1` would write a
	// manifest that ReadManifest in the same package then rejects as
	// ErrUnsupportedFormatVersion, producing self-incompatible
	// backup metadata. Codex P2 round 8.
	if m.FormatVersion > CurrentFormatVersion {
		return errors.Wrapf(ErrInvalidManifest,
			"format_version %d > current %d (this build cannot produce that)", m.FormatVersion, CurrentFormatVersion)
	}
	switch m.Phase {
	case PhasePhase0SnapshotDecode, PhasePhase1LivePinned:
	default:
		return errors.Wrapf(ErrInvalidManifest, "unknown phase %q", m.Phase)
	}
	if m.WallTimeISO == "" {
		return errors.Wrap(ErrInvalidManifest, "wall_time_iso missing")
	}
	if _, err := time.Parse(time.RFC3339Nano, m.WallTimeISO); err != nil {
		return errors.Wrapf(ErrInvalidManifest, "wall_time_iso unparseable: %v", err)
	}
	// Adapters and Exclusions are required structural sections.
	// A manifest that omits either is treated as truncated/corrupted
	// (Codex P2 #146 round 3).
	if m.Adapters == nil {
		return errors.Wrap(ErrInvalidManifest, "adapters section missing")
	}
	if m.Exclusions == nil {
		return errors.Wrap(ErrInvalidManifest, "exclusions section missing")
	}
	return nil
}

// validatePolicyFields checks the encoding/layout policy knobs that
// every phase must carry.
func (m Manifest) validatePolicyFields() error {
	if m.ChecksumAlgorithm == "" {
		return errors.Wrap(ErrInvalidManifest, "checksum_algorithm missing")
	}
	if m.ChecksumFormat == "" {
		return errors.Wrap(ErrInvalidManifest, "checksum_format missing")
	}
	if m.EncodedFilenameCharset == "" {
		return errors.Wrap(ErrInvalidManifest, "encoded_filename_charset missing")
	}
	if m.KeySegmentMaxBytes == 0 {
		return errors.Wrap(ErrInvalidManifest, "key_segment_max_bytes is zero")
	}
	if m.S3MetaSuffix == "" {
		return errors.Wrap(ErrInvalidManifest, "s3_meta_suffix missing")
	}
	if m.S3CollisionStrategy == "" {
		return errors.Wrap(ErrInvalidManifest, "s3_collision_strategy missing")
	}
	if m.DynamoDBLayout != DynamoDBLayoutPerItem && m.DynamoDBLayout != DynamoDBLayoutJSONL {
		return errors.Wrapf(ErrInvalidManifest, "dynamodb_layout %q unsupported", m.DynamoDBLayout)
	}
	return nil
}

// validatePhaseSpecific enforces the Source/Live section pairing for
// each phase: phase0 carries Source only, phase1 carries Live only.
func (m Manifest) validatePhaseSpecific() error {
	switch m.Phase {
	case PhasePhase0SnapshotDecode:
		if m.Live != nil {
			return errors.Wrap(ErrInvalidManifest, "phase0 must not set live")
		}
	case PhasePhase1LivePinned:
		if m.Source != nil {
			return errors.Wrap(ErrInvalidManifest, "phase1 must not set source")
		}
		// A phase1 dump's whole point is the cluster-wide read_ts
		// pin recorded under Live. A manifest that omits Live cannot
		// describe its consistency point and downstream restore /
		// audit logic must not silently accept it as valid (Codex
		// P1 #295).
		if m.Live == nil {
			return errors.Wrap(ErrInvalidManifest, "phase1 must set live")
		}
		if m.Live.ReadTS == 0 {
			return errors.Wrap(ErrInvalidManifest, "phase1 live.read_ts must be non-zero")
		}
	}
	return nil
}
diff --git a/internal/backup/manifest_test.go b/internal/backup/manifest_test.go
new file mode 100644
index 00000000..9ebbe747
--- /dev/null
+++ b/internal/backup/manifest_test.go
@@ -0,0 +1,529 @@
package backup

import (
	"bytes"
	"encoding/json"
	"strings"
	"testing"
	"time"

	"github.com/cockroachdb/errors"
)

// TestManifest_Phase0RoundTrip writes a fully-populated phase0
// manifest and reads it back, checking the fields survive intact.
func TestManifest_Phase0RoundTrip(t *testing.T) {
	t.Parallel()
	now := time.Date(2026, 4, 29, 15, 42, 11, 94_000_000, time.UTC)
	m := NewPhase0SnapshotManifest(now)
	m.ElastickvVersion = "v1.7.3"
	m.ClusterID = "ek-prod-us-east-1"
	m.SnapshotIndex = 18432021
	m.LastCommitTS = 4517352099840000
	m.Source = &Source{FSMPath: "/data/fsm-snap/0000000000000064.fsm", FSMCRC32C: "deadbeef"}
	m.Adapters = &Adapters{
		DynamoDB: &Adapter{Tables: []string{"orders", "users"}},
		S3:       &Adapter{Buckets: []string{"photos"}},
		Redis:    &Adapter{Databases: []uint32{0}},
		SQS:      &Adapter{Queues: []string{"orders-fifo.fifo"}},
	}
	m.Exclusions = &Exclusions{} // all defaults

	var buf bytes.Buffer
	if err := WriteManifest(&buf, m); err != nil {
		t.Fatalf("WriteManifest: %v", err)
	}

	got, err := ReadManifest(&buf)
	if err != nil {
		t.Fatalf("ReadManifest: %v", err)
	}
	if got.Phase != PhasePhase0SnapshotDecode {
		t.Fatalf("Phase = %q, want %q", got.Phase, PhasePhase0SnapshotDecode)
	}
	if got.SnapshotIndex != m.SnapshotIndex {
		t.Fatalf("SnapshotIndex = %d, want %d", got.SnapshotIndex, m.SnapshotIndex)
	}
	if got.Source == nil || got.Source.FSMPath != m.Source.FSMPath {
		t.Fatalf("Source.FSMPath = %v, want %v", got.Source, m.Source)
	}
	if got.Live != nil {
		t.Fatalf("phase0 manifest must not set Live, got %+v", got.Live)
	}
}

// TestManifest_Phase1MustSetLive verifies a phase1 manifest with no
// Live section is rejected at write time.
func TestManifest_Phase1MustSetLive(t *testing.T) {
	t.Parallel()
	m := NewPhase0SnapshotManifest(time.Now())
	m.Phase = PhasePhase1LivePinned
	m.Source = nil
	// Live deliberately omitted -- the gap Codex P1 #295 caught.
	var buf bytes.Buffer
	err := WriteManifest(&buf, m)
	if !errors.Is(err, ErrInvalidManifest) {
		t.Fatalf("err=%v want ErrInvalidManifest", err)
	}
}

// TestManifest_Phase1RejectsZeroReadTS verifies a phase1 manifest
// with a zero read_ts pin is rejected.
func TestManifest_Phase1RejectsZeroReadTS(t *testing.T) {
	t.Parallel()
	m := NewPhase0SnapshotManifest(time.Now())
	m.Phase = PhasePhase1LivePinned
	m.Source = nil
	m.Live = &Live{ReadTS: 0}
	var buf bytes.Buffer
	err := WriteManifest(&buf, m)
	if !errors.Is(err, ErrInvalidManifest) {
		t.Fatalf("err=%v want ErrInvalidManifest for zero read_ts", err)
	}
}

// TestManifest_Phase1WithLiveAndNonZeroReadTSIsValid is the positive
// phase1 case: Live set, read_ts non-zero, round-trips cleanly.
func TestManifest_Phase1WithLiveAndNonZeroReadTSIsValid(t *testing.T) {
	t.Parallel()
	m := NewPhase0SnapshotManifest(time.Now())
	m.Phase = PhasePhase1LivePinned
	m.Source = nil
	m.Live = &Live{ReadTS: 12345}
	var buf bytes.Buffer
	if err := WriteManifest(&buf, m); err != nil {
		t.Fatalf("WriteManifest: %v", err)
	}
	got, err := ReadManifest(&buf)
	if err != nil {
		t.Fatalf("ReadManifest: %v", err)
	}
	if got.Live == nil || got.Live.ReadTS != 12345 {
		t.Fatalf("Live mismatch: %+v", got.Live)
	}
}

// TestManifest_Phase1MustNotSetSource verifies the phase1/Source
// exclusivity rule.
func TestManifest_Phase1MustNotSetSource(t *testing.T) {
	t.Parallel()
	m := NewPhase0SnapshotManifest(time.Now())
	m.Phase = PhasePhase1LivePinned
	m.Source = &Source{FSMPath: "ignored"}
	var buf bytes.Buffer
	err := WriteManifest(&buf, m)
	if !errors.Is(err, ErrInvalidManifest) {
		t.Fatalf("WriteManifest err=%v want ErrInvalidManifest", err)
	}
}

// TestManifest_Phase0MustNotSetLive verifies the phase0/Live
// exclusivity rule.
func TestManifest_Phase0MustNotSetLive(t *testing.T) {
	t.Parallel()
	m := NewPhase0SnapshotManifest(time.Now())
	m.Live = &Live{ReadTS: 12345}
	var buf bytes.Buffer
	err := WriteManifest(&buf, m)
	if !errors.Is(err, ErrInvalidManifest) {
		t.Fatalf("WriteManifest err=%v want ErrInvalidManifest", err)
	}
}

// TestReadManifest_RejectsFutureFormatVersion verifies a newer-major
// manifest surfaces as upgrade-required.
func TestReadManifest_RejectsFutureFormatVersion(t *testing.T) {
	t.Parallel()
	m := NewPhase0SnapshotManifest(time.Now())
	m.FormatVersion = CurrentFormatVersion + 1
	// validate() runs before encoding, so go around it.
	body, _ := json.Marshal(m)
	_, err := ReadManifest(bytes.NewReader(body))
	if !errors.Is(err, ErrUnsupportedFormatVersion) {
		t.Fatalf("err=%v want ErrUnsupportedFormatVersion", err)
	}
}

// TestReadManifest_FutureMajorVersionTakesPrecedenceOverTypeMismatch is the
// regression test for Codex P2 round 5: a newer-major manifest that also
// changes the JSON type of a known field (e.g. `phase` from string to int)
// must surface as ErrUnsupportedFormatVersion, not ErrInvalidManifest. The
// version-branching contract advertised to callers (errors.Is(err,
// ErrUnsupportedFormatVersion) means "upgrade required") only holds if the
// format_version probe runs before the strict struct decode.
+func TestReadManifest_FutureMajorVersionTakesPrecedenceOverTypeMismatch(t *testing.T) { + t.Parallel() + body := `{ + "format_version": 999, + "phase": 42, + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {"dynamodb":{}, "s3":{}, "redis":{}, "sqs":{}}, + "exclusions": {"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrUnsupportedFormatVersion) { + t.Fatalf("err=%v want ErrUnsupportedFormatVersion (must precede strict decode)", err) + } +} + +func TestReadManifest_RejectsZeroFormatVersion(t *testing.T) { + t.Parallel() + m := NewPhase0SnapshotManifest(time.Now()) + m.FormatVersion = 0 + body, _ := json.Marshal(m) + _, err := ReadManifest(bytes.NewReader(body)) + if !errors.Is(err, ErrUnsupportedFormatVersion) { + t.Fatalf("err=%v want ErrUnsupportedFormatVersion", err) + } +} + +// TestReadManifest_RejectsMissingFormatVersion is the regression for +// Codex P2 round 8: an absent `format_version` unmarshals into uint32 +// zero, which the version gate would otherwise misclassify as +// ErrUnsupportedFormatVersion ("upgrade required"). A truncated / +// malformed manifest that dropped the field belongs in the +// ErrInvalidManifest branch instead. 
+func TestReadManifest_RejectsMissingFormatVersion(t *testing.T) { + t.Parallel() + body := `{ + "phase": "phase0-snapshot-decode", + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {}, + "exclusions": {"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest", err) + } + if errors.Is(err, ErrUnsupportedFormatVersion) { + t.Fatalf("missing format_version must not surface as upgrade-required: %v", err) + } +} + +// TestReadManifest_RejectsNullFormatVersion mirrors the missing-field +// case for `"format_version": null`. 
+func TestReadManifest_RejectsNullFormatVersion(t *testing.T) { + t.Parallel() + body := `{ + "format_version": null, + "phase": "phase0-snapshot-decode", + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {}, + "exclusions": {"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest", err) + } +} + +// TestWriteManifest_RejectsFutureFormatVersion is the regression for +// Codex P2 round 8: WriteManifest must refuse manifests advertising +// a version this build cannot produce. Without this gate, a caller +// mutating m.FormatVersion = CurrentFormatVersion + 1 writes a +// manifest that the same package's ReadManifest then refuses, +// producing self-incompatible backup metadata. +func TestWriteManifest_RejectsFutureFormatVersion(t *testing.T) { + t.Parallel() + m := NewPhase0SnapshotManifest(time.Now()) + m.FormatVersion = CurrentFormatVersion + 1 + var buf bytes.Buffer + err := WriteManifest(&buf, m) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("WriteManifest err=%v want ErrInvalidManifest for future format_version", err) + } +} + +func TestReadManifest_AcceptsUnknownFieldsForSameMajorMinorEvolution(t *testing.T) { + t.Parallel() + // Same-major minor evolution: a newer producer adds an optional + // field; older readers must silently ignore it rather than fail + // the read. Codex P1 #205 (round 2) caught the earlier + // DisallowUnknownFields strictness which broke the documented + // same-major compatibility model. 
+ body := `{ + "format_version": 1, + "phase": "phase0-snapshot-decode", + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {"dynamodb":{}, "s3":{}, "redis":{}, "sqs":{}}, + "exclusions": {"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item", + "future_optional_field": "added in v1.minor" + }` + got, err := ReadManifest(strings.NewReader(body)) + if err != nil { + t.Fatalf("unknown optional field must be silently accepted: %v", err) + } + if got.FormatVersion != 1 { + t.Fatalf("format_version = %d", got.FormatVersion) + } +} + +func TestReadManifest_RejectsUnknownPhase(t *testing.T) { + t.Parallel() + body := `{ + "format_version": 1, + "phase": "phase99-future", + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {"dynamodb":{}, "s3":{}, "redis":{}, "sqs":{}}, + "exclusions": {"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest", err) + } +} + +func TestReadManifest_RejectsBadWallTime(t *testing.T) { + t.Parallel() + body := `{ + "format_version": 1, + "phase": "phase0-snapshot-decode", + "wall_time_iso": "not-a-date", + "adapters": {"dynamodb":{}, "s3":{}, "redis":{}, "sqs":{}}, + "exclusions": 
{"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest", err) + } +} + +func TestReadManifest_RejectsUnsupportedDynamoDBLayout(t *testing.T) { + t.Parallel() + m := NewPhase0SnapshotManifest(time.Now()) + m.DynamoDBLayout = "bogus" + body, _ := json.Marshal(m) + _, err := ReadManifest(bytes.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest", err) + } +} + +func TestNewPhase0SnapshotManifest_DefaultsArePopulated(t *testing.T) { + t.Parallel() + m := NewPhase0SnapshotManifest(time.Now()) + if m.FormatVersion != CurrentFormatVersion { + t.Fatalf("FormatVersion = %d, want %d", m.FormatVersion, CurrentFormatVersion) + } + if m.Phase != PhasePhase0SnapshotDecode { + t.Fatalf("Phase = %q, want %q", m.Phase, PhasePhase0SnapshotDecode) + } + if m.ChecksumAlgorithm != ChecksumAlgorithmSHA256 { + t.Fatalf("ChecksumAlgorithm = %q, want %q", m.ChecksumAlgorithm, ChecksumAlgorithmSHA256) + } + if m.ChecksumFormat != ChecksumFormatSha256sum { + t.Fatalf("ChecksumFormat = %q, want %q", m.ChecksumFormat, ChecksumFormatSha256sum) + } + if m.S3MetaSuffix != S3MetaSuffixDefault { + t.Fatalf("S3MetaSuffix = %q", m.S3MetaSuffix) + } + if m.S3CollisionStrategy != S3CollisionStrategyLeafDataSuffix { + t.Fatalf("S3CollisionStrategy = %q", m.S3CollisionStrategy) + } + if m.DynamoDBLayout != DynamoDBLayoutPerItem { + t.Fatalf("DynamoDBLayout = %q", m.DynamoDBLayout) + } + if m.KeySegmentMaxBytes != KeySegmentMaxBytesDefault { + 
t.Fatalf("KeySegmentMaxBytes = %d, want %d", m.KeySegmentMaxBytes, KeySegmentMaxBytesDefault) + } +} + +func TestReadManifest_RejectsTrailingBytes(t *testing.T) { + t.Parallel() + // Two manifests concatenated; the second must surface as a + // trailing-bytes error rather than be silently discarded — Codex + // P2 #194. + m := NewPhase0SnapshotManifest(time.Now()) + body, err := json.Marshal(m) + if err != nil { + t.Fatalf("marshal: %v", err) + } + bad := append([]byte{}, body...) + bad = append(bad, body...) + _, err = ReadManifest(bytes.NewReader(bad)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest on trailing bytes", err) + } +} + +func TestReadManifest_RejectsTrailingNonWhitespace(t *testing.T) { + t.Parallel() + m := NewPhase0SnapshotManifest(time.Now()) + body, err := json.Marshal(m) + if err != nil { + t.Fatalf("marshal: %v", err) + } + bad := append([]byte{}, body...) + bad = append(bad, []byte("garbage")...) + _, err = ReadManifest(bytes.NewReader(bad)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest on trailing garbage", err) + } +} + +func TestAdaptersStruct_NilVsEmptyDistinguishedOnDisk(t *testing.T) { + t.Parallel() + // Gemini #98: an excluded adapter (nil pointer) must serialize + // differently from an included-but-empty adapter (non-nil pointer + // to Adapter{}). 
+ excluded := Adapters{ + DynamoDB: &Adapter{}, // present, no scopes + // S3 / Redis / SQS left nil — out of scope + } + body, err := json.Marshal(excluded) + if err != nil { + t.Fatal(err) + } + out := string(body) + if !strings.Contains(out, `"dynamodb":{}`) { + t.Fatalf("included-empty must serialise as `dynamodb:{}`, got %s", out) + } + if strings.Contains(out, `"s3"`) || strings.Contains(out, `"redis"`) || strings.Contains(out, `"sqs"`) { + t.Fatalf("excluded adapters must be omitted, got %s", out) + } +} + +func TestReadManifest_RejectsMissingAdapters(t *testing.T) { + t.Parallel() + // Adapters section omitted from the JSON entirely — Codex P2 + // #146 round 3. With Adapters as a pointer the omission decodes + // as nil; validation must surface ErrInvalidManifest rather than + // treat an empty zero-value section as valid. + body := `{ + "format_version": 1, + "phase": "phase0-snapshot-decode", + "wall_time_iso": "2026-04-29T00:00:00Z", + "exclusions": {"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest for missing adapters", err) + } +} + +func TestReadManifest_RejectsMissingExclusions(t *testing.T) { + t.Parallel() + body := `{ + "format_version": 1, + "phase": "phase0-snapshot-decode", + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {}, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + 
"s3_collision_strategy": "leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest for missing exclusions", err) + } +} + +// TestReadManifest_RejectsMissingExclusionFlag is the regression for +// Codex P2 round 7: each Exclusions sub-field must be explicitly +// present in the JSON. Boolean fields default to `false` in Go, so an +// omitted `preserve_sqs_visibility` would otherwise pass the strict +// decode and silently look "off" — losing producer-side provenance. +func TestReadManifest_RejectsMissingExclusionFlag(t *testing.T) { + t.Parallel() + cases := []struct { + name string + excl string + }{ + { + "missing include_incomplete_uploads", + `{"include_orphans":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}`, + }, + { + "missing include_orphans", + `{"include_incomplete_uploads":false,"preserve_sqs_visibility":false,"include_sqs_side_records":false}`, + }, + { + "missing preserve_sqs_visibility", + `{"include_incomplete_uploads":false,"include_orphans":false,"include_sqs_side_records":false}`, + }, + { + "missing include_sqs_side_records", + `{"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":false}`, + }, + { + "explicit-null preserve_sqs_visibility", + `{"include_incomplete_uploads":false,"include_orphans":false,"preserve_sqs_visibility":null,"include_sqs_side_records":false}`, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + body := `{ + "format_version": 1, + "phase": "phase0-snapshot-decode", + "wall_time_iso": "2026-04-29T00:00:00Z", + "adapters": {}, + "exclusions": ` + tc.excl + `, + "checksum_algorithm": "sha256", + "checksum_format": "sha256sum", + "encoded_filename_charset": "rfc3986-unreserved-plus-percent", + "key_segment_max_bytes": 240, + "s3_meta_suffix": ".elastickv-meta.json", + "s3_collision_strategy": 
"leaf-data-suffix", + "dynamodb_layout": "per-item" + }` + _, err := ReadManifest(strings.NewReader(body)) + if !errors.Is(err, ErrInvalidManifest) { + t.Fatalf("err=%v want ErrInvalidManifest", err) + } + }) + } +} + +func TestWriteManifest_ProducesPrettyJSON(t *testing.T) { + t.Parallel() + m := NewPhase0SnapshotManifest(time.Now()) + var buf bytes.Buffer + if err := WriteManifest(&buf, m); err != nil { + t.Fatalf("WriteManifest: %v", err) + } + out := buf.String() + // Pretty: contains newlines and the 2-space indent we configured. + if !strings.Contains(out, "\n \"format_version\"") { + t.Fatalf("expected pretty 2-space indent in output:\n%s", out) + } +}