fix(core): Update CVCUponInsert#7068
Conversation
7ae7a06 to
420d9fb
Compare
|
augment review |
cf0c8a4 to
8ac61cd
Compare
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
8ac61cd to
7642082
Compare
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
4bfea25 to
181f8e7
Compare
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
c9c4155 to
58884f9
Compare
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
28b72b0 to
a0c46f4
Compare
|
augment review |
🤖 Augment PR SummarySummary: This PR refines CVC (Capture Version Change) handling for inserts/bump-ups to improve snapshot/serialization correctness and simplify callers. Changes:
Technical Notes: Insert-triggered change handling now walks the bucket set returned by CVC to ensure relevant buckets are considered consistently across serializer and db-slice flows (including full-segment cases). 🤖 Was this summary useful? React with 👍 or 👎 |
| ProcessBucketInternal(db_index, it, true); | ||
| } | ||
|
|
||
| } // namespace dfly |
There was a problem hiding this comment.
CVCUponInsert now returns empty buckets too, but ProcessBucketInternal still treats it.is_done() as an unconditional skip, and the delayed-entry flush in the skip-path is also gated on !it.is_done(). If delayed tiered reads are keyed by bucket identity even after the bucket becomes empty, this insert path may fail to wait/flush them despite the comment about ensuring delayed work is finished.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
There was a problem hiding this comment.
Pull request overview
This PR updates Dragonfly’s snapshot/replication serialization flow to more conservatively process buckets affected by inserts/bumps (including refactoring the CVC APIs) in order to address replication discrepancies (issue #7006), especially around delayed (tiered) entries.
Changes:
- Refactor
DashTable::CVCUponInsertto return aBucketSetiterable (instead of callback-based) and adjustCVCUponBumpto ignore version thresholds. - Refactor
SerializerBasebucket processing intoProcessIfNeeded+ProcessBucketInternal, and update snapshot/restore codepaths to use the new API. - Remove unused “moved callbacks” plumbing from
DbSlice, and update affected call sites and tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/server/snapshot.cc | Switch snapshot traversal to call ProcessIfNeeded instead of the old ProcessBucket. |
| src/server/serializer_base.h | Rename/refactor bucket processing API to ProcessIfNeeded + internal helper. |
| src/server/serializer_base.cc | Implement the refactor; update insert-change handling to iterate buckets from CVCUponInsert. |
| src/server/journal/streamer.cc | Update restore streamer loop to use ProcessIfNeeded. |
| src/server/db_slice.h | Remove moved-callback API; add SnapshotVersions() view helper. |
| src/server/db_slice.cc | Remove moved-callback invocations; update insert/bump CVC call sites for new APIs. |
| src/core/dash.h | Change CVCUponInsert to return BucketSet; update CVCUponBump signature; add BucketSet type. |
| src/core/dash_internal.h | Refactor segment-level CVC helpers (CVCOnInsert returns optional; CVCOnBump ignores version threshold). |
| src/core/dash_test.cc | Update unit tests to match the new CVC APIs. |
| return std::views::iota(0u, limit) | | ||
| std::views::transform([=, ids = ids, owner = owner, seg_id = seg_id](uint8_t i) { | ||
| uint8_t index = is_all ? i : ids[i]; | ||
| return bucket_iterator{owner, seg_id, index}; |
There was a problem hiding this comment.
BucketSet::buckets() constructs bucket_iterator{owner, seg_id, index} which runs Seek2Occupied() and sets owner_ = nullptr for empty buckets; this makes empty buckets indistinguishable from end-iterators and prevents callers (e.g. SerializerBase::OnChange(key)) from getting bucket identity/version for empty buckets as intended. Consider yielding a bucket iterator that does not Seek2Occupied (e.g. construct with slot=0) or returning a dedicated bucket reference type that remains valid even when empty.
| return bucket_iterator{owner, seg_id, index}; | |
| return bucket_iterator{owner, seg_id, index, 0}; |
| // Assert the version is equal to a snapshot version so no other modifications have happened | ||
| DCHECK_GE(it.GetVersion(), snapshot_version_); | ||
| auto current_snapshots = db_slice_->SnapshotVersions(); | ||
| DCHECK(std::ranges::find(current_snapshots, it.GetVersion()) != current_snapshots.end()) | ||
| << absl::StrJoin(current_snapshots, " ") << " does not contain " << it.GetVersion(); |
There was a problem hiding this comment.
std::ranges::find is used here but this file does not include <algorithm>, which is where ranges algorithms are declared; this can break builds depending on transitive includes. Add the proper header (or switch to an absl helper already included) to make this TU self-contained.
|
|
||
| // Assert the version is equal to a snapshot version so no other modifications have happened | ||
| DCHECK_GE(it.GetVersion(), snapshot_version_); | ||
| auto current_snapshots = db_slice_->SnapshotVersions(); |
There was a problem hiding this comment.
ok, what is that? what scenario it guards against?
Please note that db_slice_->SnapshotVersions(); is called and executed in release mode as well.
| it.SetVersion(snapshot_version_); | ||
| MarkBucketSerializing(it.bucket_address()); | ||
|
|
||
| stats_.keys_serialized += SerializeBucket(db_index, it, on_update); |
There was a problem hiding this comment.
please move DCHECK_EQ inside SerializeBucket to the loop like I did here:
https://github.com/dragonflydb/dragonfly/pull/7071/changes#diff-f67586dd571688e986aba8f508c6ee198e0adbfa01ebd362b55403bac5f22b84R237
| #pragma once | ||
|
|
||
| #include <array> | ||
| #include <optional> |
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
32b44ab to
c612383
Compare
| if (ProcessBucket(db_index, it, true)) | ||
| ++stats_.buckets_on_change; | ||
|
|
||
| std::lock_guard guard(big_value_mu_); |
There was a problem hiding this comment.
can be replaced with ProcessIfNeeded which I find a bad name as it always calls ProcessBucketInternal so there is no if needed part
Signed-off-by: Vladislav <vlad@dragonflydb.io>
|
The PR description is outdated now |
Don't skip buckets with versions that are less than snapshort version in CVCUponInsert. This is because we set the bucket version just when we start serializing. As a result, a modification operation doesn't wait for serializatin to finish before proceeding - because the version is already equal to snapshot version. This bug would occur if the global mutex would be absent - so this PR reintroduces function scope locking for it in OnChange.
It also introduces BucketSet as a utility to get rid of the callback