Skip to content

Commit 17c73ca

Browse files
committed
refactor: Split executor and move hardware/error-log logic into new files
1 parent 30f5b00 commit 17c73ca

4 files changed

Lines changed: 395 additions & 318 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,5 @@ summary_output/
211211
test_output/
212212
traces/
213213
test_*.json
214+
.claude/
215+
format*.json
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#!/usr/bin/env python3
2+
"""Error Handling Utilities for Test Execution."""
3+
4+
import logging
5+
from datetime import datetime
6+
from typing import Any, Dict
7+
8+
from infinimetrics.common.constants import ErrorCode
9+
10+
# Module-scoped logger named after this module.
logger = logging.getLogger(__name__)

# Lowercase phrases that identify a memory-related error message.
MEMORY_KEYWORDS = [
    "out of memory", "oom", "memory leak",
    "memory allocation failed", "insufficient memory", "cuda out of memory",
]
21+
22+
# Logging policy per error code.
# Maps error_code -> (is_critical, issue_type, analysis); analysis is None
# when there is no explanatory text for that code.
_ERROR_LOG_CONFIG = {
    ErrorCode.TIMEOUT: (
        True,
        "timeout",
        "Test timed out. Hardware may be hung or overloaded.",
    ),
    ErrorCode.SYSTEM: (
        True,
        "memory",
        "Memory allocation failed.",
    ),
    ErrorCode.CONFIG: (
        False,
        "configuration_error",
        None,
    ),
    ErrorCode.GENERIC: (
        False,
        "runtime_error",
        None,
    ),
}
33+
34+
35+
class ErrorHandler:
    """Handles error classification and response building.

    Every method is a static method; the class only acts as a namespace.
    """

    @staticmethod
    def classify_runtime_error(error_msg: str) -> int:
        """
        Classify RuntimeError by analyzing error message.

        The message is lower-cased here, so callers no longer have to
        normalise it first. Multi-word keywords are matched as substrings.
        Single-word keywords (such as "oom") must appear as a standalone
        token, so ordinary words like "room" or "zoom" are not mistaken for
        memory errors.

        Args:
            error_msg: Error message string (any case; None is treated as
                an empty message)

        Returns:
            ErrorCode.SYSTEM when a memory keyword is found, otherwise
            ErrorCode.GENERIC
        """
        import re  # local import: this module has no top-level `re` import

        text = (error_msg or "").lower()
        for raw_keyword in MEMORY_KEYWORDS:
            keyword = raw_keyword.lower()
            if " " in keyword:
                # Phrases are specific enough for plain substring matching.
                found = keyword in text
            else:
                # Bare tokens need boundaries to avoid hits inside words.
                token_pattern = (
                    r"(?<![a-z0-9])" + re.escape(keyword) + r"(?![a-z0-9])"
                )
                found = re.search(token_pattern, text) is not None
            if found:
                return ErrorCode.SYSTEM
        return ErrorCode.GENERIC

    @staticmethod
    def build_error_response(
        run_id: str,
        testcase: str,
        error_msg: str,
        result_code: int,
        config: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Build a response dict containing error information.

        Args:
            run_id: Test run identifier
            testcase: Test case name
            error_msg: Error message string
            result_code: Error result code
            config: Test configuration; None is treated as an empty config

        Returns:
            Dictionary with error details. "time" is the local wall-clock
            time formatted as "%Y-%m-%d %H:%M:%S" and "config" is a copy of
            the configuration without underscore-prefixed keys.
        """
        # Create cleaned config without injected metadata
        # (_testcase, _run_id, _time). Non-string keys cannot carry the
        # underscore prefix, so they are kept as-is instead of crashing.
        cleaned_config = {
            key: value
            for key, value in (config or {}).items()
            if not (isinstance(key, str) and key.startswith("_"))
        }

        return {
            "run_id": run_id,
            "testcase": testcase,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "result_code": result_code,
            "error_msg": error_msg,
            "success": 1,  # 1 = failure
            "config": cleaned_config,
        }

    @staticmethod
    def log_error(testcase: str, error: Exception, error_code: int) -> None:
        """
        Log error with appropriate severity and context.

        Critical error codes are logged at ERROR level with a
        "STABILITY CHECK FAILED" prefix; all others (including codes missing
        from _ERROR_LOG_CONFIG) are logged at WARNING level.

        Args:
            testcase: Test case name
            error: Exception instance
            error_code: Error code for classification
        """
        # Cap the message so one huge traceback cannot flood the log.
        error_msg = str(error)[:300]
        is_critical, issue_type, analysis = _ERROR_LOG_CONFIG.get(
            error_code, (False, "unknown_error", None)
        )

        log_fn = logger.error if is_critical else logger.warning
        prefix = "STABILITY CHECK FAILED" if is_critical else "Test failed"

        lines = [f"Executor: {prefix} for {testcase}", f"  Issue Type: {issue_type}"]
        if is_critical and analysis:
            lines.append("  Severity: CRITICAL")
            lines.append(f"  Analysis: {analysis}")
        lines.append(f"  Error: {error_msg}")

        log_fn("\n".join(lines))
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
#!/usr/bin/env python3
2+
"""Hardware Information Collector."""
3+
4+
import logging
5+
import re
6+
import subprocess
7+
from dataclasses import dataclass
8+
from shutil import which
9+
from typing import Any, Dict, List, Optional
10+
11+
# Module-scoped logger named after this module.
logger = logging.getLogger(__name__)

# Probe settings per accelerator family.
#   command / candidates: argv of the vendor tool, or tool names to look up
#   pattern:              regex counted against the tool's output lines
#   default_name:         model name used when no model was detected
# NOTE(review): the "pattern" and "parse_output" entries under "nvidia" are
# not read by the probe code visible in this module - confirm before relying
# on them.
PROBE_CONFIGS = {
    "nvidia": {
        "command": ["nvidia-smi", "--query-gpu=name,memory.total,driver_version", "--format=csv,noheader"],
        "pattern": r"\bGPU\b",
        "default_name": "NVIDIA GPU",
        "parse_output": True,
    },
    "amd": {
        "candidates": ["amd-smi", "rocm-smi"],
        "pattern": r"\bGPU\b",
        "default_name": "AMD GPU",
    },
    "ascend": {
        "command": ["npu-smi", "info"],
        "pattern": r"\bNPU\b|\bDevice\b",
        "default_name": "Ascend NPU",
    },
    "cambricon": {
        "command": ["cnmon", "info"],
        "pattern": r"\bMLU\b|\bDevice\b",
        "default_name": "Cambricon MLU",
    },
}
41+
42+
43+
@dataclass
class ProbeResult:
    """Outcome of probing for one accelerator family."""

    # Whether the probe reported the hardware as present.
    success: bool
    # Number of devices the probe counted.
    count: int = 0
    # NOTE(review): model, driver and memory_gb are never filled in by the
    # probes visible in this module; they keep their defaults - confirm
    # whether other callers set them.
    model: str = ""
    driver: str = ""
    memory_gb: int = 0
52+
53+
54+
def _which(cmd: str) -> Optional[str]:
55+
"""Check if command exists in PATH."""
56+
try:
57+
return which(cmd)
58+
except Exception:
59+
return None
60+
61+
62+
class HardwareCollector:
    """Collects static hardware information (CPU, memory, GPU).

    Collection is best-effort: a source that cannot be read or a vendor tool
    that is missing leaves the corresponding fields at their defaults. Such
    failures are logged at debug level instead of being silently discarded.
    """

    def collect(self, accel_type: str = "", device_ids: Any = None) -> Dict[str, Any]:
        """
        Best-effort static HW collector (CPU/mem/GPU model/driver/CUDA).

        Accelerator families are probed one after another and probing stops
        at the first family that reports success.

        Args:
            accel_type: Hint for accelerator type (case and surrounding
                whitespace are ignored); a known family is probed first
            device_ids: Device IDs to query (currently unused)

        Returns:
            Dictionary with hardware information; fields that could not be
            determined keep their "Unknown" / 0 / "generic" defaults
        """
        hw: Dict[str, Any] = {
            "cpu_model": "Unknown",
            "memory_gb": 0,
            "gpu_model": "Unknown",
            "gpu_count": 0,
            "gpu_memory_gb": 0,
            "driver_version": "Unknown",
            "cuda_version": "Unknown",
            "accelerator_type": "generic",
        }

        self._collect_cpu_info(hw)
        self._collect_memory_info(hw)

        # Determine probe order based on hint
        hint = (accel_type or "").lower().strip()
        for probe_type in self._get_probe_order(hint):
            result = self._probe(probe_type, hw)
            if not result.success:
                continue
            hw["accelerator_type"] = probe_type
            if probe_type == "nvidia":
                # Keep the default when nvcc is unavailable.
                hw["cuda_version"] = self._collect_cuda_version() or hw["cuda_version"]
            break

        return hw

    def _get_probe_order(self, hint: str) -> List[str]:
        """Get probe order based on accelerator type hint.

        A hint naming a known family moves that family to the front; any
        other hint leaves the default order unchanged.
        """
        order = ["nvidia", "amd", "ascend", "cambricon"]
        if hint in order:
            return [hint] + [p for p in order if p != hint]
        return order

    def _collect_cpu_info(self, hw: Dict[str, Any]) -> None:
        """Collect CPU model information from /proc/cpuinfo into hw."""
        try:
            with open("/proc/cpuinfo", "r") as f:
                for line in f:
                    if "model name" in line:
                        hw["cpu_model"] = line.split(":", 1)[1].strip()
                        break
        except Exception as exc:
            # Best-effort: hw keeps its default when the file is unusable.
            logger.debug("Could not read CPU model from /proc/cpuinfo: %s", exc)

    def _collect_memory_info(self, hw: Dict[str, Any]) -> None:
        """Collect total memory information from /proc/meminfo into hw."""
        try:
            with open("/proc/meminfo", "r") as f:
                for line in f:
                    if "MemTotal" in line:
                        mem_kb = int(line.split()[1])
                        # Floor division: the value is truncated to whole GB.
                        hw["memory_gb"] = mem_kb // (1024 * 1024)
                        break
        except Exception as exc:
            # Best-effort: hw keeps its default when the file is unusable.
            logger.debug("Could not read total memory from /proc/meminfo: %s", exc)

    def _probe(self, probe_type: str, hw: Dict[str, Any]) -> "ProbeResult":
        """Generic probe dispatcher.

        Unknown probe types yield an unsuccessful ProbeResult.
        """
        probe_methods = {
            "nvidia": self._probe_nvidia,
            "amd": self._probe_amd,
            "ascend": self._probe_generic_command,
            "cambricon": self._probe_generic_command,
        }
        method = probe_methods.get(probe_type)
        if method:
            return method(probe_type, hw)
        return ProbeResult(success=False)

    def _probe_nvidia(self, probe_type: str, hw: Dict[str, Any]) -> "ProbeResult":
        """Probe NVIDIA GPU with special parsing.

        Each non-empty output line counts as one GPU. Model, driver version
        and memory are taken from the first line only.
        """
        config = PROBE_CONFIGS["nvidia"]
        try:
            r = subprocess.run(
                config["command"], capture_output=True, text=True, timeout=5
            )
            if r.returncode != 0 or not r.stdout.strip():
                return ProbeResult(success=False)

            lines = [x.strip() for x in r.stdout.strip().splitlines() if x.strip()]
            hw["gpu_count"] = len(lines)

            # Expected CSV columns: name, memory.total, driver_version.
            parts = [x.strip() for x in lines[0].split(",")]
            if len(parts) >= 3:
                hw["gpu_model"] = parts[0]
                hw["driver_version"] = parts[2]
                mem_match = re.search(r"(\d+)\s*MiB", parts[1])
                if mem_match:
                    # Floor division: the value is truncated to whole GB.
                    hw["gpu_memory_gb"] = int(mem_match.group(1)) // 1024
            return ProbeResult(success=True, count=hw["gpu_count"])
        except Exception as exc:
            # Includes the tool being absent or timing out.
            logger.debug("NVIDIA probe failed: %s", exc)
            return ProbeResult(success=False)

    def _probe_amd(self, probe_type: str, hw: Dict[str, Any]) -> "ProbeResult":
        """Probe AMD GPU - detect available tool first."""
        config = PROBE_CONFIGS["amd"]
        # First candidate tool found on PATH wins.
        tool = next((c for c in config["candidates"] if _which(c)), None)
        if not tool:
            return ProbeResult(success=False)

        cmd = [tool, "list"] if tool == "amd-smi" else [tool, "-i"]
        return self._run_probe_command(
            cmd, config["pattern"], config["default_name"], hw
        )

    def _probe_generic_command(
        self, probe_type: str, hw: Dict[str, Any]
    ) -> "ProbeResult":
        """Generic probe for ascend/cambricon using command output."""
        config = PROBE_CONFIGS.get(probe_type)
        if not config or not _which(config["command"][0]):
            return ProbeResult(success=False)

        return self._run_probe_command(
            config["command"], config["pattern"], config["default_name"], hw
        )

    def _run_probe_command(
        self, command: List[str], pattern: str, default_name: str, hw: Dict[str, Any]
    ) -> "ProbeResult":
        """Run probe command and parse output.

        Output lines matching pattern are counted as devices. Every path
        returns a ProbeResult, so callers never receive None. A command that
        succeeds but matches no line still reports success with count 0.
        """
        try:
            r = subprocess.run(command, capture_output=True, text=True, timeout=5)
            if r.returncode != 0 or not r.stdout.strip():
                return ProbeResult(success=False)

            count = len([x for x in r.stdout.splitlines() if re.search(pattern, x)])
            if count:
                hw["gpu_count"] = max(hw["gpu_count"], count)
                if hw["gpu_model"] == "Unknown":
                    hw["gpu_model"] = default_name
            return ProbeResult(success=True, count=count)
        except Exception as exc:
            logger.debug("Probe command %s failed: %s", command, exc)
            return ProbeResult(success=False)

    def _collect_cuda_version(self) -> Optional[str]:
        """Collect CUDA version using nvcc.

        Returns:
            The "major.minor" release reported by `nvcc --version`, or None
            when nvcc is unavailable or its output has no release line
        """
        try:
            r = subprocess.run(
                ["nvcc", "--version"], capture_output=True, text=True, timeout=2
            )
            if r.returncode == 0:
                for line in r.stdout.splitlines():
                    if "release" in line:
                        match = re.search(r"release\s+(\d+\.\d+)", line)
                        if match:
                            return match.group(1)
        except Exception as exc:
            logger.debug("Could not determine CUDA version via nvcc: %s", exc)
        return None
230+
231+
232+
# Shared module-level collector reused by collect_hardware_info().
_collector = HardwareCollector()


def collect_hardware_info(
    accel_type: str = "", device_ids: Any = None
) -> Dict[str, Any]:
    """Collect hardware info through the shared module-level collector."""
    hardware_info = _collector.collect(accel_type, device_ids)
    return hardware_info

0 commit comments

Comments
 (0)