Skip to content

Commit c1cf551

Browse files
authored
Update data build to policyengine-us 1.690.7 (#943)
1 parent 3a1a06a commit c1cf551

8 files changed

Lines changed: 147 additions & 23 deletions

File tree

changelog.d/943.changed

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Update the production data-build runtime to policyengine-us 1.690.7 and harden Modal pipeline resume behavior.

modal_app/pipeline.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def _calibration_package_parameters(
143143
) -> dict:
144144
"""Return manifest parameters that affect package construction."""
145145
effective_parallel = bool(chunked_matrix and parallel_matrix)
146-
return {
146+
params = {
147147
"workers": workers if not chunked_matrix else None,
148148
"n_clones": n_clones,
149149
"target_config": target_config,
@@ -153,6 +153,7 @@ def _calibration_package_parameters(
153153
"parallel_matrix": effective_parallel,
154154
"num_matrix_workers": num_matrix_workers if effective_parallel else None,
155155
}
156+
return {key: value for key, value in params.items() if value is not None}
156157

157158

158159
def get_pinned_sha(branch: str) -> str:
@@ -1092,6 +1093,15 @@ def run_pipeline(
10921093
expected_input_identities=package_inputs,
10931094
expected_parameters=package_parameters,
10941095
)
1096+
if not package_reuse.reusable:
1097+
previous = package_reuse.manifest
1098+
print(f" Package reuse invalidated: {package_reuse.reason}")
1099+
if previous is not None:
1100+
print(f" prior status: {previous.status}")
1101+
print(f" prior parameters: {previous.parameters}")
1102+
print(f" expected parameters: {package_parameters}")
1103+
print(f" prior inputs: {previous.input_identities}")
1104+
print(f" expected inputs: {package_inputs}")
10951105
if package_reuse.reusable:
10961106
_mark_step_reused(
10971107
meta,
@@ -1502,9 +1512,8 @@ def run_pipeline(
15021512
vol=pipeline_volume,
15031513
)
15041514

1505-
pipeline_volume.reload()
1506-
1507-
# Now wait for H5 builds to finish
1515+
# Now wait for H5 builds to finish. Do not reload the shared
1516+
# volume until the child jobs release SQLite handles.
15081517
print(" Waiting for regional H5 build...")
15091518
regional_h5_result = regional_h5_handle.get()
15101519
regional_msg = (
@@ -1514,6 +1523,20 @@ def run_pipeline(
15141523
)
15151524
print(f" Regional H5: {regional_msg}")
15161525

1526+
national_h5_result = None
1527+
if national_h5_handle is not None:
1528+
print(" Waiting for national H5 build...")
1529+
national_h5_result = national_h5_handle.get()
1530+
national_msg = (
1531+
national_h5_result.get("message", national_h5_result)
1532+
if isinstance(national_h5_result, dict)
1533+
else national_h5_result
1534+
)
1535+
print(f" National H5: {national_msg}")
1536+
1537+
pipeline_volume.reload()
1538+
staging_volume.reload()
1539+
15171540
if isinstance(regional_h5_result, dict) and regional_h5_result.get(
15181541
"fingerprint"
15191542
):
@@ -1542,16 +1565,7 @@ def run_pipeline(
15421565
)
15431566
active_step_manifest = national_h5_manifest
15441567

1545-
national_h5_result = None
15461568
if national_h5_handle is not None:
1547-
print(" Waiting for national H5 build...")
1548-
national_h5_result = national_h5_handle.get()
1549-
national_msg = (
1550-
national_h5_result.get("message", national_h5_result)
1551-
if isinstance(national_h5_result, dict)
1552-
else national_h5_result
1553-
)
1554-
print(f" National H5: {national_msg}")
15551569
if isinstance(national_h5_result, dict) and national_h5_result.get(
15561570
"fingerprint"
15571571
):

modal_app/step_manifests/state.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
from __future__ import annotations
44

55
import os
6+
import hashlib
7+
import json
8+
import sqlite3
69
from dataclasses import asdict, dataclass, field, fields
710
from pathlib import Path
811
from typing import Optional
@@ -100,13 +103,91 @@ def artifacts_dir(run_id: str) -> Path:
100103
return Path(artifacts_dir_for_run(run_id))
101104

102105

106+
def _quote_sql_identifier(identifier: str) -> str:
    """Return ``identifier`` wrapped in double quotes for use in SQL text.

    Any double quote already inside the identifier is doubled so that it
    cannot terminate the quoted name early.
    """
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'
108+
109+
110+
def _canonical_sqlite_value(value):
    """Convert a fetched SQLite value into something ``json.dumps`` accepts.

    ``bytes`` values become ``{"__bytes__": <hex string>}``; every other
    value is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return {"__bytes__": value.hex()}
114+
115+
116+
def _canonical_sqlite_sha256(path: Path) -> str:
    """Hash logical SQLite contents instead of mutable file metadata.

    The digest covers, in order:

    1. The schema: ``type``, ``name``, ``tbl_name`` and ``sql`` of every
       ``sqlite_master`` entry outside the reserved ``sqlite_`` namespace,
       ordered by ``(type, name)``.
    2. Every row of every such table, ordered by all of the table's
       columns, so the order in which rows are stored does not affect the
       result.

    Each item is serialized as compact, key-sorted JSON followed by a
    newline before being fed to SHA-256.

    Args:
        path: Location of the SQLite database file. It is opened read-only.

    Returns:
        The hexadecimal SHA-256 digest of the canonical serialization.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    # Local import: keeps this block self-contained without touching the
    # module's import section.
    from urllib.parse import quote

    digest = hashlib.sha256()

    def update(payload) -> None:
        digest.update(
            json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
        )
        digest.update(b"\n")

    # Percent-encode the path so characters such as "?", "#" or "%" are not
    # interpreted as URI syntax.
    uri = f"file:{quote(str(path))}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    # sqlite3's connection context manager only manages the transaction; it
    # does not close the connection. Close explicitly so the file handle is
    # released as soon as hashing is done.
    try:
        conn.row_factory = sqlite3.Row
        # "_" is a single-character wildcard in LIKE, so it is escaped here;
        # otherwise names such as "sqlite3_cache" would also be excluded.
        schema_rows = conn.execute(
            """
            SELECT type, name, tbl_name, sql
            FROM sqlite_master
            WHERE name NOT LIKE 'sqlite!_%' ESCAPE '!'
            ORDER BY type, name
            """
        ).fetchall()
        update(
            {
                "schema": [
                    {
                        "type": row["type"],
                        "name": row["name"],
                        "tbl_name": row["tbl_name"],
                        "sql": row["sql"],
                    }
                    for row in schema_rows
                ]
            }
        )

        table_names = [row["name"] for row in schema_rows if row["type"] == "table"]
        for table_name in table_names:
            quoted_table = _quote_sql_identifier(table_name)
            columns = [
                row["name"]
                for row in conn.execute(f"PRAGMA table_info({quoted_table})")
            ]
            column_list = ", ".join(
                _quote_sql_identifier(column) for column in columns
            )
            # Ordering by every column makes the row sequence independent of
            # how the rows happen to be stored.
            query = f"""
                SELECT {column_list}
                FROM {quoted_table}
                ORDER BY {column_list}
                """
            for row in conn.execute(query):
                update(
                    {
                        "table": table_name,
                        "row": [
                            _canonical_sqlite_value(row[column])
                            for column in columns
                        ],
                    }
                )
    finally:
        conn.close()
    return digest.hexdigest()
177+
178+
103179
def artifact_identity(path: str | Path) -> dict:
    """Return the identity mapping recorded for the artifact at ``path``.

    For most files the mapping holds the ``path``, ``size_bytes`` and
    ``sha256`` reported by ``ArtifactReference.from_path``. For files whose
    suffix is exactly ``.db`` the mapping instead holds ``path``, a
    ``sha256`` computed by ``_canonical_sqlite_sha256``, and
    ``identity_kind`` set to ``"sqlite_content"``; ``size_bytes`` is left
    out.
    """
    target = Path(path)
    reference = ArtifactReference.from_path(path)
    file_identity = dict(
        path=reference.path,
        size_bytes=reference.size_bytes,
        sha256=reference.sha256,
    )
    if target.suffix != ".db":
        return file_identity
    # SQLite files are identified by their logical contents rather than by
    # the size and hash reported for the file itself.
    return {
        "path": file_identity["path"],
        "sha256": _canonical_sqlite_sha256(target),
        "identity_kind": "sqlite_content",
    }
110191

111192

112193
def artifact_identities(paths: dict[str, str | Path]) -> dict:

policyengine_us_data/calibration/calibration_utils.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,12 @@ def create_target_groups(
344344
pairs = sorted(
345345
level_df[["domain_variable", "variable"]]
346346
.drop_duplicates()
347-
.itertuples(index=False, name=None)
347+
.itertuples(index=False, name=None),
348+
key=lambda pair: (
349+
pair[0] is not None,
350+
"" if pair[0] is None else str(pair[0]),
351+
str(pair[1]),
352+
),
348353
)
349354
else:
350355
pairs = [(None, v) for v in sorted(level_df["variable"].unique())]
@@ -353,8 +358,11 @@ def create_target_groups(
353358
var_mask = (
354359
(targets_df["variable"] == var_name) & level_mask & ~processed_mask
355360
)
356-
if has_domain and domain_var is not None:
357-
var_mask &= targets_df["domain_variable"] == domain_var
361+
if has_domain:
362+
if domain_var is None:
363+
var_mask &= targets_df["domain_variable"].isna()
364+
else:
365+
var_mask &= targets_df["domain_variable"] == domain_var
358366

359367
if not var_mask.any():
360368
continue

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ classifiers = [
2222
"Programming Language :: Python :: 3.14",
2323
]
2424
dependencies = [
25-
"policyengine-us>=1.690.6",
25+
"policyengine-us>=1.690.7",
2626
# policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for
2727
# PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost
2828
# after _invalidate_all_caches) and is required by policyengine-us 1.682.1+.

tests/unit/calibration/test_drop_target_groups.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,23 @@ def sample_data():
4343

4444

4545
class TestDropTargetGroups:
46+
def test_target_groups_separate_null_and_string_domains(self):
    """Targets with a null domain and a string domain land in separate groups."""
    # Two "person_count" targets differ only in domain_variable (None vs
    # "age"); the third row is a different variable with its own domain.
    frame = pd.DataFrame(
        {
            "variable": ["person_count", "person_count", "snap"],
            "domain_variable": [None, "age", "snap"],
            "geographic_id": ["US", "US", "US"],
            "value": [1000, 200, 300],
        }
    )

    group_ids, descriptions = create_target_groups(frame)

    assert len(set(group_ids)) == 3
    assert group_ids[0] != group_ids[1]
    for expected_label in ("Person Count", "AGE Person Count"):
        assert any(expected_label in info for info in descriptions)
62+
4663
def test_drops_matching_group(self, sample_data):
4764
targets_df, X, target_groups, group_info = sample_data
4865
n_before = len(targets_df)

uv.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

validation/stage_1/test_xw_consistency.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ def test_xw_matches_stacked_sim():
8080
hierarchical_domains=["snap"],
8181
rerandomize_takeup=True,
8282
county_level=False,
83-
workers=2,
83+
# Keep this validation serial. In Modal's Python 3.14 runtime the
84+
# short-lived ProcessPool path can leave the test waiting on futures
85+
# after the worker processes have exited.
86+
workers=1,
8487
)
8588

8689
takeup_filter = [spec["variable"] for spec in SIMPLE_TAKEUP_VARS]

0 commit comments

Comments
 (0)