Skip to content

Commit 4b070fa

Browse files
committed
test(runtime): cover websocket auth refresh retries
Assert owner-forwarded websocket handshake authorization failures refresh and retry through the same owner, and that bearer changes between turns open a fresh upstream websocket connection.
1 parent 2946e11 commit 4b070fa

2 files changed

Lines changed: 151 additions & 0 deletions

File tree

test/codex_pooler/gateway/transports/upstream_websocket_session_test.exs

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -138,6 +138,61 @@ defmodule CodexPooler.Gateway.Transports.Websocket.UpstreamWebsocketSessionTest
138138
assert_receive {:fake_upstream_chunk_sent, 3}, 1_000
139139
end
140140

141+
test "opens a new upstream websocket connection when bearer changes between turns" do
  # One scripted upstream reply per turn; the distinct ids show which reply
  # was delivered for which turn.
  scripted_replies =
    for response_id <- ["resp_ws_old_token", "resp_ws_new_token"] do
      FakeUpstream.json_response(%{"id" => response_id, "object" => "response"})
    end

  upstream = start_upstream({:sequence, scripted_replies})

  {:ok, session} = UpstreamWebsocketSession.start_link([])
  test_pid = self()
  responses_url = FakeUpstream.url(upstream) <> "/backend-api/codex/responses"

  # Builds a request whose writer relays every written frame back to the test
  # process, tagged with `label` so the two turns can be told apart.
  build_request = fn label, bearer, content ->
    body = %{
      "model" => "upstream-test-model",
      "input" => [%{"type" => "message", "role" => "user", "content" => content}],
      "stream" => true
    }

    %Request{
      url: responses_url,
      headers: [{"authorization", "Bearer #{bearer}"}],
      payload: Jason.encode!(body),
      timeouts: @timeouts,
      writer: &send(test_pid, {:upstream_websocket_frame, label, &1}),
      message_mapper: nil
    }
  end

  # First turn, sent with the old bearer.
  old_turn = build_request.(:old_token_turn, "old-upstream-token", "first turn")

  assert {:ok, %{terminal: "response.completed", status: 200}} =
           UpstreamWebsocketSession.request(session, old_turn)

  assert_receive {:upstream_websocket_frame, :old_token_turn, old_frame}, 1_000
  assert %{"id" => "resp_ws_old_token"} = Jason.decode!(old_frame)

  # Second turn on the same session process, now with a different bearer.
  new_turn = build_request.(:new_token_turn, "new-upstream-token", "second turn")

  assert {:ok, %{terminal: "response.completed", status: 200}} =
           UpstreamWebsocketSession.request(session, new_turn)

  assert_receive {:upstream_websocket_frame, :new_token_turn, new_frame}, 1_000
  assert %{"id" => "resp_ws_new_token"} = Jason.decode!(new_frame)

  # The fake upstream must have recorded two requests on two different
  # websocket connections, each carrying the bearer of its own turn.
  authorization_of = fn recorded ->
    recorded.headers |> Map.new() |> Map.get("authorization")
  end

  recorded_requests = FakeUpstream.requests(upstream)
  assert [old_request, new_request] = recorded_requests
  refute old_request.websocket_connection_id == new_request.websocket_connection_id
  assert authorization_of.(old_request) == "Bearer old-upstream-token"
  assert authorization_of.(new_request) == "Bearer new-upstream-token"
  assert FakeUpstream.websocket_connection_count(upstream) == 2
end
195+
141196
test "does not treat response.created as upstream websocket terminal success" do
142197
parent = self()
143198

test/codex_pooler_web/controllers/runtime/backend_codex_websocket_owner_forwarding_test.exs

Lines changed: 96 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ defmodule CodexPoolerWeb.Runtime.BackendCodexWebsocketOwnerForwardingTest do
4040
alias CodexPooler.Gateway.Runtime.Finalization.Interruption
4141

