|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""r8_analyze.py — primary analysis chart (Solarized Dark). |
| 3 | +
|
| 4 | +6-panel per-system overlay (7 systems as colors): |
| 5 | + 1) throughput (events consumed / s, 60s rolling mean) |
| 6 | + 2) dead_tup on queue tables (bloat) |
| 7 | + 3) CPU user+sys % |
| 8 | + 4) NVMe write MiB/s |
| 9 | + 5) backlog (producer_total − consumer_total_at_time_t) over time |
| 10 | + 6) delivery-lag p99 (ms, clipped at 5s, LINEAR scale) |
| 11 | +
|
| 12 | +LINEAR y-axes everywhere. No log/symlog. |
| 13 | +
|
| 14 | +Inputs: /tmp/bench_full/<sys>/{events_consumed_per_sec.csv,bloat.csv, |
| 15 | + sys_metrics.csv,events_consumed_summary.txt, |
| 16 | + producer.log} |
| 17 | +Output: /tmp/bench_main_chart.png + /tmp/bench_summary.json + /tmp/bench_table.md |
| 18 | +""" |
| 19 | +from __future__ import annotations |
| 20 | +import csv, json, re, sys |
| 21 | +from pathlib import Path |
| 22 | +from collections import defaultdict |
| 23 | +from datetime import datetime, timezone |
| 24 | + |
| 25 | +import matplotlib |
| 26 | +matplotlib.use("Agg") |
| 27 | +import matplotlib.pyplot as plt |
| 28 | +import numpy as np |
| 29 | +from matplotlib.ticker import FuncFormatter |
| 30 | + |
| 31 | +SYSTEMS = ["pgque", "pgq", "pgmq", "pgmq-partitioned", "river", "que", "pgboss"] |
| 32 | + |
| 33 | +# Solarized Dark palette |
| 34 | +BG = "#002b36"; SURF = "#073642" |
| 35 | +FG = "#839496"; FG_EMPH = "#93a1a1"; FG_DIM = "#586e75" |
| 36 | +ALERT = "#dc322f" |
| 37 | + |
| 38 | +# Per-system accent colors (Solarized-coherent) |
| 39 | +COLORS = { |
| 40 | + "pgque": "#268bd2", # blue (hero) |
| 41 | + "pgq": "#2aa198", # cyan |
| 42 | + "pgmq": "#cb4b16", # orange |
| 43 | + "pgmq-partitioned": "#dc322f", # red |
| 44 | + "river": "#b58900", # yellow |
| 45 | + "que": "#6c71c4", # violet |
| 46 | + "pgboss": "#859900", # green |
| 47 | +} |
| 48 | + |
| 49 | +TX_START_MIN, TX_END_MIN = 30, 90 |
| 50 | +TOTAL_MIN = 120 |
| 51 | +LAG_CLIP_MS = 5000 # clip p99 to 5s (linear scale) |
| 52 | + |
| 53 | + |
| 54 | +def parse_ts(s): |
| 55 | + if not s: return None |
| 56 | + s2 = s.strip().replace(" ", "T").replace("Z", "+00:00") |
| 57 | + m = re.search(r"([+-])(\d{2})$", s2) |
| 58 | + if m: |
| 59 | + s2 = s2[:m.start()] + m.group(1) + m.group(2) + ":00" |
| 60 | + try: |
| 61 | + dt = datetime.fromisoformat(s2) |
| 62 | + if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) |
| 63 | + return dt |
| 64 | + except Exception: |
| 65 | + return None |
| 66 | + |
| 67 | + |
| 68 | +def read_csv(p: Path): |
| 69 | + if not p.is_file(): return [] |
| 70 | + with p.open() as f: |
| 71 | + return list(csv.DictReader(f)) |
| 72 | + |
| 73 | + |
| 74 | +def load_events(d: Path): |
| 75 | + """events_consumed_per_sec.csv → (minutes[], ev/s[], p99_lag_ms[], cumulative_consumed[])""" |
| 76 | + rows = read_csv(d / "events_consumed_per_sec.csv") |
| 77 | + if not rows: return [], [], [], [] |
| 78 | + xs, ev, p99, cum = [], [], [], [] |
| 79 | + c = 0 |
| 80 | + for r in rows: |
| 81 | + try: |
| 82 | + s = int(r["second_since_start"]) |
| 83 | + n = int(r["events_consumed"]) |
| 84 | + p = int(r.get("p99_lag_ms", "0") or 0) |
| 85 | + except Exception: |
| 86 | + continue |
| 87 | + c += n |
| 88 | + xs.append(s / 60.0); ev.append(n); p99.append(p); cum.append(c) |
| 89 | + return xs, ev, p99, cum |
| 90 | + |
| 91 | + |
| 92 | +def smooth(ys, window=30): |
| 93 | + if not ys: return ys |
| 94 | + out = [] |
| 95 | + w = window |
| 96 | + for i in range(len(ys)): |
| 97 | + a = max(0, i - w); b = min(len(ys), i + w + 1) |
| 98 | + out.append(sum(ys[a:b]) / (b - a)) |
| 99 | + return out |
| 100 | + |
| 101 | + |
| 102 | +def load_bloat(d: Path): |
| 103 | + rows = read_csv(d / "bloat.csv") |
| 104 | + if not rows: return [], [], [] |
| 105 | + by_ts = defaultdict(lambda: {"dead": 0, "live": 0}) |
| 106 | + for r in rows: |
| 107 | + ts = r.get("ts") or r.get("sample_time") or "" |
| 108 | + try: |
| 109 | + dt = int(r.get("n_dead_tup", "0") or 0) |
| 110 | + lv = int(r.get("n_live_tup", "0") or 0) |
| 111 | + except Exception: |
| 112 | + continue |
| 113 | + by_ts[ts]["dead"] += dt |
| 114 | + by_ts[ts]["live"] += lv |
| 115 | + if not by_ts: return [], [], [] |
| 116 | + ts_sorted = sorted(by_ts.keys()) |
| 117 | + t0 = parse_ts(ts_sorted[0]) |
| 118 | + if t0 is None: return [], [], [] |
| 119 | + xs, dead, live = [], [], [] |
| 120 | + for ts in ts_sorted: |
| 121 | + t = parse_ts(ts) |
| 122 | + if t is None: continue |
| 123 | + xs.append((t - t0).total_seconds() / 60.0) |
| 124 | + dead.append(by_ts[ts]["dead"]) |
| 125 | + live.append(by_ts[ts]["live"]) |
| 126 | + return xs, dead, live |
| 127 | + |
| 128 | + |
| 129 | +def load_sys(d: Path): |
| 130 | + rows = read_csv(d / "sys_metrics.csv") |
| 131 | + if not rows: return [], [], [] |
| 132 | + t0 = parse_ts(rows[0]["ts_iso"]) |
| 133 | + if t0 is None: return [], [], [] |
| 134 | + xs, cpu, wmib = [], [], [] |
| 135 | + for r in rows: |
| 136 | + t = parse_ts(r["ts_iso"]) |
| 137 | + if t is None: continue |
| 138 | + xs.append((t - t0).total_seconds() / 60.0) |
| 139 | + try: |
| 140 | + cpu.append(float(r["cpu_user_pct"]) + float(r["cpu_system_pct"])) |
| 141 | + wmib.append(float(r.get("disk_write_mib_s", "0") or 0)) |
| 142 | + except Exception: |
| 143 | + cpu.append(0); wmib.append(0) |
| 144 | + return xs, cpu, wmib |
| 145 | + |
| 146 | + |
| 147 | +def load_summary(d: Path): |
| 148 | + p = d / "events_consumed_summary.txt" |
| 149 | + out = {} |
| 150 | + if not p.is_file(): return out |
| 151 | + with p.open() as f: |
| 152 | + for ln in f: |
| 153 | + if "=" in ln: |
| 154 | + k, v = ln.strip().split("=", 1) |
| 155 | + out[k] = v |
| 156 | + return out |
| 157 | + |
| 158 | + |
| 159 | +def load_producer_total(d: Path): |
| 160 | + """Grep 'number of transactions actually processed' from producer.log.""" |
| 161 | + p = d / "producer.log" |
| 162 | + if not p.is_file(): return 0 |
| 163 | + with p.open() as f: |
| 164 | + for ln in f: |
| 165 | + m = re.search(r"number of transactions actually processed:\s*(\d+)", ln) |
| 166 | + if m: |
| 167 | + return int(m.group(1)) |
| 168 | + return 0 |
| 169 | + |
| 170 | + |
| 171 | +def tx_slice(xs, ys): |
| 172 | + return [y for x, y in zip(xs, ys) if TX_START_MIN <= x <= TX_END_MIN] |
| 173 | + |
| 174 | + |
| 175 | +def mean(xs): return sum(xs) / len(xs) if xs else 0 |
| 176 | + |
| 177 | + |
| 178 | +def fmt_thousands(v, _): |
| 179 | + v = abs(v) |
| 180 | + if v >= 1e6: return f"{v/1e6:.1f}M" |
| 181 | + if v >= 1e3: return f"{v/1e3:.0f}k" |
| 182 | + return f"{v:.0f}" |
| 183 | + |
| 184 | + |
| 185 | +def main(): |
| 186 | + base = Path("/tmp/bench_full") |
| 187 | + |
| 188 | + plt.rcParams.update({ |
| 189 | + 'figure.facecolor': BG, 'axes.facecolor': BG, 'savefig.facecolor': BG, |
| 190 | + 'text.color': FG, 'axes.labelcolor': FG_EMPH, |
| 191 | + 'xtick.color': FG, 'ytick.color': FG, |
| 192 | + 'axes.edgecolor': FG_DIM, |
| 193 | + 'grid.color': SURF, 'grid.linewidth': 0.8, |
| 194 | + 'font.family': ['Helvetica', 'Arial', 'DejaVu Sans'], |
| 195 | + 'font.size': 10, |
| 196 | + }) |
| 197 | + |
| 198 | + fig, axes = plt.subplots(6, 1, figsize=(14, 15), sharex=True, dpi=110) |
| 199 | + titles = [ |
| 200 | + "1) Throughput — events consumed / s (60s rolling mean)", |
| 201 | + "2) Bloat — n_dead_tup on queue tables", |
| 202 | + "3) CPU — user + system %", |
| 203 | + "4) NVMe write — MiB/s", |
| 204 | + "5) Backlog — producer_total minus consumer_cum (events stuck in queue)", |
| 205 | + "6) Delivery-lag p99 — head-of-queue age per batch, ms (clipped 5s, LINEAR)", |
| 206 | + ] |
| 207 | + ylabels = ["ev/s", "dead tuples", "%", "MiB/s", "events", "ms"] |
| 208 | + |
| 209 | + summary = {} |
| 210 | + table_rows = [] |
| 211 | + |
| 212 | + for sys_name in SYSTEMS: |
| 213 | + color = COLORS[sys_name] |
| 214 | + d = base / sys_name |
| 215 | + xs_ev, ev, p99, cum = load_events(d) |
| 216 | + xs_b, dead, live = load_bloat(d) |
| 217 | + xs_s, cpu, wmib = load_sys(d) |
| 218 | + sm = load_summary(d) |
| 219 | + prod_total = load_producer_total(d) |
| 220 | + cons_total = int(sm.get("total_events_consumed", "0") or 0) |
| 221 | + |
| 222 | + ev_s = smooth(ev, window=30) |
| 223 | + p99_s = smooth(p99, window=30) |
| 224 | + p99_clip = [min(v, LAG_CLIP_MS) for v in p99_s] |
| 225 | + |
| 226 | + # backlog at time t = producer_rate_so_far(t) − cumulative_consumed_so_far(t) |
| 227 | + # producer is -R 2000 (constant). We can approximate producer_cum(t) = min(2000*t, prod_total) |
| 228 | + # More accurate: if bench ran full duration, prod_cum = 2000 * (t*60). Use prod_total at end. |
| 229 | + prod_rate = 2000.0 # target rate |
| 230 | + bench_dur_s = 7200 |
| 231 | + backlog = [] |
| 232 | + for t_min, c_cum in zip(xs_ev, cum): |
| 233 | + t_s = t_min * 60 |
| 234 | + p_cum = min(prod_rate * t_s, prod_total) |
| 235 | + backlog.append(max(0, p_cum - c_cum)) |
| 236 | + |
| 237 | + lw = 2.5 if sys_name == "pgque" else 1.5 |
| 238 | + z = 5 if sys_name == "pgque" else 3 |
| 239 | + |
| 240 | + axes[0].plot(xs_ev, ev_s, color=color, lw=lw, zorder=z, label=sys_name) |
| 241 | + axes[1].plot(xs_b, dead, color=color, lw=lw, zorder=z) |
| 242 | + axes[2].plot(xs_s, cpu, color=color, lw=lw, zorder=z) |
| 243 | + axes[3].plot(xs_s, wmib, color=color, lw=lw, zorder=z) |
| 244 | + axes[4].plot(xs_ev, backlog,color=color, lw=lw, zorder=z) |
| 245 | + axes[5].plot(xs_ev, p99_clip, color=color, lw=lw, zorder=z) |
| 246 | + |
| 247 | + tx_ev = tx_slice(xs_ev, ev) |
| 248 | + tx_p99 = tx_slice(xs_ev, p99) |
| 249 | + p50_overall = int(sm.get("overall_lag_p50_ms", "0") or 0) |
| 250 | + p99_overall = int(sm.get("overall_lag_p99_ms", "0") or 0) |
| 251 | + |
| 252 | + true_backlog = max(0, prod_total - cons_total) |
| 253 | + table_rows.append({ |
| 254 | + "system": sys_name, |
| 255 | + "producer_total": prod_total, |
| 256 | + "consumer_total": cons_total, |
| 257 | + "tx_avg_evs": mean(tx_ev), |
| 258 | + "p50_lag_ms": p50_overall, |
| 259 | + "p99_lag_ms": p99_overall, |
| 260 | + "tx_p99_lag_ms": max(tx_p99) if tx_p99 else 0, |
| 261 | + "true_backlog": true_backlog, |
| 262 | + "peak_cpu": max(cpu) if cpu else 0, |
| 263 | + "peak_wmib": max(wmib) if wmib else 0, |
| 264 | + }) |
| 265 | + summary[sys_name] = table_rows[-1] |
| 266 | + |
| 267 | + for ax, t, yl in zip(axes, titles, ylabels): |
| 268 | + ax.set_title(t, fontsize=10, loc='left', color=FG_EMPH) |
| 269 | + ax.axvspan(TX_START_MIN, TX_END_MIN, color=SURF, alpha=0.55, zorder=0) |
| 270 | + for b in (TX_START_MIN, TX_END_MIN): |
| 271 | + ax.axvline(x=b, color=ALERT, lw=0.8, alpha=0.55, zorder=0.5) |
| 272 | + ax.grid(True, alpha=0.35) |
| 273 | + ax.set_axisbelow(True) |
| 274 | + for sp in ("top", "right"): ax.spines[sp].set_visible(False) |
| 275 | + ax.set_ylabel(yl, color=FG_EMPH) |
| 276 | + ax.set_xlim(0, TOTAL_MIN) |
| 277 | + |
| 278 | + # Large-number formatter for panels that need it |
| 279 | + axes[1].yaxis.set_major_formatter(FuncFormatter(fmt_thousands)) |
| 280 | + axes[4].yaxis.set_major_formatter(FuncFormatter(fmt_thousands)) |
| 281 | + axes[0].yaxis.set_major_formatter(FuncFormatter(fmt_thousands)) |
| 282 | + |
| 283 | + axes[-1].set_xticks([0, 15, 30, 45, 60, 75, 90, 105, 120]) |
| 284 | + axes[-1].set_xlabel( |
| 285 | + "minutes since bench start · TX phase (held xmin) shaded 30-90m", |
| 286 | + color=FG_EMPH) |
| 287 | + |
| 288 | + # Legend at top; 7 systems in one row |
| 289 | + axes[0].legend(loc="upper center", bbox_to_anchor=(0.5, 1.55), ncol=7, |
| 290 | + fontsize=9, frameon=False) |
| 291 | + |
| 292 | + fig.suptitle( |
| 293 | + "7 Postgres queue systems · 2h (30m clean + 60m held-xmin + 30m recovery) · R=2000/s", |
| 294 | + y=0.995, color=FG_EMPH, fontsize=13, fontweight='bold') |
| 295 | + fig.tight_layout(rect=[0, 0, 1, 0.96]) |
| 296 | + fig.savefig("/tmp/bench_main_chart.png", dpi=110, bbox_inches="tight", facecolor=BG) |
| 297 | + |
| 298 | + with open("/tmp/bench_summary.json", "w") as f: |
| 299 | + json.dump(summary, f, indent=2, default=str) |
| 300 | + |
| 301 | + with open("/tmp/bench_table.md", "w") as f: |
| 302 | + f.write("| system | producer total | consumer total | TX-avg ev/s | p50 lag ms | p99 lag ms | TX p99 lag ms | true backlog | peak CPU % | peak NVMe write MiB/s |\n") |
| 303 | + f.write("|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|\n") |
| 304 | + for r in table_rows: |
| 305 | + f.write( |
| 306 | + f"| {r['system']} | {r['producer_total']:,} | {r['consumer_total']:,} | " |
| 307 | + f"{r['tx_avg_evs']:.0f} | {r['p50_lag_ms']} | {r['p99_lag_ms']} | " |
| 308 | + f"{r['tx_p99_lag_ms']} | {r['true_backlog']:,} | " |
| 309 | + f"{r['peak_cpu']:.1f} | {r['peak_wmib']:.1f} |\n") |
| 310 | + |
| 311 | + out = Path("/tmp/bench_main_chart.png") |
| 312 | + print(f"wrote {out} ({out.stat().st_size/1024:.0f} KiB) + /tmp/bench_summary.json + /tmp/bench_table.md") |
| 313 | + |
| 314 | + |
| 315 | +if __name__ == "__main__": |
| 316 | + main() |
0 commit comments