Skip to content

Commit 795be4e

Browse files
kvstorage: add live WAG truncation
This commit adds live WAG truncation. The truncator keeps track of the index of the last successfully truncated WAG node, and only attempts a round of WAG truncation if (1) the state engine has completed a flush, and (2) the current WAG sequence number is higher than the last index it truncated.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent 979bbe0 commit 795be4e

5 files changed

Lines changed: 235 additions & 13 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ go_library(
3838
"//pkg/util/iterutil",
3939
"//pkg/util/log",
4040
"//pkg/util/protoutil",
41+
"//pkg/util/stop",
4142
"//pkg/util/timeutil",
4243
"@com_github_cockroachdb_errors//:errors",
4344
"@org_golang_x_time//rate",
@@ -88,6 +89,7 @@ go_test(
8889
"//pkg/util/tracing",
8990
"//pkg/util/uuid",
9091
"@com_github_cockroachdb_datadriven//:datadriven",
92+
"@com_github_cockroachdb_errors//:errors",
9193
"@com_github_stretchr_testify//require",
9294
"@org_golang_x_time//rate",
9395
],

pkg/kv/kvserver/kvstorage/wag/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func (s *Seq) Next() uint64 {
6565
return s.index.Add(1)
6666
}
6767

68+
// Load returns the last used WAG sequence number.
69+
func (s *Seq) Load() uint64 {
70+
return s.index.Load()
71+
}
72+
6873
// Write puts the WAG node under the specific sequence number into the given
6974
// writer. The index must have been allocated to the caller by the sequencer.
7075
func Write(w storage.Writer, index uint64, node wagpb.Node) error {

pkg/kv/kvserver/kvstorage/wag_truncator.go

Lines changed: 119 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kvstorage
88
import (
99
"context"
1010
"math"
11+
"sync/atomic"
1112

1213
"github.com/cockroachdb/cockroach/pkg/keys"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -17,6 +18,8 @@ import (
1718
"github.com/cockroachdb/cockroach/pkg/roachpb"
1819
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
22+
"github.com/cockroachdb/cockroach/pkg/util/stop"
2023
"github.com/cockroachdb/errors"
2124
"golang.org/x/time/rate"
2225
)
@@ -28,43 +31,150 @@ import (
2831
// replayed and made durable in the StateEngine.
2932
// - [online] during the normal operation, truncate the WAG nodes that were
3033
// durably applied to the state machine.
31-
//
32-
// TODO(ibrahim): Add the periodic truncation logic.
3334
type WAGTruncator struct {
3435
st *cluster.Settings
3536
eng Engines
37+
seq *wag.Seq
38+
// wakeCh is signaled when there are potential WAG nodes to truncate.
39+
wakeCh chan struct{}
40+
// lastTruncatedWAGIndex is the index of the last WAG node that was
41+
// successfully truncated. This is to be used by online WAG truncation to
42+
// quickly seek into the potential WAG nodes to truncate.
43+
lastTruncatedWAGIndex atomic.Uint64
3644
}
3745

3846
// NewWAGTruncator creates a WAGTruncator.
39-
func NewWAGTruncator(st *cluster.Settings, eng Engines) *WAGTruncator {
40-
return &WAGTruncator{st: st, eng: eng}
47+
func NewWAGTruncator(st *cluster.Settings, eng Engines, seq *wag.Seq) *WAGTruncator {
48+
return &WAGTruncator{
49+
st: st,
50+
eng: eng,
51+
seq: seq,
52+
wakeCh: make(chan struct{}, 1),
53+
}
54+
}
55+
56+
// Start launches the background goroutine that performs live WAG truncation.
57+
// It must be called before any new WAG nodes are created. It sets
58+
// lastTruncatedWAGIndex to the current seq value, establishing the starting
59+
// point for periodic WAG truncation so that it begins from the first new WAG
60+
// node.
61+
//
62+
// TODO(ibrahim): Add a setting for keeping a suffix of the WAG for debugging.
63+
// For example, setting a maximum number of WAG nodes to retain for debugging
64+
// purposes. We could also pair it with some time threshold after which all WAG
65+
// nodes are automatically truncated to maintain a manageable size.
66+
func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error {
67+
t.lastTruncatedWAGIndex.Store(t.seq.Load())
68+
return stopper.RunAsyncTask(ctx, "wag-truncation", func(ctx context.Context) {
69+
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
70+
defer cancel()
71+
for {
72+
select {
73+
case <-t.wakeCh:
74+
t.truncateAppliedNodesLive(ctx)
75+
case <-ctx.Done():
76+
return
77+
}
78+
}
79+
})
80+
}
81+
82+
// DurabilityAdvancedCallback is invoked whenever the state engine completes a
83+
// flush. It checks whether there could possibly be WAG nodes to truncate by
84+
// comparing lastTruncatedWAGIndex against seq.Load(). If there are potential
85+
// truncation opportunities, it sends a non-blocking signal to wake the
86+
// background goroutine. It must return quickly and must not call into the
87+
// engine to avoid deadlock (see storage.Engine.RegisterFlushCompletedCallback).
88+
func (t *WAGTruncator) DurabilityAdvancedCallback() {
89+
// The key point is the point in time where we call "t.seq.Load()". If the
90+
// sequence number is greater than the last truncated WAG sequence number, we
91+
// will attempt to truncate if there are any WAG nodes that are ready for
92+
// truncation. If it found no WAG nodes that are ready to be truncated, it
93+
// will attempt to truncate on every state engine flush until
94+
// sequence number == lastTruncatedWAGIndex.
95+
seq, lastTruncated := t.seq.Load(), t.lastTruncatedWAGIndex.Load()
96+
if seq == lastTruncated {
97+
return
98+
}
99+
if seq < lastTruncated {
100+
log.KvExec.Fatalf(context.Background(),
101+
"WAG seq %d < lastTruncatedWAGIndex %d", seq, lastTruncated)
102+
}
103+
select {
104+
case t.wakeCh <- struct{}{}:
105+
default:
106+
}
41107
}
42108

43109
// TruncateAll truncates all applied WAG nodes. It's meant to be used at engine
44110
// startup right after we replay the WAG nodes and sync the state engine.
45111
func (t *WAGTruncator) TruncateAll(ctx context.Context) error {
112+
_, err := t.truncateAppliedNodes(ctx, 0)
113+
return err
114+
}
115+
116+
// truncateAppliedNodesLive is called by the background goroutine to truncate
117+
// applied WAG nodes. Unlike TruncateAll, it only truncates nodes with index
118+
// after lastTruncatedWAGIndex. This makes it avoid jumping over gaps in WAG
119+
// node indices.
120+
//
121+
// Note that jumping over gaps during WAG truncation should be safe because a
122+
// WAG node is only truncated if it has been applied to the state engine, and it
123+
// by definition has all the required events applied. However, we avoid jumping
124+
// over gaps so that we know the exact index to truncate next, which facilitates
125+
// seeking past previously deleted WAG garbage.
126+
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
127+
startIndex := t.lastTruncatedWAGIndex.Load() + 1
128+
lastTruncated, err := t.truncateAppliedNodes(ctx, startIndex)
129+
if err != nil {
130+
// TODO(ibrahim): We need to decide if we need to retry truncating this
131+
// WAG node now, or rely on the next state engine flush to do it.
132+
log.KvExec.Errorf(ctx, "truncating WAG node: %+v", err)
133+
}
134+
if lastTruncated >= startIndex {
135+
t.lastTruncatedWAGIndex.Store(lastTruncated)
136+
}
137+
}
138+
139+
// truncateAppliedNodes is a helper function used by both offline and online WAG
140+
// truncation functions. It repeatedly tries to delete a WAG node in a batch and
141+
// commit that batch. When startIndex is 0, it repeatedly truncates the first
142+
// available node (used at startup to drain the WAG). When startIndex is greater
143+
// than 0, it truncates nodes sequentially starting from that index (used during
144+
// online operation to avoid jumping over gaps in WAG indices).
145+
//
146+
// Returns the index of the last successfully truncated node when startIndex > 0,
147+
// or 0 if no nodes were truncated or startIndex was 0.
148+
func (t *WAGTruncator) truncateAppliedNodes(
149+
ctx context.Context, startIndex uint64,
150+
) (uint64, error) {
46151
stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability)
47152
defer stateReader.Close()
153+
nextIndex := startIndex
154+
var lastTruncated uint64
48155
for {
49156
if err := ctx.Err(); err != nil {
50-
return err
157+
return lastTruncated, err
51158
}
52159
b := t.eng.LogEngine().NewWriteBatch()
53160
truncated, err := t.truncateAppliedWAGNodeAndClearRaftState(
54-
ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, 0, /* index */
161+
ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, nextIndex,
55162
)
56163
if err == nil && truncated {
57164
err = b.Commit(false /* sync */)
58165
}
59166
b.Close()
60167
if err != nil {
61-
return err
168+
return lastTruncated, err
62169
}
63170
if !truncated {
64-
break
171+
return lastTruncated, nil
172+
}
173+
if nextIndex > 0 {
174+
lastTruncated = nextIndex
175+
nextIndex++
65176
}
66177
}
67-
return nil
68178
}
69179

70180
// truncateAppliedWAGNodeAndClearRaftState deletes a WAG node if all of its

pkg/kv/kvserver/kvstorage/wag_truncator_test.go

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kvstorage
88
import (
99
"context"
1010
"math"
11+
"slices"
1112
"testing"
1213

1314
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -20,8 +21,11 @@ import (
2021
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2122
"github.com/cockroachdb/cockroach/pkg/storage"
2223
"github.com/cockroachdb/cockroach/pkg/storage/fs"
24+
"github.com/cockroachdb/cockroach/pkg/testutils"
2325
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2426
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/cockroach/pkg/util/stop"
28+
"github.com/cockroachdb/errors"
2529
"github.com/stretchr/testify/require"
2630
"golang.org/x/time/rate"
2731
)
@@ -103,6 +107,28 @@ func (e *testEngines) listWAGNodes(t *testing.T) []uint64 {
103107
return indices
104108
}
105109

110+
func eventuallyExpectWAGNodesIndices(t *testing.T, e *testEngines, expected []uint64) {
111+
t.Helper()
112+
testutils.SucceedsSoon(t, func() error {
113+
actual := e.listWAGNodes(t)
114+
if slices.Equal(expected, actual) {
115+
return nil
116+
}
117+
return errors.Newf("expected WAG nodes %v, got %v", expected, actual)
118+
})
119+
}
120+
121+
func eventuallyExpectLastTruncatedWAGIndex(t *testing.T, truncator *WAGTruncator, expected uint64) {
122+
t.Helper()
123+
testutils.SucceedsSoon(t, func() error {
124+
actual := truncator.lastTruncatedWAGIndex.Load()
125+
if actual == expected {
126+
return nil
127+
}
128+
return errors.Newf("expected lastTruncatedWAGIndex %d, got %d", expected, actual)
129+
})
130+
}
131+
106132
func TestTruncateApplied(t *testing.T) {
107133
defer leaktest.AfterTest(t)()
108134
defer log.Scope(t).Close(t)
@@ -240,7 +266,7 @@ func TestTruncateApplied(t *testing.T) {
240266
t.Run("", func(t *testing.T) {
241267
e := makeTestEngines()
242268
defer e.Close()
243-
truncator := NewWAGTruncator(st, e.Engines)
269+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
244270
tc.setup(t, &e)
245271
require.NoError(t, e.stateEngine.Flush())
246272
require.NoError(t, truncator.TruncateAll(ctx))
@@ -268,7 +294,7 @@ func TestTruncateAndClearRaftState(t *testing.T) {
268294
t.Run(eventType.String(), func(t *testing.T) {
269295
e := makeTestEngines()
270296
defer e.Close()
271-
truncator := NewWAGTruncator(st, e.Engines)
297+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
272298

273299
// Write WAG nodes: init then destroy/subsume at index 20.
274300
e.writeWAGNode(t, wagpb.Event{
@@ -372,7 +398,7 @@ func TestTruncateGapHandling(t *testing.T) {
372398
t.Run("", func(t *testing.T) {
373399
e := makeTestEngines()
374400
defer e.Close()
375-
truncator := NewWAGTruncator(st, e.Engines)
401+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
376402

377403
// Write WAG nodes at indices 2, 4, 6.
378404
e.seq.Next()
@@ -412,3 +438,81 @@ func TestTruncateGapHandling(t *testing.T) {
412438
})
413439
}
414440
}
441+
442+
// TestWAGTruncatorBackground verifies that the WAGTruncator background
443+
// goroutine only truncates WAG nodes when both conditions are met: (1) the
444+
// state engine has flushed, and (2) there are WAG nodes to truncate (i.e.,
445+
// seq.Load() > lastTruncatedWAGIndex).
446+
func TestWAGTruncatorBackground(t *testing.T) {
447+
defer leaktest.AfterTest(t)()
448+
defer log.Scope(t).Close(t)
449+
ctx := context.Background()
450+
st := cluster.MakeTestingClusterSettings()
451+
e := makeTestEngines()
452+
defer e.Close()
453+
stopper := stop.NewStopper()
454+
defer stopper.Stop(ctx)
455+
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
456+
sl := MakeStateLoader(r1.RangeID)
457+
458+
// Initialize replica state so events can be considered applied.
459+
require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID))
460+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
461+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 100}))
462+
463+
// Start the periodic WAG truncation background task.
464+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
465+
require.NoError(t, truncator.Start(ctx, stopper))
466+
467+
flushStateEngineAndSignal := func() {
468+
require.NoError(t, e.StateEngine().Flush())
469+
truncator.DurabilityAdvancedCallback()
470+
}
471+
472+
// No WAG nodes exist. Flushing the state engine should not cause the
473+
// truncator to do anything (seq.Load() == lastTruncatedWAGIndex == 0).
474+
flushStateEngineAndSignal()
475+
eventuallyExpectWAGNodesIndices(t, &e, nil)
476+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(0))
477+
478+
// Write two WAG nodes whose events are applied (index <= 100).
479+
e.writeWAGNode(t, wagpb.Event{
480+
Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit,
481+
})
482+
e.writeWAGNode(t, wagpb.Event{
483+
Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply,
484+
})
485+
486+
// WAG nodes exist, but the state engine hasn't flushed yet, so the
487+
// GuaranteedDurability reader won't see the replica state. Signal the
488+
// truncator without flushing first.
489+
truncator.DurabilityAdvancedCallback()
490+
eventuallyExpectWAGNodesIndices(t, &e, []uint64{1, 2})
491+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(0))
492+
493+
// Now flush the state engine and signal again. Both nodes should be
494+
// truncated since their events are applied (index 10 and 20 <= 100).
495+
flushStateEngineAndSignal()
496+
eventuallyExpectWAGNodesIndices(t, &e, nil)
497+
// The lastTruncatedWAGIndex is stored after truncateAppliedNodes returns,
498+
// so there is a small window between the WAG nodes being deleted and the
499+
// index being updated. Use an eventually-consistent check.
500+
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 2)
501+
502+
// Write a third WAG node that is NOT applied (index 200 > 100).
503+
e.writeWAGNode(t, wagpb.Event{
504+
Addr: wagpb.MakeAddr(r1, 200), Type: wagpb.EventApply,
505+
})
506+
flushStateEngineAndSignal()
507+
// Node 3 should remain because its event isn't applied yet.
508+
eventuallyExpectWAGNodesIndices(t, &e, []uint64{3})
509+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(2))
510+
511+
// Advance the applied index past 200 and flush. Now node 3 should be
512+
// truncated.
513+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
514+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 200}))
515+
flushStateEngineAndSignal()
516+
eventuallyExpectWAGNodesIndices(t, &e, nil)
517+
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 3)
518+
}

pkg/storage/engine.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,8 @@ type DurabilityRequirement int8
911911
const (
912912
// StandardDurability is what should normally be used.
913913
StandardDurability DurabilityRequirement = iota
914-
// GuaranteedDurability is an advanced option (only for raftLogTruncator).
914+
// GuaranteedDurability is an advanced option (only for raftLogTruncator
915+
// and WAGTruncator).
915916
GuaranteedDurability
916917
)
917918

0 commit comments

Comments
 (0)