Skip to content

Commit 51afd75

Browse files
committed
fix: preserve response incomplete terminal semantics
Treat ordinary OpenAI Responses response.incomplete terminals as delivered incomplete outcomes instead of upstream failures. Keep error-coded incomplete terminals, stale continuation anchors, stream truncation, and client disconnects on the existing failure or retry paths. Align public /v1 streaming, websocket, non-stream collection, chat completions accounting, request logs, and docs around the shared classifier.
1 parent a5338f4 commit 51afd75

12 files changed

Lines changed: 583 additions & 94 deletions

File tree

docs-site/src/content/docs/clients/openai-compatible.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ The `/v1` surface is compatibility over Codex routing, not a separate OpenAI eng
127127

128128
`POST /v1/responses` lifts system and developer input-message text into top-level `instructions` before dispatching to Codex-compatible work. Streaming `/v1/responses` and `/v1/chat/completions` return early upstream terminal errors as the first public SSE event or data chunk, without synthetic assistant/success prefixes. Non-streaming failures remain OpenAI-shaped JSON errors.
129129

130+
Ordinary OpenAI Responses incomplete terminals are preserved. If an upstream returns `response.incomplete` with `status: "incomplete"` for output limits or content filtering and no embedded error, `/v1/responses` streaming and websocket clients receive that incomplete terminal instead of a synthetic failure. Error-coded incomplete terminals, such as context overflow or stale continuation anchors, are still returned through the sanitized failure path.
131+
130132
Accepted `POST /v1/responses` tool definitions are narrow. OpenAI Responses remote MCP tool definitions are rejected before dispatch.
131133

132134
## Request compression behavior

docs-site/src/content/docs/operators/request-logs.mdx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ The table shows the latest matching rows, with a hard page size limit.
118118

119119
Use status and errors together. A completed row with token usage usually means the request reached an upstream and settled successfully. A failed row may still show route, latency, or partial attempt metadata. A rejected row usually means admission or policy failed before upstream work.
120120

121+
OpenAI Responses `response.incomplete` can be a successful delivered terminal response. For example, an upstream may stop at `max_output_tokens` or a content filter and still return safe usage metadata. Those rows stay `succeeded` and should not be treated as upstream health failures. Error-coded incomplete terminals, stale continuation anchors, stream truncation, and client disconnects still appear as failures or interruptions with sanitized error codes.
122+
121123
Common causes to inspect are:
122124

123125
1. Pool API key paused, revoked, expired, or attached to the wrong Pool
@@ -134,7 +136,7 @@ The Errors column should stay safe to copy into operator tickets. It should not
134136

135137
Usage lines summarize request accounting, including request count context, token totals, cached input tokens, and estimated cost when pricing is available. Treat these as operational attribution and capacity evidence. They are not a replacement for provider billing records.
136138

137-
If usage is blank, check the request status first. A request can be admitted without final usage if it failed before settlement, if pricing was not matched, or if accounting metadata was not available for that route.
139+
If usage is blank, check the request status first. A request can be admitted without final usage if it failed before settlement, if pricing was not matched, or if accounting metadata was not available for that route. A delivered incomplete response with missing upstream usage remains `usage_unknown` and does not get an invented settled cost from the reservation estimate.
138140

139141
## Operational Checklist
140142

lib/codex_pooler/gateway/openai_compatibility/responses/sse.ex

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,15 @@ defmodule CodexPooler.Gateway.OpenAICompatibility.Responses.SSE do
99
def response_from_sse(body) when is_binary(body) do
1010
case Jason.decode(body) do
1111
{:ok, %{} = decoded} ->
12-
{:ok, Map.put_new(decoded, "object", "response")}
12+
response = Map.put_new(decoded, "object", "response")
13+
14+
case response_terminal_type(response) do
15+
nil ->
16+
{:ok, response}
17+
18+
type ->
19+
terminal_error(%{"type" => type, "response" => response}, response) || {:ok, response}
20+
end
1321

1422
_error ->
1523
body |> decoded_sse_events() |> response_from_sse_events()
@@ -49,11 +57,22 @@ defmodule CodexPooler.Gateway.OpenAICompatibility.Responses.SSE do
4957
)
5058
end
5159

52-
defp terminal_error(_event, %{"status" => status})
53-
when status in ["completed", "in_progress", "incomplete"],
54-
do: nil
55-
5660
defp terminal_error(event, response) do
61+
event_type = event["type"] || response_terminal_type(response)
62+
63+
case StreamProtocol.terminal_outcome(event_type, event) do
64+
{:ok, %{kind: kind}} when kind in [:completed, :incomplete] ->
65+
nil
66+
67+
{:ok, %{kind: :failed, failure: failure}} ->
68+
terminal_failure_error(event, response, failure)
69+
70+
_outcome ->
71+
terminal_failure_error(event, response, nil)
72+
end
73+
end
74+
75+
defp terminal_failure_error(event, response, failure) do
5776
error = response["error"] || event["error"]
5877

