Skip to content

Commit 5b296a0

Browse files
authored
Add ORG labor-market imputations to CPS and eCPS (#674)
* Add ORG labor-market imputations to CPS
* Format ORG imputation changes
* Fix ORG state mapping for CPS build
* Update CPS overtime premium target
* Serialize SCF dataset generation
1 parent f8224b2 commit 5b296a0

12 files changed

Lines changed: 1009 additions & 15 deletions

File tree

policyengine_us_data/calibration/source_impute.py

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
"""Non-PUF QRF imputations from donor surveys.
22
3-
Re-imputes variables from ACS, SIPP, and SCF donor surveys.
4-
Only ACS includes state_fips as a QRF predictor (ACS has state
5-
identifiers). SIPP and SCF lack state data, so their imputations
6-
use only demographic and financial predictors.
3+
Re-imputes variables from ACS, SIPP, ORG, and SCF donor surveys.
4+
Only ACS and ORG include state_fips as a QRF predictor. SIPP and SCF
5+
lack state data, so their imputations use only demographic and
6+
financial predictors.
77
88
Sources and variables:
99
ACS -> rent, real_estate_taxes (with state predictor)
1010
SIPP -> tip_income, bank_account_assets, stock_assets,
1111
bond_assets (no state predictor)
12+
ORG -> hourly_wage, is_paid_hourly,
13+
is_union_member_or_covered (with state predictor)
1214
SCF -> net_worth, auto_loan_balance, auto_loan_interest
1315
(no state predictor)
1416
@@ -27,6 +29,13 @@
2729
import numpy as np
2830
import pandas as pd
2931

32+
from policyengine_us_data.datasets.org import (
33+
ORG_BOOL_VARIABLES,
34+
ORG_IMPUTED_VARIABLES,
35+
build_org_receiver_frame,
36+
predict_org_features,
37+
)
38+
3039
logger = logging.getLogger(__name__)
3140

3241
ACS_IMPUTED_VARIABLES = [
@@ -48,7 +57,10 @@
4857
]
4958

5059
ALL_SOURCE_VARIABLES = (
51-
ACS_IMPUTED_VARIABLES + SIPP_IMPUTED_VARIABLES + SCF_IMPUTED_VARIABLES
60+
ACS_IMPUTED_VARIABLES
61+
+ SIPP_IMPUTED_VARIABLES
62+
+ ORG_IMPUTED_VARIABLES
63+
+ SCF_IMPUTED_VARIABLES
5264
)
5365

5466
ACS_PREDICTORS = [
@@ -118,13 +130,15 @@ def impute_source_variables(
118130
dataset_path: Optional[str] = None,
119131
skip_acs: bool = False,
120132
skip_sipp: bool = False,
133+
skip_org: bool = False,
121134
skip_scf: bool = False,
122135
) -> Dict[str, Dict[int, np.ndarray]]:
123-
"""Re-impute ACS/SIPP/SCF variables from donor surveys.
136+
"""Re-impute ACS/SIPP/ORG/SCF variables from donor surveys.
124137
125138
Overwrites existing imputed values in data. ACS uses
126-
state_fips as a QRF predictor; SIPP and SCF use only
127-
demographic and financial predictors (no state data).
139+
state_fips as a QRF predictor; ORG uses state plus labor-market
140+
predictors; SIPP and SCF use only demographic and financial
141+
predictors (no state data).
128142
129143
Args:
130144
data: CPS dataset dict {variable: {time_period: array}}.
@@ -133,6 +147,7 @@ def impute_source_variables(
133147
dataset_path: Path to CPS h5 for Microsimulation.
134148
skip_acs: Skip ACS imputation.
135149
skip_sipp: Skip SIPP imputation.
150+
skip_org: Skip ORG imputation.
136151
skip_scf: Skip SCF imputation.
137152
138153
Returns:
@@ -150,6 +165,10 @@ def impute_source_variables(
150165
logger.info("Imputing SIPP variables")
151166
data = _impute_sipp(data, state_fips, time_period, dataset_path)
152167

168+
if not skip_org:
169+
logger.info("Imputing ORG variables")
170+
data = _impute_org(data, state_fips, time_period, dataset_path)
171+
153172
if not skip_scf:
154173
logger.info("Imputing SCF variables")
155174
data = _impute_scf(data, state_fips, time_period, dataset_path)
@@ -700,3 +719,59 @@ def _impute_scf(
700719

701720
logger.info("SCF imputation complete: %s", available_vars)
702721
return data
722+
723+
724+
def _impute_org(
    data: Dict[str, Dict[int, np.ndarray]],
    state_fips: np.ndarray,
    time_period: int,
    dataset_path: Optional[str] = None,
) -> Dict[str, Dict[int, np.ndarray]]:
    """Impute ORG-only labor-market variables onto CPS persons.

    Builds a receiver frame from CPS demographics, person-level state
    and earnings/hours, runs the ORG predictor on it, and overwrites
    every variable listed in ``ORG_IMPUTED_VARIABLES`` in ``data``.

    Args:
        data: CPS dataset dict {variable: {time_period: array}}.
        state_fips: State FIPS codes; passed to ``_person_state_fips``
            to obtain the per-person state predictor.
        time_period: Period key used to read from and write to ``data``.
        dataset_path: Path to CPS h5, forwarded to
            ``_build_cps_receiver``.

    Returns:
        The same ``data`` dict, with each ORG variable stored as
        ``{time_period: array}`` (bool for ``ORG_BOOL_VARIABLES``,
        float32 otherwise).
    """
    pe_vars = [
        "age",
        "is_male",
        "is_hispanic",
        "cps_race",
        "employment_income",
        "weekly_hours_worked",
        "self_employment_income",
    ]
    cps_df = _build_cps_receiver(data, time_period, dataset_path, pe_vars)

    # The ORG receiver wants is_female; derive it from is_male when the
    # receiver frame has it, otherwise fall back to the raw dataset.
    if "is_male" in cps_df.columns:
        is_female = (~cps_df["is_male"].astype(bool)).astype(np.float32).values
    elif "is_female" in data:
        is_female = np.asarray(data["is_female"][time_period]).astype(
            np.float32
        )
    else:
        # Last-resort default: without a sex variable everyone is coded
        # as male. Surface this, since it changes a model predictor.
        logger.warning(
            "ORG imputation: neither is_male nor is_female is available; "
            "treating every person as male"
        )
        is_female = np.zeros(len(cps_df), dtype=np.float32)

    person_states = _person_state_fips(data, state_fips, time_period)
    receiver = build_org_receiver_frame(
        age=cps_df["age"].values,
        is_female=is_female,
        is_hispanic=cps_df["is_hispanic"].values,
        cps_race=cps_df["cps_race"].values,
        state_fips=person_states,
        employment_income=cps_df["employment_income"].values,
        weekly_hours_worked=cps_df["weekly_hours_worked"].values,
    )

    # Self-employment income is optional; None is passed when the
    # receiver frame does not carry it.
    self_employment_income = (
        cps_df["self_employment_income"].values
        if "self_employment_income" in cps_df.columns
        else None
    )
    predictions = predict_org_features(
        receiver,
        self_employment_income=self_employment_income,
    )

    for var in ORG_IMPUTED_VARIABLES:
        target_dtype = bool if var in ORG_BOOL_VARIABLES else np.float32
        data[var] = {time_period: predictions[var].values.astype(target_dtype)}

    logger.info("ORG imputation complete: %s", ORG_IMPUTED_VARIABLES)
    return data

policyengine_us_data/datasets/cps/cps.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818
align_reported_ssi_disability,
1919
prioritize_reported_recipients,
2020
)
21+
from policyengine_us_data.datasets.org import (
22+
ORG_BOOL_VARIABLES,
23+
ORG_IMPUTED_VARIABLES,
24+
build_org_receiver_frame,
25+
predict_org_features,
26+
)
2127
from policyengine_us_data.utils.randomness import seeded_rng
2228

2329

@@ -77,6 +83,8 @@ def generate(self):
7783
add_rent(self, cps, person, household)
7884
logging.info("Adding tips")
7985
add_tips(self, cps)
86+
logging.info("Adding ORG labor-market inputs")
87+
add_org_labor_market_inputs(cps)
8088
logging.info("Adding auto loan balance, interest and wealth")
8189
add_auto_loan_interest_and_net_worth(self, cps)
8290
logging.info("Added all variables")
@@ -1828,6 +1836,54 @@ def add_tips(self, cps: h5py.File):
18281836
self.save_dataset(cps)
18291837

18301838

1839+
def add_org_labor_market_inputs(cps: h5py.File) -> None:
    """Impute ORG-derived wage and union inputs onto CPS persons.

    Looks up each person's state through their household, builds the
    ORG receiver frame from person-level CPS columns, and stores every
    variable in ``ORG_IMPUTED_VARIABLES`` back into ``cps`` (bool for
    ``ORG_BOOL_VARIABLES``, float32 otherwise).

    Raises:
        KeyError: If a ``person_household_id`` value does not appear in
            ``household_id``.
    """
    hh_ids = np.asarray(cps["household_id"], dtype=np.int64)
    person_hh_ids = np.asarray(cps["person_household_id"], dtype=np.int64)
    hh_state_fips = np.asarray(cps["state_fips"], dtype=np.float32)

    # state_fips is stored per household: map household id -> row, then
    # gather each person's household row to get a per-person state.
    row_of_household = {}
    for row, hh_id in enumerate(hh_ids):
        row_of_household[int(hh_id)] = row
    person_rows = np.asarray(
        [row_of_household[int(hh_id)] for hh_id in person_hh_ids],
        dtype=np.intp,
    )
    person_state_fips = hh_state_fips[person_rows]

    receiver = build_org_receiver_frame(
        age=cps["age"],
        is_female=cps["is_female"],
        is_hispanic=cps["is_hispanic"],
        cps_race=cps["cps_race"],
        state_fips=person_state_fips,
        employment_income=cps["employment_income"],
        weekly_hours_worked=cps["weekly_hours_worked"],
    )

    # Fall back to all-zero self-employment income when it is absent.
    zero_income = np.zeros(len(receiver), dtype=np.float32)
    raw_self_employment = cps.get("self_employment_income", zero_income)
    self_employment_income = np.asarray(raw_self_employment, dtype=np.float32)

    predictions = predict_org_features(
        receiver,
        self_employment_income=self_employment_income,
    )

    for variable in ORG_IMPUTED_VARIABLES:
        target_dtype = bool if variable in ORG_BOOL_VARIABLES else np.float32
        cps[variable] = predictions[variable].values.astype(target_dtype)
1885+
1886+
18311887
def add_overtime_occupation(cps: h5py.File, person: DataFrame) -> None:
18321888
"""Add occupation categories relevant to overtime eligibility calculations.
18331889
Based on:

policyengine_us_data/datasets/cps/extended_cps.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
from policyengine_core.data import Dataset
88

99
from policyengine_us_data.datasets.cps.cps import CPS, CPS_2024, CPS_2024_Full
10+
from policyengine_us_data.datasets.org import (
11+
ORG_IMPUTED_VARIABLES,
12+
apply_org_domain_constraints,
13+
)
1014
from policyengine_us_data.datasets.puf import PUF, PUF_2024
1115
from policyengine_us_data.storage import STORAGE_FOLDER
1216
from policyengine_us_data.utils.mortgage_interest import (
@@ -85,6 +89,10 @@ def _supports_structural_mortgage_inputs() -> bool:
8589
# Hours/employment
8690
"weekly_hours_worked",
8791
"hours_worked_last_week",
92+
# ORG labor-market variables
93+
"hourly_wage",
94+
"is_paid_hourly",
95+
"is_union_member_or_covered",
8896
# Previous year income
8997
"employment_income_last_year",
9098
"self_employment_income_last_year",
@@ -336,6 +344,28 @@ def _apply_post_processing(predictions, X_test, time_period, data):
336344
for col in ss_cols:
337345
predictions[col] = reconciled[col]
338346

347+
org_cols = [c for c in predictions.columns if c in ORG_IMPUTED_VARIABLES]
348+
if org_cols:
349+
n_half = len(data["person_id"][time_period]) // 2
350+
weekly_hours = (
351+
predictions["weekly_hours_worked"].values
352+
if "weekly_hours_worked" in predictions.columns
353+
else data["weekly_hours_worked"][time_period][n_half:]
354+
)
355+
receiver = pd.DataFrame(
356+
{
357+
"employment_income": X_test["employment_income"].values,
358+
"weekly_hours_worked": np.asarray(weekly_hours, dtype=np.float32),
359+
}
360+
)
361+
constrained = apply_org_domain_constraints(
362+
predictions[org_cols],
363+
receiver,
364+
self_employment_income=X_test["self_employment_income"].values,
365+
)
366+
for col in org_cols:
367+
predictions[col] = constrained[col]
368+
339369
return predictions
340370

341371

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .org import (
2+
ORG_BOOL_VARIABLES,
3+
ORG_IMPUTED_VARIABLES,
4+
ORG_PREDICTORS,
5+
apply_org_domain_constraints,
6+
build_org_receiver_frame,
7+
get_org_model,
8+
load_org_training_data,
9+
predict_org_features,
10+
)

0 commit comments

Comments
 (0)