Skip to content

Commit e23b696

Browse files
authored
Merge pull request #688 from PolicyEngine/codex/liheap-db-calibration
[codex] Add LIHEAP targets to local calibration DB
2 parents dcd05b2 + 0dfa53f commit e23b696

6 files changed

Lines changed: 234 additions & 8 deletions

File tree

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ PolicyEngine constructs its representative household datasets through a multi-st
2828

2929
The Enhanced CPS (`make data-legacy`) produces a national-only calibrated dataset. For the current geography-specific pipeline, see [docs/calibration.md](docs/calibration.md).
3030

31+
The repo currently contains two calibration tracks:
32+
- Legacy Enhanced CPS (`make data-legacy`), which uses the older `EnhancedCPS` / `build_loss_matrix()` path for national-only calibration.
33+
- Unified calibration (`docs/calibration.md`), which uses `storage/calibration/policy_data.db` and the sparse matrix + L0 pipeline for current national and geography-specific builds.
34+
3135
For detailed calibration usage, see [docs/calibration.md](docs/calibration.md) and [modal_app/README.md](modal_app/README.md).
3236

3337
### Running the Full Pipeline

docs/calibration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
The unified calibration pipeline reweights cloned CPS records to match administrative targets using L0-regularized optimization. This guide covers the main workflows: lightweight build-then-fit, full pipeline with PUF, and fitting from a saved package.
44

5+
This is the current production calibration path. The older national-only Enhanced CPS path (`make data-legacy`) remains in the repo for legacy reproduction and uses a separate `EnhancedCPS` / `build_loss_matrix()` flow.
6+
57
## Quick Start
68

79
```bash

docs/methodology.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ graph TD
9595
classDef output fill:#5091CC,stroke:#2C6496,color:#FFFFFF
9696
```
9797

98+
The current production calibration path is the geography-specific target-database pipeline shown above. The legacy national-only Enhanced CPS reweighting branch remains in the repo for reproduction, so calibration-target changes that must affect both paths need updates in both the unified database pipeline and the older `EnhancedCPS` / `build_loss_matrix()` flow.
99+
98100
## Stage 1: Variable Imputation
99101

100102
The imputation process begins by aging both the CPS and PUF datasets to the target year, then creating a copy of the aged CPS dataset. This allows us to preserve the original CPS structure while adding imputed tax variables.
@@ -298,4 +300,4 @@ Key files:
298300
- `policyengine_us_data/calibration/unified_matrix_builder.py` — Sparse calibration matrix builder
299301
- `policyengine_us_data/calibration/clone_and_assign.py` — Geography cloning and block assignment
300302
- `policyengine_us_data/calibration/publish_local_area.py` — H5 file generation
301-
- `policyengine_us_data/db/` — Target database ETL scripts
303+
- `policyengine_us_data/db/` — Target database ETL scripts

policyengine_us_data/db/etl_national_targets.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,22 @@ def extract_national_targets(dataset: str = DEFAULT_DATASET):
321321
"notes": "ACA Premium Tax Credit recipients",
322322
"year": HARDCODED_YEAR,
323323
},
324+
{
325+
"constraint_variable": "spm_unit_energy_subsidy_reported",
326+
"target_variable": "household_count",
327+
"household_count": 5_939_605,
328+
"source": "https://liheappm.acf.gov/sites/default/files/private/congress/profiles/2023/FY2023AllStates%28National%29Profile-508Compliant.pdf",
329+
"notes": "LIHEAP total households served by state programs",
330+
"year": 2023,
331+
},
332+
{
333+
"constraint_variable": "spm_unit_energy_subsidy_reported",
334+
"target_variable": "household_count",
335+
"household_count": 5_876_646,
336+
"source": "https://liheappm.acf.gov/sites/default/files/private/congress/profiles/2024/FY2024_AllStates%28National%29_Profile.pdf",
337+
"notes": "LIHEAP total households served by state programs",
338+
"year": 2024,
339+
},
324340
]
325341

