kvstorage: add live WAG truncation #168169
iskettaneh wants to merge 3 commits into cockroachdb:master
Conversation
Force-pushed 5937faa to 1a1a6cb
Force-pushed 1a1a6cb to 4f696be
Detected infrastructure failure (matched: self-hosted runner lost communication with the server). Automatically rerunning failed jobs.
// by definition has all the required events applied. However, we avoid jumping
// over gaps so that we know the exact index to truncate next, which facilitates
// seeking past previously deleted WAG garbage.
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
It seems like this type has become two non-overlapping APIs merged into one type. Consider making a separate type for the online truncation (which might use the offline helper if needed).
Or do you intend to use the same object? What's its lifetime? Something like:
- Create the truncator when the Store is created.
- Run the offline truncation using this object, around the replay time.
- Call Start when the Store is started.
I was thinking of a lifetime similar to the one you listed: create the truncator as part of the store, after WAG replay, call TruncateALL (offline), and then call Start().
	return
}
nextIndex := t.lastTruncatedWAGIndex.Load() + 1
b := t.eng.LogEngine().NewWriteBatch()
Do you think there would be a way to share this code with the offline func?
One difference is that this loop reads/writes lastTruncatedWAGIndex. This could easily be factored out of the loop (e.g. return the last truncated index from the helper, and then set the atomic in the caller).
One observation that could be helpful: we don't strictly need lastTruncatedWAGIndex to be updated after every truncated node, only after the last one, for a few reasons:
- The lastTruncatedWAGIndex < seq.Load() condition only possibly becomes false after the last node is truncated.
- The only downside of updating lastTruncatedWAGIndex a bit late is that our func might be called one extra time before it quiesces. This is always a possibility anyway.
- This func has a monopoly on updating lastTruncatedWAGIndex, so it needs to read this field only once; then it knows exactly how and when it moves forward.
I did some refactoring to share some code between the two use cases.
// TODO(ibrahim): We need to decide if we need to retry truncating this
// WAG node now, or rely on the next state engine flush to do it.
I'd lean toward relying on the outer retry. Or fatal here for now.
Are you worried that, say, we are the last WAG node after which there are no new appends? If we fail and don't retry here, there won't be a signal waking us back up.
This isn't catastrophic. I think we should mostly prevent a situation where the WAG is ever-growing. If it's stuck at some fixed size, that isn't too bad.
We could ping the wake-up channel from here to guarantee the retry. The adverse effect of this is endless spinning, so I can see why some custom retry loop here may be attractive.
I think we might end up with some time-driven wake-up if we implement some retention policy for the WAG. E.g. it would wake up every hour to GC some long-gone stuff. That could be a convenient place to rely on for these dormant errored deletions.
I was worried about this scenario: we try to truncate node X, but there is a transient error, so the truncation fails. The code now doesn't retry truncating X again, and we rely on the next state engine flush. I believe this is okay and not a big deal.
That being said, now that we have refactored the code a bit, it is easier (if we want) to add some retry logic with exponential backoff and retry for a few seconds before relying on the next state engine flush.
Force-pushed 4f696be to 74cee63
iskettaneh left a comment
@iskettaneh made 5 comments and resolved 2 discussions.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on pav-kv).
Force-pushed 94147fa to 59a9de3
//
// TODO(ibrahim): Add a TestKnob for keeping a suffix of the WAG for debugging.
func (t *WAGTruncator) Start(ctx context.Context, stopper *stop.Stopper) error {
	t.lastTruncatedWAGIndex.Store(t.seq.Load())
Hypothetically, when we have some retention policy, e.g. "keep the last 100 nodes", we should initialize lastTruncatedWAGIndex not to seq.Load(), but to the last entry that the offline truncation deleted.
Say we have WAG indices {1, 2, 10, 20}. After restart, we would keep them intact. It means that:
- The online truncation needs to ignore gaps at indices <= 20.
- And then enforce continuity after index 20.
There is then a remote possibility that we don't need the offline truncation in the first place. We could just initialize the "no gaps" threshold to seq.Load(), and then rely on the online truncation to do the job.
For now, should we move the lastTruncatedWAGIndex advancement inside truncateAppliedNodes?
Discussed offline and decided to merge offline and online truncation into one.
Force-pushed 59a9de3 to 795be4e
Force-pushed 795be4e to fe777bb
iskettaneh left a comment
@iskettaneh made 3 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on pav-kv and RaduBerinde).
pav-kv left a comment
Took a quick look. Still looking, but dumping a few comments, since I think the ignore-gaps logic and related parts could be simplified.
	st *cluster.Settings,
	eng Engines,
	seq *wag.Seq,
	lastIndexAfterStartup uint64,
