Skip to content

Commit 18a1959

Browse files
committed
Remove callbacks and move tiering responsibility to qlist
1 parent 0eea5f3 commit 18a1959

4 files changed

Lines changed: 56 additions & 76 deletions

File tree

src/core/qlist.cc

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -459,15 +459,12 @@ size_t QList::DefragIfNeeded(PageUsage* page_usage) {
459459
return reallocated;
460460
}
461461

462-
void QList::SetTieringParams(const TieringParams& params) {
463-
tiering_params_ = make_unique<TieringParams>(params);
464-
}
465-
466462
QList::QList(int fill, int compress)
467463
: fill_(fill),
468464
dict_learning_failed_(0),
469465
dict_compress_failed_(0),
470466
dict_bulk_finished_(0),
467+
tiering_enabled_(0),
471468
compress_(compress),
472469
bookmark_count_(0) {
473470
}
@@ -480,10 +477,10 @@ QList::QList(QList&& other) noexcept
480477
dict_learning_failed_(other.dict_learning_failed_),
481478
dict_compress_failed_(other.dict_compress_failed_),
482479
dict_bulk_finished_(other.dict_bulk_finished_),
480+
tiering_enabled_(other.tiering_enabled_),
483481
compress_(other.compress_),
484482
bookmark_count_(other.bookmark_count_),
485-
num_offloaded_nodes_(other.num_offloaded_nodes_),
486-
tiering_params_(std::move(other.tiering_params_)) {
483+
num_offloaded_nodes_(other.num_offloaded_nodes_) {
487484
other.head_ = nullptr;
488485
other.len_ = other.count_ = 0;
489486
other.num_offloaded_nodes_ = 0;
@@ -503,9 +500,9 @@ QList& QList::operator=(QList&& other) noexcept {
503500
dict_learning_failed_ = other.dict_learning_failed_;
504501
dict_compress_failed_ = other.dict_compress_failed_;
505502
dict_bulk_finished_ = other.dict_bulk_finished_;
503+
tiering_enabled_ = other.tiering_enabled_;
506504
compress_ = other.compress_;
507505
bookmark_count_ = other.bookmark_count_;
508-
tiering_params_ = std::move(other.tiering_params_);
509506
num_offloaded_nodes_ = other.num_offloaded_nodes_;
510507
other.head_ = nullptr;
511508
other.len_ = other.count_ = other.num_offloaded_nodes_ = 0;
@@ -521,10 +518,8 @@ void QList::Clear() noexcept {
521518

522519
// If entry is offloaded we should skip freeing its memory.
523520
bool free_entry = current->offloaded == 0;
524-
if (current->offloaded || current->io_pending) {
525-
if (tiering_params_ && tiering_params_->delete_cb) {
526-
tiering_params_->delete_cb(current);
527-
}
521+
if (tiering_enabled_ && (current->offloaded || current->io_pending)) {
522+
CleanupOffloadedNode(current);
528523
} else {
529524
if (current->encoding != QUICKLIST_NODE_ENCODING_RAW) {
530525
quicklistLZF* lzf = (quicklistLZF*)current->entry;
@@ -919,7 +914,7 @@ void QList::Replace(Iterator it, std::string_view elem) {
919914
}
920915

921916
void QList::CoolOff(Node* node, uint32_t node_id) {
922-
if (tiering_params_) {
917+
if (tiering_enabled_) {
923918
// Dry run for offloading decision.
924919
// a. Node id is withing the offloadable depth - offload it if not already offloaded.
925920
// b. Node id is outside the offloadable depth - but we have too many nodes that are not
@@ -930,12 +925,12 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
930925
// we won't need to traverse them again for "trivial" access patterns unless they
931926
// get accessed again. Another reason for missing offloaded nodes is that node_id can be
932927
// off due to merges (can be improved in future).
933-
if (node_id >= tiering_params_->node_depth_threshold &&
934-
node_id + tiering_params_->node_depth_threshold < len_) {
928+
if (node_id >= tiering_node_depth_threshold_ &&
929+
node_id + tiering_node_depth_threshold_ < len_) {
935930
if (!node->offloaded && !node->io_pending) {
936931
OffloadNode(node);
937932
}
938-
} else if (num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2 < len_) {
933+
} else if (num_offloaded_nodes_ * 2 + tiering_node_depth_threshold_ * 2 < len_) {
939934
// We check `num_offloaded_nodes_ * 2` above to avoid frequent traversals.
940935
// So only when the gap between offloaded and non-offloaded nodes is large enough,
941936
// we do a traversal to offload more nodes.
@@ -946,8 +941,8 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
946941
// Traverse from both ends towards the middle as we expect more offloads towards the ends
947942
// due to usual access patterns of adding items via lpush/rpush.
948943
while (traverse_node_id <= len_ / 2 &&
949-
(num_offloaded_nodes_ + 2 * tiering_params_->node_depth_threshold) < len_) {
950-
if (traverse_node_id >= tiering_params_->node_depth_threshold) {
944+
(num_offloaded_nodes_ + 2 * tiering_node_depth_threshold_) < len_) {
945+
if (traverse_node_id >= tiering_node_depth_threshold_) {
951946
if (fw->offloaded == 0 && fw->io_pending == 0) {
952947
OffloadNode(fw);
953948
}
@@ -1047,18 +1042,17 @@ void QList::CompressByDepth(Node* node) {
10471042
}
10481043

10491044
void QList::Materialize(Node* node) {
1050-
if (!tiering_params_ || (!node->offloaded && !node->io_pending))
1045+
if (!tiering_enabled_ || (!node->offloaded && !node->io_pending))
10511046
return;
10521047

10531048
// Cancel stash in progress before loading.
1054-
if (node->io_pending && tiering_params_->delete_cb) {
1055-
tiering_params_->delete_cb(node);
1049+
if (node->io_pending) {
1050+
CleanupOffloadedNode(node);
10561051
}
10571052

10581053
// Load the offloaded node data back into memory.
1059-
if (node->offloaded && tiering_params_->onload_cb) {
1060-
stats.onload_requests++;
1061-
tiering_params_->onload_cb(node);
1054+
if (node->offloaded) {
1055+
ReadOffloadedNode(node);
10621056
}
10631057

10641058
DCHECK(!node->offloaded);
@@ -1198,10 +1192,8 @@ void QList::DelNode(Node* node) {
11981192
malloc_size_ -= node->sz;
11991193
}
12001194

1201-
if (tiering_params_ && (node->offloaded || node->io_pending)) {
1202-
if (tiering_params_->delete_cb) {
1203-
tiering_params_->delete_cb(node);
1204-
}
1195+
if (tiering_enabled_ && (node->offloaded || node->io_pending)) {
1196+
CleanupOffloadedNode(node);
12051197
}
12061198

12071199
/* If we deleted a node within our compress depth, we
@@ -1249,12 +1241,18 @@ bool QList::DelPackedIndex(Node* node, uint8_t* p) {
12491241
return false;
12501242
}
12511243

1252-
void QList::OffloadNode(Node* node) {
1253-
DCHECK(tiering_params_ && node->offloaded == 0 && node->io_pending == 0);
1244+
void QList::OffloadNode(Node* node) const {
1245+
DCHECK(tiering_enabled_ && node->offloaded == 0 && node->io_pending == 0);
12541246
stats.offload_requests++;
1255-
if (tiering_params_->offload_cb) {
1256-
tiering_params_->offload_cb(node);
1257-
}
1247+
node->io_pending = 1;
1248+
}
1249+
1250+
void QList::ReadOffloadedNode(QList::Node* node) const {
1251+
stats.onload_requests++;
1252+
}
1253+
1254+
void QList::CleanupOffloadedNode(QList::Node* node) const {
1255+
node->io_pending = 0;
12581256
}
12591257

12601258
void QList::InitIteratorEntry(Iterator* it) const {

src/core/qlist.h

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -126,17 +126,6 @@ class QList {
126126
num_offloaded_nodes_ += delta;
127127
}
128128

129-
struct TieringParams {
130-
uint32_t node_depth_threshold = 2;
131-
// Called when a node should be offloaded.
132-
// True if node meets criteria for offloading and stashing was initiated.
133-
std::function<bool(Node*)> offload_cb;
134-
// Called when an offloaded node needs its data loaded back into memory
135-
std::function<void(Node*)> onload_cb;
136-
// Called when an offloaded or io_pending node is being deleted.
137-
std::function<void(Node*)> delete_cb;
138-
};
139-
140129
/**
141130
* fill: The number of entries allowed per internal list node can be specified
142131
* as a fixed maximum size or a maximum number of elements.
@@ -265,14 +254,18 @@ class QList {
265254
// Returns count of nodes reallocated to help in testing.
266255
size_t DefragIfNeeded(PageUsage* page_usage);
267256

268-
void SetTieringParams(const TieringParams& params);
269-
270257
// Sets the malloc_size_ threshold at which ZSTD dictionary training is triggered.
271258
// 0 disables ZSTD dictionary compression.
272259
void set_compr_threshold(uint32_t threshold) {
273260
zstd_threshold_ = threshold;
274261
}
275262

263+
// Enable tiered storage and set node depth threshold
264+
void EnableTiering(uint32_t threshold) {
265+
tiering_enabled_ = 1;
266+
tiering_node_depth_threshold_ = threshold;
267+
}
268+
276269
struct Stats {
277270
uint64_t compression_attempts = 0;
278271

@@ -346,7 +339,13 @@ class QList {
346339

347340
void DelNode(Node* node);
348341
bool DelPackedIndex(Node* node, uint8_t* p);
349-
void OffloadNode(Node* node);
342+
343+
// Offload node to tiered storage
344+
void OffloadNode(Node* node) const;
345+
// Read offloaded node from tiered storage
346+
void ReadOffloadedNode(Node* node) const;
347+
// Delete offloaded node or cancel offloading of node
348+
void CleanupOffloadedNode(Node* node) const;
350349

351350
// Initializes iterator's zi_ to point to the element at offset_.
352351
// Decompresses the node if needed. Assumes current_ is not null.
@@ -360,13 +359,14 @@ class QList {
360359
uint16_t dict_learning_failed_ : 1; /* thread-local dict training failed for this list's data */
361360
uint16_t dict_compress_failed_ : 1; /* compression with thread-local dict failed for this list */
362361
uint16_t dict_bulk_finished_ : 1; /* bulk compression done, per-node compression active */
363-
uint16_t reserved1_ : 13;
362+
uint16_t tiering_enabled_ : 1; /* tiering storage enabled */
363+
uint16_t reserved1_ : 12;
364364
unsigned compress_ : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
365365
unsigned bookmark_count_ : QL_BM_BITS;
366366
unsigned reserved2_ : 12;
367367
uint32_t num_offloaded_nodes_ = 0;
368-
uint32_t zstd_threshold_ = 0; // 0 = disabled
369-
std::unique_ptr<TieringParams> tiering_params_;
368+
uint32_t zstd_threshold_ = 0; // 0 = disabled
369+
uint32_t tiering_node_depth_threshold_ = 0; // 0 = disabled
370370
};
371371

372372
} // namespace dfly

src/core/qlist_test.cc

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -413,28 +413,13 @@ TEST_F(QListTest, DefragmentListpackCompressed) {
413413

414414
TEST_F(QListTest, Tiering) {
415415
QList::stats.offload_requests = 0;
416+
QList tiered_ql;
416417

417-
// Simulate tiering setup where we mark offloaded nodes as pending so that
418-
// throttling logic works correctly. We need to free memory manually in the
419-
// delete callback.
420-
QList::TieringParams params{.node_depth_threshold = 1,
421-
.offload_cb =
422-
[](QList::Node* node) {
423-
node->io_pending = 1;
424-
return true;
425-
},
426-
.onload_cb = nullptr,
427-
.delete_cb =
428-
[](QList::Node* node) {
429-
zfree(node->entry);
430-
node->entry = nullptr;
431-
node->io_pending = 0;
432-
}};
433-
434-
ql_.SetTieringParams(params);
418+
// Enable tiering and set node_depth_threshold = 1
419+
tiered_ql.EnableTiering(1);
435420

436421
for (int i = 0; i < 8000; i++) {
437-
ql_.Push(absl::StrCat("value", i), QList::TAIL);
422+
tiered_ql.Push(absl::StrCat("value", i), QList::TAIL);
438423
}
439424

440425
EXPECT_EQ(QList::stats.offload_requests, 9);

src/server/list_family.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ extern "C" {
2626
#include "server/error.h"
2727
#include "server/family_utils.h"
2828
#include "server/namespaces.h"
29-
#include "server/tiered_storage.h"
3029
#include "server/transaction.h"
3130

3231
/**
@@ -101,17 +100,15 @@ class ListWrapper {
101100
if (ShouldStoreAsListPack(sz + additional_size)) {
102101
return nullptr;
103102
}
103+
104104
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
105105
GetFlag(FLAGS_list_compress_depth));
106106

107-
if (GetFlag(FLAGS_list_tiering_threshold) > 0 && EngineShard::tlocal()->tiered_storage()) {
108-
QList::TieringParams params{.node_depth_threshold = GetFlag(FLAGS_list_tiering_threshold),
109-
.offload_cb = nullptr,
110-
.onload_cb = nullptr,
111-
.delete_cb = nullptr};
112-
113-
ql->SetTieringParams(params);
107+
const uint32_t tiering_node_depth_threshold = absl::GetFlag(FLAGS_list_tiering_threshold);
108+
if (tiering_node_depth_threshold > 0 && EngineShard::tlocal()->tiered_storage()) {
109+
ql->EnableTiering(tiering_node_depth_threshold);
114110
}
111+
115112
if (uint32_t zstd_thresh = GetFlag(FLAGS_list_experimental_zstd_dict_threshold);
116113
zstd_thresh > 0) {
117114
ql->set_compr_threshold(zstd_thresh);

0 commit comments

Comments
 (0)