diff --git a/packages/modelaudit-picklescan/CHANGELOG.md b/packages/modelaudit-picklescan/CHANGELOG.md index 085faf6c2..a90929bb2 100644 --- a/packages/modelaudit-picklescan/CHANGELOG.md +++ b/packages/modelaudit-picklescan/CHANGELOG.md @@ -58,6 +58,7 @@ and this package adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - bound native pickle state simulation for tracked dictionaries, memo clones, dotted globals, and recursive mappings - fail closed when encoded nested-pickle probe candidates exhaust the analysis cap +- avoid custom meta-path finder calls during pickle call-graph source probing - prevent call-graph alias cycles from hanging scans - detect nested brace-format lookups that reach tracked `defaultdict` factories - avoid `str.format` false positives when a `ChainMap` shadows a `defaultdict` diff --git a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py index 24359e53b..076ac9c10 100644 --- a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py +++ b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py @@ -3,6 +3,7 @@ from __future__ import annotations import ast +import fnmatch import hashlib import os import sys @@ -14,9 +15,22 @@ from contextvars import ContextVar from dataclasses import dataclass, field from functools import lru_cache -from importlib.machinery import EXTENSION_SUFFIXES, BuiltinImporter, FrozenImporter, ModuleSpec, PathFinder +from importlib.machinery import ( + BYTECODE_SUFFIXES, + EXTENSION_SUFFIXES, + SOURCE_SUFFIXES, + BuiltinImporter, + ExtensionFileLoader, + FileFinder, + FrozenImporter, + ModuleSpec, + PathFinder, + SourceFileLoader, + SourcelessFileLoader, +) from pathlib import Path from typing import Any, Protocol, TypeVar, cast +from zipimport import zipimporter # Bound per-pass import/callable fan-out for untrusted inputs. The 32-reference # cap has kept call-graph enrichment useful while preventing pathological scan @@ -33,6 +47,7 @@ _MAX_ASSIGNMENT_ALIASES = 128 _MAX_ASSIGNMENT_ALIAS_PASSES = 256 _MAX_FUNCTION_INSTANCE_ALIASES = 32 +_TRUSTED_PATH_HOOKS = tuple(sys.path_hooks) _MAX_CLASS_INSTANCE_ALIASES = 128 _MAX_INHERITED_CLASS_METHODS = 128 _MAX_WILDCARD_IMPORTS = 16 @@ -1007,20 +1022,13 @@ def _call_graph_source_unavailable_reason(module_name: str) -> str | None: return "source_parse_error" return None - if module_name.split(".", maxsplit=1)[0] in sys.builtin_module_names: - return None - try: spec = _find_module_spec_without_imports(module_name) except Exception: return "source_unavailable" if spec is None: - try: - spec = _find_meta_path_module_spec_without_imports(module_name) - except Exception: - return "source_unavailable" - if spec is None: - return None + # Module names come from pickle metadata; do not consult executable custom meta-path finders. + return "source_unavailable" if spec.origin in {"built-in", "frozen"}: return None if spec.origin is not None and any(spec.origin.endswith(suffix) for suffix in EXTENSION_SUFFIXES): @@ -1033,11 +1041,37 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: if not parts or any(not part or "/" in part or "\\" in part for part in parts): return None - search_path: list[str] | None = None + loaded_module = sys.modules.get(module_name) + loaded_spec = getattr(loaded_module, "__spec__", None) + if isinstance(loaded_spec, ModuleSpec): + return loaded_spec + + if not _untrusted_meta_path_finder_precedes(BuiltinImporter, module_name): + builtin_spec = BuiltinImporter.find_spec(module_name) + if builtin_spec is not None: + return builtin_spec + + if not _untrusted_meta_path_finder_precedes(FrozenImporter, module_name): + frozen_spec = FrozenImporter.find_spec(module_name) + if frozen_spec is not None: + return frozen_spec + + if _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook(): + return None + + return _find_standard_filesystem_spec(module_name) + + +def _find_standard_filesystem_spec(module_name: str) -> ModuleSpec | None: + parts = module_name.split(".") + if not parts or any(not part or "/" in part or "\\" in part for part in parts): + return None + + search_path = [str(Path(entry or os.getcwd())) for entry in sys.path] spec: ModuleSpec | None = None for index in range(len(parts)): qualified_name = ".".join(parts[: index + 1]) - spec = PathFinder.find_spec(qualified_name, search_path) + spec = _find_standard_path_spec(qualified_name, search_path) if spec is None: return None if index == len(parts) - 1: @@ -1049,18 +1083,91 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: return spec -def _find_meta_path_module_spec_without_imports(module_name: str) -> ModuleSpec | None: - """Consult non-standard meta path finders without importing parent packages.""" +def _matches_loaded_finder_type(finder: object, module_name: str, type_name: str) -> bool: + module = sys.modules.get(module_name) + finder_type = getattr(module, type_name, None) if module is not None else None + return isinstance(finder_type, type) and type(finder) is finder_type + + +def _known_meta_path_finder_cannot_handle(finder: object, module_name: str) -> bool: + root_name = module_name.split(".", maxsplit=1)[0] + if _matches_loaded_finder_type(finder, "_distutils_hack", "DistutilsMetaFinder"): + return root_name not in {"distutils", "pip", "test"} + + if _matches_loaded_finder_type(finder, "_virtualenv", "_Finder"): + virtualenv_module = sys.modules.get("_virtualenv") + patched_modules = getattr(virtualenv_module, "_DISTUTILS_PATCH", ()) if virtualenv_module is not None else () + return module_name not in patched_modules + + if _matches_loaded_finder_type(finder, "_pytest.assertion.rewrite", "AssertionRewritingHook"): + if module_name == "conftest": + return False + must_rewrite = getattr(finder, "_must_rewrite", ()) + if any(module_name == name or module_name.startswith(f"{name}.") for name in must_rewrite): + return False + patterns = getattr(finder, "fnpats", ()) + module_filename = f"{module_name.rsplit('.', maxsplit=1)[-1]}.py" + return all(not fnmatch.fnmatchcase(module_filename, pattern) for pattern in patterns) + + return False + + +def _untrusted_meta_path_finder_precedes(target: object, module_name: str) -> bool: for finder in sys.meta_path: + if finder is target: + return False if finder is BuiltinImporter or finder is FrozenImporter or finder is PathFinder: continue - find_spec = getattr(finder, "find_spec", None) - if find_spec is None: + if _known_meta_path_finder_cannot_handle(finder, module_name): + continue + return True + return True + + +def _is_standard_path_hook(hook: object) -> bool: + return hook is zipimporter or any(hook is trusted_hook for trusted_hook in _TRUSTED_PATH_HOOKS) + + +def _has_untrusted_path_hook() -> bool: + if any(not _is_standard_path_hook(hook) for hook in sys.path_hooks): + return True + for entry in sys.path: + cache_key = entry or os.getcwd() + finder = sys.path_importer_cache.get(cache_key) + if finder is not None and not isinstance(finder, (FileFinder, zipimporter)): + return True + return False + + +def _find_standard_path_spec(module_name: str, search_path: list[str]) -> ModuleSpec | None: + namespace_locations: list[str] = [] + loader_details = ( + (ExtensionFileLoader, EXTENSION_SUFFIXES), + (SourceFileLoader, SOURCE_SUFFIXES), + (SourcelessFileLoader, BYTECODE_SUFFIXES), + ) + for entry in search_path: + try: + zip_spec = zipimporter(entry).find_spec(module_name) + except ImportError: + zip_spec = None + if zip_spec is not None: + return zip_spec + + finder = FileFinder(entry, *loader_details) + spec = finder.find_spec(module_name) + if spec is None: continue - spec = find_spec(module_name, None) - if isinstance(spec, ModuleSpec): + if spec.loader is not None: return spec - return None + if spec.submodule_search_locations is not None: + namespace_locations.extend(spec.submodule_search_locations) + + if not namespace_locations: + return None + namespace_spec = ModuleSpec(module_name, loader=None, is_package=True) + namespace_spec.submodule_search_locations = namespace_locations + return namespace_spec @_register_source_sensitive_cache @@ -3485,22 +3592,23 @@ def _resolve_module_source(module_name: str) -> Path | None: if not parts or any(not part or "/" in part or "\\" in part for part in parts): return None _track_shared_source_candidates(tuple(parts)) - for entry in sys.path: - root = Path(entry or os.getcwd()) - current = root - for index, part in enumerate(parts): - is_last = index == len(parts) - 1 - if is_last: - module_file = current / f"{part}.py" - if module_file.is_file(): - return module_file - package_file = current / part / "__init__.py" - if package_file.is_file(): - return package_file - else: - current = current / part - if not current.is_dir(): - break + loaded_module = sys.modules.get(module_name) + loaded_spec = getattr(loaded_module, "__spec__", None) + if isinstance(loaded_spec, ModuleSpec) and isinstance(loaded_spec.origin, str): + if loaded_spec.origin.endswith(tuple(SOURCE_SUFFIXES)): + loaded_source_path = Path(loaded_spec.origin) + if loaded_source_path.is_file(): + return loaded_source_path + if loaded_spec.origin not in {"built-in", "frozen"}: + return None + elif _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook(): + return None + + spec = _find_standard_filesystem_spec(module_name) + if spec is not None and isinstance(spec.origin, str) and spec.origin.endswith(tuple(SOURCE_SUFFIXES)): + source_path = Path(spec.origin) + if source_path.is_file(): + return source_path return None diff --git a/packages/modelaudit-picklescan/tests/test_api.py b/packages/modelaudit-picklescan/tests/test_api.py index 3371fb59f..c193aa95f 100644 --- a/packages/modelaudit-picklescan/tests/test_api.py +++ b/packages/modelaudit-picklescan/tests/test_api.py @@ -40,6 +40,7 @@ from modelaudit_picklescan.call_graph import ( CallGraphFinding, StartupHookWriteFinding, + _call_graph_source_unavailable_reason, _CallGraphAnalysisLimitError, find_startup_hook_write_call_graphs, ) @@ -3317,9 +3318,20 @@ def test_scan_bytes_does_not_flag_dill_dump_as_dangerous() -> None: report = scan_bytes(payload, source="dill-dump.pkl") - assert report.status == ScanStatus.COMPLETE - assert report.verdict == SafetyVerdict.CLEAN assert report.findings == () + source_reason = _call_graph_source_unavailable_reason("dill") + if source_reason is None: + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + else: + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert any( + notice.code == "call_graph_source_unavailable" + and notice.details.get("import_reference") == "dill.dump" + and notice.details.get("reason") == source_reason + for notice in report.notices + ) def test_scan_bytes_flags_dill_loads_as_dangerous() -> None: diff --git a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py index a80220982..851075afa 100644 --- a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py +++ b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py @@ -13,7 +13,7 @@ import threading import zipfile from contextvars import copy_context -from importlib.machinery import ModuleSpec +from importlib.machinery import EXTENSION_SUFFIXES, ModuleSpec from importlib.util import find_spec from pathlib import Path from typing import Any @@ -2006,16 +2006,248 @@ def raise_lookup_failure(_: str) -> None: assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") -def test_scan_bytes_marks_custom_meta_path_specs_as_unanalyzable( +def test_scan_bytes_fails_closed_when_custom_meta_path_finder_can_shadow_source( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_dir = tmp_path / "modules" + module_dir.mkdir() + module_name = "modelaudit_tp_source_available_meta_path_probe" + (module_dir / f"{module_name}.py").write_text("def invoke(command):\n return command\n", encoding="utf-8") + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.syspath_prepend(str(module_dir)) + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "invoke", _unicode_operand("echo benign")), + source="source-available-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert not marker.exists() + + +def test_call_graph_fails_closed_for_meta_path_finder_installed_before_import(tmp_path: Path) -> None: + module_dir = tmp_path / "modules" + module_dir.mkdir() + module_name = "modelaudit_tp_preimport_meta_path_probe" + (module_dir / f"{module_name}.py").write_text( + "def invoke(command):\n return command\n", + encoding="utf-8", + ) + marker = tmp_path / "preimport_meta_path_called" + child_code = """ +import sys +from importlib.machinery import ModuleSpec +from pathlib import Path + +module_dir = Path(sys.argv[1]) +marker = Path(sys.argv[2]) +module_name = sys.argv[3] + +class PreexistingMetaPathFinder: + @staticmethod + def find_spec(fullname, path=None, target=None): + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + +sys.path.insert(0, str(module_dir)) +sys.meta_path.insert(0, PreexistingMetaPathFinder()) +from modelaudit_picklescan.call_graph import _call_graph_source_unavailable_reason + +reason = _call_graph_source_unavailable_reason(module_name) +if reason != "source_unavailable": + raise SystemExit(f"unexpected source reason: {reason!r}") +if marker.exists(): + raise SystemExit("pre-existing finder was invoked") +""" + + result = subprocess.run( + [sys.executable, "-c", child_code, str(module_dir), str(marker), module_name], + cwd=str(tmp_path), + env=dict(os.environ), + check=False, + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + assert not marker.exists() + + +def test_scan_bytes_analyzes_stdlib_source_modules_without_custom_meta_path_finders( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + import statistics + + module_name = "statistics" + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "mean", b"]"), + source="stdlib-source-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + assert not _has_call_graph_source_unavailable_notice(report, module_name, "mean", "source_unavailable") + assert not marker.exists() + assert statistics.mean([1, 2, 3]) == 2 + + +def test_scan_bytes_keeps_frozen_stdlib_globals_clean_without_custom_meta_path_finders( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "_frozen_importlib" + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + _clear_call_graph_caches() + + try: + report = scan_bytes( + pickle.dumps(importlib.machinery.ModuleSpec("x", None), protocol=4), + source="frozen-stdlib-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + assert not _has_call_graph_source_unavailable_notice(report, module_name, "ModuleSpec", "source_unavailable") + assert not marker.exists() + + +def test_call_graph_trusts_only_exact_builtin_module_names() -> None: + assert call_graph._call_graph_source_unavailable_reason("_io") is None + assert call_graph._call_graph_source_unavailable_reason("_io.nonexistent") == "source_unavailable" + + +def test_call_graph_fails_closed_when_custom_meta_path_finder_can_shadow_frozen_module( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "__hello__" + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.delitem(sys.modules, module_name, raising=False) + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + assert not marker.exists() + + +def test_call_graph_keeps_path_extension_modules_analyzable( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_extension_module_probe" + module_dir = tmp_path / "modules" + module_dir.mkdir() + (module_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + monkeypatch.syspath_prepend(str(module_dir)) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) is None + finally: + _clear_call_graph_caches() + + +def test_scan_bytes_marks_custom_meta_path_specs_as_unanalyzable_without_invoking_finder( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: module_name = "modelaudit_tp_meta_path_spec_probe" + marker = tmp_path / "meta_path_called" class CustomMetaPathFinder: @staticmethod - def find_spec(fullname: str, path: object | None = None) -> ModuleSpec | None: - del path + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") return ModuleSpec(fullname, loader=None, origin="custom://module") return None @@ -2033,6 +2265,174 @@ def find_spec(fullname: str, path: object | None = None) -> ModuleSpec | None: assert report.status == ScanStatus.INCONCLUSIVE assert report.verdict == SafetyVerdict.UNKNOWN assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert not marker.exists() + + +def test_scan_bytes_marks_custom_path_entry_specs_as_unanalyzable_without_invoking_finder( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_path_entry_spec_probe" + marker = tmp_path / "path_entry_finder_called" + path_entry = str(tmp_path / "custom-path-entry") + extension_dir = tmp_path / "extensions" + extension_dir.mkdir() + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + + class CustomPathEntryFinder: + @staticmethod + def find_spec(fullname: str, target: object | None = None) -> ModuleSpec | None: + del target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.setattr(sys, "path", [path_entry, str(extension_dir), *sys.path]) + monkeypatch.setitem(sys.path_importer_cache, path_entry, CustomPathEntryFinder()) + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "invoke", _unicode_operand("echo hidden")), + source="path-entry-spec-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert not marker.exists() + + +def test_call_graph_honors_bytecode_precedence_over_later_extension( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_bytecode_shadowing_extension" + bytecode_dir = tmp_path / "bytecode" + extension_dir = tmp_path / "extension" + bytecode_dir.mkdir() + extension_dir.mkdir() + source_path = bytecode_dir / f"{module_name}.py" + source_path.write_text("def invoke(command):\n return command\n", encoding="utf-8") + py_compile.compile(str(source_path), cfile=str(bytecode_dir / f"{module_name}.pyc"), doraise=True) + source_path.unlink() + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + monkeypatch.setattr(sys, "path", [str(bytecode_dir), str(extension_dir), *sys.path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + +def test_call_graph_does_not_invoke_custom_path_hook( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_uncached_path_hook_probe" + module_dir = tmp_path / "modules" + module_dir.mkdir() + (module_dir / f"{module_name}.py").write_text("def invoke(command):\n return command\n", encoding="utf-8") + marker = tmp_path / "path_hook_called" + + def custom_path_hook(path: str) -> object: + marker.write_text(path, encoding="utf-8") + raise ImportError + + monkeypatch.syspath_prepend(str(module_dir)) + monkeypatch.setattr(sys, "path_hooks", [custom_path_hook, *sys.path_hooks]) + sys.path_importer_cache.pop(str(module_dir), None) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + assert not marker.exists() + + +def test_call_graph_rejects_path_hook_with_spoofed_standard_metadata( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_spoofed_path_hook_probe" + module_dir = tmp_path / "modules" + module_dir.mkdir() + (module_dir / f"{module_name}.py").write_text("def invoke():\n return None\n", encoding="utf-8") + marker = tmp_path / "spoofed_path_hook_called" + + def spoofed_path_hook(path: str) -> object: + marker.write_text(path, encoding="utf-8") + raise ImportError + + spoofed_path_hook.__module__ = "_frozen_importlib_external" + spoofed_path_hook.__qualname__ = "FileFinder.path_hook..path_hook_for_FileFinder" + monkeypatch.syspath_prepend(str(module_dir)) + monkeypatch.setattr(sys, "path_hooks", [spoofed_path_hook, *sys.path_hooks]) + sys.path_importer_cache.pop(str(module_dir), None) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + assert not marker.exists() + + +def test_call_graph_honors_zipimport_precedence_over_later_extension( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_zip_shadowing_extension" + zip_path = tmp_path / "modules.zip" + extension_dir = tmp_path / "extension" + extension_dir.mkdir() + with zipfile.ZipFile(zip_path, "w") as archive: + archive.writestr(f"{module_name}.py", "def invoke(command):\n return command\n") + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + monkeypatch.setattr(sys, "path", [str(zip_path), str(extension_dir), *sys.path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "invoke", _unicode_operand("echo hidden")), + source="zip-shadowing-extension.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + + +def test_call_graph_keeps_extension_precedence_over_later_zipimport( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_extension_shadowing_zip" + extension_dir = tmp_path / "extension" + extension_dir.mkdir() + zip_path = tmp_path / "modules.zip" + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + with zipfile.ZipFile(zip_path, "w") as archive: + archive.writestr(f"{module_name}.py", "def invoke(command):\n return command\n") + monkeypatch.setattr(sys, "path", [str(extension_dir), str(zip_path), *sys.path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) is None + finally: + _clear_call_graph_caches() def test_scan_bytes_marks_bytecode_only_invoked_call_graph_source_unavailable( @@ -2344,6 +2744,7 @@ def test_scan_bytes_analyzes_shadowed_torch_extension_callable_invocation( "import os\n\ndef device(command):\n os.system(command)\n return command\n", encoding="utf-8", ) + monkeypatch.delitem(sys.modules, "torch", raising=False) monkeypatch.syspath_prepend(str(module_dir)) importlib.invalidate_caches() _clear_call_graph_caches() @@ -2388,6 +2789,7 @@ def __call__(self, command): """, encoding="utf-8", ) + monkeypatch.delitem(sys.modules, "torch", raising=False) monkeypatch.syspath_prepend(str(module_dir)) importlib.invalidate_caches() _clear_call_graph_caches() @@ -2475,6 +2877,7 @@ def test_call_graph_analyzes_shadowed_torch_storage_persistent_id_reference( "import os\n\ndef __getattr__(name):\n os.system(name)\n raise AttributeError(name)\n", encoding="utf-8", ) + monkeypatch.delitem(sys.modules, "torch", raising=False) monkeypatch.syspath_prepend(str(module_dir)) importlib.invalidate_caches() _clear_call_graph_caches()