Fix NIXL PD abort handling #1329

Open
WuSiYu wants to merge 1 commit into ModelTC:main from WuSiYu:nixl_fix_1

Conversation

@WuSiYu
Collaborator

@WuSiYu WuSiYu commented Jun 2, 2026

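Background

This PR fixes abort handling in NIXL P/D mode under high concurrency, when the request has not yet been registered, and when the KV transfer has not finished. It prevents lost aborts, transfer tasks that sit waiting for a long timeout, and excessive buildup of abort commands.

Main changes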

lightllm/server/httpserver/pd_loop.py

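  • Adds pending_aborts, which records aborts received before the request is registered in manager.req_id_to_out_inf.
  • When an abort arrives but manager.abort() cannot find the request, the request ID is added to pending_aborts.
  • _pd_process_generate() checks pending_aborts before starting and skips the request if it has already been aborted.
  • Extends the abort retry from the original short retry window to a longer one, reducing the chance of losing an early abort.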
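The deferred-abort idea above can be sketched in a few lines. This is a minimal illustration, not the PR's code: `PendingAbortTracker`, `try_start`, and the `registered` dict (standing in for manager.req_id_to_out_inf) are hypothetical names.

```python
class PendingAbortTracker:
    """Hypothetical helper: remembers aborts that arrive before registration."""

    def __init__(self):
        self.registered = {}         # stands in for manager.req_id_to_out_inf
        self.pending_aborts = set()  # request IDs aborted before registration

    def abort(self, req_id):
        # Normal path: the request is known, so it can be aborted right away.
        if req_id in self.registered:
            self.registered.pop(req_id)
            return "aborted"
        # Early abort: remember it so the request is skipped when it shows up.
        self.pending_aborts.add(req_id)
        return "deferred"

    def try_start(self, req_id):
        # Checked before any work starts, like _pd_process_generate() does.
        if req_id in self.pending_aborts:
            self.pending_aborts.discard(req_id)
            return False
        self.registered[req_id] = object()
        return True
```

An abort for an unknown request returns "deferred", and the later `try_start` for the same ID returns False, so the request never begins processing.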

lightllm/server/httpserver/manager.py

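  • generate() takes a new pending_aborts parameter.
  • On the NIXL prefill node, checks whether the request has been aborted before uploading prompt ids.
  • Checks for abort again after waiting for the decode node info, so that a request cancelled during the wait does not go on to allocate resources.
  • Checks for abort while waiting for an SHM request index and after the allocation succeeds.
  • If the abort is detected only after the SHM index has been allocated, the newly allocated index is released to avoid a resource leak.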
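The repeated checkpoints can be pictured as one small helper called between stages. The sketch below assumes a set-based pending_aborts as described above; `RequestAborted`, `raise_if_aborted`, and `generate_sketch` are hypothetical names, and the stages are placeholders for the real upload and allocation steps.

```python
import asyncio


class RequestAborted(Exception):
    """Hypothetical exception type used only in this sketch."""


def raise_if_aborted(pending_aborts, req_id, stage):
    # Consume the pending abort (if any) and stop the request at this stage.
    if pending_aborts is not None and req_id in pending_aborts:
        pending_aborts.discard(req_id)
        raise RequestAborted(f"req_id {req_id} aborted {stage}")


async def generate_sketch(req_id, pending_aborts, wait_decode_info):
    stages = []
    raise_if_aborted(pending_aborts, req_id, "before uploading prompt ids")
    stages.append("upload")            # placeholder for the prompt-id upload
    await wait_decode_info()           # an abort may arrive during this wait
    raise_if_aborted(pending_aborts, req_id, "after waiting for decode node info")
    stages.append("alloc")             # placeholder for SHM index allocation
    return stages
```

Placing a check right after each await point matters because that is where another coroutine can add the request ID to pending_aborts.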

lightllm/server/httpserver_for_pd_master/manager.py

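  • Adds _wait_event_or_disconnect().
  • While waiting for events such as prompt ids and KV ready, periodically checks whether the client has disconnected.
  • After a client disconnect, the abort is triggered earlier instead of waiting out the full 60s timeout.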
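One way to wait on an event while polling for a disconnect is to slice the long timeout into short waits. The sketch below is an assumption about the general shape, not the PR's implementation: it takes an `is_disconnected` coroutine function instead of a request object, and the `poll_interval` parameter and `ClientDisconnected` class here are illustrative.

```python
import asyncio


class ClientDisconnected(Exception):
    """Illustrative stand-in for the exception raised on disconnect."""


async def wait_event_or_disconnect(event, is_disconnected, timeout=60.0, poll_interval=1.0):
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError()
        try:
            # Wait for the event, but only for one short slice at a time.
            await asyncio.wait_for(event.wait(), timeout=min(poll_interval, remaining))
            return
        except asyncio.TimeoutError:
            # Slice expired without the event: check the client before retrying.
            if await is_disconnected():
                raise ClientDisconnected("client disconnected while waiting")
```

With a 1-second slice, a disconnect is noticed within about a second rather than after the full timeout.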

lightllm/server/router/model_infer/infer_batch.py

  • InferReq gains a nixl_abort_last_send_time field.
  • It is used to rate-limit NIXL abort commands, so that the inference loop does not repeatedly send large numbers of NIXLAbortReq.

lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_impl.py

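  • When a request is aborted and still has unfinished NIXL transfer tasks, the decode node sends NIXLAbortReq at 1-second intervals.
  • Once all transfer tasks have finished, the abort is no longer resent.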
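The rate limit can be sketched with a per-request timestamp. Only nixl_abort_last_send_time comes from the PR description; `InferReqSketch`, `unfinished_trans_tasks`, `maybe_send_abort`, and the `send` callback are hypothetical, and the real loop may track unfinished tasks differently.

```python
import time

ABORT_RESEND_INTERVAL_S = 1.0  # the 1-second interval described above


class InferReqSketch:
    """Hypothetical stand-in for InferReq with only the fields used here."""

    def __init__(self):
        self.nixl_abort_last_send_time = None  # None means "never sent"
        self.unfinished_trans_tasks = 1


def maybe_send_abort(req, send, now=None):
    now = time.monotonic() if now is None else now
    # Nothing left to cancel: stop resending.
    if req.unfinished_trans_tasks == 0:
        return False
    # Sent too recently: skip this iteration of the inference loop.
    last = req.nixl_abort_last_send_time
    if last is not None and now - last < ABORT_RESEND_INTERVAL_S:
        return False
    req.nixl_abort_last_send_time = now
    send()
    return True
```

Calling `maybe_send_abort` on every loop iteration is then cheap, since at most one abort per request goes out per second.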

lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_impl.py

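  • The prefill node now also checks the abort state.
  • When a request is aborted and has unfinished NIXL transfer tasks, the node sends NIXLAbortReq to the KV move manager.
  • As on the decode node, sends are limited to one per second.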

lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_kv_move_manager.py

  • task_dispatcher_loop() now accepts NIXLAbortReq.
  • The abort command is routed to the matching transfer process based on NIXLAbortReq.device_id.
  • The dispatch logic for regular NIXLChunckedTransTask items is unchanged.
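The routing rule can be illustrated with per-device queues. Everything here is a simplified assumption: `AbortReqSketch` and `TransTaskSketch` stand in for NIXLAbortReq and NIXLChunckedTransTask, and `pick_device` is a hypothetical callback representing whatever the existing dispatch logic does for regular tasks.

```python
import queue
from dataclasses import dataclass


@dataclass
class AbortReqSketch:      # stands in for NIXLAbortReq
    group_req_id: int
    device_id: int


@dataclass
class TransTaskSketch:     # stands in for NIXLChunckedTransTask
    group_req_id: int


def dispatch(item, device_queues, pick_device):
    if isinstance(item, AbortReqSketch):
        # Aborts carry their target device, so they bypass normal selection.
        device_queues[item.device_id].put(item)
    else:
        # Regular tasks keep using the existing (here: injected) selection.
        device_queues[pick_device(item)].put(item)
```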

lightllm/server/router/model_infer/mode_backend/pd_nixl/prefill_node_impl/prefill_trans_process.py

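  • The prefill transfer process gains _abort() handling.
  • recv_task_loop() now recognizes NIXLAbortReq.
  • On abort, the request's transfer tasks are removed from waiting_dict and put into failed_queue.
  • _check_tasks_time_out() now scans for and removes timed-out tasks in bulk while holding the lock, which avoids the race window created by popping a task and putting it back.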
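The "scan and remove under one lock" pattern might look like the sketch below. The class and its fields are hypothetical simplifications: `waiting` plays the role of waiting_dict, `failed` the role of failed_queue, and tasks are reduced to `(req_id, start_time)` tuples.

```python
import threading


class WaitingTasks:
    """Hypothetical simplification of a transfer process's waiting set."""

    def __init__(self, timeout_s):
        self.lock = threading.Lock()
        self.waiting = {}   # task_id -> (req_id, start_time)
        self.failed = []    # stands in for failed_queue
        self.timeout_s = timeout_s

    def add(self, task_id, req_id, start_time):
        with self.lock:
            self.waiting[task_id] = (req_id, start_time)

    def _remove_where(self, predicate):
        # Select and remove in a single critical section, so no other thread
        # can observe a task that was popped and is about to be put back.
        with self.lock:
            doomed = [tid for tid, task in self.waiting.items() if predicate(task)]
            for tid in doomed:
                del self.waiting[tid]
        self.failed.extend(doomed)
        return doomed

    def abort(self, req_id):
        return self._remove_where(lambda task: task[0] == req_id)

    def check_timeouts(self, now):
        return self._remove_where(lambda task: now - task[1] > self.timeout_s)
```

Because abort and timeout handling share one removal routine, a task can only ever be moved to the failed list once.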

lightllm/server/router/model_infer/mode_backend/pd_nixl/decode_node_impl/decode_trans_process.py

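  • _abort() now scans for and removes the request's tasks in bulk while holding the lock.
  • _check_tasks_time_out() now removes timed-out tasks in bulk while holding the lock.
  • update_task_status_loop() now maintains an in_flight list and polls the status of multiple NIXL transfers per pass, so a single slow transfer no longer blocks status updates for the tasks behind it.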
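A single polling pass over an in_flight list could be sketched as follows. The `check_state` callback and the state strings "DONE", "ERR", and "PROC" are assumptions made for illustration, not a confirmed NIXL API.

```python
def poll_in_flight(in_flight, check_state):
    """One non-blocking pass over all in-flight transfers.

    check_state(task) is a hypothetical callback returning "DONE", "ERR",
    or "PROC" (still in progress).
    """
    still_running, done, failed = [], [], []
    for task in in_flight:
        state = check_state(task)
        if state == "DONE":
            done.append(task)
        elif state == "ERR":
            failed.append(task)
        else:
            # Not finished yet: keep it for the next pass instead of waiting.
            still_running.append(task)
    return still_running, done, failed
```

The loop would then replace in_flight with `still_running` on each pass, so a slow transfer only delays itself.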

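Expected effects

  • Lowers the chance of losing an abort that arrives before the request is registered.
  • Reduces cases where NIXL transfer tasks keep holding resources long after an abort.
  • Prevents info_queue from being flooded with duplicate abort commands under highly concurrent aborts.
  • Lets the PD master notify the P/D nodes of an abort sooner after a client disconnects.
  • Makes NIXL transfer status updates on the decode side fairer, reducing cascading waits caused by slow tasks.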

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request improves the robustness of request aborts and client disconnection handling in LightLLM's PD mode. It introduces a pending_aborts tracking mechanism, rate-limits abort signals, refactors the task status update loop to handle tasks in batch, and adds periodic client disconnection checks during event waits. Feedback on these changes highlights a potential leak of allocated SHM indexes when a multi-sequence request is aborted, as well as a potential AttributeError in the new disconnection helper if the request object is None.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines 411 to 425

                while alloc_req_index is None:
                    if pending_aborts is not None and group_request_id in pending_aborts:
                        pending_aborts.discard(group_request_id)
                        raise Exception(f"req_id {group_request_id} aborted during SHM allocation wait")

                    await asyncio.sleep(sleep_time)
                    sleep_time *= 1.1
                    sleep_time = min(1, sleep_time)

                    alloc_req_index = await self.shm_req_manager.async_alloc_req_index()
                if pending_aborts is not None and group_request_id in pending_aborts:
                    pending_aborts.discard(group_request_id)
                    await self.shm_req_manager.async_release_req_index(alloc_req_index)
                    raise Exception(f"req_id {group_request_id} aborted after SHM allocation")
                alloced_req_indexes.append(alloc_req_index)


high

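In the generate method, when sampling_params.n > 1, the loop allocates several SHM indexes and stores them in alloced_req_indexes. If an early-abort check fires in a later iteration (lines 412-414 or lines 421-424), raising the exception directly leaks the SHM indexes that were already allocated and stored in alloced_req_indexes, because they are never released.

Before raising the exception, iterate over alloced_req_indexes and release every index that has already been allocated.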

                while alloc_req_index is None:
                    if pending_aborts is not None and group_request_id in pending_aborts:
                        pending_aborts.discard(group_request_id)
                        for idx in alloced_req_indexes:
                            await self.shm_req_manager.async_release_req_index(idx)
                        raise Exception(f"req_id {group_request_id} aborted during SHM allocation wait")

                    await asyncio.sleep(sleep_time)
                    sleep_time *= 1.1
                    sleep_time = min(1, sleep_time)

                    alloc_req_index = await self.shm_req_manager.async_alloc_req_index()
                if pending_aborts is not None and group_request_id in pending_aborts:
                    pending_aborts.discard(group_request_id)
                    await self.shm_req_manager.async_release_req_index(alloc_req_index)
                    for idx in alloced_req_indexes:
                        await self.shm_req_manager.async_release_req_index(idx)
                    raise Exception(f"req_id {group_request_id} aborted after SHM allocation")
                alloced_req_indexes.append(alloc_req_index)
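
An alternative way to get the same guarantee is to centralize the cleanup in one except block, so every failure path releases what was allocated. The sketch below is only an illustration under simplifying assumptions: `FakeShmReqManager`, `alloc_n_or_release_all`, and `is_aborted` are hypothetical, the retry-with-sleep loop is omitted, and only the two method names async_alloc_req_index and async_release_req_index are taken from the code above.

```python
import asyncio


class FakeShmReqManager:
    """Hypothetical in-memory stand-in for shm_req_manager."""

    def __init__(self, size):
        self.free = list(range(size))

    async def async_alloc_req_index(self):
        return self.free.pop() if self.free else None

    async def async_release_req_index(self, idx):
        self.free.append(idx)


async def alloc_n_or_release_all(mgr, n, is_aborted):
    allocated = []
    try:
        for _ in range(n):
            idx = await mgr.async_alloc_req_index()
            if idx is None:
                raise RuntimeError("no free SHM index")
            allocated.append(idx)
            if is_aborted():
                raise RuntimeError("aborted after SHM allocation")
        return allocated
    except Exception:
        # Single cleanup point: release everything obtained so far.
        for idx in allocated:
            await mgr.async_release_req_index(idx)
        raise
```

The trade-off is that the newest index is appended before the abort check, so it is released by the same loop as the earlier ones rather than by a separate call.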

Comment on lines +305 to +309

    if await request.is_disconnected():
        raise ClientDisconnected(
            group_request_id=group_request_id,
            reason="client disconnected while waiting",
        )


medium

In _wait_event_or_disconnect, calling await request.is_disconnected() directly may raise an AttributeError when request is None. Although request should exist in the normal flow, adding a request is not None check is recommended as defensive programming.

Suggested change

-    if await request.is_disconnected():
-        raise ClientDisconnected(
-            group_request_id=group_request_id,
-            reason="client disconnected while waiting",
-        )
+    if request is not None and await request.is_disconnected():
+        raise ClientDisconnected(
+            group_request_id=group_request_id,
+            reason="client disconnected while waiting",
+        )
