Skip to content

Commit 2eb19ed

Browse files
committed
Bootstrap snapshots in commitlog.
1 parent 9b817fa commit 2eb19ed

2 files changed

Lines changed: 211 additions & 13 deletions

File tree

src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go

Lines changed: 210 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ import (
3434
"github.com/m3db/m3/src/dbnode/persist/fs"
3535
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
3636
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
37+
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
3738
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
3839
"github.com/m3db/m3/src/dbnode/storage/series"
3940
"github.com/m3db/m3/src/dbnode/topology"
4041
"github.com/m3db/m3/src/dbnode/tracepoint"
4142
"github.com/m3db/m3/src/dbnode/ts"
43+
"github.com/m3db/m3/src/m3ninx/index/segment"
44+
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
4245
"github.com/m3db/m3/src/x/checked"
4346
"github.com/m3db/m3/src/x/context"
4447
"github.com/m3db/m3/src/x/ident"
@@ -57,7 +60,9 @@ const (
5760
type newIteratorFn func(opts commitlog.IteratorOpts) (
5861
iter commitlog.Iterator, corruptFiles []commitlog.ErrorWithPath, err error)
5962
type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error)
60-
type newReaderFn func(bytesPool pool.CheckedBytesPool, opts fs.Options) (fs.DataFileSetReader, error)
63+
type indexSnapshotFilesFn func(filePathPrefix string, namespace ident.ID) (fs.FileSetFilesSlice, error)
64+
type newDataReaderFn func(bytesPool pool.CheckedBytesPool, opts fs.Options) (fs.DataFileSetReader, error)
65+
type readIndexSegmentsFn func(opts fs.ReadIndexSegmentsOptions) ([]segment.Segment, error)
6166

6267
type commitLogSource struct {
6368
opts Options
@@ -67,9 +72,11 @@ type commitLogSource struct {
6772
// Filesystem inspection capture before node was started.
6873
inspection fs.Inspection
6974

70-
newIteratorFn newIteratorFn
71-
snapshotFilesFn snapshotFilesFn
72-
newReaderFn newReaderFn
75+
newIteratorFn newIteratorFn
76+
snapshotFilesFn snapshotFilesFn
77+
indexSnapshotFilesFn indexSnapshotFilesFn
78+
newDataReaderFn newDataReaderFn
79+
readIndexSegmentsFn readIndexSegmentsFn
7380

7481
metrics commitLogSourceMetrics
7582
}
@@ -135,9 +142,11 @@ func newCommitLogSource(
135142

136143
inspection: inspection,
137144

138-
newIteratorFn: commitlog.NewIterator,
139-
snapshotFilesFn: fs.SnapshotFiles,
140-
newReaderFn: fs.NewReader,
145+
newIteratorFn: commitlog.NewIterator,
146+
snapshotFilesFn: fs.SnapshotFiles,
147+
indexSnapshotFilesFn: fs.IndexSnapshotFiles,
148+
newDataReaderFn: fs.NewReader,
149+
readIndexSegmentsFn: fs.ReadIndexSegments,
141150

142151
metrics: newCommitLogSourceMetrics(scope),
143152
}
@@ -183,6 +192,7 @@ func (s *commitLogSource) Read(
183192
filePathPrefix = fsOpts.FilePathPrefix()
184193
namespaceIter = namespaces.Namespaces.Iter()
185194
namespaceResults = make(map[string]*readNamespaceResult, len(namespaceIter))
195+
indexResult = result.NewIndexBootstrapResult()
186196
setInitialTopologyState bool
187197
initialTopologyState *topology.StateSnapshot
188198
)
@@ -240,6 +250,28 @@ func (s *commitLogSource) Read(
240250
return bootstrap.NamespaceResults{}, err
241251
}
242252
}
253+
254+
// Read index snapshot files
255+
indexSnapshotFiles, err := s.indexSnapshotFilesFn(
256+
filePathPrefix,
257+
ns.Metadata.ID(),
258+
)
259+
if err != nil {
260+
return bootstrap.NamespaceResults{}, err
261+
}
262+
263+
// Get latest index snapshot per block start
264+
mostRecentIndexSnapshotsByBlock, err := s.mostRecentIndexSnapshotsByBlock(
265+
ns.Metadata, shardTimeRanges, indexSnapshotFiles)
266+
if err != nil {
267+
return bootstrap.NamespaceResults{}, err
268+
}
269+
if err := s.bootstrapIndexSnapshots(
270+
indexResult,
271+
mostRecentIndexSnapshotsByBlock,
272+
); err != nil {
273+
return bootstrap.NamespaceResults{}, err
274+
}
243275
}
244276

