|
| 1 | +"""Pure aggregation helpers for the gRPC long-session bench. |
| 2 | +
|
| 3 | +The bench script under ``scripts/bench_agentic/bench_session_long_run.py`` |
| 4 | +walks one gRPC session through many turns, recording per-turn |
| 5 | +metrics: latency, KV bytes, history length, error / success. After |
| 6 | +the run it calls :func:`aggregate_run` here to compute the headline |
| 7 | +KPIs: |
| 8 | +
|
| 9 | + * ``kv_bounded`` — does ``kv_live_bytes`` stay under a tight band |
| 10 | + across all turns? (ADR 0006 §2.3.a, ADR 0008 §7 G2.) |
| 11 | + * ``prefill_bounded`` — does per-turn latency stay flat as the |
| 12 | + history grows? (ADR 0008 §7 G2 prefill claim, the v0.3 GA gate |
| 13 | + that was a non-claim on the deprecated HTTP shim.) |
| 14 | + * Latency p50/p95, KV min/mean/max, n_turns, n_errors. |
| 15 | +
|
| 16 | +Splitting this out of the CLI script means the aggregation logic is |
| 17 | +fully unit-testable and the script itself stays focused on IO. The |
| 18 | +script also computes a 10-minute bucket breakdown for visual sanity- |
| 19 | +check on long runs (4h+); that bucketing logic lives here too. |
| 20 | +""" |
| 21 | + |
| 22 | +from __future__ import annotations |
| 23 | + |
| 24 | +import statistics |
| 25 | +from typing import Any, Dict, List, Optional |
| 26 | + |
| 27 | + |
| 28 | +# --------------------------------------------------------------------------- |
| 29 | +# Aggregation |
| 30 | +# --------------------------------------------------------------------------- |
| 31 | + |
| 32 | + |
| 33 | +def _percentile(values: List[float], pct: float) -> Optional[float]: |
| 34 | + """Linear-interpolated percentile, ``None`` if input is empty. |
| 35 | +
|
| 36 | + Implemented locally instead of pulling in ``numpy`` so the bench |
| 37 | + has no scientific-stack dependency. |
| 38 | + """ |
| 39 | + if not values: |
| 40 | + return None |
| 41 | + if not 0.0 <= pct <= 1.0: |
| 42 | + raise ValueError(f"pct must be in [0, 1], got {pct}") |
| 43 | + sorted_values = sorted(values) |
| 44 | + if len(sorted_values) == 1: |
| 45 | + return float(sorted_values[0]) |
| 46 | + rank = pct * (len(sorted_values) - 1) |
| 47 | + lo = int(rank) |
| 48 | + hi = min(lo + 1, len(sorted_values) - 1) |
| 49 | + frac = rank - lo |
| 50 | + return float(sorted_values[lo] + (sorted_values[hi] - sorted_values[lo]) * frac) |
| 51 | + |
| 52 | + |
| 53 | +def _kv_bounded(kv_values: List[int], *, tolerance: float = 0.10) -> Optional[bool]: |
| 54 | + """Returns ``True`` iff the KV-bytes series stays within |
| 55 | + ``tolerance`` (default 10%) of its minimum across every turn. |
| 56 | +
|
| 57 | + Returns ``None`` when there are not enough successful turns to |
| 58 | + answer (≤1 sample). The tolerance is a relative band — if the |
| 59 | + minimum is 0 we treat that as a pathologically small denominator |
| 60 | + and use ``max(min, 1)`` to avoid div-by-zero, the same convention |
| 61 | + ``bench_long_session.py`` uses. |
| 62 | + """ |
| 63 | + if len(kv_values) <= 1: |
| 64 | + return None |
| 65 | + lo = min(kv_values) |
| 66 | + hi = max(kv_values) |
| 67 | + return (hi - lo) / max(lo, 1) < tolerance |
| 68 | + |
| 69 | + |
| 70 | +def _prefill_bounded( |
| 71 | + latencies: List[float], |
| 72 | + *, |
| 73 | + head_window: int = 5, |
| 74 | + tail_window: int = 5, |
| 75 | + drift_threshold_s: float = 5.0, |
| 76 | +) -> Optional[bool]: |
| 77 | + """Returns ``True`` iff median per-turn latency on the LAST |
| 78 | + ``tail_window`` turns is within ``drift_threshold_s`` seconds of |
| 79 | + the median on the FIRST ``head_window`` turns. |
| 80 | +
|
| 81 | + This is the prefill-bounded contract: a healthy session-bound |
| 82 | + runtime processes only the new user message per turn, so latency |
| 83 | + should not grow with conversation length. On the deprecated HTTP |
| 84 | + shim, by contrast, every turn re-prefills the full history and |
| 85 | + latency grows linearly — that's the failure mode this metric |
| 86 | + catches. |
| 87 | +
|
| 88 | + ``None`` when the run is too short to bracket head and tail |
| 89 | + windows without overlap. |
| 90 | + """ |
| 91 | + if len(latencies) < head_window + tail_window: |
| 92 | + return None |
| 93 | + head = latencies[:head_window] |
| 94 | + tail = latencies[-tail_window:] |
| 95 | + head_p50 = statistics.median(head) |
| 96 | + tail_p50 = statistics.median(tail) |
| 97 | + return (tail_p50 - head_p50) <= drift_threshold_s |
| 98 | + |
| 99 | + |
| 100 | +def _latency_drift_p50_s( |
| 101 | + latencies: List[float], |
| 102 | + *, |
| 103 | + head_window: int = 5, |
| 104 | + tail_window: int = 5, |
| 105 | +) -> Optional[float]: |
| 106 | + """Drift in seconds between head-window p50 and tail-window p50. |
| 107 | +
|
| 108 | + Positive = latency grew over the run. Returns ``None`` for |
| 109 | + runs too short to bracket head and tail without overlap. |
| 110 | + """ |
| 111 | + if len(latencies) < head_window + tail_window: |
| 112 | + return None |
| 113 | + head = latencies[:head_window] |
| 114 | + tail = latencies[-tail_window:] |
| 115 | + return float(statistics.median(tail) - statistics.median(head)) |
| 116 | + |
| 117 | + |
| 118 | +def _bucketize_10min(turns: List[Dict[str, Any]]) -> List[Dict[str, Any]]: |
| 119 | + """Partition successful turns by their wall-clock bucket |
| 120 | + (10-minute granularity, indexed from 0). Each bucket reports |
| 121 | + ``n_turns``, p50/p95 latency, and mean kv_live_bytes — gives a |
| 122 | + visual sanity check of latency / memory drift across a long run. |
| 123 | +
|
| 124 | + Empty input or all-error input returns an empty list. |
| 125 | + """ |
| 126 | + buckets: Dict[int, List[Dict[str, Any]]] = {} |
| 127 | + for t in turns: |
| 128 | + if not t.get("ok"): |
| 129 | + continue |
| 130 | + bucket_idx = int(t["t_relative_s"] // 600) |
| 131 | + buckets.setdefault(bucket_idx, []).append(t) |
| 132 | + |
| 133 | + out: List[Dict[str, Any]] = [] |
| 134 | + for idx in sorted(buckets): |
| 135 | + items = buckets[idx] |
| 136 | + latencies = [float(t["latency_s"]) for t in items] |
| 137 | + kv_values = [ |
| 138 | + int(t["kv_live_bytes"]) for t in items |
| 139 | + if t.get("kv_live_bytes") is not None |
| 140 | + ] |
| 141 | + out.append( |
| 142 | + { |
| 143 | + "bucket_index": idx, |
| 144 | + "n_turns": len(items), |
| 145 | + "p50_latency_s": _percentile(latencies, 0.50), |
| 146 | + "p95_latency_s": _percentile(latencies, 0.95), |
| 147 | + "mean_kv_live_bytes": ( |
| 148 | + statistics.mean(kv_values) if kv_values else None |
| 149 | + ), |
| 150 | + } |
| 151 | + ) |
| 152 | + return out |
| 153 | + |
| 154 | + |
| 155 | +def aggregate_run( |
| 156 | + turns: List[Dict[str, Any]], |
| 157 | + *, |
| 158 | + duration_s: float, |
| 159 | + kv_tolerance: float = 0.10, |
| 160 | + drift_head_window: int = 5, |
| 161 | + drift_tail_window: int = 5, |
| 162 | + drift_threshold_s: float = 5.0, |
| 163 | +) -> Dict[str, Any]: |
| 164 | + """Build the aggregate report from a list of per-turn records. |
| 165 | +
|
| 166 | + Each turn dict must carry at least: |
| 167 | + * ``ok`` — bool |
| 168 | + * ``t_relative_s`` — float, seconds since run start |
| 169 | + * ``latency_s`` — float (only if ``ok``) |
| 170 | + * ``kv_live_bytes`` — int or ``None`` (only if ``ok``) |
| 171 | +
|
| 172 | + Returns a dict with the headline KPIs ADR 0006 §2.3.a / ADR 0008 |
| 173 | + §7 G2 speak to: ``kv_bounded``, ``prefill_bounded``, latency |
| 174 | + p50/p95, kv min/mean/max, error count, 10-minute bucket break- |
| 175 | + down. |
| 176 | + """ |
| 177 | + successes = [t for t in turns if t.get("ok")] |
| 178 | + errors = [t for t in turns if not t.get("ok")] |
| 179 | + |
| 180 | + latencies = [float(t["latency_s"]) for t in successes] |
| 181 | + kv_values = [ |
| 182 | + int(t["kv_live_bytes"]) for t in successes |
| 183 | + if t.get("kv_live_bytes") is not None |
| 184 | + ] |
| 185 | + |
| 186 | + return { |
| 187 | + "n_turns": len(successes), |
| 188 | + "n_errors": len(errors), |
| 189 | + "duration_s": float(duration_s), |
| 190 | + "p50_latency_s": _percentile(latencies, 0.50), |
| 191 | + "p95_latency_s": _percentile(latencies, 0.95), |
| 192 | + "min_kv_live_bytes": min(kv_values) if kv_values else None, |
| 193 | + "mean_kv_live_bytes": ( |
| 194 | + statistics.mean(kv_values) if kv_values else None |
| 195 | + ), |
| 196 | + "max_kv_live_bytes": max(kv_values) if kv_values else None, |
| 197 | + "kv_bounded": _kv_bounded(kv_values, tolerance=kv_tolerance), |
| 198 | + "prefill_bounded": _prefill_bounded( |
| 199 | + latencies, |
| 200 | + head_window=drift_head_window, |
| 201 | + tail_window=drift_tail_window, |
| 202 | + drift_threshold_s=drift_threshold_s, |
| 203 | + ), |
| 204 | + "latency_drift_p50_s": _latency_drift_p50_s( |
| 205 | + latencies, |
| 206 | + head_window=drift_head_window, |
| 207 | + tail_window=drift_tail_window, |
| 208 | + ), |
| 209 | + "buckets_10min": _bucketize_10min(turns), |
| 210 | + } |
0 commit comments