Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1656,10 +1656,19 @@ def _wait_abort_complete(self, target_req_ids, stall_timeout=1):
reset progress state if any, then continue monitoring
"""
target_set = set(target_req_ids)
target_set = target_set & (set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()))
prev_remaining_count = len(target_set)
last_progress_time = time.time()
remaining = target_set & self.resource_manager.get_reqs_in_aborting()
Comment on lines 1658 to 1662
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接遍历并构造 set(self.resource_manager.requests.keys()) / set(self.scheduler.requests.keys()) 没有持有对应的锁(ResourceManagerV1.lock、LocalScheduler.mutex)。这两个 dict 在运行时会被其他线程修改(scheduler/local_scheduler.py 明确用 mutex 保护 requests),可能触发 RuntimeError: dictionary changed size during iteration,导致 abort 流程再次 500。建议通过线程安全的接口获取 request_id 快照(例如在 ResourceManager/Scheduler 中新增 get_request_ids 方法并在内部加锁),或在此处用各自的锁保护读取。

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个问题看下是否存在风险 @qwes5s5

while remaining:
alive_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())
finished_reqs = target_set - alive_reqs
if finished_reqs:
self.llm_logger.info(f"abort targets already finished, skip: {finished_reqs}")
for req_id in finished_reqs:
self.resource_manager.waiting_abort_req_id_set.discard(req_id)
self.resource_manager.to_be_aborted_req_id_set.discard(req_id)
target_set -= finished_reqs
remaining = target_set & self.resource_manager.get_reqs_in_aborting()
if not remaining:
self.llm_logger.info(f"all {len(target_set)} abort reqs cleaned")
Expand Down
6 changes: 4 additions & 2 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def recycle_abort_task(self, request_id):
self.stop_flags[request.idx] = True # 设置停止标志
del self.requests[request_id]
del self.req_dict[request_id]
self.to_be_aborted_req_id_set.remove(request_id)
self.to_be_aborted_req_id_set.discard(request_id)
self.update_metrics()

def _trigger_abort(self, request_id, scheduled_reqs):
Expand All @@ -293,7 +293,7 @@ def _trigger_abort(self, request_id, scheduled_reqs):
abort_request.cached_block_num = 0
scheduled_reqs.append(self._prepare_abort_task(abort_request))
self.to_be_aborted_req_id_set.add(request_id)
self.waiting_abort_req_id_set.remove(request_id)
self.waiting_abort_req_id_set.discard(request_id)

def _info_each_block(self):
"""
Expand Down Expand Up @@ -1528,6 +1528,8 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
del self.requests[req_id]
if req_id in self.req_dict:
del self.req_dict[req_id]
self.waiting_abort_req_id_set.discard(req_id)
self.to_be_aborted_req_id_set.discard(req_id)

# Do not block the main thread here
# Write cache to storage if kvcache_storage_backend is enabled
Expand Down
8 changes: 4 additions & 4 deletions fastdeploy/entrypoints/openai/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class ChatCompletionResponseChoice(BaseModel):
logprobs: Optional[LogProbs] = None
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]]
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]]
speculate_metrics: Optional[SpeculateMetrics] = None
Comment on lines 268 to 272
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR 描述的 Modifications 仅提到了 finish_reason 的变更,但本 PR 还修改了 abort 等待/清理逻辑(common_engine/resource_manager)并补充了单测。建议同步更新 PR 描述,明确这些额外改动的原因与影响范围,便于审阅与回溯。

Copilot uses AI. Check for mistakes.


Expand Down Expand Up @@ -333,7 +333,7 @@ class ChatCompletionResponseStreamChoice(BaseModel):
logprobs: Optional[LogProbs] = None
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
arrival_time: Optional[float] = None
speculate_metrics: Optional[SpeculateMetrics] = None

Expand Down Expand Up @@ -369,7 +369,7 @@ class CompletionResponseChoice(BaseModel):
draft_logprobs: Optional[CompletionLogprobs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None

Expand Down Expand Up @@ -415,7 +415,7 @@ class CompletionResponseStreamChoice(BaseModel):
prompt_tokens: Optional[str] = None
completion_tokens: Optional[str] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None

Expand Down
4 changes: 4 additions & 0 deletions tests/engine/test_common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3706,6 +3706,8 @@ def test_wait_abort_complete_progress(self):
"""_wait_abort_complete exits when background thread cleans up."""
eng = self._make_abort_engine()
eng.resource_manager.waiting_abort_req_id_set = {"req-1_0"}
# Add the request to requests dict so it won't be filtered out
eng.resource_manager.requests = {"req-1_0": self._make_fake_request()}

call_count = [0]

Expand All @@ -3724,6 +3726,8 @@ def test_wait_abort_complete_force_cleanup_stuck_in_to_be_aborted(self):
"""Stall timeout triggers force cleanup for requests in to_be_aborted_req_id_set."""
eng = self._make_abort_engine()
eng.resource_manager.to_be_aborted_req_id_set = {"req-1_0"}
# Add the request to requests dict so it won't be filtered out
eng.resource_manager.requests = {"req-1_0": self._make_fake_request()}

def mock_recycle(req_id):
eng.resource_manager.to_be_aborted_req_id_set.discard(req_id)
Expand Down
Loading