Skip to content

Commit 1d82c5b

Browse files
authored
fix refact abort (#7837)
1 parent 4353cdf commit 1d82c5b

2 files changed

Lines changed: 135 additions & 3 deletions

File tree

fastdeploy/engine/common_engine.py

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1342,6 +1342,22 @@ def _insert_zmq_task_to_scheduler(self):
13421342
"Request is aborted since LLM Engine is paused.",
13431343
worker_pid=worker_pid,
13441344
)
1345+
# PD ghost prevention: notify decode side to recycle its
1346+
# scheduler entry, otherwise it would sit there as a ghost
1347+
# since prefill will never deliver any first token.
1348+
if (
1349+
self.cfg.scheduler_config.splitwise_role == "prefill"
1350+
and getattr(request, "disaggregate_info", None)
1351+
and self.split_connector is not None
1352+
):
1353+
try:
1354+
self.split_connector.send_drop_signal(
1355+
request.request_id, request.disaggregate_info
1356+
)
1357+
except Exception as e:
1358+
self.llm_logger.warning(
1359+
f"Failed to send drop signal for {request.request_id}: {e}"
1360+
)
13451361
continue
13461362
except Exception as e:
13471363
self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
@@ -1494,15 +1510,47 @@ def _control_pause(self, control_request: ControlRequest):
14941510
def _wait_inflight_drained(self):
14951511
"""
14961512
Wait until resource_manager.requests is completely empty.
1497-
No timeout — abort pipeline will complete.
1498-
Logs a warning every 30 seconds while waiting to help diagnose potential hangs.
1513+
Logs a warning and remove scheduler-only request every 30 seconds while waiting to help diagnose potential hangs.
14991514
"""
15001515
start_time = time.monotonic()
15011516
next_warn_time = start_time + 30
1517+
GHOST_REAP_AFTER = 30.0
15021518

15031519
while self.resource_manager.requests or self.scheduler.requests:
15041520
now = time.monotonic()
15051521

1522+
late_ids = list(
1523+
set(self.resource_manager.requests.keys())
1524+
- self.resource_manager.waiting_abort_req_id_set
1525+
- self.resource_manager.to_be_aborted_req_id_set
1526+
)
1527+
if late_ids:
1528+
self.resource_manager.add_abort_req_ids(late_ids)
1529+
self.llm_logger.info(f"Pause drain: late-arrived requests added to abort set: {late_ids}")
1530+
1531+
if now - start_time >= GHOST_REAP_AFTER:
1532+
scheduler_only_ids = list(
1533+
set(self.scheduler.requests.keys()) - set(self.resource_manager.requests.keys())
1534+
)
1535+
if scheduler_only_ids:
1536+
ghost_outputs = [
1537+
RequestOutput(
1538+
request_id=req_id,
1539+
finished=True,
1540+
error_code=499,
1541+
error_msg=(f"forced cleanup after {GHOST_REAP_AFTER}s"),
1542+
)
1543+
for req_id in scheduler_only_ids
1544+
]
1545+
self.scheduler.put_results(ghost_outputs)
1546+
self.llm_logger.warning(
1547+
f"Pause drain timeout: reaped {len(scheduler_only_ids)} "
1548+
f"scheduler-only ghost(s) after {GHOST_REAP_AFTER}s: "
1549+
f"{scheduler_only_ids}"
1550+
)
1551+
# Reset to avoid re-reaping on the next tick
1552+
start_time = now
1553+
15061554
if now >= next_warn_time:
15071555
self.llm_logger.warning(
15081556
"Still waiting for inflight requests to drain, "
@@ -1978,6 +2026,31 @@ def _fetch_requests():
19782026

19792027
items = self.engine_worker_queue.get_disaggregated_tasks()
19802028
for item in items:
2029+
msg_type = item[0]
2030+
2031+
# PD pause race: P drops a request via paused gate and notifies us
2032+
# to recycle our scheduler entry (otherwise it becomes a ghost that
2033+
# blocks pause/abort drain forever). Synthesize a finished
2034+
# RequestOutput so it walks the normal put_results -> _recycle path
2035+
# and the client gets a 499 error response.
2036+
if msg_type == "decode_drop":
2037+
drop_outputs = [
2038+
RequestOutput(
2039+
request_id=req_id,
2040+
finished=True,
2041+
error_code=499,
2042+
error_msg="Aborted: prefill dropped this request (paused gate)",
2043+
)
2044+
for req_id in item[1]
2045+
]
2046+
if drop_outputs:
2047+
self.scheduler.put_results(drop_outputs)
2048+
self.llm_logger.info(
2049+
"Decode recycled scheduler ghost(s) via P-side drop signal: "
2050+
f"{[r.request_id for r in drop_outputs]}"
2051+
)
2052+
continue
2053+
19812054
tasks = item[1]
19822055
if isinstance(tasks[0], Request):
19832056
self.llm_logger.debug(
@@ -2042,9 +2115,17 @@ def _process_prefilled_requests():
20422115
nonlocal prefilled_request_ouputs
20432116
ready_request_outputs = []
20442117
waiting_request_outputs = []
2118+
ghost_request_outputs = []
20452119

20462120
for req_output in prefilled_request_ouputs:
2047-
if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_output.request_id):
2121+
req_id = req_output.request_id
2122+
if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_id):
2123+
if (
2124+
req_id in self.resource_manager.waiting_abort_req_id_set
2125+
or req_id in self.resource_manager.to_be_aborted_req_id_set
2126+
):
2127+
ghost_request_outputs.append(req_output)
2128+
continue
20482129
# ensure the api_server and scheduler in decode have
20492130
# received the request sent by the client
20502131
waiting_request_outputs.append(req_output)
@@ -2055,6 +2136,22 @@ def _process_prefilled_requests():
20552136
self.llm_logger.debug(f"there are enough resource for prefilled request: {req_output.request_id}")
20562137

