-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.py
More file actions
696 lines (585 loc) · 23.1 KB
/
executor.py
File metadata and controls
696 lines (585 loc) · 23.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
#!/usr/bin/env python3
"""
Executor - Universal Test Execution Framework
"""
import logging
import json
import subprocess
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
from datetime import datetime
from dataclasses import dataclass
from infinimetrics.adapter import BaseAdapter
from infinimetrics.input import TestInput
from infinimetrics.utils.path_utils import sanitize_filename
from infinimetrics.common.constants import ErrorCode, TEST_CATEGORIES
logger = logging.getLogger(__name__)
# nvidia-smi invocation that prints one CSV row per GPU:
# "<name>, <total memory> MiB, <driver version>" (no header line).
NVIDIA_SMI_GPU_QUERY = [
    "nvidia-smi",
    "--query-gpu=name,memory.total,driver_version",
    "--format=csv,noheader",
]
# AMD management CLIs probed in preference order (newer amd-smi first,
# then the older rocm-smi).
AMD_SMI_CANDIDATES = ["amd-smi", "rocm-smi"]
def _which(cmd: str) -> Optional[str]:
try:
from shutil import which
return which(cmd)
except Exception:
return None
@dataclass
class TestResult:
    """
    Standardized test result structure.

    Instances travel through the execution lifecycle and are handed back
    to the Dispatcher once a test finishes.

    Note:
        result_code follows the Linux convention: 0 means success and any
        non-zero value is an error code.
    """

    run_id: str
    testcase: str
    result_code: int  # 0 = success, non-zero = error code
    result_file: Optional[str] = None
    skipped: bool = False
    config: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to lightweight dictionary format for Dispatcher aggregation."""
        # Key order mirrors the field declaration order above.
        field_order = (
            "run_id",
            "testcase",
            "result_code",
            "result_file",
            "skipped",
            "config",
        )
        return {name: getattr(self, name) for name in field_order}
class Executor:
    """
    Universal test executor for all test types.

    Responsibilities:
        1. Manage adapter lifecycle (setup -> process -> teardown)
        2. Save results to disk
        3. Return result summary
    """

    def __init__(self, payload: Dict[str, Any], adapter: BaseAdapter):
        """
        Initialize executor.

        Args:
            payload: Test payload with testcase, config, run_id, etc.
            adapter: Configured adapter instance whose lifecycle this
                executor manages.
        """
        self.payload = payload
        self.adapter = adapter
        # Defensive defaults so logging and result naming never crash on a
        # sparse payload.
        self.testcase = payload.get("testcase", "unknown")
        self.run_id = payload.get("run_id", "")
        self.test_input = None  # populated later by setup()
        # Setup output directory from config; created eagerly so later
        # _save_result() calls can assume it exists.
        config = payload.get("config", {})
        output_dir = config.get("output_dir", "./output")
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Executor initialized: testcase={self.testcase}")
    def setup(self) -> None:
        """
        Setup phase - initialize adapter.

        This should be called before execute().

        Note:
            Mutates the payload's config dict in place by injecting
            underscore-prefixed metadata keys; _build_error_response later
            strips keys starting with "_" again.
        """
        config = self.payload.get("config", {})
        # Inject testcase, run_id, and other metadata into config
        config["_testcase"] = self.payload.get("testcase", "")
        config["_run_id"] = self.payload.get("run_id", "")
        config["_time"] = self.payload.get("time", None)
        # Initialize test_input from payload
        self.test_input = self.payload
        self.adapter.setup(config)
        logger.debug(f"Executor: Setup complete for {self.testcase}")
    def teardown(self, result: Any) -> str:
        """
        Teardown phase - cleanup adapter, collect metrics, and save results.

        This should be called after process() completes.

        Args:
            result: Response produced by adapter.process() (or an
                error-response dict); persisted to disk as-is.

        Returns:
            Path to saved result file
        """
        # Always cleanup adapter; a teardown failure must not mask the test
        # result, so it is logged and swallowed deliberately.
        try:
            self.adapter.teardown()
        except Exception as teardown_error:
            logger.warning(
                f"Executor: Teardown failed for {self.testcase}: {teardown_error}"
            )
        # TODO: Add metrics calculation method
        # Save result to disk
        result_file = self._save_result(result)
        logger.debug(f"Executor: Teardown complete for {self.testcase}")
        return result_file
    def execute(self) -> TestResult:
        """
        Execute the complete test with proper lifecycle management.

        Lifecycle:
            1. adapter.setup(config)
            2. adapter.process(payload)
            3. adapter.teardown() - includes saving results
            4. Return TestResult

        Returns:
            TestResult object with result_code and file path.
        """
        logger.info(f"Executor: Running {self.testcase}")
        # Initialize TestResult directly (default: result_code=0)
        config = self.payload.get("config", {})
        test_result = TestResult(
            run_id=self.run_id,
            testcase=self.testcase,
            result_code=0,  # Default to success
            result_file=None,
            config=config,
        )
        # response always holds a serializable value so the finally-block
        # save never references an undefined name.
        response = {}
        try:
            # Phase 1: Setup
            self.setup()
            # Phase 2: Process
            logger.debug(f"Executor: Calling adapter.process()")
            response = self.adapter.process(self.test_input)
            # Enrich environment ONLY if missing
            if isinstance(response, dict) and "environment" not in response:
                env = self._build_environment(response)
                # rebuild ordered dict (py3.7+ preserves insertion order)
                ordered = {}
                for k in [
                    "run_id",
                    "time",
                    "testcase",
                    "success",
                    "environment",
                    "result_code",
                    "config",
                    "metrics",
                ]:
                    if k == "environment":
                        ordered["environment"] = env
                    elif k in response:
                        ordered[k] = response[k]
                # append remaining keys in original order (skip those already set)
                for k, v in response.items():
                    if k not in ordered:
                        ordered[k] = v
                response = ordered
            # Phase 3: Teardown (cleanup, save result)
            result_file = self.teardown(response)
            test_result.result_file = result_file
            logger.info(
                f"Executor: {self.testcase} completed with code={test_result.result_code}"
            )
            # Success path: result_file is set, so the finally block below
            # skips its save-if-missing step.
            return test_result
        except subprocess.TimeoutExpired as e:
            # Timeout errors (possible hardware hang)
            logger.error(
                f"Executor: STABILITY CHECK FAILED for {self.testcase}\n"
                f" Issue Type: timeout\n"
                f" Severity: CRITICAL\n"
                f" Analysis: Test timed out. Hardware may be hung or overloaded.\n"
                f" Error: {str(e)[:300]}"
            )
            test_result.result_code = ErrorCode.TIMEOUT
            # Build error response for saving
            response = self._build_error_response(str(e), ErrorCode.TIMEOUT)
        except ValueError as e:
            # Configuration or input validation errors
            logger.warning(
                f"Executor: Test failed for {self.testcase}\n"
                f" Issue Type: configuration_error\n"
                f" Error: {str(e)[:300]}"
            )
            test_result.result_code = ErrorCode.CONFIG
            # Build error response for saving
            response = self._build_error_response(str(e), ErrorCode.CONFIG)
        except RuntimeError as e:
            # RuntimeError: analyze error message for specific patterns
            error_msg = str(e).lower()
            # Check for memory insufficient errors
            memory_keywords = [
                "out of memory",
                "oom",
                "memory",
                "memory leak",
                "allocate",
                "allocation failed",
                "insufficient memory",
            ]
            if any(kw in error_msg for kw in memory_keywords):
                logger.error(
                    f"Executor: STABILITY CHECK FAILED for {self.testcase}\n"
                    f" Issue Type: memory\n"
                    f" Severity: CRITICAL\n"
                    f" Analysis: Memory allocation failed. Possible causes: insufficient memory, memory leak, or test data too large.\n"
                    f" Error: {str(e)[:300]}"
                )
                test_result.result_code = ErrorCode.SYSTEM
                # Build error response for saving
                response = self._build_error_response(str(e), ErrorCode.SYSTEM)
            else:
                # Other RuntimeError
                logger.warning(
                    f"Executor: Test failed for {self.testcase}\n"
                    f" Issue Type: runtime_error\n"
                    f" Error: {str(e)[:300]}"
                )
                test_result.result_code = ErrorCode.GENERIC
                # Build error response for saving
                response = self._build_error_response(str(e), ErrorCode.GENERIC)
        except Exception as e:
            # Unexpected exceptions
            logger.error(
                f"Executor: {self.testcase} failed with unexpected exception: {e}",
                exc_info=True,
            )
            test_result.result_code = ErrorCode.GENERIC
            # Build error response for saving
            response = self._build_error_response(str(e), ErrorCode.GENERIC)
        finally:
            # Always save result (even on failure). On error paths teardown()
            # never ran, so the error response built above is saved here.
            try:
                if not test_result.result_file:
                    result_file = self._save_result(response)
                    test_result.result_file = result_file
            except Exception as teardown_error:
                logger.error(f"Executor: Failed to save result: {teardown_error}")
        return test_result
def _build_error_response(self, error_msg: str, result_code: int) -> Dict[str, Any]:
"""
Build a response dict containing error information for saving to disk.
Args:
error_msg: Error message string
result_code: Error result code
Returns:
Dictionary with basic test info and error details
"""
config = self.payload.get("config", {})
# Create a cleaned config without injected metadata
cleaned_config = {
k: v
for k, v in config.items()
if not k.startswith("_") # Skip _testcase, _run_id, _time
}
# Extract device information
resolved = self._extract_device_info(config)
return {
"run_id": self.run_id,
"testcase": self.testcase,
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"result_code": result_code,
"error_msg": error_msg,
"success": 1, # 1 = failure
"config": cleaned_config,
"resolved": resolved,
}
def _extract_device_info(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Extract device information from config."""
device_used = 0
gpus_per_node = 0
nodes = 1
# Try device_involved
if "device_involved" in config:
try:
device_used = int(config.get("device_involved", 0) or 0)
except (ValueError, TypeError):
device_used = 0
# Try single_node config
if isinstance(config.get("single_node"), dict):
single_node = config["single_node"]
device_ids = single_node.get("device_ids", [])
if device_ids:
device_used = len(device_ids)
gpus_per_node = device_used
else:
gpus_per_node = device_used
# Try multi_node config
if "multi_node" in config:
try:
nodes = int(config.get("multi_node", {}).get("num_nodes", 1) or 1)
except (ValueError, TypeError):
nodes = 1
return {
"nodes": nodes,
"gpus_per_node": gpus_per_node,
"device_used": device_used,
}
    def _build_environment(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build a unified environment block.

        Combines the adapter's "resolved" device counts with config hints
        and best-effort hardware probing into the nested cluster
        description attached to results that lack an "environment" key.

        Args:
            response: Adapter response; only its optional "resolved" dict
                (nodes / device_used / gpus_per_node) is read.

        Returns:
            Dict with "cluster_scale", "topology" and a one-entry
            "cluster" list describing the machine and its accelerators.
        """
        resolved = response.get("resolved", {}) if isinstance(response, dict) else {}
        nodes = int(resolved.get("nodes", 1) or 1)
        device_used = int(resolved.get("device_used", 0) or 0)
        gpn = int(resolved.get("gpus_per_node", 0) or 0)
        # Fallback to config hints if adapter didn't provide
        cfg = self.payload.get("config", {}) or {}
        accel_type = (
            (cfg.get("accelerator_type") or cfg.get("device_type") or "")
            .strip()
            .lower()
        )  # optional
        device_ids = cfg.get("device_ids")
        if device_ids is None and isinstance(cfg.get("single_node"), dict):
            device_ids = cfg["single_node"].get("device_ids")
        if device_used <= 0:
            try:
                device_used = int(cfg.get("device_involved", 0) or 0)
            except Exception:
                device_used = 0
        # The topology string is a heuristic label derived from counts,
        # not a measured interconnect layout.
        if nodes <= 1:
            topo = f"{device_used}x1 ring mesh"
        else:
            topo = f"{nodes}x{(gpn or max(1, device_used // nodes))} ring mesh"
        hw = self._collect_static_hw(accel_type=accel_type, device_ids=device_ids)
        return {
            "cluster_scale": nodes,
            "topology": topo,
            "cluster": [
                {
                    "machine": {
                        "cpu_model": hw.get("cpu_model", "Unknown"),
                        "memory_gb": hw.get("memory_gb", 0),
                        "accelerators": [
                            {
                                "model": hw.get("gpu_model", "Unknown"),
                                "count": device_used,
                                "memory_gb_per_card": hw.get("gpu_memory_gb", 0),
                                "driver": hw.get("driver_version", "Unknown"),
                                "cuda": hw.get("cuda_version", "Unknown"),
                                # reserved: type of platform
                                "type": hw.get("accelerator_type", "generic"),
                            }
                        ],
                    },
                    # Framework detection is not implemented; placeholders.
                    "framework": [{"name": "unknown", "version": "unknown"}],
                }
            ],
        }
def _collect_static_hw(
self, accel_type: str = "", device_ids: Any = None
) -> Dict[str, Any]:
"""
Best-effort static HW collector (CPU/mem/GPU model/driver/CUDA).
"""
hw: Dict[str, Any] = {
"cpu_model": "Unknown",
"memory_gb": 0,
"gpu_model": "Unknown",
"gpu_count": 0,
"gpu_memory_gb": 0,
"driver_version": "Unknown",
"cuda_version": "Unknown",
"accelerator_type": "generic",
}
# CPU
try:
with open("/proc/cpuinfo", "r") as f:
for line in f:
if "model name" in line:
hw["cpu_model"] = line.split(":", 1)[1].strip()
break
except Exception:
pass
# Mem
try:
with open("/proc/meminfo", "r") as f:
for line in f:
if "MemTotal" in line:
mem_kb = int(line.split()[1])
hw["memory_gb"] = mem_kb // (1024 * 1024)
break
except Exception:
pass
hint = (accel_type or "").lower().strip()
probes: List[str] = []
if hint in ("nvidia", "amd", "ascend", "cambricon", "generic"):
probes = [hint]
else:
probes = []
# add auto-detect order
if "nvidia" not in probes:
probes.append("nvidia")
if "amd" not in probes:
probes.append("amd")
if "ascend" not in probes:
probes.append("ascend")
if "cambricon" not in probes:
probes.append("cambricon")
if "generic" not in probes:
probes.append("generic")
for p in probes:
if p == "nvidia" and self._probe_nvidia(hw):
hw["accelerator_type"] = "nvidia"
hw["cuda_version"] = self._collect_cuda_version() or hw["cuda_version"]
return hw
if p == "amd" and self._probe_amd(hw):
hw["accelerator_type"] = "amd"
return hw
if p == "ascend" and self._probe_ascend(hw):
hw["accelerator_type"] = "ascend"
return hw
if p == "cambricon" and self._probe_cambricon(hw):
hw["accelerator_type"] = "cambricon"
return hw
if p == "generic":
hw["accelerator_type"] = "generic"
return hw
return hw
def _probe_nvidia(self, hw: Dict[str, Any]) -> bool:
try:
r = subprocess.run(
NVIDIA_SMI_GPU_QUERY, capture_output=True, text=True, timeout=5
)
if r.returncode != 0 or not r.stdout.strip():
return False
lines = [x.strip() for x in r.stdout.strip().splitlines() if x.strip()]
hw["gpu_count"] = len(lines)
p = [x.strip() for x in lines[0].split(",")]
if len(p) >= 3:
hw["gpu_model"] = p[0]
hw["driver_version"] = p[2]
mm = re.search(r"(\d+)\s*MiB", p[1])
if mm:
hw["gpu_memory_gb"] = int(mm.group(1)) // 1024
return True
except Exception:
return False
def _probe_amd(self, hw: Dict[str, Any]) -> bool:
"""
Try amd-smi or rocm-smi (best-effort).
We only fill model/count if possible; otherwise return False.
"""
try:
tool = None
for c in AMD_SMI_CANDIDATES:
if _which(c):
tool = c
break
if not tool:
return False
# Try a light command. Different environments output differently.
# amd-smi: `amd-smi list` ; rocm-smi: `rocm-smi -i`
if tool == "amd-smi":
cmd = ["amd-smi", "list"]
else:
cmd = ["rocm-smi", "-i"]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if r.returncode != 0 or not r.stdout.strip():
return False
# Minimal parse: count devices by "GPU" markers
txt = r.stdout
# heuristic: count lines containing "GPU" and an index
lines = [
x for x in txt.splitlines() if re.search(r"\bGPU\b", x, re.IGNORECASE)
]
hw["gpu_count"] = (
max(hw["gpu_count"], len(lines)) if lines else hw["gpu_count"]
)
hw["gpu_model"] = (
hw["gpu_model"] if hw["gpu_model"] != "Unknown" else "AMD GPU"
)
return True
except Exception:
return False
def _probe_ascend(self, hw: Dict[str, Any]) -> bool:
"""
Ascend: best-effort using npu-smi if present.
"""
try:
if not _which("npu-smi"):
return False
r = subprocess.run(
["npu-smi", "info"], capture_output=True, text=True, timeout=5
)
if r.returncode != 0 or not r.stdout.strip():
return False
txt = r.stdout
# heuristic: count device lines with "NPU" or "Device"
cnt = len(
[x for x in txt.splitlines() if re.search(r"\bNPU\b|\bDevice\b", x)]
)
hw["gpu_count"] = max(hw["gpu_count"], cnt) if cnt else hw["gpu_count"]
hw["gpu_model"] = (
hw["gpu_model"] if hw["gpu_model"] != "Unknown" else "Ascend NPU"
)
return True
except Exception:
return False
def _probe_cambricon(self, hw: Dict[str, Any]) -> bool:
"""
Cambricon: best-effort using cnmon if present.
"""
try:
if not _which("cnmon"):
return False
r = subprocess.run(
["cnmon", "info"], capture_output=True, text=True, timeout=5
)
if r.returncode != 0 or not r.stdout.strip():
return False
txt = r.stdout
cnt = len(
[x for x in txt.splitlines() if re.search(r"\bMLU\b|\bDevice\b", x)]
)
hw["gpu_count"] = max(hw["gpu_count"], cnt) if cnt else hw["gpu_count"]
hw["gpu_model"] = (
hw["gpu_model"] if hw["gpu_model"] != "Unknown" else "Cambricon MLU"
)
return True
except Exception:
return False
def _collect_cuda_version(self) -> Optional[str]:
try:
r = subprocess.run(
["nvcc", "--version"], capture_output=True, text=True, timeout=2
)
if r.returncode == 0:
for line in r.stdout.splitlines():
if "release" in line:
m = re.search(r"release\s+(\d+\.\d+)", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _save_result(self, result: Dict[str, Any]) -> str:
"""
Save detailed result to disk as JSON.
Args:
result: Complete result dict with data and metrics
Returns:
Absolute path to saved file
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 1) Prefer run_id in result (adapter may generate a richer run_id)
run_id = None
if isinstance(result, dict):
run_id = result.get("run_id") or result.get("raw_result", {}).get("run_id")
# 2) Fallback to payload run_id
if not run_id:
run_id = self.payload.get("run_id") or self.run_id
# 3) Determine output subdirectory based on testcase
testcase = self.payload.get("testcase", "")
# Extract category from testcase (e.g., "hardware.cudaUnified.Comprehensive" -> "hardware")
category = "other" # default fallback
for prefix, subdir in TEST_CATEGORIES.items():
if testcase.startswith(prefix):
category = subdir
break
if run_id:
safe_run_id = sanitize_filename(run_id)
# Put results in category-specific subdirectory
category_dir = self.output_dir / category
category_dir.mkdir(parents=True, exist_ok=True)
filename = f"{safe_run_id}_results.json"
output_file = category_dir / filename
else:
# Final fallback to old naming (in root output_dir)
safe_name = self.testcase.replace(".", "_").replace("/", "_")
filename = f"{safe_name}_{timestamp}_results.json"
output_file = self.output_dir / filename
with open(output_file, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
logger.debug(f"Executor: Results saved to {output_file}")
return str(output_file)