-
Notifications
You must be signed in to change notification settings - Fork 749
[RL][Feature] R3 Support CPU PrefixCache #7099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0c68b77
ec4e8e1
822a12d
0bb1a7a
4c014ea
306ce3e
a9e1ac4
3fd1d83
f658b74
545e33c
3b12111
3cce78b
3c9fbd8
1e3983b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import threading | ||
| import time | ||
| import traceback | ||
| import weakref | ||
| from typing import List | ||
|
|
||
| import numpy as np | ||
|
|
@@ -48,7 +49,7 @@ | |
| FileStore, | ||
| MooncakeStore, | ||
| ) | ||
| from fastdeploy.config import CacheConfig, SpeculativeConfig | ||
| from fastdeploy.config import CacheConfig, RoutingReplayConfig, SpeculativeConfig | ||
| from fastdeploy.engine.request import ControlRequest, ControlResponse | ||
| from fastdeploy.inter_communicator import EngineCacheQueue, IPCSignal, KVCacheStatus | ||
| from fastdeploy.inter_communicator.fmq import FMQ | ||
|
|
@@ -129,7 +130,11 @@ def parse_args(): | |
| ) | ||
| parser.add_argument("--model_path", type=str, help="The path of model") | ||
|
|
||
| # Routing replay (R3) — single JSON arg, mirrors SpeculativeConfig pattern | ||
| parser.add_argument("--routing_replay_config", type=json.loads, default="{}", help="Routing replay config JSON") | ||
|
|
||
| args = parser.parse_args() | ||
| args.routing_replay_config = RoutingReplayConfig(args.routing_replay_config) | ||
| return args | ||
|
|
||
|
|
||
|
|
@@ -241,6 +246,25 @@ def __init__(self, args): | |
| self._init_cpu_cache() | ||
| if self.storage_backend_type is not None: | ||
| self._init_storage(args) | ||
|
|
||
| # Initialize auxiliary data specs (e.g., routing replay) | ||
| self.aux_data_specs = {} | ||
| self.routing_host_view = None | ||
| self.routing_swap_buffer = None | ||
| self.routing_replay_config = args.routing_replay_config | ||
| self.engine_worker_queue_port = args.engine_worker_queue_port | ||
| self._init_routing_aux_data() | ||
|
|
||
| # Register finalizer to release routing SharedMemory on process exit. | ||
| # Must use a static method — callback must NOT hold a reference to self, | ||
| # otherwise the object can never be GC'd and the finalizer won't fire. | ||
| self._finalizer = weakref.finalize( | ||
| self, | ||
| CacheTransferManager._cleanup_routing_resources, | ||
| self.routing_swap_buffer, | ||
| self.routing_host_view, | ||
| ) | ||
|
|
||
| self._init_control() | ||
|
|
||
| cache_task_broadcast_data = np.zeros(shape=[1], dtype=np.int32) | ||
|
|
@@ -307,6 +331,185 @@ def __init__(self, args): | |
| ) | ||
| self.cache_transfer_inited_signal.value[self.rank] = 1 | ||
|
|
||
| def _init_routing_aux_data(self): | ||
| """Initialize routing auxiliary data buffers for swap sync.""" | ||
| routing_replay_config = self.routing_replay_config | ||
| if not routing_replay_config.enable_routing_replay: | ||
| return | ||
|
|
||
| try: | ||
| from fastdeploy.cache_manager.cache_data import AuxBlockDataSpec | ||
| from fastdeploy.cache_manager.routing_cache_manager import ( | ||
| RoutingHostBufferView, | ||
| RoutingSwapBuffer, | ||
| ) | ||
|
|
||
| num_moe_layers = routing_replay_config.num_moe_layers | ||
| moe_top_k = routing_replay_config.moe_top_k | ||
| routing_dtype = routing_replay_config.routing_dtype | ||
|
|
||
| if num_moe_layers == 0 or moe_top_k == 0: | ||
| return | ||
|
|
||
| spec = AuxBlockDataSpec( | ||
| name="routing", | ||
| num_layers=num_moe_layers, | ||
| per_token_size=moe_top_k, | ||
| block_size=self.block_size, | ||
| dtype=routing_dtype, | ||
| ) | ||
|
|
||
| # Create routing swap buffer (for CPU blocks). | ||
| # Only rank 0 needs it — _swap_routing() only runs on rank 0. | ||
| if self.num_cpu_blocks > 0 and self.rank == 0: | ||
| dp_suffix = str(self.engine_worker_queue_port) | ||
| self.routing_swap_buffer = RoutingSwapBuffer( | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
| num_cpu_blocks=self.num_cpu_blocks, | ||
| block_size=self.block_size, | ||
| num_moe_layers=num_moe_layers, | ||
| top_k=moe_top_k, | ||
| dtype=routing_dtype, | ||
| dp_suffix=dp_suffix, | ||
| ) | ||
| spec.swap_buffer = self.routing_swap_buffer | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
|
|
||
| # Attach to routing host buffer (SharedMemory created by Engine) | ||
| dp_suffix = str(self.engine_worker_queue_port) | ||
| shm_name = f"routing_host_buffer.{dp_suffix}" | ||
| max_num_kv_tokens = self.num_gpu_blocks * self.block_size | ||
| shape = (max_num_kv_tokens, num_moe_layers, moe_top_k) | ||
| try: | ||
| self.routing_host_view = RoutingHostBufferView(shape=shape, dtype=routing_dtype, shm_name=shm_name) | ||
| logger.info(f"[R3] CTM attached to RoutingHostBuffer: {shm_name}") | ||
| except FileNotFoundError: | ||
| logger.warning(f"[R3] CTM RoutingHostBuffer {shm_name} not found") | ||
|
|
||
| self.aux_data_specs["routing"] = spec | ||
| logger.info(f"[R3] CTM registered routing aux data: layers={num_moe_layers}, top_k={moe_top_k}") | ||
|
|
||
| except Exception as e: | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
| logger.warning(f"[R3] CTM failed to init routing aux data: {e}") | ||
This comment was marked as outdated.
Sorry, something went wrong.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Review 关于 routing_swap_buffer.close() 的建议合理,可以在 CTM 进程退出时加上清理。但有两点需要补充:
|
||
|
|
||
| @staticmethod | ||
| def _cleanup_routing_resources(routing_swap_buffer, routing_host_view): | ||
| """Release routing SharedMemory on process exit. Called by weakref.finalize.""" | ||
| if routing_swap_buffer is not None: | ||
| routing_swap_buffer.close() | ||
| if routing_host_view is not None: | ||
| routing_host_view.close() | ||
|
|
||
| def _swap_routing(self, gpu_block_ids, cpu_block_ids, direction): | ||
| """ | ||
| Swap routing data between routing_host_buffer and routing_swap_buffer. | ||
| Pure CPU-to-CPU numpy memcpy, no GPU DMA. | ||
| Only rank 0 performs this (routing buffers are cross-rank SharedMemory). | ||
| """ | ||
| if self.routing_host_view is None or self.routing_swap_buffer is None: | ||
| logger.warning( | ||
| f"[R3] _swap_routing skipped: host_view={self.routing_host_view is not None}, " | ||
| f"swap_buffer={self.routing_swap_buffer is not None}" | ||
| ) | ||
| return | ||
| if self.rank > 0: | ||
| return | ||
| bs = self.block_size | ||
| for gpu_bid, cpu_bid in zip(gpu_block_ids, cpu_block_ids): | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
| gpu_start = gpu_bid * bs | ||
| gpu_end = gpu_start + bs | ||
| cpu_start = cpu_bid * bs | ||
| cpu_end = cpu_start + bs | ||
| if direction == "to_cpu": | ||
| self.routing_swap_buffer.buffer[cpu_start:cpu_end] = self.routing_host_view.buffer[gpu_start:gpu_end] | ||
| elif direction == "to_gpu": | ||
| self.routing_host_view.buffer[gpu_start:gpu_end] = self.routing_swap_buffer.buffer[cpu_start:cpu_end] | ||
| else: | ||
| raise ValueError(f"[R3] _swap_routing: unknown direction '{direction}', expected 'to_cpu' or 'to_gpu'") | ||
| logger.info( | ||
| f"[R3] _swap_routing {direction}: {len(gpu_block_ids)} blocks, " | ||
| f"gpu_ids={gpu_block_ids[:3]}{'...' if len(gpu_block_ids) > 3 else ''}, " | ||
| f"cpu_ids={cpu_block_ids[:3]}{'...' if len(cpu_block_ids) > 3 else ''}" | ||
| ) | ||
|
|
||
| def _write_routing_to_storage(self, task_keys, gpu_block_ids): | ||
| """ | ||
| Write routing data from routing_host_buffer to storage backend. | ||
| Only for mooncake/file backends; only tp_rank=0 writes routing. | ||
| """ | ||
| if self.routing_host_view is None or self.rank != 0: | ||
| return | ||
| if self.storage_backend_type not in ("mooncake", "file"): | ||
| return | ||
|
|
||
| try: | ||
| spec = self.aux_data_specs.get("routing") | ||
| if spec is None or not spec.enabled: | ||
| return | ||
|
|
||
| bs = self.block_size | ||
| routing_keys = [] | ||
| routing_ptrs = [] | ||
| routing_sizes = [] | ||
| per_block_bytes = bs * spec.num_layers * spec.per_token_size * np.dtype(spec.dtype).itemsize | ||
|
gongshaotian marked this conversation as resolved.
|
||
|
|
||
| for block_hash, gpu_bid in zip(task_keys, gpu_block_ids): | ||
| key = spec.get_storage_key(self.key_prefix, block_hash, self.rank) | ||
| start = gpu_bid * bs | ||
| end = start + bs | ||
| block_data = self.routing_host_view.buffer[start:end] | ||
| if not block_data.flags["C_CONTIGUOUS"]: | ||
|
gongshaotian marked this conversation as resolved.
|
||
| block_data = np.ascontiguousarray(block_data) | ||
| routing_keys.append(key) | ||
| routing_ptrs.append(block_data.ctypes.data) | ||
| routing_sizes.append(per_block_bytes) | ||
|
|
||
| if routing_keys: | ||
| self.storage_backend.batch_set( | ||
| keys=routing_keys, target_locations=routing_ptrs, target_sizes=routing_sizes | ||
| ) | ||
| logger.debug(f"[R3] Wrote {len(routing_keys)} routing blocks to storage") | ||
| except Exception as e: | ||
| logger.warning(f"[R3] Failed to write routing to storage: {e}") | ||
|
|
||
| def _read_routing_from_storage(self, task_keys, gpu_block_ids): | ||
| """ | ||
| Read routing data from storage backend into routing_host_buffer. | ||
| Only for mooncake/file backends; only tp_rank=0 reads routing. | ||
| """ | ||
| if self.routing_host_view is None or self.rank != 0: | ||
| return | ||
| if self.storage_backend_type not in ("mooncake", "file"): | ||
| return | ||
|
|
||
| try: | ||
| spec = self.aux_data_specs.get("routing") | ||
| if spec is None or not spec.enabled: | ||
| return | ||
|
|
||
| bs = self.block_size | ||
| per_block_bytes = bs * spec.num_layers * spec.per_token_size * np.dtype(spec.dtype).itemsize | ||
|
|
||
| for block_hash, gpu_bid in zip(task_keys, gpu_block_ids): | ||
| key = spec.get_storage_key(self.key_prefix, block_hash, self.rank) | ||
| start = gpu_bid * bs | ||
| end = start + bs | ||
| target_slice = self.routing_host_view.buffer[start:end] | ||
| if not target_slice.flags["C_CONTIGUOUS"]: | ||
| # Need contiguous target for ctypes pointer | ||
| tmp = np.ascontiguousarray(target_slice) | ||
| result = self.storage_backend.get( | ||
| key=key, target_location=tmp.ctypes.data, target_size=per_block_bytes | ||
| ) | ||
| if result is not None and result >= 0: | ||
| self.routing_host_view.buffer[start:end] = tmp | ||
| else: | ||
| self.storage_backend.get( | ||
| key=key, target_location=target_slice.ctypes.data, target_size=per_block_bytes | ||
| ) | ||
|
|
||
| logger.debug(f"[R3] Read {len(task_keys)} routing blocks from storage") | ||
| except Exception as e: | ||
| logger.warning(f"[R3] Failed to read routing from storage: {e}") | ||
|
|
||
| def _init_control(self): | ||
| dp_rank = self.local_data_parallel_id | ||
| tp_rank = self.rank | ||
|
|
@@ -809,6 +1012,9 @@ def read_storage_task(self, task: ReadStorageTask): | |
| logger.info( | ||
| f"Successfully read {len(valid_gpu_block_ids)} blocks from cache storage for task {task.task_id}" | ||
| ) | ||
| # Read routing data from storage for matched blocks | ||
| matched_keys = task.keys[: len(valid_gpu_block_ids)] | ||
| self._read_routing_from_storage(matched_keys, valid_gpu_block_ids) | ||
| except Exception as e: | ||
| logger.error( | ||
| f"Failed to read cache for task {task.task_id}, error: {e}, traceback: {traceback.format_exc()}" | ||
|
|
@@ -1000,6 +1206,9 @@ def write_back_storage_task(self, task: WriteStorageTask): | |
| logger.info( | ||
| f"Successfully wrote {write_block_num} blocks to cache storage for task {task.task_id}" | ||
| ) | ||
| # Write routing data to storage (shares dedup with KVCache) | ||
| remaining_keys = task.keys[match_block_num:] | ||
| self._write_routing_to_storage(remaining_keys, gpu_block_ids) | ||
| except Exception as e: | ||
| logger.error(f"Error in write back storage task: {e}, traceback:{traceback.format_exc()}") | ||
| gpu_block_ids = [] | ||
|
|
@@ -1375,6 +1584,10 @@ def _transfer_data( | |
| 0, | ||
| ) | ||
|
|
||
| # Routing: routing_host_buffer → routing_swap_buffer | ||
| if "routing" in self.aux_data_specs: | ||
| self._swap_routing(gpu_block_ids, cpu_block_ids, "to_cpu") | ||
|
|
||
| elif event_type.value == CacheStatus.SWAP2GPU.value: | ||
| swap_cache_all_layers( | ||
| self.gpu_cache_k_tensors, | ||
|
|
@@ -1413,6 +1626,11 @@ def _transfer_data( | |
| self.device, | ||
| 1, | ||
| ) | ||
|
|
||
| # Routing: routing_swap_buffer → routing_host_buffer | ||
| if "routing" in self.aux_data_specs: | ||
| self._swap_routing(gpu_block_ids, cpu_block_ids, "to_gpu") | ||
|
|
||
| else: | ||
| logger.warning( | ||
| f"transfer data: Get unexpected event type {event_type}, only SWAP2CPU and SWAP2GPU supported" | ||
|
|
||
This comment was marked as outdated.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.