diff --git a/CHANGELOG.md b/CHANGELOG.md index 95d4a69..f57d384 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,59 @@ All notable changes to Immunity Agent (Prismor Warden) are documented here. The format loosely follows [Keep a Changelog](https://keepachangelog.com/) and the project uses [Semantic Versioning](https://semver.org/). +## [1.3.0] — 2026-05-03 + +Signed audit log. Every Warden decision (allow / observe / block) is now +recorded as a hash-chained, optionally Ed25519-signed record so that a third +party with the public key can verify what the agent attempted, what was +blocked, and why — and prove the log has not been tampered with. + +### Added + +- **`warden/audit_log.py`** — append-only NDJSON decision log at + `.prismor-warden/audit/.ndjson`. Each record carries `seq`, + `prev_hash`, `record_hash` (SHA-256 of canonical bytes), the redacted event, + the decision, the matching findings, and pinned `policy_hash` and + `feed_hash` so a verifier can reconstruct the policy that was in effect at + decision time. Pinned policy and feed snapshots are written once per hash + under `audit/policies//` and `audit/feed/.json`. +- **`warden/signing.py`** — Ed25519 keygen, sign, verify, and key fingerprint + helpers. Uses the `cryptography` library when present; falls back to + `openssl pkeyutl` (the same approach as `pipeline/sign_feed.sh`) so the + package keeps working in minimal environments. +- **`warden audit-log` CLI** — `keygen`, `pubkey`, `list`, `show`, `verify`, + `seal`, `register-pubkey`, `replay`. `verify` walks the chain, recomputes + hashes, and validates signatures, exiting non-zero on tampering. `seal` + writes a signed manifest of the head hash on session close. +- **Per-decision sink-friendly record** — the audit log is written for every + decision (not just blocks), so an external verifier or SIEM consumer can + audit the full agent trajectory rather than only the denied actions. + +### Security properties + +- **Hash-chained**. 
Modifying any record breaks `record_hash` for that record + and `prev_hash` for the next; `warden audit-log verify` flags both. +- **Signed (opt-in)**. Generate a keypair with `warden audit-log keygen`; from + then on every record carries an Ed25519 signature over its canonical bytes, + and tampering invalidates the signature even if an attacker recomputes the + hash chain. +- **Privacy-preserving by default**. Sensitive event fields (`command`, + `path`, `url`, `prompt`, `content`, `response`) are stored as SHA-256 + digests plus length, not plaintext. Set `audit.include_raw: true` under + `settings` in `policy.yaml` to retain raw text for orgs that need it. +- **Replay-ready**. Pinned `policy_hash` and `feed_hash` make decisions + reproducible: `warden audit-log replay` checks that all referenced policy + snapshots are present on disk. + +### Changed + +- `PolicyEngine` exposes `audit_settings` parsed from `settings.audit` in + `policy.yaml`, used by the dispatcher to decide whether to retain raw + evidence in audit records. +- `warden hook-dispatch` now writes one audit record per decision after + running the policy engine and before applying the block. Failures are + logged to stderr and never block the user's tool call. + ## [1.2.0] — 2026-04-27 Tier 3 — Scoped Agent and Session-Based Learning. 
Adds per-session rule diff --git a/README.md b/README.md index 98bb834..c5f4f78 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ flowchart TD - 🛜 [Network Isolation](docs/network-isolation.md) covers egress allowlists, raw IP detection, and tunnel blocking - 🔍 [Skill Scanner](docs/skill-scanner.md) covers MCP server and skill risk scanning across supported agents - 🔐 [Sweep and Cloak](docs/sweep-and-cloak.md) covers secret prevention at tool boundaries and cleanup for leaked secrets +- 📜 [Audit Log](docs/audit-log.md) covers the hash-chained, Ed25519-signed decision log for governance and replay - 🐳 [Docker and Containers](docs/docker.md) covers container hardening, prerequisites, and known limitations --- diff --git a/docs/audit-log.md b/docs/audit-log.md new file mode 100644 index 0000000..b8e1b0e --- /dev/null +++ b/docs/audit-log.md @@ -0,0 +1,146 @@ +# Audit Log + +Warden writes a tamper-evident, optionally signed record for every decision +(allow / observe / block) it makes. The log is what governance and audit +teams ask for when they want to know "what did the agent attempt, what was +blocked, and why — and can you prove it?" + +## What gets recorded + +One NDJSON record per decision, appended to +`.prismor-warden/audit/.ndjson`: + +```json +{ + "v": 1, + "alg": "sha256", + "seq": 0, + "ts": "2026-05-03T19:03:24.139Z", + "session_id": "...", + "agent": "claude", + "mode": "enforce", + "workspace_id": "5a25e2b3711fcbf2", + "event": { + "type": "shell", + "agent_event": "PreToolUse", + "command_hash": "1de700c2...", + "command_len": 6 + }, + "decision": "allow", + "findings": [], + "policy_hash": "a0066d3f...", + "feed_hash": "fb010dd8...", + "agent_version": "warden 1.3.0", + "prev_hash": "GENESIS", + "record_hash": "9286523ecb239dc5...", + "sig": { + "alg": "ed25519", + "key_id": "0d835451d9e089ff", + "value": "" + } +} +``` + +- **`prev_hash` / `record_hash`** — every record links to its predecessor. 
+ Modifying any field changes `record_hash`; modifying `record_hash` breaks + `prev_hash` on the next record. `warden audit-log verify` walks the chain + and reports both kinds of break. +- **`policy_hash` / `feed_hash`** — fingerprint of the policy and threat feed + that were in effect when the decision was made. The first time we see a + hash, we copy the source files into `audit/policies//` and + `audit/feed/.json`, so replay has the exact rule set the decision was + derived from. +- **`sig`** — present when an Ed25519 signing key is configured (see below). + Signs the canonical bytes of the record (excluding `record_hash` and `sig` + itself). + +## Privacy: hashed evidence by default + +Sensitive event fields — `command`, `path`, `url`, `prompt`, `content`, +`response`, and the `evidence` field of every finding — are stored as +SHA-256 digests plus length only. The raw text is **not** written to the +audit log. This makes the log safe to forward to a SIEM without leaking +secrets. + +To retain raw text (for orgs that need full text in their audit trail), set +`audit.include_raw: true` under `settings` in `.prismor-warden/policy.yaml`: + +```yaml +settings: + audit: + include_raw: true +``` + +## Enabling signatures + +Hash chaining is on by default — no setup. To turn on Ed25519 signing: + +```bash +warden audit-log keygen +``` + +This writes: + +- `~/.prismor/keys/audit-signer.key` (mode 0600, parent dir 0700) +- `~/.prismor/keys/audit-signer.pub` (the public key to distribute) + +From the next decision onward, every record carries a signature. The public +key fingerprint (`key_id`, first 16 hex chars of SHA-256 over the raw key +bytes) is written to each record so a verifier with multiple pubkeys can +pick the right one. 
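As a rough illustration of the fingerprint lookup described above, the sketch below derives a 16-character `key_id` and uses it to select a verifier key. It assumes that "raw key bytes" means the 32-byte Ed25519 public key. The names `key_fingerprint` and `pick_pubkey` are hypothetical and are not part of Warden's API; the real helper lives in `warden/signing.py` and may differ.

```python
import hashlib
from typing import Dict, Optional


def key_fingerprint(raw_key: bytes) -> str:
    """Hypothetical helper: first 16 hex chars of SHA-256 over the key bytes."""
    return hashlib.sha256(raw_key).hexdigest()[:16]


def pick_pubkey(record: dict, registry: Dict[str, bytes]) -> Optional[bytes]:
    """Return the registered key whose fingerprint matches the record's
    sig.key_id, or None if the record is unsigned or the key is unknown."""
    sig = record.get("sig")
    if not sig:
        return None
    return registry.get(sig.get("key_id"))


# Two stand-in 32-byte keys (placeholders, not real Ed25519 keys).
key_a = bytes(range(32))
key_b = bytes(range(32, 64))
registry = {key_fingerprint(k): k for k in (key_a, key_b)}

# A signed record names its key by fingerprint; the verifier looks it up.
signed = {"seq": 0, "sig": {"alg": "ed25519", "key_id": key_fingerprint(key_b)}}
unsigned = {"seq": 1}

chosen = pick_pubkey(signed, registry)
missing = pick_pubkey(unsigned, registry)
```

Keying the registry by fingerprint keeps the lookup constant-time and lets a verifier report an unknown `key_id` separately from a bad signature.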
+ +To use a centrally managed key (for example a key issued from a KMS or +mounted from a secret manager in CI), set: + +```bash +export WARDEN_AUDIT_SIGNING_KEY=/path/to/private.pem +export WARDEN_AUDIT_SIGNING_PUBKEY=/path/to/public.pem +``` + +If neither the env var is set nor a key pair exists at the default path, +records are still hash-chained but unsigned. + +## CLI + +```bash +warden audit-log keygen [--out-dir DIR] [--force] +warden audit-log pubkey [--key PATH] # print PEM + key_id +warden audit-log list # all sessions in workspace +warden audit-log show # human-readable trace +warden audit-log verify [--session-id ID] [--json] # exit 2 on tamper +warden audit-log seal # write signed manifest +warden audit-log register-pubkey # add a verifier key +warden audit-log replay [--json] # check pinned policy presence +``` + +`verify` is the workhorse: for each record it recomputes the hash from +canonical bytes, checks `prev_hash` against the previous record, and (if +present) verifies the Ed25519 signature against the registered public key. +Any failure produces a structured report and a non-zero exit code. + +## Verifying from outside the workspace + +Any third party with the public key can verify the chain themselves: + +```bash +# Copy the audit dir + the public key off the host, keeping the +# .prismor-warden/audit layout under the workspace root +mkdir -p ./audit-dump/.prismor-warden +scp -r host:.prismor-warden/audit ./audit-dump/.prismor-warden/ +scp host:~/.prismor/keys/audit-signer.pub ./ + +# Register the pubkey in the dump and verify +warden --workspace ./audit-dump audit-log register-pubkey ./audit-signer.pub +warden --workspace ./audit-dump audit-log verify +``` + +This is the answer to the question "are you generating a signed, replayable +log of what gets blocked and why?" — yes, and a verifier doesn't need to +trust the host to confirm it. + +## Sealing on session close + +`warden audit-log seal ` writes a manifest at +`.prismor-warden/audit/.seal` containing the record count, the +head record hash, and (if signing is on) a signature over the manifest. 
The +seal is the single artifact a downstream system needs to keep — anyone +holding the seal and the audit file can later prove that no records were +appended, removed, or modified after the seal was written. diff --git a/tests/test_audit_log.py b/tests/test_audit_log.py new file mode 100644 index 0000000..7707821 --- /dev/null +++ b/tests/test_audit_log.py @@ -0,0 +1,340 @@ +"""Tests for warden.audit_log — chained, signed, replayable decision log.""" + +import json +import os +import sys +import tempfile +import unittest +from pathlib import Path + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from warden import audit_log as al +from warden.audit_log import ( + SigningConfig, + canonical_bytes, + read_records, + seal_session, + session_path, + sha256_hex, + verify_chain, + write_record, +) +from warden.signing import key_id, keygen + + +def _fresh_repo(td: Path) -> Path: + """Create a minimal repo skeleton with a default_policy.yaml under it.""" + repo = td / "repo" + (repo / "warden").mkdir(parents=True) + (repo / "warden" / "default_policy.yaml").write_text("rules: []\nsettings: {}\n") + return repo + + +def _signing(td: Path) -> SigningConfig: + keys = td / "keys" + keys.mkdir() + priv = keys / "a.key" + pub = keys / "a.pub" + keygen(priv, pub) + return SigningConfig( + private_key=priv, + public_key=pub, + key_id=key_id(pub), + enabled=True, + ) + + +def _event(cmd: str = "ls -la") -> dict: + return { + "type": "shell", + "agent_event": "PreToolUse", + "command": cmd, + "ts": "2026-05-03T00:00:00Z", + } + + +def _finding(rule_id: str = "test-rule") -> dict: + return { + "id": f"sess:{rule_id}-0", + "ruleId": rule_id, + "severity": "HIGH", + "category": "test", + "title": "test title", + "evidence": "matched-evidence", + "action": "block", + } + + +class TestCanonicalEncoding(unittest.TestCase): + def test_canonical_bytes_excludes_record_hash_and_sig(self): + rec = {"a": 1, "record_hash": "x", "sig": {"value": "y"}} + out 
= canonical_bytes(rec) + self.assertEqual(out, b'{"a":1}') + + def test_canonical_bytes_sorted_keys(self): + rec = {"b": 2, "a": 1} + self.assertEqual(canonical_bytes(rec), b'{"a":1,"b":2}') + + def test_canonical_bytes_no_whitespace(self): + rec = {"a": [1, 2], "b": {"c": 3}} + self.assertNotIn(b" ", canonical_bytes(rec)) + + def test_sha256_hex_is_deterministic(self): + self.assertEqual(sha256_hex(b"hello"), sha256_hex(b"hello")) + self.assertNotEqual(sha256_hex(b"hello"), sha256_hex(b"world")) + + +class TestChain(unittest.TestCase): + def test_first_record_uses_genesis_prev_hash(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws" + ws.mkdir() + repo = _fresh_repo(td_path) + rec = write_record( + workspace=ws, session_id="s1", agent="claude", mode="enforce", + event=_event(), decision="allow", findings=[], repo_root=repo, + ) + self.assertEqual(rec["prev_hash"], "GENESIS") + self.assertEqual(rec["seq"], 0) + + def test_subsequent_records_chain_to_predecessor(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + r0 = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event("a"), + decision="allow", findings=[], repo_root=repo) + r1 = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event("b"), + decision="block", findings=[_finding()], repo_root=repo) + self.assertEqual(r1["prev_hash"], r0["record_hash"]) + self.assertEqual(r1["seq"], 1) + + def test_record_hash_is_sha256_of_canonical_bytes(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + rec = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event(), + decision="allow", findings=[], repo_root=repo) + self.assertEqual(sha256_hex(canonical_bytes(rec)), rec["record_hash"]) + + +class 
TestVerify(unittest.TestCase): + def _build_session(self, td: Path, *, sign: bool = False, n: int = 3): + ws = td / "ws"; ws.mkdir() + repo = _fresh_repo(td) + sig_cfg = _signing(td) if sign else None + for i in range(n): + decision = "allow" if i == 0 else "block" + findings = [] if decision == "allow" else [_finding(f"r{i}")] + write_record( + workspace=ws, session_id="s", agent="claude", mode="enforce", + event=_event(f"cmd{i}"), decision=decision, findings=findings, + repo_root=repo, signing=sig_cfg, + ) + return ws + + def test_verify_passes_clean_unsigned_chain(self): + with tempfile.TemporaryDirectory() as td: + ws = self._build_session(Path(td), sign=False) + res = verify_chain(session_path(ws, "s"), workspace=ws) + self.assertTrue(res.ok) + self.assertEqual(res.records, 3) + self.assertEqual(res.unsigned, 3) + self.assertEqual(res.signed, 0) + + def test_verify_passes_clean_signed_chain(self): + with tempfile.TemporaryDirectory() as td: + ws = self._build_session(Path(td), sign=True) + res = verify_chain(session_path(ws, "s"), workspace=ws) + self.assertTrue(res.ok) + self.assertEqual(res.signed, 3) + + def test_verify_detects_field_tamper_via_record_hash(self): + """Modify a field but leave record_hash unchanged — must fail.""" + with tempfile.TemporaryDirectory() as td: + ws = self._build_session(Path(td), sign=False) + path = session_path(ws, "s") + lines = path.read_text().splitlines() + rec = json.loads(lines[1]) + rec["decision"] = "allow" # was "block" + lines[1] = json.dumps(rec) + path.write_text("\n".join(lines) + "\n") + res = verify_chain(path, workspace=ws) + self.assertFalse(res.ok) + self.assertTrue(any("record_hash" in reason for _, reason in res.chain_breaks)) + + def test_verify_detects_signature_tamper(self): + """Modify a field; sig over the new bytes won't validate.""" + with tempfile.TemporaryDirectory() as td: + ws = self._build_session(Path(td), sign=True) + path = session_path(ws, "s") + lines = path.read_text().splitlines() + rec 
= json.loads(lines[1]) + rec["decision"] = "allow" + lines[1] = json.dumps(rec) + path.write_text("\n".join(lines) + "\n") + res = verify_chain(path, workspace=ws) + self.assertFalse(res.ok) + self.assertTrue(len(res.bad_signatures) >= 1) + + def test_verify_detects_dropped_record(self): + """Removing a middle record breaks the chain via prev_hash mismatch.""" + with tempfile.TemporaryDirectory() as td: + ws = self._build_session(Path(td), sign=False) + path = session_path(ws, "s") + lines = path.read_text().splitlines() + del lines[1] + path.write_text("\n".join(lines) + "\n") + res = verify_chain(path, workspace=ws) + self.assertFalse(res.ok) + self.assertTrue(any("prev_hash" in reason or "seq" in reason + for _, reason in res.chain_breaks)) + + def test_verify_detects_full_recompute_attempt_without_key(self): + """Even if attacker recomputes hashes for a tampered chain, missing + signature (since they don't have the private key) is detected.""" + with tempfile.TemporaryDirectory() as td: + ws = self._build_session(Path(td), sign=True) + path = session_path(ws, "s") + lines = path.read_text().splitlines() + # Attacker rewrites record 1: changes decision and recomputes record_hash + rec = json.loads(lines[1]) + rec["decision"] = "allow" + from warden.audit_log import canonical_bytes as _cb + new_hash = sha256_hex(_cb(rec)) + rec["record_hash"] = new_hash + # ...and updates seq=2's prev_hash to point at the new hash + lines[1] = json.dumps(rec) + rec2 = json.loads(lines[2]) + rec2["prev_hash"] = new_hash + new_hash2 = sha256_hex(_cb(rec2)) + rec2["record_hash"] = new_hash2 + lines[2] = json.dumps(rec2) + path.write_text("\n".join(lines) + "\n") + res = verify_chain(path, workspace=ws) + # Signatures invalidated even though chain looks well-formed + self.assertFalse(res.ok) + self.assertGreaterEqual(len(res.bad_signatures), 1) + + +class TestRedaction(unittest.TestCase): + def test_default_redacts_command_to_hash(self): + with tempfile.TemporaryDirectory() as td: + 
td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + secret_cmd = "cat /etc/shadow" + rec = write_record( + workspace=ws, session_id="s", agent="claude", mode="enforce", + event=_event(secret_cmd), decision="block", + findings=[_finding()], repo_root=repo, + ) + ev = rec["event"] + self.assertNotIn("command", ev) + self.assertEqual(ev["command_hash"], sha256_hex(secret_cmd.encode())) + self.assertEqual(ev["command_len"], len(secret_cmd)) + + def test_include_raw_keeps_command_text(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + secret_cmd = "echo hello" + rec = write_record( + workspace=ws, session_id="s", agent="claude", mode="enforce", + event=_event(secret_cmd), decision="allow", + findings=[], repo_root=repo, include_raw=True, + ) + self.assertEqual(rec["event"]["command"], secret_cmd) + self.assertEqual(rec["event"]["command_hash"], sha256_hex(secret_cmd.encode())) + + +class TestPolicyAndFeedPinning(unittest.TestCase): + def test_policy_hash_is_stable_across_records(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + r0 = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event("a"), + decision="allow", findings=[], repo_root=repo) + r1 = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event("b"), + decision="allow", findings=[], repo_root=repo) + self.assertEqual(r0["policy_hash"], r1["policy_hash"]) + + def test_policy_snapshot_is_pinned_on_disk(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + rec = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event(), + decision="allow", findings=[], repo_root=repo) + snap = al.policies_dir(ws) / rec["policy_hash"] / 
"default_policy.yaml" + self.assertTrue(snap.exists()) + + def test_policy_hash_changes_when_override_added(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + r0 = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event(), + decision="allow", findings=[], repo_root=repo) + override_dir = ws / ".prismor-warden" + override_dir.mkdir(exist_ok=True) + (override_dir / "policy.yaml").write_text("rules:\n - id: extra\n") + r1 = write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event(), + decision="allow", findings=[], repo_root=repo) + self.assertNotEqual(r0["policy_hash"], r1["policy_hash"]) + + +class TestSeal(unittest.TestCase): + def test_seal_writes_manifest_with_head_hash(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + sig = _signing(td_path) + for i in range(3): + write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event(f"c{i}"), + decision="allow", findings=[], + repo_root=repo, signing=sig) + manifest = seal_session(workspace=ws, session_id="s", signing=sig) + self.assertIsNotNone(manifest) + self.assertEqual(manifest["records"], 3) + self.assertEqual(manifest["head_seq"], 2) + self.assertIn("head_hash", manifest) + self.assertIn("sig", manifest) + + def test_seal_returns_none_for_unknown_session(self): + with tempfile.TemporaryDirectory() as td: + ws = Path(td) / "ws"; ws.mkdir() + self.assertIsNone(seal_session(workspace=ws, session_id="nope")) + + +class TestReadRecords(unittest.TestCase): + def test_read_records_yields_in_order(self): + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + ws = td_path / "ws"; ws.mkdir() + repo = _fresh_repo(td_path) + for i in range(4): + write_record(workspace=ws, session_id="s", agent="claude", + mode="enforce", event=_event(f"c{i}"), + 
decision="allow", findings=[], repo_root=repo) + seqs = [r["seq"] for r in read_records(session_path(ws, "s"))] + self.assertEqual(seqs, [0, 1, 2, 3]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_signing.py b/tests/test_signing.py new file mode 100644 index 0000000..b440f83 --- /dev/null +++ b/tests/test_signing.py @@ -0,0 +1,109 @@ +"""Tests for warden.signing — Ed25519 keygen, sign, verify.""" + +import os +import sys +import tempfile +import unittest +from pathlib import Path + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from warden.signing import ( + SigningUnavailable, + b64decode, + b64encode, + key_id, + keygen, + sign, + verify, +) + + +class TestKeygen(unittest.TestCase): + def test_keygen_writes_files_with_correct_modes(self): + with tempfile.TemporaryDirectory() as td: + priv = Path(td) / "k.key" + pub = Path(td) / "k.pub" + keygen(priv, pub) + self.assertTrue(priv.exists()) + self.assertTrue(pub.exists()) + self.assertEqual(priv.stat().st_mode & 0o777, 0o600) + # public key bytes should be a valid PEM + self.assertIn(b"BEGIN PUBLIC KEY", pub.read_bytes()) + + def test_keygen_creates_parent_dir(self): + with tempfile.TemporaryDirectory() as td: + nested = Path(td) / "deep" / "nested" + priv = nested / "k.key" + pub = nested / "k.pub" + keygen(priv, pub) + self.assertTrue(priv.exists()) + + +class TestSignVerifyRoundtrip(unittest.TestCase): + def setUp(self): + self._td = tempfile.TemporaryDirectory() + self.priv = Path(self._td.name) / "k.key" + self.pub = Path(self._td.name) / "k.pub" + keygen(self.priv, self.pub) + + def tearDown(self): + self._td.cleanup() + + def test_signature_roundtrip_succeeds(self): + msg = b"audit record bytes" + sig = sign(msg, self.priv) + self.assertEqual(len(sig), 64) # Ed25519 sigs are 64 bytes + self.assertTrue(verify(msg, sig, self.pub)) + + def test_verify_rejects_tampered_message(self): + msg = b"original" + sig = sign(msg, self.priv) + 
self.assertFalse(verify(b"tampered", sig, self.pub)) + + def test_verify_rejects_tampered_signature(self): + msg = b"original" + sig = bytearray(sign(msg, self.priv)) + sig[0] ^= 0xFF # flip a byte + self.assertFalse(verify(msg, bytes(sig), self.pub)) + + def test_verify_rejects_wrong_pubkey(self): + msg = b"original" + sig = sign(msg, self.priv) + with tempfile.TemporaryDirectory() as td: + other_priv = Path(td) / "other.key" + other_pub = Path(td) / "other.pub" + keygen(other_priv, other_pub) + self.assertFalse(verify(msg, sig, other_pub)) + + def test_sign_missing_key_raises(self): + with self.assertRaises(SigningUnavailable): + sign(b"x", Path("/nonexistent/key.key")) + + def test_b64_roundtrip(self): + data = b"\x00\x01\x02\xff binary" + self.assertEqual(b64decode(b64encode(data)), data) + + +class TestKeyID(unittest.TestCase): + def test_key_id_is_stable(self): + with tempfile.TemporaryDirectory() as td: + priv = Path(td) / "k.key" + pub = Path(td) / "k.pub" + keygen(priv, pub) + kid1 = key_id(pub) + kid2 = key_id(pub) + self.assertEqual(kid1, kid2) + self.assertEqual(len(kid1), 16) + + def test_key_id_differs_per_keypair(self): + with tempfile.TemporaryDirectory() as td: + p1, k1 = Path(td) / "a.key", Path(td) / "a.pub" + p2, k2 = Path(td) / "b.key", Path(td) / "b.pub" + keygen(p1, k1) + keygen(p2, k2) + self.assertNotEqual(key_id(k1), key_id(k2)) + + +if __name__ == "__main__": + unittest.main() diff --git a/warden/__init__.py b/warden/__init__.py index de076ae..d87e0a0 100644 --- a/warden/__init__.py +++ b/warden/__init__.py @@ -1,3 +1,3 @@ """Prismor Warden local session-security utility.""" -__version__ = "1.2.0" +__version__ = "1.3.0" diff --git a/warden/audit_log.py b/warden/audit_log.py new file mode 100644 index 0000000..2c5f13e --- /dev/null +++ b/warden/audit_log.py @@ -0,0 +1,548 @@ +"""Tamper-evident, replayable audit log for Warden decisions. + +One record per decision (allow / observe / block) is written to a per-session +NDJSON file. 
Each record is hash-chained to its predecessor — modifying any +prior record breaks the chain. Records are optionally Ed25519-signed when a +signing key is configured. + +Layout under the workspace: + + .prismor-warden/audit/ + .ndjson # chained records (one JSON per line) + .seal # final manifest (head hash + sig) on close + policies// # pinned policy snapshot (for replay) + feed/.json # pinned advisory feed snapshot (for replay) + pubkeys/.pub # registered verifier public keys + +Public API used by the hook dispatcher: + + write_record(workspace, session_id, agent, mode, event, decision, + findings, repo_root) -> Dict + verify_chain(path) -> VerifyResult + read_records(path) -> Iterator[Dict] + +Default behavior: + - Always: SHA-256 hash chain (zero setup). + - Signing: enabled when ``~/.prismor/keys/audit-signer.key`` and + ``audit-signer.pub`` both exist, OR the env var + ``WARDEN_AUDIT_SIGNING_KEY`` points to a private key file whose public + key file is also present. + - Raw evidence (commands/paths/etc.) is hashed-only by default. Set + ``audit.include_raw: true`` in policy.yaml settings to retain plaintext. 
+""" +from __future__ import annotations + +import hashlib +import json +import os +import shutil +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple + +from warden.signing import ( + SigningUnavailable, + b64decode, + b64encode, + key_id as _key_id, + sign as _sign, + verify as _verify, +) + +RECORD_VERSION = 1 +HASH_ALGO = "sha256" + +_SENSITIVE_FIELDS = ("command", "path", "url", "content", "prompt", "response") + + +# ── Paths ─────────────────────────────────────────────────────────────────── + +def audit_dir(workspace: Path) -> Path: + return Path(workspace) / ".prismor-warden" / "audit" + + +def session_path(workspace: Path, session_id: str) -> Path: + safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in session_id) + return audit_dir(workspace) / f"{safe}.ndjson" + + +def seal_path(workspace: Path, session_id: str) -> Path: + safe = "".join(c if c.isalnum() or c in "._-" else "_" for c in session_id) + return audit_dir(workspace) / f"{safe}.seal" + + +def policies_dir(workspace: Path) -> Path: + return audit_dir(workspace) / "policies" + + +def feed_dir(workspace: Path) -> Path: + return audit_dir(workspace) / "feed" + + +def pubkeys_dir(workspace: Path) -> Path: + return audit_dir(workspace) / "pubkeys" + + +def default_signing_key_path() -> Path: + return Path.home() / ".prismor" / "keys" / "audit-signer.key" + + +def default_signing_pubkey_path() -> Path: + return Path.home() / ".prismor" / "keys" / "audit-signer.pub" + + +# ── Canonicalization & hashing ────────────────────────────────────────────── + +def canonical_bytes(record: Dict[str, Any]) -> bytes: + """Deterministic JSON encoding used for hashing and signing. + + Sorted keys, no whitespace, UTF-8, ensure_ascii=False so unicode is + represented byte-for-byte the same regardless of platform. 
Excludes the + fields that depend on the hash itself (``record_hash`` and ``sig``). + """ + cleaned = {k: v for k, v in record.items() if k not in ("record_hash", "sig")} + return json.dumps( + cleaned, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ).encode("utf-8") + + +def sha256_hex(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def sha256_file(path: Path) -> Optional[str]: + if not path.exists(): + return None + h = hashlib.sha256() + with path.open("rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +# ── Pinning policy & feed ─────────────────────────────────────────────────── + +def _pin_policy(workspace: Path, repo_root: Path) -> str: + """Compute a stable hash of the effective policy and snapshot the source + files into ``audit/policies//`` the first time we see this hash. + + The hash is over (default_policy_bytes || b'\\x00' || override_bytes). + """ + default_path = repo_root / "warden" / "default_policy.yaml" + if not default_path.exists(): + # Some installs ship default_policy.yaml inside the warden package. 
+ alt = Path(__file__).parent / "default_policy.yaml" + if alt.exists(): + default_path = alt + + override_path = workspace / ".prismor-warden" / "policy.yaml" + + h = hashlib.sha256() + default_bytes = default_path.read_bytes() if default_path.exists() else b"" + override_bytes = override_path.read_bytes() if override_path.exists() else b"" + h.update(default_bytes) + h.update(b"\x00") + h.update(override_bytes) + digest = h.hexdigest() + + snap_dir = policies_dir(workspace) / digest + if not snap_dir.exists(): + snap_dir.mkdir(parents=True, exist_ok=True) + if default_bytes: + (snap_dir / "default_policy.yaml").write_bytes(default_bytes) + if override_bytes: + (snap_dir / "policy.yaml").write_bytes(override_bytes) + + return digest + + +def _pin_feed(workspace: Path, repo_root: Path) -> Optional[str]: + feed_path = repo_root / "advisories" / "immunity-feed.json" + if not feed_path.exists(): + return None + digest = sha256_file(feed_path) + if digest is None: + return None + snap = feed_dir(workspace) / f"{digest}.json" + if not snap.exists(): + snap.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(feed_path, snap) + return digest + + +# ── Event redaction ───────────────────────────────────────────────────────── + +def _redact_event(event: Dict[str, Any], include_raw: bool) -> Dict[str, Any]: + """Build the ``event`` block for an audit record. + + Always includes ``type``, ``ts``, ``agent_event``, ``tool``. Sensitive + string fields are hashed; raw values are only attached when + ``include_raw=True``. 
+ """ + out: Dict[str, Any] = { + "type": event.get("type"), + "ts": event.get("ts"), + "agent_event": event.get("agent_event"), + "tool": event.get("tool"), + } + for fname in _SENSITIVE_FIELDS: + val = event.get(fname) + if val is None or val == "": + continue + if not isinstance(val, str): + val = str(val) + out[f"{fname}_hash"] = sha256_hex(val.encode("utf-8")) + out[f"{fname}_len"] = len(val) + if include_raw: + out[fname] = val + return {k: v for k, v in out.items() if v is not None} + + +def _redact_finding(finding: Dict[str, Any], include_raw: bool) -> Dict[str, Any]: + out: Dict[str, Any] = { + "id": finding.get("id"), + "rule_id": finding.get("ruleId"), + "severity": finding.get("severity"), + "category": finding.get("category"), + "title": finding.get("title"), + "action": finding.get("action"), + } + evidence = finding.get("evidence") + if evidence: + if not isinstance(evidence, str): + evidence = str(evidence) + out["evidence_hash"] = sha256_hex(evidence.encode("utf-8")) + out["evidence_len"] = len(evidence) + if include_raw: + out["evidence"] = evidence + return {k: v for k, v in out.items() if v is not None} + + +# ── Workspace ID ──────────────────────────────────────────────────────────── + +def _workspace_id(workspace: Path) -> str: + return sha256_hex(str(Path(workspace).resolve()).encode("utf-8"))[:16] + + +# ── Read chain head ───────────────────────────────────────────────────────── + +def _last_record(path: Path) -> Optional[Dict[str, Any]]: + if not path.exists(): + return None + last_line = "" + with path.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + last_line = line + if not last_line: + return None + try: + return json.loads(last_line) + except json.JSONDecodeError: + return None + + +# ── Signing config ────────────────────────────────────────────────────────── + +@dataclass +class SigningConfig: + private_key: Optional[Path] = None + public_key: Optional[Path] = None + key_id: Optional[str] = None 
+ enabled: bool = False + + +def detect_signing() -> SigningConfig: + """Determine whether signing is enabled, based on env + default key path.""" + env_key = os.environ.get("WARDEN_AUDIT_SIGNING_KEY") + if env_key: + priv = Path(env_key).expanduser() + if priv.exists(): + pub = Path(os.environ.get("WARDEN_AUDIT_SIGNING_PUBKEY", + str(priv) + ".pub")).expanduser() + if pub.exists(): + try: + return SigningConfig( + private_key=priv, public_key=pub, + key_id=_key_id(pub), enabled=True, + ) + except Exception: + pass + + priv = default_signing_key_path() + pub = default_signing_pubkey_path() + if priv.exists() and pub.exists(): + try: + return SigningConfig( + private_key=priv, public_key=pub, + key_id=_key_id(pub), enabled=True, + ) + except Exception: + pass + return SigningConfig(enabled=False) + + +def register_pubkey(workspace: Path, pubkey_src: Path) -> str: + """Copy a public key into the workspace pubkey registry. + Returns its key_id. Verifiers walk this dir on `audit verify`.""" + pubkey_src = Path(pubkey_src).expanduser() + kid = _key_id(pubkey_src) + dst_dir = pubkeys_dir(workspace) + dst_dir.mkdir(parents=True, exist_ok=True) + dst = dst_dir / f"{kid}.pub" + if not dst.exists(): + shutil.copy2(pubkey_src, dst) + return kid + + +# ── Write a record ────────────────────────────────────────────────────────── + +def write_record( + *, + workspace: Path, + session_id: str, + agent: str, + mode: str, + event: Dict[str, Any], + decision: str, + findings: List[Dict[str, Any]], + repo_root: Path, + agent_version: str = "warden", + include_raw: bool = False, + signing: Optional[SigningConfig] = None, +) -> Dict[str, Any]: + """Append a chained, optionally-signed audit record. Returns the record.""" + audit_dir(workspace).mkdir(parents=True, exist_ok=True) + chain_path = session_path(workspace, session_id) + + # Self-register the public key in this workspace so a remote verifier + # cloning just the audit dir can validate signatures. 
+ if signing and signing.enabled and signing.public_key: + try: + register_pubkey(workspace, signing.public_key) + except Exception: + pass + + prev = _last_record(chain_path) + seq = (prev.get("seq") + 1) if prev and isinstance(prev.get("seq"), int) else 0 + prev_hash = prev.get("record_hash") if prev else "GENESIS" + + policy_hash = _pin_policy(workspace, repo_root) + feed_hash = _pin_feed(workspace, repo_root) + + record: Dict[str, Any] = { + "v": RECORD_VERSION, + "alg": HASH_ALGO, + "seq": seq, + "ts": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "session_id": session_id, + "agent": agent, + "mode": mode, + "workspace_id": _workspace_id(workspace), + "event": _redact_event(event, include_raw), + "decision": decision, + "findings": [_redact_finding(f, include_raw) for f in (findings or [])], + "policy_hash": policy_hash, + "feed_hash": feed_hash, + "agent_version": agent_version, + "prev_hash": prev_hash, + } + + record_hash = sha256_hex(canonical_bytes(record)) + record["record_hash"] = record_hash + + if signing and signing.enabled and signing.private_key: + try: + sig = _sign(canonical_bytes(record), signing.private_key) + record["sig"] = { + "alg": "ed25519", + "key_id": signing.key_id, + "value": b64encode(sig), + } + except SigningUnavailable: + pass + except Exception: + pass + + line = json.dumps(record, ensure_ascii=False) + "\n" + with chain_path.open("a", encoding="utf-8") as f: + f.write(line) + + return record + + +# ── Read & verify ─────────────────────────────────────────────────────────── + +@dataclass +class VerifyResult: + ok: bool = True + records: int = 0 + chain_breaks: List[Tuple[int, str]] = field(default_factory=list) # (seq, reason) + bad_signatures: List[Tuple[int, str]] = field(default_factory=list) + missing_pubkeys: List[str] = field(default_factory=list) + unsigned: int = 0 + signed: int = 0 + + def summary(self) -> str: + parts = [f"{self.records} record(s)"] + if self.signed: + parts.append(f"{self.signed} 
signed") + if self.unsigned: + parts.append(f"{self.unsigned} unsigned") + if self.chain_breaks: + parts.append(f"{len(self.chain_breaks)} chain break(s)") + if self.bad_signatures: + parts.append(f"{len(self.bad_signatures)} bad signature(s)") + if self.missing_pubkeys: + parts.append(f"{len(self.missing_pubkeys)} missing pubkey(s)") + return ", ".join(parts) + + +def read_records(path: Path) -> Iterator[Dict[str, Any]]: + if not path.exists(): + return + with path.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + yield json.loads(line) + except json.JSONDecodeError: + continue + + +def _resolve_pubkey(workspace: Path, key_id_value: str) -> Optional[Path]: + """Look up a pubkey by key_id in the workspace registry, then fall back to + the default user-level signing pubkey.""" + candidate = pubkeys_dir(workspace) / f"{key_id_value}.pub" + if candidate.exists(): + return candidate + default_pub = default_signing_pubkey_path() + if default_pub.exists(): + try: + if _key_id(default_pub) == key_id_value: + return default_pub + except Exception: + pass + return None + + +def verify_chain(path: Path, workspace: Optional[Path] = None) -> VerifyResult: + """Walk the chain file, recompute hashes, validate ``prev_hash`` links, + and verify any present signatures. 
Returns a structured result.""" + result = VerifyResult() + workspace = workspace or path.parent.parent.parent # .../audit/ -> ws + expected_prev = "GENESIS" + expected_seq = 0 + pubkey_cache: Dict[str, Optional[Path]] = {} + + for record in read_records(path): + result.records += 1 + seq = record.get("seq") + prev_hash = record.get("prev_hash") + record_hash = record.get("record_hash") + + if seq != expected_seq: + result.ok = False + result.chain_breaks.append((expected_seq, f"seq mismatch: got {seq}")) + if prev_hash != expected_prev: + result.ok = False + result.chain_breaks.append((expected_seq, f"prev_hash mismatch (expected {expected_prev[:12]}…)")) + + # Recompute record_hash + recomputed = sha256_hex(canonical_bytes(record)) + if recomputed != record_hash: + result.ok = False + result.chain_breaks.append((expected_seq, "record_hash does not match canonical bytes")) + + # Verify signature if present + sig = record.get("sig") + if sig: + kid = sig.get("key_id", "") + if kid not in pubkey_cache: + pubkey_cache[kid] = _resolve_pubkey(workspace, kid) + pub = pubkey_cache[kid] + if pub is None: + if kid not in result.missing_pubkeys: + result.missing_pubkeys.append(kid) + result.bad_signatures.append((seq or expected_seq, f"no pubkey for key_id={kid}")) + result.ok = False + else: + try: + raw_sig = b64decode(sig.get("value", "")) + if not _verify(canonical_bytes(record), raw_sig, pub): + result.bad_signatures.append((seq or expected_seq, "bad signature")) + result.ok = False + else: + result.signed += 1 + except Exception as e: + result.bad_signatures.append((seq or expected_seq, f"verify error: {e}")) + result.ok = False + else: + result.unsigned += 1 + + expected_prev = record_hash or expected_prev + expected_seq += 1 + + return result + + +# ── Seal a session ────────────────────────────────────────────────────────── + +def seal_session( + *, + workspace: Path, + session_id: str, + signing: Optional[SigningConfig] = None, +) -> Optional[Dict[str, Any]]: + 
"""Write a final manifest with the head hash + record count. + Optionally sign the manifest. Idempotent — repeated calls overwrite.""" + chain = session_path(workspace, session_id) + if not chain.exists(): + return None + + head: Optional[Dict[str, Any]] = None + n = 0 + for rec in read_records(chain): + head = rec + n += 1 + if head is None: + return None + + manifest: Dict[str, Any] = { + "v": RECORD_VERSION, + "session_id": session_id, + "workspace_id": _workspace_id(workspace), + "records": n, + "head_seq": head.get("seq"), + "head_hash": head.get("record_hash"), + "sealed_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + } + + if signing and signing.enabled and signing.private_key: + try: + sig = _sign(canonical_bytes(manifest), signing.private_key) + manifest["sig"] = { + "alg": "ed25519", + "key_id": signing.key_id, + "value": b64encode(sig), + } + except Exception: + pass + + seal = seal_path(workspace, session_id) + seal.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8") + return manifest + + +# ── List sessions ─────────────────────────────────────────────────────────── + +def list_session_files(workspace: Path) -> List[Path]: + d = audit_dir(workspace) + if not d.exists(): + return [] + return sorted([p for p in d.iterdir() if p.suffix == ".ndjson"]) diff --git a/warden/cli.py b/warden/cli.py index 3064f8e..42b899f 100644 --- a/warden/cli.py +++ b/warden/cli.py @@ -355,6 +355,10 @@ def main() -> None: raise SystemExit(1) return + # ── audit-log: tamper-evident decision log ────────────────────── + if args.command == "audit-log": + return _cmd_audit_log(args, workspace=workspace, repo_root=repo_root) + # ── audit: full security posture check ────────────────────────── if args.command == "audit": from warden.audit import run_audit, apply_fixes, AuditFinding @@ -674,6 +678,39 @@ def main() -> None: sys.stderr.write(f"[warden] sink dispatch error: {_sink_exc}\n") blocking = should_block(current_findings, event, 
block_categories=set(result.get("blockCategories", []))) + + # Derive a single decision label (allow/observe/block) for the audit log. + if blocking is not None and args.mode == "enforce": + _decision = "block" + elif blocking is not None and args.mode == "observe": + _decision = "observe" + else: + _decision = "allow" + + # ── Tamper-evident audit log ────────────────────────────────── + # Append one chained, optionally-signed record per decision. + # Best-effort: never block the hook on audit failures. + try: + from warden.audit_log import write_record as _audit_write, detect_signing as _detect_signing + _signing = _detect_signing() + _audit_cfg = getattr(_current_engine, "audit_settings", {}) or {} + _include_raw = bool(_audit_cfg.get("include_raw", False)) + _audit_write( + workspace=workspace, + session_id=normalized["sessionId"], + agent=args.agent, + mode=args.mode, + event=event, + decision=_decision, + findings=current_findings, + repo_root=repo_root, + agent_version=f"warden {__version__}", + include_raw=_include_raw, + signing=_signing, + ) + except Exception as _audit_exc: + sys.stderr.write(f"[warden] audit-log error: {_audit_exc}\n") + if args.mode == "enforce" and blocking is not None: if args.agent == "copilot": # Copilot CLI reads permissionDecision from stdout; exit 2 is ignored. 
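Reviewer note: the decision label derived in the hunk above can be restated as a small pure function, which makes the three outcomes easy to unit-test. This is a minimal sketch, not code from the patch. The name `derive_decision` is hypothetical, and the type of `blocking` is an assumption (the patch only relies on it being `None` or not).

```python
from typing import Any, Optional


def derive_decision(blocking: Optional[Any], mode: str) -> str:
    """Map the should_block() result and the run mode to an audit label.

    Hypothetical helper mirroring the if/elif/else in the hunk above.
    """
    if blocking is not None and mode == "enforce":
        return "block"      # a finding matched and enforcement is on
    if blocking is not None and mode == "observe":
        return "observe"    # a finding matched but is only recorded
    return "allow"          # nothing matched, or the mode is neither of the above


labels = [
    derive_decision({"id": "r1"}, "enforce"),
    derive_decision({"id": "r1"}, "observe"),
    derive_decision(None, "enforce"),
]
print(labels)
```

Note that, as in the patch, a non-`None` `blocking` under any mode other than `enforce` or `observe` is recorded as `allow`.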
@@ -1111,6 +1148,194 @@ def _need_passphrase(confirm: bool = False) -> str: raise SystemExit(f"Unsupported command: {args.command}") +# ── audit-log command handlers ───────────────────────────────────────────── + +def _cmd_audit_log(args, *, workspace: Path, repo_root: Path) -> None: + from warden import audit_log as al + from warden.signing import keygen, key_id + + sub = getattr(args, "audit_log_command", None) + + if sub == "keygen": + out_dir = Path(args.out_dir).expanduser() if args.out_dir else (Path.home() / ".prismor" / "keys") + priv = out_dir / "audit-signer.key" + pub = out_dir / "audit-signer.pub" + if priv.exists() and not args.force: + sys.stderr.write(f"Refusing to overwrite existing key {priv} — use --force to replace.\n") + raise SystemExit(1) + keygen(priv, pub) + kid = key_id(pub) + print(f" {_color('Wrote private key', _GREEN)} {priv} (mode 0600)") + print(f" {_color('Wrote public key', _GREEN)} {pub}") + print(f" {_color('key_id', _BOLD)} {kid}") + print() + print(" Audit records will now be Ed25519-signed automatically.") + print(f" Distribute the public key so verifiers can validate signatures:") + print(f" cat {pub}") + return + + if sub == "pubkey": + path = Path(args.key).expanduser() if args.key else al.default_signing_pubkey_path() + if not path.exists(): + sys.stderr.write(f"No public key found at {path}\n") + sys.stderr.write(f"Run `warden audit-log keygen` to create one.\n") + raise SystemExit(1) + sys.stdout.write(path.read_text()) + sys.stderr.write(f"\nkey_id: {key_id(path)}\n") + return + + if sub == "list": + files = al.list_session_files(workspace) + if not files: + print(" No audit logs in this workspace.") + return + for f in files: + session_id = f.stem + n = sum(1 for _ in al.read_records(f)) + sealed = al.seal_path(workspace, session_id).exists() + seal_marker = _color("sealed", _GREEN) if sealed else _color("open", _DIM) + print(f" {session_id} {n} records [{seal_marker}]") + return + + if sub == "show": + path = 
al.session_path(workspace, args.session_id) + if not path.exists(): + sys.stderr.write(f"No audit log for session {args.session_id} at {path}\n") + raise SystemExit(1) + records = list(al.read_records(path)) + if args.limit: + records = records[-args.limit:] + if args.json: + for r in records: + print(json.dumps(r)) + return + print() + print(f" {_color('AUDIT LOG', _BOLD)} session={args.session_id} ({len(records)} of {sum(1 for _ in al.read_records(path))} records)") + print(f" {_color('─' * 58, _DIM)}") + for r in records: + decision = r.get("decision", "?") + color = _GREEN if decision == "allow" else (_YELLOW if decision == "observe" else _RED) + sig_marker = _color("✓ signed", _DIM) if "sig" in r else _color("unsigned", _DIM) + ev = r.get("event", {}) + ev_summary = ev.get("agent_event") or ev.get("type") or "?" + print(f" seq={r.get('seq'):>3} {_color(decision.upper(), color):<14} {ev_summary:<20} {sig_marker} {r.get('record_hash','')[:12]}") + if r.get("findings"): + for f in r["findings"]: + print(f" {_color(f.get('severity','?'), _RED)} {f.get('rule_id')}: {f.get('title')}") + print() + return + + if sub == "verify": + sessions: List[Path] = [] + if args.session_id: + p = al.session_path(workspace, args.session_id) + if not p.exists(): + sys.stderr.write(f"No audit log for session {args.session_id}\n") + raise SystemExit(1) + sessions.append(p) + else: + sessions = al.list_session_files(workspace) + if not sessions: + print(" No audit logs to verify.") + return + + results: List[Dict[str, Any]] = [] + all_ok = True + for path in sessions: + r = al.verify_chain(path, workspace=workspace) + all_ok = all_ok and r.ok + results.append({ + "session_id": path.stem, + "ok": r.ok, + "records": r.records, + "signed": r.signed, + "unsigned": r.unsigned, + "chain_breaks": r.chain_breaks, + "bad_signatures": r.bad_signatures, + "missing_pubkeys": r.missing_pubkeys, + }) + + if args.json: + print(json.dumps({"ok": all_ok, "sessions": results}, indent=2)) + return + + 
print() + print(f" {_color('AUDIT VERIFY', _BOLD)}") + print(f" {_color('─' * 58, _DIM)}") + for entry in results: + status = _color("PASS", _GREEN) if entry["ok"] else _color("FAIL", _RED) + print(f" [{status}] {entry['session_id']} — {entry['records']} record(s), {entry['signed']} signed, {entry['unsigned']} unsigned") + for seq, reason in entry["chain_breaks"]: + print(f" {_color('chain', _RED)} seq={seq}: {reason}") + for seq, reason in entry["bad_signatures"]: + print(f" {_color('sig', _RED)} seq={seq}: {reason}") + for kid in entry["missing_pubkeys"]: + print(f" {_color('pubkey', _YELLOW)} key_id={kid} not registered") + print() + if not all_ok: + raise SystemExit(2) + return + + if sub == "seal": + signing = al.detect_signing() + manifest = al.seal_session(workspace=workspace, session_id=args.session_id, signing=signing) + if manifest is None: + sys.stderr.write(f"No audit log to seal for session {args.session_id}\n") + raise SystemExit(1) + print(f" {_color('Sealed', _GREEN)} {args.session_id}") + print(f" records: {manifest['records']}") + print(f" head_hash: {manifest['head_hash']}") + if "sig" in manifest: + print(f" signed: {_color('yes', _GREEN)} key_id={manifest['sig']['key_id']}") + else: + print(f" signed: {_color('no', _DIM)}") + return + + if sub == "register-pubkey": + kid = al.register_pubkey(workspace, Path(args.pubkey)) + print(f" Registered key_id={kid} in {al.pubkeys_dir(workspace)}") + return + + if sub == "replay": + path = al.session_path(workspace, args.session_id) + if not path.exists(): + sys.stderr.write(f"No audit log for session {args.session_id}\n") + raise SystemExit(1) + records = list(al.read_records(path)) + if not records: + print(" Empty session.") + return + # Group by policy_hash; warn if drift detected + hashes = sorted({r.get("policy_hash") for r in records if r.get("policy_hash")}) + if args.json: + print(json.dumps({ + "session_id": args.session_id, + "records": len(records), + "policy_hashes": hashes, + "decisions": 
[{"seq": r.get("seq"), "decision": r.get("decision"), + "rule_ids": [f.get("rule_id") for f in r.get("findings", [])]} + for r in records], + }, indent=2)) + return + print() + print(f" {_color('REPLAY', _BOLD)} session={args.session_id}") + print(f" records: {len(records)}") + print(f" policy_hashes encountered: {len(hashes)}") + for h in hashes: + snap = al.policies_dir(workspace) / h + mark = _color("pinned", _GREEN) if snap.exists() else _color("missing", _RED) + print(f" {h[:16]}… [{mark}]") + n_block = sum(1 for r in records if r.get("decision") == "block") + n_obs = sum(1 for r in records if r.get("decision") == "observe") + n_allow = sum(1 for r in records if r.get("decision") == "allow") + print(f" decisions: {_color(str(n_block)+' block', _RED)}, {_color(str(n_obs)+' observe', _YELLOW)}, {_color(str(n_allow)+' allow', _GREEN)}") + print() + return + + sys.stderr.write("Usage: warden audit-log {keygen,pubkey,list,show,verify,seal,register-pubkey,replay}\n") + raise SystemExit(1) + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Prismor Warden — local session-security utility for AI coding agents.", @@ -1339,6 +1564,49 @@ def build_parser() -> argparse.ArgumentParser: learn_parser.add_argument("--candidates", action="store_true", help="List pending candidate rules") + # ── audit-log ───────────────────────────────────────────────────── + audit_log_parser = subparsers.add_parser( + "audit-log", + help="Tamper-evident, signed audit log of decisions (allow/observe/block)", + ) + audit_log_sub = audit_log_parser.add_subparsers(dest="audit_log_command") + + al_keygen = audit_log_sub.add_parser("keygen", help="Generate an Ed25519 signing keypair") + al_keygen.add_argument("--out-dir", help="Where to write keys (default: ~/.prismor/keys)") + al_keygen.add_argument("--force", action="store_true", help="Overwrite existing keys") + + al_pubkey = audit_log_sub.add_parser("pubkey", help="Print the audit-signer public 
key") + al_pubkey.add_argument("--key", help="Path to public key file") + + al_list = audit_log_sub.add_parser("list", help="List sessions with audit logs") + al_list.add_argument("--workspace", help="Workspace path") + + al_show = audit_log_sub.add_parser("show", help="Show the audit log for a session") + al_show.add_argument("session_id", help="Session ID") + al_show.add_argument("--workspace", help="Workspace path") + al_show.add_argument("--json", action="store_true", help="Output raw JSON lines") + al_show.add_argument("--limit", type=int, default=20, help="Max records (default: 20)") + + al_verify = audit_log_sub.add_parser("verify", help="Verify chain integrity & signatures") + al_verify.add_argument("--session-id", help="Specific session (default: all sessions)") + al_verify.add_argument("--workspace", help="Workspace path") + al_verify.add_argument("--json", action="store_true", help="Output raw JSON") + + al_seal = audit_log_sub.add_parser("seal", help="Seal a session: write a signed manifest of the head hash") + al_seal.add_argument("session_id", help="Session ID") + al_seal.add_argument("--workspace", help="Workspace path") + + al_register = audit_log_sub.add_parser("register-pubkey", + help="Register a verifier public key in this workspace") + al_register.add_argument("pubkey", help="Path to a public key PEM file") + al_register.add_argument("--workspace", help="Workspace path") + + al_replay = audit_log_sub.add_parser("replay", + help="Summarize a session's decisions and check that every referenced policy snapshot is pinned on disk") + al_replay.add_argument("session_id", help="Session ID") + al_replay.add_argument("--workspace", help="Workspace path") + al_replay.add_argument("--json", action="store_true", help="Output raw JSON") + return parser diff --git a/warden/policy_engine.py b/warden/policy_engine.py index 67acc0a..590de57 100644 --- a/warden/policy_engine.py +++ b/warden/policy_engine.py @@ -115,6 +115,7 @@ def __init__( self._manifest_re: 
Optional[re.Pattern[str]] = None self.egress_allowlist: List[str] = [] self.outputs: List[Dict[str, Any]] = [] + self.audit_settings: Dict[str, Any] = {} self._load(workspace, policy_path) def _load(self, workspace: Optional[Path], policy_path: Optional[Path]) -> None: @@ -179,6 +180,10 @@ def _load(self, workspace: Optional[Path], policy_path: Optional[Path]) -> None: self.egress_allowlist = list(settings.get("egress_allowlist", []) or []) + audit_settings = settings.get("audit") or {} + if isinstance(audit_settings, dict): + self.audit_settings = audit_settings + # Compile rules. for rule_data in rules_by_id.values(): if rule_data.get("enabled", True): diff --git a/warden/signing.py b/warden/signing.py new file mode 100644 index 0000000..eddeaf9 --- /dev/null +++ b/warden/signing.py @@ -0,0 +1,230 @@ +"""Ed25519 signing utilities for Warden audit records. + +Two backends are supported: + + 1. ``cryptography`` library (preferred: in-process, deterministic, no shellout). + 2. ``openssl pkeyutl`` (fallback: same approach as ``pipeline/sign_feed.sh``). + +Public API: + + keygen(private_path, public_path) -> None + sign(message: bytes, private_key_path: Path) -> bytes # raw signature + verify(message: bytes, signature: bytes, public_key_path: Path) -> bool + key_id(public_key_path: Path) -> str # short fingerprint + +Signing is best-effort: if neither backend is available, ``keygen`` and ``sign`` +raise ``SigningUnavailable`` (the caller falls back to chain-only audit records) +and ``verify`` returns ``False``. 
+""" +from __future__ import annotations + +import base64 +import hashlib +import os +import shutil +import subprocess +import tempfile +from pathlib import Path +from typing import Optional + + +class SigningUnavailable(RuntimeError): + """Raised when no signing backend is available.""" + + +# ── Backend selection ─────────────────────────────────────────────────────── + +def _have_cryptography() -> bool: + try: + import cryptography.hazmat.primitives.asymmetric.ed25519 # noqa: F401 + return True + except ImportError: + return False + + +def _have_openssl() -> bool: + return shutil.which("openssl") is not None + + +# ── Key generation ────────────────────────────────────────────────────────── + +def keygen(private_path: Path, public_path: Path) -> None: + """Generate a new Ed25519 keypair, writing PEM files to the given paths. + + Private key file is created with mode 0600, parent dir 0700. + """ + private_path = Path(private_path).expanduser() + public_path = Path(public_path).expanduser() + private_path.parent.mkdir(parents=True, exist_ok=True, mode=0o700) + try: + os.chmod(private_path.parent, 0o700) + except OSError: + pass + + if _have_cryptography(): + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + from cryptography.hazmat.primitives import serialization + + key = Ed25519PrivateKey.generate() + priv_pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + pub_pem = key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + private_path.write_bytes(priv_pem) + public_path.write_bytes(pub_pem) + elif _have_openssl(): + subprocess.run( + ["openssl", "genpkey", "-algorithm", "Ed25519", "-out", str(private_path)], + check=True, + capture_output=True, + ) + subprocess.run( + ["openssl", "pkey", "-in", str(private_path), "-pubout", "-out", 
str(public_path)], + check=True, + capture_output=True, + ) + else: + raise SigningUnavailable("no Ed25519 backend (install 'cryptography' or 'openssl')") + + os.chmod(private_path, 0o600) + os.chmod(public_path, 0o644) + + +# ── Sign ──────────────────────────────────────────────────────────────────── + +def sign(message: bytes, private_key_path: Path) -> bytes: + """Return a raw Ed25519 signature over ``message``.""" + private_key_path = Path(private_key_path).expanduser() + if not private_key_path.exists(): + raise SigningUnavailable(f"private key not found: {private_key_path}") + + if _have_cryptography(): + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + priv_pem = private_key_path.read_bytes() + key = serialization.load_pem_private_key(priv_pem, password=None) + if not isinstance(key, Ed25519PrivateKey): + raise SigningUnavailable("private key is not Ed25519") + return key.sign(message) + + if _have_openssl(): + with tempfile.NamedTemporaryFile(delete=False) as msg_f: + msg_f.write(message) + msg_path = msg_f.name + sig_path = msg_path + ".sig" + try: + subprocess.run( + [ + "openssl", "pkeyutl", "-sign", + "-inkey", str(private_key_path), + "-rawin", "-in", msg_path, + "-out", sig_path, + ], + check=True, capture_output=True, + ) + return Path(sig_path).read_bytes() + finally: + for p in (msg_path, sig_path): + try: + os.unlink(p) + except OSError: + pass + + raise SigningUnavailable("no Ed25519 backend available") + + +# ── Verify ────────────────────────────────────────────────────────────────── + +def verify(message: bytes, signature: bytes, public_key_path: Path) -> bool: + """Verify an Ed25519 signature; return True iff valid.""" + public_key_path = Path(public_key_path).expanduser() + if not public_key_path.exists(): + return False + + if _have_cryptography(): + from cryptography.exceptions import InvalidSignature + from cryptography.hazmat.primitives import 
serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + pub_pem = public_key_path.read_bytes() + try: + key = serialization.load_pem_public_key(pub_pem) + if not isinstance(key, Ed25519PublicKey): + return False + key.verify(signature, message) + return True + except (InvalidSignature, ValueError): + return False + + if _have_openssl(): + with tempfile.NamedTemporaryFile(delete=False) as msg_f: + msg_f.write(message) + msg_path = msg_f.name + with tempfile.NamedTemporaryFile(delete=False) as sig_f: + sig_f.write(signature) + sig_path = sig_f.name + try: + result = subprocess.run( + [ + "openssl", "pkeyutl", "-verify", + "-pubin", "-inkey", str(public_key_path), + "-rawin", "-in", msg_path, + "-sigfile", sig_path, + ], + capture_output=True, + ) + return result.returncode == 0 + finally: + for p in (msg_path, sig_path): + try: + os.unlink(p) + except OSError: + pass + + return False + + +# ── Key ID ────────────────────────────────────────────────────────────────── + +def key_id(public_key_path: Path) -> str: + """Return a short stable fingerprint (first 16 hex chars of SHA-256 of the + raw public key bytes). Used as ``key_id`` field on records so verifiers + know which pubkey to use when multiple are in play.""" + public_key_path = Path(public_key_path).expanduser() + pub_pem = public_key_path.read_bytes() + + raw_bytes: Optional[bytes] = None + if _have_cryptography(): + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + try: + key = serialization.load_pem_public_key(pub_pem) + if isinstance(key, Ed25519PublicKey): + raw_bytes = key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + except ValueError: + raw_bytes = None + + if raw_bytes is None: + # Fallback: hash the PEM. Less canonical but still stable per file. 
+ raw_bytes = pub_pem + + return hashlib.sha256(raw_bytes).hexdigest()[:16] + + +# ── Helpers for callers ───────────────────────────────────────────────────── + +def b64encode(data: bytes) -> str: + return base64.b64encode(data).decode("ascii") + + +def b64decode(data: str) -> bytes: + return base64.b64decode(data.encode("ascii"))
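
Reviewer note: the tamper-evidence argument behind `write_record` and `verify_chain` can be checked with a standard-library-only sketch of the hash chain. This is an illustration under stated assumptions, not the patch's code. In particular, `canonical_bytes` is not shown in this part of the diff, so the version below (sorted-key compact JSON that excludes `record_hash` and `sig`) is an assumption, and `append_record` and `verify_sketch` are hypothetical names. Signatures are left out entirely.

```python
import hashlib
import json
from typing import Any, Dict, List, Tuple


def canonical_bytes(record: Dict[str, Any]) -> bytes:
    # Assumption: the hash covers every field except record_hash and sig.
    body = {k: v for k, v in record.items() if k not in ("record_hash", "sig")}
    return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")


def append_record(chain: List[Dict[str, Any]], decision: str) -> Dict[str, Any]:
    """Append a record linked to the current head (or GENESIS if empty)."""
    prev = chain[-1] if chain else None
    record: Dict[str, Any] = {
        "seq": prev["seq"] + 1 if prev else 0,
        "prev_hash": prev["record_hash"] if prev else "GENESIS",
        "decision": decision,
    }
    record["record_hash"] = hashlib.sha256(canonical_bytes(record)).hexdigest()
    chain.append(record)
    return record


def verify_sketch(chain: List[Dict[str, Any]]) -> List[Tuple[int, str]]:
    """Return (seq, reason) pairs for every break found while walking the chain."""
    breaks: List[Tuple[int, str]] = []
    expected_prev = "GENESIS"
    for expected_seq, record in enumerate(chain):
        if record.get("seq") != expected_seq:
            breaks.append((expected_seq, "seq mismatch"))
        if record.get("prev_hash") != expected_prev:
            breaks.append((expected_seq, "prev_hash mismatch"))
        recomputed = hashlib.sha256(canonical_bytes(record)).hexdigest()
        if recomputed != record.get("record_hash"):
            breaks.append((expected_seq, "record_hash mismatch"))
        expected_prev = record.get("record_hash") or expected_prev
    return breaks


chain: List[Dict[str, Any]] = []
for d in ("allow", "observe", "block"):
    append_record(chain, d)
intact = verify_sketch(chain)

# Edit a field without touching the stored hash: the record itself fails.
chain[1]["decision"] = "allow"
tampered = verify_sketch(chain)

# Recompute the edited record's hash: the break moves to the next link.
chain[1]["record_hash"] = hashlib.sha256(canonical_bytes(chain[1])).hexdigest()
rehashed = verify_sketch(chain)

print(intact, tampered, rehashed)
```

An attacker who rewrites every later `prev_hash` and `record_hash` would pass this unsigned check, which is the gap the opt-in Ed25519 signature over each record is meant to close.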