5978
case error do
@@ -70,10 +89,18 @@ defmodule CodexPooler.Gateway.OpenAICompatibility.Responses.SSE do
7089
)}
7190

7291
_other ->
73-
{:error, Error.reason(502, "upstream_error", "upstream response failed")}
92+
{:error, Error.reason(502, failure_code(failure), "upstream request failed")}
7493
end
7594
end
7695

96+
defp failure_code(%{code: code}) when is_binary(code), do: code
97+
defp failure_code(_failure), do: "upstream_error"
98+
99+
defp response_terminal_type(%{"status" => "completed"}), do: "response.completed"
100+
defp response_terminal_type(%{"status" => "failed"}), do: "response.failed"
101+
defp response_terminal_type(%{"status" => "incomplete"}), do: "response.incomplete"
102+
defp response_terminal_type(_response), do: nil
103+
77104
defp maybe_backfill_output(%{"output" => output} = response, _events)
78105
when is_list(output) and output != [],
79106
do: response

lib/codex_pooler/gateway/runtime/dispatch/websocket_attempt.ex

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule CodexPooler.Gateway.Runtime.Dispatch.WebsocketAttempt do
77
alias CodexPooler.Gateway.Runtime.Dispatch.ResponseContext
88
alias CodexPooler.Gateway.Runtime.Finalization
99
alias CodexPooler.Gateway.Runtime.Finalization.{AttemptSettlement, Metadata}
10+
alias CodexPooler.Gateway.Transports.Streaming.StreamProtocol
1011
alias CodexPooler.Gateway.Transports.Streaming.WebsocketCodec
1112
alias CodexPooler.Gateway.Transports.UpstreamDispatch
1213
alias CodexPooler.Gateway.Transports.UpstreamDispatch.Request, as: DispatchRequest
@@ -81,19 +82,19 @@ defmodule CodexPooler.Gateway.Runtime.Dispatch.WebsocketAttempt do
8182
failure
8283
)
8384

84-
{:ok, %{terminal: "response.completed"} = response} ->
85-
Finalization.finalize_completed_websocket_response(
86-
context,
85+
{:ok, %{terminal: terminal} = response} ->
86+
finalization =
8787
response
8888
|> Map.put(:started, started)
89-
|> Map.put(:callbacks, callbacks)
90-
)
89+
|> maybe_put_websocket_callbacks(callbacks)
9190

92-
{:ok, response} ->
93-
Finalization.finalize_terminal_websocket_response(
94-
context,
95-
Map.put(response, :started, started)
96-
)
91+
case websocket_terminal_outcome(terminal, Map.get(response, :body, "")) do
92+
{:ok, %{kind: kind}} when kind in [:completed, :incomplete] ->
93+
Finalization.finalize_completed_websocket_response(context, finalization)
94+
95+
_outcome ->
96+
Finalization.finalize_terminal_websocket_response(context, finalization)
97+
end
9798

