Skip to content

Commit 9761a3c

Browse files
committed
go/oasis-node/cmd/storage: Add create checkpoint cmd
1 parent ea89ecc commit 9761a3c

4 files changed

Lines changed: 327 additions & 2 deletions

File tree

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
package storage
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"os"
8+
"path/filepath"
9+
10+
cmtCfg "github.com/cometbft/cometbft/config"
11+
cmtNode "github.com/cometbft/cometbft/node"
12+
"github.com/spf13/cobra"
13+
14+
"github.com/oasisprotocol/oasis-core/go/common"
15+
"github.com/oasisprotocol/oasis-core/go/config"
16+
cmtCommon "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/common"
17+
cmtDB "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/db"
18+
cmdCommon "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common"
19+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
20+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
21+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
22+
)
23+
24+
// Subdirectory names used under the checkpoint output/input directory.
const (
	consensusSubdir = "consensus" // Consensus state checkpoints.
	runtimesSubdir  = "runtimes"  // Runtime checkpoints, one subdirectory per hex-encoded runtime ID.
)
28+
29+
func newCheckpointCmd() *cobra.Command {
30+
cmd := &cobra.Command{
31+
Use: "checkpoint",
32+
Short: "create and restore storage checkpoints",
33+
PersistentPreRunE: func(_ *cobra.Command, args []string) error {
34+
if err := cmdCommon.Init(); err != nil {
35+
cmdCommon.EarlyLogAndExit(err)
36+
}
37+
running, err := cmdCommon.IsNodeRunning()
38+
if err != nil {
39+
return fmt.Errorf("failed to ensure the node is not running: %w", err)
40+
}
41+
if running {
42+
return fmt.Errorf("checkpoint operations can only be done when the node is not running")
43+
}
44+
return nil
45+
},
46+
}
47+
48+
cmd.AddCommand(newCreateCmd())
49+
cmd.AddCommand(newImportCmd())
50+
51+
return cmd
52+
}
53+
54+
func newCreateCmd() *cobra.Command {
55+
var (
56+
height uint64
57+
runtimeID string
58+
round uint64
59+
outDir string
60+
)
61+
62+
cmd := &cobra.Command{
63+
Use: "create",
64+
Args: cobra.NoArgs,
65+
Short: "create storage checkpoints",
66+
RunE: func(cmd *cobra.Command, args []string) error {
67+
createConsensusCp := func() error {
68+
ndb, close, err := openConsensusNodeDB(cmdCommon.DataDir())
69+
if err != nil {
70+
return fmt.Errorf("failed to open consensus state DB: %w", err)
71+
}
72+
defer close()
73+
74+
consensusOutDir := filepath.Join(outDir, consensusSubdir)
75+
return createConsensusCheckpoint(cmd.Context(), ndb, height, consensusOutDir)
76+
}
77+
78+
createRuntimeCps := func() error {
79+
var ns common.Namespace
80+
if err := ns.UnmarshalHex(runtimeID); err != nil {
81+
return fmt.Errorf("malformed source runtime ID: %q: %w", ns, err)
82+
}
83+
84+
ndb, err := openRuntimeStateDB(cmdCommon.DataDir(), ns)
85+
if err != nil {
86+
return fmt.Errorf("failed to open runtime state DB: %w", err)
87+
}
88+
defer ndb.Close()
89+
90+
rtOutDir := filepath.Join(outDir, runtimesSubdir, ns.Hex())
91+
return createRuntimeCheckpoints(cmd.Context(), ndb, round, rtOutDir)
92+
93+
}
94+
95+
if height != 0 { // TODO handle zero value vs not set correctly.
96+
if err := createConsensusCp(); err != nil {
97+
return fmt.Errorf("failed to create consensus checkpoint (height: %d): %w", height, err)
98+
}
99+
}
100+
101+
if runtimeID != "" {
102+
if err := createRuntimeCps(); err != nil {
103+
return fmt.Errorf("failed to create checkpoints (runtime: %s, round: %d): %w", runtimeID, round, err)
104+
}
105+
}
106+
107+
return nil
108+
},
109+
}
110+
111+
cmd.Flags().Uint64Var(&height, "height", 0, "consensus height")
112+
cmd.Flags().StringVar(&runtimeID, "runtime", "", "hex encoded runtime ID")
113+
cmd.Flags().Uint64Var(&round, "round", 0, "round for which checkpoints will be created")
114+
cmd.Flags().StringVar(&outDir, "output-dir", "", "output directory")
115+
cmd.MarkFlagRequired("output-dir")
116+
117+
return cmd
118+
}
119+
120+
func newImportCmd() *cobra.Command {
121+
var inputDir string
122+
123+
cmd := &cobra.Command{
124+
Use: "import",
125+
Args: cobra.NoArgs,
126+
Short: "import storage checkpoints",
127+
RunE: func(cmd *cobra.Command, args []string) error {
128+
129+
// Consensus checkpoint:
130+
consensusInputDir := filepath.Join(inputDir, consensusSubdir)
131+
entries, err := os.ReadDir(consensusInputDir)
132+
switch {
133+
case err == nil:
134+
if err := restoreConsensusCp(cmd.Context(), cmdCommon.DataDir(), consensusInputDir); err != nil {
135+
return fmt.Errorf("failed to import consensus checkpoint: %w", err)
136+
}
137+
case os.IsNotExist(err):
138+
// No consensus checkpoints to restore
139+
default:
140+
return fmt.Errorf("failed to stat: %w", err)
141+
}
142+
143+
// Runtime checkpoints:
144+
runtimeInputDir := filepath.Join(inputDir, runtimesSubdir)
145+
entries, err = os.ReadDir(runtimeInputDir)
146+
switch {
147+
case err == nil:
148+
for _, ns := range entries {
149+
if err := restoreRuntimeCps(cmd.Context(), runtimeInputDir, ns.Name()); err != nil {
150+
return fmt.Errorf("failed to import checkpoints (runtime: %s): %w", ns, err)
151+
}
152+
}
153+
case os.IsNotExist(err):
154+
// No runtime checkpoints to restore
155+
default:
156+
return fmt.Errorf("failed to stat: %w", err)
157+
}
158+
159+
return nil
160+
},
161+
}
162+
163+
cmd.Flags().StringVar(&inputDir, "input-dir", "", "directory with checkpoints to import")
164+
cmd.MarkFlagRequired("input-dir")
165+
166+
return cmd
167+
}
168+
169+
func createConsensusCheckpoint(ctx context.Context, ndb api.NodeDB, height uint64, outputDir string) error {
170+
roots, err := ndb.GetRootsForVersion(height)
171+
if err != nil {
172+
return fmt.Errorf("failed to get roots for height %d: %w", height, err)
173+
}
174+
if len(roots) == 0 {
175+
return fmt.Errorf("empty roots")
176+
}
177+
return createCheckpoints(ctx, ndb, roots, outputDir)
178+
}
179+
180+
func createRuntimeCheckpoints(ctx context.Context, ndb api.NodeDB, round uint64, outputDir string) error {
181+
roots, err := ndb.GetRootsForVersion(round)
182+
if err != nil {
183+
return fmt.Errorf("failed to get roots for round %d: %w", round, err)
184+
}
185+
if lenRoots := len(roots); lenRoots != 2 {
186+
return fmt.Errorf("unexpected number of roots: got %d, want %d", lenRoots, 2)
187+
}
188+
return createCheckpoints(ctx, ndb, roots, outputDir)
189+
}
190+
191+
func createCheckpoints(ctx context.Context, ndb api.NodeDB, roots []node.Root, outputDir string) error {
192+
creator, err := checkpoint.NewFileCreator(outputDir, ndb)
193+
if err != nil {
194+
return fmt.Errorf("failed to create checkpoint file creator: %w", err)
195+
}
196+
197+
for _, root := range roots {
198+
_, err := creator.CreateCheckpoint(ctx, root, 8*1024*1024, 8) // 8MiB chunks
199+
if err != nil {
200+
return fmt.Errorf("failed to create checkpoint (rootType: %s): %w", root.Type, err)
201+
}
202+
}
203+
204+
return nil
205+
}
206+
207+
func restoreConsensusCp(ctx context.Context, dataDir, inputDir string) error {
208+
ndb, close, err := openConsensusNodeDB(cmdCommon.DataDir())
209+
if err != nil {
210+
return fmt.Errorf("failed to open consensus state DB: %w", err)
211+
}
212+
defer close()
213+
214+
if _, ok := ndb.GetLatestVersion(); ok {
215+
return fmt.Errorf("state db not empty")
216+
}
217+
218+
provider, err := checkpoint.NewFileCreator(inputDir, nil) // ndb = nil since we will only restore checkpoints.
219+
if err != nil {
220+
return fmt.Errorf("failed to create checkpoint file creator: %w", err)
221+
}
222+
223+
cps, err := provider.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1})
224+
if err != nil {
225+
return fmt.Errorf("failed to read checkpoints: %w", err)
226+
}
227+
228+
if lenCps := len(cps); lenCps != 1 {
229+
return fmt.Errorf("unexpected number of checkpoints: got %d, want 1", lenCps)
230+
}
231+
232+
if err := restoreCheckpoints(ctx, provider, ndb, cps); err != nil {
233+
return fmt.Errorf("failed to restore checkpoint: %w", err)
234+
}
235+
236+
// TODO: This is just to show how CometBFT is meant to synchronize block and state stores
237+
// when the state sync is done offline (aka import checkpoint). Obviously we use our custom
238+
// genesis/doc provider and light clients. In theory this shows we could make bootstrap of
239+
// oasis node from the checkpoints completely trustless!
240+
//
241+
// In practice I plan to write our own version of `BootstrapUntrustedState`, where the node
242+
// creating checkpoints, so also dump bootstrap metadata.
243+
cmtConfig := cmtCfg.DefaultConfig()
244+
cmtConfig.SetRoot(filepath.Join(cmdCommon.DataDir(), cmtCommon.StateDir))
245+
dbProvider, err := cmtDB.Provider()
246+
if err != nil {
247+
return fmt.Errorf("failed to obtain db provider: %w", err)
248+
}
249+
cmtConfig.Genesis = config.GlobalConfig.Genesis.File
250+
cmtConfig.StateSync.RPCServers = config.GlobalConfig.Consensus.Providers
251+
cmtConfig.StateSync.TrustPeriod = config.GlobalConfig.Consensus.LightClient.Trust.Period
252+
cmtConfig.StateSync.TrustHeight = int64(config.GlobalConfig.Consensus.LightClient.Trust.Height)
253+
cmtConfig.StateSync.TrustHash = config.GlobalConfig.Consensus.LightClient.Trust.Hash
254+
255+
root := cps[0].Root
256+
if err := cmtNode.BootstrapState(ctx, cmtConfig, dbProvider, root.Version, root.Hash[:]); err != nil {
257+
return fmt.Errorf("failed to bootstrap cometbft dbs from the state DB: %w", err)
258+
}
259+
260+
return nil
261+
}
262+
263+
func restoreRuntimeCps(ctx context.Context, inputDir, namespace string) error {
264+
var ns common.Namespace
265+
if err := ns.UnmarshalHex(namespace); err != nil {
266+
return fmt.Errorf("malformed source runtime ID: %q: %w", ns, err)
267+
}
268+
269+
ndb, err := openRuntimeStateDB(cmdCommon.DataDir(), ns)
270+
if err != nil {
271+
return err
272+
}
273+
defer ndb.Close()
274+
275+
if _, ok := ndb.GetLatestVersion(); ok {
276+
return fmt.Errorf("state db not empty")
277+
}
278+
279+
cpDir := filepath.Join(inputDir, namespace)
280+
provider, err := checkpoint.NewFileCreator(cpDir, nil) // ndb = nil since we will only restore checkpoints.
281+
if err != nil {
282+
return fmt.Errorf("failed to create checkpoint file creator: %w", err)
283+
}
284+
285+
cps, err := provider.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1, Namespace: ns})
286+
if err != nil {
287+
return fmt.Errorf("failed to read checkpoints: %w", err)
288+
}
289+
if err := restoreCheckpoints(ctx, provider, ndb, cps); err != nil {
290+
return fmt.Errorf("failed to restore checkpoint: %w", err)
291+
}
292+
293+
return nil
294+
}
295+
296+
func restoreCheckpoints(ctx context.Context, provider checkpoint.ChunkProvider, ndb api.NodeDB, cps []*checkpoint.Metadata) error {
297+
var roots []node.Root
298+
for _, cp := range cps {
299+
if err := ndb.StartMultipartInsert(cp.Root.Version); err != nil {
300+
return fmt.Errorf("failed to start multipart insert: %w", err)
301+
}
302+
defer ndb.AbortMultipartInsert()
303+
304+
for idx, _ := range cp.Chunks {
305+
chunk, err := cp.GetChunkMetadata(uint64(idx))
306+
if err != nil {
307+
return fmt.Errorf("failed to get chunk metadata: %w", err)
308+
}
309+
var buf bytes.Buffer
310+
if err := provider.GetCheckpointChunk(ctx, chunk, &buf); err != nil {
311+
return fmt.Errorf("failed to read checkpoint chunk (idx: %d): %w", idx, err)
312+
}
313+
if err := checkpoint.RestoreChunk(ctx, ndb, chunk, &buf); err != nil {
314+
return fmt.Errorf("failed to restore chunk: %w", err)
315+
}
316+
}
317+
roots = append(roots, cp.Root)
318+
}
319+
if err := ndb.Finalize(roots); err != nil {
320+
return fmt.Errorf("failed to finalize: %w", err) // TODO maybe you need to force sync?
321+
}
322+
323+
return nil
324+
}

