Skip to content

Commit bf6def1

Browse files
authored
Improve SSI asset imputation and calibration targets
Fixes #1096
1 parent 3f21cc5 commit bf6def1

17 files changed

Lines changed: 779 additions & 153 deletions

changelog.d/1096.changed

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improved SSI-sensitive liquid asset imputation predictors and added SSA SSI recipient calibration targets.

policyengine_us_data/calibration/source_impute.py

Lines changed: 178 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
derive_is_tipped_occupation,
3737
)
3838
from policyengine_us_data.datasets.sipp.sipp import (
39+
ASSET_JOB_EARNINGS_COLUMNS,
40+
ASSET_PREDICTORS,
3941
VEHICLE_MODEL_PREDICTORS,
4042
build_vehicle_training_frame,
4143
)
@@ -121,16 +123,7 @@
121123
"is_tipped_occupation",
122124
]
123125

124-
SIPP_ASSETS_PREDICTORS = [
125-
"employment_income",
126-
"interest_income",
127-
"dividend_income",
128-
"rental_income",
129-
"age",
130-
"is_female",
131-
"is_married",
132-
"count_under_18",
133-
]
126+
SIPP_ASSETS_PREDICTORS = ASSET_PREDICTORS
134127

135128
SCF_PREDICTORS = [
136129
"age",
@@ -316,6 +309,164 @@ def _person_state_fips(
316309
return np.repeat(state_fips, counts)
317310

318311

312+
def _person_is_married(
313+
data: Dict[str, Dict[int, np.ndarray]],
314+
time_period: int,
315+
n_persons: int,
316+
) -> np.ndarray:
317+
"""Return a person-level married flag from CPS-compatible inputs."""
318+
if "is_married" in data and time_period in data["is_married"]:
319+
values = np.asarray(data["is_married"][time_period])
320+
if len(values) == n_persons:
321+
return values.astype(np.float32)
322+
323+
marital_unit_id = data.get("person_marital_unit_id", {}).get(time_period)
324+
if marital_unit_id is not None and len(marital_unit_id) == n_persons:
325+
marital_unit_id = np.asarray(marital_unit_id)
326+
counts = pd.Series(marital_unit_id).map(
327+
pd.Series(marital_unit_id).value_counts()
328+
)
329+
return (counts.to_numpy() > 1).astype(np.float32)
330+
331+
return np.zeros(n_persons, dtype=np.float32)
332+
333+
334+
def _add_person_household_counts(
335+
df: pd.DataFrame,
336+
data: Dict[str, Dict[int, np.ndarray]],
337+
time_period: int,
338+
) -> pd.DataFrame:
339+
"""Add household composition predictors to a person-level CPS frame."""
340+
if "age" not in df.columns and "age" in data:
341+
df["age"] = data["age"][time_period].astype(np.float32)
342+
343+
hh_ids_person = data.get("person_household_id", {}).get(time_period)
344+
if hh_ids_person is None or "age" not in df.columns:
345+
df["count_under_18"] = 0.0
346+
df["count_under_6"] = 0.0
347+
df["household_size"] = 1.0
348+
return df
349+
350+
age_df = pd.DataFrame(
351+
{
352+
"hh": hh_ids_person,
353+
"age": np.asarray(df["age"]),
354+
}
355+
)
356+
grouped = age_df.groupby("hh")["age"]
357+
df["count_under_18"] = (
358+
grouped.transform(lambda values: (values < 18).sum())
359+
.to_numpy()
360+
.astype(np.float32)
361+
)
362+
df["count_under_6"] = (
363+
grouped.transform(lambda values: (values < 6).sum())
364+
.to_numpy()
365+
.astype(np.float32)
366+
)
367+
df["household_size"] = grouped.transform("size").to_numpy().astype(np.float32)
368+
return df
369+
370+
371+
def _add_sipp_asset_predictors(asset_df: pd.DataFrame) -> pd.DataFrame:
    """Derive the liquid-asset model columns on a copy of a SIPP frame.

    The input is left untouched. The returned copy gains the asset balances
    (``bank_account_assets``, ``stock_assets``, ``bond_assets``), demographic
    flags, income predictors, ``non_ssi_income``, ``household_weight`` and
    per-``SSUID`` household composition counts. No SSI receipt column is
    read by this function.

    Args:
        asset_df: SIPP rows carrying the raw ``T*``/``E*`` columns referenced
            below, plus ``SSUID``, ``PNUM`` and ``WPFINWGT``.

    Returns:
        A new frame with the derived columns appended.
    """
    frame = asset_df.copy()

    # Asset balances; missing values are treated as zero holdings.
    for derived, raw in (
        ("bank_account_assets", "TVAL_BANK"),
        ("stock_assets", "TVAL_STMF"),
        ("bond_assets", "TVAL_BOND"),
    ):
        frame[derived] = frame[raw].fillna(0)

    frame["age"] = frame["TAGE"]
    frame["is_female"] = frame["ESEX"] == 2
    frame["is_married"] = frame["EMS"] == 1

    # Prefer the per-job earnings columns; fall back to TPTOTINC, then zero.
    # NOTE(review): the x12 factor presumably annualizes monthly amounts.
    present_job_cols = [
        name for name in ASSET_JOB_EARNINGS_COLUMNS if name in frame
    ]
    if present_job_cols:
        summed_earnings = frame[present_job_cols].fillna(0).sum(axis=1)
        frame["employment_income"] = summed_earnings * 12
    elif "TPTOTINC" in frame:
        frame["employment_income"] = frame["TPTOTINC"].fillna(0) * 12
    else:
        frame["employment_income"] = 0.0

    bank_interest = frame["TINC_BANK"].fillna(0)
    bond_interest = frame["TINC_BOND"].fillna(0)
    frame["interest_income"] = (bank_interest + bond_interest) * 12
    frame["dividend_income"] = frame["TINC_STMF"].fillna(0) * 12
    frame["rental_income"] = frame["TINC_RENT"].fillna(0) * 12
    frame["social_security"] = frame["TSSSAMT"].fillna(0) * 12
    frame["retirement_income"] = frame["TRETINCAMT"].fillna(0) * 12

    non_ssi_parts = ["employment_income", "social_security", "retirement_income"]
    frame["non_ssi_income"] = (
        frame[non_ssi_parts[0]]
        + frame[non_ssi_parts[1]]
        + frame[non_ssi_parts[2]]
    )
    frame["household_weight"] = frame["WPFINWGT"]

    # Household composition: every row receives its SSUID group's totals.
    frame["is_under_18"] = frame["TAGE"] < 18
    frame["is_under_6"] = frame["TAGE"] < 6
    by_household = frame.groupby("SSUID")
    frame["count_under_18"] = by_household["is_under_18"].transform("sum")
    frame["count_under_6"] = by_household["is_under_6"].transform("sum")
    frame["household_size"] = by_household["PNUM"].transform("count")
    return frame
410+
411+
412+
def _add_cps_asset_predictors(
    cps_asset_df: pd.DataFrame,
    data: Dict[str, Dict[int, np.ndarray]],
    time_period: int,
) -> pd.DataFrame:
    """Build the CPS-side predictor columns for the SIPP liquid-asset model.

    Operates on a copy of ``cps_asset_df``. Adds ``is_female``,
    ``is_married``, the household composition counts, any income inputs not
    already present (taken from ``data`` or defaulted to zero),
    ``retirement_income`` and ``non_ssi_income``. Finally every name in
    ``SIPP_ASSETS_PREDICTORS`` is guaranteed to exist as a NaN-free
    ``float32`` column.

    Args:
        cps_asset_df: Person-level CPS frame; not modified.
        data: Mapping of variable name to ``{period: values}``.
        time_period: Period key looked up inside each variable's mapping.

    Returns:
        A new frame carrying the predictor columns.
    """
    frame = cps_asset_df.copy()

    # Sex: derive from is_male when the frame has it, else read is_female
    # from the raw inputs, else default everyone to 0.
    if "is_male" in frame.columns:
        male_flag = frame["is_male"].astype(bool)
        frame["is_female"] = (~male_flag).astype(np.float32)
    elif "is_female" in data:
        frame["is_female"] = data["is_female"][time_period].astype(np.float32)
    else:
        frame["is_female"] = 0.0

    frame["is_married"] = _person_is_married(data, time_period, len(frame))
    frame = _add_person_household_counts(frame, data, time_period)

    # Income inputs: keep columns already on the frame, otherwise pull them
    # from the raw inputs, otherwise fill with zero.
    income_inputs = (
        "employment_income",
        "interest_income",
        "dividend_income",
        "rental_income",
        "social_security",
        "pension_income",
        "retirement_distributions",
    )
    for name in income_inputs:
        if name not in frame.columns:
            if name in data:
                frame[name] = data[name][time_period].astype(np.float32)
            else:
                frame[name] = 0.0

    pension = frame["pension_income"].fillna(0)
    distributions = frame["retirement_distributions"].fillna(0)
    frame["retirement_income"] = pension + distributions

    earnings = frame["employment_income"].fillna(0)
    social_security = frame["social_security"].fillna(0)
    retirement = frame["retirement_income"].fillna(0)
    frame["non_ssi_income"] = earnings + social_security + retirement

    # Guarantee the full predictor set exists, NaN-free, as float32.
    for predictor in SIPP_ASSETS_PREDICTORS:
        if predictor not in frame.columns:
            frame[predictor] = 0.0
        frame[predictor] = frame[predictor].fillna(0).astype(np.float32)

    return frame
468+
469+
319470
@pipeline_node(
320471
PipelineNode(
321472
id="acs_qrf",
@@ -571,56 +722,30 @@ def _impute_sipp(
571722
"TAGE",
572723
"ESEX",
573724
"EMS",
574-
"TPTOTINC",
725+
"TSSSAMT",
726+
"TRETINCAMT",
575727
"TVAL_BANK",
576728
"TVAL_STMF",
577729
"TVAL_BOND",
578730
"TINC_BANK",
579731
"TINC_STMF",
580732
"TINC_BOND",
581733
"TINC_RENT",
582-
]
734+
] + ASSET_JOB_EARNINGS_COLUMNS
583735
asset_df = pd.read_csv(
584736
STORAGE_FOLDER / "pu2023.csv",
585737
delimiter="|",
586738
usecols=asset_cols,
587739
)
588740
asset_df = asset_df[asset_df.MONTHCODE == 12]
589-
590-
asset_df["bank_account_assets"] = asset_df["TVAL_BANK"].fillna(0)
591-
asset_df["stock_assets"] = asset_df["TVAL_STMF"].fillna(0)
592-
asset_df["bond_assets"] = asset_df["TVAL_BOND"].fillna(0)
593-
asset_df["age"] = asset_df.TAGE
594-
asset_df["is_female"] = asset_df.ESEX == 2
595-
asset_df["is_married"] = asset_df.EMS == 1
596-
asset_df["employment_income"] = asset_df.TPTOTINC * 12
597-
asset_df["interest_income"] = (
598-
asset_df["TINC_BANK"].fillna(0) + asset_df["TINC_BOND"].fillna(0)
599-
) * 12
600-
asset_df["dividend_income"] = asset_df["TINC_STMF"].fillna(0) * 12
601-
asset_df["rental_income"] = asset_df["TINC_RENT"].fillna(0) * 12
602-
asset_df["household_weight"] = asset_df.WPFINWGT
603-
asset_df["is_under_18"] = asset_df.TAGE < 18
604-
asset_df["count_under_18"] = (
605-
asset_df.groupby("SSUID")["is_under_18"]
606-
.sum()
607-
.loc[asset_df.SSUID.values]
608-
.values
609-
)
741+
asset_df = _add_sipp_asset_predictors(asset_df)
610742

611743
asset_train_cols = [
612-
"employment_income",
613-
"interest_income",
614-
"dividend_income",
615-
"rental_income",
616744
"bank_account_assets",
617745
"stock_assets",
618746
"bond_assets",
619-
"age",
620-
"is_female",
621-
"is_married",
622-
"count_under_18",
623747
"household_weight",
748+
*SIPP_ASSETS_PREDICTORS,
624749
]
625750
asset_train = asset_df[asset_train_cols].dropna()
626751
asset_train = asset_train.loc[
@@ -641,39 +766,18 @@ def _impute_sipp(
641766
"interest_income",
642767
"dividend_income",
643768
"rental_income",
769+
"social_security",
770+
"pension_income",
771+
"retirement_distributions",
644772
"age",
645773
"is_male",
646774
],
647775
)
648-
if "is_male" in cps_asset_df.columns:
649-
cps_asset_df["is_female"] = (~cps_asset_df["is_male"].astype(bool)).astype(
650-
np.float32
651-
)
652-
else:
653-
cps_asset_df["is_female"] = 0.0
654-
if "is_married" in data:
655-
cps_asset_df["is_married"] = data["is_married"][time_period].astype(
656-
np.float32
657-
)
658-
else:
659-
cps_asset_df["is_married"] = 0.0
660-
cps_asset_df["count_under_18"] = (
661-
cps_tip_df["count_under_18"]
662-
if "count_under_18" in cps_tip_df.columns
663-
else 0.0
776+
cps_asset_df = _add_cps_asset_predictors(
777+
cps_asset_df,
778+
data,
779+
time_period,
664780
)
665-
for cap_var in [
666-
"interest_income",
667-
"dividend_income",
668-
"rental_income",
669-
]:
670-
if cap_var not in cps_asset_df.columns:
671-
if cap_var in data:
672-
cps_asset_df[cap_var] = data[cap_var][time_period].astype(
673-
np.float32
674-
)
675-
else:
676-
cps_asset_df[cap_var] = 0.0
677781

678782
asset_vars = [
679783
"bank_account_assets",
@@ -738,12 +842,11 @@ def _impute_sipp(
738842
).astype(np.float32)
739843
else:
740844
cps_vehicle_df["is_female"] = 0.0
741-
if "is_married" in data:
742-
cps_vehicle_df["is_married"] = data["is_married"][time_period].astype(
743-
np.float32
744-
)
745-
else:
746-
cps_vehicle_df["is_married"] = 0.0
845+
cps_vehicle_df["is_married"] = _person_is_married(
846+
data,
847+
time_period,
848+
len(cps_vehicle_df),
849+
)
747850
for cap_var in [
748851
"interest_income",
749852
"dividend_income",

policyengine_us_data/calibration/target_config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ include:
207207
geo_level: national
208208
- variable: ssi
209209
geo_level: national
210+
- variable: person_count
211+
geo_level: national
212+
domain_variable: ssi
213+
- variable: person_count
214+
geo_level: national
215+
domain_variable: age,ssi
210216
- variable: tanf
211217
geo_level: national
212218
- variable: spm_unit_count

policyengine_us_data/calibration/unified_matrix_builder.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2110,8 +2110,26 @@ def _query_targets(self, target_filter: dict) -> pd.DataFrame:
21102110

21112111
if "domain_variables" in target_filter:
21122112
dvs = target_filter["domain_variables"]
2113-
ph = ",".join(f"'{dv}'" for dv in dvs)
2114-
and_conditions.append(f"tv.domain_variable IN ({ph})")
2113+
exact_ph = ",".join(f"'{dv}'" for dv in dvs)
2114+
single_constraint_dvs = [dv for dv in dvs if "," not in str(dv)]
2115+
if single_constraint_dvs:
2116+
component_ph = ",".join(f"'{dv}'" for dv in single_constraint_dvs)
2117+
and_conditions.append(
2118+
"("
2119+
f"tv.domain_variable IN ({exact_ph}) "
2120+
"OR EXISTS ("
2121+
"SELECT 1 FROM stratum_constraints sc_domain "
2122+
"WHERE sc_domain.stratum_id = tv.stratum_id "
2123+
"AND sc_domain.constraint_variable NOT IN ("
2124+
"'state_fips', 'congressional_district_geoid', "
2125+
"'tax_unit_is_filer', 'ucgid_str'"
2126+
") "
2127+
f"AND sc_domain.constraint_variable IN ({component_ph})"
2128+
")"
2129+
")"
2130+
)
2131+
else:
2132+
and_conditions.append(f"tv.domain_variable IN ({exact_ph})")
21152133

21162134
if "variables" in target_filter:
21172135
vs = ",".join(f"'{v}'" for v in target_filter["variables"])

0 commit comments

Comments
 (0)