Skip to content

Commit 0da4048

Browse files
committed
fixing data build order from #674
1 parent c8cac6a commit 0da4048

3 files changed

Lines changed: 28 additions & 50 deletions

File tree

modal_app/data_build.py

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -493,49 +493,27 @@ def build_datasets(
493493
for future in as_completed(futures):
494494
future.result() # Raises if script failed
495495

496-
# GROUP 2: Depends on Group 1 - run in parallel
497-
# cps.py needs acs, puf.py needs irs_puf + uprating
498-
print("=== Phase 2: Building CPS and PUF (parallel) ===")
499-
group2 = [
500-
(
501-
"policyengine_us_data/datasets/cps/cps.py",
502-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/cps/cps.py"],
503-
),
504-
(
505-
"policyengine_us_data/datasets/puf/puf.py",
506-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/puf/puf.py"],
507-
),
508-
]
509-
with ThreadPoolExecutor(max_workers=2) as executor:
510-
futures = {
511-
executor.submit(
512-
run_script_with_checkpoint,
513-
script,
514-
output,
515-
branch,
516-
checkpoint_volume,
517-
env=env,
518-
log_file=log_file,
519-
): script
520-
for script, output in group2
521-
}
522-
for future in as_completed(futures):
523-
future.result()
524-
525-
# SEQUENTIAL: Extended CPS (needs both cps and puf)
526-
print("=== Phase 3: Building extended CPS ===")
527-
run_script_with_checkpoint(
496+
# GROUP 2: Sequential chain — each step depends on the previous.
497+
# cps.py needs acs; puf.py needs irs_puf + uprating + cps
498+
# (pension imputation); extended_cps.py needs both cps and puf.
499+
print("=== Phase 2: Building CPS → PUF → extended CPS ===")
500+
for script in (
501+
"policyengine_us_data/datasets/cps/cps.py",
502+
"policyengine_us_data/datasets/puf/puf.py",
528503
"policyengine_us_data/datasets/cps/extended_cps.py",
529-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/cps/extended_cps.py"],
530-
branch,
531-
checkpoint_volume,
532-
env=env,
533-
log_file=log_file,
534-
)
504+
):
505+
run_script_with_checkpoint(
506+
script,
507+
SCRIPT_OUTPUTS[script],
508+
branch,
509+
checkpoint_volume,
510+
env=env,
511+
log_file=log_file,
512+
)
535513

536514
# GROUP 3: After extended_cps - run in parallel
537515
# enhanced_cps and stratified_cps both depend on extended_cps
538-
print("=== Phase 4: Building enhanced and stratified CPS (parallel) ===")
516+
print("=== Phase 3: Building enhanced and stratified CPS (parallel) ===")
539517
phase4_futures = []
540518
with ThreadPoolExecutor(max_workers=2) as executor:
541519
if not skip_enhanced_cps:
@@ -570,11 +548,11 @@ def build_datasets(
570548
for future in as_completed(phase4_futures):
571549
future.result()
572550

573-
# GROUP 4: After Phase 4 - run in parallel
551+
# GROUP 4: After Phase 3 - run in parallel
574552
# create_source_imputed_cps needs stratified_cps
575553
# small_enhanced_cps needs enhanced_cps
576554
print(
577-
"=== Phase 5: Building source imputed CPS "
555+
"=== Phase 4: Building source imputed CPS "
578556
"and small enhanced CPS (parallel) ==="
579557
)
580558
phase5_futures = []

policyengine_us_data/datasets/org/org.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from functools import lru_cache
23

34
from microimpute.models.qrf import QRF
@@ -392,7 +393,13 @@ def load_org_training_data() -> pd.DataFrame:
392393
"""Load ORG donor rows built from official CPS basic monthly files."""
393394
cache_path = STORAGE_FOLDER / ORG_FILENAME
394395
if cache_path.exists():
395-
return pd.read_csv(cache_path)
396+
try:
397+
return pd.read_csv(cache_path)
398+
except (EOFError, pd.errors.ParserError):
399+
logging.warning(
400+
"Corrupt ORG cache %s — deleting and rebuilding", cache_path
401+
)
402+
cache_path.unlink()
396403

397404
months = []
398405
for month in ORG_MONTHS:

policyengine_us_data/datasets/puf/puf.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,7 @@ def impute_pension_contributions_to_puf(puf_df):
167167
from policyengine_us import Microsimulation
168168
from policyengine_us_data.datasets.cps import CPS_2024
169169

170-
# CPS_2024 may not exist yet during parallel CI builds.
171-
# Fall back to CPS_2021 release artifact if needed.
172-
try:
173-
cps = Microsimulation(dataset=CPS_2024)
174-
except Exception:
175-
from policyengine_us_data.datasets.cps import CPS_2021
176-
177-
cps = Microsimulation(dataset=CPS_2021)
170+
cps = Microsimulation(dataset=CPS_2024)
178171
cps.subsample(10_000)
179172

180173
predictors = [

0 commit comments

Comments
 (0)