Skip to content

Commit fde4ee3

Browse files
authored
backup: cmd/elastickv-snapshot-decode CLI + CHECKSUMS writer (#810)
## Summary Adds the Phase 0a operator-facing binary on top of the dispatcher in #806. Plus the CHECKSUMS writer / verifier and the `<uint64>.fsm` filename → snapshot-index parser the CLI needs to stamp MANIFEST.json. Stacked on `backup/snapshot-dispatcher` — once #806 merges, this PR's diff collapses to just the CLI + helpers and I'll retarget to `main`. ## What lands | File | Purpose | |---|---| | `cmd/elastickv-snapshot-decode/main.go` | Flag parsing, `DecodeSnapshot` invocation, `MANIFEST.json` + `CHECKSUMS` emission | | `cmd/elastickv-snapshot-decode/main_test.go` | End-to-end synthetic-snapshot golden path + four flag-validation paths + `parseAdapterSet` table test | | `internal/backup/checksums.go` | `WriteChecksums(root)` / `VerifyChecksums(root)` — sha256sum(1)-compatible | | `internal/backup/source.go` | `SnapshotIndexFromPath` — `<uint64>.fsm` filename parser used to stamp `MANIFEST.snapshot_index` | | `internal/backup/checksums_test.go`, `source_test.go` | Pin file shape, deterministic ordering, tamper detection, parse table | ## CLI surface ``` elastickv-snapshot-decode \ --input <fsm-file> (required) --output <directory-root> (required) [--adapter dynamodb,s3,redis,sqs|all] [--cluster-id <id>] [--include-incomplete-uploads] [--include-orphans] [--preserve-sqs-visibility] [--include-sqs-side-records] [--rename-collisions] [--dynamodb-bundle-mode per-item|jsonl] ``` Matches the design's §"Tooling" decoder section, minus the deferred `--input-snap-dir`, `--scope`, and `--dynamodb-bundle-size` flags (called out in the design as later additions). ## Output shape `MANIFEST.json` carries `phase = "phase0-snapshot-decode"`, the filename-derived `snapshot_index`, the dispatcher-surfaced `last_commit_ts`, exclusion flags from the CLI, and a non-nil `Adapter{}` pointer for every enabled adapter (per-scope enumeration is a follow-up). `CHECKSUMS` is the standard `sha256sum --binary` shape — verifiable with `sha256sum -c CHECKSUMS` from the dump root, no elastickv binary involved. ## Test plan - [x] `go test -race ./internal/backup/ ./cmd/elastickv-snapshot-decode/` (passes) - [x] `golangci-lint run ./...` over the touched packages (clean — 0 issues) - [x] `go build ./...` (clean) - [x] Golden-path test: synthetic `!redis|str|greeting` → `redis/db_0/strings/greeting.bin` + `MANIFEST.json` + `CHECKSUMS` + `VerifyChecksums` round-trip ## Self-review (CLAUDE.md §"Self-review of code changes") 1. **Data loss** — CHECKSUMS is written last so a crash mid-dump leaves an intact-but-unchecked tree; `MkdirAll(outputRoot)` is idempotent. 2. **Concurrency** — single-threaded; no shared state across goroutines. 3. **Performance** — sha256 streams via `io.Copy`; multi-GB files pass without buffering. CHECKSUMS walk is O(files). 4. **Data consistency** — `MANIFEST.last_commit_ts` comes from the dispatcher header (now surfaced via `ReadSnapshotWithHeader` so empty-snapshot dumps still populate it); `snapshot_index` parsed from the filename per the documented live-writer convention; encryption-state failures propagate from `DecodeSnapshot` as `ErrSnapshotEncryptedEntry`. 5. **Test coverage** — five CLI tests (golden path + four flag-validation paths); seven backup-package tests for the new helpers; existing backup-package tests still pass. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * New CLI to decode local snapshots into vendor-independent directory trees with adapter selection, output options, and manifest/CHECKSUMS generation * CHECKSUMS written in sha256sum-compatible format for integrity verification; manifest includes version, cluster metadata and parsed snapshot index when available * **Bug Fixes / Security** * CHECKSUMS verification rejects path-traversal and symlink escape attempts * **Tests** * Comprehensive tests covering decoding, manifesting, checksums, parsing, and security regressions <!-- review_stack_entry_start --> [![Review Change Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/bootjp/elastickv/pull/810?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack) <!-- review_stack_entry_end --> <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 8b270d2 + 2822f2d commit fde4ee3

6 files changed

Lines changed: 1729 additions & 0 deletions

File tree

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
// Command elastickv-snapshot-decode is the Phase 0a snapshot
2+
// decoder described in
3+
// docs/design/2026_04_29_proposed_snapshot_logical_decoder.md.
4+
//
5+
// It reads a Pebble snapshot (`.fsm`) emitted by the live FSM
6+
// (store/lsm_store.go) and writes a vendor-independent per-adapter
7+
// directory tree rooted at --output. The output tree carries a
8+
// MANIFEST.json describing the dump and a sha256sum(1)-compatible
9+
// CHECKSUMS file the operator can verify with stock UNIX tooling
10+
// — `sha256sum -c CHECKSUMS` from the dump root suffices, with no
11+
// elastickv binary involved.
12+
//
13+
// Phase 0a is offline-only. The tool needs a `.fsm` on disk; it
14+
// does not talk to a running cluster. Phase 1 (separate design)
15+
// adds a live-cluster extraction path that produces the same
16+
// directory tree from a pinned read_ts.
17+
package main
18+
19+
import (
20+
"flag"
21+
"fmt"
22+
"io"
23+
"log/slog"
24+
"os"
25+
"path/filepath"
26+
"strings"
27+
"time"
28+
29+
"github.com/bootjp/elastickv/internal/backup"
30+
"github.com/cockroachdb/errors"
31+
)
32+
33+
// defaultOutputDirMode mirrors the permissions every backup-package
34+
// encoder uses when it creates per-adapter subdirectories. Kept on
35+
// the CLI side so an operator who wants a more restrictive umask can
36+
// pre-create the output root before running the decoder; the
37+
// MkdirAll call below is a no-op when the directory already exists.
38+
const defaultOutputDirMode = 0o755
39+
40+
// version is stamped via -ldflags "-X main.version=<git-sha>" by
41+
// the build pipeline. Phase 0a copies it into MANIFEST.json's
42+
// elastickv_version field so a downstream restore-tool author
43+
// knows which release-vintage of the elastickv source produced the
44+
// dump.
45+
var version = "dev"
46+
47+
func main() {
48+
if err := run(os.Args[1:], os.Stderr); err != nil {
49+
fmt.Fprintf(os.Stderr, "elastickv-snapshot-decode: %v\n", err)
50+
os.Exit(1)
51+
}
52+
}
53+
54+
// run is the testable entrypoint: argv slice in, error out. Phase 0a's
55+
// tests drive it directly (no os.Exit + no signal handling) so they
56+
// can assert on the output tree shape.
57+
func run(argv []string, logSink io.Writer) error {
58+
cfg, err := parseFlags(argv)
59+
if err != nil {
60+
return err
61+
}
62+
logger := slog.New(slog.NewTextHandler(logSink, &slog.HandlerOptions{Level: slog.LevelInfo}))
63+
return decodeOne(cfg, logger)
64+
}
65+
66+
// config carries the parsed CLI flags. Kept separate from
67+
// backup.DecodeOptions so the CLI can layer flags the package does
68+
// not need (cluster_id, source-path bookkeeping for MANIFEST,
69+
// log-format) without contaminating the library.
70+
type config struct {
71+
inputPath string
72+
outputRoot string
73+
74+
adapters backup.AdapterSet
75+
76+
includeIncompleteUploads bool
77+
includeOrphans bool
78+
preserveSQSVisibility bool
79+
includeSQSSideRecords bool
80+
renameCollisions bool
81+
bundleJSONL bool
82+
83+
clusterID string
84+
}
85+
86+
// parseFlags lifts argv into a config. The default adapter set is
87+
// "everything"; --adapter overrides with a comma-separated list.
88+
func parseFlags(argv []string) (*config, error) {
89+
fs := flag.NewFlagSet("elastickv-snapshot-decode", flag.ContinueOnError)
90+
fs.SetOutput(io.Discard) // we surface errors via the returned error chain
91+
92+
var (
93+
inputPath string
94+
outputRoot string
95+
adapterCSV string
96+
bundleMode string
97+
clusterID string
98+
99+
includeIncompleteUploads bool
100+
includeOrphans bool
101+
preserveSQSVisibility bool
102+
includeSQSSideRecords bool
103+
renameCollisions bool
104+
)
105+
fs.StringVar(&inputPath, "input", "", "Path to the .fsm snapshot file (required)")
106+
fs.StringVar(&outputRoot, "output", "", "Destination directory tree root (required)")
107+
fs.StringVar(&adapterCSV, "adapter", "dynamodb,s3,redis,sqs", "Comma-separated subset of adapters to emit")
108+
fs.StringVar(&bundleMode, "dynamodb-bundle-mode", "per-item", "DynamoDB item layout: per-item | jsonl")
109+
fs.StringVar(&clusterID, "cluster-id", "", "Cluster identifier to stamp into MANIFEST.json (optional)")
110+
fs.BoolVar(&includeIncompleteUploads, "include-incomplete-uploads", false, "Emit in-flight S3 multipart uploads under _incomplete_uploads/")
111+
fs.BoolVar(&includeOrphans, "include-orphans", false, "Emit pre-generation S3 orphan blobs under _orphans/")
112+
fs.BoolVar(&preserveSQSVisibility, "preserve-sqs-visibility", false, "Carry live SQS visibility-state fields into the dump instead of zeroing them")
113+
fs.BoolVar(&includeSQSSideRecords, "include-sqs-side-records", false, "Emit SQS dedup/group/vis side records under _internals/")
114+
fs.BoolVar(&renameCollisions, "rename-collisions", false, "Rename user S3 keys ending in .elastickv-meta.json instead of failing")
115+
if err := fs.Parse(argv); err != nil {
116+
return nil, errors.WithStack(err)
117+
}
118+
if inputPath == "" {
119+
return nil, errors.New("--input is required")
120+
}
121+
if outputRoot == "" {
122+
return nil, errors.New("--output is required")
123+
}
124+
adapters, err := parseAdapterSet(adapterCSV)
125+
if err != nil {
126+
return nil, err
127+
}
128+
bundleJSONL, err := parseBundleMode(bundleMode)
129+
if err != nil {
130+
return nil, err
131+
}
132+
return &config{
133+
inputPath: inputPath,
134+
outputRoot: outputRoot,
135+
adapters: adapters,
136+
includeIncompleteUploads: includeIncompleteUploads,
137+
includeOrphans: includeOrphans,
138+
preserveSQSVisibility: preserveSQSVisibility,
139+
includeSQSSideRecords: includeSQSSideRecords,
140+
renameCollisions: renameCollisions,
141+
bundleJSONL: bundleJSONL,
142+
clusterID: clusterID,
143+
}, nil
144+
}
145+
146+
// parseAdapterSet decodes a comma-separated adapter list.
147+
// "all" is accepted as shorthand; unknown names surface as a hard
148+
// error so a typo does not silently disable an adapter.
149+
func parseAdapterSet(csv string) (backup.AdapterSet, error) {
150+
if csv == "" || csv == "all" {
151+
return backup.AllAdapters(), nil
152+
}
153+
var set backup.AdapterSet
154+
for _, raw := range strings.Split(csv, ",") {
155+
if err := applyAdapterName(strings.TrimSpace(raw), &set); err != nil {
156+
return backup.AdapterSet{}, err
157+
}
158+
}
159+
if set == (backup.AdapterSet{}) {
160+
return backup.AdapterSet{}, errors.New("--adapter selected zero adapters")
161+
}
162+
return set, nil
163+
}
164+
165+
// applyAdapterName flips the matching field of set on. Extracted
166+
// from parseAdapterSet so the parent function stays under the
167+
// project's cyclop=10 ceiling.
168+
func applyAdapterName(name string, set *backup.AdapterSet) error {
169+
switch name {
170+
case "dynamodb":
171+
set.DynamoDB = true
172+
case "s3":
173+
set.S3 = true
174+
case "redis":
175+
set.Redis = true
176+
case "sqs":
177+
set.SQS = true
178+
case "":
179+
// allow trailing comma
180+
default:
181+
return errors.Wrap(ErrUnknownAdapter, name)
182+
}
183+
return nil
184+
}
185+
186+
// ErrUnknownAdapter is returned by parseAdapterSet when the CSV
187+
// list contains a name we do not recognise. Surfaced as a typed
188+
// sentinel so the CLI's flag-validation tests can assert on it
189+
// without string matching.
190+
var ErrUnknownAdapter = errors.New("unknown adapter name")
191+
192+
// ErrBundleModeInvalid is returned by parseBundleMode when the
193+
// flag value is neither per-item nor jsonl.
194+
var ErrBundleModeInvalid = errors.New("invalid --dynamodb-bundle-mode")
195+
196+
// parseBundleMode validates the --dynamodb-bundle-mode value. The
197+
// underlying encoder rejects bundleJSONL=true at Finalize time
198+
// (the JSONL path is a Phase 0b follow-up), but we still accept
199+
// the flag now so the CLI surface is stable across that change.
200+
func parseBundleMode(mode string) (bool, error) {
201+
switch mode {
202+
case "per-item", "":
203+
return false, nil
204+
case "jsonl":
205+
return true, nil
206+
default:
207+
return false, errors.Wrap(ErrBundleModeInvalid, mode)
208+
}
209+
}
210+
211+
// decodeOne runs the dispatcher against the configured input and
212+
// writes MANIFEST.json + CHECKSUMS into the output tree.
213+
func decodeOne(cfg *config, logger *slog.Logger) error {
214+
if err := os.MkdirAll(cfg.outputRoot, defaultOutputDirMode); err != nil {
215+
return errors.WithStack(err)
216+
}
217+
in, err := os.Open(cfg.inputPath) //nolint:gosec // operator-supplied path
218+
if err != nil {
219+
return errors.WithStack(err)
220+
}
221+
defer func() { _ = in.Close() }()
222+
223+
res, err := backup.DecodeSnapshot(in, backup.DecodeOptions{
224+
OutRoot: cfg.outputRoot,
225+
Adapters: cfg.adapters,
226+
IncludeIncompleteUploads: cfg.includeIncompleteUploads,
227+
IncludeOrphans: cfg.includeOrphans,
228+
RenameS3Collisions: cfg.renameCollisions,
229+
PreserveSQSVisibility: cfg.preserveSQSVisibility,
230+
IncludeSQSSideRecords: cfg.includeSQSSideRecords,
231+
DynamoDBBundleJSONL: cfg.bundleJSONL,
232+
WarnSink: warnSinkFor(logger),
233+
})
234+
if err != nil {
235+
return errors.Wrapf(err, "decode %s", cfg.inputPath)
236+
}
237+
logger.Info("snapshot decoded",
238+
"input", cfg.inputPath,
239+
"output", cfg.outputRoot,
240+
"entries", res.Counters.Total,
241+
"dynamodb", res.Counters.DynamoDB,
242+
"s3", res.Counters.S3,
243+
"redis", res.Counters.Redis,
244+
"sqs", res.Counters.SQS,
245+
"tombstone", res.Counters.Tombstone,
246+
"internal", res.Counters.Internal,
247+
"unknown", res.Counters.Unknown,
248+
"last_commit_ts", res.Header.LastCommitTS)
249+
250+
if err := emitManifest(cfg, res); err != nil {
251+
return err
252+
}
253+
if err := backup.WriteChecksums(cfg.outputRoot); err != nil {
254+
return errors.Wrap(err, "write CHECKSUMS")
255+
}
256+
return nil
257+
}
258+
259+
// emitManifest writes MANIFEST.json under cfg.outputRoot. Source
260+
// metadata (FSM path and parsed snapshot_index) come from the CLI
261+
// invocation; last_commit_ts comes from the snapshot header the
262+
// dispatcher surfaces.
263+
func emitManifest(cfg *config, res backup.DecodeResult) error {
264+
m := backup.NewPhase0SnapshotManifest(time.Now())
265+
m.ElastickvVersion = version
266+
m.ClusterID = cfg.clusterID
267+
m.LastCommitTS = res.Header.LastCommitTS
268+
if idx, err := backup.SnapshotIndexFromPath(cfg.inputPath); err == nil {
269+
m.SnapshotIndex = idx
270+
}
271+
m.Source = &backup.Source{FSMPath: cfg.inputPath}
272+
m.Adapters = populateAdapterScopes(cfg.adapters, res)
273+
m.Exclusions = &backup.Exclusions{
274+
IncludeIncompleteUploads: cfg.includeIncompleteUploads,
275+
IncludeOrphans: cfg.includeOrphans,
276+
PreserveSQSVisibility: cfg.preserveSQSVisibility,
277+
IncludeSQSSideRecords: cfg.includeSQSSideRecords,
278+
}
279+
if cfg.bundleJSONL {
280+
m.DynamoDBLayout = backup.DynamoDBLayoutJSONL
281+
}
282+
// filepath.Join over `+ "/"` so the manifest path uses the
283+
// platform separator (gemini r1 medium on PR #810).
284+
out, err := os.Create(filepath.Join(cfg.outputRoot, "MANIFEST.json")) //nolint:gosec // operator-supplied path
285+
if err != nil {
286+
return errors.WithStack(err)
287+
}
288+
// Surface Close errors instead of swallowing them: on a
289+
// filesystem that delays writeback until Close (NFS, some
290+
// FUSE mounts), the write succeeds but the buffered-data
291+
// flush failure shows up only at Close time. Without this
292+
// surface, a manifest that was never durably written would
293+
// pass the cmd's error-return contract and the dump-tree
294+
// invariant ("MANIFEST.json is on disk") would silently
295+
// fail. Gemini r1 medium on PR #810. Sync runs before Close
296+
// so the explicit fsync is the authoritative durability
297+
// signal; Close's role is just to surface the kernel's
298+
// per-fd cleanup result.
299+
if err := backup.WriteManifest(out, m); err != nil {
300+
_ = out.Close()
301+
return errors.WithStack(err)
302+
}
303+
if err := out.Sync(); err != nil {
304+
_ = out.Close()
305+
return errors.WithStack(err)
306+
}
307+
if err := out.Close(); err != nil {
308+
return errors.WithStack(err)
309+
}
310+
return nil
311+
}
312+
313+
// populateAdapterScopes returns a non-nil pointer for every enabled
314+
// adapter. Scope-name enumeration (per-table, per-bucket, per-DB,
315+
// per-queue) is a follow-up — Phase 0a's MANIFEST stamps an empty
316+
// Adapter{} for each enabled adapter so the on-disk shape is
317+
// already correct when scope enumeration lands.
318+
//
319+
// An adapter that was enabled but produced zero entries is still
320+
// returned as a non-nil pointer with empty scope arrays. The
321+
// "scope-everything-excluded" state is meaningful (operator passed
322+
// --adapter dynamodb but the snapshot had no DynamoDB data) and
323+
// distinct from "adapter not enabled".
324+
func populateAdapterScopes(set backup.AdapterSet, _ backup.DecodeResult) *backup.Adapters {
325+
a := &backup.Adapters{}
326+
if set.DynamoDB {
327+
a.DynamoDB = &backup.Adapter{}
328+
}
329+
if set.S3 {
330+
a.S3 = &backup.Adapter{}
331+
}
332+
if set.Redis {
333+
a.Redis = &backup.Adapter{}
334+
}
335+
if set.SQS {
336+
a.SQS = &backup.Adapter{}
337+
}
338+
return a
339+
}
340+
341+
// warnSinkFor adapts a *slog.Logger into the warn-sink callback
342+
// the per-adapter encoders use. Event names ("redis_orphan_ttl",
343+
// "ddb_orphan_items", ...) become the slog `event` field; the
344+
// variadic key=value pairs are passed through verbatim so the
345+
// stable adapter-side keys stay stable on the way out.
346+
func warnSinkFor(logger *slog.Logger) func(event string, fields ...any) {
347+
return func(event string, fields ...any) {
348+
args := append([]any{"event", event}, fields...)
349+
logger.Warn("encoder warning", args...)
350+
}
351+
}

0 commit comments

Comments
 (0)