Skip to content

Commit 55a5cbd

Browse files
committed
kvstorage: batch WAG node deletions
Previously, truncateAppliedWAGNodeAndClearRaftState deleted one WAG node per batch and committed immediately. This commit does the following:
1) Rename truncateAppliedWAGNodeAndClearRaftState() to truncateBatch().
2) Introduce a cluster setting that controls the batch size.
3) Try to fit up to batchSize deletions in each call to truncateBatch().

Benchmark results:
```
BenchmarkWAGTruncation/batchSize=1 247114 15261 ns/op 1002 B/op 20 allocs/op
BenchmarkWAGTruncation/batchSize=4 396939 5533 ns/op 608 B/op 11 allocs/op
BenchmarkWAGTruncation/batchSize=8 452824 5220 ns/op 549 B/op 10 allocs/op
BenchmarkWAGTruncation/batchSize=16 447286 4400 ns/op 505 B/op 9 allocs/op
BenchmarkWAGTruncation/batchSize=32 512752 2402 ns/op 503 B/op 9 allocs/op
BenchmarkWAGTruncation/batchSize=64 484096 3737 ns/op 481 B/op 9 allocs/op
```
Release note: None
Epic: none
1 parent 7ffd286 commit 55a5cbd

3 files changed

Lines changed: 156 additions & 23 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"//pkg/kv/kvserver/spanset",
3030
"//pkg/raft/raftpb",
3131
"//pkg/roachpb",
32+
"//pkg/settings",
3233
"//pkg/settings/cluster",
3334
"//pkg/storage",
3435
"//pkg/storage/enginepb",

pkg/kv/kvserver/kvstorage/wag_truncator.go

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb"
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/settings"
1920
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2021
"github.com/cockroachdb/cockroach/pkg/storage"
2122
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -24,6 +25,14 @@ import (
2425
"golang.org/x/time/rate"
2526
)
2627

