Skip to content

Commit 679f3ee

Browse files
baogorekclaude
andcommitted
Revert BaseSimData: use fresh Microsimulation per build_h5 call
BaseSimData extracted simulation data into a static dataclass to avoid reloading per area, but this reimplemented Microsimulation internals and produced incorrect population numbers. Each build_h5 call now creates a fresh Microsimulation from dataset_path — correct by construction. Also includes worker log streaming fix and target config updates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 56c3059 commit 679f3ee

7 files changed

Lines changed: 218 additions & 240 deletions

File tree

modal_app/data_build.py

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import os
33
import shutil
44
import subprocess
5+
import sys
56
import threading
67
from concurrent.futures import ThreadPoolExecutor, as_completed
8+
from datetime import datetime, timezone
79
from pathlib import Path
8-
from typing import Optional
10+
from typing import IO, Optional
911

1012
import modal
1113

@@ -211,10 +213,35 @@ def cleanup_checkpoints(branch: str, volume: modal.Volume) -> None:
211213
print(f"Cleaned up checkpoints for branch: {branch}")
212214

213215

216+
def run_script_logged(
217+
cmd: list,
218+
log_file: IO,
219+
env: dict,
220+
check: bool = True,
221+
) -> subprocess.CompletedProcess:
222+
"""Run a command, streaming output to both stdout and a log file."""
223+
proc = subprocess.Popen(
224+
cmd,
225+
stdout=subprocess.PIPE,
226+
stderr=subprocess.STDOUT,
227+
text=True,
228+
env=env,
229+
)
230+
for line in proc.stdout:
231+
sys.stdout.write(line)
232+
sys.stdout.flush()
233+
log_file.write(line)
234+
proc.wait()
235+
if check and proc.returncode != 0:
236+
raise subprocess.CalledProcessError(proc.returncode, cmd)
237+
return subprocess.CompletedProcess(cmd, proc.returncode)
238+
239+
214240
def run_script(
215241
script_path: str,
216242
args: Optional[list] = None,
217243
env: Optional[dict] = None,
244+
log_file: IO = None,
218245
) -> str:
219246
"""Run a script with uv and return its path for logging.
220247
@@ -229,11 +256,18 @@ def run_script(
229256
Raises:
230257
subprocess.CalledProcessError: If the script fails.
231258
"""
232-
cmd = ["uv", "run", "python", script_path]
259+
cmd = ["uv", "run", "python", "-u", script_path]
233260
if args:
234261
cmd.extend(args)
262+
run_env = env or os.environ.copy()
263+
run_env["PYTHONUNBUFFERED"] = "1"
235264
print(f"Starting {script_path}...")
236-
subprocess.run(cmd, check=True, env=env or os.environ.copy())
265+
if log_file:
266+
log_file.write(f"\n{'=' * 60}\nStarting {script_path}...\n{'=' * 60}\n")
267+
log_file.flush()
268+
run_script_logged(cmd, log_file, run_env)
269+
else:
270+
subprocess.run(cmd, check=True, env=run_env)
237271
print(f"Completed {script_path}")
238272
return script_path
239273

