Skip to content

Commit 889db72

Browse files
fix: fail closed on executable ZIP scanner gaps (#1487)
* fix: fail closed on executable ZIP scanner gaps
* test: cover unavailable executable zip subtype caching
* test: preserve ZIP findings when subtype unavailable
1 parent 11d8978 commit 889db72

3 files changed

Lines changed: 136 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424
- scan ONNX external data references in sparse initializers, tensor-valued attributes, and function defaults
2525
- avoid false-positive process-launch findings for parsed framed Python string literals
2626
- detect dangerous Python calls retrieved through module namespace dictionaries in ZIP and TAR members
27+
- fail closed when executable ZIP subtype scanners are unavailable
2728
- reject local, plaintext, and redirecting JFrog credential targets
2829
- detect embedded Python `os.exec*`, `os.spawn*`, `os.posix_spawn*`, and `os.startfile` process-launch calls in archives and JIT-scanned content
2930
- detect embedded Python `asyncio.create_subprocess_*` calls and resolved JIT `subprocess` launch aliases

modelaudit/scanners/archive_dispatch.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,16 @@ def merge_executable_zip_container_findings(
372372
for subtype_id in subtype_ids:
373373
if scanner_selection.allows(subtype_id):
374374
subtype_scanner = _registry.load_scanner_by_id(subtype_id)
375-
if subtype_scanner:
375+
if subtype_scanner is None:
376+
subtype_result = _make_unavailable_recognized_format_result(path, subtype_id, subtype_id)
377+
else:
376378
subtype_result = subtype_scanner(config=subtype_config).scan(path)
377379
raw_offsets = subtype_result.metadata.pop(KNOWN_UNREADABLE_ARCHIVE_ENTRY_OFFSETS_CONFIG_KEY, ())
378380
if isinstance(raw_offsets, (list, tuple, set, frozenset)):
379381
known_unreadable_offsets.update(
380382
offset for offset in raw_offsets if isinstance(offset, int) and not isinstance(offset, bool)
381383
)
382-
_merge_composed_scan_result(result, subtype_result)
384+
_merge_composed_scan_result(result, subtype_result)
383385
else:
384386
add_scanner_selection_skip_check(
385387
result,

tests/scanners/test_zip_scanner.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2996,6 +2996,137 @@ def test_scan_nested_file_fails_closed_when_recognized_header_scanner_is_unavail
29962996
assert check.details["preferred_scanner_id"] == "header_only_scanner"
29972997

29982998

2999+
def test_executable_zip_composed_routing_fails_closed_when_subtype_scanner_unavailable(
3000+
tmp_path: Path,
3001+
monkeypatch: pytest.MonkeyPatch,
3002+
) -> None:
3003+
archive_path = tmp_path / "skops-polyglot.zip"
3004+
with zipfile.ZipFile(archive_path, "w") as archive:
3005+
archive.writestr(
3006+
"schema.json",
3007+
json.dumps(
3008+
{
3009+
"__loader__": "OperatorFuncNode",
3010+
"__module__": "builtins",
3011+
"__class__": "eval",
3012+
"_skops_version": "0.11.0",
3013+
"content": {},
3014+
}
3015+
),
3016+
)
3017+
archive.writestr("payload.pkl", b'cos\nsystem\n(S"echo pwned"\ntR.')
3018+
3019+
original_loader = _registry.load_scanner_by_id
3020+
3021+
def load_scanner_by_id(scanner_id: str) -> type[BaseScanner] | None:
3022+
if scanner_id == "skops":
3023+
return None
3024+
return original_loader(scanner_id)
3025+
3026+
monkeypatch.setattr(_registry, "load_scanner_by_id", load_scanner_by_id)
3027+
3028+
result = ScanResult(scanner_name="zip")
3029+
archive_dispatch.merge_executable_zip_container_findings(
3030+
str(archive_path),
3031+
result,
3032+
{"cache_enabled": False},
3033+
context="test executable ZIP polyglot",
3034+
)
3035+
3036+
assert result.success is False
3037+
assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME
3038+
assert result.metadata["operational_error_reason"] == "recognized_format_scanner_unavailable"
3039+
assert "recognized_format_scanner_unavailable" in result.metadata["scan_outcome_reasons"]
3040+
3041+
check = next(check for check in result.checks if check.name == "Format Detection")
3042+
assert check.status == CheckStatus.FAILED
3043+
assert check.severity == IssueSeverity.INFO
3044+
assert check.details["format"] == "skops"
3045+
assert check.details["preferred_scanner_id"] == "skops"
3046+
assert any(
3047+
issue.rule_code == "S201"
3048+
and issue.details.get("zip_entry") == "payload.pkl"
3049+
and issue.severity == IssueSeverity.CRITICAL
3050+
for issue in result.issues
3051+
)
3052+
3053+
3054+
def test_executable_zip_unavailable_subtype_fails_closed_and_is_not_cached(
3055+
tmp_path: Path,
3056+
monkeypatch: pytest.MonkeyPatch,
3057+
) -> None:
3058+
archive_path = tmp_path / "skops-polyglot.jpg"
3059+
with zipfile.ZipFile(archive_path, "w") as archive:
3060+
archive.writestr(
3061+
"schema.json",
3062+
json.dumps(
3063+
{
3064+
"__loader__": "ObjectNode",
3065+
"__module__": "sklearn.pipeline",
3066+
"__class__": "Pipeline",
3067+
"_skops_version": "0.12.0",
3068+
"content": {},
3069+
}
3070+
),
3071+
)
3072+
archive_path.write_bytes(b"\x7fELF" + b"\x00" * 60 + archive_path.read_bytes())
3073+
3074+
original_loader = _registry.load_scanner_by_id
3075+
3076+
def load_scanner_by_id(scanner_id: str) -> type[BaseScanner] | None:
3077+
if scanner_id == "skops":
3078+
return None
3079+
return original_loader(scanner_id)
3080+
3081+
monkeypatch.setattr(_registry, "load_scanner_by_id", load_scanner_by_id)
3082+
3083+
_assert_inconclusive_zip_aggregate_not_cached(
3084+
archive_path,
3085+
"recognized_format_scanner_unavailable",
3086+
tmp_path / "cache",
3087+
)
3088+
3089+
3090+
def test_executable_zip_unavailable_subtype_ignores_benign_schema_near_match(
3091+
tmp_path: Path,
3092+
monkeypatch: pytest.MonkeyPatch,
3093+
) -> None:
3094+
archive_path = tmp_path / "generic-polyglot.zip"
3095+
with zipfile.ZipFile(archive_path, "w") as archive:
3096+
archive.writestr(
3097+
"schema.json",
3098+
json.dumps(
3099+
{
3100+
"__loader__": "ObjectNode",
3101+
"__module__": "sklearn.pipeline",
3102+
"__class__": "Pipeline",
3103+
"content": {},
3104+
}
3105+
),
3106+
)
3107+
3108+
original_loader = _registry.load_scanner_by_id
3109+
3110+
def load_scanner_by_id(scanner_id: str) -> type[BaseScanner] | None:
3111+
if scanner_id == "skops":
3112+
raise AssertionError("benign schema near-match routed to the Skops scanner")
3113+
return original_loader(scanner_id)
3114+
3115+
monkeypatch.setattr(_registry, "load_scanner_by_id", load_scanner_by_id)
3116+
3117+
result = ScanResult(scanner_name="zip")
3118+
archive_dispatch.merge_executable_zip_container_findings(
3119+
str(archive_path),
3120+
result,
3121+
{"cache_enabled": False},
3122+
context="test executable ZIP polyglot",
3123+
)
3124+
3125+
assert result.success is True
3126+
assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME
3127+
assert all(check.name != "Format Detection" for check in result.checks)
3128+
3129+
29993130
def test_scan_nested_file_does_not_fail_closed_for_extension_only_member(
30003131
tmp_path: Path,
30013132
monkeypatch: pytest.MonkeyPatch,

0 commit comments

Comments
 (0)