Skip to content

Commit 8ddd99b

Browse files
author
Aryan
committed
feat(monitor): per-node nvidia-smi GPU performance monitoring
Squashed rebase of NVIDIA/srt-slurm PR NVIDIA#35 (kdhruv/gweperf_integration) onto current main (which now includes default_bash_preamble, added since PR NVIDIA#35 was opened on 2026-04-13). Original PR NVIDIA#35 had three commits; their net effect is collapsed here to one because the second commit replaced the first's gweperf integration with a built-in poller. Adds: - src/srtctl/monitor/perfmon.py (new) - nvidia-smi polling, per-node perf_samples_<node>.csv + perf_summary_<node>.json output. - MonitoringConfig in src/srtctl/core/schema.py (new) - {enabled, sample_interval}, top-level SrtConfig field. - _start_perf_monitor / _stop_perf_monitor in BenchmarkStageMixin (new) - one process per worker node, started before the benchmark, stopped via SIGINT with a 30s grace period. - tests/test_monitoring.py (new) - 19 tests, all passing upstream. Consumed by SemiAnalysisAI/InferenceX#1574 via the pinned ref SemiAnalysisAI/srt-slurm@feat/inferencex-perfmon. Will revert this fork pin to NVIDIA/srt-slurm@main once PR NVIDIA#35 merges upstream.
1 parent 3523798 commit 8ddd99b

4 files changed

Lines changed: 432 additions & 0 deletions

File tree

