1515
1616namespace dfly {
1717
18+ void BucketDependencies::Increment (BucketIdentity bucket) {
19+ auto & counter = deps_[bucket];
20+ if (!counter)
21+ counter = std::make_shared<LocalLatch>();
22+ counter->lock ();
23+ }
24+
25+ void BucketDependencies::Decrement (BucketIdentity bucket) {
26+ auto it = deps_.find (bucket);
27+ DCHECK (it != deps_.end ());
28+ it->second ->unlock ();
29+
30+ if (!it->second ->IsBlocked ())
31+ deps_.erase (it);
32+ }
33+
34+ void BucketDependencies::Wait (BucketIdentity bucket) const {
35+ auto it = deps_.find (bucket);
36+ if (it == deps_.end ())
37+ return ;
38+
39+ auto counter = it->second ; // copy value for address stability
40+ counter->Wait ();
41+ }
42+
43+ bool BucketDependencies::DEBUG_IsBusy (BucketIdentity bucket) const {
44+ return deps_.contains (bucket);
45+ }
46+
1847void DelayedEntryHandler::EnqueueOffloaded (BucketIdentity bucket, DbIndex db_index, PrimeKey pk,
1948 const PrimeValue& pv, time_t expire_time,
2049 uint32_t mc_flags) {
@@ -26,6 +55,8 @@ void DelayedEntryHandler::EnqueueOffloaded(BucketIdentity bucket, DbIndex db_ind
2655 auto future = ReadTieredString (db_index, key, pv, EngineShard::tlocal ()->tiered_storage ());
2756 auto entry = std::make_unique<TieredDelayedEntry>(db_index, std::move (pk), std::move (future),
2857 expire_time, mc_flags);
58+
59+ deps_.Increment (bucket);
2960 delayed_entries_.emplace (bucket, std::move (entry));
3061}
3162
@@ -35,7 +66,7 @@ void DelayedEntryHandler::ProcessDelayedEntries(bool force, BucketIdentity flush
3566 if (delayed_entries_.size () > kMaxDelayedEntries )
3667 force |= true ;
3768
38- auto serialize_entry = [&](auto it) {
69+ auto serialize_entry = [&](decltype (delayed_entries_)::iterator it) {
3970 auto & entry = it->second ;
4071 auto value = entry->value .Get ();
4172
@@ -47,6 +78,8 @@ void DelayedEntryHandler::ProcessDelayedEntries(bool force, BucketIdentity flush
4778
4879 PrimeValue pv{*value};
4980 SerializeFetchedEntry (*entry, pv);
81+
82+ deps_.Decrement (it->first );
5083 delayed_entries_.erase (it++);
5184 };
5285
@@ -68,8 +101,9 @@ void DelayedEntryHandler::ProcessDelayedEntries(bool force, BucketIdentity flush
68101}
69102
70103SerializerBase::SerializerBase (DbSlice* slice, ExecutionState* cntx)
71- : db_slice_(slice), base_cntx_(cntx) {
72- DCHECK (db_slice_);
104+ : DelayedEntryHandler(static_cast <BucketDependencies&>(*this )),
105+ db_slice_ (slice),
106+ base_cntx_(cntx) {
73107 DCHECK (base_cntx_);
74108}
75109
@@ -97,45 +131,19 @@ void SerializerBase::UnregisterChangeListener() {
97131 db_slice_->UnregisterOnChange (version);
98132}
99133
100- void SerializerBase::MarkBucketSerializing (BucketIdentity bid) {
101- DCHECK (!bucket_states_.contains (bid)) << " Bucket already in transient state" ;
102- bucket_states_[bid] = BucketPhase::kSerializing ;
103- }
104-
105- void SerializerBase::FinishBucketIteration (BucketIdentity bid) {
106- auto it = bucket_states_.find (bid);
107- DCHECK (it != bucket_states_.end ());
108- DCHECK (it->second == BucketPhase::kSerializing );
109-
110- bucket_states_.erase (it);
111- ++stats_.buckets_serialized ;
112- }
113-
114- bool SerializerBase::ShouldProcessBucket (PrimeTable::bucket_iterator it) {
115- // Check if bucket is invalid or was already serialized
116- if (it.is_done () || it.GetVersion () >= snapshot_version_) {
117- ++stats_.buckets_skipped ;
118- return false ;
119- }
120-
121- // Check if this bucket is currently being serialized
122- if (bucket_states_.contains (it.bucket_address ())) {
123- ++stats_.change_during_serialization ;
124- return false ;
125- }
126-
127- return true ;
128- }
129-
130134bool SerializerBase::ProcessBucket (DbIndex db_index, PrimeTable::bucket_iterator it,
131135 bool on_update) {
132136 std::lock_guard guard (big_value_mu_);
133137
134- // Check if this bucket should be serialized
135- if (! ShouldProcessBucket (it) ) {
138+ // Check if this bucket is stale
139+ if (it. is_done () || it. GetVersion () >= snapshot_version_ ) {
136140 // Force flush all delayed entries in the touched bucket
137141 if (EngineShard::tlocal ()->tiered_storage () != nullptr && on_update && !it.is_done ())
138142 ProcessDelayedEntries (false , it.bucket_address (), base_cntx_);
143+
144+ // Expected to be fully serialized due to big_value_mu_ guarding all paths
145+ // Otherwise, this needs to be changed to a wait
146+ DCHECK (!BucketDependencies::DEBUG_IsBusy (it.bucket_address ()));
139147 return false ;
140148 }
141149
@@ -148,9 +156,11 @@ bool SerializerBase::ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator
148156 }
149157
150158 it.SetVersion (snapshot_version_);
151- MarkBucketSerializing (it.bucket_address ());
159+ BucketDependencies::Increment (it.bucket_address ());
160+
152161 stats_.keys_serialized += SerializeBucket (db_index, it, on_update);
153- FinishBucketIteration (it.bucket_address ());
162+ stats_.buckets_serialized ++;
163+ BucketDependencies::Decrement (it.bucket_address ());
154164
155165 if (EngineShard::tlocal ()->tiered_storage () != nullptr )
156166 ProcessDelayedEntries (false , on_update ? it.bucket_address () : 0 , base_cntx_);
0 commit comments