From f0633495cf01aedeb013dec65994668df0d9ceb3 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 00:52:58 -0400 Subject: [PATCH 1/5] fix: avoid pickle meta-path source probing --- packages/modelaudit-picklescan/CHANGELOG.md | 1 + .../src/modelaudit_picklescan/call_graph.py | 24 +---- .../test_call_graph_import_statements.py | 93 ++++++++++++++++++- 3 files changed, 94 insertions(+), 24 deletions(-) diff --git a/packages/modelaudit-picklescan/CHANGELOG.md b/packages/modelaudit-picklescan/CHANGELOG.md index b2cb23983..2dd2059fc 100644 --- a/packages/modelaudit-picklescan/CHANGELOG.md +++ b/packages/modelaudit-picklescan/CHANGELOG.md @@ -53,6 +53,7 @@ and this package adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - detect dynamic `type()` namespaces that can hide callable protocol hooks - fail closed when encoded nested-pickle probe candidates exhaust the analysis cap +- avoid custom meta-path finder calls during pickle call-graph source probing - prevent call-graph alias cycles from hanging scans - detect nested brace-format lookups that reach tracked `defaultdict` factories - avoid `str.format` false positives when a `ChainMap` shadows a `defaultdict` diff --git a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py index 24359e53b..bce1efd5d 100644 --- a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py +++ b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py @@ -14,7 +14,7 @@ from contextvars import ContextVar from dataclasses import dataclass, field from functools import lru_cache -from importlib.machinery import EXTENSION_SUFFIXES, BuiltinImporter, FrozenImporter, ModuleSpec, PathFinder +from importlib.machinery import EXTENSION_SUFFIXES, ModuleSpec, PathFinder from pathlib import Path from typing import Any, Protocol, TypeVar, cast @@ -1015,12 +1015,8 @@ def _call_graph_source_unavailable_reason(module_name: str) -> str | None: except Exception: return "source_unavailable" if spec is None: - try: - spec = _find_meta_path_module_spec_without_imports(module_name) - except Exception: - return "source_unavailable" - if spec is None: - return None + # Module names come from pickle metadata; do not consult executable custom meta-path finders. + return "source_unavailable" if spec.origin in {"built-in", "frozen"}: return None if spec.origin is not None and any(spec.origin.endswith(suffix) for suffix in EXTENSION_SUFFIXES): @@ -1049,20 +1045,6 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: return spec -def _find_meta_path_module_spec_without_imports(module_name: str) -> ModuleSpec | None: - """Consult non-standard meta path finders without importing parent packages.""" - for finder in sys.meta_path: - if finder is BuiltinImporter or finder is FrozenImporter or finder is PathFinder: - continue - find_spec = getattr(finder, "find_spec", None) - if find_spec is None: - continue - spec = find_spec(module_name, None) - if isinstance(spec, ModuleSpec): - return spec - return None - - @_register_source_sensitive_cache @lru_cache(maxsize=4096) def _find_sink_path(start: str) -> tuple[str, ...] | None: diff --git a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py index a80220982..cac06675c 100644 --- a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py +++ b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py @@ -2006,16 +2006,102 @@ def raise_lookup_failure(_: str) -> None: assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") -def test_scan_bytes_marks_custom_meta_path_specs_as_unanalyzable( +def test_scan_bytes_analyzes_source_available_modules_without_custom_meta_path_finders( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_dir = tmp_path / "modules" + module_dir.mkdir() + module_name = "modelaudit_tp_source_available_meta_path_probe" + (module_dir / f"{module_name}.py").write_text("def invoke(command):\n return command\n", encoding="utf-8") + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.syspath_prepend(str(module_dir)) + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "invoke", _unicode_operand("echo benign")), + source="source-available-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + assert not _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert not marker.exists() + + +def test_scan_bytes_analyzes_stdlib_source_modules_without_custom_meta_path_finders( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "statistics" + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "mean", b"]"), + source="stdlib-source-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + assert not _has_call_graph_source_unavailable_notice(report, module_name, "mean", "source_unavailable") + assert not marker.exists() + + +def test_scan_bytes_marks_custom_meta_path_specs_as_unanalyzable_without_invoking_finder( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: module_name = "modelaudit_tp_meta_path_spec_probe" + marker = tmp_path / "meta_path_called" class CustomMetaPathFinder: @staticmethod - def find_spec(fullname: str, path: object | None = None) -> ModuleSpec | None: - del path + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") return ModuleSpec(fullname, loader=None, origin="custom://module") return None @@ -2033,6 +2119,7 @@ def find_spec(fullname: str, path: object | None = None) -> ModuleSpec | None: assert report.status == ScanStatus.INCONCLUSIVE assert report.verdict == SafetyVerdict.UNKNOWN assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert not marker.exists() def test_scan_bytes_marks_bytecode_only_invoked_call_graph_source_unavailable( From e961b43ab33265b4436fb8270e22e5acb0bd553e Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 14:40:56 +0000 Subject: [PATCH 2/5] fix: avoid executable pickle source finders --- .../src/modelaudit_picklescan/call_graph.py | 34 ++++--- .../modelaudit-picklescan/tests/test_api.py | 16 +++- .../test_call_graph_import_statements.py | 91 ++++++++++++++++++- 3 files changed, 123 insertions(+), 18 deletions(-) diff --git a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py index bce1efd5d..299c9d1e8 100644 --- a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py +++ b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py @@ -14,7 +14,7 @@ from contextvars import ContextVar from dataclasses import dataclass, field from functools import lru_cache -from importlib.machinery import EXTENSION_SUFFIXES, ModuleSpec, PathFinder +from importlib.machinery import EXTENSION_SUFFIXES, FrozenImporter, ModuleSpec from pathlib import Path from typing import Any, Protocol, TypeVar, cast @@ -1029,20 +1029,24 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: if not parts or any(not part or "/" in part or "\\" in part for part in parts): return None - search_path: list[str] | None = None - spec: ModuleSpec | None = None - for index in range(len(parts)): - qualified_name = ".".join(parts[: index + 1]) - spec = PathFinder.find_spec(qualified_name, search_path) - if spec is None: - return None - if index == len(parts) - 1: - return spec - locations = spec.submodule_search_locations - if locations is None: - return None - search_path = list(locations) - return spec + frozen_spec = FrozenImporter.find_spec(module_name) + if frozen_spec is not None: + return frozen_spec + + for entry in sys.path: + current = Path(entry or os.getcwd()) + for index, part in enumerate(parts): + if index == len(parts) - 1: + for suffix in EXTENSION_SUFFIXES: + for candidate in (current / f"{part}{suffix}", current / part / f"__init__{suffix}"): + if candidate.is_file(): + return ModuleSpec(module_name, loader=None, origin=str(candidate)) + break + + current /= part + if not current.is_dir(): + break + return None @_register_source_sensitive_cache diff --git a/packages/modelaudit-picklescan/tests/test_api.py b/packages/modelaudit-picklescan/tests/test_api.py index 4324fb260..7b706e428 100644 --- a/packages/modelaudit-picklescan/tests/test_api.py +++ b/packages/modelaudit-picklescan/tests/test_api.py @@ -25,6 +25,7 @@ import pytest import modelaudit_picklescan.api as package_api +import modelaudit_picklescan.call_graph as call_graph from modelaudit_picklescan import ( Finding, PickleReport, @@ -3085,9 +3086,20 @@ def test_scan_bytes_does_not_flag_dill_dump_as_dangerous() -> None: report = scan_bytes(payload, source="dill-dump.pkl") - assert report.status == ScanStatus.COMPLETE - assert report.verdict == SafetyVerdict.CLEAN assert report.findings == () + source_reason = call_graph._call_graph_source_unavailable_reason("dill") + if source_reason is None: + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + else: + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert any( + notice.code == "call_graph_source_unavailable" + and notice.details.get("import_reference") == "dill.dump" + and notice.details.get("reason") == source_reason + for notice in report.notices + ) def test_scan_bytes_flags_dill_loads_as_dangerous() -> None: diff --git a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py index cac06675c..d0f789c1e 100644 --- a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py +++ b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py @@ -13,7 +13,7 @@ import threading import zipfile from contextvars import copy_context -from importlib.machinery import ModuleSpec +from importlib.machinery import EXTENSION_SUFFIXES, ModuleSpec from importlib.util import find_spec from pathlib import Path from typing import Any @@ -2085,6 +2085,60 @@ def find_spec( assert not marker.exists() +def test_scan_bytes_keeps_frozen_stdlib_globals_clean_without_custom_meta_path_finders( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "_frozen_importlib" + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + _clear_call_graph_caches() + + try: + report = scan_bytes( + pickle.dumps(importlib.machinery.ModuleSpec("x", None), protocol=4), + source="frozen-stdlib-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.COMPLETE + assert report.verdict == SafetyVerdict.CLEAN + assert not _has_call_graph_source_unavailable_notice(report, module_name, "ModuleSpec", "source_unavailable") + assert not marker.exists() + + +def test_call_graph_keeps_path_extension_modules_analyzable( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_extension_module_probe" + module_dir = tmp_path / "modules" + module_dir.mkdir() + (module_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + monkeypatch.syspath_prepend(str(module_dir)) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) is None + finally: + _clear_call_graph_caches() + + def test_scan_bytes_marks_custom_meta_path_specs_as_unanalyzable_without_invoking_finder( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, @@ -2122,6 +2176,41 @@ def find_spec( assert not marker.exists() +def test_scan_bytes_marks_custom_path_entry_specs_as_unanalyzable_without_invoking_finder( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_path_entry_spec_probe" + marker = tmp_path / "path_entry_finder_called" + path_entry = str(tmp_path / "custom-path-entry") + + class CustomPathEntryFinder: + @staticmethod + def find_spec(fullname: str, target: object | None = None) -> ModuleSpec | None: + del target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.syspath_prepend(path_entry) + monkeypatch.setitem(sys.path_importer_cache, path_entry, CustomPathEntryFinder()) + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "invoke", _unicode_operand("echo hidden")), + source="path-entry-spec-call-graph-source.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert not marker.exists() + + def test_scan_bytes_marks_bytecode_only_invoked_call_graph_source_unavailable( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, From 001730fcb2bb6a9da8283ab8d3dcccc1d82d4d93 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Thu, 4 Jun 2026 07:49:14 +0000 Subject: [PATCH 3/5] test: use explicit call graph helper import --- packages/modelaudit-picklescan/tests/test_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/modelaudit-picklescan/tests/test_api.py b/packages/modelaudit-picklescan/tests/test_api.py index 6d0d21956..ee8bfbe0b 100644 --- a/packages/modelaudit-picklescan/tests/test_api.py +++ b/packages/modelaudit-picklescan/tests/test_api.py @@ -25,7 +25,6 @@ import pytest import modelaudit_picklescan.api as package_api -import modelaudit_picklescan.call_graph as call_graph from modelaudit_picklescan import ( CoverageSummary, Finding, @@ -41,6 +40,7 @@ from modelaudit_picklescan.call_graph import ( CallGraphFinding, StartupHookWriteFinding, + _call_graph_source_unavailable_reason, _CallGraphAnalysisLimitError, find_startup_hook_write_call_graphs, ) @@ -3114,7 +3114,7 @@ def test_scan_bytes_does_not_flag_dill_dump_as_dangerous() -> None: report = scan_bytes(payload, source="dill-dump.pkl") assert report.findings == () - source_reason = call_graph._call_graph_source_unavailable_reason("dill") + source_reason = _call_graph_source_unavailable_reason("dill") if source_reason is None: assert report.status == ScanStatus.COMPLETE assert report.verdict == SafetyVerdict.CLEAN From 618bf709e5217959af9ff1a9eb44cb732a9beb92 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Thu, 4 Jun 2026 22:32:38 +0000 Subject: [PATCH 4/5] fix: honor safe import resolution precedence --- .../src/modelaudit_picklescan/call_graph.py | 168 ++++++++++++++---- .../test_call_graph_import_statements.py | 104 ++++++++++- 2 files changed, 232 insertions(+), 40 deletions(-) diff --git a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py index 299c9d1e8..514430258 100644 --- a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py +++ b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py @@ -14,9 +14,22 @@ from contextvars import ContextVar from dataclasses import dataclass, field from functools import lru_cache -from importlib.machinery import EXTENSION_SUFFIXES, FrozenImporter, ModuleSpec +from importlib.machinery import ( + BYTECODE_SUFFIXES, + EXTENSION_SUFFIXES, + SOURCE_SUFFIXES, + BuiltinImporter, + ExtensionFileLoader, + FileFinder, + FrozenImporter, + ModuleSpec, + PathFinder, + SourceFileLoader, + SourcelessFileLoader, +) from pathlib import Path from typing import Any, Protocol, TypeVar, cast +from zipimport import zipimporter # Bound per-pass import/callable fan-out for untrusted inputs. The 32-reference # cap has kept call-graph enrichment useful while preventing pathological scan @@ -33,6 +46,9 @@ _MAX_ASSIGNMENT_ALIASES = 128 _MAX_ASSIGNMENT_ALIAS_PASSES = 256 _MAX_FUNCTION_INSTANCE_ALIASES = 32 +_TRUSTED_META_PATH_FINDERS = tuple(sys.meta_path) +_TRUSTED_PATH_HOOKS = tuple(sys.path_hooks) +_TRUSTED_PATH_IMPORTERS = tuple(finder for finder in sys.path_importer_cache.values() if finder is not None) _MAX_CLASS_INSTANCE_ALIASES = 128 _MAX_INHERITED_CLASS_METHODS = 128 _MAX_WILDCARD_IMPORTS = 16 @@ -1007,9 +1023,6 @@ def _call_graph_source_unavailable_reason(module_name: str) -> str | None: return "source_parse_error" return None - if module_name.split(".", maxsplit=1)[0] in sys.builtin_module_names: - return None - try: spec = _find_module_spec_without_imports(module_name) except Exception: @@ -1029,24 +1042,108 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: if not parts or any(not part or "/" in part or "\\" in part for part in parts): return None - frozen_spec = FrozenImporter.find_spec(module_name) - if frozen_spec is not None: - return frozen_spec + loaded_module = sys.modules.get(module_name) + loaded_spec = getattr(loaded_module, "__spec__", None) + if isinstance(loaded_spec, ModuleSpec): + return loaded_spec + + if not _untrusted_meta_path_finder_precedes(BuiltinImporter): + builtin_spec = BuiltinImporter.find_spec(module_name) + if builtin_spec is not None: + return builtin_spec + + if not _untrusted_meta_path_finder_precedes(FrozenImporter): + frozen_spec = FrozenImporter.find_spec(module_name) + if frozen_spec is not None: + return frozen_spec + + if _untrusted_meta_path_finder_precedes(PathFinder) or _has_untrusted_path_hook(): + return None + + return _find_standard_filesystem_spec(module_name) + + +def _find_standard_filesystem_spec(module_name: str) -> ModuleSpec | None: + parts = module_name.split(".") + if not parts or any(not part or "/" in part or "\\" in part for part in parts): + return None + + search_path = [str(Path(entry or os.getcwd())) for entry in sys.path] + spec: ModuleSpec | None = None + for index in range(len(parts)): + qualified_name = ".".join(parts[: index + 1]) + spec = _find_standard_path_spec(qualified_name, search_path) + if spec is None: + return None + if index == len(parts) - 1: + return spec + locations = spec.submodule_search_locations + if locations is None: + return None + search_path = list(locations) + return spec + + +def _untrusted_meta_path_finder_precedes(target: object) -> bool: + for finder in sys.meta_path: + if finder is target: + return False + if finder is BuiltinImporter or finder is FrozenImporter or finder is PathFinder: + continue + if any(finder is trusted_finder for trusted_finder in _TRUSTED_META_PATH_FINDERS): + continue + return True + return True + +def _is_standard_path_hook(hook: object) -> bool: + if hook is zipimporter: + return True + return getattr(hook, "__module__", None) == "_frozen_importlib_external" and getattr( + hook, "__qualname__", "" + ).endswith("FileFinder.path_hook..path_hook_for_FileFinder") + + +def _has_untrusted_path_hook() -> bool: + if any( + not _is_standard_path_hook(hook) and not any(hook is trusted_hook for trusted_hook in _TRUSTED_PATH_HOOKS) + for hook in sys.path_hooks + ): + return True for entry in sys.path: - current = Path(entry or os.getcwd()) - for index, part in enumerate(parts): - if index == len(parts) - 1: - for suffix in EXTENSION_SUFFIXES: - for candidate in (current / f"{part}{suffix}", current / part / f"__init__{suffix}"): - if candidate.is_file(): - return ModuleSpec(module_name, loader=None, origin=str(candidate)) - break + cache_key = entry or os.getcwd() + finder = sys.path_importer_cache.get(cache_key) + if ( + finder is not None + and not isinstance(finder, (FileFinder, zipimporter)) + and not any(finder is trusted_finder for trusted_finder in _TRUSTED_PATH_IMPORTERS) + ): + return True + return False - current /= part - if not current.is_dir(): - break - return None + +def _find_standard_path_spec(module_name: str, search_path: list[str]) -> ModuleSpec | None: + namespace_locations: list[str] = [] + loader_details = ( + (ExtensionFileLoader, EXTENSION_SUFFIXES), + (SourceFileLoader, SOURCE_SUFFIXES), + (SourcelessFileLoader, BYTECODE_SUFFIXES), + ) + for entry in search_path: + finder = FileFinder(entry, *loader_details) + spec = finder.find_spec(module_name) + if spec is None: + continue + if spec.loader is not None: + return spec + if spec.submodule_search_locations is not None: + namespace_locations.extend(spec.submodule_search_locations) + + if not namespace_locations: + return None + namespace_spec = ModuleSpec(module_name, loader=None, is_package=True) + namespace_spec.submodule_search_locations = namespace_locations + return namespace_spec @_register_source_sensitive_cache @@ -3471,22 +3568,23 @@ def _resolve_module_source(module_name: str) -> Path | None: if not parts or any(not part or "/" in part or "\\" in part for part in parts): return None _track_shared_source_candidates(tuple(parts)) - for entry in sys.path: - root = Path(entry or os.getcwd()) - current = root - for index, part in enumerate(parts): - is_last = index == len(parts) - 1 - if is_last: - module_file = current / f"{part}.py" - if module_file.is_file(): - return module_file - package_file = current / part / "__init__.py" - if package_file.is_file(): - return package_file - else: - current = current / part - if not current.is_dir(): - break + loaded_module = sys.modules.get(module_name) + loaded_spec = getattr(loaded_module, "__spec__", None) + if isinstance(loaded_spec, ModuleSpec) and isinstance(loaded_spec.origin, str): + if loaded_spec.origin.endswith(tuple(SOURCE_SUFFIXES)): + loaded_source_path = Path(loaded_spec.origin) + if loaded_source_path.is_file(): + return loaded_source_path + if loaded_spec.origin not in {"built-in", "frozen"}: + return None + elif _untrusted_meta_path_finder_precedes(PathFinder) or _has_untrusted_path_hook(): + return None + + spec = _find_standard_filesystem_spec(module_name) + if spec is not None and isinstance(spec.origin, str) and spec.origin.endswith(tuple(SOURCE_SUFFIXES)): + source_path = Path(spec.origin) + if source_path.is_file(): + return source_path return None diff --git a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py index d0f789c1e..88667b83e 100644 --- a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py +++ b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py @@ -2006,7 +2006,7 @@ def raise_lookup_failure(_: str) -> None: assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") -def test_scan_bytes_analyzes_source_available_modules_without_custom_meta_path_finders( +def test_scan_bytes_fails_closed_when_custom_meta_path_finder_can_shadow_source( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -2042,9 +2042,9 @@ def find_spec( finally: _clear_call_graph_caches() - assert report.status == ScanStatus.COMPLETE - assert report.verdict == SafetyVerdict.CLEAN - assert not _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") assert not marker.exists() @@ -2052,6 +2052,8 @@ def test_scan_bytes_analyzes_stdlib_source_modules_without_custom_meta_path_find tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: + import statistics + module_name = "statistics" marker = tmp_path / "meta_path_called" @@ -2083,6 +2085,7 @@ def find_spec( assert report.verdict == SafetyVerdict.CLEAN assert not _has_call_graph_source_unavailable_notice(report, module_name, "mean", "source_unavailable") assert not marker.exists() + assert statistics.mean([1, 2, 3]) == 2 def test_scan_bytes_keeps_frozen_stdlib_globals_clean_without_custom_meta_path_finders( @@ -2122,6 +2125,43 @@ def find_spec( assert not marker.exists() +def test_call_graph_trusts_only_exact_builtin_module_names() -> None: + assert call_graph._call_graph_source_unavailable_reason("_io") is None + assert call_graph._call_graph_source_unavailable_reason("_io.nonexistent") == "source_unavailable" + + +def test_call_graph_fails_closed_when_custom_meta_path_finder_can_shadow_frozen_module( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "__hello__" + marker = tmp_path / "meta_path_called" + + class CustomMetaPathFinder: + @staticmethod + def find_spec( + fullname: str, + path: object | None = None, + target: object | None = None, + ) -> ModuleSpec | None: + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + + monkeypatch.delitem(sys.modules, module_name, raising=False) + monkeypatch.setattr(sys, "meta_path", [CustomMetaPathFinder(), *sys.meta_path]) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + assert not marker.exists() + + def test_call_graph_keeps_path_extension_modules_analyzable( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, @@ -2183,6 +2223,9 @@ def test_scan_bytes_marks_custom_path_entry_specs_as_unanalyzable_without_invoki module_name = "modelaudit_tp_path_entry_spec_probe" marker = tmp_path / "path_entry_finder_called" path_entry = str(tmp_path / "custom-path-entry") + extension_dir = tmp_path / "extensions" + extension_dir.mkdir() + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") class CustomPathEntryFinder: @staticmethod @@ -2193,7 +2236,7 @@ def find_spec(fullname: str, target: object | None = None) -> ModuleSpec | None: return ModuleSpec(fullname, loader=None, origin="custom://module") return None - monkeypatch.syspath_prepend(path_entry) + monkeypatch.setattr(sys, "path", [path_entry, str(extension_dir), *sys.path]) monkeypatch.setitem(sys.path_importer_cache, path_entry, CustomPathEntryFinder()) _clear_call_graph_caches() @@ -2211,6 +2254,57 @@ def find_spec(fullname: str, target: object | None = None) -> ModuleSpec | None: assert not marker.exists() +def test_call_graph_honors_bytecode_precedence_over_later_extension( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_bytecode_shadowing_extension" + bytecode_dir = tmp_path / "bytecode" + extension_dir = tmp_path / "extension" + bytecode_dir.mkdir() + extension_dir.mkdir() + source_path = bytecode_dir / f"{module_name}.py" + source_path.write_text("def invoke(command):\n return command\n", encoding="utf-8") + py_compile.compile(str(source_path), cfile=str(bytecode_dir / f"{module_name}.pyc"), doraise=True) + source_path.unlink() + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + monkeypatch.setattr(sys, "path", [str(bytecode_dir), str(extension_dir), *sys.path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + +def test_call_graph_does_not_invoke_custom_path_hook( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_uncached_path_hook_probe" + module_dir = tmp_path / "modules" + module_dir.mkdir() + (module_dir / f"{module_name}.py").write_text("def invoke(command):\n return command\n", encoding="utf-8") + marker = tmp_path / "path_hook_called" + + def custom_path_hook(path: str) -> object: + marker.write_text(path, encoding="utf-8") + raise ImportError + + monkeypatch.syspath_prepend(str(module_dir)) + monkeypatch.setattr(sys, "path_hooks", [custom_path_hook, *sys.path_hooks]) + sys.path_importer_cache.pop(str(module_dir), None) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + assert not marker.exists() + + def test_scan_bytes_marks_bytecode_only_invoked_call_graph_source_unavailable( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, From 8c83361d446303919ea2b5e2ec023a8a9c31bbf0 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Fri, 5 Jun 2026 00:41:06 +0000 Subject: [PATCH 5/5] fix: harden pickle source resolution precedence --- .../src/modelaudit_picklescan/call_graph.py | 68 ++++++--- .../test_call_graph_import_statements.py | 133 ++++++++++++++++++ 2 files changed, 179 insertions(+), 22 deletions(-) diff --git a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py index 514430258..076ac9c10 100644 --- a/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py +++ b/packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py @@ -3,6 +3,7 @@ from __future__ import annotations import ast +import fnmatch import hashlib import os import sys @@ -46,9 +47,7 @@ _MAX_ASSIGNMENT_ALIASES = 128 _MAX_ASSIGNMENT_ALIAS_PASSES = 256 _MAX_FUNCTION_INSTANCE_ALIASES = 32 -_TRUSTED_META_PATH_FINDERS = tuple(sys.meta_path) _TRUSTED_PATH_HOOKS = tuple(sys.path_hooks) -_TRUSTED_PATH_IMPORTERS = tuple(finder for finder in sys.path_importer_cache.values() if finder is not None) _MAX_CLASS_INSTANCE_ALIASES = 128 _MAX_INHERITED_CLASS_METHODS = 128 _MAX_WILDCARD_IMPORTS = 16 @@ -1047,17 +1046,17 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: if isinstance(loaded_spec, ModuleSpec): return loaded_spec - if not _untrusted_meta_path_finder_precedes(BuiltinImporter): + if not _untrusted_meta_path_finder_precedes(BuiltinImporter, module_name): builtin_spec = BuiltinImporter.find_spec(module_name) if builtin_spec is not None: return builtin_spec - if not _untrusted_meta_path_finder_precedes(FrozenImporter): + if not _untrusted_meta_path_finder_precedes(FrozenImporter, module_name): frozen_spec = FrozenImporter.find_spec(module_name) if frozen_spec is not None: return frozen_spec - if _untrusted_meta_path_finder_precedes(PathFinder) or _has_untrusted_path_hook(): + if _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook(): return None return _find_standard_filesystem_spec(module_name) @@ -1084,40 +1083,58 @@ def _find_standard_filesystem_spec(module_name: str) -> ModuleSpec | None: return spec -def _untrusted_meta_path_finder_precedes(target: object) -> bool: +def _matches_loaded_finder_type(finder: object, module_name: str, type_name: str) -> bool: + module = sys.modules.get(module_name) + finder_type = getattr(module, type_name, None) if module is not None else None + return isinstance(finder_type, type) and type(finder) is finder_type + + +def _known_meta_path_finder_cannot_handle(finder: object, module_name: str) -> bool: + root_name = module_name.split(".", maxsplit=1)[0] + if _matches_loaded_finder_type(finder, "_distutils_hack", "DistutilsMetaFinder"): + return root_name not in {"distutils", "pip", "test"} + + if _matches_loaded_finder_type(finder, "_virtualenv", "_Finder"): + virtualenv_module = sys.modules.get("_virtualenv") + patched_modules = getattr(virtualenv_module, "_DISTUTILS_PATCH", ()) if virtualenv_module is not None else () + return module_name not in patched_modules + + if _matches_loaded_finder_type(finder, "_pytest.assertion.rewrite", "AssertionRewritingHook"): + if module_name == "conftest": + return False + must_rewrite = getattr(finder, "_must_rewrite", ()) + if any(module_name == name or module_name.startswith(f"{name}.") for name in must_rewrite): + return False + patterns = getattr(finder, "fnpats", ()) + module_filename = f"{module_name.rsplit('.', maxsplit=1)[-1]}.py" + return all(not fnmatch.fnmatchcase(module_filename, pattern) for pattern in patterns) + + return False + + +def _untrusted_meta_path_finder_precedes(target: object, module_name: str) -> bool: for finder in sys.meta_path: if finder is target: return False if finder is BuiltinImporter or finder is FrozenImporter or finder is PathFinder: continue - if any(finder is trusted_finder for trusted_finder in _TRUSTED_META_PATH_FINDERS): + if _known_meta_path_finder_cannot_handle(finder, module_name): continue return True return True def _is_standard_path_hook(hook: object) -> bool: - if hook is zipimporter: - return True - return getattr(hook, "__module__", None) == "_frozen_importlib_external" and getattr( - hook, "__qualname__", "" - ).endswith("FileFinder.path_hook..path_hook_for_FileFinder") + return hook is zipimporter or any(hook is trusted_hook for trusted_hook in _TRUSTED_PATH_HOOKS) def _has_untrusted_path_hook() -> bool: - if any( - not _is_standard_path_hook(hook) and not any(hook is trusted_hook for trusted_hook in _TRUSTED_PATH_HOOKS) - for hook in sys.path_hooks - ): + if any(not _is_standard_path_hook(hook) for hook in sys.path_hooks): return True for entry in sys.path: cache_key = entry or os.getcwd() finder = sys.path_importer_cache.get(cache_key) - if ( - finder is not None - and not isinstance(finder, (FileFinder, zipimporter)) - and not any(finder is trusted_finder for trusted_finder in _TRUSTED_PATH_IMPORTERS) - ): + if finder is not None and not isinstance(finder, (FileFinder, zipimporter)): return True return False @@ -1130,6 +1147,13 @@ def _find_standard_path_spec(module_name: str, search_path: list[str]) -> Module (SourcelessFileLoader, BYTECODE_SUFFIXES), ) for entry in search_path: + try: + zip_spec = zipimporter(entry).find_spec(module_name) + except ImportError: + zip_spec = None + if zip_spec is not None: + return zip_spec + finder = FileFinder(entry, *loader_details) spec = finder.find_spec(module_name) if spec is None: @@ -3577,7 +3601,7 @@ def _resolve_module_source(module_name: str) -> Path | None: return loaded_source_path if loaded_spec.origin not in {"built-in", "frozen"}: return None - elif _untrusted_meta_path_finder_precedes(PathFinder) or _has_untrusted_path_hook(): + elif _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook(): return None spec = _find_standard_filesystem_spec(module_name) diff --git a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py index 88667b83e..851075afa 100644 --- a/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py +++ b/packages/modelaudit-picklescan/tests/test_call_graph_import_statements.py @@ -2048,6 +2048,58 @@ def find_spec( assert not marker.exists() +def test_call_graph_fails_closed_for_meta_path_finder_installed_before_import(tmp_path: Path) -> None: + module_dir = tmp_path / "modules" + module_dir.mkdir() + module_name = "modelaudit_tp_preimport_meta_path_probe" + (module_dir / f"{module_name}.py").write_text( + "def invoke(command):\n return command\n", + encoding="utf-8", + ) + marker = tmp_path / "preimport_meta_path_called" + child_code = """ +import sys +from importlib.machinery import ModuleSpec +from pathlib import Path + +module_dir = Path(sys.argv[1]) +marker = Path(sys.argv[2]) +module_name = sys.argv[3] + +class PreexistingMetaPathFinder: + @staticmethod + def find_spec(fullname, path=None, target=None): + del path, target + if fullname == module_name: + marker.write_text(fullname, encoding="utf-8") + return ModuleSpec(fullname, loader=None, origin="custom://module") + return None + +sys.path.insert(0, str(module_dir)) +sys.meta_path.insert(0, PreexistingMetaPathFinder()) +from modelaudit_picklescan.call_graph import _call_graph_source_unavailable_reason + +reason = _call_graph_source_unavailable_reason(module_name) +if reason != "source_unavailable": + raise SystemExit(f"unexpected source reason: {reason!r}") +if marker.exists(): + raise SystemExit("pre-existing finder was invoked") +""" + + result = subprocess.run( + [sys.executable, "-c", child_code, str(module_dir), str(marker), module_name], + cwd=str(tmp_path), + env=dict(os.environ), + check=False, + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + assert not marker.exists() + + def test_scan_bytes_analyzes_stdlib_source_modules_without_custom_meta_path_finders( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, @@ -2305,6 +2357,84 @@ def custom_path_hook(path: str) -> object: assert not marker.exists() +def test_call_graph_rejects_path_hook_with_spoofed_standard_metadata( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_spoofed_path_hook_probe" + module_dir = tmp_path / "modules" + module_dir.mkdir() + (module_dir / f"{module_name}.py").write_text("def invoke():\n return None\n", encoding="utf-8") + marker = tmp_path / "spoofed_path_hook_called" + + def spoofed_path_hook(path: str) -> object: + marker.write_text(path, encoding="utf-8") + raise ImportError + + spoofed_path_hook.__module__ = "_frozen_importlib_external" + spoofed_path_hook.__qualname__ = "FileFinder.path_hook..path_hook_for_FileFinder" + monkeypatch.syspath_prepend(str(module_dir)) + monkeypatch.setattr(sys, "path_hooks", [spoofed_path_hook, *sys.path_hooks]) + sys.path_importer_cache.pop(str(module_dir), None) + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) == "source_unavailable" + finally: + _clear_call_graph_caches() + + assert not marker.exists() + + +def test_call_graph_honors_zipimport_precedence_over_later_extension( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_zip_shadowing_extension" + zip_path = tmp_path / "modules.zip" + extension_dir = tmp_path / "extension" + extension_dir.mkdir() + with zipfile.ZipFile(zip_path, "w") as archive: + archive.writestr(f"{module_name}.py", "def invoke(command):\n return command\n") + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + monkeypatch.setattr(sys, "path", [str(zip_path), str(extension_dir), *sys.path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + report = scan_bytes( + _global_call_payload(module_name, "invoke", _unicode_operand("echo hidden")), + source="zip-shadowing-extension.pkl", + ) + finally: + _clear_call_graph_caches() + + assert report.status == ScanStatus.INCONCLUSIVE + assert report.verdict == SafetyVerdict.UNKNOWN + assert _has_call_graph_source_unavailable_notice(report, module_name, "invoke", "source_unavailable") + + +def test_call_graph_keeps_extension_precedence_over_later_zipimport( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "modelaudit_tp_extension_shadowing_zip" + extension_dir = tmp_path / "extension" + extension_dir.mkdir() + zip_path = tmp_path / "modules.zip" + (extension_dir / f"{module_name}{EXTENSION_SUFFIXES[0]}").write_bytes(b"not imported") + with zipfile.ZipFile(zip_path, "w") as archive: + archive.writestr(f"{module_name}.py", "def invoke(command):\n return command\n") + monkeypatch.setattr(sys, "path", [str(extension_dir), str(zip_path), *sys.path]) + importlib.invalidate_caches() + _clear_call_graph_caches() + + try: + assert call_graph._call_graph_source_unavailable_reason(module_name) is None + finally: + _clear_call_graph_caches() + + def test_scan_bytes_marks_bytecode_only_invoked_call_graph_source_unavailable( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, @@ -2614,6 +2744,7 @@ def test_scan_bytes_analyzes_shadowed_torch_extension_callable_invocation( "import os\n\ndef device(command):\n os.system(command)\n return command\n", encoding="utf-8", ) + monkeypatch.delitem(sys.modules, "torch", raising=False) monkeypatch.syspath_prepend(str(module_dir)) importlib.invalidate_caches() _clear_call_graph_caches() @@ -2658,6 +2789,7 @@ def __call__(self, command): """, encoding="utf-8", ) + monkeypatch.delitem(sys.modules, "torch", raising=False) monkeypatch.syspath_prepend(str(module_dir)) importlib.invalidate_caches() _clear_call_graph_caches() @@ -2745,6 +2877,7 @@ def test_call_graph_analyzes_shadowed_torch_storage_persistent_id_reference( "import os\n\ndef __getattr__(name):\n os.system(name)\n raise AttributeError(name)\n", encoding="utf-8", ) + monkeypatch.delitem(sys.modules, "torch", raising=False) monkeypatch.syspath_prepend(str(module_dir)) importlib.invalidate_caches() _clear_call_graph_caches()