diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index 3518f9ec2d..51b6a8dfd7 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -395,8 +395,9 @@ const ( ) type repairState struct { - LastAttempt time.Time - Status repairStatus + LastSuccessfulAttempt time.Time + LastAttempt time.Time + Status repairStatus } type namespaceRepairStateByTime map[xtime.UnixNano]repairState @@ -577,19 +578,33 @@ func (r *dbRepairer) Repair() error { repairRange.Start = repairRange.Start.Add(-blockSize) var ( - numUnrepairedBlocks = 0 - hasRepairedABlockStart = false - leastRecentlyRepairedBlockStart time.Time - leastRecentlyRepairedBlockStartLastRepairTime time.Time + numUnrepairedBlocks = 0 + hasRepairedABlockStart = false + leastRecentlyRepairedBlockStart time.Time + leastRecentlyRepairedBlockStartLastRepairTime time.Time + leastRecentlySuccessfullyRepairedBlockStartLastRepairTime = r.nowFn() ) + + // Record the earliest repaired time range and the earliet successfully repaired time range across all the + // time ranges for a namespace. repairRange.IterateBackward(blockSize, func(blockStart time.Time) bool { repairState, ok := r.repairStatesByNs.repairStates(n.ID(), blockStart) + if ok && (leastRecentlyRepairedBlockStart.IsZero() || repairState.LastAttempt.Before(leastRecentlyRepairedBlockStartLastRepairTime)) { leastRecentlyRepairedBlockStart = blockStart leastRecentlyRepairedBlockStartLastRepairTime = repairState.LastAttempt } + if repairState.LastSuccessfulAttempt.Before(leastRecentlySuccessfullyRepairedBlockStartLastRepairTime) { + leastRecentlySuccessfullyRepairedBlockStartLastRepairTime = repairState.LastSuccessfulAttempt + } + return true + }) + + repairRange.IterateBackward(blockSize, func(blockStart time.Time) bool { + repairState, ok := r.repairStatesByNs.repairStates(n.ID(), blockStart) + if ok && repairState.Status == repairSuccess { return true } @@ -623,6 +638,12 @@ func (r *dbRepairer) Repair() error { "namespace": n.ID().String(), }).Gauge("max-seconds-since-last-block-repair").Update(secondsSinceLastRepair) + secondsSinceLastSuccessfullRepair := + r.nowFn().Sub(leastRecentlySuccessfullyRepairedBlockStartLastRepairTime).Seconds() + r.scope.Tagged(map[string]string{ + "namespace": n.ID().String(), + }).Gauge("max-seconds-since-oldest-successful-block-repair").Update(secondsSinceLastSuccessfullRepair) + if hasRepairedABlockStart { // Previous loop performed a repair which means we've hit our limit of repairing // one block per namespace per call to Repair() so we can skip the logic below. @@ -681,6 +702,9 @@ func (r *dbRepairer) markRepairAttempt( repairState, _ := r.repairStatesByNs.repairStates(namespace, blockStart) repairState.Status = repairStatus repairState.LastAttempt = repairTime + if repairStatus == repairSuccess { + repairState.LastSuccessfulAttempt = repairTime + } r.repairStatesByNs.setRepairState(namespace, blockStart, repairState) } diff --git a/src/dbnode/storage/repair_test.go b/src/dbnode/storage/repair_test.go index e199b4ac85..706fd37ca9 100644 --- a/src/dbnode/storage/repair_test.go +++ b/src/dbnode/storage/repair_test.go @@ -718,8 +718,8 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) { flushTimeStart = retention.FlushTimeStart(rOpts, now) flushTimeEnd = retention.FlushTimeEnd(rOpts, now) - //flushTimeStartNano = xtime.ToUnixNano(flushTimeStart) - flushTimeEndNano = xtime.ToUnixNano(flushTimeEnd) + flushTimeStartNano = xtime.ToUnixNano(flushTimeStart) + flushTimeEndNano = xtime.ToUnixNano(flushTimeEnd) ) require.NoError(t, nsOpts.Validate()) // Ensure only two flushable blocks in retention to make test logic simpler. @@ -740,8 +740,14 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) { repairState: repairStatesByNs{ "ns2": namespaceRepairStateByTime{ flushTimeEndNano: repairState{ - Status: repairSuccess, - LastAttempt: time.Time{}, + Status: repairSuccess, + LastAttempt: time.Unix(1557745001, 0), + LastSuccessfullAttempt: time.Unix(1557745001, 0), + }, + flushTimeStartNano: repairState{ + Status: repairFailed, + LastAttempt: time.Unix(1557745002, 0), + LastSuccessfullAttempt: time.Unix(1557745000, 0), }, }, }, @@ -771,6 +777,9 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) { for _, tc := range testCases { t.Run(tc.title, func(t *testing.T) { opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl)) + iopts := opts.InstrumentOptions() + scope := tally.NewTestScope("", nil) + opts = opts.SetInstrumentOptions(iopts.SetMetricsScope(scope)) mockDatabase := NewMockdatabase(ctrl) databaseRepairer, err := newDatabaseRepairer(mockDatabase, opts) @@ -818,6 +827,12 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) { mockDatabase.EXPECT().OwnedNamespaces().Return(namespaces, nil) require.NotNil(t, repairer.Repair()) + + gaugesSnapshot := scope.Snapshot().Gauges() + require.Equal(t, now.Sub(time.Time{}).Seconds(), + gaugesSnapshot[tally.KeyForPrefixedStringMap("repair.max-seconds-since-oldest-successful-block-repair", map[string]string{"namespace": "ns1"})].Value()) + require.Equal(t, now.Sub(time.Unix(1557745000, 0)).Seconds(), + gaugesSnapshot[tally.KeyForPrefixedStringMap("repair.max-seconds-since-oldest-successful-block-repair", map[string]string{"namespace": "ns2"})].Value()) }) } }