|
| 1 | +"""Backfill ``token_usage.model_id`` for rows written before the column. |
| 2 | +
|
| 3 | +New rows get ``model_id`` stamped at write time (see |
| 4 | +``application.llm.llm_creator`` / ``application.usage``). This script |
| 5 | +fills the historical NULLs by deriving the model from data we already |
| 6 | +trust, in priority order. A row is only ever filled by the |
| 7 | +highest-priority tier that matches it; tiers run in one transaction so |
| 8 | +each later tier sees only the rows still NULL. |
| 9 | +
|
| 10 | +Tiers (both touch only ``source='agent_stream'`` rows) |
| 11 | +----- |
| 12 | +1. ``request_id`` join (high confidence). The route stamps the same |
| 13 | + ``request_id`` on the token_usage row and the assistant message, so |
| 14 | + ``conversation_messages.model_id`` is authoritative for the call. |
| 15 | +2. ``agent_id`` + nearest message (medium confidence). For primary rows |
| 16 | + with no usable ``request_id`` (legacy), copy ``model_id`` from the |
| 17 | + closest-in-time message of any conversation belonging to the same |
| 18 | + agent, within ``--window-minutes`` (ties broken toward the later |
| 19 | + message so re-runs are reproducible). |
| 20 | +
|
| 21 | +Side-channel rows (``fallback`` / ``compression`` / ``title`` / |
| 22 | +``rag_condense`` / ``schedule``) are left NULL: they share the primary |
| 23 | +turn's ``request_id`` or agent but often ran a *different* model (a |
| 24 | +backup, a compression override), so copying the primary turn's model |
| 25 | +onto them would mis-attribute spend. New rows already get the correct |
| 26 | +per-call model stamped at write time, so this only concerns history. |
| 27 | +
|
| 28 | +Rows that match neither tier are left NULL on purpose — the partial |
| 29 | +index ``token_usage_model_ts_idx`` excludes them, and a model we can't |
| 30 | +tie to the specific call (e.g. the agent's configured default) would |
| 31 | +poison the analytics it feeds. |
| 32 | +
|
| 33 | +Both ``model_id`` columns store the canonical id (catalog name for |
| 34 | +built-ins, UUID for BYOM), so BYOM rows backfill to the UUID unchanged. |
| 35 | +
|
| 36 | +Usage:: |
| 37 | +
|
| 38 | + # Dry-run (default): runs the fills in a rolled-back transaction and |
| 39 | + # reports exactly how many rows each tier would touch. |
| 40 | + python scripts/db/backfill_token_usage_model_id.py |
| 41 | +
|
| 42 | + # Commit the backfill. |
| 43 | + python scripts/db/backfill_token_usage_model_id.py --apply |
| 44 | +
|
| 45 | + # Widen the tier-2 match window (default 5 minutes). |
| 46 | + python scripts/db/backfill_token_usage_model_id.py --window-minutes 10 --apply |
| 47 | +
|
| 48 | +Exit codes: |
| 49 | + 0 — success (dry-run or apply) |
| 50 | + 1 — bad arguments |
| 51 | +""" |
| 52 | + |
| 53 | +from __future__ import annotations |
| 54 | + |
| 55 | +import argparse |
| 56 | +import sys |
| 57 | +from pathlib import Path |
| 58 | + |
| 59 | +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) |
| 60 | + |
| 61 | +from sqlalchemy import text # noqa: E402 |
| 62 | + |
| 63 | +from application.storage.db.engine import get_engine # noqa: E402 |
| 64 | + |
| 65 | + |
| 66 | +# Tier 1: same request -> same model, primary (agent_stream) rows only. |
| 67 | +# conversation_messages.model_id is authoritative for that turn; fallback |
| 68 | +# / compression rows share the request_id but ran a different model. |
| 69 | +_TIER1 = text( |
| 70 | + """ |
| 71 | + UPDATE token_usage tu |
| 72 | + SET model_id = cm.model_id |
| 73 | + FROM conversation_messages cm |
| 74 | + WHERE cm.request_id = tu.request_id |
| 75 | + AND cm.model_id IS NOT NULL |
| 76 | + AND tu.model_id IS NULL |
| 77 | + AND tu.request_id IS NOT NULL |
| 78 | + AND tu.source = 'agent_stream' |
| 79 | + """ |
| 80 | +) |
| 81 | + |
| 82 | +# Tier 2: nearest message of the same agent within the window, primary |
| 83 | +# (agent_stream) rows only. The EXISTS mirror skips rows with no match |
| 84 | +# (else the subquery would set NULL); the ORDER BY tiebreak (later message |
| 85 | +# wins) keeps the pick reproducible across re-runs. |
| 86 | +_TIER2 = text( |
| 87 | + """ |
| 88 | + UPDATE token_usage tu |
| 89 | + SET model_id = ( |
| 90 | + SELECT cm.model_id |
| 91 | + FROM conversation_messages cm |
| 92 | + JOIN conversations c ON c.id = cm.conversation_id |
| 93 | + WHERE c.agent_id = tu.agent_id |
| 94 | + AND cm.model_id IS NOT NULL |
| 95 | + AND cm.timestamp BETWEEN tu.timestamp - make_interval(mins => :win) |
| 96 | + AND tu.timestamp + make_interval(mins => :win) |
| 97 | + ORDER BY abs(extract(epoch FROM (cm.timestamp - tu.timestamp))), cm.timestamp DESC |
| 98 | + LIMIT 1 |
| 99 | + ) |
| 100 | + WHERE tu.model_id IS NULL |
| 101 | + AND tu.agent_id IS NOT NULL |
| 102 | + AND tu.source = 'agent_stream' |
| 103 | + AND EXISTS ( |
| 104 | + SELECT 1 |
| 105 | + FROM conversation_messages cm |
| 106 | + JOIN conversations c ON c.id = cm.conversation_id |
| 107 | + WHERE c.agent_id = tu.agent_id |
| 108 | + AND cm.model_id IS NOT NULL |
| 109 | + AND cm.timestamp BETWEEN tu.timestamp - make_interval(mins => :win) |
| 110 | + AND tu.timestamp + make_interval(mins => :win) |
| 111 | + ) |
| 112 | + """ |
| 113 | +) |
| 114 | + |
| 115 | +_COUNT_NULL = text("SELECT count(*) FROM token_usage WHERE model_id IS NULL") |
| 116 | + |
| 117 | + |
| 118 | +def main() -> int: |
| 119 | + parser = argparse.ArgumentParser( |
| 120 | + description="Backfill token_usage.model_id from existing data.", |
| 121 | + ) |
| 122 | + parser.add_argument( |
| 123 | + "--apply", |
| 124 | + action="store_true", |
| 125 | + help="Commit the backfill. Default is a rolled-back dry-run.", |
| 126 | + ) |
| 127 | + parser.add_argument( |
| 128 | + "--window-minutes", |
| 129 | + type=int, |
| 130 | + default=5, |
| 131 | + metavar="N", |
| 132 | + help="Tier-2 nearest-message match window, in minutes (default 5).", |
| 133 | + ) |
| 134 | + args = parser.parse_args() |
| 135 | + |
| 136 | + if args.window_minutes < 0: |
| 137 | + print("--window-minutes must be >= 0", file=sys.stderr) |
| 138 | + return 1 |
| 139 | + |
| 140 | + engine = get_engine() |
| 141 | + with engine.connect() as conn: |
| 142 | + trans = conn.begin() |
| 143 | + try: |
| 144 | + # A one-shot maintenance UPDATE can run well past the engine's |
| 145 | + # 30s per-statement guardrail; lift it for this transaction. |
| 146 | + conn.execute(text("SET LOCAL statement_timeout = 0")) |
| 147 | + |
| 148 | + before = conn.execute(_COUNT_NULL).scalar_one() |
| 149 | + |
| 150 | + t1 = conn.execute(_TIER1).rowcount or 0 |
| 151 | + t2 = conn.execute(_TIER2, {"win": args.window_minutes}).rowcount or 0 |
| 152 | + |
| 153 | + after = conn.execute(_COUNT_NULL).scalar_one() |
| 154 | + |
| 155 | + print(f"NULL model_id rows before: {before}") |
| 156 | + print(f" tier 1 (request_id): {t1}") |
| 157 | + print(f" tier 2 (agent + nearest msg): {t2}") |
| 158 | + print(f"NULL model_id rows remaining: {after}") |
| 159 | + |
| 160 | + if args.apply: |
| 161 | + trans.commit() |
| 162 | + print("\nCommitted.") |
| 163 | + else: |
| 164 | + trans.rollback() |
| 165 | + print("\nDry run — rolled back. Re-run with --apply to commit.") |
| 166 | + except Exception: |
| 167 | + trans.rollback() |
| 168 | + raise |
| 169 | + |
| 170 | + return 0 |
| 171 | + |
| 172 | + |
| 173 | +if __name__ == "__main__": |
| 174 | + sys.exit(main()) |
0 commit comments