Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion fastdeploy/cache_manager/cache_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,35 @@
# limitations under the License.
"""

from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

from fastdeploy.utils import get_logger

logger = get_logger("prefix_cache_manager", "cache_manager.log")


@dataclass
class AuxBlockDataSpec:
"""
Describes a type of auxiliary data bound to KVCache blocks.
CacheTransferManager iterates registered specs during swap/storage
to perform corresponding data transfers.
"""

name: str
num_layers: int
per_token_size: int = 0
block_size: int = 0
dtype: str = "uint8"
swap_buffer: Optional[Any] = None
enabled: bool = True

def get_storage_key(self, key_prefix: str, block_hash: str, rank: int) -> str:
return f"prefix{key_prefix}_{block_hash}_{rank}_{self.name}"


class CacheStatus(Enum):
"""
cache status enum class
Expand Down Expand Up @@ -56,6 +78,7 @@ def __init__(
cache_status=CacheStatus.GPU,
is_persistent=False,
persistent_shared_count=0,
aux_data_names=None,
):
"""
Args:
Expand Down Expand Up @@ -89,6 +112,7 @@ def __init__(
self.cache_status = cache_status
self.is_persistent = is_persistent
self.persistent_shared_count = persistent_shared_count
self.aux_data_names = aux_data_names or []
self.req_id_set = set()

def __lt__(self, other):
Expand All @@ -102,7 +126,7 @@ def __lt__(self, other):
else:
return self.depth > other.depth

def __str__(self):
def __str__(self) -> str:
"""
return node info
"""
Expand Down
220 changes: 219 additions & 1 deletion fastdeploy/cache_manager/cache_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import threading
import time
import traceback
import weakref
from typing import List

import numpy as np
Expand All @@ -48,7 +49,7 @@
FileStore,
MooncakeStore,
)
from fastdeploy.config import CacheConfig, SpeculativeConfig
from fastdeploy.config import CacheConfig, RoutingReplayConfig, SpeculativeConfig
from fastdeploy.engine.request import ControlRequest, ControlResponse
from fastdeploy.inter_communicator import EngineCacheQueue, IPCSignal, KVCacheStatus
from fastdeploy.inter_communicator.fmq import FMQ
Expand Down Expand Up @@ -129,7 +130,11 @@ def parse_args():
)
parser.add_argument("--model_path", type=str, help="The path of model")

# Routing replay (R3) — single JSON arg, mirrors SpeculativeConfig pattern
parser.add_argument("--routing_replay_config", type=json.loads, default="{}", help="Routing replay config JSON")

args = parser.parse_args()
args.routing_replay_config = RoutingReplayConfig(args.routing_replay_config)
return args


Expand Down Expand Up @@ -241,6 +246,25 @@ def __init__(self, args):
self._init_cpu_cache()
if self.storage_backend_type is not None:
self._init_storage(args)

# Initialize auxiliary data specs (e.g., routing replay)
self.aux_data_specs = {}
self.routing_host_view = None
self.routing_swap_buffer = None
self.routing_replay_config = args.routing_replay_config
self.engine_worker_queue_port = args.engine_worker_queue_port
self._init_routing_aux_data()

This comment was marked as outdated.


# Register finalizer to release routing SharedMemory on process exit.
# Must use a static method — callback must NOT hold a reference to self,
# otherwise the object can never be GC'd and the finalizer won't fire.
self._finalizer = weakref.finalize(
self,
CacheTransferManager._cleanup_routing_resources,
self.routing_swap_buffer,
self.routing_host_view,
)

self._init_control()

cache_task_broadcast_data = np.zeros(shape=[1], dtype=np.int32)
Expand Down Expand Up @@ -307,6 +331,185 @@ def __init__(self, args):
)
self.cache_transfer_inited_signal.value[self.rank] = 1

def _init_routing_aux_data(self):
"""Initialize routing auxiliary data buffers for swap sync."""
routing_replay_config = self.routing_replay_config
if not routing_replay_config.enable_routing_replay:
return

try:
from fastdeploy.cache_manager.cache_data import AuxBlockDataSpec
from fastdeploy.cache_manager.routing_cache_manager import (
RoutingHostBufferView,
RoutingSwapBuffer,
)

num_moe_layers = routing_replay_config.num_moe_layers
moe_top_k = routing_replay_config.moe_top_k
routing_dtype = routing_replay_config.routing_dtype

if num_moe_layers == 0 or moe_top_k == 0:
return

spec = AuxBlockDataSpec(
name="routing",
num_layers=num_moe_layers,
per_token_size=moe_top_k,
block_size=self.block_size,
dtype=routing_dtype,
)

# Create routing swap buffer (for CPU blocks).
# Only rank 0 needs it — _swap_routing() only runs on rank 0.
if self.num_cpu_blocks > 0 and self.rank == 0:
dp_suffix = str(self.engine_worker_queue_port)
self.routing_swap_buffer = RoutingSwapBuffer(

This comment was marked as outdated.

num_cpu_blocks=self.num_cpu_blocks,
block_size=self.block_size,
num_moe_layers=num_moe_layers,
top_k=moe_top_k,
dtype=routing_dtype,
dp_suffix=dp_suffix,
)
spec.swap_buffer = self.routing_swap_buffer

This comment was marked as outdated.


# Attach to routing host buffer (SharedMemory created by Engine)
dp_suffix = str(self.engine_worker_queue_port)
shm_name = f"routing_host_buffer.{dp_suffix}"
max_num_kv_tokens = self.num_gpu_blocks * self.block_size
shape = (max_num_kv_tokens, num_moe_layers, moe_top_k)
try:
self.routing_host_view = RoutingHostBufferView(shape=shape, dtype=routing_dtype, shm_name=shm_name)
logger.info(f"[R3] CTM attached to RoutingHostBuffer: {shm_name}")
except FileNotFoundError:
logger.warning(f"[R3] CTM RoutingHostBuffer {shm_name} not found")

self.aux_data_specs["routing"] = spec
logger.info(f"[R3] CTM registered routing aux data: layers={num_moe_layers}, top_k={moe_top_k}")

except Exception as e:

This comment was marked as outdated.

logger.warning(f"[R3] CTM failed to init routing aux data: {e}")

This comment was marked as outdated.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review 关于 routing_swap_buffer.close() 的建议合理,可以在 CTM 进程退出时加上清理。但有两点需要补充:

  1. RoutingHostBufferView 不是 owner,不负责 unlink(),close() 只释放 fd,进程退出自动回收,优先级较低。
  2. 真正更重要的遗漏:RoutingHostBuffer 的 owner 是 RoutingCacheManager(Engine 侧),它的 close() 方法存在但从未被调用。Engine 进程退出时 /dev/shm/routing_host_buffer. 同样不会被清理,也应该一并修复。


@staticmethod
def _cleanup_routing_resources(routing_swap_buffer, routing_host_view):
"""Release routing SharedMemory on process exit. Called by weakref.finalize."""
if routing_swap_buffer is not None:
routing_swap_buffer.close()
if routing_host_view is not None:
routing_host_view.close()

def _swap_routing(self, gpu_block_ids, cpu_block_ids, direction):
"""
Swap routing data between routing_host_buffer and routing_swap_buffer.
Pure CPU-to-CPU numpy memcpy, no GPU DMA.
Only rank 0 performs this (routing buffers are cross-rank SharedMemory).
"""
if self.routing_host_view is None or self.routing_swap_buffer is None:
logger.warning(
f"[R3] _swap_routing skipped: host_view={self.routing_host_view is not None}, "
f"swap_buffer={self.routing_swap_buffer is not None}"
)
return
if self.rank > 0:
return
bs = self.block_size
for gpu_bid, cpu_bid in zip(gpu_block_ids, cpu_block_ids):

This comment was marked as outdated.

gpu_start = gpu_bid * bs
gpu_end = gpu_start + bs
cpu_start = cpu_bid * bs
cpu_end = cpu_start + bs
if direction == "to_cpu":
self.routing_swap_buffer.buffer[cpu_start:cpu_end] = self.routing_host_view.buffer[gpu_start:gpu_end]
elif direction == "to_gpu":
self.routing_host_view.buffer[gpu_start:gpu_end] = self.routing_swap_buffer.buffer[cpu_start:cpu_end]
else:
raise ValueError(f"[R3] _swap_routing: unknown direction '{direction}', expected 'to_cpu' or 'to_gpu'")
logger.info(
f"[R3] _swap_routing {direction}: {len(gpu_block_ids)} blocks, "
f"gpu_ids={gpu_block_ids[:3]}{'...' if len(gpu_block_ids) > 3 else ''}, "
f"cpu_ids={cpu_block_ids[:3]}{'...' if len(cpu_block_ids) > 3 else ''}"
)

def _write_routing_to_storage(self, task_keys, gpu_block_ids):
"""
Write routing data from routing_host_buffer to storage backend.
Only for mooncake/file backends; only tp_rank=0 writes routing.
"""
if self.routing_host_view is None or self.rank != 0:
return
if self.storage_backend_type not in ("mooncake", "file"):
return

try:
spec = self.aux_data_specs.get("routing")
if spec is None or not spec.enabled:
return

bs = self.block_size
routing_keys = []
routing_ptrs = []
routing_sizes = []
per_block_bytes = bs * spec.num_layers * spec.per_token_size * np.dtype(spec.dtype).itemsize
Comment thread
gongshaotian marked this conversation as resolved.

for block_hash, gpu_bid in zip(task_keys, gpu_block_ids):
key = spec.get_storage_key(self.key_prefix, block_hash, self.rank)
start = gpu_bid * bs
end = start + bs
block_data = self.routing_host_view.buffer[start:end]
if not block_data.flags["C_CONTIGUOUS"]:
Comment thread
gongshaotian marked this conversation as resolved.
block_data = np.ascontiguousarray(block_data)
routing_keys.append(key)
routing_ptrs.append(block_data.ctypes.data)
routing_sizes.append(per_block_bytes)

if routing_keys:
self.storage_backend.batch_set(
keys=routing_keys, target_locations=routing_ptrs, target_sizes=routing_sizes
)
logger.debug(f"[R3] Wrote {len(routing_keys)} routing blocks to storage")
except Exception as e:
logger.warning(f"[R3] Failed to write routing to storage: {e}")

def _read_routing_from_storage(self, task_keys, gpu_block_ids):
"""
Read routing data from storage backend into routing_host_buffer.
Only for mooncake/file backends; only tp_rank=0 reads routing.
"""
if self.routing_host_view is None or self.rank != 0:
return
if self.storage_backend_type not in ("mooncake", "file"):
return

try:
spec = self.aux_data_specs.get("routing")
if spec is None or not spec.enabled:
return

bs = self.block_size
per_block_bytes = bs * spec.num_layers * spec.per_token_size * np.dtype(spec.dtype).itemsize

for block_hash, gpu_bid in zip(task_keys, gpu_block_ids):
key = spec.get_storage_key(self.key_prefix, block_hash, self.rank)
start = gpu_bid * bs
end = start + bs
target_slice = self.routing_host_view.buffer[start:end]
if not target_slice.flags["C_CONTIGUOUS"]:
# Need contiguous target for ctypes pointer
tmp = np.ascontiguousarray(target_slice)
result = self.storage_backend.get(
key=key, target_location=tmp.ctypes.data, target_size=per_block_bytes
)
if result is not None and result >= 0:
self.routing_host_view.buffer[start:end] = tmp
else:
self.storage_backend.get(
key=key, target_location=target_slice.ctypes.data, target_size=per_block_bytes
)

logger.debug(f"[R3] Read {len(task_keys)} routing blocks from storage")
except Exception as e:
logger.warning(f"[R3] Failed to read routing from storage: {e}")

def _init_control(self):
dp_rank = self.local_data_parallel_id
tp_rank = self.rank
Expand Down Expand Up @@ -809,6 +1012,9 @@ def read_storage_task(self, task: ReadStorageTask):
logger.info(
f"Successfully read {len(valid_gpu_block_ids)} blocks from cache storage for task {task.task_id}"
)
# Read routing data from storage for matched blocks
matched_keys = task.keys[: len(valid_gpu_block_ids)]
self._read_routing_from_storage(matched_keys, valid_gpu_block_ids)
except Exception as e:
logger.error(
f"Failed to read cache for task {task.task_id}, error: {e}, traceback: {traceback.format_exc()}"
Expand Down Expand Up @@ -1000,6 +1206,9 @@ def write_back_storage_task(self, task: WriteStorageTask):
logger.info(
f"Successfully wrote {write_block_num} blocks to cache storage for task {task.task_id}"
)
# Write routing data to storage (shares dedup with KVCache)
remaining_keys = task.keys[match_block_num:]
self._write_routing_to_storage(remaining_keys, gpu_block_ids)
except Exception as e:
logger.error(f"Error in write back storage task: {e}, traceback:{traceback.format_exc()}")
gpu_block_ids = []
Expand Down Expand Up @@ -1375,6 +1584,10 @@ def _transfer_data(
0,
)

# Routing: routing_host_buffer → routing_swap_buffer
if "routing" in self.aux_data_specs:
self._swap_routing(gpu_block_ids, cpu_block_ids, "to_cpu")

elif event_type.value == CacheStatus.SWAP2GPU.value:
swap_cache_all_layers(
self.gpu_cache_k_tensors,
Expand Down Expand Up @@ -1413,6 +1626,11 @@ def _transfer_data(
self.device,
1,
)

# Routing: routing_swap_buffer → routing_host_buffer
if "routing" in self.aux_data_specs:
self._swap_routing(gpu_block_ids, cpu_block_ids, "to_gpu")

else:
logger.warning(
f"transfer data: Get unexpected event type {event_type}, only SWAP2CPU and SWAP2GPU supported"
Expand Down
8 changes: 8 additions & 0 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ def launch_cache_manager(
else:
storage_arg_str = " "

# Compute routing replay args for CTM — single JSON arg
routing_replay_config = getattr(self.config, "routing_replay_config", None)
if routing_replay_config is not None and routing_replay_config.enable_routing_replay:
routing_arg_str = f" --routing_replay_config '{routing_replay_config.to_json_string()}'"
else:
routing_arg_str = ""

if self.cache_config.num_cpu_blocks > 0 or self.cache_config.kvcache_storage_backend:
for i in range(tensor_parallel_size):
launch_cmd = (
Expand Down Expand Up @@ -324,6 +331,7 @@ def launch_cache_manager(
+ f" --write_policy {cache_config.write_policy}"
+ f" --max_model_len {self.config.model_config.max_model_len}"
+ f" --model_path {self.config.model_config.model}"
+ routing_arg_str
+ f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
)
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
Expand Down
Loading
Loading