From 198fc57d1e6ee57296eae46a12ba48a86ddc4298 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 12:16:31 -0700 Subject: [PATCH 01/12] go/store/nbs: Add some machinery to move towards lazy-loading NomsBlockStore in some cases. --- go/store/nbs/journal.go | 20 +++--- go/store/nbs/journal_test.go | 11 +-- go/store/nbs/store.go | 128 ++++++++++++++++++++++++++++++++++- 3 files changed, 146 insertions(+), 13 deletions(-) diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index d5e5ce341d2..6cfd09ace6b 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -607,22 +607,26 @@ func (c journalConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees []t return c.child.chooseConjoinees(pruned) } -// newJournalManifest makes a new file manifest. -// When failOnTimeout is true, callers want a hard error instead of falling back to read-only mode. -// (The behavior change is implemented separately; this is the plumbing flag.) -func newJournalManifest(ctx context.Context, dir string, failOnTimeout bool) (m *journalManifest, err error) { +func newJournalLock(dir string, failOnTimeout bool) (*fslock.Lock, chunks.ExclusiveAccessMode, error) { lock := fslock.New(filepath.Join(dir, lockFileName)) // try to take the file lock. if we fail, make the manifest read-only. // if we succeed, hold the file lock until we close the journalManifest - err = lock.LockWithTimeout(lockFileTimeout) + err := lock.LockWithTimeout(lockFileTimeout) if errors.Is(err, fslock.ErrTimeout) { if failOnTimeout { - return nil, ErrDatabaseLocked + return nil, chunks.ExclusiveAccessMode_ReadOnly, ErrDatabaseLocked } - lock, err = nil, nil // read only + return nil, chunks.ExclusiveAccessMode_ReadOnly, nil } else if err != nil { - return nil, err + return nil, chunks.ExclusiveAccessMode_ReadOnly, err } + return lock, chunks.ExclusiveAccessMode_Exclusive, nil +} + +// newJournalManifest makes a new file manifest. +// When failOnTimeout is true, callers want a hard error instead of falling back to read-only mode. +// (The behavior change is implemented separately; this is the plumbing flag.) +func newJournalManifest(ctx context.Context, dir string, lock *fslock.Lock) (m *journalManifest, err error) { m = &journalManifest{dir: dir, lock: lock} var f *os.File diff --git a/go/store/nbs/journal_test.go b/go/store/nbs/journal_test.go index 04245fb9b2b..03c5eb1106d 100644 --- a/go/store/nbs/journal_test.go +++ b/go/store/nbs/journal_test.go @@ -37,7 +37,9 @@ func makeTestChunkJournal(t *testing.T) *ChunkJournal { dir, err := os.MkdirTemp("", "") require.NoError(t, err) t.Cleanup(func() { file.RemoveAll(dir) }) - m, err := newJournalManifest(ctx, dir, false) + l, _, err := newJournalLock(dir, false) + require.NoError(t, err) + m, err := newJournalManifest(ctx, dir, l) require.NoError(t, err) q := NewUnlimitedMemQuotaProvider() p := newFSTablePersister(dir, q, false) @@ -52,7 +54,9 @@ func makeTestChunkJournal(t *testing.T) *ChunkJournal { } func openTestChunkJournal(t *testing.T, dir string) *ChunkJournal { - m, err := newJournalManifest(t.Context(), dir, false) + l, _, err := newJournalLock(dir, false) + require.NoError(t, err) + m, err := newJournalManifest(t.Context(), dir, l) require.NoError(t, err) q := NewUnlimitedMemQuotaProvider() p := newFSTablePersister(dir, q, false) @@ -107,8 +111,7 @@ func TestChunkJournalReadOnly(t *testing.T) { rw := makeTestChunkJournal(t) assert.Equal(t, chunks.ExclusiveAccessMode(chunks.ExclusiveAccessMode_Exclusive), rw.AccessMode()) - _, err := newJournalManifest(t.Context(), rw.backing.dir, true) - require.Error(t, err) + _, _, err := newJournalLock(rw.backing.dir, true) require.ErrorIs(t, err, ErrDatabaseLocked) }) } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index d5c67ca1503..373666a3b0a 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -139,6 +139,15 @@ type NomsBlockStore struct { fatalBehavior dherrors.FatalBehavior closed bool + + staticAccessMode chunks.ExclusiveAccessMode + loadOnce sync.Once + + // If loadThunk is passed |false|, it is being called as part + // of Close(). It should clean up any retained resources, + // instead of loading the database. + loadThunk func(loadIt bool) + loadErr error } func (nbs *NomsBlockStore) PersistGhostHashes(ctx context.Context, refs hash.HashSet) error { @@ -161,8 +170,20 @@ type Range struct { DictLength uint32 } +func (nbs *NomsBlockStore) ensureLoad() error { + if nbs.loadThunk != nil { + nbs.loadOnce.Do(func() { + nbs.loadThunk(true) + }) + } + return nbs.loadErr +} + // IterateRoots iterates over the in-memory roots tracked by the ChunkJournal, if there is one. func (nbs *NomsBlockStore) IterateRoots(f func(root string, timestamp *time.Time) error) error { + if err := nbs.ensureLoad(); err != nil { + return err + } cj := nbs.chunkJournal() if cj == nil { return nil @@ -179,6 +200,9 @@ func (nbs *NomsBlockStore) chunkJournal() *ChunkJournal { } func (nbs *NomsBlockStore) ChunkJournalSize() (int64, bool) { + if err := nbs.ensureLoad(); err != nil { + return 0, false + } nbs.mu.Lock() defer nbs.mu.Unlock() if cj := nbs.chunkJournal(); cj != nil { @@ -188,6 +212,9 @@ func (nbs *NomsBlockStore) ChunkJournalSize() (int64, bool) { } func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sourcesToRanges, err := nbs.getChunkLocations(ctx, hashes) if err != nil { @@ -271,6 +298,9 @@ func (nbs *NomsBlockStore) getChunkLocations(ctx context.Context, hashes hash.Ha } func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sourcesToRanges, err := nbs.getChunkLocations(ctx, hashes) if err != nil { @@ -394,6 +424,9 @@ func (nbs *NomsBlockStore) finalizeConjoin(ctx context.Context, err error) { } func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (ManifestInfo, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sources, err := nbs.openChunkSourcesForManifestUpdateAndRebase(ctx, updates, nil) if err != nil { @@ -545,6 +578,9 @@ func (nbs *NomsBlockStore) updateManifestAddFiles(ctx context.Context, updates m } func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (ManifestInfo, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sources, err := nbs.openChunkSourcesForManifestUpdateAndRebase(ctx, updates, nil) if err != nil { @@ -606,6 +642,9 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp // assumes that stores grow monotonically unless the |gcGen| of a manifest changes. Since this interface // cannot set |gcGen|, callers must ensure that calls to this function grow the store monotonically. func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) { + if err := store.ensureLoad(); err != nil { + return err + } store.mu.Lock() defer store.mu.Unlock() contents := manifestContents{ @@ -757,7 +796,9 @@ func NewLocalJournalingStoreWithOptions(ctx context.Context, nbfVers, dir string return nil, err } - m, err := newJournalManifest(ctx, dir, opts.FailOnLockTimeout) + lock, _, err := newJournalLock(dir, opts.FailOnLockTimeout) + + m, err := newJournalManifest(ctx, dir, lock) if err != nil { return nil, err } @@ -871,6 +912,9 @@ func (nbs *NomsBlockStore) waitForGC(ctx context.Context, cycle uint64) error { } func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.InsertAddrsCurry) error { + if err := nbs.ensureLoad(); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.putChunk(ctx, c, getAddrs, nbs.refCheck) } @@ -994,6 +1038,9 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err } func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) { + if err := nbs.ensureLoad(); err != nil { + return chunks.Chunk{}, err + } valctx.ValidateContext(ctx) ctx, span := tracer.Start(ctx, "nbs.Get") defer span.End() @@ -1058,6 +1105,9 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, } func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error { + if err := nbs.ensureLoad(); err != nil { + return err + } valctx.ValidateContext(ctx) ctx, span := tracer.Start(ctx, "nbs.GetMany", trace.WithAttributes(attribute.Int("num_hashes", len(hashes)))) defer span.End() @@ -1069,6 +1119,9 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou } func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, ToChunker)) error { + if err := nbs.ensureLoad(); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.getManyCompressed(ctx, hashes, found, gcDependencyMode_TakeDependency) } @@ -1179,6 +1232,9 @@ func toGetRecords(hashes hash.HashSet) []getRecord { } func (nbs *NomsBlockStore) Count() (uint32, error) { + if err := nbs.ensureLoad(); err != nil { + return 0, err + } count, tables := func() (count uint32, tables chunkReader) { nbs.mu.RLock() defer nbs.mu.RUnlock() @@ -1193,6 +1249,9 @@ func (nbs *NomsBlockStore) Count() (uint32, error) { } func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { + if err := nbs.ensureLoad(); err != nil { + return false, err + } valctx.ValidateContext(ctx) t1 := time.Now() defer func() { @@ -1251,6 +1310,9 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { } func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) return nbs.hasManyDep(ctx, hashes, gcDependencyMode_TakeDependency) } @@ -1384,6 +1446,9 @@ func toHasRecords(hashes hash.HashSet) []hasRecord { } func (nbs *NomsBlockStore) Rebase(ctx context.Context) error { + if err := nbs.ensureLoad(); err != nil { + return err + } valctx.ValidateContext(ctx) nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1419,6 +1484,9 @@ func (nbs *NomsBlockStore) rebase(ctx context.Context) error { } func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { + if err := nbs.ensureLoad(); err != nil { + return hash.Hash{}, err + } valctx.ValidateContext(ctx) nbs.mu.RLock() defer nbs.mu.RUnlock() @@ -1426,6 +1494,9 @@ func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { } func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash) (success bool, err error) { + if err := nbs.ensureLoad(); err != nil { + return false, err + } valctx.ValidateContext(ctx) return nbs.commit(ctx, current, last, nbs.refCheck) } @@ -1588,16 +1659,32 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has } func (nbs *NomsBlockStore) Version() string { + if err := nbs.ensureLoad(); err != nil { + panic(err) + } nbs.mu.RLock() defer nbs.mu.RUnlock() return nbs.upstream.nbfVers } func (nbs *NomsBlockStore) AccessMode() chunks.ExclusiveAccessMode { + if nbs.loadThunk != nil { + return nbs.staticAccessMode + } return nbs.persister.AccessMode() } func (nbs *NomsBlockStore) Close() error { + if nbs.loadThunk != nil { + loaded := true + nbs.loadOnce.Do(func() { + nbs.loadThunk(false) + loaded = false + }) + if !loaded { + return nil + } + } nbs.mu.Lock() defer nbs.mu.Unlock() nbs.closed = true @@ -1624,6 +1711,9 @@ func (nbs *NomsBlockStore) Stats() interface{} { } func (nbs *NomsBlockStore) StatsSummary() string { + if err := nbs.ensureLoad(); err != nil { + return "failed to load" + } nbs.mu.Lock() defer nbs.mu.Unlock() cnt := nbs.tables.count() @@ -1668,6 +1758,9 @@ func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) { // Sources retrieves the current root hash, a list of all table files (which may include appendix tablefiles), // and a second list of only the appendix table files func (nbs *NomsBlockStore) Sources(ctx context.Context) (chunks.TableFileSources, error) { + if err := nbs.ensureLoad(); err != nil { + return chunks.TableFileSources{}, err + } valctx.ValidateContext(ctx) nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1749,6 +1842,9 @@ func newTableFile(cs chunkSource, info tableSpec, behavior dherrors.FatalBehavio } func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) { + if err := nbs.ensureLoad(); err != nil { + return 0, err + } nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1776,6 +1872,9 @@ func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, erro } func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { + if err := nbs.ensureLoad(); err != nil { + panic(err) + } var ok bool _, ok = nbs.persister.(tableFilePersister) @@ -1788,6 +1887,9 @@ func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { } func (nbs *NomsBlockStore) Path() (string, bool) { + if err := nbs.ensureLoad(); err != nil { + return "", false + } if tfp, ok := nbs.persister.(tableFilePersister); ok { switch p := tfp.(type) { case *fsTablePersister, *ChunkJournal: @@ -1801,6 +1903,9 @@ func (nbs *NomsBlockStore) Path() (string, bool) { // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, numChunks int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) (io.Closer, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) tfp, ok := nbs.persister.(tableFilePersister) if !ok { @@ -1827,6 +1932,9 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, // AddTableFilesToManifest adds table files to the manifest func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs chunks.InsertAddrsCurry) error { + if err := nbs.ensureLoad(); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.addTableFilesToManifest(ctx, fileIdToNumChunks, getAddrs, nbs.refCheck, nil) } @@ -2002,6 +2110,9 @@ func (nbs *NomsBlockStore) openChunkSourcesForManifestUpdateAndRebase(ctx contex // PruneTableFiles deletes old table files that are no longer referenced in the manifest. func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) { + if err := nbs.ensureLoad(); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.pruneTableFiles(ctx) } @@ -2011,6 +2122,9 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) { } func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error { + if err := nbs.ensureLoad(); err != nil { + return err + } nbs.mu.Lock() defer nbs.mu.Unlock() return nbs.lockedBeginGC(keeper) @@ -2099,6 +2213,9 @@ func (nbs *NomsBlockStore) beginRead() (endRead func()) { } func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrs, filter chunks.HasManyFunc, dest chunks.ChunkStore, gcConfig chunks.GCConfig, incrementalUpdateManifest bool) (chunks.MarkAndSweeper, error) { + if err := nbs.ensureLoad(); err != nil { + return nil, err + } valctx.ValidateContext(ctx) return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, gcConfig, incrementalUpdateManifest) } @@ -2413,6 +2530,9 @@ func (gcf gcFinalizer) Close() error { } func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { + if err := nbs.ensureLoad(); err != nil { + return err + } for _, v := range nbs.tables.novel { err := v.iterateAllChunks(ctx, cb, nbs.stats) if err != nil { @@ -2435,6 +2555,9 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c } func (nbs *NomsBlockStore) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) { + if err := nbs.ensureLoad(); err != nil { + panic(err) + } for _, v := range nbs.tables.novel { fileName := v.hash().String() + v.suffix() v.tolerantIterateAllChunks(ctx, cb, func(err error) { errCb(fileName, err) }, nbs.stats) @@ -2554,6 +2677,9 @@ func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64, keepe // files in oldgen are conjoined together. // Returns the hash of the newly created conjoined table file. func (nbs *NomsBlockStore) ConjoinTableFiles(ctx context.Context, storageIds []hash.Hash) (hash.Hash, error) { + if err := nbs.ensureLoad(); err != nil { + return hash.Hash{}, err + } nbs.mu.Lock() defer nbs.mu.Unlock() From 83f8dd358a3e203d8b1ea67074824f496b304c27 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 13:14:11 -0700 Subject: [PATCH 02/12] go/store/nbs: Lazily load chunk journal database resources when constructing a NomsBlockStore. Most Dolt commands currently construct a MultiRepoEnv and then call into the SQL layer to perform the actual work. In turn, the SQL layer has some logic which looks at the state of the loaded databases and sees whether the process has ended up loading some of them in ReadOnly mode. If it has, then in some cases the Dolt process will attempt to connect to a running `dolt sql-server` instead of doing the work against the database files itself. This PR makes is to that the application code paths which inspect AccessMode == ReadOnly can get that signal and make the attempt to reach out to the server *before* all of the database state has to be loaded from disk. This greatly improves the performance of running `dolt` CLI commands against a running `dolt sql-server`, from the same directory as the server data-dir. --- go/cmd/dolt/commands/fsck.go | 5 + go/libraries/doltcore/env/environment.go | 67 ++++---- go/libraries/doltcore/env/multi_repo_env.go | 3 +- go/store/nbs/journal_record.go | 13 +- go/store/nbs/journal_record_test.go | 16 +- go/store/nbs/journal_writer.go | 2 +- go/store/nbs/store.go | 172 +++++++++++++------- integration-tests/bats/chunk-journal.bats | 2 +- integration-tests/bats/fsck.bats | 2 +- 9 files changed, 173 insertions(+), 109 deletions(-) diff --git a/go/cmd/dolt/commands/fsck.go b/go/cmd/dolt/commands/fsck.go index 52d12abac4d..3cb665df847 100644 --- a/go/cmd/dolt/commands/fsck.go +++ b/go/cmd/dolt/commands/fsck.go @@ -133,6 +133,11 @@ func (cmd FsckCmd) Exec(ctx context.Context, commandStr string, args []string, d ddb, _, _, err := dbFact.CreateDbNoCache(ctx, types.Format_DOLT, u, params, func(vErr error) { report.ScanErrs.AppendE(vErr) }) + if err == nil { + // Creating NomsBlockStore can lazily load the actual database resources. We force them here by performing + // an actual read against the database. + _, err = datas.ChunkStoreFromDatabase(ddb).Root(ctx) + } if err != nil { if errors.Is(err, nbs.ErrJournalDataLoss) { cli.PrintErrln("WARNING: Chunk journal is corrupted and some data may be lost.") diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go index 9622deea616..fe6af7dffe2 100644 --- a/go/libraries/doltcore/env/environment.go +++ b/go/libraries/doltcore/env/environment.go @@ -247,45 +247,48 @@ func LoadDoltDB(ctx context.Context, dEnv *DoltEnv) { dEnv.doltDB = ddb dEnv.DBLoadError = dbLoadErr - if dbLoadErr == nil && dEnv.HasDoltDir() { - if !dEnv.HasDoltTempTableDir() { - tmpDir, err := dEnv.TempTableFilesDir() - if err != nil { - dEnv.DBLoadError = err - } - err = dEnv.FS.MkDirs(tmpDir) - dEnv.DBLoadError = err - } else { - // fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution. - // The process will not wait for this to finish so this may not always complete. - go func() { - // TODO dEnv.HasDoltTempTableDir() true but dEnv.TempTableFileDir() panics - tmpTableDir, err := dEnv.FS.Abs(filepath.Join(dEnv.urlStr, dbfactory.DoltDir, tempTablesDir)) + if ddb != nil && ddb.AccessMode() != chunks.ExclusiveAccessMode_ReadOnly { + // Only do the following when we have write access to the database. + if dbLoadErr == nil && dEnv.HasDoltDir() { + if !dEnv.HasDoltTempTableDir() { + tmpDir, err := dEnv.TempTableFilesDir() if err != nil { - return + dEnv.DBLoadError = err } - _ = dEnv.FS.Iter(tmpTableDir, true, func(path string, size int64, isDir bool) (stop bool) { - if !isDir { - lm, exists := dEnv.FS.LastModified(path) + err = dEnv.FS.MkDirs(tmpDir) + dEnv.DBLoadError = err + } else { + // fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution. + // The process will not wait for this to finish so this may not always complete. + go func() { + // TODO dEnv.HasDoltTempTableDir() true but dEnv.TempTableFileDir() panics + tmpTableDir, err := dEnv.FS.Abs(filepath.Join(dEnv.urlStr, dbfactory.DoltDir, tempTablesDir)) + if err != nil { + return + } + _ = dEnv.FS.Iter(tmpTableDir, true, func(path string, size int64, isDir bool) (stop bool) { + if !isDir { + lm, exists := dEnv.FS.LastModified(path) - if exists && time.Now().Sub(lm) > (time.Hour*24) { - _ = dEnv.FS.DeleteFile(path) + if exists && time.Now().Sub(lm) > (time.Hour*24) { + _ = dEnv.FS.DeleteFile(path) + } } - } - return false - }) - }() + return false + }) + }() + } } - } - if dEnv.RSLoadErr == nil && dbLoadErr == nil { - // If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0. - _, err := dEnv.WorkingSet(ctx) - if errors.Is(err, doltdb.ErrWorkingSetNotFound) { - _ = dEnv.initWorkingSetFromRepoState(ctx) - } else if err != nil { - dEnv.RSLoadErr = err + if dEnv.RSLoadErr == nil && dbLoadErr == nil { + // If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0. + _, err := dEnv.WorkingSet(ctx) + if errors.Is(err, doltdb.ErrWorkingSetNotFound) { + _ = dEnv.initWorkingSetFromRepoState(ctx) + } else if err != nil { + dEnv.RSLoadErr = err + } } } }) diff --git a/go/libraries/doltcore/env/multi_repo_env.go b/go/libraries/doltcore/env/multi_repo_env.go index d045dd01328..5a41949914e 100644 --- a/go/libraries/doltcore/env/multi_repo_env.go +++ b/go/libraries/doltcore/env/multi_repo_env.go @@ -191,9 +191,10 @@ func multiEnvForConfigDirectoryEnv(ctx context.Context, config config.ReadWriteC LoadDoltDB(ctx, dEnv) dbErr := dEnv.DBLoadError if dbErr != nil { + // XXX: We are not guaranteed to see these errors here, because the database might + // be lazily loaded. if errors.Is(dbErr, nbs.ErrJournalDataLoss) { logrus.Errorf("failed to load database %s with error: %s", dbName, dbErr.Error()) - logrus.Errorf("please run 'dolt fsck' to assess the damage and attempt repairs") } else if !errors.Is(dbErr, doltdb.ErrMissingDoltDataDir) { logrus.Warnf("failed to load database with error: %s", dbErr.Error()) } diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go index de91bcfde44..c1c5d62a83b 100644 --- a/go/store/nbs/journal_record.go +++ b/go/store/nbs/journal_record.go @@ -274,7 +274,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) noOp := func(o int64, r journalRec) error { return nil } // First verify that the journal has data loss. var offset int64 - offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil) + offset, err = processJournalRecords(context.Background(), journalPath, journalFile, true /* tryTruncate */, 0, noOp, nil) if err == nil { // No data loss detected, nothing to do. return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed") @@ -420,7 +420,7 @@ func processJournalRecordsReader(ctx context.Context, r io.Reader, offin int64, // // The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller // to handle the situation in a context specific way. -func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { +func processJournalRecords(ctx context.Context, path string, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { var ( recovered bool rdr *bufio.Reader @@ -451,7 +451,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate boo } } if dataLossFound { - return off, NewJournalDataLossError(off) + return off, NewJournalDataLossError(path, off) } } @@ -480,8 +480,11 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate boo var ErrJournalDataLoss = errors.New("corrupted journal") -func NewJournalDataLossError(offset int64) error { - return fmt.Errorf("possible data loss detected in journal file at offset %d: %w", offset, ErrJournalDataLoss) +// Because the database might be lazily loaded, this error can end up returned from many paths on +// first access of the database. For that reason, includes UX instructions to the user, in a slight +// layering violation. +func NewJournalDataLossError(path string, offset int64) error { + return fmt.Errorf("possible data loss detected in journal file %s at offset %d: %w\nplease run 'dolt fsck' to assess the damage and attempt repairs", path, offset, ErrJournalDataLoss) } // possibleDataLossCheck checks for parsable data remaining in |reader| which constitutes data loss. When calling this diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go index fa1154a7b1f..cd91d7537f1 100644 --- a/go/store/nbs/journal_record_test.go +++ b/go/store/nbs/journal_record_test.go @@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) { } var recoverErr error - n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) + n, err := processJournalRecords(ctx, "", bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) require.NoError(t, err) @@ -125,7 +125,7 @@ func TestProcessJournalRecords(t *testing.T) { // write a bogus record to the end and verify that we don't get an error i, sum = 0, 0 writeCorruptJournalRecord(journal[off:]) - n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) + n, err = processJournalRecords(ctx, "", bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) @@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) { } var recoverErr error - _, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, "", bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e }) if td.lossExpected { require.Error(t, err) @@ -269,7 +269,7 @@ func TestJournalTruncated(t *testing.T) { off += writeRootHashRecord(journal[off:], r.address) for i := range 16 { var recoverErr error - _, err := processJournalRecords(t.Context(), bytes.NewReader(journal[:int(off)-i]), true, 0, func(int64, journalRec) error { return nil }, func(e error) { recoverErr = e }) + _, err := processJournalRecords(t.Context(), "", bytes.NewReader(journal[:int(off)-i]), true, 0, func(int64, journalRec) error { return nil }, func(e error) { recoverErr = e }) require.NoError(t, err, "err should be nil, iteration %d", i) require.NoError(t, recoverErr, "recoverErr should be nil, iteration %d", i) } @@ -285,7 +285,7 @@ func TestJournalTruncated(t *testing.T) { off += writeChunkRecord(journal[off:], mustCompressedChunk(r)) } } - _, err := processJournalRecords(t.Context(), bytes.NewReader(journal[:off]), true, 0, func(int64, journalRec) error { return nil }, nil) + _, err := processJournalRecords(t.Context(), "", bytes.NewReader(journal[:off]), true, 0, func(int64, journalRec) error { return nil }, nil) require.Error(t, err) } @@ -334,7 +334,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // no confidence in the rest of the test. ctx := context.Background() var recoverErr error - bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) + bytesRead, err := processJournalRecords(ctx, "", bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) require.Equal(t, off, uint32(bytesRead)) require.Error(t, recoverErr) // We do expect a warning here, but no data loss. @@ -358,7 +358,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // Copy lost data into journal buffer at the test offset. copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData) - _, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, "", bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.Error(t, err) require.True(t, errors.Is(err, ErrJournalDataLoss)) require.Error(t, recoverErr) @@ -502,7 +502,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec t.FailNow() } - _, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error { + _, err := processJournalRecords(ctx, "", bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error { records = append(records, testRecord{hash: rec.address, kind: rec.kind}) return nil }, warnCb) diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index 0e9c62f16c0..4392e5866f8 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -277,7 +277,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, re // process the non-indexed portion of the journal starting at |wr.indexed|, // at minimum the non-indexed portion will include a root hash record. // Index lookups are added to the ongoing batch to re-synchronize. - wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error { + wr.off, err = processJournalRecords(ctx, p, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error { switch r.kind { case chunkJournalRecKind: rng := Range{ diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 373666a3b0a..938b8cb1a9d 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -28,6 +28,7 @@ import ( "io" "os" "path/filepath" + "runtime/debug" "sort" "strings" "sync" @@ -36,6 +37,7 @@ import ( "cloud.google.com/go/storage" "github.com/dustin/go-humanize" + "github.com/fatih/color" lru "github.com/hashicorp/golang-lru/v2" "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" @@ -49,6 +51,7 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/valctx" "github.com/dolthub/dolt/go/store/blobstore" "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/constants" "github.com/dolthub/dolt/go/store/hash" ) @@ -141,12 +144,13 @@ type NomsBlockStore struct { closed bool staticAccessMode chunks.ExclusiveAccessMode + staticVersion string loadOnce sync.Once // If loadThunk is passed |false|, it is being called as part // of Close(). It should clean up any retained resources, // instead of loading the database. - loadThunk func(loadIt bool) + loadThunk func(ctx context.Context, loadIt bool) loadErr error } @@ -170,10 +174,10 @@ type Range struct { DictLength uint32 } -func (nbs *NomsBlockStore) ensureLoad() error { +func (nbs *NomsBlockStore) ensureLoad(ctx context.Context) error { if nbs.loadThunk != nil { nbs.loadOnce.Do(func() { - nbs.loadThunk(true) + nbs.loadThunk(ctx, true) }) } return nbs.loadErr @@ -181,7 +185,7 @@ func (nbs *NomsBlockStore) ensureLoad() error { // IterateRoots iterates over the in-memory roots tracked by the ChunkJournal, if there is one. func (nbs *NomsBlockStore) IterateRoots(f func(root string, timestamp *time.Time) error) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.TODO()); err != nil { return err } cj := nbs.chunkJournal() @@ -200,7 +204,7 @@ func (nbs *NomsBlockStore) chunkJournal() *ChunkJournal { } func (nbs *NomsBlockStore) ChunkJournalSize() (int64, bool) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.Background()); err != nil { return 0, false } nbs.mu.Lock() @@ -212,7 +216,7 @@ func (nbs *NomsBlockStore) ChunkJournalSize() (int64, bool) { } func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -298,7 +302,7 @@ func (nbs *NomsBlockStore) getChunkLocations(ctx context.Context, hashes hash.Ha } func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -424,7 +428,7 @@ func (nbs *NomsBlockStore) finalizeConjoin(ctx context.Context, err error) { } func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (ManifestInfo, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -578,7 +582,7 @@ func (nbs *NomsBlockStore) updateManifestAddFiles(ctx context.Context, updates m } func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (ManifestInfo, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -642,7 +646,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp // assumes that stores grow monotonically unless the |gcGen| of a manifest changes. Since this interface // cannot set |gcGen|, callers must ensure that calls to this function grow the store monotonically. func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) { - if err := store.ensureLoad(); err != nil { + if err := store.ensureLoad(ctx); err != nil { return err } store.mu.Lock() @@ -796,30 +800,71 @@ func NewLocalJournalingStoreWithOptions(ctx context.Context, nbfVers, dir string return nil, err } - lock, _, err := newJournalLock(dir, opts.FailOnLockTimeout) - - m, err := newJournalManifest(ctx, dir, lock) + lock, staticAccessMode, err := newJournalLock(dir, opts.FailOnLockTimeout) if err != nil { return nil, err } - p := newFSTablePersister(dir, q, mmapArchiveIndexes) - // The NomsBlockStore is not constructed yet, so bootstrapping errors should fail store - // creation rather than crash the process. Callers configure crash behavior afterwards - // via SetFatalBehavior. - journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister), dherrors.FatalBehaviorError, warningsCb) + nbs, err := newEmptyNomsBlockStore(defaultMemTableSize) if err != nil { + if lock != nil { + lock.Unlock() + } return nil, err } + nbs.staticAccessMode = staticAccessMode + nbs.staticVersion = constants.FormatDoltString + nbs.loadThunk = func(ctx context.Context, loadIt bool) { + if loadIt == false { + if lock != nil { + lock.Unlock() + } + return + } + // XXX: Just printing here for now so that we can audit when the databases are actually loaded. + fmt.Fprintf(color.Error, "actually loading database at %s\n%s\n", dir, string(debug.Stack())) + m, err := newJournalManifest(ctx, dir, lock) + if err != nil { + nbs.loadErr = err + if lock != nil { + lock.Unlock() + } + return + } + p := newFSTablePersister(dir, q, mmapArchiveIndexes).(*fsTablePersister) - // |journal| serves as both the tablePersister and (wrapped) the manifest. - // The wrapper keeps the two roles' Close paths distinct: the persister path - // closes the journal writer, while the manifest path releases the backing - // manifest's file lock. - mm := manifest(journalManifestWrapper{journal: journal}) - c := journalConjoiner{child: inlineConjoiner{defaultMaxTables}} + // The NomsBlockStore is not constructed yet, so bootstrapping errors should fail store + // creation rather than crash the process. Callers configure crash behavior afterwards + // via SetFatalBehavior. + journal, err := newChunkJournal(ctx, nbfVers, dir, m, p, dherrors.FatalBehaviorError, warningsCb) + if err != nil { + m.Close() + nbs.loadErr = err + return + } + + // |journal| serves as both the tablePersister and (wrapped) the manifest. + // The wrapper keeps the two roles' Close paths distinct: the persister path + // closes the journal writer, while the manifest path releases the backing + // manifest's file lock. + mm := manifest(journalManifestWrapper{journal: journal}) + c := journalConjoiner{child: inlineConjoiner{defaultMaxTables}} - return newNomsBlockStore(ctx, nbfVers, mm, journal, q, c, defaultMemTableSize) + nbs.manifest = mm + nbs.persister = journal + nbs.conjoiner = c + nbs.tables = newTableSet(journal, q) + nbs.upstream = manifestContents{nbfVers: nbfVers} + + if err = nbs.rebase(ctx); err != nil { + journal.Close() + mm.Close() + nbs.loadErr = err + return + } + } + + return nbs, nil } func checkDir(dir string) error { @@ -833,22 +878,15 @@ func checkDir(dir string) error { return nil } -func newNomsBlockStore(ctx context.Context, nbfVerStr string, m manifest, p tablePersister, q MemoryQuotaProvider, c conjoinStrategy, memTableSize uint64) (*NomsBlockStore, error) { +func newEmptyNomsBlockStore(memTableSize uint64) (*NomsBlockStore, error) { if memTableSize == 0 { memTableSize = defaultMemTableSize } - hasCache, err := lru.New2Q[hash.Hash, struct{}](hasCacheSize) if err != nil { return nil, err } - nbs := &NomsBlockStore{ - manifest: m, - persister: p, - conjoiner: c, - tables: newTableSet(p, q), - upstream: manifestContents{nbfVers: nbfVerStr}, memtableSz: memTableSize, hasCache: hasCache, stats: NewStats(), @@ -856,6 +894,20 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, m manifest, p tabl } nbs.gcCond = sync.NewCond(&nbs.mu) nbs.conjoinOpCond = sync.NewCond(&nbs.mu) + return nbs, nil +} + +func newNomsBlockStore(ctx context.Context, nbfVerStr string, m manifest, p tablePersister, q MemoryQuotaProvider, c conjoinStrategy, memTableSize uint64) (*NomsBlockStore, error) { + nbs, err := newEmptyNomsBlockStore(memTableSize) + if err != nil { + return nil, err + } + + nbs.manifest = m + nbs.persister = p + nbs.conjoiner = c + nbs.tables = newTableSet(p, q) + nbs.upstream = manifestContents{nbfVers: nbfVerStr} t1 := time.Now() defer nbs.stats.OpenLatency.SampleTimeSince(t1) @@ -912,7 +964,7 @@ func (nbs *NomsBlockStore) waitForGC(ctx context.Context, cycle uint64) error { } func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.InsertAddrsCurry) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } valctx.ValidateContext(ctx) @@ -1038,7 +1090,7 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err } func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return chunks.Chunk{}, err } valctx.ValidateContext(ctx) @@ -1105,7 +1157,7 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, } func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } valctx.ValidateContext(ctx) @@ -1119,7 +1171,7 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou } func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, ToChunker)) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } valctx.ValidateContext(ctx) @@ -1232,7 +1284,7 @@ func toGetRecords(hashes hash.HashSet) []getRecord { } func (nbs *NomsBlockStore) Count() (uint32, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.Background()); err != nil { return 0, err } count, tables := func() (count uint32, tables chunkReader) { @@ -1249,7 +1301,7 @@ func (nbs *NomsBlockStore) Count() (uint32, error) { } func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return false, err } valctx.ValidateContext(ctx) @@ -1310,7 +1362,7 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { } func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -1446,7 +1498,7 @@ func toHasRecords(hashes hash.HashSet) []hasRecord { } func (nbs *NomsBlockStore) Rebase(ctx context.Context) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } valctx.ValidateContext(ctx) @@ -1484,7 +1536,7 @@ func (nbs *NomsBlockStore) rebase(ctx context.Context) error { } func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return hash.Hash{}, err } valctx.ValidateContext(ctx) @@ -1494,7 +1546,7 @@ func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { } func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash) (success bool, err error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return false, err } valctx.ValidateContext(ctx) @@ -1659,8 +1711,8 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has } func (nbs *NomsBlockStore) Version() string { - if err := nbs.ensureLoad(); err != nil { - panic(err) + if nbs.staticVersion != "" { + return nbs.staticVersion } nbs.mu.RLock() defer nbs.mu.RUnlock() @@ -1678,10 +1730,10 @@ func (nbs *NomsBlockStore) Close() error { if nbs.loadThunk != nil { loaded := true nbs.loadOnce.Do(func() { - nbs.loadThunk(false) + nbs.loadThunk(context.Background(), false) loaded = false }) - if !loaded { + if !loaded || nbs.loadErr != nil { return nil } } @@ -1711,7 +1763,7 @@ func (nbs *NomsBlockStore) Stats() interface{} { } func (nbs *NomsBlockStore) StatsSummary() string { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.Background()); err != nil { return "failed to load" } nbs.mu.Lock() @@ -1758,7 +1810,7 @@ func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) { // Sources retrieves the current root hash, a list of all table files (which may include appendix tablefiles), // and a second list of only the appendix table files func (nbs *NomsBlockStore) Sources(ctx context.Context) (chunks.TableFileSources, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return chunks.TableFileSources{}, err } valctx.ValidateContext(ctx) @@ -1842,7 +1894,7 @@ func newTableFile(cs chunkSource, info tableSpec, behavior dherrors.FatalBehavio } func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return 0, err } nbs.mu.Lock() @@ -1872,7 +1924,7 @@ func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, erro } func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.Background()); err != nil { panic(err) } var ok bool @@ -1887,7 +1939,7 @@ func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { } func (nbs *NomsBlockStore) Path() (string, bool) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.Background()); err != nil { return "", false } if tfp, ok := nbs.persister.(tableFilePersister); ok { @@ -1903,7 +1955,7 @@ func (nbs *NomsBlockStore) Path() (string, bool) { // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, numChunks int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) (io.Closer, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -1932,7 +1984,7 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, // AddTableFilesToManifest adds table files to the manifest func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs chunks.InsertAddrsCurry) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } valctx.ValidateContext(ctx) @@ -2110,7 +2162,7 @@ func (nbs *NomsBlockStore) openChunkSourcesForManifestUpdateAndRebase(ctx contex // PruneTableFiles deletes old table files that are no longer referenced in the manifest. func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } valctx.ValidateContext(ctx) @@ -2122,7 +2174,7 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) { } func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(context.Background()); err != nil { return err } nbs.mu.Lock() @@ -2213,7 +2265,7 @@ func (nbs *NomsBlockStore) beginRead() (endRead func()) { } func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrs, filter chunks.HasManyFunc, dest chunks.ChunkStore, gcConfig chunks.GCConfig, incrementalUpdateManifest bool) (chunks.MarkAndSweeper, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return nil, err } valctx.ValidateContext(ctx) @@ -2530,7 +2582,7 @@ func (gcf gcFinalizer) Close() error { } func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return err } for _, v := range nbs.tables.novel { @@ -2555,7 +2607,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c } func (nbs *NomsBlockStore) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { panic(err) } for _, v := range nbs.tables.novel { @@ -2677,7 +2729,7 @@ func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64, keepe // files in oldgen are conjoined together. // Returns the hash of the newly created conjoined table file. func (nbs *NomsBlockStore) ConjoinTableFiles(ctx context.Context, storageIds []hash.Hash) (hash.Hash, error) { - if err := nbs.ensureLoad(); err != nil { + if err := nbs.ensureLoad(ctx); err != nil { return hash.Hash{}, err } nbs.mu.Lock() diff --git a/integration-tests/bats/chunk-journal.bats b/integration-tests/bats/chunk-journal.bats index a3210a67c12..ed5bbf719ee 100644 --- a/integration-tests/bats/chunk-journal.bats +++ b/integration-tests/bats/chunk-journal.bats @@ -101,7 +101,7 @@ file_size() { local grown_size=$(file_size "$journal") - run dolt status + run dolt log [ "$status" -eq 1 ] [[ "$output" =~ "invalid journal record length: 5242881 exceeds max allowed size of 5242880" ]] || false diff --git a/integration-tests/bats/fsck.bats b/integration-tests/bats/fsck.bats index f4379800e44..b98af1842b7 100644 --- a/integration-tests/bats/fsck.bats +++ b/integration-tests/bats/fsck.bats @@ -108,7 +108,7 @@ UPDATE tbl SET guid = UUID() WHERE i >= @random_id LIMIT 1;" printf '\x00\x00\x00\x00' >> "$journal" cat $BATS_CWD/corrupt_dbs/journal_data.bin >> "$journal" - run dolt status + run dolt log [ "$status" -eq 1 ] [[ "$output" =~ "please run 'dolt fsck' to assess the damage and attempt repairs" ]] || false From 47662730b99cd9e4ced90785e4ed3d235daa4704 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 14:44:32 -0700 Subject: [PATCH 03/12] go: Remove straggling XXX comments and color.Error prints from the lazy load NomsBlockStore work. --- go/libraries/doltcore/env/multi_repo_env.go | 2 -- go/store/nbs/store.go | 4 ---- 2 files changed, 6 deletions(-) diff --git a/go/libraries/doltcore/env/multi_repo_env.go b/go/libraries/doltcore/env/multi_repo_env.go index 5a41949914e..e3a33675099 100644 --- a/go/libraries/doltcore/env/multi_repo_env.go +++ b/go/libraries/doltcore/env/multi_repo_env.go @@ -191,8 +191,6 @@ func multiEnvForConfigDirectoryEnv(ctx context.Context, config config.ReadWriteC LoadDoltDB(ctx, dEnv) dbErr := dEnv.DBLoadError if dbErr != nil { - // XXX: We are not guaranteed to see these errors here, because the database might - // be lazily loaded. if errors.Is(dbErr, nbs.ErrJournalDataLoss) { logrus.Errorf("failed to load database %s with error: %s", dbName, dbErr.Error()) } else if !errors.Is(dbErr, doltdb.ErrMissingDoltDataDir) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 938b8cb1a9d..e50d421df9a 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -28,7 +28,6 @@ import ( "io" "os" "path/filepath" - "runtime/debug" "sort" "strings" "sync" @@ -37,7 +36,6 @@ import ( "cloud.google.com/go/storage" "github.com/dustin/go-humanize" - "github.com/fatih/color" lru "github.com/hashicorp/golang-lru/v2" "github.com/oracle/oci-go-sdk/v65/common" "github.com/oracle/oci-go-sdk/v65/objectstorage" @@ -821,8 +819,6 @@ func NewLocalJournalingStoreWithOptions(ctx context.Context, nbfVers, dir string } return } - // XXX: Just printing here for now so that we can audit when the databases are actually loaded. - fmt.Fprintf(color.Error, "actually loading database at %s\n%s\n", dir, string(debug.Stack())) m, err := newJournalManifest(ctx, dir, lock) if err != nil { nbs.loadErr = err From 32c4c304ff8ff88c369ffdea5e7cd33a8fdc4714 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 14:47:45 -0700 Subject: [PATCH 04/12] go: chunks,nbs: Make BeginGC take context.Context. Helps with lazy loading. --- go/store/chunks/chunk_store.go | 2 +- go/store/chunks/memory_store.go | 2 +- go/store/chunks/test_utils.go | 4 ++-- go/store/nbs/gc_waitforgc_test.go | 2 +- go/store/nbs/generational_chunk_store.go | 6 +++--- go/store/nbs/nbs_metrics_wrapper.go | 4 ++-- go/store/nbs/store.go | 4 ++-- go/store/nbs/store_test.go | 2 +- go/store/types/value_store.go | 4 ++-- 9 files changed, 15 insertions(+), 15 deletions(-) diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 4a85e69cb64..7d696703014 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -258,7 +258,7 @@ type ChunkStoreGarbageCollector interface { // // This function should not block indefinitely and should return an // error if a GC is already in progress. - BeginGC(addChunk func(hash.Hash) bool, mode GCMode) error + BeginGC(ctx context.Context, addChunk func(hash.Hash) bool, mode GCMode) error // EndGC indicates that the GC is over. The previously provided // addChunk function must not be called after this function. diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 4572c4fda77..d679976846b 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -332,7 +332,7 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash) return success, nil } -func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool, _ GCMode) error { +func (ms *MemoryStoreView) BeginGC(_ context.Context, keeper func(hash.Hash) bool, _ GCMode) error { return ms.transitionToGC(keeper) } diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index f91cf6dd6f6..0b69d4a59b3 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -75,12 +75,12 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs InsertAddrsCu return s.ChunkStore.Put(ctx, c, getAddrs) } -func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool, mode GCMode) error { +func (s *TestStoreView) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode GCMode) error { collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector) if !ok { return ErrUnsupportedOperation } - return collector.BeginGC(keeper, mode) + return collector.BeginGC(ctx, keeper, mode) } func (s *TestStoreView) EndGC(mode GCMode) { diff --git a/go/store/nbs/gc_waitforgc_test.go b/go/store/nbs/gc_waitforgc_test.go index ee55a73d50f..61cf227b959 100644 --- a/go/store/nbs/gc_waitforgc_test.go +++ b/go/store/nbs/gc_waitforgc_test.go @@ -76,7 +76,7 @@ func TestWaitForGCNotTrappedAcrossCycles(t *testing.T) { return true } - require.NoError(t, st.BeginGC(cycle1Keeper, chunks.GCMode_Full)) + require.NoError(t, st.BeginGC(t.Context(), cycle1Keeper, chunks.GCMode_Full)) // Launch a goroutine that Puts the same chunk again. addChunk holds // nbs.mu for the entire call. The chunk already exists in the diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 66235eeed85..b516567f0ee 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -510,8 +510,8 @@ func (gcs *GenerationalNBS) OldGenGCFilter() chunks.HasManyFunc { } } -func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error { - err := gcs.newGen.BeginGC(keeper, mode) +func (gcs *GenerationalNBS) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode chunks.GCMode) error { + err := gcs.newGen.BeginGC(ctx, keeper, mode) if err != nil { return err } @@ -521,7 +521,7 @@ func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCM // going away. In Full mode, we want to take read dependencies // from the OldGen as well. if mode == chunks.GCMode_Full { - err = gcs.oldGen.BeginGC(keeper, mode) + err = gcs.oldGen.BeginGC(ctx, keeper, mode) if err != nil { gcs.newGen.EndGC(mode) return err diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 3b77f9f65d6..3a816d82ad5 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -66,8 +66,8 @@ func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps { return nbsMW.nbs.SupportedOperations() } -func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error { - return nbsMW.nbs.BeginGC(keeper, mode) +func (nbsMW *NBSMetricWrapper) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode chunks.GCMode) error { + return nbsMW.nbs.BeginGC(ctx, keeper, mode) } func (nbsMW *NBSMetricWrapper) EndGC(mode chunks.GCMode) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index e50d421df9a..4bd1b9033da 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -2169,8 +2169,8 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) { return nbs.persister.PruneTableFiles(ctx) } -func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error { - if err := nbs.ensureLoad(context.Background()); err != nil { +func (nbs *NomsBlockStore) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, _ chunks.GCMode) error { + if err := nbs.ensureLoad(ctx); err != nil { return err } nbs.mu.Lock() diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index b6530a8c59b..e7f8b7aecd2 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -344,7 +344,7 @@ func TestNBSCopyGC(t *testing.T) { require.NoError(t, err) require.True(t, ok) - require.NoError(t, st.BeginGC(nil, chunks.GCMode_Full)) + require.NoError(t, st.BeginGC(t.Context(), nil, chunks.GCMode_Full)) noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { return hashes, nil } diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 0ac83363fdb..1328492ea9e 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -589,7 +589,7 @@ func (lvs *ValueStore) GC(ctx context.Context, gcConfig chunks.GCConfig, oldGenR } err := func() error { - err := collector.BeginGC(lvs.gcAddChunk, gcConfig.Mode) + err := collector.BeginGC(ctx, lvs.gcAddChunk, gcConfig.Mode) if err != nil { return err } @@ -678,7 +678,7 @@ func (lvs *ValueStore) GC(ctx context.Context, gcConfig chunks.GCConfig, oldGenR newGenRefs.InsertAll(oldGenRefs) err := func() error { - err := collector.BeginGC(lvs.gcAddChunk, chunks.GCMode_Full) + err := collector.BeginGC(ctx, lvs.gcAddChunk, chunks.GCMode_Full) if err != nil { return err } From 846b618b075b3d04fd3162855011b1798648f566 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 14:49:02 -0700 Subject: [PATCH 05/12] go: nbs: Make ChunkJournalSize take context.Context. Helps with lazy loading. --- go/libraries/doltcore/doltdb/doltdb.go | 5 ++++- go/store/nbs/store.go | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index ebd642bbbe0..e84d515f9ff 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -2172,7 +2172,10 @@ func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { if err != nil { return StoreSizes{}, err } - journalSz, ok := newGenNBS.ChunkJournalSize() + journalSz, ok, err := newGenNBS.ChunkJournalSize(ctx) + if err != nil { + return StoreSizes{}, err + } if ok { return StoreSizes{ JournalBytes: uint64(journalSz), diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 4bd1b9033da..34f7d17268f 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -201,16 +201,16 @@ func (nbs *NomsBlockStore) chunkJournal() *ChunkJournal { return nil } -func (nbs *NomsBlockStore) ChunkJournalSize() (int64, bool) { - if err := nbs.ensureLoad(context.Background()); err != nil { - return 0, false +func (nbs *NomsBlockStore) ChunkJournalSize(ctx context.Context) (int64, bool, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return 0, false, err } nbs.mu.Lock() defer nbs.mu.Unlock() if cj := nbs.chunkJournal(); cj != nil { - return cj.Size(), true + return cj.Size(), true, nil } - return 0, false + return 0, false, nil } func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { From 4498f26f4ec615713b975568f14fae0a002c533a Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 14:50:28 -0700 Subject: [PATCH 06/12] go: nbs: Make IterateRoots take context.Context. Helps with lazy loading. --- go/libraries/doltcore/doltdb/doltdb.go | 4 ++-- go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go | 2 +- go/store/nbs/store.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index e84d515f9ff..bf37aae3b0f 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -2107,7 +2107,7 @@ func (ddb *DoltDB) restoreDefaultConjoinBehavior() { // NomsBlockStore instance which exposes its roots through // IterateRoots. Otherwise returns |nil| without visiting any roots, // including the current one. -func (ddb *DoltDB) IterateRoots(cb func(root string, timestamp *time.Time) error) error { +func (ddb *DoltDB) IterateRoots(ctx context.Context, cb func(root string, timestamp *time.Time) error) error { cs := datas.ChunkStoreFromDatabase(ddb.db) if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok { @@ -2115,7 +2115,7 @@ func (ddb *DoltDB) IterateRoots(cb func(root string, timestamp *time.Time) error } if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok { - return nbsStore.IterateRoots(cb) + return nbsStore.IterateRoots(ctx, cb) } else { return nil } diff --git a/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go b/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go index abe153c4781..5cc787ef417 100644 --- a/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go +++ b/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go @@ -95,7 +95,7 @@ func (rltf *ReflogTableFunction) RowIter(ctx *sql.Context, row sql.Row) (sql.Row previousCommitsByRef := make(map[string]string) rows := make([]sql.Row, 0) - err := ddb.IterateRoots(func(root string, timestamp *time.Time) error { + err := ddb.IterateRoots(ctx, func(root string, timestamp *time.Time) error { hashof := hash.Parse(root) datasets, err := ddb.DatasetsByRootHash(ctx, hashof) if err != nil { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 34f7d17268f..2952dc0e4e5 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -182,8 +182,8 @@ func (nbs *NomsBlockStore) ensureLoad(ctx context.Context) error { } // IterateRoots iterates over the in-memory roots tracked by the ChunkJournal, if there is one. -func (nbs *NomsBlockStore) IterateRoots(f func(root string, timestamp *time.Time) error) error { - if err := nbs.ensureLoad(context.TODO()); err != nil { +func (nbs *NomsBlockStore) IterateRoots(ctx context.Context, f func(root string, timestamp *time.Time) error) error { + if err := nbs.ensureLoad(ctx); err != nil { return err } cj := nbs.chunkJournal() From 61c4f6102ff27a48cdc531a706cc14ca966fe70d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 14:53:03 -0700 Subject: [PATCH 07/12] go: nbs: Count: Make it take context.Context. Helps with lazy loading. --- go/cmd/dolt/commands/fsck.go | 4 ++-- go/store/chunks/chunk_store.go | 2 +- go/store/chunks/memory_store.go | 2 +- go/store/chunks/test_utils.go | 2 +- go/store/nbs/generational_chunk_store.go | 6 +++--- go/store/nbs/nbs_metrics_wrapper.go | 4 ++-- go/store/nbs/store.go | 4 ++-- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go/cmd/dolt/commands/fsck.go b/go/cmd/dolt/commands/fsck.go index 3cb665df847..e4e5ccde42b 100644 --- a/go/cmd/dolt/commands/fsck.go +++ b/go/cmd/dolt/commands/fsck.go @@ -496,11 +496,11 @@ type roundTripper struct { } func newRoundTripper(ctx context.Context, gs *nbs.GenerationalNBS, progress chan FsckProgressMessage, errs *Errs, fileErrCounts map[string]int) (*roundTripper, error) { - chunkCount, err := gs.OldGen().Count() + chunkCount, err := gs.OldGen().Count(ctx) if err != nil { return nil, err } - chunkCount2, err := gs.NewGen().Count() + chunkCount2, err := gs.NewGen().Count(ctx) if err != nil { return nil, err } diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 7d696703014..5b941a9218d 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -273,7 +273,7 @@ type ChunkStoreGarbageCollector interface { MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrs, filter HasManyFunc, dest ChunkStore, config GCConfig, incrementalUpdateManifest bool) (MarkAndSweeper, error) // Count returns the number of chunks in the store. - Count() (uint32, error) + Count(ctx context.Context) (uint32, error) // IterateAllChunks iterates over all chunks in the store, calling the provided callback for each chunk. This is // a wrapper over the internal chunkSource.iterateAllChunks() method. diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index d679976846b..6453ff57387 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -429,7 +429,7 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetA }, nil } -func (ms *MemoryStoreView) Count() (uint32, error) { +func (ms *MemoryStoreView) Count(_ context.Context) (uint32, error) { return uint32(len(ms.pending)), nil } diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index 0b69d4a59b3..8ee21b7bfdb 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -99,7 +99,7 @@ func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddr return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, gcConfig, false) } -func (s *TestStoreView) Count() (uint32, error) { +func (s *TestStoreView) Count(_ context.Context) (uint32, error) { panic("currently unused") } diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index b516567f0ee..017dc660ac4 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -561,12 +561,12 @@ func (gcs *GenerationalNBS) TolerantIterateAllChunks(ctx context.Context, cb fun gcs.oldGen.TolerantIterateAllChunks(ctx, cb, errCb) } -func (gcs *GenerationalNBS) Count() (uint32, error) { - newGenCnt, err := gcs.newGen.Count() +func (gcs *GenerationalNBS) Count(ctx context.Context) (uint32, error) { + newGenCnt, err := gcs.newGen.Count(ctx) if err != nil { return 0, err } - oldGenCnt, err := gcs.oldGen.Count() + oldGenCnt, err := gcs.oldGen.Count(ctx) if err != nil { return 0, err } diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 3a816d82ad5..5045f67fddf 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -78,8 +78,8 @@ func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, gcConfig, incrementalUpdateManifest) } -func (nbsMW *NBSMetricWrapper) Count() (uint32, error) { - return nbsMW.nbs.Count() +func (nbsMW *NBSMetricWrapper) Count(ctx context.Context) (uint32, error) { + return nbsMW.nbs.Count(ctx) } func (nbsMW *NBSMetricWrapper) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 2952dc0e4e5..5180b45fe89 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1279,8 +1279,8 @@ func toGetRecords(hashes hash.HashSet) []getRecord { return reqs } -func (nbs *NomsBlockStore) Count() (uint32, error) { - if err := nbs.ensureLoad(context.Background()); err != nil { +func (nbs *NomsBlockStore) Count(ctx context.Context) (uint32, error) { + if err := nbs.ensureLoad(ctx); err != nil { return 0, err } count, tables := func() (count uint32, tables chunkReader) { From 41f569fe49fa57ffc3b5ef639d938d8e135d4145 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 14:58:53 -0700 Subject: [PATCH 08/12] go: nbs: SupportOperations takes context.Context. Helps with lazy loading. --- go/libraries/doltcore/doltdb/doltdb.go | 10 +++++++++- .../doltcore/remotestorage/chunk_store.go | 4 ++-- go/store/chunks/tablefilestore.go | 2 +- go/store/datas/database.go | 11 +++++++---- go/store/nbs/generational_chunk_store.go | 4 ++-- go/store/nbs/nbs_metrics_wrapper.go | 4 ++-- go/store/nbs/store.go | 17 ++++++++++------- 7 files changed, 33 insertions(+), 19 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index bf37aae3b0f..c664fb57c0d 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -2035,7 +2035,15 @@ func pullHash( destCS := datas.ChunkStoreFromDatabase(destDB) waf := types.WalkAddrsForNBF(srcDB.Format(), skipHashes) - if datas.CanUsePuller(srcDB) && datas.CanUsePuller(destDB) { + srcCanUsePuller, err := datas.CanUsePuller(ctx, srcDB) + if err != nil { + return err + } + destCanUsePuller, err := datas.CanUsePuller(ctx, destDB) + if err != nil { + return err + } + if srcCanUsePuller && destCanUsePuller { puller, err := pull.NewPuller(ctx, tempDir, defaultTargetFileSize, srcCS, destCS, waf, targetHashes, statsCh) if err == pull.ErrDBUpToDate { return nil diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index e3f91a01b73..71409a41cb3 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -1082,13 +1082,13 @@ const ( chunkAggDistance = 8 * 1024 ) -func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps { +func (dcs *DoltChunkStore) SupportedOperations(_ context.Context) (chunks.TableFileStoreOps, error) { return chunks.TableFileStoreOps{ CanRead: true, CanWrite: true, CanPrune: false, CanGC: false, - } + }, nil } // WriteTableFile reads a table file from the provided reader and writes it to the chunk store. diff --git a/go/store/chunks/tablefilestore.go b/go/store/chunks/tablefilestore.go index 17b8a756a8a..437f8cdc868 100644 --- a/go/store/chunks/tablefilestore.go +++ b/go/store/chunks/tablefilestore.go @@ -90,7 +90,7 @@ type TableFileStore interface { Commit(ctx context.Context, current, last hash.Hash) (bool, error) // SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing. - SupportedOperations() TableFileStoreOps + SupportedOperations(ctx context.Context) (TableFileStoreOps, error) } type TableFileSources struct { diff --git a/go/store/datas/database.go b/go/store/datas/database.go index ee051d7ab70..17863ee1e4b 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -201,13 +201,16 @@ type GarbageCollector interface { // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all // Databases support this yet. -func CanUsePuller(db Database) bool { +func CanUsePuller(ctx context.Context, db Database) (bool, error) { cs := db.chunkStore() if tfs, ok := cs.(chunks.TableFileStore); ok { - ops := tfs.SupportedOperations() - return ops.CanRead && ops.CanWrite + ops, err := tfs.SupportedOperations(ctx) + if err != nil { + return false, err + } + return ops.CanRead && ops.CanWrite, nil } - return false + return false, nil } func GetCSStatSummaryForDB(db Database) string { diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 017dc660ac4..add45ced01d 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -446,8 +446,8 @@ func (gcs *GenerationalNBS) PruneTableFiles(ctx context.Context) error { } // SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing. -func (gcs *GenerationalNBS) SupportedOperations() chunks.TableFileStoreOps { - return gcs.newGen.SupportedOperations() +func (gcs *GenerationalNBS) SupportedOperations(ctx context.Context) (chunks.TableFileStoreOps, error) { + return gcs.newGen.SupportedOperations(ctx) } func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 5045f67fddf..933ca3064df 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -62,8 +62,8 @@ func (nbsMW *NBSMetricWrapper) AddTableFilesToManifest(ctx context.Context, file } // Forwards SupportedOperations to wrapped block store. -func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps { - return nbsMW.nbs.SupportedOperations() +func (nbsMW *NBSMetricWrapper) SupportedOperations(ctx context.Context) (chunks.TableFileStoreOps, error) { + return nbsMW.nbs.SupportedOperations(ctx) } func (nbsMW *NBSMetricWrapper) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode chunks.GCMode) error { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 5180b45fe89..a7bee1fcf05 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1759,7 +1759,7 @@ func (nbs *NomsBlockStore) Stats() interface{} { } func (nbs *NomsBlockStore) StatsSummary() string { - if err := nbs.ensureLoad(context.Background()); err != nil { + if err := nbs.ensureLoad(context.TODO()); err != nil { return "failed to load" } nbs.mu.Lock() @@ -1919,9 +1919,9 @@ func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, erro } -func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { - if err := nbs.ensureLoad(context.Background()); err != nil { - panic(err) +func (nbs *NomsBlockStore) SupportedOperations(ctx context.Context) (chunks.TableFileStoreOps, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return chunks.TableFileStoreOps{}, err } var ok bool _, ok = nbs.persister.(tableFilePersister) @@ -1931,7 +1931,7 @@ func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { CanWrite: ok, CanPrune: ok, CanGC: ok, - } + }, nil } func (nbs *NomsBlockStore) Path() (string, bool) { @@ -2284,7 +2284,10 @@ func (nbs *NomsBlockStore) hasLocalGCNovelty() bool { } func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src CompressedChunkStoreForGC, dest chunks.ChunkStore, getAddrs chunks.GetAddrs, filter chunks.HasManyFunc, gcConfig chunks.GCConfig, incrementalUpdateManifest bool) (chunks.MarkAndSweeper, error) { - ops := nbs.SupportedOperations() + ops, err := nbs.SupportedOperations(ctx) + if err != nil { + return nil, err + } if !ops.CanGC || !ops.CanPrune { return nil, chunks.ErrUnsupportedOperation } @@ -2325,7 +2328,7 @@ func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src Compressed } return nil } - err := precheck() + err = precheck() if err != nil { return nil, err } From a8fe772e4df0439525f8337a12217a47a3f8c7c1 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 15:03:59 -0700 Subject: [PATCH 09/12] go: nbs: Path takes context.Context. Helps with lazy loading. --- go/libraries/doltcore/remotesrv/dbcache.go | 2 +- go/libraries/doltcore/remotesrv/grpc.go | 20 +++++++++------ go/store/nbs/admin_newgen_to_oldgen.go | 10 ++++++-- go/store/nbs/archive_build.go | 10 ++++++-- go/store/nbs/generational_chunk_store.go | 30 +++++++++++++++------- go/store/nbs/store.go | 12 ++++----- 6 files changed, 56 insertions(+), 28 deletions(-) diff --git a/go/libraries/doltcore/remotesrv/dbcache.go b/go/libraries/doltcore/remotesrv/dbcache.go index ca2ca05a05c..a8ba530bf26 100644 --- a/go/libraries/doltcore/remotesrv/dbcache.go +++ b/go/libraries/doltcore/remotesrv/dbcache.go @@ -30,7 +30,7 @@ type RemoteSrvStore interface { chunks.ChunkStore chunks.TableFileStore - Path() (string, bool) + Path(ctx context.Context) (string, bool, error) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error) } diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index 253481d939a..a7672a94883 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -143,8 +143,11 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh return resp, nil } -func (rs *RemoteChunkStore) getRelativeStorePath(cs RemoteSrvStore) (string, error) { - cspath, ok := cs.Path() +func (rs *RemoteChunkStore) getRelativeStorePath(ctx context.Context, cs RemoteSrvStore) (string, error) { + cspath, ok, err := cs.Path(ctx) + if err != nil { + return "", err + } if !ok { return "", status.Error(codes.Internal, "chunkstore misconfigured; cannot generate HTTP paths") } @@ -175,7 +178,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes) - prefix, err := rs.getRelativeStorePath(cs) + prefix, err := rs.getRelativeStorePath(ctx, cs) if err != nil { logger.WithError(err).Error("error getting file store path for chunk store") return nil, err @@ -280,7 +283,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore if err != nil { return err } - prefix, err = rs.getRelativeStorePath(cs) + prefix, err = rs.getRelativeStorePath(stream.Context(), cs) if err != nil { logger.WithError(err).Error("error getting file store path for chunk store") return err @@ -396,7 +399,7 @@ func (rs *RemoteChunkStore) StreamChunkLocations(stream remotesapi.ChunkStoreSer if err != nil { return err } - prefix, err = rs.getRelativeStorePath(cs) + prefix, err = rs.getRelativeStorePath(stream.Context(), cs) if err != nil { logger.WithError(err).Error("error getting file store path for chunk store") return err @@ -765,13 +768,13 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. md, _ := metadata.FromIncomingContext(ctx) - tableFileInfo, err := getTableFileInfo(logger, md, rs, tables, req, cs) + tableFileInfo, err := getTableFileInfo(ctx, logger, md, rs, tables, req, cs) if err != nil { logger.WithError(err).Error("error getting table file info") return nil, err } - appendixTableFileInfo, err := getTableFileInfo(logger, md, rs, appendixTables, req, cs) + appendixTableFileInfo, err := getTableFileInfo(ctx, logger, md, rs, appendixTables, req, cs) if err != nil { logger.WithError(err).Error("error getting appendix table file info") return nil, err @@ -792,6 +795,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. } func getTableFileInfo( + ctx context.Context, logger *logrus.Entry, md metadata.MD, rs *RemoteChunkStore, @@ -799,7 +803,7 @@ func getTableFileInfo( req *remotesapi.ListTableFilesRequest, cs RemoteSrvStore, ) ([]*remotesapi.TableFileInfo, error) { - prefix, err := rs.getRelativeStorePath(cs) + prefix, err := rs.getRelativeStorePath(ctx, cs) if err != nil { return nil, err } diff --git a/go/store/nbs/admin_newgen_to_oldgen.go b/go/store/nbs/admin_newgen_to_oldgen.go index 2bf7c47abdc..1e96638381e 100644 --- a/go/store/nbs/admin_newgen_to_oldgen.go +++ b/go/store/nbs/admin_newgen_to_oldgen.go @@ -30,8 +30,14 @@ func MoveNewGenToOldGen( progress chan string, ) error { if gs, ok := cs.(*GenerationalNBS); ok { - srcPath, _ := gs.newGen.Path() - dstPath, _ := gs.oldGen.Path() + srcPath, _, err := gs.newGen.Path(ctx) + if err != nil { + return err + } + dstPath, _, err := gs.oldGen.Path(ctx) + if err != nil { + return err + } allFiles := make([]hash.Hash, 0, len(gs.newGen.tables.upstream)) sourceSet := gs.newGen.tables.upstream diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go index 87c213e8943..ab517bbc25a 100644 --- a/go/store/nbs/archive_build.go +++ b/go/store/nbs/archive_build.go @@ -61,7 +61,10 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p } func unArchiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, smd StorageMetadata, progress chan interface{}) error { - outPath, _ := blockStore.Path() + outPath, _, err := blockStore.Path(ctx) + if err != nil { + return err + } // The source set changes out from under us, but the names of the table files will stay stable enough // to iterate over them. @@ -164,7 +167,10 @@ func archiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, da // Currently, we don't have any stats to report. Required for calls to the lower layers tho. var stats Stats - path, _ := blockStore.Path() + path, _, err := blockStore.Path(ctx) + if err != nil { + return err + } allFiles := make([]hash.Hash, 0, len(blockStore.tables.upstream)) diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index add45ced01d..4aa0b472ca9 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -397,7 +397,10 @@ func (gcs *GenerationalNBS) Sources(ctx context.Context) (chunks.TableFileSource ret.Root = newgensources.Root ret.TableFiles = append([]chunks.TableFile{}, newgensources.TableFiles...) ret.AppendixTableFiles = append([]chunks.TableFile{}, newgensources.AppendixTableFiles...) - prefix := gcs.RelativeOldGenPath() + prefix, err := gcs.RelativeOldGenPath(ctx) + if err != nil { + return chunks.TableFileSources{}, err + } for _, tf := range oldgensources.TableFiles { ret.TableFiles = append(ret.TableFiles, prefixedTableFile{tf, prefix}) } @@ -456,7 +459,10 @@ func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(ctx context.Context, hash return nil, err } if len(hashes) > 0 { - prefix := gcs.RelativeOldGenPath() + prefix, err := gcs.RelativeOldGenPath(ctx) + if err != nil { + return nil, err + } toadd, err := gcs.oldGen.GetChunkLocationsWithPaths(ctx, hashes) if err != nil { return nil, err @@ -485,19 +491,25 @@ func (gcs *GenerationalNBS) GetChunkLocations(ctx context.Context, hashes hash.H return res, nil } -func (gcs *GenerationalNBS) RelativeOldGenPath() string { - newgenpath, ngpok := gcs.newGen.Path() - oldgenpath, ogpok := gcs.oldGen.Path() +func (gcs *GenerationalNBS) RelativeOldGenPath(ctx context.Context) (string, error) { + newgenpath, ngpok, err := gcs.newGen.Path(ctx) + if err != nil { + return "", err + } + oldgenpath, ogpok, err := gcs.oldGen.Path(ctx) + if err != nil { + return "", err + } if ngpok && ogpok { if p, err := filepath.Rel(newgenpath, oldgenpath); err == nil { - return p + return p, nil } } - return "oldgen" + return "oldgen", nil } -func (gcs *GenerationalNBS) Path() (string, bool) { - return gcs.newGen.Path() +func (gcs *GenerationalNBS) Path(ctx context.Context) (string, bool, error) { + return gcs.newGen.Path(ctx) } func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index a7bee1fcf05..09b99bbd16f 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1934,19 +1934,19 @@ func (nbs *NomsBlockStore) SupportedOperations(ctx context.Context) (chunks.Tabl }, nil } -func (nbs *NomsBlockStore) Path() (string, bool) { - if err := nbs.ensureLoad(context.Background()); err != nil { - return "", false +func (nbs *NomsBlockStore) Path(ctx context.Context) (string, bool, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return "", false, err } if tfp, ok := nbs.persister.(tableFilePersister); ok { switch p := tfp.(type) { case *fsTablePersister, *ChunkJournal: - return p.Path(), true + return p.Path(), true, nil default: - return "", false + return "", false, nil } } - return "", false + return "", false, nil } // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore From 37b641f4964cbe4a2a41c64bcee93b05454fc801 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 15:06:56 -0700 Subject: [PATCH 10/12] go: nbs: TolerantIterateAllChunks takes context.Context. Helps with lazy loading. --- go/cmd/dolt/commands/fsck.go | 4 ++-- go/store/nbs/generational_chunk_store.go | 15 +++++++++++---- go/store/nbs/store.go | 9 +++++---- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/go/cmd/dolt/commands/fsck.go b/go/cmd/dolt/commands/fsck.go index e4e5ccde42b..3e0fdbe626d 100644 --- a/go/cmd/dolt/commands/fsck.go +++ b/go/cmd/dolt/commands/fsck.go @@ -522,11 +522,11 @@ func newRoundTripper(ctx context.Context, gs *nbs.GenerationalNBS, progress chan } func (rt *roundTripper) scanAll(ctx context.Context) error { - rt.gs.TolerantIterateAllChunks(ctx, rt.roundTripAndCategorizeChunk, func(sourceFile string, err error) { + err := rt.gs.TolerantIterateAllChunks(ctx, rt.roundTripAndCategorizeChunk, func(sourceFile string, err error) { rt.errs.AppendE(err) rt.fileErrCounts[sourceFile]++ }) - return ctx.Err() + return errors.Join(err, ctx.Err()) } // roundTripAndCategorizeChunk verifies the chunk's hash matches its content, categorizes it by type. This method is diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 4aa0b472ca9..1ff91d0292a 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -565,12 +565,19 @@ func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk return nil } -func (gcs *GenerationalNBS) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) { - gcs.newGen.TolerantIterateAllChunks(ctx, cb, errCb) +func (gcs *GenerationalNBS) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) error { + err := gcs.newGen.TolerantIterateAllChunks(ctx, cb, errCb) + if err != nil { + return err + } if ctx.Err() != nil { - return + return nil } - gcs.oldGen.TolerantIterateAllChunks(ctx, cb, errCb) + err = gcs.oldGen.TolerantIterateAllChunks(ctx, cb, errCb) + if err != nil { + return err + } + return nil } func (gcs *GenerationalNBS) Count(ctx context.Context) (uint32, error) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 09b99bbd16f..772fa164323 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -2605,24 +2605,25 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c return nil } -func (nbs *NomsBlockStore) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) { +func (nbs *NomsBlockStore) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) error { if err := nbs.ensureLoad(ctx); err != nil { - panic(err) + return err } for _, v := range nbs.tables.novel { fileName := v.hash().String() + v.suffix() v.tolerantIterateAllChunks(ctx, cb, func(err error) { errCb(fileName, err) }, nbs.stats) if ctx.Err() != nil { - return + return nil } } for _, v := range nbs.tables.upstream { fileName := v.hash().String() + v.suffix() v.tolerantIterateAllChunks(ctx, cb, func(err error) { errCb(fileName, err) }, nbs.stats) if ctx.Err() != nil { - return + return nil } } + return nil } func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mode chunks.GCMode, srcs chunkSourceSet) (err error) { From 4b7d15e40da060219b2f1d6eaab93864150b2a3f Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 15:16:37 -0700 Subject: [PATCH 11/12] go: nbs,dconfig: Add a testing env variable which allows for configuring Dolt to panic when it loads table files. --- go/libraries/doltcore/dconfig/envvars.go | 5 +++++ go/store/nbs/file_table_persister.go | 16 ++++++++++++++++ go/store/nbs/journal.go | 1 + 3 files changed, 22 insertions(+) diff --git a/go/libraries/doltcore/dconfig/envvars.go b/go/libraries/doltcore/dconfig/envvars.go index 36472aac196..0f17ca4dcd8 100755 --- a/go/libraries/doltcore/dconfig/envvars.go +++ b/go/libraries/doltcore/dconfig/envvars.go @@ -57,4 +57,9 @@ const ( // Used for tests. If set, Dolt will error if it would rebuild a table's row data. EnvAssertNoTableRewrite = "DOLT_TEST_ASSERT_NO_TABLE_REWRITE" EnvAssertNoInMemoryArchiveIndex = "DOLT_TEST_ASSERT_NO_IN_MEMORY_ARCHIVE_INDEX" + + // Used for tests. Make Dolt fail if it loads table file data + // from disk, such as bootstraping the journal file or loading + // a table file with tableFilePersister.Open. + EnvAssertNoTableFilesRead = "DOLT_TEST_ASSERT_NO_TABLE_FILES_READ" ) diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index 1aa600293b8..b97c0e1b0ca 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -35,6 +35,7 @@ import ( "sync" "time" + "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" dherrors "github.com/dolthub/dolt/go/libraries/utils/errors" "github.com/dolthub/dolt/go/libraries/utils/file" "github.com/dolthub/dolt/go/store/chunks" @@ -42,6 +43,20 @@ import ( "github.com/dolthub/dolt/go/store/util/tempfiles" ) +var loadingTableFilesDisabled bool + +func init() { + if os.Getenv(dconfig.EnvAssertNoTableFilesRead) != "" { + loadingTableFilesDisabled = true + } +} + +func PanicIfLoadingTableFilesDisabled() { + if loadingTableFilesDisabled { + panic("tried to load a table file or journal file but loading table files is disabled") + } +} + const tempTablePrefix = "nbs_table_" func newFSTablePersister(dir string, q MemoryQuotaProvider, mmapArchiveIndexes bool) tablePersister { @@ -145,6 +160,7 @@ func (ftplc *fsTablePersisterRefCounter) addRef() { } func (ftp *fsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { + PanicIfLoadingTableFilesDisabled() ftp.pruneMu.RLock() defer ftp.pruneMu.RUnlock() rc := fsTablePersisterRefCounter{ftp, name} diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 6cfd09ace6b..b9f59c80bc3 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -80,6 +80,7 @@ var _ manifest = journalManifestWrapper{} // NomsBlockStore is not yet constructed at this point, so callers pass FatalBehaviorError to // fail store creation rather than crash the process. func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifest, p *fsTablePersister, behavior dherrors.FatalBehavior, warningsCb func(error)) (*ChunkJournal, error) { + PanicIfLoadingTableFilesDisabled() path, err := filepath.Abs(filepath.Join(dir, chunkJournalName)) if err != nil { return nil, err From fd9284e20d92b91896ee03ba0474f1b800504316 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 28 May 2026 15:31:31 -0700 Subject: [PATCH 12/12] integration-tests/go-sql-server-driver: Add some tests which assert correct behavior of table file loading in the presence and absence of a running sql-server. --- .../no_table_files_read_test.go | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 integration-tests/go-sql-server-driver/no_table_files_read_test.go diff --git a/integration-tests/go-sql-server-driver/no_table_files_read_test.go b/integration-tests/go-sql-server-driver/no_table_files_read_test.go new file mode 100644 index 00000000000..bff901b9ae4 --- /dev/null +++ b/integration-tests/go-sql-server-driver/no_table_files_read_test.go @@ -0,0 +1,183 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +// These tests assert the behavior of an optimization where dolt CLI commands +// avoid loading potentially-large local database files when the command can +// instead be serviced by a running `dolt sql-server`. +// +// The hook for asserting that no table files are loaded is the +// DOLT_TEST_ASSERT_NO_TABLE_FILES_READ environment variable. When it is set, +// dolt panics if it ever bootstraps the chunk journal or opens a table file +// from disk. We use it to prove that, when a server is running, the CLI never +// touches the local database files. +// +// The test matrix has three top-level scenarios: +// +// 1. A dolt sql-server is running. The CLI is invoked with +// DOLT_TEST_ASSERT_NO_TABLE_FILES_READ=1 and must succeed, because it +// dispatches to the server without reading any table files. +// 2. No server is running, but a stale sql-server.info file is present. The +// CLI is invoked (executing locally) and must still succeed. +// 3. No server is running, but a concurrent `dolt sql` process holds the +// database lock. The CLI is invoked (executing locally); reads succeed and +// writes fail because they cannot acquire the write lock. +// +// In scenarios 2 and 3 the CLI must read the local database, so we do not set +// DOLT_TEST_ASSERT_NO_TABLE_FILES_READ for those invocations. + +// makePopulatedRepo creates a fresh repo store and an initialized repo named +// |name| with a committed table, some rows, and a remote configured. +func makePopulatedRepo(t *testing.T, u driver.DoltUser, name string) driver.Repo { + rs, err := u.MakeRepoStore() + require.NoError(t, err) + repo, err := rs.MakeRepo(name) + require.NoError(t, err) + err = repo.DoltExec("sql", "-q", "create table vals (id int primary key auto_increment, v int);"+ + "insert into vals (v) values (1), (2), (3);"+ + "call dolt_commit('-Am', 'initial data')") + require.NoError(t, err) + require.NoError(t, repo.CreateRemote("origin", "file://"+filepath.Join(repo.Dir, "remote"))) + return repo +} + +// withNoTableFilesRead sets DOLT_TEST_ASSERT_NO_TABLE_FILES_READ=1 on |cmd| so +// that the command panics if it loads any table files from disk. +func withNoTableFilesRead(cmd *exec.Cmd) *exec.Cmd { + cmd.Env = append(cmd.Env, dconfig.EnvAssertNoTableFilesRead+"=1") + return cmd +} + +func assertCmdSucceeds(t *testing.T, cmd *exec.Cmd) string { + out, err := cmd.CombinedOutput() + require.NoError(t, err, "expected command to succeed, output:\n%s", string(out)) + return string(out) +} + +func assertCmdFails(t *testing.T, cmd *exec.Cmd) string { + out, err := cmd.CombinedOutput() + require.Error(t, err, "expected command to fail, output:\n%s", string(out)) + return string(out) +} + +func TestNoTableFilesRead(t *testing.T) { + t.Parallel() + + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + // Sanity check: with the assertion hook enabled and no server running, a + // CLI command which reads the database must fail. This proves the hook + // itself works and the success cases below are meaningful. + t.Run("AssertHookFailsWhenLoadingTableFiles", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "assert_hook") + out := assertCmdFails(t, withNoTableFilesRead(repo.DoltCmd("log"))) + require.Regexp(t, "loading table files is disabled", out) + }) + + // Scenario 1: a dolt sql-server is running. All CLI invocations set the + // assertion hook and must succeed by dispatching to the server. + t.Run("WithRunningServer", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "with_running_server") + + var ports DynamicResources + ports.global = &GlobalPorts + ports.t = t + RunServerUntilEndOfTest(t, repo, &driver.Server{ + Args: []string{"--port", `{{get_port "server"}}`}, + DynamicPort: "server", + }, &ports) + + t.Run("sql read", func(t *testing.T) { + out := assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("sql", "-q", "select count(*) from vals"))) + require.Regexp(t, "3", out) + }) + t.Run("sql write", func(t *testing.T) { + assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("sql", "-q", "insert into vals (v) values (10)"))) + }) + t.Run("remote -v", func(t *testing.T) { + out := assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("remote", "-v"))) + require.Regexp(t, "origin", out) + }) + t.Run("log", func(t *testing.T) { + out := assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("log"))) + require.Regexp(t, "initial data", out) + }) + }) + + // Scenario 2: no server running, but a stale sql-server.info file is + // present. The CLI executes locally and must still work. + t.Run("WithStaleInfoFile", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "with_stale_info_file") + path := filepath.Join(repo.Dir, ".dolt/sql-server.info") + require.NoError(t, os.WriteFile(path, []byte("1:3306:this_is_not_a_real_secret"), 0600)) + + t.Run("sql read", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("sql", "-q", "select count(*) from vals")) + require.Regexp(t, "3", out) + }) + t.Run("sql write", func(t *testing.T) { + assertCmdSucceeds(t, repo.DoltCmd("sql", "-q", "insert into vals (v) values (20)")) + }) + t.Run("remote -v", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("remote", "-v")) + require.Regexp(t, "origin", out) + }) + t.Run("log", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("log")) + require.Regexp(t, "initial data", out) + }) + }) + + // Scenario 3: no server running, but a concurrent `dolt sql` process holds + // the database lock. The CLI executes locally: reads succeed and writes + // fail because they cannot acquire the exclusive write lock. + t.Run("WithConcurrentDoltSql", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "with_concurrent_dolt_sql") + RunDoltSQLUntilEndOfTest(t, repo) + + t.Run("sql read", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("sql", "-q", "select count(*) from vals")) + require.Regexp(t, "3", out) + }) + t.Run("sql write fails", func(t *testing.T) { + out := assertCmdFails(t, repo.DoltCmd("sql", "-q", "insert into vals (v) values (30)")) + require.Regexp(t, "(?i)read only", out) + }) + t.Run("remote -v", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("remote", "-v")) + require.Regexp(t, "origin", out) + }) + t.Run("log", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("log")) + require.Regexp(t, "initial data", out) + }) + }) +}