
Commit 42fb7ba

backup: snapshot_reader (Phase 0a foundation for snapshot-decode binary)
Adds internal/backup/snapshot_reader.go: the .fsm file parser that underpins the cmd/elastickv-snapshot-decode binary (design docs/design/2026_04_29_proposed_snapshot_logical_decoder.md lines 440-491). Mirrors the native Pebble snapshot format produced by store/lsm_store.go::pebbleSnapshotMagic + restoreBatchLoopInto.

File shape consumed:

    [8 bytes] magic "EKVPBBL1"
    [8 bytes] lastCommitTS (LittleEndian uint64)
    repeated:
      [8 bytes] keyLen (LittleEndian uint64)
      [keyLen]  encoded key = <userKey><invTS(8 BE)>
      [8 bytes] valLen (LittleEndian uint64)
      [valLen]  encoded value = <flags(1)><expireAt(8 LE)><body>

Per-entry decoding strips:
- The 8-byte inverted-TS suffix from the key (recovers commit_ts via XOR-inversion across the math.MaxUint64 boundary).
- The 9-byte value header from the value (surfaces tombstone flag, encryption_state, expireAt).

API surface: ReadSnapshot + SnapshotHeader + SnapshotEntry + sentinel errors. Callback-based so callers can stream multi-GB snapshots without buffering the whole file in memory; the emitted SnapshotEntry's byte slices alias scratch buffers and the caller must bytes.Clone for retention.

Fail-closed contract (matches design 7.1):
- ErrSnapshotBadMagic - first 8 bytes not "EKVPBBL1" (operator pointed at the wrong file)
- ErrSnapshotTruncated - EOF mid-entry (a clean inter-entry EOF is the normal terminator and is NOT an error)
- ErrSnapshotShortKey - encoded key < 8 bytes (no room for TS)
- ErrSnapshotShortValue - encoded value < 9 bytes (no room for value header)
- ErrSnapshotEncryptedReserved - reserved encryption_state bits (0b10, 0b11) or bits 3-7 set; same fail-closed trip-wire as store.ErrEncryptedValueReservedState
- ErrSnapshotEncryptedEntry - encState=0b01 (encrypted). Phase 0a does not link the decryption keyring; Stage 8 of the encryption rollout adds a keyring-aware variant.

Self-review:
1. Data loss - tombstone flag surfaced verbatim so callers can choose to suppress (default Phase 0a backup behavior) or include (a future diagnostic dump might want both). EOF distinction between clean terminator and truncation prevents silently dropping a partial entry.
2. Concurrency - reader is single-pass over an io.Reader; no shared state, no goroutines.
3. Performance - per-entry slices alias a fixed 64 KiB key buffer and a growable value buffer. For a 10M-entry snapshot this keeps allocs O(1) outside the value-grow boundary. bufio wrapper amortises read syscalls.
4. Consistency - encryption_state bits are decoded into the identical 4-way switch (cleartext / encrypted / 0b10 / 0b11) that store/lsm_store.go::decodeValue uses, so a future live-side enum extension fails closed here too.
5. Coverage - 17 table-driven tests in snapshot_reader_test.go covering: header-only, single-entry round-trip, tombstone flag, multi-entry order preservation, bad magic, truncated header, truncated entry, short-key, short-value, encrypted entry, all four reserved-encState cases, callback-error propagation, MVCC TS XOR-inversion across uint64 boundary, empty-value body.

Caller audit (per /loop standing instruction): this PR adds a NEW public API surface (ReadSnapshot, SnapshotEntry, SnapshotHeader, PebbleSnapshotMagic, the Err* sentinels). No prior callers exist. Verified via 'grep -rn "ReadSnapshot|SnapshotEntry|SnapshotHeader" --include=*.go' - all matches are inside internal/backup/snapshot_reader{,_test}.go. The follow-up PR (cmd/elastickv-snapshot-decode driver + dispatcher) will be this API's first consumer.
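The per-entry decoding described in the message can be illustrated with a small round trip. This is a minimal sketch under stated assumptions, not the code in snapshot_reader.go: the names `encodeEntry`, `decodeEntry`, `decoded`, `tsSuffixLen`, `valueHeaderLen`, and `errShort` are hypothetical, and the reserved-bit and encrypted-entry checks are left out to keep the layout arithmetic in focus.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Hypothetical constants for this sketch; the layout follows the commit message.
const (
	tsSuffixLen    = 8 // inverted commit TS appended to the key, big-endian
	valueHeaderLen = 9 // flags (1 byte) + expireAt (8 bytes, little-endian)
)

var errShort = errors.New("encoded key or value too short")

// decoded holds the fields peeled off one encoded entry.
type decoded struct {
	UserKey, Body      []byte
	CommitTS, ExpireAt uint64
	Tombstone          bool
	EncState           byte
}

// encodeEntry builds an encoded (key, value) pair in the described layout.
func encodeEntry(userKey []byte, commitTS, expireAt uint64, flags byte, body []byte) (key, val []byte) {
	key = append(append([]byte{}, userKey...), make([]byte, tsSuffixLen)...)
	binary.BigEndian.PutUint64(key[len(userKey):], ^commitTS) // store the bitwise inverse
	val = make([]byte, valueHeaderLen, valueHeaderLen+len(body))
	val[0] = flags
	binary.LittleEndian.PutUint64(val[1:valueHeaderLen], expireAt)
	return key, append(val, body...)
}

// decodeEntry strips the TS suffix and the value header.
func decodeEntry(key, val []byte) (decoded, error) {
	if len(key) < tsSuffixLen || len(val) < valueHeaderLen {
		return decoded{}, errShort
	}
	split := len(key) - tsSuffixLen
	flags := val[0]
	return decoded{
		UserKey:   key[:split],
		Body:      val[valueHeaderLen:],
		CommitTS:  ^binary.BigEndian.Uint64(key[split:]), // invert again to recover commit_ts
		ExpireAt:  binary.LittleEndian.Uint64(val[1:valueHeaderLen]),
		Tombstone: flags&0b0000_0001 != 0,
		EncState:  (flags & 0b0000_0110) >> 1,
	}, nil
}

func main() {
	key, val := encodeEntry([]byte("user/1"), 42, 0, 0b0000_0001, nil)
	d, err := decodeEntry(key, val)
	fmt.Println(string(d.UserKey), d.CommitTS, d.Tombstone, d.EncState, err)
}
```

Because the suffix stores the bitwise inverse of the timestamp, a commit_ts of math.MaxUint64 encodes as eight zero bytes, so newer versions of a key sort before older ones under plain byte-wise comparison.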
1 parent abd6a50 commit 42fb7ba

2 files changed

Lines changed: 746 additions & 0 deletions

File tree

internal/backup/snapshot_reader.go

Lines changed: 335 additions & 0 deletions
@@ -0,0 +1,335 @@
package backup

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"io"

	cockroachdberr "github.com/cockroachdb/errors"
)

// snapshot_reader.go consumes the native Pebble snapshot format
// produced by store/lsm_store.go::pebbleSnapshotMagic +
// restoreBatchLoopInto and yields each entry as a (userKey,
// userValue, tombstone, expireAt) tuple after stripping the MVCC
// encoding the live store layers on top of raw Pebble bytes.
//
// Snapshot file shape:
//
//	[8 bytes] magic "EKVPBBL1"
//	[8 bytes] lastCommitTS (LittleEndian uint64)
//	repeated:
//	  [8 bytes] keyLen (LittleEndian uint64)
//	  [keyLen]  encoded key = <userKey><invTS(8 BE)>
//	  [8 bytes] valLen (LittleEndian uint64)
//	  [valLen]  encoded value = <flags(1)><expireAt(8 LE)><body>
//	            (flags bit 0 = tombstone, bits 1-2 = encryption_state)
//
// Mirrors store/lsm_store.go:1670-1697 (readRestoreEntry) and
// :336-340 (fillEncodedKey) and :419-422 (fillEncodedValue). The
// constants are duplicated here so this package stays
// adapter/store-independent (the design requires the decoder to
// run as an offline tool against a `.fsm` file with no live cluster
// libraries linked).
// Snapshot format constants; these mirror store/lsm_store.go.
const (
	// PebbleSnapshotMagicLen is the byte length of the "EKVPBBL1"
	// header. Exposed so callers can sniff the first 8 bytes of a
	// file to decide whether to dispatch into ReadSnapshot or fall
	// through to another reader.
	PebbleSnapshotMagicLen = 8

	// snapshotTSSize is the 8-byte inverted-TS suffix appended to
	// every encoded key (`store.fillEncodedKey`).
	snapshotTSSize = 8

	// snapshotValueHeaderSize is the 9-byte value-header prefix
	// (flags + expireAt) on every encoded value
	// (`store.fillEncodedValue`).
	snapshotValueHeaderSize = 9

	// snapshotTombstoneMask / snapshotEncStateMask / snapshotEncStateShift
	// mirror store.tombstoneMask / encStateMask / encStateShift. A
	// rename on the live side without an accompanying update here
	// would surface at the snapshot reader's table-driven tests.
	snapshotTombstoneMask    byte = 0b0000_0001
	snapshotEncStateMask     byte = 0b0000_0110
	snapshotEncStateShift         = 1
	snapshotEncStateReserved byte = 0b1111_1000 // bits 3-7 must be zero
	snapshotEncStateCleartx  byte = 0b00
	snapshotEncStateEncrypt  byte = 0b01
)

// PebbleSnapshotMagic is the 8-byte file header that introduces a
// native Pebble snapshot. Exposed for callers that need to sniff a
// file before deciding which reader to dispatch to.
var PebbleSnapshotMagic = [PebbleSnapshotMagicLen]byte{'E', 'K', 'V', 'P', 'B', 'B', 'L', '1'}
// ErrSnapshotBadMagic is returned when the first 8 bytes of the
// reader do not match `EKVPBBL1`. The decoder caller should treat
// this as an immediate hard failure rather than try to skip past
// the bad header: a wrong magic almost always indicates the file
// is not actually a Pebble snapshot (an MVCC streaming snapshot,
// a tar archive, a partial truncate, etc.).
var ErrSnapshotBadMagic = cockroachdberr.New("backup: snapshot magic header does not match \"EKVPBBL1\"")

// ErrSnapshotTruncated is returned when the snapshot ends mid-entry
// (after a key length but before the key, or after a value length
// but before the value). A clean EOF at the start of the
// key-length field is a normal terminator and is NOT an error.
var ErrSnapshotTruncated = cockroachdberr.New("backup: snapshot truncated mid-entry")

// ErrSnapshotEncryptedReserved is returned when a value-header
// carries reserved encryption_state bits (0b10 or 0b11) or has any
// of flag bits 3-7 set. Mirrors store.ErrEncryptedValueReservedState;
// the decoder fails closed rather than treat the body as cleartext,
// matching the design's §7.1 fail-closed contract.
var ErrSnapshotEncryptedReserved = cockroachdberr.New("backup: value header carries reserved encryption_state; decoder cannot interpret this entry")

// ErrSnapshotEncryptedEntry is returned when a value-header
// declares the entry is encrypted (encState=0b01). Phase 0a does
// NOT carry the decryption keyring; an encrypted snapshot must be
// decoded with a Phase 0a+keyring binary or after Stage 8 of the
// encryption rollout reverses the encryption.
var ErrSnapshotEncryptedEntry = cockroachdberr.New("backup: snapshot contains encrypted entries; Phase 0a does not link the decryption keyring")

// ErrSnapshotShortKey is returned when an entry's encoded key is
// shorter than the 8-byte timestamp suffix that
// `store.fillEncodedKey` always appends. Indicates a corrupt
// snapshot: the live store would never emit such a key.
var ErrSnapshotShortKey = cockroachdberr.New("backup: encoded key shorter than timestamp suffix")

// ErrSnapshotShortValue is returned when an entry's encoded value
// is shorter than the 9-byte value header. Indicates a corrupt
// snapshot: the live store always writes the header even for
// tombstones.
var ErrSnapshotShortValue = cockroachdberr.New("backup: encoded value shorter than value-header")
// SnapshotEntry is one decoded entry emitted by ReadSnapshot's
// callback. Fields are the user-visible key / value bytes plus the
// MVCC metadata the decoder peeled off (commit timestamp, expiry,
// tombstone marker). Slices are owned by the snapshot reader's
// scratch buffer and may be overwritten when the callback returns;
// callers that need to retain bytes across iterations must
// `bytes.Clone` them.
type SnapshotEntry struct {
	UserKey   []byte
	UserValue []byte
	CommitTS  uint64
	ExpireAt  uint64
	Tombstone bool
}

// SnapshotHeader is the decoded preamble returned to the caller
// before iteration begins so the caller can record the snapshot's
// commit-time horizon in its MANIFEST.json (per design §380-422).
type SnapshotHeader struct {
	LastCommitTS uint64
}
// ReadSnapshot reads the EKVPBBL1 header from r, then yields every
// entry through fn. fn receives a transient SnapshotEntry whose
// byte slices are NOT safe to retain across calls (the reader
// reuses scratch buffers to keep per-entry allocations bounded for
// multi-GB snapshots). If fn returns an error, iteration stops and
// the error is returned verbatim.
//
// Iteration terminates cleanly on EOF at the start of an entry's
// key-length field. EOF inside an entry returns
// ErrSnapshotTruncated.
//
// Tombstone entries (flags bit 0 set) are surfaced via
// SnapshotEntry.Tombstone; callers decide whether to suppress
// them (Phase 0a's intended behavior for backup output) or include
// them (a multi-version diagnostic dump might want both).
func ReadSnapshot(r io.Reader, fn func(SnapshotHeader, SnapshotEntry) error) error {
	br := bufio.NewReader(r)
	header, err := readSnapshotHeader(br)
	if err != nil {
		return err
	}
	var (
		keyBuf [1 << 16]byte
		valBuf []byte
	)
	for {
		stop, err := readOneEntry(br, header, keyBuf[:], &valBuf, fn)
		if err != nil {
			return err
		}
		if stop {
			return nil
		}
	}
}
// readOneEntry handles one (key, value) tuple plus the callback
// dispatch. Extracted from ReadSnapshot so the parent stays under
// the cyclop budget: the same shape every backup encoder uses
// (small fixed driver loop + extracted per-record helper).
// Returns (true, nil) on the natural inter-entry EOF terminator.
func readOneEntry(
	r *bufio.Reader,
	header SnapshotHeader,
	keyScratch []byte,
	valBuf *[]byte,
	fn func(SnapshotHeader, SnapshotEntry) error,
) (bool, error) {
	kLen, eof, err := readEntryLen(r)
	if err != nil {
		return false, err
	}
	if eof {
		return true, nil
	}
	key, err := readExact(r, keyScratch[:0], kLen)
	if err != nil {
		return false, cockroachdberr.WithStack(err)
	}
	vLen, eof, err := readEntryLen(r)
	if err != nil {
		return false, err
	}
	if eof {
		// readEntryLen reports a clean EOF through its bool result,
		// never through err, so the flag must be checked here. A
		// clean EOF at this point means the snapshot truncated
		// between the key bytes and the value-length field, which
		// is not the same as a clean inter-entry EOF.
		return false, cockroachdberr.WithStack(ErrSnapshotTruncated)
	}
	*valBuf, err = readExactGrow(r, (*valBuf)[:0], vLen)
	if err != nil {
		return false, cockroachdberr.WithStack(err)
	}
	entry, err := decodeSnapshotEntry(key, *valBuf)
	if err != nil {
		return false, err
	}
	if err := fn(header, entry); err != nil {
		return false, err
	}
	return false, nil
}
// readSnapshotHeader consumes the 8-byte magic and the 8-byte LE
// lastCommitTS. Returns ErrSnapshotBadMagic on header mismatch.
func readSnapshotHeader(r io.Reader) (SnapshotHeader, error) {
	var magic [PebbleSnapshotMagicLen]byte
	if _, err := io.ReadFull(r, magic[:]); err != nil {
		return SnapshotHeader{}, cockroachdberr.WithStack(err)
	}
	if !bytes.Equal(magic[:], PebbleSnapshotMagic[:]) {
		return SnapshotHeader{}, cockroachdberr.Wrapf(ErrSnapshotBadMagic,
			"got %q", magic[:])
	}
	var ts uint64
	if err := binary.Read(r, binary.LittleEndian, &ts); err != nil {
		return SnapshotHeader{}, cockroachdberr.WithStack(err)
	}
	return SnapshotHeader{LastCommitTS: ts}, nil
}

// readEntryLen reads an 8-byte LittleEndian length prefix. Returns
// (0, true, nil) on clean EOF, which is used to detect the natural
// end of the snapshot. A partial read (io.ErrUnexpectedEOF) is
// mapped to ErrSnapshotTruncated; any other read error is returned
// with a stack trace attached.
func readEntryLen(r io.Reader) (uint64, bool, error) {
	var raw [8]byte
	n, err := io.ReadFull(r, raw[:])
	if err == nil {
		return binary.LittleEndian.Uint64(raw[:]), false, nil
	}
	if cockroachdberr.Is(err, io.EOF) && n == 0 {
		return 0, true, nil
	}
	if cockroachdberr.Is(err, io.ErrUnexpectedEOF) {
		return 0, false, cockroachdberr.WithStack(ErrSnapshotTruncated)
	}
	return 0, false, cockroachdberr.WithStack(err)
}
// readExact reads exactly n bytes into dst (extending it as
// needed). The returned slice aliases dst's underlying array, so
// the caller must not retain it across loop iterations.
func readExact(r io.Reader, dst []byte, n uint64) ([]byte, error) {
	if uint64(cap(dst)) < n {
		// Cap fallback path: allocate a fresh slice when the
		// caller's scratch buffer isn't large enough. For the
		// stack-allocated keyBuf this only kicks in on
		// pathologically long keys.
		return readExactGrow(r, dst, n)
	}
	dst = dst[:n]
	if _, err := io.ReadFull(r, dst); err != nil {
		if cockroachdberr.Is(err, io.ErrUnexpectedEOF) || cockroachdberr.Is(err, io.EOF) {
			return nil, cockroachdberr.WithStack(ErrSnapshotTruncated)
		}
		return nil, cockroachdberr.WithStack(err)
	}
	return dst, nil
}

// readExactGrow is the heap-fallback variant of readExact. Used
// for value bodies, which can be up to several MiB and so live in
// a separately grown buffer rather than a fixed stack array.
func readExactGrow(r io.Reader, dst []byte, n uint64) ([]byte, error) {
	if uint64(cap(dst)) < n {
		dst = make([]byte, n)
	} else {
		dst = dst[:n]
	}
	if _, err := io.ReadFull(r, dst); err != nil {
		if cockroachdberr.Is(err, io.ErrUnexpectedEOF) || cockroachdberr.Is(err, io.EOF) {
			return nil, cockroachdberr.WithStack(ErrSnapshotTruncated)
		}
		return nil, cockroachdberr.WithStack(err)
	}
	return dst, nil
}
// decodeSnapshotEntry strips the 8-byte inverted-TS key suffix and
// the 9-byte value header, surfacing the user-visible byte slices
// plus the MVCC metadata. Returns ErrSnapshotShortKey /
// ErrSnapshotShortValue on length violations and
// ErrSnapshotEncryptedReserved / ErrSnapshotEncryptedEntry on bad
// or unsupported encryption_state bits.
func decodeSnapshotEntry(encKey, encVal []byte) (SnapshotEntry, error) {
	if len(encKey) < snapshotTSSize {
		return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotShortKey,
			"encoded key length %d < %d", len(encKey), snapshotTSSize)
	}
	if len(encVal) < snapshotValueHeaderSize {
		return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotShortValue,
			"encoded value length %d < %d", len(encVal), snapshotValueHeaderSize)
	}
	userKey := encKey[:len(encKey)-snapshotTSSize]
	invTS := binary.BigEndian.Uint64(encKey[len(encKey)-snapshotTSSize:])
	commitTS := ^invTS

	flags := encVal[0]
	if flags&snapshotEncStateReserved != 0 {
		return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotEncryptedReserved,
			"value header byte %#08b", flags)
	}
	encState := (flags & snapshotEncStateMask) >> snapshotEncStateShift
	switch encState {
	case snapshotEncStateCleartx:
		// Cleartext: leave the switch and continue decoding below.
		// (An empty case in Go does not fall through to the next one.)
	case snapshotEncStateEncrypt:
		return SnapshotEntry{}, cockroachdberr.WithStack(ErrSnapshotEncryptedEntry)
	default:
		return SnapshotEntry{}, cockroachdberr.Wrapf(ErrSnapshotEncryptedReserved,
			"encryption_state=%#x is reserved", encState)
	}
	tombstone := (flags & snapshotTombstoneMask) != 0
	expireAt := binary.LittleEndian.Uint64(encVal[1:snapshotValueHeaderSize])
	userValue := encVal[snapshotValueHeaderSize:]
	return SnapshotEntry{
		UserKey:   userKey,
		UserValue: userValue,
		CommitTS:  commitTS,
		ExpireAt:  expireAt,
		Tombstone: tombstone,
	}, nil
}
