Skip to content

Commit 452b5f7

Browse files
committed
feat(tiering): Add experimental list node offloading
Extend tiered storage to offload individual QList nodes to disk, gated by --tiered_experimental_list_support.

Key changes:
- Add ListNodeId (dbid, node*, ql*) as a new tiering identity type alongside KeyRef; introduce PendingId/ReadId variants so OpManager can handle both key-level and node-level I/O uniformly.
- Add tiering callbacks (offload_cb, onload_cb, delete_cb) to QList::TieringParams. PromoteToQLIfNeeded now wires them up when tiered storage is available, using the new db_id_ member on ListWrapper, which stores the DbIndex for later use.
- Update CancelStash / ReadInternal signatures to accept the generic PendingId / ReadId types instead of (dbid, key).
- Add IsClosed() / is_closed_ guard so delete_cb skips I/O after TieredStorage::Close() is called.
- Expose StashListNode / ReadTieredListNode free functions used by the ListWrapper callbacks.
- Add ListNodeTieringTest suite.

Signed-off-by: mkaruza <mario@dragonflydb.io>
1 parent 72cebe7 commit 452b5f7

15 files changed

Lines changed: 567 additions & 119 deletions

src/core/compact_object.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,8 @@ CompactObjType CompactObj::ObjType() const {
815815
return OBJ_STRING;
816816
case ExternalRep::SERIALIZED_MAP:
817817
return OBJ_HASH;
818+
default:
819+
break;
818820
};
819821
}
820822

