Skip to content

Commit efdc68e

Browse files
committed
feat(tiering): Experimental list node tiering
Decouple QList from TieredStorage implementation details via callbacks held in a TieringParams structure.
- Introduce TieringParams struct holding function callbacks and offload state
- Add Node::Upload() to restore an externally-loaded node back into memory
- Add QList::SetDbIndex() to handle db reassignment, materializing offloaded nodes when tiering is active
- Add StashListNode / ReadTieredListNode in TieredStorage
- Add tiered_storage_test.cc coverage for list node offload/reload lifecycle
- Add tiering_test.py integration tests for list tiering behavior
Signed-off-by: mkaruza <mario@dragonflydb.io>
1 parent c402b21 commit efdc68e

12 files changed

Lines changed: 651 additions & 88 deletions

File tree

src/core/qlist.cc

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,14 @@ void QList::Node::SetExternal(size_t offset, uint32_t size) {
426426
ext_size = size;
427427
}
428428

429+
// Brings a node's payload back into memory: copies `val` into a freshly
// zmalloc'ed buffer that becomes this node's `entry`, then updates the owning
// list's accounting (malloc size grows by val.size(), offloaded-node count
// drops by one) and clears the node's `offloaded` flag.
//
// ql  - the list that owns this node; only its counters are touched here.
// val - the bytes to install as the node's entry.
void QList::Node::Upload(QList* ql, std::string_view val) {
  const size_t payload_len = val.size();

  auto* buffer = static_cast<unsigned char*>(zmalloc(payload_len));
  memcpy(buffer, val.data(), payload_len);
  entry = buffer;

  // Accounting on the owning list first, then flip the node state.
  ql->AdjustMallocSize(payload_len);
  ql->AdjustOffloadNodeCount(-1);
  offloaded = 0;
}
436+
429437
void QList::SetPackedThreshold(unsigned threshold) {
430438
packed_threshold = threshold;
431439
}
@@ -480,10 +488,9 @@ QList::QList(QList&& other) noexcept
480488
tiering_enabled_(other.tiering_enabled_),
481489
compress_(other.compress_),
482490
bookmark_count_(other.bookmark_count_),
483-
num_offloaded_nodes_(other.num_offloaded_nodes_) {
491+
tiering_params_(std::move(other.tiering_params_)) {
484492
other.head_ = nullptr;
485493
other.len_ = other.count_ = 0;
486-
other.num_offloaded_nodes_ = 0;
487494
}
488495

489496
QList::~QList() {
@@ -503,9 +510,9 @@ QList& QList::operator=(QList&& other) noexcept {
503510
tiering_enabled_ = other.tiering_enabled_;
504511
compress_ = other.compress_;
505512
bookmark_count_ = other.bookmark_count_;
506-
num_offloaded_nodes_ = other.num_offloaded_nodes_;
507513
other.head_ = nullptr;
508-
other.len_ = other.count_ = other.num_offloaded_nodes_ = 0;
514+
other.len_ = other.count_ = 0;
515+
tiering_params_ = std::move(other.tiering_params_);
509516
}
510517
return *this;
511518
}
@@ -519,7 +526,7 @@ void QList::Clear() noexcept {
519526
// If entry is offloaded we should skip freeing its memory.
520527
bool free_entry = current->offloaded == 0;
521528
if (tiering_enabled_ && (current->offloaded || current->io_pending)) {
522-
CleanupOffloadedNode(current);
529+
tiering_params_->cleanup(this, current);
523530
} else {
524531
if (current->encoding != QUICKLIST_NODE_ENCODING_RAW) {
525532
quicklistLZF* lzf = (quicklistLZF*)current->entry;
@@ -540,7 +547,6 @@ void QList::Clear() noexcept {
540547
head_ = nullptr;
541548
count_ = 0;
542549
malloc_size_ = 0;
543-
num_offloaded_nodes_ = 0;
544550
}
545551

546552
void QList::Push(string_view value, Where where) {
@@ -915,6 +921,9 @@ void QList::Replace(Iterator it, std::string_view elem) {
915921

916922
void QList::CoolOff(Node* node, uint32_t node_id) {
917923
if (tiering_enabled_) {
924+
uint32_t threshold = tiering_params_->node_depth_threshold;
925+
uint32_t num_offloaded_nodes = tiering_params_->num_offloaded_nodes;
926+
918927
// Dry run for offloading decision.
919928
// a. Node id is within the offloadable depth - offload it if not already offloaded.
920929
// b. Node id is outside the offloadable depth - but we have too many nodes that are not
@@ -925,12 +934,11 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
925934
// we won't need to traverse them again for "trivial" access patterns unless they
926935
// get accessed again. Another reason for missing offloaded nodes is that node_id can be
927936
// off due to merges (can be improved in future).
928-
if (node_id >= tiering_node_depth_threshold_ &&
929-
node_id + tiering_node_depth_threshold_ < len_) {
937+
if (node_id >= threshold && node_id + threshold < len_) {
930938
if (!node->offloaded && !node->io_pending) {
931-
OffloadNode(node);
939+
tiering_params_->offload(this, node);
932940
}
933-
} else if (num_offloaded_nodes_ * 2 + tiering_node_depth_threshold_ * 2 < len_) {
941+
} else if (num_offloaded_nodes * 2 + threshold * 2 < len_) {
934942
// We check `num_offloaded_nodes * 2` above to avoid frequent traversals.
935943
// So only when the gap between offloaded and non-offloaded nodes is large enough,
936944
// we do a traversal to offload more nodes.
@@ -940,16 +948,15 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
940948

941949
// Traverse from both ends towards the middle as we expect more offloads towards the ends
942950
// due to usual access patterns of adding items via lpush/rpush.
943-
while (traverse_node_id <= len_ / 2 &&
944-
(num_offloaded_nodes_ + 2 * tiering_node_depth_threshold_) < len_) {
945-
if (traverse_node_id >= tiering_node_depth_threshold_) {
951+
while (traverse_node_id <= len_ / 2 && (num_offloaded_nodes + 2 * threshold) < len_) {
952+
if (traverse_node_id >= threshold) {
946953
if (fw->offloaded == 0 && fw->io_pending == 0) {
947-
OffloadNode(fw);
954+
tiering_params_->offload(this, fw);
948955
}
949956

950957
// Avoid offloading the same node twice when fw and rev meet in the middle.
951958
if (rev != fw && rev->offloaded == 0 && rev->io_pending == 0) {
952-
OffloadNode(rev);
959+
tiering_params_->offload(this, rev);
953960
}
954961
}
955962
fw = fw->next;
@@ -1047,12 +1054,12 @@ void QList::Materialize(Node* node) {
10471054

10481055
// Cancel stash in progress before loading.
10491056
if (node->io_pending) {
1050-
CleanupOffloadedNode(node);
1057+
tiering_params_->cleanup(this, node);
10511058
}
10521059

10531060
// Load the offloaded node data back into memory.
10541061
if (node->offloaded) {
1055-
ReadOffloadedNode(node);
1062+
tiering_params_->load(this, node);
10561063
}
10571064

10581065
DCHECK(!node->offloaded);
@@ -1195,7 +1202,7 @@ void QList::DelNode(Node* node) {
11951202
}
11961203

11971204
if (tiering_enabled_ && (node->offloaded || node->io_pending)) {
1198-
CleanupOffloadedNode(node);
1205+
tiering_params_->cleanup(this, node);
11991206
}
12001207

12011208
/* If we deleted a node within our compress depth, we
@@ -1243,18 +1250,22 @@ bool QList::DelPackedIndex(Node* node, uint8_t* p) {
12431250
return false;
12441251
}
12451252

1246-
void QList::OffloadNode(Node* node) const {
1247-
DCHECK(tiering_enabled_ && node->offloaded == 0 && node->io_pending == 0);
1248-
stats.offload_requests++;
1249-
node->io_pending = 1;
1250-
}
1251-
1252-
void QList::ReadOffloadedNode(QList::Node* node) const {
1253-
stats.onload_requests++;
1254-
}
1255-
1256-
void QList::CleanupOffloadedNode(QList::Node* node) const {
1257-
node->io_pending = 0;
1253+
// Re-associates this list with database `db_id`. A no-op when the index is
// unchanged.
//
// When tiering is enabled and at least one node is counted as offloaded, every
// node is passed through Materialize() before the new index is stored.
// This is deliberately conservative and suboptimal: pending stashes could
// simply be canceled, and fully offloaded nodes only need their statistics
// updated, since they depend on their storage offset rather than on db_id.
void QList::SetDbIndex(DbIndex db_id) {
  if (db_id_ != db_id) {
    if (tiering_enabled_ && tiering_params_->num_offloaded_nodes > 0) {
      for (Node* cur = head_; cur != nullptr;) {
        // Grab the successor before materializing the current node.
        Node* following = cur->next;
        Materialize(cur);
        // Stop if the walk wraps around to the head.
        cur = (following != head_) ? following : nullptr;
      }
    }
    db_id_ = db_id;
  }
}
12591270

12601271
void QList::InitIteratorEntry(Iterator* it) const {

src/core/qlist.h

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <string>
1313

1414
#include "core/collection_entry.h"
15+
#include "server/common_types.h"
1516

1617
#define QL_COMP_BITS 16
1718
#define QL_BM_BITS 4
@@ -85,6 +86,7 @@ class QList {
8586
size_t GetLZF(void** data) const;
8687

8788
void SetExternal(size_t offset, uint32_t sz);
89+
void Upload(QList* ql, std::string_view val);
8890
std::pair<size_t, size_t> GetExternalSlice() const {
8991
return std::make_pair(size_t(ext_offset), size_t(ext_size));
9092
}
@@ -122,9 +124,22 @@ class QList {
122124
}
123125

124126
// Adjust the number of offloaded nodes by `delta` (which may be negative).
125-
void IncrementNumOffloadedNodes(int delta) {
126-
num_offloaded_nodes_ += delta;
127+
void AdjustOffloadNodeCount(int delta) {
128+
if (tiering_enabled_) {
129+
tiering_params_->num_offloaded_nodes += delta;
130+
}
131+
}
132+
133+
DbIndex GetDbIndex() const {
134+
return db_id_;
127135
}
136+
struct TieringParams {
137+
uint32_t num_offloaded_nodes = 0;
138+
uint32_t node_depth_threshold = 0;
139+
void (*offload)(QList* ql, Node* node) = nullptr;
140+
void (*load)(QList* ql, Node* node) = nullptr;
141+
void (*cleanup)(QList* ql, Node* node) = nullptr;
142+
};
128143

129144
/**
130145
* fill: The number of entries allowed per internal list node can be specified
@@ -260,12 +275,15 @@ class QList {
260275
zstd_threshold_ = threshold;
261276
}
262277

263-
// Enable tiered storage and set node depth threshold
264-
void EnableTiering(uint32_t threshold) {
278+
// Enable tiered storage.
279+
void EnableTiering(const TieringParams& params) {
265280
tiering_enabled_ = 1;
266-
tiering_node_depth_threshold_ = threshold;
281+
tiering_params_ = std::make_unique<TieringParams>(params);
267282
}
268283

284+
// Updates the db index associated with this list.
285+
void SetDbIndex(DbIndex db_id);
286+
269287
struct Stats {
270288
uint64_t compression_attempts = 0;
271289

@@ -340,13 +358,6 @@ class QList {
340358
void DelNode(Node* node);
341359
bool DelPackedIndex(Node* node, uint8_t* p);
342360

343-
// Offload node to tiered storage
344-
void OffloadNode(Node* node) const;
345-
// Read offloaded node from tiered storage
346-
void ReadOffloadedNode(Node* node) const;
347-
// Delete offloaded node or cancel offloading of node
348-
void CleanupOffloadedNode(Node* node) const;
349-
350361
// Initializes iterator's zi_ to point to the element at offset_.
351362
// Decompresses the node if needed. Assumes current_ is not null.
352363
void InitIteratorEntry(Iterator* it) const;
@@ -364,9 +375,9 @@ class QList {
364375
unsigned compress_ : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
365376
unsigned bookmark_count_ : QL_BM_BITS;
366377
unsigned reserved2_ : 12;
367-
uint32_t num_offloaded_nodes_ = 0;
368-
uint32_t zstd_threshold_ = 0; // 0 = disabled
369-
uint32_t tiering_node_depth_threshold_ = 0; // 0 = disabled
378+
uint16_t db_id_ = kInvalidDbId;
379+
uint32_t zstd_threshold_ = 0; // 0 = disabled
380+
std::unique_ptr<TieringParams> tiering_params_;
370381
};
371382

372383
} // namespace dfly

src/core/qlist_test.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -411,20 +411,6 @@ TEST_F(QListTest, DefragmentListpackCompressed) {
411411
ASSERT_EQ(i, total_items);
412412
}
413413

414-
TEST_F(QListTest, Tiering) {
415-
QList::stats.offload_requests = 0;
416-
QList tiered_ql;
417-
418-
// Enable tiering and set node_depth_threshold = 1
419-
tiered_ql.EnableTiering(1);
420-
421-
for (int i = 0; i < 8000; i++) {
422-
tiered_ql.Push(absl::StrCat("value", i), QList::TAIL);
423-
}
424-
425-
EXPECT_EQ(QList::stats.offload_requests, 9);
426-
}
427-
428414
// MergeNodes must not follow the head_->prev circular link when looking for
429415
// adjacent nodes to merge. Splitting a full head node and calling MergeNodes
430416
// on the right half used to traverse new_head->prev (= tail), merging two

src/server/db_slice.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,6 +1566,12 @@ void DbSlice::RemoveOffloadedEntriesFromTieredStorage(absl::Span<const DbIndex>
15661566
} else if (it->second.HasStashPending()) {
15671567
tiered_storage->CancelStash(std::make_pair(index, it->first.GetSlice(&scratch)),
15681568
&it->second);
1569+
} else if (it->second.ObjType() == OBJ_LIST && it->second.Encoding() == kEncodingQL2) {
1570+
// Nodes can be offloaded to tiered storage, but the main object doesn't have external or
1571+
// pending flag set. We need to clear the list explicitly.
1572+
if (auto* ql = static_cast<QList*>(it->second.RObjPtr()); ql) {
1573+
ql->Clear();
1574+
}
15691575
}
15701576
});
15711577
} while (cursor);

src/server/generic_family.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,14 @@ OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
974974
auto& add_res = *op_result;
975975
add_res.it->first.SetSticky(sticky);
976976

977+
// When tiering is enabled, update tiered-storage metadata for partially offloaded values that were moved.
978+
if (EngineShard::tlocal()->tiered_storage()) {
979+
if (add_res.it->second.ObjType() == OBJ_LIST && add_res.it->second.Encoding() == kEncodingQL2) {
980+
auto* ql = static_cast<QList*>(add_res.it->second.RObjPtr());
981+
ql->SetDbIndex(target_db);
982+
}
983+
}
984+
977985
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
978986
if (add_res.it->second.ObjType() == OBJ_LIST && bc) {
979987
bc->Awaken(target_db, key);

src/server/list_family.cc

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ extern "C" {
2626
#include "server/error.h"
2727
#include "server/family_utils.h"
2828
#include "server/namespaces.h"
29+
#include "server/tiered_storage.h"
2930
#include "server/transaction.h"
3031

3132
namespace rng = std::ranges;
@@ -83,6 +84,37 @@ using time_point = Transaction::time_point;
8384

8485
namespace {
8586

87+
// QList tiering hook: counts an offload request and stashes `node` of list
// `ql` into this shard's tiered storage, keyed by the list's db index.
void OffloadListNode(QList* ql, QList::Node* node) {
  TieredStorage* storage = EngineShard::tlocal()->tiered_storage();
  DCHECK(storage);

  ++QList::stats.offload_requests;

  const auto db_index = ql->GetDbIndex();
  StashListNode(db_index, ql, node, storage, nullptr);
}
93+
94+
// QList tiering hook: counts an onload request and synchronously reads the
// external slice of `node` back from this shard's tiered storage.
//
// A failed read is only logged.
// NOTE(review): QList::Materialize DCHECKs that the node is no longer offloaded
// after calling this hook - confirm how a failed read is expected to be handled.
void LoadListNode(QList* ql, QList::Node* node) {
  TieredStorage* storage = EngineShard::tlocal()->tiered_storage();
  DCHECK(storage);

  ++QList::stats.onload_requests;

  // Blocks until the read completes.
  auto outcome =
      ReadTieredListNode(ql->GetDbIndex(), ql, node, node->GetExternalSlice(), storage).Get();
  if (outcome) {
    return;
  }
  LOG(WARNING) << "Failed to load list node from tiered storage: " << outcome.error().message();
}
103+
104+
// QList tiering hook: releases whatever tiered-storage state `node` holds.
//  - Storage already closed: nothing is done.
//  - Stash still in flight (io_pending): the stash is canceled.
//  - Otherwise: the node's stored data is deleted.
void CleanupListNode(QList* ql, QList::Node* node) {
  TieredStorage* storage = EngineShard::tlocal()->tiered_storage();
  DCHECK(storage);

  if (storage->IsClosed()) {
    return;
  }

  if (node->io_pending) {
    storage->CancelStash(tiering::ListNodeId{ql->GetDbIndex(), ql, node}, node);
    return;
  }

  // Delete() is not given the QList pointer, so the list's offloaded-node
  // counter has to be decremented here.
  ql->AdjustOffloadNodeCount(-1);
  storage->Delete(ql->GetDbIndex(), node);
}
117+
86118
class ListWrapper {
87119
using LP = detail::ListPack;
88120

@@ -106,9 +138,18 @@ class ListWrapper {
106138
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
107139
GetFlag(FLAGS_list_compress_depth));
108140

141+
// Set db index for new QList
142+
ql->SetDbIndex(db_id_);
143+
109144
const uint32_t tiering_node_depth_threshold = absl::GetFlag(FLAGS_list_tiering_threshold);
110145
if (tiering_node_depth_threshold > 0 && EngineShard::tlocal()->tiered_storage()) {
111-
ql->EnableTiering(tiering_node_depth_threshold);
146+
QList::TieringParams params{
147+
.node_depth_threshold = tiering_node_depth_threshold,
148+
.offload = OffloadListNode,
149+
.load = LoadListNode,
150+
.cleanup = CleanupListNode,
151+
};
152+
ql->EnableTiering(params);
112153
}
113154

114155
if (uint32_t zstd_thresh = GetFlag(FLAGS_list_experimental_zstd_dict_threshold);

0 commit comments

Comments
 (0)