diff --git a/Makefile b/Makefile index aba1401..3de0920 100644 --- a/Makefile +++ b/Makefile @@ -49,12 +49,12 @@ lint-gsifi-governance: npx --yes markdownlint-cli@0.39.0 --config docs/reports/.markdownlint.json docs/reports/GSIFI_AGI_ASI_GOVERNANCE_BLUEPRINT_2026_2030.md docs/reports/GSIFI_GOVERNANCE_ARTIFACTS_RUNBOOK.md check-gsifi-governance: validate-gsifi-governance validate-gsifi-governance-module test-gsifi-governance lint-gsifi-governance -.PHONY: governance-test governance-validate governance-validate-json governance-validate-json-check governance-check +.PHONY: governance-test governance-reports-validate governance-validate-json governance-validate-json-check governance-check governance-test: python3 -m unittest discover tool_tests -governance-validate: +governance-reports-validate: python3 tools/validate_governance_reports.py governance-validate-json: @@ -64,7 +64,7 @@ governance-validate-json-check: python3 tools/validate_governance_reports.py --json > /tmp/governance_validation.json python3 -c 'import json; p=json.load(open("/tmp/governance_validation.json", "r", encoding="utf-8")); assert p.get("status")=="passed", f"Validator JSON status not passed: {p}"; print("Validator JSON status is passed.")' -governance-check: governance-test governance-validate governance-validate-json-check +governance-check: governance-test governance-reports-validate governance-validate-json-check .PHONY: governance-setup governance-deps-check governance-lint governance-validate governance-artifact-inventory governance-policy-test governance-validator-test governance-evidence-manifest governance-evidence-verify governance-evidence-schema governance-report governance-report-schema governance-check-generated governance-setup: @@ -140,8 +140,7 @@ gov-dashboard-check: $(PYTHON) governance_blueprint/validation/validate_dashboard_links.py gov-selftest: - $(PYTHON) governance_blueprint/validation/selftest_validate_artifacts.py - $(PYTHON) governance_blueprint/validation/selftest_run_validation_suite.py + $(PYTHON) -m unittest discover governance_blueprint/validation -p 'selftest_*.py' gov-suite: $(PYTHON) governance_blueprint/validation/run_validation_suite.py diff --git a/REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md b/REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md new file mode 100644 index 0000000..6303af8 --- /dev/null +++ b/REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md @@ -0,0 +1,193 @@ +Regulator-Ready 2026–2030 Enterprise and Civilizational AGI/ASI Governance, Architecture, Safety, and Implementation Blueprint + +This report provides an implementation-ready blueprint for Fortune 500, Global 2000, and G‑SIFI institutions and supervisors from 2026 to 2030. It unifies enterprise AI governance, regulatory compliance engineering, high-assurance platform architecture, AGI/ASI safety and containment, civilizational compute governance, and financial-services model risk controls. It includes dependency-aware rollout planning, machine-readable governance artifacts (JSON/YAML/Rego), and regulator-ready sections for boards, C-suites, architects, platform engineers, and AI safety teams. + + + +## 1) Scope, Audience, and Design Principles +- **Audience:** Board risk committees, C-suites, regulators, model risk teams, enterprise architects, AI platform engineers, and AI safety researchers. +- **Institutional scope:** Multi-jurisdiction enterprises spanning US/EU/UK/APAC with prudential and conduct exposure. +- **Design principles:** legality-by-design, safety-by-design, controls-as-code, evidence-by-default, and independent challenge for all high-impact AI. + +## 2) Integrated Regulatory Compliance Framework Mapping and Implementation + +### 2.1 Control ontology and traceability model +Adopt a canonical enterprise control model (`AIGOV-*`) with immutable trace links: +1. legal/supervisory source, +2. control objective, +3. implementation control, +4. test procedure, +5. evidence artifact, +6. accountable owner. + +### 2.2 Framework crosswalk (required coverage) +- **EU AI Act + Annex IV:** risk classification, provider/deployer obligations, conformity pathways, technical documentation and post-market monitoring. +- **NIST AI RMF 1.0:** Govern/Map/Measure/Manage aligned to risk lifecycle and operating KPIs/KRIs. +- **NIST AI 600-1:** secure/trustworthy AI engineering controls, adversarial robustness, and resilience. +- **ISO/IEC 42001:** AI management system (AIMS), audit cycle, continual improvement. +- **OECD AI Principles:** transparency, robustness, accountability, and human-centered outcomes. +- **GDPR Article 22:** safeguards for significant automated decisions (human review, contestability, meaningful information). +- **FCRA/ECOA:** adverse action reasoning and anti-discrimination controls in credit decisions. +- **Basel III/IV + SR 11-7:** model risk governance, prudential oversight, overlays, and board reporting. +- **NIS2:** cyber resilience, AI dependency security, incident reporting and supply-chain control. +- **FCA Consumer Duty + SMCR:** customer outcomes governance and explicit senior-manager accountability. +- **MAS/HKMA FEAT:** fairness, ethics, accountability, and transparency control packs for APAC. + +### 2.3 Compliance implementation pattern (enterprise) +- **Policy layer:** legal interpretation + control text + jurisdiction overlays. +- **Enforcement layer:** OPA/Rego admission and runtime policies. +- **Evidence layer:** Kafka event streams + WORM retention + legal hold. +- **Assurance layer:** independent validation, 2LOD challenge, 3LOD audit, external assurance. +- **Regulatory layer:** jurisdiction-ready supervisory evidence packs and notification workflows. + +## 3) Institutional-Grade Governance Platform Technical Architecture + +### 3.1 Capability domains +- **Sentinel AI Governance Platform v2.4** (policy registry, tiering, approvals, exceptions, evidence graph). +- **WorkflowAI Pro** (HITL orchestration, approvals, overrides, and accountability trails). +- **EAIP** (model gateway, policy mediation, secure tool-use brokering, and failover routing). +- **High-assurance RAG** (source provenance, trust scoring, citation constraints, and retrieval-integrity checks). + +### 3.2 Control stack specification +- **Kubernetes/Kafka/OPA:** policy admission, runtime guardrails, immutable telemetry. +- **Docker Swarm hardening:** mTLS everywhere, signed-image-only deployment, scoped secrets, node attestation. +- **Node.js/Python governance sidecars:** mandatory evidence envelope for every inference/action. +- **Next.js explainability UX:** rationale views, recourse process, policy provenance and model card surfacing. +- **Terraform/CI/CD governance automation:** policy test gates, SoD approvals, provenance attestations, rollback controls. + +### 3.3 Hyperparameter and drift standards +- **Parameter governance:** approved envelope per model tier; material-change classification. +- **Drift standards:** data/concept/behavior/policy drift metrics with mandatory response triggers. +- **Model update protocol:** major updates require revalidation + compliance sign-off before promotion. + +## 4) AGI/ASI Safety, Containment, and Crisis Simulation Blueprint + +### 4.1 Safety framework integration +- **Luminous Engine Codex:** safety claims catalog and evidentiary burden framework. +- **Cognitive Resonance Protocol:** coherence/deception stress testing and emergent behavior diagnostics. +- **Sentinel / Omni-Sentinel:** enterprise monitoring and emergency intervention plane. + +### 4.2 Containment architecture for frontier systems +- isolated AGI containment labs, +- hardened egress and tool controls, +- dual-key authorization for external effects, +- autonomous behavior tripwires, +- immediate kill/quarantine pathways. + +### 4.3 Frontier risk taxonomy +- misuse acceleration, +- cyber offense amplification, +- financial market manipulation, +- institutional deception/persuasion, +- recursive capability escalation. + +### 4.4 Crisis simulation standard +- quarterly tabletop and semiannual live simulation, +- regulator-observer scenarios for Tier 4/5, +- mean-time-to-containment and incident quality KPIs, +- postmortem evidence and control remediation SLAs. + +## 5) Civilizational-Scale AI and Compute Governance Mechanisms + +### 5.1 Global governance construct +- **International Compute Governance Consortium (ICGC)** +- **Global Compute Registry** +- **Treaty-aligned systemic governance forum** + +### 5.2 Mechanism registry +- **GACRA, GASO, GFMCF, GAICS, GAIVS, GACP, GATI, GACMO, FTEWS, GAI-SOC, GAIGA, GACRLS, GFCO, GAID, GASCF** + +### 5.3 Enterprise obligations +- register above-threshold compute, +- disclose severe incidents and near misses, +- participate in cross-border simulations, +- maintain schema interoperability for audit and crisis coordination. + +## 6) Financial Services-Specific Model Risk and Governance + +### 6.1 Credit and lending +- adverse action explainability, +- protected-group fairness monitoring, +- recourse and manual escalation controls. + +### 6.2 Trading and market support +- no fully autonomous high-impact execution, +- stress/reverse-stress controls, +- real-time supervisory kill-switch authority. + +### 6.3 Enterprise risk and fiduciary advisors +- suitability and fiduciary constraints, +- systemic spillover pre-checks, +- liquidity and contagion scenario gates. + +### 6.4 SR 11-7 lifecycle integration +inventory -> tiering -> validation -> challenge -> production monitoring -> periodic revalidation -> retirement. + +## 7) 2026–2030 Dependency-Aware Implementation Roadmap + +### Phase A (2026): Baseline controls and legal-compliance anchoring +Dependencies: inventory + tiering + policy baseline + evidence stream bootstrap. + +### Phase B (2027): Automation and operating scale +Dependencies: standardized sidecar telemetry + release gates + multi-jurisdiction packs. + +### Phase C (2028): Frontier assurance and resilience +Dependencies: containment lab maturity + crisis simulations + external assurance. + +### Phase D (2029): Systemic-risk integration +Dependencies: compute registry linkage + mechanism interoperability + systemic exercises. + +### Phase E (2030): Adaptive governance and treaty-compatible operations +Dependencies: dynamic control tuning + supervisory data exchange maturity + continuous assurance. + +## 8) Regulator-Ready Report Sections by Stakeholder +
+- risk appetite posture, +- concentration exposure, +- unresolved exceptions, +- investment and capability roadmap. +
+ +
+- accountability model, +- operational KRIs/KPIs, +- cross-border compliance heatmap, +- strategic deployment constraints. +
+ +
+- control mapping and legal traceability, +- test evidence and exceptions, +- incidents/remediation, +- forward risk treatment plan. +
+ +
+- reference architecture, +- system boundaries and trust zones, +- dependency and resilience design, +- control integration points. +
+ +
+- runtime enforcement policies, +- release gate definitions, +- observability/evidence contracts, +- rollback and incident hooks. +
+ +
+- capability evaluations, +- containment efficacy, +- deceptive-behavior and misuse testing, +- residual risk and open research queue. +
+ +## 9) Machine-Readable Governance Artifacts +- `governance_blueprint/compliance_profile_2026.json` +- `governance_blueprint/civilizational_compute_governance_framework.yaml` +- `governance_blueprint/opa/systemic_risk_guardrails.rego` +- `governance_blueprint/annex_iv_technical_documentation_template.json` +- `governance_blueprint/rollout_plan_2026_2030.yaml` + +
diff --git a/governance_blueprint/annex_iv_technical_documentation_template.json b/governance_blueprint/annex_iv_technical_documentation_template.json new file mode 100644 index 0000000..7425979 --- /dev/null +++ b/governance_blueprint/annex_iv_technical_documentation_template.json @@ -0,0 +1,30 @@ +{ + "template_id": "eu-ai-act-annex-iv-tech-doc-v1", + "version": "1.0.0", + "sections": [ + {"id": "A", "name": "General system description", "required": true}, + {"id": "B", "name": "Design and development specifications", "required": true}, + {"id": "C", "name": "Data requirements and governance", "required": true}, + {"id": "D", "name": "Risk management system", "required": true}, + {"id": "E", "name": "Post-market monitoring", "required": true}, + {"id": "F", "name": "Human oversight measures", "required": true}, + {"id": "G", "name": "Performance and limitations", "required": true}, + {"id": "H", "name": "Cybersecurity and resilience", "required": true} + ], + "metadata": { + "provider": "", + "deployer": "", + "model_id": "", + "model_version": "", + "intended_purpose": "", + "risk_classification": "", + "jurisdictions": [], + "responsible_executive": "" + }, + "evidence_links": { + "validation_report": "", + "data_lineage_record": "", + "monitoring_dashboard": "", + "incident_playbook": "" + } +} diff --git a/governance_blueprint/artifact_manifest.json b/governance_blueprint/artifact_manifest.json index 4b68145..4958e25 100644 --- a/governance_blueprint/artifact_manifest.json +++ b/governance_blueprint/artifact_manifest.json @@ -1,18 +1,27 @@ { "package": "enterprise_agi_asi_governance_blueprint", - "version": "1.3.1", - "generated_utc": "2026-04-27T06:11:04Z", + "version": "1.4.5", + "generated_utc": "2026-04-28T02:47:09Z", "artifacts": { "control_mapping_matrix.csv": "8af4170e62e6aec3c12f3f554d29fe31e6c59c196cd9b3e1590f1238597ce228", "evidence_event_schema.json": "7c84f8fce1cefeff08308a2763c086eb4ede05881881cd53c484e879df04196a", - "opa/release_gate.rego": "bd117bddd2c77a0fd5cc4741aa6805b6f1f711d2baa5732ca037ea4db7b60c43", + "compliance_profile_2026.json": "aa3468812b58f05095d6d96e7def2262c90c142dadf13bf91bc8423a85ae345f", + "annex_iv_technical_documentation_template.json": "08c791484963dd46e0cbc0e76358229813816f66d050df4e9783e73ded7e787e", + "civilizational_compute_governance_framework.yaml": "15a2b94042bcd6f79643be6289febbef3b697f29424e842b76ee8944027d9d27", "roadmap_2026_2030.yaml": "35132b486b360d91ceab94e7949278c755a28dbab0cccf64e0b3a776d7dab485", - "validation/validate_artifacts.py": "0908bb44ecf2b209861fb3fe0259bad2b652d94b1f6c50c45592b074f52848e0", - "validation/selftest_validate_artifacts.py": "50414aa4ecf39166268d76ab0363ad2ec9ac32cde6b27ae5c631764fd7bce29b", - "validation/generate_artifact_manifest.py": "654479289df4a57ab58288adcbb5c9e23861f3b3a6e4d524b8214bb8c992d060", - "validation/run_validation_suite.py": "4c7038c4d3da1d6fb3f4c43bddd5b2237856b90bd568a17d03a1d16cfc904781", - "validation/selftest_run_validation_suite.py": "2f987933769c0530eaa7ad51a0454781e8bd90bb700c120219dae5a96645adbe", + "rollout_plan_2026_2030.yaml": "2d735de1f810f23828f9798154ac5dfe50460b4e583909ea8b677dfeafb26061", + "opa/release_gate.rego": "3a8b5e3a4c90e78bfd5f9dee1f4ca4927d198238aa18679e4a78aa94623d453c", + "opa/systemic_risk_guardrails.rego": "5eb9d5f7061aa0f03194d505c8eb3347cbac00138ff3ce28ec1b71bee5382ab7", + "validation/validate_artifacts.py": "a82ba842ada8a22d3d8cd37553b4c71691ec2da32f6add3c18a7baa9b0cbc1a7", + "validation/generate_artifact_manifest.py": "528970f9f6e35a0c50fd97c0551cc9230b2c7ce967f7b590a2dea2821d19c41c", + "validation/run_validation_suite.py": "2e00f22a83e572424b07ba9f6984394c8b99d2317fb40134fd2dd97d6708a2b6", "validation/lint_python_sources.py": "52b36b1427679624fd9778dc93cb7b318b4c882930e78c0947a37d5185dafae9", - "validation/validate_dashboard_links.py": "e854e2c61ac6e31f880fce8e28c6ed95856d13a85fdfdbcf124df74925b1461a" + "validation/validate_dashboard_links.py": "e854e2c61ac6e31f880fce8e28c6ed95856d13a85fdfdbcf124df74925b1461a", + "validation/selftest_generate_artifact_manifest.py": "381af02a7b337e11af7df7992012736a5ec9a37b1009c8aa3e918ad589baa8d2", + "validation/selftest_run_validation_suite.py": "697fdd88db942deb2a4d4f5cb17cabd5c36ce4278e7c6e70c9059c97fa1f47c1", + "validation/selftest_validate_artifacts.py": "84e95dfe25db9586c1806fda0fba1f4e8bb10b6c02360a224b12cddb7d82c06c" + }, + "external_artifacts": { + "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md": "b590161a765704a9d320dcfa1fae2f8285bc816fc56cf25062e11c3f27bcdbee" } } diff --git a/governance_blueprint/civilizational_compute_governance_framework.yaml b/governance_blueprint/civilizational_compute_governance_framework.yaml new file mode 100644 index 0000000..53dc2f9 --- /dev/null +++ b/governance_blueprint/civilizational_compute_governance_framework.yaml @@ -0,0 +1,58 @@ +framework_id: civilizational-compute-governance-2030 +version: 1.0.0 +updated_at: 2026-04-27T00:00:00Z +institutions: + - International Compute Governance Consortium (ICGC) + - Global Compute Registry + - Treaty-Aligned Systemic Risk Governance Forum +mechanisms: + GACRA: + name: Global AI Compute Registration Authority + purpose: Register frontier compute assets and cross-border usage declarations. + GASO: + name: Global AI Safety Observatory + purpose: Consolidate safety incidents and near misses. + GFMCF: + name: Global Frontier Model Certification Framework + purpose: Maintain safety certification baseline for frontier systems. + GAICS: + name: Global AI Incident Coordination System + purpose: Coordinate transnational containment and response. + GAIVS: + name: Global AI Verification Standard + purpose: Define verifiable claims, evaluations, and evidence schemas. + GACP: + name: Global Algorithmic Containment Protocol + purpose: Standardize emergency containment actions. + GATI: + name: Global AI Treaty Interface + purpose: Technical interoperability layer for treaty reporting. + GACMO: + name: Global AI Capacity Monitoring Office + purpose: Monitor concentration and capacity risk in advanced compute supply. + FTEWS: + name: Frontier Threat Early Warning System + purpose: Generate cross-sector alerts for escalatory AI threats. + GAI-SOC: + name: Global AI Security Operations Center + purpose: Operate shared detection and response telemetry for severe events. + GAIGA: + name: Global AI Governance Interoperability Gateway + purpose: Bridge institution-level governance data with regulator systems. + GACRLS: + name: Global AI Compute Resource Licensing System + purpose: License high-threshold compute access by risk class. + GFCO: + name: Global Frontier Compute Observatory + purpose: Assess trends in frontier training and inference capacity. + GAID: + name: Global AI Disclosure Standard + purpose: Normalize disclosures for capability, risk, and incidents. + GASCF: + name: Global AI Systemic Containment Facility + purpose: Federated contingency coordination for systemic AI crises. +enterprise_obligations: + - Register above-threshold compute clusters and material upgrades. + - Provide machine-readable incident disclosures within jurisdictional deadlines. + - Participate in annual cross-border crisis simulations. + - Maintain compatibility with shared verification and disclosure schemas. diff --git a/governance_blueprint/compliance_profile_2026.json b/governance_blueprint/compliance_profile_2026.json new file mode 100644 index 0000000..0c3f169 --- /dev/null +++ b/governance_blueprint/compliance_profile_2026.json @@ -0,0 +1,102 @@ +{ + "profile_id": "gsifi-agi-asi-governance-2026", + "version": "2.0.0", + "generated_at": "2026-04-27T00:00:00Z", + "scope": [ + "Fortune500", + "Global2000", + "G-SIFI" + ], + "control_model": { + "id_prefix": "AIGOV", + "traceability_dimensions": [ + "regulation", + "control_objective", + "implementation", + "test_procedure", + "evidence", + "owner" + ] + }, + "framework_mappings": [ + { + "control_id": "AIGOV-001", + "control_family": "Governance and accountability", + "frameworks": ["EU_AI_Act", "NIST_AI_RMF_1_0_GOVERN", "ISO_IEC_42001", "SMCR"], + "owner": "CRO", + "implementation": ["board_ai_risk_charter", "executive_ai_governance_council", "raci_registry"], + "evidence": ["board_minutes", "risk_appetite_statement", "committee_decisions"] + }, + { + "control_id": "AIGOV-005", + "control_family": "EU AI Act Annex IV technical documentation", + "frameworks": ["EU_AI_Act", "EU_AI_Act_Annex_IV", "ISO_IEC_42001"], + "owner": "CAIO", + "implementation": ["annex_iv_builder", "model_card_pipeline", "post_market_monitoring_registry"], + "evidence": ["technical_file", "model_card", "dataset_statement", "monitoring_plan"] + }, + { + "control_id": "AIGOV-010", + "control_family": "Risk mapping and tiering", + "frameworks": ["NIST_AI_RMF_1_0_MAP", "OECD_AI_Principles", "Basel_III_IV"], + "owner": "Enterprise_Risk", + "implementation": ["use_case_classification", "risk_tier_service", "materiality_thresholds"], + "evidence": ["tier_register", "classification_rationale", "committee_approvals"] + }, + { + "control_id": "AIGOV-014", + "control_family": "Automated decision safeguards", + "frameworks": ["GDPR_Art_22", "FCRA", "ECOA", "FCA_Consumer_Duty", "MAS_HKMA_FEAT"], + "owner": "Compliance", + "implementation": ["human_review_gate", "reason_code_service", "contestability_workflow"], + "evidence": ["decision_logs", "adverse_action_outputs", "recourse_ticket_history"] + }, + { + "control_id": "AIGOV-021", + "control_family": "Cyber and resilience", + "frameworks": ["NIS2", "NIST_AI_600_1", "NIST_AI_RMF_1_0_MANAGE"], + "owner": "CISO", + "implementation": ["supply_chain_assurance", "runtime_hardening", "incident_response_playbooks"], + "evidence": ["security_assessment", "incident_timeline", "resilience_test_reports"] + }, + { + "control_id": "AIGOV-027", + "control_family": "Model risk lifecycle", + "frameworks": ["SR_11_7", "Basel_III_IV", "NIST_AI_600_1"], + "owner": "Model_Risk_Management", + "implementation": ["independent_validation", "challenger_model_suite", "drift_monitoring"], + "evidence": ["validation_reports", "monitoring_dashboards", "override_audit_trails"] + }, + { + "control_id": "AIGOV-033", + "control_family": "APAC FEAT implementation", + "frameworks": ["MAS_HKMA_FEAT", "OECD_AI_Principles", "ISO_IEC_42001"], + "owner": "Regional_Compliance_APAC", + "implementation": ["feat_testing_pack", "ethics_review_board", "transparency_notices"], + "evidence": ["feat_assessment", "regional_attestation", "customer_outcome_reviews"] + }, + { + "control_id": "AIGOV-040", + "control_family": "Explainability and customer outcomes", + "frameworks": ["FCA_Consumer_Duty", "FCRA", "GDPR_Art_22", "NIST_AI_RMF_1_0_MEASURE"], + "owner": "Customer_Office", + "implementation": ["explainability_service", "plain_language_rendering", "customer_appeals_workflow"], + "evidence": ["explanation_samples", "sla_metrics", "complaints_analytics"] + }, + { + "control_id": "AIGOV-052", + "control_family": "Frontier containment and systemic governance", + "frameworks": ["NIST_AI_600_1", "OECD_AI_Principles", "SR_11_7"], + "owner": "AI_Safety_Officer", + "implementation": ["frontier_lab_controls", "systemic_risk_guardrails", "crisis_simulation_program"], + "evidence": ["safety_case", "simulation_results", "containment_audit"] + } + ], + "implementation_strategy": { + "phase_2026": "Baseline controls and legal harmonization", + "phase_2027": "Automation scale-out and CI/CD control hardening", + "phase_2028": "Frontier assurance and containment maturity", + "phase_2029": "Systemic interoperability and compute governance integration", + "phase_2030": "Adaptive governance and treaty-aligned operations" + } +} diff --git a/governance_blueprint/opa/release_gate.rego b/governance_blueprint/opa/release_gate.rego index b3cd500..2168e3e 100644 --- a/governance_blueprint/opa/release_gate.rego +++ b/governance_blueprint/opa/release_gate.rego @@ -4,20 +4,20 @@ package aigov.release default allow = false # Baseline requirements for all models. -baseline_requirements { +baseline_requirements if { input.model_card_exists input.security_scan_passed input.policy_bundle_hash_approved } # Low/medium risk release path. -allow { +allow if { input.risk_tier <= 2 baseline_requirements } # High-risk release path. -allow { +allow if { input.risk_tier >= 3 baseline_requirements input.independent_validation_approved @@ -27,7 +27,7 @@ allow { } # Additional controls for frontier/special risk systems. -allow { +allow if { input.risk_tier == 4 baseline_requirements input.independent_validation_approved diff --git a/governance_blueprint/opa/systemic_risk_guardrails.rego b/governance_blueprint/opa/systemic_risk_guardrails.rego new file mode 100644 index 0000000..06dc584 --- /dev/null +++ b/governance_blueprint/opa/systemic_risk_guardrails.rego @@ -0,0 +1,59 @@ +package aigov.systemic + +default allow = false + +allow if { + input.risk_tier <= 2 + input.validation.approved + input.monitoring.enabled +} + +allow if { + input.risk_tier == 3 + input.validation.approved + input.monitoring.enabled + input.explainability.enabled + input.change_control.release_approved +} + +allow if { + input.risk_tier >= 4 + input.validation.approved + input.safety_case.approved + input.containment.lab_certified + input.crisis_simulation.last_run_days <= 180 + input.compute_registry.registered + input.systemic_risk_committee.signoff + input.jurisdictional_pack.complete + input.high_assurance_rag.provenance_enforced +} + +deny contains msg if { + input.risk_tier >= 4 + not input.safety_case.approved + msg := "Frontier deployment blocked: safety case approval missing" +} + +deny contains msg if { + input.risk_tier >= 4 + not input.compute_registry.registered + msg := "Frontier deployment blocked: compute registry declaration missing" +} + +deny contains msg if { + input.risk_tier >= 4 + input.crisis_simulation.last_run_days > 180 + msg := "Frontier deployment blocked: crisis simulation stale" +} + +deny contains msg if { + input.risk_tier >= 4 + not input.jurisdictional_pack.complete + msg := "Frontier deployment blocked: jurisdictional compliance pack incomplete" +} + +deny contains msg if { + input.risk_tier >= 4 + not input.high_assurance_rag.provenance_enforced + msg := "Frontier deployment blocked: high-assurance RAG provenance control missing" +} diff --git a/governance_blueprint/rollout_plan_2026_2030.yaml b/governance_blueprint/rollout_plan_2026_2030.yaml new file mode 100644 index 0000000..75f45ca --- /dev/null +++ b/governance_blueprint/rollout_plan_2026_2030.yaml @@ -0,0 +1,64 @@ +program: agi-asi-governance-blueprint +version: 1.0.0 +phases: + - name: Phase A - 2026 Foundation + outcomes: + - Enterprise inventory and risk tiering baseline complete + - Core legal mapping and policy-as-code baseline active + - Immutable evidence pipeline online + dependencies: + - enterprise_model_inventory + - control_taxonomy + - ci_cd_policy_gates + exit_criteria: + - 100_percent_material_ai_in_inventory + - tier3_plus_validation_required + - board_risk_appetite_approved + - name: Phase B - 2027 Scale + outcomes: + - Workflow automation and explainability roll-out + - Multi-jurisdiction control packs deployed + - Vendor/GPAI assurance program integrated + dependencies: + - governance_sidecar_adoption + - regional_compliance_templates + - supplier_assurance_controls + exit_criteria: + - 90_percent_controls_continuously_monitored + - regulator_evidence_pack_sla_under_72h + - name: Phase C - 2028 Frontier Assurance + outcomes: + - Frontier containment labs operational + - Crisis simulation cadence institutionalized + - External assurance integrated into release governance + dependencies: + - frontier_evaluation_framework + - containment_lab_controls + - external_red_team_program + exit_criteria: + - tier4_tier5_safety_case_mandatory + - simulation_kpis_in_green_band + - name: Phase D - 2029 Systemic Governance Integration + outcomes: + - Compute registry interoperability in production + - Civilizational mechanism reporting pathways active + - Cross-border incident exercises executed + dependencies: + - compute_registration_integration + - gaiga_data_exchange_adapter + - incident_coordination_protocols + exit_criteria: + - severe_incident_disclosure_workflow_tested + - systemic_risk_dashboard_live + - name: Phase E - 2030 Adaptive Operations + outcomes: + - Adaptive governance with controlled policy tuning + - Treaty-compatible supervisory exchange readiness + - Mature enterprise-to-global risk signaling loop + dependencies: + - policy_feedback_loop + - regulator_data_exchange_maturity + - advanced_model_behavior_monitoring + exit_criteria: + - annual_independent_assurance_pass + - sustained_reduction_in_high_severity_incidents diff --git a/governance_blueprint/validation/README.md b/governance_blueprint/validation/README.md index 4211ac2..e2375a6 100644 --- a/governance_blueprint/validation/README.md +++ b/governance_blueprint/validation/README.md @@ -15,8 +15,7 @@ python3 governance_blueprint/validation/validate_artifacts.py --json Run validator self-tests (stdlib `unittest`): ```bash -python3 governance_blueprint/validation/selftest_validate_artifacts.py -python3 governance_blueprint/validation/selftest_run_validation_suite.py +python3 -m unittest discover governance_blueprint/validation -p 'selftest_*.py' ``` Run full suite (manifest check + validator + lint + dashboard check + self-tests): @@ -37,6 +36,17 @@ Quiet mode (less log noise in local scripts): python3 governance_blueprint/validation/run_validation_suite.py --quiet ``` + +Optional explicit OPA binary pinning (recommended in CI if OPA is available): + +```bash +python3 governance_blueprint/validation/run_validation_suite.py --opa-bin /path/to/opa +python3 governance_blueprint/validation/validate_artifacts.py --opa-bin /path/to/opa +# Enforce OPA presence (fail fast if unavailable) +python3 governance_blueprint/validation/run_validation_suite.py --require-opa --opa-bin /path/to/opa +python3 governance_blueprint/validation/validate_artifacts.py --require-opa --opa-bin /path/to/opa +``` + Lint validation Python sources: ```bash @@ -70,9 +80,11 @@ python3 governance_blueprint/validation/generate_artifact_manifest.py --check What the validator checks: - Required headers and non-empty values in `control_mapping_matrix.csv`. - Required top-level fields and property definitions in `evidence_event_schema.json`. -- Structural expectations in `opa/release_gate.rego` (baseline block + tiered `allow` rules). -- Required roadmap tokens and indentation sanity in `roadmap_2026_2030.yaml`. -- SHA-256 integrity verification using `artifact_manifest.json`. +- Structural expectations in `opa/release_gate.rego` and `opa/systemic_risk_guardrails.rego`, plus optional OPA parse checks when `opa` is installed (or when `OPA_BIN` is set). +- Required schema/shape checks for `compliance_profile_2026.json` and `annex_iv_technical_documentation_template.json`. +- Required roadmap tokens and indentation sanity in `roadmap_2026_2030.yaml` plus phased checks in `rollout_plan_2026_2030.yaml`. +- Structural coverage check for `REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md` (`/<abstract>/<content>` and required section anchors). +- Manifest schema checks (`package`, semver `version`, UTC `generated_utc`, artifacts maps) and SHA-256 integrity verification across governance + root-level external report artifacts. - Python syntax compile checks across `governance_blueprint/validation/*.py`. - Dashboard navigation link checks between whitepaper and blueprint pages. @@ -122,6 +134,7 @@ Exit code conventions (run_validation_suite.py): - `0`: all checks passed. - Any other non-zero code: propagated from an invoked check command (for example manifest/check/selftest failure codes). - `3`: validator JSON output was malformed when `--json-report` was requested. +- `4`: no selftests were discovered while selftests were required (i.e., without `--skip-selftest`). `make gov-suite-ci` runs the suite in quiet report mode, matching the CI workflow command line. diff --git a/governance_blueprint/validation/generate_artifact_manifest.py b/governance_blueprint/validation/generate_artifact_manifest.py index 4de8864..2d51576 100644 --- a/governance_blueprint/validation/generate_artifact_manifest.py +++ b/governance_blueprint/validation/generate_artifact_manifest.py @@ -6,31 +6,89 @@ import argparse import hashlib import json +import re +import subprocess from datetime import datetime, timezone from pathlib import Path ROOT = Path(__file__).resolve().parents[2] ARTIFACTS = ROOT / "governance_blueprint" MANIFEST_PATH = ARTIFACTS / "artifact_manifest.json" -DEFAULT_FILES = [ +BASE_DEFAULT_FILES = [ "control_mapping_matrix.csv", "evidence_event_schema.json", - "opa/release_gate.rego", + "compliance_profile_2026.json", + "annex_iv_technical_documentation_template.json", + "civilizational_compute_governance_framework.yaml", "roadmap_2026_2030.yaml", + "rollout_plan_2026_2030.yaml", + "opa/release_gate.rego", + "opa/systemic_risk_guardrails.rego", "validation/validate_artifacts.py", - "validation/selftest_validate_artifacts.py", "validation/generate_artifact_manifest.py", "validation/run_validation_suite.py", - "validation/selftest_run_validation_suite.py", "validation/lint_python_sources.py", "validation/validate_dashboard_links.py", ] +EXTERNAL_FILES = [ + "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md", +] +UTC_TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$") def sha256_of(path: Path) -> str: return hashlib.sha256(path.read_bytes()).hexdigest() +def _safe_join(root: Path, rel: str) -> Path: + if rel.startswith("/") or ".." in Path(rel).parts: + raise ValueError(f"Disallowed manifest path entry: {rel}") + resolved = (root / rel).resolve() + try: + resolved.relative_to(root.resolve()) + except ValueError as exc: + raise ValueError(f"Manifest path escapes root: {rel}") from exc + return resolved + + +def _default_files() -> list[str]: + try: + git = subprocess.run( + ["git", "ls-files", "governance_blueprint/validation/selftest_*.py"], + cwd=ROOT, + capture_output=True, + text=True, + timeout=10, + ) + if git.returncode == 0: + selftests = [] + for line in git.stdout.splitlines(): + candidate = line.strip() + if ( + not candidate.startswith("governance_blueprint/validation/selftest_") + or not candidate.endswith(".py") + or ".." in Path(candidate).parts + ): + continue + try: + rel = Path(candidate).relative_to("governance_blueprint") + except ValueError: + continue + selftests.append(str(rel)) + else: + selftests = [ + str(path.relative_to(ARTIFACTS)) + for path in (ARTIFACTS / "validation").glob("selftest_*.py") + ] + except (OSError, subprocess.SubprocessError): + selftests = [ + str(path.relative_to(ARTIFACTS)) + for path in (ARTIFACTS / "validation").glob("selftest_*.py") + ] + selftests = sorted(selftests) + return list(dict.fromkeys([*BASE_DEFAULT_FILES, *selftests])) + + def _existing_generated_utc() -> str | None: if not MANIFEST_PATH.exists(): return None @@ -39,15 +97,22 @@ def _existing_generated_utc() -> str | None: except json.JSONDecodeError: return None value = current.get("generated_utc") - return value if isinstance(value, str) and value else None + if not isinstance(value, str) or not UTC_TS_RE.match(value): + return None + return value def build_manifest(*, preserve_timestamp: bool = True) -> dict: artifacts: dict[str, str] = {} - for rel in DEFAULT_FILES: - p = ARTIFACTS / rel + for rel in _default_files(): + p = _safe_join(ARTIFACTS, rel) artifacts[rel] = sha256_of(p) + external_artifacts: dict[str, str] = {} + for rel in EXTERNAL_FILES: + p = _safe_join(ROOT, rel) + external_artifacts[rel] = sha256_of(p) + generated_utc = _existing_generated_utc() if preserve_timestamp else None if not generated_utc: generated_utc = ( @@ -59,9 +124,10 @@ def build_manifest(*, preserve_timestamp: bool = True) -> dict: return { "package": "enterprise_agi_asi_governance_blueprint", - "version": "1.3.1", + "version": "1.4.5", "generated_utc": generated_utc, "artifacts": artifacts, + "external_artifacts": external_artifacts, } @@ -79,11 +145,13 @@ def main() -> int: if not MANIFEST_PATH.exists(): print("artifact_manifest.json is missing") return 1 - current_obj = json.loads(MANIFEST_PATH.read_text(encoding="utf-8")) + try: + current_obj = json.loads(MANIFEST_PATH.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + print(f"artifact_manifest.json is invalid JSON: {exc}") + return 1 expected_obj = build_manifest(preserve_timestamp=True) - current_artifacts = current_obj.get("artifacts", {}) - expected_artifacts = expected_obj.get("artifacts", {}) - if current_artifacts != expected_artifacts: + if current_obj != expected_obj: print("artifact_manifest.json is out of date; run generate_artifact_manifest.py") return 1 print("artifact_manifest.json is up to date") diff --git a/governance_blueprint/validation/run_validation_suite.py b/governance_blueprint/validation/run_validation_suite.py index 5e75b62..41d94f5 100644 --- a/governance_blueprint/validation/run_validation_suite.py +++ b/governance_blueprint/validation/run_validation_suite.py @@ -8,44 +8,79 @@ import argparse import json +import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path ROOT = Path(__file__).resolve().parents[2] +VALIDATION_DIR = ROOT / "governance_blueprint" / "validation" MALFORMED_VALIDATOR_JSON_RC = 3 +NO_SELFTESTS_DISCOVERED_RC = 4 -def _run(cmd: list[str], *, quiet: bool = False) -> int: +def _run(cmd: list[str], *, quiet: bool = False, env: dict | None = None) -> int: if not quiet: print("$", " ".join(cmd)) - completed = subprocess.run(cmd, cwd=ROOT) + completed = subprocess.run(cmd, cwd=ROOT, env=env) return completed.returncode -def build_steps(*, json_report: bool, skip_selftest: bool) -> list[list[str]]: +def _selftest_scripts() -> list[str]: + try: + git = subprocess.run( + ["git", "ls-files", "governance_blueprint/validation/selftest_*.py"], + cwd=ROOT, + capture_output=True, + text=True, + timeout=10, + ) + if git.returncode == 0: + paths = [ + line.strip() + for line in git.stdout.splitlines() + if line.strip().startswith("governance_blueprint/validation/selftest_") + and line.strip().endswith(".py") + and ".." not in Path(line.strip()).parts + ] + if paths: + return sorted(paths) + except (OSError, subprocess.SubprocessError): + pass + return sorted( + str(p.relative_to(ROOT)) + for p in VALIDATION_DIR.glob("selftest_*.py") + if p.is_file() + ) + + +def _is_selftest_step(step: list[str]) -> bool: + if len(step) < 2: + return False + return Path(step[1]).name.startswith("selftest_") and step[1].endswith(".py") + + +def build_steps(*, json_report: bool, skip_selftest: bool, opa_bin: str = "", require_opa: bool = False) -> list[list[str]]: steps: list[list[str]] = [ [sys.executable, "governance_blueprint/validation/generate_artifact_manifest.py", "--check"], ] + validate_cmd = [sys.executable, "governance_blueprint/validation/validate_artifacts.py"] if json_report: - steps.append( - [ - sys.executable, - "governance_blueprint/validation/validate_artifacts.py", - "--json", - ] - ) - else: - steps.append([sys.executable, "governance_blueprint/validation/validate_artifacts.py"]) + validate_cmd.append("--json") + if opa_bin: + validate_cmd.extend(["--opa-bin", opa_bin]) + if require_opa: + validate_cmd.append("--require-opa") + steps.append(validate_cmd) steps.append([sys.executable, "governance_blueprint/validation/lint_python_sources.py"]) steps.append([sys.executable, "governance_blueprint/validation/validate_dashboard_links.py"]) if not skip_selftest: - steps.append([sys.executable, "governance_blueprint/validation/selftest_validate_artifacts.py"]) - steps.append([sys.executable, "governance_blueprint/validation/selftest_run_validation_suite.py"]) + for selftest in _selftest_scripts(): + steps.append([sys.executable, selftest]) return steps @@ -90,9 +125,42 @@ def main() -> int: action="store_true", help="Continue running remaining steps after a failure and return the first non-zero code.", ) + parser.add_argument( + "--opa-bin", + type=str, + default="", + help="Optional explicit path to OPA binary for validator optional parse checks.", + ) + parser.add_argument( + "--require-opa", + action="store_true", + help="Fail validation if OPA binary is not available.", + ) args = parser.parse_args() - steps = build_steps(json_report=bool(args.json_report), skip_selftest=args.skip_selftest) + steps = build_steps( + json_report=bool(args.json_report), + skip_selftest=args.skip_selftest, + opa_bin=args.opa_bin, + require_opa=args.require_opa, + ) + if not args.skip_selftest: + has_selftest = any(_is_selftest_step(step) for step in steps) + if not has_selftest: + print("No validation selftests were discovered; refusing to continue.") + if args.suite_report: + _write_suite_report( + Path(args.suite_report), + [ + { + "name": "selftest_discovery", + "command": ["selftest_discovery"], + "returncode": NO_SELFTESTS_DISCOVERED_RC, + } + ], + None, + ) + return NO_SELFTESTS_DISCOVERED_RC step_results: list[dict] = [] validator_payload: dict | None = None first_failure_rc = 0 @@ -100,11 +168,14 @@ def main() -> int: for cmd in steps: step_name = Path(cmd[1]).name if len(cmd) > 1 else "unknown" - if args.json_report and cmd[-1] == "--json": + if args.json_report and "validate_artifacts.py" in cmd[1] and "--json" in cmd: report_path = Path(args.json_report) report_path.parent.mkdir(parents=True, exist_ok=True) with report_path.open("w", encoding="utf-8") as out: - completed = subprocess.run(cmd, cwd=ROOT, stdout=out) + env = None + if args.opa_bin: + env = {**os.environ, "OPA_BIN": args.opa_bin} + completed = subprocess.run(cmd, cwd=ROOT, stdout=out, env=env) rc = completed.returncode if rc == 0: try: @@ -122,7 +193,10 @@ def main() -> int: return rc continue - rc = _run(cmd, quiet=args.quiet) + env = None + if args.opa_bin: + env = {**os.environ, "OPA_BIN": args.opa_bin} + rc = _run(cmd, quiet=args.quiet, env=env) step_results.append({"name": step_name, "command": cmd, "returncode": rc}) if rc != 0: if first_failure_rc == 0: diff --git a/governance_blueprint/validation/selftest_generate_artifact_manifest.py b/governance_blueprint/validation/selftest_generate_artifact_manifest.py new file mode 100644 index 0000000..29f327b --- /dev/null +++ b/governance_blueprint/validation/selftest_generate_artifact_manifest.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +"""Unit tests for generate_artifact_manifest.py.""" + +from __future__ import annotations + +import importlib.util +import io +import json +import tempfile +import unittest +from contextlib import redirect_stdout +from unittest import mock +from pathlib import Path + +MODULE_PATH = Path(__file__).with_name("generate_artifact_manifest.py") +spec = importlib.util.spec_from_file_location("generate_artifact_manifest", MODULE_PATH) +gm = importlib.util.module_from_spec(spec) +assert spec and spec.loader +spec.loader.exec_module(gm) + + +class GenerateArtifactManifestTests(unittest.TestCase): + def setUp(self) -> None: + self.tmp = tempfile.TemporaryDirectory() + self.tmp_root = Path(self.tmp.name) + self.artifacts = self.tmp_root / "governance_blueprint" + self.artifacts.mkdir(parents=True, exist_ok=True) + + self.original_root = gm.ROOT + self.original_artifacts = gm.ARTIFACTS + self.original_manifest = gm.MANIFEST_PATH + self.original_base_defaults = gm.BASE_DEFAULT_FILES + self.original_external = gm.EXTERNAL_FILES + + gm.ROOT = self.tmp_root + gm.ARTIFACTS = self.artifacts + gm.MANIFEST_PATH = self.artifacts / "artifact_manifest.json" + gm.BASE_DEFAULT_FILES = ["control_mapping_matrix.csv"] + gm.EXTERNAL_FILES = ["REPORT.md"] + (self.artifacts / "validation").mkdir(parents=True, exist_ok=True) + (self.artifacts / "validation" / "selftest_tmp.py").write_text("print('ok')\n", encoding="utf-8") + + (self.artifacts / "control_mapping_matrix.csv").write_text("h\n", encoding="utf-8") + (self.tmp_root / "REPORT.md").write_text("report\n", encoding="utf-8") + + def tearDown(self) -> None: + gm.ROOT = self.original_root + gm.ARTIFACTS = self.original_artifacts + gm.MANIFEST_PATH = self.original_manifest + gm.BASE_DEFAULT_FILES = self.original_base_defaults + gm.EXTERNAL_FILES = self.original_external + self.tmp.cleanup() + + def test_safe_join_rejects_traversal(self) -> None: + with self.assertRaises(ValueError): + gm._safe_join(self.artifacts, "../oops.txt") + + def test_build_manifest_hashes_current_files(self) -> None: + manifest = gm.build_manifest(preserve_timestamp=False) + self.assertEqual(manifest["package"], "enterprise_agi_asi_governance_blueprint") + self.assertIn("control_mapping_matrix.csv", manifest["artifacts"]) + self.assertIn("validation/selftest_tmp.py", manifest["artifacts"]) + self.assertIn("REPORT.md", manifest["external_artifacts"]) + + def test_default_files_are_deduplicated(self) -> None: + gm.BASE_DEFAULT_FILES = ["control_mapping_matrix.csv", "validation/selftest_tmp.py"] + files = gm._default_files() + self.assertEqual(len(files), len(set(files))) + + def test_default_files_prefers_git_tracked_selftests_when_available(self) -> None: + class _R: + returncode = 0 + stdout = ( + "governance_blueprint/validation/selftest_run_validation_suite.py\n" + "../bad.py\n" + ) + + with mock.patch.object(gm.subprocess, "run", return_value=_R()): + files = gm._default_files() + self.assertIn("validation/selftest_run_validation_suite.py", files) + self.assertFalse(any(".." in f for f in files)) + + def test_default_files_falls_back_when_git_invocation_fails(self) -> None: + with mock.patch.object(gm.subprocess, "run", side_effect=gm.subprocess.TimeoutExpired(cmd="git", timeout=10)): + files = gm._default_files() + self.assertIn("validation/selftest_tmp.py", files) + + def test_check_fails_when_manifest_payload_differs(self) -> None: + manifest = gm.build_manifest(preserve_timestamp=True) + manifest["version"] = "0.0.1" + gm.MANIFEST_PATH.write_text(json.dumps(manifest), encoding="utf-8") + + with mock.patch("sys.argv", ["generate_artifact_manifest.py", "--check"]): + with redirect_stdout(io.StringIO()): + rc = gm.main() + self.assertEqual(rc, 1) + + def test_check_fails_on_invalid_generated_timestamp_shape(self) -> None: + manifest = gm.build_manifest(preserve_timestamp=True) + manifest["generated_utc"] = "2026/01/01 00:00:00" + gm.MANIFEST_PATH.write_text(json.dumps(manifest), encoding="utf-8") + + with mock.patch("sys.argv", ["generate_artifact_manifest.py", "--check"]): + with redirect_stdout(io.StringIO()): + rc = gm.main() + self.assertEqual(rc, 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/governance_blueprint/validation/selftest_run_validation_suite.py b/governance_blueprint/validation/selftest_run_validation_suite.py index b244522..d12396f 100644 --- a/governance_blueprint/validation/selftest_run_validation_suite.py +++ b/governance_blueprint/validation/selftest_run_validation_suite.py @@ -21,20 +21,51 @@ class RunValidationSuiteTests(unittest.TestCase): + def test_is_selftest_step_uses_script_name(self) -> None: + self.assertTrue(rs._is_selftest_step([sys.executable, "governance_blueprint/validation/selftest_validate_artifacts.py"])) + self.assertFalse(rs._is_selftest_step([sys.executable, "governance_blueprint/validation/validate_artifacts.py", "--flag=selftest_x"])) + self.assertFalse(rs._is_selftest_step([sys.executable])) + + def test_selftest_script_discovery_uses_git_ls_files_when_available(self) -> None: + class _R: + returncode = 0 + stdout = ( + "governance_blueprint/validation/selftest_validate_artifacts.py\n" + "../bad.py\n" + ) + + with patch.object(rs.subprocess, "run", return_value=_R()): + scripts = rs._selftest_scripts() + self.assertEqual(scripts, ["governance_blueprint/validation/selftest_validate_artifacts.py"]) + + def test_selftest_script_discovery_is_sorted(self) -> None: + scripts = rs._selftest_scripts() + self.assertEqual(scripts, sorted(scripts)) + self.assertTrue(all(s.endswith(".py") for s in scripts)) + + def test_selftest_script_discovery_falls_back_on_git_failure(self) -> None: + class _R: + returncode = 1 + stdout = "" + + with patch.object(rs.subprocess, "run", return_value=_R()): + scripts = rs._selftest_scripts() + self.assertTrue(any(s.endswith("selftest_run_validation_suite.py") for s in scripts)) + def test_build_steps_without_json_report(self) -> None: - steps = rs.build_steps(json_report=False, skip_selftest=False) + steps = rs.build_steps(json_report=False, skip_selftest=False, opa_bin="", require_opa=False) + selftests = rs._selftest_scripts() expected = [ [sys.executable, "governance_blueprint/validation/generate_artifact_manifest.py", "--check"], [sys.executable, "governance_blueprint/validation/validate_artifacts.py"], [sys.executable, "governance_blueprint/validation/lint_python_sources.py"], [sys.executable, "governance_blueprint/validation/validate_dashboard_links.py"], - [sys.executable, "governance_blueprint/validation/selftest_validate_artifacts.py"], - [sys.executable, "governance_blueprint/validation/selftest_run_validation_suite.py"], ] + expected.extend([[sys.executable, p] for p in selftests]) self.assertEqual(steps, expected) def test_build_steps_with_json_and_skip_selftest(self) -> None: - steps = rs.build_steps(json_report=True, skip_selftest=True) + steps = rs.build_steps(json_report=True, skip_selftest=True, opa_bin="", require_opa=False) expected = [ [sys.executable, "governance_blueprint/validation/generate_artifact_manifest.py", "--check"], [sys.executable, "governance_blueprint/validation/validate_artifacts.py", "--json"], @@ -43,11 +74,65 @@ def test_build_steps_with_json_and_skip_selftest(self) -> None: ] self.assertEqual(steps, expected) + def test_build_steps_with_opa_bin(self) -> None: + steps = rs.build_steps(json_report=False, skip_selftest=True, opa_bin="/tmp/opa", require_opa=False) + self.assertEqual( + steps[1], + [ + sys.executable, + "governance_blueprint/validation/validate_artifacts.py", + "--opa-bin", + "/tmp/opa", + ], + ) + + + def test_build_steps_with_json_and_opa_bin(self) -> None: + steps = rs.build_steps(json_report=True, skip_selftest=True, opa_bin="/tmp/opa", require_opa=False) + self.assertEqual( + steps[1], + [ + sys.executable, + "governance_blueprint/validation/validate_artifacts.py", + "--json", + "--opa-bin", + "/tmp/opa", + ], + ) + + + def test_build_steps_with_require_opa(self) -> None: + steps = rs.build_steps(json_report=False, skip_selftest=True, opa_bin="", require_opa=True) + self.assertEqual( + steps[1], + [ + sys.executable, + "governance_blueprint/validation/validate_artifacts.py", + "--require-opa", + ], + ) + + + def test_build_steps_with_json_and_require_opa(self) -> None: + steps = rs.build_steps(json_report=True, skip_selftest=True, opa_bin="/tmp/opa", require_opa=True) + self.assertEqual( + steps[1], + [ + sys.executable, + "governance_blueprint/validation/validate_artifacts.py", + "--json", + "--opa-bin", + "/tmp/opa", + "--require-opa", + ], + ) + + def test_suite_writes_json_report_path(self) -> None: with tempfile.TemporaryDirectory() as tmp: report = Path(tmp) / "report.json" - def fake_run(cmd, cwd=None, stdout=None): + def fake_run(cmd, cwd=None, stdout=None, **kwargs): class R: returncode = 0 @@ -63,12 +148,77 @@ class R: self.assertTrue(report.exists()) self.assertIn('"ok": true', report.read_text(encoding="utf-8")) + def test_json_report_mode_works_with_opa_bin_flag(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + report = Path(tmp) / "report-with-opa.json" + + def fake_run(cmd, cwd=None, stdout=None, **kwargs): + class R: + returncode = 0 + + if stdout is not None: + stdout.write('{"ok": true}\n') + return R() + + with patch.object(rs.subprocess, "run", side_effect=fake_run): + with patch( + "sys.argv", + [ + "run_validation_suite.py", + "--json-report", + str(report), + "--opa-bin", + "/tmp/opa", + "--skip-selftest", + "--quiet", + ], + ): + rc = rs.main() + + self.assertEqual(rc, 0) + self.assertTrue(report.exists()) + self.assertIn('"ok": true', report.read_text(encoding="utf-8")) + + + def test_json_report_uses_opa_env_override(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + report = Path(tmp) / "report-env.json" + seen_envs = [] + + def fake_run(cmd, cwd=None, stdout=None, **kwargs): + class R: + returncode = 0 + + seen_envs.append(kwargs.get("env")) + if stdout is not None: + stdout.write('{"ok": true}\n') + return R() + + with patch.object(rs.subprocess, "run", side_effect=fake_run): + with patch( + "sys.argv", + [ + "run_validation_suite.py", + "--json-report", + str(report), + "--opa-bin", + "/tmp/opa", + "--skip-selftest", + "--quiet", + ], + ): + rc = rs.main() + + self.assertEqual(rc, 0) + self.assertTrue(any(env and env.get("OPA_BIN") == "/tmp/opa" for env in seen_envs)) + + def test_suite_writes_suite_report(self) -> None: with tempfile.TemporaryDirectory() as tmp: validator_report = Path(tmp) / "validator.json" suite_report = Path(tmp) / "suite.json" - def fake_run(cmd, cwd=None, stdout=None): + def fake_run(cmd, cwd=None, stdout=None, **kwargs): class R: returncode = 0 @@ -119,7 +269,7 @@ def test_malformed_validator_json_fails(self) -> None: report = Path(tmp) / "bad-validator.json" suite_report = Path(tmp) / "suite.json" - def fake_run(cmd, cwd=None, stdout=None): + def fake_run(cmd, cwd=None, stdout=None, **kwargs): class R: returncode = 0 @@ -174,6 +324,27 @@ def test_no_fail_fast_runs_all_steps(self) -> None: self.assertEqual(payload["steps"][0]["returncode"], 2) self.assertEqual(payload["steps"][-1]["returncode"], 0) + def test_no_selftests_discovered_returns_specific_code(self) -> None: + with patch.object(rs, "_selftest_scripts", return_value=[]): + with patch("sys.argv", ["run_validation_suite.py", "--quiet"]): + with redirect_stdout(io.StringIO()): + rc = rs.main() + self.assertEqual(rc, rs.NO_SELFTESTS_DISCOVERED_RC) + + def test_no_selftests_discovered_writes_suite_report_when_requested(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + suite_report = Path(tmp) / "suite-no-selftests.json" + with patch.object(rs, "_selftest_scripts", return_value=[]): + with patch("sys.argv", ["run_validation_suite.py", "--quiet", "--suite-report", str(suite_report)]): + with redirect_stdout(io.StringIO()): + rc = rs.main() + self.assertEqual(rc, rs.NO_SELFTESTS_DISCOVERED_RC) + self.assertTrue(suite_report.exists()) + payload = json.loads(suite_report.read_text(encoding="utf-8")) + self.assertFalse(payload["ok"]) + self.assertEqual(payload["steps"][0]["name"], "selftest_discovery") + self.assertEqual(payload["steps"][0]["returncode"], rs.NO_SELFTESTS_DISCOVERED_RC) + if __name__ == "__main__": diff --git a/governance_blueprint/validation/selftest_validate_artifacts.py b/governance_blueprint/validation/selftest_validate_artifacts.py index e0ed58f..c8d3f25 100644 --- a/governance_blueprint/validation/selftest_validate_artifacts.py +++ b/governance_blueprint/validation/selftest_validate_artifacts.py @@ -3,10 +3,15 @@ from __future__ import annotations -import importlib.util +from contextlib import redirect_stdout import hashlib +import importlib.util +import io import json +import os import tempfile +import sys +from unittest.mock import patch import unittest from pathlib import Path @@ -23,11 +28,17 @@ def setUp(self) -> None: self.tmp_path = Path(self.tmp.name) self.artifacts = self.tmp_path / "governance_blueprint" self._seed_valid_artifacts() + self.original_root = va.ROOT self.original_artifacts = va.ARTIFACTS + self.original_report_path = va.REPORT_PATH + va.ROOT = self.tmp_path va.ARTIFACTS = self.artifacts + va.REPORT_PATH = self.tmp_path / "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md" def tearDown(self) -> None: + va.ROOT = self.original_root va.ARTIFACTS = self.original_artifacts + va.REPORT_PATH = self.original_report_path self.tmp.cleanup() def _write(self, path: Path, text: str) -> None: @@ -74,14 +85,51 @@ def _seed_valid_artifacts(self) -> None: } self._write(self.artifacts / "evidence_event_schema.json", json.dumps(schema)) + compliance_profile = { + "profile_id": "id", + "version": "1.0.0", + "framework_mappings": [ + { + "control_id": f"AIGOV-00{i}", + "control_family": "x", + "frameworks": ["EU_AI_Act"], + "owner": "CRO", + "implementation": ["ctrl"], + "evidence": ["e1"], + } + for i in range(1, 6) + ], + "implementation_strategy": {"phase_2026": "x"}, + } + self._write(self.artifacts / "compliance_profile_2026.json", json.dumps(compliance_profile)) + + annex_template = { + "template_id": "eu-ai-act-annex-iv-tech-doc-v1", + "version": "1.0.0", + "sections": [{"id": sid, "name": sid, "required": True} for sid in "ABCDEFGH"], + "metadata": {}, + "evidence_links": {}, + } + self._write(self.artifacts / "annex_iv_technical_documentation_template.json", json.dumps(annex_template)) + self._write( self.artifacts / "opa" / "release_gate.rego", "package aigov.release\n" "default allow = false\n" - "baseline_requirements { true }\n" - "allow { input.risk_tier <= 2 }\n" - "allow { input.risk_tier >= 3 }\n" - "allow { input.risk_tier == 4 }\n", + "baseline_requirements if { true }\n" + "allow if { input.risk_tier <= 2 }\n" + "allow if { input.risk_tier >= 3 }\n" + "allow if { input.risk_tier == 4 }\n", + ) + + self._write( + self.artifacts / "opa" / "systemic_risk_guardrails.rego", + "package aigov.systemic\n" + "default allow = false\n" + "allow if { input.risk_tier >= 4; input.safety_case.approved; input.compute_registry.registered }\n" + "deny contains msg if { input.risk_tier >= 4; not input.safety_case.approved; msg := \"x\" }\n" + "deny contains msg if { input.risk_tier >= 4; not input.compute_registry.registered; msg := \"x\" }\n" + "deny contains msg if { input.risk_tier >= 4; input.crisis_simulation.last_run_days > 180; msg := \"x\" }\n", ) self._write( @@ -100,29 +148,83 @@ def _seed_valid_artifacts(self) -> None: " - three\n", ) - # Generate manifest hashes for seeded files. + self._write( + self.artifacts / "rollout_plan_2026_2030.yaml", + "program: p\n" + "version: 1\n" + "phases:\n" + " - name: Phase A - foundation\n" + " dependencies:\n" + " - d1\n" + " exit_criteria:\n" + " - e1\n" + " - name: Phase B - scale\n" + " dependencies:\n" + " - d1\n" + " exit_criteria:\n" + " - e1\n" + " - name: Phase C - assurance\n" + " dependencies:\n" + " - d1\n" + " exit_criteria:\n" + " - e1\n" + " - name: Phase D - integration\n" + " dependencies:\n" + " - d1\n" + " exit_criteria:\n" + " - e1\n" + " - name: Phase E - adaptive\n" + " dependencies:\n" + " - d1\n" + " exit_criteria:\n" + " - e1\n", + ) + + self._write( + self.tmp_path / "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md", + "<title>x\n" + "x\n" + "\n" + "## 2) Integrated Regulatory Compliance Framework Mapping and Implementation\n" + "## 3) Institutional-Grade Governance Platform Technical Architecture\n" + "## 4) AGI/ASI Safety, Containment, and Crisis Simulation Blueprint\n" + "## 5) Civilizational-Scale AI and Compute Governance Mechanisms\n" + "## 7) 2026–2030 Dependency-Aware Implementation Roadmap\n" + "
x
\n" + "
x
\n" + "
x
\n" + "
\n", + ) + hash_targets = [ "control_mapping_matrix.csv", "evidence_event_schema.json", + "compliance_profile_2026.json", + "annex_iv_technical_documentation_template.json", "opa/release_gate.rego", + "opa/systemic_risk_guardrails.rego", "roadmap_2026_2030.yaml", + "rollout_plan_2026_2030.yaml", ] - manifest = { - "package": "test", - "version": "test", - "generated_utc": "test", - "artifacts": {}, - } + manifest = {"package": "test", "version": "1.0.0", "generated_utc": "2026-01-01T00:00:00Z", "artifacts": {}, "external_artifacts": {}} for rel in hash_targets: p = self.artifacts / rel manifest["artifacts"][rel] = hashlib.sha256(p.read_bytes()).hexdigest() + report_rel = "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md" + manifest["external_artifacts"][report_rel] = hashlib.sha256((self.tmp_path / report_rel).read_bytes()).hexdigest() self._write(self.artifacts / "artifact_manifest.json", json.dumps(manifest)) def test_all_validators_pass_for_good_assets(self) -> None: self.assertEqual(va.validate_csv(), []) self.assertEqual(va.validate_json_schema(), []) - self.assertEqual(va.validate_rego(), []) + self.assertEqual(va.validate_compliance_profile(), []) + self.assertEqual(va.validate_annex_iv_template(), []) + self.assertEqual(va.validate_rego_release_gate(), []) + self.assertEqual(va.validate_rego_systemic_guardrails(), []) self.assertEqual(va.validate_yaml_shape(), []) + self.assertEqual(va.validate_rollout_plan(), []) + self.assertEqual(va.validate_report_structure(), []) + self.assertEqual(va.validate_manifest_schema(), []) self.assertEqual(va.validate_manifest_hashes(), []) def test_schema_missing_model_id_fails(self) -> None: @@ -130,45 +232,127 @@ def test_schema_missing_model_id_fails(self) -> None: schema = json.loads(schema_path.read_text(encoding="utf-8")) schema["properties"].pop("model_id") schema_path.write_text(json.dumps(schema), encoding="utf-8") - errors = va.validate_json_schema() self.assertTrue(any("model_id" in e for e in errors)) - def test_rego_missing_blocks_fails(self) -> None: - (self.artifacts / "opa" / "release_gate.rego").write_text( - "package aigov.release\ndefault allow = false\nallow { input.risk_tier <= 2 }\n", + def test_systemic_rego_missing_deny_fails(self) -> None: + (self.artifacts / "opa" / "systemic_risk_guardrails.rego").write_text( + "package aigov.systemic\ndefault allow = false\nallow if { input.risk_tier >= 4 }\n", encoding="utf-8", ) - - errors = va.validate_rego() - self.assertTrue(any("baseline_requirements" in e or "allow blocks" in e for e in errors)) + errors = va.validate_rego_systemic_guardrails() + self.assertTrue(any("deny blocks" in e for e in errors)) def test_manifest_hash_mismatch_fails(self) -> None: - # Mutate a file after manifest generation. - (self.artifacts / "roadmap_2026_2030.yaml").write_text( - "program: changed\nversion: 1\nhorizon: h\nphases:\n - name: foundation\nworkstreams:\n - one\n", - encoding="utf-8", - ) + (self.artifacts / "rollout_plan_2026_2030.yaml").write_text("program: bad\n", encoding="utf-8") errors = va.validate_manifest_hashes() self.assertTrue(any("Hash mismatch" in e for e in errors)) - def test_yaml_shape_fails_when_insufficient_workstreams(self) -> None: - (self.artifacts / "roadmap_2026_2030.yaml").write_text( - "program: p\n" - "version: 1\n" - "horizon: h\n" - "phases:\n" - " - name: foundation\n" - " - name: industrialization\n" - " - name: advanced_assurance\n" - " - name: resilience_and_advantage\n" - "workstreams:\n" - " - one\n" - " - two\n", + def test_rollout_plan_missing_phase_fails(self) -> None: + (self.artifacts / "rollout_plan_2026_2030.yaml").write_text( + "program: p\nversion: 1\nphases:\n - name: Phase A\n", + encoding="utf-8", + ) + errors = va.validate_rollout_plan() + self.assertTrue(any("at least 5 phases" in e for e in errors)) + + def test_report_missing_token_fails(self) -> None: + (self.tmp_path / "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md").write_text( + "x\nx\nmissing sections\n", encoding="utf-8", ) - errors = va.validate_yaml_shape() - self.assertTrue(any("at least 3 workstreams" in e for e in errors)) + errors = va.validate_report_structure() + self.assertTrue(any("missing required token" in e for e in errors)) + + def test_manifest_external_path_traversal_fails(self) -> None: + manifest_path = self.artifacts / "artifact_manifest.json" + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + manifest["external_artifacts"] = {"../bad.md": "d" * 64} + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + errors = va.validate_manifest_hashes() + self.assertTrue(any("not allowed" in e or "escapes repository root" in e for e in errors)) + + def test_manifest_schema_invalid_version_fails(self) -> None: + manifest_path = self.artifacts / "artifact_manifest.json" + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + manifest["version"] = "1.4" + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + errors = va.validate_manifest_schema() + self.assertTrue(any("semantic version" in e for e in errors)) + + def test_manifest_schema_rejects_non_object_json(self) -> None: + manifest_path = self.artifacts / "artifact_manifest.json" + manifest_path.write_text('["not-an-object"]', encoding="utf-8") + errors = va.validate_manifest_schema() + self.assertTrue(any("must be a JSON object" in e for e in errors)) + + def test_manifest_hashes_reject_non_object_json(self) -> None: + manifest_path = self.artifacts / "artifact_manifest.json" + manifest_path.write_text('["not-an-object"]', encoding="utf-8") + errors = va.validate_manifest_hashes() + self.assertTrue(any("must be a JSON object" in e for e in errors)) + + def test_manifest_hash_value_must_be_sha256_hex(self) -> None: + manifest_path = self.artifacts / "artifact_manifest.json" + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + manifest["artifacts"]["roadmap_2026_2030.yaml"] = "not-a-sha256" + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + errors = va.validate_manifest_hashes() + self.assertTrue(any("64-char lowercase hex SHA-256" in e for e in errors)) + + def test_manifest_external_hash_value_must_be_sha256_hex(self) -> None: + manifest_path = self.artifacts / "artifact_manifest.json" + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + key = "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md" + manifest["external_artifacts"][key] = "12345" + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + errors = va.validate_manifest_hashes() + self.assertTrue(any("64-char lowercase hex SHA-256" in e for e in errors)) + + def test_require_opa_fails_when_binary_missing(self) -> None: + previous = os.environ.get("OPA_BIN") + os.environ.pop("OPA_BIN", None) + try: + with patch.object(va.shutil, "which", return_value=None): + errors = va.validate_opa_parse_optional(require_opa=True) + self.assertTrue(any("required" in e for e in errors)) + finally: + if previous is not None: + os.environ["OPA_BIN"] = previous + + + def test_main_require_opa_returns_nonzero_when_missing(self) -> None: + previous_env = os.environ.get("OPA_BIN") + os.environ.pop("OPA_BIN", None) + try: + with patch.object(va.shutil, "which", return_value=None): + with patch.object(sys, "argv", ["validate_artifacts.py", "--require-opa"]): + with redirect_stdout(io.StringIO()): + rc = va.main() + self.assertNotEqual(rc, 0) + finally: + if previous_env is not None: + os.environ["OPA_BIN"] = previous_env + + + def test_opa_bin_override_invalid_path_reports_error(self) -> None: + previous = os.environ.get("OPA_BIN") + os.environ["OPA_BIN"] = str(self.tmp_path / "missing-opa") + try: + errors = va.validate_opa_parse_optional() + self.assertTrue(any("does not exist" in e for e in errors)) + finally: + if previous is None: + os.environ.pop("OPA_BIN", None) + else: + os.environ["OPA_BIN"] = previous + + def test_opa_parse_subprocess_failure_reports_error(self) -> None: + opa = self.tmp_path / "opa" + opa.write_text("", encoding="utf-8") + with patch.object(va.subprocess, "run", side_effect=va.subprocess.TimeoutExpired(cmd="opa parse", timeout=20)): + errors = va.validate_opa_parse_optional(opa_bin_override=str(opa), require_opa=True) + self.assertTrue(any("execution failed" in e for e in errors)) if __name__ == "__main__": diff --git a/governance_blueprint/validation/validate_artifacts.py b/governance_blueprint/validation/validate_artifacts.py index 76436cd..d4801c1 100644 --- a/governance_blueprint/validation/validate_artifacts.py +++ b/governance_blueprint/validation/validate_artifacts.py @@ -2,7 +2,7 @@ """Static validator for governance blueprint machine-readable artifacts. Runs dependency-light checks so CI can validate artifacts without requiring -external tooling (OPA/yq/etc.). +external tooling. """ from __future__ import annotations @@ -11,11 +11,43 @@ import csv import hashlib import json +import os import re +import shutil +import subprocess +from typing import Any from pathlib import Path ROOT = Path(__file__).resolve().parents[2] ARTIFACTS = ROOT / "governance_blueprint" +REPORT_PATH = ROOT / "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md" + + +_HEX64_RE = re.compile(r"^[0-9a-f]{64}$") + + +def _path_stays_within_root(path: Path, root: Path) -> bool: + try: + path.resolve().relative_to(root.resolve()) + return True + except ValueError: + return False + + +def _load_json(path: Path) -> tuple[Any, list[str]]: + try: + return json.loads(path.read_text(encoding="utf-8")), [] + except json.JSONDecodeError as exc: + return None, [f"Invalid JSON in {path.name}: {exc}"] + + +def _load_json_object(path: Path, *, label: str) -> tuple[dict, list[str]]: + data, load_errors = _load_json(path) + if load_errors: + return {}, load_errors + if not isinstance(data, dict): + return {}, [f"{label} must be a JSON object at the top level."] + return data, [] def validate_csv() -> list[str]: @@ -59,8 +91,10 @@ def validate_csv() -> list[str]: def validate_json_schema() -> list[str]: errors: list[str] = [] path = ARTIFACTS / "evidence_event_schema.json" - with path.open(encoding="utf-8") as f: - data = json.load(f) + data, load_errors = _load_json_object(path, label=path.name) + errors.extend(load_errors) + if load_errors: + return errors required_top_level = {"$schema", "title", "type", "required", "properties"} missing = required_top_level.difference(data.keys()) @@ -85,7 +119,60 @@ def validate_json_schema() -> list[str]: return errors -def validate_rego() -> list[str]: +def validate_compliance_profile() -> list[str]: + errors: list[str] = [] + path = ARTIFACTS / "compliance_profile_2026.json" + data, load_errors = _load_json_object(path, label=path.name) + errors.extend(load_errors) + if load_errors: + return errors + + for key in ["profile_id", "version", "framework_mappings", "implementation_strategy"]: + if key not in data: + errors.append(f"compliance_profile_2026.json missing key: {key}") + + mappings = data.get("framework_mappings", []) + if not isinstance(mappings, list) or len(mappings) < 5: + errors.append("compliance_profile_2026.json must include at least 5 framework mappings.") + return errors + + required_entry_keys = {"control_id", "control_family", "frameworks", "owner", "implementation", "evidence"} + for idx, entry in enumerate(mappings, start=1): + if not isinstance(entry, dict): + errors.append(f"framework_mappings[{idx}] must be an object") + continue + missing = required_entry_keys.difference(entry.keys()) + if missing: + errors.append(f"framework_mappings[{idx}] missing keys: {sorted(missing)}") + + return errors + + +def validate_annex_iv_template() -> list[str]: + errors: list[str] = [] + path = ARTIFACTS / "annex_iv_technical_documentation_template.json" + data, load_errors = _load_json_object(path, label=path.name) + errors.extend(load_errors) + if load_errors: + return errors + + for key in ["template_id", "version", "sections", "metadata", "evidence_links"]: + if key not in data: + errors.append(f"annex_iv template missing key: {key}") + + sections = data.get("sections", []) + if not isinstance(sections, list) or len(sections) < 8: + errors.append("annex_iv template must define at least 8 required sections.") + + required_section_ids = {"A", "B", "C", "D", "E", "F", "G", "H"} + section_ids = {str(s.get("id")) for s in sections if isinstance(s, dict)} + if not required_section_ids.issubset(section_ids): + errors.append("annex_iv template is missing one or more section IDs A-H.") + + return errors + + +def validate_rego_release_gate() -> list[str]: errors: list[str] = [] path = ARTIFACTS / "opa" / "release_gate.rego" text = path.read_text(encoding="utf-8") @@ -100,17 +187,38 @@ def validate_rego() -> list[str]: ] for token in expected_tokens: if token not in text: - errors.append(f"Rego policy missing expected token: {token}") + errors.append(f"release_gate.rego missing expected token: {token}") - allow_count = text.count("allow {") - if allow_count < 3: - errors.append("Rego policy must define at least three allow blocks.") + if text.count("allow if") < 3: + errors.append("release_gate.rego must define at least three allow blocks.") + + return errors + + +def validate_rego_systemic_guardrails() -> list[str]: + errors: list[str] = [] + path = ARTIFACTS / "opa" / "systemic_risk_guardrails.rego" + text = path.read_text(encoding="utf-8") + + expected_tokens = [ + "package aigov.systemic", + "default allow = false", + "input.risk_tier >= 4", + "input.safety_case.approved", + "input.compute_registry.registered", + "deny contains msg if", + ] + for token in expected_tokens: + if token not in text: + errors.append(f"systemic_risk_guardrails.rego missing expected token: {token}") + + if text.count("deny contains msg if") < 3: + errors.append("systemic_risk_guardrails.rego must define at least three deny blocks.") return errors def validate_yaml_shape() -> list[str]: - """Structure checks without external YAML parser dependency.""" errors: list[str] = [] path = ARTIFACTS / "roadmap_2026_2030.yaml" text = path.read_text(encoding="utf-8") @@ -131,12 +239,7 @@ def validate_yaml_shape() -> list[str]: errors.append(f"YAML roadmap missing expected token: {token}") phase_names = re.findall(r"^\s*-\s+name:\s*([a-zA-Z0-9_]+)\s*$", text, flags=re.MULTILINE) - expected_phases = [ - "foundation", - "industrialization", - "advanced_assurance", - "resilience_and_advantage", - ] + expected_phases = ["foundation", "industrialization", "advanced_assurance", "resilience_and_advantage"] if phase_names[:4] != expected_phases: errors.append(f"YAML roadmap phase order mismatch: expected {expected_phases}, got {phase_names[:4]}") @@ -144,7 +247,6 @@ def validate_yaml_shape() -> list[str]: if len(workstream_entries) < 3: errors.append("YAML roadmap must define at least 3 workstreams.") - # Lightweight indentation sanity for list entries. for ln, line in enumerate(text.splitlines(), start=1): if "\t" in line: errors.append(f"YAML roadmap has tab indentation at line {ln}; use spaces only.") @@ -152,57 +254,202 @@ def validate_yaml_shape() -> list[str]: return errors +def validate_rollout_plan() -> list[str]: + errors: list[str] = [] + path = ARTIFACTS / "rollout_plan_2026_2030.yaml" + text = path.read_text(encoding="utf-8") + + for token in ["program:", "version:", "phases:", "Phase A", "Phase E", "dependencies:", "exit_criteria:"]: + if token not in text: + errors.append(f"rollout_plan_2026_2030.yaml missing expected token: {token}") + + phase_count = len(re.findall(r"^\s*-\s+name:\s+Phase\s+[A-E]", text, flags=re.MULTILINE)) + if phase_count < 5: + errors.append("rollout_plan_2026_2030.yaml must define at least 5 phases (A-E).") + + return errors + + +def validate_report_structure() -> list[str]: + errors: list[str] = [] + if not REPORT_PATH.exists(): + return [f"Missing report file: {REPORT_PATH.name}"] + + text = REPORT_PATH.read_text(encoding="utf-8") + tag_pairs = [("", ""), ("", ""), ("", "")] + for open_tag, close_tag in tag_pairs: + if text.count(open_tag) != 1 or text.count(close_tag) != 1: + errors.append(f"Technical report must contain exactly one {open_tag}/{close_tag} pair") + continue + if text.index(open_tag) > text.index(close_tag): + errors.append(f"Technical report has invalid tag order for {open_tag}/{close_tag}") + + required_tokens = [ + "## 2) Integrated Regulatory Compliance Framework Mapping and Implementation", + "## 3) Institutional-Grade Governance Platform Technical Architecture", + "## 4) AGI/ASI Safety, Containment, and Crisis Simulation Blueprint", + "## 5) Civilizational-Scale AI and Compute Governance Mechanisms", + "## 7) 2026–2030 Dependency-Aware Implementation Roadmap", + '
', + '
', + '
', + ] + for token in required_tokens: + if token not in text: + errors.append(f"Technical report missing required token: {token}") + + return errors + + +def validate_opa_parse_optional(opa_bin_override: str = "", require_opa: bool = False) -> list[str]: + """Optionally validate Rego syntax if an OPA binary is available. + + Resolution order: + 1) OPA_BIN environment variable + 2) `opa` discovered on PATH + """ + env_opa = os.getenv("OPA_BIN", "").strip() + opa_bin = opa_bin_override.strip() or env_opa or shutil.which("opa") + if not opa_bin: + if require_opa: + return ["OPA binary is required but not found. Set --opa-bin or OPA_BIN."] + return [] + if not Path(opa_bin).exists(): + return [f"OPA_BIN path does not exist: {opa_bin}"] + + errors: list[str] = [] + targets = [ARTIFACTS / "opa" / "release_gate.rego", ARTIFACTS / "opa" / "systemic_risk_guardrails.rego"] + for target in targets: + try: + proc = subprocess.run( + [opa_bin, "parse", str(target)], + capture_output=True, + text=True, + timeout=20, + ) + except (OSError, subprocess.SubprocessError) as exc: + errors.append(f"OPA parse execution failed for {target.name}: {exc}") + continue + if proc.returncode != 0: + stderr = (proc.stderr or proc.stdout).strip() + errors.append(f"OPA parse failed for {target.name}: {stderr}") + return errors + + +def validate_manifest_schema() -> list[str]: + errors: list[str] = [] + manifest_path = ARTIFACTS / "artifact_manifest.json" + manifest, load_errors = _load_json_object(manifest_path, label=manifest_path.name) + if load_errors: + return load_errors + + required_keys = {"package", "version", "generated_utc", "artifacts", "external_artifacts"} + missing = required_keys.difference(manifest.keys()) + if missing: + errors.append(f"artifact_manifest.json missing keys: {sorted(missing)}") + + version = manifest.get("version", "") + if not isinstance(version, str) or not re.match(r"^\d+\.\d+\.\d+$", version): + errors.append("artifact_manifest.json version must use semantic version format (x.y.z).") + + generated_utc = manifest.get("generated_utc", "") + if not isinstance(generated_utc, str) or not re.match(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$", generated_utc): + errors.append("artifact_manifest.json generated_utc must be in UTC ISO format YYYY-MM-DDTHH:MM:SSZ.") + + return errors + + def validate_manifest_hashes() -> list[str]: errors: list[str] = [] manifest_path = ARTIFACTS / "artifact_manifest.json" if not manifest_path.exists(): return ["artifact_manifest.json not found."] - with manifest_path.open(encoding="utf-8") as f: - manifest = json.load(f) + manifest, load_errors = _load_json_object(manifest_path, label=manifest_path.name) + if load_errors: + return load_errors artifacts = manifest.get("artifacts") if not isinstance(artifacts, dict) or not artifacts: return ["artifact_manifest.json must contain a non-empty 'artifacts' object."] for rel_path, expected_hash in artifacts.items(): + if not isinstance(rel_path, str): + errors.append(f"Manifest artifact path key must be a string: {rel_path!r}") + continue + if not isinstance(expected_hash, str) or not _HEX64_RE.match(expected_hash): + errors.append(f"Manifest hash for {rel_path} must be a 64-char lowercase hex SHA-256.") + continue + if rel_path.startswith("/") or ".." in Path(rel_path).parts: + errors.append(f"Manifest artifact path is not allowed: {rel_path}") + continue artifact_path = ARTIFACTS / rel_path + if not _path_stays_within_root(artifact_path, ARTIFACTS): + errors.append(f"Manifest artifact path escapes artifact root: {rel_path}") + continue if not artifact_path.exists(): errors.append(f"Manifest references missing file: {rel_path}") continue actual_hash = hashlib.sha256(artifact_path.read_bytes()).hexdigest() if actual_hash != expected_hash: - errors.append( - f"Hash mismatch for {rel_path}: expected {expected_hash}, got {actual_hash}" - ) + errors.append(f"Hash mismatch for {rel_path}: expected {expected_hash}, got {actual_hash}") + + external_artifacts = manifest.get("external_artifacts", {}) + if not isinstance(external_artifacts, dict): + errors.append("artifact_manifest.json 'external_artifacts' must be an object when present.") + return errors + + for rel_path, expected_hash in external_artifacts.items(): + if not isinstance(rel_path, str): + errors.append(f"Manifest external artifact path key must be a string: {rel_path!r}") + continue + if not isinstance(expected_hash, str) or not _HEX64_RE.match(expected_hash): + errors.append(f"Manifest external hash for {rel_path} must be a 64-char lowercase hex SHA-256.") + continue + if rel_path.startswith("/") or ".." in Path(rel_path).parts: + errors.append(f"Manifest external artifact path is not allowed: {rel_path}") + continue + external_path = ROOT / rel_path + if not _path_stays_within_root(external_path, ROOT): + errors.append(f"Manifest external artifact path escapes repository root: {rel_path}") + continue + if not external_path.exists(): + errors.append(f"Manifest references missing external file: {rel_path}") + continue + actual_hash = hashlib.sha256(external_path.read_bytes()).hexdigest() + if actual_hash != expected_hash: + errors.append(f"External hash mismatch for {rel_path}: expected {expected_hash}, got {actual_hash}") + return errors -def run_checks() -> dict[str, list[str]]: +def run_checks(*, opa_bin_override: str = "", require_opa: bool = False) -> dict[str, list[str]]: checks = { "control_mapping_matrix.csv": validate_csv, "evidence_event_schema.json": validate_json_schema, - "opa/release_gate.rego": validate_rego, + "compliance_profile_2026.json": validate_compliance_profile, + "annex_iv_technical_documentation_template.json": validate_annex_iv_template, + "opa/release_gate.rego": validate_rego_release_gate, + "opa/systemic_risk_guardrails.rego": validate_rego_systemic_guardrails, "roadmap_2026_2030.yaml": validate_yaml_shape, + "rollout_plan_2026_2030.yaml": validate_rollout_plan, + "REGULATOR_READY_AGI_ASI_TECHNICAL_REPORT_2026_2030.md": validate_report_structure, + "artifact_manifest.schema": validate_manifest_schema, + "opa.parse_optional": (lambda: validate_opa_parse_optional(opa_bin_override, require_opa=require_opa)), "artifact_manifest.json": validate_manifest_hashes, } - results: dict[str, list[str]] = {} - for name, fn in checks.items(): - results[name] = fn() - return results + return {name: fn() for name, fn in checks.items()} def main() -> int: parser = argparse.ArgumentParser(description="Validate governance blueprint artifacts.") - parser.add_argument( - "--json", - action="store_true", - help="Print machine-readable JSON output for CI integrations.", - ) + parser.add_argument("--json", action="store_true", help="Print machine-readable JSON output for CI integrations.") + parser.add_argument("--opa-bin", type=str, default="", help="Optional explicit OPA binary path for optional parse checks.") + parser.add_argument("--require-opa", action="store_true", help="Fail if OPA binary is unavailable for parse checks.") args = parser.parse_args() - results = run_checks() + results = run_checks(opa_bin_override=args.opa_bin, require_opa=args.require_opa) all_errors: list[str] = [] for name, errors in results.items(): if errors: @@ -210,10 +457,7 @@ def main() -> int: all_errors.extend([f" - {e}" for e in errors]) if args.json: - payload = { - "ok": len(all_errors) == 0, - "results": results, - } + payload = {"ok": len(all_errors) == 0, "results": results} print(json.dumps(payload, indent=2)) return 0 if payload["ok"] else 1