Commit 2cee8eb

yzhan1 and claude committed
[Store] L2->L1 promotion-on-hit: reviewer feedback (orphan reaper, global cap, const, misconfig log)
Addresses reviewer feedback on PR kvcache-ai#2071.

Issue 1 -- orphaned PROCESSING MEMORY replica leak. The promotion task
reaper only dropped the source LOCAL_DISK refcnt and erased the task
entry; it never popped the staged PROCESSING MEMORY replica added by
PromotionAllocStart. That replica is not in shard->processing_keys, so
DiscardExpiredProcessingReplicas could not sweep it, and the buffer
leaked until the object was removed or evicted. Fix: in the reaper, when
alloc_id != 0, call metadata.EraseReplicaByID(alloc_id) to pop the
staged replica and return its buffer to the allocator.

Issue 2 -- per-shard cap was wrong for skewed workloads. The old gate
was `shard->size() * kNumShards >= limit`, approximately right for
uniform workloads but ~1024x too eager on skewed workloads where hot
keys cluster in a few shards. Replace it with a cluster-wide
std::atomic<uint64_t> promotion_in_flight_ counter: incremented in
TryPushPromotionQueue after a successful emplace, decremented in
NotifyPromotionSuccess and in the reaper. memory_order_relaxed suffices
because the value is advisory; the per-shard mutex already serializes
inserts within a shard and the dedup gate prevents duplicate work.

Issue 3 -- const_cast smell. The promotion_tasks map held const
PromotionTask values for "generic safety", forcing a
const_cast<PromotionTask&> in PromotionAllocStart to set alloc_id under
the shard write lock. Drop the const; PromotionAllocStart now sets
task_it->second.alloc_id = new_id directly.

Misconfig log -- emit LOG(WARNING) at startup when
config.promotion_on_hit=true but enable_offload=false. Promotion
requires offload to produce LOCAL_DISK replicas, so it is silently
disabled in that combination; the log makes the disablement
discoverable to operators.

Tests
-----
- New ReaperPopsStagedMemoryReplicaOnExpiry: regression for Issue 1.
  Uses QuerySegments(seg).first (used bytes) to observe that the staged
  PROCESSING MEMORY replica's buffer is freed back to the DRAM
  allocator after the reaper sweeps the expired task.
- New QueueLimitRejectsCrossShard: regression for Issue 2. With
  queue_limit=1, proves a second admission attempt on a *different
  shard* is rejected -- exactly the case the old per-shard cap admitted
  incorrectly.
- Updated the comment on the existing QueueLimitRejectsBeyondCap to
  reflect the cluster-wide counter semantics.

Verification
------------
- promotion_on_hit_test: 15/15 pass (5 consecutive clean runs)
- file_storage_promotion_test: 9/9 pass
- master_service_promotion_test_for_snapshot: 5/5 pass
- offload_on_evict_test: 9/9 pass
- code_format.sh --base upstream/main (clang-format-20 in container):
  3 files reformatted (master_service.h, master_service.cpp,
  promotion_on_hit_test.cpp); all others "Already formatted"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e0490e9 commit 2cee8eb

3 files changed

Lines changed: 194 additions & 21 deletions


mooncake-store/include/master_service.h

Lines changed: 13 additions & 1 deletion
@@ -902,7 +902,7 @@ class MasterService {
             GUARDED_BY(mutex);
         std::unordered_map<std::string, const OffloadingTask> offloading_tasks
             GUARDED_BY(mutex);
-        std::unordered_map<std::string, const PromotionTask> promotion_tasks
+        std::unordered_map<std::string, PromotionTask> promotion_tasks
             GUARDED_BY(mutex);
     };
     std::array<MetadataShard, kNumShards> metadata_shards_;
