Skip to content

Commit 6ef9cc7

Browse files
committed
fix(memory): streaming on-disk APIs + worker recycling for backtest pipelines
The previous v8.7.1 bounded in-flight tasks but two underlying causes of OOM remained for users with thousands of large backtests: 1. `recalculate_backtests(List[Backtest])` requires the entire batch to be resident in the parent process before the call. With thousands of backtests holding portfolio snapshots, trades and timeseries, that alone is tens of GB. 2. ProcessPoolExecutor reuses worker processes across tasks. Heavy tasks (pandas/polars/numpy intermediates) leave behind cached buffers the worker's allocator never returns to the OS, so RSS grows over time even with bounded in-flight tasks. This release adds two new public functions: - `recalculate_backtests_in_directory(src_dir, dst_dir=None, ...)`: streams from disk. The Backtest never crosses the process boundary — workers load, recalc, save back, return only the destination path and a small index row. Parent memory stays flat regardless of batch size. - `iter_backtests_from_directory(...)`: generator yielding one Backtest at a time so callers can process and discard sequentially without materialising a List[Backtest]. Also adds worker recycling via `max_tasks_per_child=16` to: - `recalculate_backtests` (with cross-version fallback) - `load_backtests_from_directory` - `migrate_backtests` On Python 3.11+ this is the native ProcessPoolExecutor parameter; on 3.10 it is emulated by closing and re-opening the executor every N completions. Bump version to v8.7.2.
1 parent 377a740 commit 6ef9cc7

8 files changed

Lines changed: 483 additions & 74 deletions

File tree

investing_algorithm_framework/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
Position, TimeFrame, INDEX_DATETIME, MarketCredential, TakeProfitRule, \
1818
PortfolioConfiguration, RESOURCE_DIRECTORY, AWS_LAMBDA_LOGGING_CONFIG, \
1919
Trade, APP_MODE, AppMode, DATETIME_FORMAT, load_backtests_from_directory, \
20+
iter_backtests_from_directory, \
2021
BacktestDateRange, convert_polars_to_pandas, BacktestRun, \
2122
DEFAULT_LOGGING_CONFIG, DataType, DataProvider, StopLossRule, \
2223
ScalingRule, TradingCost, \
@@ -71,7 +72,8 @@
7172
get_current_average_trade_loss, get_negative_trades, \
7273
get_positive_trades, get_number_of_trades, get_current_win_rate, \
7374
get_current_win_loss_ratio, create_backtest_metrics_for_backtest, \
74-
recalculate_backtests, TradeTakeProfitService, TradeStopLossService, \
75+
recalculate_backtests, recalculate_backtests_in_directory, \
76+
TradeTakeProfitService, TradeStopLossService, \
7577
get_cv_consistency, get_normalized_stability, \
7678
get_consistency_score, get_stability_score
7779

@@ -220,12 +222,14 @@
220222
"get_number_of_trades",
221223
"BacktestRun",
222224
"load_backtests_from_directory",
225+
"iter_backtests_from_directory",
223226
"save_backtests_to_directory",
224227
"retag_backtests",
225228
"migrate_backtests",
226229
"DataError",
227230
"create_backtest_metrics_for_backtest",
228231
"recalculate_backtests",
232+
"recalculate_backtests_in_directory",
229233
"TakeProfitRule",
230234
"StopLossRule",
231235
"ScalingRule",

investing_algorithm_framework/domain/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
BacktestDateRange, Backtest, BacktestMetrics, combine_backtests, \
4545
BacktestPermutationTest, BacktestEvaluationFocus, \
4646
generate_backtest_summary_metrics, load_backtests_from_directory, \
47+
iter_backtests_from_directory, \
4748
save_backtests_to_directory, retag_backtests, migrate_backtests, \
4849
BacktestIndex, save_bundle, open_bundle, BUNDLE_EXT, \
4950
BUNDLE_FORMAT_VERSION
@@ -174,6 +175,7 @@
174175
'ScalingRule',
175176
'TradingCost',
176177
"load_backtests_from_directory",
178+
"iter_backtests_from_directory",
177179
"save_backtests_to_directory",
178180
"retag_backtests",
179181
"migrate_backtests",

