Skip to content

Commit aab6f80

Browse files
committed
fix(admin): deduplicate upstream stats rows
1 parent 3c6d746 commit aab6f80

2 files changed

Lines changed: 172 additions & 10 deletions

File tree

lib/codex_pooler/admin/stats.ex

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -495,27 +495,103 @@ defmodule CodexPooler.Admin.Stats do
495495
entries_by_identity = Enum.group_by(settlements, & &1.upstream_identity_id)
496496

497497
quota_accounts
498-
|> Enum.map(fn account ->
499-
entries = Map.get(entries_by_identity, account.upstream_identity_id, [])
498+
|> Enum.group_by(& &1.upstream_identity_id)
499+
|> Enum.map(fn {upstream_identity_id, accounts} ->
500+
entries = Map.get(entries_by_identity, upstream_identity_id, [])
501+
canonical_account = canonical_upstream_account(accounts)
500502

501503
%{
502-
pool_upstream_assignment_id: account.pool_upstream_assignment_id,
503-
upstream_identity_id: account.upstream_identity_id,
504-
assignment_label: account.assignment_label,
505-
upstream_label: account.upstream_label,
506-
status: account.assignment_status,
507-
health_status: account.health_status,
508-
quota_state: account.state,
504+
pool_upstream_assignment_id: single_assignment_id(accounts),
505+
upstream_identity_id: upstream_identity_id,
506+
assignment_label: shared_account_value(accounts, :assignment_label),
507+
upstream_label:
508+
shared_account_value(accounts, :upstream_label) || canonical_account.upstream_label,
509+
status: aggregate_account_value(accounts, :assignment_status),
510+
health_status: aggregate_account_value(accounts, :health_status),
511+
quota_state: aggregate_account_value(accounts, :state, :mixed),
512+
assignment_count: length(accounts),
509513
requests: sum_integer(entries, :request_count),
510514
total_tokens: sum_integer(entries, :total_tokens),
511515
settled_cost_micros: sum_decimal_integer(entries, :settled_cost_micros)
512516
}
513517
end)
514518
|> Enum.sort_by(fn row ->
515-
{-row.total_tokens, -row.requests, row.assignment_label || row.upstream_label || ""}
519+
{-row.total_tokens, -row.requests, upstream_table_label(row),
520+
row.upstream_identity_id || ""}
516521
end)
517522
end
518523

524+
@spec canonical_upstream_account([map()]) :: map()
525+
defp canonical_upstream_account(accounts) do
526+
Enum.min_by(accounts, &upstream_account_sort_key/1, fn -> %{} end)
527+
end
528+
529+
@spec single_assignment_id([map()]) :: Ecto.UUID.t() | nil
530+
defp single_assignment_id([account]), do: Map.get(account, :pool_upstream_assignment_id)
531+
defp single_assignment_id(_accounts), do: nil
532+
533+
@spec shared_account_value([map()], atom()) :: term() | nil
534+
defp shared_account_value(accounts, field) do
535+
case distinct_account_values(accounts, field) do
536+
[value] -> value
537+
_values -> nil
538+
end
539+
end
540+
541+
@spec aggregate_account_value([map()], atom(), term()) :: term() | nil
542+
defp aggregate_account_value(accounts, field, mixed_value \\ "mixed") do
543+
case distinct_account_values(accounts, field) do
544+
[value] -> value
545+
[] -> nil
546+
_values -> mixed_value
547+
end
548+
end
549+
550+
@spec distinct_account_values([map()], atom()) :: [term()]
551+
defp distinct_account_values(accounts, field) do
552+
accounts
553+
|> Enum.map(&Map.get(&1, field))
554+
|> Enum.reject(&blank_value?/1)
555+
|> Enum.uniq()
556+
end
557+
558+
@spec upstream_account_sort_key(map()) :: {integer(), String.t(), String.t()}
559+
defp upstream_account_sort_key(account) do
560+
{
561+
assignment_status_rank(Map.get(account, :assignment_status)),
562+
safe_string(Map.get(account, :assignment_label) || Map.get(account, :upstream_label)),
563+
safe_string(Map.get(account, :pool_upstream_assignment_id))
564+
}
565+
end
566+
567+
@spec assignment_status_rank(term()) :: non_neg_integer()
568+
defp assignment_status_rank("active"), do: 0
569+
defp assignment_status_rank("pending"), do: 1
570+
defp assignment_status_rank("refresh_due"), do: 2
571+
defp assignment_status_rank("refreshing"), do: 3
572+
defp assignment_status_rank("paused"), do: 4
573+
defp assignment_status_rank("refresh_failed"), do: 5
574+
defp assignment_status_rank("reauth_required"), do: 6
575+
defp assignment_status_rank("disabled"), do: 7
576+
defp assignment_status_rank("errored"), do: 8
577+
defp assignment_status_rank("deleted"), do: 9
578+
defp assignment_status_rank(_status), do: 10
579+
580+
@spec upstream_table_label(map()) :: String.t()
581+
defp upstream_table_label(row) do
582+
safe_string(row.assignment_label || row.upstream_label)
583+
end
584+
585+
@spec blank_value?(term()) :: boolean()
586+
defp blank_value?(nil), do: true
587+
defp blank_value?(value) when is_binary(value), do: String.trim(value) == ""
588+
defp blank_value?(_value), do: false
589+
590+
@spec safe_string(term()) :: String.t()
591+
defp safe_string(nil), do: ""
592+
defp safe_string(value) when is_binary(value), do: value
593+
defp safe_string(value), do: to_string(value)
594+
519595
defp usage_pool_name(entries, pool_names_by_id) do
520596
entries
521597
|> Enum.map(& &1.pool_id)

test/codex_pooler/admin/stats_test.exs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ defmodule CodexPooler.Admin.StatsTest do
1414
alias CodexPooler.Jobs
1515
alias CodexPooler.Jobs.RuntimeStateCleanupWorker
1616
alias CodexPooler.Repo
17+
alias CodexPooler.Upstreams.Assignments.PoolAssignments
1718

1819
test "build_dashboard/2 returns pool-scoped KPI, table, chart, session, and quota aggregates" do
1920
scope = owner_scope()
@@ -187,6 +188,91 @@ defmodule CodexPooler.Admin.StatsTest do
187188
] = dashboard.tables.upstreams
188189
end
189190

