Skip to content

Commit 7a11756

Browse files
authored
Merge pull request #850 from PolicyEngine/codex/issue-848-hourly-wage-diagnostics
Add hourly wage and employment income consistency diagnostics
2 parents 491ac09 + b7c0ab1 commit 7a11756

4 files changed

Lines changed: 319 additions & 0 deletions

File tree

changelog.d/848.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added hourly wage and annual employment income consistency diagnostics.

policyengine_us_data/calibration/sanity_checks.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@
1414

1515
logger = logging.getLogger(__name__)
1616

17+
WEEKS_IN_YEAR = 52
18+
STANDARD_HOURS_PER_WEEK = 40
19+
OVERTIME_RATE_MULTIPLIER = 1.5
20+
HOURLY_WAGE_INCOME_RELATIVE_TOLERANCE = 0.10
21+
HOURLY_WAGE_INCOME_MISMATCH_SHARE_WARN_THRESHOLD = 0.25
22+
HOURLY_WAGE_INCOME_MEAN_ABS_REL_ERROR_WARN_THRESHOLD = 0.20
23+
1724
KEY_MONETARY_VARS = [
1825
"employment_income",
1926
"adjusted_gross_income",
@@ -36,6 +43,162 @@
3643
]
3744

3845

46+
def _weighted_mean(values: np.ndarray, weights: np.ndarray) -> float:
47+
if values.size == 0 or weights.sum() <= 0:
48+
return 0.0
49+
return float(np.average(values, weights=weights))
50+
51+
52+
def _weighted_quantile(
53+
values: np.ndarray,
54+
weights: np.ndarray,
55+
quantile: float,
56+
) -> float:
57+
if values.size == 0 or weights.sum() <= 0:
58+
return 0.0
59+
60+
sorter = np.argsort(values)
61+
values = values[sorter]
62+
weights = weights[sorter]
63+
cumulative_weights = np.cumsum(weights)
64+
cutoff = quantile * cumulative_weights[-1]
65+
return float(values[np.searchsorted(cumulative_weights, cutoff, side="left")])
66+
67+
68+
def _format_hourly_wage_income_detail(
69+
*,
70+
comparable_count: int,
71+
comparable_weight: float,
72+
mismatch_count: int,
73+
mismatch_share: float,
74+
mean_abs_rel_error: float,
75+
p90_abs_rel_error: float,
76+
over_income_share: float,
77+
tolerance: float,
78+
) -> str:
79+
return (
80+
f"{mismatch_count:,}/{comparable_count:,} unweighted mismatches; "
81+
f"{mismatch_share:.1%} weighted mismatch share at "
82+
f">{tolerance:.0%} tolerance among {comparable_weight:,.0f} weighted workers; "
83+
f"mean absolute relative gap {mean_abs_rel_error:.1%}; "
84+
f"p90 absolute relative gap {p90_abs_rel_error:.1%}; "
85+
f"{over_income_share:.1%} imply annual wages above employment_income "
86+
f"by >{tolerance:.0%}"
87+
)
88+
89+
90+
def build_hourly_wage_income_consistency_diagnostics(
91+
employment_income: np.ndarray,
92+
hourly_wage: np.ndarray,
93+
hours_worked_last_week: np.ndarray,
94+
is_paid_hourly: np.ndarray,
95+
weights: np.ndarray | None = None,
96+
*,
97+
relative_tolerance: float = HOURLY_WAGE_INCOME_RELATIVE_TOLERANCE,
98+
mismatch_share_warn_threshold: float = (
99+
HOURLY_WAGE_INCOME_MISMATCH_SHARE_WARN_THRESHOLD
100+
),
101+
mean_abs_rel_error_warn_threshold: float = (
102+
HOURLY_WAGE_INCOME_MEAN_ABS_REL_ERROR_WARN_THRESHOLD
103+
),
104+
) -> List[dict]:
105+
"""Compare hourly facts with annual employment income.
106+
107+
Warns when more than 25 percent of comparable hourly workers differ by
108+
more than 10 percent, or when the weighted mean absolute relative gap
109+
exceeds 20 percent. These thresholds flag broad inconsistencies while
110+
allowing last-week hours and annual wages to differ for normal reasons.
111+
"""
112+
employment_income = np.asarray(employment_income, dtype=float)
113+
hourly_wage = np.asarray(hourly_wage, dtype=float)
114+
hours_worked_last_week = np.asarray(hours_worked_last_week, dtype=float)
115+
is_paid_hourly = np.asarray(is_paid_hourly, dtype=bool)
116+
117+
if weights is None:
118+
weights = np.ones_like(employment_income, dtype=float)
119+
else:
120+
weights = np.asarray(weights, dtype=float)
121+
122+
straight_time_hours = np.minimum(hours_worked_last_week, STANDARD_HOURS_PER_WEEK)
123+
overtime_hours = np.maximum(hours_worked_last_week - STANDARD_HOURS_PER_WEEK, 0)
124+
straight_time_equivalent_hours = WEEKS_IN_YEAR * (
125+
straight_time_hours + overtime_hours * OVERTIME_RATE_MULTIPLIER
126+
)
127+
implied_annual_wages = hourly_wage * straight_time_equivalent_hours
128+
129+
base_mask = (
130+
is_paid_hourly
131+
& (hourly_wage > 0)
132+
& (hours_worked_last_week > 0)
133+
& (employment_income > 0)
134+
& np.isfinite(implied_annual_wages)
135+
& np.isfinite(employment_income)
136+
& np.isfinite(weights)
137+
& (weights > 0)
138+
)
139+
140+
results = []
141+
subsets = [
142+
("hourly_wage_income_consistency", base_mask),
143+
(
144+
"hourly_wage_income_consistency_overtime",
145+
base_mask & (overtime_hours > 0),
146+
),
147+
]
148+
149+
for check_name, mask in subsets:
150+
if not mask.any():
151+
results.append(
152+
{
153+
"check": check_name,
154+
"status": "SKIP",
155+
"detail": "no comparable hourly workers",
156+
}
157+
)
158+
continue
159+
160+
rel_gap = (
161+
implied_annual_wages[mask] - employment_income[mask]
162+
) / employment_income[mask]
163+
subset_weights = weights[mask]
164+
mismatch = np.abs(rel_gap) >= relative_tolerance
165+
over_income = rel_gap >= relative_tolerance
166+
mismatch_share = _weighted_mean(mismatch.astype(float), subset_weights)
167+
mean_abs_rel_error = _weighted_mean(np.abs(rel_gap), subset_weights)
168+
p90_abs_rel_error = _weighted_quantile(
169+
np.abs(rel_gap),
170+
subset_weights,
171+
0.9,
172+
)
173+
over_income_share = _weighted_mean(
174+
over_income.astype(float),
175+
subset_weights,
176+
)
177+
178+
warn = (
179+
mismatch_share > mismatch_share_warn_threshold
180+
or mean_abs_rel_error > mean_abs_rel_error_warn_threshold
181+
)
182+
results.append(
183+
{
184+
"check": check_name,
185+
"status": "WARN" if warn else "PASS",
186+
"detail": _format_hourly_wage_income_detail(
187+
comparable_count=int(mask.sum()),
188+
comparable_weight=float(subset_weights.sum()),
189+
mismatch_count=int(mismatch.sum()),
190+
mismatch_share=mismatch_share,
191+
mean_abs_rel_error=mean_abs_rel_error,
192+
p90_abs_rel_error=p90_abs_rel_error,
193+
over_income_share=over_income_share,
194+
tolerance=relative_tolerance,
195+
),
196+
}
197+
)
198+
199+
return results
200+
201+
39202
def run_sanity_checks(
40203
h5_path: str,
41204
period: int = 2024,
@@ -61,6 +224,32 @@ def _get(f, path):
61224
except KeyError:
62225
return None
63226

227+
def _get_person_weights(f, period, person_count, household_weights):
228+
if household_weights is None:
229+
return None
230+
if len(household_weights) == person_count:
231+
return household_weights
232+
233+
person_hh_arr = _get(f, f"person_household_id/{period}")
234+
if person_hh_arr is None:
235+
person_hh_arr = _get(f, "person_household_id")
236+
hh_id_arr = _get(f, f"household_id/{period}")
237+
if hh_id_arr is None:
238+
hh_id_arr = _get(f, "household_id")
239+
if person_hh_arr is None or hh_id_arr is None:
240+
return None
241+
if len(hh_id_arr) != len(household_weights):
242+
return None
243+
244+
household_weight_by_id = dict(zip(hh_id_arr.tolist(), household_weights))
245+
try:
246+
return np.array(
247+
[household_weight_by_id[hh_id] for hh_id in person_hh_arr.tolist()],
248+
dtype=float,
249+
)
250+
except KeyError:
251+
return None
252+
64253
with h5py.File(h5_path, "r") as f:
65254
# 1. Weight non-negativity
66255
w_key = f"household_weight/{period}"
@@ -249,6 +438,48 @@ def _get(f, path):
249438
}
250439
)
251440