4242
alias CodexPooler.Gateway.Persistence.{
43+
BridgeDemotion,
4344
BridgeOwnerLease,
4445
BridgeSessionAlias,
4546
CodexSession,
@@ -353,6 +354,101 @@ defmodule CodexPoolerWeb.Runtime.BackendCodexWebsocketOwnerForwardingTest do
353354
end
354355
end
355356

357+
@tag :feature_websocket_terminal_auth_refresh
test "owner-forwarded websocket handshake 401 refreshes through the same owner without demotion" do
  # Scripted upstream replies, in order: a 401 on the websocket upgrade, a
  # reply carrying a new access token, and finally the successful response.
  upstream =
    start_upstream(
      {:sequence,
       [
         FakeUpstream.websocket_upgrade_error(
           %{"error" => %{"code" => "invalid_api_key"}},
           status: 401,
           headers: [{"x-openai-authorization-error", "invalid_api_key"}]
         ),
         FakeUpstream.json_response(
           %{"access_token" => "owner-upstream-token-handshake-refreshed"},
           200
         ),
         FakeUpstream.json_response(%{
           "id" => "resp_owner_auth_handshake_retry_success",
           "object" => "response",
           "usage" => %{"input_tokens" => 4, "output_tokens" => 3, "total_tokens" => 7}
         })
       ]}
    )

  setup = gateway_setup(upstream)

  # Store a refresh token for the identity; its plaintext doubles as a leak
  # canary in the metadata assertions at the end of the test.
  assert {:ok, _secret} =
           Upstreams.store_encrypted_secret(setup.identity, %{
             secret_kind: "refresh_token",
             plaintext: "refresh-token-owner-ws-handshake-do-not-leak"
           })

  {:ok, auth} = Access.authenticate_authorization_header(setup.authorization)

  {:ok, state} =
    owner_socket(auth, "ws-owner-auth-handshake-refresh", "owner-auth-handshake-refresh")

  try do
    # Remember the owner process so we can assert it is unchanged afterwards.
    assert {:ok, owner_pid} = WebsocketOwnerSession.lookup(state.codex_session.id)

    assert {:ok, state} =
             CodexResponsesSocket.handle_in(
               {websocket_payload(setup, "owner handshake auth refresh"), [opcode: :text]},
               state
             )

    assert {:push, {:text, frame}, state} = receive_owner_socket_push(state)
    assert %{"id" => "resp_owner_auth_handshake_retry_success"} = Jason.decode!(frame)
    assert {:ok, _state} = receive_socket_done(state)
    assert {:ok, ^owner_pid} = WebsocketOwnerSession.lookup(state.codex_session.id)

    # Exactly two upstream requests are expected here: the token refresh and
    # the retried websocket request carrying the refreshed bearer.
    assert [refresh_request, retried_request] = await_upstream_requests(upstream, 2)
    assert refresh_request.path == "/oauth/token"
    assert retried_request.method == "WEBSOCKET"
    assert retried_request.path == "/backend-api/codex/responses"

    assert Map.new(retried_request.headers)["authorization"] ==
             "Bearer owner-upstream-token-handshake-refreshed"

    assert FakeUpstream.websocket_connection_count(upstream) == 1

    # A single request log that succeeded after one retry and a successful
    # auth refresh.
    assert [request] = request_logs(setup.pool.id)
    assert request.status == "succeeded"
    assert request.retry_count == 1
    assert request.last_error_code == nil
    assert request.request_metadata["auth_refresh"]["status"] == "succeeded"

    # Owner and proxy are both expected to be this node, and no demotion row
    # may exist for the pool.
    owner_metadata = request.request_metadata["websocket_owner_forwarding"]
    assert owner_metadata["enabled"] == true
    assert owner_metadata["owner_instance_id"] == Atom.to_string(node())
    assert owner_metadata["proxy_instance_id"] == Atom.to_string(node())
    refute Repo.exists?(from d in BridgeDemotion, where: d.pool_id == ^setup.pool.id)

    # NOTE(review): this query is not scoped to the request above; it assumes
    # no other Attempt rows are visible to this test - confirm.
    assert [first_attempt, second_attempt] =
             Repo.all(from(a in Attempt, order_by: [asc: a.attempt_number]))

    assert first_attempt.pool_upstream_assignment_id == setup.assignment.id
    assert first_attempt.status == "retryable_failed"
    assert first_attempt.network_error_code == "upstream_unauthorized"
    assert second_attempt.pool_upstream_assignment_id == setup.assignment.id
    assert second_attempt.status == "succeeded"

    # Render the metadata without truncation. The default Inspect options
    # (`limit: 50`, `printable_limit: 4096`) can elide part of a large term,
    # which would let a leaked secret past the cutoff slip through the
    # `refute` checks below.
    metadata_text =
      inspect(
        {request.request_metadata, first_attempt.response_metadata,
         second_attempt.response_metadata},
        limit: :infinity,
        printable_limit: :infinity
      )

    refute metadata_text =~ setup.authorization
    refute metadata_text =~ "refresh-token-owner-ws-handshake-do-not-leak"
    refute metadata_text =~ "owner-upstream-token-handshake-refreshed"
    refute metadata_text =~ "Bearer "
  after
    # Bindings made inside `try` are not visible here, so `state` is the
    # socket state returned by `owner_socket/3` before the turn was handled.
    CodexResponsesSocket.terminate(:closed, state)
  end
end
451+
356452
test "response.processed after reconnect is forwarded through the owner upstream connection" do
357453
upstream =
358454
start_upstream(

0 commit comments

Comments
 (0)