Skip to content

Commit 5937faa

Browse files
kvstorage: add live WAG truncation
This commit adds the ability to perform live WAG truncation. It keeps track of the index of the last successfully truncated WAG node, and only attempts a round of WAG truncation if (1) the state engine has completed a flush, and (2) the current WAG sequence number is higher than the last one we truncated. Release note: None Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent 7e96c85 commit 5937faa

4 files changed

Lines changed: 217 additions & 9 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ go_library(
3737
"//pkg/util/iterutil",
3838
"//pkg/util/log",
3939
"//pkg/util/protoutil",
40+
"//pkg/util/stop",
4041
"//pkg/util/timeutil",
4142
"@com_github_cockroachdb_errors//:errors",
4243
"@org_golang_x_time//rate",

pkg/kv/kvserver/kvstorage/wag/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func (s *Seq) Next() uint64 {
6565
return s.index.Add(1)
6666
}
6767

68+
// Load returns the current WAG sequence number without allocating a new one.
69+
func (s *Seq) Load() uint64 {
70+
return s.index.Load()
71+
}
72+
6873
// Write puts the WAG node under the specific sequence number into the given
6974
// writer. The index must have been allocated to the caller by the sequencer.
7075
func Write(w storage.Writer, index uint64, node wagpb.Node) error {

pkg/kv/kvserver/kvstorage/wag_truncator.go

Lines changed: 112 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kvstorage
88
import (
99
"context"
1010
"math"
11+
"sync/atomic"
1112

1213
"github.com/cockroachdb/cockroach/pkg/keys"
1314
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -17,6 +18,8 @@ import (
1718
"github.com/cockroachdb/cockroach/pkg/roachpb"
1819
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
22+
"github.com/cockroachdb/cockroach/pkg/util/stop"
2023
"github.com/cockroachdb/errors"
2124
"golang.org/x/time/rate"
2225
)
@@ -28,16 +31,77 @@ import (
2831
// replayed and made durable in the StateEngine.
2932
// - [online] during the normal operation, truncate the WAG nodes that were
3033
// durably applied to the state machine.
31-
//
32-
// TODO(ibrahim): Add the periodic truncation logic.
3334
type WAGTruncator struct {
34-
st *cluster.Settings
35-
eng Engines
35+
stopper *stop.Stopper
36+
st *cluster.Settings
37+
eng Engines
38+
seq *wag.Seq
39+
// wakeCh is signaled when there are potential WAG nodes to truncate.
40+
wakeCh chan struct{}
41+
// lastTruncatedWAGIndex is the index of the last WAG node that was
42+
// successfully truncated. This is to be used by online WAG truncation to
43+
// quickly do nothing if there are no WAG nodes to truncate.
44+
lastTruncatedWAGIndex atomic.Uint64
3645
}
3746

3847
// NewWAGTruncator creates a WAGTruncator.
39-
func NewWAGTruncator(st *cluster.Settings, eng Engines) *WAGTruncator {
40-
return &WAGTruncator{st: st, eng: eng}
48+
func NewWAGTruncator(
49+
stopper *stop.Stopper, st *cluster.Settings, eng Engines, seq *wag.Seq,
50+
) *WAGTruncator {
51+
return &WAGTruncator{
52+
stopper: stopper,
53+
st: st,
54+
eng: eng,
55+
seq: seq,
56+
wakeCh: make(chan struct{}, 1),
57+
}
58+
}
59+
60+
// Start launches the background goroutine that performs live WAG truncation.
61+
// It must be called before starting the engine. It sets lastTruncatedWAGIndex,
62+
// and we need to set it before any WAG node could have been created so that we
63+
// guarantee that the periodic WAG truncation will start from the first WAG node.
64+
func (t *WAGTruncator) Start(ctx context.Context) error {
65+
t.lastTruncatedWAGIndex.Store(t.seq.Load())
66+
return t.stopper.RunAsyncTask(ctx, "wag-truncation", func(ctx context.Context) {
67+
ctx, cancel := t.stopper.WithCancelOnQuiesce(ctx)
68+
defer cancel()
69+
for {
70+
select {
71+
case <-t.wakeCh:
72+
t.truncateAppliedNodesLive(ctx)
73+
case <-ctx.Done():
74+
return
75+
}
76+
}
77+
})
78+
}
79+
80+
// DurabilityAdvancedCallback is invoked whenever the state engine completes a
81+
// flush. It checks whether there are WAG nodes to truncate by comparing
82+
// lastTruncatedWAGIndex against seq.Load(). If there are potential truncation
83+
// opportunities, it sends a non-blocking signal to wake the background
84+
// goroutine. It must return quickly and must not call into the engine to avoid
85+
// deadlock (see storage.Engine.RegisterFlushCompletedCallback).
86+
func (t *WAGTruncator) DurabilityAdvancedCallback() {
87+
// The key point is the point in time where we call "t.seq.Load()". If the
88+
// sequence number is greater than the last truncated WAG sequence number, we
89+
// will attempt to truncate that specific WAG node (and all the WAG nodes
90+
// sequentially after it). If the sequence number == lastTruncatedWAGIndex,
91+
// then there were no WAG nodes to truncate at that time. We will attempt
92+
// again when the state engine completes another flush.
93+
seq, lastTruncated := t.seq.Load(), t.lastTruncatedWAGIndex.Load()
94+
if seq < lastTruncated {
95+
log.KvExec.Fatalf(context.Background(),
96+
"WAG seq %d < lastTruncatedWAGIndex %d", seq, lastTruncated)
97+
}
98+
if seq == lastTruncated {
99+
return
100+
}
101+
select {
102+
case t.wakeCh <- struct{}{}:
103+
default:
104+
}
41105
}
42106

43107
// TruncateAll truncates all applied WAG nodes. It's meant to be used at engine
@@ -133,6 +197,48 @@ func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState(
133197
return false, iter.Error()
134198
}
135199

200+
// truncateAppliedNodesLive is called by the background goroutine to truncate
201+
// applied WAG nodes. Unlike TruncateAll, it only truncates nodes with index
202+
// after lastTruncatedWAGIndex. This makes it avoid jumping over gaps in WAG
203+
// node indices.
204+
//
205+
// Note that jumping over gaps during WAG truncation should be safe because a
206+
// WAG node is only truncated if it has been applied to the state engine, and it
207+
// by definition has all the required events applied. However, we avoid jumping
208+
// over gaps so that we know the exact index to truncate next, which facilitates
209+
// seeking past previously deleted WAG garbage.
210+
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
211+
stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability)
212+
defer stateReader.Close()
213+
for {
214+
if err := ctx.Err(); err != nil {
215+
return
216+
}
217+
nextIndex := t.lastTruncatedWAGIndex.Load() + 1
218+
b := t.eng.LogEngine().NewWriteBatch()
219+
truncated, err := t.truncateAppliedWAGNodeAndClearRaftState(
220+
ctx,
221+
Raft{RO: t.eng.LogEngine(), WO: b},
222+
stateReader,
223+
nextIndex,
224+
)
225+
if err == nil && truncated {
226+
err = b.Commit(false /* sync */)
227+
}
228+
b.Close()
229+
if err != nil {
230+
// TODO(ibrahim): We need to decide if we need to retry truncating this
231+
// WAG node now, or rely on the next state engine flush to do it.
232+
log.KvExec.Errorf(ctx, "truncating WAG node %d: %+v", nextIndex, err)
233+
return
234+
}
235+
if !truncated {
236+
return
237+
}
238+
t.lastTruncatedWAGIndex.Store(nextIndex)
239+
}
240+
}
241+
136242
// clearReplicaRaftLogAndSideloaded clears raft log entries at or below the given index for
137243
// a destroyed or subsumed replica, and it also deletes the sideloaded files associated with the
138244
// deleted entries.

pkg/kv/kvserver/kvstorage/wag_truncator_test.go

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package kvstorage
77

88
import (
99
"context"
10+
"fmt"
1011
"math"
12+
"slices"
1113
"testing"
1214

1315
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -20,8 +22,10 @@ import (
2022
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2123
"github.com/cockroachdb/cockroach/pkg/storage"
2224
"github.com/cockroachdb/cockroach/pkg/storage/fs"
25+
"github.com/cockroachdb/cockroach/pkg/testutils"
2326
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2427
"github.com/cockroachdb/cockroach/pkg/util/log"
28+
"github.com/cockroachdb/cockroach/pkg/util/stop"
2529
"github.com/stretchr/testify/require"
2630
"golang.org/x/time/rate"
2731
)
@@ -103,10 +107,23 @@ func (e *testEngines) listWAGNodes(t *testing.T) []uint64 {
103107
return indices
104108
}
105109

110+
func eventuallyExpectWAGNodesIndices(t *testing.T, e *testEngines, expected []uint64) {
111+
t.Helper()
112+
testutils.SucceedsSoon(t, func() error {
113+
actual := e.listWAGNodes(t)
114+
if slices.Equal(expected, actual) {
115+
return nil
116+
}
117+
return fmt.Errorf("expected WAG nodes %v, got %v", expected, actual)
118+
})
119+
}
120+
106121
func TestTruncateApplied(t *testing.T) {
107122
defer leaktest.AfterTest(t)()
108123
defer log.Scope(t).Close(t)
109124
ctx := context.Background()
125+
stopper := stop.NewStopper()
126+
defer stopper.Stop(ctx)
110127
st := cluster.MakeTestingClusterSettings()
111128

112129
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
@@ -240,7 +257,7 @@ func TestTruncateApplied(t *testing.T) {
240257
t.Run("", func(t *testing.T) {
241258
e := makeTestEngines()
242259
defer e.Close()
243-
truncator := NewWAGTruncator(st, e.Engines)
260+
truncator := NewWAGTruncator(stopper, st, e.Engines, &e.seq)
244261
tc.setup(t, &e)
245262
require.NoError(t, e.stateEngine.Flush())
246263
require.NoError(t, truncator.TruncateAll(ctx))
@@ -258,6 +275,8 @@ func TestTruncateAndClearRaftState(t *testing.T) {
258275
defer leaktest.AfterTest(t)()
259276
defer log.Scope(t).Close(t)
260277
ctx := context.Background()
278+
stopper := stop.NewStopper()
279+
defer stopper.Stop(ctx)
261280
st := cluster.MakeTestingClusterSettings()
262281

263282
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
@@ -268,7 +287,7 @@ func TestTruncateAndClearRaftState(t *testing.T) {
268287
t.Run(eventType.String(), func(t *testing.T) {
269288
e := makeTestEngines()
270289
defer e.Close()
271-
truncator := NewWAGTruncator(st, e.Engines)
290+
truncator := NewWAGTruncator(stopper, st, e.Engines, &e.seq)
272291

273292
// Write WAG nodes: init then destroy/subsume at index 20.
274293
e.writeWAGNode(t, wagpb.Event{
@@ -339,6 +358,8 @@ func TestTruncateGapHandling(t *testing.T) {
339358
defer leaktest.AfterTest(t)()
340359
defer log.Scope(t).Close(t)
341360
ctx := context.Background()
361+
stopper := stop.NewStopper()
362+
defer stopper.Stop(ctx)
342363
st := cluster.MakeTestingClusterSettings()
343364

344365
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
@@ -372,7 +393,7 @@ func TestTruncateGapHandling(t *testing.T) {
372393
t.Run("", func(t *testing.T) {
373394
e := makeTestEngines()
374395
defer e.Close()
375-
truncator := NewWAGTruncator(st, e.Engines)
396+
truncator := NewWAGTruncator(stopper, st, e.Engines, &e.seq)
376397

377398
// Write WAG nodes at indices 2, 4, 6.
378399
e.seq.Next()
@@ -412,3 +433,78 @@ func TestTruncateGapHandling(t *testing.T) {
412433
})
413434
}
414435
}
436+
437+
// TestWAGTruncatorBackground verifies that the WAGTruncator background
438+
// goroutine only truncates WAG nodes when both conditions are met: (1) the
439+
// state engine has flushed, and (2) there are WAG nodes to truncate (i.e.,
440+
// seq.Load() > lastTruncatedWAGIndex).
441+
func TestWAGTruncatorBackground(t *testing.T) {
442+
defer leaktest.AfterTest(t)()
443+
defer log.Scope(t).Close(t)
444+
ctx := context.Background()
445+
st := cluster.MakeTestingClusterSettings()
446+
e := makeTestEngines()
447+
defer e.Close()
448+
stopper := stop.NewStopper()
449+
defer stopper.Stop(ctx) // must run before e.Close() (LIFO)
450+
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
451+
sl := MakeStateLoader(r1.RangeID)
452+
453+
// Initialize replica state so events can be considered applied.
454+
require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID))
455+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
456+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 100}))
457+
458+
// Start the periodic WAG truncation background task.
459+
truncator := NewWAGTruncator(stopper, st, e.Engines, &e.seq)
460+
require.NoError(t, truncator.Start(ctx))
461+
462+
flushStateEngineAndSignal := func() {
463+
require.NoError(t, e.StateEngine().Flush())
464+
truncator.DurabilityAdvancedCallback()
465+
}
466+
467+
// No WAG nodes exist. Flushing the state engine should not cause the
468+
// truncator to do anything (seq.Load() == lastTruncatedWAGIndex == 0).
469+
flushStateEngineAndSignal()
470+
eventuallyExpectWAGNodesIndices(t, &e, nil)
471+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(0))
472+
473+
// Write two WAG nodes whose events are applied (index <= 100).
474+
e.writeWAGNode(t, wagpb.Event{
475+
Addr: wagpb.MakeAddr(r1, 10), Type: wagpb.EventInit,
476+
})
477+
e.writeWAGNode(t, wagpb.Event{
478+
Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply,
479+
})
480+
481+
// WAG nodes exist, but the state engine hasn't flushed yet, so the
482+
// GuaranteedDurability reader won't see the replica state. Signal the
483+
// truncator without flushing first.
484+
truncator.DurabilityAdvancedCallback()
485+
eventuallyExpectWAGNodesIndices(t, &e, []uint64{1, 2})
486+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(0))
487+
488+
// Now flush the state engine and signal again. Both nodes should be
489+
// truncated since their events are applied (index 10 and 20 <= 100).
490+
flushStateEngineAndSignal()
491+
eventuallyExpectWAGNodesIndices(t, &e, nil)
492+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(2))
493+
494+
// Write a third WAG node that is NOT applied (index 200 > 100).
495+
e.writeWAGNode(t, wagpb.Event{
496+
Addr: wagpb.MakeAddr(r1, 200), Type: wagpb.EventApply,
497+
})
498+
flushStateEngineAndSignal()
499+
// Node 3 should remain because its event isn't applied yet.
500+
eventuallyExpectWAGNodesIndices(t, &e, []uint64{3})
501+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(2))
502+
503+
// Advance the applied index past 200 and flush. Now node 3 should be
504+
// truncated.
505+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
506+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 200}))
507+
flushStateEngineAndSignal()
508+
eventuallyExpectWAGNodesIndices(t, &e, nil)
509+
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(3))
510+
}

0 commit comments

Comments
 (0)