Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/dbnode/storage/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,9 @@ const (
)

type repairState struct {
LastAttempt time.Time
Status repairStatus
LastSuccessfulAttempt time.Time
LastAttempt time.Time
Status repairStatus
}

type namespaceRepairStateByTime map[xtime.UnixNano]repairState
Expand Down Expand Up @@ -577,19 +578,33 @@ func (r *dbRepairer) Repair() error {
repairRange.Start = repairRange.Start.Add(-blockSize)

var (
numUnrepairedBlocks = 0
hasRepairedABlockStart = false
leastRecentlyRepairedBlockStart time.Time
leastRecentlyRepairedBlockStartLastRepairTime time.Time
numUnrepairedBlocks = 0
hasRepairedABlockStart = false
leastRecentlyRepairedBlockStart time.Time
leastRecentlyRepairedBlockStartLastRepairTime time.Time
leastRecentlySuccessfullyRepairedBlockStartLastRepairTime = r.nowFn()
)

// Record the earliest repaired time range and the earliet successfully repaired time range across all the
// time ranges for a namespace.
repairRange.IterateBackward(blockSize, func(blockStart time.Time) bool {
repairState, ok := r.repairStatesByNs.repairStates(n.ID(), blockStart)

if ok && (leastRecentlyRepairedBlockStart.IsZero() ||
repairState.LastAttempt.Before(leastRecentlyRepairedBlockStartLastRepairTime)) {
leastRecentlyRepairedBlockStart = blockStart
leastRecentlyRepairedBlockStartLastRepairTime = repairState.LastAttempt
}

if repairState.LastSuccessfulAttempt.Before(leastRecentlySuccessfullyRepairedBlockStartLastRepairTime) {
leastRecentlySuccessfullyRepairedBlockStartLastRepairTime = repairState.LastSuccessfulAttempt
}
return true
})

repairRange.IterateBackward(blockSize, func(blockStart time.Time) bool {
repairState, ok := r.repairStatesByNs.repairStates(n.ID(), blockStart)

if ok && repairState.Status == repairSuccess {
return true
}
Expand Down Expand Up @@ -623,6 +638,12 @@ func (r *dbRepairer) Repair() error {
"namespace": n.ID().String(),
}).Gauge("max-seconds-since-last-block-repair").Update(secondsSinceLastRepair)

secondsSinceLastSuccessfullRepair :=
r.nowFn().Sub(leastRecentlySuccessfullyRepairedBlockStartLastRepairTime).Seconds()
r.scope.Tagged(map[string]string{
"namespace": n.ID().String(),
}).Gauge("max-seconds-since-oldest-successful-block-repair").Update(secondsSinceLastSuccessfullRepair)

if hasRepairedABlockStart {
// Previous loop performed a repair which means we've hit our limit of repairing
// one block per namespace per call to Repair() so we can skip the logic below.
Expand Down Expand Up @@ -681,6 +702,9 @@ func (r *dbRepairer) markRepairAttempt(
repairState, _ := r.repairStatesByNs.repairStates(namespace, blockStart)
repairState.Status = repairStatus
repairState.LastAttempt = repairTime
if repairStatus == repairSuccess {
repairState.LastSuccessfulAttempt = repairTime
}
r.repairStatesByNs.setRepairState(namespace, blockStart, repairState)
}

Expand Down
23 changes: 19 additions & 4 deletions src/dbnode/storage/repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,8 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) {
flushTimeStart = retention.FlushTimeStart(rOpts, now)
flushTimeEnd = retention.FlushTimeEnd(rOpts, now)

//flushTimeStartNano = xtime.ToUnixNano(flushTimeStart)
flushTimeEndNano = xtime.ToUnixNano(flushTimeEnd)
flushTimeStartNano = xtime.ToUnixNano(flushTimeStart)
flushTimeEndNano = xtime.ToUnixNano(flushTimeEnd)
)
require.NoError(t, nsOpts.Validate())
// Ensure only two flushable blocks in retention to make test logic simpler.
Expand All @@ -740,8 +740,14 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) {
repairState: repairStatesByNs{
"ns2": namespaceRepairStateByTime{
flushTimeEndNano: repairState{
Status: repairSuccess,
LastAttempt: time.Time{},
Status: repairSuccess,
LastAttempt: time.Unix(1557745001, 0),
LastSuccessfullAttempt: time.Unix(1557745001, 0),
},
flushTimeStartNano: repairState{
Status: repairFailed,
LastAttempt: time.Unix(1557745002, 0),
LastSuccessfullAttempt: time.Unix(1557745000, 0),
},
},
},
Expand Down Expand Up @@ -771,6 +777,9 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.title, func(t *testing.T) {
opts := DefaultTestOptions().SetRepairOptions(testRepairOptions(ctrl))
iopts := opts.InstrumentOptions()
scope := tally.NewTestScope("", nil)
opts = opts.SetInstrumentOptions(iopts.SetMetricsScope(scope))
mockDatabase := NewMockdatabase(ctrl)

databaseRepairer, err := newDatabaseRepairer(mockDatabase, opts)
Expand Down Expand Up @@ -818,6 +827,12 @@ func TestDatabaseRepairSkipsPoisonShard(t *testing.T) {
mockDatabase.EXPECT().OwnedNamespaces().Return(namespaces, nil)

require.NotNil(t, repairer.Repair())

gaugesSnapshot := scope.Snapshot().Gauges()
require.Equal(t, now.Sub(time.Time{}).Seconds(),
gaugesSnapshot[tally.KeyForPrefixedStringMap("repair.max-seconds-since-oldest-successful-block-repair", map[string]string{"namespace": "ns1"})].Value())
require.Equal(t, now.Sub(time.Unix(1557745000, 0)).Seconds(),
gaugesSnapshot[tally.KeyForPrefixedStringMap("repair.max-seconds-since-oldest-successful-block-repair", map[string]string{"namespace": "ns2"})].Value())
})
}
}