Skip to content

[RL][Feature] R3 Support CPU PrefixCache#7099

Closed
gongshaotian wants to merge 14 commits into
PaddlePaddle:release/2.5from
gongshaotian:vibe_r3_2.5
Closed

[RL][Feature] R3 Support CPU PrefixCache#7099
gongshaotian wants to merge 14 commits into
PaddlePaddle:release/2.5from
gongshaotian:vibe_r3_2.5

Conversation

@gongshaotian
Copy link
Copy Markdown
Collaborator

@gongshaotian gongshaotian commented Mar 31, 2026


Motivation

FastDeploy 的 RoutingReplay(R3)模块在 Phase 1 实现了 GPU PrefixCache 兼容(GPU buffer 双缓冲 + CPU host_cache),但存在以下问题:

Phase 2 核心缺失:

  • routing 数据不跟随 KVCache block 进行 GPU↔CPU swap 和 KVCacheStorage 读写,导致前缀命中 CPU swap 后 routing 数据丢失
  • Worker 侧旧 _host_cache(Paddle tensor)与 CacheTransferManager 进程的 pinned memory 是两套独立内存,CTM 完全不感知 routing
  • PD 分离场景下,P 节点捕获的 routing 数据无法传递给 D 节点

架构债务:

  • routing_indices_cache.py 1152 行、13 个 class,混合 Worker 层(GPU buffer)、Engine 层(请求管理)、Store 层(外部存储)三层关注点
  • routing_dtypenum_moe_layersmoe_top_k 在 4 处各自重复计算(routing_indices_cache.pycommon_engine.pytoken_processor.pyprefix_cache_manager.py
  • CTM 子进程通过 4 个独立 --routing_* argparse 参数接收配置,与框架中 SpeculativeConfig 的 JSON 传参惯例不一致
  • Response 模式和 P2PStore 模式数据源不一致(前者读 SharedMemory,后者读旧 _host_cache),存在数据不一致风险

Bug:

  • GLM 模型的 moe_layer_start_index(由 first_k_dense_replace 推导)在 FDConfig.postprocess() 之前被消费,导致 GLM-4.5 num_moe_layers 计算结果为 46(正确值 45)

Modifications

Commit 1 [RL][Feature]:R3 Phase 2 核心实现 — routing 数据跟随 KVCache block 生命周期

实现三进程共享的双缓冲架构:

  • GPU transient buffer[max_num_batched_tokens, L, K],Triton v2 kernel(线性索引)写入,替代 Phase 1 的 4 维索引方案
  • SharedMemory routing_host_buffer[num_gpu_blocks * block_size, L, K],Engine 在 profiling 后创建,Worker 和 TokenProcessor 惰性 attach,以 slot_mapping 寻址实现跨进程共享
  • CacheTransferManager routing 适配:CTM 增加 RoutingSwapBuffer[num_cpu_blocks * block_size, L, K]),SWAP2CPU/SWAP2GPU 时同步调用 _swap_routing() 完成 routing 数据随 KV block 的换入换出;KVCacheStorage 场景同步写入/读回
  • PD 分离适配:P 节点通过 send_first_token 时附带 routing 数据,D 节点写入本地 routing_host_buffer

变更文件:cache_data.pycache_transfer_manager.pyprefix_cache_manager.pycommon_engine.pyengine.pyrequest.pyresource_manager_v1.pyforward_meta.pymoe.pyrouting_indices_cache.pypre_and_post_process.pytoken_processor.pygpu_model_runner.py


Commit 2 [RL][Refactor]:文件拆分 — 从 routing_indices_cache.py 提取 Store/Buffer 类(零功能变更)

将 1152 行的 routing_indices_cache.py 按职责拆分至新文件:

  • 新建 cache_manager/routing_store.pyStoreTaskStoreWrapperStoreProcessAsyncEventLoopThreadRoutingStoreBase/Local/RDMAget_routing_store
  • 新建 cache_manager/routing_cache_manager.pyRoutingHostBufferRoutingHostBufferViewRoutingSwapBufferRoutingSwapBufferView
  • routing_indices_cache.py 精简至 ~575 行,保留 Triton kernel + RoutingReplayManager
  • 更新 cache_transfer_manager.pycommon_engine.pytoken_processor.py 的 import 来源

Commit 3 [RL][Refactor]:配置统一 + 删 v1 + 重命名 + 请求管理迁移

