Skip to content

Commit a6f4bff

Browse files
committed
feat: add per-strategy progress tracking for parallel backtests
Replace batch-level progress bar with strategy-level progress tracking when using parallel workers (n_workers). Workers now increment a shared counter after each individual strategy completes, and a monitoring thread updates a tqdm progress bar in real time (every 500ms). Changes: - Use multiprocessing.Manager().Value() as shared counter across workers - Add monitoring thread to poll counter and update tqdm bar - Show per-strategy throughput (strategies/s) and ETA instead of batch-level progress - Move multiprocessing, threading, concurrent.futures imports to top level - Order all imports per PEP 8 (stdlib, third-party, local) - Remove redundant inline combine_backtests import
1 parent 2b11f11 commit a6f4bff

2 files changed

Lines changed: 89 additions & 33 deletions

File tree

investing_algorithm_framework/infrastructure/services/backtesting/backtest_service.py

Lines changed: 89 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import gc
22
import json
33
import logging
4+
import multiprocessing
45
import os
5-
import numpy as np
6-
import pandas as pd
7-
import polars as pl
6+
import threading
87
from collections import defaultdict
8+
from concurrent.futures import ProcessPoolExecutor, as_completed
99
from datetime import datetime, timedelta, timezone
1010
from pathlib import Path
11-
from typing import Dict, List, Union, Optional, Callable
11+
from typing import Callable, Dict, List, Optional, Union
12+
13+
import numpy as np
14+
import pandas as pd
15+
import polars as pl
1216

1317
from investing_algorithm_framework.domain import BacktestRun, TimeUnit, \
1418
OperationalException, BacktestDateRange, Backtest, combine_backtests, \
@@ -902,10 +906,6 @@ def run_vector_backtests(
902906

903907
if use_parallel:
904908
# Parallel processing of backtests (batches per worker)
905-
import multiprocessing
906-
from concurrent.futures import \
907-
ProcessPoolExecutor, as_completed
908-
909909
# Determine number of workers
910910
if n_workers == -1:
911911
n_workers = multiprocessing.cpu_count()
@@ -933,6 +933,12 @@ def run_vector_backtests(
933933
show_progress
934934
)
935935

936+
# Shared counter for strategy-level progress
937+
# across all workers. Use Manager so the proxy
938+
# object can be pickled by ProcessPoolExecutor.
939+
manager = multiprocessing.Manager()
940+
progress_counter = manager.Value('i', 0)
941+
936942
worker_args = []
937943

938944
for batch in strategy_batches:
@@ -945,9 +951,34 @@ def run_vector_backtests(
945951
continue_on_error,
946952
self._data_provider_service.copy(),
947953
False,
948-
dynamic_position_sizing
954+
dynamic_position_sizing,
955+
progress_counter,
949956
))
950957

958+
# Start a monitoring thread that updates a
959+
# strategy-level progress bar in real time
960+
total_strategies = len(strategies_to_run)
961+
pbar = tqdm(
962+
total=total_strategies,
963+
colour="green",
964+
desc="Running backtests for "
965+
f"{start_date} to {end_date}",
966+
disable=not show_progress,
967+
unit="strategy",
968+
)
969+
stop_event = threading.Event()
970+
971+
def _monitor_progress():
972+
while not stop_event.is_set():
973+
pbar.n = progress_counter.value
974+
pbar.refresh()
975+
stop_event.wait(0.5)
976+
977+
monitor = threading.Thread(
978+
target=_monitor_progress, daemon=True
979+
)
980+
monitor.start()
981+
951982
# Execute batches in parallel
952983
with ProcessPoolExecutor(max_workers=n_workers) as ex:
953984
# Submit all batch tasks
@@ -961,15 +992,8 @@ def run_vector_backtests(
961992
# Track completed batches for periodic cleanup
962993
completed_count = 0
963994

964-
# Collect results with progress bar
965-
for future in tqdm(
966-
as_completed(futures),
967-
total=len(futures),
968-
colour="green",
969-
desc="Running backtests for "
970-
f"{start_date} to {end_date}",
971-
disable=not show_progress
972-
):
995+
# Collect results as batches complete
996+
for future in as_completed(futures):
973997
try:
974998
batch_result = future.result()
975999
if batch_result:
@@ -1006,6 +1030,15 @@ def run_vector_backtests(
10061030
else:
10071031
raise
10081032

1033+
# Stop the monitoring thread and finalise
1034+
# the progress bar
1035+
stop_event.set()
1036+
monitor.join()
1037+
pbar.n = progress_counter.value
1038+
pbar.refresh()
1039+
pbar.close()
1040+
manager.shutdown()
1041+
10091042
# Save remaining batch and create checkpoint files when
10101043
# storage directory provided
10111044
if backtest_storage_directory is not None:
@@ -1309,8 +1342,6 @@ def run_vector_backtests(
13091342
combined_backtests.append(backtests_list[0])
13101343
else:
13111344
# Combine multiple backtests for the same algorithm
1312-
from investing_algorithm_framework.domain import (
1313-
combine_backtests)
13141345
combined = combine_backtests(backtests_list)
13151346
combined_backtests.append(combined)
13161347

@@ -1709,23 +1740,40 @@ def _run_batch_backtest_worker(args):
17091740
continue_on_error,
17101741
data_provider_service,
17111742
show_progress,
1712-
dynamic_position_sizing
1743+
dynamic_position_sizing,
1744+
progress_counter (optional),
17131745
)
17141746
17151747
Returns:
17161748
List[Backtest]: List of completed backtest results
17171749
"""
1718-
(
1719-
strategy_batch,
1720-
backtest_date_range,
1721-
portfolio_configuration,
1722-
snapshot_interval,
1723-
risk_free_rate,
1724-
continue_on_error,
1725-
data_provider_service,
1726-
show_progress,
1727-
dynamic_position_sizing
1728-
) = args
1750+
# Support both old (9-element) and new (10-element) tuple
1751+
if len(args) == 10:
1752+
(
1753+
strategy_batch,
1754+
backtest_date_range,
1755+
portfolio_configuration,
1756+
snapshot_interval,
1757+
risk_free_rate,
1758+
continue_on_error,
1759+
data_provider_service,
1760+
show_progress,
1761+
dynamic_position_sizing,
1762+
progress_counter,
1763+
) = args
1764+
else:
1765+
(
1766+
strategy_batch,
1767+
backtest_date_range,
1768+
portfolio_configuration,
1769+
snapshot_interval,
1770+
risk_free_rate,
1771+
continue_on_error,
1772+
data_provider_service,
1773+
show_progress,
1774+
dynamic_position_sizing,
1775+
) = args
1776+
progress_counter = None
17291777

17301778
vector_backtest_service = VectorBacktestService(
17311779
data_provider_service=data_provider_service
@@ -1768,12 +1816,21 @@ def _run_batch_backtest_worker(args):
17681816
)
17691817
batch_results.append(backtest)
17701818

1819+
# Increment shared progress counter so the
1820+
# main process can track per-strategy progress
1821+
if progress_counter is not None:
1822+
progress_counter.value += 1
1823+
17711824
except Exception as e:
17721825
if continue_on_error:
17731826
logger.error(
17741827
"Worker error for strategy "
17751828
f"{strategy.algorithm_id}: {e}"
17761829
)
1830+
# Still increment counter for failed strategies
1831+
# so progress total stays accurate
1832+
if progress_counter is not None:
1833+
progress_counter.value += 1
17771834
continue
17781835
else:
17791836
raise

tests/notebook/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-

0 commit comments

Comments
 (0)