Skip to content

Commit 01c8154

Browse files
committed
feat(backtesting): streaming migrate_backtests with resume support
migrate_backtests() now fuses load+save inside each worker process instead of loading every backtest into the parent first. Memory usage stays roughly constant regardless of source size. New options: - include_ohlcv: include OHLCV data in destination bundles - skip_existing (default True): skip backtests whose destination bundle already exists, making interrupted migrations resumable. The CLI 'iaf migrate-backtests' exposes both flags. Documented the function in Getting Started/backtesting.md.
1 parent b84cce7 commit 01c8154

3 files changed

Lines changed: 168 additions & 14 deletions

File tree

docusaurus/docs/Getting Started/backtesting.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,39 @@ backtests = load_backtests_from_directory("./my_backtests")
8989
Backtests are persisted in the framework's optimized **`.iafbt` bundle format** — a single binary file per backtest using zstd compression + MessagePack encoding. Compared to the legacy directory layout it is ~21× smaller, ~27× fewer files, and ~3× faster to load. Both `save_backtests_to_directory` and `load_backtests_from_directory` support parallel I/O via `workers=N`. Existing legacy directories keep working transparently; use `iaf migrate-backtests --src ... --dst ...` to convert them.
9090
:::
9191

92+
### Migrating Existing Backtests
93+
94+
Convert a directory of legacy backtest folders to the new bundle
95+
format with `migrate_backtests`. The migration is **streamed**
96+
(load+save fused per worker), so memory usage stays roughly
97+
constant regardless of how many backtests you migrate, and an
98+
interrupted run can simply be re-invoked to resume — destination
99+
bundles that already exist are skipped by default.
100+
101+
```python
102+
from investing_algorithm_framework import migrate_backtests
103+
104+
n = migrate_backtests(
105+
src_dir="./old_backtests", # legacy folders and/or .iafbt
106+
dst_dir="./bundled_backtests", # destination
107+
workers=8, # parallel; None = min(8, cpu)
108+
show_progress=True,
109+
write_index=True, # also build index.parquet
110+
include_ohlcv=False,
111+
skip_existing=True, # resume-safe (default)
112+
)
113+
print(f"migrated {n} backtests")
114+
```
115+
116+
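Or from the CLI (add `--include-ohlcv` to include OHLCV data in the bundles, or `--no-skip-existing` to re-migrate bundles that already exist at the destination):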
117+
118+
```bash
119+
iaf migrate-backtests \
120+
--src ./old_backtests \
121+
--dst ./bundled_backtests \
122+
--workers 8
123+
```
124+
92125
## Reporting
93126

94127
Use [Backtest Reports](/docs/Getting%20Started/backtest-reports) to turn

investing_algorithm_framework/cli/cli.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,28 @@ def mcp(directory):
273273
"--no-index", is_flag=True, default=False,
274274
help="Skip writing index.parquet at the destination.",
275275
)
276-
def migrate_backtests_cmd(src, dst, workers, no_index):
276+
@click.option(
277+
"--include-ohlcv", is_flag=True, default=False,
278+
help="Include OHLCV data in the destination bundles.",
279+
)
280+
@click.option(
281+
"--no-skip-existing", is_flag=True, default=False,
282+
help="Re-migrate even if the destination bundle already exists.",
283+
)
284+
def migrate_backtests_cmd(
285+
src, dst, workers, no_index, include_ohlcv, no_skip_existing
286+
):
277287
"""Convert a directory of legacy backtest folders into the bundled
278288
binary format introduced in issue #487.
279289
280290
The new ``.iafbt`` format is a single zstd-compressed MessagePack
281291
file per backtest. Loading bundled directories is dramatically
282292
faster than the legacy multi-file layout for large batches.
293+
294+
Migration is streamed (load+save fused per worker) so memory
295+
usage stays roughly constant regardless of source size, and
296+
interrupted runs can be resumed (existing destination bundles
297+
are skipped by default).
283298
"""
284299
from investing_algorithm_framework.domain import migrate_backtests
285300

@@ -289,6 +304,8 @@ def migrate_backtests_cmd(src, dst, workers, no_index):
289304
workers=workers,
290305
show_progress=True,
291306
write_index=not no_index,
307+
include_ohlcv=include_ohlcv,
308+
skip_existing=not no_skip_existing,
292309
)
293310
click.echo(f"Migrated {n} backtest(s) from {src} to {dst}")
294311

investing_algorithm_framework/domain/backtesting/backtest_utils.py

Lines changed: 117 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -565,26 +565,130 @@ def load_backtests(
565565
# ---------------------------------------------------------------------------
566566

567567

568+
def _migrate_one(args):
    """Load one source backtest and re-save it as a bundle (worker task).

    The load and the save happen in the same call so that, when this
    runs in a worker process, the decoded ``Backtest`` object never has
    to be pickled back to the parent process.

    Args:
        args: A 4-tuple ``(src, dst, include_ohlcv, ohlcv_store)``.
            Packed into a single argument so the function can be
            handed directly to ``Executor.map``. *src* is either a
            bundle file or a legacy backtest directory; *dst* is the
            bundle path to write.

    Returns:
        The value returned by ``_save_bundle``, converted to ``str``.
    """
    source, target, with_ohlcv, store = args

    # Bundles and legacy directories go through different loaders.
    if is_bundle_file(source):
        backtest = _open_bundle(source)
    else:
        backtest = Backtest.open(source)

    # NOTE(review): callers that resume by checking whether *dst* exists
    # rely on this write not leaving a partial file behind when
    # interrupted -- confirm that ``_save_bundle`` writes atomically.
    written = _save_bundle(
        backtest,
        target,
        include_ohlcv=with_ohlcv,
        ohlcv_store=store,
    )
    return str(written)
584+
585+
568586
def migrate_backtests(
    src_dir: Union[str, Path],
    dst_dir: Union[str, Path],
    *,
    workers: Optional[int] = None,
    show_progress: bool = False,
    write_index: bool = True,
    include_ohlcv: bool = False,
    skip_existing: bool = True,
) -> int:
    """Rewrite a directory of legacy backtest folders (or existing
    ``.iafbt`` bundles) as ``.iafbt`` bundles in *dst_dir*.

    The migration is streamed: each backtest is loaded and re-saved
    inside a single ``_migrate_one`` call (in a worker process when
    more than one worker is used), so the parent never holds all
    source backtests at once during the conversion step.

    Args:
        src_dir: Source directory containing legacy backtest folders
            and/or ``.iafbt`` bundles. Walked recursively.
        dst_dir: Destination directory. Created if it does not exist.
            All bundles are written flat into this directory, named
            after the source's basename.
        workers: Number of parallel workers, resolved through
            ``_resolve_workers`` and capped at the number of
            backtests to migrate. A resolved value of ``1`` runs
            serially in the current process.
        show_progress: Display a progress bar.
        write_index: Write the index for the destination bundles
            after migrating. The index is also (re)built when every
            source was skipped, so a run that was interrupted before
            the index was written can be completed by re-invoking.
        include_ohlcv: Include OHLCV data in the destination bundles.
            When set, ``<dst_dir>/ohlcv`` is passed to the worker as
            the OHLCV store.
        skip_existing: Skip backtests whose destination bundle already
            exists. Allows resuming an interrupted migration.

    Returns:
        Number of backtests migrated (excluding skipped ones). When
        several sources map to the same destination bundle they count
        once.

    Warns:
        RuntimeWarning: If more than one source maps to the same
            destination file name; only the last one discovered is
            migrated to that destination.
    """
    import warnings

    src_dir = Path(src_dir)
    dst_dir = Path(dst_dir)
    dst_dir.mkdir(parents=True, exist_ok=True)

    # Discover sources: any *.iafbt file or any directory shaped like
    # a legacy backtest (algorithm_id.json + runs/).
    sources: List[str] = []
    for root, dirs, files in os.walk(src_dir):
        sources.extend(
            os.path.join(root, fname)
            for fname in files
            if fname.endswith(BUNDLE_EXT)
        )
        # Iterate over a copy: recognised backtest dirs are pruned from
        # ``dirs`` in place so os.walk does not descend into them.
        for dname in list(dirs):
            candidate = os.path.join(root, dname)
            if (
                os.path.isfile(os.path.join(candidate, "algorithm_id.json"))
                and os.path.isdir(os.path.join(candidate, "runs"))
            ):
                sources.append(candidate)
                dirs.remove(dname)

    if not sources:
        return 0

    ohlcv_store = str(dst_dir / "ohlcv") if include_ohlcv else None

    # Build the work plan keyed by destination path. Keying by
    # destination guarantees that no two tasks ever write the same
    # file, which would otherwise race when running in parallel.
    # NOTE(review): ``skip_existing`` only checks that the destination
    # file exists; whether an interrupted write can leave a partial
    # bundle behind depends on ``_save_bundle`` -- confirm.
    planned = {}
    for src in sources:
        base = os.path.basename(os.path.normpath(src))
        if base.endswith(BUNDLE_EXT):
            base = base[: -len(BUNDLE_EXT)]
        dst = str(dst_dir / f"{base}{BUNDLE_EXT}")
        if skip_existing and os.path.isfile(dst):
            continue
        if dst in planned:
            warnings.warn(
                f"Multiple sources map to destination bundle {dst!r}; "
                f"{planned[dst][0]!r} is superseded by {src!r}.",
                RuntimeWarning,
                stacklevel=2,
            )
        planned[dst] = (src, dst, include_ohlcv, ohlcv_store)

    plan = list(planned.values())
    n = len(plan)

    if n:
        resolved_workers = min(_resolve_workers(workers), n)
        if resolved_workers > 1:
            with ProcessPoolExecutor(max_workers=resolved_workers) as ex:
                progress = tqdm(
                    ex.map(_migrate_one, plan),
                    total=n,
                    desc="Migrating backtests",
                    disable=not show_progress,
                )
                # Drain the lazy result iterator so every task runs
                # and any worker exception is re-raised here.
                for _ in progress:
                    pass
        else:
            for task in tqdm(
                plan,
                total=n,
                desc="Migrating backtests",
                disable=not show_progress,
            ):
                _migrate_one(task)

    if write_index:
        # Runs even when nothing was migrated: reaching this point with
        # an empty plan means every source already has a destination
        # bundle, and a previous run may have been interrupted before
        # the index was written.
        migrated = load_backtests_from_directory(
            dst_dir, workers=workers, show_progress=False,
        )
        _write_index(dst_dir, migrated)

    return n

0 commit comments

Comments
 (0)