
[KVCache][Feature] Implement Cache Manager V1 Storage Transfer and Prefetch (2/n)#7529

Open
kevincheng2 wants to merge 37 commits into
PaddlePaddle:develop from
kevincheng2:feature/cache_manager_storage_transfer

Conversation

@kevincheng2
Collaborator

@kevincheng2 kevincheng2 commented Apr 21, 2026

Motivation

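This PR is the second part of the Cache Manager V1 series and builds on #7097.

It introduces the storage transfer subsystem of Cache Manager V1. Main capabilities:

  • Offloading KV cache from GPU/CPU to external storage (Mooncake, RDMA, IPC)
  • Asynchronously prefetching KV cache from external storage back to the device
  • Chained D2H → Storage transfers for seamless eviction across cache tiers
  • A ZMQ-based prefetch pipeline between the Scheduler and the Workers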

Modifications

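Core modules

  • fastdeploy/cache_manager/v1/storage/

    • staging_manager.py: adds StagingManager, which manages the asynchronous storage transfer queue and prefetch tasks
    • base.py: updates the StorageConnector base class with prefetch/backup interfaces
    • mooncake/connector.py: Mooncake storage connector with prefetch/backup support
  • fastdeploy/cache_manager/v1/transfer_manager.py: refactored to support multi-level transfers (GPU↔CPU↔Storage) and to separate TransferManager from StagingManager

  • fastdeploy/cache_manager/v1/cache_controller.py: adds storage prefetch/backup orchestration and the chained D2H→Storage transfer

  • fastdeploy/cache_manager/v1/cache_manager.py: integrates the storage transfer API

  • fastdeploy/cache_manager/v1/block_pool.py: supports storage-tier blocks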

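Engine & Worker integration

  • fastdeploy/engine/sched/resource_manager_v1.py: integrates the storage prefetch trigger logic into the resource manager
  • fastdeploy/worker/gpu_worker.py / worker_process.py: adds ZMQ prefetch pipeline handling
  • fastdeploy/inter_communicator/zmq_client.py: extends the ZMQ client to support prefetch messages
  • fastdeploy/engine/common_engine.py / request.py: updates related to storage transfer metadata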

Tests

  • tests/cache_manager/v1/test_staging_manager.py: StagingManager unit tests
  • tests/cache_manager/v1/test_transfer_manager.py: updated transfer manager tests
  • tests/cache_manager/v1/test_cache_manager.py: updated cache manager tests
  • tests/cache_manager/v1/test_cache_utils.py: updated cache utils tests

Usage or Command

Example of starting a service with storage transfer enabled:

# Build the operators (only Python code changed, so a rebuild is usually unnecessary)
# bash build.sh 1 python false [90] 1

Checklist

  • Add at least a tag in the PR title.
  • Format your code, run pre-commit before commit.
  • Add unit tests.
  • Provide accuracy results. (No model forward code is modified, so no accuracy test is needed.)
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch.

@paddle-bot

paddle-bot Bot commented Apr 21, 2026

Thanks for your contribution!


@kevincheng2 kevincheng2 force-pushed the feature/cache_manager_storage_transfer branch from 0b34168 to 1757460 Compare May 7, 2026 08:02

@PaddlePaddle-bot

PaddlePaddle-bot commented May 7, 2026

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-05-13 18:19:38

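The CI report is generated from the following code (refreshed every 30 minutes):


1 Task overview

⚠️ 1 Required task failed and 2 Required tasks are running. The Approval check did not pass and must be resolved before merging.

| Total runs (reruns) | Total tasks | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
|---|---|---|---|---|---|---|
| 38(0) | 38 | 31 | 4 | 2 | 1 | 0 |

2 Task status summary

2.1 Required tasks: 7/10 passed

Required tasks block merging; failures should be handled first.

| Status | Task | Duration | Root cause | Suggested fix | Log | Rerun |
|---|---|---|---|---|---|---|
| ❌ | Approval | 9s | PR issue: the PR modifies logging behavior and needs approval from a designated RD member | Ask xyxinyang/zyyzghb to approve the PR | Job | - |
| ⏳ | xpu_4cards_case_test / run_xpu_4cards_cases | - | Running | - | Job | - |
| ⏳ | Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage | - | Running | - | Job | - |
| ✅ | The remaining 7 required tasks passed | - | - | - | - | - |

2.2 Optional tasks: 24/28 passed

Optional tasks do not block merging; failures are for reference only.

| Status | Task | Duration | Log | Rerun |
|---|---|---|---|---|
| ❌ | Run iluvatar Tests / run_iluvatar_cases | 10m34s | Job | - |
| ❌ | Check PR Template | 17s | Job | - |
| ❌ | Trigger Jenkins for PR | 25m47s | Job | - |
| ⏸️ | CI_HPU | - | - | - |
| ✅ | The remaining 24 optional tasks passed | - | - | - |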

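3 Failure details (required only)

Approval: code conventions (confidence: high)

Approval

  • Status: ❌ Failed
  • Error type: code conventions
  • Confidence: high
  • Root cause summary: the PR adds many logging calls, which requires approval from a designated FastDeploy RD member
  • Analyzer: generic analysis (fallback)

Root cause details:
PR (#7529) adds many logger.info / logger.debug / logger.error calls in its diff, which triggers FastDeploy's check_approval.sh approval check. The check requires that a PR modifying logging behavior be approved on GitHub by a designated FastDeploy RD member (xyxinyang (zhouchong) or zyyzghb (zhangyongyue)). Exit code 6 means the required approval has not been given yet.

Key log: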

Detected log modification in diff
0. You must have one FastDeploy RD (xyxinyang(zhouchong), zyyzghb(zhangyongyue)) approval
   for modifying logging behavior (.info/.debug/.error/log_request).
There are 1 approved errors.
Process completed with exit code 6.

Suggested fix:

  1. The PR author should ask xyxinyang (zhouchong) or zyyzghb (zhangyongyue) to approve this PR through a GitHub review
  2. After approval, re-trigger the Approval workflow (Re-run failed jobs)

Suggested fix summary: ask xyxinyang/zyyzghb to approve the PR

Related changes: the PR adds many logger calls in modules such as StoragePrefetch, StorageBackup, and MooncakeDistributedStore

Link: view log


@codecov-commenter

codecov-commenter commented May 8, 2026

Codecov Report

❌ Patch coverage is 31.21577% with 628 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@ecce6b5). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...loy/cache_manager/v1/storage/mooncake/connector.py | 0.00% | 296 Missing ⚠️ |
| fastdeploy/cache_manager/v1/transfer_manager.py | 22.36% | 121 Missing and 4 partials ⚠️ |
| fastdeploy/cache_manager/v1/cache_manager.py | 36.20% | 71 Missing and 3 partials ⚠️ |
| fastdeploy/worker/worker_process.py | 15.68% | 40 Missing and 3 partials ⚠️ |
| fastdeploy/engine/sched/resource_manager_v1.py | 85.62% | 0 Missing and 23 partials ⚠️ |
| fastdeploy/cache_manager/v1/cache_controller.py | 47.50% | 18 Missing and 3 partials ⚠️ |
| fastdeploy/inter_communicator/zmq_client.py | 8.33% | 11 Missing ⚠️ |
| ...oy/cache_manager/v1/storage/attnstore/connector.py | 0.00% | 9 Missing ⚠️ |
| fastdeploy/cache_manager/v1/storage/base.py | 36.36% | 7 Missing ⚠️ |
| fastdeploy/engine/request.py | 57.14% | 5 Missing and 1 partial ⚠️ |

... and 5 more
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #7529   +/-   ##
==========================================
  Coverage           ?   62.71%           
==========================================
  Files              ?      461           
  Lines              ?    64895           
  Branches           ?     9960           
==========================================
  Hits               ?    40697           
  Misses             ?    21383           
  Partials           ?     2815           
Flag Coverage Δ
GPU 71.64% <31.21%> (?)
XPU 7.13% <6.90%> (?)



kevincheng2 and others added 6 commits May 11, 2026 15:11
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a node transitions from SWAP_TO_DEVICE to DEVICE via
complete_swap_to_device, it was not being added to the
_evictable_device set. This caused nodes with ref_count=0 to
become "orphaned" - not appearing in any evictable set despite
having cache_status=DEVICE.
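
The orphaned-node problem described above can be illustrated with a minimal sketch. Only `complete_swap_to_device`, `_evictable_device`, `ref_count`, `cache_status`, and the two status names come from the commit message; the `Node` and `Tree` classes are hypothetical stand-ins, not the actual FastDeploy classes.

```python
from enum import Enum, auto


class CacheStatus(Enum):
    SWAP_TO_DEVICE = auto()
    DEVICE = auto()


class Node:
    """Hypothetical stand-in for a radix tree node."""

    def __init__(self, block_id, ref_count=0):
        self.block_id = block_id
        self.ref_count = ref_count
        self.cache_status = CacheStatus.SWAP_TO_DEVICE


class Tree:
    """Hypothetical tree that tracks which device blocks may be evicted."""

    def __init__(self):
        self._evictable_device = set()

    def complete_swap_to_device(self, node):
        node.cache_status = CacheStatus.DEVICE
        # Without this step an unreferenced node would sit in DEVICE state
        # while being absent from every evictable set.
        if node.ref_count == 0:
            self._evictable_device.add(node.block_id)


tree = Tree()
idle, busy = Node(1, ref_count=0), Node(2, ref_count=1)
tree.complete_swap_to_device(idle)
tree.complete_swap_to_device(busy)
```

In this sketch only the unreferenced node becomes evictable; the node that is still referenced stays out of the set until its reference count drops.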

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add new cache_manager.py with cache management functionality
- Add radix_tree.py for prefix caching
- Update block_pool.py and metadata.py
- Update request.py and resource_manager_v1.py for scheduling
- Update gpu_model_runner.py for GPU model execution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add CacheController class for cache management
- Update config.py with cache related configurations
- Refactor gpu_model_runner.py for improved cache handling
kevincheng2 and others added 15 commits May 11, 2026 15:11
…in CacheController

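Completes the implementation of the Storage-related interfaces in CacheController so that the Host↔Storage transfer path is fully usable:
- prefetch_from_storage / backup_host_to_storage / backup_device_to_storage were previously all TODO stubs
- There was no path that automatically backed up to Storage after a D2H evict

- cache_controller.py:
  - Adds the `storage_enabled` property, which reports whether a StorageConnector is configured
  - `evict_device_to_host` accepts an optional `StorageMetadata` argument and automatically chains backup_host_to_storage after a successful D2H transfer
  - `_submit_swap_task` gains an `on_success` callback, fired inside the worker thread after the transfer succeeds
  - Implements `prefetch_from_storage`: calls transfer_manager.prefetch_from_storage asynchronously through a ThreadPoolExecutor
  - Implements `backup_host_to_storage`: calls transfer_manager.backup_host_to_storage asynchronously through a ThreadPoolExecutor
  - At the swap scheduling entry point, when storage_enabled is set and evict_metadata.hash_values is present, automatically builds a StorageMetadata and passes it to evict_device_to_host
- cache_manager.py: adapts to the changed interface calls
- transfer_manager.py: type/interface adjustments
- storage/mooncake/connector.py: aligns the connector interface
- engine/request.py, engine/sched/resource_manager_v1.py: adapts related fields/calls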
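
The `on_success` chaining described above can be sketched with the standard library alone. This is an illustration under assumptions: `submit_swap_task`, `d2h_copy`, and `failing_copy` are hypothetical helpers, and the real `_submit_swap_task` signature is not shown in this PR page.

```python
from concurrent.futures import ThreadPoolExecutor

events = []


def d2h_copy(block_ids):
    """Pretend device-to-host copy that always succeeds."""
    events.append(("d2h", tuple(block_ids)))
    return True


def failing_copy(block_ids):
    """Pretend copy that fails, so no backup should be chained."""
    return False


def backup_host_to_storage(block_ids):
    events.append(("backup", tuple(block_ids)))


def submit_swap_task(executor, transfer, block_ids, on_success=None):
    def run():
        ok = transfer(block_ids)
        # The callback runs in the worker thread, and only after success.
        if ok and on_success is not None:
            on_success()
        return ok

    return executor.submit(run)


with ThreadPoolExecutor(max_workers=1) as pool:
    blocks = [3, 4]
    chained_ok = submit_swap_task(
        pool, d2h_copy, blocks,
        on_success=lambda: backup_host_to_storage(blocks),
    ).result()
    failed_ok = submit_swap_task(
        pool, failing_copy, [9],
        on_success=lambda: backup_host_to_storage([9]),
    ).result()
```

Running the callback inside the worker thread keeps the eviction and the backup ordered without an extra round trip through the scheduler, at the cost of holding the worker for the duration of the callback.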

```bash
source .venv/py310/bin/activate
bash run.sh
```

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Motivation

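Refactors TransferManager, consolidates the staging buffer management logic, and adds a standalone StagingManager module that manages allocation and release of the CPU staging buffer in one place.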

## Modifications

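- Refactors `transfer_manager.py`: simplifies the transfer logic and removes redundant code
- Adds `storage/staging_manager.py`: manages the staging buffer lifecycle independently
- Updates `cache_manager.py`: adapts to the new TransferManager interface
- Updates `cache_utils.py`: adds helper utilities
- Updates `storage/__init__.py`: exports StagingManager
- Trims `mooncake/connector.py`: removes redundant logic
- Adds `test_staging_manager.py` and updates the `test_transfer_manager.py` unit tests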

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…er and Worker

Storage prefetch (Storage → CPU) previously had no runtime execution path in
Scheduler/Worker: the Scheduler prepared host-block metadata but the actual
data transfer was never triggered.  Workers also had no mechanism to receive
prefetch commands or report completion, leaving LOADING_FROM_STORAGE blocks
permanently stuck and never promoted to HOST.

- Add `_prefetch_node_map: Dict[int, BlockNode]` to track in-flight blocks by
  host_block_id for O(1) status lookup.
- `prepare_prefetch_metadata`: register returned nodes into `_prefetch_node_map`.
- New `update_storage_blocks_to_host(host_block_ids)`: transition
  LOADING_FROM_STORAGE → HOST after all TP workers confirm transfer done.
- New `abort_prefetch_blocks(host_block_ids)`: remove nodes from RadixTree and
  release host pool blocks on transfer failure.
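
The bookkeeping in the list above can be sketched as follows. It is a simplified model, not the FastDeploy code: `PrefetchTracker`, `register`, and `free_host_blocks` are hypothetical, and the RadixTree removal performed by the real `abort_prefetch_blocks` is reduced to returning the block to a free list.

```python
from enum import Enum, auto


class Status(Enum):
    LOADING_FROM_STORAGE = auto()
    HOST = auto()


class BlockNode:
    def __init__(self, host_block_id):
        self.host_block_id = host_block_id
        self.status = Status.LOADING_FROM_STORAGE


class PrefetchTracker:
    def __init__(self):
        self._prefetch_node_map = {}  # host_block_id -> BlockNode
        self.free_host_blocks = []

    def register(self, nodes):
        for node in nodes:
            self._prefetch_node_map[node.host_block_id] = node

    def update_storage_blocks_to_host(self, host_block_ids):
        promoted = []
        for block_id in host_block_ids:
            node = self._prefetch_node_map.get(block_id)
            if node is None or node.status is not Status.LOADING_FROM_STORAGE:
                continue  # unknown id or unexpected status: leave untouched
            node.status = Status.HOST
            del self._prefetch_node_map[block_id]
            promoted.append(block_id)
        return promoted

    def abort_prefetch_blocks(self, host_block_ids):
        for block_id in host_block_ids:
            if self._prefetch_node_map.pop(block_id, None) is not None:
                self.free_host_blocks.append(block_id)


tracker = PrefetchTracker()
nodes = [BlockNode(i) for i in (10, 11, 12)]
tracker.register(nodes)
promoted = tracker.update_storage_blocks_to_host([10, 99])
tracker.abort_prefetch_blocks([11])
```

Keying the map by `host_block_id` gives the constant-time lookup the commit message mentions, and unknown ids are skipped rather than treated as errors.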

- Add per-worker ZMQ PUSH/PULL servers (`_prefetch_cmd_servers`,
  `_prefetch_done_servers`), one pair per TP worker, keyed by local_rank.
- `_init_prefetch_zmq_servers()`: initialize servers at startup when storage
  backend is configured.
- `_prefetch_storage_cache()`: after inserting host blocks, serialize
  `StorageMetadata` and broadcast to all TP workers via ZMQ PUSH; then poll
  PULL done sockets until all workers reply, call
  `update_storage_blocks_to_host` on success or `abort_prefetch_blocks` on
  failure.

- Add `receive_pyobj_once(block=False)`: non-blocking (or blocking) receive
  helper returning `(error, data)` tuple; used by Scheduler to poll done
  messages and by Worker in the prefetch loop.

- Add `init_prefetch_zmq_clients()`: connect ZMQ PULL/PUSH clients to
  Scheduler servers for this worker's local_rank; start daemon
  `_prefetch_loop` thread.
- `_prefetch_loop()`: background thread receiving `StorageMetadata` commands,
  calling `cache_controller.prefetch_from_storage`, waiting for
  `AsyncTaskHandler.wait`, and replying with ok/error status.
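
The worker-side loop can be sketched with `queue.Queue` objects standing in for the ZMQ PULL and PUSH sockets. The message layout (`task_id`, `host_block_ids`, `status`), the `None` shutdown sentinel, and `fake_prefetch` are assumptions made for illustration; the real loop calls `cache_controller.prefetch_from_storage` and waits on its task handle.

```python
import queue
import threading


def prefetch_loop(cmd_queue, done_queue, prefetch_fn):
    """Receive commands, run the prefetch, and reply with ok/error."""
    while True:
        cmd = cmd_queue.get()
        if cmd is None:  # shutdown sentinel used only in this sketch
            break
        try:
            prefetch_fn(cmd["host_block_ids"])
            reply = {"task_id": cmd["task_id"], "status": "ok"}
        except Exception as exc:
            reply = {"task_id": cmd["task_id"], "status": "error", "detail": str(exc)}
        done_queue.put(reply)


def fake_prefetch(host_block_ids):
    """Stand-in for the storage-to-host transfer."""
    if not host_block_ids:
        raise ValueError("no blocks to prefetch")


cmds, done = queue.Queue(), queue.Queue()
worker = threading.Thread(
    target=prefetch_loop, args=(cmds, done, fake_prefetch), daemon=True
)
worker.start()
cmds.put({"task_id": 1, "host_block_ids": [5, 6]})
cmds.put({"task_id": 2, "host_block_ids": []})
cmds.put(None)
worker.join(timeout=5)
replies = [done.get(timeout=1), done.get(timeout=1)]
```

Catching the exception inside the loop means a failed transfer produces an error reply instead of killing the thread, which is what lets the Scheduler choose between promoting and aborting the blocks.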

- Add `TestUpdateStorageBlocksToHost` with 6 test cases covering:
  status transition, multi-block, unknown id, empty list, wrong status,
  and initial-empty-map assertions.

No additional build steps. Enable storage prefetch via existing config:
```bash
python -m fastdeploy.entrypoints.openai.api_server \
  --kvcache-storage-backend <backend> \
  --enable-prefix-caching \
  ...
```
Checkout core files from upstream/develop to fix merge inconsistencies:
- config.py: add model_loader_extra_config, enable_flashinfer_allreduce_fusion
- inter_communicator: remove IPCLock (already deleted upstream)
- engine/common_engine.py, worker/worker_process.py: sync with upstream
- resource_manager_v1.py: revert to upstream (prefetch refactoring to be re-applied)
- cache_manager/v1: sync with upstream
- gpu_model_runner.py, gpu_worker.py: sync with upstream

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Refactor _prefetch_storage_cache into three decoupled phases:
- Phase 1 (preprocess thread): CacheManager.prefetch_storage() does matching + enqueue
- Phase 2 (schedule thread): drain pending list, attach to batch_request for dispatch
- Phase 3 (receiver thread): zmq.Poller receives done msgs, stores results

Worker side: extract prefetch tasks from batch_request, execute via thread pool,
send completion via ZMQ PUSH.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…layer

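BatchRequest.__len__ also counted prefetch/swap/evict tasks, which made the engine scheduling
logic (deciding whether there is pending work) misjudge. In addition, swap/evict submission was
scattered across gpu_model_runner and resource_manager, leaving responsibilities unclear.

- engine/request.py: adds the has_pending_work property; __len__ goes back to counting only
  requests; has_pending_work also accounts for prefetch/swap/evict tasks
- engine/common_engine.py: replaces the len(batch_request) > 0 check with has_pending_work,
  which is more accurate
- worker/worker_process.py: moves the submit_swap_tasks call to the worker layer and
  clears the metadata afterwards to avoid duplicate submission
- worker/gpu_model_runner.py: removes the duplicate submit_swap_tasks call
- engine/sched/resource_manager_v1.py: reorders check_and_add_pending_backup /
  issue_pending_backup / dispatch_pending_prefetches and drops the dependency on
  len(batch_request)
- cache_manager/v1/cache_manager.py: restores the logic that classifies matched_nodes by
  device/host (it had been commented out by mistake)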
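
A minimal sketch of the `__len__` / `has_pending_work` split, assuming a simplified `BatchRequest` whose field names (`prefetch_tasks`, `swap_tasks`, `evict_tasks`) are hypothetical:

```python
class BatchRequest:
    """Simplified stand-in: requests plus cache-transfer side tasks."""

    def __init__(self, requests=(), prefetch_tasks=(), swap_tasks=(), evict_tasks=()):
        self.requests = list(requests)
        self.prefetch_tasks = list(prefetch_tasks)
        self.swap_tasks = list(swap_tasks)
        self.evict_tasks = list(evict_tasks)

    def __len__(self):
        # Counts inference requests only.
        return len(self.requests)

    @property
    def has_pending_work(self):
        # True when anything at all must be dispatched to the workers.
        return bool(
            self.requests or self.prefetch_tasks or self.swap_tasks or self.evict_tasks
        )


only_prefetch = BatchRequest(prefetch_tasks=["prefetch-0"])
empty = BatchRequest()
```

A batch that carries only a prefetch task has length 0, and because Python falls back to `__len__` for truthiness it is also falsy. Dispatch decisions therefore need the explicit property rather than `len(...) > 0` or a bare truth test.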

```bash
cd baidu/FastDeploy
bash run.sh
```

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… unit tests

## Motivation

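The cache_manager_storage_transfer branch keeps iterating; this commit covers several improvements:
1. BlockPool allocation has a performance problem and lacks boundary handling
2. CacheController.free_cache clears storage unconditionally and may wipe data by mistake
3. CacheSwapMetadata in request.py represents CacheLevel as a string, which is not type-safe
4. The multimodal checks in common_engine / resource_manager / worker do not share a single entry point
5. select_blocks_for_backup in radix_tree no longer has callers and can be removed
6. Several optimizations are possible in gpu_model_runner: asynchronous writes, new attention backends, MLA/DSA slot_mapping
7. F811 redefinition warning in ops.py on the MACA platform
8. Unit tests for the core cache manager v1 logic are missing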

## Modifications

- cache_manager/ops.py: removes the redundant MACA-platform imports of swap_cache_per_layer/async (F811)
- cache_manager/v1/block_pool.py: allocate returns early when num_blocks==0; batch slicing replaces the pop loop
- cache_manager/v1/cache_controller.py: free_cache gains a clear_storage parameter, default False
- cache_manager/v1/radix_tree.py: removes the obsolete select_blocks_for_backup method
- engine/request.py: CacheSwapMetadata src/dst_type now use the CacheLevel enum; logging interface unified
- engine/common_engine.py: enable_mm checks unified to enable_mm_runtime; V1 cache pause goes through
  reset_cache; passes through model_loader_extra_config / enable_flashinfer_allreduce_fusion;
  fixes the edge case where leftover token_ids were not returned on is_end
- engine/sched/resource_manager_v1.py: enable_mm unified; removes a redundant conditional branch in block release
- model_executor/forward_meta.py: ForwardMeta replaces mask_encoder_batch with slot_mapping;
  XPUForwardMeta gains an is_speculative field
- worker/gpu_model_runner.py: introduces the DSA/MLA attention backends; generalizes the image processor;
  share_inputs switched to async_set_value; adds _compute_position_ids_and_slot_mapping;
  TBO double-buffering support; fixes typos; removes the hard num_blocks > 40000 limit
- worker/worker_process.py: enable_mm_runtime unified; parameter order tidied
- tests/cache_manager/v1/: adds test_cache_manager / test_cache_utils / test_radix_tree
  unit tests covering offload/load, pending backup, hash, LayerDoneCounter, complete_swap, and more
## Usage or Command

```bash
# Run the new unit tests
source .venv/py310/bin/activate
python -m pytest tests/cache_manager/v1/ -vv -s

# Start the service (single node)
cd baidu/FastDeploy
bash run.sh
```

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…urce manager v1 bugs

## Motivation

Fixes several bugs in cache_manager and resource_manager_v1.

## Modifications

- `cache_manager.py`: `free_gpu_block_ids` now returns the actual list of free blocks instead of a range; log order adjusted (log first, then compute matched_device/host_ids)
- `common_engine.py`: fixes a typo (Unexcepted → Unexpected)
- `request.py`: fixes the reversed src/dst types in `cache_evict_metadata` (eviction goes DEVICE→HOST)
- `resource_manager_v1.py`: in PD disaggregation, prefill nodes skip the prefix cache update_cache_blocks; update_cache_blocks is called after allocation on prefill nodes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Motivation

`increment_ref_nodes` should only be called during the scheduling
phase (when `skip_storage=True`), not during the actual storage
prefetch phase. The previous condition was inverted, causing ref
counts to be incremented at the wrong time.

## Modifications

- Fix condition from `not (self._storage_scheduler and skip_storage)`
  to `skip_storage` in `CacheManager.match`
- Update comment to clarify "only scheduling phase"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Motivation

Calling `_host_pool.allocate()` directly does not trigger eviction. When free host blocks are
insufficient but evictable blocks exist, `can_allocate_host_blocks` returns True yet the
allocation fails silently.

## Modifications

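- `prepare_prefetch_metadata`: replaces `_host_pool.allocate()` with `allocate_host_blocks()`,
  which evicts evictable host blocks automatically when free blocks run short and then allocates
- Removes the `offload_to_host` method, which production code never called, together with all of its test cases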
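
The mismatch between the capacity check and the raw allocation can be reproduced with a small model. `HostPool`, `can_allocate`, and `allocate_with_eviction` are hypothetical names for illustration; they mirror the roles of `_host_pool.allocate()`, `can_allocate_host_blocks`, and `allocate_host_blocks()` without claiming to match their signatures.

```python
class HostPool:
    def __init__(self, num_blocks):
        self.free = list(range(num_blocks))
        self.evictable = []  # cached blocks with no references, oldest first

    def allocate(self, n):
        """Raw allocation: looks at the free list only."""
        if n > len(self.free):
            return None
        taken, self.free = self.free[:n], self.free[n:]
        return taken

    def can_allocate(self, n):
        """Capacity check that also counts evictable blocks."""
        return n <= len(self.free) + len(self.evictable)

    def allocate_with_eviction(self, n):
        shortfall = n - len(self.free)
        if shortfall > len(self.evictable):
            return None
        if shortfall > 0:
            # Evict just enough blocks to cover the shortfall, then allocate.
            evicted = self.evictable[:shortfall]
            self.evictable = self.evictable[shortfall:]
            self.free.extend(evicted)
        return self.allocate(n)


pool = HostPool(4)
pool.allocate(3)          # blocks 0, 1, 2 are now in use
pool.evictable = [0, 1]   # two of them hold cached data but are unreferenced
check = pool.can_allocate(2)
raw = pool.allocate(2)
with_eviction = pool.allocate_with_eviction(2)
```

The check passes because it counts evictable blocks, the raw call fails because it does not, and the eviction-aware path succeeds by reclaiming one block first.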
…e manager v1

## Motivation

Under cache manager v1, writing KV cache back to storage is handled by v1's internal RadixTree mechanism,
so the write_cache_to_storage / write_cache_to_storage_decode calls in resource_manager_v1
are redundant and should be skipped.

## Modifications

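- resource_manager_v1.py: the two storage write-back calls on the preemption path (decode / non-decode) gain an
  `and not self.enable_cache_manager_v1` condition, so they no longer fire under v1
- cache_manager/v1/cache_manager.py: when prefix caching is disabled, additionally initializes
  `request._match_result = MatchResult()` to avoid later access to a missing attribute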

## Usage or Command

Start the service with `--enable-cache-manager-v1` to reproduce the effect of the fix:

```bash
python -m fastdeploy.entrypoints.openai.api_server \
  --enable-cache-manager-v1 \
  ...
```
…tree position

## Motivation

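After a three-tier KV cache (Device → Host → Storage) prefetch completes, the second match_prefix
still hits only device-tier blocks; the host blocks prefetched from storage cannot be found.

Root cause: `prepare_prefetch_metadata` called `radix_tree.insert` without passing
`start_node`, so the 8 new LOADING_FROM_STORAGE nodes were wrongly attached under the radix tree's
root node (with storage hash h22 as a direct child of root) instead of being appended to the end
of the existing 22-node chain (as children of node[21]). When `find_prefix` reaches node[21],
h22 is not in node[21].children, so it stops immediately and always returns only 22 nodes.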
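
The effect of the missing `start_node` can be reproduced on a toy tree with a four-hash chain instead of the 22 + 8 nodes from the report. The `Node` and `RadixTree` classes below are deliberately simplified assumptions (one hash per node, no statuses), not the FastDeploy implementation.

```python
class Node:
    def __init__(self, key=None):
        self.key = key
        self.children = {}


class RadixTree:
    def __init__(self):
        self.root = Node()

    def insert(self, hashes, start_node=None):
        """Append a chain of hashes below start_node (root when omitted)."""
        node = start_node if start_node is not None else self.root
        path = []
        for h in hashes:
            node = node.children.setdefault(h, Node(h))
            path.append(node)
        return path

    def find_prefix(self, hashes):
        """Walk from root and return the longest matching chain of nodes."""
        node, matched = self.root, []
        for h in hashes:
            node = node.children.get(h)
            if node is None:
                break
            matched.append(node)
        return matched


hashes = ["h0", "h1", "h2", "h3"]

buggy = RadixTree()
buggy.insert(hashes[:2])
buggy.insert(hashes[2:])  # no start_node: h2 is attached to root
buggy_hits = len(buggy.find_prefix(hashes))

fixed = RadixTree()
chain = fixed.insert(hashes[:2])
fixed.insert(hashes[2:], start_node=chain[-1])  # continue the existing chain
fixed_hits = len(fixed.find_prefix(hashes))
```

In the buggy tree the lookup stops after the first two nodes because `h2` is a sibling of `h0` rather than a child of `h1`; passing the last matched node as `start_node` makes the full chain reachable.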

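The same batch also fixes several related problems:
- `_match_storage` probed only the "key" kind; Mooncake LRU may evict "value" on its own,
  producing false hits. It now probes key + value and counts a hit only when both exist
- On a partial write, where some keys succeed and some fail, the keys already written are now
  rolled back automatically so that _match_storage does not find half-written blocks
- `prepare_prefetch_metadata` registers only nodes that are actually in the LOADING_FROM_STORAGE
  state into prefetch_node_map, avoiding spurious "unexpected status" warnings when insert reuses
  existing HOST/DEVICE nodes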
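
The first two fixes in this list (probing both kinds, rolling back partial writes) can be sketched against a plain dict acting as the external store. The `"<hash>:key"` / `"<hash>:value"` naming and the `fail_on` switch are assumptions made for the example; the real key format comes from the project's own helpers and is not shown here.

```python
def storage_keys(block_hash):
    """Hypothetical key names for the two objects stored per block."""
    return [f"{block_hash}:key", f"{block_hash}:value"]


def block_exists(store, block_hash):
    # A block is a hit only when both the key and the value object exist.
    return all(name in store for name in storage_keys(block_hash))


def write_block(store, block_hash, payloads, fail_on=None):
    """Write both objects; undo earlier writes if a later one fails."""
    written = []
    for name, data in zip(storage_keys(block_hash), payloads):
        if name == fail_on:  # simulated write failure
            for done in written:
                store.pop(done, None)  # rollback
            return False
        store[name] = data
        written.append(name)
    return True


store = {}
first_ok = write_block(store, "h22", (b"k", b"v"))
del store["h22:value"]  # simulate the backend evicting only the value
hit_after_eviction = block_exists(store, "h22")

second_ok = write_block(store, "h23", (b"k", b"v"), fail_on="h23:value")
```

Both safeguards aim at the same invariant: a block is reported as present only when every object it needs can actually be read back.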

## Modifications

- `cache_manager.py`
  - `match_prefix`: passes `start_node=matched_nodes[-1]` to `prepare_prefetch_metadata`
  - `prepare_prefetch_metadata`: adds a `start_node` parameter, forwarded to `_radix_tree.insert`
  - `prepare_prefetch_metadata`: registers only LOADING_FROM_STORAGE nodes into prefetch_node_map
  - `_match_storage`: probes both the key and value kinds and treats a block as a hit only when both exist
- `storage/base.py`: adds default implementations of `batch_exists` / `batch_delete`
- `storage/mooncake/connector.py`: Mooncake implements `batch_exists` / `batch_delete`
- `storage/staging_manager.py`: automatic rollback on partial write
- `transfer_manager.py`: emits diagnostic logs when prefetch/backup fails
- `tests/cache_manager/v1/test_cache_manager.py`: adds the regression test `TestPreparePrefixtMetadataStartNode`

## Usage or Command

```bash
# Run the regression test
source .venv/py310/bin/activate
PYTHONPATH=. python -m pytest tests/cache_manager/v1/test_cache_manager.py::TestPreparePrefixtMetadataStartNode -v
```
@kevincheng2 kevincheng2 force-pushed the feature/cache_manager_storage_transfer branch from 5043d01 to a656c6e Compare May 11, 2026 07:19

…count and add debug logging

- Remove single-key storage methods (get/set/delete/exists), keep only batch ops
- Remove attention_store backend support
- Add cancel_pending_prefetch method to CacheManager
- Fix ref_count balance in update_storage_blocks_to_host (decrement after LFS->HOST)
- Skip LOADING_FROM_STORAGE nodes in radix_tree find_prefix
- Add debug logging for cache allocate/match/finish/prefetch flows
- Change Mooncake warmup asserts to RuntimeError
- Update tests to match new interfaces

@PaddlePaddle-bot PaddlePaddle-bot left a comment


🤖 Paddle-CI-Agent | pr_review | 2026-05-13 18:19:39

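📋 Review summary

PR overview: implements the storage transfer subsystem of Cache Manager V1, including multi-level GPU↔CPU↔Storage transfers, chained D2H→Storage eviction, and a ZMQ-based prefetch pipeline.
Scope of changes: cache_manager/v1/, engine/sched/, worker/, inter_communicator/
Impact tags: [KVCache] [Engine] [Scheduler]


📝 PR convention check

The title uses two tags, [KVCache][Feature], but the convention requires exactly one official tag; the PR body is missing the ## Accuracy Tests section.

Suggested title (can be copied directly):

  • [KVCache] Implement Cache Manager V1 Storage Transfer and Prefetch (2/n)

Suggested PR description (can be copied directly; the ## Accuracy Tests section needs to be filled in):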
## Motivation
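This PR is the second part of the Cache Manager V1 series and builds on #7097.

It introduces the **storage transfer** subsystem of Cache Manager V1. Main capabilities:
- Offloading KV cache from GPU/CPU to external storage (Mooncake, RDMA, IPC)
- Asynchronously prefetching KV cache from external storage back to the device
- Chained D2H → Storage transfers for seamless eviction across cache tiers
- A ZMQ-based prefetch pipeline between the Scheduler and the Workers

## Modifications
- `staging_manager.py`: adds StagingManager, which manages the asynchronous storage transfer queue and prefetch tasks
- `storage/base.py`: updates the StorageConnector base class with prefetch/backup interfaces
- `storage/mooncake/connector.py`: Mooncake storage connector with prefetch/backup support
- `transfer_manager.py`: refactored to support multi-level transfers (GPU↔CPU↔Storage)
- `cache_controller.py`: adds storage prefetch/backup orchestration and the chained D2H→Storage transfer
- `cache_manager.py`: integrates the storage transfer API and implements prefetch_storage / abort_prefetch_blocks, among others
- `engine/sched/resource_manager_v1.py`: integrates the storage prefetch trigger logic
- `worker/gpu_worker.py` / `worker_process.py`: adds ZMQ prefetch pipeline handling
- `inter_communicator/zmq_client.py`: extends the ZMQ client to support prefetch messages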

## Usage or Command
```bash
python -m fastdeploy.entrypoints.openai.api_server \
  --model /path/to/model \
  --cache-manager-version v1 \
  --host-cache-size-gb 40 \
  --storage-backend mooncake \
  --tensor-parallel-size 8
```

## Accuracy Tests
N/A (this PR does not modify model forward code, so no accuracy test is needed)

## Checklist

- [x] Add at least a tag in the PR title.
  - Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
  - You can add new tags based on the PR content, but the semantics must be clear.
- [x] Format your code, run `pre-commit` before commit.
- [x] Add unit tests. Please write the reason in this PR if no unit tests.
- [x] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.

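Issues

| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | fastdeploy/cache_manager/v1/cache_manager.py:282 | Logs prefixed with [Debug] use the logger.info level; on the hot path every request prints block_id/ref_count lists, which would flood the logs in production |
| 🟡 Suggestion | fastdeploy/cache_manager/v1/cache_manager.py:1199 | abort_prefetch_blocks calls the private method _radix_tree._remove_node_from_tree directly, breaking encapsulation |
| 🟡 Suggestion | fastdeploy/cache_manager/v1/cache_controller.py:116 | storage_enabled reads the private attribute _transfer_manager._storage_connector through getattr; a public API should be exposed on CacheTransferManager |
| ❓ Question | fastdeploy/cache_manager/v1/storage/__init__.py:73 | create_storage_scheduler now raises RuntimeError on connection failure instead of "warn and continue"; if Mooncake is temporarily unavailable the whole service cannot start. Is there a retry or fallback strategy? |
| 📝 PR convention | - | The title has two tags, [KVCache][Feature] (the convention requires exactly one); the PR body is missing the ## Accuracy Tests section |

Overall assessment

The overall architecture is clear, and the on_success callback design for chained D2H→Storage eviction is sound; unifying the storage-layer key format in cache_utils.storage_key_for_block is also a good refactor. The main concerns are the debug log level (which would create heavy noise in production) and the encapsulation problem of cross-class private attribute access; the rest are design-level discussion points.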

host_refs_before = [(n.block_id, n.ref_count) for n in match_result.host_nodes]
free_host_block_ids = self._radix_tree.swap_to_device(match_result.host_nodes, device_blocks)
host_refs_after = [(n.block_id, n.ref_count) for n in match_result.host_nodes]
logger.info(

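🟡 Suggestion: several messages prefixed with [Debug] are emitted at the logger.info level (here and at L321, L529, L558, L775, L1030, L1109, L1165, among others).

Every request prints an INFO log containing the full block_id list and ref_count values. In a high-concurrency production environment this would flood the logs, and stringifying the dict/list itself costs CPU.

Switch them all to logger.debug, or remove these temporary debug logs before merging:

# Change to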
logger.debug(
    f"[allocate_device_blocks] swap host->device: "
    f"host_refs_before={host_refs_before}, host_refs_after={host_refs_after}, ..."
)
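
The CPU-cost point can be checked with the standard `logging` module. The sketch below is illustrative and not part of the review: `CountingRefs` is a hypothetical wrapper that counts how often it is stringified, and the logger name is made up. It shows that `%s`-style arguments (or an `isEnabledFor` guard around an f-string) avoid building the message when DEBUG is disabled.

```python
import io
import logging

logger = logging.getLogger("cache_manager_sketch")
logger.propagate = False
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.INFO)


class CountingRefs:
    """List wrapper that counts how often it is stringified."""

    calls = 0

    def __init__(self, refs):
        self.refs = refs

    def __repr__(self):
        CountingRefs.calls += 1
        return repr(self.refs)


host_refs = CountingRefs([(7, 0), (8, 1)])

# Lazy formatting: the argument is not stringified while DEBUG is off.
logger.debug("[allocate_device_blocks] host_refs_before=%s", host_refs)
# An f-string is built eagerly, so guard it when the payload is expensive.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"[allocate_device_blocks] host_refs_before={host_refs}")
calls_at_info = CountingRefs.calls

logger.setLevel(logging.DEBUG)
logger.debug("[allocate_device_blocks] host_refs_before=%s", host_refs)
calls_at_debug = CountingRefs.calls
```

Note that an unguarded `logger.debug(f"...")` still pays the formatting cost even when the record is dropped, because the f-string is evaluated before the call.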

node = self._prefetch_node_map.pop(block_id, None)
if node is None:
    continue
self._radix_tree._remove_node_from_tree(node)

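🟡 Suggestion: abort_prefetch_blocks calls RadixTree's private method _radix_tree._remove_node_from_tree(node) directly, breaking the encapsulation boundary. If RadixTree's internals change, this call site can silently break.

Consider exposing a public method on RadixTree, for example: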

# radix_tree.py
def remove_node(self, node: BlockNode) -> None:
    """Remove a single node from the tree and its parent's children map."""
    self._remove_node_from_tree(node)

Then call self._radix_tree.remove_node(node) in CacheManager.

return None

@property
def storage_enabled(self) -> bool:

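🟡 Suggestion: storage_enabled reads another class's private attribute through getattr(self._transfer_manager, "_storage_connector", None), an anti-pattern of cross-class private attribute access.

Consider adding a public property on CacheTransferManager: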

# transfer_manager.py
@property
def has_storage_connector(self) -> bool:
    return self._storage_connector is not None

Then CacheController.storage_enabled becomes:

return self._transfer_manager.has_storage_connector

if not scheduler.connect():
# Log warning but still return the scheduler
pass
raise RuntimeError(

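❓ Question: create_storage_scheduler's handling of connection failure changed from "warn and continue (return the scheduler)" to raising RuntimeError.

This means that if the Mooncake service is temporarily unavailable when a node starts (for example because of network jitter or a service that is not ready yet), the whole inference service cannot start.

Please confirm this is the intended behavior. If it is, consider adding troubleshooting hints to the error message (for example, check whether the Mooncake service is ready and whether MOONCAKE_METADATA_SERVER is configured correctly); if degraded operation should be supported (serving without storage), the warn-and-continue semantics should be restored.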
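
One possible shape for a retry-then-degrade policy is sketched below. It is not part of the PR: `connect_with_retry`, `allow_degraded`, and `FlakyBackend` are hypothetical, and the only assumption about the real code is that `connect()` returns a boolean, as the reviewed snippet suggests.

```python
import time


def connect_with_retry(connect, attempts=3, delay_s=0.0, allow_degraded=False):
    """Try connect() a few times; then either degrade or fail loudly."""
    for attempt in range(1, attempts + 1):
        if connect():
            return True
        if attempt < attempts:
            time.sleep(delay_s)
    if allow_degraded:
        return False  # caller keeps serving without the storage tier
    raise RuntimeError(
        f"storage backend unreachable after {attempts} attempts; "
        "check that the service is ready and the metadata server address is correct"
    )


class FlakyBackend:
    """Test double whose connect() fails a fixed number of times."""

    def __init__(self, fail_times):
        self.remaining_failures = fail_times

    def connect(self):
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            return False
        return True


recovered = connect_with_retry(FlakyBackend(fail_times=2).connect)
degraded = connect_with_retry(FlakyBackend(fail_times=5).connect, allow_degraded=True)
```

With a flag like `allow_degraded`, the choice between failing fast and serving without storage becomes an explicit deployment decision instead of a side effect of the error handling.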



4 participants