191+
test "build_dashboard/2 collapses repeated assignments for the same upstream identity" do
192+
scope = owner_scope()
193+
first_pool = pool_fixture(%{slug: "stats-shared-upstream-a", name: "Stats Shared A"})
194+
second_pool = pool_fixture(%{slug: "stats-shared-upstream-b", name: "Stats Shared B"})
195+
third_pool = pool_fixture(%{slug: "stats-shared-label", name: "Stats Shared Label"})
196+
%{api_key: first_api_key} = active_api_key_fixture(first_pool)
197+
%{api_key: second_api_key} = active_api_key_fixture(second_pool)
198+
%{api_key: third_api_key} = active_api_key_fixture(third_pool)
199+
200+
%{identity: shared_identity, assignment: first_assignment} =
201+
upstream_assignment_fixture(first_pool, %{
202+
account_label: "Shared account",
203+
assignment_label: "Pool A custom label"
204+
})
205+
206+
assert {:ok, second_assignment} =
207+
PoolAssignments.create_pool_assignment(second_pool, shared_identity, %{
208+
assignment_label: "Pool B custom label",
209+
status: "active",
210+
health_status: "active",
211+
eligibility_status: "eligible"
212+
})
213+
214+
%{identity: same_label_identity, assignment: same_label_assignment} =
215+
upstream_assignment_fixture(third_pool, %{
216+
account_label: "Shared account",
217+
assignment_label: "Pool A custom label"
218+
})
219+
220+
as_of = ~U[2026-01-10 12:00:00.000000Z]
221+
occurred_at = ~U[2026-01-10 11:30:00.000000Z]
222+
223+
insert_timed_usage!(
224+
first_pool,
225+
first_api_key,
226+
first_assignment,
227+
shared_identity,
228+
occurred_at,
229+
40
230+
)
231+
232+
insert_timed_usage!(
233+
second_pool,
234+
second_api_key,
235+
second_assignment,
236+
shared_identity,
237+
occurred_at,
238+
60
239+
)
240+
241+
insert_timed_usage!(
242+
third_pool,
243+
third_api_key,
244+
same_label_assignment,
245+
same_label_identity,
246+
occurred_at,
247+
25
248+
)
249+
250+
assert {:ok, dashboard} = Stats.build_dashboard(scope, %{window: "1h", as_of: as_of})
251+
252+
assert [
253+
%{
254+
upstream_identity_id: shared_identity_id,
255+
assignment_label: nil,
256+
upstream_label: "Shared account",
257+
status: "active",
258+
assignment_count: 2,
259+
requests: 2,
260+
total_tokens: 100
261+
},
262+
%{
263+
upstream_identity_id: same_label_identity_id,
264+
assignment_label: "Pool A custom label",
265+
upstream_label: "Shared account",
266+
assignment_count: 1,
267+
requests: 1,
268+
total_tokens: 25
269+
}
270+
] = dashboard.tables.upstreams
271+
272+
assert shared_identity_id == shared_identity.id
273+
assert same_label_identity_id == same_label_identity.id
274+
end
275+
190276
test "build_dashboard/2 returns hourly model usage top five plus Other for sub-day windows" do
191277
%{user: owner} = bootstrap_owner_fixture()
192278
%{user: admin} = operator_fixture(owner, %{"email" => unique_user_email()})

0 commit comments

Comments
 (0)