diff --git a/src/core/dash.h b/src/core/dash.h index 525fa2beccac..7545544f17ee 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -616,6 +616,11 @@ struct DashTable<_Key, _Value, Policy>::BucketSet { }); } + bool operator==(const BucketSet& other) const { + return owner_ == other.owner_ && seg_id_ == other.seg_id_ && limit_ == other.limit_ && + ids_[0] == other.ids_[0] && ids_[1] == other.ids_[1]; + } + private: friend class DashTable; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 3bff8d850975..6c1b38f678ef 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -675,11 +675,17 @@ OpResult DbSlice::AddOrFindInternal(const Context& cntx, return OpStatus::WRONG_TYPE; } + // It's a new entry. auto status = res.status(); CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status; - // It's a new entry. - CallChangeCallbacks(cntx.db_index, ChangeReq{key}); + if (!change_cb_.empty()) { + auto bucket_set = db.prime.CVCUponInsert(key); + CallChangeCallbacks(cntx.db_index, bucket_set); + + // Set of possible insertion buckets must be the same after possibly blocking call + DCHECK(bucket_set == db.prime.CVCUponInsert(key)); + } ssize_t memory_offset = -key.size(); size_t reclaimed = 0; @@ -848,16 +854,13 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { auto on_change = [&](DbIndex db_index, const ChangeReq& req) { FiberAtomicGuard fg; - PrimeTable* table = GetTables(db_index); - if (const PrimeTable::bucket_iterator* bit = req.update()) { + if (const auto* bit = std::get_if(&req)) { if (!bit->is_done() && bit->GetVersion() < next_version) { iterate_bucket(*bit); } } else { - string_view key = get(req.change); - auto bucket_set = table->CVCUponInsert(key); - for (auto it : bucket_set.buckets()) { + for (auto it : std::get(req).buckets()) { if (!it.is_done() && it.GetVersion() < next_version) iterate_bucket(it); } diff --git a/src/server/serializer_base.cc b/src/server/serializer_base.cc index d68162ead824..8d83192fe6d8 100644 --- a/src/server/serializer_base.cc +++ b/src/server/serializer_base.cc @@ -98,7 +98,7 @@ SerializerBase::~SerializerBase() { void SerializerBase::RegisterChangeListener() { db_array_ = db_slice_->databases(); // copy pointers to survive flush auto cb = [this](DbIndex dbid, const ChangeReq& req) { - std::visit([&](auto it) { OnChangeBlocking(dbid, it); }, req.change); + std::visit([&](auto it) { OnChangeBlocking(dbid, it); }, req); }; snapshot_version_ = db_slice_->RegisterOnChange(cb); } @@ -202,14 +202,14 @@ void SerializerBase::OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_itera ProcessBucket(db_index, it, true); } -void SerializerBase::OnChangeBlocking(DbIndex db_index, std::string_view key) { +void SerializerBase::OnChangeBlocking(DbIndex db_index, const PrimeTable::BucketSet& set) { // We must acquire the mutex ahead and process all buckets under the same lock. - // This ensures that CVCUponInsert and the table insertion that invoked this callback + // This ensures that bucket processing and the table insertion that invoked this callback // will be operating on the same state as all writes are linarly ordered by this mutex. std::unique_lock lk{big_value_mu_}; // We call Process even for up-to-date buckets to ensure all operations (delayed) are finished. - for (auto it : db_slice_->GetTables(db_index)->CVCUponInsert(key).buckets()) + for (auto it : set.buckets()) ProcessBucketInternal(db_index, it, true); } diff --git a/src/server/serializer_base.h b/src/server/serializer_base.h index 40c7316263e1..a5094c60e9e0 100644 --- a/src/server/serializer_base.h +++ b/src/server/serializer_base.h @@ -92,9 +92,8 @@ class SerializerBase : public DelayedEntryHandler { // Called when an existing bucket is about to be mutated. Calls ProcessBucket. void OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_iterator it); - // Called when a new key is about to be inserted, - // calls CVCUponInsert -> OnChangeBlocking(bucket_iterator) for every touched bucket. - void OnChangeBlocking(DbIndex db_index, std::string_view key); + // Called when a new key is about to be inserted. Calls ProcessBucket for the buckets. + void OnChangeBlocking(DbIndex db_index, const PrimeTable::BucketSet& set); // --- Shared members (to be moved from subclasses in later PRs) --- diff --git a/src/server/table.h b/src/server/table.h index 8edec61231f3..b3f463852cdf 100644 --- a/src/server/table.h +++ b/src/server/table.h @@ -176,20 +176,7 @@ struct DbTable : boost::intrusive_ref_counter>; -// ChangeReq - describes the change to the table. -struct ChangeReq { - // If iterator is set then it's an update to the existing bucket. - // Otherwise (string_view is set) then it's a new key that is going to be added to the table. - std::variant change; - - explicit ChangeReq(PrimeTable::bucket_iterator it) : change(it) { - } - explicit ChangeReq(std::string_view key) : change(key) { - } - - const PrimeTable::bucket_iterator* update() const { - return std::get_if(&change); - } -}; +// ChangeReq - describes the change to the table: either single bucket or whole bucket set. +using ChangeReq = std::variant; } // namespace dfly