From 66dbd77fc27cf2377e497b002b3af3cd27d5fb95 Mon Sep 17 00:00:00 2001 From: wenlei07 <1522419171@qq.com> Date: Sat, 16 May 2026 13:11:51 +0000 Subject: [PATCH] fix refact abort --- fastdeploy/engine/common_engine.py | 103 +++++++++++++++++++- fastdeploy/splitwise/splitwise_connector.py | 35 +++++++ 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index ab2ce1dc275..aee62f3ebdc 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1341,6 +1341,22 @@ def _insert_zmq_task_to_scheduler(self): "Request is aborted since LLM Engine is paused.", worker_pid=worker_pid, ) + # PD ghost prevention: notify decode side to recycle its + # scheduler entry, otherwise it would sit there as a ghost + # since prefill will never deliver any first token. + if ( + self.cfg.scheduler_config.splitwise_role == "prefill" + and getattr(request, "disaggregate_info", None) + and self.split_connector is not None + ): + try: + self.split_connector.send_drop_signal( + request.request_id, request.disaggregate_info + ) + except Exception as e: + self.llm_logger.warning( + f"Failed to send drop signal for {request.request_id}: {e}" + ) continue except Exception as e: self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}") @@ -1493,15 +1509,47 @@ def _control_pause(self, control_request: ControlRequest): def _wait_inflight_drained(self): """ Wait until resource_manager.requests is completely empty. - No timeout — abort pipeline will complete. - Logs a warning every 30 seconds while waiting to help diagnose potential hangs. + Every 30 seconds while waiting, logs a warning to help diagnose potential hangs and removes scheduler-only requests. 
""" start_time = time.monotonic() next_warn_time = start_time + 30 + GHOST_REAP_AFTER = 30.0 while self.resource_manager.requests or self.scheduler.requests: now = time.monotonic() + late_ids = list( + set(self.resource_manager.requests.keys()) + - self.resource_manager.waiting_abort_req_id_set + - self.resource_manager.to_be_aborted_req_id_set + ) + if late_ids: + self.resource_manager.add_abort_req_ids(late_ids) + self.llm_logger.info(f"Pause drain: late-arrived requests added to abort set: {late_ids}") + + if now - start_time >= GHOST_REAP_AFTER: + scheduler_only_ids = list( + set(self.scheduler.requests.keys()) - set(self.resource_manager.requests.keys()) + ) + if scheduler_only_ids: + ghost_outputs = [ + RequestOutput( + request_id=req_id, + finished=True, + error_code=499, + error_msg=(f"forced cleanup after {GHOST_REAP_AFTER}s"), + ) + for req_id in scheduler_only_ids + ] + self.scheduler.put_results(ghost_outputs) + self.llm_logger.warning( + f"Pause drain timeout: reaped {len(scheduler_only_ids)} " + f"scheduler-only ghost(s) after {GHOST_REAP_AFTER}s: " + f"{scheduler_only_ids}" + ) + # Reset to avoid re-reaping on the next tick + start_time = now + if now >= next_warn_time: self.llm_logger.warning( "Still waiting for inflight requests to drain, " @@ -1977,6 +2025,31 @@ def _fetch_requests(): items = self.engine_worker_queue.get_disaggregated_tasks() for item in items: + msg_type = item[0] + + # PD pause race: P drops a request via paused gate and notifies us + # to recycle our scheduler entry (otherwise it becomes a ghost that + # blocks pause/abort drain forever). Synthesize a finished + # RequestOutput so it walks the normal put_results -> _recycle path + # and the client gets a 499 error response. 
+ if msg_type == "decode_drop": + drop_outputs = [ + RequestOutput( + request_id=req_id, + finished=True, + error_code=499, + error_msg="Aborted: prefill dropped this request (paused gate)", + ) + for req_id in item[1] + ] + if drop_outputs: + self.scheduler.put_results(drop_outputs) + self.llm_logger.info( + "Decode recycled scheduler ghost(s) via P-side drop signal: " + f"{[r.request_id for r in drop_outputs]}" + ) + continue + tasks = item[1] if isinstance(tasks[0], Request): self.llm_logger.debug( @@ -2041,9 +2114,17 @@ def _process_prefilled_requests(): nonlocal prefilled_request_ouputs ready_request_outputs = [] waiting_request_outputs = [] + ghost_request_outputs = [] for req_output in prefilled_request_ouputs: - if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_output.request_id): + req_id = req_output.request_id + if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_id): + if ( + req_id in self.resource_manager.waiting_abort_req_id_set + or req_id in self.resource_manager.to_be_aborted_req_id_set + ): + ghost_request_outputs.append(req_output) + continue # ensure the api_server and scheduler in decode have # received the request sent by the client waiting_request_outputs.append(req_output) @@ -2054,6 +2135,22 @@ def _process_prefilled_requests(): self.llm_logger.debug(f"there are enough resource for prefilled request: {req_output.request_id}") prefilled_request_ouputs = waiting_request_outputs + + for req_output in ghost_request_outputs: + req_id = req_output.request_id + self.llm_logger.warning( + f"Pause drain: reaping prefilled-output ghost {req_id} " + "(scheduler never registered, marked for abort -- breaks deadlock)" + ) + try: + self.resource_manager.pre_recycle_resource(req_id) + except Exception as e: + self.llm_logger.warning(f"pre_recycle_resource({req_id}) failed: {e}") + self.resource_manager.waiting_abort_req_id_set.discard(req_id) + 
self.resource_manager.to_be_aborted_req_id_set.discard(req_id) + if req_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[req_id] + if self.cfg.splitwise_version == "v1": # decode return first token to client self.scheduler.put_results(ready_request_outputs) diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 9b1da85417e..8d2091c7d31 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -234,6 +234,27 @@ def send_first_token(self, prefill_msg, tasks_list): ) self._send_message(addr, "decode", tasks_list) + def send_drop_signal(self, request_id: str, disaggregate_info: dict): + """ + Notify the decode side that this prefill request has been dropped + (e.g. paused gate rejected it on P). The decode side should recycle + its scheduler entry for this request_id, otherwise it would sit + there forever as a ghost and pause/abort drain would hang. + """ + if not disaggregate_info: + return + decode_ip = disaggregate_info.get("decode_ip") + decode_port = disaggregate_info.get("decode_connector_port") + if not decode_ip or not decode_port: + self.logger.warning( + f"send_drop_signal: missing decode_ip/decode_connector_port in " + f"disaggregate_info for {request_id}; skip" + ) + return + addr = f"{decode_ip}:{decode_port}" + self.logger.info(f"send_drop_signal: addr={addr}, request_id={request_id}") + self._send_message(addr, "drop", {"request_id": request_id}) + def check_decode_allocated(self, task): """Check whether the requests have been allocated resources in decode.""" self.logger.debug(f"check_decode_allocated: {task.request_id}") @@ -380,6 +401,8 @@ def _process_message(self, frames: List[bytes]): self._handle_prefill(payload) elif msg_type == "decode": self._handle_decode(payload) + elif msg_type == "drop": + self._handle_drop(payload) elif msg_type == "cache_sync": for task in payload: 
self.logger.info(f"_process_message: cache_sync task: {task}") @@ -410,3 +433,15 @@ def _handle_decode(self, payload): for task in payload: tasks.append(RequestOutput.from_dict(task)) self.engine_worker_queue.put_disaggregated_tasks(("decode", tasks)) + + def _handle_drop(self, payload): + """ + Handle drop signal from prefill: forward to engine worker queue so the + decode engine main loop can recycle the corresponding scheduler entry. + """ + request_id = payload.get("request_id") if isinstance(payload, dict) else None + if not request_id: + self.logger.warning(f"_handle_drop: invalid payload {payload}") + return + self.logger.info(f"_handle_drop: request_id={request_id}") + self.engine_worker_queue.put_disaggregated_tasks(("decode_drop", [request_id]))