Skip to content

Commit af3d5b8

Browse files
committed
fixing data build order from #674
1 parent 4479b78 commit af3d5b8

3 files changed

Lines changed: 28 additions & 50 deletions

File tree

modal_app/data_build.py

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -468,49 +468,27 @@ def build_datasets(
468468
for future in as_completed(futures):
469469
future.result() # Raises if script failed
470470

471-
# GROUP 2: Depends on Group 1 - run in parallel
472-
# cps.py needs acs, puf.py needs irs_puf + uprating
473-
print("=== Phase 2: Building CPS and PUF (parallel) ===")
474-
group2 = [
475-
(
476-
"policyengine_us_data/datasets/cps/cps.py",
477-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/cps/cps.py"],
478-
),
479-
(
480-
"policyengine_us_data/datasets/puf/puf.py",
481-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/puf/puf.py"],
482-
),
483-
]
484-
with ThreadPoolExecutor(max_workers=2) as executor:
485-
futures = {
486-
executor.submit(
487-
run_script_with_checkpoint,
488-
script,
489-
output,
490-
branch,
491-
checkpoint_volume,
492-
env=env,
493-
log_file=log_file,
494-
): script
495-
for script, output in group2
496-
}
497-
for future in as_completed(futures):
498-
future.result()
499-
500-
# SEQUENTIAL: Extended CPS (needs both cps and puf)
501-
print("=== Phase 3: Building extended CPS ===")
502-
run_script_with_checkpoint(
471+
# GROUP 2: Sequential chain — each step depends on the previous.
472+
# cps.py needs acs; puf.py needs irs_puf + uprating + cps
473+
# (pension imputation); extended_cps.py needs both cps and puf.
474+
print("=== Phase 2: Building CPS → PUF → extended CPS ===")
475+
for script in (
476+
"policyengine_us_data/datasets/cps/cps.py",
477+
"policyengine_us_data/datasets/puf/puf.py",
503478
"policyengine_us_data/datasets/cps/extended_cps.py",
504-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/cps/extended_cps.py"],
505-
branch,
506-
checkpoint_volume,
507-
env=env,
508-
log_file=log_file,
509-
)
479+
):
480+
run_script_with_checkpoint(
481+
script,
482+
SCRIPT_OUTPUTS[script],
483+
branch,
484+
checkpoint_volume,
485+
env=env,
486+
log_file=log_file,
487+
)
510488

511489
# GROUP 3: After extended_cps - run in parallel
512490
# enhanced_cps and stratified_cps both depend on extended_cps
513-
print("=== Phase 4: Building enhanced and stratified CPS (parallel) ===")
491+
print("=== Phase 3: Building enhanced and stratified CPS (parallel) ===")
514492
phase4_futures = []
515493
with ThreadPoolExecutor(max_workers=2) as executor:
516494
if not skip_enhanced_cps:
@@ -545,11 +523,11 @@ def build_datasets(
545523
for future in as_completed(phase4_futures):
546524
future.result()
547525

548-
# GROUP 4: After Phase 4 - run in parallel
526+
# GROUP 4: After Phase 3 - run in parallel
549527
# create_source_imputed_cps needs stratified_cps
550528
# small_enhanced_cps needs enhanced_cps
551529
print(
552-
"=== Phase 5: Building source imputed CPS "
530+
"=== Phase 4: Building source imputed CPS "
553531
"and small enhanced CPS (parallel) ==="
554532
)
555533
phase5_futures = []

policyengine_us_data/datasets/org/org.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from functools import lru_cache
23

34
from microimpute.models.qrf import QRF
@@ -392,7 +393,13 @@ def load_org_training_data() -> pd.DataFrame:
392393
"""Load ORG donor rows built from official CPS basic monthly files."""
393394
cache_path = STORAGE_FOLDER / ORG_FILENAME
394395
if cache_path.exists():
395-
return pd.read_csv(cache_path)
396+
try:
397+
return pd.read_csv(cache_path)
398+
except (EOFError, pd.errors.ParserError):
399+
logging.warning(
400+
"Corrupt ORG cache %s — deleting and rebuilding", cache_path
401+
)
402+
cache_path.unlink()
396403

397404
months = []
398405
for month in ORG_MONTHS:

policyengine_us_data/datasets/puf/puf.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,7 @@ def impute_pension_contributions_to_puf(puf_df):
167167
from policyengine_us import Microsimulation
168168
from policyengine_us_data.datasets.cps import CPS_2024
169169

170-
# CPS_2024 may not exist yet during parallel CI builds.
171-
# Fall back to CPS_2021 release artifact if needed.
172-
try:
173-
cps = Microsimulation(dataset=CPS_2024)
174-
except Exception:
175-
from policyengine_us_data.datasets.cps import CPS_2021
176-
177-
cps = Microsimulation(dataset=CPS_2021)
170+
cps = Microsimulation(dataset=CPS_2024)
178171
cps.subsample(10_000)
179172

180173
predictors = [

0 commit comments

Comments
 (0)