Skip to content

Commit 79e9f55

Browse files
baogorek and claude committed
Fix build pipeline: add missing script, remove geography.npz, input-scoped checkpoints
- Add create_source_imputed_cps.py to data_build.py Phase 5 (was skipped in CI)
- Remove geography.npz dependency from Modal pipeline; workers regenerate geography deterministically from (n_records, n_clones, seed)
- Add input-scoped checkpoints to publish_local_area.py: hash weights+dataset to auto-clear stale checkpoints when inputs change
- Remove stale artifacts from push-to-modal (stacked_blocks, stacked_takeup, geo_labels)
- Stop uploading source_imputed H5 as intermediate; promote-dataset uploads at promotion time instead
- Default skip_download=True in Modal local_area (reads from volume)
- Remove _upload_source_imputed from remote_calibration_runner
- Clean up huggingface.py: remove geography/blocks/geo_labels from download and upload functions
- ruff format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1bed18a commit 79e9f55

10 files changed

Lines changed: 206 additions & 406 deletions

File tree

Makefile

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,11 @@ promote-database:
8787
@echo "Copied DB and raw_inputs to HF clone. Now cd to HF repo, commit, and push."
8888

8989
promote-dataset:
90-
cp policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5 \
91-
$(HF_CLONE_DIR)/calibration/source_imputed_stratified_extended_cps.h5
92-
@echo "Copied dataset to HF clone. Now cd to HF repo, commit, and push."
90+
python -c "from policyengine_us_data.utils.huggingface import upload; \
91+
upload('policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5', \
92+
'policyengine/policyengine-us-data', \
93+
'calibration/source_imputed_stratified_extended_cps.h5')"
94+
@echo "Dataset promoted to HF."
9395

9496
data: download
9597
python policyengine_us_data/utils/uprating.py
@@ -141,11 +143,9 @@ upload-calibration:
141143
upload_calibration_artifacts()"
142144

143145
upload-dataset:
144-
python -c "from policyengine_us_data.utils.huggingface import upload; \
145-
upload('policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5', \
146-
'policyengine/policyengine-us-data', \
147-
'calibration/source_imputed_stratified_extended_cps.h5')"
148-
@echo "Dataset uploaded to HF."
146+
@echo "NOTE: source_imputed H5 is an intermediate artifact."
147+
@echo "Use 'make push-to-modal' to push to Modal volume,"
148+
@echo "or 'make promote-dataset' to publish to HF at promotion time."
149149

150150
upload-database:
151151
python -c "from policyengine_us_data.utils.huggingface import upload; \
@@ -158,18 +158,9 @@ push-to-modal:
158158
modal volume put local-area-staging \
159159
policyengine_us_data/storage/calibration/calibration_weights.npy \
160160
calibration_inputs/calibration/calibration_weights.npy --force
161-
modal volume put local-area-staging \
162-
policyengine_us_data/storage/calibration/stacked_blocks.npy \
163-
calibration_inputs/calibration/stacked_blocks.npy --force
164-
modal volume put local-area-staging \
165-
policyengine_us_data/storage/calibration/stacked_takeup.npz \
166-
calibration_inputs/calibration/stacked_takeup.npz --force
167161
modal volume put local-area-staging \
168162
policyengine_us_data/storage/calibration/policy_data.db \
169163
calibration_inputs/calibration/policy_data.db --force
170-
modal volume put local-area-staging \
171-
policyengine_us_data/storage/calibration/geo_labels.json \
172-
calibration_inputs/calibration/geo_labels.json --force
173164
modal volume put local-area-staging \
174165
policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5 \
175166
calibration_inputs/calibration/source_imputed_stratified_extended_cps.h5 --force
@@ -195,8 +186,7 @@ calibrate-both:
195186

196187
stage-h5s:
197188
modal run modal_app/local_area.py::main \
198-
--branch $(BRANCH) --num-workers $(NUM_WORKERS) \
199-
$(if $(SKIP_DOWNLOAD),--skip-download)
189+
--branch $(BRANCH) --num-workers $(NUM_WORKERS)
200190