investing_algorithm_framework/domain/backtesting/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
generate_backtest_summary_metrics
1010
from .backtest_utils import (
1111
load_backtests_from_directory,
12+
iter_backtests_from_directory,
1213
save_backtests_to_directory,
1314
retag_backtests,
1415
migrate_backtests,
@@ -33,6 +34,7 @@
3334
"combine_backtests",
3435
"generate_backtest_summary_metrics",
3536
"load_backtests_from_directory",
37+
"iter_backtests_from_directory",
3638
"save_backtests_to_directory",
3739
"retag_backtests",
3840
"migrate_backtests",

investing_algorithm_framework/domain/backtesting/backtest_utils.py

Lines changed: 128 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -410,22 +410,47 @@ def load_backtests_from_directory(
410410
else:
411411
# Process pool only handles bundle loads efficiently; loading
412412
# legacy directories from workers also works but the win is
413-
# smaller. We still parallelise both for simplicity.
414-
with ProcessPoolExecutor(max_workers=n_workers) as ex:
415-
futures = {
416-
ex.submit(_load_one_dispatch, it): it for it in sources
417-
}
418-
for fut in as_completed(futures):
419-
src = futures[fut]
413+
# smaller. We still parallelise both for simplicity. Submission
414+
# is bounded to ``n_workers`` in flight so the executor's
415+
# feeder thread does not buffer the entire source list, and
416+
# workers are recycled (Python 3.11+) so RSS stays bounded.
417+
import sys
418+
pool_kwargs = {"max_workers": n_workers}
419+
if sys.version_info >= (3, 11):
420+
pool_kwargs["max_tasks_per_child"] = 16
421+
422+
sources_iter = iter(sources)
423+
inflight = {}
424+
425+
with ProcessPoolExecutor(**pool_kwargs) as ex:
426+
def _submit_next():
420427
try:
421-
backtests.append(fut.result())
422-
except Exception as e:
423-
logger.error(
424-
f"Failed to load backtest from {src[0]}: {e}"
425-
)
426-
finally:
427-
if pbar is not None:
428-
pbar.update(1)
428+
it = next(sources_iter)
429+
except StopIteration:
430+
return False
431+
inflight[ex.submit(_load_one_dispatch, it)] = it
432+
return True
433+
434+
for _ in range(n_workers):
435+
if not _submit_next():
436+
break
437+
438+
while inflight:
439+
done_set, _unused = wait(
440+
inflight.keys(), return_when=FIRST_COMPLETED
441+
)
442+
for fut in done_set:
443+
src = inflight.pop(fut)
444+
try:
445+
backtests.append(fut.result())
446+
except Exception as e:
447+
logger.error(
448+
f"Failed to load backtest from {src[0]}: {e}"
449+
)
450+
finally:
451+
if pbar is not None:
452+
pbar.update(1)
453+
_submit_next()
429454

430455
if pbar is not None:
431456
pbar.close()
@@ -439,6 +464,87 @@ def load_backtests_from_directory(
439464
return backtests
440465

441466

467+
def iter_backtests_from_directory(
468+
directory_path: Union[str, Path],
469+
filter_function: Callable[[Backtest], bool] = None,
470+
number_of_backtests_to_load: int = None,
471+
show_progress: bool = False,
472+
):
473+
"""Yield :class:`Backtest` objects one at a time from *directory_path*.
474+
475+
Memory-stable alternative to :func:`load_backtests_from_directory`:
476+
each backtest is loaded only when the caller asks for the next
477+
item, so peak memory equals one backtest at a time. Use this when
478+
a directory contains so many large backtests that the full list
479+
would not fit in RAM.
480+
481+
Example:
482+
>>> for bt in iter_backtests_from_directory(path):
483+
... process(bt)
484+
... # bt is dropped here; its memory is reclaimed before
485+
... # the next one is read from disk.
486+
487+
Args:
488+
directory_path: Source directory.
489+
filter_function: Optional predicate; backtests for which it
490+
returns False are skipped.
491+
number_of_backtests_to_load: Optional cap on the number of
492+
backtests yielded.
493+
show_progress: Display a tqdm progress bar.
494+
"""
495+
directory_path = str(directory_path)
496+
if not os.path.exists(directory_path):
497+
logger.warning(
498+
f"Directory {directory_path} does not exist. "
499+
"No backtests loaded."
500+
)
501+
return
502+
503+
sources: List[tuple] = []
504+
for entry in sorted(os.listdir(directory_path)):
505+
if entry == "checkpoints.json" or entry.endswith(".py"):
506+
continue
507+
if entry == "index.parquet" or entry == "ohlcv":
508+
continue
509+
full = os.path.join(directory_path, entry)
510+
if os.path.isfile(full):
511+
if entry.endswith(BUNDLE_EXT) or is_bundle_file(full):
512+
sources.append((full, "bundle"))
513+
elif os.path.isdir(full):
514+
sources.append((full, "directory"))
515+
516+
if number_of_backtests_to_load is not None:
517+
sources = sources[: max(0, int(number_of_backtests_to_load))]
518+
519+
pbar = tqdm(
520+
total=len(sources),
521+
desc="Loading backtests",
522+
disable=not show_progress,
523+
)
524+
try:
525+
for item in sources:
526+
try:
527+
bt = _load_one_dispatch(item)
528+
except Exception as e:
529+
logger.error(
530+
f"Failed to load backtest from {item[0]}: {e}"
531+
)
532+
pbar.update(1)
533+
continue
534+
try:
535+
if filter_function is not None and not filter_function(bt):
536+
pbar.update(1)
537+
continue
538+
except Exception as fe: # pragma: no cover - defensive
539+
logger.error(f"Error in filter_function: {fe}")
540+
pbar.update(1)
541+
continue
542+
yield bt
543+
pbar.update(1)
544+
finally:
545+
pbar.close()
546+
547+
442548
# ---------------------------------------------------------------------------
443549
# Index helpers (#487 step 4)
444550
# ---------------------------------------------------------------------------
@@ -699,8 +805,14 @@ def migrate_backtests(
699805
# Bound in-flight tasks to ``resolved_workers`` so we don't
700806
# buffer the full plan inside the executor's feeder, and
701807
# consume results as they finish to keep memory flat.
808+
# Recycle workers (3.11+) so RSS stays bounded across long
809+
# migrations.
810+
import sys
811+
pool_kwargs = {"max_workers": resolved_workers}
812+
if sys.version_info >= (3, 11):
813+
pool_kwargs["max_tasks_per_child"] = 16
702814
plan_iter = iter(plan)
703-
with ProcessPoolExecutor(max_workers=resolved_workers) as ex:
815+
with ProcessPoolExecutor(**pool_kwargs) as ex:
704816
inflight = {}
705817
for _ in range(resolved_workers):
706818
try:

investing_algorithm_framework/services/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
get_current_win_rate, get_current_average_trade_return, \
3939
get_current_average_trade_loss, get_current_average_trade_duration, \
4040
get_current_average_trade_gain, create_backtest_metrics_for_backtest, \
41-
recalculate_backtests, get_cv_consistency, get_normalized_stability, \
41+
recalculate_backtests, recalculate_backtests_in_directory, \
42+
get_cv_consistency, get_normalized_stability, \
4243
get_consistency_score, get_stability_score
4344

4445
__all__ = [
@@ -133,6 +134,7 @@
133134
"get_current_average_trade_return",
134135
"create_backtest_metrics_for_backtest",
135136
"recalculate_backtests",
137+
"recalculate_backtests_in_directory",
136138
"TradeStopLossService",
137139
"TradeTakeProfitService",
138140
"get_mean_yearly_return",

investing_algorithm_framework/services/metrics/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
get_current_win_loss_ratio
2929
from .calmar_ratio import get_calmar_ratio
3030
from .generate import create_backtest_metrics, \
31-
create_backtest_metrics_for_backtest, recalculate_backtests
31+
create_backtest_metrics_for_backtest, recalculate_backtests, \
32+
recalculate_backtests_in_directory
3233
from .risk_free_rate import get_risk_free_rate_us
3334
from .trades import get_negative_trades, get_positive_trades, \
3435
get_number_of_trades, get_number_of_closed_trades, \

0 commit comments

Comments
 (0)