Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_x_time//rate",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvstorage/wag/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (s *Seq) Next() uint64 {
return s.index.Add(1)
}

// Load returns the last used WAG sequence number.
func (s *Seq) Load() uint64 {
return s.index.Load()
}

// Write puts the WAG node under the specific sequence number into the given
// writer. The index must have been allocated to the caller by the sequencer.
func Write(w storage.Writer, index uint64, node wagpb.Node) error {
Expand Down
194 changes: 143 additions & 51 deletions pkg/kv/kvserver/kvstorage/wag_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kvstorage
import (
"context"
"math"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand All @@ -17,68 +18,167 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"golang.org/x/time/rate"
)

// WAGTruncatorTestingKnobs contains testing knobs for the WAGTruncator.
type WAGTruncatorTestingKnobs struct {
// AfterTruncationCallback, if set, is called after each invocation of
// truncateAppliedNodesLive completes. Can be used to synchronize tests
// with the background truncation goroutine.
AfterTruncationCallback func()
}

// WAGTruncator truncates applied WAG nodes and clears their associated raft
// state (log entries and sideloaded files). It supports both offline and online
// mode of operation:
// - [offline] during the store startup, truncate the WAG after it is
// replayed and made durable in the StateEngine.
// - [online] during the normal operation, truncate the WAG nodes that were
// durably applied to the state machine.
//
// TODO(ibrahim): Add the periodic truncation logic.
// state (log entries and sideloaded files).
type WAGTruncator struct {
st *cluster.Settings
eng Engines
st *cluster.Settings
knobs WAGTruncatorTestingKnobs
eng Engines
seq *wag.Seq
// lastWAGIndexBeforeStartup is the last WAG node index that existed before
// startup. Truncating WAG nodes with indices <= to this index can ignore
// gaps.
lastWAGIndexBeforeStartup uint64
// lastTruncatedWAGIndex is the index of the last WAG node that was
// successfully truncated. This is to quickly seek into the potential WAG
// nodes to truncate.
lastTruncatedWAGIndex atomic.Uint64
// wakeCh is signaled when there are potential WAG nodes to truncate.
wakeCh chan struct{}
}

// NewWAGTruncator creates a WAGTruncator.
func NewWAGTruncator(st *cluster.Settings, eng Engines) *WAGTruncator {
return &WAGTruncator{st: st, eng: eng}
func NewWAGTruncator(
st *cluster.Settings, knobs WAGTruncatorTestingKnobs, eng Engines, seq *wag.Seq,
) *WAGTruncator {
return &WAGTruncator{
st: st,
knobs: knobs,
eng: eng,
seq: seq,
lastWAGIndexBeforeStartup: seq.Load(),
wakeCh: make(chan struct{}, 1),
}
}

// Start launches the background goroutine that performs WAG truncation.
// TODO(ibrahim): Add a setting for keeping a suffix of the WAG for debugging.
// For example, setting a maximum number of WAG nodes to retain for debugging
// purposes. We could also pair it with some time threshold after which all WAG
// nodes are automatically truncated to maintain a manageable size.
func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error {
return stopper.RunAsyncTask(ctx, "wag-truncation", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()
for {
select {
case <-t.wakeCh:
startIndex := t.lastTruncatedWAGIndex.Load() + 1
if _, err := t.truncateAppliedNodes(ctx, startIndex); err != nil {
log.KvExec.Errorf(ctx, "truncating WAG node: %+v", err)
}
if t.knobs.AfterTruncationCallback != nil {
t.knobs.AfterTruncationCallback()
}
case <-ctx.Done():
return
}
}
})
}

// DurabilityAdvancedCallback is invoked whenever the state engine completes a
// flush. It checks whether there could possibly be WAG nodes to truncate by
// comparing lastTruncatedWAGIndex against seq.Load(). If there are potential
// truncation opportunities, it sends a non-blocking signal to wake the
// background goroutine. It must return quickly and must not call into the
// engine to avoid deadlock (see storage.Engine.RegisterFlushCompletedCallback).
func (t *WAGTruncator) DurabilityAdvancedCallback() {
// The key point is the point in time where we call "t.seq.Load()". If the
// sequence number is greater than the last truncated WAG sequence number, we
// will attempt to truncate if there are any WAG nodes that are ready for
// truncation. If it found no WAG nodes that are ready to be truncated, it
// will attempt to truncate on every state engine flush until
// sequence number == lastTruncatedWAGIndex.
//
// Note that we need to read lastTruncatedWAGIndex before seq. If we didn't
// do that, we could get a seq reading of x. Then this goroutine gets
// descheduled for some time, while this goroutine isn't running, seq could be
// incremented to x+1, and the live truncation might update
// lastTruncatedWAGIndex to x+1, and then when this goroutine gets scheduled
// again, it will read lastTruncatedWAGIndex as x+1 and seq as x
// and the assertion below might fail.
lastTruncated := t.lastTruncatedWAGIndex.Load()
seq := t.seq.Load()
if seq == lastTruncated {
return
}
if seq < lastTruncated {
log.KvExec.Fatalf(context.Background(),
"WAG seq %d < lastTruncatedWAGIndex %d", seq, lastTruncated)
}
select {
case t.wakeCh <- struct{}{}:
default:
}
}

// TruncateAll truncates all applied WAG nodes. It's meant to be used at engine
// startup right after we replay the WAG nodes and sync the state engine.
func (t *WAGTruncator) TruncateAll(ctx context.Context) error {
// truncateAppliedNodes is a helper function that repeatedly tries to delete a
// WAG node in a batch and commit that batch. It iterates from startIndex and
// attempts to delete the WAG nodes it sees. It stops when it encounters a WAG
// node that cannot be deleted, or when it encounters an error.
//
// Returns the index of the last successfully truncated node or 0 when no nodes
// were truncated. It also returns an error if any occurred during truncation.
func (t *WAGTruncator) truncateAppliedNodes(
ctx context.Context, startIndex uint64,
) (uint64, error) {
stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability)
defer stateReader.Close()
nextIndex := startIndex
var lastTruncated uint64
for {
if err := ctx.Err(); err != nil {
return err
return lastTruncated, err
}
b := t.eng.LogEngine().NewWriteBatch()
truncated, err := t.truncateAppliedWAGNodeAndClearRaftState(
ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, 0, /* index */
truncatedIdx, err := t.truncateAppliedWAGNodeAndClearRaftState(
ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, nextIndex,
)
if err == nil && truncated {
if err == nil && truncatedIdx != 0 {
err = b.Commit(false /* sync */)
}
b.Close()
if err != nil {
return err
return lastTruncated, err
}
if !truncated {
break
if truncatedIdx == 0 {
return lastTruncated, nil
}
// At this point we know that the last truncation succeeded and the batch
// was committed, and we can move on to the next node.
lastTruncated = truncatedIdx
t.lastTruncatedWAGIndex.Store(lastTruncated)
nextIndex = lastTruncated + 1
}
return nil
}

// truncateAppliedWAGNodeAndClearRaftState deletes a WAG node if all of its
// events have been applied to the state engine. For nodes containing
// EventDestroy or EventSubsume events, it also clears the corresponding raft
// log prefix from the engine and the sideloaded entries storage.
// If truncateIndex is 0, the function deletes the first WAG node regardless of
// its index. Otherwise, it only deletes the node matching truncateIndex if it
// exists.
//
// Returns a boolean indicating whether a node was successfully truncated or
// not. If the return value is false, it means that either there are no WAG
// nodes left, or that the WAG node has not been applied to the state engine.
// Also, an error is returned if the WAG node could not be fetched or deleted.
// It iterates from truncateIndex and attempts to delete the first WAG node it
// sees.
//
// Returns the following:
// - the index of the last WAG node that was deleted, or 0 if no nodes were
// deleted.
// - an error if there was an error deleting the WAG node.
//
// The caller must provide a stateRO reader with GuaranteedDurability so that
// only state confirmed flushed to persistent storage is visible. This ensures
Expand All @@ -88,35 +188,27 @@ func (t *WAGTruncator) TruncateAll(ctx context.Context) error {
// TODO(ibrahim): Support deleting multiple WAG nodes within the same batch.
func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState(
ctx context.Context, raft Raft, stateRO StateRO, truncateIndex uint64,
) (bool, error) {
) (uint64, error) {
var iter wag.Iterator
var iterStartKey roachpb.Key
if truncateIndex == 0 {
// Delete the first WAG node that exists, regardless of its index.
iterStartKey = keys.StoreWAGPrefix()
} else {
// Only delete the WAG node with the expected index.
iterStartKey = keys.StoreWAGNodeKey(truncateIndex)
}

iterStartKey := keys.StoreWAGNodeKey(truncateIndex)
for index, node := range iter.IterFrom(ctx, raft.RO, iterStartKey) {
if truncateIndex != 0 && truncateIndex != index {
return false, nil
if index > t.lastWAGIndexBeforeStartup && truncateIndex != index {
// We cannot ignore gaps for WAG indices > lastWAGIndexBeforeStartup.
return 0, nil
}

// TODO(ibrahim): Right now, the canApplyWAGNode function returns a list of
// raftCatchUpTargets that are not needed for the purposes of truncation,
// consider refactoring the function to return only the needed info.
replayAction, err := canApplyWAGNode(ctx, node, stateRO)
action, err := canApplyWAGNode(ctx, node, stateRO)
if err != nil {
return false, err
return 0, err
}
if replayAction.apply {
if action.apply {
// If an event needs to be applied, the WAG node cannot be deleted yet.
return false, nil
return 0, nil
}
if err := wag.Delete(raft.WO, index); err != nil {
return false, err
return 0, err
}

// Clean up the raft log prefix of a destroyed/subsumed replica.
Expand All @@ -125,12 +217,12 @@ func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState(
continue
}
if err := t.clearReplicaRaftLogAndSideloaded(ctx, raft, event.Addr.RangeID, event.Addr.Index); err != nil {
return false, err
return 0, err
}
}
return true, nil
return index, nil
}
return false, iter.Error()
return 0, iter.Error()
}

// clearReplicaRaftLogAndSideloaded clears raft log entries at or below the given index for
Expand Down
Loading
Loading