src/core/compact_object.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,9 @@ class CompactObj {
171171

172172
// Different representations of external values
173173
enum class ExternalRep : uint8_t {
174-
STRING, // OBJ_STRING, Basic representation with various string encodings
175-
SERIALIZED_MAP // OBJ_HASH, Serialized map
174+
STRING, // OBJ_STRING, Basic representation with various string encodings
175+
SERIALIZED_MAP, // OBJ_HASH, Serialized map
176+
LIST_NODE // OBJ_LIST, QList::Node
176177
};
177178

178179
explicit CompactObj(bool is_key)

src/core/qlist.cc

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ extern "C" {
1717

1818
#include "base/logging.h"
1919
#include "core/page_usage/page_usage_stats.h"
20+
#include "core/tiering_types.h"
2021

2122
using namespace std;
2223

@@ -48,7 +49,7 @@ namespace dfly {
4849
namespace {
4950

5051
static_assert(sizeof(QList) == 48);
51-
static_assert(sizeof(QList::Node) == 40);
52+
static_assert(sizeof(QList::Node) == 48);
5253

5354
enum IterDir : uint8_t { FWD = 1, REV = 0 };
5455

@@ -166,6 +167,7 @@ QList::Node* CreateRAW(int container, uint8_t* entry, size_t sz) {
166167
node->recompress = 0;
167168
node->dont_compress = 0;
168169
node->offloaded = 0;
170+
node->io_pending = 0;
169171

170172
return node;
171173
}
@@ -459,6 +461,14 @@ void QList::Clear() noexcept {
459461

460462
while (len_) {
461463
Node* next = current->next;
464+
465+
// Clean up offloaded/pending nodes before freeing.
466+
if (current->offloaded || current->io_pending) {
467+
if (tiering_params_ && tiering_params_->delete_cb) {
468+
tiering_params_->delete_cb(current);
469+
}
470+
}
471+
462472
if (current->encoding != QUICKLIST_NODE_ENCODING_RAW) {
463473
quicklistLZF* lzf = (quicklistLZF*)current->entry;
464474
stats.compressed_bytes -= lzf->sz;
@@ -849,7 +859,7 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
849859
// off due to merges (can be improved in future).
850860
if (node_id >= tiering_params_->node_depth_threshold &&
851861
node_id + tiering_params_->node_depth_threshold < len_) {
852-
if (!node->offloaded) {
862+
if (!node->offloaded && !node->io_pending) {
853863
OffloadNode(node);
854864
}
855865
} else if (num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2 < len_) {
@@ -865,12 +875,12 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
865875
while (traverse_node_id <= len_ / 2 &&
866876
(num_offloaded_nodes_ + 2 * tiering_params_->node_depth_threshold) < len_) {
867877
if (traverse_node_id >= tiering_params_->node_depth_threshold) {
868-
if (fw->offloaded == 0) {
878+
if (fw->offloaded == 0 && fw->io_pending == 0) {
869879
OffloadNode(fw);
870880
}
871881

872882
// Avoid offloading the same node twice when fw and rev meet in the middle.
873-
if (rev != fw && rev->offloaded == 0) {
883+
if (rev != fw && rev->offloaded == 0 && rev->io_pending == 0) {
874884
OffloadNode(rev);
875885
}
876886
}
@@ -938,12 +948,28 @@ void QList::CompressByDepth(Node* node) {
938948
void QList::AccessForReads(bool recompress, Node* node) {
939949
DCHECK(node);
940950
stats.total_node_reads++;
951+
952+
if (node->io_pending) {
953+
// Offload in progress, so cancel it and read the node synchronously.
954+
DCHECK(tiering_params_);
955+
if (tiering_params_->delete_cb) {
956+
tiering_params_->delete_cb(node);
957+
}
958+
DCHECK(!node->io_pending);
959+
}
960+
941961
if (node->offloaded) {
942962
DCHECK(tiering_params_);
943963
stats.onload_requests++;
964+
if (tiering_params_->onload_cb) {
965+
tiering_params_->onload_cb(node);
966+
}
967+
// After onload_cb returns, the node entry must be restored.
968+
DCHECK(!node->offloaded);
969+
DCHECK(node->entry != nullptr);
944970
num_offloaded_nodes_--;
945-
node->offloaded = 0;
946971
}
972+
947973
if (len_ > 2 && node != head_ && node->next != nullptr) {
948974
stats.interior_node_reads++;
949975
}
@@ -1065,8 +1091,16 @@ void QList::DelNode(Node* node) {
10651091
len_--;
10661092
count_ -= node->count;
10671093
malloc_size_ -= node->sz;
1068-
if (node->offloaded) {
1069-
num_offloaded_nodes_--;
1094+
1095+
if (node->offloaded || node->io_pending) {
1096+
if (node->offloaded)
1097+
num_offloaded_nodes_--;
1098+
else if (node->io_pending)
1099+
num_offloaded_nodes_--; // was pre-counted in OffloadNode
1100+
// Clean up disk segment and deregister fragment.
1101+
if (tiering_params_ && tiering_params_->delete_cb) {
1102+
tiering_params_->delete_cb(node);
1103+
}
10701104
}
10711105

10721106
/* If we deleted a node within our compress depth, we
@@ -1102,9 +1136,14 @@ bool QList::DelPackedIndex(Node* node, uint8_t* p) {
11021136

11031137
void QList::OffloadNode(Node* node) {
11041138
DCHECK(tiering_params_ && node->offloaded == 0);
1139+
if (node->io_pending)
1140+
return;
1141+
11051142
num_offloaded_nodes_++;
11061143
stats.offload_requests++;
1107-
node->offloaded = 1;
1144+
if (tiering_params_->offload_cb) {
1145+
tiering_params_->offload_cb(node);
1146+
}
11081147
}
11091148

11101149
void QList::InitIteratorEntry(Iterator* it) const {
@@ -1396,4 +1435,13 @@ auto QList::Iterator::Get() const -> Entry {
13961435
return ptr ? Entry(reinterpret_cast<char*>(ptr), sz) : Entry(val);
13971436
}
13981437

1438+
void QList::Node::SetExternal(size_t offset, uint32_t size) {
1439+
DCHECK(entry);
1440+
zfree(entry);
1441+
offloaded = 1;
1442+
entry = nullptr;
1443+
ext.offset = offset;
1444+
ext.size = size;
1445+
}
1446+
13991447
} // namespace dfly

src/core/qlist.h

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <absl/functional/function_ref.h>
88

99
#include <cstdint>
10+
#include <functional>
1011
#include <memory>
1112
#include <string>
1213

@@ -61,16 +62,27 @@ class QList {
6162
uint16_t attempted_compress : 1; /* node can't compress; too small */
6263
uint16_t dont_compress : 1; /* prevent compression of entry that will be used later */
6364
uint16_t offloaded : 1; /* node is offloaded to colder storage */
64-
uint16_t reserved1 : 8; /* reserved for future use */
65-
66-
uint16_t reserved2; /* more bits to steal for future usage */
67-
uint32_t reserved3; /* more bits to steal for future usage */
65+
uint16_t io_pending : 1; /* node has pending io operation (offload or onload) */
66+
uint16_t reserved1 : 7; /* reserved for future use */
6867

6968
bool IsCompressed() const {
7069
return encoding != QUICKLIST_NODE_ENCODING_RAW;
7170
}
7271

7372
size_t GetLZF(void** data) const;
73+
74+
struct __attribute__((__packed__)) ExternalRecord {
75+
uint32_t size;
76+
size_t offset;
77+
};
78+
static_assert(sizeof(ExternalRecord) == 12);
79+
80+
ExternalRecord ext;
81+
82+
void SetExternal(size_t offset, uint32_t sz);
83+
std::pair<size_t, size_t> GetExternalSlice() const {
84+
return std::make_pair(size_t(ext.offset), size_t(ext.size));
85+
}
7486
};
7587

7688
using Entry = CollectionEntry;
@@ -100,9 +112,18 @@ class QList {
100112
using IterateFunc = absl::FunctionRef<bool(Entry)>;
101113
enum InsertOpt : uint8_t { BEFORE, AFTER };
102114

115+
void AdjustMallocSize(size_t delta) {
116+
malloc_size_ += delta;
117+
}
118+
103119
struct TieringParams {
104-
// TODO: hook functions and params that allow qlist offloading nodes to colder storage.
105120
uint32_t node_depth_threshold = 2;
121+
// Called when a node should be offloaded to disk.
122+
std::function<void(Node*)> offload_cb;
123+
// Called when an offloaded node needs its data loaded back from disk.
124+
std::function<void(Node*)> onload_cb;
125+
// Called when an offloaded or io_pending node is being deleted.
126+
std::function<void(Node*)> delete_cb;
106127
};
107128

108129
/**

src/core/tiering_types.cc

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,44 @@
44

55
#include "core/tiering_types.h"
66

7+
#include <utility>
8+
9+
#include "core/compact_object.h"
10+
#include "core/overloaded.h"
711
#include "redis/redis_aux.h"
812

913
namespace dfly::tiering {
1014

15+
bool FragmentRef::IsOffloaded() const {
16+
return std::visit(Overloaded{[](CompactValue* pv) { return pv->IsExternal(); },
17+
[](QList::Node* node) { return node->offloaded != 0; }},
18+
val_);
19+
}
20+
21+
void FragmentRef::ClearOffloaded() {
22+
std::visit(Overloaded{[](CompactValue* pv) { pv->RemoveExternal(); },
23+
[](QList::Node* node) { node->offloaded = 0; }},
24+
val_);
25+
}
26+
27+
bool FragmentRef::HasStashPending() const {
28+
return std::visit(Overloaded{[](CompactValue* pv) { return pv->HasStashPending(); },
29+
[](QList::Node* node) { return node->io_pending != 0; }},
30+
val_);
31+
}
32+
33+
void FragmentRef::SetStashPending(bool b) {
34+
std::visit(Overloaded{[b](CompactValue* pv) { pv->SetStashPending(b); },
35+
[b](QList::Node* node) { node->io_pending = b ? 1 : 0; }},
36+
val_);
37+
}
38+
39+
CompactObjType FragmentRef::ObjType() const {
40+
return std::visit(Overloaded{[](CompactValue* pv) { return pv->ObjType(); },
41+
[](QList::Node*) -> CompactObjType { return OBJ_LIST; }},
42+
val_);
43+
}
44+
1145
auto FragmentRef::GetDescr(const CompactValue* pv) -> SerializationDescr {
1246
switch (pv->ObjType()) {
1347
case OBJ_STRING: {
@@ -27,12 +61,31 @@ auto FragmentRef::GetDescr(const CompactValue* pv) -> SerializationDescr {
2761
};
2862
}
2963

64+
auto FragmentRef::GetDescr(const QList::Node* node) -> SerializationDescr {
65+
if (!node->entry || node->sz == 0)
66+
return {};
67+
68+
std::string_view entry_view{reinterpret_cast<const char*>(node->entry), node->sz};
69+
return {std::array<std::string_view, 2>{entry_view, {}}, CompactObj::ExternalRep::LIST_NODE};
70+
}
71+
72+
auto FragmentRef::GetSerializationDescr() const -> SerializationDescr {
73+
return std::visit(Overloaded{[](CompactValue* pv) { return GetDescr(pv); },
74+
[](QList::Node* node) { return GetDescr(node); }},
75+
val_);
76+
}
77+
3078
TieredCoolRecord* FragmentRef::GetCoolRecord() const {
31-
return std::visit(
32-
[](auto* pv) -> TieredCoolRecord* {
33-
return pv->IsExternal() && pv->IsCool() ? pv->GetCool().record : nullptr;
34-
},
35-
val_);
79+
return std::visit(Overloaded{[](CompactValue* pv) -> TieredCoolRecord* {
80+
return pv->IsExternal() && pv->IsCool() ? pv->GetCool().record
81+
: nullptr;
82+
},
83+
[](QList::Node* node) -> TieredCoolRecord* { return nullptr; }},
84+
val_);
85+
}
86+
87+
std::pair<size_t, size_t> FragmentRef::GetExternalSlice() const {
88+
return std::visit([](auto* val) { return val->GetExternalSlice(); }, val_);
3689
}
3790

3891
} // namespace dfly::tiering

src/core/tiering_types.h

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <boost/intrusive/list_hook.hpp>
88

99
#include "core/compact_object.h"
10+
#include "core/qlist.h"
1011

1112
namespace dfly::tiering {
1213

@@ -39,45 +40,36 @@ class FragmentRef {
3940
FragmentRef(CompactValue* pv) : val_(pv) { // NOLINT
4041
}
4142

42-
bool IsOffloaded() const {
43-
return std::visit([](auto* pv) { return pv->IsExternal(); }, val_);
43+
FragmentRef(QList::Node& node) : val_(&node) { // NOLINT
4444
}
4545

46-
// Resets offloaded state for this fragment.
47-
void ClearOffloaded() {
48-
std::visit([](auto* pv) { pv->RemoveExternal(); }, val_);
46+
FragmentRef(QList::Node* node) : val_(node) { // NOLINT
4947
}
5048

51-
bool HasStashPending() const {
52-
return std::visit([](auto* pv) { return pv->HasStashPending(); }, val_);
53-
}
49+
bool IsOffloaded() const;
5450

55-
void ClearStashPending() {
56-
std::visit([](auto* pv) { pv->SetStashPending(false); }, val_);
57-
}
51+
// Resets offloaded state for this fragment.
52+
void ClearOffloaded();
5853

59-
CompactObjType ObjType() const {
60-
return std::visit([](auto* pv) { return pv->ObjType(); }, val_);
61-
}
54+
bool HasStashPending() const;
55+
void SetStashPending(bool b);
56+
57+
CompactObjType ObjType() const;
6258

6359
// Determine required byte size and encoding type based on value.
64-
SerializationDescr GetSerializationDescr() const {
65-
return std::visit([](auto* pv) { return GetDescr(pv); }, val_);
66-
}
60+
SerializationDescr GetSerializationDescr() const;
6761

6862
// Returns a pointer to TieredCoolRecord if this fragment is cool, and null otherwise.
6963
TieredCoolRecord* GetCoolRecord() const;
7064

7165
// Returns the external slice of the offloaded value. Only valid if IsOffloaded() is true.
72-
std::pair<size_t, size_t> GetExternalSlice() const {
73-
return std::visit([](auto* pv) { return pv->GetExternalSlice(); }, val_);
74-
}
66+
std::pair<size_t, size_t> GetExternalSlice() const;
7567

7668
private:
7769
static SerializationDescr GetDescr(const CompactValue* pv);
70+
static SerializationDescr GetDescr(const QList::Node* node);
7871

79-
// TODO: to support more types, for example Node* from qlist.h.
80-
std::variant<CompactValue*> val_;
72+
std::variant<CompactValue*, QList::Node*> val_;
8173
};
8274

8375
} // namespace dfly::tiering

0 commit comments

Comments
 (0)