Skip to content

Commit d7fd4a0

Browse files
committed
fix(detoken,openai): unhang client on detoken init failure; map "error" to API error path
- detoken: on _add_new_group_req_index failure, set FINISHED_ERROR, push an empty-string sentinel into out_tokens_queue at finish_token_index, mark can_released_mark, and continue with the rest of the group instead of re-raising. Without this the http loop stays blocked (queue empty, no finish ever forwarded) and the shm req leaks until client disconnect. - openai: surface FINISHED_ERROR as a controlled error response. Non-stream chat / completions return HTTP 500; streaming chat / completions yield an SSE error event followed by [DONE] and stop. Previously "error" leaked into ChatCompletionResponseChoice / CompletionChoice whose finish_reason literals reject it, raising Pydantic ValidationError.
1 parent 5ef655a commit d7fd4a0

2 files changed

Lines changed: 59 additions & 5 deletions

File tree

lightllm/server/api_openai.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,19 @@ async def _safe_stream_wrapper(stream_generator):
7070
yield f"data: {error_data}\n\n"
7171

7272

73+
# OpenAI response schemas restrict finish_reason to {"stop","length","tool_calls"} (chat) and
74+
# {"stop","length"} (completions). The internal FINISHED_ERROR status surfaces as "error" — surface
75+
# that at the API boundary as a controlled error response rather than letting it leak into the
76+
# Pydantic models (which would raise ValidationError) or to the client as a silent stop.
77+
_INTERNAL_ERROR_MESSAGE = "Internal server error during request processing"
78+
_INTERNAL_ERROR_TYPE = "InternalServerError"
79+
80+
81+
def _sse_internal_error_payload() -> str:
82+
error = {"error": {"message": _INTERNAL_ERROR_MESSAGE, "type": _INTERNAL_ERROR_TYPE}}
83+
return json.dumps(error, ensure_ascii=False)
84+
85+
7386
def _serialize_sse_chunk(chunk, choice_nulls=(), response_nulls=()):
7487
"""Serialize a streaming chunk, explicitly including specified null fields."""
7588
d = chunk.model_dump(exclude_none=True)
@@ -350,6 +363,9 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
350363
finish_reason_dict[sub_req_id] = finish_status.get_finish_reason()
351364
prompt_tokens_dict[sub_req_id] = metadata["prompt_tokens"]
352365
prompt_cache_len_dict[sub_req_id] = metadata.get("prompt_cache_len", 0)
366+
if any(r == "error" for r in finish_reason_dict.values()):
367+
logger.error(f"internal pipeline error during chat completion group_id={group_request_id}")
368+
return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
353369
choices = []
354370
sub_ids = list(final_output_dict.keys())[: request.n]
355371
for i in range(request.n):
@@ -468,6 +484,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
468484
delta = request_output
469485
current_finish_reason = finish_status.get_finish_reason()
470486

487+
if current_finish_reason == "error":
488+
logger.error(
489+
f"internal pipeline error during chat stream group_id={group_request_id} "
490+
f"sub_req_id={sub_req_id}"
491+
)
492+
yield f"data: {_sse_internal_error_payload()}\n\n"
493+
yield "data: [DONE]\n\n".encode("utf-8")
494+
return
495+
471496
# Emit the initial role-only chunk once per choice, as required by the
472497
# OpenAI SSE spec: role appears only in the first delta with content="".
473498
if not has_emitted_first_chunk[choice_index]:
@@ -877,6 +902,9 @@ async def process_single_prompt(prompt: Union[str, List[int]], prompt_index: int
877902
tasks = [asyncio.create_task(process_single_prompt(prompt, i)) for i, prompt in enumerate(prompts)]
878903

879904
results = await asyncio.gather(*tasks)
905+
if any(r.get("finish_reason") == "error" for r in results):
906+
logger.error("internal pipeline error during completion")
907+
return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
880908
return _build_completion_response(results, request, created_time, len(prompts) > 1)
881909

882910

@@ -911,6 +939,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
911939
if finish_status.is_finished():
912940
current_finish_reason = finish_status.get_finish_reason()
913941

942+
if current_finish_reason == "error":
943+
logger.error(
944+
f"internal pipeline error during completion stream group_id={group_request_id} "
945+
f"sub_req_id={sub_req_id}"
946+
)
947+
yield f"data: {_sse_internal_error_payload()}\n\n"
948+
yield "data: [DONE]\n\n"
949+
return
950+
914951
output_text = request_output
915952
if request.echo and metadata.get("is_first_token", False):
916953
prompt_str = prompt

lightllm/server/detokenization/manager.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ def _init_get_token_id_to_token_str(self):
4646
self.token_id_to_token = {token_id: token for token, token_id in self.tokenizer.get_vocab().items()}
4747
return
4848

49-
def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
49+
def _add_new_group_req_index(self, recv_obj: GroupReqIndexes) -> int:
5050
from lightllm.server.core.objs import FinishStatus
5151

52+
failed_count = 0
5253
for req_index in recv_obj.shm_req_indexes:
5354
req = self.shm_req_manager.get_req_obj_by_index(req_index)
5455
try:
@@ -68,10 +69,23 @@ def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
6869
decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)
6970

7071
self.req_id_to_out[req.request_id] = decode_req
71-
except Exception as e:
72+
except Exception:
73+
# Init failed (shm link, tokenizer, decode-mode fix, …). Mark the req
74+
# finished with an error and push a sentinel into out_tokens_queue so the
75+
# http loop forwards a terminal status — otherwise the queue stays empty,
76+
# the client hangs until disconnect, and the shm slot leaks because
77+
# can_released_mark never gets set. Continue with the rest of the group.
78+
logger.exception(f"detokenization init failed for req_id {req.request_id}")
7279
req.finish_status.set_status(FinishStatus.FINISHED_ERROR)
73-
raise e
74-
return
80+
req.finish_token_index = req.input_len
81+
try:
82+
if not req.out_tokens_queue.is_full():
83+
req.out_tokens_queue.push("", req.input_len, False, 1)
84+
except Exception:
85+
logger.exception(f"failed to push error sentinel for req_id {req.request_id}")
86+
req.can_released_mark = True
87+
failed_count += 1
88+
return failed_count
7589

7690
def handle_loop(self):
7791
try:
@@ -83,9 +97,12 @@ def handle_loop(self):
8397
recv_obj: GroupReqIndexes = self.zmq_recv_socket.recv_pyobj(zmq.NOBLOCK)
8498
assert isinstance(recv_obj, GroupReqIndexes)
8599
try:
86-
self._add_new_group_req_index(recv_obj=recv_obj)
100+
failed_count = self._add_new_group_req_index(recv_obj=recv_obj)
87101
except Exception:
88102
logger.exception("add new group req index has exception")
103+
failed_count = len(recv_obj.shm_req_indexes)
104+
if failed_count:
105+
# Wake the http loop so it drains the error sentinel(s) we just pushed.
89106
self.pub_to_httpserver.send_pyobj(None, protocol=pickle.HIGHEST_PROTOCOL)
90107

91108
# 当队列中存在较多的请求时,将一次接受的数量上调

0 commit comments

Comments
 (0)