66imputation onto CPS records.
77"""
88
9+ from contextlib import contextmanager
910from functools import lru_cache
11+ from io import BytesIO
12+ from pathlib import Path
13+ import fcntl
1014
1115from microimpute .models .qrf import QRF
1216import numpy as np
1317import pandas as pd
18+ import requests
1419
1520from policyengine_us_data .storage import STORAGE_FOLDER
1621
@@ -181,11 +186,13 @@ def _cps_basic_org_month_url(year: int, month: str) -> str:
181186 )
182187
183188
184- def _select_cps_basic_org_columns (month_df : pd .DataFrame ) -> pd .DataFrame :
185- """Normalize CPS basic-month columns onto the ORG schema."""
189+ def _resolve_cps_basic_org_column_names (
190+ columns : pd .Index | list [str ],
191+ ) -> list [str ]:
192+ """Resolve CPS basic-month columns onto the expected ORG schema order."""
186193 column_lookup = {
187- str (column ).lower (): column
188- for column in month_df . columns
194+ str (column ).lower (): str ( column )
195+ for column in columns
189196 if isinstance (column , str )
190197 }
191198 missing = [
@@ -196,35 +203,14 @@ def _select_cps_basic_org_columns(month_df: pd.DataFrame) -> pd.DataFrame:
196203 if missing :
197204 raise ValueError (f"CPS basic ORG month is missing required columns: { missing } " )
198205
199- selected = month_df [
200- [column_lookup [column .lower ()] for column in CPS_BASIC_MONTHLY_ORG_COLUMNS ]
201- ].copy ()
202- selected .columns = CPS_BASIC_MONTHLY_ORG_COLUMNS
203- return selected
204-
206+ return [column_lookup [column .lower ()] for column in CPS_BASIC_MONTHLY_ORG_COLUMNS ]
205207
206- def _resolve_cps_basic_org_usecols (url : str ) -> list [str ]:
207- """Resolve the exact remote column names before reading the full CPS month.
208208
209- Pandas' callable `usecols` path against remote CSVs can intermittently
210- mis-handle the header row and return an empty selection. Resolving the
211- concrete header first avoids that parser path while keeping the full read
212- column-limited.
213- """
214- header_df = pd .read_csv (url , nrows = 0 )
215- column_lookup = {
216- str (column ).lower (): column
217- for column in header_df .columns
218- if isinstance (column , str )
219- }
220- missing = [
221- column
222- for column in CPS_BASIC_MONTHLY_ORG_COLUMNS
223- if column .lower () not in column_lookup
224- ]
225- if missing :
226- raise ValueError (f"CPS basic ORG month is missing required columns: { missing } " )
227- return [column_lookup [column .lower ()] for column in CPS_BASIC_MONTHLY_ORG_COLUMNS ]
def _select_cps_basic_org_columns(month_df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``month_df`` restricted to the ORG schema columns.

    The source column names are resolved through
    ``_resolve_cps_basic_org_column_names`` and the resulting copy is
    relabelled with ``CPS_BASIC_MONTHLY_ORG_COLUMNS``.
    """
    source_names = _resolve_cps_basic_org_column_names(month_df.columns)
    normalized = month_df[source_names].copy()
    normalized.columns = CPS_BASIC_MONTHLY_ORG_COLUMNS
    return normalized
228214
229215
230216def _load_cps_basic_org_month (
@@ -239,10 +225,14 @@ def _load_cps_basic_org_month(
239225
240226 for _ in range (max_attempts ):
241227 try :
242- usecols = _resolve_cps_basic_org_usecols (url )
228+ response = requests .get (url , timeout = 60 )
229+ response .raise_for_status ()
230+ content = response .content
231+ header = pd .read_csv (BytesIO (content ), nrows = 0 )
232+ selected_columns = _resolve_cps_basic_org_column_names (header .columns )
243233 month_df = pd .read_csv (
244- url ,
245- usecols = usecols ,
234+ BytesIO ( content ) ,
235+ usecols = selected_columns ,
246236 low_memory = False ,
247237 )
248238 return _select_cps_basic_org_columns (month_df )
@@ -255,6 +245,36 @@ def _load_cps_basic_org_month(
255245 ) from last_error
256246
257247
@contextmanager
def _org_cache_build_lock(lock_path: Path):
    """Hold an exclusive ``flock`` on ``lock_path`` for the ``with`` body.

    The parent directory is created when missing and the lock file is opened
    for writing. The lock is released explicitly on exit, including when the
    body raises.
    """
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    with open(lock_path, "w") as handle:
        descriptor = handle.fileno()
        # Blocks until no other open file description holds the lock.
        fcntl.flock(descriptor, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(descriptor, fcntl.LOCK_UN)
257+
258+
259+ def _load_valid_cached_org_training_data (cache_path : Path ) -> pd .DataFrame | None :
260+ """Return a cached ORG training frame when it is present and structurally valid."""
261+ required_columns = set (
262+ ORG_PREDICTORS + ORG_QRF_IMPUTED_VARIABLES + ["sample_weight" ]
263+ )
264+ try :
265+ cached = pd .read_csv (cache_path )
266+ except (FileNotFoundError , OSError , pd .errors .EmptyDataError ):
267+ return None
268+
269+ if cached .empty :
270+ return None
271+
272+ if not required_columns .issubset (cached .columns ):
273+ return None
274+
275+ return cached
276+
277+
258278def _transform_cps_basic_org_month (month_df : pd .DataFrame ) -> pd .DataFrame :
259279 """Convert one monthly CPS basic file into ORG donor rows.
260280
@@ -473,17 +493,31 @@ def _predict_union_coverage_from_bls_tables(
def load_org_training_data() -> pd.DataFrame:
    """Load ORG donor rows built from official CPS basic monthly files.

    A structurally valid cache file under ``STORAGE_FOLDER`` is returned
    directly. Otherwise the donor frame is rebuilt from the monthly files
    while holding the build lock, written to a temporary file, and renamed
    over the cache path so that path only ever receives a fully written file.

    Returns:
        The ORG donor frame as read back from the cache file.

    Raises:
        ValueError: If the freshly written cache fails validation.
    """
    cache_path = STORAGE_FOLDER / ORG_FILENAME
    lock_path = cache_path.parent / f"{cache_path.name}.lock"

    # Fast path: skip locking entirely when a usable cache already exists.
    cached = _load_valid_cached_org_training_data(cache_path)
    if cached is not None:
        return cached

    with _org_cache_build_lock(lock_path):
        # Re-check under the lock: another caller may have completed the
        # build while this one was waiting.
        cached = _load_valid_cached_org_training_data(cache_path)
        if cached is not None:
            return cached
        # Whatever is on disk failed validation; drop it before rebuilding.
        cache_path.unlink(missing_ok=True)

        months = [
            _transform_cps_basic_org_month(
                _load_cps_basic_org_month(ORG_YEAR, month)
            )
            for month in ORG_MONTHS
        ]
        org = pd.concat(months, ignore_index=True)

        temp_path = cache_path.parent / f"{cache_path.name}.tmp.gz"
        try:
            org.to_csv(temp_path, index=False, compression="gzip")
            temp_path.replace(cache_path)
        finally:
            # After a successful rename the temp path no longer exists; on
            # failure this removes the partial file instead of leaving it.
            temp_path.unlink(missing_ok=True)

        cached = _load_valid_cached_org_training_data(cache_path)
        if cached is None:
            raise ValueError("Failed to build a valid cached ORG donor file")
        return cached
488522
489523@lru_cache (maxsize = 1 )
0 commit comments