Skip to content

Commit 6fa9417

Browse files
MaxGhenis and claude committed
Persist scale-up results incrementally as JSONL
Previous harness wrote all results atomically at the end of the run. If ZI-QDNN crashed after ZI-QRF and ZI-MAF had completed, their numbers were lost.

Now ScaleUpRunner.run() takes an optional incremental_path and appends each ScaleUpResult as a JSONL line immediately after it completes. The final atomic JSON is still written at the end as before; the JSONL is supplementary and survives mid-run kills.

CLI adds --incremental-jsonl; defaults to <output>.partial.jsonl so the feature is on by default.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1576d06 commit 6fa9417

1 file changed

Lines changed: 79 additions & 39 deletions

File tree

src/microplex_us/bakeoff/scale_up.py

Lines changed: 79 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -514,8 +514,17 @@ def fit_and_generate(
514514
"peak_rss_gb_during_fit": peak_fit_rss,
515515
}
516516

517-
def run(self) -> list[ScaleUpResult]:
518-
"""Run every configured method on the loaded frame; return results."""
517+
def run(
518+
self,
519+
incremental_path: Path | None = None,
520+
) -> list[ScaleUpResult]:
521+
"""Run every configured method on the loaded frame; return results.
522+
523+
If `incremental_path` is given, each method's `ScaleUpResult` is
524+
appended to that path as JSONL *as soon as it completes*. This
525+
guarantees at least partial output if a later method crashes or
526+
the host is interrupted.
527+
"""
519528
df = self.load_frame()
520529
train, holdout = self.split(df)
521530
n_generate = self.config.n_generate or len(train)
@@ -526,6 +535,11 @@ def run(self) -> list[ScaleUpResult]:
526535
n_generate,
527536
)
528537

538+
if incremental_path is not None:
539+
incremental_path.parent.mkdir(parents=True, exist_ok=True)
540+
# Truncate any prior JSONL so this run's output is self-contained.
541+
incremental_path.write_text("")
542+
529543
results: list[ScaleUpResult] = []
530544
for method_name in self.config.methods:
531545
self.logger.info("== fitting %s ==", method_name)
@@ -535,25 +549,25 @@ def run(self) -> list[ScaleUpResult]:
535549
)
536550
except Exception as exc: # pragma: no cover
537551
self.logger.error("method %s failed: %s", method_name, exc)
538-
results.append(
539-
ScaleUpResult(
540-
stage=self.config.stage,
541-
method=method_name,
542-
seed=self.config.seed,
543-
n_train_rows=len(train),
544-
n_holdout_rows=len(holdout),
545-
n_cols=len(df.columns),
546-
fit_wall_seconds=0.0,
547-
generate_wall_seconds=0.0,
548-
peak_rss_gb_during_fit=0.0,
549-
precision=0.0,
550-
density=0.0,
551-
coverage=0.0,
552-
rare_cell_ratios={},
553-
zero_rate_mae=0.0,
554-
notes=f"FAILED: {type(exc).__name__}: {exc}",
555-
)
552+
result = ScaleUpResult(
553+
stage=self.config.stage,
554+
method=method_name,
555+
seed=self.config.seed,
556+
n_train_rows=len(train),
557+
n_holdout_rows=len(holdout),
558+
n_cols=len(df.columns),
559+
fit_wall_seconds=0.0,
560+
generate_wall_seconds=0.0,
561+
peak_rss_gb_during_fit=0.0,
562+
precision=0.0,
563+
density=0.0,
564+
coverage=0.0,
565+
rare_cell_ratios={},
566+
zero_rate_mae=0.0,
567+
notes=f"FAILED: {type(exc).__name__}: {exc}",
556568
)
569+
results.append(result)
570+
self._persist_incremental(incremental_path, result)
557571
continue
558572

559573
precision, density, coverage = _compute_prdc(
@@ -564,25 +578,25 @@ def run(self) -> list[ScaleUpResult]:
564578
)
565579
zero_mae = _compute_zero_rate_mae(holdout, synthetic)
566580

567-
results.append(
568-
ScaleUpResult(
569-
stage=self.config.stage,
570-
method=method_name,
571-
seed=self.config.seed,
572-
n_train_rows=len(train),
573-
n_holdout_rows=len(holdout),
574-
n_cols=len(df.columns),
575-
fit_wall_seconds=timing["fit_wall_seconds"],
576-
generate_wall_seconds=timing["generate_wall_seconds"],
577-
peak_rss_gb_during_fit=timing["peak_rss_gb_during_fit"],
578-
precision=precision,
579-
density=density,
580-
coverage=coverage,
581-
rare_cell_ratios=rare,
582-
zero_rate_mae=zero_mae,
583-
notes="",
584-
)
581+
result = ScaleUpResult(
582+
stage=self.config.stage,
583+
method=method_name,
584+
seed=self.config.seed,
585+
n_train_rows=len(train),
586+
n_holdout_rows=len(holdout),
587+
n_cols=len(df.columns),
588+
fit_wall_seconds=timing["fit_wall_seconds"],
589+
generate_wall_seconds=timing["generate_wall_seconds"],
590+
peak_rss_gb_during_fit=timing["peak_rss_gb_during_fit"],
591+
precision=precision,
592+
density=density,
593+
coverage=coverage,
594+
rare_cell_ratios=rare,
595+
zero_rate_mae=zero_mae,
596+
notes="",
585597
)
598+
results.append(result)
599+
self._persist_incremental(incremental_path, result)
586600
self.logger.info(
587601
" %s: coverage=%.3f precision=%.3f density=%.3f fit=%.1fs gen=%.1fs peak_rss=%.2fGB",
588602
method_name,
@@ -595,6 +609,17 @@ def run(self) -> list[ScaleUpResult]:
595609
)
596610
return results
597611

612+
@staticmethod
613+
def _persist_incremental(
614+
path: Path | None, result: ScaleUpResult
615+
) -> None:
616+
"""Append one `ScaleUpResult` as a JSONL row (if path is set)."""
617+
if path is None:
618+
return
619+
with path.open("a") as f:
620+
f.write(json.dumps(result.to_dict(), default=str))
621+
f.write("\n")
622+
598623

599624
def _results_to_dataframe(results: list[ScaleUpResult]) -> pd.DataFrame:
600625
rows: list[dict[str, Any]] = []
@@ -630,8 +655,23 @@ def main(argv: list[str] | None = None) -> int:
630655
default="INFO",
631656
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
632657
)
658+
parser.add_argument(
659+
"--incremental-jsonl",
660+
type=Path,
661+
default=None,
662+
help=(
663+
"Optional path to a JSONL file where each method's result is "
664+
"appended as soon as it completes. Defaults to the final "
665+
"--output path with '.partial.jsonl' appended."
666+
),
667+
)
633668
args = parser.parse_args(argv)
634669

670+
if args.incremental_jsonl is None:
671+
args.incremental_jsonl = args.output.with_suffix(
672+
args.output.suffix + ".partial.jsonl"
673+
)
674+
635675
logging.basicConfig(
636676
level=getattr(logging, args.log_level),
637677
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
@@ -655,7 +695,7 @@ def main(argv: list[str] | None = None) -> int:
655695
)
656696

657697
runner = ScaleUpRunner(cfg)
658-
results = runner.run()
698+
results = runner.run(incremental_path=args.incremental_jsonl)
659699

660700
args.output.parent.mkdir(parents=True, exist_ok=True)
661701
args.output.write_text(

0 commit comments

Comments
 (0)