Skip to content

Commit 5aadef5

Browse files
authored
Merge pull request #853 from PolicyEngine/maria/pipeline_fix
Fix Modal data-build pipeline failures (TANF retry, rent ETERNITY workaround, db diagnostics). Skipping, given the H5 publication pathway is broken anyway.
2 parents b39e000 + 889cc5c commit 5aadef5

9 files changed

Lines changed: 121 additions & 41 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Read `is_household_head` directly from the ACS and CPS H5 datasets in `add_rent` and require `policyengine-core>=3.25.4` for PolicyEngine/policyengine-core#482, where user-supplied ETERNITY inputs were dropped after `_invalidate_all_caches`. Removes the empty-train-frame `ValueError` from `sample(10_000)` that was failing the Modal CPS build.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve Modal data-build diagnostics so database-step failures preserve their stdout and stderr in the build log.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Skip the flaky ACF HTML landing page in `etl_tanf` and fetch the FY-stamped workbooks directly via a per-year `TANF_WORKBOOK_URLS` constant; keep the tenacity retry / extended timeout around the workbook GET so transient `acf.gov` slowness does not fail `make database` on Modal builds.

modal_app/data_build.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -479,12 +479,10 @@ def build_datasets(
479479
log_file=log_file,
480480
)
481481
# Build policy_data.db from source
482-
subprocess.run(
483-
["make", "database"],
484-
check=True,
485-
cwd="/root/policyengine-us-data",
486-
env=env,
487-
)
482+
env["PYTHONUNBUFFERED"] = "1"
483+
log_file.write(f"\n{'=' * 60}\nStarting make database...\n{'=' * 60}\n")
484+
log_file.flush()
485+
run_script_logged(["make", "database"], log_file, env)
488486
# Checkpoint policy_data.db immediately after build so it survives
489487
# test failures and can be restored on retries.
490488
save_checkpoint(

policyengine_us_data/datasets/cps/cps.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,19 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame):
262262
]
263263
IMPUTATIONS = ["rent", "real_estate_taxes"]
264264
train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS, map_to="person")
265+
# TODO(PolicyEngine/policyengine-core#482): policyengine-core 3.24.0+
266+
# silently drops user-supplied ETERNITY inputs on dataset reload because
267+
# _user_input_keys records the user-supplied period instead of the
268+
# canonicalized ETERNITY key. is_household_head therefore comes back as
269+
# all False from calculate_dataframe and the household-head filter below
270+
# produces an empty frame. For ACS we read it directly from the source
271+
# H5; for CPS we use the in-memory dict (already populated upstream in
272+
# add_id_variables). Remove both overrides once pyproject.toml's
273+
# policyengine-core upper bound is lifted.
274+
with h5py.File(ACS_2022.file_path, "r") as acs_h5:
275+
train_df["is_household_head"] = np.asarray(
276+
acs_h5["is_household_head"], dtype=bool
277+
)
265278
train_df.tenure_type = train_df.tenure_type.map(
266279
{
267280
"OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE",
@@ -270,6 +283,7 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame):
270283
).fillna(train_df.tenure_type)
271284
train_df = train_df[train_df.is_household_head].sample(10_000)
272285
inference_df = cps_sim.calculate_dataframe(PREDICTORS, map_to="person")
286+
inference_df["is_household_head"] = np.asarray(cps["is_household_head"], dtype=bool)
273287
mask = inference_df.is_household_head.values
274288
inference_df = inference_df[mask]
275289

policyengine_us_data/db/etl_tanf.py

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55
import pandas as pd
66
import requests
77
from sqlmodel import Session, create_engine
8+
from tenacity import (
9+
before_sleep_log,
10+
retry,
11+
retry_if_exception_type,
12+
stop_after_attempt,
13+
wait_exponential,
14+
)
815

