Skip to content

Commit 0e288b5

Browse files
kevincheng2claude
andcommitted
[KVCache][Engine] fix has_pending_work and move swap/evict to worker layer
## Motivation BatchRequest.__len__ 混入了 prefetch/swap/evict 任务数量,导致 engine 调度 逻辑(判断是否有待处理工作)出现误判;同时 swap/evict 提交散落在 gpu_model_runner 和 resource_manager 中,职责不清晰。 ## Modifications - engine/request.py: 新增 has_pending_work 属性,__len__ 恢复只计 requests 数量;has_pending_work 同时感知 prefetch/swap/evict 任务 - engine/common_engine.py: 用 has_pending_work 替换 len(batch_request) > 0 判断,逻辑更准确 - worker/worker_process.py: 将 submit_swap_tasks 调用移至 worker 层处理, 处理后清空 metadata 避免重复提交 - worker/gpu_model_runner.py: 移除重复的 submit_swap_tasks 调用 - engine/sched/resource_manager_v1.py: 调整 check_and_add_pending_backup / issue_pending_backup / dispatch_pending_prefetches 执行顺序,去掉对 len(batch_request) 的依赖 - cache_manager/v1/cache_manager.py: 恢复 matched_nodes 按 device/host 分类 逻辑(之前被误注释) ## Usage or Command ```bash # 启动服务(单机) cd baidu/FastDeploy bash run.sh ``` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 271d85c commit 0e288b5

6 files changed

Lines changed: 31 additions & 24 deletions

File tree

fastdeploy/cache_manager/v1/cache_manager.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -516,11 +516,10 @@ def match_prefix(
516516
# Split matched_nodes into device blocks and host blocks
517517
if self.enable_host_cache:
518518
for node in matched_nodes:
519-
pass
520-
# if node.is_on_device():
521-
# result.device_nodes.append(node)
522-
# elif node.is_on_host():
523-
# result.host_nodes.append(node)
519+
if node.is_on_device():
520+
result.device_nodes.append(node)
521+
elif node.is_on_host():
522+
result.host_nodes.append(node)
524523
else:
525524
result.device_nodes = matched_nodes
526525

fastdeploy/engine/common_engine.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ def _fetch_request():
11121112
batch_request, error_tasks = self.resource_manager.schedule()
11131113

11141114
# 3. Send to engine
1115-
if len(batch_request) > 0:
1115+
if batch_request.has_pending_work:
11161116
if self.cfg.scheduler_config.splitwise_role == "decode":
11171117
for task in batch_request:
11181118
if task.task_type == RequestType.PREEMPTED:
@@ -1191,7 +1191,7 @@ def _fetch_request():
11911191
continue
11921192
self._send_error_response(request_id, failed)
11931193

1194-
if len(batch_request) <= 0 and not error_tasks:
1194+
if not batch_request.has_pending_work and not error_tasks:
11951195
time.sleep(0.005)
11961196

11971197
except RuntimeError as e:

fastdeploy/engine/request.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -697,10 +697,17 @@ def __getitem__(self, index):
697697
return self.requests[index]
698698

699699
def __len__(self):
700-
count = len(self.requests)
701-
if self.storage_prefetch_tasks:
702-
count += len(self.storage_prefetch_tasks)
703-
return count
700+
return len(self.requests)
701+
702+
@property
703+
def has_pending_work(self) -> bool:
704+
"""Whether there is any pending work (inference requests, prefetch/swap/evict tasks)."""
705+
return (
706+
len(self.requests) > 0
707+
or bool(self.storage_prefetch_tasks)
708+
or bool(self.cache_swap_metadata)
709+
or bool(self.cache_evict_metadata)
710+
)
704711

705712
def append(self, batch_request: "BatchRequest"):
706713
self.requests.extend(batch_request.requests)

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,17 +1276,16 @@ def _allocate_decode_and_extend():
12761276
# Issue pending backup tasks to batch_request
12771277
# This handles write_through_selective policy by attaching backup tasks
12781278
# to the batch request, which will be processed by the worker
1279-
if self.enable_cache_manager_v1 and len(batch_request) > 0:
1279+
if self.enable_cache_manager_v1:
1280+
self.cache_manager.check_and_add_pending_backup()
1281+
12801282
evict_metadata = self.cache_manager.issue_pending_backup_to_batch_request()
12811283
if evict_metadata:
12821284
batch_request.append_evict_metadata([evict_metadata])
12831285

1284-
if self.enable_cache_manager_v1:
1285-
self.cache_manager.check_and_add_pending_backup()
1286-
1287-
# Dispatch any pending storage prefetch tasks via batch_request
1288-
if self.config.cache_config.kvcache_storage_backend:
1289-
self._dispatch_pending_prefetches(batch_request)
1286+
# Dispatch any pending storage prefetch tasks via batch_request
1287+
if self.config.cache_config.kvcache_storage_backend:
1288+
self._dispatch_pending_prefetches(batch_request)
12901289

12911290
return batch_request, error_reqs
12921291

fastdeploy/worker/gpu_model_runner.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -804,12 +804,6 @@ def insert_tasks_v1(self, req_dicts: BatchRequest, num_running_requests: int = N
804804
if self.enable_mm:
805805
# Sort by idx to ensure attention mask offsets are filled in order during mm prefill
806806
req_dicts = sorted(req_dicts, key=lambda r: r.idx)
807-
if self.enable_cache_manager_v1:
808-
# submit_swap_tasks handles:
809-
# 1. Waiting for pending evict handlers before submitting new evict
810-
# 2. write_back policy: waiting for evict to complete before submitting swap-in
811-
# 3. Adding handlers to pending lists appropriately
812-
self.cache_controller.submit_swap_tasks(req_dicts.cache_evict_metadata, req_dicts.cache_swap_metadata)
813807

814808
for i in range(req_len):
815809
request = req_dicts[i]

fastdeploy/worker/worker_process.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,14 @@ def event_loop_normal(self) -> None:
688688
self._handle_prefetch_tasks(batch_request.storage_prefetch_tasks)
689689
batch_request.storage_prefetch_tasks = None
690690

691+
# Handle swap/evict tasks from batch_request
692+
if batch_request.cache_evict_metadata or batch_request.cache_swap_metadata:
693+
self.worker.model_runner.cache_controller.submit_swap_tasks(
694+
batch_request.cache_evict_metadata, batch_request.cache_swap_metadata
695+
)
696+
batch_request.cache_evict_metadata = None
697+
batch_request.cache_swap_metadata = None
698+
691699
if len(control_reqs) > 0:
692700
logger.info(f"Rank: {self.local_rank} received {len(control_reqs)} control request.")
693701
for control_req in control_reqs:

0 commit comments

Comments
 (0)