20572138
prefilled_request_ouputs = waiting_request_outputs
2139+
2140+
for req_output in ghost_request_outputs:
2141+
req_id = req_output.request_id
2142+
self.llm_logger.warning(
2143+
f"Pause drain: reaping prefilled-output ghost {req_id} "
2144+
"(scheduler never registered, marked for abort -- breaks deadlock)"
2145+
)
2146+
try:
2147+
self.resource_manager.pre_recycle_resource(req_id)
2148+
except Exception as e:
2149+
self.llm_logger.warning(f"pre_recycle_resource({req_id}) failed: {e}")
2150+
self.resource_manager.waiting_abort_req_id_set.discard(req_id)
2151+
self.resource_manager.to_be_aborted_req_id_set.discard(req_id)
2152+
if req_id in self.token_processor.tokens_counter:
2153+
del self.token_processor.tokens_counter[req_id]
2154+
20582155
if self.cfg.splitwise_version == "v1":
20592156
# decode return first token to client
20602157
self.scheduler.put_results(ready_request_outputs)

fastdeploy/splitwise/splitwise_connector.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,27 @@ def send_first_token(self, prefill_msg, tasks_list):
234234
)
235235
self._send_message(addr, "decode", tasks_list)
236236

237+
def send_drop_signal(self, request_id: str, disaggregate_info: dict):
238+
"""
239+
Notify the decode side that this prefill request has been dropped
240+
(e.g. paused gate rejected it on P). The decode side should recycle
241+
its scheduler entry for this request_id, otherwise it would sit
242+
there forever as a ghost and pause/abort drain would hang.
243+
"""
244+
if not disaggregate_info:
245+
return
246+
decode_ip = disaggregate_info.get("decode_ip")
247+
decode_port = disaggregate_info.get("decode_connector_port")
248+
if not decode_ip or not decode_port:
249+
self.logger.warning(
250+
f"send_drop_signal: missing decode_ip/decode_connector_port in "
251+
f"disaggregate_info for {request_id}; skip"
252+
)
253+
return
254+
addr = f"{decode_ip}:{decode_port}"
255+
self.logger.info(f"send_drop_signal: addr={addr}, request_id={request_id}")
256+
self._send_message(addr, "drop", {"request_id": request_id})
257+
237258
def check_decode_allocated(self, task):
238259
"""Check whether the requests have been allocated resources in decode."""
239260
self.logger.debug(f"check_decode_allocated: {task.request_id}")
@@ -380,6 +401,8 @@ def _process_message(self, frames: List[bytes]):
380401
self._handle_prefill(payload)
381402
elif msg_type == "decode":
382403
self._handle_decode(payload)
404+
elif msg_type == "drop":
405+
self._handle_drop(payload)
383406
elif msg_type == "cache_sync":
384407
for task in payload:
385408
self.logger.info(f"_process_message: cache_sync task: {task}")
@@ -410,3 +433,15 @@ def _handle_decode(self, payload):
410433
for task in payload:
411434
tasks.append(RequestOutput.from_dict(task))
412435
self.engine_worker_queue.put_disaggregated_tasks(("decode", tasks))
436+
437+
def _handle_drop(self, payload):
438+
"""
439+
Handle drop signal from prefill: forward to engine worker queue so the
440+
decode engine main loop can recycle the corresponding scheduler entry.
441+
"""
442+
request_id = payload.get("request_id") if isinstance(payload, dict) else None
443+
if not request_id:
444+
self.logger.warning(f"_handle_drop: invalid payload {payload}")
445+
return
446+
self.logger.info(f"_handle_drop: request_id={request_id}")
447+
self.engine_worker_queue.put_disaggregated_tasks(("decode_drop", [request_id]))

0 commit comments

Comments
 (0)