Skip to content

Commit d286241

Browse files
authored
Merge pull request #570 from PolicyEngine/add-dataset-sanity-tests
Harden data pipeline against corrupted dataset uploads
2 parents bebb787 + f652baf commit d286241

20 files changed

Lines changed: 10968 additions & 5 deletions

Makefile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: all format test install download upload docker documentation data calibrate publish-local-area clean build paper clean-paper presentations database database-refresh promote-database promote-dataset
1+
.PHONY: all format test install download upload docker documentation data validate-data calibrate publish-local-area clean build paper clean-paper presentations database database-refresh promote-database promote-dataset
22

33
HF_CLONE_DIR ?= $(HOME)/huggingface/policyengine-us-data
44

@@ -25,7 +25,7 @@ upload:
2525

2626
docker:
2727
docker buildx build --platform linux/amd64 . -t policyengine-us-data:latest
28-
28+
2929
documentation:
3030
cd docs && \
3131
rm -rf _build .jupyter_cache && \
@@ -101,6 +101,9 @@ calibrate: data
101101
publish-local-area:
102102
python policyengine_us_data/datasets/cps/local_area_calibration/publish_local_area.py
103103

104+
validate-data:
105+
python -c "from policyengine_us_data.storage.upload_completed_datasets import validate_all_datasets; validate_all_datasets()"
106+
104107
clean:
105108
rm -f policyengine_us_data/storage/*.h5
106109
rm -f policyengine_us_data/storage/*.db
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Hardened data pipeline against corrupted dataset uploads: pre-upload validation gate, post-generation assertions in enhanced CPS and sparse builders, CI workflow safety guards, file size checks, and comprehensive sanity tests for all dataset variants (5 layers of defense).

oregon_ctc_analysis.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
"""
2+
Oregon Child Tax Credit Analysis by State Senate District
3+
4+
Calculates the impact of doubling Oregon's Young Child Tax Credit (or_ctc)
5+
by State Legislative District Upper (SLDU) - i.e., State Senate districts.
6+
"""
7+
8+
import numpy as np
9+
import pandas as pd
10+
from pathlib import Path
11+
from policyengine_us import Microsimulation
12+
from policyengine_core.reforms import Reform
13+
14+
# Local imports
15+
from policyengine_us_data.datasets.cps.local_area_calibration.block_assignment import (
16+
assign_geography_for_cd,
17+
load_block_crosswalk,
18+
)
19+
from policyengine_us_data.storage import STORAGE_FOLDER
20+
21+
# Oregon congressional districts (119th Congress)
22+
# Oregon has 6 CDs, geoid format: state_fips * 100 + district
23+
# Oregon FIPS = 41, so: 4101, 4102, 4103, 4104, 4105, 4106
24+
OREGON_CD_GEOIDS = [4101, 4102, 4103, 4104, 4105, 4106]
25+
26+
27+
def load_district_data(cd_geoid: int) -> dict:
    """Read a fixed set of variables from one Oregon district H5 file.

    The file is looked up under ``STORAGE_FOLDER / "districts"`` and named
    ``OR-NN.h5``, where ``NN`` is the last two digits of ``cd_geoid``.

    Args:
        cd_geoid: Congressional district geoid (e.g. ``4103``); only
            ``cd_geoid % 100`` is used to build the file name.

    Returns:
        A dict mapping each requested variable name that is present in the
        file to the array read for it. Variables missing from the file are
        left out of the dict rather than raising.

    Raises:
        FileNotFoundError: If the district file does not exist.
    """
    district_number = cd_geoid % 100
    h5_path = STORAGE_FOLDER / "districts" / f"OR-{district_number:02d}.h5"
    if not h5_path.exists():
        raise FileNotFoundError(f"District file not found: {h5_path}")

    # Imported lazily, after the path check, so a missing file is reported
    # before h5py is needed.
    import h5py

    wanted_variables = (
        "household_weight",
        "household_id",
        "person_id",
        "age",
        "is_tax_unit_head",
        "tax_unit_id",
    )

    loaded = {}
    with h5py.File(h5_path, "r") as h5_file:
        for name in wanted_variables:
            if name not in h5_file:
                continue
            values = h5_file[name][:]
            # Multi-dimensional arrays are reduced to their first column
            # (presumably the first year -- confirm against the file layout).
            loaded[name] = values[:, 0] if len(values.shape) > 1 else values
    return loaded
53+
54+
55+
def _print_oregon_ctc_table(df):
    """Print the per-SLDU results table followed by a TOTAL row."""
    print(
        f"\n{'SLDU':<8} {'Baseline':>12} {'Reform':>12} {'Impact':>12} {'Households':>12}"
    )
    print(f"{'':8} {'($M)':>12} {'($M)':>12} {'($M)':>12} {'(weighted)':>12}")
    print("-" * 60)

    for _, row in df.iterrows():
        print(
            f"{row['sldu']:<8} "
            f"{row['baseline_ctc_millions']:>12.2f} "
            f"{row['reform_ctc_millions']:>12.2f} "
            f"{row['impact_millions']:>12.2f} "
            f"{row['weighted_households']:>12,.0f}"
        )

    print("-" * 60)
    total_baseline = df["baseline_ctc_millions"].sum()
    total_reform = df["reform_ctc_millions"].sum()
    total_impact = df["impact_millions"].sum()
    total_hh = df["weighted_households"].sum()
    print(
        f"{'TOTAL':<8} {total_baseline:>12.2f} {total_reform:>12.2f} "
        f"{total_impact:>12.2f} {total_hh:>12,.0f}"
    )


def run_oregon_ctc_analysis():
    """Run the Oregon CTC analysis by state senate district.

    For every district in ``OREGON_CD_GEOIDS`` whose H5 file exists under
    ``STORAGE_FOLDER / "districts"``, this runs a baseline simulation and a
    reform simulation in which the Oregon CTC ``amount`` parameter is doubled,
    assigns each household to a State Senate district (SLDU) with
    ``assign_geography_for_cd``, and accumulates weighted baseline, reform and
    impact totals per SLDU. Districts whose file is missing are skipped.

    Side effects:
        Prints progress and a results table to stdout and writes
        ``oregon_ctc_by_sldu.csv`` to the current working directory.

    Returns:
        A ``pandas.DataFrame`` with one row per SLDU, sorted by
        ``impact_millions`` in descending order. If no district file could be
        processed, an empty frame with the same columns is returned and no
        CSV is written.
    """
    year = 2024
    result_keys = (
        "baseline_ctc",
        "reform_ctc",
        "impact",
        "households",
        "weighted_households",
    )

    print("=" * 60)
    print("Oregon Child Tax Credit Analysis by State Senate District")
    print("=" * 60)

    # Load block crosswalk for SLDU lookups
    print("\nLoading block crosswalk...")
    crosswalk = load_block_crosswalk()
    oregon_blocks = crosswalk[crosswalk["block_geoid"].str[:2] == "41"]
    print(f"  Oregon blocks: {len(oregon_blocks):,}")
    print(f"  Unique SLDUs: {oregon_blocks['sldu'].nunique()}")

    # The reform does not depend on the district, so it is defined once here
    # instead of being rebuilt on every loop iteration.
    def double_or_ctc(parameters):
        # NOTE(review): confirm the Oregon node is reachable as ``or_`` and
        # that ``update`` accepts pandas Timestamps for start/stop.
        or_ctc = parameters.gov.states.or_.tax.income.credits.ctc
        or_ctc.amount.update(
            start=pd.Timestamp("2024-01-01"),
            stop=pd.Timestamp("2100-12-31"),
            value=or_ctc.amount("2024-01-01") * 2,
        )
        return parameters

    class DoubleORCTC(Reform):
        def apply(self):
            self.modify_parameters(double_or_ctc)

    # Results accumulator: SLDU -> running totals across districts
    results_by_sldu = {}

    print("\nProcessing Oregon congressional districts...")

    for cd_geoid in OREGON_CD_GEOIDS:
        cd_name = f"OR-{cd_geoid % 100:02d}"
        print(f"\n  Processing {cd_name}...")

        h5_path = STORAGE_FOLDER / "districts" / f"{cd_name}.h5"
        if not h5_path.exists():
            print("    Skipping - file not found")
            continue

        # map_to="household" puts the credit on the same (household) axis as
        # household_weight and the per-household SLDU assignment below,
        # whatever entity or_ctc itself is defined on. ``.values`` yields
        # plain NumPy arrays, so the explicit multiplication by the weights
        # is the only weighting applied in the sums.
        baseline = Microsimulation(dataset=str(h5_path))
        baseline_ctc = baseline.calculate(
            "or_ctc", year, map_to="household"
        ).values
        baseline_weights = baseline.calculate("household_weight", year).values

        reform = Microsimulation(dataset=str(h5_path), reform=DoubleORCTC)
        reform_ctc = reform.calculate(
            "or_ctc", year, map_to="household"
        ).values

        # Number of households drives the block assignment
        n_households = len(baseline_weights)
        print(f"    Households: {n_households:,}")

        # Assign blocks and get SLDU for each household
        geo = assign_geography_for_cd(
            cd_geoid=str(cd_geoid),
            n_households=n_households,
            seed=cd_geoid,  # Reproducible
        )
        sldu_assignments = np.asarray(geo["sldu"])

        # Per-household change in the credit
        impact = reform_ctc - baseline_ctc

        # Households with an empty SLDU assignment are left out of the totals
        unique_sldus = np.unique(sldu_assignments[sldu_assignments != ""])

        for sldu in unique_sldus:
            mask = sldu_assignments == sldu
            masked_weights = baseline_weights[mask]

            totals = results_by_sldu.setdefault(
                sldu, dict.fromkeys(result_keys, 0)
            )
            totals["baseline_ctc"] += np.sum(baseline_ctc[mask] * masked_weights)
            totals["reform_ctc"] += np.sum(reform_ctc[mask] * masked_weights)
            totals["impact"] += np.sum(impact[mask] * masked_weights)
            totals["households"] += np.sum(mask)
            totals["weighted_households"] += np.sum(masked_weights)

    print("\n" + "=" * 60)
    print("RESULTS: Impact of Doubling Oregon CTC by State Senate District")
    print("=" * 60)

    if not results_by_sldu:
        # Without this guard the column lookups below fail with an opaque
        # KeyError when every district file is missing.
        print("\nNo district files were processed; nothing to report.")
        return pd.DataFrame(
            columns=[
                "sldu",
                *result_keys,
                "baseline_ctc_millions",
                "reform_ctc_millions",
                "impact_millions",
            ]
        )

    df = pd.DataFrame.from_dict(results_by_sldu, orient="index")
    df.index.name = "sldu"
    df = df.reset_index()

    # Convert to millions
    df["baseline_ctc_millions"] = df["baseline_ctc"] / 1e6
    df["reform_ctc_millions"] = df["reform_ctc"] / 1e6
    df["impact_millions"] = df["impact"] / 1e6

    # Largest impact first
    df = df.sort_values("impact_millions", ascending=False)

    _print_oregon_ctc_table(df)

    # Save to CSV
    output_path = Path("oregon_ctc_by_sldu.csv")
    df.to_csv(output_path, index=False)
    print(f"\nResults saved to: {output_path}")

    return df
199+
200+
201+
if __name__ == "__main__":
202+
run_oregon_ctc_analysis()

policyengine_us_data/datasets/cps/enhanced_cps.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,31 @@ def generate(self):
201201
)
202202
data["household_weight"][year] = optimised_weights
203203

204+
# Validate dense weights
205+
w = optimised_weights
206+
if np.any(np.isnan(w)):
207+
raise ValueError(
208+
f"Year {year}: household_weight contains NaN values"
209+
)
210+
if np.any(w < 0):
211+
raise ValueError(
212+
f"Year {year}: household_weight contains negative values"
213+
)
214+
weighted_hh_count = float(np.sum(w))
215+
if not (1e8 <= weighted_hh_count <= 2e8):
216+
raise ValueError(
217+
f"Year {year}: weighted household count "
218+
f"{weighted_hh_count:,.0f} outside expected range "
219+
f"[100M, 200M]"
220+
)
221+
logging.info(
222+
f"Year {year}: weights validated — "
223+
f"{weighted_hh_count:,.0f} weighted households, "
224+
f"{int(np.sum(w > 0))} non-zero"
225+
)
226+
227+
logging.info("Post-generation weight validation passed")
228+
204229
self.save_dataset(data)
205230

206231

policyengine_us_data/datasets/cps/small_enhanced_cps.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import pandas as pd
23
import numpy as np
34
import h5py
@@ -17,6 +18,19 @@ def create_small_ecps():
1718
)
1819
simulation.subsample(1_000)
1920

21+
# Basic validation that subsample has reasonable data
22+
weights = simulation.calculate("household_weight").values
23+
if np.all(weights == 0):
24+
raise ValueError(
25+
"create_small_ecps: all household weights are zero "
26+
"after subsample"
27+
)
28+
logging.info(
29+
f"create_small_ecps: subsample has "
30+
f"{len(weights)} households, "
31+
f"{int(np.sum(weights > 0))} with non-zero weight"
32+
)
33+
2034
data = {}
2135
for variable in simulation.tax_benefit_system.variables:
2236
data[variable] = {}
@@ -75,6 +89,16 @@ def create_sparse_ecps():
7589
h_ids = h_ids[h_weights > 0]
7690
h_weights = h_weights[h_weights > 0]
7791

92+
if len(h_ids) < 1000:
93+
raise ValueError(
94+
f"create_sparse_ecps: only {len(h_ids)} households with "
95+
f"non-zero weight (expected > 1000)"
96+
)
97+
logging.info(
98+
f"create_sparse_ecps: {len(h_ids)} households after "
99+
f"zero-weight filtering"
100+
)
101+
78102
subset_df = df[df[df_household_id_column].isin(h_ids)].copy()
79103

80104
# Update the dataset and rebuild the simulation
@@ -104,12 +128,38 @@ def create_sparse_ecps():
104128
if len(data[variable]) == 0:
105129
del data[variable]
106130

107-
with h5py.File(STORAGE_FOLDER / "sparse_enhanced_cps_2024.h5", "w") as f:
131+
# Validate critical variables exist before writing
132+
critical_vars = [
133+
"household_weight",
134+
"employment_income",
135+
"household_id",
136+
"person_id",
137+
]
138+
missing = [v for v in critical_vars if v not in data]
139+
if missing:
140+
raise ValueError(
141+
f"create_sparse_ecps: missing critical variables: {missing}"
142+
)
143+
logging.info(f"create_sparse_ecps: data dict has {len(data)} variables")
144+
145+
output_path = STORAGE_FOLDER / "sparse_enhanced_cps_2024.h5"
146+
with h5py.File(output_path, "w") as f:
108147
for variable, periods in data.items():
109148
grp = f.create_group(variable)
110149
for period, values in periods.items():
111150
grp.create_dataset(str(period), data=values)
112151

152+
file_size = os.path.getsize(output_path)
153+
if file_size < 1_000_000:
154+
raise ValueError(
155+
f"create_sparse_ecps: output file only {file_size:,} bytes "
156+
f"(expected > 1MB)"
157+
)
158+
logging.info(
159+
f"create_sparse_ecps: wrote {file_size / 1e6:.1f}MB to "
160+
f"{output_path}"
161+
)
162+
113163

114164
if __name__ == "__main__":
115165
create_small_ecps()

policyengine_us_data/storage/calibration/raw_inputs/acs5_congressional_districts_2024.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

policyengine_us_data/storage/calibration/raw_inputs/acs_S0101_district_2024.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

policyengine_us_data/storage/calibration/raw_inputs/acs_S0101_national_2024.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

policyengine_us_data/storage/calibration/raw_inputs/acs_S0101_state_2024.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

policyengine_us_data/storage/calibration/raw_inputs/acs_S2201_district_2024.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)