Commit 31b267a

Fix ORG loader fallback and Modal CI env
- Make ORG loading fall back from the CSV endpoint to the Census ZIP/dat source when the CSV response does not expose the required ORG fields.
- Parse the ZIP member with the published fixed-width layout, then normalize it back onto the existing ORG schema before building donor rows.
- Make pytest collection in the optimized Modal seam job lazy-load heavy version-manifest fixtures so the job does not need the full Google dependency chain.
- Pin PR and push Modal runs to the main environment for consistent CI behavior.
1 parent 257a334 commit 31b267a

7 files changed

Lines changed: 258 additions & 26 deletions

.github/workflows/pr.yaml

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,7 +100,7 @@ jobs:
100100
with:
101101
python-version: "3.14"
102102
- name: Install optimized test deps
103-
run: pip install modal pandas pytest
103+
run: pip install modal pytest
104104
- name: Deploy Modal pipeline app
105105
run: modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline.py
106106
- name: Run optimized integration tests
@@ -160,6 +160,7 @@ jobs:
160160
env:
161161
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
162162
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
163+
MODAL_ENVIRONMENT: main
163164
HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }}
164165
steps:
165166
- uses: actions/checkout@v4
@@ -184,6 +185,6 @@ jobs:
184185
} >> "$GITHUB_STEP_SUMMARY"
185186
fi
186187
187-
modal run modal_app/data_build.py \
188+
modal run --env="${MODAL_ENVIRONMENT}" modal_app/data_build.py \
188189
--branch=${{ github.head_ref || github.ref_name }} \
189190
${STAGE_ARGS}

.github/workflows/push.yaml

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@ jobs:
2121
env:
2222
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
2323
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
24+
MODAL_ENVIRONMENT: main
2425
HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }}
2526
steps:
2627
- uses: actions/checkout@v4
@@ -30,7 +31,7 @@ jobs:
3031
- run: pip install modal
3132
- name: Run linear integration tests on Modal
3233
run: |
33-
modal run modal_app/data_build.py \
34+
modal run --env="${MODAL_ENVIRONMENT}" modal_app/data_build.py \
3435
--upload \
3536
--branch=${{ github.ref_name }}
3637

modal_app/README.md

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ Produces `source_imputed_stratified_extended_cps_2024.h5` from raw CPS/PUF/ACS/S
145145
| Method | Command |
146146
|--------|---------|
147147
| **Local** | `make data` |
148-
| **Modal (CI)** | `modal run modal_app/data_build.py --branch=<branch>` |
148+
| **Modal (CI)** | `modal run --env=main modal_app/data_build.py --branch=<branch>` |
149149
| **GitHub Actions** | Automatic on merge to `main` via `code_changes.yaml` → `reusable_test.yaml` (with `full_suite: true`). Also triggered by `pr_code_changes.yaml` on PRs. |
150150

151151
Notes:

policyengine_us_data/datasets/org/README.md

Lines changed: 8 additions & 7 deletions
@@ -11,9 +11,10 @@ the CPS ASEC records.
1111

1212
The checked-in code does not vendor the donor file itself. Instead,
1313
`org.py` builds `census_cps_org_2024_wages.csv.gz` on demand by
14-
downloading the twelve official CPS basic monthly public-use CSVs for
14+
downloading the twelve official CPS basic monthly public-use files for
1515
`ORG_YEAR` (currently 2024) directly from the Census Bureau and filtering
16-
each file to the ORG rotations.
16+
each file to the ORG rotations. The loader prefers the CSV endpoint and
17+
falls back to the ZIP'd fixed-width archive when the CSV is unavailable.
1718

1819
## Documentation
1920

@@ -31,8 +32,8 @@ See also:
3132

3233
## Data products in this folder
3334

34-
- `org.py` — downloads the twelve monthly CSVs, filters to the MIS-4 and
35-
MIS-8 outgoing rotations (`HRMIS`), and caches the combined ORG donor
36-
frame. Trains a QRF model to impute `wage_income`, `hourly_wage`, and
37-
union-coverage variables onto the CPS ASEC records used by the
38-
Enhanced CPS pipeline.
35+
- `org.py` — downloads the twelve monthly public-use files, filters to
36+
the MIS-4 and MIS-8 outgoing rotations (`HRMIS`), and caches the
37+
combined ORG donor frame. Trains a QRF model to impute `wage_income`,
38+
`hourly_wage`, and union-coverage variables onto the CPS ASEC records
39+
used by the Enhanced CPS pipeline.
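
The prefer-CSV, fall-back-to-ZIP behavior described in this README change can be sketched generically as below. This is a minimal illustration under stated assumptions, not the code in `org.py`: `load_with_fallback`, `_csv_stub`, and `_zip_stub` are hypothetical names, and the stubs stand in for the real network loaders.

```python
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def load_with_fallback(loaders: Sequence[tuple[str, Callable[[], T]]]) -> T:
    """Try each named loader in order and return the first successful result."""
    errors: list[str] = []
    for name, loader in loaders:
        try:
            return loader()
        except Exception as error:  # broad catch, as in the commit's loader
            errors.append(f"{name}={error!r}")
    # Every source failed: report all of the collected errors together.
    raise ValueError("All sources failed: " + "; ".join(errors))


def _csv_stub() -> str:
    # Hypothetical stand-in for a CSV fetch that returns challenge HTML.
    raise RuntimeError("challenge HTML instead of CSV")


def _zip_stub() -> str:
    # Hypothetical stand-in for the ZIP/dat fetch.
    return "rows from zip"


result = load_with_fallback([("csv", _csv_stub), ("zip", _zip_stub)])

# When no source succeeds, the combined message names each failed source.
try:
    load_with_fallback([("csv", _csv_stub)])
except ValueError as error:
    failure_message = str(error)
```

Collecting every source's error before raising keeps the final message useful when both endpoints fail, which is the same reason the commit includes both `csv_error` and `zip_error` in its combined `ValueError`.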

policyengine_us_data/datasets/org/org.py

Lines changed: 145 additions & 7 deletions
@@ -1,16 +1,18 @@
11
"""Build and cache ORG-style labor-market donor rows from CPS basic-month files.
22
33
The checked-in code does not vendor the donor file itself. `ORG_FILENAME` is a
4-
generated cache built from the 12 official CPS basic monthly public-use CSVs for
5-
`ORG_YEAR`, then reused as the donor sample for wage, hourly-pay, and union
6-
imputation onto CPS records.
4+
generated cache built from the 12 official CPS basic monthly public-use files
5+
for `ORG_YEAR`, then reused as the donor sample for wage, hourly-pay, and
6+
union imputation onto CPS records.
77
"""
88

99
from contextlib import contextmanager
1010
from functools import lru_cache
1111
from io import BytesIO
12+
from io import TextIOWrapper
1213
from pathlib import Path
1314
import fcntl
15+
import zipfile
1416

1517
from microimpute.models.qrf import QRF
1618
import numpy as np
@@ -52,6 +54,44 @@
5254
"peio1cow",
5355
]
5456

57+
# Field order in the 2024 CPS basic-month fixed-width dat file.
58+
CPS_BASIC_MONTHLY_ORG_FWF_COLUMNS = [
59+
"HRMIS",
60+
"gestfips",
61+
"prtage",
62+
"pesex",
63+
"ptdtrace",
64+
"pehspnon",
65+
"pemlr",
66+
"pehruslt",
67+
"peio1cow",
68+
"prerelg",
69+
"peernhry",
70+
"pternhly",
71+
"pternwa",
72+
"pworwgt",
73+
]
74+
75+
# 2024 Basic CPS record layout positions for the ORG columns we need, in file order.
76+
# The CSV endpoint sometimes returns challenge HTML in Modal, so the ZIP'd
77+
# fixed-width dat fallback is the reliable source of truth.
78+
CPS_BASIC_MONTHLY_ORG_COLSPECS = [
79+
(62, 64), # HRMIS 63-64
80+
(92, 94), # GESTFIPS 93-94
81+
(121, 123), # PRTAGE 122-123
82+
(128, 130), # PESEX 129-130
83+
(138, 140), # PTDTRACE 139-140
84+
(156, 158), # PEHSPNON 157-158
85+
(179, 181), # PEMLR 180-181
86+
(223, 226), # PEHRUSLT 224-226
87+
(431, 433), # PEIO1COW 432-433
88+
(497, 499), # PRERELG 498-499
89+
(505, 507), # PEERNHRY 506-507
90+
(519, 523), # PTERNHLY 520-523
91+
(526, 534), # PTERNWA 527-534
92+
(602, 612), # PWORWGT 603-612
93+
]
94+
5595
ORG_PREDICTORS = [
5696
"employment_income",
5797
"weekly_hours_worked",
@@ -186,6 +226,14 @@ def _cps_basic_org_month_url(year: int, month: str) -> str:
186226
)
187227

188228

229+
def _cps_basic_org_month_zip_url(year: int, month: str) -> str:
230+
year_suffix = str(year)[-2:]
231+
return (
232+
f"https://www2.census.gov/programs-surveys/cps/datasets/"
233+
f"{year}/basic/{month}{year_suffix}pub.zip"
234+
)
235+
236+
189237
def _resolve_cps_basic_org_column_names(
190238
columns: pd.Index | list[str],
191239
) -> list[str]:
@@ -213,13 +261,20 @@ def _select_cps_basic_org_columns(month_df: pd.DataFrame) -> pd.DataFrame:
213261
return selected
214262

215263

216-
def _load_cps_basic_org_month(
264+
def _coerce_cps_basic_org_numeric_columns(
265+
month_df: pd.DataFrame,
266+
) -> pd.DataFrame:
267+
"""Convert ORG columns into numeric values regardless of source format."""
268+
return month_df.apply(lambda column: pd.to_numeric(column, errors="coerce"))
269+
270+
271+
def _load_cps_basic_org_month_from_csv(
217272
year: int,
218273
month: str,
219274
*,
220275
max_attempts: int = 3,
221276
) -> pd.DataFrame:
222-
"""Load one CPS basic-month file with light retry around transient fetch/parser issues."""
277+
"""Load one CPS basic-month file from the CSV endpoint."""
223278
url = _cps_basic_org_month_url(year, month)
224279
last_error: Exception | None = None
225280

@@ -235,16 +290,99 @@ def _load_cps_basic_org_month(
235290
usecols=selected_columns,
236291
low_memory=False,
237292
)
238-
return _select_cps_basic_org_columns(month_df)
293+
return _coerce_cps_basic_org_numeric_columns(
294+
_select_cps_basic_org_columns(month_df)
295+
)
239296
except Exception as error:
240297
last_error = error
241298

242299
raise ValueError(
243-
f"Failed to load CPS basic ORG month {month} {year} after "
300+
f"Failed to load CPS basic ORG month {month} {year} from CSV after "
244301
f"{max_attempts} attempts"
245302
) from last_error
246303

247304

305+
def _load_cps_basic_org_month_from_zip(
306+
year: int,
307+
month: str,
308+
*,
309+
max_attempts: int = 3,
310+
) -> pd.DataFrame:
311+
"""Load one CPS basic-month file from the ZIP'd fixed-width dat archive."""
312+
url = _cps_basic_org_month_zip_url(year, month)
313+
last_error: Exception | None = None
314+
315+
for _ in range(max_attempts):
316+
try:
317+
response = requests.get(url, timeout=60)
318+
response.raise_for_status()
319+
with zipfile.ZipFile(BytesIO(response.content)) as archive:
320+
dat_name = next(
321+
(
322+
name
323+
for name in archive.namelist()
324+
if name.lower().endswith(".dat")
325+
),
326+
None,
327+
)
328+
if dat_name is None:
329+
raise ValueError(
330+
"CPS basic ORG month zip did not contain a .dat file"
331+
)
332+
with (
333+
archive.open(dat_name) as raw,
334+
TextIOWrapper(
335+
raw,
336+
encoding="utf-8",
337+
newline="",
338+
) as text,
339+
):
340+
month_df = pd.read_fwf(
341+
text,
342+
colspecs=CPS_BASIC_MONTHLY_ORG_COLSPECS,
343+
names=CPS_BASIC_MONTHLY_ORG_FWF_COLUMNS,
344+
header=None,
345+
dtype=str,
346+
)
347+
return _coerce_cps_basic_org_numeric_columns(
348+
_select_cps_basic_org_columns(month_df)
349+
)
350+
except Exception as error:
351+
last_error = error
352+
353+
raise ValueError(
354+
f"Failed to load CPS basic ORG month {month} {year} from ZIP after "
355+
f"{max_attempts} attempts"
356+
) from last_error
357+
358+
359+
def _load_cps_basic_org_month(
360+
year: int,
361+
month: str,
362+
*,
363+
max_attempts: int = 3,
364+
) -> pd.DataFrame:
365+
"""Load one CPS basic-month file, preferring CSV and falling back to ZIP."""
366+
try:
367+
return _load_cps_basic_org_month_from_csv(
368+
year,
369+
month,
370+
max_attempts=max_attempts,
371+
)
372+
except Exception as csv_error:
373+
try:
374+
return _load_cps_basic_org_month_from_zip(
375+
year,
376+
month,
377+
max_attempts=max_attempts,
378+
)
379+
except Exception as zip_error:
380+
raise ValueError(
381+
f"Failed to load CPS basic ORG month {month} {year} from CSV or ZIP: "
382+
f"csv={csv_error!r}; zip={zip_error!r}"
383+
) from zip_error
384+
385+
248386
@contextmanager
249387
def _org_cache_build_lock(lock_path: Path):
250388
lock_path.parent.mkdir(parents=True, exist_ok=True)
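
The `CPS_BASIC_MONTHLY_ORG_COLSPECS` entries pair a 0-based, half-open span with a comment giving the 1-based, inclusive positions from the record layout; `pd.read_fwf` interprets `colspecs` as half-open intervals. A minimal pure-Python sketch of that conversion follows. The helper `layout_to_colspec` and the synthetic record are hypothetical, used only to check the arithmetic.

```python
def layout_to_colspec(first: int, last: int) -> tuple[int, int]:
    """Convert 1-based inclusive layout positions to a 0-based half-open span."""
    if first < 1 or last < first:
        raise ValueError(f"invalid layout positions: {first}-{last}")
    return (first - 1, last)


# Two of the layout positions quoted in the comments of the diff.
layout = {"HRMIS": (63, 64), "PEIO1COW": (432, 433)}
colspecs = {name: layout_to_colspec(*pos) for name, pos in layout.items()}

# Synthetic 433-character record: "04" at positions 63-64, " 5" at 432-433.
line = " " * 62 + "04" + " " * 367 + " 5"

# Slicing with the converted spans recovers each field; strip() drops padding.
fields = {name: line[start:end].strip() for name, (start, end) in colspecs.items()}
```

Under this rule each span's width equals `last - first + 1`, so a two-character field such as PEIO1COW at 432-433 maps to `(431, 433)`.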

tests/conftest.py

Lines changed: 13 additions & 8 deletions
@@ -1,18 +1,12 @@
11
"""Shared fixtures and helpers for version manifest tests."""
22

3+
from __future__ import annotations
4+
35
import json
46
from unittest.mock import MagicMock
57

68
import pytest
79

8-
from policyengine_us_data.utils.version_manifest import (
9-
HFVersionInfo,
10-
GCSVersionInfo,
11-
VersionManifest,
12-
VersionRegistry,
13-
)
14-
from policyengine_us_data.utils.policyengine import PolicyEngineUSBuildInfo
15-
1610
# -- Fixtures ------------------------------------------------------
1711

1812

@@ -27,6 +21,8 @@ def sample_generations() -> dict[str, int]:
2721

2822
@pytest.fixture
2923
def sample_hf_info() -> HFVersionInfo:
24+
from policyengine_us_data.utils.version_manifest import HFVersionInfo
25+
3026
return HFVersionInfo(
3127
repo="policyengine/policyengine-us-data",
3228
commit="abc123def456",
@@ -35,6 +31,8 @@ def sample_hf_info() -> HFVersionInfo:
3531

3632
@pytest.fixture
3733
def sample_policyengine_us_info() -> PolicyEngineUSBuildInfo:
34+
from policyengine_us_data.utils.policyengine import PolicyEngineUSBuildInfo
35+
3836
return PolicyEngineUSBuildInfo(
3937
version="1.587.0",
4038
locked_version="1.587.0",
@@ -49,6 +47,11 @@ def sample_manifest(
4947
sample_hf_info: HFVersionInfo,
5048
sample_policyengine_us_info: PolicyEngineUSBuildInfo,
5149
) -> VersionManifest:
50+
from policyengine_us_data.utils.version_manifest import (
51+
GCSVersionInfo,
52+
VersionManifest,
53+
)
54+
5255
return VersionManifest(
5356
version="1.72.3",
5457
created_at="2026-03-10T14:30:00Z",
@@ -66,6 +69,8 @@ def sample_registry(
6669
sample_manifest: VersionManifest,
6770
) -> VersionRegistry:
6871
"""A registry with one version entry."""
72+
from policyengine_us_data.utils.version_manifest import VersionRegistry
73+
6974
return VersionRegistry(
7075
current="1.72.3",
7176
versions=[sample_manifest],
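
The fixture changes rely on two things: annotations that are not evaluated when the function is defined, and imports deferred into the function body. The sketch below shows the same idea with the standard library only. It uses a quoted annotation, which behaves like the module-wide `from __future__ import annotations` added in `conftest.py`; `sample_ratio` is a hypothetical name, and `fractions` merely stands in for a heavy dependency.

```python
def sample_ratio() -> "Fraction":
    # `Fraction` is not imported at module scope. The quoted annotation is
    # stored as a plain string, so defining this function never looks it up.
    from fractions import Fraction  # imported only when the function runs

    return Fraction(1, 3)


# The annotation is kept as text rather than resolved to a class.
annotation = sample_ratio.__annotations__["return"]

# The deferred import happens here, on the first call.
value = sample_ratio()
```

Because pytest imports `conftest.py` during collection, moving the heavy imports into fixture bodies means they run only for tests that actually request those fixtures.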
