Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 124 additions & 48 deletions mypy/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import sys
import time
import types
from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from heapq import heappop, heappush
from textwrap import dedent
from typing import (
Expand Down Expand Up @@ -125,6 +126,7 @@
from mypy.util import (
DecodeError,
decode_python_encoding,
get_available_threads,
get_mypy_comments,
hash_digest,
hash_digest_bytes,
Expand Down Expand Up @@ -947,6 +949,60 @@ def dump_stats(self) -> None:
for key, value in sorted(self.stats_summary().items()):
print(f"{key + ':':24}{value}")

def parse_all(self, states: Iterable[State]) -> None:
    """Parse multiple files in parallel (if possible) and compute dependencies.

    Note: this duplicates a bit of logic from State.parse_file(). This is done
    as a micro-optimization to parallelize only those parts of the code that
    can be parallelized efficiently.
    """
    # We iterate over the states three times below; materialize the iterable
    # once so that a generator argument is not silently exhausted after the
    # first pass (the signature promises any Iterable[State]).
    states = list(states)
    if self.options.native_parser:
        futures = []
        # States whose AST is produced by this call (as opposed to reused from
        # self.ast_cache); only these need pass-1 semantic analysis below.
        parsed_states = set()
        available_threads = get_available_threads()
        # Overhead from trying to parallelize (small) blocking portion of
        # parse_file_inner() results in no visible improvement with more than 8 threads.
        with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor:
            for state in states:
                state.needs_parse = False
                if state.tree is not None:
                    # The file was already parsed.
                    continue
                # New parser reads source from file directly, we do this only for
                # the side effect of parsing inline mypy configurations.
                state.get_source()
                if state.id not in self.ast_cache:
                    futures.append(executor.submit(state.parse_file_inner, state.source or ""))
                    parsed_states.add(state)
                else:
                    self.log(f"Using cached AST for {state.xpath} ({state.id})")
                    state.tree, state.early_errors = self.ast_cache[state.id]
            for fut in wait(futures, return_when=FIRST_EXCEPTION).done:
                # This will raise exceptions, if any.
                fut.result()

        # Finish the (single-threaded) per-state bookkeeping that parse_file()
        # would otherwise do.
        for state in states:
            assert state.tree is not None
            if state in parsed_states:
                state.early_errors = list(self.errors.error_info_map.get(state.xpath, []))
                state.semantic_analysis_pass1()
                self.ast_cache[state.id] = (state.tree, state.early_errors)
            self.modules[state.id] = state.tree
            state.check_blockers()
            state.setup_errors()
    else:
        # Old parser cannot be parallelized.
        for state in states:
            state.parse_file()

    for state in states:
        state.compute_dependencies()
        if self.workers and state.tree:
            # We don't need imports in coordinator process anymore, we parse only to
            # compute dependencies.
            state.tree.imports = []
            del self.ast_cache[state.id]

def use_fine_grained_cache(self) -> bool:
    """Whether caching is enabled and the fine-grained-cache option is set."""
    return self.cache_enabled and self.options.use_fine_grained_cache

Expand Down Expand Up @@ -2502,8 +2558,7 @@ def new_state(
# we need to re-calculate dependencies.
# NOTE: see comment below for why we skip this in fine-grained mode.
if exist_added_packages(suppressed, manager):
state.parse_file() # This is safe because the cache is anyway stale.
state.compute_dependencies()
state.needs_parse = True # This is safe because the cache is anyway stale.
# This is an inverse to the situation above. If we had an import like this:
# from pkg import mod
# and then mod was deleted, we need to force recompute dependencies, to
Expand All @@ -2512,8 +2567,7 @@ def new_state(
# import pkg
# import pkg.mod
if exist_removed_submodules(dependencies, manager):
state.parse_file() # Same as above, the current state is stale anyway.
state.compute_dependencies()
state.needs_parse = True # Same as above, the current state is stale anyway.
state.size_hint = meta.size
else:
# When doing a fine-grained cache load, pretend we only
Expand All @@ -2523,14 +2577,17 @@ def new_state(
manager.log(f"Deferring module to fine-grained update {path} ({id})")
raise ModuleNotFound

# Parse the file (and then some) to get the dependencies.
state.parse_file(temporary=temporary)
state.compute_dependencies()
if manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del manager.ast_cache[id]
if temporary:
# Eagerly parse temporary states, they are needed rarely.
state.parse_file(temporary=True)
state.compute_dependencies()
if state.manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del state.manager.ast_cache[state.id]
else:
state.needs_parse = True

return state

Expand Down Expand Up @@ -2593,6 +2650,8 @@ def __init__(
# Pre-computed opaque value of suppressed_deps_opts() used
# to minimize amount of data sent to parallel workers.
self.known_suppressed_deps_opts: bytes | None = None
# An internal flag used by build manager to schedule states for parsing.
self.needs_parse = False

def write(self, buf: WriteBuffer) -> None:
"""Serialize State for sending to build worker.
Expand Down Expand Up @@ -2816,26 +2875,9 @@ def fix_cross_refs(self) -> None:

# Methods for processing modules from source code.

def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Also record module dependencies based on imports.
"""
if self.tree is not None:
# The file was already parsed (in __init__()).
return

def get_source(self) -> str:
"""Get module source and parse inline mypy configurations."""
manager = self.manager

# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
cached = self.id in manager.ast_cache
modules = manager.modules
if not cached:
manager.log(f"Parsing {self.xpath} ({self.id})")
else:
manager.log(f"Using cached AST for {self.xpath} ({self.id})")

t0 = time_ref()

with self.wrap_context():
Expand Down Expand Up @@ -2877,33 +2919,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None =
self.check_for_invalid_options()

self.size_hint = len(source)
if not cached:
ignore_errors = self.ignore_all or self.options.ignore_errors
self.tree = manager.parse_file(
self.id,
self.xpath,
source,
ignore_errors=ignore_errors,
options=self.options,
raw_data=raw_data,
)
else:
# Reuse a cached AST
self.tree = manager.ast_cache[self.id][0]
self.time_spent_us += time_spent_us(t0)
return source

def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None:
    """Parse the given source text into self.tree.

    This is the parse-only core of parse_file(); BuildManager.parse_all()
    submits it to a thread pool, so it must not do any of the follow-up
    bookkeeping (AST caching, semantic analysis pass 1, etc.).
    Time spent is accumulated into self.time_spent_us.
    """
    t0 = time_ref()
    self.manager.log(f"Parsing {self.xpath} ({self.id})")
    with self.wrap_context():
        ignore_errors = self.ignore_all or self.options.ignore_errors
        self.tree = self.manager.parse_file(
            self.id,
            self.xpath,
            source,
            ignore_errors=ignore_errors,
            options=self.options,
            raw_data=raw_data,
        )
    self.time_spent_us += time_spent_us(t0)

if not cached:
def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Logic here should be kept in sync with BuildManager.parse_all().
"""
self.needs_parse = False
if self.tree is not None:
# The file was already parsed.
return

source = self.get_source()
manager = self.manager
# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
if self.id not in manager.ast_cache:
self.parse_file_inner(source, raw_data)
# Make a copy of any errors produced during parse time so that
# fine-grained mode can repeat them when the module is
# reprocessed.
self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
self.semantic_analysis_pass1()
else:
self.early_errors = manager.ast_cache[self.id][1]
# Reuse a cached AST
manager.log(f"Using cached AST for {self.xpath} ({self.id})")
self.tree, self.early_errors = manager.ast_cache[self.id]

assert self.tree is not None
if not temporary:
modules[self.id] = self.tree
manager.modules[self.id] = self.tree
self.check_blockers()

manager.ast_cache[self.id] = (self.tree, self.early_errors)
Expand Down Expand Up @@ -3909,14 +3971,23 @@ def load_graph(
graph[st.id] = st
new.append(st)
entry_points.add(bs.module)
manager.parse_all([state for state in new if state.needs_parse])

# Note: Running this each time could be slow in the daemon. If it's a problem, we
# can do more work to maintain this incrementally.
seen_files = {st.abspath: st for st in graph.values() if st.path}

# Collect dependencies. We go breadth-first.
# More nodes might get added to new as we go, but that's fine.
ready = set(new)
not_ready: set[State] = set()
for st in new:
if st not in ready:
# We have run out of states, parse all we have.
assert st in not_ready
manager.parse_all(not_ready)
ready |= not_ready
not_ready.clear()
assert st.ancestors is not None
# Strip out indirect dependencies. These will be dealt with
# when they show up as direct dependencies, and there's a
Expand Down Expand Up @@ -3972,6 +4043,7 @@ def load_graph(
newst_path = newst.abspath

if newst_path in seen_files:
manager.errors.set_file(newst.xpath, newst.id, manager.options)
manager.error(
None,
"Source file found twice under different module names: "
Expand All @@ -3992,6 +4064,10 @@ def load_graph(
assert newst.id not in graph, newst.id
graph[newst.id] = newst
new.append(newst)
if newst.needs_parse:
not_ready.add(newst)
else:
ready.add(newst)
# There are two things we need to do after the initial load loop. One is up-suppress
# modules that are back in graph. We need to do this after the loop to cover edge cases
# like where a namespace package ancestor is shared by a typed and an untyped package.
Expand Down
2 changes: 1 addition & 1 deletion mypy/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def close(self) -> None:
def connect_db(db_file: str) -> sqlite3.Connection:
import sqlite3.dbapi2

db = sqlite3.dbapi2.connect(db_file)
db = sqlite3.dbapi2.connect(db_file, check_same_thread=False)
# This is a bit unfortunate (as we may get corrupt cache after e.g. Ctrl + C),
# but without this flag, commits are *very* slow, especially when using HDDs,
# see https://www.sqlite.org/faq.html#q19 for details.
Expand Down
5 changes: 5 additions & 0 deletions mypy/nativeparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import os
import time
from typing import Any, Final, cast

import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore]
Expand Down Expand Up @@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]:
def parse_to_binary_ast(
filename: str, options: Options, skip_function_bodies: bool = False
) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]:
# This is a horrible hack to work around a mypyc bug where imported
# module may be not ready in a thread sometimes.
while ast_serialize is None:
time.sleep(0.0001) # type: ignore[unreachable]
ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse(
filename,
skip_function_bodies=skip_function_bodies,
Expand Down
2 changes: 2 additions & 0 deletions mypy/test/testgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_sorted_components(self) -> None:
"c": State.new_state("c", None, "import b, d", manager),
"builtins": State.new_state("builtins", None, "", manager),
}
manager.parse_all(graph.values())
res = [scc.mod_ids for scc in sorted_components(graph)]
assert_equal(res, [{"builtins"}, {"d"}, {"c", "b"}, {"a"}])

Expand All @@ -129,6 +130,7 @@ def test_order_ascc(self) -> None:
"c": State.new_state("c", None, "import b, d", manager),
"builtins": State.new_state("builtins", None, "", manager),
}
manager.parse_all(graph.values())
res = [scc.mod_ids for scc in sorted_components(graph)]
assert_equal(res, [{"builtins"}, {"a", "d", "c", "b"}])
ascc = res[1]
Expand Down
55 changes: 55 additions & 0 deletions mypy/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
except ImportError:
CURSES_ENABLED = False

try:
import psutil

PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False

T = TypeVar("T")

TYPESHED_DIR: Final = str(importlib_resources.files("mypy") / "typeshed")
Expand Down Expand Up @@ -959,3 +966,51 @@ def json_loads(data: bytes) -> Any:
if orjson is not None:
return orjson.loads(data)
return json.loads(data)


_AVAILABLE_THREADS: int | None = None


def get_available_threads() -> int:
    """Determine number of physical cores that current process can use (best effort)."""
    global _AVAILABLE_THREADS
    if _AVAILABLE_THREADS is not None:
        # Computed once per process; reuse the cached result.
        return _AVAILABLE_THREADS

    # os.cpu_count() takes -X cpu_count and/or PYTHON_CPU_COUNT into account,
    # but it always reports logical (virtual) cores, which is not what we want
    # for CPU-bound tasks.
    logical_count = os.cpu_count()
    # Unlike os, psutil (when available) can report physical cores.
    physical_count = psutil.cpu_count(logical=False) if PSUTIL_AVAILABLE else None

    # Take the smallest known estimate; fall back to a conservative 2 if
    # neither source could determine a CPU count at all.
    known_counts = [count for count in (physical_count, logical_count) if count]
    cpu_count = min(known_counts) if known_counts else 2

    affinity = None
    if hasattr(os, "sched_getaffinity"):
        # Not available on old Python versions on some platforms.
        affinity = os.sched_getaffinity(0)
    if PSUTIL_AVAILABLE:
        try:
            affinity = psutil.Process().cpu_affinity()
        except AttributeError:
            # Currently not supported on macOS.
            pass

    # Clamp to the affinity mask if one could be read.
    _AVAILABLE_THREADS = min(cpu_count, len(affinity)) if affinity else cpu_count
    return _AVAILABLE_THREADS