class ErrorHandler:
    """Classify execution errors, build error payloads and log failures.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    """

    @staticmethod
    def classify_runtime_error(error_msg: str) -> int:
        """
        Classify a RuntimeError by scanning its message for memory keywords.

        The message is lower-cased here, so callers may pass it in any case
        (passing an already lower-cased string still works). A keyword only
        counts when it does not start in the middle of a word, which keeps
        the short keyword "oom" from matching "room", "zoom" or "bloom".

        Args:
            error_msg: Error message string (any case; None is treated as "")

        Returns:
            ErrorCode.SYSTEM if a memory keyword is found, otherwise
            ErrorCode.GENERIC
        """
        import re  # function-scope import: this module has no top-level `re`

        text = (error_msg or "").lower()
        for keyword in MEMORY_KEYWORDS:
            # The look-behind rejects hits preceded by a letter or digit but
            # still accepts "cuda_oom", "oom-killer" and "oomkilled".
            if re.search(r"(?<![a-z0-9])" + re.escape(keyword.lower()), text):
                return ErrorCode.SYSTEM
        return ErrorCode.GENERIC

    @staticmethod
    def build_error_response(
        run_id: str,
        testcase: str,
        error_msg: str,
        result_code: int,
        config: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Build a response dict containing error information.

        This runs while another exception is being handled, so it must not
        raise on odd input: a missing config (None) is treated as empty and
        non-string keys are kept instead of crashing on ``startswith``.

        Args:
            run_id: Test run identifier
            testcase: Test case name
            error_msg: Error message string
            result_code: Error result code
            config: Test configuration (may be None)

        Returns:
            Dictionary with error details; "config" is a copy of the input
            without the injected "_"-prefixed metadata keys
        """
        # Create cleaned config without injected metadata
        # (skips _testcase, _run_id, _time and any other "_"-prefixed key).
        cleaned_config = {
            key: value
            for key, value in (config or {}).items()
            if not (isinstance(key, str) and key.startswith("_"))
        }

        return {
            "run_id": run_id,
            "testcase": testcase,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "result_code": result_code,
            "error_msg": error_msg,
            "success": 1,  # 1 = failure
            "config": cleaned_config,
        }

    @staticmethod
    def log_error(testcase: str, error: Exception, error_code: int) -> None:
        """
        Log error with appropriate severity and context.

        Severity, issue type and analysis text are looked up in
        _ERROR_LOG_CONFIG; unknown codes are logged as a non-critical
        "unknown_error". Critical entries go to logger.error, all others to
        logger.warning. The error text is truncated to 300 characters.

        Args:
            testcase: Test case name
            error: Exception instance
            error_code: Error code for classification
        """
        error_msg = str(error)[:300]
        is_critical, issue_type, analysis = _ERROR_LOG_CONFIG.get(
            error_code, (False, "unknown_error", None)
        )

        log_fn = logger.error if is_critical else logger.warning
        prefix = "STABILITY CHECK FAILED" if is_critical else "Test failed"

        lines = [f"Executor: {prefix} for {testcase}", f" Issue Type: {issue_type}"]
        if is_critical and analysis:
            lines.append(" Severity: CRITICAL")
            lines.append(f" Analysis: {analysis}")
        lines.append(f" Error: {error_msg}")

        log_fn("\n".join(lines))
# Hardware probe configurations, keyed by accelerator type.
#   command      - argv of the vendor tool to run
#   candidates   - alternative tool names; the first one found in PATH is used
#   pattern      - regex marking an output line that describes one device
#   flags        - optional `re` flags applied together with `pattern`
#   default_name - model name reported when the tool gives no better one
PROBE_CONFIGS: Dict[str, Dict[str, Any]] = {
    "nvidia": {
        "command": [
            "nvidia-smi",
            "--query-gpu=name,memory.total,driver_version",
            "--format=csv,noheader",
        ],
        "pattern": r"\bGPU\b",
        "default_name": "NVIDIA GPU",
        "parse_output": True,
    },
    "amd": {
        "candidates": ["amd-smi", "rocm-smi"],
        "pattern": r"\bGPU\b",
        # The AMD probe this table replaced matched case-insensitively.
        "flags": re.IGNORECASE,
        "default_name": "AMD GPU",
    },
    "ascend": {
        "command": ["npu-smi", "info"],
        "pattern": r"\bNPU\b|\bDevice\b",
        "default_name": "Ascend NPU",
    },
    "cambricon": {
        "command": ["cnmon", "info"],
        "pattern": r"\bMLU\b|\bDevice\b",
        "default_name": "Cambricon MLU",
    },
}


@dataclass
class ProbeResult:
    """Result from probing a hardware type."""

    success: bool
    count: int = 0
    model: str = ""
    driver: str = ""
    memory_gb: int = 0


def _which(cmd: str) -> Optional[str]:
    """Return the path of `cmd` if it is in PATH, or None (never raises)."""
    try:
        return which(cmd)
    except Exception:
        return None


class HardwareCollector:
    """Collects static hardware information (CPU, memory, GPU)."""

    def collect(self, accel_type: str = "", device_ids: Any = None) -> Dict[str, Any]:
        """
        Best-effort static HW collector (CPU/mem/GPU model/driver/CUDA).

        Probes are tried in the order given by `_get_probe_order`; the first
        successful probe decides "accelerator_type" and ends the search.
        Fields that cannot be determined keep their defaults.

        Args:
            accel_type: Hint for accelerator type ("nvidia", "amd", "ascend",
                "cambricon" or "generic"); case and surrounding whitespace
                are ignored
            device_ids: Device IDs to query (currently unused)

        Returns:
            Dictionary with hardware information
        """
        hw: Dict[str, Any] = {
            "cpu_model": "Unknown",
            "memory_gb": 0,
            "gpu_model": "Unknown",
            "gpu_count": 0,
            "gpu_memory_gb": 0,
            "driver_version": "Unknown",
            "cuda_version": "Unknown",
            "accelerator_type": "generic",
        }

        self._collect_cpu_info(hw)
        self._collect_memory_info(hw)

        # Determine probe order based on hint
        hint = (accel_type or "").lower().strip()

        for probe_type in self._get_probe_order(hint):
            result = self._probe(probe_type, hw)
            if result.success:
                hw["accelerator_type"] = probe_type
                if probe_type == "nvidia":
                    hw["cuda_version"] = (
                        self._collect_cuda_version() or hw["cuda_version"]
                    )
                return hw

        return hw

    def _get_probe_order(self, hint: str) -> List[str]:
        """
        Get probe order based on accelerator type hint.

        A recognised vendor hint is probed first, followed by the remaining
        vendors. The hint "generic" means the caller asked for no accelerator,
        so no vendor tool is run at all (empty list). Any other value yields
        the default auto-detect order.
        """
        order = ["nvidia", "amd", "ascend", "cambricon"]
        if hint == "generic":
            return []
        if hint in order:
            return [hint] + [p for p in order if p != hint]
        return order

    def _collect_cpu_info(self, hw: Dict[str, Any]) -> None:
        """Fill hw["cpu_model"] from the first "model name" line of /proc/cpuinfo."""
        try:
            with open("/proc/cpuinfo", "r") as f:
                for line in f:
                    if "model name" in line:
                        hw["cpu_model"] = line.split(":", 1)[1].strip()
                        break
        except Exception:
            # Best-effort: a missing or unparsable file leaves the default.
            pass

    def _collect_memory_info(self, hw: Dict[str, Any]) -> None:
        """Fill hw["memory_gb"] from the MemTotal line of /proc/meminfo."""
        try:
            with open("/proc/meminfo", "r") as f:
                for line in f:
                    if "MemTotal" in line:
                        mem_kb = int(line.split()[1])
                        # The value is read as kB; floor-divide down to GB.
                        hw["memory_gb"] = mem_kb // (1024 * 1024)
                        break
        except Exception:
            # Best-effort: a missing or unparsable file leaves the default.
            pass

    def _probe(self, probe_type: str, hw: Dict[str, Any]) -> ProbeResult:
        """Dispatch to the probe method for `probe_type`; unknown types fail."""
        probe_methods = {
            "nvidia": self._probe_nvidia,
            "amd": self._probe_amd,
            "ascend": self._probe_generic_command,
            "cambricon": self._probe_generic_command,
        }
        method = probe_methods.get(probe_type)
        if method:
            return method(probe_type, hw)
        return ProbeResult(success=False)

    def _probe_nvidia(self, probe_type: str, hw: Dict[str, Any]) -> ProbeResult:
        """
        Probe NVIDIA GPU with special parsing.

        Runs the nvidia-smi CSV query; each non-empty output line counts as
        one GPU. Model, memory and driver are taken from the first line only.
        """
        config = PROBE_CONFIGS["nvidia"]
        try:
            r = subprocess.run(
                config["command"], capture_output=True, text=True, timeout=5
            )
            if r.returncode != 0 or not r.stdout.strip():
                return ProbeResult(success=False)

            lines = [x.strip() for x in r.stdout.strip().splitlines() if x.strip()]
            hw["gpu_count"] = len(lines)

            # Expected field order: name, memory.total, driver_version
            parts = [x.strip() for x in lines[0].split(",")]
            if len(parts) >= 3:
                hw["gpu_model"] = parts[0]
                hw["driver_version"] = parts[2]
                mem_match = re.search(r"(\d+)\s*MiB", parts[1])
                if mem_match:
                    hw["gpu_memory_gb"] = int(mem_match.group(1)) // 1024
            return ProbeResult(success=True, count=hw["gpu_count"])
        except Exception:
            # Tool missing, timeout or unexpected output: treat as "not found".
            return ProbeResult(success=False)

    def _probe_amd(self, probe_type: str, hw: Dict[str, Any]) -> ProbeResult:
        """Probe AMD GPU - detect available tool first."""
        config = PROBE_CONFIGS["amd"]
        tool = next((c for c in config["candidates"] if _which(c)), None)
        if not tool:
            return ProbeResult(success=False)

        # amd-smi: `amd-smi list` ; rocm-smi: `rocm-smi -i`
        cmd = [tool, "list"] if tool == "amd-smi" else [tool, "-i"]
        return self._run_probe_command(
            cmd,
            config["pattern"],
            config["default_name"],
            hw,
            flags=config.get("flags", 0),
        )

    def _probe_generic_command(
        self, probe_type: str, hw: Dict[str, Any]
    ) -> ProbeResult:
        """Generic probe for ascend/cambricon using command output."""
        config = PROBE_CONFIGS.get(probe_type)
        if not config or not _which(config["command"][0]):
            return ProbeResult(success=False)

        return self._run_probe_command(
            config["command"],
            config["pattern"],
            config["default_name"],
            hw,
            flags=config.get("flags", 0),
        )

    @staticmethod
    def _count_device_lines(output: str, pattern: str, flags: int = 0) -> int:
        """Count the lines of `output` that match `pattern` (with `flags`)."""
        return sum(1 for line in output.splitlines() if re.search(pattern, line, flags))

    def _run_probe_command(
        self,
        command: List[str],
        pattern: str,
        default_name: str,
        hw: Dict[str, Any],
        flags: int = 0,
    ) -> ProbeResult:
        """
        Run probe command and parse output.

        The probe succeeds when the command exits with 0 and prints something;
        hw["gpu_count"] is only raised when at least one line matches.

        Args:
            command: argv of the tool to run
            pattern: Regex marking a line that describes one device
            default_name: Model name used while hw["gpu_model"] is "Unknown"
            hw: Hardware dict updated in place
            flags: `re` flags combined with `pattern` (default: none)

        Returns:
            ProbeResult with the number of matching lines
        """
        try:
            r = subprocess.run(command, capture_output=True, text=True, timeout=5)
            if r.returncode != 0 or not r.stdout.strip():
                return ProbeResult(success=False)

            count = self._count_device_lines(r.stdout, pattern, flags)
            if count:
                hw["gpu_count"] = max(hw["gpu_count"], count)
            if hw["gpu_model"] == "Unknown":
                hw["gpu_model"] = default_name
            return ProbeResult(success=True, count=count)
        except Exception:
            # Tool missing, timeout or unexpected output: treat as "not found".
            return ProbeResult(success=False)

    def _collect_cuda_version(self) -> Optional[str]:
        """Return the "release X.Y" version printed by `nvcc --version`, or None."""
        try:
            r = subprocess.run(
                ["nvcc", "--version"], capture_output=True, text=True, timeout=2
            )
            if r.returncode == 0:
                for line in r.stdout.splitlines():
                    if "release" in line:
                        match = re.search(r"release\s+(\d+\.\d+)", line)
                        if match:
                            return match.group(1)
        except Exception:
            # nvcc not installed or timed out: the version stays unknown.
            pass
        return None


# Singleton instance for convenience
_collector = HardwareCollector()


def collect_hardware_info(
    accel_type: str = "", device_ids: Any = None
) -> Dict[str, Any]:
    """Convenience function to collect hardware info via the shared collector."""
    return _collector.collect(accel_type, device_ids)
logger = logging.getLogger(__name__) -NVIDIA_SMI_GPU_QUERY = [ - "nvidia-smi", - "--query-gpu=name,memory.total,driver_version", - "--format=csv,noheader", -] - -AMD_SMI_CANDIDATES = ["amd-smi", "rocm-smi"] - - -def _which(cmd: str) -> Optional[str]: - try: - from shutil import which - - return which(cmd) - except Exception: - return None - @dataclass class TestResult: @@ -92,7 +75,6 @@ def __init__(self, payload: Dict[str, Any], adapter: BaseAdapter): self.run_id = payload.get("run_id", "") self.test_input = None - # Setup output directory from config config = payload.get("config", {}) output_dir = config.get("output_dir", "./output") self.output_dir = Path(output_dir) @@ -173,7 +155,7 @@ def execute(self) -> TestResult: config=config, ) - response = {} + response: Dict[str, Any] = {} try: # Phase 1: Setup @@ -188,7 +170,7 @@ def execute(self) -> TestResult: env = self._build_environment(response) # rebuild ordered dict (py3.7+ preserves insertion order) - ordered = {} + ordered: Dict[str, Any] = {} for k in [ "run_id", "time", @@ -223,86 +205,41 @@ def execute(self) -> TestResult: except subprocess.TimeoutExpired as e: # Timeout errors (possible hardware hang) - logger.error( - f"Executor: STABILITY CHECK FAILED for {self.testcase}\n" - f" Issue Type: timeout\n" - f" Severity: CRITICAL\n" - f" Analysis: Test timed out. 
Hardware may be hung or overloaded.\n" - f" Error: {str(e)[:300]}" - ) test_result.result_code = ErrorCode.TIMEOUT - # Build error response for saving - response = self._build_error_response(str(e), ErrorCode.TIMEOUT) + ErrorHandler.log_error(self.testcase, e, ErrorCode.TIMEOUT) + response = self._build_error_response(str(e), ErrorCode.TIMEOUT, config) except ValueError as e: # Configuration or input validation errors - logger.warning( - f"Executor: Test failed for {self.testcase}\n" - f" Issue Type: configuration_error\n" - f" Error: {str(e)[:300]}" - ) test_result.result_code = ErrorCode.CONFIG - # Build error response for saving - response = self._build_error_response(str(e), ErrorCode.CONFIG) + ErrorHandler.log_error(self.testcase, e, ErrorCode.CONFIG) + response = self._build_error_response(str(e), ErrorCode.CONFIG, config) except RuntimeError as e: # RuntimeError: analyze error message for specific patterns - error_msg = str(e).lower() - - # Check for memory insufficient errors - memory_keywords = [ - "out of memory", - "oom", - "memory", - "memory leak", - "allocate", - "allocation failed", - "insufficient memory", - ] - if any(kw in error_msg for kw in memory_keywords): - logger.error( - f"Executor: STABILITY CHECK FAILED for {self.testcase}\n" - f" Issue Type: memory\n" - f" Severity: CRITICAL\n" - f" Analysis: Memory allocation failed. 
Possible causes: insufficient memory, memory leak, or test data too large.\n" - f" Error: {str(e)[:300]}" - ) - test_result.result_code = ErrorCode.SYSTEM - # Build error response for saving - response = self._build_error_response(str(e), ErrorCode.SYSTEM) - else: - # Other RuntimeError - logger.warning( - f"Executor: Test failed for {self.testcase}\n" - f" Issue Type: runtime_error\n" - f" Error: {str(e)[:300]}" - ) - test_result.result_code = ErrorCode.GENERIC - # Build error response for saving - response = self._build_error_response(str(e), ErrorCode.GENERIC) + error_code = ErrorHandler.classify_runtime_error(str(e).lower()) + test_result.result_code = error_code + ErrorHandler.log_error(self.testcase, e, error_code) + response = self._build_error_response(str(e), error_code, config) except Exception as e: # Unexpected exceptions + test_result.result_code = ErrorCode.GENERIC logger.error( f"Executor: {self.testcase} failed with unexpected exception: {e}", exc_info=True, ) - test_result.result_code = ErrorCode.GENERIC - # Build error response for saving - response = self._build_error_response(str(e), ErrorCode.GENERIC) + response = self._build_error_response(str(e), ErrorCode.GENERIC, config) finally: # Always save result (even on failure) - try: - if not test_result.result_file: - result_file = self._save_result(response) - test_result.result_file = result_file - except Exception as teardown_error: - logger.error(f"Executor: Failed to save result: {teardown_error}") + self._finalize_result(test_result, response) return test_result - def _build_error_response(self, error_msg: str, result_code: int) -> Dict[str, Any]: + def _build_error_response( + self, error_msg: str, result_code: int, config: Dict[str, Any] + ) -> Dict[str, Any]: """ Build a response dict containing error information for saving to disk. 
@@ -313,28 +250,11 @@ def _build_error_response(self, error_msg: str, result_code: int) -> Dict[str, A Returns: Dictionary with basic test info and error details """ - config = self.payload.get("config", {}) - - # Create a cleaned config without injected metadata - cleaned_config = { - k: v - for k, v in config.items() - if not k.startswith("_") # Skip _testcase, _run_id, _time - } - - # Extract device information - resolved = self._extract_device_info(config) - - return { - "run_id": self.run_id, - "testcase": self.testcase, - "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "result_code": result_code, - "error_msg": error_msg, - "success": 1, # 1 = failure - "config": cleaned_config, - "resolved": resolved, - } + response = ErrorHandler.build_error_response( + self.run_id, self.testcase, error_msg, result_code, config + ) + response["resolved"] = self._extract_device_info(config) + return response def _extract_device_info(self, config: Dict[str, Any]) -> Dict[str, Any]: """Extract device information from config.""" @@ -405,7 +325,7 @@ def _build_environment(self, response: Dict[str, Any]) -> Dict[str, Any]: else: topo = f"{nodes}x{(gpn or max(1, device_used // nodes))} ring mesh" - hw = self._collect_static_hw(accel_type=accel_type, device_ids=device_ids) + hw = collect_hardware_info(accel_type=accel_type, device_ids=device_ids) return { "cluster_scale": nodes, @@ -432,216 +352,16 @@ def _build_environment(self, response: Dict[str, Any]) -> Dict[str, Any]: ], } - def _collect_static_hw( - self, accel_type: str = "", device_ids: Any = None - ) -> Dict[str, Any]: - """ - Best-effort static HW collector (CPU/mem/GPU model/driver/CUDA). 
- """ - hw: Dict[str, Any] = { - "cpu_model": "Unknown", - "memory_gb": 0, - "gpu_model": "Unknown", - "gpu_count": 0, - "gpu_memory_gb": 0, - "driver_version": "Unknown", - "cuda_version": "Unknown", - "accelerator_type": "generic", - } - - # CPU - try: - with open("/proc/cpuinfo", "r") as f: - for line in f: - if "model name" in line: - hw["cpu_model"] = line.split(":", 1)[1].strip() - break - except Exception: - pass - - # Mem - try: - with open("/proc/meminfo", "r") as f: - for line in f: - if "MemTotal" in line: - mem_kb = int(line.split()[1]) - hw["memory_gb"] = mem_kb // (1024 * 1024) - break - except Exception: - pass - - hint = (accel_type or "").lower().strip() - - probes: List[str] = [] - if hint in ("nvidia", "amd", "ascend", "cambricon", "generic"): - probes = [hint] - else: - probes = [] - - # add auto-detect order - if "nvidia" not in probes: - probes.append("nvidia") - if "amd" not in probes: - probes.append("amd") - if "ascend" not in probes: - probes.append("ascend") - if "cambricon" not in probes: - probes.append("cambricon") - if "generic" not in probes: - probes.append("generic") - - for p in probes: - if p == "nvidia" and self._probe_nvidia(hw): - hw["accelerator_type"] = "nvidia" - hw["cuda_version"] = self._collect_cuda_version() or hw["cuda_version"] - return hw - - if p == "amd" and self._probe_amd(hw): - hw["accelerator_type"] = "amd" - return hw - - if p == "ascend" and self._probe_ascend(hw): - hw["accelerator_type"] = "ascend" - return hw - - if p == "cambricon" and self._probe_cambricon(hw): - hw["accelerator_type"] = "cambricon" - return hw - - if p == "generic": - hw["accelerator_type"] = "generic" - return hw - - return hw - - def _probe_nvidia(self, hw: Dict[str, Any]) -> bool: - try: - r = subprocess.run( - NVIDIA_SMI_GPU_QUERY, capture_output=True, text=True, timeout=5 - ) - if r.returncode != 0 or not r.stdout.strip(): - return False - - lines = [x.strip() for x in r.stdout.strip().splitlines() if x.strip()] - hw["gpu_count"] = 
len(lines) - - p = [x.strip() for x in lines[0].split(",")] - if len(p) >= 3: - hw["gpu_model"] = p[0] - hw["driver_version"] = p[2] - mm = re.search(r"(\d+)\s*MiB", p[1]) - if mm: - hw["gpu_memory_gb"] = int(mm.group(1)) // 1024 - return True - except Exception: - return False - - def _probe_amd(self, hw: Dict[str, Any]) -> bool: - """ - Try amd-smi or rocm-smi (best-effort). - We only fill model/count if possible; otherwise return False. - """ - try: - tool = None - for c in AMD_SMI_CANDIDATES: - if _which(c): - tool = c - break - if not tool: - return False - - # Try a light command. Different environments output differently. - # amd-smi: `amd-smi list` ; rocm-smi: `rocm-smi -i` - if tool == "amd-smi": - cmd = ["amd-smi", "list"] - else: - cmd = ["rocm-smi", "-i"] - - r = subprocess.run(cmd, capture_output=True, text=True, timeout=5) - if r.returncode != 0 or not r.stdout.strip(): - return False - - # Minimal parse: count devices by "GPU" markers - txt = r.stdout - # heuristic: count lines containing "GPU" and an index - lines = [ - x for x in txt.splitlines() if re.search(r"\bGPU\b", x, re.IGNORECASE) - ] - hw["gpu_count"] = ( - max(hw["gpu_count"], len(lines)) if lines else hw["gpu_count"] - ) - hw["gpu_model"] = ( - hw["gpu_model"] if hw["gpu_model"] != "Unknown" else "AMD GPU" - ) - return True - except Exception: - return False - - def _probe_ascend(self, hw: Dict[str, Any]) -> bool: - """ - Ascend: best-effort using npu-smi if present. 
- """ - try: - if not _which("npu-smi"): - return False - r = subprocess.run( - ["npu-smi", "info"], capture_output=True, text=True, timeout=5 - ) - if r.returncode != 0 or not r.stdout.strip(): - return False - - txt = r.stdout - # heuristic: count device lines with "NPU" or "Device" - cnt = len( - [x for x in txt.splitlines() if re.search(r"\bNPU\b|\bDevice\b", x)] - ) - hw["gpu_count"] = max(hw["gpu_count"], cnt) if cnt else hw["gpu_count"] - hw["gpu_model"] = ( - hw["gpu_model"] if hw["gpu_model"] != "Unknown" else "Ascend NPU" - ) - return True - except Exception: - return False - - def _probe_cambricon(self, hw: Dict[str, Any]) -> bool: - """ - Cambricon: best-effort using cnmon if present. - """ - try: - if not _which("cnmon"): - return False - r = subprocess.run( - ["cnmon", "info"], capture_output=True, text=True, timeout=5 - ) - if r.returncode != 0 or not r.stdout.strip(): - return False - - txt = r.stdout - cnt = len( - [x for x in txt.splitlines() if re.search(r"\bMLU\b|\bDevice\b", x)] - ) - hw["gpu_count"] = max(hw["gpu_count"], cnt) if cnt else hw["gpu_count"] - hw["gpu_model"] = ( - hw["gpu_model"] if hw["gpu_model"] != "Unknown" else "Cambricon MLU" - ) - return True - except Exception: - return False - - def _collect_cuda_version(self) -> Optional[str]: - try: - r = subprocess.run( - ["nvcc", "--version"], capture_output=True, text=True, timeout=2 - ) - if r.returncode == 0: - for line in r.stdout.splitlines(): - if "release" in line: - m = re.search(r"release\s+(\d+\.\d+)", line) - if m: - return m.group(1) - except Exception: - pass - return None + def _finalize_result( + self, test_result: TestResult, response: Dict[str, Any] + ) -> None: + """Save result file if not already saved.""" + if not test_result.result_file: + try: + result_file = self._save_result(response) + test_result.result_file = result_file + except Exception as teardown_error: + logger.error(f"Executor: Failed to save result: {teardown_error}") def _save_result(self, result: 
Dict[str, Any]) -> str: """