@@ -245,6 +279,7 @@ def run_script_with_checkpoint(
245279
volume: modal.Volume,
246280
args: Optional[list] = None,
247281
env: Optional[dict] = None,
282+
log_file: IO = None,
248283
) -> str:
249284
"""Run script if output not checkpointed, then checkpoint result.
250285
@@ -275,7 +310,7 @@ def run_script_with_checkpoint(
275310
return script_path
276311

277312
# Run the script
278-
run_script(script_path, args=args, env=env)
313+
run_script(script_path, args=args, env=env, log_file=log_file)
279314

280315
# Checkpoint all outputs
281316
for output_file in output_files:
@@ -319,7 +354,7 @@ def run_tests_with_checkpoints(
319354

320355
print(f"Running tests: {module}")
321356
result = subprocess.run(
322-
["uv", "run", "pytest", module, "-v"],
357+
["uv", "run", "python", "-u", "-m", "pytest", module, "-v"],
323358
env=env,
324359
)
325360

@@ -341,7 +376,7 @@ def run_tests_with_checkpoints(
341376
},
342377
memory=32768,
343378
cpu=8.0,
344-
timeout=14400,
379+
timeout=28800, # 8 hours
345380
nonpreemptible=True,
346381
)
347382
def build_datasets(
@@ -389,10 +424,26 @@ def build_datasets(
389424

390425
env = os.environ.copy()
391426

427+
# Open persistent build log with provenance header
428+
commit = get_current_commit()
429+
log_path = Path("build_log.txt")
430+
log_file = open(log_path, "w")
431+
started = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
432+
log_file.write(
433+
f"{'=' * 40}\n"
434+
f" Data Build Log\n"
435+
f" Branch: {branch}\n"
436+
f" Commit: {commit[:8]}\n"
437+
f" Started: {started}\n"
438+
f"{'=' * 40}\n"
439+
)
440+
log_file.flush()
441+
392442
# Download prerequisites
393443
run_script(
394444
"policyengine_us_data/storage/download_private_prerequisites.py",
395445
env=env,
446+
log_file=log_file,
396447
)
397448
# Checkpoint policy_data.db immediately after download so it survives
398449
# test failures and can be restored on retries.
@@ -416,6 +467,7 @@ def build_datasets(
416467
branch,
417468
checkpoint_volume,
418469
env=env,
470+
log_file=log_file,
419471
)
420472
else:
421473
# Parallel execution based on dependency groups with checkpointing
@@ -444,6 +496,7 @@ def build_datasets(
444496
branch,
445497
checkpoint_volume,
446498
env=env,
499+
log_file=log_file,
447500
): script
448501
for script, output in group1
449502
}
@@ -472,6 +525,7 @@ def build_datasets(
472525
branch,
473526
checkpoint_volume,
474527
env=env,
528+
log_file=log_file,
475529
): script
476530
for script, output in group2
477531
}
@@ -486,6 +540,7 @@ def build_datasets(
486540
branch,
487541
checkpoint_volume,
488542
env=env,
543+
log_file=log_file,
489544
)
490545

491546
# GROUP 3: After extended_cps - run in parallel
@@ -504,6 +559,7 @@ def build_datasets(
504559
branch,
505560
checkpoint_volume,
506561
env=env,
562+
log_file=log_file,
507563
)
508564
)
509565
else:
@@ -518,6 +574,7 @@ def build_datasets(
518574
branch,
519575
checkpoint_volume,
520576
env=env,
577+
log_file=log_file,
521578
)
522579
)
523580
for future in as_completed(phase4_futures):
@@ -542,6 +599,7 @@ def build_datasets(
542599
branch,
543600
checkpoint_volume,
544601
env=env,
602+
log_file=log_file,
545603
)
546604
)
547605
if not skip_enhanced_cps:
@@ -555,19 +613,25 @@ def build_datasets(
555613
branch,
556614
checkpoint_volume,
557615
env=env,
616+
log_file=log_file,
558617
)
559618
)
560619
else:
561620
print("Skipping small_enhanced_cps.py (--skip-enhanced-cps)")
562621
for future in as_completed(phase5_futures):
563622
future.result()
564623

624+
# Checkpoint the build log so it survives preemption
625+
log_file.flush()
626+
save_checkpoint(branch, str(log_path), checkpoint_volume)
627+
565628
# Copy pipeline artifacts to shared volume before tests so that a test
566629
# failure does not block downstream calibration steps.
567630
# Files selected:
568631
# - source_imputed H5: main dataset for calibration and local area builds
569632
# - policy_data.db: calibration target database
570633
# - calibration_weights.npy: pre-existing weights for re-runs (if present)
634+
# - build_log.txt: persistent build log with provenance
571635
print("Copying pipeline artifacts to shared volume...")
572636
artifacts_dir = Path(PIPELINE_MOUNT) / "artifacts"
573637
artifacts_dir.mkdir(parents=True, exist_ok=True)
@@ -586,6 +650,8 @@ def build_datasets(
586650
artifacts_dir / "calibration_weights.npy",
587651
)
588652
print("Copied existing calibration_weights.npy to pipeline volume")
653+
shutil.copy2(log_path, artifacts_dir / "build_log.txt")
654+
log_file.close()
589655
pipeline_volume.commit()
590656
print("Pipeline artifacts committed to shared volume")
591657

modal_app/local_area.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -397,13 +397,11 @@ def build_areas_worker(
397397
worker_cmd.append("--no-validate")
398398
result = subprocess.run(
399399
worker_cmd,
400-
capture_output=True,
400+
stdout=subprocess.PIPE,
401401
text=True,
402402
env=os.environ.copy(),
403403
)
404404

405-
print(result.stderr)
406-
407405
if result.returncode != 0:
408406
return {
409407
"completed": [],

modal_app/worker_script.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ def main():
208208

209209
from policyengine_us_data.calibration.publish_local_area import (
210210
build_h5,
211-
prepare_base_sim_data,
212211
NYC_COUNTIES,
213212
NYC_CDS,
214213
AT_LARGE_DISTRICTS,
@@ -222,8 +221,11 @@ def main():
222221

223222
weights = np.load(weights_path)
224223

225-
base_data = prepare_base_sim_data(dataset_path)
226-
n_records = base_data.n_hh
224+
from policyengine_us import Microsimulation
225+
226+
_sim = Microsimulation(dataset=str(dataset_path))
227+
n_records = len(_sim.calculate("household_id", map_to="household").values)
228+
del _sim
227229

228230
geography = assign_random_geography(
229231
n_records=n_records,
@@ -337,7 +339,7 @@ def main():
337339
path = build_h5(
338340
weights=weights,
339341
geography=geography,
340-
base_data=base_data,
342+
dataset_path=dataset_path,
341343
output_path=states_dir / f"{item_id}.h5",
342344
cd_subset=cd_subset,
343345
takeup_filter=takeup_filter,
@@ -380,7 +382,7 @@ def main():
380382
path = build_h5(
381383
weights=weights,
382384
geography=geography,
383-
base_data=base_data,
385+
dataset_path=dataset_path,
384386
output_path=districts_dir / f"{friendly_name}.h5",
385387
cd_subset=[geoid],
386388
takeup_filter=takeup_filter,
@@ -399,7 +401,7 @@ def main():
399401
path = build_h5(
400402
weights=weights,
401403
geography=geography,
402-
base_data=base_data,
404+
dataset_path=dataset_path,
403405
output_path=cities_dir / "NYC.h5",
404406
cd_subset=cd_subset,
405407
county_filter=NYC_COUNTIES,
@@ -427,7 +429,7 @@ def main():
427429
path = build_h5(
428430
weights=weights,
429431
geography=national_geo,
430-
base_data=base_data,
432+
dataset_path=dataset_path,
431433
output_path=national_dir / "US.h5",
432434
)
433435
else:

0 commit comments

Comments
 (0)