go/oasis-node/cmd/storage/storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,5 +605,6 @@ func Register(parentCmd *cobra.Command) {
605605
storageCmd.AddCommand(storageCompactCmd)
606606
storageCmd.AddCommand(pruneCmd)
607607
storageCmd.AddCommand(newInspectCmd())
608+
storageCmd.AddCommand(newCheckpointCmd())
608609
parentCmd.AddCommand(storageCmd)
609610
}

go/storage/mkvs/checkpoint/chunk.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func writeChunk(proof *syncer.Proof, w io.Writer) (hash.Hash, error) {
262262
return hb.Build(), nil
263263
}
264264

265-
func restoreChunk(ctx context.Context, ndb db.NodeDB, chunk *ChunkMetadata, r io.Reader) error {
265+
func RestoreChunk(ctx context.Context, ndb db.NodeDB, chunk *ChunkMetadata, r io.Reader) error {
266266
hb := hash.NewBuilder()
267267
tr := io.TeeReader(r, hb)
268268
sr := snappy.NewReader(tr)

go/storage/mkvs/checkpoint/restorer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (rs *restorer) RestoreChunk(ctx context.Context, idx uint64, r io.Reader) (
8383
return false, err
8484
}
8585

86-
err = restoreChunk(ctx, rs.ndb, chunk, r)
86+
err = RestoreChunk(ctx, rs.ndb, chunk, r)
8787
switch {
8888
case err == nil:
8989
case errors.Is(err, ErrChunkProofVerificationFailed):

0 commit comments

Comments
 (0)