src/srtctl/cli/mixins/benchmark_stage.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import logging
1111
import shlex
12+
import signal
13+
import subprocess
1214
import threading
1315
import time
1416
from pathlib import Path
@@ -159,17 +161,95 @@ def run_benchmark(
159161

160162
logger.info("Running %s benchmark", runner.name)
161163

164+
# Start perf monitoring on all worker nodes (non-fatal if it fails)
165+
perf_procs = self._start_perf_monitor()
166+
162167
# Run the benchmark script
163168
benchmark_log = self.runtime.log_dir / "benchmark.out"
164169
exit_code = self._run_benchmark_script(runner, benchmark_log, stop_event)
165170

171+
# Stop monitoring regardless of benchmark outcome
172+
self._stop_perf_monitor(perf_procs)
173+
166174
if exit_code != 0:
167175
logger.error("Benchmark failed with exit code %d", exit_code)
168176
else:
169177
logger.info("Benchmark completed successfully")
170178

171179
return exit_code
172180

181+
def _start_perf_monitor(self) -> list[tuple[str, "subprocess.Popen"]]:
    """Start one perfmon process per worker node.

    Monitoring is best-effort: every failure is logged as a warning and the
    benchmark continues without monitoring for the affected node(s).

    Returns:
        List of (node, Popen) pairs for the processes that were launched.
        Empty when monitoring is not configured, is disabled, there are no
        worker nodes, or the perfmon script cannot be found on the host.
    """
    m = self.config.monitoring
    if m is None or not m.enabled:
        return []

    worker_nodes = list(self.runtime.nodes.worker)
    if not worker_nodes:
        logger.warning("No worker nodes to monitor")
        return []

    # A mount source has to be a real host path. Resolve it so a relative
    # __file__ cannot leak into the mount table, and check it once here
    # instead of letting every per-node launch fail on its own.
    perfmon_script = (Path(__file__).parent.parent.parent / "monitor" / "perfmon.py").resolve()
    if not perfmon_script.is_file():
        logger.warning("perf monitor script not found at %s - monitoring skipped", perfmon_script)
        return []

    # Copy so the extra mount does not modify the runtime's own mapping.
    mounts = dict(self.runtime.container_mounts)
    mounts[perfmon_script] = Path("/tmp/srt_perfmon.py")

    interval_arg = str(m.sample_interval)
    container_image = str(self.runtime.container_image)

    procs: list[tuple[str, subprocess.Popen]] = []
    for node in worker_nodes:
        cmd = [
            "python3", "/tmp/srt_perfmon.py",
            "--output-csv", f"/logs/perf_samples_{node}.csv",
            "--output-json", f"/logs/perf_summary_{node}.json",
            "--interval", interval_arg,
        ]
        perf_log = self.runtime.log_dir / f"perf_monitor_{node}.out"
        try:
            proc = start_srun_process(
                command=cmd,
                nodelist=[node],
                output=str(perf_log),
                container_image=container_image,
                container_mounts=mounts,
            )
        except Exception as e:
            # Deliberately broad: a monitoring failure must never abort the benchmark.
            logger.warning("Failed to start perf monitor on %s: %s - monitoring skipped for this node", node, e)
            continue
        procs.append((node, proc))
        logger.info("perf monitor started on %s (interval=%.1fs)", node, m.sample_interval)

    return procs
225+
226+
def _stop_perf_monitor(self, procs: list[tuple[str, "subprocess.Popen"]]) -> None:
    """Stop all perfmon processes, allowing each to write its summary JSON.

    Sends SIGINT to every running process first, then waits for them against
    a single shared 30s deadline, so the worst-case shutdown time is about
    30s in total rather than 30s per node. Any process still alive at the
    deadline is killed (SIGKILL).

    Args:
        procs: (node, Popen) pairs as returned by _start_perf_monitor.
    """
    if not procs:
        return

    logger.info("Stopping perf monitoring on %d node(s)", len(procs))

    # NOTE(review): the signal is delivered to the process returned by
    # start_srun_process. If that is a plain `srun`, a single SIGINT may only
    # print task status instead of being forwarded to perfmon (see the srun
    # man page on signal handling) - confirm the launch options make one
    # SIGINT reach the monitor, otherwise the summary JSON is never written.
    signalled: list[tuple[str, subprocess.Popen]] = []
    for node, proc in procs:
        if proc.poll() is not None:
            logger.warning("perf monitor on %s already exited (code %d)", node, proc.returncode)
            continue
        try:
            proc.send_signal(signal.SIGINT)
        except ProcessLookupError:
            logger.warning("perf monitor on %s vanished before SIGINT", node)
            continue
        signalled.append((node, proc))

    # One deadline for all processes: they shut down concurrently, so each
    # still gets up to 30s measured from the moment it was signalled.
    deadline = time.monotonic() + 30
    for node, proc in signalled:
        remaining = max(0.0, deadline - time.monotonic())
        try:
            proc.wait(timeout=remaining)
            logger.info("perf monitor on %s stopped cleanly", node)
        except subprocess.TimeoutExpired:
            logger.warning("perf monitor on %s did not stop within 30s, killing", node)
            proc.kill()
            proc.wait()
252+
173253
def _run_benchmark_script(
174254
self,
175255
runner: "BenchmarkRunner",

src/srtctl/core/schema.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,25 @@ class OutputConfig:
12271227
Schema: ClassVar[type[Schema]] = Schema
12281228

12291229

1230+
@dataclass(frozen=True)
class MonitoringConfig:
    """Built-in GPU performance monitoring during benchmark execution.

    When enabled, one perfmon process runs per worker node (excluding the head node)
    and writes per-node output files to the job log directory:
    - perf_samples_{node}.csv   time-series, one row per GPU every ``sample_interval``
      seconds (GPU util, memory, power, temp)
    - perf_summary_{node}.json  aggregate statistics over the benchmark window

    Uses nvidia-smi — no external dependencies required.
    Failures are non-fatal: monitoring is skipped for affected nodes, benchmark continues.

    Raises:
        ValueError: if ``sample_interval`` is not a positive number.
    """

    # Whether perfmon processes are started at all.
    enabled: bool = True
    # Seconds between consecutive nvidia-smi polls; must be > 0.
    sample_interval: float = 1.0

    Schema: ClassVar[type[Schema]] = Schema

    def __post_init__(self):
        # Reject the value at config time: it is passed straight to perfmon's
        # --interval, where a negative number makes time.sleep() raise and
        # zero turns the poll loop into a busy loop. `not (x > 0)` also
        # rejects NaN.
        if not self.sample_interval > 0:
            raise ValueError(
                f"monitoring.sample_interval must be a positive number of seconds, got {self.sample_interval!r}"
            )
1247+
1248+
12301249
@dataclass(frozen=True)
12311250
class HealthCheckConfig:
12321251
"""Health check configuration."""
@@ -1307,6 +1326,9 @@ class SrtConfig:
13071326
# Reporting configuration (status API, future: logs to S3, etc.)
13081327
reporting: ReportingConfig | None = None
13091328

1329+
# Built-in GPU performance monitoring (runs on all worker nodes during benchmark)
1330+
monitoring: MonitoringConfig | None = None
1331+
13101332
Schema: ClassVar[type[Schema]] = Schema
13111333

13121334
def __post_init__(self):

src/srtctl/monitor/perfmon.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#!/usr/bin/env python3
2+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
"""Lightweight GPU performance monitor.
5+
6+
Polls nvidia-smi at a fixed interval and writes:
7+
- per-second CSV samples (--output-csv)
8+
- aggregate summary JSON (--output-json, written on SIGINT/exit)
9+
10+
Usage:
11+
python3 perfmon.py --output-csv /logs/perf_samples_node1.csv \\
12+
--output-json /logs/perf_summary_node1.json \\
13+
--interval 1.0
14+
"""
15+
16+
import argparse
17+
import csv
18+
import json
19+
import signal
20+
import subprocess
21+
import time
22+
from datetime import datetime, timezone
23+
from pathlib import Path
24+
25+
# Properties requested from `nvidia-smi --query-gpu`, in output column order.
_QUERY = "index,utilization.gpu,memory.used,memory.total,power.draw,temperature.gpu"
# Keys used for each output column; must stay in the same order as _QUERY.
_FIELDS = ["gpu", "util_pct", "mem_used_mb", "mem_total_mb", "power_w", "temp_c"]


def _sample(timeout: float = 10.0) -> list[dict]:
    """Take one nvidia-smi reading for every GPU on this node.

    Args:
        timeout: Maximum number of seconds to wait for nvidia-smi. Without a
            bound, a wedged driver would block the monitor indefinitely and
            keep it from reacting to a stop request.

    Returns:
        One dict per GPU keyed by ``_FIELDS``; values are the stripped strings
        printed by nvidia-smi (not converted to numbers). Returns an empty
        list when nvidia-smi is missing, cannot be executed, exits non-zero,
        or times out. Lines whose column count does not match ``_FIELDS`` are
        dropped.
    """
    try:
        out = subprocess.check_output(
            ["nvidia-smi", f"--query-gpu={_QUERY}", "--format=csv,noheader,nounits"],
            text=True,
            timeout=timeout,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: binary missing or not executable.
        # SubprocessError: non-zero exit (CalledProcessError) or TimeoutExpired.
        # Sampling is best-effort, so a failed poll simply yields no rows.
        return []

    rows = []
    for line in out.strip().splitlines():
        parts = [p.strip() for p in line.split(",")]
        if len(parts) == len(_FIELDS):
            rows.append(dict(zip(_FIELDS, parts)))
    return rows
43+
44+
45+
def _summarize(samples: list[dict]) -> dict:
46+
by_gpu: dict[str, list[dict]] = {}
47+
for s in samples:
48+
by_gpu.setdefault(s["gpu"], []).append(s)
49+
50+
summary = {}
51+
for gpu_idx, gpu_samples in by_gpu.items():
52+
53+
def avg(field: str, _s: list[dict] = gpu_samples) -> float | None:
54+
vals = [float(s[field]) for s in _s if s.get(field, "").strip() not in ("", "[N/A]")]
55+
return round(sum(vals) / len(vals), 2) if vals else None
56+
57+
summary[f"gpu_{gpu_idx}"] = {
58+
"samples": len(gpu_samples),
59+
"avg_util_pct": avg("util_pct"),
60+
"avg_mem_used_mb": avg("mem_used_mb"),
61+
"mem_total_mb": avg("mem_total_mb"),
62+
"avg_power_w": avg("power_w"),
63+
"avg_temp_c": avg("temp_c"),
64+
}
65+
return summary
66+
67+
68+
def main() -> None:
    """Poll nvidia-smi until asked to stop, then write the summary JSON.

    Command-line arguments:
        --output-csv   Path of the CSV file receiving one row per GPU per poll.
        --output-json  Path of the summary JSON, written on exit if at least
                       one sample was collected.
        --interval     Seconds to wait between polls (must be > 0, default 1.0).

    SIGINT and SIGTERM both request a graceful stop: the current poll is
    finished, the CSV is flushed and the summary is written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-csv", required=True)
    parser.add_argument("--output-json", required=True)
    parser.add_argument("--interval", type=float, default=1.0)
    args = parser.parse_args()
    # `not (x > 0)` also rejects NaN. A negative value would make time.sleep()
    # raise, and zero would turn the loop into a busy loop.
    if not args.interval > 0:
        parser.error("--interval must be a positive number of seconds")

    samples: list[dict] = []
    stop = False

    def handle_stop(sig, frame):
        nonlocal stop
        stop = True

    signal.signal(signal.SIGINT, handle_stop)
    # Job launchers commonly escalate to SIGTERM; treat it like SIGINT so the
    # summary is still written in that case.
    signal.signal(signal.SIGTERM, handle_stop)

    try:
        with Path(args.output_csv).open("w", newline="") as f:
            writer: csv.DictWriter | None = None
            while not stop:
                ts = datetime.now(timezone.utc).isoformat()
                for row in _sample():
                    record = {"timestamp": ts, **row}
                    if writer is None:
                        # The header is derived from the first record seen.
                        writer = csv.DictWriter(f, fieldnames=list(record.keys()))
                        writer.writeheader()
                    writer.writerow(record)
                    samples.append(record)
                # Flush after every poll so the CSV is usable even if the
                # process is killed before it can exit cleanly.
                f.flush()

                # Sleep in short slices: time.sleep() resumes after a signal
                # handler returns, so a single long sleep would delay
                # shutdown by up to a full interval.
                wake_at = time.monotonic() + args.interval
                while not stop:
                    remaining = wake_at - time.monotonic()
                    if remaining <= 0:
                        break
                    time.sleep(min(remaining, 0.2))
    finally:
        # Write whatever was collected even if the polling loop failed.
        if samples:
            Path(args.output_json).write_text(json.dumps(_summarize(samples), indent=2))


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)