Skip to content

Commit 8de24a1

Browse files
kvstorage: add WAG replay logic (#167029)
kvstorage: add WAG replay logic
2 parents 216f7ae + 7696e69 commit 8de24a1

3 files changed

Lines changed: 542 additions & 0 deletions

File tree

pkg/kv/kvserver/kvstorage/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
"replica_state.go",
1212
"stateloader.go",
1313
"storage.go",
14+
"wag_replay.go",
1415
],
1516
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage",
1617
visibility = ["//visibility:public"],
@@ -20,6 +21,7 @@ go_library(
2021
"//pkg/kv/kvpb",
2122
"//pkg/kv/kvserver/kvserverpb",
2223
"//pkg/kv/kvserver/kvstorage/wag",
24+
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
2325
"//pkg/kv/kvserver/logstore",
2426
"//pkg/kv/kvserver/rditer",
2527
"//pkg/kv/kvserver/spanset",
@@ -48,6 +50,7 @@ go_test(
4850
"initial_test.go",
4951
"stateloader_test.go",
5052
"storage_test.go",
53+
"wag_replay_test.go",
5154
],
5255
data = glob(["testdata/**"]),
5356
embed = [":kvstorage"],
@@ -57,6 +60,8 @@ go_test(
5760
"//pkg/kv/kvpb",
5861
"//pkg/kv/kvserver/concurrency/lock",
5962
"//pkg/kv/kvserver/kvserverpb",
63+
"//pkg/kv/kvserver/kvstorage/wag",
64+
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
6065
"//pkg/kv/kvserver/logstore",
6166
"//pkg/kv/kvserver/print",
6267
"//pkg/kv/kvserver/spanset",
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kvstorage
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb"
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/errors"
16+
)
17+
18+
// persistedRangeState describes the applied state of a range in the state
19+
// machine, as needed by the WAG replay decision logic. It is populated by
// loadPersistedRangeState and consulted by canApply.
20+
type persistedRangeState struct {
21+
// replicaID is the ReplicaID read via StateLoader.LoadRaftReplicaID. A value
// greater than zero means a replica currently exists for the range; zero
// means there is no current replica.
replicaID roachpb.ReplicaID
22+
// tombstoneNextReplicaID is the NextReplicaID read from the range tombstone.
// canApply treats events addressed to a ReplicaID below this value as stale.
tombstoneNextReplicaID roachpb.ReplicaID
23+
// appliedIndex is the RaftAppliedIndex read from the range applied state.
// For the current replica, canApply compares event indices against it.
appliedIndex kvpb.RaftIndex
24+
}
25+
26+
// validate checks persistedRangeState invariants.
27+
func (s persistedRangeState) validate() error {
28+
// When a replica exists (ReplicaID > 0), the tombstone must not exceed the
29+
// replica ID. The tombstone is only bumped above a replica's ID when that
30+
// replica is destroyed.
31+
if s.replicaID > 0 && s.tombstoneNextReplicaID > s.replicaID {
32+
return errors.AssertionFailedf(
33+
"tombstone (NextReplicaID=%d) is above current ReplicaID=%d",
34+
s.tombstoneNextReplicaID, s.replicaID,
35+
)
36+
}
37+
return nil
38+
}
39+
40+
// loadPersistedRangeState loads the replay-relevant state for a range from the
41+
// state machine using StateLoader.
42+
func loadPersistedRangeState(
43+
ctx context.Context, stateRO StateRO, rangeID roachpb.RangeID,
44+
) (persistedRangeState, error) {
45+
sl := MakeStateLoader(rangeID)
46+
rid, err := sl.LoadRaftReplicaID(ctx, stateRO)
47+
if err != nil {
48+
return persistedRangeState{}, err
49+
}
50+
ts, err := sl.LoadRangeTombstone(ctx, stateRO)
51+
if err != nil {
52+
return persistedRangeState{}, err
53+
}
54+
as, err := sl.LoadRangeAppliedState(ctx, stateRO)
55+
if err != nil {
56+
return persistedRangeState{}, err
57+
}
58+
state := persistedRangeState{
59+
replicaID: rid.ReplicaID,
60+
tombstoneNextReplicaID: ts.NextReplicaID,
61+
appliedIndex: as.RaftAppliedIndex,
62+
}
63+
return state, state.validate()
64+
}
65+
66+
// replayAction describes what the replay loop must do for a WAG node. It is
// the value returned by canApplyWAGNode and consumed by ReplayWAG.
67+
type replayAction struct {
68+
// apply indicates whether the WAG node's mutation needs to be applied.
// ReplayWAG skips the node when this is false.
69+
apply bool
70+
// catchUps lists ranges that must be caught up via raft log replay before
71+
// the WAG node can be applied. Empty when apply is false, and may be empty
72+
// when apply is true (e.g. for EventCreate/EventInit nodes).
73+
catchUps []raftCatchUpTarget
74+
}
75+
76+
// raftCatchUpTarget identifies a range/replica that must be caught up to a
77+
// specific raft index via raft log replay before a WAG node can be applied.
// canApplyWAGNode builds one per event whose raftCatchUp index is non-zero.
78+
type raftCatchUpTarget struct {
79+
// rangeID is the RangeID from the WAG event's address.
rangeID roachpb.RangeID
80+
// replicaID is the ReplicaID from the WAG event's address.
replicaID roachpb.ReplicaID
81+
// index is the raft index to catch up to, as computed by raftCatchUp.
index kvpb.RaftIndex
82+
}
83+
84+
// canApply reports whether a WAG event can be applied to the state machine,
85+
// given the current applied state for the event's RangeID. It compares the
86+
// event against the state machine's current position for this range. See
87+
// canApplyWAGNode for the node-level wrapper.
88+
//
89+
// The decision is based on where event replica ID falls relative to
90+
// state.replicaID on the number line, giving three regions:
91+
//
92+
// [0, state.replicaID) → old (destroyed or never existed); skip
93+
// state.replicaID → current replica; compare raft indices
94+
// (state.replicaID, ∞) → new replica; apply
95+
//
96+
// TODO(mira): Refactor to use ReplicaMark (#156696) which will encapsulate
97+
// the replicaID/tombstone comparison logic.
98+
//
99+
// TODO(mira): Some of the cases below are not possible for all event types.
100+
// E.g. For a new replica with event.Addr.ReplicaID > state.replicaID, we'd
101+
// expect an EventCreate, not another type of event. Assert on these.
102+
func (state persistedRangeState) canApply(event wagpb.Event) bool {
103+
// The WAG protocol ensures that any WAG node event has a non-zero ReplicaID.
104+
if event.Addr.ReplicaID == 0 {
105+
panic(errors.AssertionFailedf("WAG event for r%d has zero ReplicaID", event.Addr.RangeID))
106+
}
107+
switch {
108+
case event.Addr.ReplicaID < state.tombstoneNextReplicaID ||
109+
event.Addr.ReplicaID < state.replicaID:
110+
// Old replica (destroyed or never existed); skip. The persistedRangeState
111+
// validation guarantees state.tombstoneNextReplicaID <= state.replicaID
112+
// when a replica exists, but we can't rely on the state.replicaID
113+
// comparison alone because when no current replica exists
114+
// (state.replicaID == 0), only the tombstone can identify stale events.
115+
return false
116+
case event.Addr.ReplicaID == state.replicaID:
117+
// Current replica. Destroy/Subsume events always need applying here —
118+
// if their mutation had already been applied, the tombstone would have
119+
// been bumped and the first case would have matched.
120+
if event.Type == wagpb.EventDestroy || event.Type == wagpb.EventSubsume {
121+
return true
122+
}
123+
// For other events, compare raft indices.
124+
return event.Addr.Index > state.appliedIndex
125+
case event.Addr.ReplicaID > state.replicaID:
126+
// New replica not yet seen on this store; apply.
127+
return true
128+
default:
129+
panic(errors.AssertionFailedf("unhandled: event %s, state %+v", event, state))
130+
}
131+
}
132+
133+
// raftCatchUp returns the raft index the replica must be caught up to before
134+
// this WAG event can be applied. Zero means no catch-up is needed.
135+
func raftCatchUp(event wagpb.Event) kvpb.RaftIndex {
136+
switch event.Type {
137+
case wagpb.EventCreate, wagpb.EventInit:
138+
// No prior raft log; no catch-up.
139+
return 0
140+
case wagpb.EventApply, wagpb.EventSubsume, wagpb.EventDestroy:
141+
// Subsume, Destroy: the replica must be fully caught up before destruction.
142+
return event.Addr.Index
143+
case wagpb.EventSplit, wagpb.EventMerge:
144+
// The replica must be caught up to the command just before the
145+
// split/merge at event.Index.
146+
return event.Addr.Index - 1
147+
default:
148+
panic(errors.AssertionFailedf("unexpected event type %d", event.Type))
149+
}
150+
}
151+
152+
// canApplyWAGNode determines whether a WAG node's mutation can be applied to
153+
// the state machine. It checks each event in the node and returns the replay
154+
// action, which indicates whether to apply and which ranges need raft log
155+
// catch-up first.
156+
//
157+
// All events in a node are expected to agree on whether they need applying,
158+
// since they are written and applied atomically.
159+
func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (replayAction, error) {
160+
var result replayAction
161+
for i, event := range node.Events {
162+
state, err := loadPersistedRangeState(ctx, stateRO, event.Addr.RangeID)
163+
if err != nil {
164+
return replayAction{}, errors.Wrapf(err, "loading state for r%d", event.Addr.RangeID)
165+
}
166+
// A given RangeID appears at most once in the events list, so the decision
167+
// of whether an event can be applied is independent for each event.
168+
apply := state.canApply(event)
169+
if i == 0 {
170+
result.apply = apply
171+
} else if apply != result.apply {
172+
return replayAction{}, errors.Newf(
173+
"partial apply: event[0]=%s (apply=%t), event[%d]=%s (apply=%t)",
174+
node.Events[0], result.apply, i, event, apply,
175+
)
176+
}
177+
if !apply {
178+
continue
179+
}
180+
if catchUp := raftCatchUp(event); catchUp > 0 {
181+
result.catchUps = append(result.catchUps, raftCatchUpTarget{
182+
rangeID: event.Addr.RangeID,
183+
replicaID: event.Addr.ReplicaID,
184+
index: catchUp,
185+
})
186+
}
187+
}
188+
return result, nil
189+
}
190+
191+
// ReplayWAG iterates over the WAG in the log engine and applies any unapplied
192+
// nodes to the state machine. It is called during store startup, before the
193+
// store goes online.
194+
func ReplayWAG(ctx context.Context, raftRO RaftRO, stateRW StateRW) error {
195+
var iter wag.Iterator
196+
for wagIdx, node := range iter.Iter(ctx, raftRO) {
197+
action, err := canApplyWAGNode(ctx, node, stateRW)
198+
if err != nil {
199+
return errors.Wrapf(err, "WAG node %d", wagIdx)
200+
}
201+
if !action.apply {
202+
continue
203+
}
204+
// TODO(mira): For each entry in action.catchUps, replay raft log
205+
// entries for the target range/replica up to the target index before
206+
// applying the WAG node. The current raft log replay in
207+
// handleRaftReadyRaftMuLocked needs to be factored out and invoked here.
208+
// The catch-up code should assert that the target index is >= the
209+
// replica's current applied index.
210+
if err := applyMutation(ctx, stateRW, node.Mutation); err != nil {
211+
return errors.Wrapf(err, "WAG node %d", wagIdx)
212+
}
213+
}
214+
return iter.Error()
215+
}
216+
217+
// applyMutation applies a WAG node's mutation to the state machine. It handles
218+
// both write batch and ingestion mutations. The ingestion is applied before the
219+
// batch, because a mutation may have both: the ingestion is idempotent, and the
220+
// batch "finalizes" the mutation (e.g. updates the applied index which inputs
221+
// into the "is applied" decision).
222+
func applyMutation(ctx context.Context, stateWO StateWO, m wagpb.Mutation) error {
223+
if m.Ingestion != nil {
224+
// TODO(mira): Implement ingestion replay. This requires translating the
225+
// Ingestion proto (SST paths, shared/external tables) into the appropriate
226+
// IngestAndExciseFiles call on the state machine.
227+
return errors.UnimplementedErrorf(
228+
errors.IssueLink{}, "WAG ingestion replay not yet implemented",
229+
)
230+
}
231+
if len(m.Batch) > 0 {
232+
return stateWO.ApplyBatchRepr(m.Batch, false /* sync */)
233+
}
234+
return nil
235+
}

0 commit comments

Comments
 (0)