Skip to content

Commit c379a31

Browse files
authored
backup: Phase 0b M1 — snapshot encoder core (EKVPBBL1 writer + MVCC re-encoding) (#825)
## Summary **Phase 0b M1 — snapshot encoder core.** The format-level machinery every per-adapter reverse-encoder (M2–M5) will share, the inverse of `decode.go`'s dispatch. No adapter directory-walking yet — that lands per-adapter in the following milestones. Design: `docs/design/2026_05_25_proposed_snapshot_logical_encoder.md` (merged in #823). `internal/backup/encode.go`: - **`ResolveCommitTS`** — `--last-commit-ts` override resolution with the fail-closed contract: an override is accepted only when `>= manifest.last_commit_ts` (raising the restored HLC ceiling is safe; lowering it risks a post-restart timestamp colliding with a restored row → `ErrLastCommitTSRegression`). - **`encodeMVCCKey` / `encodeMVCCValue`** — exact inverse of `snapshot_reader.go::decodeSnapshotEntry`: `<userKey><^commitTS BE>` key, `<flags=0><expireAt LE><body>` value. Reuses the same MVCC constants the decode direction defines so the two cannot drift. - **`snapshotBuilder`** — accumulates MVCC-framed entries; fails closed on duplicate encoded keys (`ErrEncodeDuplicateKey`) and per-entry size-cap violations (`ErrEncodeKeyTooLarge` / `ErrEncodeValueTooLarge`); `WriteTo` emits the sorted EKVPBBL1 stream (magic + `lastCommitTS` LE + length-prefixed KV pairs, **no checksum footer** — matches `store/snapshot_pebble.go::WriteTo`). - **`DecodeLiveEntries`** — the round-trip self-test primitive: decodes a just-written buffer back through `ReadSnapshot` so a caller never emits an unloadable `.fsm`. ## Risk New code only; no existing behavior changed. The package's offline-tool boundary is preserved (no live-cluster imports). `ResolveCommitTS` introduces fail-closed semantics but has no production caller yet — the only caller is the test; the production caller lands with the M6 CLI. ## Self-review (5 lenses) - **Data loss**: encoder writes live records only, never tombstones; `DecodeLiveEntries` verifies decodability before finalize (the design's round-trip gate). - **Concurrency/distributed**: `snapshotBuilder` is single-goroutine by contract (adapters feed sequentially); no shared state. - **Performance**: in-memory sort; worst-case bound documented in the design (external-sort follow-up milestone). - **Consistency**: uniform `commitTS` stamping keeps every key at/below the restored HLC ceiling; fail-closed override prevents a ceiling regression. - **Test coverage**: MVCC key/value layout, decode round-trip via the **real** `decodeSnapshotEntry`, single-Redis-string `build→write→DecodeLiveEntries` round-trip, sorted output regardless of Add order, duplicate-key rejection, oversize-key rejection, `ResolveCommitTS` override table (incl. fail-closed regression). ## Test plan - [x] `go test ./internal/backup/` (full package, no regression) - [x] `golangci-lint run internal/backup/` (0 issues)
2 parents a6ae626 + df2407a commit c379a31

2 files changed

Lines changed: 617 additions & 0 deletions

File tree

internal/backup/encode.go

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
package backup
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"io"
7+
"sort"
8+
9+
"github.com/cockroachdb/errors"
10+
)
11+
12+
// encode.go is the Phase 0b encoder core (design:
13+
// docs/design/2026_05_25_proposed_snapshot_logical_encoder.md). It is
14+
// the inverse of decode.go: per-adapter reverse encoders (added in the
15+
// Redis/DynamoDB/S3/SQS milestones) hand reconstructed internal
16+
// (userKey, userValue, expireAt) records to a snapshotBuilder, which
17+
// MVCC-frames them, sorts by encoded key, and streams the native
18+
// EKVPBBL1 `.fsm` the live store loads.
19+
//
20+
// This file owns only the format-level machinery that every adapter
21+
// shares:
22+
//
23+
// - commit-timestamp resolution (MANIFEST.last_commit_ts plus the
24+
// fail-closed `--last-commit-ts` override),
25+
// - MVCC re-encoding (the inverse of snapshot_reader.go's
26+
// decodeSnapshotEntry: invTS key suffix + value header),
27+
// - duplicate-key rejection and per-entry size caps,
28+
// - the sorted EKVPBBL1 writer (matching store/snapshot_pebble.go
29+
// WriteTo — magic + lastCommitTS + sorted KV stream, no checksum
30+
// footer),
31+
// - a round-trip self-test harness that decodes the just-written
32+
// bytes back through ReadSnapshot so a caller never emits an
33+
// unloadable `.fsm`.
34+
//
35+
// The MVCC constants (snapshotTSSize, snapshotValueHeaderSize,
36+
// Max*EncodedKeySize, PebbleSnapshotMagic, the flag masks) are the
37+
// same ones snapshot_reader.go defines for the decode direction; encode
38+
// reuses them so the two directions cannot drift.
39+
40+
// ErrEncodeDuplicateKey is returned when two reconstructed records
41+
// MVCC-encode to the same key. The live store's keyspace is a set;
42+
// a duplicate means an adapter reverse-encoder produced colliding
43+
// internal keys, which would make the loaded snapshot
44+
// order-dependent. The encoder fails closed rather than emit a
45+
// `.fsm` whose Pebble image depends on insertion order.
46+
var ErrEncodeDuplicateKey = errors.New("backup: duplicate encoded key in snapshot build")
47+
48+
// ErrEncodeKeyTooLarge / ErrEncodeValueTooLarge mirror the decode-side
49+
// MaxSnapshotEncodedKeySize / MaxSnapshotEncodedValueSize caps. A
50+
// reconstructed entry exceeding them would produce a `.fsm` the live
51+
// restore path rejects, so the encoder fails closed before writing a
52+
// single byte rather than emit an unloadable file.
53+
var ErrEncodeKeyTooLarge = errors.New("backup: encoded key length exceeds limit")
54+
var ErrEncodeValueTooLarge = errors.New("backup: encoded value length exceeds limit")
55+
56+
// ErrSnapshotBuilderReused is returned by WriteTo when it is called
57+
// more than once on the same builder. A builder is single-use (one
58+
// per encode run); a second WriteTo would re-emit the already-written
59+
// entries, producing a valid-but-unintended stream. Enforced so the
60+
// per-adapter feed loops in later milestones cannot silently double-
61+
// emit (claude review on PR #825).
62+
var ErrSnapshotBuilderReused = errors.New("backup: snapshotBuilder.WriteTo called more than once")
63+
64+
// ErrLastCommitTSRegression is returned by ResolveCommitTS when a
65+
// `--last-commit-ts` override is older than MANIFEST.last_commit_ts.
66+
// Seeding the restored node's HLC ceiling below a timestamp already
67+
// durable in the dump would let a post-restart leader re-issue a
68+
// timestamp at-or-below a restored row's commit ts — the exact
69+
// HLC-ceiling regression the design's §"MVCC re-encoding" forbids.
70+
// Raising the ceiling (T >= manifest) is always safe; lowering it is
71+
// not, so the override is accepted in one direction only.
72+
var ErrLastCommitTSRegression = errors.New("backup: --last-commit-ts override is older than manifest last_commit_ts")
73+
74+
// ResolveCommitTS returns the commit timestamp the encoder stamps on
75+
// every reconstructed key (design §"MVCC re-encoding": uniform
76+
// stamping). manifestTS is MANIFEST.last_commit_ts; override is the
77+
// optional `--last-commit-ts` value (nil = no override).
78+
//
79+
// Fail-closed contract: an override is accepted only when it is
80+
// >= manifestTS (raising the restored HLC ceiling is safe; lowering
81+
// it risks a post-restart timestamp colliding with a restored row).
82+
// The returned value is used verbatim for BOTH the EKVPBBL1 header
83+
// and every key's invTS, so the two never disagree.
84+
func ResolveCommitTS(manifestTS uint64, override *uint64) (uint64, error) {
85+
if override == nil {
86+
return manifestTS, nil
87+
}
88+
if *override < manifestTS {
89+
return 0, errors.Wrapf(ErrLastCommitTSRegression,
90+
"override %d < manifest %d", *override, manifestTS)
91+
}
92+
return *override, nil
93+
}
94+
95+
// encodeMVCCKey is the inverse of the key half of
96+
// snapshot_reader.go::decodeSnapshotEntry: it appends the 8-byte
97+
// big-endian inverted-timestamp suffix the live store's
98+
// fillEncodedKey writes. invTS = ^commitTS so that, under Pebble's
99+
// ascending byte order, newer (higher-ts) versions of a user key sort
100+
// before older ones — the MVCC ordering the live store relies on.
101+
func encodeMVCCKey(userKey []byte, commitTS uint64) []byte {
102+
out := make([]byte, len(userKey)+snapshotTSSize)
103+
copy(out, userKey)
104+
binary.BigEndian.PutUint64(out[len(userKey):], ^commitTS)
105+
return out
106+
}
107+
108+
// encodeMVCCValue is the inverse of the value half of
109+
// decodeSnapshotEntry: it prepends the 9-byte value header
110+
// (flags byte + 8-byte little-endian expireAt). Phase 0b emits live,
111+
// cleartext, non-tombstone records only, so flags is always zero.
112+
func encodeMVCCValue(userValue []byte, expireAt uint64) []byte {
113+
out := make([]byte, snapshotValueHeaderSize+len(userValue))
114+
out[0] = 0 // flags: tombstone=0, encryption_state=cleartext, reserved=0
115+
binary.LittleEndian.PutUint64(out[1:snapshotValueHeaderSize], expireAt)
116+
copy(out[snapshotValueHeaderSize:], userValue)
117+
return out
118+
}
119+
120+
// encodedKV is one fully MVCC-framed entry held by snapshotBuilder
121+
// until the sorted write.
122+
type encodedKV struct {
123+
key []byte
124+
val []byte
125+
}
126+
127+
// snapshotBuilder accumulates MVCC-framed entries and writes the
128+
// sorted EKVPBBL1 stream. One per encode run. Not safe for concurrent
129+
// use — adapters feed it sequentially from the directory-tree walk.
130+
type snapshotBuilder struct {
131+
commitTS uint64
132+
entries []encodedKV
133+
seen map[string]struct{}
134+
written bool
135+
}
136+
137+
// newSnapshotBuilder constructs a builder that stamps every key with
138+
// commitTS (the value ResolveCommitTS returned).
139+
func newSnapshotBuilder(commitTS uint64) *snapshotBuilder {
140+
return &snapshotBuilder{
141+
commitTS: commitTS,
142+
seen: make(map[string]struct{}),
143+
}
144+
}
145+
146+
// Add MVCC-frames one reconstructed live record and stages it for the
147+
// sorted write. Applies the per-entry size caps and duplicate-key
148+
// rejection before retaining the bytes, so a violating record fails
149+
// the whole encode rather than producing an unloadable or
150+
// order-dependent `.fsm`. userKey/userValue are copied — callers may
151+
// reuse their buffers after Add returns.
152+
func (b *snapshotBuilder) Add(userKey, userValue []byte, expireAt uint64) error {
153+
// A builder is single-use. Once WriteTo has flushed, any further
154+
// Add would stage entries that can never be written (the second
155+
// WriteTo fails closed before re-sorting), silently dropping
156+
// records. Reject with the same sentinel WriteTo uses so a caller
157+
// reusing an exhausted builder gets one consistent signal
158+
// regardless of which method it calls (claude review on PR #825 —
159+
// a silent-data-loss footgun for the M2-M5 adapter feed loops).
160+
if b.written {
161+
return errors.WithStack(ErrSnapshotBuilderReused)
162+
}
163+
// Size-check from the user-buffer lengths before allocating the
164+
// framed buffers — encKey = userKey + snapshotTSSize, encVal =
165+
// snapshotValueHeaderSize + userValue — so an oversize record
166+
// fails closed without a wasted allocation (claude review nit).
167+
if uint64(len(userKey))+snapshotTSSize > MaxSnapshotEncodedKeySize {
168+
return errors.Wrapf(ErrEncodeKeyTooLarge,
169+
"length %d > %d", len(userKey)+snapshotTSSize, MaxSnapshotEncodedKeySize)
170+
}
171+
if uint64(len(userValue))+snapshotValueHeaderSize > MaxSnapshotEncodedValueSize {
172+
return errors.Wrapf(ErrEncodeValueTooLarge,
173+
"length %d > %d", len(userValue)+snapshotValueHeaderSize, MaxSnapshotEncodedValueSize)
174+
}
175+
key := encodeMVCCKey(userKey, b.commitTS)
176+
if _, dup := b.seen[string(key)]; dup {
177+
return errors.Wrapf(ErrEncodeDuplicateKey, "userKey %q", userKey)
178+
}
179+
b.seen[string(key)] = struct{}{}
180+
b.entries = append(b.entries, encodedKV{key: key, val: encodeMVCCValue(userValue, expireAt)})
181+
return nil
182+
}
183+
184+
// Len reports how many entries have been staged.
185+
func (b *snapshotBuilder) Len() int { return len(b.entries) }
186+
187+
// WriteTo sorts the staged entries by encoded key (ascending byte
188+
// order, matching the live Pebble-snapshot iteration order) and writes
189+
// the native EKVPBBL1 stream: magic + lastCommitTS (LE) + length-
190+
// prefixed (key, value) pairs. There is no checksum footer — the
191+
// format terminates on a clean EOF (store/snapshot_pebble.go WriteTo,
192+
// internal/backup/snapshot_reader.go ReadSnapshot).
193+
func (b *snapshotBuilder) WriteTo(w io.Writer) (int64, error) {
194+
if b.written {
195+
return 0, errors.WithStack(ErrSnapshotBuilderReused)
196+
}
197+
b.written = true
198+
sort.Slice(b.entries, func(i, j int) bool {
199+
return bytes.Compare(b.entries[i].key, b.entries[j].key) < 0
200+
})
201+
cw := &countingWriter{w: w}
202+
if _, err := cw.Write([]byte(PebbleSnapshotMagic)); err != nil {
203+
return cw.n, errors.WithStack(err)
204+
}
205+
if err := binary.Write(cw, binary.LittleEndian, b.commitTS); err != nil {
206+
return cw.n, errors.WithStack(err)
207+
}
208+
for i := range b.entries {
209+
if err := writeLengthPrefixed(cw, b.entries[i].key); err != nil {
210+
return cw.n, err
211+
}
212+
if err := writeLengthPrefixed(cw, b.entries[i].val); err != nil {
213+
return cw.n, err
214+
}
215+
}
216+
return cw.n, nil
217+
}
218+
219+
// writeLengthPrefixed writes an 8-byte little-endian length followed
220+
// by the bytes — the per-field framing ReadSnapshot's readEntryLen
221+
// consumes.
222+
func writeLengthPrefixed(w io.Writer, b []byte) error {
223+
if err := binary.Write(w, binary.LittleEndian, uint64(len(b))); err != nil {
224+
return errors.WithStack(err)
225+
}
226+
if _, err := w.Write(b); err != nil {
227+
return errors.WithStack(err)
228+
}
229+
return nil
230+
}
231+
232+
// countingWriter mirrors store/snapshot_pebble.go's helper so WriteTo
233+
// can report the byte count without importing the store package
234+
// (the offline-tool boundary the design requires).
235+
type countingWriter struct {
236+
w io.Writer
237+
n int64
238+
}
239+
240+
func (w *countingWriter) Write(p []byte) (int, error) {
241+
n, err := w.w.Write(p)
242+
w.n += int64(n)
243+
if err != nil {
244+
return n, errors.WithStack(err)
245+
}
246+
return n, nil
247+
}
248+
249+
// RoundTripEntry is one live record recovered by DecodeLiveEntries.
250+
// The byte slices are owned by the caller (cloned out of the reader's
251+
// scratch buffer).
252+
type RoundTripEntry struct {
253+
UserKey []byte
254+
UserValue []byte
255+
ExpireAt uint64
256+
}
257+
258+
// DecodeLiveEntries decodes an EKVPBBL1 stream through ReadSnapshot
259+
// and returns its live (non-tombstone) entries plus the header. It is
260+
// the round-trip self-test primitive from the design's §"Round-trip
261+
// self-test": the encoder decodes its own just-written bytes and
262+
// compares the recovered records against what it fed the builder
263+
// before committing the final `.fsm` to disk, so a node never
264+
// receives an unloadable snapshot.
265+
//
266+
// Tombstones are skipped (the encoder never writes them, so seeing one
267+
// would indicate a corrupted build; surfacing only live records keeps
268+
// the comparison symmetric with what Add accepts). Byte slices are
269+
// cloned because ReadSnapshot reuses its scratch buffers across the
270+
// callback.
271+
func DecodeLiveEntries(r io.Reader) ([]RoundTripEntry, SnapshotHeader, error) {
272+
var out []RoundTripEntry
273+
hdr, err := ReadSnapshotWithHeader(r, func(_ SnapshotHeader, e SnapshotEntry) error {
274+
if e.Tombstone {
275+
return nil
276+
}
277+
out = append(out, RoundTripEntry{
278+
UserKey: bytes.Clone(e.UserKey),
279+
UserValue: bytes.Clone(e.UserValue),
280+
ExpireAt: e.ExpireAt,
281+
})
282+
return nil
283+
})
284+
if err != nil {
285+
return nil, SnapshotHeader{}, err
286+
}
287+
return out, hdr, nil
288+
}

0 commit comments

Comments
 (0)