Skip to content

Commit c46bb2b

Browse files
kvstorage/wag: prepare for WAG truncation
This commit sets up the premitives for WAG truncations by: 1) Adding a `Delete` function for removing WAG nodes by index. 2) Changing the WAG `Iterator.Iter` method to return `iter.Seq2[uint64, wagpb.Node]` pair. This will be useful when we iterate over the WAG and truncate the nodes that have been applied and synced. Epic: none Release note: None Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent 2d7256a commit c46bb2b

2 files changed

Lines changed: 30 additions & 7 deletions

File tree

pkg/kv/kvserver/kvstorage/wag/BUILD.bazel

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,41 @@ go_library(
44
name = "wag",
55
srcs = [
66
"store.go",
7+
"truncator.go",
78
"writer.go",
89
],
910
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag",
1011
visibility = ["//visibility:public"],
1112
deps = [
1213
"//pkg/keys",
14+
"//pkg/kv/kvpb",
15+
"//pkg/kv/kvserver/kvserverpb",
1316
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
17+
"//pkg/roachpb",
1418
"//pkg/storage",
19+
"//pkg/util/hlc",
20+
"//pkg/util/log",
1521
],
1622
)
1723

1824
go_test(
1925
name = "wag_test",
20-
srcs = ["store_test.go"],
26+
srcs = [
27+
"store_test.go",
28+
"truncator_test.go",
29+
],
2130
data = glob(["testdata/**"]),
2231
embed = [":wag"],
2332
deps = [
33+
"//pkg/keys",
2434
"//pkg/kv/kvpb",
35+
"//pkg/kv/kvserver/kvserverpb",
2536
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
2637
"//pkg/kv/kvserver/print",
2738
"//pkg/roachpb",
2839
"//pkg/storage",
2940
"//pkg/testutils/echotest",
41+
"//pkg/util/hlc",
3042
"//pkg/util/leaktest",
3143
"//pkg/util/log",
3244
"@com_github_stretchr_testify//require",

pkg/kv/kvserver/kvstorage/wag/store.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,16 @@ func Write(w storage.Writer, index uint64, node wagpb.Node) error {
4848
return w.PutUnversioned(keys.StoreWAGNodeKey(index), data)
4949
}
5050

51+
// Delete removes the WAG node at the given sequence number.
52+
func Delete(w storage.Writer, index uint64) error {
53+
return w.ClearUnversioned(keys.StoreWAGNodeKey(index), storage.ClearOptions{})
54+
}
55+
5156
// Iterator helps to scan the WAG sequence.
5257
//
5358
// var iter wag.Iterator
54-
// for node := range iter.Iter(ctx, reader) {
55-
// // process node
59+
// for index, node := range iter.Iter(ctx, reader) {
60+
// // process index, node
5661
// }
5762
// if err := iter.Error(); err != nil {
5863
// return err
@@ -64,8 +69,9 @@ type Iterator struct {
6469
err error
6570
}
6671

67-
// Iter returns an iterator that scans the WAG sequence.
68-
func (it *Iterator) Iter(ctx context.Context, r storage.Reader) iter.Seq[wagpb.Node] {
72+
// Iter returns an iterator that scans the WAG sequence. The iterator yields a
73+
// pair containing the WAG node index and the WAG node itself.
74+
func (it *Iterator) Iter(ctx context.Context, r storage.Reader) iter.Seq2[uint64, wagpb.Node] {
6975
prefix := keys.StoreWAGPrefix()
7076
mi, err := r.NewMVCCIterator(ctx, storage.MVCCKeyIterKind, storage.IterOptions{
7177
UpperBound: prefix.PrefixEnd(),
@@ -76,13 +82,18 @@ func (it *Iterator) Iter(ctx context.Context, r storage.Reader) iter.Seq[wagpb.N
7682
}
7783
mi.SeekGE(storage.MakeMVCCMetadataKey(prefix))
7884

79-
return func(yield func(wagpb.Node) bool) {
85+
return func(yield func(uint64, wagpb.Node) bool) {
8086
defer mi.Close()
8187
for ; ; mi.Next() {
8288
if ok, err := mi.Valid(); err != nil || !ok {
8389
it.err = err
8490
return
8591
}
92+
index, err := keys.DecodeWAGNodeKey(mi.UnsafeKey().Key)
93+
if err != nil {
94+
it.err = err
95+
return
96+
}
8697
v, err := mi.UnsafeValue()
8798
if err != nil {
8899
it.err = err
@@ -92,7 +103,7 @@ func (it *Iterator) Iter(ctx context.Context, r storage.Reader) iter.Seq[wagpb.N
92103
if it.err = node.Unmarshal(v); it.err != nil { // nolint:protounmarshal
93104
return
94105
}
95-
if !yield(node) {
106+
if !yield(index, node) {
96107
return
97108
}
98109
}

0 commit comments

Comments
 (0)