Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e9f0806
Update cache manager and related modules
kevincheng2 Mar 23, 2026
64cbe9f
chore: update cache_manager and related modules
kevincheng2 Mar 24, 2026
80f5350
fix: add node to evictable set in complete_swap_to_device
kevincheng2 Mar 24, 2026
b440af6
feat: update cache manager v1 and related modules
kevincheng2 Mar 25, 2026
647bc9b
feat(cache): add cache controller v1 implementation
kevincheng2 Mar 26, 2026
ed35db1
feat(cache_manager): update cache manager v1
kevincheng2 Mar 27, 2026
4af7d71
fix(cache_manager): 修复 swap_cache H2D/D2H 方向的 block_ids 逻辑并清理 Forward…
kevincheng2 Mar 27, 2026
2d87335
feat(cache_manager): refactor cache manager v1 and optimize swap ops
kevincheng2 Mar 30, 2026
bdf2632
[KVCache][MTP] 支持 cache_manager_v1 下的 MTP KV Cache 初始化及多模态 hash
kevincheng2 Mar 30, 2026
8729a87
fix(cache_manager): multi-GPU fix, mm hash boundary fix, and remove b…
kevincheng2 Mar 31, 2026
3a0e50b
[BugFix][KVCache] fix List import and move write_policy normalization…
kevincheng2 Mar 31, 2026
a049c1b
[BugFix][KVCache] fix pre-commit code style issues
kevincheng2 Mar 31, 2026
7716126
[Feature][KVCache] update cache_manager_v1 modules
kevincheng2 Mar 31, 2026
bb98c75
[Feature][KVCache] add BatchRequest.from_tasks and refactor worker ta…
kevincheng2 Mar 31, 2026
d141dd6
[Feature][KVCache] add NUMA affinity for host cache and skip swap cac…
kevincheng2 Mar 31, 2026
4275e1a
[BugFix][KVCache] remove debug logging code
kevincheng2 Apr 1, 2026
2e1dc2f
[BugFix][KVCache] fix cupy device id caching and pickle for _match_re…
kevincheng2 Apr 3, 2026
9c66d8a
[Feature][KVCache] update cache_manager_v1 transfer and storage modules
kevincheng2 Apr 1, 2026
c68f0a6
[KVCache] implement storage prefetch/backup and D2H→Storage chaining …
kevincheng2 Apr 2, 2026
32d115f
[KVCache] remove unused elapsed_put_s variable in mooncake connector
kevincheng2 Apr 2, 2026
ebf0c5f
[KVCache] refactor transfer_manager and add staging_manager
kevincheng2 Apr 3, 2026
5a67fbe
[KVCache][Feature] implement storage prefetch ZMQ pipeline in Schedul…
kevincheng2 Apr 14, 2026
7936366
sync: align all core files with upstream/develop
kevincheng2 May 7, 2026
80e7281
Revert "sync: align all core files with upstream/develop"
kevincheng2 May 7, 2026
570840a
fix: remove IPCLock and add missing attrs to align with upstream
kevincheng2 May 7, 2026
aabfd97
feat: refactor storage prefetch - 3-phase architecture
kevincheng2 May 7, 2026
544bd73
[KVCache][Engine] fix has_pending_work and move swap/evict to worker …
kevincheng2 May 8, 2026
a3be152
[KVCache][Engine][OP] cache manager v1 refactor, engine fixes and new…
kevincheng2 May 8, 2026
0677e15
[KVCache][Engine][BugFix] fix cache evict metadata direction and reso…
kevincheng2 May 8, 2026
5857171
[KVCache][BugFix] fix increment ref count logic in cache_manager
kevincheng2 May 9, 2026
b50b6da
[KVCache] unify host block allocation through allocate_host_blocks
kevincheng2 May 9, 2026
8dea20a
[KVCache][Scheduler] disable write_cache_to_storage* calls under cach…
kevincheng2 May 9, 2026
a656c6e
[KVCache][BugFix] fix storage prefetch nodes inserted at wrong radix …
kevincheng2 May 11, 2026
d889bef
fix: pre-commit fixes for test_mm_warmup.py imports and formatting
kevincheng2 May 11, 2026
4dcda16
fix: move get_position_ids_and_mask_encoder_batch to non-iluvatar imp…
kevincheng2 May 11, 2026
5cbabea
refactor: remove staging_manager, update transfer and storage connector
kevincheng2 May 12, 2026
b4f54a9
refactor: simplify storage interface to batch-only, fix prefetch ref_…
kevincheng2 May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
KVCacheStorage,
logger,
)
from fastdeploy.cache_manager.transfer_factory.utils import get_rdma_nics
from fastdeploy.cache_manager.v1.cache_utils import get_rdma_nics
from fastdeploy.platforms import current_platform
from fastdeploy.utils import get_host_ip

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import traceback

