-
Notifications
You must be signed in to change notification settings - Fork 14
fix: avoid pickle meta-path source probing #1493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f063349
e961b43
c192144
001730f
7af5d8c
618bf70
2b799db
8c83361
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| import ast | ||
| import fnmatch | ||
| import hashlib | ||
| import os | ||
| import sys | ||
|
|
@@ -14,9 +15,22 @@ | |
| from contextvars import ContextVar | ||
| from dataclasses import dataclass, field | ||
| from functools import lru_cache | ||
| from importlib.machinery import EXTENSION_SUFFIXES, BuiltinImporter, FrozenImporter, ModuleSpec, PathFinder | ||
| from importlib.machinery import ( | ||
| BYTECODE_SUFFIXES, | ||
| EXTENSION_SUFFIXES, | ||
| SOURCE_SUFFIXES, | ||
| BuiltinImporter, | ||
| ExtensionFileLoader, | ||
| FileFinder, | ||
| FrozenImporter, | ||
| ModuleSpec, | ||
| PathFinder, | ||
| SourceFileLoader, | ||
| SourcelessFileLoader, | ||
| ) | ||
| from pathlib import Path | ||
| from typing import Any, Protocol, TypeVar, cast | ||
| from zipimport import zipimporter | ||
|
|
||
| # Bound per-pass import/callable fan-out for untrusted inputs. The 32-reference | ||
| # cap has kept call-graph enrichment useful while preventing pathological scan | ||
|
|
@@ -33,6 +47,7 @@ | |
| _MAX_ASSIGNMENT_ALIASES = 128 | ||
| _MAX_ASSIGNMENT_ALIAS_PASSES = 256 | ||
| _MAX_FUNCTION_INSTANCE_ALIASES = 32 | ||
| _TRUSTED_PATH_HOOKS = tuple(sys.path_hooks) | ||
| _MAX_CLASS_INSTANCE_ALIASES = 128 | ||
| _MAX_INHERITED_CLASS_METHODS = 128 | ||
| _MAX_WILDCARD_IMPORTS = 16 | ||
|
|
@@ -1007,20 +1022,13 @@ def _call_graph_source_unavailable_reason(module_name: str) -> str | None: | |
| return "source_parse_error" | ||
| return None | ||
|
|
||
| if module_name.split(".", maxsplit=1)[0] in sys.builtin_module_names: | ||
| return None | ||
|
|
||
| try: | ||
| spec = _find_module_spec_without_imports(module_name) | ||
| except Exception: | ||
| return "source_unavailable" | ||
| if spec is None: | ||
| try: | ||
| spec = _find_meta_path_module_spec_without_imports(module_name) | ||
| except Exception: | ||
| return "source_unavailable" | ||
| if spec is None: | ||
| return None | ||
| # Module names come from pickle metadata; do not consult executable custom meta-path finders. | ||
| return "source_unavailable" | ||
| if spec.origin in {"built-in", "frozen"}: | ||
| return None | ||
| if spec.origin is not None and any(spec.origin.endswith(suffix) for suffix in EXTENSION_SUFFIXES): | ||
|
|
@@ -1033,11 +1041,37 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: | |
| if not parts or any(not part or "/" in part or "\\" in part for part in parts): | ||
| return None | ||
|
|
||
| search_path: list[str] | None = None | ||
| loaded_module = sys.modules.get(module_name) | ||
| loaded_spec = getattr(loaded_module, "__spec__", None) | ||
| if isinstance(loaded_spec, ModuleSpec): | ||
| return loaded_spec | ||
|
|
||
| if not _untrusted_meta_path_finder_precedes(BuiltinImporter, module_name): | ||
| builtin_spec = BuiltinImporter.find_spec(module_name) | ||
| if builtin_spec is not None: | ||
| return builtin_spec | ||
|
|
||
| if not _untrusted_meta_path_finder_precedes(FrozenImporter, module_name): | ||
| frozen_spec = FrozenImporter.find_spec(module_name) | ||
| if frozen_spec is not None: | ||
| return frozen_spec | ||
|
|
||
| if _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook(): | ||
| return None | ||
|
|
||
| return _find_standard_filesystem_spec(module_name) | ||
|
|
||
|
|
||
| def _find_standard_filesystem_spec(module_name: str) -> ModuleSpec | None: | ||
| parts = module_name.split(".") | ||
| if not parts or any(not part or "/" in part or "\\" in part for part in parts): | ||
| return None | ||
|
|
||
| search_path = [str(Path(entry or os.getcwd())) for entry in sys.path] | ||
| spec: ModuleSpec | None = None | ||
| for index in range(len(parts)): | ||
| qualified_name = ".".join(parts[: index + 1]) | ||
| spec = PathFinder.find_spec(qualified_name, search_path) | ||
| spec = _find_standard_path_spec(qualified_name, search_path) | ||
| if spec is None: | ||
| return None | ||
| if index == len(parts) - 1: | ||
|
|
@@ -1049,18 +1083,91 @@ def _find_module_spec_without_imports(module_name: str) -> ModuleSpec | None: | |
| return spec | ||
|
|
||
|
|
||
| def _find_meta_path_module_spec_without_imports(module_name: str) -> ModuleSpec | None: | ||
| """Consult non-standard meta path finders without importing parent packages.""" | ||
| def _matches_loaded_finder_type(finder: object, module_name: str, type_name: str) -> bool: | ||
| module = sys.modules.get(module_name) | ||
| finder_type = getattr(module, type_name, None) if module is not None else None | ||
| return isinstance(finder_type, type) and type(finder) is finder_type | ||
|
|
||
|
|
||
def _known_meta_path_finder_cannot_handle(finder: object, module_name: str) -> bool:
    """Decide whether a recognised meta-path finder can be ruled out for *module_name*.

    Three finder types are recognised, each matched by exact type against a
    class in an already-loaded module (nothing is imported):

    * ``_distutils_hack.DistutilsMetaFinder``: ruled out unless the top-level
      package is ``distutils``, ``pip`` or ``test``.
    * ``_virtualenv._Finder``: ruled out unless *module_name* is listed in the
      module's ``_DISTUTILS_PATCH`` attribute (empty when absent).
    * ``_pytest.assertion.rewrite.AssertionRewritingHook``: ruled out unless the
      module is ``conftest``, is or lives under a name in the finder's
      ``_must_rewrite``, or its leaf ``<name>.py`` matches one of ``fnpats``.

    NOTE(review): these name lists presumably mirror what each third-party
    finder claims; confirm against the versions of those packages in use.

    Args:
        finder: An entry taken from ``sys.meta_path``.
        module_name: Dotted module name being resolved.

    Returns:
        ``True`` when *finder* is one of the recognised types and the checks
        above exclude *module_name*; ``False`` otherwise, including for every
        unrecognised finder.
    """
    top_level = module_name.partition(".")[0]

    if _matches_loaded_finder_type(finder, "_distutils_hack", "DistutilsMetaFinder"):
        return top_level not in {"distutils", "pip", "test"}

    if _matches_loaded_finder_type(finder, "_virtualenv", "_Finder"):
        virtualenv_module = sys.modules.get("_virtualenv")
        if virtualenv_module is None:
            patched_names = ()
        else:
            patched_names = getattr(virtualenv_module, "_DISTUTILS_PATCH", ())
        return module_name not in patched_names

    if not _matches_loaded_finder_type(finder, "_pytest.assertion.rewrite", "AssertionRewritingHook"):
        # Unrecognised finder: nothing can be concluded about it.
        return False

    if module_name == "conftest":
        return False
    for rewritten_name in getattr(finder, "_must_rewrite", ()):
        if module_name == rewritten_name or module_name.startswith(f"{rewritten_name}."):
            return False

    # Only the last dotted component is compared against the file-name patterns.
    leaf_filename = f"{module_name.rpartition('.')[2]}.py"
    return not any(fnmatch.fnmatchcase(leaf_filename, pattern) for pattern in getattr(finder, "fnpats", ()))
