Skip to content

Commit 74cee63

Browse files
kvstorage: add live WAG truncation
This commit adds the ability to perform live WAG truncation. It keeps track of the last successfully truncated WAG node index, and only attempts to perform a round of WAG truncation if (1) The state engine flushes, and (2) There is a higher WAG sequence number than the last one we truncated. Release note: None Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent 7e96c85 commit 74cee63

4 files changed

Lines changed: 231 additions & 12 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ go_library(
3737
"//pkg/util/iterutil",
3838
"//pkg/util/log",
3939
"//pkg/util/protoutil",
40+
"//pkg/util/stop",
4041
"//pkg/util/timeutil",
4142
"@com_github_cockroachdb_errors//:errors",
4243
"@org_golang_x_time//rate",
@@ -86,6 +87,7 @@ go_test(
8687
"//pkg/util/tracing",
8788
"//pkg/util/uuid",
8889
"@com_github_cockroachdb_datadriven//:datadriven",
90+
"@com_github_cockroachdb_errors//:errors",
8991
"@com_github_stretchr_testify//require",
9092
"@org_golang_x_time//rate",
9193
],

pkg/kv/kvserver/kvstorage/wag/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func (s *Seq) Next() uint64 {
6565
return s.index.Add(1)
6666
}
6767

68+
// Load returns the last used WAG sequence number.
69+
func (s *Seq) Load() uint64 {
70+
return s.index.Load()
71+
}
72+
6873
// Write puts the WAG node under the specific sequence number into the given
6974
// writer. The index must have been allocated to the caller by the sequencer.
7075
func Write(w storage.Writer, index uint64, node wagpb.Node) error {

pkg/kv/kvserver/kvstorage/wag_truncator.go

Lines changed: 115 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kvstorage
88
import (
99
"context"
1010
"math"
11+
"sync/atomic"
1112

1213
"github.com/cockroachdb/cockroach/pkg/keys"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -17,6 +18,8 @@ import (
1718
"github.com/cockroachdb/cockroach/pkg/roachpb"
1819
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
22+
"github.com/cockroachdb/cockroach/pkg/util/stop"
2023
"github.com/cockroachdb/errors"
2124
"golang.org/x/time/rate"
2225
)
@@ -28,43 +31,146 @@ import (
2831
// replayed and made durable in the StateEngine.
2932
// - [online] during the normal operation, truncate the WAG nodes that were
3033
// durably applied to the state machine.
31-
//
32-
// TODO(ibrahim): Add the periodic truncation logic.
3334
type WAGTruncator struct {
3435
st *cluster.Settings
3536
eng Engines
37+
seq *wag.Seq
38+
// wakeCh is signaled when there are potential WAG nodes to truncate.
39+
wakeCh chan struct{}
40+
// lastTruncatedWAGIndex is the index of the last WAG node that was
41+
// successfully truncated. This is to be used by online WAG truncation to
42+
// quickly seek into the potential WAG nodes to truncate.
43+
lastTruncatedWAGIndex atomic.Uint64
3644
}
3745

3846
// NewWAGTruncator creates a WAGTruncator.
39-
func NewWAGTruncator(st *cluster.Settings, eng Engines) *WAGTruncator {
40-
return &WAGTruncator{st: st, eng: eng}
47+
func NewWAGTruncator(st *cluster.Settings, eng Engines, seq *wag.Seq) *WAGTruncator {
48+
return &WAGTruncator{
49+
st: st,
50+
eng: eng,
51+
seq: seq,
52+
wakeCh: make(chan struct{}, 1),
53+
}
54+
}
55+
56+
// Start launches the background goroutine that performs live WAG truncation.
57+
// It must be called before starting the engine. It sets lastTruncatedWAGIndex,
58+
// and we need to set it before any WAG node could have been created so that we
59+
// guarantee that the periodic WAG truncation will start from the first WAG node.
60+
//
61+
// TODO(ibrahim): Add a TestKnob for keeping a suffix of the WAG for debugging.
62+
func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error {
63+
t.lastTruncatedWAGIndex.Store(t.seq.Load())
64+
return stopper.RunAsyncTask(ctx, "wag-truncation", func(ctx context.Context) {
65+
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
66+
defer cancel()
67+
for {
68+
select {
69+
case <-t.wakeCh:
70+
t.truncateAppliedNodesLive(ctx)
71+
case <-ctx.Done():
72+
return
73+
}
74+
}
75+
})
76+
}
77+
78+
// DurabilityAdvancedCallback is invoked whenever the state engine completes a
79+
// flush. It checks whether there could possibly be WAG nodes to truncate by
80+
// comparing lastTruncatedWAGIndex against seq.Load(). If there are potential
81+
// truncation opportunities, it sends a non-blocking signal to wake the
82+
// background goroutine. It must return quickly and must not call into the
83+
// engine to avoid deadlock (see storage.Engine.RegisterFlushCompletedCallback).
84+
func (t *WAGTruncator) DurabilityAdvancedCallback() {
85+
// The key point is the point in time where we call "t.seq.Load()". If the
86+
// sequence number is greater than the last truncated WAG sequence number, we
87+
// will attempt to truncate if there are any WAG nodes that are ready for
88+
// truncation. If it found no WAG nodes that are ready to be truncated, it
89+
// will attempt to truncate on every state engine flush until
90+
// sequence number == lastTruncatedWAGIndex.
91+
seq, lastTruncated := t.seq.Load(), t.lastTruncatedWAGIndex.Load()
92+
if seq == lastTruncated {
93+
return
94+
}
95+
if seq < lastTruncated {
96+
log.KvExec.Fatalf(context.Background(),
97+
"WAG seq %d < lastTruncatedWAGIndex %d", seq, lastTruncated)
98+
}
99+
select {
100+
case t.wakeCh <- struct{}{}:
101+
default:
102+
}
41103
}
42104

43105
// TruncateAll truncates all applied WAG nodes. It's meant to be used at engine
44106
// startup right after we replay the WAG nodes and sync the state engine.
45107
func (t *WAGTruncator) TruncateAll(ctx context.Context) error {
108+
_, err := t.truncateAppliedNodes(ctx, 0)
109+
return err
110+
}
111+
112+
// truncateAppliedNodesLive is called by the background goroutine to truncate
113+
// applied WAG nodes. Unlike TruncateAll, it only truncates nodes with index
114+
// after lastTruncatedWAGIndex. This makes it avoid jumping over gaps in WAG
115+
// node indices.
116+
//
117+
// Note that jumping over gaps during WAG truncation should be safe because a
118+
// WAG node is only truncated if it has been applied to the state engine, and it
119+
// by definition has all the required events applied. However, we avoid jumping
120+
// over gaps so that we know the exact index to truncate next, which facilitates
121+
// seeking past previously deleted WAG garbage.
122+
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
123+
startIndex := t.lastTruncatedWAGIndex.Load() + 1
124+
lastTruncated, err := t.truncateAppliedNodes(ctx, startIndex)
125+
if err != nil {
126+
// TODO(ibrahim): We need to decide if we need to retry truncating this
127+
// WAG node now, or rely on the next state engine flush to do it.
128+
log.KvExec.Errorf(ctx, "truncating WAG node: %+v", err)
129+
}
130+
if lastTruncated >= startIndex {
131+
t.lastTruncatedWAGIndex.Store(lastTruncated)
132+
}
133+
}
134+
135+
// truncateAppliedNodes is a helper function used by both offline and online WAG
136+
// truncation functions. It repeatedly tries to delete a WAG node in a batch and
137+
// commit that batch. When startIndex is 0, it repeatedly truncates the first
138+
// available node (used at startup to drain the WAG). When startIndex is greater
139+
// than 0, it truncates nodes sequentially starting from that index (used during
140+
// online operation to avoid jumping over gaps in WAG indices).
141+
//
142+
// Returns the index of the last successfully truncated node, or 0 if no nodes
143+
// were truncated.
144+
func (t *WAGTruncator) truncateAppliedNodes(
145+
ctx context.Context, startIndex uint64,
146+
) (uint64, error) {
46147
stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability)
47148
defer stateReader.Close()
149+
nextIndex := startIndex
150+
var lastTruncated uint64
48151
for {
49152
if err := ctx.Err(); err != nil {
50-
return err
153+
return lastTruncated, err
51154
}
52155
b := t.eng.LogEngine().NewWriteBatch()
53156
truncated, err := t.truncateAppliedWAGNodeAndClearRaftState(
54-
ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, 0, /* index */
157+
ctx, Raft{RO: t.eng.LogEngine(), WO: b}, stateReader, nextIndex,
55158
)
56159
if err == nil && truncated {
57160
err = b.Commit(false /* sync */)
58161
}
59162
b.Close()
60163
if err != nil {
61-
return err
164+
return lastTruncated, err
62165
}
63166
if !truncated {
64-
break
167+
return lastTruncated, nil
168+
}
169+
if nextIndex > 0 {
170+
lastTruncated = nextIndex
171+
nextIndex++
65172
}
66173
}
67-
return nil
68174
}
69175

70176
// truncateAppliedWAGNodeAndClearRaftState deletes a WAG node if all of its

pkg/kv/kvserver/kvstorage/wag_truncator_test.go

Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kvstorage
88
import (
99
"context"
1010
"math"
11+
"slices"
1112
"testing"
1213

1314
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -20,8 +21,11 @@ import (
2021
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2122
"github.com/cockroachdb/cockroach/pkg/storage"
2223
"github.com/cockroachdb/cockroach/pkg/storage/fs"
24+
"github.com/cockroachdb/cockroach/pkg/testutils"
2325
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2426
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/cockroach/pkg/util/stop"
28+
"github.com/cockroachdb/errors"
2529
"github.com/stretchr/testify/require"
2630
"golang.org/x/time/rate"
2731
)
@@ -103,6 +107,30 @@ func (e *testEngines) listWAGNodes(t *testing.T) []uint64 {
103107
return indices
104108
}
105109

110+
func eventuallyExpectWAGNodesIndices(t *testing.T, e *testEngines, expected []uint64) {
111+
t.Helper()
112+
testutils.SucceedsSoon(t, func() error {
113+
actual := e.listWAGNodes(t)
114+
if slices.Equal(expected, actual) {
115+
return nil
116+
}
117+
return errors.Newf("expected WAG nodes %v, got %v", expected, actual)
118+
})
119+
}
120+
121+
func eventuallyExpectLastTruncatedWAGIndex(
122+
t *testing.T, truncator *WAGTruncator, expected uint64,
123+
) {
124+
t.Helper()
125+
testutils.SucceedsSoon(t, func() error {
126+
actual := truncator.lastTruncatedWAGIndex.Load()
127+
if actual == expected {
128+
return nil
129+
}
130+
return errors.Newf("expected lastTruncatedWAGIndex %d, got %d", expected, actual)
131+
})
132+
}
133+
106134
func TestTruncateApplied(t *testing.T) {
107135
defer leaktest.AfterTest(t)()
108136
defer log.Scope(t).Close(t)
@@ -240,7 +268,7 @@ func TestTruncateApplied(t *testing.T) {
240268
t.Run("", func(t *testing.T) {
241269
e := makeTestEngines()
242270
defer e.Close()
243-
truncator := NewWAGTruncator(st, e.Engines)
271+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
244272
tc.setup(t, &e)
245273
require.NoError(t, e.stateEngine.Flush())
246274
require.NoError(t, truncator.TruncateAll(ctx))
@@ -268,7 +296,7 @@ func TestTruncateAndClearRaftState(t *testing.T) {
268296
t.Run(eventType.String(), func(t *testing.T) {
269297
e := makeTestEngines()
270298
defer e.Close()
271-
truncator := NewWAGTruncator(st, e.Engines)
299+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
272300

273301
// Write WAG nodes: init then destroy/subsume at index 20.
274302
e.writeWAGNode(t, wagpb.Event{
@@ -372,7 +400,7 @@ func TestTruncateGapHandling(t *testing.T) {
372400
t.Run("", func(t *testing.T) {
373401
e := makeTestEngines()
374402
defer e.Close()
375-
truncator := NewWAGTruncator(st, e.Engines)
403+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
376404

377405
// Write WAG nodes at indices 2, 4, 6.
378406
e.seq.Next()
@@ -412,3 +440,81 @@ func TestTruncateGapHandling(t *testing.T) {
412440
})
413441
}
414442
}
443+
444+
// TestWAGTruncatorBackground verifies that the WAGTruncator background
445+
// goroutine only truncates WAG nodes when both conditions are met: (1) the
446+
// state engine has flushed, and (2) there are WAG nodes to truncate (i.e.,
447+
// seq.Load() > lastTruncatedWAGIndex).
448+
func TestWAGTruncatorBackground(t *testing.T) {
449+
defer leaktest.AfterTest(t)()
450+
defer log.Scope(t).Close(t)
451+
ctx := context.Background()
452+
st := cluster.MakeTestingClusterSettings()
453+
e := makeTestEngines()
454+
defer e.Close()
455+
stopper := stop.NewStopper()
456+
defer stopper.Stop(ctx)
457+
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
458+
sl := MakeStateLoader(r1.RangeID)
459+
460+
// Initialize replica state so events can be considered applied.
461+
require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID))
462+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
463+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 100}))
464+
465+
// Start the periodic WAG truncation background task.
466+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
467+
require.NoError(t, truncator.Start(ctx, stopper))
468+
469+
flushStateEngineAndSignal := func() {
470+
require.NoError(t, e.StateEngine().Flush())
471+
truncator.DurabilityAdvancedCallback()
472+
}
473+
474+
// No WAG nodes exist. Flushing the state engine should not cause the
475+
// truncator to do anything (seq.Load() == lastTruncatedWAGIndex == 0).
476+
flushStateEngineAndSignal()
477+
eventuallyExpectWAGNodesIndices(t, &e, nil)
478+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(0))
479+
480+
// Write two WAG nodes whose events are applied (index <= 100).
481+
e.writeWAGNode(t, wagpb.Event{
482+
Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit,
483+
})
484+
e.writeWAGNode(t, wagpb.Event{
485+
Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply,
486+
})
487+
488+
// WAG nodes exist, but the state engine hasn't flushed yet, so the
489+
// GuaranteedDurability reader won't see the replica state. Signal the
490+
// truncator without flushing first.
491+
truncator.DurabilityAdvancedCallback()
492+
eventuallyExpectWAGNodesIndices(t, &e, []uint64{1, 2})
493+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(0))
494+
495+
// Now flush the state engine and signal again. Both nodes should be
496+
// truncated since their events are applied (index 10 and 20 <= 100).
497+
flushStateEngineAndSignal()
498+
eventuallyExpectWAGNodesIndices(t, &e, nil)
499+
// The lastTruncatedWAGIndex is stored after truncateAppliedNodes returns,
500+
// so there is a small window between the WAG nodes being deleted and the
501+
// index being updated. Use an eventually-consistent check.
502+
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 2)
503+
504+
// Write a third WAG node that is NOT applied (index 200 > 100).
505+
e.writeWAGNode(t, wagpb.Event{
506+
Addr: wagpb.MakeAddr(r1, 200), Type: wagpb.EventApply,
507+
})
508+
flushStateEngineAndSignal()
509+
// Node 3 should remain because its event isn't applied yet.
510+
eventuallyExpectWAGNodesIndices(t, &e, []uint64{3})
511+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(2))
512+
513+
// Advance the applied index past 200 and flush. Now node 3 should be
514+
// truncated.
515+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
516+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 200}))
517+
flushStateEngineAndSignal()
518+
eventuallyExpectWAGNodesIndices(t, &e, nil)
519+
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 3)
520+
}

0 commit comments

Comments
 (0)