|
| 1 | +# ============================================================ |
| 2 | +# File: finbot/aegis/sentinel.py |
| 3 | +# Purpose: Hash-chained HMAC audit trail on Redis via EventBus |
| 4 | +# Author: Jean Francois Regis MUKIZA |
| 5 | +# GSoC Week: 2 |
| 6 | +# OWASP Category: ASI06 Memory Poisoning, ASI08 Cascading Failures |
| 7 | +# ============================================================ |
| 8 | +"""SentinelStream: hash-chained forensic audit events on Redis.""" |
| 9 | + |
| 10 | +import hashlib |
| 11 | +import hmac |
| 12 | +import json |
| 13 | +import logging |
| 14 | +from datetime import UTC, datetime |
| 15 | +from typing import Any |
| 16 | + |
| 17 | +from finbot.aegis.schemas import AuditEvent |
| 18 | +from finbot.config import settings |
| 19 | +from finbot.core.auth.session import SessionContext |
| 20 | +from finbot.core.messaging import event_bus |
| 21 | + |
| 22 | +logger = logging.getLogger(__name__) |
| 23 | + |
| 24 | + |
| 25 | +class SentinelStream: |
| 26 | + """Records tamper-evident audit events with per-namespace hash chains.""" |
| 27 | + |
| 28 | + def __init__(self) -> None: |
| 29 | + self._chain_key = "aegis:audit:chain_head" |
| 30 | + signing_key = settings.SESSION_SIGNING_KEY or settings.SECRET_KEY |
| 31 | + self._signing_key = signing_key.encode() |
| 32 | + |
| 33 | + async def record( |
| 34 | + self, |
| 35 | + *, |
| 36 | + event_type: str, |
| 37 | + namespace: str, |
| 38 | + workflow_id: str, |
| 39 | + agent_name: str, |
| 40 | + payload: dict[str, Any], |
| 41 | + session_context: SessionContext, |
| 42 | + ) -> AuditEvent: |
| 43 | + prev_hash = await self._get_chain_head(namespace) |
| 44 | + timestamp = datetime.now(UTC).isoformat() |
| 45 | + body = { |
| 46 | + "event_type": event_type, |
| 47 | + "namespace": namespace, |
| 48 | + "workflow_id": workflow_id, |
| 49 | + "agent_name": agent_name, |
| 50 | + "payload": payload, |
| 51 | + "timestamp": timestamp, |
| 52 | + "prev_hash": prev_hash, |
| 53 | + } |
| 54 | + canonical = json.dumps(body, sort_keys=True, separators=(",", ":")) |
| 55 | + event_hash = hmac.new( |
| 56 | + self._signing_key, |
| 57 | + canonical.encode(), |
| 58 | + hashlib.sha256, |
| 59 | + ).hexdigest() |
| 60 | + audit = AuditEvent(**body, event_hash=event_hash) |
| 61 | + await self._set_chain_head(namespace, event_hash) |
| 62 | + await event_bus.emit_agent_event( |
| 63 | + agent_name="aegis", |
| 64 | + event_type=f"audit.{event_type}", |
| 65 | + event_subtype="security", |
| 66 | + event_data={**body, "event_hash": event_hash}, |
| 67 | + session_context=session_context, |
| 68 | + workflow_id=workflow_id, |
| 69 | + summary=f"AEGIS audit: {event_type}", |
| 70 | + ) |
| 71 | + return audit |
| 72 | + |
| 73 | + async def _get_chain_head(self, namespace: str) -> str | None: |
| 74 | + key = f"{self._chain_key}:{namespace}" |
| 75 | + try: |
| 76 | + val = await event_bus.redis.get(key) |
| 77 | + if val is None: |
| 78 | + return None |
| 79 | + return val.decode() if isinstance(val, bytes) else str(val) |
| 80 | + except Exception: # pylint: disable=broad-exception-caught |
| 81 | + logger.debug("Could not read AEGIS chain head for %s", namespace, exc_info=True) |
| 82 | + return None |
| 83 | + |
| 84 | + async def _set_chain_head(self, namespace: str, digest: str) -> None: |
| 85 | + key = f"{self._chain_key}:{namespace}" |
| 86 | + try: |
| 87 | + await event_bus.redis.set(key, digest, ex=settings.AEGIS_AUDIT_CHAIN_TTL) |
| 88 | + except Exception: # pylint: disable=broad-exception-caught |
| 89 | + logger.debug("Could not write AEGIS chain head for %s", namespace, exc_info=True) |
0 commit comments