Skip to content

Commit 7755a41

Browse files
kvstorage: add WAG truncation infrastructure
This PR adds the infrastructure that will be used to truncating WAG nodes. It will be used in two places: 1) Truncating all WAG nodes on startup, and 2) Periodically truncating durably applied WAG nodes. A lot of functions are similar to WAG replay, and there are TODOs to reconcile the functions between WAG replay and truncation. There is also some work left for clearing the Raft state, and deleting sideloaded files. Release note: None Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com> kvstorage/wag: prepare for WAG truncation This commit sets up the premitives for WAG truncations by: 1) Adding a `Delete` function for removing WAG nodes by index. 2) Changing the WAG `Iterator.Iter` method to return `iter.Seq2[uint64, wagpb.Node]` pair. This will be useful when we iterate over the WAG and truncate the nodes that have been applied and synced. Epic: none Release note: None Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent 16c01b7 commit 7755a41

3 files changed

Lines changed: 441 additions & 0 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
"replica_state.go",
1212
"stateloader.go",
1313
"storage.go",
14+
"wag_truncator.go",
1415
],
1516
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage",
1617
visibility = ["//visibility:public"],
@@ -20,6 +21,7 @@ go_library(
2021
"//pkg/kv/kvpb",
2122
"//pkg/kv/kvserver/kvserverpb",
2223
"//pkg/kv/kvserver/kvstorage/wag",
24+
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
2325
"//pkg/kv/kvserver/logstore",
2426
"//pkg/kv/kvserver/rditer",
2527
"//pkg/kv/kvserver/spanset",
@@ -48,6 +50,7 @@ go_test(
4850
"initial_test.go",
4951
"stateloader_test.go",
5052
"storage_test.go",
53+
"wag_truncator_test.go",
5154
],
5255
data = glob(["testdata/**"]),
5356
embed = [":kvstorage"],
@@ -57,6 +60,8 @@ go_test(
5760
"//pkg/kv/kvpb",
5861
"//pkg/kv/kvserver/concurrency/lock",
5962
"//pkg/kv/kvserver/kvserverpb",
63+
"//pkg/kv/kvserver/kvstorage/wag",
64+
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
6065
"//pkg/kv/kvserver/logstore",
6166
"//pkg/kv/kvserver/print",
6267
"//pkg/kv/kvserver/spanset",
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kvstorage
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb"
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/util/log"
16+
"github.com/cockroachdb/errors"
17+
)
18+
19+
// TruncateResult indicates the outcome of a truncateAppliedNodes call.
20+
type TruncateResult int
21+
22+
const (
23+
// TruncatedNone means no WAG nodes existed or none were applied.
24+
TruncatedNone TruncateResult = iota + 1
25+
// TruncatedSome means some but not all WAG nodes were deleted.
26+
TruncatedSome
27+
// TruncatedAll means all WAG nodes were deleted.
28+
TruncatedAll
29+
)
30+
31+
// PersistedRangeState describes the applied state of a range in the state
32+
// machine, as needed by the WAG replay decision logic.
33+
type PersistedRangeState struct {
34+
ReplicaID roachpb.ReplicaID
35+
TombstoneNextReplicaID roachpb.ReplicaID
36+
AppliedIndex kvpb.RaftIndex
37+
}
38+
39+
// loadPersistedRangeState loads the replay-relevant state for a range from the
40+
// state machine using StateLoader.
41+
// TODO(ibrahim): Merge these helper functions with the helper functions that
42+
// are used during WAG replay.
43+
func loadPersistedRangeState(
44+
ctx context.Context, stateRO StateRO, rangeID roachpb.RangeID,
45+
) (PersistedRangeState, error) {
46+
sl := MakeStateLoader(rangeID)
47+
rid, err := sl.LoadRaftReplicaID(ctx, stateRO)
48+
if err != nil {
49+
return PersistedRangeState{}, err
50+
}
51+
ts, err := sl.LoadRangeTombstone(ctx, stateRO)
52+
if err != nil {
53+
return PersistedRangeState{}, err
54+
}
55+
as, err := sl.LoadRangeAppliedState(ctx, stateRO)
56+
if err != nil {
57+
return PersistedRangeState{}, err
58+
}
59+
return PersistedRangeState{
60+
ReplicaID: rid.ReplicaID,
61+
TombstoneNextReplicaID: ts.NextReplicaID,
62+
AppliedIndex: as.RaftAppliedIndex,
63+
}, nil
64+
}
65+
66+
// isEventApplied checks whether the state for the given RangeID has progressed
67+
// past the given Addr. Per the WAG Addr ordering, a higher ReplicaID implies
68+
// all events for lower ReplicaIDs (including their destruction) have been
69+
// applied, and the same ReplicaID is ordered by Index.
70+
// TODO(Ibrahim): Refactor to use ReplicaMark (#156696) which will encapsulate
71+
// the replicaID/tombstone comparison logic.
72+
// TODO(ibrahim): Merge these helper functions with the helper functions that
73+
// are used during WAG replay.
74+
func isEventApplied(
75+
ctx context.Context, state PersistedRangeState, event wagpb.Event,
76+
) (bool, error) {
77+
switch {
78+
case event.Addr.ReplicaID < state.TombstoneNextReplicaID:
79+
// Destroyed replica; skip.
80+
return true, nil
81+
case event.Addr.ReplicaID < state.ReplicaID:
82+
// Stale replica superseded by a newer one; skip.
83+
return true, nil
84+
case event.Addr.ReplicaID == state.ReplicaID:
85+
if event.Type == wagpb.EventDestroy || event.Type == wagpb.EventSubsume {
86+
return false, nil
87+
}
88+
// For other events, compare raft indices.
89+
return event.Addr.Index <= state.AppliedIndex, nil
90+
default:
91+
// New replica not yet seen on this store; apply.
92+
return false, nil
93+
}
94+
}
95+
96+
// isNodeApplied checks whether all events in a WAG node have been applied to
97+
// the state engine, by comparing each event's Addr against the replica state.
98+
// TODO(ibrahim): Merge these helper functions with the helper functions that
99+
// are used during WAG replay.
100+
func isNodeApplied(ctx context.Context, stateRO StateRO, node wagpb.Node) (bool, error) {
101+
for _, event := range node.Events {
102+
state, err := loadPersistedRangeState(ctx, stateRO, event.Addr.RangeID)
103+
if err != nil {
104+
return false, errors.Wrapf(err, "loading state for r%d", event.Addr.RangeID)
105+
}
106+
107+
applied, err := isEventApplied(ctx, state, event)
108+
if err != nil {
109+
return false, err
110+
}
111+
if !applied {
112+
return false, nil
113+
}
114+
}
115+
return true, nil
116+
}
117+
118+
// truncateAppliedNodes deletes WAG nodes that have been applied. It iterates
119+
// over WAG nodes in order, checks whether each node has been applied, and
120+
// deletes it if that was the case. It stops at the first unapplied node.
121+
// The result indicates whether no nodes, some nodes, or all nodes were deleted.
122+
//
123+
// The caller must provide a stateRO reader with GuaranteedDurability so that
124+
// only state confirmed flushed to persistent storage is visible. This ensures
125+
// we never delete a WAG node whose mutations aren't flushed yet.
126+
// TODO(Ibrahim): Add logic to clear sideloaded files for destroyed/subsumed
127+
// replicas.
128+
func truncateAppliedNodes(
129+
ctx context.Context, raftRW RaftRW, stateRO StateRO,
130+
) (TruncateResult, error) {
131+
var iter wag.Iterator
132+
seen, deleted := 0, 0
133+
for index, node := range iter.Iter(ctx, raftRW) {
134+
seen++
135+
applied, err := isNodeApplied(ctx, stateRO, node)
136+
if err != nil {
137+
return 0, err
138+
}
139+
if !applied {
140+
break
141+
}
142+
if err := wag.Delete(raftRW, index); err != nil {
143+
return 0, err
144+
}
145+
deleted++
146+
// TODO(Ibrahim): Add logic to clear raft state (log entries, HardState,
147+
// TruncatedState) for destroyed/subsumed replicas.
148+
}
149+
if err := iter.Error(); err != nil {
150+
return 0, err
151+
}
152+
if deleted == 0 {
153+
return TruncatedNone, nil
154+
}
155+
156+
log.KvExec.Infof(ctx, "truncated %d applied WAG nodes", deleted)
157+
if deleted == seen {
158+
return TruncatedAll, nil
159+
}
160+
return TruncatedSome, nil
161+
}

0 commit comments

Comments
 (0)