Skip to content

Commit 7a6e3bb

Browse files
committed
Donor-impute CPS clone features
1 parent 52f1e4f commit 7a6e3bb

3 files changed

Lines changed: 442 additions & 16 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup and overtime-eligibility inputs better align with PUF-imputed incomes.

policyengine_us_data/datasets/cps/extended_cps.py

Lines changed: 273 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import pandas as pd
77
from policyengine_core.data import Dataset
88

9-
from policyengine_us_data.datasets.cps.cps import * # noqa: F403
10-
from policyengine_us_data.datasets.puf import * # noqa: F403
9+
from policyengine_us_data.datasets.cps.cps import CPS, CPS_2024, CPS_2024_Full
10+
from policyengine_us_data.datasets.puf import PUF, PUF_2024
1111
from policyengine_us_data.storage import STORAGE_FOLDER
1212
from policyengine_us_data.utils.retirement_limits import (
1313
get_retirement_limits,
@@ -16,10 +16,75 @@
1616

1717
logger = logging.getLogger(__name__)
1818

19+
# CPS-only categorical features to donor-impute onto the PUF clone half.
20+
# These drive subgroup analysis and occupation-based logic, so naive donor
21+
# duplication dilutes the relationship between the clone's PUF-imputed
22+
# income and its CPS-side demographic/occupation labels.
23+
CPS_CLONE_FEATURE_VARIABLES = [
24+
"is_male",
25+
"cps_race",
26+
"is_hispanic",
27+
"detailed_occupation_recode",
28+
]
29+
30+
# Predictors used to rematch CPS features onto the PUF clone half.
31+
# These are all available on the CPS half and on the doubled extended CPS.
32+
CPS_CLONE_FEATURE_PREDICTORS = [
33+
"age",
34+
"state_fips",
35+
"tax_unit_is_joint",
36+
"tax_unit_count_dependents",
37+
"is_tax_unit_head",
38+
"is_tax_unit_spouse",
39+
"is_tax_unit_dependent",
40+
"employment_income",
41+
"self_employment_income",
42+
"social_security",
43+
]
44+
45+
_OVERTIME_OCCUPATION_CODES = {
46+
"has_never_worked": 53,
47+
"is_military": 52,
48+
"is_computer_scientist": 8,
49+
"is_farmer_fisher": 41,
50+
}
51+
_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES = np.array(
52+
[
53+
1,
54+
2,
55+
3,
56+
5,
57+
7,
58+
9,
59+
10,
60+
11,
61+
12,
62+
13,
63+
14,
64+
15,
65+
16,
66+
18,
67+
19,
68+
20,
69+
21,
70+
22,
71+
24,
72+
25,
73+
27,
74+
28,
75+
29,
76+
30,
77+
32,
78+
33,
79+
34,
80+
],
81+
dtype=np.int16,
82+
)
83+
1984
# CPS-only variables that should be QRF-imputed for the PUF clone half
20-
# instead of naively duplicated from the CPS donor. These are
21-
# income-correlated variables that exist only in the CPS; demographics,
22-
# IDs, weights, and random seeds are fine to duplicate.
85+
# instead of naively duplicated from the CPS donor. Most demographics,
86+
# IDs, weights, and random seeds are fine to duplicate; the categorical
87+
# clone features above are rematched separately.
2388
CPS_ONLY_IMPUTED_VARIABLES = [
2489
# Retirement distributions
2590
"taxable_401k_distributions",
@@ -98,6 +163,186 @@
98163
]
99164

100165

166+
def _clone_half_person_values(data: dict, variable: str, time_period: int):
167+
"""Return clone-half values for ``variable`` mapped to person rows."""
168+
if variable not in data:
169+
return None
170+
171+
values = data[variable][time_period]
172+
n_persons = len(data["person_id"][time_period])
173+
n_persons_half = n_persons // 2
174+
if len(values) == n_persons:
175+
return np.asarray(values[n_persons_half:])
176+
177+
entity_mappings = [
178+
("household_id", "person_household_id"),
179+
("tax_unit_id", "person_tax_unit_id"),
180+
("spm_unit_id", "person_spm_unit_id"),
181+
("family_id", "person_family_id"),
182+
]
183+
for entity_id_var, person_entity_id_var in entity_mappings:
184+
if entity_id_var not in data or person_entity_id_var not in data:
185+
continue
186+
entity_ids = data[entity_id_var][time_period]
187+
if len(values) != len(entity_ids):
188+
continue
189+
entity_half = len(entity_ids) // 2
190+
clone_entity_ids = entity_ids[entity_half:]
191+
clone_person_entity_ids = data[person_entity_id_var][time_period][n_persons_half:]
192+
value_map = dict(zip(clone_entity_ids, values[entity_half:]))
193+
return np.array([value_map[idx] for idx in clone_person_entity_ids])
194+
195+
return None
196+
197+
198+
def _build_clone_test_frame(
199+
cps_sim,
200+
data: dict,
201+
time_period: int,
202+
predictors: list[str],
203+
) -> pd.DataFrame:
204+
"""Build clone-half predictor data with available doubled-dataset overrides."""
205+
X_test = cps_sim.calculate_dataframe(predictors).copy()
206+
for predictor in predictors:
207+
clone_values = _clone_half_person_values(data, predictor, time_period)
208+
if clone_values is not None and len(clone_values) == len(X_test):
209+
X_test[predictor] = clone_values
210+
return X_test[predictors]
211+
212+
213+
def _prepare_knn_matrix(
214+
df: pd.DataFrame,
215+
reference: pd.DataFrame | None = None,
216+
) -> np.ndarray:
217+
"""Normalise mixed-scale donor-matching predictors for kNN."""
218+
X = df.astype(float).copy()
219+
for income_var in CPS_STAGE2_INCOME_PREDICTORS:
220+
if income_var in X:
221+
X[income_var] = np.arcsinh(X[income_var])
222+
223+
ref = X if reference is None else reference.astype(float).copy()
224+
for income_var in CPS_STAGE2_INCOME_PREDICTORS:
225+
if income_var in ref:
226+
ref[income_var] = np.arcsinh(ref[income_var])
227+
228+
means = ref.mean()
229+
stds = ref.std(ddof=0).replace(0, 1)
230+
normalised = (X - means) / stds
231+
return np.nan_to_num(normalised.to_numpy(dtype=np.float32), nan=0.0)
232+
233+
234+
def _derive_overtime_occupation_inputs(
235+
occupation_codes: np.ndarray,
236+
) -> pd.DataFrame:
237+
"""Derive occupation-based overtime-exemption inputs from POCCU2."""
238+
occupation_codes = np.rint(occupation_codes).astype(np.int16, copy=False)
239+
derived = {
240+
name: occupation_codes == code
241+
for name, code in _OVERTIME_OCCUPATION_CODES.items()
242+
}
243+
derived["is_executive_administrative_professional"] = np.isin(
244+
occupation_codes,
245+
_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES,
246+
)
247+
return pd.DataFrame(derived)
248+
249+
250+
def _impute_clone_cps_features(
251+
data: dict,
252+
time_period: int,
253+
dataset_path: str,
254+
) -> pd.DataFrame:
255+
"""Rematch CPS demographic/occupation features for the clone half."""
256+
from policyengine_us import Microsimulation
257+
from sklearn.neighbors import NearestNeighbors
258+
259+
cps_sim = Microsimulation(dataset=dataset_path)
260+
X_train = cps_sim.calculate_dataframe(
261+
CPS_CLONE_FEATURE_PREDICTORS + CPS_CLONE_FEATURE_VARIABLES
262+
)
263+
available_outputs = [
264+
variable for variable in CPS_CLONE_FEATURE_VARIABLES if variable in X_train.columns
265+
]
266+
if not available_outputs:
267+
n_half = len(data["person_id"][time_period]) // 2
268+
return pd.DataFrame(index=np.arange(n_half))
269+
270+
X_test = _build_clone_test_frame(
271+
cps_sim,
272+
data,
273+
time_period,
274+
CPS_CLONE_FEATURE_PREDICTORS,
275+
)
276+
del cps_sim
277+
278+
train_roles = (
279+
X_train[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]]
280+
.round()
281+
.astype(int)
282+
.apply(tuple, axis=1)
283+
)
284+
test_roles = (
285+
X_test[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]]
286+
.round()
287+
.astype(int)
288+
.apply(tuple, axis=1)
289+
)
290+
291+
predictions = pd.DataFrame(index=X_test.index, columns=available_outputs)
292+
for role in test_roles.unique():
293+
test_mask = test_roles == role
294+
train_mask = train_roles == role
295+
if not train_mask.any():
296+
train_mask = pd.Series(True, index=X_train.index)
297+
298+
train_predictors = X_train.loc[train_mask, CPS_CLONE_FEATURE_PREDICTORS]
299+
test_predictors = X_test.loc[test_mask, CPS_CLONE_FEATURE_PREDICTORS]
300+
train_matrix = _prepare_knn_matrix(train_predictors)
301+
test_matrix = _prepare_knn_matrix(test_predictors, reference=train_predictors)
302+
303+
matcher = NearestNeighbors(n_neighbors=1)
304+
matcher.fit(train_matrix)
305+
donor_indices = matcher.kneighbors(
306+
test_matrix,
307+
return_distance=False,
308+
).ravel()
309+
donor_outputs = (
310+
X_train.loc[train_mask, available_outputs]
311+
.iloc[donor_indices]
312+
.reset_index(drop=True)
313+
)
314+
predictions.loc[test_mask, available_outputs] = donor_outputs.to_numpy()
315+
316+
if "detailed_occupation_recode" in predictions:
317+
occupation_codes = predictions["detailed_occupation_recode"].astype(float).to_numpy()
318+
for column, values in _derive_overtime_occupation_inputs(occupation_codes).items():
319+
predictions[column] = values
320+
321+
return predictions
322+
323+
324+
def _splice_clone_feature_predictions(
325+
data: dict,
326+
predictions: pd.DataFrame,
327+
time_period: int,
328+
) -> dict:
329+
"""Replace clone-half person-level feature variables with donor matches."""
330+
n_half = len(data["person_id"][time_period]) // 2
331+
for variable in predictions.columns:
332+
if variable not in data:
333+
continue
334+
values = data[variable][time_period]
335+
new_values = np.array(values, copy=True)
336+
pred_values = predictions[variable].to_numpy()
337+
if np.issubdtype(new_values.dtype, np.bool_):
338+
pred_values = pred_values.astype(bool, copy=False)
339+
else:
340+
pred_values = pred_values.astype(new_values.dtype, copy=False)
341+
new_values[n_half:] = pred_values
342+
data[variable] = {time_period: new_values}
343+
return data
344+
345+
101346
def _impute_cps_only_variables(
102347
data: dict,
103348
time_period: int,
@@ -161,17 +406,16 @@ def _impute_cps_only_variables(
161406
missing_outputs,
162407
)
163408

164-
# Build PUF clone test data: demographics from CPS sim (PUF clones
165-
# share demographics with their CPS donors), income from the
166-
# PUF-imputed values in the second half of the doubled data.
167-
n_persons_half = len(data["person_id"][time_period]) // 2
168-
X_test = cps_sim.calculate_dataframe(CPS_STAGE2_DEMOGRAPHIC_PREDICTORS)
409+
# Build PUF clone test data from the clone half itself, falling back to
410+
# the CPS sim for formula variables that are not stored in the dataset.
411+
X_test = _build_clone_test_frame(
412+
cps_sim,
413+
data,
414+
time_period,
415+
all_predictors,
416+
)
169417
del cps_sim
170418

171-
for var in CPS_STAGE2_INCOME_PREDICTORS:
172-
# Income comes from PUF imputation in the second half.
173-
X_test[var] = data[var][time_period][n_persons_half:]
174-
175419
logger.info(
176420
"Stage-2 CPS-only imputation: %d outputs, "
177421
"training on %d CPS persons, predicting for %d PUF clones",
@@ -427,11 +671,24 @@ def generate(self):
427671
dataset_path=str(self.cps.file_path),
428672
)
429673

430-
# Stage 2: QRF-impute CPS-only variables for PUF clones.
674+
# Stage 2a: donor-impute CPS feature variables for PUF clones.
675+
logger.info("Stage-2a: rematching CPS features for PUF clones")
676+
clone_feature_predictions = _impute_clone_cps_features(
677+
data=new_data,
678+
time_period=self.time_period,
679+
dataset_path=str(self.cps.file_path),
680+
)
681+
new_data = _splice_clone_feature_predictions(
682+
data=new_data,
683+
predictions=clone_feature_predictions,
684+
time_period=self.time_period,
685+
)
686+
687+
# Stage 2b: QRF-impute CPS-only continuous variables for PUF clones.
431688
# Train on CPS data using demographics + PUF-imputed income
432689
# as predictors, so the PUF clone half gets values consistent
433690
# with its imputed income rather than naive donor duplication.
434-
logger.info("Stage-2: imputing CPS-only variables for PUF clones")
691+
logger.info("Stage-2b: imputing CPS-only variables for PUF clones")
435692
cps_only_predictions = _impute_cps_only_variables(
436693
data=new_data,
437694
time_period=self.time_period,

0 commit comments

Comments
 (0)