|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +2028 G-SIFI Pilot — Acceptance-Gate Checklist (runnable). |
| 4 | +
|
| 5 | +Operationalizes section 14 ("2028 G-SIFI pilot deployment") of |
| 6 | +governance_blueprint/DECADAL_STRATEGIC_TECHNICAL_PLAN_2026_2035.md. |
| 7 | +
|
| 8 | +Each of the six monthly pilot gates is either: |
| 9 | + * AUTOMATED - verifiable now against in-repo artifacts (feasibility Tier A). |
| 10 | + The script actually runs the check and reports PASS/FAIL. |
| 11 | + * MANUAL - depends on real hardware / vendor accounts / a supervisor |
| 12 | + (Tier B). The script prints the precise acceptance criterion |
| 13 | + and the evidence the pilot team must capture; it does not fake |
| 14 | + a pass. |
| 15 | +
|
| 16 | +Exit code is non-zero ONLY if an AUTOMATED gate fails. MANUAL gates never fail |
| 17 | +the run (they are reported as PENDING-EVIDENCE), because faking them would |
| 18 | +violate the program's integrity discipline. |
| 19 | +
|
| 20 | +Usage: |
| 21 | + python3 governance_artifacts/pilot/run_pilot_acceptance_gates.py |
| 22 | + python3 .../run_pilot_acceptance_gates.py --json # machine-readable |
| 23 | +""" |
| 24 | +from __future__ import annotations |
| 25 | + |
| 26 | +import argparse |
| 27 | +import json |
| 28 | +import os |
| 29 | +import subprocess |
| 30 | +import sys |
| 31 | +from dataclasses import dataclass, field |
| 32 | +from pathlib import Path |
| 33 | + |
| 34 | +ROOT = Path(__file__).resolve().parents[2] |
| 35 | +GA = ROOT / "governance_artifacts" |
| 36 | + |
| 37 | +# ANSI (suppressed when not a tty) |
| 38 | +_TTY = sys.stdout.isatty() |
| 39 | +GREEN = "\033[32m" if _TTY else "" |
| 40 | +RED = "\033[31m" if _TTY else "" |
| 41 | +YEL = "\033[33m" if _TTY else "" |
| 42 | +DIM = "\033[2m" if _TTY else "" |
| 43 | +RST = "\033[0m" if _TTY else "" |
| 44 | + |
| 45 | + |
| 46 | +@dataclass |
| 47 | +class GateResult: |
| 48 | + month: int |
| 49 | + gate_id: str |
| 50 | + title: str |
| 51 | + kind: str # "automated" | "manual" |
| 52 | + status: str # "PASS" | "FAIL" | "PENDING-EVIDENCE" |
| 53 | + detail: str |
| 54 | + criterion: str |
| 55 | + evidence: list[str] = field(default_factory=list) |
| 56 | + |
| 57 | + |
| 58 | +def _run(cmd: list[str], cwd: Path | None = None, timeout: int = 240) -> tuple[int, str]: |
| 59 | + """Run a command, return (rc, combined_output).""" |
| 60 | + try: |
| 61 | + p = subprocess.run( |
| 62 | + cmd, |
| 63 | + cwd=str(cwd) if cwd else None, |
| 64 | + capture_output=True, |
| 65 | + text=True, |
| 66 | + timeout=timeout, |
| 67 | + ) |
| 68 | + return p.returncode, (p.stdout or "") + (p.stderr or "") |
| 69 | + except FileNotFoundError: |
| 70 | + return 127, f"command not found: {cmd[0]}" |
| 71 | + except subprocess.TimeoutExpired: |
| 72 | + return 124, f"timeout after {timeout}s: {' '.join(cmd)}" |
| 73 | + |
| 74 | + |
| 75 | +# --------------------------------------------------------------------------- |
| 76 | +# AUTOMATED gate checks (Tier A) — each returns (ok: bool, detail: str) |
| 77 | +# --------------------------------------------------------------------------- |
| 78 | +def check_terraform_validate() -> tuple[bool, str]: |
| 79 | + tf = ROOT / "governance_blueprint" / "terraform" |
| 80 | + rc, out = _run(["terraform", "init", "-backend=false", "-input=false", "-no-color"], cwd=tf) |
| 81 | + if rc != 0: |
| 82 | + return False, f"terraform init failed: {out.strip().splitlines()[-1] if out.strip() else rc}" |
| 83 | + rc, out = _run(["terraform", "validate", "-no-color"], cwd=tf) |
| 84 | + ok = rc == 0 and ("Success" in out or "valid" in out.lower()) |
| 85 | + return ok, out.strip().splitlines()[-1] if out.strip() else f"rc={rc}" |
| 86 | + |
| 87 | + |
| 88 | +def check_opa_gates() -> tuple[bool, str]: |
| 89 | + rc, out = _run(["opa", "test", str(GA / "rego")]) |
| 90 | + line = next((l for l in out.splitlines() if l.startswith("PASS:") or l.startswith("FAIL")), out.strip()[-80:]) |
| 91 | + return rc == 0, line.strip() |
| 92 | + |
| 93 | + |
| 94 | +def check_worm_tamper() -> tuple[bool, str]: |
| 95 | + rc, out = _run(["python3", str(GA / "kafka" / "pqc_worm_logger_v2.py")]) |
| 96 | + ok = rc == 0 and "tampering detected" in out |
| 97 | + return ok, "ML-DSA-65 sign+chain verify; tampering detected" if ok else out.strip()[-120:] |
| 98 | + |
| 99 | + |
| 100 | +def check_zk_relayer() -> tuple[bool, str]: |
| 101 | + rc, out = _run(["bash", "run_relayer_pipeline.sh"], cwd=GA / "zk", timeout=300) |
| 102 | + ok = rc == 0 and "relayer pipeline complete" in out |
| 103 | + line = next((l.strip() for l in out.splitlines() if "compiles" in l), "") |
| 104 | + return ok, line or (out.strip()[-120:]) |
| 105 | + |
| 106 | + |
| 107 | +def check_containment_tlc() -> tuple[bool, str]: |
| 108 | + jar = GA / "tla" / "tools" / "tla2tools.jar" |
| 109 | + rc, out = _run( |
| 110 | + ["java", "-cp", str(jar), "tlc2.TLC", |
| 111 | + "-config", str(GA / "tla" / "SentinelContainmentProtocol.cfg"), |
| 112 | + str(GA / "tla" / "SentinelContainmentProtocol.tla")], |
| 113 | + timeout=300, |
| 114 | + ) |
| 115 | + ok = "No error has been found" in out |
| 116 | + states = next((l.strip() for l in out.splitlines() if "distinct states found" in l), "") |
| 117 | + return ok, ("ratchet invariants hold; " + states) if ok else out.strip()[-120:] |
| 118 | + |
| 119 | + |
| 120 | +def check_full_assurance() -> tuple[bool, str]: |
| 121 | + rc, out = _run(["bash", str(GA / "run_runnable_assurance.sh")], timeout=400) |
| 122 | + ok = rc == 0 and "ALL RUNNABLE ASSURANCE CHECKS PASSED" in out |
| 123 | + npass = sum(1 for l in out.splitlines() if "PASS" in l and "ASSURANCE" not in l) |
| 124 | + return ok, f"{npass} checks PASS" if ok else out.strip()[-160:] |
| 125 | + |
| 126 | + |
| 127 | +# --------------------------------------------------------------------------- |
| 128 | +# Gate catalog — mirrors the §14 month-by-month pilot table. |
| 129 | +# --------------------------------------------------------------------------- |
| 130 | +def build_gates() -> list[GateResult]: |
| 131 | + gates: list[GateResult] = [] |
| 132 | + |
| 133 | + # Month 1 — enclave substrate + attestation + OPA decision service |
| 134 | + ok, detail = check_terraform_validate() |
| 135 | + gates.append(GateResult( |
| 136 | + 1, "P1-IAC", "Enclave substrate IaC validates in pilot account", |
| 137 | + "automated", "PASS" if ok else "FAIL", detail, |
| 138 | + criterion="`terraform validate` clean for the multi-region confidential-enclave module", |
| 139 | + )) |
| 140 | + gates.append(GateResult( |
| 141 | + 1, "P1-ATTEST", "First PCR_MATCH=TRUE admission on real hardware", |
| 142 | + "manual", "PENDING-EVIDENCE", |
| 143 | + "Tier B: requires TDX/SEV-SNP hardware + AMD/Intel attestation roots.", |
| 144 | + criterion="A T0 workload is admitted only after a fresh, signature-valid attestation with PCR_MATCH=TRUE", |
| 145 | + evidence=["attestation verifier log showing PCR_MATCH=TRUE", |
| 146 | + "golden measurement registry entry used for the admission"], |
| 147 | + )) |
| 148 | + |
| 149 | + # Month 2 — use-cases behind gates + StaR-MoE |
| 150 | + ok, detail = check_opa_gates() |
| 151 | + gates.append(GateResult( |
| 152 | + 2, "P2-OPA", "T1 decisions routed through OPA release/credit/fairness gates", |
| 153 | + "automated", "PASS" if ok else "FAIL", detail, |
| 154 | + criterion="OPA policy suite green; 100% of T1 decisions evaluated by a default-deny gate", |
| 155 | + )) |
| 156 | + gates.append(GateResult( |
| 157 | + 2, "P2-MOE", "StaR-MoE routing drift index <= 0.1", |
| 158 | + "manual", "PENDING-EVIDENCE", |
| 159 | + "Tier B: requires the pilot's live MoE model + production traffic.", |
| 160 | + criterion="MoE routing drift index <= 0.1 over the pilot window (SARA+ACR enabled)", |
| 161 | + evidence=["StaR-MoE telemetry export showing drift_index timeseries <= 0.1"], |
| 162 | + )) |
| 163 | + |
| 164 | + # Month 3 — 24h monitor + G-SRI + PQC WORM |
| 165 | + ok, detail = check_worm_tamper() |
| 166 | + gates.append(GateResult( |
| 167 | + 3, "P3-WORM", "PQC WORM audit integrity 100% (tamper detected)", |
| 168 | + "automated", "PASS" if ok else "FAIL", detail, |
| 169 | + criterion="ML-DSA-65 signatures + hash chain verify; any tamper is detected", |
| 170 | + )) |
| 171 | + gates.append(GateResult( |
| 172 | + 3, "P3-GSRI", "24h monitor + G-SRI emitting to production Kafka/S3 Object Lock", |
| 173 | + "manual", "PENDING-EVIDENCE", |
| 174 | + "Tier B: requires production Kafka + S3 Object Lock (COMPLIANCE) bucket.", |
| 175 | + criterion="G-SRI checkpoints written every interval; WORM batches retained under Object Lock", |
| 176 | + evidence=["S3 Object Lock retention config (COMPLIANCE mode)", |
| 177 | + "24h monitor checkpoint log with G-SRI + PCR_MATCH"], |
| 178 | + )) |
| 179 | + |
| 180 | + # Month 4 — containment dry-runs (Red-Dawn) + dead-man's switch |
| 181 | + ok, detail = check_containment_tlc() |
| 182 | + gates.append(GateResult( |
| 183 | + 4, "P4-CONTAIN", "Containment ratchet behaves per TLA+ model", |
| 184 | + "automated", "PASS" if ok else "FAIL", detail, |
| 185 | + criterion="SentinelContainmentProtocol TLC: TrippedStaysTripped + KillSwitchIntegrity hold", |
| 186 | + )) |
| 187 | + gates.append(GateResult( |
| 188 | + 4, "P4-MTTC", "Critical-breach MTTC <= 60s in Red-Dawn simulation", |
| 189 | + "manual", "PENDING-EVIDENCE", |
| 190 | + "Tier B: requires a staged live containment exercise (GAI-SOC).", |
| 191 | + criterion="Measured mean-time-to-containment <= 60s across Red-Dawn scenarios", |
| 192 | + evidence=["Red-Dawn exercise report with per-scenario MTTC measurements"], |
| 193 | + )) |
| 194 | + |
| 195 | + # Month 5 — zk systemic-risk proof via relayer + OSCAL dossier |
| 196 | + ok, detail = check_zk_relayer() |
| 197 | + gates.append(GateResult( |
| 198 | + 5, "P5-ZK", "zk systemic-risk proof -> on-chain verifier (relayer)", |
| 199 | + "automated", "PASS" if ok else "FAIL", detail, |
| 200 | + criterion="Groth16 proof exported to a Solidity verifier that compiles; calldata produced", |
| 201 | + )) |
| 202 | + gates.append(GateResult( |
| 203 | + 5, "P5-DOSSIER", "OSCAL Annex IV dossier >= 98% auto-assembled", |
| 204 | + "manual", "PENDING-EVIDENCE", |
| 205 | + "Tier B: requires the institution's live control evidence feeds.", |
| 206 | + criterion=">= 98% of the Annex IV dossier assembled automatically from OSCAL + WORM evidence", |
| 207 | + evidence=["dossier-assembly report with manual-fraction <= 2%"], |
| 208 | + )) |
| 209 | + |
| 210 | + # Month 6 — supervisor read-only + reproducible assurance (go-decision) |
| 211 | + ok, detail = check_full_assurance() |
| 212 | + gates.append(GateResult( |
| 213 | + 6, "P6-REPRO", "Independent reproduction of the assurance suite (11/11)", |
| 214 | + "automated", "PASS" if ok else "FAIL", detail, |
| 215 | + criterion="`run_runnable_assurance.sh` reproduces green in the pilot environment", |
| 216 | + )) |
| 217 | + gates.append(GateResult( |
| 218 | + 6, "P6-SUPERVISOR", "Supervisor signs off on evidence reproducibility", |
| 219 | + "manual", "PENDING-EVIDENCE", |
| 220 | + "Requires a participating supervisor (observer role).", |
| 221 | + criterion="Supervisor confirms dashboards + GIEN events + proofs are independently reproducible", |
| 222 | + evidence=["signed supervisor sign-off memo", "supervisor dashboard access audit record"], |
| 223 | + )) |
| 224 | + |
| 225 | + return gates |
| 226 | + |
| 227 | + |
| 228 | +def main() -> int: |
| 229 | + ap = argparse.ArgumentParser(description="2028 G-SIFI pilot acceptance-gate checklist") |
| 230 | + ap.add_argument("--json", action="store_true", help="emit machine-readable JSON") |
| 231 | + args = ap.parse_args() |
| 232 | + |
| 233 | + print("=" * 70) |
| 234 | + print(" 2028 G-SIFI Pilot — Acceptance-Gate Checklist") |
| 235 | + print(" (automated gates verified now; manual/Tier-B gates report criteria)") |
| 236 | + print("=" * 70) |
| 237 | + |
| 238 | + gates = build_gates() |
| 239 | + |
| 240 | + if args.json: |
| 241 | + print(json.dumps([g.__dict__ for g in gates], indent=2)) |
| 242 | + |
| 243 | + automated_fail = 0 |
| 244 | + by_month: dict[int, list[GateResult]] = {} |
| 245 | + for g in gates: |
| 246 | + by_month.setdefault(g.month, []).append(g) |
| 247 | + |
| 248 | + for month in sorted(by_month): |
| 249 | + print(f"\nMonth {month}") |
| 250 | + for g in by_month[month]: |
| 251 | + if g.status == "PASS": |
| 252 | + badge = f"{GREEN}PASS{RST}" |
| 253 | + elif g.status == "FAIL": |
| 254 | + badge = f"{RED}FAIL{RST}" |
| 255 | + automated_fail += 1 |
| 256 | + else: |
| 257 | + badge = f"{YEL}MANUAL{RST}" |
| 258 | + print(f" [{badge}] {g.gate_id:<13} {g.title}") |
| 259 | + print(f" {DIM}criterion:{RST} {g.criterion}") |
| 260 | + if g.detail: |
| 261 | + print(f" {DIM}detail :{RST} {g.detail}") |
| 262 | + if g.kind == "manual" and g.evidence: |
| 263 | + print(f" {DIM}evidence :{RST} " + "; ".join(g.evidence)) |
| 264 | + |
| 265 | + n_auto = sum(1 for g in gates if g.kind == "automated") |
| 266 | + n_auto_pass = sum(1 for g in gates if g.kind == "automated" and g.status == "PASS") |
| 267 | + n_manual = sum(1 for g in gates if g.kind == "manual") |
| 268 | + |
| 269 | + print("\n" + "=" * 70) |
| 270 | + print(f" Automated gates: {n_auto_pass}/{n_auto} PASS | " |
| 271 | + f"Manual/Tier-B gates pending evidence: {n_manual}") |
| 272 | + if automated_fail == 0: |
| 273 | + print(f" {GREEN}ALL AUTOMATED PILOT GATES PASS{RST} — " |
| 274 | + f"go-decision blocked only on {n_manual} manual/Tier-B evidence items.") |
| 275 | + else: |
| 276 | + print(f" {RED}{automated_fail} AUTOMATED PILOT GATE(S) FAILED{RST} — fix before pilot go-decision.") |
| 277 | + print("=" * 70) |
| 278 | + return 1 if automated_fail else 0 |
| 279 | + |
| 280 | + |
| 281 | +if __name__ == "__main__": |
| 282 | + raise SystemExit(main()) |
0 commit comments