Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions formal_verification/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"""

from .detector import FormalVerificationConflictDetector
from .hybrid_lean_coq_resolver import HybridLeanCoqResolver
from .lean_prover import LeanProver
from .lean_translator import LeanTranslator
from .proof_protocol import (
PreservationAuditor,
build_claim_ir,
Expand All @@ -25,6 +28,9 @@
"CoqProver",
"FormalSpec",
"FormalVerificationConflictDetector",
"HybridLeanCoqResolver",
"LeanProver",
"LeanTranslator",
"PreservationAudit",
"PreservationAuditor",
"PreservationLabel",
Expand Down
109 changes: 109 additions & 0 deletions formal_verification/hybrid_lean_coq_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Hybrid Lean + Coq conflict resolver.

Tries one prover first, falls back to the other on failure. Exposes
the standard ``prove_specification`` interface so it plugs into the
existing FormalVerificationConflictDetector without changes.
"""

import logging

from .lean_prover import LeanProver
from .lean_translator import LeanTranslator
from .prover import CoqProver
from .types import FormalSpec, ProofResult

logger = logging.getLogger(__name__)


class HybridLeanCoqResolver:
"""Lean-first prover with Coq fallback (or vice versa)."""

def __init__(
self,
prefer_lean: bool = True,
use_fallback: bool = True,
timeout_seconds: int = 30,
):
"""Initialize hybrid resolver.

Args:
prefer_lean: If True, try Lean first; otherwise try Coq first.
use_fallback: If True, try the other prover when the first fails.
timeout_seconds: Per-prover timeout.
"""
self.prefer_lean = prefer_lean
self.use_fallback = use_fallback
self.lean_translator = LeanTranslator()
self.lean_prover = LeanProver(timeout_seconds=timeout_seconds)
self.coq_prover = CoqProver(timeout_seconds=timeout_seconds)

def prove_specification(self, spec: FormalSpec) -> ProofResult:
"""Prove a specification, trying both backends if needed.

Args:
spec: Formal specification (may contain Coq or Lean code).

Returns:
Best ProofResult obtained across backends.
"""
if self.prefer_lean:
return self._lean_then_coq(spec)
return self._coq_then_lean(spec)

def _lean_then_coq(self, spec: FormalSpec) -> ProofResult:
"""Try Lean first, fall back to Coq."""
lean_spec = self._ensure_lean_spec(spec)
if lean_spec is not None:
lean_result = self.lean_prover.prove_specification(lean_spec)
if lean_result.proven:
return lean_result
else:
lean_result = None

if not self.use_fallback:
if lean_result is not None:
return lean_result
return self._untranslatable(spec, "lean")

coq_result = self.coq_prover.prove_specification(spec)
if coq_result.proven:
return coq_result
# Prefer the more informative result from whichever prover ran.
return lean_result if lean_result is not None else coq_result

def _coq_then_lean(self, spec: FormalSpec) -> ProofResult:
"""Try Coq first, fall back to Lean."""
coq_result = self.coq_prover.prove_specification(spec)
if coq_result.proven:
return coq_result

if not self.use_fallback:
return coq_result

lean_spec = self._ensure_lean_spec(spec)
if lean_spec is None:
return coq_result

lean_result = self.lean_prover.prove_specification(lean_spec)
if lean_result.proven:
return lean_result
# Both failed — prefer the initial result.
return coq_result

def _ensure_lean_spec(self, spec: FormalSpec) -> FormalSpec | None:
"""Translate the spec's claim to Lean if needed."""
return self.lean_translator.translate(spec.claim)

@staticmethod
def _untranslatable(spec: FormalSpec, prover: str) -> ProofResult:
"""Return an INCONCLUSIVE result when translation failed."""
return ProofResult(
spec=spec,
proven=False,
proof_time_ms=0,
error_message=f"Could not translate claim for {prover} prover",
counter_example=None,
proof_output="",
prover_name=prover,
solver_status="inconclusive",
)
Comment thread
cursor[bot] marked this conversation as resolved.
183 changes: 183 additions & 0 deletions formal_verification/lean_prover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""Lean 4 theorem prover interface.

Proves Lean 4 theorems by invoking the ``lean`` binary, following the
same FormalSpec -> ProofResult contract as CoqProver.
"""

import logging
import re
import subprocess
import tempfile
import time
from pathlib import Path

from .proof_cache import ProofCache
from .types import FormalSpec, ProofResult, ProofStatus

logger = logging.getLogger(__name__)

# Patterns that indicate an unverified assumption in Lean 4 code.
_ASSUMPTION_PATTERNS: dict[str, str] = {
"sorry": r"\bsorry\b",
"admit": r"\badmit\b",
"axiom": r"\baxiom\b",
}


class LeanProver:
"""Interface to the Lean 4 type-checker for formal verification."""

def __init__(self, timeout_seconds: int = 30, use_cache: bool = True):
"""Initialize Lean prover.

