Skip to content
1 change: 1 addition & 0 deletions packages/modelaudit-picklescan/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ and this package adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- bound native pickle state simulation for tracked dictionaries, memo clones,
dotted globals, and recursive mappings
- fail closed when encoded nested-pickle probe candidates exhaust the analysis cap
- avoid custom meta-path finder calls during pickle call-graph source probing
- prevent call-graph alias cycles from hanging scans
- detect nested brace-format lookups that reach tracked `defaultdict` factories
- avoid `str.format` false positives when a `ChainMap` shadows a `defaultdict`
Expand Down
178 changes: 143 additions & 35 deletions packages/modelaudit-picklescan/src/modelaudit_picklescan/call_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import ast
import fnmatch
import hashlib
import os
import sys
Expand All @@ -14,9 +15,22 @@
from contextvars import ContextVar
from dataclasses import dataclass, field
from functools import lru_cache
from importlib.machinery import EXTENSION_SUFFIXES, BuiltinImporter, FrozenImporter, ModuleSpec, PathFinder
from importlib.machinery import (
BYTECODE_SUFFIXES,
EXTENSION_SUFFIXES,
SOURCE_SUFFIXES,
BuiltinImporter,
ExtensionFileLoader,
FileFinder,
FrozenImporter,
ModuleSpec,
PathFinder,
SourceFileLoader,
SourcelessFileLoader,
)
from pathlib import Path
from typing import Any, Protocol, TypeVar, cast
from zipimport import zipimporter

# Bound per-pass import/callable fan-out for untrusted inputs. The 32-reference
# cap has kept call-graph enrichment useful while preventing pathological scan
Expand All @@ -33,6 +47,7 @@
_MAX_ASSIGNMENT_ALIASES = 128
_MAX_ASSIGNMENT_ALIAS_PASSES = 256
_MAX_FUNCTION_INSTANCE_ALIASES = 32
# Snapshot of ``sys.path_hooks`` taken when this module is imported; hooks are
# later compared against it by identity.
# NOTE(review): any custom hook installed *before* this import is captured here
# and will therefore be treated as trusted - confirm that is acceptable.
_TRUSTED_PATH_HOOKS = tuple(sys.path_hooks)
_MAX_CLASS_INSTANCE_ALIASES = 128
_MAX_INHERITED_CLASS_METHODS = 128
_MAX_WILDCARD_IMPORTS = 16
Expand Down Expand Up @@ -1007,20 +1022,13 @@ def _call_graph_source_unavailable_reason(module_name: str) -> str | None:
return "source_parse_error"
return None

if module_name.split(".", maxsplit=1)[0] in sys.builtin_module_names:
return None

try:
spec = _find_module_spec_without_imports(module_name)
except Exception:
return "source_unavailable"
if spec is None:
try:
spec = _find_meta_path_module_spec_without_imports(module_name)
except Exception:
return "source_unavailable"
if spec is None:
return None
# Module names come from pickle metadata; do not consult executable custom meta-path finders.
return "source_unavailable"
Comment thread
mldangelo-oai marked this conversation as resolved.
if spec.origin in {"built-in", "frozen"}:
return None
if spec.origin is not None and any(spec.origin.endswith(suffix) for suffix in EXTENSION_SUFFIXES):
Expand All @@ -1033,11 +1041,37 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None:
if not parts or any(not part or "/" in part or "\\" in part for part in parts):
return None

search_path: list[str] | None = None
loaded_module = sys.modules.get(module_name)
loaded_spec = getattr(loaded_module, "__spec__", None)
if isinstance(loaded_spec, ModuleSpec):
return loaded_spec

if not _untrusted_meta_path_finder_precedes(BuiltinImporter, module_name):
builtin_spec = BuiltinImporter.find_spec(module_name)
if builtin_spec is not None:
return builtin_spec

if not _untrusted_meta_path_finder_precedes(FrozenImporter, module_name):
frozen_spec = FrozenImporter.find_spec(module_name)
if frozen_spec is not None:
return frozen_spec

if _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook():
return None

return _find_standard_filesystem_spec(module_name)


def _find_standard_filesystem_spec(module_name: str) -> ModuleSpec | None:
parts = module_name.split(".")
if not parts or any(not part or "/" in part or "\\" in part for part in parts):
return None

search_path = [str(Path(entry or os.getcwd())) for entry in sys.path]
spec: ModuleSpec | None = None
for index in range(len(parts)):
qualified_name = ".".join(parts[: index + 1])
spec = PathFinder.find_spec(qualified_name, search_path)
spec = _find_standard_path_spec(qualified_name, search_path)
if spec is None:
return None
if index == len(parts) - 1:
Expand All @@ -1049,18 +1083,91 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None:
return spec


def _find_meta_path_module_spec_without_imports(module_name: str) -> ModuleSpec | None:
"""Consult non-standard meta path finders without importing parent packages."""
def _matches_loaded_finder_type(finder: object, module_name: str, type_name: str) -> bool:
module = sys.modules.get(module_name)
finder_type = getattr(module, type_name, None) if module is not None else None
return isinstance(finder_type, type) and type(finder) is finder_type


