Skip to content

Commit 82a5b80

Browse files
kevincheng2 and claude committed
[KVCache][Engine][OP] cache manager v1 refactor, engine fixes and new unit tests
## Motivation

cache_manager_storage_transfer 分支持续迭代,本次提交包含多个改进方向:

1. BlockPool 分配逻辑有性能问题且缺少边界处理
2. CacheController.free_cache 无条件清除 storage 可能误清数据
3. request.py 的 CacheSwapMetadata 使用字符串表示 CacheLevel,类型不安全
4. common_engine / resource_manager / worker 中多模态判断入口不统一
5. radix_tree 中 select_blocks_for_backup 已无使用方,可清理
6. gpu_model_runner 多处可优化:异步写入、新注意力后端、MLA/DSA slot_mapping
7. MACA 平台 ops.py 中 F811 重定义警告
8. 缺少 cache manager v1 核心逻辑的单元测试

## Modifications

- cache_manager/ops.py: 移除 MACA 平台对 swap_cache_per_layer/async 的无效 import(F811)
- cache_manager/v1/block_pool.py: allocate 增加 num_blocks==0 提前返回;批量切片替换循环 pop
- cache_manager/v1/cache_controller.py: free_cache 新增 clear_storage 参数,默认 False
- cache_manager/v1/radix_tree.py: 删除废弃的 select_blocks_for_backup 方法
- engine/request.py: CacheSwapMetadata src/dst_type 改用 CacheLevel 枚举;日志接口统一
- engine/common_engine.py: enable_mm 判断统一为 enable_mm_runtime;V1 cache pause 走 reset_cache;新增 model_loader_extra_config / enable_flashinfer_allreduce_fusion 透传;修复 is_end 时残余 token_ids 未返回的边界 case
- engine/sched/resource_manager_v1.py: enable_mm 统一;block 释放去掉冗余条件分支
- model_executor/forward_meta.py: ForwardMeta 用 slot_mapping 替换 mask_encoder_batch;XPUForwardMeta 新增 is_speculative 字段
- worker/gpu_model_runner.py: 引入 DSA/MLA 注意力后端;image processor 通用化;share_inputs 改为 async_set_value;新增 _compute_position_ids_and_slot_mapping;TBO 双缓冲支持;修复拼写错误;去掉 num_blocks > 40000 硬限制
- worker/worker_process.py: enable_mm_runtime 统一;参数顺序整理
- tests/cache_manager/v1/: 新增 test_cache_manager / test_cache_utils / test_radix_tree 单元测试,覆盖 offload/load、pending backup、hash、LayerDoneCounter、complete_swap 等

## Usage or Command

```bash
# 运行新增单元测试
source .venv/py310/bin/activate
python -m pytest tests/cache_manager/v1/ -vv -s

# 启动服务(单机)
cd baidu/FastDeploy
bash run.sh
```

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 0e288b5 commit 82a5b80

13 files changed

Lines changed: 810 additions & 199 deletions

File tree

fastdeploy/cache_manager/ops.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,6 @@ def get_peer_mem_addr(*args, **kwargs):
4949
raise RuntimeError("CUDA no need of get_peer_mem_addr!")
5050

