Skip to content

Commit 52f1e4f

Browse files
authored
Merge pull request #637 from PolicyEngine/maria/issue_634
Add run_id to artifact directories
2 parents 243fdeb + 8b9d231 commit 52f1e4f

5 files changed

Lines changed: 69 additions & 30 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Scope pipeline artifact directory by run ID to prevent concurrent runs from clobbering each other's H5 files, calibration packages, and weights.

modal_app/data_build.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ def build_datasets(
350350
clear_checkpoints: bool = False,
351351
skip_tests: bool = False,
352352
skip_enhanced_cps: bool = False,
353+
run_id: str = "",
353354
):
354355
"""Build all datasets with preemption-resilient checkpointing.
355356
@@ -593,6 +594,8 @@ def build_datasets(
593594
# failure does not block downstream calibration steps.
594595
print("Copying pipeline artifacts to shared volume...")
595596
artifacts_dir = Path(PIPELINE_MOUNT) / "artifacts"
597+
if run_id:
598+
artifacts_dir = artifacts_dir / run_id
596599
artifacts_dir.mkdir(parents=True, exist_ok=True)
597600

598601
# Copy all intermediate H5 datasets for lineage tracing

modal_app/local_area.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ def run_phase(
294294
},
295295
memory=16384,
296296
cpu=4.0,
297-
timeout=14400,
297+
timeout=28800,
298298
nonpreemptible=True,
299299
)
300300
def build_areas_worker(
@@ -438,7 +438,7 @@ def validate_staging(branch: str, version: str, run_id: str = "") -> Dict:
438438
secrets=[hf_secret],
439439
volumes={VOLUME_MOUNT: staging_volume},
440440
memory=8192,
441-
timeout=14400,
441+
timeout=28800,
442442
nonpreemptible=True,
443443
)
444444
def upload_to_staging(
@@ -646,7 +646,9 @@ def coordinate_publish(
646646
version_dir = staging_dir / version
647647

648648
pipeline_volume.reload()
649-
artifacts = Path("/pipeline/artifacts")
649+
artifacts = (
650+
Path(f"/pipeline/artifacts/{run_id}") if run_id else Path("/pipeline/artifacts")
651+
)
650652
weights_path = artifacts / "calibration_weights.npy"
651653
db_path = artifacts / "policy_data.db"
652654
dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5"
@@ -900,7 +902,7 @@ def main(
900902
"/pipeline": pipeline_volume,
901903
},
902904
memory=16384,
903-
timeout=14400,
905+
timeout=28800,
904906
nonpreemptible=True,
905907
)
906908
def coordinate_national_publish(
@@ -929,7 +931,9 @@ def coordinate_national_publish(
929931
staging_dir = Path(VOLUME_MOUNT)
930932

931933
pipeline_volume.reload()
932-
artifacts = Path("/pipeline/artifacts")
934+
artifacts = (
935+
Path(f"/pipeline/artifacts/{run_id}") if run_id else Path("/pipeline/artifacts")
936+
)
933937
weights_path = artifacts / "national_calibration_weights.npy"
934938
db_path = artifacts / "policy_data.db"
935939
dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5"

modal_app/pipeline.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,21 @@
6767
REPO_URL = "https://github.com/PolicyEngine/policyengine-us-data.git"
6868
PIPELINE_MOUNT = "/pipeline"
6969
STAGING_MOUNT = "/staging"
70-
ARTIFACTS_DIR = f"{PIPELINE_MOUNT}/artifacts"
70+
ARTIFACTS_BASE = f"{PIPELINE_MOUNT}/artifacts"
7171
RUNS_DIR = f"{PIPELINE_MOUNT}/runs"
7272

7373

74+
def artifacts_dir_for_run(run_id: str) -> str:
75+
"""Return the run-scoped artifacts directory.
76+
77+
When run_id is empty, falls back to the flat base path
78+
for backward compatibility with standalone invocations.
79+
"""
80+
if run_id:
81+
return f"{ARTIFACTS_BASE}/{run_id}"
82+
return ARTIFACTS_BASE
83+
84+
7485
# ── Run metadata ─────────────────────────────────────────────────
7586

7687

@@ -302,7 +313,7 @@ def stage_base_datasets(
302313
version: Package version string for the commit.
303314
branch: Git branch for repo clone.
304315
"""
305-
artifacts = Path(ARTIFACTS_DIR)
316+
artifacts = Path(artifacts_dir_for_run(run_id))
306317

307318
files_with_paths = []
308319

@@ -666,8 +677,8 @@ def run_pipeline(
666677
run_dir.mkdir(parents=True, exist_ok=True)
667678
(run_dir / "diagnostics").mkdir(exist_ok=True)
668679

669-
# Create artifacts directory
670-
Path(ARTIFACTS_DIR).mkdir(parents=True, exist_ok=True)
680+
# Create run-scoped artifacts directory
681+
Path(artifacts_dir_for_run(run_id)).mkdir(parents=True, exist_ok=True)
671682

672683
write_run_meta(meta, pipeline_volume)
673684

@@ -704,6 +715,7 @@ def run_pipeline(
704715
clear_checkpoints=clear_checkpoints,
705716
skip_tests=True,
706717
skip_enhanced_cps=False,
718+
run_id=run_id,
707719
)
708720

709721
# The build_datasets step produces files in its
@@ -732,6 +744,7 @@ def run_pipeline(
732744
branch=branch,
733745
workers=num_workers,
734746
n_clones=n_clones,
747+
run_id=run_id,
735748
)
736749
print(f" Package at: {pkg_path}")
737750

@@ -750,7 +763,7 @@ def run_pipeline(
750763
print("\n[Step 3/5] Fitting calibration weights...")
751764
step_start = time.time()
752765

753-
vol_path = "/pipeline/artifacts/calibration_package.pkl"
766+
vol_path = f"{artifacts_dir_for_run(run_id)}/calibration_package.pkl"
754767
target_cfg = "policyengine_us_data/calibration/target_config.yaml"
755768

756769
# Spawn regional fit
@@ -794,16 +807,17 @@ def run_pipeline(
794807
regional_result = regional_handle.get()
795808
print(" Regional fit complete. Writing to volume...")
796809

797-
# Write regional results to pipeline volume
810+
# Write regional results to pipeline volume (run-scoped)
811+
artifacts_rel = f"artifacts/{run_id}" if run_id else "artifacts"
798812
with pipeline_volume.batch_upload(force=True) as batch:
799813
batch.put_file(
800814
BytesIO(regional_result["weights"]),
801-
"artifacts/calibration_weights.npy",
815+
f"{artifacts_rel}/calibration_weights.npy",
802816
)
803817
if regional_result.get("config"):
804818
batch.put_file(
805819
BytesIO(regional_result["config"]),
806-
"artifacts/unified_run_config.json",
820+
f"{artifacts_rel}/unified_run_config.json",
807821
)
808822

809823
archive_diagnostics(
@@ -822,12 +836,12 @@ def run_pipeline(
822836
with pipeline_volume.batch_upload(force=True) as batch:
823837
batch.put_file(
824838
BytesIO(national_result["weights"]),
825-
"artifacts/national_calibration_weights.npy",
839+
f"{artifacts_rel}/national_calibration_weights.npy",
826840
)
827841
if national_result.get("config"):
828842
batch.put_file(
829843
BytesIO(national_result["config"]),
830-
"artifacts/national_unified_run_config.json",
844+
f"{artifacts_rel}/national_unified_run_config.json",
831845
)
832846

833847
archive_diagnostics(

0 commit comments

Comments
 (0)