201191
stage-national-h5:
202192
modal run modal_app/local_area.py::main_national \
@@ -231,7 +221,7 @@ check-sanity:
231221
python -m policyengine_us_data.calibration.validate_staging \
232222
--sanity-only --area-type states --areas NC
233223

234-
pipeline: data upload-dataset build-matrices calibrate-both stage-all-h5s
224+
pipeline: data push-to-modal build-matrices calibrate-both stage-all-h5s
235225
@echo ""
236226
@echo "========================================"
237227
@echo "Pipeline complete. H5s are in HF staging."

modal_app/README.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ Every run produces these local files (whichever the calibration script emits):
7676
- **unified_diagnostics.csv** — Final per-target diagnostics
7777
- **calibration_log.csv** — Per-target metrics across epochs (requires `--log-freq`)
7878
- **unified_run_config.json** — Run configuration and summary stats
79-
- **stacked_blocks.npy** — Census block assignments for stacked records
8079

8180
## Artifact Upload to HuggingFace
8281

@@ -86,7 +85,6 @@ atomic commit after writing them locally:
8685
| Local file | HF path |
8786
|------------|---------|
8887
| `calibration_weights.npy` | `calibration/calibration_weights.npy` |
89-
| `stacked_blocks.npy` | `calibration/stacked_blocks.npy` |
9088
| `calibration_log.csv` | `calibration/logs/calibration_log.csv` |
9189
| `unified_diagnostics.csv` | `calibration/logs/unified_diagnostics.csv` |
9290
| `unified_run_config.json` | `calibration/logs/unified_run_config.json` |
@@ -201,7 +199,6 @@ Artifacts uploaded to HF by `--push-results`:
201199
| Local file | HF path |
202200
|------------|---------|
203201
| `calibration_weights.npy` | `calibration/calibration_weights.npy` |
204-
| `stacked_blocks.npy` | `calibration/stacked_blocks.npy` |
205202
| `calibration_log.csv` | `calibration/logs/calibration_log.csv` |
206203
| `unified_diagnostics.csv` | `calibration/logs/unified_diagnostics.csv` |
207204
| `unified_run_config.json` | `calibration/logs/unified_run_config.json` |

