Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions go/cmd/dolt/commands/fsck.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func (cmd FsckCmd) Exec(ctx context.Context, commandStr string, args []string, d
ddb, _, _, err := dbFact.CreateDbNoCache(ctx, types.Format_DOLT, u, params, func(vErr error) {
report.ScanErrs.AppendE(vErr)
})
if err == nil {
// Creating NomsBlockStore can lazily load the actual database resources. We force them here by performing
// an actual read against the database.
_, err = datas.ChunkStoreFromDatabase(ddb).Root(ctx)
}
if err != nil {
if errors.Is(err, nbs.ErrJournalDataLoss) {
cli.PrintErrln("WARNING: Chunk journal is corrupted and some data may be lost.")
Expand Down Expand Up @@ -491,11 +496,11 @@ type roundTripper struct {
}

func newRoundTripper(ctx context.Context, gs *nbs.GenerationalNBS, progress chan FsckProgressMessage, errs *Errs, fileErrCounts map[string]int) (*roundTripper, error) {
chunkCount, err := gs.OldGen().Count()
chunkCount, err := gs.OldGen().Count(ctx)
if err != nil {
return nil, err
}
chunkCount2, err := gs.NewGen().Count()
chunkCount2, err := gs.NewGen().Count(ctx)
if err != nil {
return nil, err
}
Expand All @@ -517,11 +522,11 @@ func newRoundTripper(ctx context.Context, gs *nbs.GenerationalNBS, progress chan
}

func (rt *roundTripper) scanAll(ctx context.Context) error {
rt.gs.TolerantIterateAllChunks(ctx, rt.roundTripAndCategorizeChunk, func(sourceFile string, err error) {
err := rt.gs.TolerantIterateAllChunks(ctx, rt.roundTripAndCategorizeChunk, func(sourceFile string, err error) {
rt.errs.AppendE(err)
rt.fileErrCounts[sourceFile]++
})
return ctx.Err()
return errors.Join(err, ctx.Err())
}

// roundTripAndCategorizeChunk verifies the chunk's hash matches its content, categorizes it by type. This method is
Expand Down
5 changes: 5 additions & 0 deletions go/libraries/doltcore/dconfig/envvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ const (
// Used for tests. If set, Dolt will error if it would rebuild a table's row data.
EnvAssertNoTableRewrite = "DOLT_TEST_ASSERT_NO_TABLE_REWRITE"
EnvAssertNoInMemoryArchiveIndex = "DOLT_TEST_ASSERT_NO_IN_MEMORY_ARCHIVE_INDEX"

// Used for tests. Make Dolt fail if it loads table file data
// from disk, such as bootstraping the journal file or loading
// a table file with tableFilePersister.Open.
EnvAssertNoTableFilesRead = "DOLT_TEST_ASSERT_NO_TABLE_FILES_READ"
)
19 changes: 15 additions & 4 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,15 @@ func pullHash(
destCS := datas.ChunkStoreFromDatabase(destDB)
waf := types.WalkAddrsForNBF(srcDB.Format(), skipHashes)

if datas.CanUsePuller(srcDB) && datas.CanUsePuller(destDB) {
srcCanUsePuller, err := datas.CanUsePuller(ctx, srcDB)
if err != nil {
return err
}
destCanUsePuller, err := datas.CanUsePuller(ctx, destDB)
if err != nil {
return err
}
if srcCanUsePuller && destCanUsePuller {
puller, err := pull.NewPuller(ctx, tempDir, defaultTargetFileSize, srcCS, destCS, waf, targetHashes, statsCh)
if err == pull.ErrDBUpToDate {
return nil
Expand Down Expand Up @@ -2107,15 +2115,15 @@ func (ddb *DoltDB) restoreDefaultConjoinBehavior() {
// NomsBlockStore instance which exposes its roots through
// IterateRoots. Otherwise returns |nil| without visiting any roots,
// including the current one.
func (ddb *DoltDB) IterateRoots(cb func(root string, timestamp *time.Time) error) error {
func (ddb *DoltDB) IterateRoots(ctx context.Context, cb func(root string, timestamp *time.Time) error) error {
cs := datas.ChunkStoreFromDatabase(ddb.db)

if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok {
cs = generationalNBS.NewGen()
}

if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok {
return nbsStore.IterateRoots(cb)
return nbsStore.IterateRoots(ctx, cb)
} else {
return nil
}
Expand Down Expand Up @@ -2172,7 +2180,10 @@ func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) {
if err != nil {
return StoreSizes{}, err
}
journalSz, ok := newGenNBS.ChunkJournalSize()
journalSz, ok, err := newGenNBS.ChunkJournalSize(ctx)
if err != nil {
return StoreSizes{}, err
}
if ok {
return StoreSizes{
JournalBytes: uint64(journalSz),
Expand Down
67 changes: 35 additions & 32 deletions go/libraries/doltcore/env/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,45 +247,48 @@ func LoadDoltDB(ctx context.Context, dEnv *DoltEnv) {
dEnv.doltDB = ddb
dEnv.DBLoadError = dbLoadErr

if dbLoadErr == nil && dEnv.HasDoltDir() {
if !dEnv.HasDoltTempTableDir() {
tmpDir, err := dEnv.TempTableFilesDir()
if err != nil {
dEnv.DBLoadError = err
}
err = dEnv.FS.MkDirs(tmpDir)
dEnv.DBLoadError = err
} else {
// fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution.
// The process will not wait for this to finish so this may not always complete.
go func() {
// TODO dEnv.HasDoltTempTableDir() true but dEnv.TempTableFileDir() panics
tmpTableDir, err := dEnv.FS.Abs(filepath.Join(dEnv.urlStr, dbfactory.DoltDir, tempTablesDir))
if ddb != nil && ddb.AccessMode() != chunks.ExclusiveAccessMode_ReadOnly {
// Only do the following when we have write access to the database.
if dbLoadErr == nil && dEnv.HasDoltDir() {
if !dEnv.HasDoltTempTableDir() {
tmpDir, err := dEnv.TempTableFilesDir()
if err != nil {
return
dEnv.DBLoadError = err
}
_ = dEnv.FS.Iter(tmpTableDir, true, func(path string, size int64, isDir bool) (stop bool) {
if !isDir {
lm, exists := dEnv.FS.LastModified(path)
err = dEnv.FS.MkDirs(tmpDir)
dEnv.DBLoadError = err
} else {
// fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution.
// The process will not wait for this to finish so this may not always complete.
go func() {
// TODO dEnv.HasDoltTempTableDir() true but dEnv.TempTableFileDir() panics
tmpTableDir, err := dEnv.FS.Abs(filepath.Join(dEnv.urlStr, dbfactory.DoltDir, tempTablesDir))
if err != nil {
return
}
_ = dEnv.FS.Iter(tmpTableDir, true, func(path string, size int64, isDir bool) (stop bool) {
if !isDir {
lm, exists := dEnv.FS.LastModified(path)

if exists && time.Now().Sub(lm) > (time.Hour*24) {
_ = dEnv.FS.DeleteFile(path)
if exists && time.Now().Sub(lm) > (time.Hour*24) {
_ = dEnv.FS.DeleteFile(path)
}
}
}

return false
})
}()
return false
})
}()
}
}
}

if dEnv.RSLoadErr == nil && dbLoadErr == nil {
// If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0.
_, err := dEnv.WorkingSet(ctx)
if errors.Is(err, doltdb.ErrWorkingSetNotFound) {
_ = dEnv.initWorkingSetFromRepoState(ctx)
} else if err != nil {
dEnv.RSLoadErr = err
if dEnv.RSLoadErr == nil && dbLoadErr == nil {
// If the working set isn't present in the DB, create it from the repo state. This step can be removed post 1.0.
_, err := dEnv.WorkingSet(ctx)
if errors.Is(err, doltdb.ErrWorkingSetNotFound) {
_ = dEnv.initWorkingSetFromRepoState(ctx)
} else if err != nil {
dEnv.RSLoadErr = err
}
}
}
})
Expand Down
1 change: 0 additions & 1 deletion go/libraries/doltcore/env/multi_repo_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func multiEnvForConfigDirectoryEnv(ctx context.Context, config config.ReadWriteC
if dbErr != nil {
if errors.Is(dbErr, nbs.ErrJournalDataLoss) {
logrus.Errorf("failed to load database %s with error: %s", dbName, dbErr.Error())
logrus.Errorf("please run 'dolt fsck' to assess the damage and attempt repairs")
} else if !errors.Is(dbErr, doltdb.ErrMissingDoltDataDir) {
logrus.Warnf("failed to load database with error: %s", dbErr.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/remotesrv/dbcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type RemoteSrvStore interface {
chunks.ChunkStore
chunks.TableFileStore

Path() (string, bool)
Path(ctx context.Context) (string, bool, error)
GetChunkLocationsWithPaths(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]nbs.Range, error)
}

Expand Down
20 changes: 12 additions & 8 deletions go/libraries/doltcore/remotesrv/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
return resp, nil
}

func (rs *RemoteChunkStore) getRelativeStorePath(cs RemoteSrvStore) (string, error) {
cspath, ok := cs.Path()
func (rs *RemoteChunkStore) getRelativeStorePath(ctx context.Context, cs RemoteSrvStore) (string, error) {
cspath, ok, err := cs.Path(ctx)
if err != nil {
return "", err
}
if !ok {
return "", status.Error(codes.Internal, "chunkstore misconfigured; cannot generate HTTP paths")
}
Expand Down Expand Up @@ -175,7 +178,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot

hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes)

prefix, err := rs.getRelativeStorePath(cs)
prefix, err := rs.getRelativeStorePath(ctx, cs)
if err != nil {
logger.WithError(err).Error("error getting file store path for chunk store")
return nil, err
Expand Down Expand Up @@ -280,7 +283,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
if err != nil {
return err
}
prefix, err = rs.getRelativeStorePath(cs)
prefix, err = rs.getRelativeStorePath(stream.Context(), cs)
if err != nil {
logger.WithError(err).Error("error getting file store path for chunk store")
return err
Expand Down Expand Up @@ -396,7 +399,7 @@ func (rs *RemoteChunkStore) StreamChunkLocations(stream remotesapi.ChunkStoreSer
if err != nil {
return err
}
prefix, err = rs.getRelativeStorePath(cs)
prefix, err = rs.getRelativeStorePath(stream.Context(), cs)
if err != nil {
logger.WithError(err).Error("error getting file store path for chunk store")
return err
Expand Down Expand Up @@ -765,13 +768,13 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.

md, _ := metadata.FromIncomingContext(ctx)

tableFileInfo, err := getTableFileInfo(logger, md, rs, tables, req, cs)
tableFileInfo, err := getTableFileInfo(ctx, logger, md, rs, tables, req, cs)
if err != nil {
logger.WithError(err).Error("error getting table file info")
return nil, err
}

appendixTableFileInfo, err := getTableFileInfo(logger, md, rs, appendixTables, req, cs)
appendixTableFileInfo, err := getTableFileInfo(ctx, logger, md, rs, appendixTables, req, cs)
if err != nil {
logger.WithError(err).Error("error getting appendix table file info")
return nil, err
Expand All @@ -792,14 +795,15 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
}

func getTableFileInfo(
ctx context.Context,
logger *logrus.Entry,
md metadata.MD,
rs *RemoteChunkStore,
tableList []chunks.TableFile,
req *remotesapi.ListTableFilesRequest,
cs RemoteSrvStore,
) ([]*remotesapi.TableFileInfo, error) {
prefix, err := rs.getRelativeStorePath(cs)
prefix, err := rs.getRelativeStorePath(ctx, cs)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,13 +1082,13 @@ const (
chunkAggDistance = 8 * 1024
)

func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps {
func (dcs *DoltChunkStore) SupportedOperations(_ context.Context) (chunks.TableFileStoreOps, error) {
return chunks.TableFileStoreOps{
CanRead: true,
CanWrite: true,
CanPrune: false,
CanGC: false,
}
}, nil
}

// WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/dtablefunctions/dolt_reflog.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (rltf *ReflogTableFunction) RowIter(ctx *sql.Context, row sql.Row) (sql.Row

previousCommitsByRef := make(map[string]string)
rows := make([]sql.Row, 0)
err := ddb.IterateRoots(func(root string, timestamp *time.Time) error {
err := ddb.IterateRoots(ctx, func(root string, timestamp *time.Time) error {
hashof := hash.Parse(root)
datasets, err := ddb.DatasetsByRootHash(ctx, hashof)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ type ChunkStoreGarbageCollector interface {
//
// This function should not block indefinitely and should return an
// error if a GC is already in progress.
BeginGC(addChunk func(hash.Hash) bool, mode GCMode) error
BeginGC(ctx context.Context, addChunk func(hash.Hash) bool, mode GCMode) error

// EndGC indicates that the GC is over. The previously provided
// addChunk function must not be called after this function.
Expand All @@ -273,7 +273,7 @@ type ChunkStoreGarbageCollector interface {
MarkAndSweepChunks(ctx context.Context, getAddrs GetAddrs, filter HasManyFunc, dest ChunkStore, config GCConfig, incrementalUpdateManifest bool) (MarkAndSweeper, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Count(ctx context.Context) (uint32, error)

// IterateAllChunks iterates over all chunks in the store, calling the provided callback for each chunk. This is
// a wrapper over the internal chunkSource.iterateAllChunks() method.
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (ms *MemoryStoreView) Commit(ctx context.Context, current, last hash.Hash)
return success, nil
}

func (ms *MemoryStoreView) BeginGC(keeper func(hash.Hash) bool, _ GCMode) error {
func (ms *MemoryStoreView) BeginGC(_ context.Context, keeper func(hash.Hash) bool, _ GCMode) error {
return ms.transitionToGC(keeper)
}

Expand Down Expand Up @@ -429,7 +429,7 @@ func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetA
}, nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
func (ms *MemoryStoreView) Count(_ context.Context) (uint32, error) {
return uint32(len(ms.pending)), nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/store/chunks/tablefilestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type TableFileStore interface {
Commit(ctx context.Context, current, last hash.Hash) (bool, error)

// SupportedOperations returns a description of the support TableFile operations. Some stores only support reading table files, not writing.
SupportedOperations() TableFileStoreOps
SupportedOperations(ctx context.Context) (TableFileStoreOps, error)
}

type TableFileSources struct {
Expand Down
6 changes: 3 additions & 3 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs InsertAddrsCu
return s.ChunkStore.Put(ctx, c, getAddrs)
}

func (s *TestStoreView) BeginGC(keeper func(hash.Hash) bool, mode GCMode) error {
func (s *TestStoreView) BeginGC(ctx context.Context, keeper func(hash.Hash) bool, mode GCMode) error {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok {
return ErrUnsupportedOperation
}
return collector.BeginGC(keeper, mode)
return collector.BeginGC(ctx, keeper, mode)
}

func (s *TestStoreView) EndGC(mode GCMode) {
Expand All @@ -99,7 +99,7 @@ func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, getAddrs GetAddr
return collector.MarkAndSweepChunks(ctx, getAddrs, filter, collector, gcConfig, false)
}

func (s *TestStoreView) Count() (uint32, error) {
func (s *TestStoreView) Count(_ context.Context) (uint32, error) {
panic("currently unused")
}

Expand Down
11 changes: 7 additions & 4 deletions go/store/datas/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,16 @@ type GarbageCollector interface {

// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
// Databases support this yet.
func CanUsePuller(db Database) bool {
func CanUsePuller(ctx context.Context, db Database) (bool, error) {
cs := db.chunkStore()
if tfs, ok := cs.(chunks.TableFileStore); ok {
ops := tfs.SupportedOperations()
return ops.CanRead && ops.CanWrite
ops, err := tfs.SupportedOperations(ctx)
if err != nil {
return false, err
}
return ops.CanRead && ops.CanWrite, nil
}
return false
return false, nil
}

func GetCSStatSummaryForDB(db Database) string {
Expand Down
Loading
Loading