916
from policyengine_us_data.storage import STORAGE_FOLDER
1017
from policyengine_us_data.db.create_database_tables import (
@@ -19,14 +26,53 @@
1926
logger = logging.getLogger(__name__)
2027

2128
ACF_DATA_YEAR = 2024
22-
CASELOAD_PAGE_URL = "https://www.acf.hhs.gov/ofa/data/tanf-caseload-data-2024"
23-
FINANCIAL_PAGE_URL = "https://www.acf.hhs.gov/ofa/data/tanf-financial-data-fy-2024"
24-
CASELOAD_URL_PATTERN = re.compile(
25-
r"https://acf\.gov/sites/default/files/documents/ofa/fy\d{4}_tanf_caseload\.xlsx"
26-
)
27-
FINANCIAL_URL_PATTERN = re.compile(
28-
r"https://acf\.gov/sites/default/files/documents/ofa/fy-\d{4}-tanf-moe-financial-data\.xlsx"
29+
ACF_REQUEST_TIMEOUT = 60
30+
31+
# Direct URLs for the FY-stamped ACF workbooks. The previous implementation
32+
# scraped the HTML landing page (`acf.gov/ofa/data/tanf-...`) to discover
33+
# these links, but that page is intermittently unreachable on `acf.gov` and
34+
# was the dominant source of `make database` build failures (see #852). The
35+
# workbook URLs themselves on `acf.gov/sites/default/files/documents/ofa/`
36+
# return 200 reliably, so we hit them directly and skip the page entirely.
37+
#
38+
# Update this dict when:
39+
# - ACF publishes a new fiscal year's workbooks (add a new top-level key
40+
# and bump `ACF_DATA_YEAR` / `_validate_supported_year`),
41+
# - or ACF renames an existing FY's workbook on disk. A 404 from
42+
# `_acf_get` is the early signal — that's an authoritative file
43+
# rename, not a transient outage, and the new path needs to be
44+
# copied in by hand from the corresponding ACF page.
45+
TANF_WORKBOOK_URLS: dict[int, dict[str, str]] = {
46+
2024: {
47+
"caseload": (
48+
"https://acf.gov/sites/default/files/documents/ofa/"
49+
"fy2024_tanf_caseload.xlsx"
50+
),
51+
"financial": (
52+
"https://acf.gov/sites/default/files/documents/ofa/"
53+
"fy-2024-tanf-moe-financial-data.xlsx"
54+
),
55+
},
56+
}
57+
58+
59+
@retry(
60+
stop=stop_after_attempt(5),
61+
wait=wait_exponential(multiplier=2, min=5, max=60),
62+
retry=retry_if_exception_type(
63+
(
64+
requests.exceptions.Timeout,
65+
requests.exceptions.ConnectionError,
66+
requests.exceptions.ChunkedEncodingError,
67+
)
68+
),
69+
before_sleep=before_sleep_log(logger, logging.WARNING),
70+
reraise=True,
2971
)
72+
def _acf_get(session: requests.Session, url: str) -> requests.Response:
73+
response = session.get(url, timeout=ACF_REQUEST_TIMEOUT)
74+
response.raise_for_status()
75+
return response
3076

3177

3278
def _validate_supported_year(year: int) -> None:
@@ -37,9 +83,7 @@ def _validate_supported_year(year: int) -> None:
3783
)
3884

3985

40-
def _download_acf_excel(
41-
page_url: str, cache_file: str, url_pattern: re.Pattern
42-
) -> bytes:
86+
def _download_acf_excel(workbook_url: str, cache_file: str) -> bytes:
4387
if is_cached(cache_file):
4488
logger.info("Using cached %s", cache_file)
4589
return load_bytes(cache_file)
@@ -54,25 +98,16 @@ def _download_acf_excel(
5498
}
5599
)
56100

57-
page_response = session.get(page_url, timeout=30)
58-
page_response.raise_for_status()
59-
match = url_pattern.search(page_response.text)
60-
if match is None:
61-
raise ValueError(f"Could not find TANF workbook URL on {page_url}")
62-
63-
workbook_url = match.group(0)
64-
workbook_response = session.get(workbook_url, timeout=60)
65-
workbook_response.raise_for_status()
101+
workbook_response = _acf_get(session, workbook_url)
66102
save_bytes(cache_file, workbook_response.content)
67103
return workbook_response.content
68104

69105

70106
def extract_tanf_caseload_data(year: int) -> pd.DataFrame:
71107
_validate_supported_year(year)
72108
workbook = _download_acf_excel(
73-
CASELOAD_PAGE_URL,
109+
TANF_WORKBOOK_URLS[ACF_DATA_YEAR]["caseload"],
74110
f"tanf_caseload_{ACF_DATA_YEAR}.xlsx",
75-
CASELOAD_URL_PATTERN,
76111
)
77112
return pd.read_excel(io.BytesIO(workbook), sheet_name="TFam", header=3)
78113