配置统一(config.py):

  • RoutingReplayConfig 新增 postprocess(model_config) 方法,集中计算 num_moe_layersmoe_top_krouting_dtype,消除 4 处重复计算
  • CTM 子进程启动参数:4 个独立 --routing_* argparse 参数合并为 1 个 --routing_replay_config(JSON),对齐 SpeculativeConfig 传参模式
  • 删除 FD_SUSPEND_ROUTING_REPLAY 环境变量(RL 更新期间 clear_requests() + stop_flags=True 已保证安全)

删除 v1 遗留代码:

  • 删除 routing_indices_cache.py 中的 _save_routing_kernel(v1 Triton kernel)、update_host_cachecompute_slot_mapping(4 维版本)
  • 删除 moe.pypre_and_post_process.py 中的 v1 fallback 分支

Worker 侧重命名与精简:

  • RoutingReplayManagerRoutedExpertsCapturer(保留向后兼容 alias),语义对齐 vLLM 命名
  • 删除 Worker 侧所有请求管理和 Store 提交逻辑(register_requestput_finished_batch_get_routing_from_cacheclear_requestrouting_batch_to_request 等),Worker 仅保留 GPU buffer 管理和 GPU→SharedMemory scatter

Engine 侧新增无状态 RoutingCacheManagerrouting_cache_manager.py):

  • 不维护请求映射表,请求的 block_tableseq_len 完全由 Scheduler 管理
  • 统一 response/p2pstore 两种返回模式的数据源:均从 SharedMemory routing_host_buffer gather
  • 统一触发路径:TokenProcessor 检测请求完成后调用 on_request_finished(),消除 Worker 侧 put_finished_batch 跨进程同步问题

迁移调用方:gpu_model_runner.pymetax_model_runner.pycommon_engine.pytoken_processor.pyprefix_cache_manager.py


Commit 4 [RL][BugFix]:修复 engine.py 中遗漏的方法重命名

_init_routing_host_buffer 重命名为 _init_routing_cache_manager 后,engine.py:_stop_profile() 中的调用未同步更新,导致启用 routing replay 时启动报 AttributeError


Commit 5-6(功能完善):Response 模式和 P2PStore 模式端到端打通

  • response 模式:protocol.py 新增 routed_experts 字段,serving_chat.py/serving_completion.pyRequestOutput 读取并序列化为 base64 返回;token_processor.py 完善 gather 时机与边界处理
  • p2pstore 模式:CTM 子进程启动时序调整(engine.py),确保 RoutingSwapBuffer 在 CTM 启动后正确初始化;修复 rank 0 独占创建 RoutingSwapBuffer 的竞态问题

Commit 7-8(代码质量精简):

  • cache_transfer_manager.py_swap_routing 方向判断改为 elif direction == "to_gpu": ... else: raise ValueError,消除隐式 fallthrough;--routing_* 参数合并为 JSON;移除 paddle 依赖,改用纯 numpy 实现数据布局转换,避免非 GPU 进程的 CUDA 初始化开销
  • routing_cache_manager.py:Buffer 初始值改为 -1(利用无符号整数 wrap 语义,所有 dtype 正确填充哨兵值);变量命名 rrcrouting_replay_config
  • routing_store.pysubmit_put_task 参数类型从 paddle.Tensor 改为 np.ndarray,移除内部 .numpy() 转换
  • forward_meta.py / gpu_model_runner.py:删除已被 gpu_routing_buffer 替代的死代码字段 routing_replay_table
  • token_processor.py:将路由数据收集从 _recycle_resources 提取为独立方法 _finalize_routing,在资源回收前调用(确保 block_tables 仍有效),兼容 PD 分离与非 PD 分离场景

Commit 9 [BugFix]:修复 GLM-4.5 num_moe_layers 计算错误(46 → 45)

RoutingReplayConfig.postprocess()FDConfig.__init__ 内执行,早于 FDConfig.postprocess(),导致 GLM 的 moe_layer_start_index(由 first_k_dense_replace=1 推导)尚未赋值即被消费,结果为 46-0=46。修复:将 RoutingReplayConfig.postprocess(model_config) 移入 FDConfig.postprocess() 中 GLM 字段统一之后调用,结果正确为 46-1=45


变更文件汇总:

