Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1096.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved SSI-sensitive liquid asset imputation predictors and added SSA SSI recipient calibration targets.
253 changes: 178 additions & 75 deletions policyengine_us_data/calibration/source_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
derive_is_tipped_occupation,
)
from policyengine_us_data.datasets.sipp.sipp import (
ASSET_JOB_EARNINGS_COLUMNS,
ASSET_PREDICTORS,
VEHICLE_MODEL_PREDICTORS,
build_vehicle_training_frame,
)
Expand Down Expand Up @@ -121,16 +123,7 @@
"is_tipped_occupation",
]

SIPP_ASSETS_PREDICTORS = [
"employment_income",
"interest_income",
"dividend_income",
"rental_income",
"age",
"is_female",
"is_married",
"count_under_18",
]
SIPP_ASSETS_PREDICTORS = ASSET_PREDICTORS

SCF_PREDICTORS = [
"age",
Expand Down Expand Up @@ -316,6 +309,164 @@ def _person_state_fips(
return np.repeat(state_fips, counts)


def _person_is_married(
data: Dict[str, Dict[int, np.ndarray]],
time_period: int,
n_persons: int,
) -> np.ndarray:
"""Return a person-level married flag from CPS-compatible inputs."""
if "is_married" in data and time_period in data["is_married"]:
values = np.asarray(data["is_married"][time_period])
if len(values) == n_persons:
return values.astype(np.float32)

marital_unit_id = data.get("person_marital_unit_id", {}).get(time_period)
if marital_unit_id is not None and len(marital_unit_id) == n_persons:
marital_unit_id = np.asarray(marital_unit_id)
counts = pd.Series(marital_unit_id).map(
pd.Series(marital_unit_id).value_counts()
)
return (counts.to_numpy() > 1).astype(np.float32)

return np.zeros(n_persons, dtype=np.float32)


def _add_person_household_counts(
df: pd.DataFrame,
data: Dict[str, Dict[int, np.ndarray]],
time_period: int,
) -> pd.DataFrame:
"""Add household composition predictors to a person-level CPS frame."""
if "age" not in df.columns and "age" in data:
df["age"] = data["age"][time_period].astype(np.float32)

hh_ids_person = data.get("person_household_id", {}).get(time_period)
if hh_ids_person is None or "age" not in df.columns:
df["count_under_18"] = 0.0
df["count_under_6"] = 0.0
df["household_size"] = 1.0
return df

age_df = pd.DataFrame(
{
"hh": hh_ids_person,
"age": np.asarray(df["age"]),
}
)
grouped = age_df.groupby("hh")["age"]
df["count_under_18"] = (
grouped.transform(lambda values: (values < 18).sum())
.to_numpy()
.astype(np.float32)
)
df["count_under_6"] = (
grouped.transform(lambda values: (values < 6).sum())
.to_numpy()
.astype(np.float32)
)
df["household_size"] = grouped.transform("size").to_numpy().astype(np.float32)
return df


def _add_sipp_asset_predictors(asset_df: pd.DataFrame) -> pd.DataFrame:
"""Add SIPP-side liquid-asset model predictors without SSI receipt."""
asset_df = asset_df.copy()
asset_df["bank_account_assets"] = asset_df["TVAL_BANK"].fillna(0)
asset_df["stock_assets"] = asset_df["TVAL_STMF"].fillna(0)
asset_df["bond_assets"] = asset_df["TVAL_BOND"].fillna(0)
asset_df["age"] = asset_df.TAGE
asset_df["is_female"] = asset_df.ESEX == 2
asset_df["is_married"] = asset_df.EMS == 1

job_cols = [col for col in ASSET_JOB_EARNINGS_COLUMNS if col in asset_df]
if job_cols:
asset_df["employment_income"] = asset_df[job_cols].fillna(0).sum(axis=1) * 12
elif "TPTOTINC" in asset_df:
asset_df["employment_income"] = asset_df.TPTOTINC.fillna(0) * 12
else:
asset_df["employment_income"] = 0.0

asset_df["interest_income"] = (
asset_df["TINC_BANK"].fillna(0) + asset_df["TINC_BOND"].fillna(0)
) * 12
asset_df["dividend_income"] = asset_df["TINC_STMF"].fillna(0) * 12
asset_df["rental_income"] = asset_df["TINC_RENT"].fillna(0) * 12
asset_df["social_security"] = asset_df["TSSSAMT"].fillna(0) * 12
asset_df["retirement_income"] = asset_df["TRETINCAMT"].fillna(0) * 12
asset_df["non_ssi_income"] = (
asset_df["employment_income"]
+ asset_df["social_security"]
+ asset_df["retirement_income"]
)
asset_df["household_weight"] = asset_df.WPFINWGT

asset_df["is_under_18"] = asset_df.TAGE < 18
asset_df["is_under_6"] = asset_df.TAGE < 6
grouped = asset_df.groupby("SSUID")
asset_df["count_under_18"] = grouped["is_under_18"].transform("sum")
asset_df["count_under_6"] = grouped["is_under_6"].transform("sum")
asset_df["household_size"] = grouped["PNUM"].transform("count")
return asset_df


def _add_cps_asset_predictors(
cps_asset_df: pd.DataFrame,
data: Dict[str, Dict[int, np.ndarray]],
time_period: int,
) -> pd.DataFrame:
"""Add CPS-side predictors aligned to the SIPP liquid-asset model."""
cps_asset_df = cps_asset_df.copy()
n_persons = len(cps_asset_df)

if "is_male" in cps_asset_df.columns:
cps_asset_df["is_female"] = (~cps_asset_df["is_male"].astype(bool)).astype(
np.float32
)
elif "is_female" in data:
cps_asset_df["is_female"] = data["is_female"][time_period].astype(np.float32)
else:
cps_asset_df["is_female"] = 0.0

cps_asset_df["is_married"] = _person_is_married(
data,
time_period,
n_persons,
)
cps_asset_df = _add_person_household_counts(cps_asset_df, data, time_period)

for var in [
"employment_income",
"interest_income",
"dividend_income",
"rental_income",
"social_security",
"pension_income",
"retirement_distributions",
]:
if var in cps_asset_df.columns:
continue
if var in data:
cps_asset_df[var] = data[var][time_period].astype(np.float32)
else:
cps_asset_df[var] = 0.0

cps_asset_df["retirement_income"] = cps_asset_df["pension_income"].fillna(
0
) + cps_asset_df["retirement_distributions"].fillna(0)
cps_asset_df["non_ssi_income"] = (
cps_asset_df["employment_income"].fillna(0)
+ cps_asset_df["social_security"].fillna(0)
+ cps_asset_df["retirement_income"].fillna(0)
)

for predictor in SIPP_ASSETS_PREDICTORS:
if predictor not in cps_asset_df.columns:
cps_asset_df[predictor] = 0.0
cps_asset_df[predictor] = cps_asset_df[predictor].fillna(0).astype(np.float32)

return cps_asset_df


@pipeline_node(
PipelineNode(
id="acs_qrf",
Expand Down Expand Up @@ -571,56 +722,30 @@ def _impute_sipp(
"TAGE",
"ESEX",
"EMS",
"TPTOTINC",
"TSSSAMT",
"TRETINCAMT",
"TVAL_BANK",
"TVAL_STMF",
"TVAL_BOND",
"TINC_BANK",
"TINC_STMF",
"TINC_BOND",
"TINC_RENT",
]
] + ASSET_JOB_EARNINGS_COLUMNS
asset_df = pd.read_csv(
STORAGE_FOLDER / "pu2023.csv",
delimiter="|",
usecols=asset_cols,
)
asset_df = asset_df[asset_df.MONTHCODE == 12]

asset_df["bank_account_assets"] = asset_df["TVAL_BANK"].fillna(0)
asset_df["stock_assets"] = asset_df["TVAL_STMF"].fillna(0)
asset_df["bond_assets"] = asset_df["TVAL_BOND"].fillna(0)
asset_df["age"] = asset_df.TAGE
asset_df["is_female"] = asset_df.ESEX == 2
asset_df["is_married"] = asset_df.EMS == 1
asset_df["employment_income"] = asset_df.TPTOTINC * 12
asset_df["interest_income"] = (
asset_df["TINC_BANK"].fillna(0) + asset_df["TINC_BOND"].fillna(0)
) * 12
asset_df["dividend_income"] = asset_df["TINC_STMF"].fillna(0) * 12
asset_df["rental_income"] = asset_df["TINC_RENT"].fillna(0) * 12
asset_df["household_weight"] = asset_df.WPFINWGT
asset_df["is_under_18"] = asset_df.TAGE < 18
asset_df["count_under_18"] = (
asset_df.groupby("SSUID")["is_under_18"]
.sum()
.loc[asset_df.SSUID.values]
.values
)
asset_df = _add_sipp_asset_predictors(asset_df)

asset_train_cols = [
"employment_income",
"interest_income",
"dividend_income",
"rental_income",
"bank_account_assets",
"stock_assets",
"bond_assets",
"age",
"is_female",
"is_married",
"count_under_18",
"household_weight",
*SIPP_ASSETS_PREDICTORS,
]
asset_train = asset_df[asset_train_cols].dropna()
asset_train = asset_train.loc[
Expand All @@ -641,39 +766,18 @@ def _impute_sipp(
"interest_income",
"dividend_income",
"rental_income",
"social_security",
"pension_income",
"retirement_distributions",
"age",
"is_male",
],
)
if "is_male" in cps_asset_df.columns:
cps_asset_df["is_female"] = (~cps_asset_df["is_male"].astype(bool)).astype(
np.float32
)
else:
cps_asset_df["is_female"] = 0.0
if "is_married" in data:
cps_asset_df["is_married"] = data["is_married"][time_period].astype(
np.float32
)
else:
cps_asset_df["is_married"] = 0.0
cps_asset_df["count_under_18"] = (
cps_tip_df["count_under_18"]
if "count_under_18" in cps_tip_df.columns
else 0.0
cps_asset_df = _add_cps_asset_predictors(
cps_asset_df,
data,
time_period,
)
for cap_var in [
"interest_income",
"dividend_income",
"rental_income",
]:
if cap_var not in cps_asset_df.columns:
if cap_var in data:
cps_asset_df[cap_var] = data[cap_var][time_period].astype(
np.float32
)
else:
cps_asset_df[cap_var] = 0.0

asset_vars = [
"bank_account_assets",
Expand Down Expand Up @@ -738,12 +842,11 @@ def _impute_sipp(
).astype(np.float32)
else:
cps_vehicle_df["is_female"] = 0.0
if "is_married" in data:
cps_vehicle_df["is_married"] = data["is_married"][time_period].astype(
np.float32
)
else:
cps_vehicle_df["is_married"] = 0.0
cps_vehicle_df["is_married"] = _person_is_married(
data,
time_period,
len(cps_vehicle_df),
)
for cap_var in [
"interest_income",
"dividend_income",
Expand Down
6 changes: 6 additions & 0 deletions policyengine_us_data/calibration/target_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ include:
geo_level: national
- variable: ssi
geo_level: national
- variable: person_count
geo_level: national
domain_variable: ssi
- variable: person_count
geo_level: national
domain_variable: age,ssi
- variable: tanf
geo_level: national
- variable: spm_unit_count
Expand Down
22 changes: 20 additions & 2 deletions policyengine_us_data/calibration/unified_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2110,8 +2110,26 @@ def _query_targets(self, target_filter: dict) -> pd.DataFrame:

if "domain_variables" in target_filter:
dvs = target_filter["domain_variables"]
ph = ",".join(f"'{dv}'" for dv in dvs)
and_conditions.append(f"tv.domain_variable IN ({ph})")
exact_ph = ",".join(f"'{dv}'" for dv in dvs)
single_constraint_dvs = [dv for dv in dvs if "," not in str(dv)]
if single_constraint_dvs:
component_ph = ",".join(f"'{dv}'" for dv in single_constraint_dvs)
and_conditions.append(
"("
f"tv.domain_variable IN ({exact_ph}) "
"OR EXISTS ("
"SELECT 1 FROM stratum_constraints sc_domain "
"WHERE sc_domain.stratum_id = tv.stratum_id "
"AND sc_domain.constraint_variable NOT IN ("
"'state_fips', 'congressional_district_geoid', "
"'tax_unit_is_filer', 'ucgid_str'"
") "
f"AND sc_domain.constraint_variable IN ({component_ph})"
")"
")"
)
else:
and_conditions.append(f"tv.domain_variable IN ({exact_ph})")

if "variables" in target_filter:
vs = ",".join(f"'{v}'" for v in target_filter["variables"])
Expand Down
Loading
Loading