Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 126 additions & 48 deletions mypy/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import sys
import time
import types
from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from heapq import heappop, heappush
from textwrap import dedent
from typing import (
Expand Down Expand Up @@ -947,6 +948,63 @@ def dump_stats(self) -> None:
for key, value in sorted(self.stats_summary().items()):
print(f"{key + ':':24}{value}")

def parse_all(self, states: Iterable[State]) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable is somewhat questionable as a type, since we iterate over states more than once. Container[State] could be a slightly better type.

"""Parse multiple files in parallel (if possible) and compute dependencies.

Note: this duplicates a bit of logic from State.parse_file(). This is done
as a micro-optimization to parallelize only those parts of the code that
can be parallelized efficiently.
"""
if self.options.native_parser:
futures = []
parsed_states = set()
# TODO: we should probably use psutil instead.
# With psutil we can get a number of physical cores, while all stdlib
# functions include virtual cores (which is not optimal for performance).
available_threads = os.cpu_count() or 2 # conservative fallback
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, len(psutil.Process().cpu_affinity()) is better everywhere except Darwin/macOS, where psutil doesn't support that; though I still suggest taking the minimum of that and os.cpu_count() as the latter respects -X cpu_count and/or PYTHON_CPU_COUNT for Python 3.13+ users (especially containerized users).

If you don't want to add a psutil dependency yet, I recommend os.sched_getaffinity(0) which is how os.process_cpu_count() is implemented on Python 3.13+. (you should also still call os.cpu_count() and use it if it is smaller, for the same reasons as above).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I tried sched_getaffinity() but it is not available on Python 3.10 (which we still support). I guess we may need to write a separate helper with various fallbacks logic to make this ~reliable.

Copy link
Copy Markdown
Contributor

@mr-c mr-c Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, I see os.sched_getaffinity() all the way back to Python 3.3: https://docs.python.org/3.3/library/os.html#os.sched_getaffinity

However,

They are only available on some Unix platforms

So maybe your platform didn't implement it until a later Python version.

Yeah, helper function + memoization is very helpful here

# For some reason there is no visible improvement with more than 8 threads.
# TODO: consider writing our own ThreadPool as an optimization.
with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reusing the ThreadPoolExecutor could also help scaling a bit, based on some quick microbenchmarks (this is a potential future improvement).

for state in states:
state.needs_parse = False
if state.tree is not None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed to not hit the fastparse fallback in mypy/parse.py when the path doesn't exist? If yes, maybe add a comment about it here and/or add an assert to ensure this won't go through the legacy fast parser which I think is not thread safe.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed to not hit the fastparse fallback in mypy/parse.py when the path doesn't exist?

Oh, completely forgot about that one. I think it would be easier to simply parse those sequentially (IIUC it is only needed now for something like mypy -c).

# The file was already parsed.
continue
# New parser reads source from file directly, we do this only for
# the side effect of parsing inline mypy configurations.
state.get_source()
if state.id not in self.ast_cache:
futures.append(executor.submit(state.parse_file_inner, state.source or ""))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential future improvement/experiment: sending multiple small files in a single submit call could improve scaling a bit, but likely only for small files or if the batch is large enough.

parsed_states.add(state)
else:
self.log(f"Using cached AST for {state.xpath} ({state.id})")
state.tree, state.early_errors = self.ast_cache[state.id]
for fut in wait(futures, return_when=FIRST_EXCEPTION).done:
# This will raise exceptions, if any.
fut.result()

for state in states:
assert state.tree is not None
if state in parsed_states:
state.early_errors = list(self.errors.error_info_map.get(state.xpath, []))
state.semantic_analysis_pass1()
self.ast_cache[state.id] = (state.tree, state.early_errors)
self.modules[state.id] = state.tree
state.check_blockers()
state.setup_errors()
else:
# Old parser cannot be parallelized.
for state in states:
state.parse_file()

for state in states:
state.compute_dependencies()
if self.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del self.ast_cache[state.id]

def use_fine_grained_cache(self) -> bool:
return self.cache_enabled and self.options.use_fine_grained_cache

Expand Down Expand Up @@ -2505,8 +2563,7 @@ def new_state(
# we need to re-calculate dependencies.
# NOTE: see comment below for why we skip this in fine-grained mode.
if exist_added_packages(suppressed, manager):
state.parse_file() # This is safe because the cache is anyway stale.
state.compute_dependencies()
state.needs_parse = True # This is safe because the cache is anyway stale.
# This is an inverse to the situation above. If we had an import like this:
# from pkg import mod
# and then mod was deleted, we need to force recompute dependencies, to
Expand All @@ -2515,8 +2572,7 @@ def new_state(
# import pkg
# import pkg.mod
if exist_removed_submodules(dependencies, manager):
state.parse_file() # Same as above, the current state is stale anyway.
state.compute_dependencies()
state.needs_parse = True # Same as above, the current state is stale anyway.
state.size_hint = meta.size
else:
# When doing a fine-grained cache load, pretend we only
Expand All @@ -2526,14 +2582,17 @@ def new_state(
manager.log(f"Deferring module to fine-grained update {path} ({id})")
raise ModuleNotFound

# Parse the file (and then some) to get the dependencies.
state.parse_file(temporary=temporary)
state.compute_dependencies()
if manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del manager.ast_cache[id]
if temporary:
# Eagerly parse temporary states, they are needed rarely.
state.parse_file(temporary=True)
state.compute_dependencies()
if state.manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del state.manager.ast_cache[state.id]
else:
state.needs_parse = True

return state

Expand Down Expand Up @@ -2596,6 +2655,8 @@ def __init__(
# Pre-computed opaque value of suppressed_deps_opts() used
# to minimize amount of data sent to parallel workers.
self.known_suppressed_deps_opts: bytes | None = None
# An internal flag used by build manager to schedule states for parsing.
self.needs_parse = False

def write(self, buf: WriteBuffer) -> None:
"""Serialize State for sending to build worker.
Expand Down Expand Up @@ -2819,26 +2880,9 @@ def fix_cross_refs(self) -> None:

# Methods for processing modules from source code.

def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Also record module dependencies based on imports.
"""
if self.tree is not None:
# The file was already parsed (in __init__()).
return

def get_source(self) -> str:
"""Get module source and parse inline mypy configurations."""
manager = self.manager

# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
cached = self.id in manager.ast_cache
modules = manager.modules
if not cached:
manager.log(f"Parsing {self.xpath} ({self.id})")
else:
manager.log(f"Using cached AST for {self.xpath} ({self.id})")

t0 = time_ref()

with self.wrap_context():
Expand Down Expand Up @@ -2880,33 +2924,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None =
self.check_for_invalid_options()

self.size_hint = len(source)
if not cached:
ignore_errors = self.ignore_all or self.options.ignore_errors
self.tree = manager.parse_file(
self.id,
self.xpath,
source,
ignore_errors=ignore_errors,
options=self.options,
raw_data=raw_data,
)
else:
# Reuse a cached AST
self.tree = manager.ast_cache[self.id][0]
self.time_spent_us += time_spent_us(t0)
return source

def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None:
t0 = time_ref()
self.manager.log(f"Parsing {self.xpath} ({self.id})")
with self.wrap_context():
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error context and the Errors object seems to be shared between threads, so there is a race condition. We could perhaps use a separate Errors per thread as a quick fix.

A more efficient approach would be to have parse just return a list of error descriptions, and then the main thread would report any errors via the shared Errors instance. This way we wouldn't need to set up error context for each file unless there are errors (which is rare), so the sequential part would do less work, possibly increasing the maximum possible concurrency.

It's okay to do something simple at first. We can always improve it later, but it would be good to fix race conditions before merging.

Also in BuildManager.parse_file, the add_stats call is racy. In this case we could add a stats_enabled check to quickly skip stats reporting most of the time (again reducing sequential part), and then use a lock around stats updates if stats are enabled.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed there are race conditions. But IIUC the extra wrap_context() calls I added are needed anyway. Without those we can get incorrect error ordering even in sequential mode in some edge cases.

ignore_errors = self.ignore_all or self.options.ignore_errors
self.tree = self.manager.parse_file(
self.id,
self.xpath,
source,
ignore_errors=ignore_errors,
options=self.options,
raw_data=raw_data,
)
self.time_spent_us += time_spent_us(t0)

if not cached:
def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Logic here should be kept in sync with BuildManager.parse_all().
"""
self.needs_parse = False
if self.tree is not None:
# The file was already parsed.
return

source = self.get_source()
manager = self.manager
# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
if self.id not in manager.ast_cache:
self.parse_file_inner(source, raw_data)
# Make a copy of any errors produced during parse time so that
# fine-grained mode can repeat them when the module is
# reprocessed.
self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
self.semantic_analysis_pass1()
else:
self.early_errors = manager.ast_cache[self.id][1]
# Reuse a cached AST
manager.log(f"Using cached AST for {self.xpath} ({self.id})")
self.tree, self.early_errors = manager.ast_cache[self.id]

assert self.tree is not None
if not temporary:
modules[self.id] = self.tree
manager.modules[self.id] = self.tree
self.check_blockers()

manager.ast_cache[self.id] = (self.tree, self.early_errors)
Expand Down Expand Up @@ -3912,14 +3976,23 @@ def load_graph(
graph[st.id] = st
new.append(st)
entry_points.add(bs.module)
manager.parse_all([state for state in new if state.needs_parse])

# Note: Running this each time could be slow in the daemon. If it's a problem, we
# can do more work to maintain this incrementally.
seen_files = {st.abspath: st for st in graph.values() if st.path}

# Collect dependencies. We go breadth-first.
# More nodes might get added to new as we go, but that's fine.
ready = set(new)
not_ready: set[State] = set()
for st in new:
if st not in ready:
# We have run out of states, parse all we have.
assert st in not_ready
manager.parse_all(not_ready)
ready |= not_ready
not_ready.clear()
assert st.ancestors is not None
# Strip out indirect dependencies. These will be dealt with
# when they show up as direct dependencies, and there's a
Expand Down Expand Up @@ -3975,6 +4048,7 @@ def load_graph(
newst_path = newst.abspath

if newst_path in seen_files:
manager.errors.set_file(newst.xpath, newst.id, manager.options)
manager.error(
None,
"Source file found twice under different module names: "
Expand All @@ -3995,6 +4069,10 @@ def load_graph(
assert newst.id not in graph, newst.id
graph[newst.id] = newst
new.append(newst)
if newst.needs_parse:
not_ready.add(newst)
else:
ready.add(newst)
# There are two things we need to do after the initial load loop. One is up-suppress
# modules that are back in graph. We need to do this after the loop to cover edge cases
# like where a namespace package ancestor is shared by a typed and an untyped package.
Expand Down
2 changes: 1 addition & 1 deletion mypy/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def close(self) -> None:
def connect_db(db_file: str, sync_off: bool = False) -> sqlite3.Connection:
import sqlite3.dbapi2

db = sqlite3.dbapi2.connect(db_file)
db = sqlite3.dbapi2.connect(db_file, check_same_thread=False)
if sync_off:
# This is a bit unfortunate (as we may get corrupt cache after e.g. Ctrl + C),
# but without this flag, commits are *very* slow, especially when using HDDs,
Expand Down
5 changes: 5 additions & 0 deletions mypy/nativeparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import os
import time
from typing import Any, Final, cast

import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore]
Expand Down Expand Up @@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]:
def parse_to_binary_ast(
filename: str, options: Options, skip_function_bodies: bool = False
) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]:
# This is a horrible hack to work around a mypyc bug where imported
# module may be not ready in a thread sometimes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: maybe add a timeout so that we give up after say 60s and raise an exception?

while ast_serialize is None:
time.sleep(0.0001) # type: ignore[unreachable]
ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse(
filename,
skip_function_bodies=skip_function_bodies,
Expand Down
2 changes: 2 additions & 0 deletions mypy/test/testgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_sorted_components(self) -> None:
"c": State.new_state("c", None, "import b, d", manager),
"builtins": State.new_state("builtins", None, "", manager),
}
manager.parse_all(graph.values())
res = [scc.mod_ids for scc in sorted_components(graph)]
assert_equal(res, [{"builtins"}, {"d"}, {"c", "b"}, {"a"}])

Expand All @@ -129,6 +130,7 @@ def test_order_ascc(self) -> None:
"c": State.new_state("c", None, "import b, d", manager),
"builtins": State.new_state("builtins", None, "", manager),
}
manager.parse_all(graph.values())
res = [scc.mod_ids for scc in sorted_components(graph)]
assert_equal(res, [{"builtins"}, {"a", "d", "c", "b"}])
ascc = res[1]
Expand Down
Loading