326342
# Add SSN card type NONE targets for multiple years
@@ -730,6 +746,8 @@ def load_national_targets(
730746
for cond_target in conditional_targets:
731747
constraint_var = cond_target["constraint_variable"]
732748
target_year = cond_target["year"]
749+
target_variable = cond_target.get("target_variable", "person_count")
750+
target_value = cond_target.get(target_variable)
733751

734752
# Determine constraint details
735753
if constraint_var == "medicaid":
@@ -740,6 +758,10 @@ def load_national_targets(
740758
stratum_notes = "National ACA Premium Tax Credit Recipients"
741759
constraint_operation = ">"
742760
constraint_value = "0"
761+
elif constraint_var == "spm_unit_energy_subsidy_reported":
762+
stratum_notes = "National LIHEAP Recipient Households"
763+
constraint_operation = ">"
764+
constraint_value = "0"
743765
elif constraint_var == "ssn_card_type":
744766
stratum_notes = "National Undocumented Population"
745767
constraint_operation = "=="
@@ -765,23 +787,23 @@ def load_national_targets(
765787
session.query(Target)
766788
.filter(
767789
Target.stratum_id == existing_stratum.stratum_id,
768-
Target.variable == "person_count",
790+
Target.variable == target_variable,
769791
Target.period == target_year,
770792
)
771793
.first()
772794
)
773795

774796
if existing_target:
775-
existing_target.value = cond_target["person_count"]
797+
existing_target.value = target_value
776798
existing_target.source = "PolicyEngine"
777799
print(f"Updated enrollment target for {constraint_var}")
778800
else:
779801
# Add new target to existing stratum
780802
new_target = Target(
781803
stratum_id=existing_stratum.stratum_id,
782-
variable="person_count",
804+
variable=target_variable,
783805
period=target_year,
784-
value=cond_target["person_count"],
806+
value=target_value,
785807
active=True,
786808
source="PolicyEngine",
787809
notes=f"{cond_target['notes']} | Source: {cond_target['source']}",
@@ -807,9 +829,9 @@ def load_national_targets(
807829
# Add target
808830
new_stratum.targets_rel = [
809831
Target(
810-
variable="person_count",
832+
variable=target_variable,
811833
period=target_year,
812-
value=cond_target["person_count"],
834+
value=target_value,
813835
active=True,
814836
source="PolicyEngine",
815837
notes=f"{cond_target['notes']} | Source: {cond_target['source']}",

policyengine_us_data/utils/loss.py

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
import pandas as pd
44
import numpy as np
55
import logging
6+
import sqlite3
67

7-
from policyengine_us_data.storage import CALIBRATION_FOLDER
8+
from policyengine_us_data.storage import CALIBRATION_FOLDER, STORAGE_FOLDER
89
from policyengine_us_data.storage.calibration_targets.pull_soi_targets import (
910
STATE_ABBR_TO_FIPS,
1011
)
@@ -118,6 +119,133 @@ def fmt(x):
118119
return f"{x / 1e9:.1f}bn"
119120

120121

122+
def _parse_constraint_value(value):
    """Coerce a stored constraint value into a bool, int, or float when possible.

    Args:
        value: Constraint value, normally the text stored alongside a
            stratum constraint.

    Returns:
        ``True`` / ``False`` for the exact strings ``"True"`` / ``"False"``,
        an ``int`` or ``float`` when the string parses as one, and otherwise
        ``value`` unchanged. Non-string inputs are returned untouched.
    """
    if not isinstance(value, str):
        # Already typed (e.g. a float or None). Passing it through int()
        # would truncate 3.7 to 3 and turn True into 1.
        return value
    if value == "True":
        return True
    if value == "False":
        return False
    # Try the narrower numeric type first so "3" stays an int.
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            continue
    return value
134+
135+
136+
def _apply_constraint(values, operation: str, raw_value: str):
    """Evaluate one stratum constraint against an array of values.

    Args:
        values: Array-like of values for the constraint variable.
        operation: Operator name or symbol stored with the constraint, e.g.
            ``"in"``, ``"=="``, ``">"``, ``"greater_than"``.
        raw_value: Constraint value as stored. For ``"in"`` this is a
            ``|``-separated list of allowed values.

    Returns:
        The element-wise boolean result of applying the comparison to
        ``values``.

    Raises:
        ValueError: If ``operation`` is not one of the supported operators.
    """
    if operation == "in":
        allowed_values = [part.strip() for part in str(raw_value).split("|")]
        # Membership against raw strings does not reliably match numeric or
        # boolean data, so coerce the members the same way the scalar
        # comparisons below do. String-typed data keeps string members.
        if np.asarray(values).dtype.kind in "biuf":
            allowed_values = [
                _parse_constraint_value(member) for member in allowed_values
            ]
        return np.isin(values, allowed_values)

    value = _parse_constraint_value(raw_value)
    if operation in ("equals", "==", "="):
        return values == value
    if operation in ("greater_than", ">"):
        return values > value
    if operation in ("greater_than_or_equal", ">="):
        return values >= value
    if operation in ("less_than", "<"):
        return values < value
    if operation in ("less_than_or_equal", "<="):
        return values <= value
    if operation in ("not_equals", "!=", "<>"):
        return values != value

    raise ValueError(f"Unsupported stratum constraint operation: {operation}")
156+
157+
158+
def _geo_label_from_ucgid(ucgid_str: str) -> str:
    """Return the label prefix for a UCGID string.

    A missing or empty value, or the code ``"0100000US"``, yields
    ``"nation"``; any other value is returned as ``"geo/<ucgid_str>"``.
    """
    is_national = (
        ucgid_str is None or ucgid_str == "" or ucgid_str == "0100000US"
    )
    return "nation" if is_national else f"geo/{ucgid_str}"
162+
163+
164+
def _add_liheap_targets_from_db(loss_matrix, targets_list, sim, time_period):
    """Append LIHEAP calibration targets read from the local targets DB.

    Queries ``policy_data.db`` under ``STORAGE_FOLDER / "calibration"`` for
    active targets with ``reform_id = 0`` for ``time_period`` whose stratum
    notes contain ``LIHEAP``. For each target it builds a household-level
    metric from the stratum's constraints, stores it as a new
    ``loss_matrix`` column, and appends the target value to ``targets_list``.

    Args:
        loss_matrix: Column container (a DataFrame in ``build_loss_matrix``)
            that receives one column per target; mutated in place.
        targets_list: List of target values; appended to in place.
        sim: Simulation object exposing ``calculate(variable, map_to=...)``.
        time_period: Period used to select targets from the DB.

    Returns:
        The ``(targets_list, loss_matrix)`` pair. Both are returned unchanged
        when the DB file is missing or holds no matching targets.
    """
    db_path = STORAGE_FOLDER / "calibration" / "policy_data.db"
    if not db_path.exists():
        return targets_list, loss_matrix

    query = """
        SELECT
            t.target_id,
            t.variable,
            t.value AS target_value,
            s.notes,
            sc.constraint_variable,
            sc.operation,
            sc.value AS constraint_value
        FROM targets t
        JOIN strata s
            ON s.stratum_id = t.stratum_id
        JOIN stratum_constraints sc
            ON sc.stratum_id = s.stratum_id
        WHERE
            t.active = 1
            AND t.reform_id = 0
            AND t.period = ?
            AND s.notes LIKE '%LIHEAP%'
        ORDER BY t.target_id
    """

    # A sqlite3 connection used as a context manager only commits or rolls
    # back the transaction; it does not close the connection. Close it
    # explicitly so the file handle is released.
    conn = sqlite3.connect(db_path)
    try:
        target_rows = pd.read_sql_query(query, conn, params=[time_period])
    finally:
        conn.close()

    if target_rows.empty:
        return targets_list, loss_matrix

    household_values_cache = {
        "household_weight": sim.calculate("household_weight").values
    }

    def get_household_values(variable: str):
        # Each variable is calculated once and reused across targets.
        if variable not in household_values_cache:
            household_values_cache[variable] = sim.calculate(
                variable,
                map_to="household",
            ).values
        return household_values_cache[variable]

    n_households = len(household_values_cache["household_weight"])

    # The join yields one row per (target, constraint); group them so all
    # constraints of a target are AND-ed into a single mask.
    for target_id, target_df in target_rows.groupby("target_id", sort=False):
        mask = np.ones(n_households, dtype=bool)
        for row in target_df.itertuples(index=False):
            if (
                row.constraint_variable == "ucgid_str"
                and row.constraint_value == "0100000US"
            ):
                # The national geography constraint selects every household.
                continue
            values = get_household_values(row.constraint_variable)
            mask &= _apply_constraint(
                values,
                row.operation,
                row.constraint_value,
            )

        variable = target_df["variable"].iat[0]
        if variable == "household_count":
            metric = mask.astype(float)
        else:
            metric = np.where(mask, get_household_values(variable), 0.0)

        ucgid_constraints = target_df.loc[
            target_df.constraint_variable == "ucgid_str", "constraint_value"
        ]
        geo_label = _geo_label_from_ucgid(
            ucgid_constraints.iat[0] if not ucgid_constraints.empty else None
        )
        label = f"{geo_label}/db/liheap/{variable}"
        if label in loss_matrix:
            # Two targets sharing a geography and variable would overwrite
            # one column while still appending a second target value,
            # leaving columns and targets misaligned. Keep both by
            # disambiguating with the target id.
            label = f"{label}/{target_id}"
        loss_matrix[label] = metric
        targets_list.append(target_df["target_value"].iat[0])

    logging.info(
        "Loaded %d LIHEAP targets from the local targets DB",
        target_rows["target_id"].nunique(),
    )

    return targets_list, loss_matrix
247+
248+
121249
def build_loss_matrix(dataset: type, time_period):
122250
loss_matrix = pd.DataFrame()
123251
df = pe_to_soi(dataset, time_period)
@@ -667,6 +795,10 @@ def build_loss_matrix(dataset: type, time_period):
667795
targets_array.extend(snap_state_targets)
668796
loss_matrix = _add_snap_metric_columns(loss_matrix, sim)
669797

798+
targets_array, loss_matrix = _add_liheap_targets_from_db(
799+
loss_matrix, targets_array, sim, time_period
800+
)
801+
670802
del sim, df
671803
gc.collect()
672804

tests/unit/test_etl_national_targets.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,67 @@ def test_load_national_targets_deactivates_stale_baseline_rows(tmp_path, monkeyp
137137
in (target.notes or "")
138138
for target in reform_rows
139139
)
140+
141+
142+
def test_load_national_targets_supports_liheap_household_counts(tmp_path, monkeypatch):
    """A ``household_count`` conditional target yields the LIHEAP stratum and target.

    Loads a single LIHEAP conditional target into a fresh database and checks
    that a stratum noted "National LIHEAP Recipient Households" exists with a
    ``spm_unit_energy_subsidy_reported > 0`` constraint and a 2024
    ``household_count`` target carrying the supplied value.
    """
    db_dir = tmp_path / "calibration"
    db_dir.mkdir()
    engine = create_database(f"sqlite:///{db_dir / 'policy_data.db'}")

    with Session(engine) as session:
        national = _make_stratum(session, notes="United States")
        assert national is not None

    # Point the ETL module's STORAGE_FOLDER at the temporary directory
    # (presumably so the loader resolves the database created above).
    monkeypatch.setattr(
        "policyengine_us_data.db.etl_national_targets.STORAGE_FOLDER",
        tmp_path,
    )

    liheap_spec = {
        "constraint_variable": "spm_unit_energy_subsidy_reported",
        "target_variable": "household_count",
        "household_count": 5_876_646,
        "source": "https://example.com/liheap-2024.pdf",
        "notes": "LIHEAP total households served by state programs",
        "year": 2024,
    }

    load_national_targets(
        direct_targets_df=pd.DataFrame(),
        tax_filer_df=pd.DataFrame(),
        tax_expenditure_df=pd.DataFrame(),
        conditional_targets=[liheap_spec],
    )

    with Session(engine) as session:
        stratum_query = session.query(Stratum).filter(
            Stratum.notes == "National LIHEAP Recipient Households"
        )
        liheap_stratum = stratum_query.first()
        assert liheap_stratum is not None

        constraint_triples = {
            (item.constraint_variable, item.operation, item.value)
            for item in liheap_stratum.constraints_rel
        }
        assert ("spm_unit_energy_subsidy_reported", ">", "0") in constraint_triples

        target_query = session.query(Target).filter(
            Target.stratum_id == liheap_stratum.stratum_id,
            Target.variable == "household_count",
            Target.period == 2024,
        )
        liheap_target = target_query.first()
        assert liheap_target is not None
        assert liheap_target.value == 5_876_646

0 commit comments

Comments
 (0)