|
|
||
|
|
||
| def _untrusted_meta_path_finder_precedes(target: object, module_name: str) -> bool: | ||
| for finder in sys.meta_path: | ||
| if finder is target: | ||
| return False | ||
| if finder is BuiltinImporter or finder is FrozenImporter or finder is PathFinder: | ||
| continue | ||
| find_spec = getattr(finder, "find_spec", None) | ||
| if find_spec is None: | ||
| if _known_meta_path_finder_cannot_handle(finder, module_name): | ||
| continue | ||
| return True | ||
| return True | ||
|
|
||
|
|
||
def _is_standard_path_hook(hook: object) -> bool:
    """Report whether *hook* is ``zipimporter`` or one of the trusted path hooks.

    Membership is decided by object identity, never equality, against
    ``zipimporter`` and each entry of the module-level ``_TRUSTED_PATH_HOOKS``
    tuple.

    Args:
        hook: A candidate path hook to classify.

    Returns:
        ``True`` when *hook* is identical to ``zipimporter`` or to a trusted
        entry, ``False`` otherwise.
    """
    if hook is zipimporter:
        return True
    for trusted_hook in _TRUSTED_PATH_HOOKS:
        if hook is trusted_hook:
            return True
    return False
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If an application installs a custom Useful? React with 👍 / 👎. |
||
|
|
||
|
|
||
| def _has_untrusted_path_hook() -> bool: | ||
| if any(not _is_standard_path_hook(hook) for hook in sys.path_hooks): | ||
| return True | ||
| for entry in sys.path: | ||
| cache_key = entry or os.getcwd() | ||
| finder = sys.path_importer_cache.get(cache_key) | ||
| if finder is not None and not isinstance(finder, (FileFinder, zipimporter)): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||
| return True | ||
| return False | ||
|
|
||
|
|
||
| def _find_standard_path_spec(module_name: str, search_path: list[str]) -> ModuleSpec | None: | ||
| namespace_locations: list[str] = [] | ||
| loader_details = ( | ||
| (ExtensionFileLoader, EXTENSION_SUFFIXES), | ||
| (SourceFileLoader, SOURCE_SUFFIXES), | ||
| (SourcelessFileLoader, BYTECODE_SUFFIXES), | ||
| ) | ||
| for entry in search_path: | ||
| try: | ||
| zip_spec = zipimporter(entry).find_spec(module_name) | ||
| except ImportError: | ||
| zip_spec = None | ||
| if zip_spec is not None: | ||
| return zip_spec | ||
|
|
||
| finder = FileFinder(entry, *loader_details) | ||
| spec = finder.find_spec(module_name) | ||
|
mldangelo-oai marked this conversation as resolved.
|
||
| if spec is None: | ||
| continue | ||
| spec = find_spec(module_name, None) | ||
| if isinstance(spec, ModuleSpec): | ||
| if spec.loader is not None: | ||
| return spec | ||
| return None | ||
| if spec.submodule_search_locations is not None: | ||
| namespace_locations.extend(spec.submodule_search_locations) | ||
|
|
||
| if not namespace_locations: | ||
| return None | ||
| namespace_spec = ModuleSpec(module_name, loader=None, is_package=True) | ||
| namespace_spec.submodule_search_locations = namespace_locations | ||
| return namespace_spec | ||
|
|
||
|
|
||
| @_register_source_sensitive_cache | ||
|
|
@@ -3485,22 +3592,23 @@ def _resolve_module_source(module_name: str) -> Path | None: | |
| if not parts or any(not part or "/" in part or "\\" in part for part in parts): | ||
| return None | ||
| _track_shared_source_candidates(tuple(parts)) | ||
| for entry in sys.path: | ||
| root = Path(entry or os.getcwd()) | ||
| current = root | ||
| for index, part in enumerate(parts): | ||
| is_last = index == len(parts) - 1 | ||
| if is_last: | ||
| module_file = current / f"{part}.py" | ||
| if module_file.is_file(): | ||
| return module_file | ||
| package_file = current / part / "__init__.py" | ||
| if package_file.is_file(): | ||
| return package_file | ||
| else: | ||
| current = current / part | ||
| if not current.is_dir(): | ||
| break | ||
| loaded_module = sys.modules.get(module_name) | ||
| loaded_spec = getattr(loaded_module, "__spec__", None) | ||
| if isinstance(loaded_spec, ModuleSpec) and isinstance(loaded_spec.origin, str): | ||
| if loaded_spec.origin.endswith(tuple(SOURCE_SUFFIXES)): | ||
| loaded_source_path = Path(loaded_spec.origin) | ||
| if loaded_source_path.is_file(): | ||
| return loaded_source_path | ||
|
Comment on lines
+3598
to
+3601
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the scan process already has a module in Useful? React with 👍 / 👎. |
||
| if loaded_spec.origin not in {"built-in", "frozen"}: | ||
| return None | ||
| elif _untrusted_meta_path_finder_precedes(PathFinder, module_name) or _has_untrusted_path_hook(): | ||
| return None | ||
|
Comment on lines
+3604
to
+3605
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
|
|
||
| spec = _find_standard_filesystem_spec(module_name) | ||
| if spec is not None and isinstance(spec.origin, str) and spec.origin.endswith(tuple(SOURCE_SUFFIXES)): | ||
| source_path = Path(spec.origin) | ||
| if source_path.is_file(): | ||
| return source_path | ||
| return None | ||
|
|
||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.