diff --git a/go/cmd/dolt/commands/fsck.go b/go/cmd/dolt/commands/fsck.go index 52d12abac4d..3e0fdbe626d 100644 --- a/go/cmd/dolt/commands/fsck.go +++ b/go/cmd/dolt/commands/fsck.go @@ -133,6 +133,11 @@ func (cmd FsckCmd) Exec(ctx context.Context, commandStr string, args []string, d ddb, _, _, err := dbFact.CreateDbNoCache(ctx, types.Format_DOLT, u, params, func(vErr error) { report.ScanErrs.AppendE(vErr) }) + if err == nil { + // Creating NomsBlockStore can lazily load the actual database resources. We force them here by performing + // an actual read against the database. + _, err = datas.ChunkStoreFromDatabase(ddb).Root(ctx) + } if err != nil { if errors.Is(err, nbs.ErrJournalDataLoss) { cli.PrintErrln("WARNING: Chunk journal is corrupted and some data may be lost.") @@ -491,11 +496,11 @@ type roundTripper struct { } func newRoundTripper(ctx context.Context, gs *nbs.GenerationalNBS, progress chan FsckProgressMessage, errs *Errs, fileErrCounts map[string]int) (*roundTripper, error) { - chunkCount, err := gs.OldGen().Count() + chunkCount, err := gs.OldGen().Count(ctx) if err != nil { return nil, err } - chunkCount2, err := gs.NewGen().Count() + chunkCount2, err := gs.NewGen().Count(ctx) if err != nil { return nil, err } @@ -517,11 +522,11 @@ func newRoundTripper(ctx context.Context, gs *nbs.GenerationalNBS, progress chan } func (rt *roundTripper) scanAll(ctx context.Context) error { - rt.gs.TolerantIterateAllChunks(ctx, rt.roundTripAndCategorizeChunk, func(sourceFile string, err error) { + err := rt.gs.TolerantIterateAllChunks(ctx, rt.roundTripAndCategorizeChunk, func(sourceFile string, err error) { rt.errs.AppendE(err) rt.fileErrCounts[sourceFile]++ }) - return ctx.Err() + return errors.Join(err, ctx.Err()) } // roundTripAndCategorizeChunk verifies the chunk's hash matches its content, categorizes it by type. 
This method is diff --git a/go/libraries/doltcore/dconfig/envvars.go b/go/libraries/doltcore/dconfig/envvars.go index 36472aac196..0f17ca4dcd8 100755 --- a/go/libraries/doltcore/dconfig/envvars.go +++ b/go/libraries/doltcore/dconfig/envvars.go @@ -57,4 +57,9 @@ const ( // Used for tests. If set, Dolt will error if it would rebuild a table's row data. EnvAssertNoTableRewrite = "DOLT_TEST_ASSERT_NO_TABLE_REWRITE" EnvAssertNoInMemoryArchiveIndex = "DOLT_TEST_ASSERT_NO_IN_MEMORY_ARCHIVE_INDEX" + + // Used for tests. Make Dolt fail if it loads table file data + // from disk, such as bootstrapping the journal file or loading + // a table file with tableFilePersister.Open. + EnvAssertNoTableFilesRead = "DOLT_TEST_ASSERT_NO_TABLE_FILES_READ" ) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index ebd642bbbe0..c664fb57c0d 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -2035,7 +2035,15 @@ func pullHash( destCS := datas.ChunkStoreFromDatabase(destDB) waf := types.WalkAddrsForNBF(srcDB.Format(), skipHashes) - if datas.CanUsePuller(srcDB) && datas.CanUsePuller(destDB) { + srcCanUsePuller, err := datas.CanUsePuller(ctx, srcDB) + if err != nil { + return err + } + destCanUsePuller, err := datas.CanUsePuller(ctx, destDB) + if err != nil { + return err + } + if srcCanUsePuller && destCanUsePuller { puller, err := pull.NewPuller(ctx, tempDir, defaultTargetFileSize, srcCS, destCS, waf, targetHashes, statsCh) if err == pull.ErrDBUpToDate { return nil @@ -2107,7 +2115,7 @@ func (ddb *DoltDB) restoreDefaultConjoinBehavior() { // NomsBlockStore instance which exposes its roots through // IterateRoots. Otherwise returns |nil| without visiting any roots, // including the current one. 
-func (ddb *DoltDB) IterateRoots(cb func(root string, timestamp *time.Time) error) error { +func (ddb *DoltDB) IterateRoots(ctx context.Context, cb func(root string, timestamp *time.Time) error) error { cs := datas.ChunkStoreFromDatabase(ddb.db) if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok { @@ -2115,7 +2123,7 @@ func (ddb *DoltDB) IterateRoots(cb func(root string, timestamp *time.Time) error } if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok { - return nbsStore.IterateRoots(cb) + return nbsStore.IterateRoots(ctx, cb) } else { return nil } @@ -2172,7 +2180,10 @@ func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { if err != nil { return StoreSizes{}, err } - journalSz, ok := newGenNBS.ChunkJournalSize() + journalSz, ok, err := newGenNBS.ChunkJournalSize(ctx) + if err != nil { + return StoreSizes{}, err + } if ok { return StoreSizes{ JournalBytes: uint64(journalSz), diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go index 9622deea616..fe6af7dffe2 100644 --- a/go/libraries/doltcore/env/environment.go +++ b/go/libraries/doltcore/env/environment.go @@ -247,45 +247,48 @@ func LoadDoltDB(ctx context.Context, dEnv *DoltEnv) { dEnv.doltDB = ddb dEnv.DBLoadError = dbLoadErr - if dbLoadErr == nil && dEnv.HasDoltDir() { - if !dEnv.HasDoltTempTableDir() { - tmpDir, err := dEnv.TempTableFilesDir() - if err != nil { - dEnv.DBLoadError = err - } - err = dEnv.FS.MkDirs(tmpDir) - dEnv.DBLoadError = err - } else { - // fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution. - // The process will not wait for this to finish so this may not always complete. 
- go func() { - // TODO dEnv.HasDoltTempTableDir() true but dEnv.TempTableFileDir() panics - tmpTableDir, err := dEnv.FS.Abs(filepath.Join(dEnv.urlStr, dbfactory.DoltDir, tempTablesDir)) + if ddb != nil && ddb.AccessMode() != chunks.ExclusiveAccessMode_ReadOnly { + // Only do the following when we have write access to the database. + if dbLoadErr == nil && dEnv.HasDoltDir() { + if !dEnv.HasDoltTempTableDir() { + tmpDir, err := dEnv.TempTableFilesDir() if err != nil { - return + dEnv.DBLoadError = err } - _ = dEnv.FS.Iter(tmpTableDir, true, func(path string, size int64, isDir bool) (stop bool) { - if !isDir { - lm, exists := dEnv.FS.LastModified(path) + err = dEnv.FS.MkDirs(tmpDir) + dEnv.DBLoadError = err + } else { + // fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution. + // The process will not wait for this to finish so this may not always complete. + go func() { + // TODO dEnv.HasDoltTempTableDir() true but dEnv.TempTableFileDir() panics + tmpTableDir, err := dEnv.FS.Abs(filepath.Join(dEnv.urlStr, dbfactory.DoltDir, tempTablesDir)) + if err != nil { + return + } + _ = dEnv.FS.Iter(tmpTableDir, true, func(path string, size int64, isDir bool) (stop bool) { + if !isDir { + lm, exists := dEnv.FS.LastModified(path) - if exists && time.Now().Sub(lm) > (time.Hour*24) { - _ = dEnv.FS.DeleteFile(path) + if exists && time.Now().Sub(lm) > (time.Hour*24) { + _ = dEnv.FS.DeleteFile(path) + } } - } - return false - }) - }() + return false + }) + }() + } } - } - if dEnv.RSLoadErr == nil && dbLoadErr == nil { - // If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0. 
- _, err := dEnv.WorkingSet(ctx) - if errors.Is(err, doltdb.ErrWorkingSetNotFound) { - _ = dEnv.initWorkingSetFromRepoState(ctx) - } else if err != nil { - dEnv.RSLoadErr = err + if dEnv.RSLoadErr == nil && dbLoadErr == nil { + // If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0. + _, err := dEnv.WorkingSet(ctx) + if errors.Is(err, doltdb.ErrWorkingSetNotFound) { + _ = dEnv.initWorkingSetFromRepoState(ctx) + } else if err != nil { + dEnv.RSLoadErr = err + } } } }) diff --git a/go/libraries/doltcore/env/multi_repo_env.go b/go/libraries/doltcore/env/multi_repo_env.go index d045dd01328..e3a33675099 100644 --- a/go/libraries/doltcore/env/multi_repo_env.go +++ b/go/libraries/doltcore/env/multi_repo_env.go @@ -193,7 +193,6 @@ func multiEnvForConfigDirectoryEnv(ctx context.Context, config config.ReadWriteC if dbErr != nil { if errors.Is(dbErr, nbs.ErrJournalDataLoss) { logrus.Errorf("failed to load database %s with error: %s", dbName, dbErr.Error()) - logrus.Errorf("please run 'dolt fsck' to assess the damage and attempt repairs") } else if !errors.Is(dbErr, doltdb.ErrMissingDoltDataDir) { logrus.Warnf("failed to load database with error: %s", dbErr.Error()) } diff --git a/go/libraries/doltcore/remotesrv/dbcache.go b/go/libraries/doltcore/remotesrv/dbcache.go index ca2ca05a05c..a8ba530bf26 100644 --- a/go/libraries/doltcore/remotesrv/dbcache.go +++ b/go/libraries/doltcore/remotesrv/dbcache.go @@ -30,7 +30,7 @@ type RemoteSrvStore interface { chunks.ChunkStore chunks.TableFileStore - Path() (string, bool) + Path(ctx context.Context) (string, bool, error) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error) } diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index 253481d939a..a7672a94883 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -143,8 +143,11 @@ func 
(rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh return resp, nil } -func (rs *RemoteChunkStore) getRelativeStorePath(cs RemoteSrvStore) (string, error) { - cspath, ok := cs.Path() +func (rs *RemoteChunkStore) getRelativeStorePath(ctx context.Context, cs RemoteSrvStore) (string, error) { + cspath, ok, err := cs.Path(ctx) + if err != nil { + return "", err + } if !ok { return "", status.Error(codes.Internal, "chunkstore misconfigured; cannot generate HTTP paths") } @@ -175,7 +178,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes) - prefix, err := rs.getRelativeStorePath(cs) + prefix, err := rs.getRelativeStorePath(ctx, cs) if err != nil { logger.WithError(err).Error("error getting file store path for chunk store") return nil, err @@ -280,7 +283,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore if err != nil { return err } - prefix, err = rs.getRelativeStorePath(cs) + prefix, err = rs.getRelativeStorePath(stream.Context(), cs) if err != nil { logger.WithError(err).Error("error getting file store path for chunk store") return err @@ -396,7 +399,7 @@ func (rs *RemoteChunkStore) StreamChunkLocations(stream remotesapi.ChunkStoreSer if err != nil { return err } - prefix, err = rs.getRelativeStorePath(cs) + prefix, err = rs.getRelativeStorePath(stream.Context(), cs) if err != nil { logger.WithError(err).Error("error getting file store path for chunk store") return err @@ -765,13 +768,13 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. 
md, _ := metadata.FromIncomingContext(ctx) - tableFileInfo, err := getTableFileInfo(logger, md, rs, tables, req, cs) + tableFileInfo, err := getTableFileInfo(ctx, logger, md, rs, tables, req, cs) if err != nil { logger.WithError(err).Error("error getting table file info") return nil, err } - appendixTableFileInfo, err := getTableFileInfo(logger, md, rs, appendixTables, req, cs) + appendixTableFileInfo, err := getTableFileInfo(ctx, logger, md, rs, appendixTables, req, cs) if err != nil { logger.WithError(err).Error("error getting appendix table file info") return nil, err @@ -792,6 +795,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. } func getTableFileInfo( + ctx context.Context, logger *logrus.Entry, md metadata.MD, rs *RemoteChunkStore, @@ -799,7 +803,7 @@ func getTableFileInfo( req *remotesapi.ListTableFilesRequest, cs RemoteSrvStore, ) ([]*remotesapi.TableFileInfo, error) { - prefix, err := rs.getRelativeStorePath(cs) + prefix, err := rs.getRelativeStorePath(ctx, cs) if err != nil { return nil, err } diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index e3f91a01b73..71409a41cb3 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -1082,13 +1082,13 @@ const ( chunkAggDistance = 8 * 1024 ) -func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps { +func (dcs *DoltChunkStore) SupportedOperations(_ context.Context) (chunks.TableFileStoreOps, error) { return chunks.TableFileStoreOps{ CanRead: true, CanWrite: true, CanPrune: false, CanGC: false, - } + }, nil } // WriteTableFile reads a table file from the provided reader and writes it to the chunk store. 
diff --git a/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go b/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go index abe153c4781..5cc787ef417 100644 --- a/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go +++ b/go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go @@ -95,7 +95,7 @@ func (rltf *ReflogTableFunction) RowIter(ctx *sql.Context, row sql.Row) (sql.Row previousCommitsByRef := make(map[string]string) rows := make([]sql.Row, 0) - err := ddb.IterateRoots(func(root string, timestamp *time.Time) error { + err := ddb.IterateRoots(ctx, func(root string, timestamp *time.Time) error { hashof := hash.Parse(root) datasets, err := ddb.DatasetsByRootHash(ctx, hashof) if err != nil { diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 4a85e69cb64..5b941a9218d 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -258,7 +258,7 @@ type ChunkStoreGarbageCollector interface { // // This function should not block indefinitely and should return an // error if a GC is already in progress. - BeginGC(addChunk func(hash.Hash) bool, mode GCMode) error + BeginGC(ctx context.Context, addChunk func(hash.Hash) bool, mode GCMode) error // EndGC indicates that the GC is over. The previously provided // addChunk function must not be called after this function. @@ -273,7 +273,7 @@ type ChunkStoreGarbageCollector interface { MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrs, filter HasManyFunc, dest ChunkStore, config GCConfig, incrementalUpdateManifest bool) (MarkAndSweeper, error) // Count returns the number of chunks in the store. - Count() (uint32, error) + Count(ctx context.Context) (uint32, error) // IterateAllChunks iterates over all chunks in the store, calling the provided callback for each chunk. This is // a wrapper over the internal chunkSource.iterateAllChunks() method. 
diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 4572c4fda77..6453ff57387 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -332,7 +332,7 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash) return success, nil } -func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool, _ GCMode) error { +func (ms *MemoryStoreView) BeginGC(_ context.Context, keeper func(hash.Hash) bool, _ GCMode) error { return ms.transitionToGC(keeper) } @@ -429,7 +429,7 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetA }, nil } -func (ms *MemoryStoreView) Count() (uint32, error) { +func (ms *MemoryStoreView) Count(_ context.Context) (uint32, error) { return uint32(len(ms.pending)), nil } diff --git a/go/store/chunks/tablefilestore.go b/go/store/chunks/tablefilestore.go index 17b8a756a8a..437f8cdc868 100644 --- a/go/store/chunks/tablefilestore.go +++ b/go/store/chunks/tablefilestore.go @@ -90,7 +90,7 @@ type TableFileStore interface { Commit(ctx context.Context, current, last hash.Hash) (bool, error) // SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing. 
- SupportedOperations() TableFileStoreOps + SupportedOperations(ctx context.Context) (TableFileStoreOps, error) } type TableFileSources struct { diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index f91cf6dd6f6..8ee21b7bfdb 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -75,12 +75,12 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs InsertAddrsCu return s.ChunkStore.Put(ctx, c, getAddrs) } -func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool, mode GCMode) error { +func (s *TestStoreView) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode GCMode) error { collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector) if !ok { return ErrUnsupportedOperation } - return collector.BeginGC(keeper, mode) + return collector.BeginGC(ctx, keeper, mode) } func (s *TestStoreView) EndGC(mode GCMode) { @@ -99,7 +99,7 @@ func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddr return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, gcConfig, false) } -func (s *TestStoreView) Count() (uint32, error) { +func (s *TestStoreView) Count(_ context.Context) (uint32, error) { panic("currently unused") } diff --git a/go/store/datas/database.go b/go/store/datas/database.go index ee051d7ab70..17863ee1e4b 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -201,13 +201,16 @@ type GarbageCollector interface { // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all // Databases support this yet. 
-func CanUsePuller(db Database) bool { +func CanUsePuller(ctx context.Context, db Database) (bool, error) { cs := db.chunkStore() if tfs, ok := cs.(chunks.TableFileStore); ok { - ops := tfs.SupportedOperations() - return ops.CanRead && ops.CanWrite + ops, err := tfs.SupportedOperations(ctx) + if err != nil { + return false, err + } + return ops.CanRead && ops.CanWrite, nil } - return false + return false, nil } func GetCSStatSummaryForDB(db Database) string { diff --git a/go/store/nbs/admin_newgen_to_oldgen.go b/go/store/nbs/admin_newgen_to_oldgen.go index 2bf7c47abdc..1e96638381e 100644 --- a/go/store/nbs/admin_newgen_to_oldgen.go +++ b/go/store/nbs/admin_newgen_to_oldgen.go @@ -30,8 +30,14 @@ func MoveNewGenToOldGen( progress chan string, ) error { if gs, ok := cs.(*GenerationalNBS); ok { - srcPath, _ := gs.newGen.Path() - dstPath, _ := gs.oldGen.Path() + srcPath, _, err := gs.newGen.Path(ctx) + if err != nil { + return err + } + dstPath, _, err := gs.oldGen.Path(ctx) + if err != nil { + return err + } allFiles := make([]hash.Hash, 0, len(gs.newGen.tables.upstream)) sourceSet := gs.newGen.tables.upstream diff --git a/go/store/nbs/archive_build.go b/go/store/nbs/archive_build.go index 87c213e8943..ab517bbc25a 100644 --- a/go/store/nbs/archive_build.go +++ b/go/store/nbs/archive_build.go @@ -61,7 +61,10 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p } func unArchiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, smd StorageMetadata, progress chan interface{}) error { - outPath, _ := blockStore.Path() + outPath, _, err := blockStore.Path(ctx) + if err != nil { + return err + } // The source set changes out from under us, but the names of the table files will stay stable enough // to iterate over them. @@ -164,7 +167,10 @@ func archiveSingleBlockStore(ctx context.Context, blockStore *NomsBlockStore, da // Currently, we don't have any stats to report. Required for calls to the lower layers tho. 
var stats Stats - path, _ := blockStore.Path() + path, _, err := blockStore.Path(ctx) + if err != nil { + return err + } allFiles := make([]hash.Hash, 0, len(blockStore.tables.upstream)) diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index 1aa600293b8..b97c0e1b0ca 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -35,6 +35,7 @@ import ( "sync" "time" + "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" dherrors "github.com/dolthub/dolt/go/libraries/utils/errors" "github.com/dolthub/dolt/go/libraries/utils/file" "github.com/dolthub/dolt/go/store/chunks" @@ -42,6 +43,20 @@ import ( "github.com/dolthub/dolt/go/store/util/tempfiles" ) +var loadingTableFilesDisabled bool + +func init() { + if os.Getenv(dconfig.EnvAssertNoTableFilesRead) != "" { + loadingTableFilesDisabled = true + } +} + +func PanicIfLoadingTableFilesDisabled() { + if loadingTableFilesDisabled { + panic("tried to load a table file or journal file but loading table files is disabled") + } +} + const tempTablePrefix = "nbs_table_" func newFSTablePersister(dir string, q MemoryQuotaProvider, mmapArchiveIndexes bool) tablePersister { @@ -145,6 +160,7 @@ func (ftplc *fsTablePersisterRefCounter) addRef() { } func (ftp *fsTablePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { + PanicIfLoadingTableFilesDisabled() ftp.pruneMu.RLock() defer ftp.pruneMu.RUnlock() rc := fsTablePersisterRefCounter{ftp, name} diff --git a/go/store/nbs/gc_waitforgc_test.go b/go/store/nbs/gc_waitforgc_test.go index ee55a73d50f..61cf227b959 100644 --- a/go/store/nbs/gc_waitforgc_test.go +++ b/go/store/nbs/gc_waitforgc_test.go @@ -76,7 +76,7 @@ func TestWaitForGCNotTrappedAcrossCycles(t *testing.T) { return true } - require.NoError(t, st.BeginGC(cycle1Keeper, chunks.GCMode_Full)) + require.NoError(t, st.BeginGC(t.Context(), cycle1Keeper, chunks.GCMode_Full)) // Launch a goroutine that Puts 
the same chunk again. addChunk holds // nbs.mu for the entire call. The chunk already exists in the diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 66235eeed85..1ff91d0292a 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -397,7 +397,10 @@ func (gcs *GenerationalNBS) Sources(ctx context.Context) (chunks.TableFileSource ret.Root = newgensources.Root ret.TableFiles = append([]chunks.TableFile{}, newgensources.TableFiles...) ret.AppendixTableFiles = append([]chunks.TableFile{}, newgensources.AppendixTableFiles...) - prefix := gcs.RelativeOldGenPath() + prefix, err := gcs.RelativeOldGenPath(ctx) + if err != nil { + return chunks.TableFileSources{}, err + } for _, tf := range oldgensources.TableFiles { ret.TableFiles = append(ret.TableFiles, prefixedTableFile{tf, prefix}) } @@ -446,8 +449,8 @@ func (gcs *GenerationalNBS) PruneTableFiles(ctx context.Context) error { } // SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing. 
-func (gcs *GenerationalNBS) SupportedOperations() chunks.TableFileStoreOps { - return gcs.newGen.SupportedOperations() +func (gcs *GenerationalNBS) SupportedOperations(ctx context.Context) (chunks.TableFileStoreOps, error) { + return gcs.newGen.SupportedOperations(ctx) } func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { @@ -456,7 +459,10 @@ func (gcs *GenerationalNBS) GetChunkLocationsWithPaths(ctx context.Context, hash return nil, err } if len(hashes) > 0 { - prefix := gcs.RelativeOldGenPath() + prefix, err := gcs.RelativeOldGenPath(ctx) + if err != nil { + return nil, err + } toadd, err := gcs.oldGen.GetChunkLocationsWithPaths(ctx, hashes) if err != nil { return nil, err @@ -485,19 +491,25 @@ func (gcs *GenerationalNBS) GetChunkLocations(ctx context.Context, hashes hash.H return res, nil } -func (gcs *GenerationalNBS) RelativeOldGenPath() string { - newgenpath, ngpok := gcs.newGen.Path() - oldgenpath, ogpok := gcs.oldGen.Path() +func (gcs *GenerationalNBS) RelativeOldGenPath(ctx context.Context) (string, error) { + newgenpath, ngpok, err := gcs.newGen.Path(ctx) + if err != nil { + return "", err + } + oldgenpath, ogpok, err := gcs.oldGen.Path(ctx) + if err != nil { + return "", err + } if ngpok && ogpok { if p, err := filepath.Rel(newgenpath, oldgenpath); err == nil { - return p + return p, nil } } - return "oldgen" + return "oldgen", nil } -func (gcs *GenerationalNBS) Path() (string, bool) { - return gcs.newGen.Path() +func (gcs *GenerationalNBS) Path(ctx context.Context) (string, bool, error) { + return gcs.newGen.Path(ctx) } func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) { @@ -510,8 +522,8 @@ func (gcs *GenerationalNBS) OldGenGCFilter() chunks.HasManyFunc { } } -func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error { - err := gcs.newGen.BeginGC(keeper, mode) 
+func (gcs *GenerationalNBS) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode chunks.GCMode) error { + err := gcs.newGen.BeginGC(ctx, keeper, mode) if err != nil { return err } @@ -521,7 +533,7 @@ func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCM // going away. In Full mode, we want to take read dependencies // from the OldGen as well. if mode == chunks.GCMode_Full { - err = gcs.oldGen.BeginGC(keeper, mode) + err = gcs.oldGen.BeginGC(ctx, keeper, mode) if err != nil { gcs.newGen.EndGC(mode) return err @@ -553,20 +565,27 @@ func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk return nil } -func (gcs *GenerationalNBS) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) { - gcs.newGen.TolerantIterateAllChunks(ctx, cb, errCb) +func (gcs *GenerationalNBS) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) error { + err := gcs.newGen.TolerantIterateAllChunks(ctx, cb, errCb) + if err != nil { + return err + } if ctx.Err() != nil { - return + return nil } - gcs.oldGen.TolerantIterateAllChunks(ctx, cb, errCb) + err = gcs.oldGen.TolerantIterateAllChunks(ctx, cb, errCb) + if err != nil { + return err + } + return nil } -func (gcs *GenerationalNBS) Count() (uint32, error) { - newGenCnt, err := gcs.newGen.Count() +func (gcs *GenerationalNBS) Count(ctx context.Context) (uint32, error) { + newGenCnt, err := gcs.newGen.Count(ctx) if err != nil { return 0, err } - oldGenCnt, err := gcs.oldGen.Count() + oldGenCnt, err := gcs.oldGen.Count(ctx) if err != nil { return 0, err } diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index d5e5ce341d2..b9f59c80bc3 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -80,6 +80,7 @@ var _ manifest = journalManifestWrapper{} // NomsBlockStore is not yet constructed at this point, so callers pass FatalBehaviorError to // fail 
store creation rather than crash the process. func newChunkJournal(ctx context.Context, nbfVers, dir string, m *journalManifest, p *fsTablePersister, behavior dherrors.FatalBehavior, warningsCb func(error)) (*ChunkJournal, error) { + PanicIfLoadingTableFilesDisabled() path, err := filepath.Abs(filepath.Join(dir, chunkJournalName)) if err != nil { return nil, err @@ -607,22 +608,26 @@ func (c journalConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees []t return c.child.chooseConjoinees(pruned) } -// newJournalManifest makes a new file manifest. -// When failOnTimeout is true, callers want a hard error instead of falling back to read-only mode. -// (The behavior change is implemented separately; this is the plumbing flag.) -func newJournalManifest(ctx context.Context, dir string, failOnTimeout bool) (m *journalManifest, err error) { +func newJournalLock(dir string, failOnTimeout bool) (*fslock.Lock, chunks.ExclusiveAccessMode, error) { lock := fslock.New(filepath.Join(dir, lockFileName)) // try to take the file lock. if we fail, make the manifest read-only. // if we succeed, hold the file lock until we close the journalManifest - err = lock.LockWithTimeout(lockFileTimeout) + err := lock.LockWithTimeout(lockFileTimeout) if errors.Is(err, fslock.ErrTimeout) { if failOnTimeout { - return nil, ErrDatabaseLocked + return nil, chunks.ExclusiveAccessMode_ReadOnly, ErrDatabaseLocked } - lock, err = nil, nil // read only + return nil, chunks.ExclusiveAccessMode_ReadOnly, nil } else if err != nil { - return nil, err + return nil, chunks.ExclusiveAccessMode_ReadOnly, err } + return lock, chunks.ExclusiveAccessMode_Exclusive, nil +} + +// newJournalManifest makes a new file manifest. +// |lock| is the file lock acquired by newJournalLock; a nil |lock| means the lock could not be taken +// and the manifest is read-only. 
+func newJournalManifest(ctx context.Context, dir string, lock *fslock.Lock) (m *journalManifest, err error) { m = &journalManifest{dir: dir, lock: lock} var f *os.File diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go index de91bcfde44..c1c5d62a83b 100644 --- a/go/store/nbs/journal_record.go +++ b/go/store/nbs/journal_record.go @@ -274,7 +274,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) noOp := func(o int64, r journalRec) error { return nil } // First verify that the journal has data loss. var offset int64 - offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil) + offset, err = processJournalRecords(context.Background(), journalPath, journalFile, true /* tryTruncate */, 0, noOp, nil) if err == nil { // No data loss detected, nothing to do. return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed") @@ -420,7 +420,7 @@ func processJournalRecordsReader(ctx context.Context, r io.Reader, offin int64, // // The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller // to handle the situation in a context specific way. 
-func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { +func processJournalRecords(ctx context.Context, path string, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { var ( recovered bool rdr *bufio.Reader @@ -451,7 +451,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate boo } } if dataLossFound { - return off, NewJournalDataLossError(off) + return off, NewJournalDataLossError(path, off) } } @@ -480,8 +480,11 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate boo var ErrJournalDataLoss = errors.New("corrupted journal") -func NewJournalDataLossError(offset int64) error { - return fmt.Errorf("possible data loss detected in journal file at offset %d: %w", offset, ErrJournalDataLoss) +// NewJournalDataLossError wraps ErrJournalDataLoss with the journal |path| and |offset|. Because the database might be +// lazily loaded, this error can be returned from many paths on first access of the database. For that reason, +// the message includes UX instructions to the user, in a slight layering violation. +func NewJournalDataLossError(path string, offset int64) error { + return fmt.Errorf("possible data loss detected in journal file %s at offset %d: %w\nplease run 'dolt fsck' to assess the damage and attempt repairs", path, offset, ErrJournalDataLoss) } // possibleDataLossCheck checks for parsable data remaining in |reader| which constitutes data loss. 
When calling this diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go index fa1154a7b1f..cd91d7537f1 100644 --- a/go/store/nbs/journal_record_test.go +++ b/go/store/nbs/journal_record_test.go @@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) { } var recoverErr error - n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) + n, err := processJournalRecords(ctx, "", bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) require.NoError(t, err) @@ -125,7 +125,7 @@ func TestProcessJournalRecords(t *testing.T) { // write a bogus record to the end and verify that we don't get an error i, sum = 0, 0 writeCorruptJournalRecord(journal[off:]) - n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) + n, err = processJournalRecords(ctx, "", bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) @@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) { } var recoverErr error - _, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, "", bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e }) if td.lossExpected { require.Error(t, err) @@ -269,7 +269,7 @@ func TestJournalTruncated(t *testing.T) { off += writeRootHashRecord(journal[off:], r.address) for i := range 16 { var recoverErr error - _, err := processJournalRecords(t.Context(), bytes.NewReader(journal[:int(off)-i]), true, 0, func(int64, journalRec) error { return nil }, func(e error) { recoverErr = e }) + _, err := processJournalRecords(t.Context(), "", bytes.NewReader(journal[:int(off)-i]), true, 0, func(int64, journalRec) error { return nil }, 
func(e error) { recoverErr = e }) require.NoError(t, err, "err should be nil, iteration %d", i) require.NoError(t, recoverErr, "recoverErr should be nil, iteration %d", i) } @@ -285,7 +285,7 @@ func TestJournalTruncated(t *testing.T) { off += writeChunkRecord(journal[off:], mustCompressedChunk(r)) } } - _, err := processJournalRecords(t.Context(), bytes.NewReader(journal[:off]), true, 0, func(int64, journalRec) error { return nil }, nil) + _, err := processJournalRecords(t.Context(), "", bytes.NewReader(journal[:off]), true, 0, func(int64, journalRec) error { return nil }, nil) require.Error(t, err) } @@ -334,7 +334,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // no confidence in the rest of the test. ctx := context.Background() var recoverErr error - bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) + bytesRead, err := processJournalRecords(ctx, "", bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) require.Equal(t, off, uint32(bytesRead)) require.Error(t, recoverErr) // We do expect a warning here, but no data loss. @@ -358,7 +358,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // Copy lost data into journal buffer at the test offset. 
copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData) - _, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, "", bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.Error(t, err) require.True(t, errors.Is(err, ErrJournalDataLoss)) require.Error(t, recoverErr) @@ -502,7 +502,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec t.FailNow() } - _, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error { + _, err := processJournalRecords(ctx, "", bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error { records = append(records, testRecord{hash: rec.address, kind: rec.kind}) return nil }, warnCb) diff --git a/go/store/nbs/journal_test.go b/go/store/nbs/journal_test.go index 04245fb9b2b..03c5eb1106d 100644 --- a/go/store/nbs/journal_test.go +++ b/go/store/nbs/journal_test.go @@ -37,7 +37,9 @@ func makeTestChunkJournal(t *testing.T) *ChunkJournal { dir, err := os.MkdirTemp("", "") require.NoError(t, err) t.Cleanup(func() { file.RemoveAll(dir) }) - m, err := newJournalManifest(ctx, dir, false) + l, _, err := newJournalLock(dir, false) + require.NoError(t, err) + m, err := newJournalManifest(ctx, dir, l) require.NoError(t, err) q := NewUnlimitedMemQuotaProvider() p := newFSTablePersister(dir, q, false) @@ -52,7 +54,9 @@ func makeTestChunkJournal(t *testing.T) *ChunkJournal { } func openTestChunkJournal(t *testing.T, dir string) *ChunkJournal { - m, err := newJournalManifest(t.Context(), dir, false) + l, _, err := newJournalLock(dir, false) + require.NoError(t, err) + m, err := newJournalManifest(t.Context(), dir, l) require.NoError(t, err) q := NewUnlimitedMemQuotaProvider() p := newFSTablePersister(dir, q, false) @@ -107,8 +111,7 @@ func TestChunkJournalReadOnly(t *testing.T) { rw := 
makeTestChunkJournal(t) assert.Equal(t, chunks.ExclusiveAccessMode(chunks.ExclusiveAccessMode_Exclusive), rw.AccessMode()) - _, err := newJournalManifest(t.Context(), rw.backing.dir, true) - require.Error(t, err) + _, _, err := newJournalLock(rw.backing.dir, true) require.ErrorIs(t, err, ErrDatabaseLocked) }) } diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index 0e9c62f16c0..4392e5866f8 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -277,7 +277,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, re // process the non-indexed portion of the journal starting at |wr.indexed|, // at minimum the non-indexed portion will include a root hash record. // Index lookups are added to the ongoing batch to re-synchronize. - wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error { + wr.off, err = processJournalRecords(ctx, p, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error { switch r.kind { case chunkJournalRecKind: rng := Range{ diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 3b77f9f65d6..933ca3064df 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -62,12 +62,12 @@ func (nbsMW *NBSMetricWrapper) AddTableFilesToManifest(ctx context.Context, file } // Forwards SupportedOperations to wrapped block store. 
-func (nbsMW *NBSMetricWrapper) SupportedOperations() chunks.TableFileStoreOps { - return nbsMW.nbs.SupportedOperations() +func (nbsMW *NBSMetricWrapper) SupportedOperations(ctx context.Context) (chunks.TableFileStoreOps, error) { + return nbsMW.nbs.SupportedOperations(ctx) } -func (nbsMW *NBSMetricWrapper) BeginGC(keeper func(hash.Hash) bool, mode chunks.GCMode) error { - return nbsMW.nbs.BeginGC(keeper, mode) +func (nbsMW *NBSMetricWrapper) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode chunks.GCMode) error { + return nbsMW.nbs.BeginGC(ctx, keeper, mode) } func (nbsMW *NBSMetricWrapper) EndGC(mode chunks.GCMode) { @@ -78,8 +78,8 @@ func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, getAddrs return nbsMW.nbs.MarkAndSweepChunks(ctx, getAddrs, filter, dest, gcConfig, incrementalUpdateManifest) } -func (nbsMW *NBSMetricWrapper) Count() (uint32, error) { - return nbsMW.nbs.Count() +func (nbsMW *NBSMetricWrapper) Count(ctx context.Context) (uint32, error) { + return nbsMW.nbs.Count(ctx) } func (nbsMW *NBSMetricWrapper) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index d5c67ca1503..772fa164323 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -49,6 +49,7 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/valctx" "github.com/dolthub/dolt/go/store/blobstore" "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/constants" "github.com/dolthub/dolt/go/store/hash" ) @@ -139,6 +140,16 @@ type NomsBlockStore struct { fatalBehavior dherrors.FatalBehavior closed bool + + staticAccessMode chunks.ExclusiveAccessMode + staticVersion string + loadOnce sync.Once + + // If loadThunk is passed |false|, it is being called as part + // of Close(). It should clean up any retained resources, + // instead of loading the database. 
+ loadThunk func(ctx context.Context, loadIt bool) + loadErr error } func (nbs *NomsBlockStore) PersistGhostHashes(ctx context.Context, refs hash.HashSet) error { @@ -161,8 +172,20 @@ type Range struct { DictLength uint32 } +func (nbs *NomsBlockStore) ensureLoad(ctx context.Context) error { + if nbs.loadThunk != nil { + nbs.loadOnce.Do(func() { + nbs.loadThunk(ctx, true) + }) + } + return nbs.loadErr +} + // IterateRoots iterates over the in-memory roots tracked by the ChunkJournal, if there is one. -func (nbs *NomsBlockStore) IterateRoots(f func(root string, timestamp *time.Time) error) error { +func (nbs *NomsBlockStore) IterateRoots(ctx context.Context, f func(root string, timestamp *time.Time) error) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } cj := nbs.chunkJournal() if cj == nil { return nil @@ -178,16 +201,22 @@ func (nbs *NomsBlockStore) chunkJournal() *ChunkJournal { return nil } -func (nbs *NomsBlockStore) ChunkJournalSize() (int64, bool) { +func (nbs *NomsBlockStore) ChunkJournalSize(ctx context.Context) (int64, bool, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return 0, false, err + } nbs.mu.Lock() defer nbs.mu.Unlock() if cj := nbs.chunkJournal(); cj != nil { - return cj.Size(), true + return cj.Size(), true, nil } - return 0, false + return 0, false, nil } func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sourcesToRanges, err := nbs.getChunkLocations(ctx, hashes) if err != nil { @@ -271,6 +300,9 @@ func (nbs *NomsBlockStore) getChunkLocations(ctx context.Context, hashes hash.Ha } func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[hash.Hash]map[hash.Hash]Range, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sourcesToRanges, err := 
nbs.getChunkLocations(ctx, hashes) if err != nil { @@ -394,6 +426,9 @@ func (nbs *NomsBlockStore) finalizeConjoin(ctx context.Context, err error) { } func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (ManifestInfo, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sources, err := nbs.openChunkSourcesForManifestUpdateAndRebase(ctx, updates, nil) if err != nil { @@ -545,6 +580,9 @@ func (nbs *NomsBlockStore) updateManifestAddFiles(ctx context.Context, updates m } func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updates map[hash.Hash]uint32, option ManifestAppendixOption) (ManifestInfo, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) sources, err := nbs.openChunkSourcesForManifestUpdateAndRebase(ctx, updates, nil) if err != nil { @@ -606,6 +644,9 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp // assumes that stores grow monotonically unless the |gcGen| of a manifest changes. Since this interface // cannot set |gcGen|, callers must ensure that calls to this function grow the store monotonically. 
func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root hash.Hash, tableFiles map[hash.Hash]uint32, appendixTableFiles map[hash.Hash]uint32) (err error) { + if err := store.ensureLoad(ctx); err != nil { + return err + } store.mu.Lock() defer store.mu.Unlock() contents := manifestContents{ @@ -757,28 +798,69 @@ func NewLocalJournalingStoreWithOptions(ctx context.Context, nbfVers, dir string return nil, err } - m, err := newJournalManifest(ctx, dir, opts.FailOnLockTimeout) + lock, staticAccessMode, err := newJournalLock(dir, opts.FailOnLockTimeout) if err != nil { return nil, err } - p := newFSTablePersister(dir, q, mmapArchiveIndexes) - // The NomsBlockStore is not constructed yet, so bootstrapping errors should fail store - // creation rather than crash the process. Callers configure crash behavior afterwards - // via SetFatalBehavior. - journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister), dherrors.FatalBehaviorError, warningsCb) + nbs, err := newEmptyNomsBlockStore(defaultMemTableSize) if err != nil { + if lock != nil { + lock.Unlock() + } return nil, err } + nbs.staticAccessMode = staticAccessMode + nbs.staticVersion = constants.FormatDoltString + nbs.loadThunk = func(ctx context.Context, loadIt bool) { + if loadIt == false { + if lock != nil { + lock.Unlock() + } + return + } + m, err := newJournalManifest(ctx, dir, lock) + if err != nil { + nbs.loadErr = err + if lock != nil { + lock.Unlock() + } + return + } + p := newFSTablePersister(dir, q, mmapArchiveIndexes).(*fsTablePersister) - // |journal| serves as both the tablePersister and (wrapped) the manifest. - // The wrapper keeps the two roles' Close paths distinct: the persister path - // closes the journal writer, while the manifest path releases the backing - // manifest's file lock. 
- mm := manifest(journalManifestWrapper{journal: journal}) - c := journalConjoiner{child: inlineConjoiner{defaultMaxTables}} + // The NomsBlockStore has not finished loading yet, so bootstrapping errors are recorded in + // |nbs.loadErr| rather than crashing the process. Callers configure crash behavior afterwards + // via SetFatalBehavior. + journal, err := newChunkJournal(ctx, nbfVers, dir, m, p, dherrors.FatalBehaviorError, warningsCb) + if err != nil { + m.Close() + nbs.loadErr = err + return + } + + // |journal| serves as both the tablePersister and (wrapped) the manifest. + // The wrapper keeps the two roles' Close paths distinct: the persister path + // closes the journal writer, while the manifest path releases the backing + // manifest's file lock. + mm := manifest(journalManifestWrapper{journal: journal}) + c := journalConjoiner{child: inlineConjoiner{defaultMaxTables}} + + nbs.manifest = mm + nbs.persister = journal + nbs.conjoiner = c + nbs.tables = newTableSet(journal, q) + nbs.upstream = manifestContents{nbfVers: nbfVers} - return newNomsBlockStore(ctx, nbfVers, mm, journal, q, c, defaultMemTableSize) + if err = nbs.rebase(ctx); err != nil { + journal.Close() + mm.Close() + nbs.loadErr = err + return + } + } + + return nbs, nil } func checkDir(dir string) error { @@ -792,22 +874,15 @@ func checkDir(dir string) error { return nil } -func newNomsBlockStore(ctx context.Context, nbfVerStr string, m manifest, p tablePersister, q MemoryQuotaProvider, c conjoinStrategy, memTableSize uint64) (*NomsBlockStore, error) { +func newEmptyNomsBlockStore(memTableSize uint64) (*NomsBlockStore, error) { if memTableSize == 0 { memTableSize = defaultMemTableSize } - hasCache, err := lru.New2Q[hash.Hash, struct{}](hasCacheSize) if err != nil { return nil, err } - nbs := &NomsBlockStore{ - manifest: m, - persister: p, - conjoiner: c, - tables: newTableSet(p, q), - upstream: manifestContents{nbfVers: nbfVerStr}, memtableSz: memTableSize, hasCache: hasCache, stats: NewStats(), @@ -815,6
+890,20 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, m manifest, p tabl } nbs.gcCond = sync.NewCond(&nbs.mu) nbs.conjoinOpCond = sync.NewCond(&nbs.mu) + return nbs, nil +} + +func newNomsBlockStore(ctx context.Context, nbfVerStr string, m manifest, p tablePersister, q MemoryQuotaProvider, c conjoinStrategy, memTableSize uint64) (*NomsBlockStore, error) { + nbs, err := newEmptyNomsBlockStore(memTableSize) + if err != nil { + return nil, err + } + + nbs.manifest = m + nbs.persister = p + nbs.conjoiner = c + nbs.tables = newTableSet(p, q) + nbs.upstream = manifestContents{nbfVers: nbfVerStr} t1 := time.Now() defer nbs.stats.OpenLatency.SampleTimeSince(t1) @@ -871,6 +960,9 @@ func (nbs *NomsBlockStore) waitForGC(ctx context.Context, cycle uint64) error { } func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.InsertAddrsCurry) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.putChunk(ctx, c, getAddrs, nbs.refCheck) } @@ -994,6 +1086,9 @@ func (nbs *NomsBlockStore) errorIfDangling(root hash.Hash, checker refCheck) err } func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return chunks.Chunk{}, err + } valctx.ValidateContext(ctx) ctx, span := tracer.Start(ctx, "nbs.Get") defer span.End() @@ -1058,6 +1153,9 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, } func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } valctx.ValidateContext(ctx) ctx, span := tracer.Start(ctx, "nbs.GetMany", trace.WithAttributes(attribute.Int("num_hashes", len(hashes)))) defer span.End() @@ -1069,6 +1167,9 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou } func (nbs *NomsBlockStore) 
GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, ToChunker)) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.getManyCompressed(ctx, hashes, found, gcDependencyMode_TakeDependency) } @@ -1178,7 +1279,10 @@ func toGetRecords(hashes hash.HashSet) []getRecord { return reqs } -func (nbs *NomsBlockStore) Count() (uint32, error) { +func (nbs *NomsBlockStore) Count(ctx context.Context) (uint32, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return 0, err + } count, tables := func() (count uint32, tables chunkReader) { nbs.mu.RLock() defer nbs.mu.RUnlock() @@ -1193,6 +1297,9 @@ func (nbs *NomsBlockStore) Count() (uint32, error) { } func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return false, err + } valctx.ValidateContext(ctx) t1 := time.Now() defer func() { @@ -1251,6 +1358,9 @@ func (nbs *NomsBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) { } func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) return nbs.hasManyDep(ctx, hashes, gcDependencyMode_TakeDependency) } @@ -1384,6 +1494,9 @@ func toHasRecords(hashes hash.HashSet) []hasRecord { } func (nbs *NomsBlockStore) Rebase(ctx context.Context) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } valctx.ValidateContext(ctx) nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1419,6 +1532,9 @@ func (nbs *NomsBlockStore) rebase(ctx context.Context) error { } func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return hash.Hash{}, err + } valctx.ValidateContext(ctx) nbs.mu.RLock() defer nbs.mu.RUnlock() @@ -1426,6 +1542,9 @@ func (nbs *NomsBlockStore) Root(ctx context.Context) (hash.Hash, error) { } func 
(nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash) (success bool, err error) { + if err := nbs.ensureLoad(ctx); err != nil { + return false, err + } valctx.ValidateContext(ctx) return nbs.commit(ctx, current, last, nbs.refCheck) } @@ -1588,16 +1707,32 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has } func (nbs *NomsBlockStore) Version() string { + if nbs.staticVersion != "" { + return nbs.staticVersion + } nbs.mu.RLock() defer nbs.mu.RUnlock() return nbs.upstream.nbfVers } func (nbs *NomsBlockStore) AccessMode() chunks.ExclusiveAccessMode { + if nbs.loadThunk != nil { + return nbs.staticAccessMode + } return nbs.persister.AccessMode() } func (nbs *NomsBlockStore) Close() error { + if nbs.loadThunk != nil { + loaded := true + nbs.loadOnce.Do(func() { + nbs.loadThunk(context.Background(), false) + loaded = false + }) + if !loaded || nbs.loadErr != nil { + return nil + } + } nbs.mu.Lock() defer nbs.mu.Unlock() nbs.closed = true @@ -1624,6 +1759,9 @@ func (nbs *NomsBlockStore) Stats() interface{} { } func (nbs *NomsBlockStore) StatsSummary() string { + if err := nbs.ensureLoad(context.TODO()); err != nil { + return "failed to load" + } nbs.mu.Lock() defer nbs.mu.Unlock() cnt := nbs.tables.count() @@ -1668,6 +1806,9 @@ func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) { // Sources retrieves the current root hash, a list of all table files (which may include appendix tablefiles), // and a second list of only the appendix table files func (nbs *NomsBlockStore) Sources(ctx context.Context) (chunks.TableFileSources, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return chunks.TableFileSources{}, err + } valctx.ValidateContext(ctx) nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1749,6 +1890,9 @@ func newTableFile(cs chunkSource, info tableSpec, behavior dherrors.FatalBehavio } func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) { + if err := nbs.ensureLoad(ctx); err != 
nil { + return 0, err + } nbs.mu.Lock() defer nbs.mu.Unlock() @@ -1775,7 +1919,10 @@ func (nbs *NomsBlockStore) chunkSourcesByAddr() (map[hash.Hash]chunkSource, erro } -func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { +func (nbs *NomsBlockStore) SupportedOperations(ctx context.Context) (chunks.TableFileStoreOps, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return chunks.TableFileStoreOps{}, err + } var ok bool _, ok = nbs.persister.(tableFilePersister) @@ -1784,23 +1931,29 @@ func (nbs *NomsBlockStore) SupportedOperations() chunks.TableFileStoreOps { CanWrite: ok, CanPrune: ok, CanGC: ok, - } + }, nil } -func (nbs *NomsBlockStore) Path() (string, bool) { +func (nbs *NomsBlockStore) Path(ctx context.Context) (string, bool, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return "", false, err + } if tfp, ok := nbs.persister.(tableFilePersister); ok { switch p := tfp.(type) { case *fsTablePersister, *ChunkJournal: - return p.Path(), true + return p.Path(), true, nil default: - return "", false + return "", false, nil } } - return "", false + return "", false, nil } // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, numChunks int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) (io.Closer, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) tfp, ok := nbs.persister.(tableFilePersister) if !ok { @@ -1827,6 +1980,9 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, // AddTableFilesToManifest adds table files to the manifest func (nbs *NomsBlockStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs chunks.InsertAddrsCurry) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } valctx.ValidateContext(ctx) return 
nbs.addTableFilesToManifest(ctx, fileIdToNumChunks, getAddrs, nbs.refCheck, nil) } @@ -2002,6 +2158,9 @@ func (nbs *NomsBlockStore) openChunkSourcesForManifestUpdateAndRebase(ctx contex // PruneTableFiles deletes old table files that are no longer referenced in the manifest. func (nbs *NomsBlockStore) PruneTableFiles(ctx context.Context) (err error) { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } valctx.ValidateContext(ctx) return nbs.pruneTableFiles(ctx) } @@ -2010,7 +2169,10 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) { return nbs.persister.PruneTableFiles(ctx) } -func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error { +func (nbs *NomsBlockStore) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, _ chunks.GCMode) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } nbs.mu.Lock() defer nbs.mu.Unlock() return nbs.lockedBeginGC(keeper) @@ -2099,6 +2261,9 @@ func (nbs *NomsBlockStore) beginRead() (endRead func()) { } func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, getAddrs chunks.GetAddrs, filter chunks.HasManyFunc, dest chunks.ChunkStore, gcConfig chunks.GCConfig, incrementalUpdateManifest bool) (chunks.MarkAndSweeper, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return nil, err + } valctx.ValidateContext(ctx) return markAndSweepChunks(ctx, nbs, nbs, dest, getAddrs, filter, gcConfig, incrementalUpdateManifest) } @@ -2119,7 +2284,10 @@ func (nbs *NomsBlockStore) hasLocalGCNovelty() bool { } func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src CompressedChunkStoreForGC, dest chunks.ChunkStore, getAddrs chunks.GetAddrs, filter chunks.HasManyFunc, gcConfig chunks.GCConfig, incrementalUpdateManifest bool) (chunks.MarkAndSweeper, error) { - ops := nbs.SupportedOperations() + ops, err := nbs.SupportedOperations(ctx) + if err != nil { + return nil, err + } if !ops.CanGC || !ops.CanPrune { return nil, 
chunks.ErrUnsupportedOperation } @@ -2160,7 +2328,7 @@ func markAndSweepChunks(ctx context.Context, nbs *NomsBlockStore, src Compressed } return nil } - err := precheck() + err = precheck() if err != nil { return nil, err } @@ -2413,6 +2581,9 @@ func (gcf gcFinalizer) Close() error { } func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } for _, v := range nbs.tables.novel { err := v.iterateAllChunks(ctx, cb, nbs.stats) if err != nil { @@ -2434,21 +2605,25 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c return nil } -func (nbs *NomsBlockStore) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) { +func (nbs *NomsBlockStore) TolerantIterateAllChunks(ctx context.Context, cb func(chunks.Chunk), errCb func(sourceFile string, err error)) error { + if err := nbs.ensureLoad(ctx); err != nil { + return err + } for _, v := range nbs.tables.novel { fileName := v.hash().String() + v.suffix() v.tolerantIterateAllChunks(ctx, cb, func(err error) { errCb(fileName, err) }, nbs.stats) if ctx.Err() != nil { - return + return nil } } for _, v := range nbs.tables.upstream { fileName := v.hash().String() + v.suffix() v.tolerantIterateAllChunks(ctx, cb, func(err error) { errCb(fileName, err) }, nbs.stats) if ctx.Err() != nil { - return + return nil } } + return nil } func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mode chunks.GCMode, srcs chunkSourceSet) (err error) { @@ -2554,6 +2729,9 @@ func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64, keepe // files in oldgen are conjoined together. // Returns the hash of the newly created conjoined table file. 
func (nbs *NomsBlockStore) ConjoinTableFiles(ctx context.Context, storageIds []hash.Hash) (hash.Hash, error) { + if err := nbs.ensureLoad(ctx); err != nil { + return hash.Hash{}, err + } nbs.mu.Lock() defer nbs.mu.Unlock() diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index b6530a8c59b..e7f8b7aecd2 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -344,7 +344,7 @@ func TestNBSCopyGC(t *testing.T) { require.NoError(t, err) require.True(t, ok) - require.NoError(t, st.BeginGC(nil, chunks.GCMode_Full)) + require.NoError(t, st.BeginGC(t.Context(), nil, chunks.GCMode_Full)) noopFilter := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { return hashes, nil } diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 0ac83363fdb..1328492ea9e 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -589,7 +589,7 @@ func (lvs *ValueStore) GC(ctx context.Context, gcConfig chunks.GCConfig, oldGenR } err := func() error { - err := collector.BeginGC(lvs.gcAddChunk, gcConfig.Mode) + err := collector.BeginGC(ctx, lvs.gcAddChunk, gcConfig.Mode) if err != nil { return err } @@ -678,7 +678,7 @@ func (lvs *ValueStore) GC(ctx context.Context, gcConfig chunks.GCConfig, oldGenR newGenRefs.InsertAll(oldGenRefs) err := func() error { - err := collector.BeginGC(lvs.gcAddChunk, chunks.GCMode_Full) + err := collector.BeginGC(ctx, lvs.gcAddChunk, chunks.GCMode_Full) if err != nil { return err } diff --git a/integration-tests/bats/chunk-journal.bats b/integration-tests/bats/chunk-journal.bats index a3210a67c12..ed5bbf719ee 100644 --- a/integration-tests/bats/chunk-journal.bats +++ b/integration-tests/bats/chunk-journal.bats @@ -101,7 +101,7 @@ file_size() { local grown_size=$(file_size "$journal") - run dolt status + run dolt log [ "$status" -eq 1 ] [[ "$output" =~ "invalid journal record length: 5242881 exceeds max allowed size of 5242880" ]] || false diff --git 
a/integration-tests/bats/fsck.bats b/integration-tests/bats/fsck.bats index f4379800e44..b98af1842b7 100644 --- a/integration-tests/bats/fsck.bats +++ b/integration-tests/bats/fsck.bats @@ -108,7 +108,7 @@ UPDATE tbl SET guid = UUID() WHERE i >= @random_id LIMIT 1;" printf '\x00\x00\x00\x00' >> "$journal" cat $BATS_CWD/corrupt_dbs/journal_data.bin >> "$journal" - run dolt status + run dolt log [ "$status" -eq 1 ] [[ "$output" =~ "please run 'dolt fsck' to assess the damage and attempt repairs" ]] || false diff --git a/integration-tests/go-sql-server-driver/no_table_files_read_test.go b/integration-tests/go-sql-server-driver/no_table_files_read_test.go new file mode 100644 index 00000000000..bff901b9ae4 --- /dev/null +++ b/integration-tests/go-sql-server-driver/no_table_files_read_test.go @@ -0,0 +1,183 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +// These tests assert the behavior of an optimization where dolt CLI commands +// avoid loading potentially-large local database files when the command can +// instead be serviced by a running `dolt sql-server`. 
+// +// The hook for asserting that no table files are loaded is the +// DOLT_TEST_ASSERT_NO_TABLE_FILES_READ environment variable. When it is set, +// dolt panics if it ever bootstraps the chunk journal or opens a table file +// from disk. We use it to prove that, when a server is running, the CLI never +// touches the local database files. +// +// The test matrix has three top-level scenarios: +// +// 1. A dolt sql-server is running. The CLI is invoked with +// DOLT_TEST_ASSERT_NO_TABLE_FILES_READ=1 and must succeed, because it +// dispatches to the server without reading any table files. +// 2. No server is running, but a stale sql-server.info file is present. The +// CLI is invoked (executing locally) and must still succeed. +// 3. No server is running, but a concurrent `dolt sql` process holds the +// database lock. The CLI is invoked (executing locally); reads succeed and +// writes fail because they cannot acquire the write lock. +// +// In scenarios 2 and 3 the CLI must read the local database, so we do not set +// DOLT_TEST_ASSERT_NO_TABLE_FILES_READ for those invocations. + +// makePopulatedRepo creates a fresh repo store and an initialized repo named +// |name| with a committed table, some rows, and a remote configured. +func makePopulatedRepo(t *testing.T, u driver.DoltUser, name string) driver.Repo { + rs, err := u.MakeRepoStore() + require.NoError(t, err) + repo, err := rs.MakeRepo(name) + require.NoError(t, err) + err = repo.DoltExec("sql", "-q", "create table vals (id int primary key auto_increment, v int);"+ + "insert into vals (v) values (1), (2), (3);"+ + "call dolt_commit('-Am', 'initial data')") + require.NoError(t, err) + require.NoError(t, repo.CreateRemote("origin", "file://"+filepath.Join(repo.Dir, "remote"))) + return repo +} + +// withNoTableFilesRead sets DOLT_TEST_ASSERT_NO_TABLE_FILES_READ=1 on |cmd| so +// that the command panics if it loads any table files from disk. 
+func withNoTableFilesRead(cmd *exec.Cmd) *exec.Cmd { + cmd.Env = append(cmd.Env, dconfig.EnvAssertNoTableFilesRead+"=1") + return cmd +} + +func assertCmdSucceeds(t *testing.T, cmd *exec.Cmd) string { + out, err := cmd.CombinedOutput() + require.NoError(t, err, "expected command to succeed, output:\n%s", string(out)) + return string(out) +} + +func assertCmdFails(t *testing.T, cmd *exec.Cmd) string { + out, err := cmd.CombinedOutput() + require.Error(t, err, "expected command to fail, output:\n%s", string(out)) + return string(out) +} + +func TestNoTableFilesRead(t *testing.T) { + t.Parallel() + + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + // Sanity check: with the assertion hook enabled and no server running, a + // CLI command which reads the database must fail. This proves the hook + // itself works and the success cases below are meaningful. + t.Run("AssertHookFailsWhenLoadingTableFiles", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "assert_hook") + out := assertCmdFails(t, withNoTableFilesRead(repo.DoltCmd("log"))) + require.Regexp(t, "loading table files is disabled", out) + }) + + // Scenario 1: a dolt sql-server is running. All CLI invocations set the + // assertion hook and must succeed by dispatching to the server. 
+ t.Run("WithRunningServer", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "with_running_server") + + var ports DynamicResources + ports.global = &GlobalPorts + ports.t = t + RunServerUntilEndOfTest(t, repo, &driver.Server{ + Args: []string{"--port", `{{get_port "server"}}`}, + DynamicPort: "server", + }, &ports) + + t.Run("sql read", func(t *testing.T) { + out := assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("sql", "-q", "select count(*) from vals"))) + require.Regexp(t, "3", out) + }) + t.Run("sql write", func(t *testing.T) { + assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("sql", "-q", "insert into vals (v) values (10)"))) + }) + t.Run("remote -v", func(t *testing.T) { + out := assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("remote", "-v"))) + require.Regexp(t, "origin", out) + }) + t.Run("log", func(t *testing.T) { + out := assertCmdSucceeds(t, withNoTableFilesRead(repo.DoltCmd("log"))) + require.Regexp(t, "initial data", out) + }) + }) + + // Scenario 2: no server running, but a stale sql-server.info file is + // present. The CLI executes locally and must still work. 
+ t.Run("WithStaleInfoFile", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "with_stale_info_file") + path := filepath.Join(repo.Dir, ".dolt/sql-server.info") + require.NoError(t, os.WriteFile(path, []byte("1:3306:this_is_not_a_real_secret"), 0600)) + + t.Run("sql read", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("sql", "-q", "select count(*) from vals")) + require.Regexp(t, "3", out) + }) + t.Run("sql write", func(t *testing.T) { + assertCmdSucceeds(t, repo.DoltCmd("sql", "-q", "insert into vals (v) values (20)")) + }) + t.Run("remote -v", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("remote", "-v")) + require.Regexp(t, "origin", out) + }) + t.Run("log", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("log")) + require.Regexp(t, "initial data", out) + }) + }) + + // Scenario 3: no server running, but a concurrent `dolt sql` process holds + // the database lock. The CLI executes locally: reads succeed and writes + // fail because they cannot acquire the exclusive write lock. + t.Run("WithConcurrentDoltSql", func(t *testing.T) { + repo := makePopulatedRepo(t, u, "with_concurrent_dolt_sql") + RunDoltSQLUntilEndOfTest(t, repo) + + t.Run("sql read", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("sql", "-q", "select count(*) from vals")) + require.Regexp(t, "3", out) + }) + t.Run("sql write fails", func(t *testing.T) { + out := assertCmdFails(t, repo.DoltCmd("sql", "-q", "insert into vals (v) values (30)")) + require.Regexp(t, "(?i)read only", out) + }) + t.Run("remote -v", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("remote", "-v")) + require.Regexp(t, "origin", out) + }) + t.Run("log", func(t *testing.T) { + out := assertCmdSucceeds(t, repo.DoltCmd("log")) + require.Regexp(t, "initial data", out) + }) + }) +}