Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- detect embedded Python `asyncio.create_subprocess_*` calls and resolved JIT `subprocess` launch aliases
- detect embedded Python `runpy.run_module`, `runpy.run_path`, and `runpy._run_module_as_main` dynamic-module execution calls
- preserve embedded Python runpy, webbrowser, and ctypes findings across continued imports, late aliases, and bounded tail-window extraction gaps
- mark embedded Python/JIT byte and snippet budget exhaustion as incomplete coverage instead of clean scans
- detect embedded Python `webbrowser` launches and `ctypes` native-library loads in archives and JIT-scanned content
- resolve embedded `ctypes` loads through more CDLL-subclass construction forms (`__new__` returning inside `try`/`for`/`while`/`with`, `super()`/`*args` initializer forwarding) and indirect loader/controller bindings (conditional, boolean, walrus, and loop-bound expressions)
- honor benign loader/controller member overwrites spelled as `setattr(..., **{})` or starred `setattr(*(...))`
Expand Down
87 changes: 83 additions & 4 deletions modelaudit/detectors/jit_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ def _compile_dangerous_import_patterns(dangerous_import: str) -> tuple[re.Patter
# Bound nested ``:``-header recursion when extracting an embedded statement so a
# crafted deeply-indented blob cannot exhaust the interpreter stack.
_MAX_BODY_STATEMENT_NESTING = 100
_EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT = 1_000_000
_EMBEDDED_PYTHON_SCAN_WINDOW_BYTES = 1_000_000
_MAX_EMBEDDED_PYTHON_IMPORT_CONTEXT_BYTES = 16_384
_EMBEDDED_PYTHON_BYTE_LIMIT_REASON = "jit_embedded_python_byte_limit"
_EMBEDDED_PYTHON_SNIPPET_LIMIT_REASON = "jit_embedded_python_snippet_limit"
_EMBEDDED_PYTHON_START_MARKERS = (b"def ", b"async def ", b"class ", b"import ", b"from ")
_PRIORITY_EMBEDDED_PYTHON_MODULES = tuple(
sorted(
Expand Down Expand Up @@ -969,14 +972,15 @@ def _compact_candidate_segments(candidate: bytes, segment_ranges: list[tuple[int
return b"\n".join(candidate[start:end].rstrip(b"\n") for start, end in segment_ranges)


def _prioritized_embedded_python_snippets(
def _select_prioritized_embedded_python_snippets(
candidates: list[_EmbeddedPythonCandidate],
bounded: bytes | None = None,
) -> list[_EmbeddedPythonCandidate]:
) -> tuple[list[_EmbeddedPythonCandidate], int]:
selected: list[_EmbeddedPythonCandidate] = []
selected_spans: set[tuple[int, int]] = set()
priority_offsets = _priority_import_offsets(bounded) if bounded is not None else []
selected_priority_candidates = 0
omitted_budgeted_candidates = 0
for index, (candidate, span, real_ranges) in enumerate(candidates):
has_priority_marker = (
_span_contains_priority_offset(span, priority_offsets)
Expand All @@ -985,8 +989,10 @@ def _prioritized_embedded_python_snippets(
)
if index >= _MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS:
if not has_priority_marker:
omitted_budgeted_candidates += 1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Count only uncovered snippets as omitted

When a normal source-like member has more than 10 def/class/import starts, the first selected candidate often spans from the first marker through the rest of the file, so the later candidates are already covered by a parsed selected span. Incrementing the omitted counter here still emits an analysis_incomplete finding, which makes fully covered files inconclusive and can turn an otherwise clean scan into a failed/exit-1 result. Please only count candidates whose spans are not covered by an already selected span.

Useful? React with 👍 / 👎.

continue
if selected_priority_candidates >= _MAX_PRIORITY_EMBEDDED_PYTHON_SNIPPETS:
omitted_budgeted_candidates += 1
continue
if bounded is not None:
candidate, span, real_ranges = _bounded_priority_embedded_python_candidate(
Expand All @@ -1001,6 +1007,14 @@ def _prioritized_embedded_python_snippets(
continue
selected_spans.add(span)
selected.append((candidate, span, real_ranges))
return selected, omitted_budgeted_candidates


def _prioritized_embedded_python_snippets(
    candidates: list[_EmbeddedPythonCandidate],
    bounded: bytes | None = None,
) -> list[_EmbeddedPythonCandidate]:
    """Return only the selected snippets, dropping the omitted-candidate count.

    Thin wrapper around ``_select_prioritized_embedded_python_snippets`` for
    callers that only need the selected candidates and not the number of
    candidates that were left out by the snippet budget.
    """
    selection = _select_prioritized_embedded_python_snippets(candidates, bounded)
    # The selector returns ``(selected, omitted_count)``; only the first is exposed.
    return selection[0]


Expand Down Expand Up @@ -1366,6 +1380,44 @@ def _bounded_priority_tail_starts(tail_starts: list[int]) -> list[int]:
return [*tail_starts[:head_count], *tail_starts[-tail_count:]]


def _embedded_python_analysis_incomplete_finding(
    *,
    framework: str,
    context: str,
    reason: str,
    message: str,
    max_scan_bytes: int | None = None,
    omitted_snippets: int | None = None,
    candidates_count: int | None = None,
) -> "JITScriptFinding":
    """Build an INFO-severity ``analysis_incomplete`` JIT finding.

    The finding's ``details`` always carry ``analysis_incomplete=True`` and the
    given ``reason``. Each optional counter is added only when it is not
    ``None``; ``candidates_count`` is stored under the ``candidate_snippets``
    key.

    Args:
        framework: Framework label forwarded to ``create_jit_finding``.
        context: Context string forwarded to ``create_jit_finding``.
        reason: Reason identifier stored in ``details["reason"]``.
        message: Message text for the finding.
        max_scan_bytes: Optional byte limit recorded in the details.
        omitted_snippets: Optional number of snippets that were not analysed.
        candidates_count: Optional total number of candidate snippets.

    Returns:
        The finding produced by ``create_jit_finding``.
    """
    optional_details = (
        ("max_scan_bytes", max_scan_bytes),
        ("omitted_snippets", omitted_snippets),
        ("candidate_snippets", candidates_count),
    )
    details: dict[str, Any] = {"analysis_incomplete": True, "reason": reason}
    # Only record the counters the caller actually supplied.
    details.update({key: value for key, value in optional_details if value is not None})

    return create_jit_finding(
        message=message,
        severity="INFO",
        context=context,
        pattern=None,
        recommendation="Treat JIT/embedded Python coverage as inconclusive and review the model source.",
        confidence=1.0,
        details=details,
        framework=framework,
        code_snippet=None,
        type="analysis_incomplete",
        operation=None,
        builtin=None,
        import_=None,
    )


def _tail_starts_for_priority_alias_uses(
tail: bytes,
tail_starts: list[int],
Expand Down Expand Up @@ -1894,8 +1946,35 @@ def _extract_and_check_python_code(
if not self.check_ast:
return findings

bounded = data if include_full_source else data[:1000000]
bounded = data if include_full_source else data[:_EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT]
matches = _candidate_embedded_python_snippets(bounded, include_full_source=include_full_source)
prioritized_matches, omitted_budgeted_candidates = _select_prioritized_embedded_python_snippets(
matches, bounded=bounded
)
if not include_full_source and len(data) > _EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT:
findings.append(
_embedded_python_analysis_incomplete_finding(
framework=framework,
context=context,
reason=_EMBEDDED_PYTHON_BYTE_LIMIT_REASON,
message=("Embedded Python/JIT analysis incomplete: payload exceeds the bounded byte scan window"),
max_scan_bytes=_EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT,
)
)
if omitted_budgeted_candidates:
findings.append(
_embedded_python_analysis_incomplete_finding(
framework=framework,
context=context,
reason=_EMBEDDED_PYTHON_SNIPPET_LIMIT_REASON,
message=("Embedded Python/JIT analysis incomplete: candidate snippet budget was exceeded"),
omitted_snippets=omitted_budgeted_candidates,
candidates_count=len(matches),
)
)
if not prioritized_matches and omitted_budgeted_candidates:
return findings

bounded_high_risk_calls: set[tuple[str, str]] | None = None
snippet_high_risk_calls: set[tuple[str, str]] = set()
parsed_snippet_spans: list[tuple[int, int]] = []
Expand All @@ -1907,7 +1986,7 @@ def _extract_and_check_python_code(
# raw pattern detection active and fall back to extracted snippets.
bounded_high_risk_calls = None

for match, span, real_ranges in _prioritized_embedded_python_snippets(matches, bounded=bounded):
for match, span, real_ranges in prioritized_matches:
try:
if _is_span_inside_parsed_spans(span, parsed_snippet_spans):
continue
Expand Down
9 changes: 9 additions & 0 deletions modelaudit/scanners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Issue,
IssueSeverity,
ScanResult,
mark_inconclusive_scan_result,
)
from ..utils.helpers.interrupt_handler import check_interrupted
from .rule_mapper import get_embedded_code_rule_code, get_network_rule_code, get_secret_rule_code
Expand Down Expand Up @@ -823,6 +824,14 @@ def add_jit_script_findings(
recommendation = getattr(finding, "recommendation", "Review JIT/Script code for security")
details = finding.__dict__ if hasattr(finding, "__dict__") else {"object": str(finding)}

finding_details = details.get("details") if isinstance(details, dict) else None
if isinstance(finding_details, dict) and finding_details.get("analysis_incomplete"):
reason = finding_details.get("reason")
mark_inconclusive_scan_result(
result,
reason if isinstance(reason, str) and reason else "jit_script_analysis_incomplete",
)

jit_indicator = f"{details.get('type', '')} {message} {model_type}".strip()
jit_rule_code = get_embedded_code_rule_code(jit_indicator)
if not jit_rule_code:
Expand Down
47 changes: 47 additions & 0 deletions tests/detectors/test_jit_script_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,53 @@ def test_scan_model_detects_late_unmarked_module_scope_python_source(self) -> No

assert any(f.type == "dangerous_import" and f.import_ == "os" for f in findings)

def test_extract_embedded_python_marks_byte_budget_incomplete(self) -> None:
    """A payload placed past the byte window yields one byte-limit incomplete finding."""
    detector = JITScriptDetector()
    byte_limit = jit_script_module._EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT
    pad_line = b"# pad\n"
    # One more repetition than fits, so the dangerous import lands beyond the window.
    pad_repeats = (byte_limit // len(pad_line)) + 1
    data = pad_line * pad_repeats + b"import os\nos.system('id')\n"

    findings = detector._extract_and_check_python_code(data, "TorchScript", "late_payload.pt")

    incomplete = []
    for finding in findings:
        if finding.type != "analysis_incomplete":
            continue
        if finding.details.get("reason") == jit_script_module._EMBEDDED_PYTHON_BYTE_LIMIT_REASON:
            incomplete.append(finding)
    assert len(incomplete) == 1
    assert incomplete[0].details["max_scan_bytes"] == jit_script_module._EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT
    # The late ``import os`` sits outside the scanned window and must not be reported.
    assert all(not (finding.type == "dangerous_import" and finding.import_ == "os") for finding in findings)

def test_extract_embedded_python_marks_snippet_budget_incomplete(self) -> None:
    """More candidate snippets than the default budget yields one snippet-limit finding."""
    detector = JITScriptDetector()
    snippet_count = jit_script_module._MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS + 2
    import_lines = [f"import harmless_{index}".encode() for index in range(snippet_count)]
    data = b"\n".join(import_lines)

    findings = detector._extract_and_check_python_code(data, "TorchScript", "many_snippets.pt")

    incomplete = []
    for finding in findings:
        if finding.type != "analysis_incomplete":
            continue
        if finding.details.get("reason") == jit_script_module._EMBEDDED_PYTHON_SNIPPET_LIMIT_REASON:
            incomplete.append(finding)
    assert len(incomplete) == 1
    snippet_details = incomplete[0].details
    assert snippet_details["omitted_snippets"] > 0
    assert snippet_details["candidate_snippets"] > jit_script_module._MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS

def test_extract_embedded_python_keeps_benign_within_budgets_clean(self) -> None:
    """Benign imports that exactly fill the default snippet budget produce no findings."""
    detector = JITScriptDetector()
    snippet_count = jit_script_module._MAX_DEFAULT_EMBEDDED_PYTHON_SNIPPETS
    import_lines = [f"import harmless_{index}".encode() for index in range(snippet_count)]
    data = b"\n".join(import_lines)

    findings = detector._extract_and_check_python_code(data, "TorchScript", "benign_snippets.pt")

    assert findings == []

def test_scan_model_detects_unmarked_from_import_source(self) -> None:
detector = JITScriptDetector()

Expand Down
27 changes: 27 additions & 0 deletions tests/scanners/test_pytorch_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from modelaudit.cache import get_cache_manager, reset_cache_manager
from modelaudit.core import determine_exit_code, scan_model_directory_or_file
from modelaudit.detectors import jit_script as jit_script_module
from modelaudit.detectors.suspicious_symbols import CVE_COMBINED_PATTERNS
from modelaudit.scanner_results import INCONCLUSIVE_SCAN_OUTCOME, Check, ScanResult
from modelaudit.scanners.archive_dispatch import NESTED_SCAN_CALLBACK_CONFIG_KEY
Expand Down Expand Up @@ -1178,6 +1179,32 @@ def test_pytorch_zip_jit_scan_size_limit_marks_inconclusive(tmp_path: Path) -> N
assert size_checks[0].details["max_scan_bytes"] == 4


def test_pytorch_zip_jit_detector_byte_budget_marks_inconclusive(tmp_path: Path) -> None:
    """An archive source member larger than the byte window marks the scan inconclusive."""
    model_path = tmp_path / "late_jit_payload.pt"
    pad_line = b"# pad\n"
    pad_repeats = (jit_script_module._EMBEDDED_PYTHON_EXTRACT_BYTE_LIMIT // len(pad_line)) + 1
    late_payload = pad_line * pad_repeats + b"def payload():\n return 1\n"
    # Members are written in insertion order, matching a minimal PyTorch ZIP layout.
    archive_members = {
        "archive/version": "3\n",
        "archive/byteorder": "little",
        "archive/data.pkl": pickle.dumps({"weights": [1, 2, 3]}, protocol=4),
        "archive/code/debug/source.py": late_payload,
    }
    with zipfile.ZipFile(model_path, "w") as zip_file:
        for member_name, member_payload in archive_members.items():
            zip_file.writestr(member_name, member_payload)

    result = PyTorchZipScanner().scan(str(model_path))
    aggregate = scan_model_directory_or_file(str(model_path), cache_enabled=False)

    assert result.success is False
    assert result.metadata["analysis_incomplete"] is True
    assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
    assert jit_script_module._EMBEDDED_PYTHON_BYTE_LIMIT_REASON in result.metadata["scan_outcome_reasons"]
    jit_checks = [check for check in result.checks if check.name == "JIT/Script Code Execution Detection"]
    reported_reasons = [check.details.get("details", {}).get("reason") for check in jit_checks]
    assert jit_script_module._EMBEDDED_PYTHON_BYTE_LIMIT_REASON in reported_reasons
    file_metadata = aggregate.file_metadata[str(model_path)]
    assert getattr(file_metadata, "scan_outcome", None) == INCONCLUSIVE_SCAN_OUTCOME
    assert determine_exit_code(aggregate) == 1


def test_pytorch_zip_jit_scan_read_failure_marks_inconclusive(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
Expand Down
Loading