Skip to content

Commit f0bb61f

Browse files
feat: add TLB/THP tracking, page reclaim detection, NUMA analysis, GPU/PCIe topology (#10)
Page Reclaim & THP (Tier 1): - Parse /proc/vmstat: pgscan_direct/kswapd, pgsteal, allocstall, compact_stall/success/fail, thp_fault_alloc/collapse/split - Two-point sampling for rate computation (direct_reclaim_rate, compact_stall_rate, thp_split_rate) - Collect THP defrag mode, vm.watermark_scale_factor, dirty_expire/writeback_centisecs, zone_reclaim_mode - Anomaly rules: direct_reclaim_rate, compaction_stall_rate, thp_split_rate - Recommendations: watermark tuning, THP madvise, compaction fixes NUMA Analysis: - Compute miss_ratio per node from existing numa_hit/numa_miss data - Collect NUMA distance matrix and CPU list per node - Collect kernel.sched_numa_balancing sysctl - Anomaly rule: numa_miss_ratio > 5% warning, > 20% critical - Recommendation: numactl pinning, enable sched_numa_balancing GPU & PCIe Topology (new Tier 1 collector): - New GPUCollector: nvidia-smi query for GPU metrics (name, memory, utilization, temperature, power, PCI bus) - PCI device → NUMA node mapping from /sys/bus/pci/devices/*/numa_node - NIC → NUMA node mapping from /sys/class/net/*/device/numa_node - Cross-NUMA pair detection (GPU on node N, NIC on node M) - Anomaly rule: gpu_nic_cross_numa - Model types: GPUDevice, PCIeTopology, CrossNUMAPair Tests: 10 new test functions covering vmstat reclaim, rate computation, GPU detection with mock nvidia-smi, cross-NUMA pair detection, NUMA miss ratio. MCP explanations added for all 6 new anomaly IDs. Co-authored-by: dmitriimaksimovdevelop <227611064+dmitriimaksimovdevelop@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4d8ff36 commit f0bb61f

9 files changed

Lines changed: 799 additions & 8 deletions

File tree

internal/collector/gpu.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// GPU and PCIe topology collector (Tier 1).
2+
// Detects NVIDIA GPUs via nvidia-smi, maps PCI devices to NUMA nodes,
3+
// flags GPU-NIC pairs on different NUMA nodes.
4+
package collector
5+
6+
import (
7+
"context"
8+
"os"
9+
"path/filepath"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"github.com/dmitriimaksimovdevelop/melisai/internal/model"
15+
)
16+
17+
// GPUCollector detects GPUs and PCIe topology.
18+
type GPUCollector struct {
19+
sysRoot string
20+
cmdRun CommandRunner
21+
}
22+
23+
func NewGPUCollector(sysRoot string) *GPUCollector {
24+
return &GPUCollector{sysRoot: sysRoot, cmdRun: &ExecCommandRunner{}}
25+
}
26+
27+
func (c *GPUCollector) Name() string { return "gpu_pcie" }
28+
func (c *GPUCollector) Category() string { return "system" }
29+
func (c *GPUCollector) Available() Availability {
30+
return Availability{Tier: 1}
31+
}
32+
33+
func (c *GPUCollector) Collect(ctx context.Context, cfg CollectConfig) (*model.Result, error) {
34+
start := time.Now()
35+
topo := &model.PCIeTopology{
36+
NICNUMAMap: make(map[string]int),
37+
}
38+
39+
// Detect NVIDIA GPUs via nvidia-smi
40+
topo.GPUs = c.detectNvidiaGPUs(ctx)
41+
42+
// Build NIC → NUMA node map from sysfs
43+
c.buildNICNUMAMap(topo)
44+
45+
// Find GPU-NIC cross-NUMA pairs
46+
c.findCrossNUMAPairs(topo)
47+
48+
// Skip if nothing detected
49+
if len(topo.GPUs) == 0 && len(topo.NICNUMAMap) == 0 {
50+
return nil, nil
51+
}
52+
53+
return &model.Result{
54+
Collector: c.Name(),
55+
Category: c.Category(),
56+
Tier: 1,
57+
StartTime: start,
58+
EndTime: time.Now(),
59+
Data: topo,
60+
}, nil
61+
}
62+
63+
// detectNvidiaGPUs queries nvidia-smi for GPU information.
64+
func (c *GPUCollector) detectNvidiaGPUs(ctx context.Context) []model.GPUDevice {
65+
// nvidia-smi --query-gpu=index,name,driver_version,pci.bus_id,memory.total,memory.used,utilization.gpu,utilization.memory,temperature.gpu,power.draw --format=csv,noheader,nounits
66+
out, err := c.cmdRun.Run(ctx, "nvidia-smi",
67+
"--query-gpu=index,name,driver_version,pci.bus_id,memory.total,memory.used,utilization.gpu,utilization.memory,temperature.gpu,power.draw",
68+
"--format=csv,noheader,nounits")
69+
if err != nil {
70+
return nil // nvidia-smi not available
71+
}
72+
73+
var gpus []model.GPUDevice
74+
for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
75+
fields := strings.Split(line, ", ")
76+
if len(fields) < 4 {
77+
continue
78+
}
79+
idx, _ := strconv.Atoi(strings.TrimSpace(fields[0]))
80+
gpu := model.GPUDevice{
81+
Index: idx,
82+
Name: strings.TrimSpace(fields[1]),
83+
Driver: strings.TrimSpace(fields[2]),
84+
PCIBus: strings.TrimSpace(fields[3]),
85+
}
86+
if len(fields) > 4 {
87+
gpu.MemoryTotal, _ = strconv.ParseInt(strings.TrimSpace(fields[4]), 10, 64)
88+
}
89+
if len(fields) > 5 {
90+
gpu.MemoryUsed, _ = strconv.ParseInt(strings.TrimSpace(fields[5]), 10, 64)
91+
}
92+
if len(fields) > 6 {
93+
gpu.UtilGPU, _ = strconv.Atoi(strings.TrimSpace(fields[6]))
94+
}
95+
if len(fields) > 7 {
96+
gpu.UtilMemory, _ = strconv.Atoi(strings.TrimSpace(fields[7]))
97+
}
98+
if len(fields) > 8 {
99+
gpu.Temperature, _ = strconv.Atoi(strings.TrimSpace(fields[8]))
100+
}
101+
if len(fields) > 9 {
102+
pw, _ := strconv.ParseFloat(strings.TrimSpace(fields[9]), 64)
103+
gpu.PowerWatts = int(pw)
104+
}
105+
106+
// Get NUMA node from sysfs
107+
gpu.NUMANode = c.pciNUMANode(gpu.PCIBus)
108+
109+
gpus = append(gpus, gpu)
110+
}
111+
return gpus
112+
}
113+
114+
// buildNICNUMAMap maps each physical NIC to its NUMA node.
115+
func (c *GPUCollector) buildNICNUMAMap(topo *model.PCIeTopology) {
116+
netDir := filepath.Join(c.sysRoot, "class", "net")
117+
entries, err := os.ReadDir(netDir)
118+
if err != nil {
119+
return
120+
}
121+
122+
for _, e := range entries {
123+
name := e.Name()
124+
if name == "lo" || strings.HasPrefix(name, "veth") || strings.HasPrefix(name, "docker") || strings.HasPrefix(name, "br-") {
125+
continue
126+
}
127+
numaFile := filepath.Join(netDir, name, "device", "numa_node")
128+
if data, err := os.ReadFile(numaFile); err == nil {
129+
node, _ := strconv.Atoi(strings.TrimSpace(string(data)))
130+
if node >= 0 {
131+
topo.NICNUMAMap[name] = node
132+
}
133+
}
134+
}
135+
}
136+
137+
// findCrossNUMAPairs identifies GPU-NIC pairs on different NUMA nodes.
138+
func (c *GPUCollector) findCrossNUMAPairs(topo *model.PCIeTopology) {
139+
for _, gpu := range topo.GPUs {
140+
for nic, nicNode := range topo.NICNUMAMap {
141+
if gpu.NUMANode != nicNode && gpu.NUMANode >= 0 && nicNode >= 0 {
142+
topo.CrossNUMAPairs = append(topo.CrossNUMAPairs, model.CrossNUMAPair{
143+
GPU: gpu.Name,
144+
GPUNode: gpu.NUMANode,
145+
NIC: nic,
146+
NICNode: nicNode,
147+
})
148+
}
149+
}
150+
}
151+
}
152+
153+
// pciNUMANode reads the NUMA node for a PCI device from sysfs.
154+
// PCIBus format from nvidia-smi: "00000000:01:00.0"
155+
func (c *GPUCollector) pciNUMANode(pciBus string) int {
156+
// Try direct sysfs path
157+
numaFile := filepath.Join(c.sysRoot, "bus", "pci", "devices", pciBus, "numa_node")
158+
if data, err := os.ReadFile(numaFile); err == nil {
159+
node, _ := strconv.Atoi(strings.TrimSpace(string(data)))
160+
return node
161+
}
162+
return -1
163+
}