from fastdeploy.cache_manager.transfer_factory.utils import get_rdma_nics
from fastdeploy.cache_manager.v1.cache_utils import get_rdma_nics
from fastdeploy.utils import get_logger

logger = get_logger("cache_messager", "cache_messager.log")
Expand Down
34 changes: 2 additions & 32 deletions fastdeploy/cache_manager/transfer_factory/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,6 @@
# limitations under the License.
"""

import importlib
import subprocess
from fastdeploy.cache_manager.v1.cache_utils import get_rdma_nics

from fastdeploy.platforms import current_platform
from fastdeploy.utils import get_logger

logger = get_logger("cache_messager", "cache_messager.log")


def get_rdma_nics():
res = importlib.resources.files("fastdeploy.cache_manager.transfer_factory") / "get_rdma_nics.sh"
with importlib.resources.as_file(res) as path:
file_path = str(path)

nic_type = current_platform.device_name
command = ["bash", file_path, nic_type]
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
logger.info(f"get_rdma_nics command: {command}")
logger.info(f"get_rdma_nics output: {result.stdout}")
if result.returncode != 0:
raise RuntimeError(f"Failed to execute script `get_rdma_nics.sh`: {result.stderr.strip()}")

env_name, env_value = result.stdout.strip().split("=")
if env_name != "KVCACHE_RDMA_NICS":
raise ValueError(f"Unexpected variable name: {env_name}, expected 'KVCACHE_RDMA_NICS'")

return env_value
__all__ = ["get_rdma_nics"]
5 changes: 4 additions & 1 deletion fastdeploy/cache_manager/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
from .base import KVCacheBase
from .cache_controller import CacheController
from .cache_manager import CacheManager
from .cache_utils import LayerDoneCounter, LayerSwapTimeoutError
from .cache_utils import LayerDoneCounter, LayerSwapTimeoutError, get_rdma_nics
from .metadata import (
AsyncTaskHandler,
BlockNode,
CacheBlockMetadata,
CacheStatus,
MatchResult,
PDTransferMetadata,
PendingPrefetch,
StorageConfig,
StorageMetadata,
StorageType,
Expand All @@ -49,6 +50,7 @@
"LayerSwapTimeoutError",
# Utils
"LayerDoneCounter",
"get_rdma_nics",
# Metadata
"CacheBlockMetadata",
"BlockNode",
Expand All @@ -60,6 +62,7 @@
"AsyncTaskHandler",
"MatchResult",
"StorageMetadata",
"PendingPrefetch",
"PDTransferMetadata",
"StorageConfig",
"StorageType",
Expand Down
150 changes: 120 additions & 30 deletions fastdeploy/cache_manager/v1/cache_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Dict, List, Optional

Expand Down Expand Up @@ -111,6 +112,11 @@ def write_policy(self) -> Optional[str]:
return self.cache_config.write_policy
return None

This comment was marked as outdated.

@property
def storage_enabled(self) -> bool:

This comment was marked as outdated.

This comment was marked as outdated.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 storage_enabled 通过 getattr(self._transfer_manager, "_storage_connector", None) 读取另一个类的私有属性,是跨类私有属性访问反模式。

建议在 CacheTransferManager 上添加公共属性:

# transfer_manager.py
@property
def has_storage_connector(self) -> bool:
    return self._storage_connector is not None

然后 CacheController.storage_enabled 改为:

return self._transfer_manager.has_storage_connector

"""Whether a storage connector is available for Host↔Storage transfers."""

This comment was marked as outdated.

return getattr(self._transfer_manager, "_storage_connector", None) is not None

This comment was marked as outdated.


def _should_wait_for_swap_out(self) -> bool:
"""
Determine if swap-out operations should wait synchronously.
Expand Down Expand Up @@ -147,7 +153,17 @@ def submit_swap_tasks(
# Note: evict returns LayerDoneCounter but we don't wait on it layer-by-layer
# (except in write_back mode where we wait synchronously via wait_all)
if evict_metadata is not None:
evict_counter = self.evict_device_to_host(evict_metadata)
# Build StorageMetadata when storage is enabled and hash_values are available,
# so that D2H eviction is automatically followed by Host→Storage backup.
storage_metadata = None
if self.storage_enabled and evict_metadata.hash_values:
storage_metadata = StorageMetadata(
hash_values=evict_metadata.hash_values,
block_ids=evict_metadata.dst_block_ids,
direction="evict",
)

evict_counter = self.evict_device_to_host(evict_metadata, storage_metadata)
self._pending_evict_counters.append(evict_counter)

# Step 3: For write_back, wait for evict to complete before submitting swap-in
Expand Down Expand Up @@ -622,6 +638,15 @@ def initialize_host_cache(
# Share host_cache_kvs_map with transfer manager
self._transfer_manager.set_host_cache_kvs_map(self.host_cache_kvs_map)

# Propagate block shape so transfer manager can compute per-block byte offsets
# for prefetch_from_storage / backup_to_storage.
self._transfer_manager.set_host_block_shape(
key_shape=self._host_key_cache_shape,
value_shape=self._host_value_cache_shape,
scale_shape=self._host_cache_scale_shape,
cache_item_bytes=cache_item_bytes,
)

def get_host_cache_kvs_map(self) -> Dict[str, Any]:
"""
Get the Host KV Cache pointer dictionary.
Expand All @@ -641,6 +666,7 @@ def _submit_swap_task(
transfer_fn_all: callable,
transfer_fn_layer: callable,
force_all_layers: bool = False,
on_success: callable = None,
) -> LayerDoneCounter:
"""
Submit a single swap transfer task (internal method).
Expand All @@ -658,6 +684,8 @@ def _submit_swap_task(
transfer_fn_all: All-layer transfer function, signature (src_ids, dst_ids) -> bool.
transfer_fn_layer: Layer-by-layer transfer function, signature (layer_indices, on_layer_complete, src_ids, dst_ids) -> bool.
force_all_layers: If True, always use all-layers mode (used for D2H evict).
on_success: Optional callback invoked after a successful transfer,
signature () -> None. Runs in the same worker thread.

Returns:
LayerDoneCounter instance for tracking layer completion.
Expand Down Expand Up @@ -755,9 +783,14 @@ def _do_transfer():
meta.success = result.success
meta.error_message = result.error_message

except Exception as e:
import traceback
# Chain next task on success
if result.success and on_success is not None:
try:
on_success()
except Exception as cb_err:
logger.error(f"[SwapTask] on_success callback failed: {cb_err}\n" f"{traceback.format_exc()}")

except Exception as e:
traceback.print_exc()
logger.error(
f"[SwapTask] {src_location.value}->{dst_location.value} "
Expand Down Expand Up @@ -807,6 +840,7 @@ def load_host_to_device(
def evict_device_to_host(
self,
swap_metadata: CacheSwapMetadata,
storage_metadata: Optional[StorageMetadata] = None,
) -> LayerDoneCounter:
"""
Evict device cache to host (async).
Expand All @@ -818,17 +852,32 @@ def evict_device_to_host(
swap_metadata: CacheSwapMetadata containing:
- src_block_ids: Source device block IDs
- dst_block_ids: Destination host block IDs
storage_metadata: Optional StorageMetadata. If provided, a
backup_host_to_storage task is automatically submitted
after the D2H transfer succeeds (chained in the same
worker thread).

Returns:
LayerDoneCounter for tracking layer completion.
"""
on_success = None
if storage_metadata is not None:
host_block_ids = swap_metadata.dst_block_ids

def _on_success_backup():
logger.debug(f"[EvictAndBackup] D2H done, chaining backup to storage " f"host_blocks={host_block_ids}")
self.backup_host_to_storage(host_block_ids, storage_metadata)

on_success = _on_success_backup

layer_counter = self._submit_swap_task(
meta=swap_metadata,
src_location=CacheLevel.DEVICE,
dst_location=CacheLevel.HOST,
transfer_fn_all=lambda src_ids, dst_ids: self._transfer_manager.evict_to_host_async(src_ids, dst_ids),
transfer_fn_layer=None,
force_all_layers=True, # Eviction always uses output_stream for all-layers async transfer
on_success=on_success,
)
return layer_counter

Expand All @@ -854,35 +903,43 @@ def prefetch_from_storage(

handler = AsyncTaskHandler()

# TODO: Implement storage prefetch logic
handler.set_error("Storage prefetch not implemented yet")

return handler

def backup_device_to_storage(
self,
device_block_ids: List[int],
metadata: StorageMetadata,
) -> AsyncTaskHandler:
"""
Backup device cache to storage (async).
hash_values = metadata.hash_values
block_ids = metadata.block_ids

Backup KV cache from device memory to external storage
for reuse by subsequent requests.
if not hash_values or not block_ids:
logger.info(f"[StoragePrefetch] skip: empty hash_values={hash_values}, block_ids={block_ids}")
handler.set_error("Empty hash_values or block_ids in StorageMetadata")
return handler

Args:
device_block_ids: Device block IDs to backup.
metadata: Storage transfer metadata.

Returns:
AsyncTaskHandler for tracking the async transfer task.
"""

handler = AsyncTaskHandler()
def _do_prefetch():
try:
start_time = time.time()
results = self._transfer_manager.prefetch_from_storage(
hash_list=hash_values,
cpu_block_list=block_ids,
)
elapsed = time.time() - start_time

# TODO: Implement storage backup logic
handler.set_error("Storage backup not implemented yet")
success = all(results)
if success:
logger.debug(
f"[StoragePrefetch] success hash_values={hash_values} "
f"block_ids={block_ids} elapsed={elapsed*1000:.3f}ms"
)
handler.set_result(results)
else:
failed_indices = [i for i, ok in enumerate(results) if not ok]
logger.warning(
f"[StoragePrefetch] partial failure "
f"failed_indices={failed_indices} elapsed={elapsed*1000:.3f}ms"
)
handler.set_error(f"Storage prefetch failed for blocks at indices {failed_indices}")
except Exception as e:
traceback.print_exc()
logger.error(f"[StoragePrefetch] EXCEPTION: {e}\n{traceback.format_exc()}")
handler.set_error(str(e))

self._executor.submit(_do_prefetch)
return handler

def backup_host_to_storage(
Expand All @@ -905,9 +962,42 @@ def backup_host_to_storage(

handler = AsyncTaskHandler()

# TODO: Implement storage backup logic
handler.set_error("Storage backup not implemented yet")
hash_values = metadata.hash_values

if not host_block_ids or not hash_values:
logger.info(f"[StorageBackup] skip: empty host_block_ids={host_block_ids}, " f"hash_values={hash_values}")
handler.set_error("Empty host_block_ids or hash_values in StorageMetadata")
return handler

def _do_backup():
try:
start_time = time.time()
results = self._transfer_manager.backup_to_storage(
cpu_block_list=host_block_ids,
hash_list=hash_values,
)
elapsed = time.time() - start_time

success = all(results)
if success:
logger.debug(
f"[StorageBackup] success host_block_ids={host_block_ids} "
f"hash_values={hash_values} elapsed={elapsed*1000:.3f}ms"
)
handler.set_result(results)
else:
failed_indices = [i for i, ok in enumerate(results) if not ok]
logger.warning(
f"[StorageBackup] partial failure "
f"failed_indices={failed_indices} elapsed={elapsed*1000:.3f}ms"
)
handler.set_error(f"Storage backup failed for blocks at indices {failed_indices}")
except Exception as e:
traceback.print_exc()
logger.error(f"[StorageBackup] EXCEPTION: {e}\n{traceback.format_exc()}")
handler.set_error(str(e))

self._executor.submit(_do_backup)
return handler

def send_to_node(
Expand Down
Loading
Loading