Skip to content

Commit fd69cf2

Browse files
committed
[Store] L2->L1 promotion-on-hit
1 parent 22fd38d commit fd69cf2

21 files changed

Lines changed: 2253 additions & 32 deletions

mooncake-integration/store/store_py.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,6 +1616,9 @@ PYBIND11_MODULE(store, m) {
16161616
.def("is_disk_replica",
16171617
static_cast<bool (Replica::Descriptor::*)() const noexcept>(
16181618
&Replica::Descriptor::is_disk_replica))
1619+
.def("is_local_disk_replica",
1620+
static_cast<bool (Replica::Descriptor::*)() const noexcept>(
1621+
&Replica::Descriptor::is_local_disk_replica))
16191622
.def(
16201623
"get_memory_descriptor",
16211624
static_cast<const MemoryDescriptor &(Replica::Descriptor::*)()

mooncake-store/include/client_service.h

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class QueryResult {
6060
*/
6161
class Client {
6262
public:
63-
~Client();
63+
virtual ~Client();
6464

6565
const UUID& getClientId() const { return client_id_; }
6666

@@ -405,6 +405,42 @@ class Client {
405405
tl::expected<void, ErrorCode> ReportSsdCapacity(
406406
int64_t ssd_total_capacity_bytes);
407407

408+
/**
409+
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for this
410+
* client. Mirror of OffloadObjectHeartbeat. Returns key->size pairs the
411+
* caller (FileStorage) must read from local SSD and stage as MEMORY
412+
* replicas via PromotionAllocStart + NotifyPromotionSuccess.
413+
*/
414+
// Virtual to enable subclassing in unit tests.
415+
virtual tl::expected<void, ErrorCode> PromotionObjectHeartbeat(
416+
std::unordered_map<std::string, int64_t>& promotion_objects);
417+
418+
/**
419+
* @brief Stage a PROCESSING MEMORY replica for an existing key during
420+
* L2->L1 promotion. Returns the new replica's descriptor that the caller
421+
* writes via Transfer Engine before calling NotifyPromotionSuccess.
422+
*/
423+
virtual tl::expected<PromotionAllocStartResponse, ErrorCode>
424+
PromotionAllocStart(const std::string& key, uint64_t size,
425+
const std::vector<std::string>& preferred_segments);
426+
427+
/**
428+
* @brief Commit a staged MEMORY replica to COMPLETE; called after the
429+
* client has written the bytes via Transfer Engine.
430+
*/
431+
virtual tl::expected<void, ErrorCode> NotifyPromotionSuccess(
432+
const std::string& key);
433+
434+
/**
435+
* @brief Write `slices` into the memory replica described by
436+
* `memory_descriptor` via Transfer Engine. Used by FileStorage to fill a
437+
* PROCESSING memory replica staged by PromotionAllocStart before calling
438+
* NotifyPromotionSuccess.
439+
*/
440+
virtual ErrorCode PromotionWrite(
441+
const Replica::Descriptor& memory_descriptor,
442+
std::vector<Slice>& slices);
443+
408444
/**
409445
* @brief Performs a batched read of multiple objects using a
410446
* high-throughput Transfer Engine.
@@ -570,14 +606,16 @@ class Client {
570606

571607
bool IsReplicaOnLocalMemory(const Replica::Descriptor& replica);
572608

573-
private:
609+
protected:
574610
/**
575-
* @brief Private constructor to enforce creation through Create() method
611+
* @brief Constructor exposed to subclasses for testing only; production
612+
* code must go through Create().
576613
*/
577614
Client(const std::string& local_hostname,
578615
const std::string& metadata_connstring, const std::string& protocol,
579616
const std::map<std::string, std::string>& labels = {});
580617

618+
private:
581619
/**
582620
* @brief Internal helper functions for initialization and data transfer
583621
*/

mooncake-store/include/file_storage.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class FileStorage {
5151

5252
private:
5353
friend class FileStorageTest;
54+
friend class FileStoragePromotionTest;
5455
struct AllocatedBatch {
5556
uint64_t batch_id;
5657
std::vector<BufferHandle> handles;
@@ -82,10 +83,24 @@ class FileStorage {
8283
* client.
8384
* 2. Receives feedback on which objects should be offloaded.
8485
* 3. Triggers asynchronous offloading of pending objects.
86+
* 4. Pulls and processes any pending L2->L1 promotion tasks queued by the
87+
* master (mirror of step 1+2 in the reverse direction).
8588
* @return tl::expected<void, ErrorCode> indicating operation status.
8689
*/
8790
tl::expected<void, ErrorCode> Heartbeat();
8891

92+
/**
93+
* @brief Drives the L2->L1 promotion pipeline for one heartbeat tick.
94+
* Pulls promotion work from the master, stages a MEMORY replica for each
95+
* key, copies the bytes from local SSD into that replica, and notifies the
96+
* master on success. A failure on any single key is logged and skipped;
97+
* the master-side reaper decrements the source replica's refcnt and
98+
* erases the task entry on TTL expiry, and any orphaned PROCESSING
99+
* MEMORY replica is reaped via the standard discarded-replicas path.
100+
* @return tl::expected<void, ErrorCode> indicating operation status.
101+
*/
102+
tl::expected<void, ErrorCode> ProcessPromotionTasks();
103+
89104
tl::expected<bool, ErrorCode> IsEnableOffloading();
90105

91106
tl::expected<void, ErrorCode> BatchLoad(
@@ -123,4 +138,4 @@ class FileStorage {
123138
std::thread client_buffer_gc_thread_;
124139
};
125140

126-
} // namespace mooncake
141+
} // namespace mooncake

mooncake-store/include/master_client.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,32 @@ class MasterClient {
363363
const UUID& client_id, const std::vector<std::string>& keys,
364364
const std::vector<StorageObjectMetadata>& metadatas);
365365

366+
/**
367+
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for a
368+
* client. Returns key->size pairs the caller should read from local
369+
* SSD and stage as MEMORY replicas via PromotionAllocStart +
370+
* NotifyPromotionSuccess.
371+
*/
372+
[[nodiscard]] tl::expected<std::unordered_map<std::string, int64_t>,
373+
ErrorCode>
374+
PromotionObjectHeartbeat(const UUID& client_id);
375+
376+
/**
377+
* @brief Stage a PROCESSING MEMORY replica for an existing key during
378+
* promotion. Returns the new replica's descriptor that the caller writes
379+
* via Transfer Engine.
380+
*/
381+
[[nodiscard]] tl::expected<PromotionAllocStartResponse, ErrorCode>
382+
PromotionAllocStart(const std::string& key, uint64_t size,
383+
const std::vector<std::string>& preferred_segments);
384+
385+
/**
386+
* @brief Commit a staged MEMORY replica to COMPLETE; called after the
387+
* client has written the bytes via Transfer Engine.
388+
*/
389+
[[nodiscard]] tl::expected<void, ErrorCode> NotifyPromotionSuccess(
390+
const UUID& client_id, const std::string& key);
391+
366392
/**
367393
* @brief Start a copy operation
368394
* @param key Object key

mooncake-store/include/master_config.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ struct MasterConfig {
8585
// Offload-on-evict: defer LOCAL_DISK offload to eviction time
8686
bool offload_on_evict = false;
8787
bool offload_force_evict = false;
88+
89+
// Promotion-on-hit: when Get observes a LOCAL_DISK-only key, queue an
90+
// async copy back to MEMORY so the next Get is fast.
91+
bool promotion_on_hit = false;
92+
uint32_t promotion_admission_threshold = 2;
93+
uint32_t promotion_queue_limit = 50000;
8894
};
8995

9096
class MasterServiceSupervisorConfig {
@@ -146,6 +152,9 @@ class MasterServiceSupervisorConfig {
146152
bool enable_cxl = false;
147153
bool offload_on_evict = false;
148154
bool offload_force_evict = false;
155+
bool promotion_on_hit = false;
156+
uint32_t promotion_admission_threshold = 2;
157+
uint32_t promotion_queue_limit = 50000;
149158
MasterServiceSupervisorConfig() = default;
150159

151160
// From MasterConfig
@@ -163,6 +172,9 @@ class MasterServiceSupervisorConfig {
163172
enable_offload = config.enable_offload;
164173
offload_on_evict = config.offload_on_evict;
165174
offload_force_evict = config.offload_force_evict;
175+
promotion_on_hit = config.promotion_on_hit;
176+
promotion_admission_threshold = config.promotion_admission_threshold;
177+
promotion_queue_limit = config.promotion_queue_limit;
166178
rpc_port = static_cast<int>(config.rpc_port);
167179
rpc_thread_num = static_cast<size_t>(config.rpc_thread_num);
168180

@@ -277,6 +289,9 @@ class WrappedMasterServiceConfig {
277289
bool enable_offload = false;
278290
bool offload_on_evict = false;
279291
bool offload_force_evict = false;
292+
bool promotion_on_hit = false;
293+
uint32_t promotion_admission_threshold = 2;
294+
uint32_t promotion_queue_limit = 50000;
280295
std::string ha_backend_type = "etcd";
281296
std::string ha_backend_connstring;
282297
std::string cluster_id = DEFAULT_CLUSTER_ID;
@@ -334,6 +349,9 @@ class WrappedMasterServiceConfig {
334349
enable_offload = config.enable_offload;
335350
offload_on_evict = config.offload_on_evict;
336351
offload_force_evict = config.offload_force_evict;
352+
promotion_on_hit = config.promotion_on_hit;
353+
promotion_admission_threshold = config.promotion_admission_threshold;
354+
promotion_queue_limit = config.promotion_queue_limit;
337355
ha_backend_type = config.ha_backend_type;
338356
ha_backend_connstring = config.ha_backend_connstring;
339357
if (ha_backend_connstring.empty()) {
@@ -414,6 +432,9 @@ class WrappedMasterServiceConfig {
414432
enable_offload = config.enable_offload;
415433
offload_on_evict = config.offload_on_evict;
416434
offload_force_evict = config.offload_force_evict;
435+
promotion_on_hit = config.promotion_on_hit;
436+
promotion_admission_threshold = config.promotion_admission_threshold;
437+
promotion_queue_limit = config.promotion_queue_limit;
417438
ha_backend_type = config.ha_backend_type;
418439
ha_backend_connstring = config.ha_backend_connstring;
419440
if (ha_backend_connstring.empty()) {
@@ -752,6 +773,9 @@ class MasterServiceConfig {
752773
bool enable_offload = false;
753774
bool offload_on_evict = false;
754775
bool offload_force_evict = false;
776+
bool promotion_on_hit = false;
777+
uint32_t promotion_admission_threshold = 2;
778+
uint32_t promotion_queue_limit = 50000;
755779
std::string ha_backend_type = "etcd";
756780
std::string ha_backend_connstring;
757781
std::string cluster_id = DEFAULT_CLUSTER_ID;
@@ -805,6 +829,9 @@ class MasterServiceConfig {
805829
enable_offload = config.enable_offload;
806830
offload_on_evict = config.offload_on_evict;
807831
offload_force_evict = config.offload_force_evict;
832+
promotion_on_hit = config.promotion_on_hit;
833+
promotion_admission_threshold = config.promotion_admission_threshold;
834+
promotion_queue_limit = config.promotion_queue_limit;
808835
ha_backend_type = config.ha_backend_type;
809836
ha_backend_connstring = config.ha_backend_connstring;
810837
cluster_id = config.cluster_id;

mooncake-store/include/master_service.h

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <ylt/util/tl/expected.hpp>
2020

2121
#include "allocation_strategy.h"
22+
#include "count_min_sketch.h"
2223
#include "master_metric_manager.h"
2324
#include "mutex.h"
2425
#include "segment.h"
@@ -456,6 +457,34 @@ class MasterService {
456457
const std::vector<StorageObjectMetadata>& metadatas)
457458
-> tl::expected<void, ErrorCode>;
458459

460+
/**
461+
* @brief Heartbeat-driven pull of pending promotion work for a client.
462+
* Returns the per-client promotion_objects map (key -> object size) and
463+
* clears it. The per-shard promotion_tasks map remains populated as the
464+
* source of truth until NotifyPromotionSuccess commits the new MEMORY
465+
* replica.
466+
*/
467+
auto PromotionObjectHeartbeat(const UUID& client_id)
468+
-> tl::expected<std::unordered_map<std::string, int64_t>, ErrorCode>;
469+
470+
/**
471+
* @brief Stage a PROCESSING MEMORY replica for an existing key. Allocates
472+
* DRAM via the existing AllocationStrategy, optionally biased toward the
473+
* caller's local memory segment via preferred_segments. The new replica is
474+
* invisible to readers until NotifyPromotionSuccess flips it to COMPLETE.
475+
*/
476+
auto PromotionAllocStart(const std::string& key, uint64_t size,
477+
const std::vector<std::string>& preferred_segments)
478+
-> tl::expected<PromotionAllocStartResponse, ErrorCode>;
479+
480+
/**
481+
* @brief Commit a staged MEMORY replica to COMPLETE; decrement source
482+
* refcnt; erase per-shard and per-client task entries. Mirror of
483+
* NotifyOffloadSuccess.
484+
*/
485+
auto NotifyPromotionSuccess(const UUID& client_id, const std::string& key)
486+
-> tl::expected<void, ErrorCode>;
487+
459488
/**
460489
* @brief Create a copy task to copy an object's replicas to target segments
461490
* @return Copy task ID on success, ErrorCode on failure
@@ -848,6 +877,19 @@ class MasterService {
848877
std::chrono::system_clock::time_point start_time;
849878
};
850879

880+
// Tracks an in-flight LOCAL_DISK -> MEMORY copy. The source LOCAL_DISK
881+
// replica is refcnt-pinned for the duration of the task so it cannot be
882+
// evicted. alloc_id pins down which staged replica
883+
// NotifyPromotionSuccess should commit, so a concurrent Put creating
884+
// another PROCESSING MEMORY replica cannot be confused with ours.
885+
// alloc_id is 0 until PromotionAllocStart records the new replica.
886+
struct PromotionTask {
887+
ReplicaID source_id; // the LOCAL_DISK replica being promoted
888+
ReplicaID alloc_id{0}; // the new MEMORY replica staged by AllocStart
889+
uint64_t object_size;
890+
std::chrono::system_clock::time_point start_time;
891+
};
892+
851893
static constexpr size_t kNumShards = 1024; // Number of metadata shards
852894

853895
// Sharded metadata maps and their mutexes
@@ -860,6 +902,8 @@ class MasterService {
860902
GUARDED_BY(mutex);
861903
std::unordered_map<std::string, const OffloadingTask> offloading_tasks
862904
GUARDED_BY(mutex);
905+
std::unordered_map<std::string, const PromotionTask> promotion_tasks
906+
GUARDED_BY(mutex);
863907
};
864908
std::array<MetadataShard, kNumShards> metadata_shards_;
865909

@@ -935,6 +979,25 @@ class MasterService {
935979
tl::expected<void, ErrorCode> PushOffloadingQueue(const std::string& key,
936980
Replica& replica);
937981

982+
/**
983+
* @brief Mirror of PushOffloadingQueue for promotion-on-hit. Inserts an
984+
* entry into the holder client's LocalDiskSegment::promotion_objects map.
985+
* Caller is responsible for refcnt-pinning the source replica and
986+
* recording the task in the shard's promotion_tasks map.
987+
*/
988+
tl::expected<void, ErrorCode> PushPromotionQueue(const std::string& key,
989+
Replica& source_replica);
990+
991+
/**
992+
* @brief Helper invoked from GetReplicaList when an only-LOCAL_DISK key is
993+
* observed. Applies the gating chain (frequency / watermark / dedup /
994+
* cap), refcnt-pins the source LOCAL_DISK replica, records a
995+
* PromotionTask, and pushes onto the holder client's promotion_objects
996+
* map. Acquires its own RW shard accessor; safe to call after
997+
* GetReplicaList's RO accessor has been released.
998+
*/
999+
void TryPushPromotionQueue(const std::string& key);
1000+
9381001
// Lease related members
9391002
const uint64_t default_kv_lease_ttl_; // in milliseconds
9401003
const uint64_t default_kv_soft_pin_ttl_; // in milliseconds
@@ -1180,6 +1243,16 @@ class MasterService {
11801243
// offload_on_evict_=true)
11811244
bool offload_force_evict_{false};
11821245

1246+
// Promotion-on-hit: opt-in flag enabling LOCAL_DISK -> MEMORY promotion
1247+
// when a Get observes a key with only LOCAL_DISK replicas.
1248+
bool promotion_on_hit_{false};
1249+
uint32_t promotion_admission_threshold_{2};
1250+
uint32_t promotion_queue_limit_{50000};
1251+
// Master-side frequency sketch. Constructed only when promotion_on_hit_ is
1252+
// true. CountMinSketch is mutex-protected internally so we can call into it
1253+
// from any GetReplicaList caller without additional locking.
1254+
std::unique_ptr<CountMinSketch> promotion_sketch_;
1255+
11831256
const std::string ha_backend_type_;
11841257

11851258
const std::string ha_backend_connstring_;

mooncake-store/include/rpc_service.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,17 @@ class WrappedMasterService {
152152
const UUID& client_id, const std::vector<std::string>& keys,
153153
const std::vector<StorageObjectMetadata>& metadatas);
154154

155+
// Promotion-on-hit RPCs.
156+
tl::expected<std::unordered_map<std::string, int64_t>, ErrorCode>
157+
PromotionObjectHeartbeat(const UUID& client_id);
158+
159+
tl::expected<PromotionAllocStartResponse, ErrorCode> PromotionAllocStart(
160+
const std::string& key, uint64_t size,
161+
const std::vector<std::string>& preferred_segments);
162+
163+
tl::expected<void, ErrorCode> NotifyPromotionSuccess(
164+
const UUID& client_id, const std::string& key);
165+
155166
tl::expected<UUID, ErrorCode> CreateDrainJob(
156167
const CreateDrainJobRequest& request);
157168

mooncake-store/include/rpc_types.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ struct CopyStartResponse {
6767
};
6868
YLT_REFL(CopyStartResponse, source, targets);
6969

70+
/**
71+
* @brief Response structure for PromotionAllocStart (L2->L1 promotion-on-hit).
72+
* Carries the staged PROCESSING MEMORY replica descriptor.
73+
*/
74+
struct PromotionAllocStartResponse {
75+
Replica::Descriptor memory_descriptor;
76+
};
77+
YLT_REFL(PromotionAllocStartResponse, memory_descriptor);
78+
7079
/**
7180
* @brief Response structure for MoveStart operation
7281
*/

mooncake-store/include/segment.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ struct LocalDiskSegment {
5353
int64_t ssd_total_capacity_bytes = 0; // last reported by client heartbeat
5454
std::unordered_map<std::string, int64_t> GUARDED_BY(offloading_mutex_)
5555
offloading_objects;
56+
// Promotion-on-hit pending work for this client. Populated by master's
57+
// TryPushPromotionQueue when a Get hits a LOCAL_DISK-only key on this
58+
// client. Drained by PromotionObjectHeartbeat. Same locking as
59+
// offloading_objects (offloading_mutex_) to keep call-site discipline
60+
// identical to #1899's offload path.
61+
std::unordered_map<std::string, int64_t> GUARDED_BY(offloading_mutex_)
62+
promotion_objects;
5663
explicit LocalDiskSegment(bool enable_offloading)
5764
: enable_offloading(enable_offloading) {}
5865

0 commit comments

Comments
 (0)