441+
employment_income = _get(f, f"employment_income/{period}")
442+
hourly_wage = _get(f, f"hourly_wage/{period}")
443+
hours_worked_last_week = _get(f, f"hours_worked_last_week/{period}")
444+
is_paid_hourly = _get(f, f"is_paid_hourly/{period}")
445+
hourly_inputs = [
446+
employment_income,
447+
hourly_wage,
448+
hours_worked_last_week,
449+
is_paid_hourly,
450+
]
451+
if any(value is None for value in hourly_inputs):
452+
results.append(
453+
{
454+
"check": "hourly_wage_income_consistency",
455+
"status": "SKIP",
456+
"detail": "missing one or more hourly wage consistency inputs",
457+
}
458+
)
459+
results.append(
460+
{
461+
"check": "hourly_wage_income_consistency_overtime",
462+
"status": "SKIP",
463+
"detail": "missing one or more hourly wage consistency inputs",
464+
}
465+
)
466+
else:
467+
person_weights = _get_person_weights(
468+
f,
469+
period,
470+
len(employment_income),
471+
weights,
472+
)
473+
results.extend(
474+
build_hourly_wage_income_consistency_diagnostics(
475+
employment_income=employment_income,
476+
hourly_wage=hourly_wage,
477+
hours_worked_last_week=hours_worked_last_week,
478+
is_paid_hourly=is_paid_hourly,
479+
weights=person_weights,
480+
)
481+
)
482+
252483
return results
253484

