Fix NIXL PD abort handling#1329
Conversation
There was a problem hiding this comment.
Code Review
This pull request improves the robustness of request aborts and client disconnection handling in LightLLM's PD mode. It introduces a pending_aborts tracking mechanism, rate-limits abort signals, refactors the task status update loop to handle tasks in batch, and adds periodic client disconnection checks during event waits. Feedback on these changes highlights a potential leak of allocated SHM indexes when a multi-sequence request is aborted, as well as a potential AttributeError in the new disconnection helper if the request object is None.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| while alloc_req_index is None: | ||
| if pending_aborts is not None and group_request_id in pending_aborts: | ||
| pending_aborts.discard(group_request_id) | ||
| raise Exception(f"req_id {group_request_id} aborted during SHM allocation wait") | ||
|
|
||
| await asyncio.sleep(sleep_time) | ||
| sleep_time *= 1.1 | ||
| sleep_time = min(1, sleep_time) | ||
|
|
||
| alloc_req_index = await self.shm_req_manager.async_alloc_req_index() | ||
| if pending_aborts is not None and group_request_id in pending_aborts: | ||
| pending_aborts.discard(group_request_id) | ||
| await self.shm_req_manager.async_release_req_index(alloc_req_index) | ||
| raise Exception(f"req_id {group_request_id} aborted after SHM allocation") | ||
| alloced_req_indexes.append(alloc_req_index) |
There was a problem hiding this comment.
在 generate 方法中,当 sampling_params.n > 1 时,会循环多次申请 SHM 索引并存入 alloced_req_indexes。如果在后续的循环中触发了提前 abort 检查(第 412-414 行或第 421-424 行),直接抛出异常会导致之前已经成功申请并存放在 alloced_req_indexes 中的 SHM 索引发生泄漏,因为它们没有被释放。
建议在抛出异常前,遍历并释放 alloced_req_indexes 中已申请的所有索引。
while alloc_req_index is None:
if pending_aborts is not None and group_request_id in pending_aborts:
pending_aborts.discard(group_request_id)
for idx in alloced_req_indexes:
await self.shm_req_manager.async_release_req_index(idx)
raise Exception(f"req_id {group_request_id} aborted during SHM allocation wait")
await asyncio.sleep(sleep_time)
sleep_time *= 1.1
sleep_time = min(1, sleep_time)
alloc_req_index = await self.shm_req_manager.async_alloc_req_index()
if pending_aborts is not None and group_request_id in pending_aborts:
pending_aborts.discard(group_request_id)
await self.shm_req_manager.async_release_req_index(alloc_req_index)
for idx in alloced_req_indexes:
await self.shm_req_manager.async_release_req_index(idx)
raise Exception(f"req_id {group_request_id} aborted after SHM allocation")
alloced_req_indexes.append(alloc_req_index)| if await request.is_disconnected(): | ||
| raise ClientDisconnected( | ||
| group_request_id=group_request_id, | ||
| reason="client disconnected while waiting", | ||
| ) |
There was a problem hiding this comment.
在 _wait_event_or_disconnect 中,直接调用 await request.is_disconnected() 可能会在 request 为 None 时引发 AttributeError。虽然在正常流程中 request 应该存在,但为了防御性编程,建议增加 request is not None 的判断。
| if await request.is_disconnected(): | |
| raise ClientDisconnected( | |
| group_request_id=group_request_id, | |
| reason="client disconnected while waiting", | |
| ) | |
| if request is not None and await request.is_disconnected(): | |
| raise ClientDisconnected( | |
| group_request_id=group_request_id, | |
| reason="client disconnected while waiting", | |
| ) |
背景
本 PR 修复 NIXL P/D 模式下 abort 在高并发、请求尚未注册、KV 传输未完成等场景中的处理问题,避免 abort 丢失、传输任务长期等待 timeout、以及 abort 命令过量堆积。
主要改动
lightllm/server/httpserver/pd_loop.pypending_aborts,用于记录请求注册到manager.req_id_to_out_inf之前收到的 abort。manager.abort()找不到请求时,将请求 ID 放入pending_aborts。_pd_process_generate()启动前检查pending_aborts,如果请求已被提前 abort,则直接跳过。lightllm/server/httpserver/manager.pygenerate()新增pending_aborts参数。lightllm/server/httpserver_for_pd_master/manager.py_wait_event_or_disconnect()。lightllm/server/router/model_infer/infer_batch.pyInferReq新增nixl_abort_last_send_time。NIXLAbortReq。lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_impl.pyNIXLAbortReq。lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_impl.pyNIXLAbortReq。lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_kv_move_manager.pytask_dispatcher_loop()支持接收NIXLAbortReq。NIXLAbortReq.device_id将 abort 命令路由到对应 transfer process。NIXLChunckedTransTask的分发逻辑保持不变。lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_trans_process.py_abort()处理逻辑。recv_task_loop()支持识别NIXLAbortReq。waiting_dict中移除对应 request 的传输任务,并放入failed_queue。_check_tasks_time_out()改为锁内批量扫描和移除超时任务,避免 pop 后再放回带来的竞态窗口。lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_trans_process.py_abort()改为锁内批量扫描并移除对应 request 的任务。_check_tasks_time_out()改为锁内批量移除超时任务。update_task_status_loop()改为维护in_flight列表,批量轮询多个 NIXL transfer 状态,避免单个慢 transfer 阻塞后续任务状态更新。预期效果
info_queue被大量重复 abort 命令淹没。