@@ -115,9 +150,8 @@ def extract_tanf_financial_data(
115150
) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]:
116151
_validate_supported_year(year)
117152
workbook = _download_acf_excel(
118-
FINANCIAL_PAGE_URL,
153+
TANF_WORKBOOK_URLS[ACF_DATA_YEAR]["financial"],
119154
f"tanf_financial_{ACF_DATA_YEAR}.xlsx",
120-
FINANCIAL_URL_PATTERN,
121155
)
122156
xls = pd.ExcelFile(io.BytesIO(workbook))
123157
national_df = pd.read_excel(

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ classifiers = [
2323
]
2424
dependencies = [
2525
"policyengine-us>=1.674.1",
26-
"policyengine-core>=3.23.6",
26+
# policyengine-core 3.25.4 fixes PolicyEngine/policyengine-core#482
27+
# (user-set ETERNITY inputs lost after _invalidate_all_caches).
28+
"policyengine-core>=3.25.4,<3.26",
2729
"pandas>=2.3.1",
2830
"requests>=2.25.0",
2931
"tqdm>=4.60.0",

tests/unit/datasets/test_cps_file_handles.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from types import SimpleNamespace
22

3+
import h5py
34
import numpy as np
45
import pandas as pd
56

@@ -249,8 +250,25 @@ def recording_hdfstore(path, mode="a", *args, **kwargs):
249250
opened_modes.append(mode)
250251
return real_hdfstore(path, mode=mode, *args, **kwargs)
251252

252-
def fail_h5py_file(*args, **kwargs):
253-
raise AssertionError("add_rent should not reopen the existing H5 with h5py")
253+
# ACS fixture H5 backing the policyengine-core#482 workaround in add_rent
254+
# (it reads is_household_head straight from the source H5 since
255+
# calculate_dataframe drops user-set ETERNITY inputs under
256+
# policyengine-core 3.24.0+). 10_000 True values match the size of the
257+
# FakeMicrosimulation train frame below.
258+
acs_fixture_path = tmp_path / "acs_fixture.h5"
259+
with h5py.File(acs_fixture_path, "w") as acs_fixture:
260+
acs_fixture["is_household_head"] = np.ones(10_000, dtype=bool)
261+
262+
real_h5py_file = cps_module.h5py.File
263+
opened_h5_paths = []
264+
265+
def recording_h5py_file(path, mode="r", *args, **kwargs):
266+
opened_h5_paths.append((str(path), mode))
267+
if str(path) == str(existing_path):
268+
raise AssertionError(
269+
"add_rent should not reopen the existing CPS H5 with h5py"
270+
)
271+
return real_h5py_file(path, mode=mode, *args, **kwargs)
254272

255273
class FakeQRF:
256274
def fit(self, X_train, predictors, imputed_variables):
@@ -314,26 +332,33 @@ def save_dataset(self, data):
314332
self.saved.append(data)
315333

316334
monkeypatch.setattr(cps_module.pd, "HDFStore", recording_hdfstore)
317-
monkeypatch.setattr(cps_module.h5py, "File", fail_h5py_file)
335+
monkeypatch.setattr(cps_module.h5py, "File", recording_h5py_file)
318336
monkeypatch.setattr(cps_module, "QRF", FakeQRF)
319337

320338
import policyengine_us
321339
import policyengine_us_data.datasets.acs.acs as acs_module
322340

341+
class FakeACS_2022:
342+
file_path = acs_fixture_path
343+
323344
monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation)
324-
monkeypatch.setattr(acs_module, "ACS_2022", object())
345+
monkeypatch.setattr(acs_module, "ACS_2022", FakeACS_2022)
325346

326347
dataset = FakeDataset()
327348
cps = {
328349
"age": np.array([40], dtype=np.int32),
329350
"spm_unit_capped_housing_subsidy_reported": np.array([0.0]),
351+
# add_id_variables populates this upstream of add_rent in the real
352+
# pipeline; see the policyengine-core#482 workaround override in add_rent.
353+
"is_household_head": np.array([True]),
330354
}
331355
person = pd.DataFrame({"dummy": [1]})
332356
household = pd.DataFrame({"H_TENURE": [2]})
333357

334358
add_rent(dataset, cps, person, household)
335359

336360
assert opened_modes == ["r"]
361+
assert opened_h5_paths == [(str(acs_fixture_path), "r")]
337362
assert not existing_path.exists()
338363
np.testing.assert_array_equal(cps["rent"], np.array([1_000.0]))
339364
np.testing.assert_array_equal(cps["real_estate_taxes"], np.array([250.0]))

0 commit comments

Comments
 (0)