Skip to content

Commit 377a740

Browse files
committed
fix(memory): bound RAM in recalculate_backtests and migrate_backtests; export migrate_backtests
- recalculate_backtests: workers now return only the computed metrics + summary instead of the full Backtest (with snapshots/trades/timeseries), and in-flight tasks are bounded to n_workers so memory scales with the number of workers, not len(backtests).
- migrate_backtests: build index.parquet from rows returned by workers instead of re-loading every freshly written bundle into the parent process; bound in-flight tasks to n_workers (replaces ex.map, which buffered the full plan in the executor feeder).
- Export migrate_backtests from the package root.
- Bump version to v8.7.1.
1 parent b7f9ea4 commit 377a740

4 files changed

Lines changed: 138 additions & 65 deletions

File tree

investing_algorithm_framework/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
APPLICATION_DIRECTORY, DataSource, OrderExecutor, PortfolioProvider, \
2525
SnapshotInterval, AWS_S3_STATE_BUCKET_NAME, BacktestEvaluationFocus, \
2626
save_backtests_to_directory, BacktestMetrics, DATA_DIRECTORY, \
27-
retag_backtests, \
27+
retag_backtests, migrate_backtests, \
2828
Blotter, DefaultBlotter, SimulationBlotter, Transaction, \
2929
SlippageModel, NoSlippage, PercentageSlippage, FixedSlippage, \
3030
VolumeImpactSlippage, \
@@ -222,6 +222,7 @@
222222
"load_backtests_from_directory",
223223
"save_backtests_to_directory",
224224
"retag_backtests",
225+
"migrate_backtests",
225226
"DataError",
226227
"create_backtest_metrics_for_backtest",
227228
"recalculate_backtests",

investing_algorithm_framework/domain/backtesting/backtest_utils.py

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import os
3-
from concurrent.futures import ProcessPoolExecutor, as_completed
3+
from concurrent.futures import ProcessPoolExecutor, as_completed, \
4+
wait, FIRST_COMPLETED
45
from logging import getLogger
56
from pathlib import Path
67
from random import Random
@@ -570,13 +571,14 @@ def load_backtests(
570571
def _migrate_one(args):
571572
"""Worker entry point: open *src* (legacy dir or bundle), write
572573
*dst* as a bundle, optionally delete *src*, return the
573-
destination path.
574+
destination path together with a flat index row.
574575
575576
Doing load+save (and optionally delete) in one worker call keeps
576577
each backtest's data in a single process — avoiding the cost of
577578
pickling fully-decoded Backtest objects back to the parent
578-
process. This roughly halves peak memory usage for large
579-
migrations and is faster end-to-end.
579+
process. We also build the index row here, while the backtest is
580+
still in memory, so the parent never has to re-open the migrated
581+
bundles just to build ``index.parquet``.
580582
"""
581583
src, dst, include_ohlcv, ohlcv_store, delete_source = args
582584
bt = _open_bundle(src) if is_bundle_file(src) else Backtest.open(src)
@@ -585,6 +587,11 @@ def _migrate_one(args):
585587
include_ohlcv=include_ohlcv,
586588
ohlcv_store=ohlcv_store,
587589
))
590+
rel = os.path.basename(out)
591+
row = _backtest_to_index_row(bt, bundle_path=rel)
592+
# Drop the heavy backtest before returning so the worker process's
593+
# RSS can be reclaimed before it picks up the next task.
594+
del bt
588595
if delete_source and os.path.abspath(src) != os.path.abspath(out):
589596
import shutil
590597
if os.path.isdir(src):
@@ -594,7 +601,7 @@ def _migrate_one(args):
594601
os.remove(src)
595602
except OSError:
596603
pass
597-
return out
604+
return out, row
598605

599606

