Skip to content

Commit 38717c5

Browse files
authored
Add income target wiring preflight tests (#1064)
* Add income target wiring tests * Test publication preflight script
1 parent 75363bd commit 38717c5

7 files changed

Lines changed: 548 additions & 4 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add CI coverage for income target wiring and a local publication preflight script for built enhanced CPS artifacts.

docs/engineering/skills/pipeline_operations.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,22 @@ repository root.
119119
may show a running run with no durable error. In that case, report the last
120120
completed/running manifest and then use Modal dashboard logs as secondary
121121
evidence.
122+
123+
## Local Publication Preflight
124+
125+
When you already have a locally built or checkpointed
126+
`enhanced_cps_2024.h5`, run the publication preflight before launching or
127+
resuming the long local-area publication stages:
128+
129+
```bash
130+
uv run python scripts/run_publication_preflight.py \
131+
--enhanced-cps /path/to/enhanced_cps_2024.h5 \
132+
--calibration-log /path/to/calibration_log.csv
133+
```
134+
135+
This reuses the upload dataset contract, computes baseline SPM, checks
136+
`employment_income` against the BEA NIPA wages target with a tight tolerance,
137+
and runs final-epoch JCT diagnostics plus ACA/Medicaid state checks unless
138+
explicitly skipped. Do not treat a completed local data build as publication
139+
ready until this preflight or the equivalent Stage 1 publication validation has
140+
passed.

policyengine_us_data/storage/upload_completed_datasets.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ class MicrosimulationAggregateCheck:
129129

130130
# Aggregate thresholds for broad sanity checks (year 2024).
131131
MIN_PLAUSIBLE_EMPLOYMENT_INCOME_SUM = 5e12 # $5 trillion
132-
NIPA_EMPLOYMENT_INCOME_TOLERANCE = 0.10
132+
# This is a publication gate, not a broad plausibility check: enhanced CPS
133+
# calibration should hit the BEA NIPA wages target closely enough that missing
134+
# target wiring fails before local-area outputs are built.
135+
NIPA_EMPLOYMENT_INCOME_TOLERANCE = 0.01
133136
MIN_ENHANCED_CPS_EMPLOYMENT_INCOME_SUM = BEA_NIPA_WAGES_AND_SALARIES_2024 * (
134137
1 - NIPA_EMPLOYMENT_INCOME_TOLERANCE
135138
)
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
"""Run fast artifact checks before launching the full publication pipeline."""
2+
3+
from __future__ import annotations
4+
5+
import argparse
6+
import json
7+
import sys
8+
from dataclasses import asdict, dataclass
9+
from pathlib import Path
10+
11+
import numpy as np
12+
import pandas as pd
13+
from policyengine_core.data import Dataset
14+
from policyengine_us import Microsimulation
15+
16+
from policyengine_us_data.db.etl_national_targets import (
17+
BEA_NIPA_WAGES_AND_SALARIES_2024,
18+
)
19+
from policyengine_us_data.storage import STORAGE_FOLDER
20+
from policyengine_us_data.storage.upload_completed_datasets import (
21+
DatasetValidationError,
22+
validate_dataset,
23+
)
24+
from policyengine_us_data.utils import ABSOLUTE_ERROR_SCALE_TARGETS
25+
26+
DEFAULT_ENHANCED_CPS_PATH = STORAGE_FOLDER / "enhanced_cps_2024.h5"
27+
DEFAULT_CALIBRATION_LOG_PATH = Path("calibration_log.csv")
28+
DEFAULT_PERIOD = 2024
29+
DEFAULT_EMPLOYMENT_TOLERANCE = 0.01
30+
DEFAULT_FINAL_EPOCH_TARGET_SHARE = 60.0
31+
MEDICAID_VALIDATION_PERIOD = 2025
32+
MEDICAID_STATE_TOLERANCE = 10.0
33+
REPO_ROOT = Path(__file__).resolve().parents[1]
34+
35+
36+
@dataclass(frozen=True)
37+
class PreflightResult:
38+
enhanced_cps_path: str
39+
calibration_log_path: str | None
40+
period: int
41+
baseline_spm: float
42+
employment_income: float
43+
employment_income_target: float
44+
employment_income_relative_error: float
45+
dataset_validation_passed: bool
46+
jct_diagnostics_passed: bool | None
47+
final_epoch_target_share_within_tolerance: float | None
48+
aca_state_calibration_passed: bool | None
49+
medicaid_state_calibration_passed: bool | None
50+
51+
52+
def parse_args() -> argparse.Namespace:
53+
parser = argparse.ArgumentParser(
54+
description=(
55+
"Validate a built enhanced CPS artifact before spending publication "
56+
"time on local-area outputs."
57+
)
58+
)
59+
parser.add_argument(
60+
"--enhanced-cps",
61+
type=Path,
62+
default=DEFAULT_ENHANCED_CPS_PATH,
63+
help="Path to enhanced_cps_2024.h5.",
64+
)
65+
parser.add_argument(
66+
"--calibration-log",
67+
type=Path,
68+
default=DEFAULT_CALIBRATION_LOG_PATH,
69+
help="Path to calibration_log.csv.",
70+
)
71+
parser.add_argument(
72+
"--period",
73+
type=int,
74+
default=DEFAULT_PERIOD,
75+
help="PolicyEngine year for SPM and income aggregates.",
76+
)
77+
parser.add_argument(
78+
"--employment-tolerance",
79+
type=float,
80+
default=DEFAULT_EMPLOYMENT_TOLERANCE,
81+
help="Allowed relative error against the BEA NIPA wages target.",
82+
)
83+
parser.add_argument(
84+
"--skip-dataset-validation",
85+
action="store_true",
86+
help="Skip upload-contract validation of the enhanced CPS H5.",
87+
)
88+
parser.add_argument(
89+
"--skip-calibration-log",
90+
action="store_true",
91+
help="Skip calibration_log.csv diagnostics.",
92+
)
93+
parser.add_argument(
94+
"--skip-state-health",
95+
action="store_true",
96+
help="Skip ACA and Medicaid state calibration checks.",
97+
)
98+
parser.add_argument(
99+
"--json-output",
100+
type=Path,
101+
default=None,
102+
help="Optional path for a JSON summary.",
103+
)
104+
return parser.parse_args()
105+
106+
107+
def load_simulation(path: Path) -> Microsimulation:
108+
return Microsimulation(dataset=Dataset.from_file(path))
109+
110+
111+
def ensure_repo_root_on_path() -> None:
112+
if str(REPO_ROOT) not in sys.path:
113+
sys.path.insert(0, str(REPO_ROOT))
114+
115+
116+
def calculate_baseline_spm(sim: Microsimulation, period: int) -> float:
117+
try:
118+
return float(sim.calculate("in_poverty", period, map_to="person").mean())
119+
except ValueError:
120+
return float(sim.calculate("person_in_poverty", period, map_to="person").mean())
121+
122+
123+
def calculate_employment_income(sim: Microsimulation, period: int) -> float:
124+
return float(sim.calculate("employment_income", period, map_to="person").sum())
125+
126+
127+
def validate_employment_income(
128+
value: float,
129+
*,
130+
target: float,
131+
tolerance: float,
132+
) -> float:
133+
relative_error = (value - target) / target
134+
if abs(relative_error) > tolerance:
135+
raise AssertionError(
136+
"employment_income is outside the NIPA wages tolerance: "
137+
f"value={value:,.0f}, target={target:,.0f}, "
138+
f"relative_error={relative_error:.4%}, tolerance={tolerance:.2%}"
139+
)
140+
return relative_error
141+
142+
143+
def final_epoch_target_share_within_tolerance(calibration_log: pd.DataFrame) -> float:
144+
final_epoch = calibration_log["epoch"].max()
145+
final_rows = calibration_log[calibration_log["epoch"] == final_epoch].copy()
146+
if final_rows.empty:
147+
raise AssertionError("No final-epoch calibration diagnostics found.")
148+
149+
tolerance = 0.10 * final_rows["target"].abs()
150+
for target_name, scale in ABSOLUTE_ERROR_SCALE_TARGETS.items():
151+
tolerance.loc[final_rows["target_name"] == target_name] = 0.10 * scale
152+
return float((final_rows["abs_error"] <= tolerance).mean() * 100)
153+
154+
155+
def validate_calibration_log(path: Path) -> float:
156+
ensure_repo_root_on_path()
157+
from validation.stage_1.jct_calibration import (
158+
assert_no_unexpected_high_error_jct_diagnostics,
159+
)
160+
161+
calibration_log = pd.read_csv(path)
162+
assert_no_unexpected_high_error_jct_diagnostics(calibration_log)
163+
share = final_epoch_target_share_within_tolerance(calibration_log)
164+
if share <= DEFAULT_FINAL_EPOCH_TARGET_SHARE:
165+
raise AssertionError(
166+
"Too few final-epoch calibration targets are within tolerance: "
167+
f"{share:.1f}% <= {DEFAULT_FINAL_EPOCH_TARGET_SHARE:.1f}%"
168+
)
169+
return share
170+
171+
172+
def validate_medicaid_state_calibration(sim: Microsimulation) -> None:
173+
targets_path = (
174+
Path("policyengine_us_data/storage/calibration_targets")
175+
/ f"medicaid_enrollment_{MEDICAID_VALIDATION_PERIOD}.csv"
176+
)
177+
targets = pd.read_csv(targets_path)
178+
state_code_hh = sim.calculate("state_code", map_to="household").values
179+
medicaid_enrolled = sim.calculate(
180+
"medicaid_enrolled",
181+
MEDICAID_VALIDATION_PERIOD,
182+
map_to="household",
183+
)
184+
185+
failures = []
186+
for row in targets.itertuples(index=False):
187+
target_enrollment = float(row.enrollment)
188+
simulated = float(medicaid_enrolled[state_code_hh == row.state].sum())
189+
pct_error = (
190+
np.inf
191+
if target_enrollment <= 0
192+
else abs(simulated - target_enrollment) / target_enrollment
193+
)
194+
if pct_error > MEDICAID_STATE_TOLERANCE:
195+
failures.append(
196+
f"{row.state}: simulated {simulated:,.0f}, "
197+
f"target {target_enrollment:,.0f}, error {pct_error:.2%}"
198+
)
199+
200+
if failures:
201+
raise AssertionError(
202+
"One or more Medicaid state targets exceeded tolerance of "
203+
f"{MEDICAID_STATE_TOLERANCE:.0%}:\n" + "\n".join(failures)
204+
)
205+
206+
207+
def write_summary(result: PreflightResult, path: Path | None) -> None:
208+
if path is None:
209+
return
210+
path.parent.mkdir(parents=True, exist_ok=True)
211+
path.write_text(json.dumps(asdict(result), indent=2, sort_keys=True) + "\n")
212+
213+
214+
def main() -> None:
215+
args = parse_args()
216+
enhanced_cps_path = args.enhanced_cps.expanduser().resolve()
217+
calibration_log_path = args.calibration_log.expanduser().resolve()
218+
219+
if not enhanced_cps_path.exists():
220+
raise FileNotFoundError(enhanced_cps_path)
221+
222+
dataset_validation_passed = False
223+
if not args.skip_dataset_validation:
224+
try:
225+
validate_dataset(enhanced_cps_path)
226+
except DatasetValidationError:
227+
raise
228+
dataset_validation_passed = True
229+
230+
sim = load_simulation(enhanced_cps_path)
231+
baseline_spm = calculate_baseline_spm(sim, args.period)
232+
employment_income = calculate_employment_income(sim, args.period)
233+
employment_relative_error = validate_employment_income(
234+
employment_income,
235+
target=BEA_NIPA_WAGES_AND_SALARIES_2024,
236+
tolerance=args.employment_tolerance,
237+
)
238+
239+
jct_diagnostics_passed = None
240+
target_share = None
241+
if not args.skip_calibration_log:
242+
if not calibration_log_path.exists():
243+
raise FileNotFoundError(calibration_log_path)
244+
target_share = validate_calibration_log(calibration_log_path)
245+
jct_diagnostics_passed = True
246+
247+
aca_state_calibration_passed = None
248+
medicaid_state_calibration_passed = None
249+
if not args.skip_state_health:
250+
ensure_repo_root_on_path()
251+
from validation.stage_1.aca_calibration import assert_aca_ptc_calibration
252+
253+
assert_aca_ptc_calibration(sim, emit=print)
254+
aca_state_calibration_passed = True
255+
validate_medicaid_state_calibration(sim)
256+
medicaid_state_calibration_passed = True
257+
258+
result = PreflightResult(
259+
enhanced_cps_path=str(enhanced_cps_path),
260+
calibration_log_path=(
261+
str(calibration_log_path) if not args.skip_calibration_log else None
262+
),
263+
period=args.period,
264+
baseline_spm=baseline_spm,
265+
employment_income=employment_income,
266+
employment_income_target=BEA_NIPA_WAGES_AND_SALARIES_2024,
267+
employment_income_relative_error=employment_relative_error,
268+
dataset_validation_passed=dataset_validation_passed,
269+
jct_diagnostics_passed=jct_diagnostics_passed,
270+
final_epoch_target_share_within_tolerance=target_share,
271+
aca_state_calibration_passed=aca_state_calibration_passed,
272+
medicaid_state_calibration_passed=medicaid_state_calibration_passed,
273+
)
274+
write_summary(result, args.json_output)
275+
276+
print("\nPublication preflight passed.")
277+
print(f" enhanced CPS: {enhanced_cps_path}")
278+
print(f" baseline SPM ({args.period}): {baseline_spm:.6f}")
279+
print(f" employment_income ({args.period}): {employment_income:,.0f}")
280+
print(f" employment_income vs NIPA wages: {employment_relative_error:.4%}")
281+
if target_share is not None:
282+
print(
283+
f" final-epoch calibration targets within tolerance: {target_share:.1f}%"
284+
)
285+
286+
287+
if __name__ == "__main__":
288+
main()

0 commit comments

Comments
 (0)