Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/core/dash.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ struct DashTable<_Key, _Value, Policy>::BucketSet {
});
}

bool operator==(const BucketSet& other) const {
return owner_ == other.owner_ && seg_id_ == other.seg_id_ && limit_ == other.limit_ &&
ids_[0] == other.ids_[0] && ids_[1] == other.ids_[1];
}

private:
friend class DashTable;

Expand Down
17 changes: 10 additions & 7 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,17 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
return OpStatus::WRONG_TYPE;
}

// It's a new entry.
auto status = res.status();
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

// It's a new entry.
CallChangeCallbacks(cntx.db_index, ChangeReq{key});
if (!change_cb_.empty()) {
auto bucket_set = db.prime.CVCUponInsert(key);
CallChangeCallbacks(cntx.db_index, bucket_set);

// Set of possible insertion buckets must be the same after possibly blocking call
DCHECK(bucket_set == db.prime.CVCUponInsert(key));
}

ssize_t memory_offset = -key.size();
size_t reclaimed = 0;
Expand Down Expand Up @@ -848,16 +854,13 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {

auto on_change = [&](DbIndex db_index, const ChangeReq& req) {
FiberAtomicGuard fg;
PrimeTable* table = GetTables(db_index);

if (const PrimeTable::bucket_iterator* bit = req.update()) {
if (const auto* bit = std::get_if<PrimeTable::bucket_iterator>(&req)) {
if (!bit->is_done() && bit->GetVersion() < next_version) {
iterate_bucket(*bit);
}
} else {
string_view key = get<string_view>(req.change);
auto bucket_set = table->CVCUponInsert(key);
for (auto it : bucket_set.buckets()) {
for (auto it : std::get<PrimeTable::BucketSet>(req).buckets()) {
if (!it.is_done() && it.GetVersion() < next_version)
iterate_bucket(it);
}
Expand Down
8 changes: 4 additions & 4 deletions src/server/serializer_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ SerializerBase::~SerializerBase() {
void SerializerBase::RegisterChangeListener() {
db_array_ = db_slice_->databases(); // copy pointers to survive flush
auto cb = [this](DbIndex dbid, const ChangeReq& req) {
std::visit([&](auto it) { OnChangeBlocking(dbid, it); }, req.change);
std::visit([&](auto it) { OnChangeBlocking(dbid, it); }, req);
};
snapshot_version_ = db_slice_->RegisterOnChange(cb);
}
Expand Down Expand Up @@ -202,14 +202,14 @@ void SerializerBase::OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_itera
ProcessBucket(db_index, it, true);
}

void SerializerBase::OnChangeBlocking(DbIndex db_index, std::string_view key) {
void SerializerBase::OnChangeBlocking(DbIndex db_index, const PrimeTable::BucketSet& set) {
// We must acquire the mutex ahead and process all buckets under the same lock.
// This ensures that CVCUponInsert and the table insertion that invoked this callback
// This ensures that bucket processing and the table insertion that invoked this callback
// will be operating on the same state as all writes are linarly ordered by this mutex.
std::unique_lock lk{big_value_mu_};

// We call Process even for up-to-date buckets to ensure all operations (delayed) are finished.
for (auto it : db_slice_->GetTables(db_index)->CVCUponInsert(key).buckets())
for (auto it : set.buckets())
ProcessBucketInternal(db_index, it, true);
}

Expand Down
5 changes: 2 additions & 3 deletions src/server/serializer_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ class SerializerBase : public DelayedEntryHandler {
// Called when an existing bucket is about to be mutated. Calls ProcessBucket.
void OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_iterator it);

// Called when a new key is about to be inserted,
// calls CVCUponInsert -> OnChangeBlocking(bucket_iterator) for every touched bucket.
void OnChangeBlocking(DbIndex db_index, std::string_view key);
// Called when a new key is about to be inserted. Calls ProcessBucket for the buckets.
void OnChangeBlocking(DbIndex db_index, const PrimeTable::BucketSet& set);

// --- Shared members (to be moved from subclasses in later PRs) ---

Expand Down
17 changes: 2 additions & 15 deletions src/server/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
// the snapshot process. We copy the pointers in StartSnapshotInShard function.
using DbTableArray = std::vector<boost::intrusive_ptr<DbTable>>;

// ChangeReq - describes the change to the table.
struct ChangeReq {
// If iterator is set then it's an update to the existing bucket.
// Otherwise (string_view is set) then it's a new key that is going to be added to the table.
std::variant<PrimeTable::bucket_iterator, std::string_view> change;

explicit ChangeReq(PrimeTable::bucket_iterator it) : change(it) {
}
explicit ChangeReq(std::string_view key) : change(key) {
}

const PrimeTable::bucket_iterator* update() const {
return std::get_if<PrimeTable::bucket_iterator>(&change);
}
};
// ChangeReq - describes the change to the table: either single bucket or whole bucket set.
using ChangeReq = std::variant<PrimeTable::bucket_iterator, PrimeTable::BucketSet>;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that ChangeReq is a std::variant<PrimeTable::bucket_iterator, PrimeTable::BucketSet>, the replication/serialization docs that describe ChangeReq::update() and req.change appear stale and could mislead future work. Consider updating docs/shard-serialization.md to reflect the new variant-based API.

Severity: low

Other Locations
  • docs/shard-serialization.md:169

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.


} // namespace dfly
Loading