11"""Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON.
22
3- Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi),
4- filters samples to the benchmark load window using start/end Unix timestamps
5- written by benchmark_serving.py, and patches two keys into the aggregated
6- result JSON consumed by InferenceX-app's ETL:
3+ Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi)
4+ or by srt-slurm's per-node perfmon (multinode), filters samples to the benchmark
5+ load window using start/end Unix timestamps written by benchmark_serving.py, and
6+ patches three keys into the aggregated result JSON consumed by InferenceX-app's
7+ ETL:
78
89 - avg_power_w: mean per-GPU power draw (W) during the load window
910 - joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens
11+ - joules_per_total_token: same, divided by (input + output) tokens
12+
13+ Multinode: accepts multiple CSV paths (one per worker node). GPU indices are
14+ namespaced by source CSV stem to avoid the same-index collision across nodes —
15+ e.g. 8 nodes each reporting indices 0..3 would otherwise be miscounted as 4
16+ total GPUs instead of 32.
1017
1118The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric
1219field in the agg JSON into the `metrics` JSONB column, so no schema migration
1320is required.
1421
1522Vendor schema detection is regex-based: any timestamp-like column + any column
1623whose name contains "power" (excluding "limit"/"cap"/"max") is picked up.
17- NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version. Both are
18- handled.
24+ NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version; srt-slurm's
25+ perfmon emits "power_w". All are handled.
1926
2027This script is best-effort. Missing or malformed CSV exits 0 without patching
2128so a monitoring hiccup never breaks the benchmark upload.
2532
2633import argparse
2734import csv
35+ import glob as glob_module
2836import json
2937import re
3038import sys
39+ from collections .abc import Iterable
3140from datetime import datetime , timezone
3241from pathlib import Path
3342from statistics import mean
@@ -109,74 +118,84 @@ def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | No
109118
110119
111120def aggregate_power (
112- csv_path : Path ,
121+ csv_path : Path | Iterable [ Path ] ,
113122 start_unix : float ,
114123 end_unix : float ,
115124) -> tuple [float , int ] | None :
116125 """Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end].
117126
118- Returns None if the CSV is missing, empty, has no detectable power column,
119- or no rows fall in the window.
127+ Accepts either a single Path (single-node case) or an iterable of Paths
128+ (multinode case: one CSV per worker node, all written by srt-slurm's
129+ perfmon). For multi-path inputs, GPU indices are namespaced by source
130+ CSV stem so the distinct-id count reflects the true total — each node
131+ independently reports indices 0..N, and without namespacing the union
132+ would collapse to a single node's worth.
133+
134+ Returns None if no CSVs are usable, none have a detectable power column,
135+ or no rows fall in the window across all paths.
120136 """
121- if not csv_path .is_file () or csv_path .stat ().st_size == 0 :
122- return None
123- if end_unix <= start_unix :
137+ paths = [csv_path ] if isinstance (csv_path , Path ) else list (csv_path )
138+ if not paths or end_unix <= start_unix :
124139 return None
125140
126- try :
127- with csv_path .open ("r" , newline = "" , encoding = "utf-8" , errors = "replace" ) as f :
128- reader = csv .DictReader (f , skipinitialspace = True )
129- header = [c .strip () for c in (reader .fieldnames or [])]
130- reader .fieldnames = header
131- timestamp_col , power_col , gpu_col = _detect_columns (header )
132- if not timestamp_col or not power_col :
133- return None
134-
135- # Group power readings by sample timestamp so per-sample total power
136- # (sum across GPUs) is computed correctly even if rows are interleaved.
137- #
138- # per_sample_row_count is the structural divisor: it's incremented for
139- # every contributing row regardless of whether a GPU-index column was
140- # detected. per_sample_gpus / gpu_keys are only populated when gpu_col
141- # is present and provide the canonical num_gpus via distinct-id count.
142- # When gpu_col is absent (vendor schema variant whose header doesn't
143- # match _GPU_INDEX_COL_RE), we fall back to inferring num_gpus from
144- # the modal row count per timestamp — assuming one row per GPU per
145- # sample, which is what every SMI tool we've seen actually emits.
146- per_sample_total : dict [float , float ] = {}
147- per_sample_row_count : dict [float , int ] = {}
148- per_sample_gpus : dict [float , set [str ]] = {}
149- gpu_keys : set [str ] = set ()
150-
151- for row in reader :
152- ts_raw = (row .get (timestamp_col ) or "" ).strip ()
153- pw_raw = (row .get (power_col ) or "" ).strip ()
154- ts = _parse_timestamp (ts_raw )
155- pw = _parse_power (pw_raw )
156- if ts is None or pw is None :
157- continue
158- if ts < start_unix or ts > end_unix :
141+ # Only namespace when there are multiple sources — keeps single-node
142+ # gpu_keys identical to the pre-multinode behavior so existing callers
143+ # see the same num_gpus values.
144+ namespace = len (paths ) > 1
145+
146+ # Per-sample state accumulates across ALL paths. Bucketed by ms-rounded
147+ # timestamp so nodes whose clocks drift sub-ms still end up in the same
148+ # bucket (they reliably do — all sample on `time.sleep(interval)` against
149+ # the same NTP-synced cluster clock).
150+ per_sample_total : dict [float , float ] = {}
151+ per_sample_row_count : dict [float , int ] = {}
152+ per_sample_gpus : dict [float , set [str ]] = {}
153+ gpu_keys : set [str ] = set ()
154+ saw_gpu_col = False
155+
156+ for path in paths :
157+ if not path .is_file () or path .stat ().st_size == 0 :
158+ continue
159+ try :
160+ with path .open ("r" , newline = "" , encoding = "utf-8" , errors = "replace" ) as f :
161+ reader = csv .DictReader (f , skipinitialspace = True )
162+ header = [c .strip () for c in (reader .fieldnames or [])]
163+ reader .fieldnames = header
164+ timestamp_col , power_col , gpu_col = _detect_columns (header )
165+ if not timestamp_col or not power_col :
159166 continue
160- # Bucket by sample timestamp (rounded to ms to absorb sub-ms drift).
161- bucket = round (ts , 3 )
162- per_sample_total [bucket ] = per_sample_total .get (bucket , 0.0 ) + pw
163- per_sample_row_count [bucket ] = per_sample_row_count .get (bucket , 0 ) + 1
164167 if gpu_col :
165- gpu_id = (row .get (gpu_col ) or "" ).strip ()
166- if gpu_id :
167- per_sample_gpus .setdefault (bucket , set ()).add (gpu_id )
168- gpu_keys .add (gpu_id )
169- except (OSError , csv .Error ):
170- return None
168+ saw_gpu_col = True
169+
170+ for row in reader :
171+ ts_raw = (row .get (timestamp_col ) or "" ).strip ()
172+ pw_raw = (row .get (power_col ) or "" ).strip ()
173+ ts = _parse_timestamp (ts_raw )
174+ pw = _parse_power (pw_raw )
175+ if ts is None or pw is None :
176+ continue
177+ if ts < start_unix or ts > end_unix :
178+ continue
179+ bucket = round (ts , 3 )
180+ per_sample_total [bucket ] = per_sample_total .get (bucket , 0.0 ) + pw
181+ per_sample_row_count [bucket ] = per_sample_row_count .get (bucket , 0 ) + 1
182+ if gpu_col :
183+ gpu_id = (row .get (gpu_col ) or "" ).strip ()
184+ if gpu_id :
185+ ns_id = f"{ path .stem } :{ gpu_id } " if namespace else gpu_id
186+ per_sample_gpus .setdefault (bucket , set ()).add (ns_id )
187+ gpu_keys .add (ns_id )
188+ except (OSError , csv .Error ):
189+ continue
171190
172191 if not per_sample_total :
173192 return None
174193
175194 # Per-sample divisor and overall num_gpus.
176- # - If a GPU column was detected , trust distinct GPU IDs (correct for any
177- # sampling pattern, including hot-swap or partial visibility).
178- # - Otherwise, infer from row count (one row per GPU per sample ).
179- if gpu_col and gpu_keys :
195+ # - If any path exposed a GPU column, trust distinct (namespaced) GPU IDs.
196+ # - Otherwise, infer from row count (one row per GPU per sample, summed
197+ # across all paths' rows that fell into the same timestamp bucket ).
198+ if saw_gpu_col and gpu_keys :
180199 num_gpus = len (gpu_keys )
181200 per_sample_mean_per_gpu = [
182201 total / max (len (per_sample_gpus .get (ts , ())), 1 )
@@ -194,7 +213,16 @@ def _load_bench_window(
194213 bench_result_path : Path ,
195214) -> tuple [float , float , float , int , int ] | None :
196215 """Read (start_unix, end_unix, duration_s, total_output_tokens, total_input_tokens)
197- from the raw bench JSON. Returns None if any required field is missing.
216+ from the raw bench JSON. Returns None if a window cannot be resolved.
217+
218+ Window resolution order, tried in turn:
219+ 1. benchmark_start_time_unix + benchmark_end_time_unix (our benchmark_serving.py
220+ writes both — single-node, brackets the actual load window exactly).
221+ 2. date + duration (srt-slurm sa-bench writes "YYYYMMDD-HHMMSS" UTC as the
222+ result write time — multinode; treat as bench end and subtract duration
223+ for start. Overshoots by post-bench JSON serialization, typically <5s).
224+ 3. file mtime + duration (last resort if `date` is absent or unparseable —
225+ same end-of-bench proxy as #2 via the result file's mtime).
198226
199227 total_input_tokens defaults to 0 if absent (older bench JSONs may not have it);
200228 this only degrades joules_per_total_token to equal joules_per_output_token in
@@ -204,18 +232,52 @@ def _load_bench_window(
204232 bench = json .loads (bench_result_path .read_text (encoding = "utf-8" ))
205233 except (OSError , json .JSONDecodeError ):
206234 return None
207- start = bench .get ("benchmark_start_time_unix" )
208- end = bench .get ("benchmark_end_time_unix" )
209235 duration = bench .get ("duration" )
210236 total_output = bench .get ("total_output_tokens" )
211237 total_input = bench .get ("total_input_tokens" , 0 )
212- if not all ( isinstance (v , (int , float )) for v in ( start , end , duration )):
238+ if not isinstance (duration , (int , float )):
213239 return None
214240 if not isinstance (total_output , int ) or total_output <= 0 :
215241 return None
216242 if not isinstance (total_input , int ) or total_input < 0 :
217243 total_input = 0
218- return float (start ), float (end ), float (duration ), int (total_output ), int (total_input )
244+
245+ # Tier 1: explicit Unix timestamps (single-node bench_serving.py).
246+ start = bench .get ("benchmark_start_time_unix" )
247+ end = bench .get ("benchmark_end_time_unix" )
248+ if isinstance (start , (int , float )) and isinstance (end , (int , float )):
249+ return float (start ), float (end ), float (duration ), int (total_output ), int (total_input )
250+
251+ # Tier 2: parse `date` field (srt-slurm sa-bench multinode). On observed
252+ # runs the string matches file mtime to the second, confirming it's the
253+ # JSON write time.
254+ date_str = bench .get ("date" )
255+ if isinstance (date_str , str ):
256+ try :
257+ end_dt = datetime .strptime (date_str , "%Y%m%d-%H%M%S" ).replace (tzinfo = timezone .utc )
258+ end_unix = end_dt .timestamp ()
259+ return (
260+ float (end_unix - duration ),
261+ float (end_unix ),
262+ float (duration ),
263+ int (total_output ),
264+ int (total_input ),
265+ )
266+ except ValueError :
267+ pass
268+
269+ # Tier 3: file mtime as last-resort bench-end proxy.
270+ try :
271+ end_unix = bench_result_path .stat ().st_mtime
272+ except OSError :
273+ return None
274+ return (
275+ float (end_unix - duration ),
276+ float (end_unix ),
277+ float (duration ),
278+ int (total_output ),
279+ int (total_input ),
280+ )
219281
220282
221283def patch_agg_result (
@@ -234,7 +296,7 @@ def patch_agg_result(
234296 tmp_path .replace (agg_path )
235297
236298
237- def run (csv_path : Path , bench_result : Path , agg_result : Path ) -> int :
299+ def run (csv_path : Path | Iterable [ Path ] , bench_result : Path , agg_result : Path ) -> int :
238300 window = _load_bench_window (bench_result )
239301 if window is None :
240302 print (
@@ -244,10 +306,12 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:
244306 return 0
245307 start , end , duration , total_output , total_input = window
246308
247- result = aggregate_power (csv_path , start , end )
309+ paths = [csv_path ] if isinstance (csv_path , Path ) else list (csv_path )
310+ result = aggregate_power (paths , start , end )
248311 if result is None :
312+ label = str (paths [0 ]) if len (paths ) == 1 else f"{ len (paths )} CSVs"
249313 print (
250- f"[aggregate_power] No usable power samples in { csv_path } for "
314+ f"[aggregate_power] No usable power samples in { label } for "
251315 f"window [{ start } , { end } ] — skipping" ,
252316 file = sys .stderr ,
253317 )
@@ -291,11 +355,20 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:
291355
292356def main () -> int :
293357 parser = argparse .ArgumentParser (description = __doc__ .splitlines ()[0 ])
294- parser .add_argument (
358+ source = parser .add_mutually_exclusive_group ()
359+ source .add_argument (
295360 "--csv" ,
296361 type = Path ,
297- default = Path ("/workspace/gpu_metrics.csv" ),
298- help = "Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)" ,
362+ default = None ,
363+ help = "Single gpu_metrics.csv from start_gpu_monitor (single-node). "
364+ "Falls back to /workspace/gpu_metrics.csv when neither --csv nor --csv-glob is set." ,
365+ )
366+ source .add_argument (
367+ "--csv-glob" ,
368+ type = str ,
369+ default = None ,
370+ help = "Shell glob expanding to per-node perf_samples_*.csv files (multinode, "
371+ "written by srt-slurm's perfmon). GPU indices are namespaced by source CSV stem." ,
299372 )
300373 parser .add_argument (
301374 "--bench-result" ,
@@ -310,7 +383,17 @@ def main() -> int:
310383 help = "Path to the agg_<run>.json output of process_result.py (will be patched in place)" ,
311384 )
312385 args = parser .parse_args ()
313- return run (args .csv , args .bench_result , args .agg_result )
386+
387+ if args .csv_glob :
388+ paths = sorted (Path (p ) for p in glob_module .glob (args .csv_glob ))
389+ if not paths :
390+ print (
391+ f"[aggregate_power] No CSVs matched glob { args .csv_glob !r} — skipping" ,
392+ file = sys .stderr ,
393+ )
394+ return 0
395+ return run (paths , args .bench_result , args .agg_result )
396+ return run (args .csv or Path ("/workspace/gpu_metrics.csv" ), args .bench_result , args .agg_result )
314397
315398
316399if __name__ == "__main__" :
0 commit comments