600607
def migrate_backtests(
@@ -681,33 +688,68 @@ def migrate_backtests(
681688
n = len(plan)
682689
resolved_workers = min(_resolve_workers(workers), n)
683690

684-
iterator: object
685-
if resolved_workers > 1:
686-
with ProcessPoolExecutor(max_workers=resolved_workers) as ex:
687-
results = ex.map(_migrate_one, plan)
688-
iterator = tqdm(
689-
results,
690-
total=n,
691-
desc="Migrating backtests",
692-
disable=not show_progress,
693-
)
694-
for _ in iterator:
695-
pass
696-
else:
697-
for args in tqdm(
698-
plan,
699-
total=n,
700-
desc="Migrating backtests",
701-
disable=not show_progress,
702-
):
703-
_migrate_one(args)
704-
705-
if write_index:
706-
# Re-open the freshly written bundles (cheap header reads only
707-
# for the index) to build the parquet manifest.
708-
migrated = load_backtests_from_directory(
709-
dst_dir, workers=workers, show_progress=False,
691+
rows: List[dict] = []
692+
pbar = tqdm(
693+
total=n,
694+
desc="Migrating backtests",
695+
disable=not show_progress,
696+
)
697+
try:
698+
if resolved_workers > 1:
699+
# Bound in-flight tasks to ``resolved_workers`` so we don't
700+
# buffer the full plan inside the executor's feeder, and
701+
# consume results as they finish to keep memory flat.
702+
plan_iter = iter(plan)
703+
with ProcessPoolExecutor(max_workers=resolved_workers) as ex:
704+
inflight = {}
705+
for _ in range(resolved_workers):
706+
try:
707+
args = next(plan_iter)
708+
except StopIteration:
709+
break
710+
inflight[ex.submit(_migrate_one, args)] = args
711+
712+
while inflight:
713+
done_set, _unused = wait(
714+
inflight.keys(), return_when=FIRST_COMPLETED
715+
)
716+
for fut in done_set:
717+
args = inflight.pop(fut)
718+
try:
719+
_out, row = fut.result()
720+
if write_index:
721+
rows.append(row)
722+
except Exception as e:
723+
logger.error(
724+
f"Failed to migrate {args[0]}: {e}"
725+
)
726+
finally:
727+
pbar.update(1)
728+
try:
729+
nxt = next(plan_iter)
730+
except StopIteration:
731+
continue
732+
inflight[ex.submit(_migrate_one, nxt)] = nxt
733+
else:
734+
for args in plan:
735+
try:
736+
_out, row = _migrate_one(args)
737+
if write_index:
738+
rows.append(row)
739+
except Exception as e:
740+
logger.error(f"Failed to migrate {args[0]}: {e}")
741+
finally:
742+
pbar.update(1)
743+
finally:
744+
pbar.close()
745+
746+
if write_index and rows:
747+
import pandas as pd # local import keeps top of module light
748+
df = pd.DataFrame(rows)
749+
df.to_parquet(
750+
Path(dst_dir) / "index.parquet",
751+
index=False,
752+
compression="zstd",
710753
)
711-
_write_index(dst_dir, migrated)
712754

713755
return n

investing_algorithm_framework/services/metrics/generate.py

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import List, Optional
22
from logging import getLogger
33
import os
4-
from concurrent.futures import ProcessPoolExecutor, as_completed
4+
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
55

66
from investing_algorithm_framework.domain import BacktestMetrics, \
77
BacktestRun, OperationalException, Backtest, BacktestDateRange
@@ -89,23 +89,30 @@ def create_backtest_metrics_for_backtest(
8989
def _recalculate_one(args):
9090
"""Process-pool worker for :func:`recalculate_backtests`.
9191
92-
Must be a module-level function so it pickles. Returns the mutated
93-
backtest so the parent process replaces its in-list reference.
92+
Must be a module-level function so it pickles. Returns only the
93+
freshly computed per-run metrics and summary so the parent can
94+
merge them into the existing backtest object without round-tripping
95+
the full snapshots/trades back through pickle.
9496
"""
9597
backtest, risk_free_rate, metrics = args
9698
rfr = risk_free_rate if risk_free_rate is not None \
9799
else (backtest.risk_free_rate or 0.0)
98100

99-
for run in backtest.get_all_backtest_runs():
100-
run.backtest_metrics = create_backtest_metrics(run, rfr, metrics)
101-
102-
all_metrics = [
103-
run.backtest_metrics
101+
run_metrics = [
102+
create_backtest_metrics(run, rfr, metrics)
104103
for run in backtest.get_all_backtest_runs()
105-
if run.backtest_metrics is not None
106104
]
107-
backtest.backtest_summary = generate_backtest_summary_metrics(all_metrics)
108-
return backtest
105+
summary = generate_backtest_summary_metrics(
106+
[m for m in run_metrics if m is not None]
107+
)
108+
return run_metrics, summary
109+
110+
111+
def _apply_recalc_result(backtest, run_metrics, summary):
112+
runs = backtest.get_all_backtest_runs()
113+
for run, bm in zip(runs, run_metrics):
114+
run.backtest_metrics = bm
115+
backtest.backtest_summary = summary
109116

110117

111118
def recalculate_backtests(
@@ -144,29 +151,52 @@ def recalculate_backtests(
144151

145152
if n_workers <= 1 or len(backtests) <= 1:
146153
for backtest in backtests:
147-
_recalculate_one((backtest, risk_free_rate, metrics))
154+
run_metrics, summary = _recalculate_one(
155+
(backtest, risk_free_rate, metrics)
156+
)
157+
_apply_recalc_result(backtest, run_metrics, summary)
148158
return backtests
149159

150-
# Parallel: each worker mutates and returns the backtest. Replace
151-
# the originals in-place so callers holding the list reference see
152-
# the updated objects.
153-
tasks = [(bt, risk_free_rate, metrics) for bt in backtests]
154-
index_by_id = {id(bt): i for i, bt in enumerate(backtests)}
160+
# Parallel: only keep ``n_workers`` tasks in flight at a time so we
161+
# don't pickle every backtest up-front (which can blow memory for
162+
# large batches with heavy snapshots/trades). Workers return only
163+
# the lightweight metrics + summary, which we merge back into the
164+
# caller's existing backtest objects.
165+
pending = iter(enumerate(backtests))
166+
inflight = {}
167+
168+
def _submit_next(executor):
169+
try:
170+
idx, bt = next(pending)
171+
except StopIteration:
172+
return False
173+
fut = executor.submit(
174+
_recalculate_one, (bt, risk_free_rate, metrics)
175+
)
176+
inflight[fut] = idx
177+
return True
178+
155179
with ProcessPoolExecutor(max_workers=n_workers) as ex:
156-
future_to_task = {
157-
ex.submit(_recalculate_one, t): t for t in tasks
158-
}
159-
for fut in as_completed(future_to_task):
160-
original_bt = future_to_task[fut][0]
161-
try:
162-
updated = fut.result()
163-
except Exception as e: # pragma: no cover - defensive
164-
logger.error(
165-
"Failed to recalculate backtest "
166-
f"{getattr(original_bt, 'algorithm_id', '?')}: {e}"
167-
)
168-
continue
169-
backtests[index_by_id[id(original_bt)]] = updated
180+
for _ in range(n_workers):
181+
if not _submit_next(ex):
182+
break
183+
184+
while inflight:
185+
done_set, _unused = wait(
186+
inflight.keys(), return_when=FIRST_COMPLETED
187+
)
188+
for fut in done_set:
189+
idx = inflight.pop(fut)
190+
bt = backtests[idx]
191+
try:
192+
run_metrics, summary = fut.result()
193+
_apply_recalc_result(bt, run_metrics, summary)
194+
except Exception as e: # pragma: no cover - defensive
195+
logger.error(
196+
"Failed to recalculate backtest "
197+
f"{getattr(bt, 'algorithm_id', '?')}: {e}"
198+
)
199+
_submit_next(ex)
170200

171201
return backtests
172202

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "investing-algorithm-framework"
3-
version = "v8.7.0"
3+
version = "v8.7.1"
44
description = "A framework for creating trading bots"
55
authors = ["MDUYN"]
66
readme = "README.md"

0 commit comments

Comments (0)