Skip to content

Commit fe777bb

Browse files
kvstorage: Introduce WAGTruncator Testing Knobs
This commit introduces TestingKnobs to the WAGTruncator, and adds AfterTruncationCallback() that is called after truncation pass has finished. Release note: None Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent a535d4f commit fe777bb

3 files changed

Lines changed: 49 additions & 46 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ go_test(
8989
"//pkg/util/tracing",
9090
"//pkg/util/uuid",
9191
"@com_github_cockroachdb_datadriven//:datadriven",
92-
"@com_github_cockroachdb_errors//:errors",
9392
"@com_github_stretchr_testify//require",
9493
"@org_golang_x_time//rate",
9594
],

pkg/kv/kvserver/kvstorage/wag_truncator.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ import (
2424
"golang.org/x/time/rate"
2525
)
2626

27+
// WAGTruncatorTestingKnobs contains testing knobs for the WAGTruncator.
28+
type WAGTruncatorTestingKnobs struct {
29+
// AfterTruncationCallback, if set, is called after each invocation of
30+
// truncateAppliedNodesLive completes. Can be used to synchronize tests
31+
// with the background truncation goroutine.
32+
AfterTruncationCallback func()
33+
}
34+
2735
// WAGTruncator truncates applied WAG nodes and clears their associated raft
2836
// state (log entries and sideloaded files).
2937
type WAGTruncator struct {
@@ -40,18 +48,24 @@ type WAGTruncator struct {
4048
lastTruncatedWAGIndex atomic.Uint64
4149
// wakeCh is signaled when there are potential WAG nodes to truncate.
4250
wakeCh chan struct{}
51+
knobs WAGTruncatorTestingKnobs
4352
}
4453

4554
// NewWAGTruncator creates a WAGTruncator.
4655
func NewWAGTruncator(
47-
st *cluster.Settings, eng Engines, seq *wag.Seq, lastIndexAfterStartup uint64,
56+
st *cluster.Settings,
57+
eng Engines,
58+
seq *wag.Seq,
59+
lastIndexAfterStartup uint64,
60+
knobs WAGTruncatorTestingKnobs,
4861
) *WAGTruncator {
4962
return &WAGTruncator{
5063
st: st,
5164
eng: eng,
5265
seq: seq,
5366
lastWAGIndexBeforeStartup: lastIndexAfterStartup,
5467
wakeCh: make(chan struct{}, 1),
68+
knobs: knobs,
5569
}
5670
}
5771

@@ -105,6 +119,11 @@ func (t *WAGTruncator) DurabilityAdvancedCallback() {
105119
// truncateAppliedNodesLive is called by the background goroutine to truncate
106120
// applied WAG nodes.
107121
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
122+
defer func() {
123+
if t.knobs.AfterTruncationCallback != nil {
124+
t.knobs.AfterTruncationCallback()
125+
}
126+
}()
108127
startIndex := t.lastTruncatedWAGIndex.Load() + 1
109128
if startIndex <= t.lastWAGIndexBeforeStartup {
110129
// There are WAG nodes that we can potentially truncate with

pkg/kv/kvserver/kvstorage/wag_truncator_test.go

Lines changed: 29 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package kvstorage
88
import (
99
"context"
1010
"math"
11-
"slices"
1211
"testing"
1312

1413
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -21,11 +20,9 @@ import (
2120
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2221
"github.com/cockroachdb/cockroach/pkg/storage"
2322
"github.com/cockroachdb/cockroach/pkg/storage/fs"
24-
"github.com/cockroachdb/cockroach/pkg/testutils"
2523
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2624
"github.com/cockroachdb/cockroach/pkg/util/log"
2725
"github.com/cockroachdb/cockroach/pkg/util/stop"
28-
"github.com/cockroachdb/errors"
2926
"github.com/stretchr/testify/require"
3027
"golang.org/x/time/rate"
3128
)
@@ -107,28 +104,6 @@ func (e *testEngines) listWAGNodes(t *testing.T) []uint64 {
107104
return indices
108105
}
109106

110-
func eventuallyExpectWAGNodesIndices(t *testing.T, e *testEngines, expected []uint64) {
111-
t.Helper()
112-
testutils.SucceedsSoon(t, func() error {
113-
actual := e.listWAGNodes(t)
114-
if slices.Equal(expected, actual) {
115-
return nil
116-
}
117-
return errors.Newf("expected WAG nodes %v, got %v", expected, actual)
118-
})
119-
}
120-
121-
func eventuallyExpectLastTruncatedWAGIndex(t *testing.T, truncator *WAGTruncator, expected uint64) {
122-
t.Helper()
123-
testutils.SucceedsSoon(t, func() error {
124-
actual := truncator.lastTruncatedWAGIndex.Load()
125-
if actual == expected {
126-
return nil
127-
}
128-
return errors.Newf("expected lastTruncatedWAGIndex %d, got %d", expected, actual)
129-
})
130-
}
131-
132107
func TestTruncateApplied(t *testing.T) {
133108
defer leaktest.AfterTest(t)()
134109
defer log.Scope(t).Close(t)
@@ -266,7 +241,7 @@ func TestTruncateApplied(t *testing.T) {
266241
t.Run("", func(t *testing.T) {
267242
e := makeTestEngines()
268243
defer e.Close()
269-
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0)
244+
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0, WAGTruncatorTestingKnobs{})
270245
tc.setup(t, &e)
271246
require.NoError(t, e.stateEngine.Flush())
272247
_, err := truncator.truncateAppliedNodes(ctx, 0, /* startIndex */
@@ -296,7 +271,7 @@ func TestTruncateAndClearRaftState(t *testing.T) {
296271
t.Run(eventType.String(), func(t *testing.T) {
297272
e := makeTestEngines()
298273
defer e.Close()
299-
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0)
274+
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0, WAGTruncatorTestingKnobs{})
300275

301276
// Write WAG nodes: init then destroy/subsume at index 20.
302277
e.writeWAGNode(t, wagpb.Event{
@@ -428,7 +403,7 @@ func TestTruncateGapHandling(t *testing.T) {
428403
t.Run("", func(t *testing.T) {
429404
e := makeTestEngines()
430405
defer e.Close()
431-
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0)
406+
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0, WAGTruncatorTestingKnobs{})
432407

433408
// Write WAG nodes at indices 2, 4, 6.
434409
e.seq.Next()
@@ -551,7 +526,7 @@ func TestTruncateAppliedNodes(t *testing.T) {
551526
t.Run("", func(t *testing.T) {
552527
e := makeTestEngines()
553528
defer e.Close()
554-
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0)
529+
truncator := NewWAGTruncator(st, e.Engines, &e.seq, 0, WAGTruncatorTestingKnobs{})
555530

556531
// Write WAG nodes at indices 2, 4, 5.
557532
e.seq.Next()
@@ -613,23 +588,33 @@ func TestWAGTruncatorBackground(t *testing.T) {
613588
Addr: wagpb.MakeAddr(r1, 20), Type: wagpb.EventApply,
614589
})
615590

616-
// Start the periodic WAG truncation background task.
617-
truncator := NewWAGTruncator(st, e.Engines, &e.seq, e.seq.Load() /* lastIndexAfterStartup */)
591+
// Start the periodic WAG truncation background task with a knob that
592+
// signals when truncation completes, so the test can synchronize
593+
// deterministically instead of polling.
594+
truncationDone := make(chan struct{}, 1)
595+
truncator := NewWAGTruncator(st, e.Engines, &e.seq,
596+
e.seq.Load(), /* lastIndexAfterStartup */
597+
WAGTruncatorTestingKnobs{
598+
AfterTruncationCallback: func() {
599+
truncationDone <- struct{}{}
600+
},
601+
})
618602
require.NoError(t, truncator.Start(ctx, stopper))
619603
// Create a WAG node after startup.
620604
e.writeWAGNode(t, wagpb.Event{
621605
Addr: wagpb.MakeAddr(r1, 30), Type: wagpb.EventApply,
622606
})
623607
require.Equal(t, []uint64{1, 3, 4}, e.listWAGNodes(t))
624608

625-
flushStateEngineAndSignal := func() {
609+
flushAndWaitForTruncation := func() {
626610
require.NoError(t, e.StateEngine().Flush())
627611
truncator.DurabilityAdvancedCallback()
612+
<-truncationDone
628613
}
629614
// We expect all WAG nodes to be truncated when the state engine is flushed.
630-
flushStateEngineAndSignal()
631-
eventuallyExpectWAGNodesIndices(t, &e, nil)
632-
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 4)
615+
flushAndWaitForTruncation()
616+
require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t))
617+
require.Equal(t, uint64(4), truncator.lastTruncatedWAGIndex.Load())
633618

634619
// Write two WAG nodes whose events are applied (index <= 100).
635620
e.writeWAGNode(t, wagpb.Event{
@@ -642,24 +627,24 @@ func TestWAGTruncatorBackground(t *testing.T) {
642627

643628
// Now flush the state engine and signal again. Both nodes should be
644629
// truncated since their events are applied.
645-
flushStateEngineAndSignal()
646-
eventuallyExpectWAGNodesIndices(t, &e, nil)
647-
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 6)
630+
flushAndWaitForTruncation()
631+
require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t))
632+
require.Equal(t, uint64(6), truncator.lastTruncatedWAGIndex.Load())
648633

649634
// Write another WAG node but it is NOT applied yet.
650635
e.writeWAGNode(t, wagpb.Event{
651636
Addr: wagpb.MakeAddr(r1, 200), Type: wagpb.EventApply,
652637
})
653-
flushStateEngineAndSignal()
638+
flushAndWaitForTruncation()
654639
// Node 7 should remain because its event isn't applied yet.
655-
eventuallyExpectWAGNodesIndices(t, &e, []uint64{7})
656-
require.Equal(t, truncator.lastTruncatedWAGIndex.Load(), uint64(6))
640+
require.Equal(t, []uint64{7}, e.listWAGNodes(t))
641+
require.Equal(t, uint64(6), truncator.lastTruncatedWAGIndex.Load())
657642

658643
// Advance the applied index past 200 and flush. Now node 7 should be
659644
// truncated.
660645
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
661646
&kvserverpb.RangeAppliedState{RaftAppliedIndex: 200}))
662-
flushStateEngineAndSignal()
663-
eventuallyExpectWAGNodesIndices(t, &e, nil)
664-
eventuallyExpectLastTruncatedWAGIndex(t, truncator, 7)
647+
flushAndWaitForTruncation()
648+
require.Equal(t, ([]uint64)(nil), e.listWAGNodes(t))
649+
require.Equal(t, uint64(7), truncator.lastTruncatedWAGIndex.Load())
665650
}

0 commit comments

Comments
 (0)