diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py index f50ae0c77c..90e2b96242 100644 --- a/bbot/core/event/base.py +++ b/bbot/core/event/base.py @@ -126,6 +126,8 @@ class BaseEvent: _suppress_chain_dupes = False # Shared compiled regex for discovery context formatting (class-level to avoid per-instance overhead) _discovery_context_regex = re.compile(r"\{(?:event|module)[^}]*\}") + # Stats class for the status line — override in subclasses for custom formatting + _stats_class = None # using __slots__ dramatically reduces memory usage in large scans __slots__ = [ @@ -1655,6 +1657,35 @@ def redirect_location(self): class FINDING(ClosestHostEvent): _always_emit = True _quick_emit = True + + class _stats_class: + _severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"] + + def __init__(self): + self.count = 0 + self.severities = {} + + def increment(self, event): + self.count += 1 + sev = event.data.get("severity", "UNKNOWN") + try: + self.severities[sev] += 1 + except KeyError: + self.severities[sev] = 1 + + def format(self, event_type): + if not self.severities: + return f"{event_type}: {self.count}" + parts = [] + for sev in self._severity_order: + n = self.severities.get(sev, 0) + if n: + parts.append(f"{n} {sev}") + for sev, n in sorted(self.severities.items()): + if sev not in self._severity_order and n: + parts.append(f"{n} {sev}") + return f"{event_type}: {self.count} ({', '.join(parts)})" + severity_colors = { "CRITICAL": "🟪", "HIGH": "🟥", diff --git a/bbot/scanner/scanner.py b/bbot/scanner/scanner.py index b2883cded6..4e2ac54b19 100644 --- a/bbot/scanner/scanner.py +++ b/bbot/scanner/scanner.py @@ -745,11 +745,9 @@ def modules_status(self, _log=False, detailed=False): self.info(f"{self.name}: Modules running (incoming:processing:outgoing) {modules_status_str}") else: self.info(f"{self.name}: No modules running") - event_type_summary = sorted(self.stats.events_emitted_by_type.items(), key=lambda x: x[-1], reverse=True) + event_type_summary = self.stats.event_type_summary() if event_type_summary: - self.info( - f"{self.name}: Events produced so far: {', '.join([f'{k}: {v}' for k, v in event_type_summary])}" - ) + self.info(f"{self.name}: Events produced so far: {', '.join(event_type_summary)}") else: self.info(f"{self.name}: No events produced yet") diff --git a/bbot/scanner/stats.py b/bbot/scanner/stats.py index faf950138c..7740bc0877 100644 --- a/bbot/scanner/stats.py +++ b/bbot/scanner/stats.py @@ -16,6 +16,19 @@ def _increment(d, k): d[k] = 1 +class EventTypeStats: + """Tracks count for an event type and formats it for the status line.""" + + def __init__(self): + self.count = 0 + + def increment(self, event): + self.count += 1 + + def format(self, event_type): + return f"{event_type}: {self.count}" + + class SpeedCounter: """ A simple class for keeping a rolling tally of the number of events inside a specific time window @@ -45,8 +58,23 @@ def __init__(self, scan): self.scan = scan self.module_stats = {} self.events_emitted_by_type = {} + self._type_stats = {} self.speedometer = SpeedCounter(scan.status_frequency) + def _get_type_stats(self, event): + event_type = event.type + try: + return self._type_stats[event_type] + except KeyError: + stats_class = getattr(event, "_stats_class", None) or EventTypeStats + self._type_stats[event_type] = stats_class() + return self._type_stats[event_type] + + def event_type_summary(self): + """Return a formatted list of event type counts, sorted by count descending.""" + entries = sorted(self._type_stats.items(), key=lambda x: x[1].count, reverse=True) + return [stats.format(event_type) for event_type, stats in entries if stats.count > 0] + def _get_attribution_module(self, event): """Return the module that should get credit for producing this event. @@ -65,6 +93,7 @@ def _get_attribution_module(self, event): def event_produced(self, event): _increment(self.events_emitted_by_type, event.type) + self._get_type_stats(event).increment(event) module = self._get_attribution_module(event) module_stat = self.get(module) if module_stat is not None: