Skip to content

Commit e857306

Browse files
committed
Use microimpute target filters for source imputations
1 parent fde866d commit e857306

16 files changed

Lines changed: 534 additions & 161 deletions

File tree

AGENTS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ read `docs/engineering/skills/pipeline_operations.md`.
2323
When adding, changing, or reviewing calibration target definitions, read
2424
`docs/engineering/skills/calibration_targets.md`.
2525

26+
When adding, changing, or reviewing donor-survey imputations, read
27+
`docs/engineering/skills/imputation.md`.
28+
2629
## Calibration targets
2730

2831
Manually sourced national or local-file calibration targets must be registered

docs/engineering/skills/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Current skills:
1414
notes.
1515
- `github-prs.md`: same-repository PR workflow, PR head verification, and title
1616
conventions.
17+
- `imputation.md`: donor-survey imputation provenance rules, including
18+
target-level exclusion of allocated source values.
1719
- `pipeline_docs.md`: decorator-backed pipeline map maintenance and generated
1820
pydoc-style artifacts.
1921
- `pipeline_operations.md`: model-neutral workflow for diagnosing deployed Modal
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Imputation
2+
3+
Use this guide when adding, changing, or reviewing donor-survey imputations.
4+
5+
## Source Provenance
6+
7+
Do not train an imputation target on donor rows whose source value for that
8+
target is itself allocated, hot-decked, edited, or imputed by the source survey.
9+
Wire source-survey allocation or quality flags into the training frame whenever
10+
the donor file exposes them.
11+
12+
Apply this rule at the target-variable level, not the donor-row level. A donor
13+
row with observed tip income but allocated bank-account assets can train
14+
`tip_income`; the same row must be excluded from the `bank_account_assets`
15+
training target. Use `policyengine_us_data.utils.source_quality` to build
16+
target masks, then pass them to `microimpute` through `target_filters` or
17+
`row_filter` so the filtering logic lives in the imputation library rather than
18+
in one-off model wrappers.
19+
20+
Do not drop final CPS, ECPS, or calibration records solely because a donor
21+
survey target was excluded from training. The exclusion applies to donor
22+
training rows only; recipient datasets should remain complete.
23+
24+
When a donor source lacks target-level quality flags, document that limitation
25+
near the imputation code and keep the training surface structured so flags can
26+
be added later.
27+
28+
## Tests
29+
30+
Add focused regression tests when adding a donor imputation or a source-quality
31+
flag:
32+
33+
- allocation flags are read from the donor source,
34+
- allocated source values are excluded for the affected target,
35+
- unrelated observed targets from the same row can still train, and
36+
- legacy and current imputation surfaces use the same target provenance rule.

policyengine_us_data/calibration/source_impute.py

Lines changed: 93 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,26 @@
2626
"""
2727

2828
import gc
29+
import h5py
2930
import logging
3031
from typing import Dict, Optional
3132

3233
import numpy as np
3334
import pandas as pd
35+
from microimpute.models.qrf import QRF
3436
from policyengine_us_data.datasets.cps.tipped_occupation import (
3537
derive_any_treasury_tipped_occupation_code,
3638
derive_is_tipped_occupation,
3739
)
3840
from policyengine_us_data.datasets.sipp.sipp import (
3941
ASSET_JOB_EARNINGS_COLUMNS,
4042
ASSET_PREDICTORS,
43+
SIPP_ASSET_ALLOCATION_COLUMNS,
44+
SIPP_ASSET_TARGET_ALLOCATION_COLUMNS,
45+
SIPP_ASSET_TARGET_SOURCE_COLUMNS,
46+
SIPP_TIP_ALLOCATION_COLUMNS,
47+
SIPP_TIP_AMOUNT_COLUMNS,
48+
SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
4149
SSI_DISABILITY_MODEL_VARIABLE,
4250
VEHICLE_MODEL_PREDICTORS,
4351
build_vehicle_training_frame,
@@ -72,13 +80,20 @@
7280
)
7381
from policyengine_us_data.pipeline_metadata import pipeline_node
7482
from policyengine_us_data.pipeline_schema import PipelineNode
83+
from policyengine_us_data.utils.source_quality import (
84+
target_observed_source_masks,
85+
)
7586

7687
logger = logging.getLogger(__name__)
7788

7889
ACS_IMPUTED_VARIABLES = [
7990
"rent",
8091
"real_estate_taxes",
8192
]
93+
ACS_TARGET_ALLOCATION_COLUMNS = {
94+
"rent": ["rent_is_allocated"],
95+
"real_estate_taxes": ["real_estate_taxes_is_allocated"],
96+
}
8297

8398
SIPP_IMPUTED_VARIABLES = [
8499
"tip_income",
@@ -504,7 +519,6 @@ def _impute_acs(
504519
Returns:
505520
Updated data dict.
506521
"""
507-
from microimpute.models.qrf import QRF
508522
from policyengine_us import Microsimulation
509523

