2020#include " base/flags.h"
2121#include " base/logging.h"
2222#include " core/detail/listpack_wrap.h"
23+ #include " core/qlist.h"
2324#include " server/db_slice.h"
2425#include " server/engine_shard_set.h"
2526#include " server/snapshot.h"
@@ -54,6 +55,8 @@ ABSL_FLAG(float, tiered_upload_threshold, 0.1,
5455
5556ABSL_FLAG (bool , tiered_experimental_hash_support, false , " Experimental hash datatype offloading" );
5657
58+ ABSL_FLAG (bool , tiered_experimental_list_support, false , " Experimental list node offloading" );
59+
5760namespace dfly {
5861
5962using namespace std ;
@@ -119,7 +122,10 @@ size_t TieredStorage::StashDescriptor::Serialize(io::MutableBytes buffer) const
119122 lw, {reinterpret_cast <char *>(buffer.data ()), buffer.length ()});
120123 }
121124 case CompactObj::ExternalRep::LIST_NODE: {
122- // Make compiler happy. It will be implemented in following PR
125+ // LIST_NODE uses the string_view pair path (same as STRING).
126+ auto strs = std::get<std::array<std::string_view, 2 >>(blob);
127+ memcpy (buffer.data (), strs[0 ].data (), strs[0 ].size ());
128+ return strs[0 ].size ();
123129 }
124130 };
125131 return 0 ;
@@ -148,6 +154,32 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
148154 ClearStashPending (key);
149155 }
150156
157+ // Clear stash pending flag for list node
158+ void ClearStashPending (tiering::ListNodeId id) {
159+ stats_.total_cancels ++;
160+ QList::Node* node = reinterpret_cast <QList::Node*>(std::get<2 >(id));
161+ node->io_pending = 0 ;
162+ // If stashing failed we need to decrease offloaded nodes count.
163+ QList* ql = reinterpret_cast <QList*>(std::get<1 >(id));
164+ ql->IncrementNumOffloadedNodes (-1 );
165+ }
166+
167+ void CancelStash (tiering::KeyRef id, size_t size) {
168+ UnblockBackpressure (id, false );
169+ // TODO: Don't recompute size estimate, try-delete bin first
170+ if (OccupiesWholePages (size)) {
171+ CancelPending (id);
172+ } else if (auto bin = ts_->bins_ ->Delete (id.first , id.second ); bin) {
173+ CancelPending (*bin);
174+ }
175+ }
176+
177+ void CancelStash (tiering::ListNodeId id) {
178+ QList* ql = reinterpret_cast <QList*>(std::get<1 >(id));
179+ ql->IncrementNumOffloadedNodes (-1 );
180+ CancelPending (id);
181+ }
182+
151183 DbTableStats* GetDbTableStats (DbIndex dbid) {
152184 return db_slice_.MutableStats (dbid);
153185 }
@@ -200,7 +232,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
200232 break ;
201233 }
202234 case CompactObj::ExternalRep::LIST_NODE: {
203- // Make compiler happy. It will be implemented in following PR.
235+ LOG (DFATAL) << " LIST_NODE should not be uploaded to PrimeValue " ;
204236 break ;
205237 }
206238 };
@@ -239,6 +271,26 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
239271 SetExternal ({sub_dbid, sub_key}, sub_segment);
240272 }
241273
274+ // Finalize stash for a fragments identified by pointer
275+ void SetExternal (tiering::ListNodeId id, tiering::DiskSegment segment) {
276+ auto * stats = GetDbTableStats (std::get<0 >(id));
277+
278+ stats->tiered_entries ++;
279+ stats->tiered_used_bytes += segment.length ;
280+ stats_.total_stashes ++;
281+
282+ QList::Node* node = reinterpret_cast <QList::Node*>(std::get<2 >(id));
283+ QList* ql = reinterpret_cast <QList*>(std::get<1 >(id));
284+
285+ node->io_pending = 0 ;
286+
287+ // Adjust parent QList node malloc size / number of offloaded nodes.
288+ ql->AdjustMallocSize (-segment.length );
289+ node->SetExternal (segment.offset , segment.length );
290+
291+ stats->AddTypeMemoryUsage (OBJ_LIST, -segment.length );
292+ }
293+
242294 // If any backpressure (throttling) is active, notify that the operation finished
243295 void UnblockBackpressure (OpManager::KeyRef id, bool result) {
244296 if (auto node = ts_->stash_backpressure_ .extract (id); !node.empty ())
@@ -293,10 +345,28 @@ bool TieredStorage::ShardOpManager::NotifyFetched(const OwnedEntryId& id,
293345 tiering::Decoder* decoder) {
294346 ++stats_.total_fetches ;
295347
296- if (id == OwnedEntryId{kFragmentedBin }) { // Generally we read whole bins only for defrag
297- auto * bdecoder = static_cast <tiering::BareDecoder*>(decoder);
298- Defragment (segment, bdecoder->slice );
299- return true ; // delete
348+ if (const auto * key = std::get_if<tiering::ListNodeId>(&id); key) {
349+ ++stats_.total_uploads ;
350+
351+ QList* ql = reinterpret_cast <QList*>(std::get<1 >(*key));
352+ // Adjust malloc size and number of offloded nodes before uploading.
353+ ql->AdjustMallocSize (segment.length );
354+ ql->IncrementNumOffloadedNodes (-1 );
355+
356+ DbTableStats* stats = GetDbTableStats (std::get<0 >(*key));
357+ stats->AddTypeMemoryUsage (OBJ_LIST, segment.length );
358+
359+ // We return false here, because we don't want to delete the value from storage yet.
360+ // It will be done in onload_cb callback.
361+ return false ;
362+ }
363+
364+ if (const auto * i = std::get_if<uintptr_t >(&id); i) {
365+ if (*i == kFragmentedBin ) { // Generally we read whole bins only for defrag
366+ auto * bdecoder = static_cast <tiering::BareDecoder*>(decoder);
367+ Defragment (segment, bdecoder->slice );
368+ return true ; // delete
369+ }
300370 }
301371
302372 tiering::Decoder::UploadMetrics metrics = decoder->GetMetrics ();
@@ -389,6 +459,7 @@ error_code TieredStorage::Open(string_view base_path) {
389459}
390460
391461void TieredStorage::Close () {
462+ is_closed_ = true ;
392463 for (auto & [_, f] : stash_backpressure_)
393464 f.Resolve (false );
394465 op_manager_->Close ();
@@ -398,17 +469,17 @@ bool TieredStorage::HasModificationPending(tiering::DiskSegment segment) const {
398469 return op_manager_->HasModificationPending (segment);
399470}
400471
401- void TieredStorage::ReadInternal (DbIndex dbid, std::string_view key,
402- const tiering::DiskSegment& segment,
472+ void TieredStorage::ReadInternal (tiering::ReadId id, const tiering::DiskSegment& segment,
403473 const tiering::Decoder& decoder,
404474 std::function<void (io::Result<tiering::Decoder*>)> cb,
405475 bool read_only) {
406476 // TODO: improve performance by avoiding one more function wrap
407- op_manager_->Enqueue (KeyRef (dbid, key), segment, decoder, std::move (cb), read_only);
477+ op_manager_->Enqueue (std::visit ([](auto && value) -> tiering::PendingId { return value; }, id),
478+ segment, decoder, std::move (cb), read_only);
408479}
409480
410- void TieredStorage::Stash (DbIndex dbid, string_view key, const StashDescriptor& blobs,
411- BackPressureFuture* backpressure) {
481+ void TieredStorage::StashPrimeValue (DbIndex dbid, string_view key, const StashDescriptor& blobs,
482+ BackPressureFuture* backpressure) {
412483 CHECK (!bins_->IsPending (dbid, key)); // Because has stash pending is false (ShouldStash checks)
413484
414485 size_t est_size = blobs.EstimatedSerializedSize ();
@@ -448,6 +519,7 @@ void TieredStorage::Stash(DbIndex dbid, string_view key, const StashDescriptor&
448519}
449520
450521void TieredStorage::Delete (DbIndex dbid, FragmentRef fragment_ref) {
522+ DCHECK (!is_closed_);
451523 DCHECK (!fragment_ref.HasStashPending ());
452524 ++stats_.total_deletes ;
453525
@@ -460,22 +532,17 @@ void TieredStorage::Delete(DbIndex dbid, FragmentRef fragment_ref) {
460532 op_manager_->DeleteOffloaded (dbid, segment);
461533}
462534
463- void TieredStorage::CancelStash (DbIndex dbid, std::string_view key,
464- tiering::FragmentRef fragment_ref) {
535+ void TieredStorage::CancelStash (tiering::PendingId id, tiering::FragmentRef fragment_ref) {
465536 DCHECK (fragment_ref.HasStashPending ());
466-
467- // If any previous write was happening, it has been cancelled
468- if (auto node = stash_backpressure_.extract (make_pair (dbid, key)); !node.empty ())
469- std::move (node.mapped ()).Resolve (false );
470-
471- // TODO: Don't recompute size estimate, try-delete bin first
472- StashDescriptor blobs{fragment_ref.GetSerializationDescr ()};
473- size_t size = blobs.EstimatedSerializedSize ();
474- if (OccupiesWholePages (size)) {
475- op_manager_->CancelPending (KeyRef (dbid, key));
476- } else if (auto bin = bins_->Delete (dbid, key); bin) {
477- op_manager_->CancelPending (*bin);
478- }
537+ DCHECK (std::holds_alternative<KeyRef>(id) || std::holds_alternative<tiering::ListNodeId>(id));
538+ std::visit (absl::Overload{[&fragment_ref, this ](KeyRef id) {
539+ StashDescriptor blobs{fragment_ref.GetSerializationDescr ()};
540+ op_manager_->CancelStash (id, blobs.EstimatedSerializedSize ());
541+ },
542+ [this ](tiering::ListNodeId id) { op_manager_->CancelStash (id); },
543+ // Make variant exhaustive, but we should never call with this type.
544+ [](uintptr_t ) { LOG (DFATAL) << " Invalid id type for CancelStash" ; }},
545+ id);
479546 fragment_ref.SetStashPending (false );
480547}
481548
@@ -532,13 +599,15 @@ void TieredStorage::UpdateFromFlags() {
532599 .offload_threshold = absl::GetFlag (FLAGS_tiered_offload_threshold),
533600 .upload_threshold = absl::GetFlag (FLAGS_tiered_upload_threshold),
534601 .experimental_hash_offload = absl::GetFlag (FLAGS_tiered_experimental_hash_support),
602+ .experimental_list_offload = absl::GetFlag (FLAGS_tiered_experimental_list_support),
535603 };
536604}
537605
538606std::vector<std::string> TieredStorage::GetMutableFlagNames () {
539607 return base::GetFlagNames (FLAGS_tiered_min_value_size, FLAGS_tiered_experimental_cooling,
540608 FLAGS_tiered_storage_write_depth, FLAGS_tiered_offload_threshold,
541- FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_support);
609+ FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_support,
610+ FLAGS_tiered_experimental_list_support);
542611}
543612
544613bool TieredStorage::ShouldOffload () const {
@@ -576,7 +645,7 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
576645 } else {
577646 stats_.offloading_stashes ++;
578647 it->second .SetStashPending (true );
579- Stash (dbid, it->first .GetSlice (&tmp), *blobs, nullptr );
648+ StashPrimeValue (dbid, it->first .GetSlice (&tmp), *blobs, nullptr );
580649 }
581650 }
582651 };
@@ -640,12 +709,20 @@ auto TieredStorage::ShouldStash(const tiering::FragmentRef& fragment_ref) const
640709 if (fragment_ref.ObjType () == OBJ_HASH && !config_.experimental_hash_offload )
641710 return nullopt ;
642711
712+ // For now, list node offloading is conditional
713+ if (fragment_ref.ObjType () == OBJ_LIST && !config_.experimental_list_offload )
714+ return nullopt ;
715+
643716 // Estimate value size
644717 StashDescriptor blobs{fragment_ref.GetSerializationDescr ()};
645718 size_t estimated_size = blobs.EstimatedSerializedSize ();
646719 if (estimated_size < config_.min_value_size )
647720 return nullopt ;
648721
722+ // If the fragment is list node we offload only if it occupies whole page.
723+ if (fragment_ref.ObjType () == OBJ_LIST && !OccupiesWholePages (estimated_size))
724+ return nullopt ;
725+
649726 // Limit write depth. TODO: Provide backpressure?
650727 if (op_manager_->GetStats ().pending_stash_cnt >= config_.write_depth_limit ) {
651728 ++stats_.stash_overflow_cnt ;
@@ -707,7 +784,35 @@ void StashPrimeValue(DbIndex dbid, std::string_view key, PrimeValue* pv, TieredS
707784 BackPressureFuture* backpressure) {
708785 if (auto blobs = ts->ShouldStash (*pv); blobs) {
709786 pv->SetStashPending (true );
710- ts->Stash (dbid, key, *blobs, backpressure);
787+ ts->StashPrimeValue (dbid, key, *blobs, backpressure);
788+ }
789+ }
790+
791+ bool StashListNode (DbIndex dbid, QList::Node* node, QList* ql, TieredStorage* ts,
792+ BackPressureFuture* backpressure) {
793+ if (auto blobs = ts->ShouldStash (*node); blobs) {
794+ // Increment before stashing; decremented on failure in `ClearStashPending`
795+ ql->IncrementNumOffloadedNodes (1 );
796+ node->io_pending = 1 ;
797+ ts->StashPartialValue (tiering::ListNodeId{dbid, ql, node}, *blobs, backpressure);
798+ return true ;
799+ }
800+ return false ;
801+ }
802+
803+ void TieredStorage::StashPartialValue (tiering::PendingId id, const StashDescriptor& blobs,
804+ BackPressureFuture* backpressure) {
805+ size_t est_size = blobs.EstimatedSerializedSize ();
806+ DCHECK_GT (est_size, 0u );
807+
808+ auto serialize = absl::bind_front (&StashDescriptor::Serialize, &blobs);
809+
810+ error_code ec = op_manager_->PrepareAndStash (id, est_size, serialize);
811+ if (ec) {
812+ bool to_log = ec != errc::file_too_large && ec != errc::operation_would_block &&
813+ ec != errc::operation_in_progress;
814+ LOG_IF (ERROR, to_log) << " Node stash failed: " << ec.message ();
815+ std::visit ([this ](const auto & value) { op_manager_->ClearStashPending (value); }, id);
711816 }
712817}
713818
@@ -716,7 +821,18 @@ void ReadTiered(DbIndex dbid, std::string_view key, const PrimeValue& value,
716821 auto cb = [readf = std::move (readf)](io::Result<tiering::StringDecoder*> res) mutable {
717822 readf (res.transform ([](tiering::StringDecoder* d) { return d->GetView (); }));
718823 };
719- ts->Read (dbid, key, value.GetExternalSlice (), tiering::StringDecoder{value}, std::move (cb));
824+ ts->Read (KeyRef{dbid, key}, value.GetExternalSlice (), tiering::StringDecoder{value},
825+ std::move (cb));
826+ }
827+
828+ void ReadTieredListNode (DbIndex dbid, QList::Node* node, QList* ql,
829+ const tiering::DiskSegment& segment,
830+ std::function<void (io::Result<std::string_view>)> readf,
831+ TieredStorage* ts) {
832+ auto cb = [readf = std::move (readf)](io::Result<tiering::BareDecoder*> res) mutable {
833+ readf (res.transform ([](tiering::BareDecoder* d) { return d->slice ; }));
834+ };
835+ ts->Read (tiering::ListNodeId{dbid, ql, node}, segment, tiering::BareDecoder{}, std::move (cb));
720836}
721837
722838template <typename T>
@@ -730,8 +846,8 @@ TieredStorage::TResult<T> ModifyTiered(DbIndex dbid, std::string_view key, const
730846 auto cb = [future, modf = std::move (modf)](io::Result<tiering::StringDecoder*> res) mutable {
731847 future.Resolve (res.transform ([&modf](auto * d) { return modf (d->Write ()); }));
732848 };
733- ts->Read (dbid, key, value.GetExternalSlice (), tiering::StringDecoder{value}, std::move (cb) ,
734- false );
849+ ts->Read (KeyRef{ dbid, key} , value.GetExternalSlice (), tiering::StringDecoder{value},
850+ std::move (cb), false );
735851
736852 return future;
737853}
0 commit comments