|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from dataclasses import dataclass |
| 4 | +from typing import Dict, Optional, Tuple |
| 5 | + |
| 6 | +import numpy as np |
| 7 | + |
| 8 | + |
# Canonical EEG frequency bands (Hz). Each entry maps a band name to its
# [low, high) edge pair; the analyzer integrates the PSD over these
# half-open intervals, so adjacent bands never double-count an edge bin.
BANDS: Dict[str, Tuple[float, float]] = {
    "delta": (0.5, 4.0),
    "theta": (4.0, 8.0),
    "alpha": (8.0, 13.0),
    "beta": (13.0, 30.0),
    "gamma": (30.0, 80.0),
}
| 16 | + |
| 17 | + |
@dataclass
class BandPowerResult:
    """Per-band absolute and relative power, plus derived confidence.

    Produced by BandPowerAnalyzer.analyze(); the dict fields are keyed by
    the band names declared in BANDS.
    """
    # Integrated PSD per band (band name -> power).
    absolute: Dict[str, float]
    # Each band's fraction of the summed band power (values sum to ~1.0).
    relative: Dict[str, float]
    # Sum of the absolute band powers.
    total_power: float
    # (beta + gamma) / (alpha + theta), rounded to 4 decimals.
    engagement_ratio: float
    # (theta + gamma) / alpha, rounded to 4 decimals.
    cognitive_ratio: float
    # Classification confidence, clipped to [0, 1] and rounded to 3 decimals.
    confidence: float
    # One of "motor", "cognitive", or "unclear".
    intent_class: str
| 28 | + |
| 29 | + |
class BandPowerAnalyzer:
    """
    Computes EEG band power from a 1-D signal array using Welch's method.

    The PSD is a one-sided density estimate (Hann window, 50% overlap),
    matching scipy.signal.welch's default scaling.

    Args:
        sampling_rate: Hz. Must match the actual acquisition rate of the signal.
        nperseg: Welch segment length. Defaults to min(256, len(signal)).
    """

    def __init__(self, sampling_rate: float = 256.0, nperseg: Optional[int] = None) -> None:
        self.sampling_rate = float(sampling_rate)
        self._nperseg = nperseg

    def analyze(self, signal: np.ndarray) -> BandPowerResult:
        """Compute per-band power and a heuristic intent classification.

        Args:
            signal: raw samples; flattened to 1-D and cast to float.

        Returns:
            BandPowerResult with absolute/relative band powers, the derived
            engagement/cognitive ratios, and a confidence-scored intent class.
            Signals shorter than 4 samples yield the all-zero result.
        """
        signal = np.asarray(signal, dtype=float).ravel()
        if len(signal) < 4:
            # Too short for any meaningful spectral estimate.
            return self._zero_result()

        freqs, psd = self._welch(signal)

        # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
        _integrate = getattr(np, "trapezoid", None) or getattr(np, "trapz")
        absolute: Dict[str, float] = {}
        for band, (lo, hi) in BANDS.items():
            mask = (freqs >= lo) & (freqs < hi)
            absolute[band] = float(_integrate(psd[mask], freqs[mask])) if mask.any() else 0.0

        # Report the true total (0.0 for a silent signal, consistent with
        # _zero_result); the epsilon is only a division guard.
        total = sum(absolute.values())
        denom = total or 1e-9
        relative = {b: v / denom for b, v in absolute.items()}

        engagement = (absolute["beta"] + absolute["gamma"]) / max(absolute["alpha"] + absolute["theta"], 1e-9)
        cognitive = (absolute["theta"] + absolute["gamma"]) / max(absolute["alpha"], 1e-9)

        intent_class, confidence = self._classify(engagement, cognitive, relative)

        return BandPowerResult(
            absolute=absolute,
            relative=relative,
            total_power=total,
            engagement_ratio=round(engagement, 4),
            cognitive_ratio=round(cognitive, 4),
            confidence=round(float(np.clip(confidence, 0.0, 1.0)), 3),
            intent_class=intent_class,
        )

    def _welch(self, signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """One-sided Welch PSD estimate (Hann window, 50% overlap).

        Returns:
            (freqs, psd): frequencies in Hz and power spectral density in
            power/Hz, with non-DC/non-Nyquist bins doubled so that the
            integral of the density approximates the signal power.
        """
        n = len(signal)
        nperseg = self._nperseg or min(256, n)
        nperseg = min(nperseg, n)  # a segment can never exceed the signal
        step = nperseg // 2 or 1  # 50% overlap; at least 1 so the loop advances

        window = np.hanning(nperseg)
        win_power = np.sum(window ** 2)  # density normalization factor

        segments = []
        start = 0
        while start + nperseg <= n:
            seg = signal[start:start + nperseg] * window
            segments.append(np.abs(np.fft.rfft(seg)) ** 2 / (self.sampling_rate * win_power))
            start += step

        # nperseg <= n guarantees at least one segment; keep a defensive
        # full-signal fallback anyway, with nperseg updated so the frequency
        # axis length matches the PSD length.
        if segments:
            psd = np.mean(segments, axis=0)
        else:
            psd = np.abs(np.fft.rfft(signal)) ** 2 / (self.sampling_rate * n)
            nperseg = n

        # One-sided scaling: double every bin except DC and (for an even
        # segment length) Nyquist, so the dropped negative frequencies'
        # power is preserved.
        if len(psd) > 1:
            if nperseg % 2 == 0:
                psd[1:-1] *= 2.0
            else:
                psd[1:] *= 2.0

        freqs = np.fft.rfftfreq(nperseg, d=1.0 / self.sampling_rate)
        return freqs, psd

    def _classify(
        self,
        engagement: float,
        cognitive: float,
        relative: Dict[str, float],
    ) -> Tuple[str, float]:
        """Map band ratios to a coarse intent label with a [0, 1] confidence."""
        # Alpha dominance (> 40% of total power) overrides both ratios.
        if relative.get("alpha", 0.0) > 0.40:
            # The stronger the alpha share, the lower the confidence,
            # floored at 0.10.
            confidence = max(0.10, 0.5 - relative["alpha"])
            return "unclear", confidence

        if engagement >= cognitive:
            # Saturating map of the ratio into (0, 1), clipped to [0.1, 1].
            confidence = float(np.clip(engagement / (engagement + 1.5), 0.1, 1.0))
            return "motor", confidence

        confidence = float(np.clip(cognitive / (cognitive + 2.0), 0.1, 1.0))
        return "cognitive", confidence

    def _zero_result(self) -> BandPowerResult:
        """Neutral all-zero result for signals too short to analyze."""
        # Build two independent dicts: the original shared one dict object
        # between `absolute` and `relative`, so mutating one silently
        # mutated the other.
        return BandPowerResult(
            absolute={b: 0.0 for b in BANDS},
            relative={b: 0.0 for b in BANDS},
            total_power=0.0,
            engagement_ratio=0.0,
            cognitive_ratio=0.0,
            confidence=0.0,
            intent_class="unclear",
        )
0 commit comments