[KVCache][Feature] Implement Cache Manager V1 Storage Transfer and Prefetch (2/n)#7529
[KVCache][Feature] Implement Cache Manager V1 Storage Transfer and Prefetch (2/n)#7529kevincheng2 wants to merge 37 commits into
Conversation
|
Thanks for your contribution! |
0b34168 to
1757460
Compare
CI报告基于以下代码生成(30分钟更新一次): 1 任务总览
2 任务状态汇总2.1 Required任务 : 7/10 通过
2.2 可选任务 — 24/28 通过
3 失败详情(仅 required)Approval — 代码规范(置信度: 高)Approval
根因详情: 关键日志: 修复建议:
修复建议摘要: 请联系 xyxinyang/zyyzghb 在PR上 Approve 关联变更: PR 新增 链接: 查看日志 |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #7529 +/- ##
==========================================
Coverage ? 62.71%
==========================================
Files ? 461
Lines ? 64895
Branches ? 9960
==========================================
Hits ? 40697
Misses ? 21383
Partials ? 2815
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a node transitions from SWAP_TO_DEVICE to DEVICE via complete_swap_to_device, it was not being added to the _evictable_device set. This caused nodes with ref_count=0 to become "orphaned" - not appearing in any evictable set despite having cache_status=DEVICE. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add new cache_manager.py with cache management functionality - Add radix_tree.py for prefix caching - Update block_pool.py and metadata.py - Update request.py and resource_manager_v1.py for scheduling - Update gpu_model_runner.py for GPU model execution Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add CacheController class for cache management - Update config.py with cache related configurations - Refactor gpu_model_runner.py for improved cache handling
…in CacheController 补全 CacheController 中 Storage 相关接口的实现,使 Host↔Storage 传输链路完整可用: - prefetch_from_storage / backup_host_to_storage / backup_device_to_storage 此前均为 TODO stub - D2H evict 之后缺少自动备份到 Storage 的链路 - cache_controller.py: - 新增 `storage_enabled` 属性,判断是否配置了 StorageConnector - `evict_device_to_host` 支持可选 `StorageMetadata` 参数,D2H 成功后自动 chain backup_host_to_storage - `_submit_swap_task` 新增 `on_success` 回调,在 worker 线程内 transfer 成功后触发 - 实现 `prefetch_from_storage`:通过 ThreadPoolExecutor 异步调用 transfer_manager.prefetch_from_storage - 实现 `backup_host_to_storage`:通过 ThreadPoolExecutor 异步调用 transfer_manager.backup_host_to_storage - 在 swap 调度入口,当 storage_enabled 且 evict_metadata.hash_values 存在时,自动构造 StorageMetadata 传入 evict_device_to_host - cache_manager.py:适配相关接口调用变更 - transfer_manager.py:补充类型/接口调整 - storage/mooncake/connector.py:connector 接口对齐 - engine/request.py、engine/sched/resource_manager_v1.py:相关字段/调用适配 ```bash source .venv/py310/bin/activate bash run.sh ``` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Motivation 重构 TransferManager,整合 staging 缓冲区管理逻辑,新增独立的 StagingManager 模块统一管理 CPU staging buffer 的分配与释放。 ## Modifications - 重构 `transfer_manager.py`:简化传输逻辑,减少冗余代码 - 新增 `storage/staging_manager.py`:独立管理 staging buffer 生命周期 - 更新 `cache_manager.py`:适配新的 TransferManager 接口 - 更新 `cache_utils.py`:新增辅助工具函数 - 更新 `storage/__init__.py`:导出 StagingManager - 精简 `mooncake/connector.py`:移除冗余逻辑 - 新增 `test_staging_manager.py` 和更新 `test_transfer_manager.py` 单元测试 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…er and Worker Storage prefetch (Storage → CPU) previously had no runtime execution path in Scheduler/Worker: the Scheduler prepared host-block metadata but the actual data transfer was never triggered. Workers also had no mechanism to receive prefetch commands or report completion, leaving LOADING_FROM_STORAGE blocks permanently stuck and never promoted to HOST. - Add `_prefetch_node_map: Dict[int, BlockNode]` to track in-flight blocks by host_block_id for O(1) status lookup. - `prepare_prefetch_metadata`: register returned nodes into `_prefetch_node_map`. - New `update_storage_blocks_to_host(host_block_ids)`: transition LOADING_FROM_STORAGE → HOST after all TP workers confirm transfer done. - New `abort_prefetch_blocks(host_block_ids)`: remove nodes from RadixTree and release host pool blocks on transfer failure. - Add per-worker ZMQ PUSH/PULL servers (`_prefetch_cmd_servers`, `_prefetch_done_servers`), one pair per TP worker, keyed by local_rank. - `_init_prefetch_zmq_servers()`: initialize servers at startup when storage backend is configured. - `_prefetch_storage_cache()`: after inserting host blocks, serialize `StorageMetadata` and broadcast to all TP workers via ZMQ PUSH; then poll PULL done sockets until all workers reply, call `update_storage_blocks_to_host` on success or `abort_prefetch_blocks` on failure. - Add `receive_pyobj_once(block=False)`: non-blocking (or blocking) receive helper returning `(error, data)` tuple; used by Scheduler to poll done messages and by Worker in the prefetch loop. - Add `init_prefetch_zmq_clients()`: connect ZMQ PULL/PUSH clients to Scheduler servers for this worker's local_rank; start daemon `_prefetch_loop` thread. - `_prefetch_loop()`: background thread receiving `StorageMetadata` commands, calling `cache_controller.prefetch_from_storage`, waiting for `AsyncTaskHandler.wait`, and replying with ok/error status. - Add `TestUpdateStorageBlocksToHost` with 6 test cases covering: status transition, multi-block, unknown id, empty list, wrong status, and initial-empty-map assertions. No additional build steps. Enable storage prefetch via existing config: ```bash python -m fastdeploy.entrypoints.openai.api_server \ --kvcache-storage-backend <backend> \ --enable-prefix-caching \ ... ```
Checkout core files from upstream/develop to fix merge inconsistencies: - config.py: add model_loader_extra_config, enable_flashinfer_allreduce_fusion - inter_communicator: remove IPCLock (already deleted upstream) - engine/common_engine.py, worker/worker_process.py: sync with upstream - resource_manager_v1.py: revert to upstream (prefetch refactoring to be re-applied) - cache_manager/v1: sync with upstream - gpu_model_runner.py, gpu_worker.py: sync with upstream Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This reverts commit 3919ae9.
Refactor _prefetch_storage_cache into three decoupled phases: - Phase 1 (preprocess thread): CacheManager.prefetch_storage() does matching + enqueue - Phase 2 (schedule thread): drain pending list, attach to batch_request for dispatch - Phase 3 (receiver thread): zmq.Poller receives done msgs, stores results Worker side: extract prefetch tasks from batch_request, execute via thread pool, send completion via ZMQ PUSH. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…layer BatchRequest.__len__ 混入了 prefetch/swap/evict 任务数量,导致 engine 调度 逻辑(判断是否有待处理工作)出现误判;同时 swap/evict 提交散落在 gpu_model_runner 和 resource_manager 中,职责不清晰。 - engine/request.py: 新增 has_pending_work 属性,__len__ 恢复只计 requests 数量;has_pending_work 同时感知 prefetch/swap/evict 任务 - engine/common_engine.py: 用 has_pending_work 替换 len(batch_request) > 0 判断,逻辑更准确 - worker/worker_process.py: 将 submit_swap_tasks 调用移至 worker 层处理, 处理后清空 metadata 避免重复提交 - worker/gpu_model_runner.py: 移除重复的 submit_swap_tasks 调用 - engine/sched/resource_manager_v1.py: 调整 check_and_add_pending_backup / issue_pending_backup / dispatch_pending_prefetches 执行顺序,去掉对 len(batch_request) 的依赖 - cache_manager/v1/cache_manager.py: 恢复 matched_nodes 按 device/host 分类 逻辑(之前被误注释) ```bash cd baidu/FastDeploy bash run.sh ``` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… unit tests ## Motivation cache_manager_storage_transfer 分支持续迭代,本次提交包含多个改进方向: 1. BlockPool 分配逻辑有性能问题且缺少边界处理 2. CacheController.free_cache 无条件清除 storage 可能误清数据 3. request.py 的 CacheSwapMetadata 使用字符串表示 CacheLevel,类型不安全 4. common_engine / resource_manager / worker 中多模态判断入口不统一 5. radix_tree 中 select_blocks_for_backup 已无使用方,可清理 6. gpu_model_runner 多处可优化:异步写入、新注意力后端、MLA/DSA slot_mapping 7. MACA 平台 ops.py 中 F811 重定义警告 8. 缺少 cache manager v1 核心逻辑的单元测试 ## Modifications - cache_manager/ops.py: 移除 MACA 平台对 swap_cache_per_layer/async 的无效 import(F811) - cache_manager/v1/block_pool.py: allocate 增加 num_blocks==0 提前返回;批量切片替换循环 pop - cache_manager/v1/cache_controller.py: free_cache 新增 clear_storage 参数,默认 False - cache_manager/v1/radix_tree.py: 删除废弃的 select_blocks_for_backup 方法 - engine/request.py: CacheSwapMetadata src/dst_type 改用 CacheLevel 枚举;日志接口统一 - engine/common_engine.py: enable_mm 判断统一为 enable_mm_runtime;V1 cache pause 走 reset_cache;新增 model_loader_extra_config / enable_flashinfer_allreduce_fusion 透传; 修复 is_end 时残余 token_ids 未返回的边界 case - engine/sched/resource_manager_v1.py: enable_mm 统一;block 释放去掉冗余条件分支 - model_executor/forward_meta.py: ForwardMeta 用 slot_mapping 替换 mask_encoder_batch; XPUForwardMeta 新增 is_speculative 字段 - worker/gpu_model_runner.py: 引入 DSA/MLA 注意力后端;image processor 通用化; share_inputs 改为 async_set_value;新增 _compute_position_ids_and_slot_mapping; TBO 双缓冲支持;修复拼写错误;去掉 num_blocks > 40000 硬限制 - worker/worker_process.py: enable_mm_runtime 统一;参数顺序整理 - tests/cache_manager/v1/: 新增 test_cache_manager / test_cache_utils / test_radix_tree 单元测试,覆盖 offload/load、pending backup、hash、LayerDoneCounter、complete_swap 等 ## Usage or Command ```bash # 运行新增单元测试 source .venv/py310/bin/activate python -m pytest tests/cache_manager/v1/ -vv -s # 启动服务(单机) cd baidu/FastDeploy bash run.sh ``` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…urce manager v1 bugs ## Motivation 修复 cache_manager 和 resource_manager_v1 中的多个 bug。 ## Modifications - `cache_manager.py`: 修复 `free_gpu_block_ids` 返回实际空闲块列表而非 range,调整日志顺序(先打印日志再计算 matched_device/host_ids) - `common_engine.py`: 修正 typo(Unexcepted → Unexpected) - `request.py`: 修正 `cache_evict_metadata` 中 src/dst 类型方向错误(DEVICE→HOST 驱逐方向) - `resource_manager_v1.py`: PD 分离 prefill 节点跳过 prefix cache update_cache_blocks;在 prefill 节点分配后调用 update_cache_blocks Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Motivation `increment_ref_nodes` should only be called during the scheduling phase (when `skip_storage=True`), not during the actual storage prefetch phase. The previous condition was inverted, causing ref counts to be incremented at the wrong time. ## Modifications - Fix condition from `not (self._storage_scheduler and skip_storage)` to `skip_storage` in `CacheManager.match` - Update comment to clarify "only scheduling phase" Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Motivation 直接调用 `_host_pool.allocate()` 时不会触发驱逐,导致在 host block 空闲不足 但存在 evictable block 的情况下,`can_allocate_host_blocks` 返回 True 但分配 静默失败。 ## Modifications - `prepare_prefetch_metadata`:将 `_host_pool.allocate()` 替换为 `allocate_host_blocks()`, 空闲不足时自动驱逐 evictable host block 后再分配 - 删除未被生产代码调用的 `offload_to_host` 方法及其全部测试用例
…e manager v1 ## Motivation 在 cache manager v1 下,KV cache 的存储回写由 v1 内部的 RadixTree 机制处理, resource_manager_v1 中的 write_cache_to_storage / write_cache_to_storage_decode 调用属于冗余,应跳过。 ## Modifications - resource_manager_v1.py:preemption 路径的两处存储回写调用(decode/非decode)加上 `and not self.enable_cache_manager_v1` 条件,v1 下不再触发 - cache_manager/v1/cache_manager.py:prefix caching 未启用时,补充初始化 `request._match_result = MatchResult()`,避免后续访问空属性 ## Usage or Command 启动服务时设置 `--enable-cache-manager-v1` 即可复现修复效果: ```bash python -m fastdeploy.entrypoints.openai.api_server \ --enable-cache-manager-v1 \ ... ```
…tree position ## Motivation 三级 KV Cache(Device → Host → Storage)预拉取完成后,第二次 match_prefix 仍然只命中 device 层的 block,storage 预拉取的 host block 无法被找到。 根本原因:`prepare_prefetch_metadata` 调用 `radix_tree.insert` 时未传 `start_node`,导致 8 个新 LOADING_FROM_STORAGE 节点被错误地挂在 radix tree 的 root 节点下(以 storage hash h22 作为 root 直接子节点),而非接在已有 22 节点链末尾(node[21] 的子节点)。`find_prefix` 遍历到 node[21] 时, node[21].children 中不存在 h22,立即停止,始终只返回 22 个节点。 同批次还修复了几个关联问题: - `_match_storage` 只探测 "key" kind,Mooncake LRU 可能单独驱逐 "value" 导致虚假命中,改为同时探测 key + value,两者都存在才算命中 - partial write 时部分 key 写成功、部分失败,改为自动 rollback 已写入的 key,防止 _match_storage 发现半写 block - `prepare_prefetch_metadata` 中只注册真正是 LOADING_FROM_STORAGE 状态的 节点进 prefetch_node_map,避免 insert 复用已有 HOST/DEVICE 节点时触发 spurious "unexpected status" 警告 ## Modifications - `cache_manager.py` - `match_prefix`: 传 `start_node=matched_nodes[-1]` 给 `prepare_prefetch_metadata` - `prepare_prefetch_metadata`: 新增 `start_node` 参数,透传给 `_radix_tree.insert` - `prepare_prefetch_metadata`: 只注册 LOADING_FROM_STORAGE 节点进 prefetch_node_map - `_match_storage`: 同时探测 key + value 两个 kind,均存在才视为命中 - `storage/base.py`: 新增 `batch_exists` / `batch_delete` 默认实现 - `storage/mooncake/connector.py`: Mooncake 实现 `batch_exists` / `batch_delete` - `storage/staging_manager.py`: partial write 自动 rollback - `transfer_manager.py`: prefetch/backup 失败时输出诊断日志 - `tests/cache_manager/v1/test_cache_manager.py`: 添加回归测试 `TestPreparePrefixtMetadataStartNode` ## Usage or Command ```bash # 运行回归测试 source .venv/py310/bin/activate PYTHONPATH=. python -m pytest tests/cache_manager/v1/test_cache_manager.py::TestPreparePrefixtMetadataStartNode -v ```
5043d01 to
a656c6e
Compare
…count and add debug logging - Remove single-key storage methods (get/set/delete/exists), keep only batch ops - Remove attention_store backend support - Add cancel_pending_prefetch method to CacheManager - Fix ref_count balance in update_storage_blocks_to_host (decrement after LFS->HOST) - Skip LOADING_FROM_STORAGE nodes in radix_tree find_prefix - Add debug logging for cache allocate/match/finish/prefetch flows - Change Mooncake warmup asserts to RuntimeError - Update tests to match new interfaces
PaddlePaddle-bot
left a comment
There was a problem hiding this comment.
🤖 Paddle-CI-Agent | pr_review |
2026-05-13 18:19:39
📋 Review 摘要
PR 概述:实现 Cache Manager V1 的 storage transfer 子系统,包含 GPU↔CPU↔Storage 多级传输、D2H→Storage 链式驱逐、以及基于 ZMQ 的预取 pipeline。
变更范围:cache_manager/v1/、engine/sched/、worker/、inter_communicator/
影响面 Tag:[KVCache] [Engine] [Scheduler]
📝 PR 规范检查
标题使用了两个 Tag [KVCache][Feature],但规范要求仅一个官方 Tag;PR body 缺少 ## Accuracy Tests 段落。
标题建议(可直接复制):
[KVCache] Implement Cache Manager V1 Storage Transfer and Prefetch (2/n)
PR 描述建议(可直接复制,需补充 ## Accuracy Tests 段落):
## Motivation
本 PR 是 Cache Manager V1 系列的第二部分,基于 #7097 之上构建。
引入了 Cache Manager V1 的 **storage transfer** 子系统,主要能力包括:
- KV cache 从 GPU/CPU 卸载到外部存储(Mooncake、RDMA、IPC)
- 异步从外部存储预取 KV cache 回设备
- D2H → Storage 链式传输,实现多级缓存无缝驱逐
- 基于 ZMQ 的 Scheduler 与 Worker 之间的预取 pipeline
## Modifications
- `staging_manager.py`:新增 StagingManager,管理异步存储传输队列及预取任务
- `storage/base.py`:更新 StorageConnector 基类,增加 prefetch/backup 接口
- `storage/mooncake/connector.py`:Mooncake 存储连接器,支持 prefetch/backup
- `transfer_manager.py`:重构以支持多级传输(GPU↔CPU↔Storage)
- `cache_controller.py`:新增存储预取/备份编排逻辑及 D2H→Storage 链式传输
- `cache_manager.py`:集成存储传输 API,实现 prefetch_storage / abort_prefetch_blocks 等
- `engine/sched/resource_manager_v1.py`:集成存储预取触发逻辑
- `worker/gpu_worker.py`、`worker_process.py`:新增 ZMQ 预取 pipeline 处理
- `inter_communicator/zmq_client.py`:扩展 ZMQ client 支持预取消息
## Usage or Command
```bash
python -m fastdeploy.entrypoints.openai.api_server \
--model /path/to/model \
--cache-manager-version v1 \
--host-cache-size-gb 40 \
--storage-backend mooncake \
--tensor-parallel-size 8
```
## Accuracy Tests
N/A(本 PR 不涉及模型前向代码修改,无需精度测试)
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [x] Format your code, run `pre-commit` before commit.
- [x] Add unit tests. Please write the reason in this PR if no unit tests.
- [x] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.问题
| 级别 | 文件 | 概述 |
|---|---|---|
| 🟡 建议 | fastdeploy/cache_manager/v1/cache_manager.py:282 |
[Debug] 前缀的日志使用 logger.info 级别,热路径中每请求都会输出 block_id/ref_count 列表,生产环境将造成严重日志泛洪 |
| 🟡 建议 | fastdeploy/cache_manager/v1/cache_manager.py:1199 |
abort_prefetch_blocks 直接调用 _radix_tree._remove_node_from_tree 私有方法,破坏封装 |
| 🟡 建议 | fastdeploy/cache_manager/v1/cache_controller.py:116 |
storage_enabled 通过 getattr 读取 _transfer_manager._storage_connector 私有属性,应在 CacheTransferManager 上暴露公共 API |
| ❓ 疑问 | fastdeploy/cache_manager/v1/storage/__init__.py:73 |
create_storage_scheduler 连接失败从"warn and continue"改为抛 RuntimeError,Mooncake 暂时不可用时将导致整个服务无法启动,是否有重试或降级策略? |
| 📝 PR 规范 | — | 标题含两个 Tag [KVCache][Feature](规范要求仅一个);PR body 缺少 ## Accuracy Tests 段落 |
总体评价
整体实现架构清晰,D2H→Storage 链式驱逐的 on_success 回调设计合理;存储层 key 格式统一到 cache_utils.storage_key_for_block 也是良好重构。主要需关注调试日志级别问题(会在生产环境产生大量噪声)和跨类私有属性访问的封装问题,其余为设计层面的讨论点。
| host_refs_before = [(n.block_id, n.ref_count) for n in match_result.host_nodes] | ||
| free_host_block_ids = self._radix_tree.swap_to_device(match_result.host_nodes, device_blocks) | ||
| host_refs_after = [(n.block_id, n.ref_count) for n in match_result.host_nodes] | ||
| logger.info( |
There was a problem hiding this comment.
🟡 建议 多处 [Debug] 前缀的消息使用 logger.info 级别输出(此处及 L321、L529、L558、L775、L1030、L1109、L1165 等)。
每个请求都会打印包含完整 block_id 列表和 ref_count 的 INFO 日志,在高并发生产环境将产生严重日志泛洪,同时 dict/list 的字符串化本身有 CPU 开销。
建议统一改为 logger.debug,或在合并前移除这些临时调试日志:
# 改为
logger.debug(
f"[allocate_device_blocks] swap host->device: "
f"host_refs_before={host_refs_before}, host_refs_after={host_refs_after}, ..."
)| node = self._prefetch_node_map.pop(block_id, None) | ||
| if node is None: | ||
| continue | ||
| self._radix_tree._remove_node_from_tree(node) |
There was a problem hiding this comment.
🟡 建议 abort_prefetch_blocks 直接调用了 RadixTree 的私有方法 _radix_tree._remove_node_from_tree(node),破坏封装边界。若 RadixTree 内部实现变更,此处会悄然失效。
建议在 RadixTree 上暴露一个公共方法,例如:
# radix_tree.py
def remove_node(self, node: BlockNode) -> None:
"""Remove a single node from the tree and its parent's children map."""
self._remove_node_from_tree(node)然后在 CacheManager 中调用 self._radix_tree.remove_node(node)。
| return None | ||
|
|
||
| @property | ||
| def storage_enabled(self) -> bool: |
There was a problem hiding this comment.
🟡 建议 storage_enabled 通过 getattr(self._transfer_manager, "_storage_connector", None) 读取另一个类的私有属性,是跨类私有属性访问反模式。
建议在 CacheTransferManager 上添加公共属性:
# transfer_manager.py
@property
def has_storage_connector(self) -> bool:
return self._storage_connector is not None然后 CacheController.storage_enabled 改为:
return self._transfer_manager.has_storage_connector| if not scheduler.connect(): | ||
| # Log warning but still return the scheduler | ||
| pass | ||
| raise RuntimeError( |
There was a problem hiding this comment.
❓ 疑问 create_storage_scheduler 的连接失败处理从原来的「warn and continue(返回 scheduler)」改成了抛 RuntimeError。
这意味着若 Mooncake 服务在节点启动时暂时不可用(如网络抖动、服务未就绪),整个推理服务将无法启动。
请确认这是预期行为。如果是,建议在错误信息里补充排查建议(如检查 Mooncake 服务是否已就绪、MOONCAKE_METADATA_SERVER 配置是否正确);如果希望支持降级运行(无 storage 仍可服务),则应恢复 warn-and-continue 语义。
Motivation
本 PR 是 Cache Manager V1 系列的第二部分,基于 #7097 之上构建。
引入了 Cache Manager V1 的 storage transfer 子系统,主要能力包括:
Modifications
核心模块
fastdeploy/cache_manager/v1/storage/staging_manager.py:新增StagingManager,管理异步存储传输队列及预取任务base.py:更新StorageConnector基类,增加 prefetch/backup 接口mooncake/connector.py:Mooncake 存储连接器,支持 prefetch/backupfastdeploy/cache_manager/v1/transfer_manager.py:重构以支持多级传输(GPU↔CPU↔Storage),将TransferManager与StagingManager分离fastdeploy/cache_manager/v1/cache_controller.py:新增存储预取/备份编排逻辑及 D2H→Storage 链式传输fastdeploy/cache_manager/v1/cache_manager.py:集成存储传输 APIfastdeploy/cache_manager/v1/block_pool.py:支持存储层级 blockEngine & Worker 集成
fastdeploy/engine/sched/resource_manager_v1.py:在资源管理器中集成存储预取触发逻辑fastdeploy/worker/gpu_worker.py/worker_process.py:新增 ZMQ 预取 pipeline 处理fastdeploy/inter_communicator/zmq_client.py:扩展 ZMQ client 支持预取消息fastdeploy/engine/common_engine.py/request.py:存储传输元数据相关更新测试
tests/cache_manager/v1/test_staging_manager.py:StagingManager单元测试tests/cache_manager/v1/test_transfer_manager.py:更新 transfer manager 测试tests/cache_manager/v1/test_cache_manager.py:更新 cache manager 测试tests/cache_manager/v1/test_cache_utils.py:更新 cache utils 测试Usage or Command
启动包含 storage transfer 功能的服务示例:
Checklist
pre-commitbefore commit.releasebranch, make sure the PR has been submitted to thedevelopbranch.