def _known_meta_path_finder_cannot_handle(finder: object, module_name: str) -> bool:
    """Return True when *finder* is a recognised hook that will decline *module_name*.

    Three finder classes are recognised, each identified by exact type through
    ``_matches_loaded_finder_type``: ``_distutils_hack.DistutilsMetaFinder``,
    ``_virtualenv._Finder`` and ``_pytest.assertion.rewrite.AssertionRewritingHook``.
    Anything else yields False, so callers treat unknown finders as potentially
    able to serve the module (fail closed).

    Args:
        finder: An entry taken from ``sys.meta_path``.
        module_name: Dotted module name being probed.

    Returns:
        True if the finder is recognised and, judging by the name alone, would
        not handle the module; False otherwise.
    """
    root_name = module_name.split(".", maxsplit=1)[0]
    if _matches_loaded_finder_type(finder, "_distutils_hack", "DistutilsMetaFinder"):
        return root_name not in {"distutils", "pip", "test"}

    if _matches_loaded_finder_type(finder, "_virtualenv", "_Finder"):
        virtualenv_module = sys.modules.get("_virtualenv")
        patched_modules = getattr(virtualenv_module, "_DISTUTILS_PATCH", ()) if virtualenv_module is not None else ()
        return module_name not in patched_modules

    if _matches_loaded_finder_type(finder, "_pytest.assertion.rewrite", "AssertionRewritingHook"):
        leaf_name = module_name.rsplit(".", maxsplit=1)[-1]
        # The rewrite hook keys on the last dotted component, so nested
        # modules such as ``pkg.conftest`` are handled too, not only a
        # top-level ``conftest``.
        if leaf_name == "conftest":
            return False
        must_rewrite = getattr(finder, "_must_rewrite", ())
        if any(module_name == name or module_name.startswith(f"{name}.") for name in must_rewrite):
            return False
        module_filename = f"{leaf_name}.py"
        for pattern in getattr(finder, "fnpats", ()):
            # A pattern with a directory part is matched against the full
            # path, which cannot be decided from the module name alone.
            if os.path.dirname(pattern):
                return False
            if fnmatch.fnmatchcase(module_filename, pattern):
                return False
        return True

    return False


def _untrusted_meta_path_finder_precedes(target: object, module_name: str) -> bool:
    """Report whether an unrecognised finder sits ahead of *target* in ``sys.meta_path``.

    Finders are inspected in order and never called. The three standard
    importers are skipped, as is any finder that
    ``_known_meta_path_finder_cannot_handle`` says will decline *module_name*.

    Args:
        target: The standard finder whose answer the caller wants to rely on.
        module_name: Dotted module name being probed.

    Returns:
        False if *target* is reached before any unrecognised finder. True if an
        unrecognised finder comes first, or if *target* is not on
        ``sys.meta_path`` at all.
    """
    for finder in sys.meta_path:
        if finder is target:
            return False
        if finder is BuiltinImporter or finder is FrozenImporter or finder is PathFinder:
            continue
        # Recognised hooks that will decline this module cannot shadow the target.
        if _known_meta_path_finder_cannot_handle(finder, module_name):
            continue
        return True
    # The target never appeared, so its answer would not reflect a real import.
    return True


def _is_standard_path_hook(hook: object) -> bool:
return hook is zipimporter or any(hook is trusted_hook for trusted_hook in _TRUSTED_PATH_HOOKS)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject preexisting custom path hooks

If an application installs a custom sys.path_hooks entry before importing modelaudit_picklescan, this identity check permanently treats that hook as standard. When a later sys.path entry handled by that hook also contains a benign .py file, _has_untrusted_path_hook() returns false and the resolver reads the benign file with a fresh FileFinder, even though Python's import machinery would use the custom hook for the pickle-selected module; this can turn an unanalyzable reference into a clean call-graph result instead of failing closed.

Useful? React with 👍 / 👎.



def _has_untrusted_path_hook() -> bool:
    """Report whether path-based imports could be served by non-standard machinery.

    Two things are checked without calling any hook or finder:

    * every entry of ``sys.path_hooks`` must satisfy ``_is_standard_path_hook``;
    * every finder already cached in ``sys.path_importer_cache`` for a
      ``sys.path`` entry must be *exactly* a ``FileFinder`` or ``zipimporter``.

    Returns:
        True if any hook or cached finder fails those checks, otherwise False.
    """
    if any(not _is_standard_path_hook(hook) for hook in sys.path_hooks):
        return True
    for entry in sys.path:
        # An empty ``sys.path`` entry stands for the current working directory.
        cache_key = entry or os.getcwd()
        finder = sys.path_importer_cache.get(cache_key)
        if finder is None:
            continue
        # Exact type match on purpose: a cached subclass of FileFinder or
        # zipimporter can override ``find_spec``, and a real import would go
        # to that cached object rather than to a fresh plain FileFinder.
        finder_type = type(finder)
        if finder_type is not FileFinder and finder_type is not zipimporter:
            return True
    return False


