Commit 4f6324e

Add ACA marketplace bronze-selection target ETL (#618)
* Rebase ACA marketplace ETL onto main

* Format CPS marketplace benchmark helper

* Fix bronze-stratum domain_variable ordering and drop dead ETL

Review fixes from the standing review of PR 618:

1. P0 bug: bronze stratum constraints were inserted in the order
   ``state_fips, used_aca_ptc, selected_marketplace_plan_benchmark_ratio``,
   and SQLite's ``GROUP_CONCAT(DISTINCT ...)`` preserved that insertion
   order. That produced ``domain_variable = "used_aca_ptc,..."``, but
   ``target_config.yaml:68`` expects the alphabetical form
   ``selected_marketplace_plan_benchmark_ratio,used_aca_ptc``. The rule
   didn't match, so the bronze target silently dropped out of the loss.
   Reorder the inserts and add a comment explaining why order matters.
2. Delete the now-dead ``etl_aca_agi_state_targets.py``: it still used
   ``source="CMS Marketplace"`` (rejected by ``create_field_valid_values``)
   and the Makefile no longer invokes it. Redirect
   ``tests/integration/test_database_build.py`` to the new
   ``etl_aca_marketplace.py``.
3. Add a ValueError guard for corrupt source data (bronze APTC consumers
   exceeding total APTC consumers for any state).
4. Add the CMS Marketplace PUF URL to the ETL extract docstring so the
   input CSV is actually refetchable.
5. Expand the unit test file: add a real-CSV regression test (expects 27+
   HC.gov states with bronze ≤ total and no SBM states leaking in) and a
   negative test for the new ValueError.

* Make domain_variable ordering deterministic and fix stale integration test

Codex review on 8fd8990 found two issues:

1. ``tests/integration/test_database_build.py::test_state_aca_and_agi_targets_loaded``
   still asserted legacy ``aca_ptc`` / ``person_count`` /
   ``adjusted_gross_income`` state targets that the deleted
   ``etl_aca_agi_state_targets.py`` used to load, so it would fail against
   the rebuilt DB. Rename and rewrite it as
   ``test_state_marketplace_targets_loaded``, which asserts that the new APTC
   and bronze-selection targets land with the canonical alphabetical
   ``domain_variable`` strings.
2. The previous constraint-insertion-order workaround relied on SQLite's
   ``GROUP_CONCAT(DISTINCT ...)`` preserving insertion order, which is
   undocumented. Add ``ORDER BY`` to the ``domain_variable`` aggregation in
   the ``stratum_domain`` view so the canonical form is enforced at the view
   level, regardless of how callers insert constraints. Drop the
   now-obsolete ordering comment in ``etl_aca_marketplace.py``.

* Use correlated subquery for domain_variable ordering (SQLite portability)

The prior ``GROUP_CONCAT(DISTINCT ... ORDER BY ...)`` form requires
SQLite >= 3.44 and failed on the Modal integration runner with
``sqlite3.OperationalError: near "ORDER": syntax error``. Replace it with a
correlated subquery that selects distinct constraint names ordered
alphabetically and then concatenates them without an ``ORDER BY`` inside the
aggregate call. Works on all supported SQLite versions and still produces
the canonical form (e.g.
``selected_marketplace_plan_benchmark_ratio,used_aca_ptc``) regardless of
constraint insertion order. Verified by running the real view against
in-memory SQLite with non-alphabetical insert order; the result matches the
expected canonical string.

* Restore etl_aca_agi_state_targets.py alongside new marketplace ETL

The deletion in 8fd8990 was too aggressive. That file loaded three distinct
target families into the calibration DB:

1. state-level ``aca_ptc`` spending targets (sourced from
   ``aca_spending_and_enrollment_2024.csv``)
2. state-level ``person_count`` enrollment targets (same source)
3. state-level AGI bracket targets (sourced from ``agi_state.csv``)

This PR adds *new* marketplace APTC-count and bronze-count targets but does
not replace the ACA spending/enrollment or AGI targets. Without them the
calibrator has nothing to pin state-level ACA PTC spending, and
``test_aca_calibration`` / ``test_sparse_aca_calibration`` fail with >500%
state deviations.

Restore the file verbatim from the pre-deletion state, keep the
``CMS Marketplace`` source string it uses (re-added to the
``create_field_valid_values`` allowlist alongside the newer
``CMS 2024 OEP state metal status PUF`` source the marketplace ETL uses),
re-add the Makefile invocation, and put its entry back in the
integration-build script list ahead of the marketplace ETL. Keep the new
``test_state_marketplace_targets_loaded`` as a peer to the restored
``test_state_aca_and_agi_targets_loaded``.

The long-term cleanup (migrating the spending/enrollment targets into the
marketplace ETL or deprecating them) is a follow-up.

---------

Co-authored-by: Max Ghenis <mghenis@gmail.com>
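
The SQLite portability problem described in the commit message can be detected before choosing a query form. The following is a minimal sketch using only the standard-library `sqlite3` module; the helper names `supports_aggregate_order_by` and `probe_aggregate_order_by` are hypothetical and do not appear in the repository.

```python
import sqlite3

# ORDER BY inside an aggregate call was introduced in SQLite 3.44.0.
MIN_VERSION_FOR_AGGREGATE_ORDER_BY = (3, 44, 0)


def supports_aggregate_order_by() -> bool:
    """Check the linked SQLite library version (hypothetical helper)."""
    return sqlite3.sqlite_version_info >= MIN_VERSION_FOR_AGGREGATE_ORDER_BY


def probe_aggregate_order_by() -> bool:
    """Try the syntax directly instead of trusting the version number."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "SELECT GROUP_CONCAT(x, ',' ORDER BY x) "
            "FROM (SELECT 'b' AS x UNION ALL SELECT 'a')"
        )
        return True
    except sqlite3.OperationalError:
        # Older libraries report: near "ORDER": syntax error
        return False
    finally:
        conn.close()


if __name__ == "__main__":
    print("SQLite library version:", sqlite3.sqlite_version)
    print("ORDER BY in aggregates supported:", probe_aggregate_order_by())
```

Note that `sqlite3.sqlite_version_info` reports the version of the SQLite library linked into the interpreter, which can differ between a developer machine and a CI runner even when the Python version is the same.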
1 parent ad58017 commit 4f6324e

10 files changed

Lines changed: 731 additions & 7 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ node_modules
 !age_state.csv
 !agi_state.csv
 !soi_targets.csv
+!policyengine_us_data/storage/calibration_targets/aca_marketplace_state_metal_selection_2024.csv
 !policyengine_us_data/storage/social_security_aux.csv
 !policyengine_us_data/storage/SSPopJul_TR2024.csv
 !policyengine_us_data/storage/national_and_district_rents_2023.csv

Makefile

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@ database:
 	python policyengine_us_data/db/etl_state_income_tax.py --year $(YEAR)
 	python policyengine_us_data/db/etl_irs_soi.py --year $(YEAR)
 	python policyengine_us_data/db/etl_aca_agi_state_targets.py --year $(YEAR)
+	python policyengine_us_data/db/etl_aca_marketplace.py --year $(YEAR)
 	python policyengine_us_data/db/etl_pregnancy.py --year $(YEAR)
 	python policyengine_us_data/db/validate_database.py

changelog.d/618.added.md

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+Add an ACA marketplace ETL that loads state-level HC.gov bronze-plan
+selection targets for APTC recipients into the calibration database.

policyengine_us_data/calibration/target_config.yaml

Lines changed: 8 additions & 0 deletions
@@ -59,6 +59,14 @@ include:
     geo_level: state
     domain_variable: tanf
 
+  # === STATE — ACA marketplace APTC and bronze-plan enrollment counts ===
+  - variable: tax_unit_count
+    geo_level: state
+    domain_variable: used_aca_ptc
+  - variable: tax_unit_count
+    geo_level: state
+    domain_variable: selected_marketplace_plan_benchmark_ratio,used_aca_ptc
+
   # === STATE — fine AGI bracket targets (stubs 9/10 from in55cmcsv) ===
   - variable: person_count
     geo_level: state
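
The second rule above uses a comma-joined `domain_variable`, and the commit message reports that a rule silently fails to match when the database emits the same names in a different order. The sketch below shows one way to build the canonical string, assuming rules are compared by exact string equality. The function name `canonical_domain_variable` and the constant `EXCLUDED_CONSTRAINTS` are hypothetical; the excluded names are the ones filtered out by the `stratum_domain` view in this commit.

```python
# Constraint names that the view excludes from domain_variable.
EXCLUDED_CONSTRAINTS = frozenset(
    {
        "state_fips",
        "congressional_district_geoid",
        "tax_unit_is_filer",
        "ucgid_str",
    }
)


def canonical_domain_variable(constraint_variables):
    """Return the sorted, de-duplicated, comma-joined domain string.

    Returns None when no domain constraint remains, mirroring the NULL
    that GROUP_CONCAT yields for an empty input. Hypothetical helper.
    """
    names = sorted(set(constraint_variables) - EXCLUDED_CONSTRAINTS)
    return ",".join(names) if names else None


if __name__ == "__main__":
    # Insertion order that previously produced a non-matching string.
    inserted = [
        "state_fips",
        "used_aca_ptc",
        "selected_marketplace_plan_benchmark_ratio",
    ]
    print(canonical_domain_variable(inserted))
```

Sorting in one place means the YAML rule can be written once in alphabetical form, whatever order an ETL script happens to insert its constraints in.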

policyengine_us_data/db/create_database_tables.py

Lines changed: 18 additions & 6 deletions
@@ -341,12 +341,24 @@ def validate_parent_child_constraints(mapper, connection, target: Stratum):
                 THEN sc.value END),
             'US'
         ) AS geographic_id,
-        GROUP_CONCAT(DISTINCT CASE
-            WHEN sc.constraint_variable NOT IN (
-                'state_fips', 'congressional_district_geoid',
-                'tax_unit_is_filer', 'ucgid_str'
-            ) THEN sc.constraint_variable
-        END) AS domain_variable
+        -- Compute domain_variable via a correlated subquery so we can sort
+        -- the distinct constraint names alphabetically before concatenation.
+        -- We can't use `GROUP_CONCAT(DISTINCT ... ORDER BY ...)` because the
+        -- `ORDER BY` form inside aggregates requires SQLite >= 3.44, and the
+        -- Modal runner ships an older libsqlite.
+        (
+            SELECT GROUP_CONCAT(cv, ',')
+            FROM (
+                SELECT DISTINCT sc2.constraint_variable AS cv
+                FROM stratum_constraints sc2
+                WHERE sc2.stratum_id = t.stratum_id
+                  AND sc2.constraint_variable NOT IN (
+                      'state_fips', 'congressional_district_geoid',
+                      'tax_unit_is_filer', 'ucgid_str'
+                  )
+                ORDER BY sc2.constraint_variable
+            )
+        ) AS domain_variable
     FROM targets t
     LEFT JOIN stratum_constraints sc ON t.stratum_id = sc.stratum_id
     GROUP BY t.target_id, t.stratum_id, t.variable,
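
The correlated-subquery form in this hunk can be exercised against an in-memory database. The sketch below is illustrative only: the two-table schema is a simplified assumption that keeps just the columns the subquery touches, not the repository's real table definitions, and the ordering relies on SQLite feeding the aggregate rows in the subquery's sorted order, which is the behaviour the commit message reports verifying.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Simplified stand-ins for the real tables (assumed schema).
    CREATE TABLE targets (
        target_id INTEGER PRIMARY KEY,
        stratum_id INTEGER,
        variable TEXT
    );
    CREATE TABLE stratum_constraints (
        stratum_id INTEGER,
        constraint_variable TEXT,
        operation TEXT,
        value TEXT
    );
    INSERT INTO targets VALUES (1, 10, 'tax_unit_count');
    -- Deliberately non-alphabetical insertion order.
    INSERT INTO stratum_constraints VALUES (10, 'state_fips', '==', '1');
    INSERT INTO stratum_constraints VALUES (10, 'used_aca_ptc', '>', '0');
    INSERT INTO stratum_constraints VALUES
        (10, 'selected_marketplace_plan_benchmark_ratio', '<', '1.0');
    """
)

QUERY = """
SELECT
    t.target_id,
    (
        SELECT GROUP_CONCAT(cv, ',')
        FROM (
            SELECT DISTINCT sc2.constraint_variable AS cv
            FROM stratum_constraints sc2
            WHERE sc2.stratum_id = t.stratum_id
              AND sc2.constraint_variable NOT IN (
                  'state_fips', 'congressional_district_geoid',
                  'tax_unit_is_filer', 'ucgid_str'
              )
            ORDER BY sc2.constraint_variable
        )
    ) AS domain_variable
FROM targets t
"""

# Second column of the single result row is the concatenated domain string.
domain_variable = conn.execute(QUERY).fetchone()[1]
print(domain_variable)
conn.close()
```

Because the sort happens in an ordinary subquery rather than inside the aggregate call, the statement parses on SQLite builds older than 3.44.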

policyengine_us_data/db/create_field_valid_values.py

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ def populate_field_valid_values(session: Session) -> None:
         ("source", "Census ACS S0101", "survey"),
         ("source", "IRS SOI", "administrative"),
         ("source", "CMS Marketplace", "administrative"),
+        ("source", "CMS 2024 OEP state metal status PUF", "administrative"),
         ("source", "CMS Medicaid", "administrative"),
         ("source", "Census ACS S2704", "survey"),
         ("source", "USDA FNS SNAP", "administrative"),

policyengine_us_data/db/etl_aca_marketplace.py

Lines changed: 236 additions & 0 deletions
@@ -0,0 +1,236 @@
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+
+import pandas as pd
+from sqlmodel import Session, create_engine
+
+from policyengine_us_data.calibration.calibration_utils import STATE_CODES
+from policyengine_us_data.db.create_database_tables import (
+    Stratum,
+    StratumConstraint,
+    Target,
+)
+from policyengine_us_data.storage import CALIBRATION_FOLDER, STORAGE_FOLDER
+from policyengine_us_data.utils.db import etl_argparser, get_geographic_strata
+
+logger = logging.getLogger(__name__)
+
+# `selected_marketplace_plan_benchmark_ratio == 1.0` represents benchmark
+# silver coverage, so bronze plan selections are the subset below this ratio.
+BENCHMARK_SILVER_RATIO = 1.0
+
+STATE_METAL_SELECTION_PATH = (
+    CALIBRATION_FOLDER / "aca_marketplace_state_metal_selection_2024.csv"
+)
+
+STATE_ABBR_TO_FIPS = {abbr: fips for fips, abbr in STATE_CODES.items()}
+
+
+def _extra_args(parser) -> None:
+    parser.add_argument(
+        "--state-metal-csv",
+        type=Path,
+        default=STATE_METAL_SELECTION_PATH,
+        help=("State-metal CMS OEP proxy CSV. Default: %(default)s"),
+    )
+
+
+def extract_aca_marketplace_state_metal_data(
+    state_metal_csv_path: Path,
+) -> pd.DataFrame:
+    """Extract CMS marketplace state metal-status inputs from the checked-in CSV.
+
+    This ETL keeps an explicit extract step even though the source file already
+    lives in the repository. The original CMS 2024 OEP state metal status PUF
+    is not currently pulled from a stable direct-download endpoint in CI, so we
+    store the normalized input CSV at
+    `policyengine_us_data/storage/calibration_targets/aca_marketplace_state_metal_selection_2024.csv`.
+
+    Source (CMS Marketplace Open Enrollment Period Public Use Files):
+    https://www.cms.gov/marketplace/resources/data/public-use-files
+
+    To reproduce or update that file:
+    1. Download the CMS 2024 OEP State, Metal Level, and Enrollment Status PUF
+       from the URL above.
+    2. Preserve one row per state/platform/metal/enrollment-status combination.
+    3. Keep the `state_code`, `platform`, `metal_level`,
+       `enrollment_status`, `consumers`, and `aptc_consumers` columns.
+    4. Save the normalized output back to `state_metal_csv_path`.
+    """
+    return pd.read_csv(state_metal_csv_path)
+
+
+def build_state_marketplace_bronze_aptc_targets(
+    state_metal_df: pd.DataFrame,
+) -> pd.DataFrame:
+    """
+    Build HC.gov state bronze-selection targets among APTC consumers.
+
+    The 2024 CMS state-metal-status PUF exposes:
+    - metal rows (`B`, `G`, `S`) with enrollment_status=`All`
+    - aggregate rows (`All`) broken out by enrollment status (`01-atv`, etc.)
+
+    We use:
+    - total APTC consumers = sum of `aptc_consumers` for `metal_level == All`
+      across enrollment statuses
+    - bronze APTC consumers = `aptc_consumers` on the bronze row
+    """
+    df = state_metal_df.copy()
+    df = df[df["platform"] == "HC.gov"].copy()
+
+    total_rows = df[
+        (df["metal_level"] == "All") & (df["aptc_consumers"].notna())
+    ].copy()
+    bronze_rows = df[
+        (df["metal_level"] == "B")
+        & (df["enrollment_status"] == "All")
+        & (df["aptc_consumers"].notna())
+    ].copy()
+
+    total_aptc = total_rows.groupby("state_code", as_index=False).agg(
+        marketplace_aptc_consumers=("aptc_consumers", "sum"),
+        marketplace_consumers=("consumers", "sum"),
+    )
+    bronze_aptc = bronze_rows[["state_code", "aptc_consumers", "consumers"]].rename(
+        columns={
+            "aptc_consumers": "bronze_aptc_consumers",
+            "consumers": "bronze_consumers",
+        }
+    )
+
+    result = total_aptc.merge(bronze_aptc, on="state_code", how="inner")
+    result["state_fips"] = result["state_code"].map(STATE_ABBR_TO_FIPS)
+    result = result[result["state_fips"].notna()].copy()
+    result["state_fips"] = result["state_fips"].astype(int)
+    invalid_bronze = (
+        result["bronze_aptc_consumers"] > result["marketplace_aptc_consumers"]
+    )
+    if invalid_bronze.any():
+        bad_states = result.loc[invalid_bronze, "state_code"].tolist()
+        raise ValueError(
+            "Bronze APTC consumers exceed total APTC consumers for states: "
+            f"{bad_states}. Source CSV likely corrupted."
+        )
+    result["bronze_aptc_share"] = (
+        result["bronze_aptc_consumers"] / result["marketplace_aptc_consumers"]
+    )
+    result.insert(0, "year", 2024)
+    result.insert(1, "source", "cms_2024_oep_state_metal_status_puf")
+    return result.sort_values("state_code").reset_index(drop=True)
+
+
+def load_state_marketplace_bronze_aptc_targets(
+    targets_df: pd.DataFrame,
+    year: int,
+) -> None:
+    db_url = f"sqlite:///{STORAGE_FOLDER / 'calibration' / 'policy_data.db'}"
+    engine = create_engine(db_url)
+
+    with Session(engine) as session:
+        geo_strata = get_geographic_strata(session)
+
+        for row in targets_df.itertuples(index=False):
+            state_fips = int(row.state_fips)
+            parent_id = geo_strata["state"].get(state_fips)
+            if parent_id is None:
+                logger.warning(
+                    "No state geographic stratum for FIPS %s, skipping", state_fips
+                )
+                continue
+
+            # We intentionally do not subset to `tax_unit_is_filer == 1`.
+            # These CMS targets describe marketplace coverage groups rather
+            # than the IRS filer universe, so the closest calibration entity is
+            # a tax unit with positive modeled APTC use.
+            aptc_stratum = Stratum(
+                parent_stratum_id=parent_id,
+                notes=f"State FIPS {state_fips} Marketplace APTC recipients",
+            )
+            aptc_stratum.constraints_rel = [
+                StratumConstraint(
+                    constraint_variable="state_fips",
+                    operation="==",
+                    value=str(state_fips),
+                ),
+                StratumConstraint(
+                    constraint_variable="used_aca_ptc",
+                    operation=">",
+                    value="0",
+                ),
+            ]
+            aptc_stratum.targets_rel.append(
+                Target(
+                    # We use `tax_unit_count` rather than household/person
+                    # counts because insurance groups map most closely to
+                    # PolicyEngine tax units in the current calibration schema.
+                    variable="tax_unit_count",
+                    period=year,
+                    value=float(row.marketplace_aptc_consumers),
+                    active=True,
+                    source="CMS 2024 OEP state metal status PUF",
+                    notes="HC.gov APTC consumers across all enrollment statuses",
+                )
+            )
+            session.add(aptc_stratum)
+            session.flush()
+
+            bronze_stratum = Stratum(
+                parent_stratum_id=aptc_stratum.stratum_id,
+                notes=f"State FIPS {state_fips} Marketplace bronze APTC recipients",
+            )
+            bronze_stratum.constraints_rel = [
+                StratumConstraint(
+                    constraint_variable="state_fips",
+                    operation="==",
+                    value=str(state_fips),
+                ),
+                StratumConstraint(
+                    constraint_variable="selected_marketplace_plan_benchmark_ratio",
+                    operation="<",
+                    value=str(BENCHMARK_SILVER_RATIO),
+                ),
+                StratumConstraint(
+                    constraint_variable="used_aca_ptc",
+                    operation=">",
+                    value="0",
+                ),
+            ]
+            bronze_stratum.targets_rel.append(
+                Target(
+                    variable="tax_unit_count",
+                    period=year,
+                    value=float(row.bronze_aptc_consumers),
+                    active=True,
+                    source="CMS 2024 OEP state metal status PUF",
+                    notes="HC.gov bronze plan selections among APTC consumers",
+                )
+            )
+            session.add(bronze_stratum)
+            session.flush()
+
+        session.commit()
+
+
+def main() -> None:
+    args, year = etl_argparser(
+        "ETL for ACA marketplace bronze-selection calibration targets",
+        extra_args_fn=_extra_args,
+    )
+
+    state_metal = extract_aca_marketplace_state_metal_data(args.state_metal_csv)
+    targets_df = build_state_marketplace_bronze_aptc_targets(state_metal)
+    if targets_df.empty:
+        raise RuntimeError("No HC.gov marketplace bronze/APTC targets were generated.")
+
+    print(
+        "Loading ACA marketplace bronze/APTC state targets for "
+        f"{len(targets_df)} states from {args.state_metal_csv}"
+    )
+    load_state_marketplace_bronze_aptc_targets(targets_df, year)
+    print("ACA marketplace bronze/APTC targets loaded.")
+
+
+if __name__ == "__main__":
+    main()
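
The arithmetic in `build_state_marketplace_bronze_aptc_targets` can be followed on a toy input. The sketch below re-expresses the same filter, sum, inner join, and guard in dependency-free Python rather than pandas, so it is an illustration of the logic and not the ETL's implementation. All numbers are invented, the `02-example` enrollment status and the `SBM` platform label are placeholders chosen for illustration, and the function name `bronze_aptc_shares` is hypothetical.

```python
from collections import defaultdict

# Toy rows using the CSV's column names; every value here is made up.
ROWS = [
    {"state_code": "TX", "platform": "HC.gov", "metal_level": "All",
     "enrollment_status": "01-atv", "aptc_consumers": 600},
    {"state_code": "TX", "platform": "HC.gov", "metal_level": "All",
     "enrollment_status": "02-example", "aptc_consumers": 400},
    {"state_code": "TX", "platform": "HC.gov", "metal_level": "B",
     "enrollment_status": "All", "aptc_consumers": 300},
    # Non-HC.gov platform rows are filtered out before aggregation.
    {"state_code": "CA", "platform": "SBM", "metal_level": "All",
     "enrollment_status": "01-atv", "aptc_consumers": 900},
    {"state_code": "CA", "platform": "SBM", "metal_level": "B",
     "enrollment_status": "All", "aptc_consumers": 200},
]


def bronze_aptc_shares(rows):
    """Return {state_code: bronze APTC share} for HC.gov states."""
    total = defaultdict(float)  # sum over metal_level == "All" rows
    bronze = {}  # the single bronze row per state
    for row in rows:
        if row["platform"] != "HC.gov" or row["aptc_consumers"] is None:
            continue
        if row["metal_level"] == "All":
            total[row["state_code"]] += row["aptc_consumers"]
        elif row["metal_level"] == "B" and row["enrollment_status"] == "All":
            bronze[row["state_code"]] = row["aptc_consumers"]

    # Inner join: keep only states present on both sides.
    states = sorted(set(total) & set(bronze))
    bad_states = [s for s in states if bronze[s] > total[s]]
    if bad_states:
        raise ValueError(
            "Bronze APTC consumers exceed total APTC consumers for states: "
            f"{bad_states}. Source CSV likely corrupted."
        )
    return {s: bronze[s] / total[s] for s in states}


if __name__ == "__main__":
    print(bronze_aptc_shares(ROWS))
```

In this toy input the total for TX is the sum of the two aggregate rows (1,000), so the bronze row of 300 gives a share of 0.3, and the CA rows never reach the result because of the platform filter.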