510524
from policyengine_us_data.datasets.acs.acs import ACS_2022
@@ -518,8 +532,13 @@ def _impute_acs(
518532
acs_df["state_fips"] = acs.calculate("state_fips", map_to="person").values.astype(
519533
np.float32
520534
)
535+
with h5py.File(ACS_2022.file_path, "r") as acs_h5:
536+
for flag_columns in ACS_TARGET_ALLOCATION_COLUMNS.values():
537+
for flag_column in flag_columns:
538+
if flag_column in acs_h5:
539+
acs_df[flag_column] = np.asarray(acs_h5[flag_column], dtype=bool)
521540

522-
train_df = acs_df[acs_df.is_household_head].sample(10_000, random_state=42)
541+
train_df = acs_df[acs_df.is_household_head].copy()
523542
train_df = _encode_tenure_type(train_df)
524543
del acs
525544

@@ -545,17 +564,21 @@ def _impute_acs(
545564
)
546565
cps_heads = cps_df[mask]
547566

548-
qrf = QRF()
549567
logger.info(
550568
"ACS QRF: %d train, %d test, %d predictors",
551569
len(train_df),
552570
len(cps_heads),
553571
len(predictors),
554572
)
555-
fitted = qrf.fit(
573+
fitted = QRF(max_train_samples=10_000).fit(
556574
X_train=train_df,
557575
predictors=predictors,
558576
imputed_variables=ACS_IMPUTED_VARIABLES,
577+
target_filters=target_observed_source_masks(
578+
train_df,
579+
targets=ACS_IMPUTED_VARIABLES,
580+
target_allocation_flag_columns=ACS_TARGET_ALLOCATION_COLUMNS,
581+
),
559582
)
560583
predictions = fitted.predict(X_test=cps_heads)
561584

@@ -606,12 +629,8 @@ def _impute_sipp(
606629
Updated data dict.
607630
"""
608631
from huggingface_hub import hf_hub_download
609-
from microimpute.models.qrf import QRF
610-
611632
from policyengine_us_data.storage import STORAGE_FOLDER
612633

613-
rng = np.random.default_rng(seed=88)
614-
615634
hf_hub_download(
616635
repo_id="PolicyEngine/policyengine-us-data",
617636
filename="pu2023_slim.csv",
@@ -620,12 +639,10 @@ def _impute_sipp(
620639
)
621640
sipp_df = pd.read_csv(STORAGE_FOLDER / "pu2023_slim.csv")
622641

623-
sipp_df["tip_income"] = (
624-
sipp_df[sipp_df.columns[sipp_df.columns.str.contains("TXAMT")]]
625-
.fillna(0)
626-
.sum(axis=1)
627-
* 12
628-
)
642+
tip_amount_columns = [
643+
column for column in SIPP_TIP_AMOUNT_COLUMNS if column in sipp_df
644+
]
645+
sipp_df["tip_income"] = sipp_df[tip_amount_columns].fillna(0).sum(axis=1) * 12
629646
sipp_df["employment_income"] = sipp_df.TPTOTINC * 12
630647
sipp_df["age"] = sipp_df.TAGE
631648
sipp_df["household_weight"] = sipp_df.WPFINWGT
@@ -645,6 +662,16 @@ def _impute_sipp(
645662
sipp_df["count_under_6"] = (
646663
sipp_df.groupby("SSUID")["is_under_6"].sum().loc[sipp_df.SSUID.values].values
647664
)
665+
if "MONTHCODE" in sipp_df:
666+
sipp_df = sipp_df[sipp_df["MONTHCODE"] == 12].copy()
667+
668+
tip_target_filters = target_observed_source_masks(
669+
sipp_df,
670+
targets=["tip_income"],
671+
target_source_columns={"tip_income": tip_amount_columns},
672+
target_allocation_flag_columns={"tip_income": SIPP_TIP_ALLOCATION_COLUMNS},
673+
require_nonmissing_source=False,
674+
)
648675

649676
tip_cols = [
650677
"household_id",
@@ -657,14 +684,6 @@ def _impute_sipp(
657684
"household_weight",
658685
]
659686
tip_train = sipp_df[tip_cols].dropna()
660-
tip_train = tip_train.loc[
661-
rng.choice(
662-
tip_train.index,
663-
size=min(10_000, len(tip_train)),
664-
replace=True,
665-
p=(tip_train.household_weight / tip_train.household_weight.sum()),
666-
)
667-
]
668687

669688
cps_tip_df = _build_cps_receiver(
670689
data, time_period, dataset_path, ["employment_income", "age"]
@@ -691,16 +710,17 @@ def _impute_sipp(
691710
else:
692711
cps_tip_df["is_tipped_occupation"] = 0.0
693712

694-
qrf = QRF()
695713
logger.info(
696714
"SIPP tips QRF: %d train, %d test",
697715
len(tip_train),
698716
len(cps_tip_df),
699717
)
700-
fitted = qrf.fit(
718+
fitted = QRF(max_train_samples=10_000).fit(
701719
X_train=tip_train,
702720
predictors=SIPP_TIPS_PREDICTORS,
703721
imputed_variables=["tip_income"],
722+
target_filters=tip_target_filters,
723+
weight_col="household_weight",
704724
)
705725
tip_preds = fitted.predict(X_test=cps_tip_df)
706726
data["tip_income"] = {
@@ -719,24 +739,28 @@ def _impute_sipp(
719739
repo_type="model",
720740
local_dir=STORAGE_FOLDER,
721741
)
722-
asset_cols = [
723-
"SSUID",
724-
"PNUM",
725-
"MONTHCODE",
726-
"WPFINWGT",
727-
"TAGE",
728-
"ESEX",
729-
"EMS",
730-
"TSSSAMT",
731-
"TRETINCAMT",
732-
"TVAL_BANK",
733-
"TVAL_STMF",
734-
"TVAL_BOND",
735-
"TINC_BANK",
736-
"TINC_STMF",
737-
"TINC_BOND",
738-
"TINC_RENT",
739-
] + ASSET_JOB_EARNINGS_COLUMNS
742+
asset_cols = (
743+
[
744+
"SSUID",
745+
"PNUM",
746+
"MONTHCODE",
747+
"WPFINWGT",
748+
"TAGE",
749+
"ESEX",
750+
"EMS",
751+
"TSSSAMT",
752+
"TRETINCAMT",
753+
"TVAL_BANK",
754+
"TVAL_STMF",
755+
"TVAL_BOND",
756+
"TINC_BANK",
757+
"TINC_STMF",
758+
"TINC_BOND",
759+
"TINC_RENT",
760+
]
761+
+ ASSET_JOB_EARNINGS_COLUMNS
762+
+ SIPP_ASSET_ALLOCATION_COLUMNS
763+
)
740764
asset_df = pd.read_csv(
741765
STORAGE_FOLDER / "pu2023.csv",
742766
delimiter="|",
@@ -751,16 +775,14 @@ def _impute_sipp(
751775
"bond_assets",
752776
"household_weight",
753777
*SIPP_ASSETS_PREDICTORS,
778+
*[
779+
column
780+
for columns in SIPP_ASSET_TARGET_SOURCE_COLUMNS.values()
781+
for column in columns
782+
],
783+
*SIPP_ASSET_ALLOCATION_COLUMNS,
754784
]
755-
asset_train = asset_df[asset_train_cols].dropna()
756-
asset_train = asset_train.loc[
757-
rng.choice(
758-
asset_train.index,
759-
size=min(20_000, len(asset_train)),
760-
replace=True,
761-
p=(asset_train.household_weight / asset_train.household_weight.sum()),
762-
)
763-
]
785+
asset_train = asset_df[asset_train_cols].copy()
764786

765787
cps_asset_df = _build_cps_receiver(
766788
data,
@@ -789,16 +811,22 @@ def _impute_sipp(
789811
"stock_assets",
790812
"bond_assets",
791813
]
792-
qrf = QRF()
793814
logger.info(
794815
"SIPP assets QRF: %d train, %d test",
795816
len(asset_train),
796817
len(cps_asset_df),
797818
)
798-
fitted = qrf.fit(
819+
fitted = QRF(max_train_samples=20_000).fit(
799820
X_train=asset_train,
800821
predictors=SIPP_ASSETS_PREDICTORS,
801822
imputed_variables=asset_vars,
823+
target_filters=target_observed_source_masks(
824+
asset_train,
825+
targets=asset_vars,
826+
target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS,
827+
target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS,
828+
),
829+
weight_col="household_weight",
802830
)
803831
asset_preds = fitted.predict(X_test=cps_asset_df)
804832

@@ -889,17 +917,6 @@ def _impute_sipp(
889917
logger.info("SIPP SSI disability criteria imputation complete")
890918

891919
vehicle_train = build_vehicle_training_frame()
892-
vehicle_train = vehicle_train.loc[
893-
rng.choice(
894-
vehicle_train.index,
895-
size=min(20_000, len(vehicle_train)),
896-
replace=True,
897-
p=(
898-
vehicle_train.household_weight
899-
/ vehicle_train.household_weight.sum()
900-
),
901-
)
902-
]
903920

904921
cps_vehicle_df = _build_cps_receiver(
905922
data,
@@ -943,19 +960,25 @@ def _impute_sipp(
943960
tenure_type=data.get("tenure_type", {}).get(time_period),
944961
)
945962

946-
qrf = QRF()
947963
logger.info(
948964
"SIPP vehicle QRF: %d train, %d test",
949965
len(vehicle_train),
950966
len(vehicle_receiver),
951967
)
952-
fitted = qrf.fit(
968+
vehicle_vars = [
969+
"household_vehicles_owned",
970+
"household_vehicles_value",
971+
]
972+
fitted = QRF(max_train_samples=20_000).fit(
953973
X_train=vehicle_train,
954974
predictors=VEHICLE_MODEL_PREDICTORS,
955-
imputed_variables=[
956-
"household_vehicles_owned",
957-
"household_vehicles_value",
958-
],
975+
imputed_variables=vehicle_vars,
976+
target_filters=target_observed_source_masks(
977+
vehicle_train,
978+
targets=vehicle_vars,
979+
target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
980+
),
981+
weight_col="household_weight",
959982
)
960983
vehicle_preds = fitted.predict(X_test=vehicle_receiver)
961984
data["household_vehicles_owned"] = {

0 commit comments

Comments
 (0)