def _find_standard_path_spec(module_name: str, search_path: list[str]) -> ModuleSpec | None:
namespace_locations: list[str] = []
loader_details = (
(ExtensionFileLoader, EXTENSION_SUFFIXES),
(SourceFileLoader, SOURCE_SUFFIXES),
(SourcelessFileLoader, BYTECODE_SUFFIXES),
)
for entry in search_path:
try:
zip_spec = zipimporter(entry).find_spec(module_name)
except ImportError:
zip_spec = None
if zip_spec is not None:
return zip_spec

finder = FileFinder(entry, *loader_details)
spec = finder.find_spec(module_name)
Comment thread
mldangelo-oai marked this conversation as resolved.
if spec is None:
continue
spec = find_spec(module_name, None)
if isinstance(spec, ModuleSpec):
if spec.loader is not None:
return spec
return None
if spec.submodule_search_locations is not None:
namespace_locations.extend(spec.submodule_search_locations)

if not namespace_locations:
return None
namespace_spec = ModuleSpec(module_name, loader=None, is_package=True)
namespace_spec.submodule_search_locations = namespace_locations
return namespace_spec


@_register_source_sensitive_cache
Expand Down Expand Up @@ -3485,22 +3592,23 @@ def _resolve_module_source(module_name: str) -> Path | None:
if not parts or any(not part or "/" in part or "\\" in part for part in parts):
return None
_track_shared_source_candidates(tuple(parts))
for entry in sys.path:
root = Path(entry or os.getcwd())
current = root
for index, part in enumerate(parts):
is_last = index == len(parts) - 1
if is_last:
module_file = current / f"{part}.py"
if module_file.is_file():
return module_file
package_file = current / part / "__init__.py"
if package_file.is_file():
return package_file
else:
current = current / part
if not current.is_dir():
break
loaded_module = sys.modules.get(module_name)
loaded_spec = getattr(loaded_module, "__spec__", None)
if isinstance(loaded_spec, ModuleSpec) and isinstance(loaded_spec.origin, str):
if loaded_spec.origin.endswith(tuple(SOURCE_SUFFIXES)):
loaded_source_path = Path(loaded_spec.origin)
if loaded_source_path.is_file():
return loaded_source_path
Comment on lines +3598 to +3601

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Fail closed for preloaded module specs

When the scan process already has a module in sys.modules, this path trusts that module's __spec__.origin as the source to analyze. A plugin or custom loader can preload the pickle-selected module and set origin to a benign .py file; _resolve_module_source() then parses that file and can report the reference clean, while pickle.loads resolves to the already-loaded in-memory module rather than the benign source. Loaded specs from non-standard loaders need the same fail-closed treatment as custom finders.

Useful? React with 👍 / 👎.

if loaded_spec.origin not in {"built-in", "frozen"}:
return None
elif _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook():
return None
Comment on lines +3604 to +3605

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Invalidate shared caches when import hooks change

_resolve_module_source() is cached and now depends on sys.meta_path/sys.path_hooks, but the shared-cache stability check still only tracks sys.path and source fingerprints. If a caller uses the public shared_source_sensitive_caches() context across scans, a module resolved cleanly before a custom finder is inserted remains cached as analyzable after the hook change, so the later scan can miss the required source_unavailable fail-closed notice until caches are manually cleared.

Useful? React with 👍 / 👎.


spec = _find_standard_filesystem_spec(module_name)
if spec is not None and isinstance(spec.origin, str) and spec.origin.endswith(tuple(SOURCE_SUFFIXES)):
source_path = Path(spec.origin)
if source_path.is_file():
return source_path
return None


Expand Down
16 changes: 14 additions & 2 deletions packages/modelaudit-picklescan/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from modelaudit_picklescan.call_graph import (
CallGraphFinding,
StartupHookWriteFinding,
_call_graph_source_unavailable_reason,
_CallGraphAnalysisLimitError,
find_startup_hook_write_call_graphs,
)
Expand Down Expand Up @@ -3317,9 +3318,20 @@ def test_scan_bytes_does_not_flag_dill_dump_as_dangerous() -> None:

report = scan_bytes(payload, source="dill-dump.pkl")

assert report.status == ScanStatus.COMPLETE
assert report.verdict == SafetyVerdict.CLEAN
assert report.findings == ()
source_reason = _call_graph_source_unavailable_reason("dill")
if source_reason is None:
assert report.status == ScanStatus.COMPLETE
assert report.verdict == SafetyVerdict.CLEAN
else:
assert report.status == ScanStatus.INCONCLUSIVE
assert report.verdict == SafetyVerdict.UNKNOWN
assert any(
notice.code == "call_graph_source_unavailable"
and notice.details.get("import_reference") == "dill.dump"
and notice.details.get("reason") == source_reason
for notice in report.notices
)


def test_scan_bytes_flags_dill_loads_as_dangerous() -> None:
Expand Down
Loading
Loading