diff --git a/AGENTS.md b/AGENTS.md index 73a77e779..46b7f9c85 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -23,6 +23,9 @@ read `docs/engineering/skills/pipeline_operations.md`. When adding, changing, or reviewing calibration target definitions, read `docs/engineering/skills/calibration_targets.md`. +When adding, changing, or reviewing donor-survey imputations, read +`docs/engineering/skills/imputation.md`. + ## Calibration targets Manually sourced national or local-file calibration targets must be registered diff --git a/changelog.d/1103.changed b/changelog.d/1103.changed new file mode 100644 index 000000000..88f4a2a49 --- /dev/null +++ b/changelog.d/1103.changed @@ -0,0 +1 @@ +Use target-specific source-quality filters for ACS and SIPP imputations. diff --git a/docs/engineering/skills/README.md b/docs/engineering/skills/README.md index 06ddfba94..080210f08 100644 --- a/docs/engineering/skills/README.md +++ b/docs/engineering/skills/README.md @@ -14,6 +14,8 @@ Current skills: notes. - `github-prs.md`: same-repository PR workflow, PR head verification, and title conventions. +- `imputation.md`: donor-survey imputation provenance rules, including + target-level exclusion of allocated source values. - `pipeline_docs.md`: decorator-backed pipeline map maintenance and generated pydoc-style artifacts. - `pipeline_operations.md`: model-neutral workflow for diagnosing deployed Modal diff --git a/docs/engineering/skills/imputation.md b/docs/engineering/skills/imputation.md new file mode 100644 index 000000000..5ec7fb30d --- /dev/null +++ b/docs/engineering/skills/imputation.md @@ -0,0 +1,36 @@ +# Imputation + +Use this guide when adding, changing, or reviewing donor-survey imputations. + +## Source Provenance + +Do not train an imputation target on donor rows whose source value for that +target is itself allocated, hot-decked, edited, or imputed by the source survey. +Wire source-survey allocation or quality flags into the training frame whenever +the donor file exposes them. + +Apply this rule at the target-variable level, not the donor-row level. A donor +row with observed tip income but allocated bank-account assets can train +`tip_income`; the same row must be excluded from the `bank_account_assets` +training target. Use `policyengine_us_data.utils.source_quality` to build +target masks, then pass them to `microimpute` through `target_filters` or +`row_filter` so the filtering logic lives in the imputation library rather than +in one-off model wrappers. + +Do not drop final CPS, ECPS, or calibration records solely because a donor +survey target was excluded from training. The exclusion applies to donor +training rows only; recipient datasets should remain complete. + +When a donor source lacks target-level quality flags, document that limitation +near the imputation code and keep the training surface structured so flags can +be added later. + +## Tests + +Add focused regression tests when adding a donor imputation or a source-quality +flag: + +- allocation flags are read from the donor source, +- allocated source values are excluded for the affected target, +- unrelated observed targets from the same row can still train, and +- legacy and current imputation surfaces use the same target provenance rule. diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index 6c71a42ad..f8dd58fa9 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -325,6 +325,29 @@ def _qrf_ss_shares( for sub in shares: shares[sub] = np.where(total > 0, shares[sub] / total, 0.0) + if ( + "age" in data + and "social_security_retirement" in shares + and "social_security_disability" in shares + ): + # Preserve QRF survivor/dependent predictions, but anchor the + # retirement-vs-disability split to the same age rule as the fallback. + age = data["age"][time_period][:n_cps][puf_has_ss] + is_old = age >= MINIMUM_RETIREMENT_AGE + retirement_or_disability = ( + shares["social_security_retirement"] + shares["social_security_disability"] + ) + shares["social_security_retirement"] = np.where( + is_old, + retirement_or_disability, + 0.0, + ) + shares["social_security_disability"] = np.where( + is_old, + 0.0, + retirement_or_disability, + ) + del fitted, predictions gc.collect() diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index 1e002e02a..027acf8ff 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -26,11 +26,13 @@ """ import gc +import h5py import logging from typing import Dict, Optional import numpy as np import pandas as pd +from microimpute.models.qrf import QRF from policyengine_us_data.datasets.cps.tipped_occupation import ( derive_any_treasury_tipped_occupation_code, derive_is_tipped_occupation, @@ -38,6 +40,12 @@ from policyengine_us_data.datasets.sipp.sipp import ( ASSET_JOB_EARNINGS_COLUMNS, ASSET_PREDICTORS, + SIPP_ASSET_ALLOCATION_COLUMNS, + SIPP_ASSET_TARGET_ALLOCATION_COLUMNS, + SIPP_ASSET_TARGET_SOURCE_COLUMNS, + SIPP_TIP_AMOUNT_COLUMNS, + SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN, + SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS, SSI_DISABILITY_MODEL_VARIABLE, VEHICLE_MODEL_PREDICTORS, build_vehicle_training_frame, @@ -72,6 +80,11 @@ ) from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.pipeline_schema import PipelineNode +from policyengine_us_data.utils.source_quality import ( + cap_training_sample, + require_columns_present, + target_observed_source_masks, +) logger = logging.getLogger(__name__) @@ -79,6 +92,10 @@ "rent", "real_estate_taxes", ] +ACS_TARGET_ALLOCATION_COLUMNS = { + "rent": ["rent_is_allocated"], + "real_estate_taxes": ["real_estate_taxes_is_allocated"], +} SIPP_IMPUTED_VARIABLES = [ "tip_income", @@ -504,7 +521,6 @@ def _impute_acs( Returns: Updated data dict. """ - from microimpute.models.qrf import QRF from policyengine_us import Microsimulation from policyengine_us_data.datasets.acs.acs import ACS_2022 @@ -518,8 +534,22 @@ def _impute_acs( acs_df["state_fips"] = acs.calculate("state_fips", map_to="person").values.astype( np.float32 ) + required_acs_flags = [ + column + for columns in ACS_TARGET_ALLOCATION_COLUMNS.values() + for column in columns + ] + with h5py.File(ACS_2022.file_path, "r") as acs_h5: + require_columns_present( + acs_h5, + required_acs_flags, + source_name="ACS_2022 artifact", + ) + for flag_columns in ACS_TARGET_ALLOCATION_COLUMNS.values(): + for flag_column in flag_columns: + acs_df[flag_column] = np.asarray(acs_h5[flag_column], dtype=bool) - train_df = acs_df[acs_df.is_household_head].sample(10_000, random_state=42) + train_df = acs_df[acs_df.is_household_head].copy() train_df = _encode_tenure_type(train_df) del acs @@ -545,17 +575,28 @@ def _impute_acs( ) cps_heads = cps_df[mask] - qrf = QRF() logger.info( "ACS QRF: %d train, %d test, %d predictors", len(train_df), len(cps_heads), len(predictors), ) - fitted = qrf.fit( + acs_target_filters = target_observed_source_masks( + train_df, + targets=ACS_IMPUTED_VARIABLES, + target_allocation_flag_columns=ACS_TARGET_ALLOCATION_COLUMNS, + ) + train_df, acs_target_filters = cap_training_sample( + train_df, + max_train_samples=10_000, + seed_name="calibration_acs_source_imputation_training_sample", + target_filters=acs_target_filters, + ) + fitted = QRF().fit( X_train=train_df, predictors=predictors, imputed_variables=ACS_IMPUTED_VARIABLES, + target_filters=acs_target_filters, ) predictions = fitted.predict(X_test=cps_heads) @@ -606,12 +647,8 @@ def _impute_sipp( Updated data dict. """ from huggingface_hub import hf_hub_download - from microimpute.models.qrf import QRF - from policyengine_us_data.storage import STORAGE_FOLDER - rng = np.random.default_rng(seed=88) - hf_hub_download( repo_id="PolicyEngine/policyengine-us-data", filename="pu2023_slim.csv", @@ -620,12 +657,18 @@ def _impute_sipp( ) sipp_df = pd.read_csv(STORAGE_FOLDER / "pu2023_slim.csv") - sipp_df["tip_income"] = ( - sipp_df[sipp_df.columns[sipp_df.columns.str.contains("TXAMT")]] - .fillna(0) - .sum(axis=1) - * 12 + tip_amount_columns = [ + column for column in SIPP_TIP_AMOUNT_COLUMNS if column in sipp_df + ] + tip_allocation_columns = [ + SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN[column] for column in tip_amount_columns + ] + require_columns_present( + sipp_df.columns, + tip_allocation_columns, + source_name="SIPP slim tip donor file", ) + sipp_df["tip_income"] = sipp_df[tip_amount_columns].fillna(0).sum(axis=1) * 12 sipp_df["employment_income"] = sipp_df.TPTOTINC * 12 sipp_df["age"] = sipp_df.TAGE sipp_df["household_weight"] = sipp_df.WPFINWGT @@ -637,6 +680,8 @@ def _impute_sipp( sipp_df["treasury_tipped_occupation_code"] ) + if "MONTHCODE" in sipp_df: + sipp_df = sipp_df[sipp_df["MONTHCODE"] == 12].copy() sipp_df["is_under_18"] = sipp_df.TAGE < 18 sipp_df["is_under_6"] = sipp_df.TAGE < 6 sipp_df["count_under_18"] = ( @@ -646,6 +691,14 @@ def _impute_sipp( sipp_df.groupby("SSUID")["is_under_6"].sum().loc[sipp_df.SSUID.values].values ) + tip_target_filters = target_observed_source_masks( + sipp_df, + targets=["tip_income"], + target_source_columns={"tip_income": tip_amount_columns}, + target_allocation_flag_columns={"tip_income": tip_allocation_columns}, + require_nonmissing_source=False, + ) + tip_cols = [ "household_id", "employment_income", @@ -657,14 +710,12 @@ def _impute_sipp( "household_weight", ] tip_train = sipp_df[tip_cols].dropna() - tip_train = tip_train.loc[ - rng.choice( - tip_train.index, - size=min(10_000, len(tip_train)), - replace=True, - p=(tip_train.household_weight / tip_train.household_weight.sum()), - ) - ] + tip_train, tip_target_filters = cap_training_sample( + tip_train, + max_train_samples=10_000, + seed_name="calibration_sipp_tip_training_sample", + target_filters=tip_target_filters, + ) cps_tip_df = _build_cps_receiver( data, time_period, dataset_path, ["employment_income", "age"] @@ -691,16 +742,17 @@ def _impute_sipp( else: cps_tip_df["is_tipped_occupation"] = 0.0 - qrf = QRF() logger.info( "SIPP tips QRF: %d train, %d test", len(tip_train), len(cps_tip_df), ) - fitted = qrf.fit( + fitted = QRF().fit( X_train=tip_train, predictors=SIPP_TIPS_PREDICTORS, imputed_variables=["tip_income"], + target_filters=tip_target_filters, + weight_col="household_weight", ) tip_preds = fitted.predict(X_test=cps_tip_df) data["tip_income"] = { @@ -719,24 +771,28 @@ def _impute_sipp( repo_type="model", local_dir=STORAGE_FOLDER, ) - asset_cols = [ - "SSUID", - "PNUM", - "MONTHCODE", - "WPFINWGT", - "TAGE", - "ESEX", - "EMS", - "TSSSAMT", - "TRETINCAMT", - "TVAL_BANK", - "TVAL_STMF", - "TVAL_BOND", - "TINC_BANK", - "TINC_STMF", - "TINC_BOND", - "TINC_RENT", - ] + ASSET_JOB_EARNINGS_COLUMNS + asset_cols = ( + [ + "SSUID", + "PNUM", + "MONTHCODE", + "WPFINWGT", + "TAGE", + "ESEX", + "EMS", + "TSSSAMT", + "TRETINCAMT", + "TVAL_BANK", + "TVAL_STMF", + "TVAL_BOND", + "TINC_BANK", + "TINC_STMF", + "TINC_BOND", + "TINC_RENT", + ] + + ASSET_JOB_EARNINGS_COLUMNS + + SIPP_ASSET_ALLOCATION_COLUMNS + ) asset_df = pd.read_csv( STORAGE_FOLDER / "pu2023.csv", delimiter="|", @@ -751,16 +807,14 @@ def _impute_sipp( "bond_assets", "household_weight", *SIPP_ASSETS_PREDICTORS, + *[ + column + for columns in SIPP_ASSET_TARGET_SOURCE_COLUMNS.values() + for column in columns + ], + *SIPP_ASSET_ALLOCATION_COLUMNS, ] - asset_train = asset_df[asset_train_cols].dropna() - asset_train = asset_train.loc[ - rng.choice( - asset_train.index, - size=min(20_000, len(asset_train)), - replace=True, - p=(asset_train.household_weight / asset_train.household_weight.sum()), - ) - ] + asset_train = asset_df[asset_train_cols].copy() cps_asset_df = _build_cps_receiver( data, @@ -789,16 +843,29 @@ def _impute_sipp( "stock_assets", "bond_assets", ] - qrf = QRF() + asset_target_filters = target_observed_source_masks( + asset_train, + targets=asset_vars, + target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS, + target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS, + ) + asset_train, asset_target_filters = cap_training_sample( + asset_train, + max_train_samples=20_000, + seed_name="calibration_sipp_asset_training_sample", + target_filters=asset_target_filters, + ) logger.info( "SIPP assets QRF: %d train, %d test", len(asset_train), len(cps_asset_df), ) - fitted = qrf.fit( + fitted = QRF().fit( X_train=asset_train, predictors=SIPP_ASSETS_PREDICTORS, imputed_variables=asset_vars, + target_filters=asset_target_filters, + weight_col="household_weight", ) asset_preds = fitted.predict(X_test=cps_asset_df) @@ -889,17 +956,6 @@ def _impute_sipp( logger.info("SIPP SSI disability criteria imputation complete") vehicle_train = build_vehicle_training_frame() - vehicle_train = vehicle_train.loc[ - rng.choice( - vehicle_train.index, - size=min(20_000, len(vehicle_train)), - replace=True, - p=( - vehicle_train.household_weight - / vehicle_train.household_weight.sum() - ), - ) - ] cps_vehicle_df = _build_cps_receiver( data, @@ -943,19 +999,32 @@ def _impute_sipp( tenure_type=data.get("tenure_type", {}).get(time_period), ) - qrf = QRF() logger.info( "SIPP vehicle QRF: %d train, %d test", len(vehicle_train), len(vehicle_receiver), ) - fitted = qrf.fit( + vehicle_vars = [ + "household_vehicles_owned", + "household_vehicles_value", + ] + vehicle_target_filters = target_observed_source_masks( + vehicle_train, + targets=vehicle_vars, + target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS, + ) + vehicle_train, vehicle_target_filters = cap_training_sample( + vehicle_train, + max_train_samples=20_000, + seed_name="calibration_sipp_vehicle_training_sample", + target_filters=vehicle_target_filters, + ) + fitted = QRF().fit( X_train=vehicle_train, predictors=VEHICLE_MODEL_PREDICTORS, - imputed_variables=[ - "household_vehicles_owned", - "household_vehicles_value", - ], + imputed_variables=vehicle_vars, + target_filters=vehicle_target_filters, + weight_col="household_weight", ) vehicle_preds = fitted.predict(X_test=vehicle_receiver) data["household_vehicles_owned"] = { diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 8bbc39af9..dd1a3c5d9 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -5,16 +5,33 @@ construct_tax_units_acs, ) from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.utils.source_quality import require_columns_present from pandas import DataFrame import numpy as np import pandas as pd +ACS_SOURCE_QUALITY_DATASETS = ( + "rent_is_allocated", + "real_estate_taxes_is_allocated", +) + class ACS(Dataset): data_format = Dataset.ARRAYS time_period = None census_acs = None + @property + def exists(self) -> bool: + """Treat stale ACS artifacts without source-quality flags as absent.""" + if not self.file_path.exists(): + return False + try: + with h5py.File(self.file_path, mode="r") as acs: + return all(column in acs for column in ACS_SOURCE_QUALITY_DATASETS) + except OSError: + return False + def generate(self) -> None: """Generates the ACS dataset.""" @@ -76,12 +93,34 @@ def add_person_variables( .loc[person["household_id"]][["RNTP", "TAXAMT"]] .values ) + allocation_flag_columns = [ + ("FRNTP", "rent_is_allocated"), + ("FTAXP", "real_estate_taxes_is_allocated"), + ] + require_columns_present( + household.columns, + [source_flag for source_flag, _ in allocation_flag_columns], + source_name="raw Census ACS household table", + ) + for source_flag, output_flag in allocation_flag_columns: + person[output_flag] = ( + household.set_index("household_id") + .loc[person["household_id"]][source_flag] + .fillna(0) + .astype(int) + .ne(0) + .values + ) acs["is_household_head"] = person.SPORDER == 1 factor = person.SPORDER == 1 person.rent *= factor * 12 person.real_estate_taxes *= factor acs["rent"] = person.rent acs["real_estate_taxes"] = person.real_estate_taxes + acs["rent_is_allocated"] = person.rent_is_allocated & factor + acs["real_estate_taxes_is_allocated"] = ( + person.real_estate_taxes_is_allocated & factor + ) acs["tenure_type"] = ( household.TEN.astype(int) .map( @@ -107,7 +146,7 @@ class ACS_2022(ACS): time_period = 2022 file_path = STORAGE_FOLDER / "acs_2022.h5" census_acs = CensusACS_2022 - url = "release://PolicyEngine/policyengine-us-data/1.13.0/acs_2022.h5" + url = None if __name__ == "__main__": diff --git a/policyengine_us_data/datasets/acs/census_acs.py b/policyengine_us_data/datasets/acs/census_acs.py index cc913115c..583cf4eea 100644 --- a/policyengine_us_data/datasets/acs/census_acs.py +++ b/policyengine_us_data/datasets/acs/census_acs.py @@ -56,17 +56,42 @@ "GASP", # Gas monthly cost "RMSP", # Number of rooms "RNTP", # Monthly rent + "FRNTP", # Monthly rent allocation flag "TEN", # Tenure "VEH", # Number of vehicles "FINCP", # Total income "GRNTP", # Gross rent "TAXAMT", # Property taxes + "FTAXP", # Property taxes allocation flag ] +RAW_HOUSEHOLD_SOURCE_QUALITY_COLUMNS = ("FRNTP", "FTAXP") + class CensusACS(Dataset): data_format = Dataset.TABLES + @property + def exists(self) -> bool: + """Treat stale raw ACS caches without allocation flags as absent.""" + if not self.file_path.exists(): + return False + try: + with pd.HDFStore(self.file_path, mode="r") as storage: + if "/household" not in storage.keys(): + return False + storer = storage.get_storer("household") + columns = [ + column + for _, axis_columns in getattr(storer, "non_index_axes", []) + for column in axis_columns + ] + if not columns: + columns = list(storage["household"].columns) + except (OSError, KeyError, ValueError): + return False + return set(RAW_HOUSEHOLD_SOURCE_QUALITY_COLUMNS) <= set(columns) + def generate(self) -> None: person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_pus.zip" household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_hus.zip" diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index c5f7c2e95..6a691ec4f 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -72,6 +72,16 @@ ) from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.pipeline_schema import PipelineNode +from policyengine_us_data.utils.source_quality import ( + cap_training_sample, + require_columns_present, + target_observed_source_masks, +) + +ACS_RENT_TARGET_ALLOCATION_COLUMNS = { + "rent": ["rent_is_allocated"], + "real_estate_taxes": ["real_estate_taxes_is_allocated"], +} CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = { "reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR", @@ -409,28 +419,52 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): # H5; for CPS we use the in-memory dict (already populated upstream in # add_id_variables). Remove both overrides once pyproject.toml's # policyengine-core upper bound is lifted. + required_acs_flags = [ + column + for columns in ACS_RENT_TARGET_ALLOCATION_COLUMNS.values() + for column in columns + ] with h5py.File(ACS_2022.file_path, "r") as acs_h5: train_df["is_household_head"] = np.asarray( acs_h5["is_household_head"], dtype=bool ) + require_columns_present( + acs_h5, + required_acs_flags, + source_name="ACS_2022 artifact", + ) + for flag_columns in ACS_RENT_TARGET_ALLOCATION_COLUMNS.values(): + for flag_column in flag_columns: + train_df[flag_column] = np.asarray(acs_h5[flag_column], dtype=bool) train_df.tenure_type = train_df.tenure_type.map( { "OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE", }, na_action="ignore", ).fillna(train_df.tenure_type) - train_df = train_df[train_df.is_household_head].sample(10_000) + train_df = train_df[train_df.is_household_head].copy() inference_df = cps_sim.calculate_dataframe(PREDICTORS, map_to="person") inference_df["is_household_head"] = np.asarray(cps["is_household_head"], dtype=bool) mask = inference_df.is_household_head.values inference_df = inference_df[mask] - qrf = QRF() logging.info("Training imputation model for rent and real estate taxes.") - fitted_model = qrf.fit( + rent_target_filters = target_observed_source_masks( + train_df, + targets=IMPUTATIONS, + target_allocation_flag_columns=ACS_RENT_TARGET_ALLOCATION_COLUMNS, + ) + train_df, rent_target_filters = cap_training_sample( + train_df, + max_train_samples=10_000, + seed_name="legacy_acs_rent_training_sample", + target_filters=rent_target_filters, + ) + fitted_model = QRF().fit( X_train=train_df, predictors=PREDICTORS, imputed_variables=IMPUTATIONS, + target_filters=rent_target_filters, ) logging.info("Imputing rent and real estate taxes.") imputed_values = fitted_model.predict(X_test=inference_df) diff --git a/policyengine_us_data/datasets/sipp/sipp.py b/policyengine_us_data/datasets/sipp/sipp.py index 9d0a10f5a..6fee8fc39 100644 --- a/policyengine_us_data/datasets/sipp/sipp.py +++ b/policyengine_us_data/datasets/sipp/sipp.py @@ -9,9 +9,21 @@ derive_any_treasury_tipped_occupation_code, derive_is_tipped_occupation, ) +from policyengine_us_data.utils.source_quality import ( + cap_training_sample, + filter_observed_source_rows, + require_columns_present, + sipp_allocation_flag_for, + target_observed_source_masks, +) SIPP_JOB_OCCUPATION_COLUMNS = [f"TJB{i}_OCC" for i in range(1, 8)] +SIPP_TIP_AMOUNT_COLUMNS = [f"TJB{i}_TXAMT" for i in range(1, 8)] +SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN = { + column: sipp_allocation_flag_for(column) for column in SIPP_TIP_AMOUNT_COLUMNS +} +SIPP_TIP_ALLOCATION_COLUMNS = list(SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN.values()) TIP_MODEL_PREDICTORS = [ "employment_income", "age", @@ -114,9 +126,16 @@ def train_tip_model(): # Previously used `str.contains("TXAMT")`, which also picked up # AJB*_TXAMT Census allocation flags (small ints 0/1/2 indicating # imputation status) and added them to the dollar totals. - df["tip_income"] = ( - df[df.columns[df.columns.str.match(r"TJB\d_TXAMT")]].fillna(0).sum(axis=1) * 12 + tip_amount_columns = [column for column in SIPP_TIP_AMOUNT_COLUMNS if column in df] + tip_allocation_columns = [ + SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN[column] for column in tip_amount_columns + ] + require_columns_present( + df.columns, + tip_allocation_columns, + source_name="SIPP tip donor file", ) + df["tip_income"] = df[tip_amount_columns].fillna(0).sum(axis=1) * 12 df["employment_income"] = df.TPTOTINC * 12 df["is_under_18"] = (df.TAGE < 18) & (df.MONTHCODE == 12) df["is_under_6"] = (df.TAGE < 6) & (df.MONTHCODE == 12) @@ -147,6 +166,13 @@ def train_tip_model(): # December (end of year) so every training row represents one # person-year. df = df[df["MONTHCODE"] == 12] + tip_target_filters = target_observed_source_masks( + df, + targets=["tip_income"], + target_source_columns={"tip_income": tip_amount_columns}, + target_allocation_flag_columns={"tip_income": tip_allocation_columns}, + require_nonmissing_source=False, + ) sipp = df[ [ @@ -162,18 +188,12 @@ def train_tip_model(): ] sipp = sipp[~sipp.isna().any(axis=1)] - - # Seed the weighted resample so the tip-model training frame (and - # the pickled QRF) is deterministic across rebuilds of the cache. - tip_rng = seeded_rng("sipp_tip_model_training_sample") - sipp = sipp.loc[ - tip_rng.choice( - sipp.index, - size=10_000, - replace=True, - p=sipp.household_weight / sipp.household_weight.sum(), - ) - ] + sipp, tip_target_filters = cap_training_sample( + sipp, + max_train_samples=10_000, + seed_name="sipp_tip_model_training_sample", + target_filters=tip_target_filters, + ) model = QRF() @@ -181,13 +201,15 @@ def train_tip_model(): X_train=sipp, predictors=TIP_MODEL_PREDICTORS, imputed_variables=["tip_income"], + target_filters=tip_target_filters, + weight_col="household_weight", ) return model def get_tip_model() -> QRF: - model_path = STORAGE_FOLDER / "tips_tipped_occ_v2.pkl" + model_path = STORAGE_FOLDER / "tips_tipped_occ_v3.pkl" if not model_path.exists(): model = train_tip_model() @@ -205,29 +227,49 @@ def get_tip_model() -> QRF: # Imputes asset categories separately for policy flexibility ASSET_JOB_EARNINGS_COLUMNS = [f"TJB{i}_MSUM" for i in range(1, 8)] +SIPP_ASSET_TARGET_SOURCE_COLUMNS = { + "bank_account_assets": ["TVAL_BANK"], + "stock_assets": ["TVAL_STMF"], + "bond_assets": ["TVAL_BOND"], +} +SIPP_ASSET_TARGET_ALLOCATION_COLUMNS = { + target: [sipp_allocation_flag_for(column) for column in columns] + for target, columns in SIPP_ASSET_TARGET_SOURCE_COLUMNS.items() +} +SIPP_ASSET_ALLOCATION_COLUMNS = sorted( + { + column + for columns in SIPP_ASSET_TARGET_ALLOCATION_COLUMNS.values() + for column in columns + } +) -ASSET_COLUMNS = [ - "SSUID", - "PNUM", - "MONTHCODE", - "SPANEL", - "SWAVE", - "WPFINWGT", - "TAGE", - "ESEX", - "EMS", - "TSSSAMT", - "TRETINCAMT", - # Asset values (person-level sums from SIPP) - "TVAL_BANK", # Checking, savings, money market - "TVAL_STMF", # Stocks and mutual funds - "TVAL_BOND", # Bonds and government securities - # Income from assets (monthly, person-level) - "TINC_BANK", # Interest from bank accounts - "TINC_STMF", # Dividends from stocks/mutual funds - "TINC_BOND", # Interest from bonds - "TINC_RENT", # Rental income -] + ASSET_JOB_EARNINGS_COLUMNS +ASSET_COLUMNS = ( + [ + "SSUID", + "PNUM", + "MONTHCODE", + "SPANEL", + "SWAVE", + "WPFINWGT", + "TAGE", + "ESEX", + "EMS", + "TSSSAMT", + "TRETINCAMT", + # Asset values (person-level sums from SIPP) + "TVAL_BANK", # Checking, savings, money market + "TVAL_STMF", # Stocks and mutual funds + "TVAL_BOND", # Bonds and government securities + # Income from assets (monthly, person-level) + "TINC_BANK", # Interest from bank accounts + "TINC_STMF", # Dividends from stocks/mutual funds + "TINC_BOND", # Interest from bonds + "TINC_RENT", # Rental income + ] + + ASSET_JOB_EARNINGS_COLUMNS + + SIPP_ASSET_ALLOCATION_COLUMNS +) ASSET_PREDICTORS = [ "employment_income", @@ -257,6 +299,10 @@ def get_tip_model() -> QRF: "TDIS9AMT", "TDIS10AMT", ] +SSI_DISABILITY_LABEL_SOURCE_COLUMNS = ["RSSI_YRYN", "ESSI_BRSN"] +SSI_DISABILITY_LABEL_ALLOCATION_COLUMNS = [ + sipp_allocation_flag_for(column) for column in SSI_DISABILITY_LABEL_SOURCE_COLUMNS +] SSI_DISABILITY_COLUMNS = sorted( set( @@ -273,10 +319,16 @@ def get_tip_model() -> QRF: "ESSRSN2YN", "ESSI_BRSN", *SSI_DISABILITY_INCOME_AMOUNT_COLUMNS, + *SSI_DISABILITY_LABEL_ALLOCATION_COLUMNS, ] ) ) +SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS = { + "household_vehicles_owned": [sipp_allocation_flag_for("TVEH_NUM")], + "household_vehicles_value": [sipp_allocation_flag_for("THVAL_VEH")], +} + VEHICLE_COLUMNS = [ "SSUID", "PNUM", @@ -293,6 +345,8 @@ def get_tip_model() -> QRF: "TVEH_NUM", "THVAL_VEH", "THVAL_HOME", + "AVEH_NUM", + "AHVAL_VEH", ] @@ -440,6 +494,12 @@ def build_ssi_disability_training_frame( df["ssi_disability_training_candidate"] = (financial_candidate & under_65) | df[ SSI_DISABILITY_MODEL_VARIABLE ] + df = filter_observed_source_rows( + df, + target_name=SSI_DISABILITY_MODEL_VARIABLE, + source_columns=SSI_DISABILITY_LABEL_SOURCE_COLUMNS, + allocation_flag_columns=SSI_DISABILITY_LABEL_ALLOCATION_COLUMNS, + ) columns = SSI_DISABILITY_MODEL_PREDICTORS + [ SSI_DISABILITY_MODEL_VARIABLE, @@ -572,33 +632,39 @@ def train_asset_model(): "bond_assets", "household_weight", *ASSET_PREDICTORS, + *[ + column + for columns in SIPP_ASSET_TARGET_SOURCE_COLUMNS.values() + for column in columns + ], + *SIPP_ASSET_ALLOCATION_COLUMNS, ] ] - sipp = sipp[~sipp.isna().any(axis=1)] - - # Seed the weighted resample so the asset-model training frame - # (and the pickled QRF) is deterministic across rebuilds. - asset_rng = seeded_rng("sipp_asset_model_training_sample") - sipp = sipp.loc[ - asset_rng.choice( - sipp.index, - size=min(20_000, len(sipp)), - replace=True, - p=sipp.household_weight / sipp.household_weight.sum(), - ) + asset_vars = [ + "bank_account_assets", + "stock_assets", + "bond_assets", ] - + asset_target_filters = target_observed_source_masks( + sipp, + targets=asset_vars, + target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS, + target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS, + ) + sipp, asset_target_filters = cap_training_sample( + sipp, + max_train_samples=20_000, + seed_name="sipp_asset_model_training_sample", + target_filters=asset_target_filters, + ) model = QRF() - model = model.fit( X_train=sipp, predictors=ASSET_PREDICTORS, - imputed_variables=[ - "bank_account_assets", - "stock_assets", - "bond_assets", - ], + imputed_variables=asset_vars, + target_filters=asset_target_filters, + weight_col="household_weight", ) return model @@ -606,7 +672,7 @@ def train_asset_model(): def get_asset_model() -> QRF: """Get or train the liquid asset imputation model.""" - model_path = STORAGE_FOLDER / "liquid_assets_v2.pkl" + model_path = STORAGE_FOLDER / "liquid_assets_v3.pkl" if not model_path.exists(): model = train_asset_model() @@ -668,7 +734,7 @@ def train_ssi_disability_model(time_period: int = 2024): def get_ssi_disability_model(time_period: int = 2024) -> QRF: """Get or train the SSI disability criteria imputation model.""" - model_path = STORAGE_FOLDER / f"ssi_disability_criteria_v1_{time_period}.pkl" + model_path = STORAGE_FOLDER / f"ssi_disability_criteria_v2_{time_period}.pkl" if not model_path.exists(): model = train_ssi_disability_model(time_period=time_period) @@ -731,6 +797,8 @@ def build_vehicle_training_frame() -> pd.DataFrame: "household_size": grouped.size(), "household_vehicles_owned": grouped["TVEH_NUM"].max().fillna(0), "household_vehicles_value": grouped["THVAL_VEH"].first().fillna(0), + "AVEH_NUM": grouped["AVEH_NUM"].max().fillna(0), + "AHVAL_VEH": grouped["AHVAL_VEH"].first().fillna(0), "is_homeowner": (grouped["THVAL_HOME"].first().fillna(0) > 0).astype( np.float32 ), @@ -762,33 +830,35 @@ def train_vehicle_model(): """Train a household-level vehicle asset model from SIPP 2023.""" sipp = build_vehicle_training_frame() sipp = sipp[~sipp.isna().any(axis=1)] - # Seed the weighted resample so the vehicle-model training frame (and - # the pickled QRF) is deterministic across rebuilds of the cache. - vehicle_rng = seeded_rng("sipp_vehicle_model_training_sample") - sipp = sipp.loc[ - vehicle_rng.choice( - sipp.index, - size=min(20_000, len(sipp)), - replace=True, - p=sipp.household_weight / sipp.household_weight.sum(), - ) + vehicle_vars = [ + "household_vehicles_owned", + "household_vehicles_value", ] - + vehicle_target_filters = target_observed_source_masks( + sipp, + targets=vehicle_vars, + target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS, + ) + sipp, vehicle_target_filters = cap_training_sample( + sipp, + max_train_samples=20_000, + seed_name="sipp_vehicle_model_training_sample", + target_filters=vehicle_target_filters, + ) model = QRF() model = model.fit( X_train=sipp, predictors=VEHICLE_MODEL_PREDICTORS, - imputed_variables=[ - "household_vehicles_owned", - "household_vehicles_value", - ], + imputed_variables=vehicle_vars, + target_filters=vehicle_target_filters, + weight_col="household_weight", ) return model def get_vehicle_model() -> QRF: """Get or train the household vehicle imputation model.""" - model_path = STORAGE_FOLDER / "household_vehicle_assets.pkl" + model_path = STORAGE_FOLDER / "household_vehicle_assets_v2.pkl" if not model_path.exists(): model = train_vehicle_model() diff --git a/policyengine_us_data/utils/source_quality.py b/policyengine_us_data/utils/source_quality.py new file mode 100644 index 000000000..a31560dd8 --- /dev/null +++ b/policyengine_us_data/utils/source_quality.py @@ -0,0 +1,227 @@ +"""Utilities for training imputations from observed donor-survey targets.""" + +from __future__ import annotations + +import logging +from collections.abc import Container, Mapping, Sequence + +import numpy as np +import pandas as pd + +from policyengine_us_data.utils.randomness import seeded_rng + +logger = logging.getLogger(__name__) + + +def sipp_allocation_flag_for(source_column: str) -> str: + """Return the SIPP allocation flag name for a source variable.""" + if not source_column: + raise ValueError("source_column must be non-empty") + return f"A{source_column[1:]}" + + +def require_columns_present( + available_columns: Container[str], + required_columns: Sequence[str], + *, + source_name: str, +) -> None: + """Raise if required donor-source provenance columns are unavailable.""" + missing_columns = sorted( + {column for column in required_columns if column not in available_columns} + ) + if missing_columns: + raise KeyError( + f"{source_name} is missing required source-quality columns: " + f"{', '.join(missing_columns)}. Regenerate the donor artifact with " + "allocation flag columns before fitting source imputations." + ) + + +def observed_source_mask( + df: pd.DataFrame, + *, + source_columns: Sequence[str] = (), + allocation_flag_columns: Sequence[str] = (), + require_nonmissing_source: bool = True, +) -> pd.Series: + """Mask rows whose donor source values are observed for one target. + + Source-survey allocation flags conventionally use ``0`` for not allocated + and non-zero values for allocated/imputed. Missing flag columns are ignored + so callers can use this helper across sources with different flag coverage. + """ + mask = pd.Series(True, index=df.index) + + if require_nonmissing_source: + for column in source_columns: + if column in df: + mask &= df[column].notna() + + for column in allocation_flag_columns: + if column not in df: + continue + flag = pd.to_numeric(df[column], errors="coerce").fillna(0) + mask &= flag.eq(0) + + return mask + + +def filter_observed_source_rows( + df: pd.DataFrame, + *, + target_name: str, + source_columns: Sequence[str] = (), + allocation_flag_columns: Sequence[str] = (), + require_nonmissing_source: bool = True, +) -> pd.DataFrame: + """Return rows whose source values are observed for ``target_name``.""" + mask = observed_source_mask( + df, + source_columns=source_columns, + allocation_flag_columns=allocation_flag_columns, + require_nonmissing_source=require_nonmissing_source, + ) + dropped = int((~mask).sum()) + if dropped: + logger.info( + "Dropped %d/%d donor rows with imputed source values for %s", + dropped, + len(df), + target_name, + ) + return df.loc[mask].copy() + + +def target_observed_source_masks( + df: pd.DataFrame, + *, + targets: Sequence[str], + target_source_columns: Mapping[str, Sequence[str]] | None = None, + target_allocation_flag_columns: Mapping[str, Sequence[str]] | None = None, + require_nonmissing_source: bool = True, +) -> dict[str, pd.Series]: + """Return target-specific masks for rows with observed source values. + + The masks can be passed directly to microimpute models that support + ``target_filters``. Source values are checked per target so a row with an + allocated value for one target can still train another target whose source + value is observed. + """ + target_source_columns = target_source_columns or {} + target_allocation_flag_columns = target_allocation_flag_columns or {} + + masks = {} + for target in targets: + masks[target] = observed_source_mask( + df, + source_columns=target_source_columns.get(target, (target,)), + allocation_flag_columns=target_allocation_flag_columns.get(target, ()), + require_nonmissing_source=require_nonmissing_source, + ) + if not masks[target].any(): + raise ValueError(f"No observed donor rows available for {target}") + dropped = int((~masks[target]).sum()) + if dropped: + logger.info( + "Target %s has %d observed donor rows; excluded %d rows", + target, + int(masks[target].sum()), + dropped, + ) + + return masks + + +def cap_training_sample( + df: pd.DataFrame, + *, + max_train_samples: int, + seed_name: str, + target_filters: Mapping[str, pd.Series] | None = None, +) -> tuple[pd.DataFrame, dict[str, pd.Series]]: + """Deterministically cap a target-filtered QRF training frame. + + microimpute's QRF ``max_train_samples`` currently does not subsample when + ``target_filters`` are present, because target-specific masks must remain + aligned with each target's training rows. This helper applies the same + positional, without-replacement cap locally and returns reindexed masks. + """ + if max_train_samples < 1: + raise ValueError("max_train_samples must be a positive integer") + + filters = {} + for target, mask in (target_filters or {}).items(): + aligned = mask.reindex(df.index) + if aligned.isna().any(): + raise ValueError(f"target_filters[{target!r}] contains missing values") + filters[target] = aligned.astype(bool) + + if not filters: + if len(df) <= max_train_samples: + return df, filters + sample_positions = seeded_rng(seed_name).choice( + len(df), + size=max_train_samples, + replace=False, + ) + else: + if max_train_samples < len(filters): + raise ValueError( + "max_train_samples must be at least the number of target filters" + ) + union_mask = pd.Series(False, index=df.index) + for mask in filters.values(): + union_mask |= mask + if not union_mask.any(): + raise ValueError("No observed donor rows available across target_filters") + + union_positions = np.flatnonzero(union_mask.to_numpy()) + if len(union_positions) <= max_train_samples: + sample_positions = union_positions + else: + selected: list[int] = [] + selected_set: set[int] = set() + per_target_cap = max(1, max_train_samples // len(filters)) + for target, mask in filters.items(): + target_positions = np.flatnonzero(mask.to_numpy()) + target_n = min(per_target_cap, len(target_positions)) + target_sample = seeded_rng(seed_name, salt=target).choice( + target_positions, + size=target_n, + replace=False, + ) + for position in target_sample: + if int(position) not in selected_set: + selected.append(int(position)) + selected_set.add(int(position)) + + remaining_n = max_train_samples - len(selected) + if remaining_n > 0: + remaining_positions = np.array( + [ + position + for position in union_positions + if int(position) not in selected_set + ], + dtype=int, + ) + if len(remaining_positions): + fill_sample = seeded_rng(seed_name, salt="fill").choice( + remaining_positions, + size=min(remaining_n, len(remaining_positions)), + replace=False, + ) + selected.extend(int(position) for position in fill_sample) + + sample_positions = np.asarray(selected, dtype=int) + + sampled_df = df.iloc[sample_positions].copy().reset_index(drop=True) + sampled_filters = { + target: pd.Series( + np.asarray(mask.iloc[sample_positions], dtype=bool), + index=sampled_df.index, + ) + for target, mask in filters.items() + } + return sampled_df, sampled_filters diff --git a/pyproject.toml b/pyproject.toml index 907d463d0..c55886690 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "tqdm>=4.60.0", "microdf_python>=1.2.1", "setuptools>=60", - "microimpute>=1.15.1", + "microimpute>=2.1.0", "pip-system-certs>=3.0", "google-cloud-storage>=2.0.0", "google-auth>=2.0.0", diff --git a/tests/integration/test_cps_generation.py b/tests/integration/test_cps_generation.py index eb8a68796..f1e72be34 100644 --- a/tests/integration/test_cps_generation.py +++ b/tests/integration/test_cps_generation.py @@ -296,6 +296,14 @@ def test_add_rent_requests_person_level_frames(monkeypatch, tmp_path): "is_household_head", data=np.ones(10_050, dtype=bool), ) + fake_acs_h5.create_dataset( + "rent_is_allocated", + data=np.zeros(10_050, dtype=bool), + ) + fake_acs_h5.create_dataset( + "real_estate_taxes_is_allocated", + data=np.zeros(10_050, dtype=bool), + ) class FakeACSDataset: file_path = fake_acs_path @@ -366,10 +374,16 @@ def predict(self, X_test): ) class FakeQRF: - def fit(self, X_train, predictors, imputed_variables): + def __init__(self, *args, **kwargs): + self.init_kwargs = kwargs + + def fit(self, X_train, predictors, imputed_variables, target_filters=None): + assert self.init_kwargs == {} assert len(X_train) == 10_000 assert predictors[-1] == "household_size" assert imputed_variables == ["rent", "real_estate_taxes"] + assert set(target_filters) == {"rent", "real_estate_taxes"} + assert all(len(mask) == 10_000 for mask in target_filters.values()) return FakeQRFModel() monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) diff --git a/tests/unit/calibration/test_source_impute.py b/tests/unit/calibration/test_source_impute.py index 70f6b4b32..b8c064ac4 100644 --- a/tests/unit/calibration/test_source_impute.py +++ b/tests/unit/calibration/test_source_impute.py @@ -5,10 +5,14 @@ import numpy as np import pandas as pd +import huggingface_hub +import pytest +from policyengine_us_data.calibration import source_impute from policyengine_us_data.calibration.source_impute import ( ACS_IMPUTED_VARIABLES, ACS_PREDICTORS, + ACS_TARGET_ALLOCATION_COLUMNS, ALL_SOURCE_VARIABLES, SCF_IMPUTED_VARIABLES, SCF_PREDICTORS, @@ -79,6 +83,10 @@ class TestConstants: def test_acs_variables_defined(self): assert "rent" in ACS_IMPUTED_VARIABLES assert "real_estate_taxes" in ACS_IMPUTED_VARIABLES + assert ACS_TARGET_ALLOCATION_COLUMNS == { + "rent": ["rent_is_allocated"], + "real_estate_taxes": ["real_estate_taxes_is_allocated"], + } def test_sipp_variables_defined(self): assert "tip_income" in SIPP_IMPUTED_VARIABLES @@ -327,6 +335,246 @@ def test_impute_acs_exists(self): def test_impute_sipp_exists(self): assert callable(_impute_sipp) + def test_calibration_sipp_tip_counts_use_reference_month(self, monkeypatch): + captured = {} + + columns = { + "SSUID": [1, 1, 1, 2], + "MONTHCODE": [1, 12, 12, 12], + "TAGE": [5, 40, 10, 30], + "TPTOTINC": [1_000.0, 2_000.0, 0.0, 3_000.0], + "WPFINWGT": [1.0, 1.0, 1.0, 1.0], + } + for column in source_impute.SIPP_TIP_AMOUNT_COLUMNS: + columns[column] = [0.0, 10.0, 0.0, 5.0] + for column in source_impute.SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN.values(): + columns[column] = [0, 0, 0, 0] + for column in source_impute.SIPP_JOB_OCCUPATION_COLUMNS: + columns[column] = [0, 0, 0, 0] + tip_source = pd.DataFrame(columns) + + read_count = {"count": 0} + + def fake_read_csv(*args, **kwargs): + read_count["count"] += 1 + if read_count["count"] == 1: + return tip_source.copy() + raise FileNotFoundError("stop after tip imputation") + + class FakeQRF: + def __init__(self, *args, **kwargs): + captured["init_kwargs"] = kwargs + + def fit(self, X_train, **kwargs): + captured["train"] = X_train.copy() + captured["fit_kwargs"] = kwargs + return self + + def predict(self, X_test): + return pd.DataFrame({"tip_income": np.zeros(len(X_test))}) + + monkeypatch.setattr( + huggingface_hub, + "hf_hub_download", + lambda *args, **kwargs: None, + ) + monkeypatch.setattr(source_impute.pd, "read_csv", fake_read_csv) + monkeypatch.setattr(source_impute, "QRF", FakeQRF) + + data = _make_data_dict(n_persons=4) + _impute_sipp( + data=data, + state_fips=np.array([1, 1], dtype=np.int32), + time_period=2024, + ) + + household_one = captured["train"][captured["train"]["household_id"] == 1] + np.testing.assert_array_equal(household_one["count_under_18"], [1, 1]) + np.testing.assert_array_equal(household_one["count_under_6"], [0, 0]) + assert captured["init_kwargs"] == {} + assert set(captured["fit_kwargs"]["target_filters"]) == {"tip_income"} + assert len(captured["fit_kwargs"]["target_filters"]["tip_income"]) == len( + captured["train"] + ) + + def test_calibration_sipp_qrf_passes_target_filters(self, monkeypatch): + fit_calls = [] + + tip_columns = { + "SSUID": [1, 2, 3], + "MONTHCODE": [12, 12, 12], + "TAGE": [40, 30, 10], + "TPTOTINC": [1_000.0, 2_000.0, 0.0], + "WPFINWGT": [1.0, 1.0, 1.0], + } + for column in source_impute.SIPP_TIP_AMOUNT_COLUMNS: + tip_columns[column] = [10.0, 5.0, 0.0] + for column in source_impute.SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN.values(): + tip_columns[column] = [0, 1, 0] + for column in source_impute.SIPP_JOB_OCCUPATION_COLUMNS: + tip_columns[column] = [0, 0, 0] + tip_source = pd.DataFrame(tip_columns) + + asset_columns = { + "SSUID": [1, 2, 3], + "PNUM": [1, 1, 1], + "MONTHCODE": [12, 12, 12], + "WPFINWGT": [1.0, 1.0, 1.0], + "TAGE": [40, 30, 10], + "ESEX": [1, 2, 1], + "EMS": [1, 2, 2], + "TSSSAMT": [0.0, 0.0, 0.0], + "TRETINCAMT": [0.0, 0.0, 0.0], + "TVAL_BANK": [100.0, 200.0, 300.0], + "TVAL_STMF": [10.0, 20.0, 30.0], + "TVAL_BOND": [1.0, 2.0, 3.0], + "TINC_BANK": [0.0, 0.0, 0.0], + "TINC_STMF": [0.0, 0.0, 0.0], + "TINC_BOND": [0.0, 0.0, 0.0], + "TINC_RENT": [0.0, 0.0, 0.0], + } + for column in source_impute.ASSET_JOB_EARNINGS_COLUMNS: + asset_columns[column] = [1_000.0, 2_000.0, 0.0] + for column in source_impute.SIPP_ASSET_ALLOCATION_COLUMNS: + asset_columns[column] = [0, 0, 0] + asset_columns["AVAL_BANK"] = [0, 1, 0] + asset_columns["AVAL_STMF"] = [0, 0, 1] + asset_source = pd.DataFrame(asset_columns) + + vehicle_train = pd.DataFrame( + { + **{ + predictor: [0.0, 1.0, 2.0] + for predictor in source_impute.VEHICLE_MODEL_PREDICTORS + }, + "household_vehicles_owned": [1.0, 2.0, 3.0], + "household_vehicles_value": [5_000.0, 10_000.0, 15_000.0], + "AVEH_NUM": [0, 1, 0], + "AHVAL_VEH": [0, 0, 1], + "household_weight": [1.0, 1.0, 1.0], + } + ) + + def fake_read_csv(path, *args, **kwargs): + if str(path).endswith("pu2023_slim.csv"): + return tip_source.copy() + if str(path).endswith("pu2023.csv"): + return asset_source.copy() + raise AssertionError(f"Unexpected read_csv path: {path}") + + class FakeQRF: + def __init__(self, *args, **kwargs): + self.init_kwargs = kwargs + + def fit(self, X_train, **kwargs): + self.imputed_variables = kwargs["imputed_variables"] + fit_calls.append( + { + "init_kwargs": self.init_kwargs, + "train": X_train.copy(), + "kwargs": kwargs, + } + ) + return self + + def predict(self, X_test): + return pd.DataFrame( + { + variable: np.zeros(len(X_test), dtype=np.float32) + for variable in self.imputed_variables + } + ) + + monkeypatch.setattr( + huggingface_hub, + "hf_hub_download", + lambda *args, **kwargs: None, + ) + monkeypatch.setattr(source_impute.pd, "read_csv", fake_read_csv) + monkeypatch.setattr(source_impute, "QRF", FakeQRF) + monkeypatch.setattr( + source_impute, + "get_ssi_disability_model", + lambda time_period: object(), + ) + monkeypatch.setattr( + source_impute, + "predict_ssi_disability_criteria", + lambda model, receiver: np.zeros(len(receiver), dtype=bool), + ) + monkeypatch.setattr( + source_impute, + "build_vehicle_training_frame", + lambda: vehicle_train.copy(), + ) + + _impute_sipp( + data=_make_data_dict(n_persons=6), + state_fips=np.array([1, 1, 1], dtype=np.int32), + time_period=2024, + ) + + by_targets = { + tuple(call["kwargs"]["imputed_variables"]): call for call in fit_calls + } + assert set(by_targets) == { + ("tip_income",), + ("bank_account_assets", "stock_assets", "bond_assets"), + ("household_vehicles_owned", "household_vehicles_value"), + } + for call in fit_calls: + assert call["init_kwargs"] == {} + filters = call["kwargs"]["target_filters"] + assert set(filters) == set(call["kwargs"]["imputed_variables"]) + assert all(len(mask) == len(call["train"]) for mask in filters.values()) + + tip_filters = by_targets[("tip_income",)]["kwargs"]["target_filters"] + assert len(by_targets[("tip_income",)]["train"]) == 2 + np.testing.assert_array_equal(tip_filters["tip_income"].values, [True, True]) + + asset_filters = by_targets[ + ("bank_account_assets", "stock_assets", "bond_assets") + ]["kwargs"]["target_filters"] + np.testing.assert_array_equal( + asset_filters["bank_account_assets"].values, + [True, False, True], + ) + np.testing.assert_array_equal( + asset_filters["stock_assets"].values, + [True, True, False], + ) + + vehicle_filters = by_targets[ + ("household_vehicles_owned", "household_vehicles_value") + ]["kwargs"]["target_filters"] + np.testing.assert_array_equal( + vehicle_filters["household_vehicles_owned"].values, + [True, False, True], + ) + np.testing.assert_array_equal( + vehicle_filters["household_vehicles_value"].values, + [True, True, False], + ) + + def test_calibration_sipp_tip_requires_allocation_flags(self, monkeypatch): + monkeypatch.setattr( + huggingface_hub, + "hf_hub_download", + lambda *args, **kwargs: None, + ) + monkeypatch.setattr( + source_impute.pd, + "read_csv", + lambda *args, **kwargs: pd.DataFrame({"TJB1_TXAMT": [10.0]}), + ) + + with pytest.raises(KeyError, match="AJB1_TXAMT"): + _impute_sipp( + data=_make_data_dict(n_persons=4), + state_fips=np.array([1, 1], dtype=np.int32), + time_period=2024, + ) + def test_impute_org_exists(self): assert callable(_impute_org) diff --git a/tests/unit/datasets/test_acs_tax_unit_construction.py b/tests/unit/datasets/test_acs_tax_unit_construction.py index 61bbf986c..c19fbf517 100644 --- a/tests/unit/datasets/test_acs_tax_unit_construction.py +++ b/tests/unit/datasets/test_acs_tax_unit_construction.py @@ -3,7 +3,8 @@ import pandas as pd import pytest -from policyengine_us_data.datasets.acs.acs import ACS +from policyengine_us_data.datasets.acs.acs import ACS, ACS_2022 +from policyengine_us_data.datasets.acs.census_acs import CensusACS from policyengine_us_data.datasets.acs.acs_to_cps_columns import ( acs_person_to_cps_tax_unit_columns, ) @@ -235,3 +236,96 @@ def test_acs_add_id_variables_writes_related_to_head_or_spouse(): assert person_tax_unit_id.tolist() == [1, 1] assert is_related_to_head_or_spouse.tolist() == [True, False] + + +def test_acs_add_person_variables_requires_raw_allocation_flags(): + person = _acs_person_fixture(household_id=[0]) + household = pd.DataFrame( + { + "household_id": [0], + "RNTP": [1_200], + "TAXAMT": [500], + "TEN": [3], + } + ) + + with h5py.File("memory", mode="w", driver="core", backing_store=False) as acs: + with pytest.raises(KeyError, match="FRNTP"): + ACS.add_person_variables(acs, person, household) + + +def test_acs_add_person_variables_writes_allocation_flags_for_heads_only(): + person = _acs_person_fixture( + household_id=[0, 0], + SPORDER=[1, 2], + AGEP=[40, 10], + ) + household = pd.DataFrame( + { + "household_id": [0], + "RNTP": [1_200], + "TAXAMT": [500], + "FRNTP": [1], + "FTAXP": [0], + "TEN": [3], + } + ) + + with h5py.File("memory", mode="w", driver="core", backing_store=False) as acs: + ACS.add_person_variables(acs, person, household) + rent_is_allocated = acs["rent_is_allocated"][:] + real_estate_taxes_is_allocated = acs["real_estate_taxes_is_allocated"][:] + + assert rent_is_allocated.tolist() == [True, False] + assert real_estate_taxes_is_allocated.tolist() == [False, False] + + +def test_acs_2022_does_not_download_stale_release_artifact(): + assert ACS_2022.url is None + + +def test_acs_exists_rejects_stale_artifact_missing_allocation_flags(tmp_path): + path = tmp_path / "acs_2022.h5" + with h5py.File(path, "w") as h5: + h5.create_dataset("is_household_head", data=np.array([True])) + + class TempACS(ACS): + name = "temp_acs" + label = "Temp ACS" + file_path = path + + assert TempACS().exists is False + + with h5py.File(path, "a") as h5: + h5.create_dataset("rent_is_allocated", data=np.array([False])) + h5.create_dataset("real_estate_taxes_is_allocated", data=np.array([False])) + + assert TempACS().exists is True + + +def test_raw_census_acs_exists_rejects_stale_cache_missing_allocation_flags(tmp_path): + path = tmp_path / "census_acs_2022.h5" + + class TempCensusACS(CensusACS): + name = "temp_census_acs" + label = "Temp Census ACS" + file_path = path + + with pd.HDFStore(path, mode="w") as storage: + storage["household"] = pd.DataFrame({"SERIALNO": ["1"], "RNTP": [1_200]}) + storage["person"] = pd.DataFrame({"SERIALNO": ["1"]}) + + assert TempCensusACS().exists is False + + with pd.HDFStore(path, mode="w") as storage: + storage["household"] = pd.DataFrame( + { + "SERIALNO": ["1"], + "RNTP": [1_200], + "FRNTP": [0], + "FTAXP": [0], + } + ) + storage["person"] = pd.DataFrame({"SERIALNO": ["1"]}) + + assert TempCensusACS().exists is True diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index 9d500953f..be351bd29 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -390,6 +390,8 @@ def recording_hdfstore(path, mode="a", *args, **kwargs): acs_fixture_path = tmp_path / "acs_fixture.h5" with h5py.File(acs_fixture_path, "w") as acs_fixture: acs_fixture["is_household_head"] = np.ones(10_000, dtype=bool) + acs_fixture["rent_is_allocated"] = np.zeros(10_000, dtype=bool) + acs_fixture["real_estate_taxes_is_allocated"] = np.zeros(10_000, dtype=bool) real_h5py_file = cps_module.h5py.File opened_h5_paths = [] @@ -403,7 +405,11 @@ def recording_h5py_file(path, mode="r", *args, **kwargs): return real_h5py_file(path, mode=mode, *args, **kwargs) class FakeQRF: - def fit(self, X_train, predictors, imputed_variables): + def __init__(self, *args, **kwargs): + pass + + def fit(self, X_train, predictors, imputed_variables, **kwargs): + assert set(kwargs["target_filters"]) == set(imputed_variables) return self def predict(self, X_test): diff --git a/tests/unit/datasets/test_rng_seeding.py b/tests/unit/datasets/test_rng_seeding.py index 487d34995..c14ee1f67 100644 --- a/tests/unit/datasets/test_rng_seeding.py +++ b/tests/unit/datasets/test_rng_seeding.py @@ -130,7 +130,7 @@ def test_select_random_subset_uses_local_generator_only(): def test_sipp_training_samples_use_seeded_rng(): """N5: the weighted resample for tip and asset training frames - must come from a seeded Generator, not the global ``np.random``.""" + must come from a deterministic sampler, not the global ``np.random``.""" src = SIPP_SOURCE.read_text() assert "seeded_rng(" in src, "sipp.py must import/use seeded_rng()" tree = ast.parse(src) @@ -149,8 +149,9 @@ def test_sipp_training_samples_use_seeded_rng(): assert "np.random.choice" not in fn_src, ( f"{fn_name} must not use np.random.choice (use a seeded_rng Generator)" ) - assert "seeded_rng(" in fn_src, ( - f"{fn_name} must derive its resampler from a seeded generator" + assert "seeded_rng(" in fn_src or "max_train_samples=" in fn_src, ( + f"{fn_name} must derive its resampler from a seeded generator or " + "delegate capped sampling to microimpute's deterministic QRF sampler" ) diff --git a/tests/unit/datasets/test_sipp_ssi_disability.py b/tests/unit/datasets/test_sipp_ssi_disability.py index 3a87c89a3..ac883ad8f 100644 --- a/tests/unit/datasets/test_sipp_ssi_disability.py +++ b/tests/unit/datasets/test_sipp_ssi_disability.py @@ -69,6 +69,22 @@ def test_build_ssi_disability_training_frame_uses_all_disability_amounts(): def test_ssi_disability_training_usecols_include_label_and_income_columns(): assert {"TPTOTINC", "RSSI_YRYN"} <= set(SSI_DISABILITY_COLUMNS) + assert {"ASSI_YRYN", "ASSI_BRSN"} <= set(SSI_DISABILITY_COLUMNS) + + +def test_build_ssi_disability_training_frame_excludes_allocated_label_source(): + frame = _base_sipp_frame() + frame.loc[0, "ASSI_YRYN"] = 1 + frame.loc[1:, "ASSI_YRYN"] = 0 + frame["ASSI_BRSN"] = 0 + + result = build_ssi_disability_training_frame(frame) + + assert len(result) == 3 + np.testing.assert_array_equal( + result[SSI_DISABILITY_MODEL_VARIABLE].values, + np.array([False, False, False]), + ) def test_prepare_ssi_disability_receiver_fills_missing_predictors(): diff --git a/tests/unit/datasets/test_sipp_tip_columns.py b/tests/unit/datasets/test_sipp_tip_columns.py index ba585a1d5..41ccff436 100644 --- a/tests/unit/datasets/test_sipp_tip_columns.py +++ b/tests/unit/datasets/test_sipp_tip_columns.py @@ -2,10 +2,14 @@ Previously `str.contains("TXAMT")` matched both `TJB*_TXAMT` (dollar amounts) and `AJB*_TXAMT` (Census allocation flags). The fix narrows -to `TJB\\d_TXAMT` (dollar-amount columns only). +to explicit `TJB*_TXAMT` dollar-amount columns only. """ import pandas as pd +import pytest + +from policyengine_us_data.datasets.sipp.sipp import SIPP_TIP_AMOUNT_COLUMNS +import policyengine_us_data.datasets.sipp.sipp as sipp_module def test_tip_regex_matches_dollar_amounts_only(): @@ -24,7 +28,7 @@ def test_tip_regex_matches_dollar_amounts_only(): ] ) - matches = columns[columns.str.match(r"TJB\d_TXAMT")] + matches = [column for column in SIPP_TIP_AMOUNT_COLUMNS if column in columns] assert list(matches) == ["TJB1_TXAMT", "TJB2_TXAMT"] @@ -39,9 +43,8 @@ def test_tip_sum_excludes_allocation_flags(): } ) # Mirror the sipp.py computation using the new regex. - tip_income_monthly = ( - df[df.columns[df.columns.str.match(r"TJB\d_TXAMT")]].fillna(0).sum(axis=1) - ) + tip_cols = [column for column in SIPP_TIP_AMOUNT_COLUMNS if column in df] + tip_income_monthly = df[tip_cols].fillna(0).sum(axis=1) assert list(tip_income_monthly) == [150.0, 275.0] # Sanity check: the buggy regex would have included AJB flags. @@ -49,3 +52,17 @@ def test_tip_sum_excludes_allocation_flags(): df[df.columns[df.columns.str.contains("TXAMT")]].fillna(0).sum(axis=1) ) assert list(buggy_tip_income_monthly) == [151.0, 278.0] + + +def test_train_tip_model_requires_allocation_flags_for_present_tip_columns( + monkeypatch, +): + monkeypatch.setattr(sipp_module, "hf_hub_download", lambda *args, **kwargs: None) + monkeypatch.setattr( + sipp_module.pd, + "read_csv", + lambda *args, **kwargs: pd.DataFrame({"TJB1_TXAMT": [10.0]}), + ) + + with pytest.raises(KeyError, match="AJB1_TXAMT"): + sipp_module.train_tip_model() diff --git a/tests/unit/test_source_quality.py b/tests/unit/test_source_quality.py new file mode 100644 index 000000000..505d8d91a --- /dev/null +++ b/tests/unit/test_source_quality.py @@ -0,0 +1,203 @@ +import numpy as np +import pandas as pd + +from policyengine_us_data.utils.source_quality import ( + cap_training_sample, + observed_source_mask, + require_columns_present, + sipp_allocation_flag_for, + target_observed_source_masks, +) + + +def test_sipp_allocation_flag_for_source_column(): + assert sipp_allocation_flag_for("TVAL_BANK") == "AVAL_BANK" + assert sipp_allocation_flag_for("RSSI_YRYN") == "ASSI_YRYN" + assert sipp_allocation_flag_for("TJB1_TXAMT") == "AJB1_TXAMT" + + +def test_require_columns_present_accepts_available_columns(): + require_columns_present( + {"rent_is_allocated", "real_estate_taxes_is_allocated"}, + ["rent_is_allocated", "real_estate_taxes_is_allocated"], + source_name="ACS", + ) + + +def test_require_columns_present_raises_for_missing_columns(): + try: + require_columns_present( + {"rent_is_allocated"}, + ["rent_is_allocated", "real_estate_taxes_is_allocated"], + source_name="ACS", + ) + except KeyError as error: + message = str(error) + else: + raise AssertionError("Expected missing source-quality columns to fail") + + assert "real_estate_taxes_is_allocated" in message + assert "Regenerate the donor artifact" in message + + +def test_observed_source_mask_excludes_nonzero_allocation_flags(): + df = pd.DataFrame( + { + "TVAL_BANK": [100.0, 200.0, 300.0], + "AVAL_BANK": [0, 1, 2], + } + ) + + result = observed_source_mask( + df, + source_columns=["TVAL_BANK"], + allocation_flag_columns=["AVAL_BANK"], + ) + + np.testing.assert_array_equal(result.values, [True, False, False]) + + +def test_observed_source_mask_is_target_specific(): + df = pd.DataFrame( + { + "tip_income": [10.0, 20.0], + "bank_account_assets": [100.0, 200.0], + "AJB1_TXAMT": [0, 0], + "AVAL_BANK": [1, 0], + } + ) + + tip_mask = observed_source_mask( + df, + source_columns=["tip_income"], + allocation_flag_columns=["AJB1_TXAMT"], + ) + bank_mask = observed_source_mask( + df, + source_columns=["bank_account_assets"], + allocation_flag_columns=["AVAL_BANK"], + ) + + np.testing.assert_array_equal(tip_mask.values, [True, True]) + np.testing.assert_array_equal(bank_mask.values, [False, True]) + + +def test_observed_source_mask_allows_missing_tip_components_when_requested(): + df = pd.DataFrame( + { + "TJB1_TXAMT": [np.nan, 5.0], + "AJB1_TXAMT": [0, 0], + } + ) + + result = observed_source_mask( + df, + source_columns=["TJB1_TXAMT"], + allocation_flag_columns=["AJB1_TXAMT"], + require_nonmissing_source=False, + ) + + np.testing.assert_array_equal(result.values, [True, True]) + + +def test_target_observed_source_masks_are_target_specific(): + df = pd.DataFrame( + { + "tip_income": [10.0, 20.0], + "bank_account_assets": [100.0, 200.0], + "AJB1_TXAMT": [0, 0], + "AVAL_BANK": [1, 0], + } + ) + + result = target_observed_source_masks( + df, + targets=["tip_income", "bank_account_assets"], + target_allocation_flag_columns={ + "tip_income": ["AJB1_TXAMT"], + "bank_account_assets": ["AVAL_BANK"], + }, + ) + + np.testing.assert_array_equal(result["tip_income"].values, [True, True]) + np.testing.assert_array_equal(result["bank_account_assets"].values, [False, True]) + + +def test_cap_training_sample_keeps_target_filters_aligned(): + df = pd.DataFrame({"value": np.arange(20)}, index=np.arange(100, 120)) + filters = { + "value": pd.Series( + [i % 2 == 0 for i in range(20)], + index=df.index, + ) + } + + sampled, sampled_filters = cap_training_sample( + df, + max_train_samples=5, + seed_name="unit_test_cap_training_sample", + target_filters=filters, + ) + + assert len(sampled) == 5 + assert list(sampled.index) == list(sampled_filters["value"].index) + assert sampled["value"].mod(2).eq(0).all() + np.testing.assert_array_equal(sampled_filters["value"].values, [True] * 5) + + +def test_cap_training_sample_uses_observed_union_before_capping(): + df = pd.DataFrame({"value": np.arange(100)}) + filters = { + "value": pd.Series( + [False] * 97 + [True] * 3, + index=df.index, + ) + } + + sampled, sampled_filters = cap_training_sample( + df, + max_train_samples=5, + seed_name="unit_test_sparse_cap_training_sample", + target_filters=filters, + ) + + assert sampled["value"].tolist() == [97, 98, 99] + np.testing.assert_array_equal(sampled_filters["value"].values, [True, True, True]) + + +def test_cap_training_sample_preserves_each_target_when_capping(): + df = pd.DataFrame({"value": np.arange(100)}) + filters = { + "rare_a": pd.Series([True, True] + [False] * 98, index=df.index), + "rare_b": pd.Series([False] * 98 + [True, True], index=df.index), + } + + sampled, sampled_filters = cap_training_sample( + df, + max_train_samples=4, + seed_name="unit_test_target_coverage_cap_training_sample", + target_filters=filters, + ) + + assert set(sampled["value"]) == {0, 1, 98, 99} + assert sampled_filters["rare_a"].sum() == 2 + assert sampled_filters["rare_b"].sum() == 2 + + +def test_cap_training_sample_rejects_misaligned_filters(): + df = pd.DataFrame({"value": [1, 2]}, index=[10, 11]) + filters = {"value": pd.Series([True, False], index=[0, 1])} + + try: + cap_training_sample( + df, + max_train_samples=1, + seed_name="unit_test_misaligned_cap_training_sample", + target_filters=filters, + ) + except ValueError as error: + message = str(error) + else: + raise AssertionError("Expected misaligned target filters to fail") + + assert "target_filters['value']" in message diff --git a/uv.lock b/uv.lock index 81d80b028..593e371fd 100644 --- a/uv.lock +++ b/uv.lock @@ -1409,7 +1409,7 @@ wheels = [ [[package]] name = "microimpute" -version = "1.15.1" +version = "2.1.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "joblib" }, @@ -1426,9 +1426,9 @@ dependencies = [ { name = "statsmodels" }, { name = "tqdm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/97/17/d621d4ed40e0afac6f1a2c4dea423783576613820d1460ae30d65c48309e/microimpute-1.15.1.tar.gz", hash = "sha256:af409525d475efeb8c8526e9630834c4f16563e15cd42665117d2a1397fcf404", size = 128669, upload-time = "2026-03-09T15:59:33.885Z" } +sdist = { url = "https://files.pythonhosted.org/packages/99/55/49c2244cb8485196152065120b8d2e3c8da18331e198233b4beccfa17039/microimpute-2.1.0.tar.gz", hash = "sha256:ebdcd86eebbdd03130cb22335aef5de672b523c9a67771a5db6b6059c8e9d696", size = 146040, upload-time = "2026-05-21T17:53:54.731Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/42/f1/1d80dbb8cc9e85962524a4233cfe42ac1a78e6f2cc0ca479ed1817f6d8ae/microimpute-1.15.1-py3-none-any.whl", hash = "sha256:f5f2de91eeedea28ddae42d42757b558d6eb85c1a1fd6a9097b53e309f19369c", size = 111313, upload-time = "2026-03-09T15:59:32.553Z" }, + { url = "https://files.pythonhosted.org/packages/da/ed/274ec5f92ce49d367c09e0038cbb6adbe3c81941bf06b162cf9a2d8bdebd/microimpute-2.1.0-py3-none-any.whl", hash = "sha256:04463740c2091bbbe7552b9bd87bc3dd472902a3798b96b6c9924b8f9870c4dd", size = 127301, upload-time = "2026-05-21T17:53:53.212Z" }, ] [[package]] @@ -2199,7 +2199,7 @@ requires-dist = [ { name = "google-cloud-storage", specifier = ">=2.0.0" }, { name = "l0-python", marker = "extra == 'l0'" }, { name = "microdf-python", specifier = ">=1.2.1" }, - { name = "microimpute", specifier = ">=1.15.1" }, + { name = "microimpute", specifier = ">=2.1.0" }, { name = "openpyxl", specifier = ">=3.1.5" }, { name = "pandas", specifier = ">=2.3.1" }, { name = "pip-system-certs", specifier = ">=3.0" },