Skip to content

Commit 1b212bd

Browse files
committed
fix(detoken,openai): unhang client on detoken init failure; map "error" to API error path
- detoken: on _add_new_group_req_index failure, set FINISHED_ERROR, push an empty-string sentinel into out_tokens_queue at finish_token_index, set can_released_mark, and continue with the rest of the group instead of re-raising. Without this, the http loop stays blocked (queue empty, no finish ever forwarded) and the shm req leaks until the client disconnects.
- openai: surface FINISHED_ERROR as a controlled error response. Non-stream chat / completions return HTTP 500; streaming chat / completions yield an SSE error event followed by [DONE] and stop. Previously "error" leaked into ChatCompletionResponseChoice / CompletionChoice, whose finish_reason literals reject it, raising a Pydantic ValidationError.
1 parent f184e8f commit 1b212bd

2 files changed

Lines changed: 59 additions & 5 deletions

File tree

lightllm/server/api_openai.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ async def _safe_stream_wrapper(stream_generator):
7575
return
7676

7777

78+
# OpenAI response schemas restrict finish_reason to {"stop","length","tool_calls"} (chat) and
79+
# {"stop","length"} (completions). The internal FINISHED_ERROR status surfaces as "error" — surface
80+
# that at the API boundary as a controlled error response rather than letting it leak into the
81+
# Pydantic models (which would raise ValidationError) or to the client as a silent stop.
82+
_INTERNAL_ERROR_MESSAGE = "Internal server error during request processing"
83+
_INTERNAL_ERROR_TYPE = "InternalServerError"
84+
85+
86+
def _sse_internal_error_payload() -> str:
87+
error = {"error": {"message": _INTERNAL_ERROR_MESSAGE, "type": _INTERNAL_ERROR_TYPE}}
88+
return json.dumps(error, ensure_ascii=False)
89+
90+
7891
def _serialize_sse_chunk(chunk, choice_nulls=(), response_nulls=()):
7992
"""Serialize a streaming chunk, explicitly including specified null fields."""
8093
d = chunk.model_dump(exclude_none=True)
@@ -355,6 +368,9 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
355368
finish_reason_dict[sub_req_id] = finish_status.get_finish_reason()
356369
prompt_tokens_dict[sub_req_id] = metadata["prompt_tokens"]
357370
prompt_cache_len_dict[sub_req_id] = metadata.get("prompt_cache_len", 0)
371+
if any(r == "error" for r in finish_reason_dict.values()):
372+
logger.error(f"internal pipeline error during chat completion group_id={group_request_id}")
373+
return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
358374
choices = []
359375
sub_ids = list(final_output_dict.keys())[: request.n]
360376
for i in range(request.n):
@@ -473,6 +489,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
473489
delta = request_output
474490
current_finish_reason = finish_status.get_finish_reason()
475491

492+
if current_finish_reason == "error":
493+
logger.error(
494+
f"internal pipeline error during chat stream group_id={group_request_id} "
495+
f"sub_req_id={sub_req_id}"
496+
)
497+
yield f"data: {_sse_internal_error_payload()}\n\n"
498+
yield "data: [DONE]\n\n".encode("utf-8")
499+
return
500+
476501
# Emit the initial role-only chunk once per choice, as required by the
477502
# OpenAI SSE spec: role appears only in the first delta with content="".
478503
if not has_emitted_first_chunk[choice_index]:
@@ -882,6 +907,9 @@ async def process_single_prompt(prompt: Union[str, List[int]], prompt_index: int
882907
tasks = [asyncio.create_task(process_single_prompt(prompt, i)) for i, prompt in enumerate(prompts)]
883908

884909
results = await asyncio.gather(*tasks)
910+
if any(r.get("finish_reason") == "error" for r in results):
911+
logger.error("internal pipeline error during completion")
912+
return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
885913
return _build_completion_response(results, request, created_time, len(prompts) > 1)
886914

887915

@@ -916,6 +944,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
916944
if finish_status.is_finished():
917945
current_finish_reason = finish_status.get_finish_reason()
918946

947+
if current_finish_reason == "error":
948+
logger.error(
949+
f"internal pipeline error during completion stream group_id={group_request_id} "
950+
f"sub_req_id={sub_req_id}"
951+
)
952+
yield f"data: {_sse_internal_error_payload()}\n\n"
953+
yield "data: [DONE]\n\n"
954+
return
955+
919956
output_text = request_output
920957
if request.echo and metadata.get("is_first_token", False):
921958
prompt_str = prompt

lightllm/server/detokenization/manager.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ def _init_get_token_id_to_token_str(self):
4646
self.token_id_to_token = {token_id: token for token, token_id in self.tokenizer.get_vocab().items()}
4747
return
4848

49-
def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
49+
def _add_new_group_req_index(self, recv_obj: GroupReqIndexes) -> int:
5050
from lightllm.server.core.objs import FinishStatus
5151

52+
failed_count = 0
5253
for req_index in recv_obj.shm_req_indexes:
5354
req = self.shm_req_manager.get_req_obj_by_index(req_index)
5455
try:
@@ -68,10 +69,23 @@ def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
6869
decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)
6970

7071
self.req_id_to_out[req.request_id] = decode_req
71-
except Exception as e:
72+
except Exception:
73+
# Init failed (shm link, tokenizer, decode-mode fix, …). Mark the req
74+
# finished with an error and push a sentinel into out_tokens_queue so the
75+
# http loop forwards a terminal status — otherwise the queue stays empty,
76+
# the client hangs until disconnect, and the shm slot leaks because
77+
# can_released_mark never gets set. Continue with the rest of the group.
78+
logger.exception(f"detokenization init failed for req_id {req.request_id}")
7279
req.finish_status.set_status(FinishStatus.FINISHED_ERROR)
73-
raise e
74-
return
80+
req.finish_token_index = req.input_len
81+
try:
82+
if not req.out_tokens_queue.is_full():
83+
req.out_tokens_queue.push("", req.input_len, False, 1)
84+
except Exception:
85+
logger.exception(f"failed to push error sentinel for req_id {req.request_id}")
86+
req.can_released_mark = True
87+
failed_count += 1
88+
return failed_count
7589

7690
def handle_loop(self):
7791
try:
@@ -83,9 +97,12 @@ def handle_loop(self):
8397
recv_obj: GroupReqIndexes = self.zmq_recv_socket.recv_pyobj(zmq.NOBLOCK)
8498
assert isinstance(recv_obj, GroupReqIndexes)
8599
try:
86-
self._add_new_group_req_index(recv_obj=recv_obj)
100+
failed_count = self._add_new_group_req_index(recv_obj=recv_obj)
87101
except Exception:
88102
logger.exception("add new group req index has exception")
103+
failed_count = len(recv_obj.shm_req_indexes)
104+
if failed_count:
105+
# Wake the http loop so it drains the error sentinel(s) we just pushed.
89106
self.pub_to_httpserver.send_pyobj(None, protocol=pickle.HIGHEST_PROTOCOL)
90107

91108
# 当队列中存在较多的请求时,将一次接受的数量上调

0 commit comments

Comments
 (0)