Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- scan ONNX external data references in sparse initializers, tensor-valued attributes, and function defaults
- avoid false-positive process-launch findings for parsed framed Python string literals
- detect dangerous Python calls retrieved through module namespace dictionaries in ZIP and TAR members
- fail closed when executable ZIP subtype scanners are unavailable
- reject local, plaintext, and redirecting JFrog credential targets
- detect embedded Python `os.exec*`, `os.spawn*`, `os.posix_spawn*`, and `os.startfile` process-launch calls in archives and JIT-scanned content
- detect embedded Python `asyncio.create_subprocess_*` calls and resolved JIT `subprocess` launch aliases
Expand Down
6 changes: 4 additions & 2 deletions modelaudit/scanners/archive_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,16 @@ def merge_executable_zip_container_findings(
for subtype_id in subtype_ids:
if scanner_selection.allows(subtype_id):
subtype_scanner = _registry.load_scanner_by_id(subtype_id)
if subtype_scanner:
if subtype_scanner is None:
subtype_result = _make_unavailable_recognized_format_result(path, subtype_id, subtype_id)
else:
subtype_result = subtype_scanner(config=subtype_config).scan(path)
raw_offsets = subtype_result.metadata.pop(KNOWN_UNREADABLE_ARCHIVE_ENTRY_OFFSETS_CONFIG_KEY, ())
if isinstance(raw_offsets, (list, tuple, set, frozenset)):
known_unreadable_offsets.update(
offset for offset in raw_offsets if isinstance(offset, int) and not isinstance(offset, bool)
)
_merge_composed_scan_result(result, subtype_result)
_merge_composed_scan_result(result, subtype_result)
else:
add_scanner_selection_skip_check(
result,
Expand Down
131 changes: 131 additions & 0 deletions tests/scanners/test_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2996,6 +2996,137 @@ def test_scan_nested_file_fails_closed_when_recognized_header_scanner_is_unavail
assert check.details["preferred_scanner_id"] == "header_only_scanner"


def test_executable_zip_composed_routing_fails_closed_when_subtype_scanner_unavailable(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Composed ZIP routing reports an inconclusive scan when the Skops scanner is missing.

    The archive holds a ``schema.json`` carrying a ``_skops_version`` key plus a
    pickle member. With the ``skops`` scanner lookup stubbed to return ``None``,
    the merged result must be marked unsuccessful/inconclusive while the S201
    finding for ``payload.pkl`` is still present.
    """
    schema_document = {
        "__loader__": "OperatorFuncNode",
        "__module__": "builtins",
        "__class__": "eval",
        "_skops_version": "0.11.0",
        "content": {},
    }
    polyglot_path = tmp_path / "skops-polyglot.zip"
    with zipfile.ZipFile(polyglot_path, "w") as zip_handle:
        zip_handle.writestr("schema.json", json.dumps(schema_document))
        zip_handle.writestr("payload.pkl", b'cos\nsystem\n(S"echo pwned"\ntR.')

    real_loader = _registry.load_scanner_by_id

    def load_scanner_by_id(scanner_id: str) -> type[BaseScanner] | None:
        # Only the "skops" lookup is reported as unavailable; everything else is delegated.
        return None if scanner_id == "skops" else real_loader(scanner_id)

    monkeypatch.setattr(_registry, "load_scanner_by_id", load_scanner_by_id)

    result = ScanResult(scanner_name="zip")
    archive_dispatch.merge_executable_zip_container_findings(
        str(polyglot_path),
        result,
        {"cache_enabled": False},
        context="test executable ZIP polyglot",
    )

    # The aggregate outcome must be flagged as inconclusive with the expected reason.
    metadata = result.metadata
    assert result.success is False
    assert metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
    assert metadata["operational_error_reason"] == "recognized_format_scanner_unavailable"
    assert "recognized_format_scanner_unavailable" in metadata["scan_outcome_reasons"]

    # A failed "Format Detection" check names the scanner that could not be loaded.
    format_check = next(entry for entry in result.checks if entry.name == "Format Detection")
    assert format_check.status == CheckStatus.FAILED
    assert format_check.severity == IssueSeverity.INFO
    assert format_check.details["format"] == "skops"
    assert format_check.details["preferred_scanner_id"] == "skops"

    # The pickle member finding must still be present in the merged result.
    assert any(
        finding.rule_code == "S201"
        and finding.details.get("zip_entry") == "payload.pkl"
        and finding.severity == IssueSeverity.CRITICAL
        for finding in result.issues
    )


def test_executable_zip_unavailable_subtype_fails_closed_and_is_not_cached(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An ELF-prefixed Skops ZIP with no loadable Skops scanner is inconclusive and uncached.

    The fixture is a ZIP (named ``.jpg``) whose bytes are prefixed with an ELF
    magic header and zero padding. The final check is delegated to
    ``_assert_inconclusive_zip_aggregate_not_cached``.
    """
    schema_document = {
        "__loader__": "ObjectNode",
        "__module__": "sklearn.pipeline",
        "__class__": "Pipeline",
        "_skops_version": "0.12.0",
        "content": {},
    }
    polyglot_path = tmp_path / "skops-polyglot.jpg"
    with zipfile.ZipFile(polyglot_path, "w") as zip_handle:
        zip_handle.writestr("schema.json", json.dumps(schema_document))

    # Rewrite the file as: ELF magic + 60 zero bytes + the original ZIP bytes.
    zip_bytes = polyglot_path.read_bytes()
    elf_prefix = b"\x7fELF" + b"\x00" * 60
    polyglot_path.write_bytes(elf_prefix + zip_bytes)

    real_loader = _registry.load_scanner_by_id

    def load_scanner_by_id(scanner_id: str) -> type[BaseScanner] | None:
        # Only the "skops" lookup is reported as unavailable; everything else is delegated.
        return None if scanner_id == "skops" else real_loader(scanner_id)

    monkeypatch.setattr(_registry, "load_scanner_by_id", load_scanner_by_id)

    cache_dir = tmp_path / "cache"
    _assert_inconclusive_zip_aggregate_not_cached(
        polyglot_path,
        "recognized_format_scanner_unavailable",
        cache_dir,
    )


def test_executable_zip_unavailable_subtype_ignores_benign_schema_near_match(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A ``schema.json`` without ``_skops_version`` must not be routed to the Skops scanner.

    The loader stub raises if the ``skops`` scanner is ever requested, so the
    test fails loudly on misrouting. The merged result must stay successful,
    not be marked inconclusive, and contain no "Format Detection" check.
    """
    near_match_schema = {
        "__loader__": "ObjectNode",
        "__module__": "sklearn.pipeline",
        "__class__": "Pipeline",
        "content": {},
    }
    generic_path = tmp_path / "generic-polyglot.zip"
    with zipfile.ZipFile(generic_path, "w") as zip_handle:
        zip_handle.writestr("schema.json", json.dumps(near_match_schema))

    real_loader = _registry.load_scanner_by_id

    def load_scanner_by_id(scanner_id: str) -> type[BaseScanner] | None:
        # Any scanner other than "skops" is resolved normally.
        if scanner_id != "skops":
            return real_loader(scanner_id)
        raise AssertionError("benign schema near-match routed to the Skops scanner")

    monkeypatch.setattr(_registry, "load_scanner_by_id", load_scanner_by_id)

    result = ScanResult(scanner_name="zip")
    archive_dispatch.merge_executable_zip_container_findings(
        str(generic_path),
        result,
        {"cache_enabled": False},
        context="test executable ZIP polyglot",
    )

    assert result.success is True
    assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME
    assert not any(entry.name == "Format Detection" for entry in result.checks)


def test_scan_nested_file_does_not_fail_closed_for_extension_only_member(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
Expand Down
Loading