Skip to content

Commit 34f5fc0

Browse files
committed
modal
1 parent 2335eb3 commit 34f5fc0

4 files changed

Lines changed: 159 additions & 216 deletions

File tree

Makefile

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: all format test install download upload docker documentation data validate-data calibrate calibrate-build publish-local-area upload-calibration upload-dataset upload-database push-to-modal build-matrices calibrate-modal calibrate-modal-national calibrate-both stage-h5s stage-national-h5 stage-all-h5s pipeline validate-staging validate-staging-full upload-validation check-staging check-sanity clean build paper clean-paper presentations database database-refresh promote-database promote-dataset promote build-h5s validate-local
1+
.PHONY: all format test install download upload docker documentation data validate-data calibrate calibrate-build publish-local-area upload-calibration upload-dataset upload-database push-to-modal build-data-modal build-matrices calibrate-modal calibrate-modal-national calibrate-both stage-h5s stage-national-h5 stage-all-h5s pipeline validate-staging validate-staging-full upload-validation check-staging check-sanity clean build paper clean-paper presentations database database-refresh promote-database promote-dataset promote build-h5s validate-local
22

33
GPU ?= A100-80GB
44
EPOCHS ?= 200
@@ -157,16 +157,16 @@ upload-database:
157157
@echo "Database uploaded to HF."
158158

159159
push-to-modal:
160-
modal volume put local-area-staging \
160+
modal volume put pipeline-artifacts \
161161
policyengine_us_data/storage/calibration/calibration_weights.npy \
162-
calibration_inputs/calibration/calibration_weights.npy --force
163-
modal volume put local-area-staging \
162+
artifacts/calibration_weights.npy --force
163+
modal volume put pipeline-artifacts \
164164
policyengine_us_data/storage/calibration/policy_data.db \
165-
calibration_inputs/calibration/policy_data.db --force
166-
modal volume put local-area-staging \
165+
artifacts/policy_data.db --force
166+
modal volume put pipeline-artifacts \
167167
policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5 \
168-
calibration_inputs/calibration/source_imputed_stratified_extended_cps.h5 --force
169-
@echo "All calibration inputs pushed to Modal volume."
168+
artifacts/source_imputed_stratified_extended_cps.h5 --force
169+
@echo "All pipeline artifacts pushed to Modal volume."
170170

171171
build-matrices:
172172
modal run modal_app/remote_calibration_runner.py::build_package \
@@ -188,8 +188,7 @@ calibrate-both:
188188

189189
stage-h5s:
190190
modal run modal_app/local_area.py::main \
191-
--branch $(BRANCH) --num-workers $(NUM_WORKERS) \
192-
$(if $(SKIP_DOWNLOAD),--skip-download)
191+
--branch $(BRANCH) --num-workers $(NUM_WORKERS)
193192

194193
stage-national-h5:
195194
modal run modal_app/local_area.py::main_national \
@@ -224,7 +223,10 @@ check-sanity:
224223
python -m policyengine_us_data.calibration.validate_staging \
225224
--sanity-only --area-type states --areas NC
226225

227-
pipeline: data upload-dataset build-matrices calibrate-both stage-all-h5s
226+
build-data-modal:
227+
modal run modal_app/data_build.py::main --branch $(BRANCH) --upload
228+
229+
pipeline: build-data-modal build-matrices calibrate-both stage-all-h5s
228230
@echo ""
229231
@echo "========================================"
230232
@echo "Pipeline complete. H5s are in HF staging."

modal_app/data_build.py

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@
2020
create_if_missing=True,
2121
)
2222

