Skip to content

Commit a01b760

Browse files
authored
Close leaked CPS HDF stores (#694)
* Close leaked CPS HDF stores
* Use context managers for CPS raw stores
* Use current changelog fragment naming
* Fix CPS raw-store CI regressions
1 parent b1be0b6 commit a01b760

3 files changed

Lines changed: 333 additions & 82 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Close raw CPS HDF stores after previous-year income and auto-loan preprocessing so CPS builds do not leave `census_cps_2021.h5` and `census_cps_2022.h5` open at process shutdown.

policyengine_us_data/datasets/cps/cps.py

Lines changed: 96 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from contextlib import closing, contextmanager
12
from importlib.resources import files
23
from policyengine_core.data import Dataset
34
from policyengine_us_data.storage import STORAGE_FOLDER, DOCS_FOLDER
@@ -91,6 +92,20 @@
9192
}
9293

9394

95+
@contextmanager
def _open_dataset_read_only(dataset_source):
    """Yield an open store for ``dataset_source`` and close it on exit.

    ``dataset_source`` is called with ``require=True`` to obtain the dataset
    object. If that object exposes a non-``None`` ``file_path`` attribute,
    the path is opened as a ``pd.HDFStore`` in read-only mode (``mode="r"``).
    Otherwise the object returned by ``dataset.load()`` is yielded and its
    ``close()`` method is invoked when the ``with`` block finishes.

    In both cases the store is released even when the caller's ``with``
    body raises.
    """
    source = dataset_source(require=True)
    hdf_path = getattr(source, "file_path", None)

    if hdf_path is None:
        # No path advertised: fall back to the dataset's own loader and
        # make sure whatever it returns gets closed.
        with closing(source.load()) as handle:
            yield handle
    else:
        # Open the file directly so the handle is explicitly read-only.
        with pd.HDFStore(hdf_path, mode="r") as handle:
            yield handle
107+
108+
94109
class CPS(Dataset):
95110
name = "cps"
96111
label = "CPS"
@@ -113,13 +128,13 @@ def generate(self):
113128
"For future years, use PolicyEngine's uprating at simulation time."
114129
)
115130

116-
raw_data = self.raw_cps(require=True).load()
117131
cps = {}
118132

119133
ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household")
120-
person, tax_unit, family, spm_unit, household = [
121-
raw_data[entity] for entity in ENTITIES
122-
]
134+
with _open_dataset_read_only(self.raw_cps) as raw_data:
135+
person, tax_unit, family, spm_unit, household = [
136+
raw_data[entity] for entity in ENTITIES
137+
]
123138

124139
logging.info("Adding ID variables")
125140
add_id_variables(cps, person, tax_unit, family, spm_unit, household)
@@ -160,7 +175,6 @@ def generate(self):
160175
add_auto_loan_interest_and_net_worth(self, cps)
161176
logging.info("Added all variables")
162177

163-
raw_data.close()
164178
self.save_dataset(cps)
165179
logging.info("Adding takeup")
166180
add_takeup(self)
@@ -930,39 +944,41 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
930944
)
931945
return
932946

933-
cps_current_year_data = self.raw_cps(require=True).load()
934-
cps_previous_year_data = self.previous_year_raw_cps(require=True).load()
935-
cps_previous_year = cps_previous_year_data.person.set_index(
936-
cps_previous_year_data.person.PERIDNUM
937-
)
938-
cps_current_year = cps_current_year_data.person.set_index(
939-
cps_current_year_data.person.PERIDNUM
940-
)
947+
with (
948+
_open_dataset_read_only(self.raw_cps) as cps_current_year_data,
949+
_open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data,
950+
):
951+
cps_previous_year = cps_previous_year_data.person.set_index(
952+
cps_previous_year_data.person.PERIDNUM
953+
)
954+
cps_current_year = cps_current_year_data.person.set_index(
955+
cps_current_year_data.person.PERIDNUM
956+
)
941957

942-
previous_year_data = cps_previous_year[
943-
["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"]
944-
].rename(
945-
{
946-
"WSAL_VAL": "employment_income_last_year",
947-
"SEMP_VAL": "self_employment_income_last_year",
948-
},
949-
axis=1,
950-
)
958+
previous_year_data = cps_previous_year[
959+
["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"]
960+
].rename(
961+
{
962+
"WSAL_VAL": "employment_income_last_year",
963+
"SEMP_VAL": "self_employment_income_last_year",
964+
},
965+
axis=1,
966+
)
951967

952-
previous_year_data = previous_year_data[
953-
(previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0)
954-
]
968+
previous_year_data = previous_year_data[
969+
(previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0)
970+
]
955971

956-
previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True)
972+
previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True)
957973

