Skip to content

Commit e43141b

Browse files
authored
Merge pull request #756 from PolicyEngine/fix-754-cps-puf-build-order
Avoid CPS/PUF parallel build race
2 parents 267eeb5 + 244a12e commit e43141b

3 files changed

Lines changed: 103 additions & 28 deletions

File tree

changelog.d/754.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Serialize CPS and PUF builds in the Modal integration data build pipeline to avoid reading `CPS_2024` while it is being written.

modal_app/data_build.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@
7777
),
7878
}
7979

80+
# Path of the CPS build script; also used as its lookup key in SCRIPT_OUTPUTS.
CPS_BUILD_SCRIPT = "policyengine_us_data/datasets/cps/cps.py"
81+
# Path of the PUF build script; also used as its lookup key in SCRIPT_OUTPUTS.
PUF_BUILD_SCRIPT = "policyengine_us_data/datasets/puf/puf.py"
82+
8083
# Test modules to run individually for checkpoint tracking
8184
TEST_MODULES = [
8285
"tests/unit/",
@@ -314,6 +317,25 @@ def run_script_with_checkpoint(
314317
return script_path
315318

316319

320+
def run_cps_then_puf_phase(
    branch: str,
    volume: modal.Volume,
    *,
    env: dict,
    log_file: IO = None,
) -> None:
    """Run the CPS build and then the PUF build, strictly one after the other.

    The ordering is deliberate: PUF pension imputation loads CPS_2024, so the
    CPS script has to finish before the PUF script is started.

    Args:
        branch: Forwarded unchanged to ``run_script_with_checkpoint``.
        volume: Forwarded unchanged to ``run_script_with_checkpoint``.
        env: Passed to every build as the ``env`` keyword argument.
        log_file: Passed to every build as the ``log_file`` keyword argument.
    """
    forwarded_kwargs = {"env": env, "log_file": log_file}
    build_order = (CPS_BUILD_SCRIPT, PUF_BUILD_SCRIPT)
    for build_script in build_order:
        # Resolve the outputs only when this script's turn comes, so a
        # failure in an earlier build stops the phase before any later work.
        expected_outputs = SCRIPT_OUTPUTS[build_script]
        run_script_with_checkpoint(
            build_script,
            expected_outputs,
            branch,
            volume,
            **forwarded_kwargs,
        )
337+
338+
317339
def run_tests_with_checkpoints(
318340
branch: str,
319341
volume: modal.Volume,
@@ -508,34 +530,16 @@ def build_datasets(
508530
for future in as_completed(futures):
509531
future.result() # Raises if script failed
510532

511-
# GROUP 2: Depends on Group 1 - run in parallel
512-
# cps.py needs acs, puf.py needs irs_puf + uprating
513-
print("=== Phase 2: Building CPS and PUF (parallel) ===")
514-
group2 = [
515-
(
516-
"policyengine_us_data/datasets/cps/cps.py",
517-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/cps/cps.py"],
518-
),
519-
(
520-
"policyengine_us_data/datasets/puf/puf.py",
521-
SCRIPT_OUTPUTS["policyengine_us_data/datasets/puf/puf.py"],
522-
),
523-
]
524-
with ThreadPoolExecutor(max_workers=2) as executor:
525-
futures = {
526-
executor.submit(
527-
run_script_with_checkpoint,
528-
script,
529-
output,
530-
branch,
531-
checkpoint_volume,
532-
env=env,
533-
log_file=log_file,
534-
): script
535-
for script, output in group2
536-
}
537-
for future in as_completed(futures):
538-
future.result()
533+
# GROUP 2: Depends on Group 1 - run sequentially.
534+
# puf.py pension imputation can instantiate CPS_2024, so it must
535+
# not run while cps.py is writing cps_2024.h5.
536+
print("=== Phase 2: Building CPS then PUF (sequential) ===")
537+
run_cps_then_puf_phase(
538+
branch,
539+
checkpoint_volume,
540+
env=env,
541+
log_file=log_file,
542+
)
539543

540544
# SEQUENTIAL: Extended CPS (needs both cps and puf)
541545
print("=== Phase 3: Building extended CPS ===")

tests/unit/test_modal_data_build.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,73 @@ def fake_run_script(script_path, args=None, env=None, log_file=None):
120120
{"TEST_ENV": "1"},
121121
),
122122
]
123+
124+
125+
def test_run_cps_then_puf_phase_uses_sequential_checkpointed_builds(
    monkeypatch,
):
    """CPS then PUF run as two ordered checkpointed calls, with no thread pool."""
    data_build = _load_data_build_module()
    branch_name = "fix-754"
    volume_stub = object()
    log_stub = object()
    env_stub = {"TEST_ENV": "1"}
    recorded_calls = []

    def forbid_thread_pool(*args, **kwargs):
        # Any attempt to build an executor means the phase went parallel again.
        raise AssertionError("CPS/PUF phase must not use ThreadPoolExecutor")

    def record_checkpointed_run(
        script_path,
        output_files,
        branch,
        volume_arg,
        args=None,
        env=None,
        log_file=None,
    ):
        # Capture every argument so both call order and forwarding are checked.
        recorded_calls.append(
            (
                script_path,
                output_files,
                branch,
                volume_arg,
                args,
                env,
                log_file,
            )
        )
        return script_path

    monkeypatch.setattr(data_build, "ThreadPoolExecutor", forbid_thread_pool)
    monkeypatch.setattr(
        data_build,
        "run_script_with_checkpoint",
        record_checkpointed_run,
    )

    data_build.run_cps_then_puf_phase(
        branch_name,
        volume_stub,
        env=env_stub,
        log_file=log_stub,
    )

    expected_calls = [
        (
            script,
            data_build.SCRIPT_OUTPUTS[script],
            branch_name,
            volume_stub,
            None,
            env_stub,
            log_stub,
        )
        for script in (
            data_build.CPS_BUILD_SCRIPT,
            data_build.PUF_BUILD_SCRIPT,
        )
    ]
    assert recorded_calls == expected_calls

0 commit comments

Comments
 (0)