diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index aca282e0f70..b13ff3b1edc 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1606,13 +1606,14 @@ def _control_abort_requests(self, control_req: ControlRequest): engine_recv_first_token_time=request.metrics.engine_recv_first_token_time if request.metrics else now, request_start_time=request.metrics.arrival_time if request.metrics else now, ) + eos_token_ids = getattr(request, "eos_token_ids", [0]) result = RequestOutput( request_id=req_id, finished=True, outputs=CompletionOutput( index=0, send_idx=len(partial_token_ids), - token_ids=[self.data_processor.eos_token_ids[0]], + token_ids=[eos_token_ids[0]], ), metrics=abort_metrics, error_code=200, @@ -1656,10 +1657,19 @@ def _wait_abort_complete(self, target_req_ids, stall_timeout=1): reset progress state if any, then continue monitoring """ target_set = set(target_req_ids) + target_set = target_set & (set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())) prev_remaining_count = len(target_set) last_progress_time = time.time() remaining = target_set & self.resource_manager.get_reqs_in_aborting() while remaining: + alive_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()) + finished_reqs = target_set - alive_reqs + if finished_reqs: + self.llm_logger.info(f"abort targets already finished, skip: {finished_reqs}") + for req_id in finished_reqs: + self.resource_manager.waiting_abort_req_id_set.discard(req_id) + self.resource_manager.to_be_aborted_req_id_set.discard(req_id) + target_set -= finished_reqs remaining = target_set & self.resource_manager.get_reqs_in_aborting() if not remaining: self.llm_logger.info(f"all {len(target_set)} abort reqs cleaned") diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 45ec18aa1c0..8b758db1a62 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -281,7 +281,7 @@ def recycle_abort_task(self, request_id): self.stop_flags[request.idx] = True # 设置停止标志 del self.requests[request_id] del self.req_dict[request_id] - self.to_be_aborted_req_id_set.remove(request_id) + self.to_be_aborted_req_id_set.discard(request_id) self.update_metrics() def _trigger_abort(self, request_id, scheduled_reqs): @@ -293,7 +293,7 @@ def _trigger_abort(self, request_id, scheduled_reqs): abort_request.cached_block_num = 0 scheduled_reqs.append(self._prepare_abort_task(abort_request)) self.to_be_aborted_req_id_set.add(request_id) - self.waiting_abort_req_id_set.remove(request_id) + self.waiting_abort_req_id_set.discard(request_id) def _info_each_block(self): """ @@ -1528,6 +1528,8 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]): del self.requests[req_id] if req_id in self.req_dict: del self.req_dict[req_id] + self.waiting_abort_req_id_set.discard(req_id) + self.to_be_aborted_req_id_set.discard(req_id) # Do not block the main thread here # Write cache to storage if kvcache_storage_backend is enabled diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index f8cd70bca08..832eb6c7714 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -268,7 +268,7 @@ class ChatCompletionResponseChoice(BaseModel): logprobs: Optional[LogProbs] = None draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] speculate_metrics: Optional[SpeculateMetrics] = None @@ -333,7 +333,7 @@ class ChatCompletionResponseStreamChoice(BaseModel): logprobs: Optional[LogProbs] = None draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None arrival_time: Optional[float] = None speculate_metrics: Optional[SpeculateMetrics] = None @@ -369,7 +369,7 @@ class CompletionResponseChoice(BaseModel): draft_logprobs: Optional[CompletionLogprobs] = None prompt_logprobs: Optional[PromptLogprobs] = None reasoning_content: Optional[str] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None speculate_metrics: Optional[SpeculateMetrics] = None @@ -415,7 +415,7 @@ class CompletionResponseStreamChoice(BaseModel): prompt_tokens: Optional[str] = None completion_tokens: Optional[str] = None reasoning_content: Optional[str] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None speculate_metrics: Optional[SpeculateMetrics] = None diff --git a/tests/engine/test_common_engine.py b/tests/engine/test_common_engine.py index 551f93babd8..55d7b486109 100644 --- a/tests/engine/test_common_engine.py +++ b/tests/engine/test_common_engine.py @@ -3706,6 +3706,8 @@ def test_wait_abort_complete_progress(self): """_wait_abort_complete exits when background thread cleans up.""" eng = self._make_abort_engine() eng.resource_manager.waiting_abort_req_id_set = {"req-1_0"} + # Add the request to requests dict so it won't be filtered out + eng.resource_manager.requests = {"req-1_0": self._make_fake_request()} call_count = [0] @@ -3724,6 +3726,8 @@ def test_wait_abort_complete_force_cleanup_stuck_in_to_be_aborted(self): """Stall timeout triggers force cleanup for requests in to_be_aborted_req_id_set.""" eng = self._make_abort_engine() eng.resource_manager.to_be_aborted_req_id_set = {"req-1_0"} + # Add the request to requests dict so it won't be filtered out + eng.resource_manager.requests = {"req-1_0": self._make_fake_request()} def mock_recycle(req_id): eng.resource_manager.to_be_aborted_req_id_set.discard(req_id)