245277
s.log.Info("read snapshots done",
@@ -531,12 +563,12 @@ func (s *commitLogSource) Read(
531563
shardTimeRanges := ns.namespace.DataRunOptions.ShardTimeRanges
532564
dataResult = shardTimeRanges.ToUnfulfilledDataResult()
533565
}
534-
var indexResult result.IndexBootstrapResult
535566
if ns.namespace.Metadata.Options().IndexOptions().Enabled() {
536-
indexResult = result.NewIndexBootstrapResult()
537567
if shouldReturnUnfulfilled {
538-
shardTimeRanges := ns.namespace.IndexRunOptions.ShardTimeRanges
539-
indexResult = shardTimeRanges.ToUnfulfilledIndexResult()
568+
// TODO(bodu): F/u on shouldReturnUnfulfilled.
569+
//shardTimeRanges := ns.namespace.IndexRunOptions.ShardTimeRanges
570+
//shardTimeRanges.ToUnfulfilledIndexResult()
571+
//indexResult.SetUnfulfilled()
540572
}
541573
}
542574
bootstrapResult.Results.Set(id, bootstrap.NamespaceResult{
@@ -654,6 +686,75 @@ func (s *commitLogSource) mostRecentCompleteSnapshotByBlockShard(
654686
return mostRecentSnapshotsByBlockShard
655687
}
656688

689+
// mostRecentCompleteSnapshotByBlock returns a
690+
// map[xtime.UnixNano]fs.FileSetFile with the contract that
691+
// for each block in shardsTimeRanges, an entry will
692+
// exist in the map such that FileSetFile.CachedSnapshotTime is the
693+
// actual cached snapshot time, or the blockStart.
694+
func (s *commitLogSource) mostRecentCompleteIndexSnapshotByBlock(
695+
shardsTimeRanges result.ShardTimeRanges,
696+
blockSize time.Duration,
697+
indexSnapshotFiles fs.FileSetFilesSlice,
698+
fsOpts fs.Options,
699+
) map[xtime.UnixNano]fs.FileSetFile {
700+
var (
701+
minBlock, maxBlock = shardsTimeRanges.MinMax()
702+
mostRecentIndexSnapshotsByBlock = map[xtime.UnixNano]fs.FileSetFile{}
703+
)
704+
705+
for currBlockStart := minBlock.Truncate(blockSize); currBlockStart.Before(maxBlock); currBlockStart = currBlockStart.Add(blockSize) {
706+
// Anonymous func for easier clean up using defer.
707+
func() {
708+
var (
709+
currBlockUnixNanos = xtime.ToUnixNano(currBlockStart)
710+
mostRecentSnapshot fs.FileSetFile
711+
)
712+
713+
defer func() {
714+
if mostRecentSnapshot.IsZero() {
715+
// If we were unable to determine the most recent snapshot time for a given
716+
// blockStart, then just fall back to using the blockStart time as that will
717+
// force us to read the entire commit log for that duration.
718+
mostRecentSnapshot.CachedSnapshotTime = currBlockStart
719+
}
720+
mostRecentIndexSnapshotsByBlock[currBlockUnixNanos] = mostRecentSnapshot
721+
}()
722+
723+
mostRecentSnapshotVolume, ok := indexSnapshotFiles.LatestVolumeForBlock(currBlockStart)
724+
if !ok {
725+
// If there are no complete snapshot files for this block, then rely on
726+
// the defer to fallback to using the block start time.
727+
return
728+
}
729+
730+
// Make sure we're able to read the snapshot time. This will also set the
731+
// CachedSnapshotTime field so that we can rely upon it from here on out.
732+
_, _, err := mostRecentSnapshotVolume.SnapshotTimeAndID()
733+
if err != nil {
734+
namespace := mostRecentSnapshot.ID.Namespace
735+
if namespace == nil {
736+
namespace = ident.StringID("<nil>")
737+
}
738+
s.log.With(
739+
zap.Stringer("namespace", namespace),
740+
zap.Time("blockStart", mostRecentSnapshot.ID.BlockStart),
741+
zap.Int("index", mostRecentSnapshot.ID.VolumeIndex),
742+
zap.Strings("filepaths", mostRecentSnapshot.AbsoluteFilePaths),
743+
zap.Error(err),
744+
).Error("error resolving snapshot time for index snapshot file")
745+
746+
// If we couldn't determine the snapshot time for the snapshot file, then rely
747+
// on the defer to fallback to using the block start time.
748+
return
749+
}
750+
751+
mostRecentSnapshot = mostRecentSnapshotVolume
752+
}()
753+
}
754+
755+
return mostRecentIndexSnapshotsByBlock
756+
}
757+
657758
func (s *commitLogSource) bootstrapShardSnapshots(
658759
ns namespace.Metadata,
659760
accumulator bootstrap.NamespaceDataAccumulator,
@@ -752,7 +853,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
752853
)
753854

754855
// Bootstrap the snapshot file.
755-
reader, err := s.newReaderFn(bytesPool, fsOpts)
856+
reader, err := s.newDataReaderFn(bytesPool, fsOpts)
756857
if err != nil {
757858
return err
758859
}
@@ -832,6 +933,75 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
832933
return nil
833934
}
834935

936+
func (s *commitLogSource) bootstrapIndexSnapshots(
937+
indexResult result.IndexBootstrapResult,
938+
mostRecentIndexSnapshotsByBlock map[xtime.UnixNano]fs.FileSetFile,
939+
) error {
940+
for blockStart, mostRecentIndexSnapshots := range mostRecentIndexSnapshotsByBlock {
941+
if mostRecentIndexSnapshots.CachedSnapshotTime.Equal(blockStart.ToTime()) ||
942+
// Should never happen
943+
mostRecentIndexSnapshots.IsZero() {
944+
// There is no snapshot file for this time, and even if there was, there would
945+
// be no point in reading it. In this specific case its not an error scenario
946+
// because the fact that snapshotTime == blockStart means we already accounted
947+
// for the fact that this snapshot did not exist when we were deciding which
948+
// commit logs to read.
949+
s.log.Debug("no index snapshots for blockStart",
950+
zap.Time("blockStart", blockStart.ToTime()))
951+
continue
952+
}
953+
954+
// TODO(bodu): Read in index info files to determine index volume type and shards for
955+
// shard time ranges fulfilled.
956+
if err := s.bootstrapIndexBlockSnapshot(
957+
indexResult,
958+
blockStart.ToTime(),
959+
mostRecentIndexSnapshots,
960+
); err != nil {
961+
return err
962+
}
963+
}
964+
965+
return nil
966+
}
967+
968+
func (s *commitLogSource) bootstrapIndexBlockSnapshot(
969+
indexResult result.IndexBootstrapResult,
970+
blockStart time.Time,
971+
mostRecentIndexSnapshot fs.FileSetFile,
972+
) error {
973+
var (
974+
fsOpts = s.opts.CommitLogOptions().FilesystemOptions()
975+
)
976+
977+
s.log.Debug("reading index snapshot segments",
978+
zap.Time("blockStart", blockStart),
979+
zap.Int("volume", mostRecentIndexSnapshot.ID.VolumeIndex))
980+
981+
// Bootstrap the index snapshot file.
982+
segments, err := s.readIndexSegmentsFn(fs.ReadIndexSegmentsOptions{
983+
ReaderOptions: fs.IndexReaderOpenOptions{
984+
Identifier: mostRecentIndexSnapshot.ID,
985+
FileSetType: persist.FileSetSnapshotType,
986+
},
987+
FilesystemOptions: fsOpts,
988+
})
989+
if err != nil {
990+
return err
991+
}
992+
snapshottedSegments := make([]segment.Segment, 0, len(segments))
993+
for _, segment := range segments {
994+
// Snapshotted segments have not been persisted to disk yet.
995+
snapshottedSegments = append(snapshottedSegments, bootstrapper.NewSegment(segment, false))
996+
}
997+
indexBlockByVolumeType := result.NewIndexBlockByVolumeType(blockStart)
998+
// TODO(bodu): Use index info to calculate shard time ranges and get index volume type.
999+
// Need to use CachedSnapshotTime as end ts so we know from when to start reading commit logs to fulfill the rest of the range.
1000+
indexBlockByVolumeType.SetBlock(idxpersist.DefaultIndexVolumeType, result.NewIndexBlock(snapshottedSegments, nil))
1001+
indexResult.Add(indexBlockByVolumeType, nil)
1002+
return nil
1003+
}
1004+
8351005
func (s *commitLogSource) mostRecentSnapshotByBlockShard(
8361006
ns namespace.Metadata,
8371007
shardsTimeRanges result.ShardTimeRanges,
@@ -864,6 +1034,34 @@ func (s *commitLogSource) mostRecentSnapshotByBlockShard(
8641034
return mostRecentCompleteSnapshotByBlockShard, nil
8651035
}
8661036

1037+
func (s *commitLogSource) mostRecentIndexSnapshotsByBlock(
1038+
ns namespace.Metadata,
1039+
shardsTimeRanges result.ShardTimeRanges,
1040+
indexSnapshotFiles fs.FileSetFilesSlice,
1041+
) (
1042+
map[xtime.UnixNano]fs.FileSetFile,
1043+
error,
1044+
) {
1045+
blockSize := ns.Options().RetentionOptions().BlockSize()
1046+
1047+
mostRecentCompleteIndexSnapshotByBlock := s.mostRecentCompleteIndexSnapshotByBlock(
1048+
shardsTimeRanges, blockSize, indexSnapshotFiles, s.opts.CommitLogOptions().FilesystemOptions())
1049+
for block, mostRecent := range mostRecentCompleteIndexSnapshotByBlock {
1050+
if mostRecent.CachedSnapshotTime.IsZero() {
1051+
// Should never happen.
1052+
return nil, instrument.InvariantErrorf(
1053+
"block: %s had zero value for most recent index snapshot time",
1054+
block.ToTime().String())
1055+
}
1056+
1057+
s.log.Debug("most recent index snapshot for block",
1058+
zap.Time("blockStart", block.ToTime()),
1059+
zap.Time("mostRecent", mostRecent.CachedSnapshotTime))
1060+
}
1061+
1062+
return mostRecentCompleteIndexSnapshotByBlock, nil
1063+
}
1064+
8671065
// TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce
8681066
// the number of commitlog files that need to be read.
8691067
func (s *commitLogSource) readCommitLogFilePredicate(f commitlog.FileFilterInfo) bool {

src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func testItMergesSnapshotsAndCommitLogs(t *testing.T, opts Options,
391391
)
392392
mockReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF)
393393

394-
src.newReaderFn = func(
394+
src.newDataReaderFn = func(
395395
bytesPool pool.CheckedBytesPool,
396396
opts fs.Options,
397397
) (fs.DataFileSetReader, error) {

0 commit comments

Comments
 (0)