28+
var wagTruncatorBatchSize = settings.RegisterIntSetting(
29+
settings.SystemOnly,
30+
"kv.wag.truncator_batch_size",
31+
"number of WAG nodes to delete per write batch during truncation",
32+
8,
33+
settings.IntInRange(1, 1024),
34+
)
35+
2736
// WAGTruncatorTestingKnobs contains testing knobs for the WAGTruncator.
2837
type WAGTruncatorTestingKnobs struct {
2938
// AfterTruncationCallback is called after each truncation attempt.
@@ -118,39 +127,39 @@ func (t *WAGTruncator) truncateAppliedNodes(ctx context.Context) error {
118127
stateReader := t.eng.StateEngine().NewReader(storage.GuaranteedDurability)
119128
defer stateReader.Close()
120129
for {
121-
truncated, err := t.truncateAppliedWAGNodeAndClearRaftState(ctx, stateReader)
130+
truncated, err := t.truncateBatch(ctx, stateReader)
122131
if err != nil || !truncated {
123132
return err
124133
}
125134
}
126135
}
127136

128-
// truncateAppliedWAGNodeAndClearRaftState deletes the first WAG node if all of
129-
// its events have been applied to the state engine. For nodes containing
137+
// truncateBatch deletes up to a batch-sized prefix of WAG nodes if all of
138+
// their events have been applied to the state engine. For nodes containing
130139
// EventDestroy or EventSubsume events, it also clears the corresponding raft
131-
// log prefix from the engine and the sideloaded entries storage.
140+
// log prefix from the engine and the sideloaded entries.
132141
//
133-
// Returns a bool indicating whether a WAG node was deleted or not.
142+
// Returns a bool indicating whether some WAG nodes were deleted or not.
134143
//
135144
// The caller must provide a stateRO reader with GuaranteedDurability so that
136145
// only state confirmed flushed to persistent storage is visible. This ensures
137146
// we never delete a WAG node whose mutations aren't flushed yet. This method
138147
// creates and closes its own write batch on the log engine, and commits it
139148
// only if at least one WAG node was deleted.
140-
// TODO(ibrahim): Support deleting multiple WAG nodes within the same batch.
141-
func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState(
142-
ctx context.Context, stateRO StateRO,
143-
) (bool, error) {
149+
func (t *WAGTruncator) truncateBatch(ctx context.Context, stateRO StateRO) (bool, error) {
150+
batchSize := wagTruncatorBatchSize.Get(&t.st.SV)
151+
var count int64
144152
var iter wag.Iterator
145-
truncateIndex := t.truncIndex.Load() + 1
146-
iterStartKey := keys.StoreWAGNodeKey(truncateIndex)
153+
targetIndex := t.truncIndex.Load() + 1
154+
147155
b := t.eng.LogEngine().NewWriteBatch()
148156
defer b.Close()
149-
150-
for index, node := range iter.IterFrom(ctx, t.eng.LogEngine(), iterStartKey) {
151-
if index != truncateIndex && index > t.initIndex {
157+
for index, node := range iter.IterFrom(
158+
ctx, t.eng.LogEngine(), keys.StoreWAGNodeKey(targetIndex),
159+
) {
160+
if index != targetIndex && index > t.initIndex {
152161
// We cannot ignore gaps for WAG indices > initIndex.
153-
return false, nil
162+
break
154163
}
155164
// TODO(ibrahim): Right now, the canApplyWAGNode function returns a list of
156165
// raftCatchUpTargets that are not needed for the purposes of truncation,
@@ -161,7 +170,7 @@ func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState(
161170
}
162171
if action.apply {
163172
// If an event needs to be applied, the WAG node cannot be deleted yet.
164-
return false, nil
173+
break
165174
}
166175
if err := wag.Delete(b, index); err != nil {
167176
return false, err
@@ -178,13 +187,23 @@ func (t *WAGTruncator) truncateAppliedWAGNodeAndClearRaftState(
178187
}
179188
}
180189

181-
if err = b.Commit(false); err != nil {
182-
return false, err
190+
targetIndex = index + 1
191+
count++
192+
if count >= batchSize {
193+
break
183194
}
184-
t.truncIndex.Store(index)
185-
return true, nil
186195
}
187-
return false, iter.Error() // either no more WAG nodes or iter hit an error
196+
if count == 0 {
197+
return false, nil
198+
}
199+
if err := iter.Error(); err != nil {
200+
return false, err
201+
}
202+
if err := b.Commit(false); err != nil {
203+
return false, err
204+
}
205+
t.truncIndex.Store(targetIndex - 1) // targetIndex is pointing at the last index truncated + 1.
206+
return true, nil
188207
}
189208

190209
// clearReplicaRaftLogAndSideloaded clears raft log entries at or below the given index for

pkg/kv/kvserver/kvstorage/wag_truncator_test.go

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kvstorage
77

88
import (
99
"context"
10+
"fmt"
1011
"math"
1112
"testing"
1213

@@ -55,12 +56,12 @@ func (e *testEngines) writeWAGNode(t *testing.T, event wagpb.Event) {
5556

5657
// writeWAGNodesAt writes WAG nodes at the specified WAG sequence indices for
5758
// the given replica. Each node contains an EventApply event with its raft index
58-
// set to the WAG entry index + 10.
59+
// set to the WAG entry index + 1.
5960
func (e *testEngines) writeWAGNodesAt(t *testing.T, wagIndices []uint64, r roachpb.FullReplicaID) {
6061
t.Helper()
6162
for idx, wagIdx := range wagIndices {
6263
event := wagpb.Event{
63-
Addr: wagpb.MakeAddr(r, kvpb.RaftIndex(idx+10)),
64+
Addr: wagpb.MakeAddr(r, kvpb.RaftIndex(idx+1)),
6465
Type: wagpb.EventApply,
6566
}
6667
require.NoError(t, wag.Write(
@@ -462,3 +463,115 @@ func TestWAGTruncatorBackground(t *testing.T) {
462463
require.Empty(t, e.listWAGNodes(t))
463464
require.Equal(t, uint64(7), truncator.truncIndex.Load())
464465
}
466+
467+
// TestTruncateBatching verifies that truncateBatch() respects the batch size
468+
// setting.
469+
//
470+
// WAG layout: indices [3, 7, 10, 11, 12], and node at index 12 is not durably
471+
// applied.
472+
func TestTruncateBatching(t *testing.T) {
473+
defer leaktest.AfterTest(t)()
474+
defer log.Scope(t).Close(t)
475+
ctx := context.Background()
476+
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
477+
sl := MakeStateLoader(r1.RangeID)
478+
wagNodeIndices := []uint64{3, 7, 10, 11, 12}
479+
for _, tc := range []struct {
480+
initIndex uint64
481+
batchSize int64
482+
wantTruncated bool
483+
wantRemaining []uint64
484+
wantTruncIndex uint64
485+
}{
486+
{initIndex: 0, batchSize: 1, wantTruncated: false, wantRemaining: []uint64{3, 7, 10, 11, 12}, wantTruncIndex: 0},
487+
{initIndex: 0, batchSize: 8, wantTruncated: false, wantRemaining: []uint64{3, 7, 10, 11, 12}, wantTruncIndex: 0},
488+
{initIndex: 3, batchSize: 1, wantTruncated: true, wantRemaining: []uint64{7, 10, 11, 12}, wantTruncIndex: 3},
489+
{initIndex: 3, batchSize: 8, wantTruncated: true, wantRemaining: []uint64{7, 10, 11, 12}, wantTruncIndex: 3},
490+
{initIndex: 7, batchSize: 1, wantTruncated: true, wantRemaining: []uint64{7, 10, 11, 12}, wantTruncIndex: 3},
491+
{initIndex: 7, batchSize: 2, wantTruncated: true, wantRemaining: []uint64{10, 11, 12}, wantTruncIndex: 7},
492+
{initIndex: 7, batchSize: 8, wantTruncated: true, wantRemaining: []uint64{10, 11, 12}, wantTruncIndex: 7},
493+
{initIndex: 10, batchSize: 1, wantTruncated: true, wantRemaining: []uint64{7, 10, 11, 12}, wantTruncIndex: 3},
494+
{initIndex: 10, batchSize: 4, wantTruncated: true, wantRemaining: []uint64{12}, wantTruncIndex: 11},
495+
// Node 11 isn't applied yet, so it's not truncated.
496+
{initIndex: 10, batchSize: 8, wantTruncated: true, wantRemaining: []uint64{12}, wantTruncIndex: 11},
497+
} {
498+
t.Run("", func(t *testing.T) {
499+
st := cluster.MakeTestingClusterSettings()
500+
wagTruncatorBatchSize.Override(ctx, &st.SV, tc.batchSize)
501+
e := makeTestEngines()
502+
defer e.Close()
503+
e.writeWAGNodesAt(t, wagNodeIndices, r1)
504+
truncator := NewWAGTruncator(st, e.Engines, &e.seq)
505+
truncator.initIndex = tc.initIndex
506+
507+
require.NoError(t, sl.SetRaftReplicaID(ctx, e.StateEngine(), r1.ReplicaID))
508+
// Ensure that the last WAG node isn't applied.
509+
require.NoError(t, sl.SetRangeAppliedState(ctx, e.StateEngine(),
510+
&kvserverpb.RangeAppliedState{RaftAppliedIndex: kvpb.RaftIndex(len(wagNodeIndices) - 1)}))
511+
require.NoError(t, e.stateEngine.Flush())
512+
513+
stateReader := e.StateEngine().NewReader(storage.GuaranteedDurability)
514+
defer stateReader.Close()
515+
truncated, err := truncator.truncateBatch(ctx, stateReader)
516+
require.NoError(t, err)
517+
require.Equal(t, tc.wantTruncated, truncated)
518+
require.Equal(t, tc.wantRemaining, e.listWAGNodes(t))
519+
require.Equal(t, tc.wantTruncIndex, truncator.truncIndex.Load())
520+
})
521+
}
522+
}
523+
524+
// BenchmarkWAGTruncation measures the cost of truncating WAG nodes at different
525+
// batch sizes. It uses an in-memory engine so it doesn't really test the real
526+
// thing, but it should give an idea of the improvement of different batch
527+
// sizes.
528+
func BenchmarkWAGTruncation(b *testing.B) {
529+
defer log.Scope(b).Close(b)
530+
ctx := context.Background()
531+
r1 := roachpb.FullReplicaID{RangeID: 1, ReplicaID: 1}
532+
sl := MakeStateLoader(r1.RangeID)
533+
for _, batchSize := range []int64{1, 4, 8, 16, 32, 64} {
534+
b.Run(fmt.Sprintf("batchSize=%d", batchSize), func(b *testing.B) {
535+
b.StopTimer()
536+
st := cluster.MakeTestingClusterSettings()
537+
wagTruncatorBatchSize.Override(ctx, &st.SV, batchSize)
538+
eng := storage.NewDefaultInMemForTesting()
539+
defer eng.Close()
540+
engines := MakeEngines(eng)
541+
var seq wag.Seq
542+
truncator := NewWAGTruncator(st, engines, &seq)
543+
544+
// Write numNodes WAG nodes that are all eligible for
545+
// truncation.
546+
for j := 0; j < b.N; j++ {
547+
index := seq.Next()
548+
if err := wag.Write(eng, index, wagpb.Node{
549+
Events: []wagpb.Event{{
550+
Addr: wagpb.MakeAddr(r1, kvpb.RaftIndex(j+1)),
551+
Type: wagpb.EventApply,
552+
}},
553+
}); err != nil {
554+
b.Fatal(err)
555+
}
556+
}
557+
if err := sl.SetRaftReplicaID(ctx, eng, r1.ReplicaID); err != nil {
558+
b.Fatal(err)
559+
}
560+
if err := sl.SetRangeAppliedState(ctx, eng,
561+
&kvserverpb.RangeAppliedState{
562+
RaftAppliedIndex: kvpb.RaftIndex(b.N + 1),
563+
}); err != nil {
564+
b.Fatal(err)
565+
}
566+
if err := eng.Flush(); err != nil {
567+
b.Fatal(err)
568+
}
569+
570+
b.StartTimer()
571+
err := truncator.truncateAppliedNodes(ctx)
572+
b.StopTimer()
573+
require.NoError(b, err)
574+
require.Equal(b, truncator.truncIndex.Load(), uint64(b.N))
575+
})
576+
}
577+
}

0 commit comments

Comments
 (0)