Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1123.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve SSI disability imputation by using comparable CPS and SIPP difficulty flags and refreshing CPS-only disability attributes on PUF clones.
2 changes: 2 additions & 0 deletions policyengine_us_data/calibration/create_source_imputed_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def create_source_imputed_cps(
assign_random_geography,
)
from policyengine_us_data.calibration.source_impute import (
drop_source_imputation_construction_variables,
impute_source_variables,
)

Expand Down Expand Up @@ -65,6 +66,7 @@ def create_source_imputed_cps(
time_period=time_period,
dataset_path=input_path,
)
data_dict = drop_source_imputation_construction_variables(data_dict)

logger.info("Saving to %s", output_path)
with h5py.File(output_path, "w") as f:
Expand Down
69 changes: 69 additions & 0 deletions policyengine_us_data/calibration/create_stratified_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from policyengine_us_data.datasets.puf.variable_roles import (
PUF_REPORTED_CALCULATED_TAX_OUTPUT_VARIABLES,
)
from policyengine_us_data.datasets.sipp import SSI_DISABILITY_DIFFICULTY_PREDICTORS
from policyengine_us_data.pipeline_metadata import pipeline_node
from policyengine_us_data.pipeline_schema import PipelineNode

Expand All @@ -37,6 +38,9 @@
]

TOP_AGI_FLOOR = HIGH_AGI_BRACKETS[0][0] # $500k — boundary between top and middle
# Immutable snapshot of the SSI disability difficulty predictors; used as the
# default variable list of `_construction_only_person_variable_data`.
STRATIFIED_CONSTRUCTION_ONLY_PERSON_VARIABLES = (
    *SSI_DISABILITY_DIFFICULTY_PREDICTORS,
)


def _format_agi(x):
Expand Down Expand Up @@ -65,6 +69,62 @@ def _split_non_top_strata(agi, top_agi_floor):
return non_top_mask, bottom_mask, middle_mask, bottom_25_threshold


def _period_values(raw_data, variable, time_period):
if variable not in raw_data:
return None
value = raw_data[variable]
if isinstance(value, dict):
period_value = value.get(time_period, value.get(str(time_period)))
return None if period_value is None else np.asarray(period_value)
if hasattr(value, "keys") and str(time_period) in value:
return np.asarray(value[str(time_period)])
try:
return np.asarray(value[...])
except TypeError:
return np.asarray(value)


def _construction_only_person_variable_data(
    raw_data,
    df_filtered,
    time_period,
    variables=STRATIFIED_CONSTRUCTION_ONLY_PERSON_VARIABLES,
):
    """Collect construction-only person variables for the selected people.

    Each person id in ``df_filtered`` is located in the source data, and every
    requested variable is sliced down to those source rows, in the order the
    ids appear in ``df_filtered``.

    Args:
        raw_data: Source data handed to ``_period_values``.
        df_filtered: Frame expected to hold a ``person_id__<time_period>``
            column.
        time_period: Period used for the column name and the lookups.
        variables: Names of the variables to carry over.

    Returns:
        ``{variable: {time_period: values}}`` for each requested variable that
        ``_period_values`` finds; an empty dict when the person-id column or
        the source person ids are unavailable.

    Raises:
        ValueError: If a selected person id is absent from the source data,
            or a variable's length differs from the source person ids.
    """
    id_column = f"person_id__{time_period}"
    if id_column not in df_filtered:
        return {}

    source_ids = _period_values(raw_data, "person_id", time_period)
    if source_ids is None:
        return {}

    wanted_ids = df_filtered[id_column].to_numpy()

    # Later duplicates overwrite earlier ones, so an id maps to its last row.
    position_of = {}
    for position, source_id in enumerate(np.asarray(source_ids)):
        position_of[source_id] = position

    positions = []
    try:
        for wanted_id in wanted_ids:
            positions.append(position_of[wanted_id])
    except KeyError as error:
        raise ValueError(
            f"Selected person_id {error.args[0]} is missing from source data"
        ) from error
    selected_rows = np.asarray(positions, dtype=int)

    collected = {}
    for variable in variables:
        column = _period_values(raw_data, variable, time_period)
        if column is not None:
            if len(column) != len(source_ids):
                raise ValueError(
                    f"{variable} has {len(column)} rows, expected {len(source_ids)}"
                )
            collected[variable] = {time_period: np.asarray(column)[selected_rows]}
    return collected


