Skip to content

Commit 0ed7341

Browse files
authored
Tolerate post-H5 Modal volume reload conflicts (#988)
* Tolerate post-H5 volume reload conflicts * Add changelog for H5 reload fix * Stage local H5s with candidate scope
1 parent 3dd4d12 commit 0ed7341

7 files changed

Lines changed: 174 additions & 8 deletions

File tree

changelog.d/988.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Tolerated Modal open-file conflicts when reloading the pipeline volume after local H5 child builds complete.

modal_app/local_area.py

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@
4444
)
4545
from policyengine_us_data.pipeline_metadata import pipeline_node # noqa: E402
4646
from policyengine_us_data.pipeline_schema import PipelineNode # noqa: E402
47-
from policyengine_us_data.utils.run_context import resolve_run_id # noqa: E402
47+
from policyengine_us_data.utils.run_context import ( # noqa: E402
48+
resolve_candidate_version,
49+
resolve_run_id,
50+
)
4851

4952
app = modal.App(
5053
os.environ.get("US_DATA_LOCAL_AREA_APP_NAME") or "policyengine-us-data-local-area"
@@ -361,6 +364,17 @@ def get_version() -> str:
361364
return pyproject["project"]["version"]
362365

363366

367+
def get_staging_candidate_version(
368+
fallback_version: str,
369+
explicit_candidate_version: str = "",
370+
) -> str:
371+
"""Resolve the HF staging candidate scope for local-area artifacts."""
372+
return (
373+
resolve_candidate_version(explicit_candidate_version, env=os.environ)
374+
or fallback_version
375+
)
376+
377+
364378
@pipeline_node(
365379
PipelineNode(
366380
id="build_publishing_input_bundle",
@@ -901,7 +915,11 @@ def validate_staging(branch: str, run_id: str, version: str = "") -> Dict:
901915
)
902916
)
903917
def upload_to_staging(
904-
branch: str, version: str, manifest: Dict, run_id: str = ""
918+
branch: str,
919+
version: str,
920+
manifest: Dict,
921+
run_id: str = "",
922+
candidate_version: str = "",
905923
) -> str:
906924
"""
907925
Upload files to HuggingFace staging only.
@@ -912,6 +930,10 @@ def upload_to_staging(
912930
setup_repo(branch)
913931

914932
manifest_json = json.dumps(manifest)
933+
staging_candidate_version = get_staging_candidate_version(
934+
version,
935+
explicit_candidate_version=candidate_version,
936+
)
915937

916938
result = subprocess.run(
917939
_python_cmd(
@@ -924,6 +946,7 @@ def upload_to_staging(
924946
925947
manifest = json.loads('''{manifest_json}''')
926948
version = "{version}"
949+
staging_candidate_version = "{staging_candidate_version}"
927950
run_id = "{run_id}"
928951
staging_dir = Path("{VOLUME_MOUNT}")
929952
run_dir = staging_dir / run_id
@@ -947,10 +970,14 @@ def upload_to_staging(
947970
948971
# Upload to HuggingFace staging/
949972
print(f"Uploading {{len(files_with_paths)}} files to HuggingFace staging/...")
950-
hf_count = upload_to_staging_hf(files_with_paths, version, run_id=run_id)
973+
hf_count = upload_to_staging_hf(
974+
files_with_paths,
975+
candidate_version=staging_candidate_version,
976+
run_id=run_id,
977+
)
951978
print(f"Uploaded {{hf_count}} files to HuggingFace staging/")
952979
953-
print(f"Staged version {{version}} for promotion")
980+
print(f"Staged candidate {{staging_candidate_version}} for promotion")
954981
""",
955982
),
956983
text=True,
@@ -961,7 +988,8 @@ def upload_to_staging(
961988
raise RuntimeError(f"Upload failed: {result.stderr}")
962989

963990
return (
964-
f"Staged version {version} with {len(manifest['files'])} files. "
991+
f"Staged candidate {staging_candidate_version} with "
992+
f"{len(manifest['files'])} files. "
965993
f"Run promote workflow to publish to HuggingFace production and GCS."
966994
)
967995

@@ -1065,6 +1093,7 @@ def coordinate_publish(
10651093
n_clones: int = 430,
10661094
validate: bool = True,
10671095
run_id: str = "",
1096+
candidate_version: str = "",
10681097
expected_fingerprint: str = "",
10691098
work_items_override: List[Dict] | None = None,
10701099
) -> Dict:
@@ -1073,6 +1102,10 @@ def coordinate_publish(
10731102
setup_repo(branch)
10741103

10751104
version = get_version()
1105+
staging_candidate_version = get_staging_candidate_version(
1106+
version,
1107+
explicit_candidate_version=candidate_version,
1108+
)
10761109

10771110
run_id = run_id or resolve_run_id()
10781111
if not run_id:
@@ -1085,6 +1118,7 @@ def coordinate_publish(
10851118
print(f"Run ID: {run_id}")
10861119
print("=" * 60)
10871120
print(f"Publishing version {version} from branch {branch}")
1121+
print(f"Staging candidate {staging_candidate_version}")
10881122
print(f"Using {num_workers} parallel workers")
10891123

10901124
staging_dir = Path(VOLUME_MOUNT)
@@ -1320,7 +1354,11 @@ def coordinate_publish(
13201354

13211355
print("\nStarting upload to staging...")
13221356
result = upload_to_staging.remote(
1323-
branch=branch, version=version, manifest=manifest, run_id=run_id
1357+
branch=branch,
1358+
version=version,
1359+
manifest=manifest,
1360+
run_id=run_id,
1361+
candidate_version=staging_candidate_version,
13241362
)
13251363
print(result)
13261364

@@ -1382,12 +1420,17 @@ def coordinate_national_publish(
13821420
validate: bool = True,
13831421
run_id: str = "",
13841422
skip_upload: bool = False,
1423+
candidate_version: str = "",
13851424
) -> Dict:
13861425
"""Build and upload a national US.h5 from national weights."""
13871426
setup_gcp_credentials()
13881427
setup_repo(branch)
13891428

13901429
version = get_version()
1430+
staging_candidate_version = get_staging_candidate_version(
1431+
version,
1432+
explicit_candidate_version=candidate_version,
1433+
)
13911434

13921435
run_id = run_id or resolve_run_id()
13931436
if not run_id:
@@ -1400,6 +1443,7 @@ def coordinate_national_publish(
14001443
print(f"Run ID: {run_id}")
14011444
print("=" * 60)
14021445
print(f"Building national H5 for version {version} from branch {branch}")
1446+
print(f"Staging candidate {staging_candidate_version}")
14031447

14041448
staging_dir = Path(VOLUME_MOUNT)
14051449

@@ -1557,7 +1601,7 @@ def coordinate_national_publish(
15571601
)
15581602
upload_to_staging_hf(
15591603
[("{national_h5}", "national/US.h5")],
1560-
"{version}",
1604+
candidate_version="{staging_candidate_version}",
15611605
run_id="{run_id}",
15621606
)
15631607
print("Done")

modal_app/pipeline.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,23 @@ def _python_cmd(*args: str) -> list[str]:
140140
return [sys.executable, *args]
141141

142142

143+
def _try_reload_pipeline_volume_after_h5_builds(vol) -> bool:
144+
"""Reload the pipeline volume unless Modal still sees child-open artifacts."""
145+
146+
try:
147+
vol.reload()
148+
return True
149+
except RuntimeError as exc:
150+
message = str(exc)
151+
if "open files preventing the operation" not in message:
152+
raise
153+
print(
154+
"WARNING: Skipping pipeline volume reload after H5 builds because "
155+
f"Modal still reports an open artifact file: {message}"
156+
)
157+
return False
158+
159+
143160
def _calibration_package_parameters(
144161
*,
145162
workers: int,
@@ -1611,6 +1628,7 @@ def run_pipeline(
16111628
n_clones=n_clones,
16121629
validate=True,
16131630
run_id=run_id,
1631+
candidate_version=candidate_version,
16141632
expected_fingerprint=(
16151633
meta.regional_fingerprint or meta.fingerprint or ""
16161634
),
@@ -1636,6 +1654,7 @@ def run_pipeline(
16361654
n_clones=n_clones,
16371655
validate=True,
16381656
run_id=run_id,
1657+
candidate_version=candidate_version,
16391658
)
16401659
print(
16411660
f" → coordinate_national_publish fc: {national_h5_handle.object_id}"
@@ -1672,7 +1691,7 @@ def run_pipeline(
16721691
)
16731692
print(f" National H5: {national_msg}")
16741693

1675-
pipeline_volume.reload()
1694+
_try_reload_pipeline_volume_after_h5_builds(pipeline_volume)
16761695
staging_volume.reload()
16771696

16781697
if isinstance(regional_h5_result, dict) and regional_h5_result.get(

tests/support/modal_local_area.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ def decorator(func):
103103

104104
fake_pipeline_metadata.pipeline_node = _fake_pipeline_node
105105
fake_pipeline_schema.PipelineNode = _FakePipelineNode
106+
107+
def _fake_resolve_candidate_version(
108+
explicit="",
109+
*,
110+
env=None,
111+
**kwargs,
112+
):
113+
env = env or {}
114+
return (
115+
explicit
116+
or env.get("US_DATA_CANDIDATE_SCOPE", "")
117+
or env.get("US_DATA_CANDIDATE_VERSION", "")
118+
or env.get("CANDIDATE_SCOPE", "")
119+
or env.get("CANDIDATE_VERSION", "")
120+
or env.get("US_DATA_PACKAGE_VERSION", "")
121+
)
122+
123+
fake_run_context.resolve_candidate_version = _fake_resolve_candidate_version
106124
fake_run_context.resolve_run_id = lambda explicit="", **kwargs: explicit
107125
fake_partitioning.partition_weighted_work_items = lambda *args, **kwargs: []
108126

tests/unit/test_modal_local_area.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,41 @@ def fake_run(cmd, **kwargs):
127127
assert f'version = "{run_id}"' not in script
128128

129129

130+
def test_staging_candidate_version_prefers_pipeline_candidate(monkeypatch):
131+
local_area = load_local_area_module()
132+
133+
monkeypatch.setenv("US_DATA_CANDIDATE_VERSION", "1.115.2-patch")
134+
135+
assert local_area.get_staging_candidate_version("1.115.2") == "1.115.2-patch"
136+
137+
138+
def test_upload_to_staging_uses_candidate_scope(monkeypatch):
139+
local_area = load_local_area_module()
140+
captured = {}
141+
142+
monkeypatch.setattr(local_area, "setup_repo", lambda branch: None)
143+
144+
def fake_run(cmd, **kwargs):
145+
captured["cmd"] = cmd
146+
return SimpleNamespace(returncode=0, stderr="")
147+
148+
monkeypatch.setattr(local_area.subprocess, "run", fake_run)
149+
150+
result = local_area.upload_to_staging(
151+
branch="main",
152+
version="1.115.2",
153+
manifest={"files": {"states/NC.h5": {"sha256": "abc"}}},
154+
run_id="usdata-gha123-a1",
155+
candidate_version="1.115.2-patch",
156+
)
157+
158+
script = captured["cmd"][-1]
159+
assert 'version = "1.115.2"' in script
160+
assert 'staging_candidate_version = "1.115.2-patch"' in script
161+
assert "candidate_version=staging_candidate_version" in script
162+
assert result.startswith("Staged candidate 1.115.2-patch")
163+
164+
130165
def test_build_publishing_input_bundle_preserves_traceability_inputs():
131166
local_area = load_local_area_module(stub_policyengine=False)
132167

tests/unit/test_pipeline.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
_new_run_metadata,
1717
_pipeline_error_summary,
1818
_run_required_promotion_subprocess,
19+
_try_reload_pipeline_volume_after_h5_builds,
1920
)
2021
from modal_app.step_manifests.state import RunMetadata # noqa: E402
2122
from modal_app.step_manifests.store import ( # noqa: E402
@@ -71,6 +72,29 @@ def test_national_fit_lambda_matches_national_preset():
7172
assert NATIONAL_FIT_LAMBDA_L0 == pytest.approx(1e-4)
7273

7374

75+
def test_try_reload_pipeline_volume_after_h5_builds_tolerates_modal_open_file():
76+
class VolumeWithOpenFileConflict:
77+
def reload(self):
78+
raise RuntimeError(
79+
"there are open files preventing the operation: "
80+
"path artifacts/run/policy_data.db is open"
81+
)
82+
83+
assert (
84+
_try_reload_pipeline_volume_after_h5_builds(VolumeWithOpenFileConflict())
85+
is False
86+
)
87+
88+
89+
def test_try_reload_pipeline_volume_after_h5_builds_reraises_other_errors():
90+
class BrokenVolume:
91+
def reload(self):
92+
raise RuntimeError("volume service unavailable")
93+
94+
with pytest.raises(RuntimeError, match="volume service unavailable"):
95+
_try_reload_pipeline_volume_after_h5_builds(BrokenVolume())
96+
97+
7498
def test_pipeline_error_summary_uses_traceback_ref_when_available():
7599
ref = ArtifactReference(
76100
path="runs/run-1/errors/error.json",

tests/unit/test_pipeline_source_contracts.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,31 @@ def test_run_pipeline_refreshes_diagnostics_even_when_h5_outputs_reused() -> Non
109109
assert "Upload validation diagnostics even when H5 outputs are reused." in source
110110

111111

112+
def test_run_pipeline_tolerates_post_h5_pipeline_volume_open_files() -> None:
113+
source_text = PIPELINE_SOURCE.read_text()
114+
tree = ast.parse(source_text)
115+
run_pipeline = _function_def(tree, "run_pipeline")
116+
reload_helper = _function_def(tree, "_try_reload_pipeline_volume_after_h5_builds")
117+
run_pipeline_source = ast.get_source_segment(source_text, run_pipeline)
118+
helper_source = ast.get_source_segment(source_text, reload_helper)
119+
120+
assert "_try_reload_pipeline_volume_after_h5_builds(pipeline_volume)" in (
121+
run_pipeline_source
122+
)
123+
assert "pipeline_volume.reload()" not in run_pipeline_source
124+
assert "open files preventing the operation" in helper_source
125+
assert "return False" in helper_source
126+
127+
128+
def test_run_pipeline_passes_candidate_version_to_h5_publishers() -> None:
129+
source_text = PIPELINE_SOURCE.read_text()
130+
tree = ast.parse(source_text)
131+
run_pipeline = _function_def(tree, "run_pipeline")
132+
source = ast.get_source_segment(source_text, run_pipeline)
133+
134+
assert source.count("candidate_version=candidate_version") >= 2
135+
136+
112137
def test_full_release_path_combines_base_regional_and_national_outputs():
113138
tree = ast.parse(PIPELINE_SOURCE.read_text())
114139
helper = _function_def(tree, "_full_release_staging_rel_paths")

0 commit comments

Comments
 (0)