Args:
timeout_seconds: Maximum seconds for a proof attempt.
use_cache: Whether to cache proof results.
"""
self.timeout_seconds = timeout_seconds
self.use_cache = use_cache
self.cache = ProofCache() if use_cache else None
self.lean_available = self._check_binary("lean")

if not self.lean_available:
logger.warning("Lean 4 theorem prover not available")

@staticmethod
def _check_binary(binary: str) -> bool:
"""Check if a binary is installed and callable."""
try:
result = subprocess.run(
[binary, "--version"], capture_output=True, timeout=5
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False

@staticmethod
def _detect_unverified_assumptions(lean_code: str) -> list[str]:
"""Detect assumptions that prevent a sound proof."""
markers: list[str] = []
for label, pattern in _ASSUMPTION_PATTERNS.items():
if re.search(pattern, lean_code):
markers.append(label)
return markers

def prove_specification(self, spec: FormalSpec) -> ProofResult:
"""Attempt to prove a formal specification using Lean 4.

Args:
spec: The formal specification to prove (Lean code in coq_code).

Returns:
ProofResult with proof status and timing information.
"""
lean_code = spec.coq_code

if self.use_cache and self.cache:
cached = self.cache.get(spec)
if cached:
return cached

start_time = time.time()

# Reject code that contains sorry / admit / axiom.
assumptions = self._detect_unverified_assumptions(lean_code)
if assumptions:
return ProofResult(
spec=spec,
proven=False,
proof_time_ms=(time.time() - start_time) * 1000,
error_message=(
"Specification contains unverified assumptions: "
+ ", ".join(assumptions)
),
counter_example=None,
proof_output="",
prover_name="lean",
solver_status=ProofStatus.FORMALIZED_UNPROVED.value,
assumptions_present=True,
)

if not self.lean_available:
return ProofResult(
spec=spec,
proven=False,
proof_time_ms=0,
error_message="Lean 4 theorem prover not available",
counter_example=None,
proof_output="",
prover_name="lean",
solver_status=ProofStatus.UNAVAILABLE.value,
)

try:
result = self._run_lean(lean_code, start_time, spec)
except subprocess.TimeoutExpired:
logger.warning("Lean proof timeout for: %s", spec.spec_text)
result = ProofResult(
spec=spec,
proven=False,
proof_time_ms=self.timeout_seconds * 1000,
error_message="Proof attempt timed out",
counter_example=None,
proof_output="",
prover_name="lean",
solver_status=ProofStatus.TIMEOUT.value,
)

if self.use_cache and self.cache:
self.cache.put(spec, result)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout results incorrectly cached unlike CoqProver

Medium Severity

The cache.put call is placed after the try/except block, which means timeout results are cached (including to disk). In CoqProver, caching happens inside the try block before the except, so timeout results are intentionally not cached. This inconsistency means a transient timeout in LeanProver permanently prevents retries for the same spec, since subsequent calls return the cached TIMEOUT result without ever invoking lean again.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9b8792a. Configure here.


return result
Comment on lines +121 to +131
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: An unhandled UnicodeDecodeError in _run_lean can cause an UnboundLocalError in prove_specification because the result variable is used before it is assigned.
Severity: MEDIUM

Suggested Fix

Expand the except block in prove_specification to catch other potential exceptions from _run_lean, such as UnicodeDecodeError. When such an exception is caught, assign a sensible default or error state to the result variable before the function attempts to use it, preventing the UnboundLocalError.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: formal_verification/lean_prover.py#L113-L131

Potential issue: In `prove_specification`, the `try...except` block only handles
`subprocess.TimeoutExpired`. If the `_run_lean` function raises a different exception,
such as a `UnicodeDecodeError` from decoding non-UTF-8 output from the Lean binary, the
exception will not be caught. This prevents the `result` variable from being assigned.
The function then proceeds to the `self.cache.put(spec, result)` and `return result`
statements, which will raise an `UnboundLocalError` because `result` was never assigned,
causing a crash.


def _run_lean(
self, lean_code: str, start_time: float, spec: FormalSpec
) -> ProofResult:
"""Invoke ``lean`` on a temporary file and interpret the result."""
with tempfile.TemporaryDirectory() as tmp:
source = Path(tmp) / "proof.lean"
source.write_text(lean_code, encoding="utf-8")

proc = subprocess.run(
["lean", source.name],
capture_output=True,
timeout=self.timeout_seconds,
cwd=tmp,
)

elapsed_ms = (time.time() - start_time) * 1000
stdout = proc.stdout.decode("utf-8") if proc.stdout else ""
stderr = proc.stderr.decode("utf-8") if proc.stderr else ""

if proc.returncode == 0:
logger.debug("Lean proof succeeded for: %s", spec.spec_text)
return ProofResult(
spec=spec,
proven=True,
proof_time_ms=elapsed_ms,
error_message=None,
counter_example=None,
proof_output=stdout,
prover_name="lean",
solver_status=ProofStatus.MACHINE_CHECKED.value,
checker_name="lean",
)

logger.debug(
"Lean proof failed for: %s, error: %s",
spec.spec_text,
stderr[:100],
)
status = ProofStatus.default_for_solver_result(
proven=False, prover_name="lean", counter_example=None,
)
return ProofResult(
spec=spec,
proven=False,
proof_time_ms=elapsed_ms,
error_message=stderr or "Lean type-check failed",
counter_example=None,
proof_output=stdout,
prover_name="lean",
solver_status=status.value,
)
Comment thread
cursor[bot] marked this conversation as resolved.
Loading