Skip to content

Commit 9458784

Browse files
committed
fix(upstreams): avoid token refresh loops on fresh usage probes
Treat usage-probe 401/403 responses as quota unavailable when the stored access token is still fresh, so reconciliation does not mark routable accounts refresh_failed or call OAuth unnecessarily. Keep retryable account-reconciliation refresh failures recoverable through the existing token refresh worker and tighten job read-model redaction for recovery metadata.
1 parent 517c0ab commit 9458784

7 files changed

Lines changed: 542 additions & 33 deletions

File tree

lib/codex_pooler/jobs/read_model/failure_presentation.ex

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@ defmodule CodexPooler.Jobs.ReadModel.FailurePresentation do
22
@moduledoc false
33

44
# Projection keys (atom and string forms) that `sanitize_projection/1` removes
# with `Map.drop/2` before a job projection is exposed.
@sensitive_projection_keys [:args, :meta, :errors, "args", "meta", "errors"]
5+
# Regex source for secret-style key names: `secret` optionally followed by
# `_`/`-` separated segments (e.g. `secret`, `secret_key`), or one or more
# segments ending in `_secret`/`-secret` with optional trailing segments
# (e.g. `client_secret`, `oauth-client-secret-v2`). Character classes are
# lowercase only; the regexes below that embed this string add `(?i)`.
@sensitive_secret_key_pattern "(?:secret(?:[_-][a-z0-9]+)*|[a-z0-9]+(?:[_-][a-z0-9]+)*[_-]secret(?:[_-][a-z0-9]+)*)"
6+
# Regex source for keys whose value is a whole payload: `auth_json`,
# `provider_body`, `request_body`, `response_body` (separator `_`, `-` or
# absent) and bare `body`.
@sensitive_body_key_pattern "(?:auth[_-]?json|provider[_-]?body|request[_-]?body|response[_-]?body|body)"
7+
# Regex source for every key name treated as sensitive in failure text:
# auth/cookie headers, API keys, named and generic `*_token` keys, the body
# keys above, `password`, `prompt`, the secret-style keys above and bare
# `token`.
@sensitive_failure_key_pattern "(?:authorization|cookie|set-cookie|api[_-]?key|(?:access|refresh|id|session|api)[_-]?token|[a-z0-9]+(?:[_-][a-z0-9]+)*[_-]token|#{@sensitive_body_key_pattern}|password|prompt|#{@sensitive_secret_key_pattern}|token)"
8+
# Regex source for the subset of keys (`password` and secret-style keys) used
# by `@sensitive_spaced_secret_failure_fragment`, whose value match may
# contain spaces.
@sensitive_spaced_secret_key_pattern "(?:password|#{@sensitive_secret_key_pattern})"
9+
# Matches a body key, `:` or `=`, and a double-quoted string value; the value
# part accepts backslash-escaped characters, so an escaped quote does not end
# the match early.
@sensitive_quoted_string_body_failure_fragment ~r/(?i)(?:"?\b#{@sensitive_body_key_pattern}\b"?\s*[:=]\s*)"(?:\\.|[^"\\])*"/
10+
# Matches any sensitive key, `:` or `=`, and a structured value: a `{...}` or
# `[...]` group containing at most one further level of nesting, or a single-
# or double-quoted string without escape handling.
@sensitive_jsonish_failure_fragment ~r/(?i)(?:"?\b#{@sensitive_failure_key_pattern}\b"?\s*[:=]\s*)(?:\{(?:[^{}]|\{[^{}]*\})*\}|\[(?:[^\[\]]|\[[^\[\]]*\])*\]|"[^"]*"|'[^']*')/
11+
# Matches a body key and, lazily, everything after it up to (not including)
# whitespace followed by the next sensitive `key:`/`key=`, or the end of the
# string.
# NOTE(review): no `s` modifier is set, so `.` presumably stops at a newline;
# confirm multi-line bodies are fully covered by the other passes.
@sensitive_body_failure_fragment ~r/(?i)(?:"?\b#{@sensitive_body_key_pattern}\b"?\s*[:=]\s*).*?(?=\s+"?\b#{@sensitive_failure_key_pattern}\b"?\s*[:=]|\z)/
12+
# Matches a password/secret-style key and its value, lazily, up to the first
# `,` or `;`, a period followed by whitespace, whitespace followed by the next
# sensitive `key:`/`key=`, or the end of the string.
@sensitive_spaced_secret_failure_fragment ~r/(?i)(?:"?\b#{@sensitive_spaced_secret_key_pattern}\b"?\s*[:=]\s*).*?(?=[,;]|\.\s+|\s+"?\b#{@sensitive_failure_key_pattern}\b"?\s*[:=]|\z)/
13+
# Matches any sensitive key, `:` or `=`, and the following run of characters
# that are not a comma, semicolon or whitespace. `redact_failure_secrets/1`
# applies this after the four fragment regexes above.
@sensitive_text_failure_fragment ~r/(?i)(?:"?\b#{@sensitive_failure_key_pattern}\b"?\s*[:=]\s*)[^,;\s]+/
514