Don't need this parameter - it can be read from seq.
defer func() {
	if t.knobs.AfterTruncationCallback != nil {
		t.knobs.AfterTruncationCallback()
	}
}()
Don't pay the cost of defer in prod:

Suggested change:
-	defer func() {
-		if t.knobs.AfterTruncationCallback != nil {
-			t.knobs.AfterTruncationCallback()
-		}
-	}()
+	if cb := t.knobs.AfterTruncationCallback; cb != nil {
+		defer cb()
+	}
There is no defer anymore now that we have changed the structure a bit.
// There are WAG nodes that we can potentially truncate with
// ignoreGaps=true. These are the WAG nodes that existed at engine startup,
// and gaps are expected to be present.
lastTruncated, err := t.truncateAppliedNodes(ctx, startIndex, t.lastWAGIndexBeforeStartup, true /* ignoreGaps */)
I think truncateAppliedNodes could understand automatically whether it should ignore gaps (based on where the current index is compared to lastWAGIndexBeforeStartup).
Seemingly, we don't need to pass in t.lastWAGIndexBeforeStartup (the method has access to it) or ignoreGaps (the method knows this rule).
I guess I was building it from the ground up. But I totally agree: looking at the PR as a whole, it makes sense that we don't really need the ignoreGaps field. A lot of things could be simplified without it.
// Returns the index of the last successfully truncated node or 0 when no nodes
// were truncated. It also returns an error if any occurred during truncation.
func (t *WAGTruncator) truncateAppliedNodes(
	ctx context.Context, startIndex uint64, lastIndex uint64, ignoreGaps bool,
Any reason we force the caller to know the ignoreGaps rules? It seems to me that we could simply encapsulate this knowledge in this func, because we have access to the t.lastWAGIndexBeforeStartup threshold.
Or maybe in the method down the stack that it calls. Basically, do we need the entire stack to drag this info around, or can we push it all the way down?
I guess I was building it from the ground up. But I totally agree, when looking at the PR as a whole, it makes sense that we don't really need the ignoreGaps field. A lot of things could be simplified without it.
// truncateAppliedNodesLive is called by the background goroutine to truncate
// applied WAG nodes.
func (t *WAGTruncator) truncateAppliedNodesLive(ctx context.Context) {
I somehow feel that this method is not needed, with the general structure I imagine:

// does everything
// - knows where to start from
// - remembers where it left off
truncateAppliedNodes(ctx)
->
// - knows what is skippable
truncateAppliedWAGNodeAndClearRaftState(index)
It was needed to set the ignoreGaps field. But now that this logic is inside truncateAppliedWAGNodeAndClearRaftState, we don't really need it.
Force-pushed fe777bb to 3c795ac
iskettaneh left a comment
@iskettaneh made 5 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on pav-kv and RaduBerinde).
This commit adds a helper function "truncateAppliedNodes", which allows deleting multiple WAG nodes starting from an index. It also allows the truncator to handle gaps. The idea is that at startup, seq will be initialized to the largest WAG index that was persisted before the last restart, so we can expect gaps in the indices before that point. However, we don't handle gaps in the indices after startup: if there is a gap, we won't jump over it and will wait for it to be persisted.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>

This commit introduces TestingKnobs to the WAGTruncator.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>

This commit adds the ability to perform live WAG truncation. It keeps track of the last successfully truncated WAG node index, and only attempts a round of WAG truncation if (1) the state engine flushes, and (2) there is a higher WAG sequence number than the last one we truncated.

It also gets rid of offline truncation and merges it with online truncation, by tracking a new field `lastWAGIndexBeforeStartup` that marks the last WAG index at engine startup. We can remove WAG nodes before this index while ignoring gaps, and nodes after this index without ignoring gaps.

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
Force-pushed 3c795ac to cd45efb
Since the design has changed significantly since the start of this PR, I am going to close this one.
This PR does the following:
References: #167607
Release note: None