diff --git a/pkg/kv/kvserver/kvstorage/BUILD.bazel b/pkg/kv/kvserver/kvstorage/BUILD.bazel index 6c829149bb11..0eafd2cd4245 100644 --- a/pkg/kv/kvserver/kvstorage/BUILD.bazel +++ b/pkg/kv/kvserver/kvstorage/BUILD.bazel @@ -38,6 +38,7 @@ go_library( "//pkg/util/iterutil", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@org_golang_x_time//rate", diff --git a/pkg/kv/kvserver/kvstorage/wag/store.go b/pkg/kv/kvserver/kvstorage/wag/store.go index 13d135c942d5..e42725c76947 100644 --- a/pkg/kv/kvserver/kvstorage/wag/store.go +++ b/pkg/kv/kvserver/kvstorage/wag/store.go @@ -65,6 +65,11 @@ func (s *Seq) Next() uint64 { return s.index.Add(1) } +// Load returns the last used WAG sequence number. +func (s *Seq) Load() uint64 { + return s.index.Load() +} + // Write puts the WAG node under the specific sequence number into the given // writer. The index must have been allocated to the caller by the sequencer. func Write(w storage.Writer, index uint64, node wagpb.Node) error { diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator.go b/pkg/kv/kvserver/kvstorage/wag_truncator.go index 2c7c1270372a..7e35e33fc403 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator.go @@ -8,6 +8,7 @@ package kvstorage import ( "context" "math" + "sync/atomic" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -17,68 +18,167 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "golang.org/x/time/rate" ) +// WAGTruncatorTestingKnobs contains testing knobs for the WAGTruncator. +type WAGTruncatorTestingKnobs struct { + // AfterTruncationCallback, if set, is called after each invocation of + // truncateAppliedNodesLive completes. Can be used to synchronize tests + // with the background truncation goroutine. + AfterTruncationCallback func() +} + // WAGTruncator truncates applied WAG nodes and clears their associated raft -// state (log entries and sideloaded files). It supports both offline and online -// mode of operation: -// - [offline] during the store startup, truncate the WAG after it is -// replayed and made durable in the StateEngine. -// - [online] during the normal operation, truncate the WAG nodes that were -// durably applied to the state machine. -// -// TODO(ibrahim): Add the periodic truncation logic. +// state (log entries and sideloaded files). type WAGTruncator struct { - st *cluster.Settings - eng Engines + st *cluster.Settings + knobs WAGTruncatorTestingKnobs + eng Engines + seq *wag.Seq + // lastWAGIndexBeforeStartup is the last WAG node index that existed before + // startup. Truncating WAG nodes with indices <= to this index can ignore + // gaps. + lastWAGIndexBeforeStartup uint64 + // lastTruncatedWAGIndex is the index of the last WAG node that was + // successfully truncated. This is to quickly seek into the potential WAG + // nodes to truncate. + lastTruncatedWAGIndex atomic.Uint64 + // wakeCh is signaled when there are potential WAG nodes to truncate. + wakeCh chan struct{} } // NewWAGTruncator creates a WAGTruncator. -func NewWAGTruncator(st *cluster.Settings, eng Engines) *WAGTruncator { - return &WAGTruncator{st: st, eng: eng} +func NewWAGTruncator( + st *cluster.Settings, knobs WAGTruncatorTestingKnobs, eng Engines, seq *wag.Seq, +) *WAGTruncator { + return &WAGTruncator{ + st: st, + knobs: knobs, + eng: eng, + seq: seq, + lastWAGIndexBeforeStartup: seq.Load(), + wakeCh: make(chan struct{}, 1), + } +} + +// Start launches the background goroutine that performs WAG truncation. +// TODO(ibrahim): Add a setting for keeping a suffix of the WAG for debugging. +// For example, setting a maximum number of WAG nodes to retain for debugging +// purposes. We could also pair it with some time threshold after which all WAG +// nodes are automatically truncated to maintain a manageable size. +func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error { + return stopper.RunAsyncTask(ctx, "wag-truncation", func(ctx context.Context) { + ctx, cancel := stopper.WithCancelOnQuiesce(ctx) + defer cancel() + for { + select { + case <-t.wakeCh: + startIndex := t.lastTruncatedWAGIndex.Load() + 1 + if _, err := t.truncateAppliedNodes(ctx, startIndex); err != nil { + log.KvExec.Errorf(ctx, "truncating WAG node: %+v", err) + } + if t.knobs.AfterTruncationCallback != nil { + t.knobs.AfterTruncationCallback() + } + case <-ctx.Done(): + return + } + } + }) +} + +// DurabilityAdvancedCallback is invoked whenever the state engine completes a +// flush. It checks whether there could possibly be WAG nodes to truncate by +// comparing lastTruncatedWAGIndex against seq.Load(). If there are potential +// truncation opportunities, it sends a non-blocking signal to wake the +// background goroutine. It must return quickly and must not call into the +// engine to avoid deadlock (see storage.Engine.RegisterFlushCompletedCallback). +func (t *WAGTruncator) DurabilityAdvancedCallback() { + // The key point is the point in time where we call "t.seq.Load()". If the + // sequence number is greater than the last truncated WAG sequence number, we + // will attempt to truncate if there are any WAG nodes that are ready for + // truncation. If it found no WAG nodes that are ready to be truncated, it + // will attempt to truncate on every state engine flush until + // sequence number == lastTruncatedWAGIndex. + // + // Note that we need to read lastTruncatedWAGIndex before seq. If we didn't + // do that, we could get a seq reading of x. Then this goroutine gets + // descheduled for some time, while this goroutine isn't running, seq could be + // incremented to x+1, and the live truncation might update + // lastTruncatedWAGIndex to x+1, and then when this goroutine gets scheduled + // again, it will read lastTruncatedWAGIndex as x+1 and seq as x + // and the assertion below might fail. + lastTruncated := t.lastTruncatedWAGIndex.Load() + seq := t.seq.Load() + if seq == lastTruncated { + return + } + if seq < lastTruncated { + log.KvExec.Fatalf(context.Background(), + "WAG seq %d < lastTruncatedWAGIndex %d", seq, lastTruncated) + } + select { + case t.wakeCh <- struct{}{}: + default: + } } -// TruncateAll truncates all applied WAG nodes. It's meant to be used at engine -// startup right after we replay the WAG nodes and sync the state engine. -func (t *WAGTruncator) TruncateAll(ctx context.Context) error { +// truncateAppliedNodes is a helper function that repeatedly tries to delete a +// WAG node in a batch and commit that batch. It iterates from startIndex and +// attempts to delete the WAG nodes it sees. It stops when it encounters a WAG +// node that cannot be deleted, or when it encounters an error. +// +// Returns the index of the last successfully truncated node or 0 when no nodes +// were truncated. It also returns an error if any occurred during truncation. +func (t *WAGTruncator) truncateAppliedNodes( + ctx context.Context, startIndex uint64, +) (uint64, error) { stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability) defer stateReader.Close() + nextIndex := startIndex + var lastTruncated uint64 for { if err := ctx.Err(); err != nil { - return err + return lastTruncated, err } b := t.eng.LogEngine().NewWriteBatch() - truncated, err := t.truncateAppliedWAGNodeAndClearRaftState( - ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, 0, /* index */ + truncatedIdx, err := t.truncateAppliedWAGNodeAndClearRaftState( + ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, nextIndex, ) - if err == nil && truncated { + if err == nil && truncatedIdx != 0 { err = b.Commit(false /* sync */) } b.Close() if err != nil { - return err + return lastTruncated, err } - if !truncated { - break + if truncatedIdx == 0 { + return lastTruncated, nil } + // At this point we know that the last truncation succeeded and the batch + // was committed, and we can move on to the next node. + lastTruncated = truncatedIdx + t.lastTruncatedWAGIndex.Store(lastTruncated) + nextIndex = lastTruncated + 1 } - return nil } // truncateAppliedWAGNodeAndClearRaftState deletes a WAG node if all of its // events have been applied to the state engine. For nodes containing // EventDestroy or EventSubsume events, it also clears the corresponding raft // log prefix from the engine and the sideloaded entries storage. -// If truncateIndex is 0, the function deletes the first WAG node regardless of -// its index. Otherwise, it only deletes the node matching truncateIndex if it -// exists. // -// Returns a boolean indicating whether a node was successfully truncated or -// not. If the return value is false, it means that either there are no WAG -// nodes left, or that the WAG node has not been applied to the state engine. -// Also, an error is returned if the WAG node could not be fetched or deleted. +// It iterates from truncateIndex and attempts to delete the first WAG node it +// sees. +// +// Returns the following: +// - the index of the last WAG node that was deleted, or 0 if no nodes were +// deleted. +// - an error if there was an error deleting the WAG node. // // The caller must provide a stateRO reader with GuaranteedDurability so that // only state confirmed flushed to persistent storage is visible. This ensures @@ -88,35 +188,27 @@ func (t *WAGTruncator) TruncateAll(ctx context.Context) error { // TODO(ibrahim): Support deleting multiple WAG nodes within the same batch. func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState( ctx context.Context, raft Raft, stateRO StateRO, truncateIndex uint64, -) (bool, error) { +) (uint64, error) { var iter wag.Iterator - var iterStartKey roachpb.Key - if truncateIndex == 0 { - // Delete the first WAG node that exists, regardless of its index. - iterStartKey = keys.StoreWAGPrefix() - } else { - // Only delete the WAG node with the expected index. - iterStartKey = keys.StoreWAGNodeKey(truncateIndex) - } - + iterStartKey := keys.StoreWAGNodeKey(truncateIndex) for index, node := range iter.IterFrom(ctx, raft.RO, iterStartKey) { - if truncateIndex != 0 && truncateIndex != index { - return false, nil + if index > t.lastWAGIndexBeforeStartup && truncateIndex != index { + // We cannot ignore gaps for WAG indices > lastWAGIndexBeforeStartup. + return 0, nil } - // TODO(ibrahim): Right now, the canApplyWAGNode function returns a list of // raftCatchUpTargets that are not needed for the purposes of truncation, // consider refactoring the function to return only the needed info. - replayAction, err := canApplyWAGNode(ctx, node, stateRO) + action, err := canApplyWAGNode(ctx, node, stateRO) if err != nil { - return false, err + return 0, err } - if replayAction.apply { + if action.apply { // If an event needs to be applied, the WAG node cannot be deleted yet. - return false, nil + return 0, nil } if err := wag.Delete(raft.WO, index); err != nil { - return false, err + return 0, err } // Clean up the raft log prefix of a destroyed/subsumed replica. @@ -125,12 +217,12 @@ func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState( continue } if err := t.clearReplicaRaftLogAndSideloaded(ctx, raft, event.Addr.RangeID, event.Addr.Index); err != nil { - return false, err + return 0, err } } - return true, nil + return index, nil } - return false, iter.Error() + return 0, iter.Error() } // clearReplicaRaftLogAndSideloaded clears raft log entries at or below the given index for diff --git a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go index 18cc284030c2..5e00ef25ec6f 100644 --- a/pkg/kv/kvserver/kvstorage/wag_truncator_test.go +++ b/pkg/kv/kvserver/kvstorage/wag_truncator_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) @@ -103,7 +104,9 @@ func (e *testEngines) listWAGNodes(t *testing.T) []uint64 { return indices } -func TestTruncateApplied(t *testing.T) { +// TestTruncateAppliedOnly verifies that we only truncate WAG nodes that are +// durably applied. +func TestTruncateAppliedOnly(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() @@ -240,20 +243,20 @@ func TestTruncateApplied(t *testing.T) { t.Run("", func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines) tc.setup(t, &e) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) require.NoError(t, e.stateEngine.Flush()) - require.NoError(t, truncator.TruncateAll(ctx)) + _, err := truncator.truncateAppliedNodes(ctx, 0 /* startIndex */) + require.NoError(t, err) require.Equal(t, tc.wantWAGIndices, e.listWAGNodes(t)) }) } } -// TestTruncateAndClearRaftState verifies that -// truncateAppliedWAGNodeAndClearRaftState only clears raft log entries and -// sideloaded files up to the destroyed/subsumed replica's last index. Entries -// and files beyond that index may belong to a newer replica and must be -// preserved. +// TestTruncateAndClearRaftState verifies that WAG truncation only clears raft +// log entries and sideloaded files up to the destroyed/subsumed replica's last +// index. Entries and files beyond that index may belong to a newer replica and +// must be preserved. func TestTruncateAndClearRaftState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -268,20 +271,13 @@ func TestTruncateAndClearRaftState(t *testing.T) { t.Run(eventType.String(), func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) // Write WAG nodes: init then destroy/subsume at index 20. - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit, - }) - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 20), Type: eventType, - }) - + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 20), Type: eventType}) // Create a WAG node for a newer replica for the same range. - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r2, 0), Type: wagpb.EventCreate, - }) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r2, 0), Type: wagpb.EventCreate}) // Tombstone confirms destruction/subsumption. require.NoError(t, sl.SetRangeTombstone(ctx, e.StateEngine(), @@ -303,7 +299,8 @@ func TestTruncateAndClearRaftState(t *testing.T) { require.NoError(t, ss.Put(ctx, idx, 1 /* term */, []byte("sst-data"))) } require.NoError(t, e.stateEngine.Flush()) - require.NoError(t, truncator.TruncateAll(ctx)) + _, err := truncator.truncateAppliedNodes(ctx, 1 /* startIndex */) + require.NoError(t, err) // Raft entries <= 20 belong to the old replica and must be deleted. The // rest shouldn't be deleted by the WAG truncator. require.Equal(t, @@ -328,87 +325,146 @@ func TestTruncateAndClearRaftState(t *testing.T) { } } -// TestTruncateGapHandling verifies that truncateAppliedWAGNodeAndClearRaftState -// handles gaps in WAG node indices correctly based on expectedIndex. When -// expectedIndex is 0, the first node is deleted regardless of its index. When -// non-zero, only the node at that exact index is deleted. -// -// The test sets up three WAG nodes with gaps between them: -// [Index: 2] -> [Index: 4] -> [Index: 6] -func TestTruncateGapHandling(t *testing.T) { +// TestTruncateAppliedNodes exercises truncateAppliedNodes() across different +// combinations of startIndex, and lastIndexBeforeStartup. The test sets up WAG +// nodes at indices [2, 4, 5, 6]. Node 6 isn't ready for truncation yet. +func TestTruncateAppliedNodes(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() st := cluster.MakeTestingClusterSettings() r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1} - sl := MakeStateLoader(1 /* rangeID */) + sl := MakeStateLoader(r1.RangeID) - for _, calls := range [][]struct { - index uint64 - wantTruncated bool - wantWAGIndices []uint64 + for _, tc := range []struct { + startIndex uint64 + lastIndexBeforeStartup uint64 + wantLastTruncated uint64 + wantRemaining []uint64 }{ { - // index=0 removes the first WAG node regardless of its index. - {index: 0, wantTruncated: true, wantWAGIndices: []uint64{4, 6}}, - {index: 0, wantTruncated: true, wantWAGIndices: []uint64{6}}, - {index: 0, wantTruncated: true, wantWAGIndices: nil}, + // We cannot ignore gaps after lastIndexBeforeStartup. + startIndex: 0, lastIndexBeforeStartup: 2, wantLastTruncated: 2, wantRemaining: []uint64{4, 5, 6}, + }, + { + // We cannot delete an unapplied node. + startIndex: 0, lastIndexBeforeStartup: 4, wantLastTruncated: 5, wantRemaining: []uint64{6}, + }, + { + startIndex: 3, lastIndexBeforeStartup: 2, wantLastTruncated: 0, wantRemaining: []uint64{2, 4, 5, 6}, + }, + { + startIndex: 3, lastIndexBeforeStartup: 4, wantLastTruncated: 5, wantRemaining: []uint64{2, 6}, }, { - // A non-existent index is a no-op. - {index: 1, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, - {index: 3, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, - {index: 5, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, - {index: 7, wantTruncated: false, wantWAGIndices: []uint64{2, 4, 6}}, + startIndex: 7, lastIndexBeforeStartup: 0, wantLastTruncated: 0, wantRemaining: []uint64{2, 4, 5, 6}, }, { - // In theory, we can remove a WAG node at an index that is not the first. - {index: 4, wantTruncated: true, wantWAGIndices: []uint64{2, 6}}, - {index: 6, wantTruncated: true, wantWAGIndices: []uint64{2}}, - {index: 2, wantTruncated: true, wantWAGIndices: nil}, + startIndex: 7, lastIndexBeforeStartup: 6, wantLastTruncated: 0, wantRemaining: []uint64{2, 4, 5, 6}, }, } { t.Run("", func(t *testing.T) { e := makeTestEngines() defer e.Close() - truncator := NewWAGTruncator(st, e.Engines) - - // Write WAG nodes at indices 2, 4, 6. + // Write WAG nodes at indices 2, 4, 5. e.seq.Next() - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 0), Type: wagpb.EventCreate, - }) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 0), Type: wagpb.EventCreate}) e.seq.Next() - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 15), Type: wagpb.EventInit, - }) - e.seq.Next() - e.writeWAGNode(t, wagpb.Event{ - Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply, - }) - - // Set applied state so all WAG nodes are considered applied. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 15), Type: wagpb.EventInit}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 25), Type: wagpb.EventApply}) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{}, e.Engines, &e.seq) + truncator.lastWAGIndexBeforeStartup = tc.lastIndexBeforeStartup require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID)) require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), &kvserverpb.RangeAppliedState{RaftAppliedIndex: 20})) require.NoError(t, e.stateEngine.Flush()) - for _, c := range calls { - stateReader := e.StateEngine().NewReader(storage.GuaranteedDurability) - b := e.LogEngine().NewWriteBatch() - truncated, err := truncator.truncateAppliedWAGNodeAndClearRaftState( - ctx, Raft{RO: e.LogEngine(), WO: b}, stateReader, c.index, - ) - require.NoError(t, err) - require.Equal(t, c.wantTruncated, truncated) - if truncated { - require.NoError(t, b.Commit(false /* sync */)) - } - b.Close() - stateReader.Close() - require.Equal(t, c.wantWAGIndices, e.listWAGNodes(t)) - } + lastTruncated, err := truncator.truncateAppliedNodes(ctx, tc.startIndex) + require.NoError(t, err) + require.Equal(t, tc.wantLastTruncated, lastTruncated) + require.Equal(t, tc.wantRemaining, e.listWAGNodes(t)) }) } } + +// TestWAGTruncatorBackground verifies that the WAGTruncator background +// goroutine only truncates WAG nodes when both conditions are met: (1) the +// state engine has flushed, and (2) there are WAG nodes that are eligible for +// truncation. +func TestWAGTruncatorBackground(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + e := makeTestEngines() + defer e.Close() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1} + sl := MakeStateLoader(r1.RangeID) + + // Initialize replica state so events can be considered applied. + require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID)) + require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), + &kvserverpb.RangeAppliedState{RaftAppliedIndex: 100})) + + // Write two WAG nodes whose events are applied with a gap in between them. + // This is to simulate some WAG nodes at engine startup. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit}) + e.seq.Next() + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply}) + + // Start the periodic WAG truncation background task with a knob that + // signals when truncation completes, so the test can synchronize + // deterministically instead of polling. + truncationDone := make(chan struct{}, 1) + truncator := NewWAGTruncator(st, WAGTruncatorTestingKnobs{ + AfterTruncationCallback: func() { + truncationDone <- struct{}{} + }, + }, e.Engines, &e.seq, + ) + // Start the periodic WAG truncation background task. + require.NoError(t, truncator.Start(ctx, stopper)) + // Create a WAG node after startup. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 30), Type: wagpb.EventApply}) + require.Equal(t, []uint64{1, 3, 4}, e.listWAGNodes(t)) + + flushAndWaitForTruncation := func() { + require.NoError(t, e.StateEngine().Flush()) + truncator.DurabilityAdvancedCallback() + <-truncationDone + } + // We expect all WAG nodes to be truncated when the state engine is flushed. + flushAndWaitForTruncation() + require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t)) + require.Equal(t, uint64(4), truncator.lastTruncatedWAGIndex.Load()) + + // Write two WAG nodes whose events are applied (index <= 100). + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 40), Type: wagpb.EventApply}) + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 50), Type: wagpb.EventApply}) + require.Equal(t, []uint64{5, 6}, e.listWAGNodes(t)) + + // Now flush the state engine and signal again. Both nodes should be + // truncated since their events are applied. + flushAndWaitForTruncation() + require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t)) + require.Equal(t, uint64(6), truncator.lastTruncatedWAGIndex.Load()) + + // Write another WAG node but it is NOT applied yet. + e.writeWAGNode(t, wagpb.Event{Addr: wagpb.MakeAddr(r1, 200), Type: wagpb.EventApply}) + flushAndWaitForTruncation() + // Node 7 should remain because its event isn't applied yet. + require.Equal(t, []uint64{7}, e.listWAGNodes(t)) + require.Equal(t, uint64(6), truncator.lastTruncatedWAGIndex.Load()) + + // Advance the applied index past 200 and flush. Now node 7 should be + // truncated. + require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(), + &kvserverpb.RangeAppliedState{RaftAppliedIndex: 200})) + flushAndWaitForTruncation() + require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t)) + require.Equal(t, uint64(7), truncator.lastTruncatedWAGIndex.Load()) +} diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index e7c09bf06b2f..06c522fe2a83 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -911,7 +911,8 @@ type DurabilityRequirement int8 const ( // StandardDurability is what should normally be used. StandardDurability DurabilityRequirement = iota - // GuaranteedDurability is an advanced option (only for raftLogTruncator). + // GuaranteedDurability is an advanced option (only for raftLogTruncator + // and WAGTruncator). GuaranteedDurability )