958-
joined_data = cps_current_year.join(previous_year_data)[
959-
[
960-
"employment_income_last_year",
961-
"self_employment_income_last_year",
962-
"I_ERNVAL",
963-
"I_SEVAL",
974+
joined_data = cps_current_year.join(previous_year_data)[
975+
[
976+
"employment_income_last_year",
977+
"self_employment_income_last_year",
978+
"I_ERNVAL",
979+
"I_SEVAL",
980+
]
964981
]
965-
]
966982
joined_data["previous_year_income_available"] = (
967983
~joined_data.employment_income_last_year.isna()
968984
& ~joined_data.self_employment_income_last_year.isna()
@@ -1884,13 +1900,12 @@ def add_tips(self, cps: h5py.File):
18841900
# Get is_married from raw CPS data (A_MARITL codes: 1,2 = married)
18851901
# Note: is_married in policyengine-us is Family-level, but we need
18861902
# person-level for imputation models
1887-
raw_data = self.raw_cps(require=True).load()
1888-
raw_person = raw_data["person"]
1889-
cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values
1890-
cps["is_tipped_occupation"] = derive_is_tipped_occupation(
1891-
derive_treasury_tipped_occupation_code(raw_person.PEIOOCC)
1892-
)
1893-
raw_data.close()
1903+
with _open_dataset_read_only(self.raw_cps) as raw_data:
1904+
raw_person = raw_data["person"]
1905+
cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values
1906+
cps["is_tipped_occupation"] = derive_is_tipped_occupation(
1907+
derive_treasury_tipped_occupation_code(raw_person.PEIOOCC)
1908+
)
18941909

18951910
cps["is_under_18"] = cps.age < 18
18961911
cps["is_under_6"] = cps.age < 6
@@ -2050,51 +2065,50 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None:
20502065
cps_data = self.load_dataset()
20512066

20522067
# Access raw CPS for additional variables
2053-
raw_data_instance = self.raw_cps(require=True)
2054-
raw_data = raw_data_instance.load()
2055-
person_data = raw_data.person
2056-
2057-
# Preprocess the CPS for imputation
2058-
lengths = {k: len(v) for k, v in cps_data.items()}
2059-
var_len = cps_data["person_household_id"].shape[0]
2060-
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
2061-
agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
2062-
agg_data["interest_dividend_income"] = np.sum(
2063-
[
2064-
agg_data["taxable_interest_income"],
2065-
agg_data["tax_exempt_interest_income"],
2066-
agg_data["qualified_dividend_income"],
2067-
agg_data["non_qualified_dividend_income"],
2068-
],
2069-
axis=0,
2070-
)
2071-
agg_data["social_security_pension_income"] = np.sum(
2072-
[
2073-
agg_data["tax_exempt_private_pension_income"],
2074-
agg_data["taxable_private_pension_income"],
2075-
agg_data["social_security_retirement"],
2076-
],
2077-
axis=0,
2078-
)
2079-
2080-
agg = (
2081-
agg_data.groupby("person_household_id")[
2068+
with _open_dataset_read_only(self.raw_cps) as raw_data:
2069+
person_data = raw_data.person
2070+
2071+
# Preprocess the CPS for imputation
2072+
lengths = {k: len(v) for k, v in cps_data.items()}
2073+
var_len = cps_data["person_household_id"].shape[0]
2074+
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
2075+
agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
2076+
agg_data["interest_dividend_income"] = np.sum(
2077+
[
2078+
agg_data["taxable_interest_income"],
2079+
agg_data["tax_exempt_interest_income"],
2080+
agg_data["qualified_dividend_income"],
2081+
agg_data["non_qualified_dividend_income"],
2082+
],
2083+
axis=0,
2084+
)
2085+
agg_data["social_security_pension_income"] = np.sum(
20822086
[
2083-
"employment_income",
2084-
"interest_dividend_income",
2085-
"social_security_pension_income",
2087+
agg_data["tax_exempt_private_pension_income"],
2088+
agg_data["taxable_private_pension_income"],
2089+
agg_data["social_security_retirement"],
2090+
],
2091+
axis=0,
2092+
)
2093+
2094+
agg = (
2095+
agg_data.groupby("person_household_id")[
2096+
[
2097+
"employment_income",
2098+
"interest_dividend_income",
2099+
"social_security_pension_income",
2100+
]
20862101
]
2087-
]
2088-
.sum()
2089-
.rename(
2090-
columns={
2091-
"employment_income": "household_employment_income",
2092-
"interest_dividend_income": "household_interest_dividend_income",
2093-
"social_security_pension_income": "household_social_security_pension_income",
2094-
}
2102+
.sum()
2103+
.rename(
2104+
columns={
2105+
"employment_income": "household_employment_income",
2106+
"interest_dividend_income": "household_interest_dividend_income",
2107+
"social_security_pension_income": "household_social_security_pension_income",
2108+
}
2109+
)
2110+
.reset_index()
20952111
)
2096-
.reset_index()
2097-
)
20982112

20992113
def create_scf_reference_person_mask(cps_data, raw_person_data):
21002114
"""

0 commit comments

Comments
 (0)