文件 变更类型
config.py RoutingReplayConfig.postprocess() + FDConfig.postprocess() 调用;修复 GLM num_moe_layers
envs.py 删除 FD_SUSPEND_ROUTING_REPLAY
cache_manager/routing_store.py 新建(Store 全链路)
cache_manager/routing_cache_manager.py 新建(Buffer + 无状态 RoutingCacheManager
cache_manager/cache_transfer_manager.py routing swap/storage 适配;参数传递改 JSON;方向校验;移除 paddle 依赖
cache_manager/prefix_cache_manager.py 改读 config,CTM 启动参数改 JSON
cache_manager/cache_data.py 新增 routing 数据字段用于 PD 传输
engine/common_engine.py 创建 RoutingCacheManagerRoutingHostBuffer SharedMemory 初始化时序
engine/engine.py CTM 启动时序;修复方法重命名遗漏
engine/request.py 新增 routing 数据字段
engine/sched/resource_manager_v1.py PD 分离 routing 数据传递
model_executor/layers/moe/routing_indices_cache.py 删 v1 + 重命名 + 清理请求/store 方法;Triton v2 kernel
model_executor/layers/moe/moe.py 删 v1 fallback 分支
model_executor/pre_and_post_process.py 删 v1 fallback + 删 put_finished_batch 调用
model_executor/forward_meta.py 删除死代码字段 routing_replay_table
worker/gpu_model_runner.py register_request/clear_request
worker/metax_model_runner.py 同步清理 routing 相关调用
output/token_processor.py 提取 _finalize_routing;改读 config;PD 场景适配
entrypoints/openai/protocol.py 新增 routed_experts 响应字段
entrypoints/openai/serving_chat.py routing 数据序列化返回
entrypoints/openai/serving_completion.py routing 数据序列化返回

Usage or Command

# 启动 R3 服务(--routing-replay-config 参数格式不变,对外透明)
export ENABLE_V1_KVCACHE_SCHEDULER=1 CUDA_VISIBLE_DEVICES=4,5,6,7

python -m fastdeploy.entrypoints.openai.api_server \
    --model /path/to/GLM-4.5-Air \
    --tensor-parallel-size 4 \
    --enable-prefix-caching --enable-chunked-prefill \
    --swap-space 1 --num-gpu-blocks-override 200 \
    --routing-replay-config '{"enable_routing_replay":true,"routing_store_type":"response"}'

# 运行 swap 端到端测试(验证 routing 跟随 KVCache block 完整生命周期)
python tools/test_r3_swap.py --api-url http://0.0.0.0:8888 --log-dir log

Accuracy Tests

运行 tools/test_r3_swap.py 端到端验证(GLM-4.5-Air-Fake, TP=4, 200 GPU blocks, 1 GiB CPU swap):

Phase 验证项 结果
Phase 1 初始化 Rank 0 创建 RoutingSwapBuffer,Rank 1-3 不创建,全部 attach RoutingHostBuffer PASS
Phase 2 基础功能 单请求 routing 捕获:shape (22, 10, 8),expert range [0, 127],EOS token sentinel=255 PASS
Phase 3 Swap 跟随 SWAP2CPU 触发(48次)+ _swap_routing to_cpu(8次)+ SWAP2GPU 触发(6次)+ _swap_routing to_gpu(1次)+ 前 516 token routing bit-exact match PASS

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

@paddle-bot
Copy link
Copy Markdown

paddle-bot Bot commented Mar 31, 2026

Thanks for your contribution!

…e (swap/storage/PD)

Implement dual-buffer architecture for routing replay:
- GPU transient buffer [max_num_batched_tokens, L, K] with Triton v2 kernel
- SharedMemory routing_host_buffer for cross-process Engine/Worker/CTM sharing
- Lazy SharedMemory attach in Worker and TokenProcessor (Engine creates after profiling)
- CTM routing write/read for swap and storage backends
- PD disaggregation: P gathers routing via send_first_token, D writes to host buffer
- Local store persistence verified end-to-end

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Mar 31, 2026

Codecov Report

❌ Patch coverage is 21.09091% with 651 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (release/2.5@f3fb7e0). Learn more about missing BASE report.

Files with missing lines Patch % Lines
fastdeploy/cache_manager/routing_store.py 0.00% 309 Missing ⚠️
fastdeploy/cache_manager/cache_transfer_manager.py 13.38% 109 Missing and 1 partial ⚠️
fastdeploy/cache_manager/routing_cache_manager.py 23.48% 101 Missing ⚠️
fastdeploy/output/token_processor.py 15.87% 52 Missing and 1 partial ⚠️
...model_executor/layers/moe/routing_indices_cache.py 72.05% 14 Missing and 5 partials ⚠️
fastdeploy/engine/common_engine.py 11.76% 14 Missing and 1 partial ⚠️
fastdeploy/engine/engine.py 0.00% 7 Missing and 3 partials ⚠️
fastdeploy/config.py 61.90% 6 Missing and 2 partials ⚠️
fastdeploy/entrypoints/openai/serving_chat.py 12.50% 6 Missing and 1 partial ⚠️
...astdeploy/entrypoints/openai/serving_completion.py 12.50% 6 Missing and 1 partial ⚠️
... and 5 more
Additional details and impacted files
@@              Coverage Diff               @@
##             release/2.5    #7099   +/-   ##
==============================================
  Coverage               ?   68.78%           
==============================================
  Files                  ?      392           
  Lines                  ?    54723           
  Branches               ?     8627           
==============================================
  Hits                   ?    37643           
  Misses                 ?    14373           
  Partials               ?     2707           
Flag Coverage Δ
GPU 68.78% <21.09%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

…e.py

Move Store-related classes (StoreWrapper, StoreProcess, StoreTask,
RoutingStoreBase, RoutingStoreLocal, RoutingStoreRDMA, etc.) to
fastdeploy/cache_manager/routing_store.py and Buffer classes
(RoutingHostBuffer, RoutingHostBufferView, RoutingSwapBuffer,
RoutingSwapBufferView) to fastdeploy/cache_manager/routing_cache_manager.py.

Update import paths in cache_transfer_manager.py, common_engine.py, and
token_processor.py. Keep only actually-used imports in
routing_indices_cache.py (RoutingHostBufferView, StoreWrapper).

Pure file reorganization — zero functional change.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…edExpertsCapturer + RoutingCacheManager

- Centralize routing_dtype/num_moe_layers/moe_top_k computation in RoutingReplayConfig
  (FDConfig.__init__), eliminating 4 duplicate computation sites
- Delete v1 kernel (_save_routing_kernel) and all v1 fallback branches in moe.py
  and pre_and_post_process.py — single code path through v2 linear-indexed kernel
- Rename RoutingReplayManager → RoutedExpertsCapturer (Worker-side, capture-only)
  with backward-compat alias
- Remove request management (register/deregister/put_finished_batch),
  store dispatch (_put_request_to_store), and suspend methods from Worker
- Create stateless RoutingCacheManager (Engine-side) for routing data aggregation
  and return-mode dispatch (response/local/rdma)
- Migrate all callers: gpu_model_runner, metax_model_runner, common_engine,
  token_processor, prefix_cache_manager
- Remove FD_SUSPEND_ROUTING_REPLAY env var from envs.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
gongshaotian and others added 2 commits April 1, 2026 19:44
The rename from _init_routing_host_buffer to _init_routing_cache_manager
in common_engine.py was not updated in engine.py:_stop_profile(), causing
AttributeError on startup with routing replay enabled.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
PaddlePaddle-bot

This comment was marked as outdated.

PaddlePaddle-bot

This comment was marked as outdated.

@gongshaotian gongshaotian changed the title [RL][Feature] R3 Phase 2: routing data follows KVCache block lifecycl… [RL][Feature] R3 Support CPUCache Apr 9, 2026
@gongshaotian gongshaotian changed the title [RL][Feature] R3 Support CPUCache [RL][Feature] R3 Support CPU PrefixCache Apr 9, 2026

try:
from fastdeploy.cache_manager.cache_data import AuxBlockDataSpec
from fastdeploy.model_executor.layers.moe.routing_indices_cache import (
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dense model 有很好处理吗?

Comment thread fastdeploy/cache_manager/cache_transfer_manager.py
Comment thread fastdeploy/cache_manager/cache_transfer_manager.py Outdated
Comment thread fastdeploy/worker/gpu_model_runner.py Outdated
kv_tile_ids_per_batch=self.share_inputs["kv_tile_ids_per_batch"],
kv_num_blocks_x_cpu=self.share_inputs["kv_num_blocks_x_cpu"],
routing_replay_table=routing_replay_table,
routing_replay_table=None,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routing_replay_table、gpu_routing_buffer 应该全部重新命名为 device_routing_buffer

Comment thread fastdeploy/worker/gpu_model_runner.py
Comment thread fastdeploy/engine/sched/resource_manager_v1.py
Comment thread fastdeploy/engine/sched/resource_manager_v1.py
Comment thread fastdeploy/entrypoints/openai/serving_chat.py
Comment thread fastdeploy/entrypoints/openai/serving_completion.py
Comment thread fastdeploy/output/token_processor.py
PaddlePaddle-bot

This comment was marked as outdated.

PaddlePaddle-bot

This comment was marked as outdated.

@gongshaotian gongshaotian marked this pull request as ready for review April 12, 2026 10:58
PaddlePaddle-bot

This comment was marked as outdated.

PaddlePaddle-bot

This comment was marked as outdated.

PaddlePaddle-bot

This comment was marked as outdated.

PaddlePaddle-bot

This comment was marked as outdated.

Copy link
Copy Markdown

@PaddlePaddle-bot PaddlePaddle-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 AI Code Review | 2026-04-16 20:14 CST

📋 Review 摘要

PR 概述:R3(RoutingReplay)Phase 2 实现 — routing 数据跟随 KVCache block 生命周期(swap/storage),重构文件拆分、配置统一、删除 v1 遗留代码,修复 GLM num_moe_layers 计算 bug。
变更范围cache_manager/engine/worker/model_executor/layers/moe/output/entrypoints/openai/config.py
影响面 TagKVCache Engine RL APIServer

问题

级别 文件 概述
🔴 Bug routing_store.py:445 RoutingStoreLocal.put() 缺少创建子目录,paddle.save 将抛出 FileNotFoundError
🔴 安全 routing_store.py:195-196 使用 eval() 转换字符串为整数,存在任意代码执行风险
🟡 建议 routing_cache_manager.py:159-161 split_request_id 使用 assert 做输入校验,python -O 模式下会被跳过
🟡 建议 routing_cache_manager.py:276-283 RoutingCacheManager.close() 未关闭 _store_wrapper,可能泄漏 StoreProcess 子进程

🔴 Bug: RoutingStoreLocal.put() 缺少子目录创建 (routing_store.py:437-445)

__init__ 中只创建了根目录 self.local_store_dir,但 put() 方法写入的 file_path 位于 request_path 子目录下(如 ./routing_replay_output/{rollout_id}/layer_0.pdtensor)。该子目录从未被创建,paddle.save() 将直接抛出 FileNotFoundError

# 当前代码(routing_store.py:437-445)
request_path = os.path.join(self.local_store_dir, rollout_id)
file_path = os.path.join(request_path, f"layer_{layer_id}.pdtensor")
# ...
paddle.save(routing_indices, file_path)  # ❌ request_path 不存在

建议在 paddle.save 前添加:

os.makedirs(request_path, exist_ok=True)

🔴 安全: eval() 用于字符串转整数 (routing_store.py:195-196)

get_needed_clear_ids() 中使用 eval() 将从 rollout_id 中解析出的字符串转为整数。即使输入来自内部,eval() 可执行任意 Python 表达式,应使用 int() 替代。

# 当前代码
turn_id = eval(reversed_turn_id[::-1])
segment_id = eval(reversed_segment_id[::-1])

# 建议修复
turn_id = int(reversed_turn_id[::-1])
segment_id = int(reversed_segment_id[::-1])

🟡 建议: split_request_id 使用 assert 做业务校验 (routing_cache_manager.py:159-161)

assert 语句在 python -O(优化模式)下会被编译器移除。对于运行时输入校验,应使用显式异常:

# 建议修复
if chat_type != "chatcmpl":
    raise ValueError(
        "Rollout Routing Replay only supports chatcmpl. "
        "Please check request type and userid settings."
    )

🟡 建议: RoutingCacheManager.close() 未关闭 _store_wrapper (routing_cache_manager.py:276-283)

close() 方法仅清理了 host_viewhost_buffer 的 SharedMemory,但未关闭 _store_wrapperStoreWrapper 实例,内部管理着 StoreProcess 子进程和 multiprocessing.Manager)。虽然 StoreWrapper 注册了 atexit handler,但显式关闭更为可靠,避免进程泄漏:

def close(self):
    if self._store_wrapper is not None:
        self._store_wrapper.shutdown()
        self._store_wrapper = None
    if self.host_view is not None:
        self.host_view.close()
        self.host_view = None
    if self.host_buffer is not None:
        self.host_buffer.close()
        self.host_buffer = None

总体评价

本 PR 是一个大型且结构清晰的重构+功能增强,将 routing 数据与 KVCache block 生命周期绑定的架构设计合理,文件拆分和配置统一显著改善了代码可维护性。主要阻塞问题是 RoutingStoreLocal.put() 缺少目录创建(P0 Bug)和 eval() 安全隐患,修复成本很低。建议修复后合入。

topk_ids_all = paddle.zeros([token_num_per_rank * tp_size, topk_ids.shape[1]], dtype=topk_ids.dtype)
paddle.distributed.all_gather(topk_ids_all, topk_ids, tp_group)
topk_ids = topk_ids_all[: batch_id_per_token.shape[0], :]
topk_ids = topk_ids_all[:token_num_per_rank, :]
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该拿所有token

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants