Skip to content

Commit 119ecb1

Browse files
committed
Filter invalid SIPP imputation weights
1 parent e6fdcaf commit 119ecb1

6 files changed

Lines changed: 189 additions & 1 deletion

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Filter non-positive SIPP donor weights before fitting source imputation models.

policyengine_us_data/calibration/source_impute.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
from policyengine_us_data.pipeline_schema import PipelineNode
8383
from policyengine_us_data.utils.source_quality import (
8484
cap_training_sample,
85+
filter_positive_finite_weight_rows,
8586
require_columns_present,
8687
target_observed_source_masks,
8788
)
@@ -710,6 +711,12 @@ def _impute_sipp(
710711
"household_weight",
711712
]
712713
tip_train = sipp_df[tip_cols].dropna()
714+
tip_train, tip_target_filters = filter_positive_finite_weight_rows(
715+
tip_train,
716+
weight_col="household_weight",
717+
target_filters=tip_target_filters,
718+
context_name="SIPP source tip donor",
719+
)
713720
tip_train, tip_target_filters = cap_training_sample(
714721
tip_train,
715722
max_train_samples=10_000,
@@ -849,6 +856,12 @@ def _impute_sipp(
849856
target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS,
850857
target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS,
851858
)
859+
asset_train, asset_target_filters = filter_positive_finite_weight_rows(
860+
asset_train,
861+
weight_col="household_weight",
862+
target_filters=asset_target_filters,
863+
context_name="SIPP source asset donor",
864+
)
852865
asset_train, asset_target_filters = cap_training_sample(
853866
asset_train,
854867
max_train_samples=20_000,
@@ -1013,6 +1026,12 @@ def _impute_sipp(
10131026
targets=vehicle_vars,
10141027
target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
10151028
)
1029+
vehicle_train, vehicle_target_filters = filter_positive_finite_weight_rows(
1030+
vehicle_train,
1031+
weight_col="household_weight",
1032+
target_filters=vehicle_target_filters,
1033+
context_name="SIPP source vehicle donor",
1034+
)
10161035
vehicle_train, vehicle_target_filters = cap_training_sample(
10171036
vehicle_train,
10181037
max_train_samples=20_000,

policyengine_us_data/datasets/sipp/sipp.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212
from policyengine_us_data.utils.source_quality import (
1313
cap_training_sample,
14+
filter_positive_finite_weight_rows,
1415
filter_observed_source_rows,
1516
require_columns_present,
1617
sipp_allocation_flag_for,
@@ -188,6 +189,12 @@ def train_tip_model():
188189
]
189190

190191
sipp = sipp[~sipp.isna().any(axis=1)]
192+
sipp, tip_target_filters = filter_positive_finite_weight_rows(
193+
sipp,
194+
weight_col="household_weight",
195+
target_filters=tip_target_filters,
196+
context_name="SIPP tip donor",
197+
)
191198
sipp, tip_target_filters = cap_training_sample(
192199
sipp,
193200
max_train_samples=10_000,
@@ -652,6 +659,12 @@ def train_asset_model():
652659
target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS,
653660
target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS,
654661
)
662+
sipp, asset_target_filters = filter_positive_finite_weight_rows(
663+
sipp,
664+
weight_col="household_weight",
665+
target_filters=asset_target_filters,
666+
context_name="SIPP asset donor",
667+
)
655668
sipp, asset_target_filters = cap_training_sample(
656669
sipp,
657670
max_train_samples=20_000,
@@ -839,6 +852,12 @@ def train_vehicle_model():
839852
targets=vehicle_vars,
840853
target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
841854
)
855+
sipp, vehicle_target_filters = filter_positive_finite_weight_rows(
856+
sipp,
857+
weight_col="household_weight",
858+
target_filters=vehicle_target_filters,
859+
context_name="SIPP vehicle donor",
860+
)
842861
sipp, vehicle_target_filters = cap_training_sample(
843862
sipp,
844863
max_train_samples=20_000,

policyengine_us_data/utils/source_quality.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,3 +225,52 @@ def cap_training_sample(
225225
for target, mask in filters.items()
226226
}
227227
return sampled_df, sampled_filters
228+
229+
230+
def filter_positive_finite_weight_rows(
231+
df: pd.DataFrame,
232+
*,
233+
weight_col: str,
234+
target_filters: Mapping[str, pd.Series] | None = None,
235+
context_name: str = "donor training frame",
236+
) -> tuple[pd.DataFrame, dict[str, pd.Series]]:
237+
"""Drop rows whose fit weight cannot be passed to microimpute."""
238+
if weight_col not in df:
239+
raise KeyError(f"{context_name} is missing weight column {weight_col!r}")
240+
241+
filters = {}
242+
for target, mask in (target_filters or {}).items():
243+
aligned = mask.reindex(df.index)
244+
if aligned.isna().any():
245+
raise ValueError(f"target_filters[{target!r}] contains missing values")
246+
filters[target] = aligned.astype(bool)
247+
248+
weights = pd.to_numeric(df[weight_col], errors="coerce")
249+
valid_weight = np.isfinite(weights) & weights.gt(0)
250+
dropped = int((~valid_weight).sum())
251+
if dropped:
252+
logger.info(
253+
"Dropped %d/%d %s rows with non-positive or non-finite %s",
254+
dropped,
255+
len(df),
256+
context_name,
257+
weight_col,
258+
)
259+
260+
filtered_df = df.loc[valid_weight].copy().reset_index(drop=True)
261+
filtered_filters = {
262+
target: pd.Series(
263+
mask.loc[valid_weight].to_numpy(dtype=bool),
264+
index=filtered_df.index,
265+
)
266+
for target, mask in filters.items()
267+
}
268+
269+
for target, mask in filtered_filters.items():
270+
if not mask.any():
271+
raise ValueError(
272+
f"No observed donor rows with positive finite {weight_col} "
273+
f"available for {target}"
274+
)
275+
276+
return filtered_df, filtered_filters

tests/unit/datasets/test_sipp_tip_columns.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55
to explicit `TJB*_TXAMT` dollar-amount columns only.
66
"""
77

8+
import numpy as np
89
import pandas as pd
910
import pytest
1011

11-
from policyengine_us_data.datasets.sipp.sipp import SIPP_TIP_AMOUNT_COLUMNS
12+
from policyengine_us_data.datasets.sipp.sipp import (
13+
SIPP_JOB_OCCUPATION_COLUMNS,
14+
SIPP_TIP_AMOUNT_COLUMNS,
15+
)
1216
import policyengine_us_data.datasets.sipp.sipp as sipp_module
1317

1418

@@ -66,3 +70,47 @@ def test_train_tip_model_requires_allocation_flags_for_present_tip_columns(
6670

6771
with pytest.raises(KeyError, match="AJB1_TXAMT"):
6872
sipp_module.train_tip_model()
73+
74+
75+
def test_train_tip_model_drops_non_positive_weights(monkeypatch):
    """Only donors with a strictly positive weight should reach ``QRF.fit``.

    The synthetic frame has four rows with WPFINWGT 100, 0, -5 and 200; the
    fit call is expected to see only the first and last of them.
    """
    # Replace the hub download with a no-op.
    monkeypatch.setattr(sipp_module, "hf_hub_download", lambda *args, **kwargs: None)

    raw_columns = {
        "SSUID": [1, 2, 3, 4],
        "MONTHCODE": [12, 12, 12, 12],
        "TAGE": [30, 31, 32, 33],
        # NOTE(review): presumably the source of household_weight — confirm.
        "WPFINWGT": [100.0, 0.0, -5.0, 200.0],
        "TPTOTINC": [1_000.0, 2_000.0, 3_000.0, 4_000.0],
        "TJB1_TXAMT": [10.0, 20.0, 30.0, 40.0],
        "AJB1_TXAMT": [0, 0, 0, 0],
    }
    raw_columns.update(
        {column: [0, 0, 0, 0] for column in SIPP_JOB_OCCUPATION_COLUMNS}
    )

    def fake_read_csv(*args, **kwargs):
        # Every read returns a fresh frame built from the synthetic columns.
        return pd.DataFrame(raw_columns)

    monkeypatch.setattr(sipp_module.pd, "read_csv", fake_read_csv)

    captured = {}

    class FakeQRF:
        """Stand-in model that records what ``fit`` was given."""

        def fit(
            self,
            *,
            X_train,
            predictors,
            imputed_variables,
            target_filters,
            weight_col,
        ):
            captured["weights"] = X_train[weight_col].to_numpy()
            captured["target_filter"] = target_filters["tip_income"].to_numpy()
            return self

    monkeypatch.setattr(sipp_module, "QRF", FakeQRF)

    sipp_module.train_tip_model()

    seen_weights = captured["weights"]
    seen_filter = captured["target_filter"]
    np.testing.assert_array_equal(seen_weights, [100.0, 200.0])
    np.testing.assert_array_equal(seen_filter, [True, True])

tests/unit/test_source_quality.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from policyengine_us_data.utils.source_quality import (
55
cap_training_sample,
6+
filter_positive_finite_weight_rows,
67
observed_source_mask,
78
require_columns_present,
89
sipp_allocation_flag_for,
@@ -201,3 +202,54 @@ def test_cap_training_sample_rejects_misaligned_filters():
201202
raise AssertionError("Expected misaligned target filters to fail")
202203

203204
assert "target_filters['value']" in message
205+
206+
207+
def test_filter_positive_finite_weight_rows_reindexes_target_filters():
    """Surviving rows and their filter mask share a fresh 0-based index.

    Weights of 0, NaN and +inf are dropped, leaving the rows that held
    values 10 and 50 under the original labels 10 and 14.
    """
    original_labels = [10, 11, 12, 13, 14]
    donors = pd.DataFrame(
        {
            "value": [10, 20, 30, 40, 50],
            "household_weight": [1.0, 0.0, np.nan, np.inf, 5.0],
        },
        index=original_labels,
    )
    observed = {
        "value": pd.Series([True, True, False, True, True], index=donors.index),
    }

    kept, kept_filters = filter_positive_finite_weight_rows(
        donors,
        weight_col="household_weight",
        target_filters=observed,
        context_name="unit-test donor",
    )
    value_filter = kept_filters["value"]

    assert kept["value"].tolist() == [10, 50]
    assert kept.index.tolist() == [0, 1]
    np.testing.assert_array_equal(value_filter.values, [True, True])
    assert value_filter.index.tolist() == [0, 1]
233+
234+
235+
def test_filter_positive_finite_weight_rows_requires_observed_target_rows():
    """Fail when the only flagged donor row has a non-positive weight.

    The single row flagged for ``value`` carries weight 0.0, so nothing
    flagged survives the weight filter and a ValueError is expected.
    """
    donors = pd.DataFrame(
        {
            "value": [10, 20],
            "household_weight": [0.0, 1.0],
        }
    )
    observed = {"value": pd.Series([True, False], index=donors.index)}
    expected_fragment = "No observed donor rows with positive finite household_weight"

    try:
        filter_positive_finite_weight_rows(
            donors,
            weight_col="household_weight",
            target_filters=observed,
        )
    except ValueError as error:
        message = str(error)
    else:
        raise AssertionError("Expected all invalid observed weights to fail")

    assert expected_fragment in message

0 commit comments

Comments
 (0)