internal/collector/memory.go

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,20 @@ func (c *MemoryCollector) Collect(ctx context.Context, cfg CollectConfig) (*mode
3737
// /proc/meminfo
3838
c.parseMeminfo(data)
3939

40-
// /proc/vmstat — page faults
40+
// /proc/vmstat — page faults, reclaim, compaction, THP (two-point sampling)
41+
vmstat1 := c.parseVmstatRaw()
42+
interval := cfg.SampleInterval
43+
if interval == 0 {
44+
interval = 1 * time.Second
45+
}
46+
select {
47+
case <-time.After(interval):
48+
case <-ctx.Done():
49+
return nil, ctx.Err()
50+
}
4151
c.parseVmstat(data)
52+
vmstat2 := c.parseVmstatRaw()
53+
c.computeReclaimRates(data, vmstat1, vmstat2, interval.Seconds())
4254

4355
// vm.* sysctl settings
4456
data.Swappiness = readSysctlInt(c.procRoot, "sys/vm/swappiness")
@@ -52,14 +64,22 @@ func (c *MemoryCollector) Collect(ctx context.Context, cfg CollectConfig) (*mode
5264

5365
// Transparent Huge Pages
5466
data.THPEnabled = c.readTHPEnabled()
67+
data.THPDefrag = c.readTHPDefrag()
68+
69+
// Additional vm.* sysctls
70+
data.WatermarkScaleFactor = readSysctlInt(c.procRoot, "sys/vm/watermark_scale_factor")
71+
data.DirtyExpireCentisecs = readSysctlInt(c.procRoot, "sys/vm/dirty_expire_centisecs")
72+
data.DirtyWritebackCentisecs = readSysctlInt(c.procRoot, "sys/vm/dirty_writeback_centisecs")
73+
data.ZoneReclaimMode = readSysctlInt(c.procRoot, "sys/vm/zone_reclaim_mode")
74+
data.SchedNumaBalancing = readSysctlInt(c.procRoot, "sys/kernel/sched_numa_balancing")
5575

5676
// PSI memory pressure
5777
c.parsePSI(data)
5878

5979
// /proc/buddyinfo
6080
data.BuddyInfo = c.parseBuddyinfo()
6181

62-
// NUMA stats
82+
// NUMA stats (with distance matrix and CPU list)
6383
data.NUMANodes = c.parseNUMAStats()
6484

6585
return &model.Result{
@@ -124,6 +144,7 @@ func (c *MemoryCollector) parseVmstat(data *model.MemoryData) {
124144
}
125145
defer f.Close()
126146

147+
r := &model.ReclaimStats{}
127148
scanner := bufio.NewScanner(f)
128149
for scanner.Scan() {
129150
fields := strings.Fields(scanner.Text())
@@ -136,8 +157,74 @@ func (c *MemoryCollector) parseVmstat(data *model.MemoryData) {
136157
data.MajorFaults = val
137158
case "pgfault":
138159
data.MinorFaults = val
160+
case "pgscan_direct":
161+
r.PgscanDirect = val
162+
case "pgscan_kswapd":
163+
r.PgscanKswapd = val
164+
case "pgsteal_direct":
165+
r.PgstealDirect = val
166+
case "pgsteal_kswapd":
167+
r.PgstealKswapd = val
168+
case "allocstall_normal":
169+
r.AllocstallNormal = val
170+
case "allocstall_dma":
171+
r.AllocstallDMA = val
172+
case "allocstall_movable":
173+
r.AllocstallMovable = val
174+
case "compact_stall":
175+
r.CompactStall = val
176+
case "compact_success":
177+
r.CompactSuccess = val
178+
case "compact_fail":
179+
r.CompactFail = val
180+
case "thp_fault_alloc":
181+
r.THPFaultAlloc = val
182+
case "thp_collapse_alloc":
183+
r.THPCollapseAlloc = val
184+
case "thp_split_page":
185+
r.THPSplitPage = val
186+
}
187+
}
188+
data.Reclaim = r
189+
}
190+
191+
// parseVmstatRaw returns key vmstat counters as a map for rate computation.
192+
func (c *MemoryCollector) parseVmstatRaw() map[string]int64 {
193+
f, err := os.Open(filepath.Join(c.procRoot, "vmstat"))
194+
if err != nil {
195+
return nil
196+
}
197+
defer f.Close()
198+
199+
m := make(map[string]int64)
200+
scanner := bufio.NewScanner(f)
201+
for scanner.Scan() {
202+
fields := strings.Fields(scanner.Text())
203+
if len(fields) != 2 {
204+
continue
205+
}
206+
switch fields[0] {
207+
case "pgscan_direct", "compact_stall", "thp_split_page", "allocstall_normal":
208+
m[fields[0]], _ = strconv.ParseInt(fields[1], 10, 64)
139209
}
140210
}
211+
return m
212+
}
213+
214+
// computeReclaimRates computes per-second rates for reclaim counters.
215+
func (c *MemoryCollector) computeReclaimRates(data *model.MemoryData, v1, v2 map[string]int64, secs float64) {
216+
if data == nil || data.Reclaim == nil || v1 == nil || v2 == nil || secs <= 0 {
217+
return
218+
}
219+
if d := v2["pgscan_direct"] - v1["pgscan_direct"]; d > 0 {
220+
data.Reclaim.DirectReclaimRate = float64(d) / secs
221+
}
222+
if d := v2["compact_stall"] - v1["compact_stall"]; d > 0 {
223+
data.Reclaim.CompactStallRate = float64(d) / secs
224+
}
225+
if d := v2["thp_split_page"] - v1["thp_split_page"]; d > 0 {
226+
data.Reclaim.THPSplitRate = float64(d) / secs
227+
}
141228
}
142229

143230
func (c *MemoryCollector) parsePSI(data *model.MemoryData) {
@@ -193,6 +280,21 @@ func (c *MemoryCollector) readTHPEnabled() string {
193280
return strings.TrimSpace(content)
194281
}
195282

283+
func (c *MemoryCollector) readTHPDefrag() string {
284+
data, err := os.ReadFile(filepath.Join(c.sysRoot, "kernel", "mm", "transparent_hugepage", "defrag"))
285+
if err != nil {
286+
return ""
287+
}
288+
content := string(data)
289+
if idx := strings.Index(content, "["); idx >= 0 {
290+
end := strings.Index(content[idx:], "]")
291+
if end > 0 {
292+
return content[idx+1 : idx+end]
293+
}
294+
}
295+
return strings.TrimSpace(content)
296+
}
297+
196298
func (c *MemoryCollector) parseBuddyinfo() map[string][]int {
197299
f, err := os.Open(filepath.Join(c.procRoot, "buddyinfo"))
198300
if err != nil {
@@ -289,6 +391,25 @@ func (c *MemoryCollector) parseNUMAStats() []model.NUMANode {
289391
f.Close()
290392
}
291393

394+
// Compute miss ratio
395+
total := node.NumaHit + node.NumaMiss
396+
if total > 0 {
397+
node.MissRatio = float64(node.NumaMiss) / float64(total) * 100
398+
}
399+
400+
// Distance matrix
401+
if distData, err := os.ReadFile(filepath.Join(nodePath, "distance")); err == nil {
402+
for _, s := range strings.Fields(strings.TrimSpace(string(distData))) {
403+
v, _ := strconv.Atoi(s)
404+
node.Distance = append(node.Distance, v)
405+
}
406+
}
407+
408+
// CPU list
409+
if cpuData, err := os.ReadFile(filepath.Join(nodePath, "cpulist")); err == nil {
410+
node.CPUs = strings.TrimSpace(string(cpuData))
411+
}
412+
292413
nodes = append(nodes, node)
293414
}
294415
return nodes

0 commit comments

Comments
 (0)