From c46051ef22e7351effe3dec780b651751087d4a6 Mon Sep 17 00:00:00 2001 From: iskettaneh <173953022+iskettaneh@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:46:30 -0400 Subject: [PATCH 1/3] kvstorage: add truncateAppliedNodes and handle gaps in WAG indices This commit adds a helper function "truncateAppliedNodes" which allows deleting multiple WAG nodes starting from an index. Also, this commit allows the truncator to handle gaps. The idea is that at startup, seq will be initialized to the index of the largest WAG index that was persisted before the last restart. We can expect to have gaps in the indices before that. However, we don't handle gaps in the indices after startup. If there is a gap, we won't jump over it and will wait for it to be persisted. Release note: None Co-Authored-By: roachdev-claude --- pkg/kv/kvserver/kvstorage/wag/store.go | 5 + pkg/kv/kvserver/kvstorage/wag_truncator.go | 91 ++++++++------- .../kvserver/kvstorage/wag_truncator_test.go | 105 ++++++++---------- 3 files changed, 102 insertions(+), 99 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/wag/store.go b/pkg/kv/kvserver/kvstorage/wag/store.go index 13d135c942d5..e42725c76947 100644 --- a/pkg/kv/kvserver/kvstorage/wag/store.go +++ b/pkg/kv/kvserver/kvstorage/wag/store.go @@ -65,6 +65,11 @@ func (s *Seq) Next() uint64 { return s.index.Add(1) } +// Load returns the last used WAG sequence number. +func (s *Seq) Load() uint64 { + return s.index.Load() +} + // Write puts the WAG node under the specific sequence number into the given // writer. The index must have been allocated to the caller by the sequencer. func Write(w storage.Writer, index uint64, node wagpb.Node) error { diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator.go b/pkg/kv/kvserver/kvstorage/wag_truncator.go index 2c7c1270372a..ddcb4fa199a0 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator.go @@ -33,52 +33,69 @@ import ( type WAGTruncator struct { st *cluster.Settings eng Engines + seq *wag.Seq + // lastWAGIndexBeforeStartup is the last WAG node index that existed before + // startup. Truncating WAG nodes with indices <= to this index can ignore + // gaps. + lastWAGIndexBeforeStartup uint64 } // NewWAGTruncator creates a WAGTruncator. -func NewWAGTruncator(st *cluster.Settings, eng Engines) *WAGTruncator { - return &WAGTruncator{st: st, eng: eng} +func NewWAGTruncator(st *cluster.Settings, eng Engines, seq *wag.Seq) *WAGTruncator { + return &WAGTruncator{st: st, eng: eng, seq: seq, lastWAGIndexBeforeStartup: seq.Load()} } -// TruncateAll truncates all applied WAG nodes. It's meant to be used at engine -// startup right after we replay the WAG nodes and sync the state engine. -func (t *WAGTruncator) TruncateAll(ctx context.Context) error { +// truncateAppliedNodes is a helper function that repeatedly tries to delete a +// WAG node in a batch and commit that batch. It iterates from startIndex and +// attempts to delete the WAG nodes it sees. It stops when it encounters a WAG +// node that cannot be deleted, or when it encounters an error. +// +// Returns the index of the last successfully truncated node or 0 when no nodes +// were truncated. It also returns an error if any occurred during truncation. +func (t *WAGTruncator) truncateAppliedNodes( + ctx context.Context, startIndex uint64, +) (uint64, error) { stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability) defer stateReader.Close() + nextIndex := startIndex + var lastTruncated uint64 for { if err := ctx.Err(); err != nil { - return err + return lastTruncated, err } b := t.eng.LogEngine().NewWriteBatch() - truncated, err := t.truncateAppliedWAGNodeAndClearRaftState( - ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, 0, /* index */ + truncatedIdx, err := t.truncateAppliedWAGNodeAndClearRaftState( + ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, nextIndex, ) - if err == nil && truncated { + if err == nil && truncatedIdx != 0 { err = b.Commit(false /* sync */) } b.Close() if err != nil { - return err + return lastTruncated, err } - if !truncated { - break + if truncatedIdx == 0 { + return lastTruncated, nil } + // At this point we know that the last truncation succeeded and the batch + // was committed, and we can move on to the next node. + lastTruncated = truncatedIdx + nextIndex = lastTruncated + 1 } - return nil } // truncateAppliedWAGNodeAndClearRaftState deletes a WAG node if all of its // events have been applied to the state engine. For nodes containing // EventDestroy or EventSubsume events, it also clears the corresponding raft // log prefix from the engine and the sideloaded entries storage. -// If truncateIndex is 0, the function deletes the first WAG node regardless of -// its index. Otherwise, it only deletes the node matching truncateIndex if it -// exists. // -// Returns a boolean indicating whether a node was successfully truncated or -// not. If the return value is false, it means that either there are no WAG -// nodes left, or that the WAG node has not been applied to the state engine. -// Also, an error is returned if the WAG node could not be fetched or deleted. +// It iterates from truncateIndex and attempts to delete the first WAG node it +// sees. +// +// Returns the following: +// - the index of the last WAG node that was deleted, or 0 if no nodes were +// deleted. +// - an error if there was an error deleting the WAG node. // // The caller must provide a stateRO reader with GuaranteedDurability so that // only state confirmed flushed to persistent storage is visible. This ensures @@ -88,35 +105,27 @@ func (t *WAGTruncator) TruncateAll(ctx context.Context) error { // TODO(ibrahim): Support deleting multiple WAG nodes within the same batch. func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState( ctx context.Context, raft Raft, stateRO StateRO, truncateIndex uint64, -) (bool, error) { +) (uint64, error) { var iter wag.Iterator - var iterStartKey roachpb.Key - if truncateIndex == 0 { - // Delete the first WAG node that exists, regardless of its index. - iterStartKey = keys.StoreWAGPrefix() - } else { - // Only delete the WAG node with the expected index. - iterStartKey = keys.StoreWAGNodeKey(truncateIndex) - } - + iterStartKey := keys.StoreWAGNodeKey(truncateIndex) for index, node := range iter.IterFrom(ctx, raft.RO, iterStartKey) { - if truncateIndex != 0 && truncateIndex != index { - return false, nil + if index > t.lastWAGIndexBeforeStartup && truncateIndex != index { + // We cannot ignore gaps for WAG indices > lastWAGIndexBeforeStartup. + return 0, nil } - // TODO(ibrahim): Right now, the canApplyWAGNode function returns a list of // raftCatchUpTargets that are not needed for the purposes of truncation, // consider refactoring the function to return only the needed info. - replayAction, err := canApplyWAGNode(ctx, node, stateRO) + action, err := canApplyWAGNode(ctx, node, stateRO) if err != nil { - return false, err + return 0, err } - if replayAction.apply { + if action.apply { // If an event needs to be applied, the WAG node cannot be deleted yet. - return false, nil + return 0, nil } if err := wag.Delete(raft.WO, index); err != nil { - return false, err + return 0, err } // Clean up the raft log prefix of a destroyed/subsumed replica. @@ -125,12 +134,12 @@ func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState( continue } if err := t.clearReplicaRaftLogAndSideloaded(ctx, raft, event.Addr.RangeID, event.Addr.Index); err != nil { - return false, err + return 0, err } } - return true, nil + return index, nil } - return false, iter.Error() + return 0, iter.Error() } // clearReplicaRaftLogAndSideloaded clears raft log entries at or below the given index for diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go index 18cc284030c2..c9ae311c2499 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go @@ -103,7 +103,9 @@ func (e *testEngines) listWAGNodes(t *testing.T) []uint64 { return indices } -func TestTruncateApplied(t *testing.T) { +// TestTruncateAppliedOnly verifies that we only truncate WAG nodes that are +// durably applied. +func TestTruncateAppliedOnly(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() @@ -240,20 +242,20 @@ func TestTruncateApplied(t *testing.T) { t.Run("", func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines) tc.setup(t, &e) + truncator := NewWAGTruncator(st, e.Engines, &e.seq) require.NoError(t, e.stateEngine.Flush()) - require.NoError(t, truncator.TruncateAll(ctx)) + _, err := truncator.truncateAppliedNodes(ctx, 0 /* startIndex */) + require.NoError(t, err) require.Equal(t, tc.wantWAGIndices, e.listWAGNodes(t)) }) } } -// TestTruncateAndClearRaftState verifies that -// truncateAppliedWAGNodeAndClearRaftState only clears raft log entries and -// sideloaded files up to the destroyed/subsumed replica's last index. Entries -// and files beyond that index may belong to a newer replica and must be -// preserved. +// TestTruncateAndClearRaftState verifies that WAG truncation only clears raft +// log entries and sideloaded files up to the destroyed/subsumed replica's last +// index. Entries and files beyond that index may belong to a newer replica and +// must be preserved. func TestTruncateAndClearRaftState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -268,7 +270,7 @@ func TestTruncateAndClearRaftState(t *testing.T) { t.Run(eventType.String(), func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines) + truncator := NewWAGTruncator(st, e.Engines, &e.seq) // Write WAG nodes: init then destroy/subsume at index 20. e.writeWAGNode(t, wagpb.Event{ @@ -277,7 +279,6 @@ func TestTruncateAndClearRaftState(t *testing.T) { e.writeWAGNode(t, wagpb.Event{ Addr: wagpb.MakeAddr(r1, 20), Type: eventType, }) - // Create a WAG node for a newer replica for the same range. e.writeWAGNode(t, wagpb.Event{ Addr: wagpb.MakeAddr(r2, 0), Type: wagpb.EventCreate, @@ -303,7 +304,8 @@ func TestTruncateAndClearRaftState(t *testing.T) { require.NoError(t, ss.Put(ctx, idx, 1 /* term */, []byte("sst-data"))) } require.NoError(t, e.stateEngine.Flush()) - require.NoError(t, truncator.TruncateAll(ctx)) + _, err := truncator.truncateAppliedNodes(ctx, 1 /* startIndex */) + require.NoError(t, err) // Raft entries <= 20 belong to the old replica and must be deleted. The // rest shouldn't be deleted by the WAG truncator. require.Equal(t, @@ -328,53 +330,49 @@ func TestTruncateAndClearRaftState(t *testing.T) { } } -// TestTruncateGapHandling verifies that truncateAppliedWAGNodeAndClearRaftState -// handles gaps in WAG node indices correctly based on expectedIndex. When -// expectedIndex is 0, the first node is deleted regardless of its index. When -// non-zero, only the node at that exact index is deleted. -// -// The test sets up three WAG nodes with gaps between them: -// [Index: 2] -> [Index: 4] -> [Index: 6] -func TestTruncateGapHandling(t *testing.T) { +// TestTruncateAppliedNodes exercises truncateAppliedNodes() across different +// combinations of startIndex, and lastIndexBeforeStartup. The test sets up WAG +// nodes at indices [2, 4, 5, 6]. Node 6 isn't ready for truncation yet. +func TestTruncateAppliedNodes(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1} - sl := MakeStateLoader(1 /* rangeID */) + sl := MakeStateLoader(r1.RangeID) - for _, calls := range [][]struct { - index uint64 - wantTruncated bool - wantWAGIndices []uint64 + for _, tc := range []struct { + startIndex uint64 + lastIndexBeforeStartup uint64 + wantLastTruncated uint64 + wantRemaining []uint64 }{ { - // index=0 removes the first WAG node regardless of its index. - {index: 0, wantTruncated: true, wantWAGIndices: []uint64{4, 6}}, - {index: 0, wantTruncated: true, wantWAGIndices: []uint64{6}}, - {index: 0, wantTruncated: true, wantWAGIndices: nil}, + // We cannot ignore gaps after lastIndexBeforeStartup. + startIndex: 0, lastIndexBeforeStartup: 2, wantLastTruncated: 2, wantRemaining: []uint64{4, 5, 6}, + }, + { + // We cannot delete an unapplied node. + startIndex: 0, lastIndexBeforeStartup: 4, wantLastTruncated: 5, wantRemaining: []uint64{6}, + }, + { + startIndex: 3, lastIndexBeforeStartup: 2, wantLastTruncated: 0, wantRemaining: []uint64{2, 4, 5, 6}, }, { - // A non-existent index is a no-op. - {index: 1, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, - {index: 3, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, - {index: 5, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, - {index: 7, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, + startIndex: 3, lastIndexBeforeStartup: 4, wantLastTruncated: 5, wantRemaining: []uint64{2, 6}, }, { - // In theory, we can remove a WAG node at an index that is not the first. - {index: 4, wantTruncated: true, wantWAGIndices: []uint64{2, 6}}, - {index: 6, wantTruncated: true, wantWAGIndices: []uint64{2}}, - {index: 2, wantTruncated: true, wantWAGIndices: nil}, + startIndex: 7, lastIndexBeforeStartup: 0, wantLastTruncated: 0, wantRemaining: []uint64{2, 4, 5, 6}, + }, + { + startIndex: 7, lastIndexBeforeStartup: 6, wantLastTruncated: 0, wantRemaining: []uint64{2, 4, 5, 6}, }, } { t.Run("", func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines) - - // Write WAG nodes at indices 2, 4, 6. + // Write WAG nodes at indices 2, 4, 5. e.seq.Next() e.writeWAGNode(t, wagpb.Event{ Addr: wagpb.MakeAddr(r1, 0), Type: wagpb.EventCreate, @@ -383,32 +381,23 @@ func TestTruncateGapHandling(t *testing.T) { e.writeWAGNode(t, wagpb.Event{ Addr: wagpb.MakeAddr(r1, 15), Type: wagpb.EventInit, }) - e.seq.Next() e.writeWAGNode(t, wagpb.Event{ Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply, }) - - // Set applied state so all WAG nodes are considered applied. + e.writeWAGNode(t, wagpb.Event{ + Addr: wagpb.MakeAddr(r1, 25), Type: wagpb.EventApply, + }) + truncator := NewWAGTruncator(st, e.Engines, &e.seq) + truncator.lastWAGIndexBeforeStartup = tc.lastIndexBeforeStartup require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID)) require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), &kvserverpb.RangeAppliedState{RaftAppliedIndex: 20})) require.NoError(t, e.stateEngine.Flush()) - for _, c := range calls { - stateReader := e.StateEngine().NewReader(storage.GuaranteedDurability) - b := e.LogEngine().NewWriteBatch() - truncated, err := truncator.truncateAppliedWAGNodeAndClearRaftState( - ctx, Raft{RO: e.LogEngine(), WO: b}, stateReader, c.index, - ) - require.NoError(t, err) - require.Equal(t, c.wantTruncated, truncated) - if truncated { - require.NoError(t, b.Commit(false /* sync */)) - } - b.Close() - stateReader.Close() - require.Equal(t, c.wantWAGIndices, e.listWAGNodes(t)) - } + lastTruncated, err := truncator.truncateAppliedNodes(ctx, tc.startIndex) + require.NoError(t, err) + require.Equal(t, tc.wantLastTruncated, lastTruncated) + require.Equal(t, tc.wantRemaining, e.listWAGNodes(t)) }) } } From 3df9eea3ae2026970178099a6905f5b638855877 Mon Sep 17 00:00:00 2001 From: iskettaneh <173953022+iskettaneh@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:37:15 -0400 Subject: [PATCH 2/3] kvstorage: Introduce WAGTruncator Testing Knobs This commit introduces TestingKnobs to the WAGTruncator. Release note: None Co-Authored-By: roachdev-claude --- pkg/kv/kvserver/kvstorage/wag_truncator.go | 16 ++++++++++++++-- pkg/kv/kvserver/kvstorage/wag_truncator_test.go | 6 +++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator.go b/pkg/kv/kvserver/kvstorage/wag_truncator.go index ddcb4fa199a0..1f8916d89d13 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator.go @@ -21,6 +21,9 @@ import ( "golang.org/x/time/rate" ) +// WAGTruncatorTestingKnobs contains testing knobs for the WAGTruncator. +type WAGTruncatorTestingKnobs struct{} + // WAGTruncator truncates applied WAG nodes and clears their associated raft // state (log entries and sideloaded files). It supports both offline and online // mode of operation: @@ -38,11 +41,20 @@ type WAGTruncator struct { // startup. Truncating WAG nodes with indices <= to this index can ignore // gaps. lastWAGIndexBeforeStartup uint64 + knobs WAGTruncatorTestingKnobs } // NewWAGTruncator creates a WAGTruncator. -func NewWAGTruncator(st *cluster.Settings, eng Engines, seq *wag.Seq) *WAGTruncator { - return &WAGTruncator{st: st, eng: eng, seq: seq, lastWAGIndexBeforeStartup: seq.Load()} +func NewWAGTruncator( + st *cluster.Settings, knobs WAGTruncatorTestingKnobs, eng Engines, seq *wag.Seq, +) *WAGTruncator { + return &WAGTruncator{ + st: st, + knobs: knobs, + eng: eng, + seq: seq, + lastWAGIndexBeforeStartup: seq.Load(), + } } // truncateAppliedNodes is a helper function that repeatedly tries to delete a diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go index c9ae311c2499..c0e290ca6a7b 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go @@ -243,7 +243,7 @@ func TestTruncateAppliedOnly(t *testing.T) { e := makeTestEngines() defer e.Close() tc.setup(t, &e) - truncator := NewWAGTruncator(st, e.Engines, &e.seq) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) require.NoError(t, e.stateEngine.Flush()) _, err := truncator.truncateAppliedNodes(ctx, 0 /* startIndex */) require.NoError(t, err) @@ -270,7 +270,7 @@ func TestTruncateAndClearRaftState(t *testing.T) { t.Run(eventType.String(), func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines, &e.seq) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) // Write WAG nodes: init then destroy/subsume at index 20. e.writeWAGNode(t, wagpb.Event{ @@ -387,7 +387,7 @@ func TestTruncateAppliedNodes(t *testing.T) { e.writeWAGNode(t, wagpb.Event{ Addr: wagpb.MakeAddr(r1, 25), Type: wagpb.EventApply, }) - truncator := NewWAGTruncator(st, e.Engines, &e.seq) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) truncator.lastWAGIndexBeforeStartup = tc.lastIndexBeforeStartup require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID)) require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), From cd45efbeb220b4bea8c7b0fc6f662b83714968f6 Mon Sep 17 00:00:00 2001 From: iskettaneh <173953022+iskettaneh@users.noreply.github.com> Date: Fri, 10 Apr 2026 15:59:21 -0400 Subject: [PATCH 3/3] kvstorage: add live WAG truncation and merge it with offline truncation This commit adds the ability to perform live WAG truncation. It keeps track of the last successfully truncated WAG node index, and only attempts to perform a round of WAG truncation if (1) The state engine flushes, and (2) There is a higher WAG sequence number than the last one we truncated. Also, this commit gets rid of offline truncation and merges it with online truncation. It does this by tracking a new field `lastWAGIndexBeforeStartup` that marks the last WAG index at engine startup. We can remove WAG nodes before this index while ignoring gaps, and nodes after this index without ignoring gaps. Release note: None Co-Authored-By: roachdev-claude --- pkg/kv/kvserver/kvstorage/BUILD.bazel | 1 + pkg/kv/kvserver/kvstorage/wag_truncator.go | 97 +++++++++++++--- .../kvserver/kvstorage/wag_truncator_test.go | 109 ++++++++++++++---- pkg/storage/engine.go | 3 +- 4 files changed, 175 insertions(+), 35 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/BUILD.bazel b/pkg/kv/kvserver/kvstorage/BUILD.bazel index 6c829149bb11..0eafd2cd4245 100644 --- a/pkg/kv/kvserver/kvstorage/BUILD.bazel +++ b/pkg/kv/kvserver/kvstorage/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/util/iterutil", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@org_golang_x_time//rate", diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator.go b/pkg/kv/kvserver/kvstorage/wag_truncator.go index 1f8916d89d13..7e35e33fc403 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator.go @@ -8,6 +8,7 @@ package kvstorage import ( "context" "math" + "sync/atomic" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -17,31 +18,37 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "golang.org/x/time/rate" ) // WAGTruncatorTestingKnobs contains testing knobs for the WAGTruncator. -type WAGTruncatorTestingKnobs struct{} +type WAGTruncatorTestingKnobs struct { + // AfterTruncationCallback, if set, is called after each invocation of + // truncateAppliedNodesLive completes. Can be used to synchronize tests + // with the background truncation goroutine. + AfterTruncationCallback func() +} // WAGTruncator truncates applied WAG nodes and clears their associated raft -// state (log entries and sideloaded files). It supports both offline and online -// mode of operation: -// - [offline] during the store startup, truncate the WAG after it is -// replayed and made durable in the StateEngine. -// - [online] during the normal operation, truncate the WAG nodes that were -// durably applied to the state machine. -// -// TODO(ibrahim): Add the periodic truncation logic. +// state (log entries and sideloaded files). type WAGTruncator struct { - st *cluster.Settings - eng Engines - seq *wag.Seq + st *cluster.Settings + knobs WAGTruncatorTestingKnobs + eng Engines + seq *wag.Seq // lastWAGIndexBeforeStartup is the last WAG node index that existed before // startup. Truncating WAG nodes with indices <= to this index can ignore // gaps. lastWAGIndexBeforeStartup uint64 - knobs WAGTruncatorTestingKnobs + // lastTruncatedWAGIndex is the index of the last WAG node that was + // successfully truncated. This is to quickly seek into the potential WAG + // nodes to truncate. + lastTruncatedWAGIndex atomic.Uint64 + // wakeCh is signaled when there are potential WAG nodes to truncate. + wakeCh chan struct{} } // NewWAGTruncator creates a WAGTruncator. @@ -54,6 +61,69 @@ func NewWAGTruncator( eng: eng, seq: seq, lastWAGIndexBeforeStartup: seq.Load(), + wakeCh: make(chan struct{}, 1), + } +} + +// Start launches the background goroutine that performs WAG truncation. +// TODO(ibrahim): Add a setting for keeping a suffix of the WAG for debugging. +// For example, setting a maximum number of WAG nodes to retain for debugging +// purposes. We could also pair it with some time threshold after which all WAG +// nodes are automatically truncated to maintain a manageable size. +func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error { + return stopper.RunAsyncTask(ctx, "wag-truncation", func(ctx context.Context) { + ctx, cancel := stopper.WithCancelOnQuiesce(ctx) + defer cancel() + for { + select { + case <-t.wakeCh: + startIndex := t.lastTruncatedWAGIndex.Load() + 1 + if _, err := t.truncateAppliedNodes(ctx, startIndex); err != nil { + log.KvExec.Errorf(ctx, "truncating WAG node: %+v", err) + } + if t.knobs.AfterTruncationCallback != nil { + t.knobs.AfterTruncationCallback() + } + case <-ctx.Done(): + return + } + } + }) +} + +// DurabilityAdvancedCallback is invoked whenever the state engine completes a +// flush. It checks whether there could possibly be WAG nodes to truncate by +// comparing lastTruncatedWAGIndex against seq.Load(). If there are potential +// truncation opportunities, it sends a non-blocking signal to wake the +// background goroutine. It must return quickly and must not call into the +// engine to avoid deadlock (see storage.Engine.RegisterFlushCompletedCallback). +func (t *WAGTruncator) DurabilityAdvancedCallback() { + // The key point is the point in time where we call "t.seq.Load()". If the + // sequence number is greater than the last truncated WAG sequence number, we + // will attempt to truncate if there are any WAG nodes that are ready for + // truncation. If it found no WAG nodes that are ready to be truncated, it + // will attempt to truncate on every state engine flush until + // sequence number == lastTruncatedWAGIndex. + // + // Note that we need to read lastTruncatedWAGIndex before seq. If we didn't + // do that, we could get a seq reading of x. Then this goroutine gets + // descheduled for some time, while this goroutine isn't running, seq could be + // incremented to x+1, and the live truncation might update + // lastTruncatedWAGIndex to x+1, and then when this goroutine gets scheduled + // again, it will read lastTruncatedWAGIndex as x+1 and seq as x + // and the assertion below might fail. + lastTruncated := t.lastTruncatedWAGIndex.Load() + seq := t.seq.Load() + if seq == lastTruncated { + return + } + if seq < lastTruncated { + log.KvExec.Fatalf(context.Background(), + "WAG seq %d < lastTruncatedWAGIndex %d", seq, lastTruncated) + } + select { + case t.wakeCh <- struct{}{}: + default: } } @@ -92,6 +162,7 @@ func (t *WAGTruncator) truncateAppliedNodes( // At this point we know that the last truncation succeeded and the batch // was committed, and we can move on to the next node. lastTruncated = truncatedIdx + t.lastTruncatedWAGIndex.Store(lastTruncated) nextIndex = lastTruncated + 1 } } diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go index c0e290ca6a7b..5e00ef25ec6f 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) @@ -273,16 +274,10 @@ func TestTruncateAndClearRaftState(t *testing.T) { truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) // Write WAG nodes: init then destroy/subsume at index 20. - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit, - }) - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 20), Type: eventType, - }) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 20), Type: eventType}) // Create a WAG node for a newer replica for the same range. - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r2, 0), Type: wagpb.EventCreate, - }) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r2, 0), Type: wagpb.EventCreate}) // Tombstone confirms destruction/subsumption. require.NoError(t, sl.SetRangeTombstone(ctx, e.StateEngine(), @@ -374,19 +369,11 @@ func TestTruncateAppliedNodes(t *testing.T) { defer e.Close() // Write WAG nodes at indices 2, 4, 5. e.seq.Next() - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 0), Type: wagpb.EventCreate, - }) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 0), Type: wagpb.EventCreate}) e.seq.Next() - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 15), Type: wagpb.EventInit, - }) - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply, - }) - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 25), Type: wagpb.EventApply, - }) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 15), Type: wagpb.EventInit}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 25), Type: wagpb.EventApply}) truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) truncator.lastWAGIndexBeforeStartup = tc.lastIndexBeforeStartup require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID)) @@ -401,3 +388,83 @@ func TestTruncateAppliedNodes(t *testing.T) { }) } } + +// TestWAGTruncatorBackground verifies that the WAGTruncator background +// goroutine only truncates WAG nodes when both conditions are met: (1) the +// state engine has flushed, and (2) there are WAG nodes that are eligible for +// truncation. +func TestWAGTruncatorBackground(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + e := makeTestEngines() + defer e.Close() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1} + sl := MakeStateLoader(r1.RangeID) + + // Initialize replica state so events can be considered applied. + require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID)) + require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), + &kvserverpb.RangeAppliedState{RaftAppliedIndex: 100})) + + // Write two WAG nodes whose events are applied with a gap in between them. + // This is to simulate some WAG nodes at engine startup. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit}) + e.seq.Next() + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply}) + + // Start the periodic WAG truncation background task with a knob that + // signals when truncation completes, so the test can synchronize + // deterministically instead of polling. + truncationDone := make(chan struct{}, 1) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{ + AfterTruncationCallback: func() { + truncationDone <- struct{}{} + }, + }, e.Engines, &e.seq, + ) + // Start the periodic WAG truncation background task. + require.NoError(t, truncator.Start(ctx, stopper)) + // Create a WAG node after startup. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 30), Type: wagpb.EventApply}) + require.Equal(t, []uint64{1, 3, 4}, e.listWAGNodes(t)) + + flushAndWaitForTruncation := func() { + require.NoError(t, e.StateEngine().Flush()) + truncator.DurabilityAdvancedCallback() + <-truncationDone + } + // We expect all WAG nodes to be truncated when the state engine is flushed. + flushAndWaitForTruncation() + require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t)) + require.Equal(t, uint64(4), truncator.lastTruncatedWAGIndex.Load()) + + // Write two WAG nodes whose events are applied (index <= 100). + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 40), Type: wagpb.EventApply}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 50), Type: wagpb.EventApply}) + require.Equal(t, []uint64{5, 6}, e.listWAGNodes(t)) + + // Now flush the state engine and signal again. Both nodes should be + // truncated since their events are applied. + flushAndWaitForTruncation() + require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t)) + require.Equal(t, uint64(6), truncator.lastTruncatedWAGIndex.Load()) + + // Write another WAG node but it is NOT applied yet. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 200), Type: wagpb.EventApply}) + flushAndWaitForTruncation() + // Node 7 should remain because its event isn't applied yet. + require.Equal(t, []uint64{7}, e.listWAGNodes(t)) + require.Equal(t, uint64(6), truncator.lastTruncatedWAGIndex.Load()) + + // Advance the applied index past 200 and flush. Now node 7 should be + // truncated. + require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), + &kvserverpb.RangeAppliedState{RaftAppliedIndex: 200})) + flushAndWaitForTruncation() + require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t)) + require.Equal(t, uint64(7), truncator.lastTruncatedWAGIndex.Load()) +} diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index e7c09bf06b2f..06c522fe2a83 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -911,7 +911,8 @@ type DurabilityRequirement int8 const ( // StandardDurability is what should normally be used. StandardDurability DurabilityRequirement = iota - // GuaranteedDurability is an advanced option (only for raftLogTruncator). + // GuaranteedDurability is an advanced option (only for raftLogTruncator + // and WAGTruncator). GuaranteedDurability )