-
-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Parse files in parallel when possible #21175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
8449bb5
0f160f8
24aad4f
93ad7ce
cd85d5d
1879228
e8fbe26
20fcce2
5d2a216
533b0c7
f6093c5
91368a3
b6f9e67
c1dd208
13ffb0d
8d55063
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,8 @@ | |
| import sys | ||
| import time | ||
| import types | ||
| from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet | ||
| from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet | ||
| from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait | ||
| from heapq import heappop, heappush | ||
| from textwrap import dedent | ||
| from typing import ( | ||
|
|
@@ -947,6 +948,63 @@ def dump_stats(self) -> None: | |
| for key, value in sorted(self.stats_summary().items()): | ||
| print(f"{key + ':':24}{value}") | ||
|
|
||
| def parse_all(self, states: Iterable[State]) -> None: | ||
| """Parse multiple files in parallel (if possible) and compute dependencies. | ||
|
|
||
| Note: this duplicates a bit of logic from State.parse_file(). This is done | ||
| as a micro-optimization to parallelize only those parts of the code that | ||
| can be parallelized efficiently. | ||
| """ | ||
| if self.options.native_parser: | ||
| futures = [] | ||
| parsed_states = set() | ||
| # TODO: we should probably use psutil instead. | ||
| # With psutil we can get a number of physical cores, while all stdlib | ||
| # functions include virtual cores (which is not optimal for performance). | ||
| available_threads = os.cpu_count() or 2 # conservative fallback | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, if you don't want to add a
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I tried
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Huh, I see. However,
So maybe your platform didn't implement it until a later Python version. Yeah, helper function + memoization is very helpful here |
||
| # For some reason there is no visible improvement with more than 8 threads. | ||
| # TODO: consider writing our own ThreadPool as an optimization. | ||
| with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reusing the ThreadPoolExecutor could also help scaling a bit, based on some quick microbenchmarks (this is a potential future improvement). |
||
| for state in states: | ||
| state.needs_parse = False | ||
| if state.tree is not None: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are we guaranteed to not hit the fastparse fallback in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Oh, completely forgot about that one. I think it would be easier to simply parse those sequentially (IIUC it is only needed now for something like |
||
| # The file was already parsed. | ||
| continue | ||
| # New parser reads source from file directly, we do this only for | ||
| # the side effect of parsing inline mypy configurations. | ||
| state.get_source() | ||
| if state.id not in self.ast_cache: | ||
| futures.append(executor.submit(state.parse_file_inner, state.source or "")) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another potential future improvement/experiment: sending multiple small files in a single |
||
| parsed_states.add(state) | ||
| else: | ||
| self.log(f"Using cached AST for {state.xpath} ({state.id})") | ||
| state.tree, state.early_errors = self.ast_cache[state.id] | ||
| for fut in wait(futures, return_when=FIRST_EXCEPTION).done: | ||
| # This will raise exceptions, if any. | ||
| fut.result() | ||
|
|
||
| for state in states: | ||
| assert state.tree is not None | ||
| if state in parsed_states: | ||
| state.early_errors = list(self.errors.error_info_map.get(state.xpath, [])) | ||
| state.semantic_analysis_pass1() | ||
| self.ast_cache[state.id] = (state.tree, state.early_errors) | ||
| self.modules[state.id] = state.tree | ||
| state.check_blockers() | ||
| state.setup_errors() | ||
| else: | ||
| # Old parser cannot be parallelized. | ||
| for state in states: | ||
| state.parse_file() | ||
|
|
||
| for state in states: | ||
| state.compute_dependencies() | ||
| if self.workers and state.tree: | ||
| # We don't need imports in coordinator process anymore, we parse only to | ||
| # compute dependencies. | ||
| state.tree.imports = [] | ||
| del self.ast_cache[state.id] | ||
|
|
||
| def use_fine_grained_cache(self) -> bool: | ||
| return self.cache_enabled and self.options.use_fine_grained_cache | ||
|
|
||
|
|
@@ -2505,8 +2563,7 @@ def new_state( | |
| # we need to re-calculate dependencies. | ||
| # NOTE: see comment below for why we skip this in fine-grained mode. | ||
| if exist_added_packages(suppressed, manager): | ||
| state.parse_file() # This is safe because the cache is anyway stale. | ||
| state.compute_dependencies() | ||
| state.needs_parse = True # This is safe because the cache is anyway stale. | ||
| # This is an inverse to the situation above. If we had an import like this: | ||
| # from pkg import mod | ||
| # and then mod was deleted, we need to force recompute dependencies, to | ||
|
|
@@ -2515,8 +2572,7 @@ def new_state( | |
| # import pkg | ||
| # import pkg.mod | ||
| if exist_removed_submodules(dependencies, manager): | ||
| state.parse_file() # Same as above, the current state is stale anyway. | ||
| state.compute_dependencies() | ||
| state.needs_parse = True # Same as above, the current state is stale anyway. | ||
| state.size_hint = meta.size | ||
| else: | ||
| # When doing a fine-grained cache load, pretend we only | ||
|
|
@@ -2526,14 +2582,17 @@ def new_state( | |
| manager.log(f"Deferring module to fine-grained update {path} ({id})") | ||
| raise ModuleNotFound | ||
|
|
||
| # Parse the file (and then some) to get the dependencies. | ||
| state.parse_file(temporary=temporary) | ||
| state.compute_dependencies() | ||
| if manager.workers and state.tree: | ||
| # We don't need imports in coordinator process anymore, we parse only to | ||
| # compute dependencies. | ||
| state.tree.imports = [] | ||
| del manager.ast_cache[id] | ||
| if temporary: | ||
| # Eagerly parse temporary states, they are needed rarely. | ||
| state.parse_file(temporary=True) | ||
| state.compute_dependencies() | ||
| if state.manager.workers and state.tree: | ||
| # We don't need imports in coordinator process anymore, we parse only to | ||
| # compute dependencies. | ||
| state.tree.imports = [] | ||
| del state.manager.ast_cache[state.id] | ||
| else: | ||
| state.needs_parse = True | ||
|
|
||
| return state | ||
|
|
||
|
|
@@ -2596,6 +2655,8 @@ def __init__( | |
| # Pre-computed opaque value of suppressed_deps_opts() used | ||
| # to minimize amount of data sent to parallel workers. | ||
| self.known_suppressed_deps_opts: bytes | None = None | ||
| # An internal flag used by build manager to schedule states for parsing. | ||
| self.needs_parse = False | ||
|
|
||
| def write(self, buf: WriteBuffer) -> None: | ||
| """Serialize State for sending to build worker. | ||
|
|
@@ -2819,26 +2880,9 @@ def fix_cross_refs(self) -> None: | |
|
|
||
| # Methods for processing modules from source code. | ||
|
|
||
| def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None: | ||
| """Parse file and run first pass of semantic analysis. | ||
|
|
||
| Everything done here is local to the file. Don't depend on imported | ||
| modules in any way. Also record module dependencies based on imports. | ||
| """ | ||
| if self.tree is not None: | ||
| # The file was already parsed (in __init__()). | ||
| return | ||
|
|
||
| def get_source(self) -> str: | ||
| """Get module source and parse inline mypy configurations.""" | ||
| manager = self.manager | ||
|
|
||
| # Can we reuse a previously parsed AST? This avoids redundant work in daemon. | ||
| cached = self.id in manager.ast_cache | ||
| modules = manager.modules | ||
| if not cached: | ||
| manager.log(f"Parsing {self.xpath} ({self.id})") | ||
| else: | ||
| manager.log(f"Using cached AST for {self.xpath} ({self.id})") | ||
|
|
||
| t0 = time_ref() | ||
|
|
||
| with self.wrap_context(): | ||
|
|
@@ -2880,33 +2924,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = | |
| self.check_for_invalid_options() | ||
|
|
||
| self.size_hint = len(source) | ||
| if not cached: | ||
| ignore_errors = self.ignore_all or self.options.ignore_errors | ||
| self.tree = manager.parse_file( | ||
| self.id, | ||
| self.xpath, | ||
| source, | ||
| ignore_errors=ignore_errors, | ||
| options=self.options, | ||
| raw_data=raw_data, | ||
| ) | ||
| else: | ||
| # Reuse a cached AST | ||
| self.tree = manager.ast_cache[self.id][0] | ||
| self.time_spent_us += time_spent_us(t0) | ||
| return source | ||
|
|
||
| def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None: | ||
| t0 = time_ref() | ||
| self.manager.log(f"Parsing {self.xpath} ({self.id})") | ||
| with self.wrap_context(): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Error context and the A more efficient approach would be to have It's okay to do something simple at first. We can always improve it later, but it would be good to fix race conditions before merging. Also in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, indeed there are race conditions. But IIUC the extra |
||
| ignore_errors = self.ignore_all or self.options.ignore_errors | ||
| self.tree = self.manager.parse_file( | ||
| self.id, | ||
| self.xpath, | ||
| source, | ||
| ignore_errors=ignore_errors, | ||
| options=self.options, | ||
| raw_data=raw_data, | ||
| ) | ||
| self.time_spent_us += time_spent_us(t0) | ||
|
|
||
| if not cached: | ||
| def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None: | ||
| """Parse file and run first pass of semantic analysis. | ||
|
|
||
| Everything done here is local to the file. Don't depend on imported | ||
| modules in any way. Logic here should be kept in sync with BuildManager.parse_all(). | ||
| """ | ||
| self.needs_parse = False | ||
| if self.tree is not None: | ||
| # The file was already parsed. | ||
| return | ||
|
|
||
| source = self.get_source() | ||
| manager = self.manager | ||
| # Can we reuse a previously parsed AST? This avoids redundant work in daemon. | ||
| if self.id not in manager.ast_cache: | ||
| self.parse_file_inner(source, raw_data) | ||
| # Make a copy of any errors produced during parse time so that | ||
| # fine-grained mode can repeat them when the module is | ||
| # reprocessed. | ||
| self.early_errors = list(manager.errors.error_info_map.get(self.xpath, [])) | ||
| self.semantic_analysis_pass1() | ||
| else: | ||
| self.early_errors = manager.ast_cache[self.id][1] | ||
| # Reuse a cached AST | ||
| manager.log(f"Using cached AST for {self.xpath} ({self.id})") | ||
| self.tree, self.early_errors = manager.ast_cache[self.id] | ||
|
|
||
| assert self.tree is not None | ||
| if not temporary: | ||
| modules[self.id] = self.tree | ||
| manager.modules[self.id] = self.tree | ||
| self.check_blockers() | ||
|
|
||
| manager.ast_cache[self.id] = (self.tree, self.early_errors) | ||
|
|
@@ -3912,14 +3976,23 @@ def load_graph( | |
| graph[st.id] = st | ||
| new.append(st) | ||
| entry_points.add(bs.module) | ||
| manager.parse_all([state for state in new if state.needs_parse]) | ||
|
|
||
| # Note: Running this each time could be slow in the daemon. If it's a problem, we | ||
| # can do more work to maintain this incrementally. | ||
| seen_files = {st.abspath: st for st in graph.values() if st.path} | ||
|
|
||
| # Collect dependencies. We go breadth-first. | ||
| # More nodes might get added to new as we go, but that's fine. | ||
| ready = set(new) | ||
| not_ready: set[State] = set() | ||
| for st in new: | ||
| if st not in ready: | ||
| # We have run out of states, parse all we have. | ||
| assert st in not_ready | ||
| manager.parse_all(not_ready) | ||
| ready |= not_ready | ||
| not_ready.clear() | ||
| assert st.ancestors is not None | ||
| # Strip out indirect dependencies. These will be dealt with | ||
| # when they show up as direct dependencies, and there's a | ||
|
|
@@ -3975,6 +4048,7 @@ def load_graph( | |
| newst_path = newst.abspath | ||
|
|
||
| if newst_path in seen_files: | ||
| manager.errors.set_file(newst.xpath, newst.id, manager.options) | ||
| manager.error( | ||
| None, | ||
| "Source file found twice under different module names: " | ||
|
|
@@ -3995,6 +4069,10 @@ def load_graph( | |
| assert newst.id not in graph, newst.id | ||
| graph[newst.id] = newst | ||
| new.append(newst) | ||
| if newst.needs_parse: | ||
| not_ready.add(newst) | ||
| else: | ||
| ready.add(newst) | ||
| # There are two things we need to do after the initial load loop. One is up-suppress | ||
| # modules that are back in graph. We need to do this after the loop to cover edge cases | ||
| # like where a namespace package ancestor is shared by a typed and an untyped package. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| import os | ||
| import time | ||
| from typing import Any, Final, cast | ||
|
|
||
| import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore] | ||
|
|
@@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]: | |
| def parse_to_binary_ast( | ||
| filename: str, options: Options, skip_function_bodies: bool = False | ||
| ) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]: | ||
| # This is a horrible hack to work around a mypyc bug where imported | ||
| # module may be not ready in a thread sometimes. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor: maybe add a timeout so that we give up after, say, 60s and raise an exception? |
||
| while ast_serialize is None: | ||
| time.sleep(0.0001) # type: ignore[unreachable] | ||
| ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse( | ||
| filename, | ||
| skip_function_bodies=skip_function_bodies, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`Iterable` is somewhat questionable as a type, since we iterate over `states` more than once. `Container[State]` could be a slightly better type.