[Store] L2->L1 promotion-on-hit #2071
Conversation
Code Review
This pull request introduces a 'promotion-on-hit' feature that asynchronously promotes objects from local SSD (L2) back to DRAM (L1) when they are frequently accessed. The implementation includes master-side gating logic using a Count-Min Sketch, new RPC endpoints for promotion orchestration, and a client-side execution loop in FileStorage. Feedback focuses on critical concurrency and performance issues: processing too many promotion tasks in a single heartbeat could block the client, and the current replica commitment logic in NotifyPromotionSuccess is ambiguous and could lead to data corruption if concurrent Put operations occur. Additionally, a type mismatch between the frequency sketch and the admission threshold was identified.
Hi @yzhan1, overall the implementation follows the RFC. I'm curious about the following issues:

- The ideal fix here would be to return or prefer MEMORY replicas before LOCAL_DISK. Also, it would be nice to add a test that proves the post-promotion read actually selects MEMORY, not just that MEMORY metadata exists.
- Could you keep unprocessed tasks queued, return only a bounded batch from the heartbeat, or process the full drained map with a byte/task budget that preserves leftovers?
Overall, this PR makes sense to me from a design perspective. As a nice-to-have, would it be possible to supplement the results with a few more metrics? This would further strengthen the empirical validation, though it's by no means a blocker.
…elected e2e proof

Two PR-review issues from kvcache-ai#2071, addressed without changing the existing commit:

1. Heartbeat could silently drop leftover promotion tasks.

The prior per-tick-cap fix lived on the client side: the master drained the whole per-LDS promotion_objects map, the client processed one task, and the rest evaporated. Their per-shard promotion_tasks entries stayed refcnt-pinned until the reaper TTL (~10 min), and the dedup gate blocked re-enqueue, so a burst of hot LOCAL_DISK keys could lose all but one promotion for that window.

Move the cap server-side. MasterService::PromotionObjectHeartbeat now extracts at most kMaxPerHeartbeat (1) entries from the per-LDS promotion_objects map and leaves the remainder queued for subsequent heartbeats. The client-side per-tick cap is removed; the master is the bound. Leftover work is preserved by construction.

New unit test HeartbeatBoundedBatchPreservesLeftovers proves: with three queued keys, three successive heartbeats each return exactly one distinct key, the fourth returns empty, and all three keys' master-side state stays intact across the drain. The FakeClient harness in file_storage_promotion_test now mirrors the master's bounded-batch semantic on its mocked Heartbeat, so the existing client-side tests exercise the same drain shape production sees.

2. E2E couldn't prove subsequent reads avoided SSD.

Post-promotion the test asserted bytes-back and the existence of a MEMORY replica, but bytes-back is the same whether MEMORY or LOCAL_DISK served the read (the offload-RPC path returns correct bytes too). The RFC's core invariant, "subsequent reads avoid SSD", went unverified.

Add an instrumented counter offload_rpc_read_count on RealClient that increments at every invocation of batch_get_into_offload_object_internal, the single chokepoint for LOCAL_DISK reads served via peer offload-RPC. Expose it via the Python binding as MooncakeDistributedStore.get_offload_rpc_read_count(). The e2e snapshots the counter around the post-promotion store.get and asserts it didn't move; a non-zero delta would mean the read was served from SSD instead of the promoted MEMORY replica.
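A minimal sketch of the bounded drain described in item 1, assuming a pared-down per-LDS map of pending promotions; the struct and function names here are illustrative, not the actual MasterService code:

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a queued promotion; the real entry carries
// source-replica info and timestamps and lives under a per-segment mutex.
struct PromotionEntry { /* source replica info, timestamps, ... */ };

// Extract at most max_per_heartbeat entries and leave the remainder queued
// for subsequent heartbeats: leftovers survive by construction.
std::vector<std::string> DrainBoundedBatch(
    std::unordered_map<std::string, PromotionEntry>& promotion_objects,
    size_t max_per_heartbeat) {
    std::vector<std::string> batch;
    auto it = promotion_objects.begin();
    while (it != promotion_objects.end() && batch.size() < max_per_heartbeat) {
        batch.push_back(it->first);
        it = promotion_objects.erase(it);  // erase returns the next iterator
    }
    return batch;
}
```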
Thank you for your patience. Overall, this PR looks quite complete to me. My final suggestion is regarding code quality:

[Nit] The `promotion_tasks` map doesn't need to hold `const` values:

```diff
- std::unordered_map<std::string, const PromotionTask> promotion_tasks;
+ std::unordered_map<std::string, PromotionTask> promotion_tasks;
```

```diff
- const_cast<PromotionTask&>(task_it->second).alloc_id = new_id;
+ task_it->second.alloc_id = new_id;
```

The shard RW lock is already held in both places.
@LujhCoconut thanks! I will revise based on your comment. I also have some perf tests I'd like to add and some other minor bugs I'm fixing. Will address and push shortly.
Reviewer asked for more metrics. The post-promotion offload-RPC counter already proves MEMORY served the read; adding a wall-time comparison gives the same evidence on a different axis and surfaces a concrete "this is the user-visible win" number.

Phase 3 timed the 4 admission-threshold-clearing reads as the LOCAL_DISK sample. Phase 5 expanded the single post-promotion read into 10 timed reads as the MEMORY sample, plus the existing counter assertion. The test now prints a median/min/max table for each phase and asserts the post-promotion median is at least 1.3x faster.

The threshold is intentionally loose: on docker tmpfs with small (1 MB) objects much of the wall time is Python/C++ marshalling overhead and the SSD-vs-DRAM gap shrinks to ~2-5x; on real SSDs + RDMA the gap is typically 10-50x. The counter assertion is the hard correctness check; the latency assertion catches gross regressions while the printed summary serves as the primary empirical signal.

Observed in CI (docker tmpfs, 1 MB objects):

    pre-promotion (LOCAL_DISK via offload-RPC): median 1.53 ms
    post-promotion (MEMORY direct):             median 0.31 ms
    speedup: 4.9x
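For reference, a hedged sketch of the counter instrumentation this commit leans on; only `offload_rpc_read_count`, `batch_get_into_offload_object_internal`, and `get_offload_rpc_read_count` are names from the PR, and the surrounding class shape is invented:

```cpp
#include <atomic>
#include <cstdint>

class RealClient {
   public:
    // Single chokepoint for LOCAL_DISK reads served via peer offload-RPC;
    // every invocation bumps the counter before doing the actual work.
    void batch_get_into_offload_object_internal(/* request args */) {
        offload_rpc_read_count_.fetch_add(1, std::memory_order_relaxed);
        // ... issue the offload RPC to the holder client ...
    }

    // Exposed through the Python binding as
    // MooncakeDistributedStore.get_offload_rpc_read_count(); the e2e test
    // snapshots this around a read and asserts a zero delta.
    uint64_t get_offload_rpc_read_count() const {
        return offload_rpc_read_count_.load(std::memory_order_relaxed);
    }

   private:
    std::atomic<uint64_t> offload_rpc_read_count_{0};
};
```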
Reviewer asked to keep a richer latency comparison in the repo but NOT run it as a CI test.

test_promotion_after_repeated_hits goes back to the lean correctness shape: 4 reads to clear the admission gate, 1 read post-promotion + offload-RPC counter assertion to prove MEMORY served, no timing.

The full p50/p95/p99 latency comparison moves into a sibling class BenchPromotionLatency gated by @unittest.skipUnless(MC_BENCH_PROMOTION_LATENCY), so:

    python test_promotion_on_hit.py                         # 3 tests, 1 skipped -- CI
    MC_BENCH_PROMOTION_LATENCY=1 python test_promotion_on_hit.py   # full bench

The bench retains the per-read MEMORY-detection (snapshots offload_rpc_read_count around each pre-promotion read and bails on detection of MEMORY-served reads so the LOCAL_DISK distribution stays uncontaminated) and the 1.3x p50 speedup assertion. Sample count is LATENCY_SAMPLES (default 1000; verified stable to within noise at N=10000 too; N=1000 is the sweet spot for stable p99 without paying for 10x more wall time).

Observed bench numbers on docker tmpfs (1 MiB objects, N=1000):

    pre-promotion (LOCAL_DISK via offload-RPC): p50=0.52 ms, p95=0.68 ms, p99=0.81 ms
    post-promotion (MEMORY direct):             p50=0.24 ms, p95=0.33 ms, p99=0.42 ms
    speedup: p50=2.2x | p95=2.0x | p99=1.9x

Real-hardware speedup is typically 10-50x because the SSD-vs-DRAM cost gap dominates only when objects are large and the FFI/marshalling overhead is fractional.
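The bench itself is Python, but the percentile math it reports is simple; a small C++ sketch of a floor-rank percentile over a latency sample (all names illustrative):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Floor-rank percentile: partially sorts the sample just enough to read
// off the element at index floor(pct/100 * (n-1)). Takes the vector by
// value because nth_element reorders it.
double Percentile(std::vector<double> samples_ms, double pct) {
    if (samples_ms.empty()) return 0.0;
    auto rank = static_cast<size_t>(pct / 100.0 * (samples_ms.size() - 1));
    std::nth_element(samples_ms.begin(), samples_ms.begin() + rank,
                     samples_ms.end());
    return samples_ms[rank];
}

// e.g. the bench's p50 speedup assertion, expressed with this helper:
//   Percentile(local_disk_ms, 50) / Percentile(memory_ms, 50) >= 1.3
```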
…obal cap, const, misconfig log)

Addresses reviewer feedback on PR kvcache-ai#2071.

Issue 1 -- orphaned PROCESSING MEMORY replica leak.

The promotion task reaper only dropped the source LOCAL_DISK refcnt and erased the task entry; it never popped the staged PROCESSING MEMORY replica added by PromotionAllocStart. That replica is not in shard->processing_keys, so DiscardExpiredProcessingReplicas could not sweep it and the buffer leaked until the object was removed or evicted.

Fix: in the reaper, when alloc_id != 0, call metadata.EraseReplicaByID(alloc_id) to pop the staged replica and return its buffer to the allocator.

Issue 2 -- per-shard cap was wrong for skewed workloads.

The old gate was 'shard->size() * kNumShards >= limit': approximately right for uniform workloads but ~1024x too eager on skewed workloads where hot keys cluster in few shards. Replace it with a cluster-wide std::atomic<uint64_t> promotion_in_flight_ counter, incremented in TryPushPromotionQueue after a successful emplace and decremented in NotifyPromotionSuccess and in the reaper. memory_order_relaxed suffices since the value is advisory; the per-shard mutex already serializes inserts within a shard and the dedup gate prevents duplicate work.

Issue 3 -- const_cast smell.

The promotion_tasks map held const PromotionTask values for "generic safety", forcing a const_cast<PromotionTask&> in PromotionAllocStart to set alloc_id under the shard write lock. Drop the const; PromotionAllocStart now sets task_it->second.alloc_id = new_id directly.

Misconfig log -- emit LOG(WARNING) at startup when config.promotion_on_hit=true but enable_offload=false. Promotion requires offload to produce LOCAL_DISK replicas, so it is silently disabled in that combination; the log makes the disablement discoverable to operators.

Tests
-----
- New ReaperPopsStagedMemoryReplicaOnExpiry: regression for Issue 1. Uses QuerySegments(seg).first (used bytes) to observe that the staged PROCESSING MEMORY replica's buffer is freed back to the DRAM allocator after the reaper sweeps the expired task.
- New QueueLimitRejectsCrossShard: regression for Issue 2. With queue_limit=1, proves a second admission attempt on a *different shard* is rejected -- exactly the case the old per-shard cap admitted incorrectly.
- Updated the comment on the existing QueueLimitRejectsBeyondCap to reflect the cluster-wide counter semantics.

Verification
------------
- promotion_on_hit_test: 15/15 pass (5 consecutive clean runs)
- file_storage_promotion_test: 9/9 pass
- master_service_promotion_test_for_snapshot: 5/5 pass
- offload_on_evict_test: 9/9 pass
- code_format.sh --base upstream/main (clang-format-20 in container): 3 files reformatted (master_service.h, master_service.cpp, promotion_on_hit_test.cpp); all others "Already formatted"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
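A sketch of the Issue 2 gate reduced to just the cluster-wide counter; `promotion_in_flight_` and the increment/decrement sites are from the commit, while the class and method names are invented:

```cpp
#include <atomic>
#include <cstdint>

class PromotionGate {
   public:
    explicit PromotionGate(uint64_t limit) : limit_(limit) {}

    // Mirrors the check in TryPushPromotionQueue: the counter is bumped
    // after a successful emplace into the per-shard promotion_tasks map.
    // Relaxed ordering is enough because the value is advisory; the shard
    // mutex serializes inserts and the dedup gate prevents duplicates.
    bool TryAdmit() {
        if (in_flight_.load(std::memory_order_relaxed) >= limit_)
            return false;  // cluster-wide cap, immune to shard skew
        in_flight_.fetch_add(1, std::memory_order_relaxed);
        return true;
    }

    // Called from NotifyPromotionSuccess and from the stale-task reaper.
    void Release() { in_flight_.fetch_sub(1, std::memory_order_relaxed); }

   private:
    const uint64_t limit_;
    std::atomic<uint64_t> in_flight_{0};
};
```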
…est + reaper poll

Follow-up to 2cee8eb addressing two further reviewer concerns and recovering one edit batch that did not land in that commit.

AllocStart deadline reset
-------------------------
A queued promotion task could enter the active-transfer phase (PromotionAllocStart -> client SSD read -> RDMA write -> Notify) with only a sliver of TTL remaining if the holder's heartbeat queue was backlogged (kMaxPerHeartbeat = 1 serializes work). If the active transfer of a large object then ran past the TTL, the reaper would call EraseReplicaByID on the staged MEMORY replica mid-transfer; the allocator could then hand that buffer to a concurrent Put while the client's RDMA write was still landing in it.

Fix: PromotionAllocStart now resets task.start_time when it stages the PROCESSING MEMORY replica. The queue-wait phase (alloc_id == 0, no buffer) and the active-transfer phase (alloc_id != 0, buffer in flight) each get their own full put_start_release_timeout_sec_ TTL window, measured from when they began.

New regression test AllocStartResetsTaskDeadline drives the queue_wait_seconds + alloc_active_seconds > TTL scenario and asserts via QuerySegments(seg).first that the staged buffer survives past the bare TTL when measured from admission.

NotifyPromotionSuccess counter coverage
---------------------------------------
The success path of NotifyPromotionSuccess was not exercised by any existing test; all call sites in the suite asserted error paths. 2cee8eb added a fetch_sub on the success path but had no test to catch a missing decrement.

New test NotifySuccessDecrementsCounter sets queue_limit=1, drives the full happy path (GetReplicaList -> PromotionAllocStart -> NotifyPromotionSuccess), then asserts a second admission on a different key is allowed. Without the success-path fetch_sub, the cap stays saturated at 1 and the second admission is dropped. Bug discrimination verified manually: temporarily removing fetch_sub fails this test as expected.

Reaper test poll-instead-of-sleep
---------------------------------
ReaperPopsStagedMemoryReplicaOnExpiry used a fixed 2-second sleep after a 1-second TTL. Under suite load the eviction-thread cadence plus general scheduling jitter could push past the 2-second mark ~1 in 6 runs, producing a flake matching the StalePromotionReaper class. Replace it with a 5-second polling deadline against QuerySegments(seg).first; the test now passes 5/5 stability runs.

QueueLimitRejectsBeyondCap stale block comment
----------------------------------------------
The test's docblock still described the old per-shard gate (gate: shard_size * kNumShards >= limit). Update it to reflect the cluster-wide promotion_in_flight_ counter and cross-reference QueueLimitRejectsCrossShard for the cross-shard case.

Verification
------------
- promotion_on_hit_test: 17/17 pass x 5 consecutive clean runs
- New tests pass; mutation test of NotifySuccessDecrementsCounter against a removed success-path fetch_sub fails as designed
- code_format.sh --base upstream/main (clang-format-20 in container): 3 files reformatted; all others already formatted

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
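The deadline-reset idea in miniature, assuming a task struct with only the two fields the commit discusses; everything else is placeholder:

```cpp
#include <chrono>
#include <cstdint>

using Clock = std::chrono::steady_clock;

struct PromotionTask {
    uint64_t alloc_id = 0;         // 0 = queue-wait phase, no buffer staged
    Clock::time_point start_time;  // start of the *current* phase
};

// Inside PromotionAllocStart, after staging the PROCESSING MEMORY replica:
// resetting start_time gives the active-transfer phase its own full TTL
// window, so the reaper cannot free the staged buffer while an RDMA write
// is still landing in it.
void OnAllocStart(PromotionTask& task, uint64_t new_alloc_id) {
    task.alloc_id = new_alloc_id;
    task.start_time = Clock::now();  // TTL now measures the transfer phase
}

// The reaper's expiry check, measured from the current phase start.
bool Expired(const PromotionTask& task, std::chrono::seconds ttl) {
    return Clock::now() - task.start_time > ttl;
}
```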
Description
Adds the L2→L1 promotion-on-hit feature: when a key whose only complete replica lives on a peer client's local SSD (LOCAL_DISK) is read enough times to clear an admission threshold, the master enqueues a promotion task and the holder client's next FileStorage heartbeat stages a fresh MEMORY replica via Transfer Engine. Symmetric counterpart to offload-on-evict: offload moves cold data DRAM→SSD on eviction, promotion moves rediscovered-warm data SSD→DRAM on access.
The trigger lives inside `MasterService::GetReplicaList`. A CountMinSketch tracks per-key access frequency; gates filter out cases where promotion would be wasteful or unsafe (no LOCAL_DISK source, MEMORY already present, DRAM under high-watermark pressure, queue saturated, or a task already in flight for the same key). When all gates pass, the master records a `PromotionTask` in the per-shard map (refcnt-pinning the source replica) and pushes the key onto the holder's per-`LocalDiskSegment` `promotion_objects` map. The holder's FileStorage heartbeat thread drains a bounded batch from that map, calls `PromotionAllocStart` to stage a PROCESSING MEMORY replica, reads the bytes from local SSD, RDMA-WRITEs them via `PromotionWrite`, and notifies the master. `NotifyPromotionSuccess` flips the new replica COMPLETE and decrements the source LOCAL_DISK refcnt. A reaper drops expired tasks; orphaned PROCESSING replicas are reaped via the standard discarded-replicas path.

Failure handling matches offload's posture: per-key failures are logged and skipped, and the master-side reaper handles task TTL expiry on its own schedule.
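A condensed sketch of that gate chain; only the gate names come from the paragraph above, and the boolean parameters are illustrative stand-ins for the real metadata, watermark, and queue checks:

```cpp
#include <cstdint>

// estimated_freq is the Count-Min Sketch estimate for the key; threshold is
// promotion_admission_threshold, clamped to [0, 255] so both sides of the
// comparison share an 8-bit domain (the type-mismatch fix noted in review).
bool ShouldPromote(uint8_t estimated_freq, uint8_t threshold,
                   bool has_local_disk_source, bool memory_already_present,
                   bool dram_under_pressure, bool queue_saturated,
                   bool task_in_flight) {
    if (estimated_freq < threshold) return false;  // not hot enough yet
    if (!has_local_disk_source) return false;      // nothing to promote from
    if (memory_already_present) return false;      // already in DRAM
    if (dram_under_pressure) return false;         // high-watermark gate
    if (queue_saturated) return false;             // promotion_queue_limit
    if (task_in_flight) return false;              // dedup gate
    return true;  // record PromotionTask, enqueue on holder's segment
}
```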
Key design points addressed in this PR
- The master keys the commit on the `ReplicaID` of the staged MEMORY replica captured during `PromotionAllocStart`. `NotifyPromotionSuccess` commits exactly that replica via `GetReplicaByID`, so a parallel Put creating its own PROCESSING MEMORY replica on the same key cannot be confused with ours (see the sketch after this list).
- `MasterService::PromotionObjectHeartbeat` returns at most `kMaxPerHeartbeat = 1` task per call and leaves the rest queued; the client processes whatever the master returned and never blocks beyond a single task. Leftovers are preserved by construction: no silent drops.
- `promotion_admission_threshold` is clamped to [0, 255] at config parse time with a warning, giving the gate a stable comparison contract.
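A sketch of the commit-by-`ReplicaID` rule from the first bullet; `GetReplicaByID`, `PromotionAllocStart`, and `NotifyPromotionSuccess` are from the PR, while the metadata shape here is reduced to the essentials:

```cpp
#include <cstdint>
#include <unordered_map>

enum class ReplicaStatus { PROCESSING, COMPLETE };

struct Replica {
    uint64_t id = 0;
    ReplicaStatus status = ReplicaStatus::PROCESSING;
};

// Pared-down object metadata: replicas keyed by the ID handed out when the
// replica was staged.
struct ObjectMetadata {
    std::unordered_map<uint64_t, Replica> replicas;
    Replica* GetReplicaByID(uint64_t id) {
        auto it = replicas.find(id);
        return it == replicas.end() ? nullptr : &it->second;
    }
};

// NotifyPromotionSuccess commits exactly the replica staged by
// PromotionAllocStart; a parallel Put's own PROCESSING MEMORY replica has a
// different ID and can never be flipped COMPLETE by mistake.
bool CommitPromotedReplica(ObjectMetadata& md, uint64_t staged_id) {
    Replica* r = md.GetReplicaByID(staged_id);
    if (r == nullptr) return false;  // staged replica was reaped meanwhile
    r->status = ReplicaStatus::COMPLETE;
    return true;
}
```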
Config knobs (default off)

- `--promotion_on_hit=true`: feature flag
- `--promotion_admission_threshold=N`: reads per key before the gate fires (default 2; clamped to ≤255)
- `--promotion_queue_limit=N`: soft cap on in-flight promotion tasks (default 50000)
Module

- `mooncake-transfer-engine`
- `mooncake-store`
- `mooncake-ep`
- `mooncake-integration`
- `mooncake-p2p-store`
- `mooncake-wheel`
- `mooncake-pg`
- `mooncake-rl`
How Has This Been Tested?
Master-side unit tests (`mooncake-store/tests/promotion_on_hit_test.cpp`, 13 tests):

- `promotion_queue_limit` rejects beyond saturation
- `PromotionObjectHeartbeat` returns ≤ `kMaxPerHeartbeat` per call and preserves leftover work (`HeartbeatBoundedBatchPreservesLeftovers`)
- `preferred_segments` honored

Client-side unit tests (`mooncake-store/tests/file_storage_promotion_test.cpp`, 9 tests; FakeClient mocks the master RPCs).

Snapshot tests (`mooncake-store/tests/ha/snapshot/master_service_promotion_test_for_snapshot.cpp`, 5 tests).

End-to-end Python tests (`mooncake-wheel/tests/test_promotion_on_hit.py`, 2 tests in CI): `store.get` clears the admission threshold → MEMORY replica reappears → bit-exact bytes verified pre- AND post-promotion, AND the post-promotion read is proven to serve from MEMORY (not LOCAL_DISK SSD) by snapshotting `MooncakeDistributedStore.get_offload_rpc_read_count()` around the read and asserting zero delta. Gated behind `TEST_PROMOTION_ON_HIT` in `scripts/run_tests.sh`.

Opt-in latency benchmark (`BenchPromotionLatency` class in the same file, skipped by default; run with `MC_BENCH_PROMOTION_LATENCY=1`): the CI test proves MEMORY served the post-promotion read via the offload-RPC counter delta. For a richer empirical view, the benchmark times every read with `time.perf_counter`, samples `LATENCY_SAMPLES` reads per phase (default 1000), and reports p50/p95/p99 with a per-percentile speedup table. Pre-promotion reads bail the moment a read serves from MEMORY (detected via the same counter) so the LOCAL_DISK distribution stays uncontaminated.

Observed numbers from a docker-tmpfs run with 1 MiB objects, N=1000 samples:

| Phase | p50 | p95 | p99 |
|---|---|---|---|
| Pre-promotion (LOCAL_DISK via offload-RPC) | 0.52 ms | 0.68 ms | 0.81 ms |
| Post-promotion (MEMORY direct) | 0.24 ms | 0.33 ms | 0.42 ms |
| Speedup | 2.2× | 2.0× | 1.9× |
The speedup holds across the distribution — MEMORY's win isn't a median trick. The 1.3× p50 assertion baked into the bench has comfortable headroom over the observed 2×. On docker tmpfs (no real SSD, no RDMA NIC) much of the per-read wall-time is Python/C++ marshalling overhead, so the SSD-vs-DRAM cost gap is compressed; on real hardware (NVMe + RDMA) the ratio is typically 10–50×.
The same numbers replicated at N=10000 with `MOONCAKE_OFFLOAD_HEARTBEAT_INTERVAL_SECONDS=30` (so the worker doesn't race a longer pre-loop): p50/p95/p99 changed by ≤0.02 ms vs the N=1000 run, confirming N=1000 is the right default for stable tail percentiles.

Follow-ups (not in this PR)
- The per-heartbeat drain is capped at `kMaxPerHeartbeat=1` to stay inside the client-liveness window. If real workloads show the resulting drain rate (≈1 promotion per heartbeat interval) becomes a bottleneck, an obvious next step is a dedicated worker thread that pulls from a bounded queue: the heartbeat enqueues, the worker drains independently, and the cap can be relaxed (see the sketch below). ~180 LOC + tests; deferred unless needed.
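The deferred worker-thread design could look roughly like this; nothing below exists in the PR, it is only a sketch of the shape described above:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Heartbeat enqueues keys; a dedicated worker drains them independently,
// so kMaxPerHeartbeat no longer bounds the promotion drain rate.
class PromotionWorker {
   public:
    explicit PromotionWorker(size_t cap) : cap_(cap) {}

    bool Enqueue(std::string key) {  // called from the heartbeat thread
        std::lock_guard<std::mutex> lk(mu_);
        if (stopped_ || queue_.size() >= cap_) return false;  // bounded
        queue_.push_back(std::move(key));
        cv_.notify_one();
        return true;
    }

    void Run() {  // worker thread body
        std::unique_lock<std::mutex> lk(mu_);
        while (true) {
            cv_.wait(lk, [this] { return stopped_ || !queue_.empty(); });
            if (stopped_) return;
            std::string key = std::move(queue_.front());
            queue_.pop_front();
            lk.unlock();
            Promote(key);  // PromotionAllocStart -> SSD read -> RDMA -> Notify
            lk.lock();
        }
    }

    void Stop() {
        std::lock_guard<std::mutex> lk(mu_);
        stopped_ = true;
        cv_.notify_all();
    }

   private:
    void Promote(const std::string& /*key*/) { /* transfer pipeline */ }
    const size_t cap_;
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::string> queue_;
    bool stopped_ = false;
};
```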
Checklist

- Ran `./scripts/code_format.sh` before submitting.