Skip to content

Commit e1379af

Browse files
committed
Donor-impute CPS clone features
1 parent 14fb1f0 commit e1379af

3 files changed

Lines changed: 439 additions & 15 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup and overtime-eligibility inputs better align with PUF-imputed incomes.

policyengine_us_data/datasets/cps/extended_cps.py

Lines changed: 270 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,78 @@
2222

2323
logger = logging.getLogger(__name__)
2424

25-
2625
def _supports_structural_mortgage_inputs() -> bool:
2726
return has_policyengine_us_variables(*STRUCTURAL_MORTGAGE_VARIABLES)
2827

28+
# CPS-only categorical features to donor-impute onto the PUF clone half.
29+
# These drive subgroup analysis and occupation-based logic, so naive donor
30+
# duplication dilutes the relationship between the clone's PUF-imputed
31+
# income and its CPS-side demographic/occupation labels.
32+
CPS_CLONE_FEATURE_VARIABLES = [
33+
"is_male",
34+
"cps_race",
35+
"is_hispanic",
36+
"detailed_occupation_recode",
37+
]
38+
39+
# Predictors used to rematch CPS features onto the PUF clone half.
40+
# These are all available on the CPS half and on the doubled extended CPS.
41+
CPS_CLONE_FEATURE_PREDICTORS = [
42+
"age",
43+
"state_fips",
44+
"tax_unit_is_joint",
45+
"tax_unit_count_dependents",
46+
"is_tax_unit_head",
47+
"is_tax_unit_spouse",
48+
"is_tax_unit_dependent",
49+
"employment_income",
50+
"self_employment_income",
51+
"social_security",
52+
]
53+
54+
_OVERTIME_OCCUPATION_CODES = {
55+
"has_never_worked": 53,
56+
"is_military": 52,
57+
"is_computer_scientist": 8,
58+
"is_farmer_fisher": 41,
59+
}
60+
_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES = np.array(
61+
[
62+
1,
63+
2,
64+
3,
65+
5,
66+
7,
67+
9,
68+
10,
69+
11,
70+
12,
71+
13,
72+
14,
73+
15,
74+
16,
75+
18,
76+
19,
77+
20,
78+
21,
79+
22,
80+
24,
81+
25,
82+
27,
83+
28,
84+
29,
85+
30,
86+
32,
87+
33,
88+
34,
89+
],
90+
dtype=np.int16,
91+
)
2992

3093
# CPS-only variables that should be QRF-imputed for the PUF clone half
31-
# instead of naively duplicated from the CPS donor. These are
32-
# income-correlated variables that exist only in the CPS; demographics,
33-
# IDs, weights, and random seeds are fine to duplicate.
94+
# instead of naively duplicated from the CPS donor. Most demographics,
95+
# IDs, weights, and random seeds are fine to duplicate; the categorical
96+
# clone features above are rematched separately.
3497
CPS_ONLY_IMPUTED_VARIABLES = [
3598
# Retirement distributions
3699
"taxable_401k_distributions",
@@ -109,6 +172,186 @@ def _supports_structural_mortgage_inputs() -> bool:
109172
]
110173

111174