5151
elif current_platform.is_maca():
52-
from fastdeploy.model_executor.ops.gpu import (
53-
swap_cache_per_layer, # 单层 KV cache 换入算子(同步)
54-
)
55-
from fastdeploy.model_executor.ops.gpu import (
56-
swap_cache_per_layer_async, # 单层 KV cache 换入算子(异步,无强制 sync)
57-
)
5852
from fastdeploy.model_executor.ops.gpu import ( # get_output_kv_signal,; ipc_sent_key_value_cache_by_remote_ptr_block_sync,
5953
cuda_host_alloc,
6054
cuda_host_free,

fastdeploy/cache_manager/v1/block_pool.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,19 @@ def allocate(self, num_blocks: int) -> Optional[List[int]]:
6565
List of allocated block indices if successful, None if not enough blocks
6666
"""
6767
with self._lock:
68+
if num_blocks == 0:
69+
return []
70+
6871
if num_blocks > len(self._free_blocks):
6972
logger.warning(
7073
f"BlockPool.allocate failed: not enough blocks, "
7174
f"requested={num_blocks}, available={len(self._free_blocks)}"
7275
)
7376
return None
7477

75-
allocated = []
76-
for _ in range(num_blocks):
77-
block_idx = self._free_blocks.pop(0)
78-
self._used_blocks.add(block_idx)
79-
allocated.append(block_idx)
78+
allocated = self._free_blocks[-num_blocks:]
79+
del self._free_blocks[-num_blocks:]
80+
self._used_blocks.update(allocated)
8081

8182
return allocated
8283

fastdeploy/cache_manager/v1/cache_controller.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ def reset_cache(self) -> bool:
10771077
except Exception:
10781078
return False
10791079

1080-
def free_cache(self) -> bool:
1080+
def free_cache(self, clear_storage: bool = False) -> bool:
10811081
"""
10821082
Free cache memory (GPU memory + CPU pinned memory); storage is cleared only when clear_storage is True.
10831083
@@ -1098,7 +1098,8 @@ def free_cache(self) -> bool:
10981098
self._free_host_cache()
10991099

11001100
# Clear storage
1101-
self._clear_storage()
1101+
if clear_storage:
1102+
self._clear_storage()
11021103

11031104
return True
11041105
except Exception:

fastdeploy/cache_manager/v1/radix_tree.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -590,40 +590,6 @@ def complete_swap_to_device(
590590

591591
return gpu_block_ids
592592

593-
def select_blocks_for_backup(
594-
self,
595-
needed_num: int,
596-
) -> List[BlockNode]:
597-
"""
598-
Select blocks to backup from evictable device nodes.
599-
600-
Selects the coldest blocks (LRU) from _evictable_device that don't
601-
already have a backup.
602-
603-
Args:
604-
needed_num: Number of blocks to select for backup
605-
606-
Returns:
607-
List of BlockNode objects to backup
608-
"""
609-
if needed_num <= 0:
610-
return []
611-
612-
with self._lock:
613-
# Find candidates: evictable device nodes without backup
614-
candidates = []
615-
for node_id, (_, node) in self._evictable_device.items():
616-
if not node.backuped:
617-
candidates.append(node)
618-
619-
if not candidates:
620-
return []
621-
622-
# Sort by last_access_time (LRU - oldest first)
623-
candidates.sort(key=lambda n: n.last_access_time)
624-
625-
return candidates[:needed_num]
626-
627593
def backup_blocks(
628594
self,
629595
nodes: List[BlockNode],

fastdeploy/engine/common_engine.py

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ def create_data_processor(self):
342342
self.cfg.limit_mm_per_prompt,
343343
self.cfg.mm_processor_kwargs,
344344
self.cfg.tool_parser,
345+
enable_mm_runtime=self.cfg.enable_mm_runtime,
345346
)
346347
self.data_processor = self.input_processor.create_processor()
347348
self.mm_max_tokens_per_item = self.data_processor.get_mm_max_tokens_per_item(
@@ -611,7 +612,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
611612
LoggingEventName.RESCHEDULED_INFERENCE_START, task.request_id, getattr(task, "user", "")
612613
)
613614
if not is_prefill:
614-
if not self.cfg.model_config.enable_mm:
615+
if not self.cfg.enable_mm_runtime:
615616
self.update_requests_chunk_size(tasks)
616617
else:
617618
self.update_mm_requests_chunk_size(tasks)
@@ -1260,7 +1261,7 @@ def _insert_zmq_task_to_scheduler(self):
12601261
while self.running:
12611262
try:
12621263
block = True if len(added_requests) == 0 else False
1263-
if not self.cfg.model_config.enable_mm:
1264+
if not self.cfg.enable_mm_runtime:
12641265
err, data = self.recv_request_server.receive_json_once(block)
12651266
else:
12661267
err, data = self.recv_request_server.receive_pyobj_once(block)
@@ -1482,22 +1483,25 @@ def _control_pause(self, control_request: ControlRequest):
14821483
self._send_error_response(req.request_id, "Request is aborted since engine is paused.")
14831484
self.scheduler.reset()
14841485

1485-
# pause cache transfer
1486-
if self.cfg.cache_config.num_cpu_blocks > 0 or self.cfg.cache_config.kvcache_storage_backend:
1487-
self.llm_logger.info("Start to pause cache transfer.")
1488-
pause_transfer_request = ControlRequest(
1489-
request_id=f"{control_request.request_id}_pause_transfer", method="pause"
1490-
)
1491-
self.cache_task_queue.put_transfer_task((CacheStatus.CTRL, pause_transfer_request))
1492-
# Wait for cache_transfer responses
1493-
asyncio.run(
1494-
self._wait_for_control_responses(
1495-
f"{pause_transfer_request.request_id}", 60, executors=["cache_transfer"]
1486+
if envs.ENABLE_V1_KVCACHE_MANAGER:
1487+
self.resource_manager.cache_manager.reset_cache()
1488+
else:
1489+
# pause cache transfer
1490+
if self.cfg.cache_config.num_cpu_blocks > 0 or self.cfg.cache_config.kvcache_storage_backend:
1491+
self.llm_logger.info("Start to pause cache transfer.")
1492+
pause_transfer_request = ControlRequest(
1493+
request_id=f"{control_request.request_id}_pause_transfer", method="pause"
14961494
)
1497-
)
1498-
self.llm_logger.info("Successfully paused cache transfer.")
1495+
self.cache_task_queue.put_transfer_task((CacheStatus.CTRL, pause_transfer_request))
1496+
# Wait for cache_transfer responses
1497+
asyncio.run(
1498+
self._wait_for_control_responses(
1499+
f"{pause_transfer_request.request_id}", 60, executors=["cache_transfer"]
1500+
)
1501+
)
1502+
self.llm_logger.info("Successfully paused cache transfer.")
14991503

1500-
self.resource_manager.cache_manager.reset()
1504+
self.resource_manager.cache_manager.reset()
15011505
self.llm_logger.info("Successfully paused request generation.")
15021506
return None
15031507

@@ -1791,10 +1795,14 @@ def _control_sleep(self, control_request: ControlRequest):
17911795
executors.add("worker")
17921796
if "kv_cache" in tags:
17931797
executors.add("worker")
1794-
if self.cfg.cache_config.num_cpu_blocks > 0 or self.cfg.cache_config.kvcache_storage_backend:
1795-
executors.add("cache_transfer")
1796-
if self.cfg.cache_config.enable_prefix_caching:
1797-
self.resource_manager.cache_manager.reset()
1798+
if envs.ENABLE_V1_KVCACHE_MANAGER:
1799+
if self.cfg.cache_config.enable_prefix_caching:
1800+
self.resource_manager.cache_manager.reset_cache()
1801+
else:
1802+
if self.cfg.cache_config.num_cpu_blocks > 0 or self.cfg.cache_config.kvcache_storage_backend:
1803+
executors.add("cache_transfer")
1804+
if self.cfg.cache_config.enable_prefix_caching:
1805+
self.resource_manager.cache_manager.reset()
17981806

17991807
# Dispatch sleep request to executors
18001808
self.llm_logger.info(f"Dispatch sleep request to executors: {list(executors)}")
@@ -1989,6 +1997,11 @@ def _decode_token(self, token_ids, req_id, is_end):
19891997
token_ids = cum_tokens[prefix_offset:read_offset]
19901998
else:
19911999
token_ids = []
2000+
2001+
if is_end and delta_text == "" and len(cum_tokens) > 0:
2002+
read_offset = self.data_processor.decode_status[req_id][1]
2003+
token_ids = cum_tokens[read_offset:]
2004+
19922005
if is_end:
19932006
del self.data_processor.decode_status[req_id]
19942007
return delta_text, token_ids
@@ -2444,7 +2457,7 @@ def _setting_environ_variables(self):
24442457
if self.cfg.scheduler_config.splitwise_role == "prefill":
24452458
variables["FLAGS_fmt_write_cache_completed_signal"] = 1
24462459

2447-
if self.cfg.model_config.enable_mm:
2460+
if self.cfg.enable_mm_runtime:
24482461
variables["FLAGS_max_partition_size"] = 1024
24492462

24502463
command_prefix = ""
@@ -2545,6 +2558,7 @@ def _start_worker_service(self):
25452558
f" --early_stop_config '{self.cfg.early_stop_config.to_json_string()}'"
25462559
f" --reasoning_parser {self.cfg.structured_outputs_config.reasoning_parser}"
25472560
f" --load_choices {self.cfg.load_config.load_choices}"
2561+
f" --model_loader_extra_config '{json.dumps(self.cfg.load_config.model_loader_extra_config)}'"
25482562
f" --plas_attention_config '{self.cfg.plas_attention_config.to_json_string()}'"
25492563
f" --ips {ips}"
25502564
f" --cache-transfer-protocol {self.cfg.cache_config.cache_transfer_protocol}"
@@ -2577,6 +2591,7 @@ def _start_worker_service(self):
25772591
"moe_gate_fp32": self.cfg.model_config.moe_gate_fp32,
25782592
"enable_entropy": self.cfg.model_config.enable_entropy,
25792593
"enable_overlap_schedule": self.cfg.scheduler_config.enable_overlap_schedule,
2594+
"enable_flashinfer_allreduce_fusion": self.cfg.parallel_config.enable_flashinfer_allreduce_fusion,
25802595
}
25812596
for worker_flag, value in worker_store_true_flag.items():
25822597
if value:

fastdeploy/engine/request.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@
3434
from typing_extensions import TypeVar
3535

3636
from fastdeploy import envs
37-
from fastdeploy.cache_manager.v1.metadata import CacheSwapMetadata, PendingPrefetch
37+
from fastdeploy.cache_manager.v1.metadata import (
38+
CacheLevel,
39+
CacheSwapMetadata,
40+
PendingPrefetch,
41+
)
3842
from fastdeploy.engine.pooling_params import PoolingParams
3943
from fastdeploy.engine.sampling_params import SamplingParams
4044
from fastdeploy.entrypoints.openai.protocol import (
@@ -43,7 +47,11 @@
4347
StructuralTagResponseFormat,
4448
ToolCall,
4549
)
46-
from fastdeploy.utils import data_processor_logger
50+
from fastdeploy.logger.request_logger import (
51+
RequestLogLevel,
52+
log_request,
53+
log_request_error,
54+
)
4755
from fastdeploy.worker.output import (
4856
LogprobsLists,
4957
PromptLogprobs,
@@ -250,7 +258,7 @@ def prompt_hashes(self) -> list[str]:
250258
return self._prompt_hashes
251259

252260
@property
253-
def match_result(self) -> MatchResult:
261+
def match_result(self) -> Optional[MatchResult]:
254262
return self._match_result
255263

256264
@match_result.setter
@@ -364,15 +372,13 @@ def from_generic_request(
364372
), "The parameter `raw_request` is not supported now, please use completion api instead."
365373
for key, value in req.metadata.items():
366374
setattr(request, key, value)
367-
from fastdeploy.utils import api_server_logger
368-
369-
api_server_logger.warning("The parameter metadata is obsolete.")
375+
log_request(RequestLogLevel.STAGES, message="The parameter metadata is obsolete.")
370376

371377
return request
372378

373379
@classmethod
374380
def from_dict(cls, d: dict):
375-
data_processor_logger.debug(f"{d}")
381+
log_request(RequestLogLevel.FULL, message="{request}", request=d)
376382
sampling_params: SamplingParams = None
377383
pooling_params: PoolingParams = None
378384
metrics: RequestMetrics = None
@@ -403,8 +409,11 @@ def from_dict(cls, d: dict):
403409
ImagePosition(**mm_pos) if not isinstance(mm_pos, ImagePosition) else mm_pos
404410
)
405411
except Exception as e:
406-
data_processor_logger.error(
407-
f"Convert mm_positions to ImagePosition error: {e}, {str(traceback.format_exc())}"
412+
log_request_error(
413+
message="request[{request_id}] Convert mm_positions to ImagePosition error: {error}, {traceback}",
414+
request_id=d.get("request_id"),
415+
error=str(e),
416+
traceback=traceback.format_exc(),
408417
)
409418
return cls(
410419
request_id=d["request_id"],
@@ -640,8 +649,8 @@ def append_swap_metadata(self, metadata: List[CacheSwapMetadata]):
640649
self.cache_swap_metadata = CacheSwapMetadata(
641650
src_block_ids=meta.src_block_ids,
642651
dst_block_ids=meta.dst_block_ids,
643-
src_type="host",
644-
dst_type="device",
652+
src_type=CacheLevel.HOST,
653+
dst_type=CacheLevel.DEVICE,
645654
hash_values=meta.hash_values,
646655
)
647656

@@ -655,8 +664,8 @@ def append_evict_metadata(self, metadata: List[CacheSwapMetadata]):
655664
self.cache_evict_metadata = CacheSwapMetadata(
656665
src_block_ids=meta.src_block_ids,
657666
dst_block_ids=meta.dst_block_ids,
658-
src_type="device",
659-
dst_type="host",
667+
src_type=CacheLevel.HOST,
668+
dst_type=CacheLevel.DEVICE,
660669
hash_values=meta.hash_values,
661670
)
662671

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,11 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
245245
self.need_block_num_map = dict()
246246

247247
self.encoder_cache = None
248-
if config.model_config.enable_mm and config.cache_config.max_encoder_cache > 0:
248+
if config.enable_mm_runtime and config.cache_config.max_encoder_cache > 0:
249249
self.encoder_cache = EncoderCacheManager(config.cache_config.max_encoder_cache)
250250

251251
self.processor_cache = None
252-
if config.model_config.enable_mm and config.cache_config.max_processor_cache > 0:
252+
if config.enable_mm_runtime and config.cache_config.max_processor_cache > 0:
253253
max_processor_cache_in_bytes = int(config.cache_config.max_processor_cache * 1024 * 1024 * 1024)
254254
self.processor_cache = ProcessorCacheManager(max_processor_cache_in_bytes)
255255

@@ -714,7 +714,7 @@ def _get_num_new_tokens(self, request, token_budget):
714714
num_new_tokens = token_budget // self.config.cache_config.block_size * self.config.cache_config.block_size
715715
request.with_image = False
716716

717-
if not self.config.model_config.enable_mm:
717+
if not self.config.enable_mm_runtime:
718718
return num_new_tokens
719719

720720
inputs = request.multimodal_inputs
@@ -1948,13 +1948,7 @@ def _free_blocks(self, request: Request):
19481948
request.block_tables[request.num_cached_blocks :], request.request_id
19491949
)
19501950
else:
1951-
if self.config.cache_config.enable_prefix_caching:
1952-
self.cache_manager.release_block_ids(request)
1953-
self.cache_manager.recycle_gpu_blocks(
1954-
request.block_tables[request.num_cached_blocks :], request.request_id
1955-
)
1956-
else:
1957-
self.cache_manager.recycle_gpu_blocks(request.block_tables, request.request_id)
1951+
self.cache_manager.recycle_gpu_blocks(request.block_tables, request.request_id)
19581952
request.block_tables = []
19591953

19601954
if request.request_id in self.using_extend_tables_req_id:

fastdeploy/model_executor/forward_meta.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ class ForwardMeta:
164164

165165
# for mla & dsa
166166
position_ids: Optional[paddle.Tensor] = None
167-
mask_encoder_batch: Optional[paddle.Tensor] = None
167+
# for kvcache slot
168+
slot_mapping: Optional[paddle.Tensor] = None
168169

169170
real_bsz: int = 0
170171

@@ -279,6 +280,7 @@ class XPUForwardMeta(ForwardMeta):
279280
hidden_states: Optional[paddle.Tensor] = None
280281

281282
is_draft: bool = False
283+
is_speculative: bool = False
282284
# max bs
283285
max_num_seqs: int = 0
284286

0 commit comments

Comments
 (0)