Skip to content

Commit a340aed

Browse files
committed
Add SQLite audit log for action execution
1 parent bd1c720 commit a340aed

3 files changed

Lines changed: 261 additions & 0 deletions

File tree

automation_file/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
validate_action,
2020
)
2121
from automation_file.core.action_registry import ActionRegistry, build_default_registry
22+
from automation_file.core.audit import AuditException, AuditLog
2223
from automation_file.core.callback_executor import CallbackExecutor
2324
from automation_file.core.checksum import (
2425
ChecksumMismatchException,
@@ -313,6 +314,8 @@ def __getattr__(name: str) -> Any:
313314
"ManifestException",
314315
"write_manifest",
315316
"verify_manifest",
317+
"AuditException",
318+
"AuditLog",
316319
# Triggers
317320
"FileWatcher",
318321
"TriggerManager",

automation_file/core/audit.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
"""SQLite-backed audit log for executed actions.
2+
3+
``AuditLog(db_path)`` opens (or creates) a single-table SQLite database and
4+
appends one row per action execution. Rows carry the timestamp, action name,
5+
a JSON-encoded snapshot of the payload, the result / error repr, and the
6+
duration in milliseconds.
7+
8+
Writes use a short-lived connection per call (``check_same_thread=False``
9+
semantics) so the log is safe to share between background worker threads
10+
and the scheduler. Readers call :meth:`AuditLog.recent` to pull the most
11+
recent N rows.
12+
13+
The module deliberately avoids buffering / background queues: every row is
14+
persisted synchronously with an ``INSERT`` inside a ``with connect(..)`` so
15+
a crash at most loses the currently-executing action.
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import json
21+
import sqlite3
22+
import threading
23+
import time
24+
from contextlib import closing
25+
from pathlib import Path
26+
from typing import Any
27+
28+
from automation_file.exceptions import FileAutomationException
29+
from automation_file.logging_config import file_automation_logger
30+
31+
_SCHEMA = """
32+
CREATE TABLE IF NOT EXISTS audit (
33+
id INTEGER PRIMARY KEY AUTOINCREMENT,
34+
ts REAL NOT NULL,
35+
action TEXT NOT NULL,
36+
payload TEXT NOT NULL,
37+
result TEXT,
38+
error TEXT,
39+
duration_ms REAL NOT NULL
40+
);
41+
CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit (ts DESC);
42+
"""
43+
44+
45+
class AuditException(FileAutomationException):
46+
"""Raised when the audit log cannot be opened or written."""
47+
48+
49+
class AuditLog:
50+
"""Synchronous SQLite audit log."""
51+
52+
def __init__(self, db_path: str | Path) -> None:
53+
self._db_path = Path(db_path)
54+
self._lock = threading.Lock()
55+
try:
56+
self._db_path.parent.mkdir(parents=True, exist_ok=True)
57+
with closing(self._connect()) as conn:
58+
conn.executescript(_SCHEMA)
59+
conn.commit()
60+
except (OSError, sqlite3.DatabaseError) as err:
61+
raise AuditException(f"cannot open audit log {self._db_path}: {err}") from err
62+
63+
def record(
64+
self,
65+
action: str,
66+
payload: Any,
67+
*,
68+
result: Any = None,
69+
error: BaseException | None = None,
70+
duration_ms: float = 0.0,
71+
) -> None:
72+
"""Append a single audit row. Never raises — failures are logged only."""
73+
row = (
74+
time.time(),
75+
action,
76+
_safe_json(payload),
77+
_safe_json(result) if result is not None else None,
78+
repr(error) if error is not None else None,
79+
float(duration_ms),
80+
)
81+
try:
82+
with self._lock, closing(self._connect()) as conn:
83+
conn.execute(
84+
"INSERT INTO audit (ts, action, payload, result, error, duration_ms)"
85+
" VALUES (?, ?, ?, ?, ?, ?)",
86+
row,
87+
)
88+
conn.commit()
89+
except sqlite3.DatabaseError as err:
90+
file_automation_logger.error("audit.record failed: %r", err)
91+
92+
def recent(self, limit: int = 100) -> list[dict[str, Any]]:
93+
"""Return the newest ``limit`` rows, newest first."""
94+
if limit <= 0:
95+
return []
96+
with closing(self._connect()) as conn:
97+
cursor = conn.execute(
98+
"SELECT id, ts, action, payload, result, error, duration_ms"
99+
" FROM audit ORDER BY ts DESC LIMIT ?",
100+
(limit,),
101+
)
102+
rows = cursor.fetchall()
103+
return [
104+
{
105+
"id": row[0],
106+
"ts": row[1],
107+
"action": row[2],
108+
"payload": json.loads(row[3]) if row[3] else None,
109+
"result": json.loads(row[4]) if row[4] else None,
110+
"error": row[5],
111+
"duration_ms": row[6],
112+
}
113+
for row in rows
114+
]
115+
116+
def count(self) -> int:
117+
with closing(self._connect()) as conn:
118+
cursor = conn.execute("SELECT COUNT(*) FROM audit")
119+
(total,) = cursor.fetchone()
120+
return int(total)
121+
122+
def purge(self, older_than_seconds: float) -> int:
123+
"""Delete rows older than ``older_than_seconds`` and return the row count."""
124+
if older_than_seconds <= 0:
125+
raise AuditException("older_than_seconds must be positive")
126+
cutoff = time.time() - older_than_seconds
127+
with self._lock, closing(self._connect()) as conn:
128+
cursor = conn.execute("DELETE FROM audit WHERE ts < ?", (cutoff,))
129+
conn.commit()
130+
return int(cursor.rowcount)
131+
132+
def _connect(self) -> sqlite3.Connection:
133+
return sqlite3.connect(self._db_path, timeout=5.0)
134+
135+
136+
def _safe_json(value: Any) -> str:
137+
try:
138+
return json.dumps(value, default=repr, ensure_ascii=False)
139+
except (TypeError, ValueError):
140+
return json.dumps(repr(value), ensure_ascii=False)

tests/test_audit.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Tests for the SQLite-backed action audit log."""
2+
3+
from __future__ import annotations
4+
5+
import time
6+
from pathlib import Path
7+
8+
import pytest
9+
10+
from automation_file import AuditException, AuditLog
11+
12+
13+
def test_audit_log_creates_parent_directory(tmp_path: Path) -> None:
14+
db_path = tmp_path / "nested" / "audit.sqlite"
15+
log = AuditLog(db_path)
16+
assert db_path.exists()
17+
assert log.count() == 0
18+
19+
20+
def test_record_and_recent_roundtrip(tmp_path: Path) -> None:
21+
log = AuditLog(tmp_path / "audit.sqlite")
22+
log.record("FA_copy_file", {"src": "a", "dst": "b"}, result={"ok": True}, duration_ms=12.5)
23+
rows = log.recent()
24+
assert len(rows) == 1
25+
row = rows[0]
26+
assert row["action"] == "FA_copy_file"
27+
assert row["payload"] == {"src": "a", "dst": "b"}
28+
assert row["result"] == {"ok": True}
29+
assert row["error"] is None
30+
assert row["duration_ms"] == 12.5
31+
32+
33+
def test_record_error_stores_repr(tmp_path: Path) -> None:
34+
log = AuditLog(tmp_path / "audit.sqlite")
35+
err = ValueError("boom")
36+
log.record("FA_test", {"x": 1}, error=err, duration_ms=0.0)
37+
row = log.recent()[0]
38+
assert row["error"] is not None
39+
assert "ValueError" in row["error"]
40+
assert "boom" in row["error"]
41+
assert row["result"] is None
42+
43+
44+
def test_recent_returns_newest_first(tmp_path: Path) -> None:
45+
log = AuditLog(tmp_path / "audit.sqlite")
46+
log.record("first", {"i": 1})
47+
time.sleep(0.01)
48+
log.record("second", {"i": 2})
49+
time.sleep(0.01)
50+
log.record("third", {"i": 3})
51+
rows = log.recent()
52+
assert [r["action"] for r in rows] == ["third", "second", "first"]
53+
54+
55+
def test_recent_respects_limit(tmp_path: Path) -> None:
56+
log = AuditLog(tmp_path / "audit.sqlite")
57+
for index in range(5):
58+
log.record(f"action-{index}", {"i": index})
59+
rows = log.recent(limit=2)
60+
assert len(rows) == 2
61+
62+
63+
def test_recent_zero_limit_returns_empty(tmp_path: Path) -> None:
64+
log = AuditLog(tmp_path / "audit.sqlite")
65+
log.record("x", {})
66+
assert log.recent(limit=0) == []
67+
68+
69+
def test_count_reflects_inserts(tmp_path: Path) -> None:
70+
log = AuditLog(tmp_path / "audit.sqlite")
71+
assert log.count() == 0
72+
log.record("a", {})
73+
log.record("b", {})
74+
assert log.count() == 2
75+
76+
77+
def test_purge_removes_old_rows(tmp_path: Path) -> None:
78+
log = AuditLog(tmp_path / "audit.sqlite")
79+
log.record("old", {})
80+
# Backdate the row so it's older than the cutoff.
81+
import sqlite3
82+
83+
with sqlite3.connect(log._db_path) as conn:
84+
conn.execute("UPDATE audit SET ts = ? WHERE action = 'old'", (time.time() - 3600,))
85+
conn.commit()
86+
log.record("fresh", {})
87+
removed = log.purge(older_than_seconds=60)
88+
assert removed == 1
89+
remaining = [row["action"] for row in log.recent()]
90+
assert remaining == ["fresh"]
91+
92+
93+
def test_purge_rejects_non_positive(tmp_path: Path) -> None:
94+
log = AuditLog(tmp_path / "audit.sqlite")
95+
with pytest.raises(AuditException):
96+
log.purge(0)
97+
98+
99+
def test_record_non_serialisable_payload_falls_back_to_repr(tmp_path: Path) -> None:
100+
log = AuditLog(tmp_path / "audit.sqlite")
101+
102+
class Custom:
103+
def __repr__(self) -> str:
104+
return "<custom>"
105+
106+
log.record("weird", {"obj": Custom()}, result={"marker": object()})
107+
row = log.recent()[0]
108+
assert row["action"] == "weird"
109+
assert "<custom>" in str(row["payload"])
110+
assert row["result"] is not None
111+
112+
113+
def test_cannot_open_audit_log_in_missing_root(tmp_path: Path) -> None:
114+
# Point at a path whose parent cannot be created (a file, not a dir).
115+
blocker = tmp_path / "blocker"
116+
blocker.write_text("x", encoding="utf-8")
117+
with pytest.raises(AuditException):
118+
AuditLog(blocker / "child" / "audit.sqlite")

0 commit comments

Comments
 (0)