175+
def _clone_half_person_values(data: dict, variable: str, time_period: int):
176+
"""Return clone-half values for ``variable`` mapped to person rows."""
177+
if variable not in data:
178+
return None
179+
180+
values = data[variable][time_period]
181+
n_persons = len(data["person_id"][time_period])
182+
n_persons_half = n_persons // 2
183+
if len(values) == n_persons:
184+
return np.asarray(values[n_persons_half:])
185+
186+
entity_mappings = [
187+
("household_id", "person_household_id"),
188+
("tax_unit_id", "person_tax_unit_id"),
189+
("spm_unit_id", "person_spm_unit_id"),
190+
("family_id", "person_family_id"),
191+
]
192+
for entity_id_var, person_entity_id_var in entity_mappings:
193+
if entity_id_var not in data or person_entity_id_var not in data:
194+
continue
195+
entity_ids = data[entity_id_var][time_period]
196+
if len(values) != len(entity_ids):
197+
continue
198+
entity_half = len(entity_ids) // 2
199+
clone_entity_ids = entity_ids[entity_half:]
200+
clone_person_entity_ids = data[person_entity_id_var][time_period][n_persons_half:]
201+
value_map = dict(zip(clone_entity_ids, values[entity_half:]))
202+
return np.array([value_map[idx] for idx in clone_person_entity_ids])
203+
204+
return None
205+
206+
207+
def _build_clone_test_frame(
208+
cps_sim,
209+
data: dict,
210+
time_period: int,
211+
predictors: list[str],
212+
) -> pd.DataFrame:
213+
"""Build clone-half predictor data with available doubled-dataset overrides."""
214+
X_test = cps_sim.calculate_dataframe(predictors).copy()
215+
for predictor in predictors:
216+
clone_values = _clone_half_person_values(data, predictor, time_period)
217+
if clone_values is not None and len(clone_values) == len(X_test):
218+
X_test[predictor] = clone_values
219+
return X_test[predictors]
220+
221+
222+
def _prepare_knn_matrix(
223+
df: pd.DataFrame,
224+
reference: pd.DataFrame | None = None,
225+
) -> np.ndarray:
226+
"""Normalise mixed-scale donor-matching predictors for kNN."""
227+
X = df.astype(float).copy()
228+
for income_var in CPS_STAGE2_INCOME_PREDICTORS:
229+
if income_var in X:
230+
X[income_var] = np.arcsinh(X[income_var])
231+
232+
ref = X if reference is None else reference.astype(float).copy()
233+
for income_var in CPS_STAGE2_INCOME_PREDICTORS:
234+
if income_var in ref:
235+
ref[income_var] = np.arcsinh(ref[income_var])
236+
237+
means = ref.mean()
238+
stds = ref.std(ddof=0).replace(0, 1)
239+
normalised = (X - means) / stds
240+
return np.nan_to_num(normalised.to_numpy(dtype=np.float32), nan=0.0)
241+
242+
243+
def _derive_overtime_occupation_inputs(
244+
occupation_codes: np.ndarray,
245+
) -> pd.DataFrame:
246+
"""Derive occupation-based overtime-exemption inputs from POCCU2."""
247+
occupation_codes = np.rint(occupation_codes).astype(np.int16, copy=False)
248+
derived = {
249+
name: occupation_codes == code
250+
for name, code in _OVERTIME_OCCUPATION_CODES.items()
251+
}
252+
derived["is_executive_administrative_professional"] = np.isin(
253+
occupation_codes,
254+
_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES,
255+
)
256+
return pd.DataFrame(derived)
257+
258+
259+
def _impute_clone_cps_features(
260+
data: dict,
261+
time_period: int,
262+
dataset_path: str,
263+
) -> pd.DataFrame:
264+
"""Rematch CPS demographic/occupation features for the clone half."""
265+
from policyengine_us import Microsimulation
266+
from sklearn.neighbors import NearestNeighbors
267+
268+
cps_sim = Microsimulation(dataset=dataset_path)
269+
X_train = cps_sim.calculate_dataframe(
270+
CPS_CLONE_FEATURE_PREDICTORS + CPS_CLONE_FEATURE_VARIABLES
271+
)
272+
available_outputs = [
273+
variable for variable in CPS_CLONE_FEATURE_VARIABLES if variable in X_train.columns
274+
]
275+
if not available_outputs:
276+
n_half = len(data["person_id"][time_period]) // 2
277+
return pd.DataFrame(index=np.arange(n_half))
278+
279+
X_test = _build_clone_test_frame(
280+
cps_sim,
281+
data,
282+
time_period,
283+
CPS_CLONE_FEATURE_PREDICTORS,
284+
)
285+
del cps_sim
286+
287+
train_roles = (
288+
X_train[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]]
289+
.round()
290+
.astype(int)
291+
.apply(tuple, axis=1)
292+
)
293+
test_roles = (
294+
X_test[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]]
295+
.round()
296+
.astype(int)
297+
.apply(tuple, axis=1)
298+
)
299+
300+
predictions = pd.DataFrame(index=X_test.index, columns=available_outputs)
301+
for role in test_roles.unique():
302+
test_mask = test_roles == role
303+
train_mask = train_roles == role
304+
if not train_mask.any():
305+
train_mask = pd.Series(True, index=X_train.index)
306+
307+
train_predictors = X_train.loc[train_mask, CPS_CLONE_FEATURE_PREDICTORS]
308+
test_predictors = X_test.loc[test_mask, CPS_CLONE_FEATURE_PREDICTORS]
309+
train_matrix = _prepare_knn_matrix(train_predictors)
310+
test_matrix = _prepare_knn_matrix(test_predictors, reference=train_predictors)
311+
312+
matcher = NearestNeighbors(n_neighbors=1)
313+
matcher.fit(train_matrix)
314+
donor_indices = matcher.kneighbors(
315+
test_matrix,
316+
return_distance=False,
317+
).ravel()
318+
donor_outputs = (
319+
X_train.loc[train_mask, available_outputs]
320+
.iloc[donor_indices]
321+
.reset_index(drop=True)
322+
)
323+
predictions.loc[test_mask, available_outputs] = donor_outputs.to_numpy()
324+
325+
if "detailed_occupation_recode" in predictions:
326+
occupation_codes = predictions["detailed_occupation_recode"].astype(float).to_numpy()
327+
for column, values in _derive_overtime_occupation_inputs(occupation_codes).items():
328+
predictions[column] = values
329+
330+
return predictions
331+
332+
333+
def _splice_clone_feature_predictions(
334+
data: dict,
335+
predictions: pd.DataFrame,
336+
time_period: int,
337+
) -> dict:
338+
"""Replace clone-half person-level feature variables with donor matches."""
339+
n_half = len(data["person_id"][time_period]) // 2
340+
for variable in predictions.columns:
341+
if variable not in data:
342+
continue
343+
values = data[variable][time_period]
344+
new_values = np.array(values, copy=True)
345+
pred_values = predictions[variable].to_numpy()
346+
if np.issubdtype(new_values.dtype, np.bool_):
347+
pred_values = pred_values.astype(bool, copy=False)
348+
else:
349+
pred_values = pred_values.astype(new_values.dtype, copy=False)
350+
new_values[n_half:] = pred_values
351+
data[variable] = {time_period: new_values}
352+
return data
353+
354+
112355
def _impute_cps_only_variables(
113356
data: dict,
114357
time_period: int,
@@ -172,17 +415,16 @@ def _impute_cps_only_variables(
172415
missing_outputs,
173416
)
174417

175-
# Build PUF clone test data: demographics from CPS sim (PUF clones
176-
# share demographics with their CPS donors), income from the
177-
# PUF-imputed values in the second half of the doubled data.
178-
n_persons_half = len(data["person_id"][time_period]) // 2
179-
X_test = cps_sim.calculate_dataframe(CPS_STAGE2_DEMOGRAPHIC_PREDICTORS)
418+
# Build PUF clone test data from the clone half itself, falling back to
419+
# the CPS sim for formula variables that are not stored in the dataset.
420+
X_test = _build_clone_test_frame(
421+
cps_sim,
422+
data,
423+
time_period,
424+
all_predictors,
425+
)
180426
del cps_sim
181427

182-
for var in CPS_STAGE2_INCOME_PREDICTORS:
183-
# Income comes from PUF imputation in the second half.
184-
X_test[var] = data[var][time_period][n_persons_half:]
185-
186428
logger.info(
187429
"Stage-2 CPS-only imputation: %d outputs, "
188430
"training on %d CPS persons, predicting for %d PUF clones",
@@ -438,11 +680,24 @@ def generate(self):
438680
dataset_path=str(self.cps.file_path),
439681
)
440682

441-
# Stage 2: QRF-impute CPS-only variables for PUF clones.
683+
# Stage 2a: donor-impute CPS feature variables for PUF clones.
684+
logger.info("Stage-2a: rematching CPS features for PUF clones")
685+
clone_feature_predictions = _impute_clone_cps_features(
686+
data=new_data,
687+
time_period=self.time_period,
688+
dataset_path=str(self.cps.file_path),
689+
)
690+
new_data = _splice_clone_feature_predictions(
691+
data=new_data,
692+
predictions=clone_feature_predictions,
693+
time_period=self.time_period,
694+
)
695+
696+
# Stage 2b: QRF-impute CPS-only continuous variables for PUF clones.
442697
# Train on CPS data using demographics + PUF-imputed income
443698
# as predictors, so the PUF clone half gets values consistent
444699
# with its imputed income rather than naive donor duplication.
445-
logger.info("Stage-2: imputing CPS-only variables for PUF clones")
700+
logger.info("Stage-2b: imputing CPS-only variables for PUF clones")
446701
cps_only_predictions = _impute_cps_only_variables(
447702
data=new_data,
448703
time_period=self.time_period,

0 commit comments

Comments
 (0)