From 080407ef58c0f2a77b67abf8697a804562063940 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 01:12:03 -0400 Subject: [PATCH] fix: fail closed on embedded python budget exhaustion --- CHANGELOG.md | 1 + modelaudit/detectors/jit_script.py | 87 ++++++++++++++++++++- modelaudit/scanners/base.py | 9 +++ tests/detectors/test_jit_script_detector.py | 47 +++++++++++ tests/scanners/test_pytorch_zip_scanner.py | 27 +++++++ 5 files changed, 167 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e4541c8a..1f383c73c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - detect embedded Python `asyncio.create_subprocess_*` calls and resolved JIT `subprocess` launch aliases - detect embedded Python `runpy.run_module`, `runpy.run_path`, and `runpy._run_module_as_main` dynamic-module execution calls - preserve embedded Python runpy, webbrowser, and ctypes findings across continued imports, late aliases, and bounded tail-window extraction gaps +- mark embedded Python/JIT byte and snippet budget exhaustion as incomplete coverage instead of clean scans - detect embedded Python `webbrowser` launches and `ctypes` native-library loads in archives and JIT-scanned content - resolve embedded `ctypes` loads through more CDLL-subclass construction forms (`__new__` returning inside `try`/`for`/`while`/`with`, `super()`/`*args` initializer forwarding) and indirect loader/controller bindings (conditional, boolean, walrus, and loop-bound expressions) - honor benign loader/controller member overwrites spelled as `setattr(..., **{})` or starred `setattr(*(...))` diff --git a/modelaudit/detectors/jit_script.py b/modelaudit/detectors/jit_script.py index 04933c15e..e55aa9437 100644 --- a/modelaudit/detectors/jit_script.py +++ b/modelaudit/detectors/jit_script.py @@ -142,8 +142,11 @@ def _compile_dangerous_import_patterns(dangerous_import: str) -> tuple[re.Patter # Bound nested ``:``-header recursion when extracting an embedded statement so a # crafted deeply-indented blob cannot exhaust the interpreter stack. _MAX_BODY_STATEMENT_NESTING = 100 +_EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT = 1_000_000 _EMBEDDED_PYTHON_SCAN_WINDOW_BYTES = 1_000_000 _MAX_EMBEDDED_PYTHON_IMPORT_CONTEXT_BYTES = 16_384 +_EMBEDDED_PYTHON_BYTE_LIMIT_REASON = "jit_embedded_python_byte_limit" +_EMBEDDED_PYTHON_SNIPPET_LIMIT_REASON = "jit_embedded_python_snippet_limit" _EMBEDDED_PYTHON_START_MARKERS = (b"def ", b"async def ", b"class ", b"import ", b"from ") _PRIORITY_EMBEDDED_PYTHON_MODULES = tuple( sorted( @@ -969,14 +972,15 @@ def _compact_candidate_segments(candidate: bytes, segment_ranges: list[tuple[int return b"\n".join(candidate[start:end].rstrip(b"\n") for start, end in segment_ranges) -def _prioritized_embedded_python_snippets( +def _select_prioritized_embedded_python_snippets( candidates: list[_EmbeddedPythonCandidate], bounded: bytes | None = None, -) -> list[_EmbeddedPythonCandidate]: +) -> tuple[list[_EmbeddedPythonCandidate], int]: selected: list[_EmbeddedPythonCandidate] = [] selected_spans: set[tuple[int, int]] = set() priority_offsets = _priority_import_offsets(bounded) if bounded is not None else [] selected_priority_candidates = 0 + omitted_budgeted_candidates = 0 for index, (candidate, span, real_ranges) in enumerate(candidates): has_priority_marker = ( _span_contains_priority_offset(span, priority_offsets) @@ -985,8 +989,10 @@ def _prioritized_embedded_python_snippets( ) if index >= _MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS: if not has_priority_marker: + omitted_budgeted_candidates += 1 continue if selected_priority_candidates >= _MAX_PRIORITY_EMBEDDED_PYTHON_SNIPPETS: + omitted_budgeted_candidates += 1 continue if bounded is not None: candidate, span, real_ranges = _bounded_priority_embedded_python_candidate( @@ -1001,6 +1007,14 @@ def _prioritized_embedded_python_snippets( continue selected_spans.add(span) selected.append((candidate, span, real_ranges)) + return selected, omitted_budgeted_candidates + + +def _prioritized_embedded_python_snippets( + candidates: list[_EmbeddedPythonCandidate], + bounded: bytes | None = None, +) -> list[_EmbeddedPythonCandidate]: + selected, _omitted_budgeted_candidates = _select_prioritized_embedded_python_snippets(candidates, bounded) return selected @@ -1366,6 +1380,44 @@ def _bounded_priority_tail_starts(tail_starts: list[int]) -> list[int]: return [*tail_starts[:head_count], *tail_starts[-tail_count:]] +def _embedded_python_analysis_incomplete_finding( + *, + framework: str, + context: str, + reason: str, + message: str, + max_scan_bytes: int | None = None, + omitted_snippets: int | None = None, + candidates_count: int | None = None, +) -> "JITScriptFinding": + details: dict[str, Any] = { + "analysis_incomplete": True, + "reason": reason, + } + if max_scan_bytes is not None: + details["max_scan_bytes"] = max_scan_bytes + if omitted_snippets is not None: + details["omitted_snippets"] = omitted_snippets + if candidates_count is not None: + details["candidate_snippets"] = candidates_count + + return create_jit_finding( + message=message, + severity="INFO", + context=context, + pattern=None, + recommendation="Treat JIT/embedded Python coverage as inconclusive and review the model source.", + confidence=1.0, + details=details, + framework=framework, + code_snippet=None, + type="analysis_incomplete", + operation=None, + builtin=None, + import_=None, + ) + + def _tail_starts_for_priority_alias_uses( tail: bytes, tail_starts: list[int], @@ -1894,8 +1946,35 @@ def _extract_and_check_python_code( if not self.check_ast: return findings - bounded = data if include_full_source else data[:1000000] + bounded = data if include_full_source else data[:_EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT] matches = _candidate_embedded_python_snippets(bounded, include_full_source=include_full_source) + prioritized_matches, omitted_budgeted_candidates = _select_prioritized_embedded_python_snippets( + matches, bounded=bounded + ) + if not include_full_source and len(data) > _EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT: + findings.append( + _embedded_python_analysis_incomplete_finding( + framework=framework, + context=context, + reason=_EMBEDDED_PYTHON_BYTE_LIMIT_REASON, + message=("Embedded Python/JIT analysis incomplete: payload exceeds the bounded byte scan window"), + max_scan_bytes=_EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT, + ) + ) + if omitted_budgeted_candidates: + findings.append( + _embedded_python_analysis_incomplete_finding( + framework=framework, + context=context, + reason=_EMBEDDED_PYTHON_SNIPPET_LIMIT_REASON, + message=("Embedded Python/JIT analysis incomplete: candidate snippet budget was exceeded"), + omitted_snippets=omitted_budgeted_candidates, + candidates_count=len(matches), + ) + ) + if not prioritized_matches and omitted_budgeted_candidates: + return findings + bounded_high_risk_calls: set[tuple[str, str]] | None = None snippet_high_risk_calls: set[tuple[str, str]] = set() parsed_snippet_spans: list[tuple[int, int]] = [] @@ -1907,7 +1986,7 @@ def _extract_and_check_python_code( # raw pattern detection active and fall back to extracted snippets. bounded_high_risk_calls = None - for match, span, real_ranges in _prioritized_embedded_python_snippets(matches, bounded=bounded): + for match, span, real_ranges in prioritized_matches: try: if _is_span_inside_parsed_spans(span, parsed_snippet_spans): continue diff --git a/modelaudit/scanners/base.py b/modelaudit/scanners/base.py index 2164db354..cfb29ea24 100644 --- a/modelaudit/scanners/base.py +++ b/modelaudit/scanners/base.py @@ -21,6 +21,7 @@ Issue, IssueSeverity, ScanResult, + mark_inconclusive_scan_result, ) from ..utils.helpers.interrupt_handler import check_interrupted from .rule_mapper import get_embedded_code_rule_code, get_network_rule_code, get_secret_rule_code @@ -823,6 +824,14 @@ def add_jit_script_findings( recommendation = getattr(finding, "recommendation", "Review JIT/Script code for security") details = finding.__dict__ if hasattr(finding, "__dict__") else {"object": str(finding)} + finding_details = details.get("details") if isinstance(details, dict) else None + if isinstance(finding_details, dict) and finding_details.get("analysis_incomplete"): + reason = finding_details.get("reason") + mark_inconclusive_scan_result( + result, + reason if isinstance(reason, str) and reason else "jit_script_analysis_incomplete", + ) + jit_indicator = f"{details.get('type', '')} {message} {model_type}".strip() jit_rule_code = get_embedded_code_rule_code(jit_indicator) if not jit_rule_code: diff --git a/tests/detectors/test_jit_script_detector.py b/tests/detectors/test_jit_script_detector.py index e5efd5c4c..266144e9a 100644 --- a/tests/detectors/test_jit_script_detector.py +++ b/tests/detectors/test_jit_script_detector.py @@ -246,6 +246,53 @@ def test_scan_model_detects_late_unmarked_module_scope_python_source(self) -> No assert any(f.type == "dangerous_import" and f.import_ == "os" for f in findings) + def test_extract_embedded_python_marks_byte_budget_incomplete(self) -> None: + detector = JITScriptDetector() + padding = b"# pad\n" * ((jit_script_module._EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT // len(b"# pad\n")) + 1) + data = padding + b"import os\nos.system('id')\n" + + findings = detector._extract_and_check_python_code(data, "TorchScript", "late_payload.pt") + + incomplete = [ + finding + for finding in findings + if finding.type == "analysis_incomplete" + and finding.details.get("reason") == jit_script_module._EMBEDDED_PYTHON_BYTE_LIMIT_REASON + ] + assert len(incomplete) == 1 + assert incomplete[0].details["max_scan_bytes"] == jit_script_module._EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT + assert not any(finding.type == "dangerous_import" and finding.import_ == "os" for finding in findings) + + def test_extract_embedded_python_marks_snippet_budget_incomplete(self) -> None: + detector = JITScriptDetector() + data = b"\n".join( + f"import harmless_{index}".encode() + for index in range(jit_script_module._MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS + 2) + ) + + findings = detector._extract_and_check_python_code(data, "TorchScript", "many_snippets.pt") + + incomplete = [ + finding + for finding in findings + if finding.type == "analysis_incomplete" + and finding.details.get("reason") == jit_script_module._EMBEDDED_PYTHON_SNIPPET_LIMIT_REASON + ] + assert len(incomplete) == 1 + assert incomplete[0].details["omitted_snippets"] > 0 + assert incomplete[0].details["candidate_snippets"] > jit_script_module._MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS + + def test_extract_embedded_python_keeps_benign_within_budgets_clean(self) -> None: + detector = JITScriptDetector() + data = b"\n".join( + f"import harmless_{index}".encode() + for index in range(jit_script_module._MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS) + ) + + findings = detector._extract_and_check_python_code(data, "TorchScript", "benign_snippets.pt") + + assert findings == [] + def test_scan_model_detects_unmarked_from_import_source(self) -> None: detector = JITScriptDetector() diff --git a/tests/scanners/test_pytorch_zip_scanner.py b/tests/scanners/test_pytorch_zip_scanner.py index 63d215aaf..349660190 100644 --- a/tests/scanners/test_pytorch_zip_scanner.py +++ b/tests/scanners/test_pytorch_zip_scanner.py @@ -11,6 +11,7 @@ from modelaudit.cache import get_cache_manager, reset_cache_manager from modelaudit.core import determine_exit_code, scan_model_directory_or_file +from modelaudit.detectors import jit_script as jit_script_module from modelaudit.detectors.suspicious_symbols import CVE_COMBINED_PATTERNS from modelaudit.scanner_results import INCONCLUSIVE_SCAN_OUTCOME, Check, ScanResult from modelaudit.scanners.archive_dispatch import NESTED_SCAN_CALLBACK_CONFIG_KEY @@ -1178,6 +1179,32 @@ def test_pytorch_zip_jit_scan_size_limit_marks_inconclusive(tmp_path: Path) -> N assert size_checks[0].details["max_scan_bytes"] == 4 +def test_pytorch_zip_jit_detector_byte_budget_marks_inconclusive(tmp_path: Path) -> None: + model_path = tmp_path / "late_jit_payload.pt" + padding = b"# pad\n" * ((jit_script_module._EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT // len(b"# pad\n")) + 1) + late_payload = padding + b"def payload():\n return 1\n" + with zipfile.ZipFile(model_path, "w") as zip_file: + zip_file.writestr("archive/version", "3\n") + zip_file.writestr("archive/byteorder", "little") + zip_file.writestr("archive/data.pkl", pickle.dumps({"weights": [1, 2, 3]}, protocol=4)) + zip_file.writestr("archive/code/debug/source.py", late_payload) + + result = PyTorchZipScanner().scan(str(model_path)) + aggregate = scan_model_directory_or_file(str(model_path), cache_enabled=False) + + assert result.success is False + assert result.metadata["analysis_incomplete"] is True + assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME + assert jit_script_module._EMBEDDED_PYTHON_BYTE_LIMIT_REASON in result.metadata["scan_outcome_reasons"] + jit_checks = [check for check in result.checks if check.name == "JIT/Script Code Execution Detection"] + assert any( + check.details.get("details", {}).get("reason") == jit_script_module._EMBEDDED_PYTHON_BYTE_LIMIT_REASON + for check in jit_checks + ) + assert getattr(aggregate.file_metadata[str(model_path)], "scan_outcome", None) == INCONCLUSIVE_SCAN_OUTCOME + assert determine_exit_code(aggregate) == 1 + + def test_pytorch_zip_jit_scan_read_failure_marks_inconclusive( tmp_path: Path, monkeypatch: pytest.MonkeyPatch,