11from __future__ import annotations
22
3+ import logging
4+ from concurrent .futures import ThreadPoolExecutor , as_completed
35from dataclasses import dataclass
46from pathlib import Path
57
2628from portfolio_auditor .settings import Settings
2729from portfolio_auditor .site .transformers import build_site_payload
2830
31+ logger = logging .getLogger (__name__ )
32+
33+ # Maximum number of repos processed concurrently.
34+ # Kept intentionally modest to respect GitHub clone rate limits and local I/O.
35+ _DEFAULT_MAX_WORKERS = 4
36+
2937
3038@dataclass (slots = True )
3139class AuditArtifacts :
@@ -38,6 +46,16 @@ class AuditArtifacts:
3846 selection : PortfolioSelection | None = None
3947
4048
49+ @dataclass (slots = True )
50+ class _RepoPipelineResult :
51+ """Intermediate result for one repo — used to merge parallel work."""
52+
53+ repo : RepoMetadata
54+ scan : RepoScanResult
55+ score : RepoScore
56+ review : RepoReview
57+
58+
4159class AuditRunner :
4260 """
4361 Main orchestration layer for the deterministic audit pipeline.
@@ -46,15 +64,26 @@ class AuditRunner:
4664 - collect GitHub metadata
4765 - enrich metadata
4866 - clone repositories locally
49- - run scanners
67+ - run scanners (parallelised with ThreadPoolExecutor)
5068 - compute scores
5169 - generate reviews
5270 - rank repositories for portfolio decisions
5371 - export JSON/CSV/site artifacts
72+
73+ Parallelism note
74+ ----------------
75+ Scanning, scoring and reviewing are CPU/IO-bound but GIL-friendly for our
76+ workload (mostly file reads + pure-Python computation). A ThreadPoolExecutor
77+ with a small pool avoids the overhead of multiprocessing while still giving
78+ meaningful wall-clock improvement on portfolios of 20+ repos.
79+
80+ The ranking and export phases are kept sequential because they operate on the
81+ full aggregated result set and are not the bottleneck.
5482 """
5583
56- def __init__ (self , settings : Settings ) -> None :
84+ def __init__ (self , settings : Settings , max_workers : int = _DEFAULT_MAX_WORKERS ) -> None :
5785 self .settings = settings
86+ self .max_workers = max_workers
5887 self .client = GitHubClient (settings )
5988 self .collector = GitHubCollector (self .client , settings )
6089 self .clone_manager = CloneManager (settings )
@@ -92,27 +121,30 @@ def run(
92121 shallow = True ,
93122 )
94123
95- scans : list [RepoScanResult ] = []
96- scores : list [RepoScore ] = []
97- reviews : list [RepoReview ] = []
124+ # -------------------------------------------------------------------
125+ # Parallel scan / score / review
126+ # -------------------------------------------------------------------
127+ pipeline_results = self ._run_pipeline_parallel (repos )
98128
99- for repo in repos :
100- local_path = self .settings .get_repo_clone_path (repo .full_name )
101- scan = self .scan_repo (repo , local_path )
102- score = self .scoring_engine .score (repo , scan )
103- review = self .reviewer .review (repo , scan , score )
129+ # Re-order results to match the original repo list order (parallelism
130+ # may return them in arbitrary completion order).
131+ result_index : dict [str , _RepoPipelineResult ] = {
132+ r .repo .full_name : r for r in pipeline_results
133+ }
134+ ordered_results = [result_index [repo .full_name ] for repo in repos if repo .full_name in result_index ]
104135
105- scans . append ( scan )
106- scores . append ( score )
107- reviews . append ( review )
136+ scans = [ r . scan for r in ordered_results ]
137+ scores = [ r . score for r in ordered_results ]
138+ reviews = [ r . review for r in ordered_results ]
108139
109- if export :
140+ if export :
141+ for result in ordered_results :
110142 self ._export_repo_bundle (
111143 owner = owner ,
112- repo = repo ,
113- scan = scan ,
114- score = score ,
115- review = review ,
144+ repo = result . repo ,
145+ scan = result . scan ,
146+ score = result . score ,
147+ review = result . review ,
116148 )
117149
118150 ranking = self .ranker .build_ranking (repos = repos , scores = scores , reviews = reviews )
@@ -133,6 +165,42 @@ def run(
133165
134166 return artifacts
135167
168+ def _run_pipeline_parallel (
169+ self , repos : list [RepoMetadata ]
170+ ) -> list [_RepoPipelineResult ]:
171+ """
172+ Process each repo (scan → score → review) in a thread pool.
173+
174+ Failures on individual repos are logged and skipped so that a single
175+ broken clone does not abort the entire audit.
176+ """
177+ results : list [_RepoPipelineResult ] = []
178+
179+ with ThreadPoolExecutor (max_workers = self .max_workers ) as executor :
180+ future_to_repo = {
181+ executor .submit (self ._process_single_repo , repo ): repo
182+ for repo in repos
183+ }
184+ for future in as_completed (future_to_repo ):
185+ repo = future_to_repo [future ]
186+ try :
187+ result = future .result ()
188+ results .append (result )
189+ logger .debug ("Processed %s" , repo .full_name )
190+ except Exception :
191+ logger .exception (
192+ "Pipeline failed for %s — repo will be skipped" , repo .full_name
193+ )
194+
195+ return results
196+
197+ def _process_single_repo (self , repo : RepoMetadata ) -> _RepoPipelineResult :
198+ local_path = self .settings .get_repo_clone_path (repo .full_name )
199+ scan = self .scan_repo (repo , local_path )
200+ score = self .scoring_engine .score (repo , scan )
201+ review = self .reviewer .review (repo , scan , score )
202+ return _RepoPipelineResult (repo = repo , scan = scan , score = score , review = review )
203+
136204 def scan_repo (self , repo : RepoMetadata , local_path : Path ) -> RepoScanResult :
137205 scan_result = RepoScanResult (
138206 repo_name = repo .name ,
@@ -242,4 +310,4 @@ def _export_repo_bundle(
242310 scan = scan ,
243311 score = score ,
244312 review = review ,
245- )
313+ )
0 commit comments