615
@type failure_summary :: %{
716
required(:title) => String.t(),
@@ -21,6 +30,12 @@ defmodule CodexPooler.Jobs.ReadModel.FailurePresentation do
2130
def sanitize_projection(%NaiveDateTime{} = value), do: value
2231
def sanitize_projection(%Date{} = value), do: value
2332

33+
# Removes a blank (`nil` or `""`) `:trigger_kind` entry from the projection and
# then re-dispatches the remaining map through `sanitize_projection/1`.
def sanitize_projection(%{trigger_kind: blank} = projection) when blank in [nil, ""] do
  without_trigger_kind = Map.delete(projection, :trigger_kind)
  sanitize_projection(without_trigger_kind)
end
38+
2439
def sanitize_projection(%{errors: errors} = value) when is_list(errors) do
2540
value
2641
|> Map.drop(@sensitive_projection_keys)
@@ -109,11 +124,12 @@ defmodule CodexPooler.Jobs.ReadModel.FailurePresentation do
109124
defp redact_failure_secrets(message) do
110125
message
111126
|> String.replace(~r/(?i)bearer\s+[a-z0-9._~+\/=:-]+/, "Bearer [redacted]")
127+
|> String.replace(@sensitive_quoted_string_body_failure_fragment, "[redacted]")
128+
|> String.replace(@sensitive_jsonish_failure_fragment, "[redacted]")
129+
|> String.replace(@sensitive_body_failure_fragment, "[redacted]")
130+
|> String.replace(@sensitive_spaced_secret_failure_fragment, "[redacted]")
131+
|> String.replace(@sensitive_text_failure_fragment, "[redacted]")
112132
|> String.replace(~r/(?i)\bsecret[-_a-z0-9]*\b/, "[redacted]")
113-
|> String.replace(
114-
~r/(?i)\b(authorization|cookie|set-cookie|api[_-]?key|access[_-]?token|refresh[_-]?token|password|prompt|secret|token)\b\s*[:=]\s*[^,;\s]+/,
115-
"[redacted]"
116-
)
117133
end
118134

119135
defp unwrap_oban_failure_message(message) do

lib/codex_pooler/jobs/read_model/query.ex

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,29 @@ defmodule CodexPooler.Jobs.ReadModel.Query do
2929

3030
def with_attention(jobs, opts) when is_list(jobs) do
3131
now = attention_now(opts)
32-
Enum.map(jobs, &HealthPolicy.put_attention(&1, now: now))
32+
Enum.map(jobs, &put_attention(&1, now))
3333
end
3434

3535
def with_attention(nil, _opts), do: nil
3636

3737
def with_attention(job, opts) when is_map(job) do
38-
HealthPolicy.put_attention(job, now: attention_now(opts))
38+
put_attention(job, attention_now(opts))
3939
end
4040

4141
# Returns the `:now` option when the caller supplied one; otherwise reads the
# current UTC time. The clock is only consulted when `:now` is absent.
def attention_now(opts) do
  Keyword.get_lazy(opts, :now, fn -> DateTime.utc_now() end)
end
4242

43+
# Drops a blank `:trigger_kind` from the job row, then hands the row to
# `HealthPolicy.put_attention/2` with the supplied `now`.
defp put_attention(job, now) do
  cleaned_job = drop_blank_trigger_kind(job)
  HealthPolicy.put_attention(cleaned_job, now: now)
end
48+
49+
# Removes the `:trigger_kind` key when its value is `nil` or `""`; any other
# job (including one without the key) is returned unchanged.
defp drop_blank_trigger_kind(%{trigger_kind: nil} = job), do: Map.delete(job, :trigger_kind)
defp drop_blank_trigger_kind(%{trigger_kind: ""} = job), do: Map.delete(job, :trigger_kind)
defp drop_blank_trigger_kind(job), do: job
54+
4355
def group_worker_rows_by_index(worker_groups) do
4456
worker_groups = Enum.with_index(worker_groups)
4557

@@ -71,6 +83,7 @@ defmodule CodexPooler.Jobs.ReadModel.Query do
7183
worker: unquote(job).worker,
7284
queue: unquote(job).queue,
7385
state: unquote(job).state,
86+
trigger_kind: fragment("?->>?", unquote(job).args, "trigger_kind"),
7487
errors: unquote(job).errors,
7588
attempt: unquote(job).attempt,
7689
max_attempts: unquote(job).max_attempts,

lib/codex_pooler/upstreams/reconciliation/pool_reconciliation.ex

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ defmodule CodexPooler.Upstreams.Reconciliation.PoolReconciliation do
33

44
import Ecto.Query
55

6+
alias CodexPooler.Jobs
67
alias CodexPooler.Pools.Pool
78
alias CodexPooler.Repo
89
alias CodexPooler.Upstreams.Auth.TokenRefresh
@@ -17,6 +18,7 @@ defmodule CodexPooler.Upstreams.Reconciliation.PoolReconciliation do
1718
@eligible PoolUpstreamAssignment.eligible_status()
1819
@health_active PoolUpstreamAssignment.active_health_status()
1920
@account_quota_key "account"
21+
# Look-ahead window in seconds (5 minutes). After a usage-probe 401/403, the
# stored access token is only refreshed when its recorded expiry falls at or
# before the observation time plus this window.
@usage_auth_refresh_skew_seconds 5 * 60
2022
@codex_usage_paths [
2123
"/api/codex/usage",
2224
"/backend-api/codex/usage",
@@ -204,7 +206,7 @@ defmodule CodexPooler.Upstreams.Reconciliation.PoolReconciliation do
204206
{:usage, identity, payload, windows}
205207

206208
{:error, {:upstream_status, status}} when status in [401, 403] ->
207-
retry_codex_usage_after_token_refresh(identity, assignment, opts)
209+
maybe_retry_codex_usage_after_token_refresh(identity, assignment, observed_at, opts)
208210

209211
_error ->
210212
:usage_unavailable
@@ -214,13 +216,62 @@ defmodule CodexPooler.Upstreams.Reconciliation.PoolReconciliation do
214216
end
215217
end
216218

219+
# Decides how to react to a 401/403 from the usage probe: when the stored
# access token is due for refresh, refresh it and retry the probe; otherwise
# report `:usage_unavailable` without touching the token.
defp maybe_retry_codex_usage_after_token_refresh(identity, assignment, observed_at, opts) do
  case access_token_refresh_due_after_usage_auth_failure?(identity, observed_at) do
    true -> retry_codex_usage_after_token_refresh(identity, assignment, opts)
    false -> :usage_unavailable
  end
end
226+
227+
# Returns `true` when the identity's recorded access-token expiry is at or
# before `observed_at` plus `@usage_auth_refresh_skew_seconds`, and also when
# no usable expiry is recorded at all. Returns `false` only for a token whose
# expiry lies beyond that window.
defp access_token_refresh_due_after_usage_auth_failure?(
       %UpstreamIdentity{metadata: metadata},
       %DateTime{} = observed_at
     ) do
  case access_token_expires_at(metadata) do
    :unknown ->
      true

    {:ok, expires_at} ->
      refresh_deadline =
        DateTime.add(observed_at, @usage_auth_refresh_skew_seconds, :second)

      DateTime.compare(expires_at, refresh_deadline) != :gt
  end
end
240+
241+
# Reads `"access_token_expires_at"` from the metadata map and parses it as an
# ISO 8601 datetime. Returns `{:ok, datetime}` on success and `:unknown` when
# the key is missing, is not a string, fails to parse, or the metadata is not
# a map.
defp access_token_expires_at(%{} = metadata) do
  with raw when is_binary(raw) <- metadata["access_token_expires_at"],
       {:ok, parsed, _offset} <- DateTime.from_iso8601(raw) do
    {:ok, DateTime.truncate(parsed, :microsecond)}
  else
    _missing_or_invalid -> :unknown
  end
end

defp access_token_expires_at(_metadata), do: :unknown
255+
217256
defp retry_codex_usage_after_token_refresh(identity, assignment, opts) do
218-
with {:ok, %{status: :active, identity: refreshed_identity}} <-
219-
TokenRefresh.refresh_access_token(identity,
220-
trigger_kind: "account_reconciliation",
221-
receive_timeout: Keyword.get(opts, :receive_timeout, 30_000)
222-
),
223-
{:ok, access_token} <- Secrets.decrypt_active_secret(refreshed_identity, "access_token"),
257+
case TokenRefresh.refresh_access_token(identity,
258+
trigger_kind: "account_reconciliation",
259+
receive_timeout: Keyword.get(opts, :receive_timeout, 30_000)
260+
) do
261+
{:ok, %{status: :active, identity: refreshed_identity}} ->
262+
fetch_codex_usage_after_successful_token_refresh(refreshed_identity, assignment, opts)
263+
264+
{:ok, %{status: :refresh_failed, retryable?: true, identity: failed_identity}} ->
265+
maybe_enqueue_account_reconciliation_token_refresh_recovery(failed_identity)
266+
:auth_unavailable
267+
268+
_unavailable ->
269+
:auth_unavailable
270+
end
271+
end
272+
273+
defp fetch_codex_usage_after_successful_token_refresh(refreshed_identity, assignment, opts) do
274+
with {:ok, access_token} <- Secrets.decrypt_active_secret(refreshed_identity, "access_token"),
224275
observed_at <- now(),
225276
{:ok, payload, _url, windows} <-
226277
fetch_codex_usage_payload(
@@ -236,6 +287,26 @@ defmodule CodexPooler.Upstreams.Reconciliation.PoolReconciliation do
236287
end
237288
end
238289

290+
# Best-effort hand-off of a retryable refresh failure to the background token
# refresh job via `Jobs.enqueue_token_refresh/2`.
#
# A job is enqueued only when the identity's `"token_refresh"` metadata shows
# a failure recorded with the "account_reconciliation" trigger. Every handled
# path returns `:ok` (previously the skip path returned `nil` because the `if`
# had no `else`), so callers see one return shape.
defp maybe_enqueue_account_reconciliation_token_refresh_recovery(
       %UpstreamIdentity{} = failed_identity
     ) do
  if account_reconciliation_refresh_failure?(failed_identity) do
    enqueue_result =
      Jobs.enqueue_token_refresh(failed_identity,
        trigger_kind: "account_reconciliation_recovery"
      )

    # An enqueue error is swallowed on purpose: the caller reports
    # `:auth_unavailable` either way, and a failed enqueue must not abort the
    # reconciliation pass.
    case enqueue_result do
      {:ok, %Oban.Job{}} -> :ok
      {:error, _reason} -> :ok
    end
  else
    :ok
  end
end
302+
303+
# True when the identity's `"token_refresh"` metadata entry records a failed
# refresh whose trigger was "account_reconciliation"; false for any other
# shape, including a missing entry.
defp account_reconciliation_refresh_failure?(%UpstreamIdentity{} = identity) do
  match?(
    %{"status" => "failed", "trigger_kind" => "account_reconciliation"},
    identity.metadata["token_refresh"]
  )
end
309+
239310
defp fetch_codex_usage_payload(identity, assignment, access_token, observed_at, opts) do
240311
base = upstream_usage_base_url(identity, assignment)
241312
timeout = Keyword.get(opts, :receive_timeout, 30_000)

0 commit comments

Comments
 (0)