@@ -1248,6 +1248,18 @@ class MasterService {
     bool promotion_on_hit_{false};
     uint32_t promotion_admission_threshold_{2};
     uint32_t promotion_queue_limit_{50000};
+    // Global in-flight task counter, checked against promotion_queue_limit_
+    // as the gate cap. A previous per-shard heuristic (shard->size() *
+    // kNumShards) was effectively right for uniform workloads but ~1024x
+    // tight on skewed workloads, where hot keys cluster in a few shards and
+    // would saturate one shard's projection of the cap while the cluster
+    // had near-zero in-flight tasks. Promotion specifically targets skewed
+    // access (hot keys re-accessed after eviction), so the global counter
+    // is the correct primitive. Incremented in TryPushPromotionQueue after
+    // successful enqueue; decremented in NotifyPromotionSuccess and in the
+    // promotion task reaper after the task entry is erased. Relaxed memory
+    // order is safe — the value is an advisory soft cap, not a barrier.
+    std::atomic<uint64_t> promotion_in_flight_{0};
     // Master-side frequency sketch. Constructed only when promotion_on_hit_ is
     // true. CountMinSketch is mutex-protected internally so we can call into it
     // from any GetReplicaList caller without additional locking.

mooncake-store/src/master_service.cpp

Lines changed: 40 additions & 16 deletions
@@ -185,6 +185,13 @@ MasterService::MasterService(const MasterServiceConfig& config)
     promotion_on_hit_ = enable_offload_ && config.promotion_on_hit;
     promotion_admission_threshold_ = config.promotion_admission_threshold;
     promotion_queue_limit_ = config.promotion_queue_limit;
+    if (config.promotion_on_hit && !enable_offload_) {
+        LOG(WARNING) << "promotion_on_hit=true was requested but "
+                     << "enable_offload=false; promotion is silently "
+                     << "disabled because it requires offload to produce "
+                     << "LOCAL_DISK replicas. Set enable_offload=true to "
+                     << "use this feature.";
+    }
     if (promotion_on_hit_) {
         promotion_sketch_ = std::make_unique<CountMinSketch>();
         LOG(INFO) << "Promotion-on-hit mode enabled: LOCAL_DISK-only Gets "
@@ -2404,11 +2411,15 @@ void MasterService::TryPushPromotionQueue(const std::string& key) {
         return;
     }
 
-    // Cap gate: cheap stat — read this shard's count under our held RW lock,
-    // which already gives an upper bound that's typically tight enough.
-    // Cluster-wide cap can be added in a follow-up; for v1 the per-shard
-    // count × shard count gives a soft cap.
-    if (shard->promotion_tasks.size() * kNumShards >= promotion_queue_limit_) {
+    // Cap gate: read the cluster-wide in-flight count. Soft cap — a
+    // benign TOCTOU race between this load and the emplace below can let
+    // a few extra tasks slip in, but the per-shard mutex already
+    // serializes inserts within a shard and the dedup gate above prevents
+    // duplicate work, so the worst case is N concurrent inserters across
+    // distinct shards each admitting one extra task. Atomic load is
+    // relaxed because the value is purely advisory.
+    if (promotion_in_flight_.load(std::memory_order_relaxed) >=
+        promotion_queue_limit_) {
         return;
     }
 
@@ -2443,6 +2454,7 @@ void MasterService::TryPushPromotionQueue(const std::string& key) {
             .alloc_id = 0,
             .object_size = object_size,
             .start_time = std::chrono::system_clock::now()});
+    promotion_in_flight_.fetch_add(1, std::memory_order_relaxed);
     VLOG(1) << "promotion_queued key=" << key << " size=" << object_size;
 }
 
@@ -2527,10 +2539,7 @@ auto MasterService::PromotionAllocStart(
     auto& shard = accessor.GetShard();
     auto task_it = shard->promotion_tasks.find(key);
     if (task_it != shard->promotion_tasks.end()) {
-        // const_cast: promotion_tasks holds const PromotionTask values for
-        // generic safety, but we own the entry under the shard write lock
-        // and need to record the alloc_id post-Allocate.
-        const_cast<PromotionTask&>(task_it->second).alloc_id = new_id;
+        task_it->second.alloc_id = new_id;
     }
     return PromotionAllocStartResponse{std::move(desc)};
 }
@@ -2571,6 +2580,7 @@ auto MasterService::NotifyPromotionSuccess(const UUID& client_id,
         source->dec_refcnt();
     }
     shard->promotion_tasks.erase(task_it);
+    promotion_in_flight_.fetch_sub(1, std::memory_order_relaxed);
 
     // Erase the per-client promotion_objects entry (best-effort; the
     // heartbeat may have already drained it).
@@ -2765,13 +2775,23 @@ void MasterService::DiscardExpiredProcessingReplicas(
         task_it = shard->offloading_tasks.erase(task_it);
     }
 
-    // Part 4: Discard expired promotion-on-hit tasks. Drops the source
-    // LOCAL_DISK replica's refcnt and erases the task entry; the per-client
-    // promotion_objects map is best-effort garbage collected on the next
-    // heartbeat (entries for vanished tasks are harmless — the client will
-    // allocate, transfer, then NotifyPromotionSuccess will return
-    // REPLICA_IS_NOT_READY and the new MEMORY replica is reaped via the
-    // discarded-replicas path).
+    // Part 4: Discard expired promotion-on-hit tasks. For each expired
+    // task:
+    //   - Drop the source LOCAL_DISK replica's refcnt so it can be
+    //     evicted normally.
+    //   - If PromotionAllocStart already staged a PROCESSING MEMORY
+    //     replica (alloc_id != 0), pop it via EraseReplicaByID. The
+    //     staged replica is not in shard->processing_keys, so
+    //     DiscardExpiredProcessingReplicas would never reap it; this is
+    //     the only place that does. Without this the buffer leaks until
+    //     the object itself is removed or evicted.
+    //   - Erase the task entry and decrement the global in-flight
+    //     counter.
+    // The per-client promotion_objects map is best-effort garbage
+    // collected on the next heartbeat (entries for vanished tasks are
+    // harmless — the client will allocate, transfer, then
+    // NotifyPromotionSuccess will return REPLICA_IS_NOT_READY since the
+    // task entry is gone).
     for (auto task_it = shard->promotion_tasks.begin();
          task_it != shard->promotion_tasks.end();) {
         const auto ttl =
@@ -2787,9 +2807,13 @@ void MasterService::DiscardExpiredProcessingReplicas(
             if (source != nullptr) {
                 source->dec_refcnt();
             }
+            if (task_it->second.alloc_id != 0) {
+                metadata_it->second.EraseReplicaByID(task_it->second.alloc_id);
+            }
         }
         LOG(WARNING) << "Promotion task expired for key: " << task_it->first;
         task_it = shard->promotion_tasks.erase(task_it);
+        promotion_in_flight_.fetch_sub(1, std::memory_order_relaxed);
     }
 
     if (!discarded_replicas.empty()) {

mooncake-store/tests/promotion_on_hit_test.cpp

Lines changed: 141 additions & 4 deletions
@@ -559,8 +559,9 @@ TEST_F(PromotionOnHitTest, QueueLimitRejectsBeyondCap) {
     auto r1 = service->GetReplicaList(k1);
     ASSERT_TRUE(r1.has_value());
 
-    // Second read on k2 (same shard S, different key, so no dedup) must be
-    // dropped by the cap gate: shard already has 1 task and 1*1024 >= 1.
+    // Second read on k2 (same shard S, different key, so no dedup) must
+    // be dropped by the cap gate: the cluster-wide in-flight counter is
+    // already 1, which meets promotion_queue_limit_ = 1.
     auto r2 = service->GetReplicaList(k2);
     ASSERT_TRUE(r2.has_value()) << "read itself must still succeed; "
                                 << "queue gate is silent";
@@ -569,8 +570,8 @@ TEST_F(PromotionOnHitTest, QueueLimitRejectsBeyondCap) {
     auto heartbeat = service->PromotionObjectHeartbeat(seg.client_id);
     ASSERT_TRUE(heartbeat.has_value());
     EXPECT_EQ(heartbeat->size(), 1u)
-        << "queue_limit=1 should cap the same shard at 1 task; "
-        << "k2's enqueue must be dropped";
+        << "promotion_queue_limit=1 should admit only the first task "
+        << "globally; k2's enqueue must be dropped";
     EXPECT_EQ(heartbeat->count(k1), 1u)
         << "k1 was read first and should be the surviving task";
     EXPECT_EQ(heartbeat->count(k2), 0u)
@@ -650,6 +651,142 @@ TEST_F(PromotionOnHitTest, HeartbeatBoundedBatchPreservesLeftovers) {
     service->RemoveAll();
 }
 
+// Issue 1 regression: the promotion task reaper must pop the staged
+// PROCESSING MEMORY replica added by PromotionAllocStart. Without it,
+// the staged replica was orphaned forever: it's not in
+// shard->processing_keys (so DiscardExpiredProcessingReplicas can't see
+// it) and the previous reaper code only touched the source LOCAL_DISK
+// refcnt and the task entry. The orphan held its allocator buffer
+// indefinitely.
+//
+// We can't observe the staged replica via GetReplicaList because the
+// master filters out PROCESSING entries (clients can only read COMPLETE
+// replicas), so we use QuerySegments to watch the DRAM allocator's used
+// bytes: AllocStart bumps it, and the reaper must return it to baseline.
+// NotifyPromotionSuccess on a reaped task must also fail cleanly.
+TEST_F(PromotionOnHitTest, ReaperPopsStagedMemoryReplicaOnExpiry) {
+    MasterServiceConfig config;
+    config.enable_offload = true;
+    config.promotion_on_hit = true;
+    config.promotion_admission_threshold = 1;
+    config.default_kv_lease_ttl = 2000;
+    config.put_start_discard_timeout_sec = 0;
+    config.put_start_release_timeout_sec = 1;
+    auto service = std::make_unique<MasterService>(config);
+
+    constexpr size_t seg_size = 1024 * 1024 * 16;
+    auto ctx = PrepareSegment(*service, "seg_a", kDefaultSegmentBase, seg_size);
+    ASSERT_TRUE(InjectLocalDiskReplica(*service, ctx.client_id, "k_cold", 1024,
+                                       ctx.segment_name));
+
+    // Baseline allocator usage on the DRAM segment.
+    auto seg_baseline = service->QuerySegments(ctx.segment_name);
+    ASSERT_TRUE(seg_baseline.has_value());
+    const size_t used_baseline = seg_baseline->first;
+
+    // Trigger the gate to enqueue a PromotionTask.
+    {
+        auto r = service->GetReplicaList("k_cold");
+        ASSERT_TRUE(r.has_value());
+    }
+    // Drive the AllocStart side so alloc_id != 0 — this is the exact
+    // setup that left an orphaned PROCESSING MEMORY replica pre-fix.
+    auto alloc = service->PromotionAllocStart("k_cold", 1024, {});
+    ASSERT_TRUE(alloc.has_value());
+
+    // After AllocStart, the DRAM allocator must have committed bytes for
+    // the staged PROCESSING MEMORY replica.
+    auto seg_after_alloc = service->QuerySegments(ctx.segment_name);
+    ASSERT_TRUE(seg_after_alloc.has_value());
+    EXPECT_GT(seg_after_alloc->first, used_baseline)
+        << "PromotionAllocStart should bump segment used bytes "
+        << "(allocator-tracked PROCESSING MEMORY replica)";
+
+    // Sleep past the staleness window; the eviction thread reaps the
+    // task and (with the fix) pops the staged replica via
+    // EraseReplicaByID, which releases the buffer back to the allocator.
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    auto seg_after_reap = service->QuerySegments(ctx.segment_name);
+    ASSERT_TRUE(seg_after_reap.has_value());
+    EXPECT_EQ(seg_after_reap->first, used_baseline)
+        << "after reap: staged PROCESSING MEMORY replica's buffer must "
+        << "be freed back to the DRAM allocator. Pre-fix the buffer "
+        << "leaked and used bytes stayed elevated until the object "
+        << "itself was removed or evicted.";
+
+    // NotifyPromotionSuccess for a reaped task must not commit anything
+    // and must return REPLICA_IS_NOT_READY (the task entry is gone, so
+    // the alloc_id lookup at the top of NotifyPromotionSuccess fails
+    // fast).
+    auto notify = service->NotifyPromotionSuccess(ctx.client_id, "k_cold");
+    ASSERT_FALSE(notify.has_value());
+    EXPECT_EQ(notify.error(), ErrorCode::REPLICA_IS_NOT_READY);
+
+    service->RemoveAll();
+}
+
+// Issue 2 regression: the cap gate must be cluster-wide. The old
+// implementation used `shard->size() * kNumShards >= promotion_queue_limit_`,
+// which made the cap fire ~1024x too eagerly on skewed workloads (hot
+// keys cluster in few shards). With a global atomic counter, a task in
+// shard A counts toward the cap that gates a task in shard B.
+TEST_F(PromotionOnHitTest, QueueLimitRejectsCrossShard) {
+    MasterServiceConfig config;
+    config.enable_offload = true;
+    config.promotion_on_hit = true;
+    config.promotion_admission_threshold = 1;
+    config.promotion_queue_limit = 1;  // 1 in-flight task globally
+    config.default_kv_lease_ttl = 2000;
+    auto service = std::make_unique<MasterService>(config);
+
+    constexpr size_t seg_size = 1024 * 1024 * 16;
+    auto seg = PrepareSegment(*service, "seg_a", kDefaultSegmentBase, seg_size);
+
+    // Find two keys hashing to *different* shards. With the old per-shard
+    // heuristic this would let both through (each shard's count is 0
+    // independently). With the global counter, only the first goes in.
+    constexpr size_t kNumShardsLocal = 1024;
+    auto shard_of = [](const std::string& k) {
+        return std::hash<std::string>{}(k) % kNumShardsLocal;
+    };
+    const std::string k1 = "xshard_first";
+    std::string k2;
+    for (int i = 0; i < 100000 && k2.empty(); ++i) {
+        std::string candidate = "xshard_other_" + std::to_string(i);
+        if (shard_of(candidate) != shard_of(k1)) {
+            k2 = candidate;
+        }
+    }
+    ASSERT_FALSE(k2.empty()) << "couldn't find a different-shard key";
+    ASSERT_NE(shard_of(k1), shard_of(k2));
+
+    ASSERT_TRUE(InjectLocalDiskReplica(*service, seg.client_id, k1, 1024,
+                                       seg.segment_name));
+    ASSERT_TRUE(InjectLocalDiskReplica(*service, seg.client_id, k2, 1024,
+                                       seg.segment_name));
+
+    auto r1 = service->GetReplicaList(k1);
+    ASSERT_TRUE(r1.has_value());
+
+    // k2 lives in a different shard, but the global cap is already met
+    // by k1's task — k2 must be rejected.
+    auto r2 = service->GetReplicaList(k2);
+    ASSERT_TRUE(r2.has_value()) << "read itself still succeeds";
+
+    auto heartbeat = service->PromotionObjectHeartbeat(seg.client_id);
+    ASSERT_TRUE(heartbeat.has_value());
+    EXPECT_EQ(heartbeat->size(), 1u)
+        << "with global cap=1 and one task already in shard " << shard_of(k1)
+        << ", a key hashing to shard " << shard_of(k2)
+        << " must be rejected by the global gate (pre-fix it would have "
+        << "been admitted since its shard's local count was 0)";
+    EXPECT_EQ(heartbeat->count(k1), 1u);
+    EXPECT_EQ(heartbeat->count(k2), 0u);
+
+    service->RemoveAll();
+}
+
 } // namespace mooncake::test
 
 int main(int argc, char** argv) {
