kvstorage: add live WAG truncation #168169

Closed

iskettaneh wants to merge 3 commits into cockroachdb:master from iskettaneh:rse_truncate_5

Conversation

@iskettaneh
Contributor

@iskettaneh iskettaneh commented Apr 10, 2026

This PR does the following:

  1. Adds the ability to perform live WAG truncation. The truncator keeps track of the last successfully truncated WAG node index, and only attempts a round of WAG truncation if (1) the state engine flushes, and (2) there is a higher WAG sequence number than the last one we truncated.
  2. Merges offline and online truncation into one concept.
  3. Introduces TestingKnobs to the WAGTruncator.

References: #167607

Release note: None

@trunk-io
Contributor

trunk-io Bot commented Apr 10, 2026

Merging to master in this repository is managed by Trunk.

  • To merge this pull request, check the box to the left or comment /trunk merge below.

After your PR is submitted to the merge queue, this comment will be automatically updated with its status. If the PR fails, failure details will also be posted here.

@cockroach-teamcity
Member

This change is Reviewable

@iskettaneh iskettaneh requested a review from pav-kv April 12, 2026 22:12
@iskettaneh iskettaneh marked this pull request as ready for review April 12, 2026 22:12
@iskettaneh iskettaneh requested a review from a team as a code owner April 12, 2026 22:12
@blathers-crl

blathers-crl Bot commented Apr 12, 2026

Detected infrastructure failure (matched: self-hosted runner lost communication with the server). Automatically rerunning failed jobs. (run link)


Comment thread pkg/kv/kvserver/kvstorage/wag_truncator.go Outdated
// by definition has all the required events applied. However, we avoid jumping
// over gaps so that we know the exact index to truncate next, which facilitates
// seeking past previously deleted WAG garbage.
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
Collaborator


It seems like this type became 2 non-overlapping APIs merged into one type. Consider making a separate type for the online truncation (which might be using the offline helper if needed).

Collaborator

@pav-kv pav-kv Apr 13, 2026


Or do you intend to use the same object? What's its lifetime? Something like:

  • Create truncator when Store is created.
  • Run the offline truncation using this object, around the replay time.
  • Start when the Store is started.

Contributor Author


I was thinking of a lifetime similar to the one you listed: create the truncator as part of the store, after WAG replay, call TruncateALL (offline), and then call Start().

Comment thread pkg/kv/kvserver/kvstorage/wag/store.go Outdated
Comment thread pkg/kv/kvserver/kvstorage/wag_truncator.go Outdated
return
}
nextIndex := t.lastTruncatedWAGIndex.Load() + 1
b := t.eng.LogEngine().NewWriteBatch()
Collaborator

@pav-kv pav-kv Apr 13, 2026


Do you think there would be a way to share this code with the offline func?

One difference is that this loop reads/writes lastTruncatedWAGIndex. This could be easily factored out from the loop (e.g. return the last truncated index from the helper, and then set the atomic in the caller).

One observation that could be helpful: we don't super need lastTruncatedWAGIndex to be updated after every truncated node. Only after the last one. For a few reasons:

  • The lastTruncatedWAGIndex < seq.Load only possibly becomes false after the last node is truncated.
  • Only downside of updating lastTruncatedWAGIndex a bit late is that our func might be called one extra time before it quiesces. This is always a possibility anyway.
  • This func has monopoly on updating lastTruncatedWAGIndex. So it needs to read this field only once, then it knows exactly how/when it moves forward.
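Under those assumptions, the suggested refactor might look like this minimal Go sketch; `truncateThrough`, `truncateNode`, and the package-level atomic are illustrative stand-ins, not the PR's code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lastTruncatedWAGIndex models the truncator's atomic; it is package-level
// only to keep the sketch short.
var lastTruncatedWAGIndex atomic.Uint64

// truncateNode stands in for the real per-node truncation; here every index
// succeeds.
func truncateNode(index uint64) bool { return true }

// truncateThrough truncates nodes up to seq. Because this goroutine is the
// only writer of lastTruncatedWAGIndex, it reads the atomic once, loops on a
// local variable, and publishes the final value a single time at the end.
func truncateThrough(seq uint64) {
	last := lastTruncatedWAGIndex.Load() // single read: we own all updates
	for next := last + 1; next <= seq; next++ {
		if !truncateNode(next) {
			break // leave last at the final successfully truncated index
		}
		last = next
	}
	lastTruncatedWAGIndex.Store(last) // publish once, after the last node
}

func main() {
	truncateThrough(4)
	fmt.Println(lastTruncatedWAGIndex.Load()) // 4
}
```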

Contributor Author


I did some refactoring to share some code between the two use cases.

Comment thread pkg/kv/kvserver/kvstorage/wag_truncator.go Outdated
Comment on lines +230 to +231
// TODO(ibrahim): We need to decide if we need to retry truncating this
// WAG node now, or rely on the next state engine flush to do it.
Collaborator

@pav-kv pav-kv Apr 13, 2026


I'd lean to rely on the outer retry. Or fatal here for now.

Are you worried that, say, we are at the last WAG node, after which there are no new appends? If we fail and don't retry here, there won't be a signal waking us back up.

This isn't catastrophic. I think we should mostly prevent a situation that the WAG is ever growing. If it's stuck at some fixed size, this isn't too bad.

Collaborator


We could ping the wake-up channel from here to guarantee the retry. The adverse effect of this is endless spinning, so I can see why some custom retry loop here may be attractive.

Collaborator


I think we might end up with some time-driven waking up, if we implement some retention policy for the WAG. E.g. it would wake every hour to GC some long gone stuff. It could be a convenient place to rely on for these dormant errored deletions.

Contributor Author


I was worried about this scenario: we try to truncate node X, but a transient error makes the truncation fail. The code now doesn't retry truncating X, and we rely on the next state engine flush. I believe this is okay and not a big deal.
That said, now that we have refactored the code a bit, it would be easy (if we want) to add retry logic with exponential backoff that retries for a few seconds before falling back to the next state engine flush.

Contributor Author

@iskettaneh iskettaneh left a comment


@iskettaneh made 5 comments and resolved 2 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on pav-kv).

@iskettaneh iskettaneh requested a review from pav-kv April 13, 2026 14:59
@iskettaneh iskettaneh force-pushed the rse_truncate_5 branch 2 times, most recently from 94147fa to 59a9de3, on April 13, 2026 15:43
@iskettaneh iskettaneh requested a review from a team as a code owner April 13, 2026 15:43
@iskettaneh iskettaneh requested a review from RaduBerinde April 13, 2026 15:43
//
// TODO(ibrahim): Add a TestKnob for keeping a suffix of the WAG for debugging.
func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error {
t.lastTruncatedWAGIndex.Store(t.seq.Load())
Collaborator

@pav-kv pav-kv Apr 13, 2026


Hypothetically, when we have some retention policy, e.g. "keep the last 100 nodes", we should initialize lastTruncatedWAGIndex not to seq.Load(), but to the last entry that the offline truncation deleted.

Say we have WAG indices {1,2,10,20}. After restart, we would keep them intact. It means that:

  • The online truncation needs to ignore gaps at indices <= 20.
  • And then enforce continuity after index 20.

There is then a remote possibility that we don't need the offline truncation in the first place. We could just initialize the "no gaps" threshold to seq.Load(), and then rely on the online truncation to do the job.

For now, should we move the lastTruncatedWAGIndex advancement inside truncateAppliedNodes?
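The threshold rule described above (tolerate gaps at or below the startup index, enforce continuity after it) can be sketched with a hypothetical predicate; the function name and parameter are illustrative, not the PR's API:

```go
package main

import "fmt"

// canSkipGap reports whether truncation may advance past a missing node at
// index. Gaps are expected only among nodes that existed at engine startup
// (index <= lastWAGIndexBeforeStartup); after that, continuity is enforced.
func canSkipGap(index, lastWAGIndexBeforeStartup uint64) bool {
	return index <= lastWAGIndexBeforeStartup
}

func main() {
	const threshold = 20 // e.g. WAG indices {1,2,10,20} survived the restart
	for _, idx := range []uint64{5, 20, 21} {
		fmt.Printf("gap at %d: skip=%v\n", idx, canSkipGap(idx, threshold))
	}
}
```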

Contributor Author


Discussed offline and decided to merge offline and online truncation into one.

@blathers-crl

blathers-crl Bot commented Apr 13, 2026

Detected infrastructure failure (matched: self-hosted runner lost communication with the server). Automatically rerunning failed jobs. (run link)

Comment thread pkg/kv/kvserver/kvstorage/wag_truncator_test.go Outdated
Contributor Author

@iskettaneh iskettaneh left a comment


@iskettaneh made 3 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on pav-kv and RaduBerinde).


Comment thread pkg/kv/kvserver/kvstorage/wag_truncator.go Outdated
Comment thread pkg/kv/kvserver/kvstorage/wag_truncator_test.go Outdated
Collaborator

@pav-kv pav-kv left a comment


Took a quick look. Still looking, but dumping a few comments, since I think the ignore-gaps logic (among other things) could be simplified.

st *cluster.Settings,
eng Engines,
seq *wag.Seq,
lastIndexAfterStartup uint64,
Collaborator


Don't need this parameter - it can be read from seq.

Contributor Author


Done.

Comment on lines +122 to +126
defer func() {
if t.knobs.AfterTruncationCallback != nil {
t.knobs.AfterTruncationCallback()
}
}()
Collaborator


Don't pay the cost of defer in prod:

Suggested change
defer func() {
if t.knobs.AfterTruncationCallback != nil {
t.knobs.AfterTruncationCallback()
}
}()
if cb := t.knobs.AfterTruncationCallback; cb != nil {
defer cb()
}

Contributor Author


There is no defer anymore now that we have changed the structure a bit.

// There are WAG nodes that we can potentially truncate with
// ignoreGaps=true. These are the WAG nodes that existed at engine startup,
// and gaps are expected to be present.
lastTruncated, err := t.truncateAppliedNodes(ctx, startIndex, t.lastWAGIndexBeforeStartup, true /* ignoreGaps */)
Collaborator


I think truncateAppliedNodes could understand automatically whether it should ignore gaps (based on where the current index is compared to lastWAGIndexBeforeStartup).

Seemingly, we don't need to pass in t.lastWAGIndexBeforeStartup (the method has access to it) or ignoreGaps (the method knows this rule).

Contributor Author


I guess I was building it from the ground up. But I totally agree, when looking at the PR as a whole, it makes sense that we don't really need the ignoreGaps field. A lot of things could be simplified without it.

// Returns the index of the last successfully truncated node or 0 when no nodes
// were truncated. It also returns an error if any occurred during truncation.
func (t *WAGTruncator) truncateAppliedNodes(
ctx context.Context, startIndex uint64, lastIndex uint64, ignoreGaps bool,
Collaborator


Any reason we enforce the caller to know ignoreGaps rules? It seems to me that we could simply encapsulate this knowledge in this func, because we have access to t.lastWAGIndexBeforeStartup threshold.

Or maybe in the method down the stack that it calls. Basically, do we need the entire stack to drag this info around, or can push it all the way down?

Contributor Author


I guess I was building it from the ground up. But I totally agree, when looking at the PR as a whole, it makes sense that we don't really need the ignoreGaps field. A lot of things could be simplified without it.


// truncateAppliedNodesLive is called by the background goroutine to truncate
// applied WAG nodes.
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
Collaborator


I somehow feel that this method is not needed, with the general structure I imagine:

// does everything
//	- knows where to start from
//	- remembers where it left off
truncateAppliedNodes(ctx)
->
// - knows what is skippable
truncateAppliedWAGNodeAndClearRaftState(index)

Contributor Author


It was needed to set the ignoreGaps field. But now that this logic is inside truncateAppliedWAGNodeAndClearRaftState, we don't really need it.

Contributor Author

@iskettaneh iskettaneh left a comment


@iskettaneh made 5 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on pav-kv and RaduBerinde).

@iskettaneh iskettaneh requested a review from pav-kv April 15, 2026 19:02
iskettaneh and others added 3 commits April 15, 2026 15:11
This commit adds a helper function "truncateAppliedNodes" which allows deleting
multiple WAG nodes starting from an index.

Also, this commit allows the truncator to handle gaps. The idea is that at
startup, seq will be initialized to the largest WAG index that was persisted
before the last restart. We can expect gaps in the indices before that point.
However, we don't handle gaps in the indices after startup: if there is a gap,
we won't jump over it and will wait for the missing node to be persisted.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
This commit introduces TestingKnobs to the WAGTruncator.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
This commit adds the ability to perform live WAG truncation. It keeps track of
the last successfully truncated WAG node index, and only attempts a round of
WAG truncation if (1) the state engine flushes, and (2) there is a higher WAG
sequence number than the last one we truncated.

Also, this commit gets rid of offline truncation and merges it with online
truncation. It does this by tracking a new field `lastWAGIndexBeforeStartup`
that marks the last WAG index at engine startup. We can remove WAG nodes
before this index while ignoring gaps, and nodes after this index without
ignoring gaps.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
@iskettaneh
Contributor Author

Since the design has changed significantly since the start of this PR, I am going to close it.

@iskettaneh iskettaneh closed this Apr 15, 2026