254485

tests/integration/test_enhanced_cps.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,31 @@ def test_ecps_mean_employment_income_reasonable(ecps_sim):
7979
)
8080

8181

82+
def test_ecps_hourly_wage_income_consistency_diagnostic_runs(ecps_sim):
83+
from policyengine_us_data.calibration.sanity_checks import (
84+
build_hourly_wage_income_consistency_diagnostics,
85+
)
86+
87+
diagnostics = build_hourly_wage_income_consistency_diagnostics(
88+
employment_income=ecps_sim.calculate("employment_income", map_to="person"),
89+
hourly_wage=ecps_sim.calculate("hourly_wage", map_to="person"),
90+
hours_worked_last_week=ecps_sim.calculate(
91+
"hours_worked_last_week",
92+
map_to="person",
93+
),
94+
is_paid_hourly=ecps_sim.calculate("is_paid_hourly", map_to="person"),
95+
weights=ecps_sim.calculate("household_weight", map_to="person"),
96+
)
97+
98+
by_check = {diagnostic["check"]: diagnostic for diagnostic in diagnostics}
99+
for check in (
100+
"hourly_wage_income_consistency",
101+
"hourly_wage_income_consistency_overtime",
102+
):
103+
assert by_check[check]["status"] in {"PASS", "WARN"}
104+
assert "weighted mismatch share" in by_check[check]["detail"]
105+
106+
82107
def test_ecps_file_size():
83108
from policyengine_us_data.storage import STORAGE_FOLDER
84109

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import h5py
2+
import numpy as np
3+
4+
from policyengine_us_data.calibration.sanity_checks import (
5+
build_hourly_wage_income_consistency_diagnostics,
6+
run_sanity_checks,
7+
)
8+
9+
10+
def _write_period_dataset(h5, name, values, period=2024):
11+
group = h5.create_group(name)
12+
group.create_dataset(str(period), data=np.asarray(values))
13+
14+
15+
def test_hourly_wage_income_consistency_warns_on_overtime_mismatch():
16+
diagnostics = build_hourly_wage_income_consistency_diagnostics(
17+
employment_income=np.array([65_000.0, 57_200.0, 40_000.0, 65_000.0]),
18+
hourly_wage=np.array([25.0, 20.0, 0.0, 25.0]),
19+
hours_worked_last_week=np.array([50.0, 50.0, 50.0, 50.0]),
20+
is_paid_hourly=np.array([True, True, True, False]),
21+
weights=np.array([2.0, 1.0, 1.0, 1.0]),
22+
)
23+
24+
by_check = {diagnostic["check"]: diagnostic for diagnostic in diagnostics}
25+
26+
assert by_check["hourly_wage_income_consistency"]["status"] == "WARN"
27+
assert by_check["hourly_wage_income_consistency_overtime"]["status"] == "WARN"
28+
assert (
29+
"66.7% weighted mismatch share"
30+
in by_check["hourly_wage_income_consistency_overtime"]["detail"]
31+
)
32+
assert (
33+
"imply annual wages above employment_income"
34+
in by_check["hourly_wage_income_consistency_overtime"]["detail"]
35+
)
36+
37+
38+
def test_hourly_wage_income_consistency_passes_when_hourly_facts_reconcile():
39+
diagnostics = build_hourly_wage_income_consistency_diagnostics(
40+
employment_income=np.array([57_200.0, 41_600.0]),
41+
hourly_wage=np.array([20.0, 20.0]),
42+
hours_worked_last_week=np.array([50.0, 40.0]),
43+
is_paid_hourly=np.array([True, True]),
44+
)
45+
46+
assert [diagnostic["status"] for diagnostic in diagnostics] == ["PASS", "PASS"]
47+
48+
49+
def test_run_sanity_checks_adds_hourly_wage_income_consistency(tmp_path):
50+
h5_path = tmp_path / "sample.h5"
51+
with h5py.File(h5_path, "w") as h5:
52+
_write_period_dataset(h5, "household_weight", [2.0, 1.0])
53+
_write_period_dataset(h5, "employment_income", [65_000.0, 57_200.0])
54+
_write_period_dataset(h5, "hourly_wage", [25.0, 20.0])
55+
_write_period_dataset(h5, "hours_worked_last_week", [50.0, 50.0])
56+
_write_period_dataset(h5, "is_paid_hourly", [True, True])
57+
58+
diagnostics = run_sanity_checks(str(h5_path), period=2024)
59+
by_check = {diagnostic["check"]: diagnostic for diagnostic in diagnostics}
60+
61+
assert by_check["hourly_wage_income_consistency"]["status"] == "WARN"
62+
assert by_check["hourly_wage_income_consistency_overtime"]["status"] == "WARN"

0 commit comments

Comments
 (0)