modal_app/data_build.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -433,15 +433,38 @@ def build_datasets(
433433
for future in as_completed(futures):
434434
future.result()
435435

436-
# SEQUENTIAL: Small enhanced CPS (needs enhanced_cps)
437-
print("=== Phase 5: Building small enhanced CPS ===")
438-
run_script_with_checkpoint(
439-
"policyengine_us_data/datasets/cps/small_enhanced_cps.py",
440-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/cps/small_enhanced_cps.py"],
441-
branch,
442-
checkpoint_volume,
443-
env=env,
436+
# GROUP 4: After Phase 4 - run in parallel
437+
# create_source_imputed_cps needs stratified_cps
438+
# small_enhanced_cps needs enhanced_cps
439+
print(
440+
"=== Phase 5: Building source imputed CPS "
441+
"and small enhanced CPS (parallel) ==="
444442
)
443+
with ThreadPoolExecutor(max_workers=2) as executor:
444+
futures = [
445+
executor.submit(
446+
run_script_with_checkpoint,
447+
"policyengine_us_data/calibration/create_source_imputed_cps.py",
448+
SCRIPT_OUTPUTS[
449+
"policyengine_us_data/calibration/create_source_imputed_cps.py"
450+
],
451+
branch,
452+
checkpoint_volume,
453+
env=env,
454+
),
455+
executor.submit(
456+
run_script_with_checkpoint,
457+
"policyengine_us_data/datasets/cps/small_enhanced_cps.py",
458+
SCRIPT_OUTPUTS[
459+
"policyengine_us_data/datasets/cps/small_enhanced_cps.py"
460+
],
461+
branch,
462+
checkpoint_volume,
463+
env=env,
464+
),
465+
]
466+
for future in as_completed(futures):
467+
future.result()
445468

446469
# Run tests with checkpointing
447470
print("=== Running tests with checkpointing ===")

modal_app/local_area.py

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -313,17 +313,10 @@ def build_areas_worker(
313313
"--output-dir",
314314
str(output_dir),
315315
]
316-
if "geography" not in calibration_inputs:
317-
raise RuntimeError(
318-
"geography.npz path missing from calibration_inputs. "
319-
"Re-run calibration to generate this artifact."
320-
)
321-
worker_cmd.extend(
322-
[
323-
"--geography-path",
324-
calibration_inputs["geography"],
325-
]
326-
)
316+
if "n_clones" in calibration_inputs:
317+
worker_cmd.extend(["--n-clones", str(calibration_inputs["n_clones"])])
318+
if "seed" in calibration_inputs:
319+
worker_cmd.extend(["--seed", str(calibration_inputs["seed"])])
327320
result = subprocess.run(
328321
worker_cmd,
329322
capture_output=True,
@@ -575,7 +568,7 @@ def coordinate_publish(
575568
branch: str = "main",
576569
num_workers: int = 8,
577570
skip_upload: bool = False,
578-
skip_download: bool = False,
571+
skip_download: bool = True,
579572
) -> str:
580573
"""Coordinate the full publishing workflow."""
581574
setup_gcp_credentials()
@@ -612,12 +605,12 @@ def coordinate_publish(
612605
"weights": weights_path,
613606
"dataset": dataset_path,
614607
"database": db_path,
615-
"geography": (calibration_dir / "calibration" / "geography.npz"),
616-
"run_config": (calibration_dir / "calibration" / "unified_run_config.json"),
617608
}
618609
for label, p in required.items():
619610
if not p.exists():
620-
raise RuntimeError(f"Missing required calibration input ({label}): {p}")
611+
raise RuntimeError(
612+
f"Missing required calibration input ({label}): {p}"
613+
)
621614
print("All required calibration inputs found on volume.")
622615
else:
623616
if calibration_dir.exists():
@@ -649,20 +642,14 @@ def coordinate_publish(
649642
calibration_dir / "calibration" / "source_imputed_stratified_extended_cps.h5"
650643
)
651644

652-
geo_npz_path = calibration_dir / "calibration" / "geography.npz"
653645
config_json_path = calibration_dir / "calibration" / "unified_run_config.json"
654646
calibration_inputs = {
655647
"weights": str(weights_path),
656648
"dataset": str(dataset_path),
657649
"database": str(db_path),
650+
"n_clones": 430,
651+
"seed": 42,
658652
}
659-
if not geo_npz_path.exists():
660-
raise RuntimeError(
661-
f"geography.npz not found at {geo_npz_path}. "
662-
f"Re-run calibration to generate this artifact."
663-
)
664-
calibration_inputs["geography"] = str(geo_npz_path)
665-
print(f"Geography artifact found: {geo_npz_path}")
666653
validate_artifacts(
667654
config_json_path,
668655
calibration_dir / "calibration",
@@ -793,7 +780,7 @@ def main(
793780
branch: str = "main",
794781
num_workers: int = 8,
795782
skip_upload: bool = False,
796-
skip_download: bool = False,
783+
skip_download: bool = True,
797784
):
798785
"""Local entrypoint for Modal CLI."""
799786
result = coordinate_publish.remote(
@@ -859,23 +846,16 @@ def coordinate_national_publish(
859846
calibration_dir / "calibration" / "source_imputed_stratified_extended_cps.h5"
860847
)
861848

862-
geo_npz_path = calibration_dir / "calibration" / "national_geography.npz"
863849
config_json_path = (
864850
calibration_dir / "calibration" / "national_unified_run_config.json"
865851
)
866852
calibration_inputs = {
867853
"weights": str(weights_path),
868854
"dataset": str(dataset_path),
869855
"database": str(db_path),
856+
"n_clones": 430,
857+
"seed": 42,
870858
}
871-
if not geo_npz_path.exists():
872-
raise RuntimeError(
873-
f"national_geography.npz not found at "
874-
f"{geo_npz_path}. Re-run national calibration "
875-
f"to generate this artifact."
876-
)
877-
calibration_inputs["geography"] = str(geo_npz_path)
878-
print(f"National geography artifact found: {geo_npz_path}")
879859
validate_artifacts(
880860
config_json_path,
881861
calibration_dir / "calibration",

0 commit comments

Comments
 (0)