Skip to content

Commit 46df68e

Browse files
committed
feat(tiering): Support for partial offloading of LIST nodes
Signed-off-by: mkaruza <mario@dragonflydb.io>
1 parent 950f538 commit 46df68e

14 files changed

Lines changed: 576 additions & 110 deletions

src/core/compact_object.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,8 @@ CompactObjType CompactObj::ObjType() const {
821821
return OBJ_STRING;
822822
case ExternalRep::SERIALIZED_MAP:
823823
return OBJ_HASH;
824+
case ExternalRep::LIST_NODE:
825+
return OBJ_LIST;
824826
};
825827
}
826828

src/core/compact_object.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,9 @@ class CompactObj {
172172

173173
// Different representations of external values
174174
enum class ExternalRep : uint8_t {
  STRING,          // OBJ_STRING, basic representation with various string encodings
  SERIALIZED_MAP,  // OBJ_HASH, serialized map
  LIST_NODE        // OBJ_LIST, QList::Node
};
178179

179180
explicit CompactObj(bool is_key)
@@ -452,7 +453,7 @@ class CompactObj {
452453
}
453454

454455
struct SdsTtlString {
455-
char* sds_ptr; // SDS string (length via sdslen)
456+
char* sds_ptr; // SDS string (length via sdslen)
456457

457458
uint64_t exp_ms; // absolute expiry time in ms
458459
std::string_view view() const;

src/core/qlist.cc

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ extern "C" {
1818

1919
#include "base/logging.h"
2020
#include "core/page_usage/page_usage_stats.h"
21+
#include "core/tiering_types.h"
2122

2223
using namespace std;
2324

@@ -49,7 +50,7 @@ namespace dfly {
4950
namespace {
5051

5152
static_assert(sizeof(QList) == 48);
52-
static_assert(sizeof(QList::Node) == 40);
53+
static_assert(sizeof(QList::Node) == 48);
5354

5455
enum IterDir : uint8_t { FWD = 1, REV = 0 };
5556

@@ -167,6 +168,8 @@ QList::Node* CreateRAW(int container, uint8_t* entry, size_t sz) {
167168
node->recompress = 0;
168169
node->dont_compress = 0;
169170
node->offloaded = 0;
171+
node->io_pending = 0;
172+
node->fragment = nullptr;
170173

171174
return node;
172175
}
@@ -515,6 +518,14 @@ void QList::Clear() noexcept {
515518

516519
while (len_) {
517520
Node* next = current->next;
521+
522+
// Clean up offloaded/pending nodes before freeing.
523+
if (current->offloaded || current->io_pending) {
524+
if (tiering_params_ && tiering_params_->delete_cb) {
525+
tiering_params_->delete_cb(current);
526+
}
527+
}
528+
518529
if (current->encoding != QUICKLIST_NODE_ENCODING_RAW) {
519530
quicklistLZF* lzf = (quicklistLZF*)current->entry;
520531
stats.compressed_bytes -= lzf->sz;
@@ -905,7 +916,7 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
905916
// off due to merges (can be improved in future).
906917
if (node_id >= tiering_params_->node_depth_threshold &&
907918
node_id + tiering_params_->node_depth_threshold < len_) {
908-
if (!node->offloaded) {
919+
if (!node->offloaded && !node->io_pending) {
909920
OffloadNode(node);
910921
}
911922
} else if (num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2 < len_) {
@@ -921,12 +932,12 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
921932
while (traverse_node_id <= len_ / 2 &&
922933
(num_offloaded_nodes_ + 2 * tiering_params_->node_depth_threshold) < len_) {
923934
if (traverse_node_id >= tiering_params_->node_depth_threshold) {
924-
if (fw->offloaded == 0) {
935+
if (fw->offloaded == 0 && fw->io_pending == 0) {
925936
OffloadNode(fw);
926937
}
927938

928939
// Avoid offloading the same node twice when fw and rev meet in the middle.
929-
if (rev != fw && rev->offloaded == 0) {
940+
if (rev != fw && rev->offloaded == 0 && rev->io_pending == 0) {
930941
OffloadNode(rev);
931942
}
932943
}
@@ -994,12 +1005,28 @@ void QList::CompressByDepth(Node* node) {
9941005
void QList::AccessForReads(bool recompress, Node* node) {
9951006
DCHECK(node);
9961007
stats.total_node_reads++;
1008+
1009+
if (node->io_pending) {
1010+
// A stash is in flight — cancel it. The delete_cb handles cancellation.
1011+
DCHECK(tiering_params_);
1012+
if (tiering_params_->delete_cb) {
1013+
tiering_params_->delete_cb(node);
1014+
}
1015+
DCHECK(!node->io_pending);
1016+
}
1017+
9971018
if (node->offloaded) {
9981019
DCHECK(tiering_params_);
9991020
stats.onload_requests++;
1021+
if (tiering_params_->onload_cb) {
1022+
tiering_params_->onload_cb(node);
1023+
}
1024+
// After onload_cb returns, the node entry must be restored.
1025+
DCHECK(!node->offloaded);
1026+
DCHECK(node->entry != nullptr);
10001027
num_offloaded_nodes_--;
1001-
node->offloaded = 0;
10021028
}
1029+
10031030
if (len_ > 2 && node != head_ && node->next != nullptr) {
10041031
stats.interior_node_reads++;
10051032
}
@@ -1121,8 +1148,16 @@ void QList::DelNode(Node* node) {
11211148
len_--;
11221149
count_ -= node->count;
11231150
malloc_size_ -= node->sz;
1124-
if (node->offloaded) {
1125-
num_offloaded_nodes_--;
1151+
1152+
if (node->offloaded || node->io_pending) {
1153+
if (node->offloaded)
1154+
num_offloaded_nodes_--;
1155+
else if (node->io_pending)
1156+
num_offloaded_nodes_--; // was pre-counted in OffloadNode
1157+
// Clean up disk segment and deregister fragment.
1158+
if (tiering_params_ && tiering_params_->delete_cb) {
1159+
tiering_params_->delete_cb(node);
1160+
}
11261161
}
11271162

11281163
/* If we deleted a node within our compress depth, we
@@ -1158,9 +1193,16 @@ bool QList::DelPackedIndex(Node* node, uint8_t* p) {
11581193

11591194
// Requests that `node` be moved to colder storage.
// Does nothing when a stash is already in flight for the node or when the node is compressed.
// Otherwise the node is counted as offloaded right away and either handed to
// tiering_params_->offload_cb or, when no callback is installed, just flagged as offloaded.
void QList::OffloadNode(Node* node) {
  DCHECK(tiering_params_ && node->offloaded == 0);

  const bool skip = node->io_pending || node->IsCompressed();
  if (skip) {
    return;
  }

  ++num_offloaded_nodes_;
  ++stats.offload_requests;

  if (!tiering_params_->offload_cb) {
    // No hook installed: only mark the node.
    node->offloaded = 1;
    return;
  }
  tiering_params_->offload_cb(node);
}
11651207

11661208
void QList::InitIteratorEntry(Iterator* it) const {
@@ -1387,6 +1429,16 @@ uint8_t* QList::TryExtractListpack() {
13871429
return res;
13881430
}
13891431

1432+
// Switches this node to its external (offloaded) state: frees the in-memory entry, attaches
// fragment `f` and sets the `offloaded` flag.
// `f` must carry a non-null opaque pointer (DCHECK-ed); it is treated as the owning QList,
// whose malloc accounting is reduced by this node's `sz`.
void QList::Node::SetExternal(tiering::Fragment* f) {
  DCHECK(f->GetOpaque());

  // GetOpaque() returns void*, for which static_cast is the appropriate conversion.
  auto* owner = static_cast<QList*>(f->GetOpaque());

  // AdjustMallocSize takes an unsigned delta. Widen `sz` to size_t before negating so that the
  // modular subtraction is correct independently of the declared width of `sz`.
  owner->AdjustMallocSize(size_t{0} - static_cast<size_t>(sz));

  zfree(entry);
  entry = nullptr;
  fragment = f;
  offloaded = 1;
}
1441+
13901442
bool QList::Iterator::Next() {
13911443
if (!current_)
13921444
return false;

src/core/qlist.h

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <absl/functional/function_ref.h>
88

99
#include <cstdint>
10+
#include <functional>
1011
#include <memory>
1112
#include <string>
1213

@@ -26,6 +27,10 @@
2627

2728
namespace dfly {
2829

30+
namespace tiering {
31+
class Fragment;
32+
}
33+
2934
class PageUsage;
3035

3136
// Heuristic: for values smaller than 2 KiB we prefer the compact listpack
@@ -64,15 +69,26 @@ class QList {
6469
uint16_t attempted_compress : 1; /* node can't compress; too small */
6570
uint16_t dont_compress : 1; /* prevent compression of entry that will be used later */
6671
uint16_t offloaded : 1; /* node is offloaded to colder storage */
67-
uint16_t reserved1 : 8; /* reserved for future use */
68-
69-
uint16_t reserved2; /* more bits to steal for future usage */
70-
uint32_t reserved3; /* more bits to steal for future usage */
72+
uint16_t io_pending : 1; /* stash I/O is in flight for this node */
73+
uint16_t reserved1 : 7; /* reserved for future use */
7174

7275
bool IsCompressed() const {
7376
return encoding != QUICKLIST_NODE_ENCODING_RAW;
7477
}
7578

79+
tiering::Fragment* fragment = nullptr;
80+
81+
// Returns the fragment pointer stored on this node (nullptr when none is attached).
tiering::Fragment* GetFragment() const {
  return this->fragment;
}
84+
85+
// Leaves the external state: clears the `offloaded` flag and detaches the fragment pointer.
// The node's entry is not touched here.
void RemoveExternal() {
  offloaded = 0;
  fragment = nullptr;
}
89+
90+
void SetExternal(tiering::Fragment* f);
91+
7692
size_t GetLZF(void** data) const;
7793
};
7894

@@ -103,9 +119,18 @@ class QList {
103119
using IterateFunc = absl::FunctionRef<bool(Entry)>;
104120
enum InsertOpt : uint8_t { BEFORE, AFTER };
105121

122+
// Adds `delta` to the tracked malloc size.
// `delta` is unsigned; callers that need to shrink the counter pass the negated size, relying
// on unsigned wrap-around (e.g. QList::Node::SetExternal).
void AdjustMallocSize(size_t delta) {
  malloc_size_ = malloc_size_ + delta;
}
125+
106126
struct TieringParams {
107-
// TODO: hook functions and params that allow qlist offloading nodes to colder storage.
108127
uint32_t node_depth_threshold = 2;
128+
// Called when a node should be offloaded to disk.
129+
std::function<void(Node*)> offload_cb;
130+
// Called when an offloaded node needs its data loaded back from disk.
131+
std::function<void(Node*)> onload_cb;
132+
// Called when an offloaded or io_pending node is being deleted.
133+
std::function<void(Node*)> delete_cb;
109134
};
110135

111136
/**

src/core/tiering_types.cc

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,51 @@
55
#include "core/tiering_types.h"
66

77
#include "core/compact_object.h"
8+
#include "core/overloaded.h"
89
#include "redis/redis_aux.h"
910

1011
namespace dfly::tiering {
1112

1213
bool Fragment::IsExternal() const {
13-
return std::visit([](CompactValue* pv) { return pv->IsExternal(); }, val_);
14+
return std::visit(Overloaded{[](CompactValue* pv) { return pv->IsExternal(); },
15+
[](QList::Node* node) { return node->offloaded != 0; }},
16+
val_);
1417
}
1518

1619
// Forwards to RemoveExternal() of whichever object this fragment wraps.
void Fragment::RemoveExternal() {
  std::visit(
      Overloaded{
          [](CompactValue* value) { value->RemoveExternal(); },
          [](QList::Node* list_node) { list_node->RemoveExternal(); },
      },
      val_);
}
1924

2025
// Forwards to SetExternal(this) of whichever object this fragment wraps, handing it a
// pointer back to this fragment.
void Fragment::SetExternal() {
  std::visit(
      Overloaded{
          [this](CompactValue* value) { value->SetExternal(this); },
          [this](QList::Node* list_node) { list_node->SetExternal(this); },
      },
      val_);
}
2330

2431
bool Fragment::HasStashPending() const {
25-
return std::visit([](CompactValue* pv) { return pv->HasStashPending(); }, val_);
32+
return std::visit(Overloaded{[](CompactValue* pv) { return pv->HasStashPending(); },
33+
[](QList::Node* node) { return node->io_pending != 0; }},
34+
val_);
2635
}
2736

2837
// Sets or clears the stash-pending state on the wrapped object. For a QList node this is
// the single-bit `io_pending` flag.
void Fragment::SetStashPending(bool b) {
  std::visit(
      Overloaded{
          [b](CompactValue* value) { value->SetStashPending(b); },
          [b](QList::Node* list_node) {
            if (b) {
              list_node->io_pending = 1;
            } else {
              list_node->io_pending = 0;
            }
          },
      },
      val_);
}
3142

3243
// Object type of the wrapped value: whatever CompactValue::ObjType() reports, or OBJ_LIST
// when the fragment wraps a QList node.
CompactObjType Fragment::ObjType() const {
  return std::visit(
      Overloaded{
          [](CompactValue* value) -> CompactObjType { return value->ObjType(); },
          [](QList::Node*) -> CompactObjType { return OBJ_LIST; },
      },
      val_);
}
48+
49+
size_t Fragment::MallocUsed() const {
50+
return std::visit(Overloaded{[](CompactValue* pv) { return pv->MallocUsed(); },
51+
[](QList::Node* node) { return node->sz; }},
52+
val_);
3453
}
3554

3655
auto Fragment::GetDescr(const CompactValue* pv) -> SerializationDescr {
@@ -52,8 +71,18 @@ auto Fragment::GetDescr(const CompactValue* pv) -> SerializationDescr {
5271
};
5372
}
5473

74+
// Builds the serialization descriptor for a QList node.
// A node with no entry or a zero `sz` yields a default-constructed descriptor; otherwise the
// descriptor views the node's `sz` entry bytes (second view left empty) and is tagged
// ExternalRep::LIST_NODE. The view does not copy the entry.
auto Fragment::GetDescr(const QList::Node* node) -> SerializationDescr {
  const bool has_payload = node->entry != nullptr && node->sz != 0;
  if (!has_payload)
    return {};

  const char* bytes = reinterpret_cast<const char*>(node->entry);
  std::string_view payload{bytes, node->sz};
  return {std::array<std::string_view, 2>{payload, {}}, CompactObj::ExternalRep::LIST_NODE};
}
81+
5582
// Dispatches to the GetDescr overload that matches the wrapped object.
auto Fragment::GetSerializationDescr() const -> SerializationDescr {
  return std::visit(
      Overloaded{
          [](CompactValue* value) -> SerializationDescr { return GetDescr(value); },
          [](QList::Node* list_node) -> SerializationDescr { return GetDescr(list_node); },
      },
      val_);
}
5887

5988
std::pair<size_t, size_t> Fragment::GetExternalSlice() const {

src/core/tiering_types.h

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <boost/intrusive/list_hook.hpp>
88

99
#include "core/compact_object.h"
10+
#include "core/qlist.h"
1011

1112
namespace dfly::tiering {
1213

@@ -32,14 +33,17 @@ class Fragment {
3233
CompactObj::ExternalRep rep = CompactObj::ExternalRep::STRING;
3334
};
3435

35-
using FragmentType = std::variant<CompactValue*>;
36+
using FragmentType = std::variant<CompactValue*, QList::Node*>;
3637

3738
Fragment(CompactValue& pv) : val_(&pv) { // NOLINT
3839
}
3940

4041
Fragment(CompactValue* pv) : val_(pv) { // NOLINT
4142
}
4243

44+
Fragment(QList::Node* node) : val_(node) { // NOLINT
45+
}
46+
4347
bool IsExternal() const;
4448
void RemoveExternal();
4549
void SetExternal();
@@ -48,6 +52,7 @@ class Fragment {
4852
void SetStashPending(bool b);
4953

5054
CompactObjType ObjType() const;
55+
size_t MallocUsed() const;
5156

5257
// Determine required byte size and encoding type based on value.
5358
SerializationDescr GetSerializationDescr() const;
@@ -91,6 +96,14 @@ class Fragment {
9196
val_ = pv;
9297
}
9398

99+
void SetOpaque(void* opaque) {
100+
opaque_ = opaque;
101+
}
102+
103+
void* GetOpaque() {
104+
return opaque_;
105+
}
106+
94107
void SetId(size_t id) {
95108
id_ = id;
96109
}
@@ -101,6 +114,7 @@ class Fragment {
101114

102115
private:
103116
static SerializationDescr GetDescr(const CompactValue* pv);
117+
static SerializationDescr GetDescr(const QList::Node* node);
104118

105119
size_t id_ = 0;
106120

@@ -113,8 +127,9 @@ class Fragment {
113127
uint32_t serialized_size_ = 0;
114128
size_t offset_ = 0;
115129

116-
// First byte of the value if Huffman encoded
117-
uint8_t first_byte_ = 0;
130+
uint8_t first_byte_ = 0; // First byte of the value if Huffman encoded
131+
132+
void* opaque_ = nullptr;  // Auxiliary structure that can be attached to fragment value object
118133

119134
FragmentType val_;
120135
};

0 commit comments

Comments
 (0)