Skip to content

Commit 43a2906

Browse files
committed
perf: reduce parallel backtest overhead on Windows/WSL
Pass data_provider_service via the ProcessPoolExecutor initializer instead of pickling it per task submission. On spawn-based systems (Windows/WSL) this avoids serializing the full data provider (with loaded dataframes) for every batch — each worker now pickles it only once at startup.

- Add _init_worker() initializer and _worker_data_provider_service global
- Copy data_provider_service once before the pool starts
- Worker falls back to the module-level global when the args value is None
- No behavior change on fork-based systems (macOS/Linux)
1 parent a6f4bff commit 43a2906

1 file changed

Lines changed: 38 additions & 3 deletions

File tree

investing_algorithm_framework/infrastructure/services/backtesting/backtest_service.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,21 @@
3131

3232
logger = logging.getLogger(__name__)
3333

34+
# Module-level global holding the data provider inside worker processes.
# Populated by _init_worker, which ProcessPoolExecutor invokes once per
# worker via its ``initializer`` hook.
_worker_data_provider_service = None


def _init_worker(data_provider_service):
    """ProcessPoolExecutor worker initializer.

    Stores *data_provider_service* in a module-level global so that each
    worker process unpickles it exactly once at startup rather than once
    per submitted task. This sharply cuts overhead where the ``spawn``
    start method is used (Windows/WSL); ``fork``-based systems
    (macOS/Linux) are unaffected.
    """
    global _worker_data_provider_service
    _worker_data_provider_service = data_provider_service
48+
3449

3550
def _print_progress(message: str, show_progress: bool = False):
3651
"""
@@ -939,6 +954,13 @@ def run_vector_backtests(
939954
manager = multiprocessing.Manager()
940955
progress_counter = manager.Value('i', 0)
941956

957+
# Copy data provider once and pass via initializer
958+
# so each worker inherits it at startup instead of
959+
# pickling it per task (major speedup on Windows/WSL
960+
# where spawn is used instead of fork).
961+
shared_data_provider = \
962+
self._data_provider_service.copy()
963+
942964
worker_args = []
943965

944966
for batch in strategy_batches:
@@ -949,7 +971,7 @@ def run_vector_backtests(
949971
snapshot_interval,
950972
risk_free_rate,
951973
continue_on_error,
952-
self._data_provider_service.copy(),
974+
None, # placeholder, worker reads global
953975
False,
954976
dynamic_position_sizing,
955977
progress_counter,
@@ -979,8 +1001,15 @@ def _monitor_progress():
9791001
)
9801002
monitor.start()
9811003

982-
# Execute batches in parallel
983-
with ProcessPoolExecutor(max_workers=n_workers) as ex:
1004+
# Execute batches in parallel.
1005+
# Use initializer to pass data_provider_service
1006+
# once per worker process rather than pickling it
1007+
# with every submitted task.
1008+
with ProcessPoolExecutor(
1009+
max_workers=n_workers,
1010+
initializer=_init_worker,
1011+
initargs=(shared_data_provider,),
1012+
) as ex:
9841013
# Submit all batch tasks
9851014
futures = [
9861015
ex.submit(
@@ -1775,6 +1804,12 @@ def _run_batch_backtest_worker(args):
17751804
) = args
17761805
progress_counter = None
17771806

1807+
# Use the worker-global data provider if none was passed
1808+
# directly (parallel mode passes None and relies on the
1809+
# initializer to set the global once per worker process).
1810+
if data_provider_service is None:
1811+
data_provider_service = _worker_data_provider_service
1812+
17781813
vector_backtest_service = VectorBacktestService(
17791814
data_provider_service=data_provider_service
17801815
)

0 commit comments

Comments
 (0)