23+
# Shared pipeline volume for inter-step artifact transport
24+
pipeline_volume = modal.Volume.from_name(
25+
"pipeline-artifacts",
26+
create_if_missing=True,
27+
)
28+
PIPELINE_MOUNT = "/pipeline"
29+
2330
image = (
2431
modal.Image.debian_slim(python_version="3.13").apt_install("git").pip_install("uv")
2532
)
@@ -270,7 +277,10 @@ def run_tests_with_checkpoints(
270277
@app.function(
271278
image=image,
272279
secrets=[hf_secret, gcp_secret],
273-
volumes={VOLUME_MOUNT: checkpoint_volume},
280+
volumes={
281+
VOLUME_MOUNT: checkpoint_volume,
282+
PIPELINE_MOUNT: pipeline_volume,
283+
},
274284
memory=32768,
275285
cpu=8.0,
276286
timeout=14400,
@@ -470,32 +480,27 @@ def build_datasets(
470480
print("=== Running tests with checkpointing ===")
471481
run_tests_with_checkpoints(branch, checkpoint_volume, env)
472482

473-
# Upload if requested
483+
# Copy pipeline artifacts to shared volume for downstream steps
484+
print("Copying pipeline artifacts to shared volume...")
485+
artifacts_dir = Path(PIPELINE_MOUNT) / "artifacts"
486+
artifacts_dir.mkdir(parents=True, exist_ok=True)
487+
shutil.copy2(
488+
"policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5",
489+
artifacts_dir / "source_imputed_stratified_extended_cps.h5",
490+
)
491+
shutil.copy2(
492+
"policyengine_us_data/storage/calibration/policy_data.db",
493+
artifacts_dir / "policy_data.db",
494+
)
495+
pipeline_volume.commit()
496+
print("Pipeline artifacts committed to shared volume")
497+
498+
# Upload if requested (HF publication only)
474499
if upload:
475500
run_script(
476501
"policyengine_us_data/storage/upload_completed_datasets.py",
477502
env=env,
478503
)
479-
# Upload source_imputed to calibration/ path for downstream pipeline
480-
print("Uploading source_imputed dataset to HF calibration/...")
481-
subprocess.run(
482-
[
483-
"uv",
484-
"run",
485-
"python",
486-
"-c",
487-
"from policyengine_us_data.utils.huggingface import upload; "
488-
"upload("
489-
"'policyengine_us_data/storage/"
490-
"source_imputed_stratified_extended_cps_2024.h5', "
491-
"'policyengine/policyengine-us-data', "
492-
"'calibration/"
493-
"source_imputed_stratified_extended_cps.h5')",
494-
],
495-
check=True,
496-
env=env,
497-
)
498-
print("Source imputed dataset uploaded to HF")
499504

500505
# Clean up checkpoints after successful completion
501506
cleanup_checkpoints(branch, checkpoint_volume)

modal_app/local_area.py

Lines changed: 57 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
create_if_missing=True,
2929
)
3030

31+
pipeline_volume = modal.Volume.from_name(
32+
"pipeline-artifacts",
33+
create_if_missing=True,
34+
)
35+
3136
image = (
3237
modal.Image.debian_slim(python_version="3.13")
3338
.apt_install("git")
@@ -274,7 +279,10 @@ def run_phase(
274279
@app.function(
275280
image=image,
276281
secrets=[hf_secret, gcp_secret],
277-
volumes={VOLUME_MOUNT: staging_volume},
282+
volumes={
283+
VOLUME_MOUNT: staging_volume,
284+
"/pipeline": pipeline_volume,
285+
},
278286
memory=16384,
279287
cpu=4.0,
280288
timeout=14400,
@@ -560,15 +568,17 @@ def promote_publish(branch: str = "main", version: str = "") -> str:
560568
@app.function(
561569
image=image,
562570
secrets=[hf_secret, gcp_secret],
563-
volumes={VOLUME_MOUNT: staging_volume},
571+
volumes={
572+
VOLUME_MOUNT: staging_volume,
573+
"/pipeline": pipeline_volume,
574+
},
564575
memory=8192,
565576
timeout=86400,
566577
)
567578
def coordinate_publish(
568579
branch: str = "main",
569580
num_workers: int = 8,
570581
skip_upload: bool = False,
571-
skip_download: bool = False,
572582
) -> str:
573583
"""Coordinate the full publishing workflow."""
574584
setup_gcp_credentials()
@@ -587,73 +597,34 @@ def coordinate_publish(
587597
shutil.rmtree(version_dir)
588598
version_dir.mkdir(parents=True, exist_ok=True)
589599

590-
calibration_dir = staging_dir / "calibration_inputs"
591-
592-
# hf_hub_download preserves directory structure, so files are in calibration/ subdir
593-
weights_path = calibration_dir / "calibration" / "calibration_weights.npy"
594-
db_path = calibration_dir / "calibration" / "policy_data.db"
595-
596-
if skip_download:
597-
print("Verifying pre-pushed calibration inputs...")
598-
staging_volume.reload()
599-
dataset_path = (
600-
calibration_dir
601-
/ "calibration"
602-
/ "source_imputed_stratified_extended_cps.h5"
603-
)
604-
required = {
605-
"weights": weights_path,
606-
"dataset": dataset_path,
607-
"database": db_path,
608-
}
609-
for label, p in required.items():
610-
if not p.exists():
611-
raise RuntimeError(
612-
f"Missing required calibration input ({label}): {p}"
613-
)
614-
print("All required calibration inputs found on volume.")
615-
else:
616-
if calibration_dir.exists():
617-
shutil.rmtree(calibration_dir)
618-
calibration_dir.mkdir(parents=True, exist_ok=True)
619-
620-
print("Downloading calibration inputs from HuggingFace...")
621-
result = subprocess.run(
622-
[
623-
"uv",
624-
"run",
625-
"python",
626-
"-c",
627-
f"""
628-
from policyengine_us_data.utils.huggingface import download_calibration_inputs
629-
download_calibration_inputs("{calibration_dir}")
630-
print("Done")
631-
""",
632-
],
633-
text=True,
634-
env=os.environ.copy(),
635-
)
636-
if result.returncode != 0:
637-
raise RuntimeError(f"Download failed: {result.stderr}")
638-
staging_volume.commit()
639-
print("Calibration inputs downloaded")
640-
641-
dataset_path = (
642-
calibration_dir / "calibration" / "source_imputed_stratified_extended_cps.h5"
643-
)
600+
pipeline_volume.reload()
601+
artifacts = Path("/pipeline/artifacts")
602+
weights_path = artifacts / "calibration_weights.npy"
603+
db_path = artifacts / "policy_data.db"
604+
dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5"
605+
config_json_path = artifacts / "unified_run_config.json"
606+
607+
required = {
608+
"weights": weights_path,
609+
"dataset": dataset_path,
610+
"database": db_path,
611+
}
612+
for label, p in required.items():
613+
if not p.exists():
614+
raise RuntimeError(
615+
f"Missing {label} on pipeline volume: {p}. "
616+
f"Run upstream pipeline steps first."
617+
)
618+
print("All required pipeline artifacts found on volume.")
644619

645-
config_json_path = calibration_dir / "calibration" / "unified_run_config.json"
646620
calibration_inputs = {
647621
"weights": str(weights_path),
648622
"dataset": str(dataset_path),
649623
"database": str(db_path),
650624
"n_clones": 430,
651625
"seed": 42,
652626
}
653-
validate_artifacts(
654-
config_json_path,
655-
calibration_dir / "calibration",
656-
)
627+
validate_artifacts(config_json_path, artifacts)
657628
result = subprocess.run(
658629
[
659630
"uv",
@@ -780,22 +751,23 @@ def main(
780751
branch: str = "main",
781752
num_workers: int = 8,
782753
skip_upload: bool = False,
783-
skip_download: bool = False,
784754
):
785755
"""Local entrypoint for Modal CLI."""
786756
result = coordinate_publish.remote(
787757
branch=branch,
788758
num_workers=num_workers,
789759
skip_upload=skip_upload,
790-
skip_download=skip_download,
791760
)
792761
print(result)
793762

794763

795764
@app.function(
796765
image=image,
797766
secrets=[hf_secret, gcp_secret],
798-
volumes={VOLUME_MOUNT: staging_volume},
767+
volumes={
768+
VOLUME_MOUNT: staging_volume,
769+
"/pipeline": pipeline_volume,
770+
},
799771
memory=16384,
800772
timeout=14400,
801773
)
@@ -809,57 +781,36 @@ def coordinate_national_publish(
809781
version = get_version()
810782
print(f"Building national H5 for version {version} from branch {branch}")
811783

812-
import shutil
813-
814784
staging_dir = Path(VOLUME_MOUNT)
815-
calibration_dir = staging_dir / "national_calibration_inputs"
816-
if calibration_dir.exists():
817-
shutil.rmtree(calibration_dir)
818-
calibration_dir.mkdir(parents=True, exist_ok=True)
819-
820-
print("Downloading national calibration inputs from HF...")
821-
result = subprocess.run(
822-
[
823-
"uv",
824-
"run",
825-
"python",
826-
"-c",
827-
f"""
828-
from policyengine_us_data.utils.huggingface import (
829-
download_calibration_inputs,
830-
)
831-
download_calibration_inputs("{calibration_dir}", prefix="national_")
832-
print("Done")
833-
""",
834-
],
835-
text=True,
836-
env=os.environ.copy(),
837-
)
838-
if result.returncode != 0:
839-
raise RuntimeError(f"Download failed: {result.stderr}")
840-
staging_volume.commit()
841-
print("National calibration inputs downloaded")
842785

843-
weights_path = calibration_dir / "calibration" / "national_calibration_weights.npy"
844-
db_path = calibration_dir / "calibration" / "policy_data.db"
845-
dataset_path = (
846-
calibration_dir / "calibration" / "source_imputed_stratified_extended_cps.h5"
847-
)
786+
pipeline_volume.reload()
787+
artifacts = Path("/pipeline/artifacts")
788+
weights_path = artifacts / "national_calibration_weights.npy"
789+
db_path = artifacts / "policy_data.db"
790+
dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5"
791+
config_json_path = artifacts / "national_unified_run_config.json"
792+
793+
required = {
794+
"weights": weights_path,
795+
"dataset": dataset_path,
796+
"database": db_path,
797+
}
798+
for label, p in required.items():
799+
if not p.exists():
800+
raise RuntimeError(
801+
f"Missing {label} on pipeline volume: {p}. "
802+
f"Run upstream pipeline steps first."
803+
)
804+
print("All required national pipeline artifacts found.")
848805

849-
config_json_path = (
850-
calibration_dir / "calibration" / "national_unified_run_config.json"
851-
)
852806
calibration_inputs = {
853807
"weights": str(weights_path),
854808
"dataset": str(dataset_path),
855809
"database": str(db_path),
856810
"n_clones": 430,
857811
"seed": 42,
858812
}
859-
validate_artifacts(
860-
config_json_path,
861-
calibration_dir / "calibration",
862-
)
813+
validate_artifacts(config_json_path, artifacts)
863814
version_dir = staging_dir / version
864815
version_dir.mkdir(parents=True, exist_ok=True)
865816

0 commit comments

Comments
 (0)