[RL][Feature] R3 Support CPU PrefixCache#7099
…e (swap/storage/PD) Implement dual-buffer architecture for routing replay: - GPU transient buffer [max_num_batched_tokens, L, K] with Triton v2 kernel - SharedMemory routing_host_buffer for cross-process Engine/Worker/CTM sharing - Lazy SharedMemory attach in Worker and TokenProcessor (Engine creates after profiling) - CTM routing write/read for swap and storage backends - PD disaggregation: P gathers routing via send_first_token, D writes to host buffer - Local store persistence verified end-to-end Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
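As a rough illustration of the dual-buffer idea in this commit message, the sketch below uses NumPy in place of the GPU buffer and the Triton kernel. The function names, the tiny shapes, and the slot formula block_id * block_size + offset are assumptions made for illustration, not the PR's actual code. The sentinel fill follows the PR description's note that buffers are initialized with -1 wrapped to the buffer dtype.

```python
import numpy as np


def make_host_buffer(num_blocks, block_size, num_layers, top_k, dtype=np.uint8):
    """Allocate a [num_blocks * block_size, L, K] buffer filled with a sentinel."""
    # Casting -1 wraps to the maximum value for unsigned dtypes (255 for uint8)
    # and stays -1 for signed dtypes.
    sentinel = np.array(-1, dtype=np.int64).astype(dtype)
    shape = (num_blocks * block_size, num_layers, top_k)
    return np.full(shape, sentinel, dtype=dtype)


def scatter_routing(host_buffer, transient, slot_mapping, num_tokens):
    """Copy the first num_tokens rows of the transient buffer to their slots."""
    slots = np.asarray(slot_mapping[:num_tokens], dtype=np.int64)
    host_buffer[slots] = transient[:num_tokens]


def gather_routing(host_buffer, block_table, seq_len, block_size):
    """Read back the routing rows of one request in token order."""
    positions = np.arange(seq_len)
    blocks = np.asarray(block_table, dtype=np.int64)[positions // block_size]
    return host_buffer[blocks * block_size + positions % block_size]


# Hypothetical sizes: 3 blocks of 4 slots, L=2 MoE layers, K=2 experts per token.
block_size, num_layers, top_k = 4, 2, 2
host = make_host_buffer(3, block_size, num_layers, top_k)

# Transient buffer sized for up to 8 batched tokens; only 6 are valid this step.
transient = np.arange(8 * num_layers * top_k, dtype=np.uint8).reshape(8, num_layers, top_k)
block_table = [2, 0]                  # the request owns block 2, then block 0
slot_mapping = [8, 9, 10, 11, 0, 1]   # block_id * block_size + offset per token

scatter_routing(host, transient, slot_mapping, num_tokens=6)
restored = gather_routing(host, block_table, seq_len=6, block_size=block_size)
```

Because the transient buffer is reused every step while the host buffer is addressed by slot, a request's routing rows can be recovered later from its block table alone, and slots that were never written still hold the sentinel.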
7f7d86e to 0c68b77
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## release/2.5 #7099 +/- ##
==============================================
Coverage ? 68.78%
==============================================
Files ? 392
Lines ? 54723
Branches ? 8627
==============================================
Hits ? 37643
Misses ? 14373
Partials ? 2707
…e.py Move Store-related classes (StoreWrapper, StoreProcess, StoreTask, RoutingStoreBase, RoutingStoreLocal, RoutingStoreRDMA, etc.) to fastdeploy/cache_manager/routing_store.py and Buffer classes (RoutingHostBuffer, RoutingHostBufferView, RoutingSwapBuffer, RoutingSwapBufferView) to fastdeploy/cache_manager/routing_cache_manager.py. Update import paths in cache_transfer_manager.py, common_engine.py, and token_processor.py. Keep only actually-used imports in routing_indices_cache.py (RoutingHostBufferView, StoreWrapper). Pure file reorganization — zero functional change. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…edExpertsCapturer + RoutingCacheManager - Centralize routing_dtype/num_moe_layers/moe_top_k computation in RoutingReplayConfig (FDConfig.__init__), eliminating 4 duplicate computation sites - Delete v1 kernel (_save_routing_kernel) and all v1 fallback branches in moe.py and pre_and_post_process.py — single code path through v2 linear-indexed kernel - Rename RoutingReplayManager → RoutedExpertsCapturer (Worker-side, capture-only) with backward-compat alias - Remove request management (register/deregister/put_finished_batch), store dispatch (_put_request_to_store), and suspend methods from Worker - Create stateless RoutingCacheManager (Engine-side) for routing data aggregation and return-mode dispatch (response/local/rdma) - Migrate all callers: gpu_model_runner, metax_model_runner, common_engine, token_processor, prefix_cache_manager - Remove FD_SUSPEND_ROUTING_REPLAY env var from envs.py Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The rename from _init_routing_host_buffer to _init_routing_cache_manager in common_engine.py was not updated in engine.py:_stop_profile(), causing AttributeError on startup with routing replay enabled. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
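The first commit message describes a SharedMemory routing_host_buffer that the Engine creates after profiling and that the Worker and TokenProcessor attach to lazily. A minimal sketch of that pattern with Python's multiprocessing.shared_memory follows. The class and function names, the shape, and the single-process demo are illustrative assumptions rather than the PR's implementation.

```python
import os
from multiprocessing import shared_memory

import numpy as np

SHAPE = (8, 2, 4)   # hypothetical [num_slots, L, K]
DTYPE = np.uint8


def create_host_buffer(name, shape=SHAPE, dtype=DTYPE):
    """Owner side: create the segment and fill it with a sentinel value."""
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    shm = shared_memory.SharedMemory(name=name, create=True, size=nbytes)
    view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    view.fill(np.iinfo(dtype).max)
    return shm, view


class LazyHostBufferView:
    """Consumer side: attach on first access, not at construction time."""

    def __init__(self, name, shape=SHAPE, dtype=DTYPE):
        self._name, self._shape, self._dtype = name, shape, dtype
        self._shm = None
        self._array = None

    @property
    def array(self):
        if self._array is None:
            self._shm = shared_memory.SharedMemory(name=self._name)
            self._array = np.ndarray(self._shape, dtype=self._dtype, buffer=self._shm.buf)
        return self._array

    def close(self):
        # Drop the NumPy view first; the segment cannot be closed while a
        # view still exports its buffer.
        self._array = None
        if self._shm is not None:
            self._shm.close()
            self._shm = None


name = f"routing_demo_{os.getpid()}"
consumer = LazyHostBufferView(name)           # built before the segment exists
shm, owner_view = create_host_buffer(name)    # owner creates it later
owner_view[3] = 7                             # owner writes slot 3
seen = int(consumer.array[3, 0, 0])           # consumer attaches here and reads

consumer.close()
del owner_view
shm.close()
shm.unlink()
```

Deferring the attach to first use lets a consumer be constructed before the owner has created the segment. The demo attaches within one process only to stay self-contained.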
try:
    from fastdeploy.cache_manager.cache_data import AuxBlockDataSpec
    from fastdeploy.model_executor.layers.moe.routing_indices_cache import (
Are dense models handled properly?
kv_tile_ids_per_batch=self.share_inputs["kv_tile_ids_per_batch"],
kv_num_blocks_x_cpu=self.share_inputs["kv_num_blocks_x_cpu"],
routing_replay_table=routing_replay_table,
routing_replay_table=None,
routing_replay_table and gpu_routing_buffer should both be renamed to device_routing_buffer.
PaddlePaddle-bot left a comment
🤖 AI Code Review | 2026-04-16 20:14 CST
📋 Review Summary
PR overview: R3 (RoutingReplay) Phase 2 implementation. Routing data now follows the KVCache block lifecycle (swap/storage). The PR also splits files, unifies configuration, deletes legacy v1 code, and fixes the GLM num_moe_layers calculation bug.
Scope of changes: cache_manager/, engine/, worker/, model_executor/layers/moe/, output/, entrypoints/openai/, config.py
Impact tags: KVCache Engine RL APIServer
Issues
| Severity | File | Summary |
|---|---|---|
| 🔴 Bug | routing_store.py:445 | RoutingStoreLocal.put() does not create the subdirectory, so paddle.save will raise FileNotFoundError |
| 🔴 Security | routing_store.py:195-196 | eval() is used to convert strings to integers, which risks arbitrary code execution |
| 🟡 Suggestion | routing_cache_manager.py:159-161 | split_request_id validates input with assert, which is skipped under python -O |
| 🟡 Suggestion | routing_cache_manager.py:276-283 | RoutingCacheManager.close() does not close _store_wrapper and may leak the StoreProcess subprocess |
🔴 Bug: RoutingStoreLocal.put() does not create the subdirectory (routing_store.py:437-445)
__init__ creates only the root directory self.local_store_dir, but put() writes file_path under the request_path subdirectory (for example ./routing_replay_output/{rollout_id}/layer_0.pdtensor). That subdirectory is never created, so paddle.save() will raise FileNotFoundError.
# Current code (routing_store.py:437-445)
request_path = os.path.join(self.local_store_dir, rollout_id)
file_path = os.path.join(request_path, f"layer_{layer_id}.pdtensor")
# ...
paddle.save(routing_indices, file_path)  # ❌ request_path does not exist
Suggested fix: add the following before paddle.save:
os.makedirs(request_path, exist_ok=True)
🔴 Security: eval() used to convert strings to integers (routing_store.py:195-196)
get_needed_clear_ids() uses eval() to convert strings parsed from rollout_id into integers. Even if the input comes from internal callers, eval() can execute arbitrary Python expressions; int() should be used instead.
# Current code
turn_id = eval(reversed_turn_id[::-1])
segment_id = eval(reversed_segment_id[::-1])
# Suggested fix
turn_id = int(reversed_turn_id[::-1])
segment_id = int(reversed_segment_id[::-1])
🟡 Suggestion: split_request_id uses assert for business validation (routing_cache_manager.py:159-161)
assert statements are removed by the compiler under python -O (optimized mode). Runtime input validation should raise an explicit exception instead:
# Suggested fix
if chat_type != "chatcmpl":
    raise ValueError(
        "Rollout Routing Replay only supports chatcmpl. "
        "Please check request type and userid settings."
    )
🟡 Suggestion: RoutingCacheManager.close() does not close _store_wrapper (routing_cache_manager.py:276-283)
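close() only cleans up the SharedMemory held by host_view and host_buffer; it does not close _store_wrapper (a StoreWrapper instance that internally manages the StoreProcess subprocess and a multiprocessing.Manager). StoreWrapper does register an atexit handler, but closing it explicitly is more reliable and avoids leaking processes:
def close(self):
    if self._store_wrapper is not None:
        self._store_wrapper.shutdown()
        self._store_wrapper = None
    if self.host_view is not None:
        self.host_view.close()
        self.host_view = None
    if self.host_buffer is not None:
        self.host_buffer.close()
        self.host_buffer = None
Overall assessment
This PR is a large, clearly structured refactor plus feature enhancement. Binding routing data to the KVCache block lifecycle is a sound architectural design, and the file split and unified configuration noticeably improve maintainability. The main blockers are the missing directory creation in RoutingStoreLocal.put() (P0 bug) and the eval() security risk, both of which are cheap to fix. Recommend merging once they are fixed.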
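To illustrate the two validation suggestions in the review, the sketch below parses an integer field with int() and rejects an unsupported request type with an explicit exception. The helper names are hypothetical and the inputs are made up; only the chatcmpl check and its message come from the review.

```python
def parse_int_field(text, field_name):
    """Convert a decimal string to int without evaluating it as code."""
    try:
        return int(text)
    except ValueError as exc:
        raise ValueError(f"{field_name} is not an integer: {text!r}") from exc


def check_chat_type(chat_type):
    """Explicit check that still runs under python -O, unlike assert."""
    if chat_type != "chatcmpl":
        raise ValueError(
            "Rollout Routing Replay only supports chatcmpl. "
            "Please check request type and userid settings."
        )


turn_id = parse_int_field("12", "turn_id")

# A string that eval() would execute is simply rejected by int().
try:
    parse_int_field("__import__('os').getcwd()", "segment_id")
    rejected = False
except ValueError:
    rejected = True
```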
topk_ids_all = paddle.zeros([token_num_per_rank * tp_size, topk_ids.shape[1]], dtype=topk_ids.dtype)
paddle.distributed.all_gather(topk_ids_all, topk_ids, tp_group)
topk_ids = topk_ids_all[: batch_id_per_token.shape[0], :]
topk_ids = topk_ids_all[:token_num_per_rank, :]
Motivation
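In Phase 1, FastDeploy's RoutingReplay (R3) module became compatible with the GPU PrefixCache (GPU double buffering plus a CPU host_cache), but the following problems remain:
Missing Phase 2 core functionality:
- _host_cache (a Paddle tensor) and the pinned memory of the CacheTransferManager (CTM) process are two independent memory regions, so the CTM is completely unaware of routing data
Architectural debt:
- routing_indices_cache.py has 1152 lines and 13 classes, and mixes three layers of concerns: the Worker layer (GPU buffer), the Engine layer (request management), and the Store layer (external storage)
- routing_dtype, num_moe_layers, and moe_top_k are each computed redundantly in 4 places (routing_indices_cache.py, common_engine.py, token_processor.py, prefix_cache_manager.py)
- Configuration is passed through --routing_* argparse parameters, which is inconsistent with the JSON convention used by SpeculativeConfig in the framework
- […] _host_cache), which risks data inconsistency
Bug:
- moe_layer_start_index (derived from first_k_dense_replace) is consumed before FDConfig.postprocess() runs, so num_moe_layers for GLM-4.5 is computed as 46 (the correct value is 45)
Modifications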
Commit 1
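[RL][Feature]: R3 Phase 2 core implementation: routing data follows the KVCache block lifecycle
Implements a dual-buffer architecture shared by three processes:
- GPU transient buffer [max_num_batched_tokens, L, K], written by the Triton v2 kernel (linear indexing), replacing the 4-dimensional indexing scheme of Phase 1
- SharedMemory routing_host_buffer: [num_gpu_blocks * block_size, L, K]; the Engine creates it after profiling, the Worker and TokenProcessor attach lazily, and slot_mapping addressing enables cross-process sharing
- CTM: RoutingSwapBuffer ([num_cpu_blocks * block_size, L, K]); on SWAP2CPU/SWAP2GPU, _swap_routing() is called synchronously so that routing data is swapped in and out together with the KV blocks; in the KVCacheStorage scenario it is written and read back synchronously
- PD disaggregation: the P node attaches routing data in send_first_token, and the D node writes it into its local routing_host_buffer
Changed files:
cache_data.py, cache_transfer_manager.py, prefix_cache_manager.py, common_engine.py, engine.py, request.py, resource_manager_v1.py, forward_meta.py, moe.py, routing_indices_cache.py, pre_and_post_process.py, token_processor.py, gpu_model_runner.py
Commit 2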
[RL][Refactor]: File split: extract the Store/Buffer classes from routing_indices_cache.py (zero functional change)
Splits the 1152-line routing_indices_cache.py into new files by responsibility:
- cache_manager/routing_store.py: StoreTask, StoreWrapper, StoreProcess, AsyncEventLoopThread, RoutingStoreBase/Local/RDMA, get_routing_store
- cache_manager/routing_cache_manager.py: RoutingHostBuffer, RoutingHostBufferView, RoutingSwapBuffer, RoutingSwapBufferView
- routing_indices_cache.py is trimmed to ~575 lines, keeping the Triton kernel and RoutingReplayManager
- Updated the import sources in cache_transfer_manager.py, common_engine.py, and token_processor.py
Commit 3
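[RL][Refactor]: Configuration unification + v1 removal + renaming + request-management migration
Configuration unification (config.py):
- RoutingReplayConfig gains a postprocess(model_config) method that computes num_moe_layers, moe_top_k, and routing_dtype in one place, eliminating 4 duplicate computations
- The --routing_* argparse parameters are merged into a single --routing_replay_config (JSON), matching how SpeculativeConfig is passed
- Removed the FD_SUSPEND_ROUTING_REPLAY environment variable (during RL updates, clear_requests() plus stop_flags=True already guarantee safety)
Removed legacy v1 code:
- _save_routing_kernel (the v1 Triton kernel), update_host_cache, and compute_slot_mapping (4-dimensional version) in routing_indices_cache.py
- The v1 fallback branches in moe.py and pre_and_post_process.py
Worker-side renaming and simplification:
- RoutingReplayManager → RoutedExpertsCapturer (a backward-compatible alias is kept), aligning with vLLM naming
- Removed the request-management methods (register_request, put_finished_batch, _get_routing_from_cache, clear_request, routing_batch_to_request, etc.); the Worker keeps only GPU buffer management and the GPU→SharedMemory scatter
New stateless Engine-side RoutingCacheManager (routing_cache_manager.py):
- block_table and seq_len are managed entirely by the Scheduler
- Data source for the response and p2pstore return modes: both gather from the SharedMemory routing_host_buffer
- […] on_request_finished(), eliminating the cross-process synchronization problem of the Worker-side put_finished_batch
Migrated callers:
gpu_model_runner.py, metax_model_runner.py, common_engine.py, token_processor.py, prefix_cache_manager.py
Commit 4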
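[RL][BugFix]: Fix a missed method rename in engine.py
After _init_routing_host_buffer was renamed to _init_routing_cache_manager, the call in engine.py:_stop_profile() was not updated, causing an AttributeError at startup when routing replay is enabled.
Commit 5-6 (feature completion): response mode and P2PStore mode working end to end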
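- response mode: protocol.py adds the routed_experts field; serving_chat.py/serving_completion.py read it from RequestOutput and return it serialized as base64; token_processor.py refines the gather timing and edge-case handling
- p2pstore mode: the CTM subprocess startup order is adjusted (engine.py) so that RoutingSwapBuffer is initialized correctly after the CTM starts; fixes a race condition around rank 0's exclusive creation of RoutingSwapBuffer
Commit 7-8 (code quality cleanup):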
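- cache_transfer_manager.py: the direction check in _swap_routing becomes elif direction == "to_gpu": ... else: raise ValueError, eliminating the implicit fallthrough; the --routing_* parameters are merged into JSON; the paddle dependency is removed and the data layout conversion is done in pure numpy, avoiding CUDA initialization overhead in non-GPU processes
- routing_cache_manager.py: the buffer initial value is changed to -1 (relying on unsigned-integer wraparound so that every dtype is filled with the correct sentinel); the variable rrc is renamed to routing_replay_config
- routing_store.py: the submit_put_task parameter type changes from paddle.Tensor to np.ndarray, and the internal .numpy() conversion is removed
- forward_meta.py/gpu_model_runner.py: removes the dead field routing_replay_table, which gpu_routing_buffer had replaced
- token_processor.py: routing data collection is extracted from _recycle_resources into a standalone method, _finalize_routing, which is called before resources are recycled (so block_tables is still valid); it works both with and without PD disaggregation
Commit 9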
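[BugFix]: Fix the incorrect GLM-4.5 num_moe_layers calculation (46 → 45)
RoutingReplayConfig.postprocess() ran inside FDConfig.__init__, before FDConfig.postprocess(), so GLM's moe_layer_start_index (derived from first_k_dense_replace=1) was consumed before it had been assigned, giving 46-0=46. Fix: move the RoutingReplayConfig.postprocess(model_config) call into FDConfig.postprocess(), after the GLM fields are unified, which gives the correct 46-1=45.
Summary of changed files: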
| File | Change |
|---|---|
| config.py | RoutingReplayConfig.postprocess() + FDConfig.postprocess() call; fix GLM num_moe_layers |
| envs.py | […] FD_SUSPEND_ROUTING_REPLAY |
| cache_manager/routing_store.py | |
| cache_manager/routing_cache_manager.py | […] RoutingCacheManager) |
| cache_manager/cache_transfer_manager.py | |
| cache_manager/prefix_cache_manager.py | |
| cache_manager/cache_data.py | |
| engine/common_engine.py | […] RoutingCacheManager; RoutingHostBuffer SharedMemory initialization order |
| engine/engine.py | |
| engine/request.py | |
| engine/sched/resource_manager_v1.py | |
| model_executor/layers/moe/routing_indices_cache.py | |
| model_executor/layers/moe/moe.py | |
| model_executor/pre_and_post_process.py | […] put_finished_batch call |
| model_executor/forward_meta.py | […] routing_replay_table |
| worker/gpu_model_runner.py | […] register_request/clear_request, etc. |
| worker/metax_model_runner.py | |
| output/token_processor.py | […] _finalize_routing; reads config instead; PD scenario adaptation |
| entrypoints/openai/protocol.py | […] routed_experts response field |
| entrypoints/openai/serving_chat.py | |
| entrypoints/openai/serving_completion.py | |
Usage or Command
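One way the single JSON flag described under Commit 3 could be parsed is sketched below. Only the flag name --routing_replay_config comes from this PR; the dataclass and its field names are hypothetical placeholders, not FastDeploy's actual configuration schema.

```python
import argparse
import json
from dataclasses import dataclass


@dataclass
class RoutingReplayConfigSketch:
    # All three fields are hypothetical names chosen for this example.
    enabled: bool = False
    store_type: str = "local"
    local_store_dir: str = "./routing_replay_output"


def parse_routing_config(argv):
    """Parse one JSON-valued flag into a config object."""
    parser = argparse.ArgumentParser()
    # json.loads as the type converter turns the flag value into a dict;
    # malformed JSON makes argparse report an error and exit.
    parser.add_argument("--routing_replay_config", type=json.loads, default=None)
    args = parser.parse_args(argv)
    # Unknown keys raise TypeError here, which surfaces typos early.
    return RoutingReplayConfigSketch(**(args.routing_replay_config or {}))


cfg = parse_routing_config(
    ["--routing_replay_config", '{"enabled": true, "store_type": "rdma"}']
)
default_cfg = parse_routing_config([])
```

Keeping all routing options in one JSON value means new options do not require new command-line flags, at the cost of quoting the JSON correctly in the shell.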
Accuracy Tests
End-to-end verification by running tools/test_r3_swap.py (GLM-4.5-Air-Fake, TP=4, 200 GPU blocks, 1 GiB CPU swap):
- […] RoutingSwapBuffer; ranks 1-3 do not create it, and all ranks attach
- […] RoutingHostBuffer (22, 10, 8), expert range [0, 127], EOS token sentinel=255
- […] _swap_routing to_cpu (8 times) + SWAP2GPU triggered (6 times) + _swap_routing to_gpu (1 time) + routing for the first 516 tokens is a bit-exact match
Checklist
- […] [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
- […] pre-commit before commit.
- […] release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.
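For the response mode described under Commit 5-6, where routed_experts is returned as base64, a round trip might look like the sketch below. The payload layout (data, shape, and dtype keys) and the function names are assumptions; the PR text only states that the field is serialized as base64.

```python
import base64

import numpy as np


def encode_routed_experts(arr):
    """Pack an expert-index array into a JSON-friendly dict."""
    arr = np.ascontiguousarray(arr)
    return {
        "data": base64.b64encode(arr.tobytes()).decode("ascii"),
        "shape": list(arr.shape),
        "dtype": str(arr.dtype),
    }


def decode_routed_experts(payload):
    """Rebuild the array from the dict produced by encode_routed_experts."""
    raw = base64.b64decode(payload["data"])
    return np.frombuffer(raw, dtype=payload["dtype"]).reshape(payload["shape"])


# Made-up data: 2 tokens, 2 layers, top-2 experts; 255 marks an unfilled slot.
experts = np.array([[[3, 17], [5, 127]], [[255, 255], [255, 255]]], dtype=np.uint8)
payload = encode_routed_experts(experts)
decoded = decode_routed_experts(payload)
```

Raw bytes alone do not carry shape or dtype, so a client needs both from somewhere to reconstruct the [tokens, L, K] array; this sketch simply ships them alongside the data.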