Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,13 @@ PYBIND11_MODULE(store, m) {
})
.def("get", &mooncake::MooncakeStorePyWrapper::get)
.def("get_batch", &mooncake::MooncakeStorePyWrapper::get_batch)
// Python binding "get_offload_rpc_read_count": reports the value of
// RealClient::get_offload_rpc_read_count() for the wrapped store.
// The accessor is reached through a downcast of self.store_ to RealClient;
// when that cast yields null (store_ is empty or is not a RealClient) the
// binding returns 0 instead of raising.
.def("get_offload_rpc_read_count",
[](MooncakeStorePyWrapper &self) -> int64_t {
auto real_client =
std::dynamic_pointer_cast<RealClient>(self.store_);
return real_client ? real_client->get_offload_rpc_read_count()
: 0;
})
.def(
"get_buffer",
[](MooncakeStorePyWrapper &self, const std::string &key) {
Expand Down
51 changes: 48 additions & 3 deletions mooncake-store/include/client_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class QueryResult {
*/
class Client {
public:
~Client();
virtual ~Client();

const UUID& getClientId() const { return client_id_; }

Expand Down Expand Up @@ -410,6 +410,49 @@ class Client {
tl::expected<void, ErrorCode> ReportSsdCapacity(
int64_t ssd_total_capacity_bytes);

/**
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for this
* client. Mirror of OffloadObjectHeartbeat. Yields the key->size pairs the
* caller (FileStorage) must read from local SSD and stage as MEMORY
* replicas via PromotionAllocStart + NotifyPromotionSuccess.
* @param promotion_objects Output map of object key -> size. The return
* type carries no payload, so the pairs are expected to arrive here.
* NOTE(review): the size unit is presumably bytes, and it is not visible
* from this declaration whether pre-existing entries are cleared — confirm
* against the definition.
* @return Empty expected on success, ErrorCode on failure.
*/
// Virtual to enable subclassing in unit tests.
virtual tl::expected<void, ErrorCode> PromotionObjectHeartbeat(
std::unordered_map<std::string, int64_t>& promotion_objects);

/**
* @brief Stage a PROCESSING MEMORY replica for an existing key during
* L2->L1 promotion. Returns the new replica's descriptor that the caller
* writes via Transfer Engine before calling NotifyPromotionSuccess.
* @param key Key of the existing object being promoted.
* @param size Size of the replica to stage (presumably bytes — TODO
* confirm).
* @param preferred_segments Segment names to prefer for the new replica.
* NOTE(review): inferred from the parameter name; how an empty list or an
* unsatisfiable preference is handled is not visible here.
* @return PromotionAllocStartResponse on success, ErrorCode on failure.
*/
virtual tl::expected<PromotionAllocStartResponse, ErrorCode>
PromotionAllocStart(const std::string& key, uint64_t size,
const std::vector<std::string>& preferred_segments);

/**
* @brief Commit a staged MEMORY replica to COMPLETE; called after the
* client has written the bytes via Transfer Engine.
* @param key Key whose staged replica (see PromotionAllocStart) is being
* committed.
* @return Empty expected on success, ErrorCode on failure.
*/
virtual tl::expected<void, ErrorCode> NotifyPromotionSuccess(
const std::string& key);

/**
* @brief Release master-side promotion task after a client-side failure
* between PromotionAllocStart and the transfer's completion. Idempotent.
* @param key Key whose promotion task should be released.
* @return Empty expected on success, ErrorCode on failure.
*/
virtual tl::expected<void, ErrorCode> NotifyPromotionFailure(
const std::string& key);

/**
* @brief Write `slices` into the memory replica described by
* `memory_descriptor` via Transfer Engine. Used by FileStorage to fill a
* PROCESSING memory replica staged by PromotionAllocStart before calling
* NotifyPromotionSuccess.
* @param memory_descriptor Descriptor of the destination memory replica.
* @param slices Source data to write. Taken by non-const reference;
* NOTE(review): whether the slices are modified is not visible from this
* declaration — confirm against the definition.
* @return ErrorCode describing the outcome of the write.
*/
virtual ErrorCode PromotionWrite(
const Replica::Descriptor& memory_descriptor,
std::vector<Slice>& slices);

/**
* @brief Performs a batched read of multiple objects using a
* high-throughput Transfer Engine.
Expand Down Expand Up @@ -577,14 +620,16 @@ class Client {

bool IsReplicaOnLocalMemory(const Replica::Descriptor& replica);

private:
protected:
/**
* @brief Private constructor to enforce creation through Create() method
* @brief Constructor exposed to subclasses for testing only; production
* code must go through Create().
*/
Client(const std::string& local_hostname,
const std::string& metadata_connstring, const std::string& protocol,
const std::map<std::string, std::string>& labels = {});

private:
/**
* @brief Internal helper functions for initialization and data transfer
*/
Expand Down
17 changes: 16 additions & 1 deletion mooncake-store/include/file_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class FileStorage {

private:
friend class FileStorageTest;
friend class FileStoragePromotionTest;
struct AllocatedBatch {
uint64_t batch_id;
std::vector<BufferHandle> handles;
Expand Down Expand Up @@ -82,10 +83,24 @@ class FileStorage {
* client.
* 2. Receives feedback on which objects should be offloaded.
* 3. Triggers asynchronous offloading of pending objects.
* 4. Pulls and processes any pending L2->L1 promotion tasks queued by the
* master (mirror of step 1+2 in the reverse direction).
* @return tl::expected<void, ErrorCode> indicating operation status.
*/
tl::expected<void, ErrorCode> Heartbeat();

/**
* @brief Drives the L2->L1 promotion pipeline for one heartbeat tick.
* For each tick it:
* 1. Pulls promotion work from the master.
* 2. Stages a MEMORY replica for each key.
* 3. Copies the bytes from local SSD into that replica.
* 4. Notifies the master on success.
* A failure on any single key is logged and skipped; the master-side
* reaper decrements the source replica's refcnt and erases the task entry
* on TTL expiry, and any orphaned PROCESSING MEMORY replica is reaped via
* the standard discarded-replicas path.
* @return tl::expected<void, ErrorCode> indicating operation status.
* NOTE(review): since per-key failures are skipped, an error result
* presumably reflects a failure of the pull itself — confirm against the
* definition.
*/
tl::expected<void, ErrorCode> ProcessPromotionTasks();

tl::expected<bool, ErrorCode> IsEnableOffloading();

tl::expected<void, ErrorCode> BatchLoad(
Expand Down Expand Up @@ -123,4 +138,4 @@ class FileStorage {
std::thread client_buffer_gc_thread_;
};

} // namespace mooncake
} // namespace mooncake
35 changes: 35 additions & 0 deletions mooncake-store/include/master_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,41 @@ class MasterClient {
const UUID& client_id, const std::vector<std::string>& keys,
const std::vector<StorageObjectMetadata>& metadatas);

/**
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for a
* client. Returns key->size pairs the caller should read from local
* SSD and stage as MEMORY replicas via PromotionAllocStart +
* NotifyPromotionSuccess.
* @param client_id UUID of the client whose pending promotion work is
* requested.
* @return Map of object key -> size on success (size unit presumably
* bytes — TODO confirm), ErrorCode on failure. The result is
* [[nodiscard]], so callers must inspect it.
*/
[[nodiscard]] tl::expected<std::unordered_map<std::string, int64_t>,
ErrorCode>
PromotionObjectHeartbeat(const UUID& client_id);

/**
* @brief Stage a PROCESSING MEMORY replica for an existing key during
* promotion. Returns the new replica's descriptor that the caller writes
* via Transfer Engine.
* @param client_id UUID of the client performing the promotion.
* @param key Key of the existing object being promoted.
* @param size Size of the replica to stage (presumably bytes — TODO
* confirm).
* @param preferred_segments Segment names to prefer for the new replica.
* NOTE(review): inferred from the parameter name; fallback behavior is
* not visible here.
* @return PromotionAllocStartResponse on success, ErrorCode on failure.
*/
[[nodiscard]] tl::expected<PromotionAllocStartResponse, ErrorCode>
PromotionAllocStart(const UUID& client_id, const std::string& key,
uint64_t size,
const std::vector<std::string>& preferred_segments);

/**
* @brief Release master-side promotion task state after a client-side
* failure that prevents the holder from calling NotifyPromotionSuccess.
* Idempotent; returns OK if the task was already swept by the reaper.
* @param client_id UUID of the client that held the promotion task.
* @param key Key whose promotion task state should be released.
* @return Empty expected on success, ErrorCode on failure.
*/
[[nodiscard]] tl::expected<void, ErrorCode> NotifyPromotionFailure(
const UUID& client_id, const std::string& key);

/**
* @brief Commit a staged MEMORY replica to COMPLETE; called after the
* client has written the bytes via Transfer Engine.
* @param client_id UUID of the client that staged and wrote the replica.
* @param key Key whose staged replica is being committed.
* @return Empty expected on success, ErrorCode on failure.
*/
[[nodiscard]] tl::expected<void, ErrorCode> NotifyPromotionSuccess(
const UUID& client_id, const std::string& key);

/**
* @brief Start a copy operation
* @param key Object key
Expand Down
27 changes: 27 additions & 0 deletions mooncake-store/include/master_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ struct MasterConfig {
// Offload-on-evict: defer LOCAL_DISK offload to eviction time
bool offload_on_evict = false;
bool offload_force_evict = false;

// Promotion-on-hit: when Get observes a LOCAL_DISK-only key, queue an
// async copy back to MEMORY so the next Get is fast.
// Disabled by default.
bool promotion_on_hit = false;
// NOTE(review): presumably the number of observed hits a key needs before
// it is admitted for promotion (default 2) — confirm against the master
// service implementation.
uint32_t promotion_admission_threshold = 2;
// NOTE(review): presumably an upper bound on queued promotion tasks
// (default 50000) — confirm what happens when the limit is reached.
uint32_t promotion_queue_limit = 50000;
};

class MasterServiceSupervisorConfig {
Expand Down Expand Up @@ -159,6 +165,9 @@ class MasterServiceSupervisorConfig {
bool enable_cxl = false;
bool offload_on_evict = false;
bool offload_force_evict = false;
bool promotion_on_hit = false;
uint32_t promotion_admission_threshold = 2;
uint32_t promotion_queue_limit = 50000;
MasterServiceSupervisorConfig() = default;

// From MasterConfig
Expand All @@ -176,6 +185,9 @@ class MasterServiceSupervisorConfig {
enable_offload = config.enable_offload;
offload_on_evict = config.offload_on_evict;
offload_force_evict = config.offload_force_evict;
promotion_on_hit = config.promotion_on_hit;
promotion_admission_threshold = config.promotion_admission_threshold;
promotion_queue_limit = config.promotion_queue_limit;
rpc_port = static_cast<int>(config.rpc_port);
rpc_thread_num = static_cast<size_t>(config.rpc_thread_num);

Expand Down Expand Up @@ -288,6 +300,9 @@ class WrappedMasterServiceConfig {
bool enable_offload = false;
bool offload_on_evict = false;
bool offload_force_evict = false;
bool promotion_on_hit = false;
uint32_t promotion_admission_threshold = 2;
uint32_t promotion_queue_limit = 50000;
std::string ha_backend_type = "etcd";
std::string ha_backend_connstring;
std::string cluster_id = DEFAULT_CLUSTER_ID;
Expand Down Expand Up @@ -345,6 +360,9 @@ class WrappedMasterServiceConfig {
enable_offload = config.enable_offload;
offload_on_evict = config.offload_on_evict;
offload_force_evict = config.offload_force_evict;
promotion_on_hit = config.promotion_on_hit;
promotion_admission_threshold = config.promotion_admission_threshold;
promotion_queue_limit = config.promotion_queue_limit;
ha_backend_type = config.ha_backend_type;
ha_backend_connstring = ResolveConfiguredHABackendConnstring(
ha_backend_type, config.ha_backend_connstring,
Expand Down Expand Up @@ -424,6 +442,9 @@ class WrappedMasterServiceConfig {
enable_offload = config.enable_offload;
offload_on_evict = config.offload_on_evict;
offload_force_evict = config.offload_force_evict;
promotion_on_hit = config.promotion_on_hit;
promotion_admission_threshold = config.promotion_admission_threshold;
promotion_queue_limit = config.promotion_queue_limit;
ha_backend_type = config.ha_backend_type;
ha_backend_connstring = ResolveConfiguredHABackendConnstring(
ha_backend_type, config.ha_backend_connstring,
Expand Down Expand Up @@ -761,6 +782,9 @@ class MasterServiceConfig {
bool enable_offload = false;
bool offload_on_evict = false;
bool offload_force_evict = false;
bool promotion_on_hit = false;
uint32_t promotion_admission_threshold = 2;
uint32_t promotion_queue_limit = 50000;
std::string ha_backend_type = "etcd";
std::string ha_backend_connstring;
std::string cluster_id = DEFAULT_CLUSTER_ID;
Expand Down Expand Up @@ -814,6 +838,9 @@ class MasterServiceConfig {
enable_offload = config.enable_offload;
offload_on_evict = config.offload_on_evict;
offload_force_evict = config.offload_force_evict;
promotion_on_hit = config.promotion_on_hit;
promotion_admission_threshold = config.promotion_admission_threshold;
promotion_queue_limit = config.promotion_queue_limit;
ha_backend_type = config.ha_backend_type;
ha_backend_connstring = config.ha_backend_connstring;
cluster_id = config.cluster_id;
Expand Down
Loading
Loading