@pipeline_node(
PipelineNode(
id="create_stratified",
Expand Down Expand Up @@ -333,6 +393,15 @@ def create_stratified_cps_dataset(
if len(data[variable]) == 0:
del data[variable]

raw_data = sim.dataset.load_dataset()
data.update(
_construction_only_person_variable_data(
raw_data,
df_filtered,
time_period,
)
)

# Write to h5
with h5py.File(output_path, "w") as f:
for variable, periods in data.items():
Expand Down
28 changes: 22 additions & 6 deletions policyengine_us_data/calibration/source_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
SIPP_TIP_AMOUNT_COLUMNS,
SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN,
SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
SSI_DISABILITY_MODEL_VARIABLE,
SSI_DISABILITY_CRITERIA_VARIABLE,
SSI_DISABILITY_DIFFICULTY_PREDICTORS,
SSI_DISABILITY_EXPORT_VARIABLES,
VEHICLE_MODEL_PREDICTORS,
build_vehicle_training_frame,
get_ssi_disability_model,
Expand Down Expand Up @@ -103,7 +105,7 @@
"bank_account_assets",
"stock_assets",
"bond_assets",
SSI_DISABILITY_MODEL_VARIABLE,
*SSI_DISABILITY_EXPORT_VARIABLES,
"household_vehicles_owned",
"household_vehicles_value",
]
Expand All @@ -126,6 +128,20 @@
+ SCF_IMPUTED_VARIABLES
)

# Immutable snapshot of the SSI disability difficulty predictors; these are the
# entries `drop_source_imputation_construction_variables` removes.
SOURCE_IMPUTATION_CONSTRUCTION_ONLY_VARIABLES = (
    *SSI_DISABILITY_DIFFICULTY_PREDICTORS,
)


def drop_source_imputation_construction_variables(
    data: Dict[str, Dict[int, np.ndarray]],
) -> Dict[str, Dict[int, np.ndarray]]:
    """Remove source-imputation predictors that should not be exported.

    Every name in ``SOURCE_IMPUTATION_CONSTRUCTION_ONLY_VARIABLES`` that is
    present in ``data`` is deleted; names that are absent are ignored.

    Args:
        data: Dataset dictionary; it is modified in place.

    Returns:
        The same ``data`` object, after the deletions.
    """
    for name in SOURCE_IMPUTATION_CONSTRUCTION_ONLY_VARIABLES:
        if name in data:
            del data[name]
    return data


ACS_PREDICTORS = [
"is_household_head",
"age",
Expand Down Expand Up @@ -902,7 +918,7 @@ def _impute_sipp(
"rental_income",
"age",
"is_male",
"is_disabled",
*SSI_DISABILITY_DIFFICULTY_PREDICTORS,
"social_security_disability",
"disability_benefits",
],
Expand Down Expand Up @@ -930,7 +946,7 @@ def _impute_sipp(
"interest_income",
"dividend_income",
"rental_income",
"is_disabled",
*SSI_DISABILITY_DIFFICULTY_PREDICTORS,
"social_security_disability",
]:
if var not in cps_ssi_df.columns:
Expand All @@ -953,7 +969,7 @@ def _impute_sipp(
cps_ssi_df,
)
existing_meets_ssi_disability_criteria = data.get(
SSI_DISABILITY_MODEL_VARIABLE, {}
SSI_DISABILITY_CRITERIA_VARIABLE, {}
).get(time_period)
ssi_reported = data.get("ssi_reported", {}).get(time_period)
meets_ssi_disability_criteria = preserve_under_65_ssi_disability_criteria(
Expand All @@ -962,7 +978,7 @@ def _impute_sipp(
ssi_reported=ssi_reported,
existing_meets_ssi_disability_criteria=existing_meets_ssi_disability_criteria,
)
data[SSI_DISABILITY_MODEL_VARIABLE] = {
data[SSI_DISABILITY_CRITERIA_VARIABLE] = {
time_period: meets_ssi_disability_criteria
}

Expand Down
5 changes: 2 additions & 3 deletions policyengine_us_data/calibration/unified_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
build_checkpoint_signature,
checkpoint_signature_mismatches,
)
from policyengine_us_data.calibration.calibration_utils import (
create_target_groups,
)
from policyengine_us_data.calibration_package.specs import (
DEFAULT_TARGET_CONFIG_PATH as DEFAULT_TARGET_CONFIG_RELATIVE_PATH,
TargetConfigIdentity,
Expand Down Expand Up @@ -1601,6 +1598,7 @@ def run_calibration(
data_dict[var] = {time_period: val[...]}

from policyengine_us_data.calibration.source_impute import (
drop_source_imputation_construction_variables,
impute_source_variables,
)

Expand All @@ -1610,6 +1608,7 @@ def run_calibration(
time_period=time_period,
dataset_path=dataset_path,
)
data_dict = drop_source_imputation_construction_variables(data_dict)

source_path = str(
Path(dataset_path).parent / f"source_imputed_{Path(dataset_path).stem}.h5"
Expand Down
25 changes: 19 additions & 6 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@
),
}

# Difficulty flag variable -> CPS person column it is derived from. Each flag
# is set where the column equals 1, and `is_disabled` is true when any of the
# flags is set.
CPS_SSI_DISABILITY_DIFFICULTY_COLUMNS = dict(
    difficulty_dressing_or_bathing="PEDISDRS",
    difficulty_hearing="PEDISEAR",
    difficulty_seeing="PEDISEYE",
    difficulty_doing_errands="PEDISOUT",
    difficulty_walking_or_climbing_stairs="PEDISPHY",
    difficulty_remembering_or_making_decisions="PEDISREM",
)

# Census CPS ASEC 2024 technical documentation, PERRP:
# https://www2.census.gov/programs-surveys/cps/techdocs/cpsmar24.pdf
PERRP_UNMARRIED_PARTNER_OF_HOUSEHOLD_HEAD_CODES = {
Expand Down Expand Up @@ -1076,8 +1085,11 @@ def add_personal_variables(cps: h5py.File, person: DataFrame) -> None:
# "Is...blind or does...have serious difficulty seeing even when Wearing
# glasses?" 1 -> Yes
cps["is_blind"] = person.PEDISEYE == 1
DISABILITY_FLAGS = ["PEDIS" + i for i in ["DRS", "EAR", "EYE", "OUT", "PHY", "REM"]]
cps["is_disabled"] = (person[DISABILITY_FLAGS] == 1).any(axis=1)
for variable, cps_column in CPS_SSI_DISABILITY_DIFFICULTY_COLUMNS.items():
cps[variable] = person[cps_column] == 1
cps["is_disabled"] = np.column_stack(
[cps[variable] for variable in CPS_SSI_DISABILITY_DIFFICULTY_COLUMNS]
).any(axis=1)

def children_per_parent(col: str) -> pd.DataFrame:
"""Calculate number of children in the household using parental
Expand Down Expand Up @@ -2719,15 +2731,16 @@ def add_tips(self, cps: h5py.File):
cps["bond_assets"] = asset_predictions.bond_assets.values

from policyengine_us_data.datasets.sipp import (
SSI_DISABILITY_MODEL_VARIABLE,
SSI_DISABILITY_CRITERIA_VARIABLE,
SSI_DISABILITY_DIFFICULTY_PREDICTORS,
get_ssi_disability_model,
predict_ssi_disability_criteria,
preserve_under_65_ssi_disability_criteria,
)

n_persons = len(cps)
for variable in [
"is_disabled",
*SSI_DISABILITY_DIFFICULTY_PREDICTORS,
"social_security_disability",
]:
cps[variable] = np.asarray(
Expand All @@ -2747,10 +2760,10 @@ def add_tips(self, cps: h5py.File):
age=existing_data.get("age", np.full(n_persons, 65)),
ssi_reported=existing_data.get("ssi_reported"),
existing_meets_ssi_disability_criteria=existing_data.get(
SSI_DISABILITY_MODEL_VARIABLE
SSI_DISABILITY_CRITERIA_VARIABLE
),
)
cps[SSI_DISABILITY_MODEL_VARIABLE] = meets_ssi_disability_criteria
cps[SSI_DISABILITY_CRITERIA_VARIABLE] = meets_ssi_disability_criteria

from policyengine_us_data.datasets.sipp import get_vehicle_model

Expand Down
Loading
Loading