9899
{:error, response} ->
99100
Finalization.finalize_failed_websocket_response(
@@ -303,6 +304,19 @@ defmodule CodexPooler.Gateway.Runtime.Dispatch.WebsocketAttempt do
303304
end
304305
end
305306

307+
defp maybe_put_websocket_callbacks(%{terminal: terminal} = finalization, callbacks) do
308+
case websocket_terminal_outcome(terminal, Map.get(finalization, :body, "")) do
309+
{:ok, %{kind: kind}} when kind in [:completed, :incomplete] ->
310+
Map.put(finalization, :callbacks, callbacks)
311+
312+
_outcome ->
313+
finalization
314+
end
315+
end
316+
317+
defp websocket_terminal_outcome("response.completed", _body), do: {:ok, %{kind: :completed}}
318+
defp websocket_terminal_outcome(_terminal, body), do: StreamProtocol.terminal_outcome(body)
319+
306320
defp finalize_exhausted_auth_refresh(context, dispatch_request, response, failure, started) do
307321
case Map.get(response, :body, "") do
308322
"" ->

lib/codex_pooler/gateway/runtime/finalization/websocket.ex

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,19 @@ defmodule CodexPooler.Gateway.Runtime.Finalization.Websocket do
6161

6262
@spec finalize_terminal(DispatchContext.t(), map()) :: {:ok, map()} | {:error, map()}
6363
def finalize_terminal(context, finalization) do
64-
%{
65-
body: body,
66-
terminal: terminal,
67-
status: status,
68-
headers: headers,
69-
started: started
70-
} = finalization
64+
%{body: body, terminal: terminal} = finalization
7165

72-
%{reserved: reserved, attempt: attempt, request_options: request_options} =
73-
context
66+
case websocket_terminal_outcome(terminal, body) do
67+
{:ok, %{kind: kind}} when kind in [:completed, :incomplete] ->
68+
finalize_completed(context, finalization)
69+
70+
_outcome ->
71+
finalize_terminal_failure(context, finalization)
72+
end
73+
end
74+
75+
defp finalize_terminal_failure(context, finalization) do
76+
%{body: body, terminal: terminal, headers: headers} = finalization
7477

7578
upstream_code =
7679
Map.get(finalization, :upstream_error_code) ||
@@ -80,35 +83,67 @@ defmodule CodexPooler.Gateway.Runtime.Finalization.Websocket do
8083
websocket_frame_headers = Map.get(finalization, :websocket_frame_headers, %{})
8184
metadata_headers = headers ++ Map.to_list(websocket_frame_headers)
8285

83-
with :ok <- Streaming.record_terminal_health_failure(upstream_code, metadata_headers, context) do
84-
case AttemptSettlement.finalize_partial_stream_failure(
85-
reserved.request,
86-
attempt,
87-
ResponseUsage.from_websocket_body(body),
88-
SettlementAttrs.partial_stream_failure(
89-
context,
90-
status,
91-
code,
92-
code,
93-
metadata_headers
94-
|> Metadata.websocket_response_metadata(
95-
code,
96-
request_options,
97-
websocket_frame_headers
98-
)
99-
|> Metadata.maybe_put_masked_error_metadata(upstream_code, code),
100-
started: started
101-
)
102-
) do
103-
{:ok, _finalized} ->
104-
{:ok, %{status: 200, headers: [], websocket_messages: []}}
105-
106-
{:error, gateway_error} ->
107-
{:error, gateway_error}
108-
end
86+
attempt_metadata =
87+
terminal_failure_metadata(
88+
context,
89+
metadata_headers,
90+
websocket_frame_headers,
91+
code,
92+
upstream_code
93+
)
94+
95+
case Streaming.record_terminal_health_failure(upstream_code, metadata_headers, context) do
96+
:ok ->
97+
settle_terminal_failure(context, finalization, body, code, attempt_metadata)
98+
99+
{:error, _gateway_error} = error ->
100+
error
101+
end
102+
end
103+
104+
defp terminal_failure_metadata(
105+
context,
106+
metadata_headers,
107+
websocket_frame_headers,
108+
code,
109+
upstream_code
110+
) do
111+
metadata_headers
112+
|> Metadata.websocket_response_metadata(
113+
code,
114+
context.request_options,
115+
websocket_frame_headers
116+
)
117+
|> Metadata.maybe_put_masked_error_metadata(upstream_code, code)
118+
end
119+
120+
defp settle_terminal_failure(context, finalization, body, code, attempt_metadata) do
121+
%{reserved: reserved, attempt: attempt} = context
122+
123+
case AttemptSettlement.finalize_partial_stream_failure(
124+
reserved.request,
125+
attempt,
126+
ResponseUsage.from_websocket_body(body),
127+
SettlementAttrs.partial_stream_failure(
128+
context,
129+
finalization.status,
130+
code,
131+
code,
132+
attempt_metadata,
133+
started: finalization.started
134+
)
135+
) do
136+
{:ok, _finalized} ->
137+
{:ok, %{status: 200, headers: [], websocket_messages: []}}
138+
139+
{:error, gateway_error} ->
140+
{:error, gateway_error}
109141
end
110142
end
111143

144+
defp websocket_terminal_outcome("response.completed", _body), do: {:ok, %{kind: :completed}}
145+
defp websocket_terminal_outcome(_terminal, body), do: StreamProtocol.terminal_outcome(body)
146+
112147
@spec finalize_failed(DispatchContext.t(), map()) :: {:error, map()}
113148
def finalize_failed(context, %{reason: :client_disconnected} = finalization) do
114149
%{headers: headers, started: started} = finalization

lib/codex_pooler/gateway/runtime/streaming/stream_attempt.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ defmodule CodexPooler.Gateway.Runtime.Streaming.StreamAttempt do
3535

3636
defp classify_data_after_first_event(data) do
3737
classification =
38-
case StreamProtocol.terminal_failure(data) do
39-
{:ok, failure} -> {:write_terminal_failure, data, failure}
40-
:error -> {:write, data}
38+
case StreamProtocol.terminal_outcome(data) do
39+
{:ok, %{kind: :failed, failure: failure}} -> {:write_terminal_failure, data, failure}
40+
_outcome -> {:write, data}
4141
end
4242

4343
{classification, %{classified?: true, buffer: ""}}
@@ -80,9 +80,9 @@ defmodule CodexPooler.Gateway.Runtime.Streaming.StreamAttempt do
8080
if StreamProtocol.internal_rate_limit_event?(event) do
8181
{:write, buffer}
8282
else
83-
case StreamProtocol.terminal_failure_event(event) do
84-
{:ok, failure} -> {:write_terminal_failure, buffer, failure}
85-
nil -> {:write, buffer}
83+
case StreamProtocol.terminal_outcome_event(event) do
84+
{:ok, %{kind: :failed, failure: failure}} -> {:write_terminal_failure, buffer, failure}
85+
_outcome -> {:write, buffer}
8686
end
8787
end
8888
end

0 commit comments

Comments
 (0)