Skip to content
This repository was archived by the owner on Jun 19, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/disability-benefit-categories.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Map reported disability benefit amounts to category inputs in the data pipeline.
17 changes: 14 additions & 3 deletions policyengine_uk_data/datasets/create_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def main():
assert_local_build_environment()

from policyengine_uk.data import UKSingleYearDataset
from policyengine_uk_data.datasets.disability_benefits import (
strip_internal_disability_reported_amounts,
)
from policyengine_uk_data.datasets.frs import create_frs
from policyengine_uk_data.storage import STORAGE_FOLDER
from policyengine_uk_data.utils.progress import (
Expand Down Expand Up @@ -79,8 +82,11 @@ def main():
frs = create_frs(
raw_frs_folder=STORAGE_FOLDER / "frs_2023_24",
year=2023,
include_internal_disability_reported_amounts=True,
)
strip_internal_disability_reported_amounts(frs).save(
STORAGE_FOLDER / "frs_2023_24.h5"
)
frs.save(STORAGE_FOLDER / "frs_2023_24.h5")
update_dataset("Create base FRS dataset", "completed")

# Import imputation functions
Expand Down Expand Up @@ -212,7 +218,9 @@ def main():
update_dataset("Downrate to 2023", "completed")

update_dataset("Save final dataset", "processing")
frs_calibrated.save(STORAGE_FOLDER / "enhanced_frs_2023_24.h5")
strip_internal_disability_reported_amounts(frs_calibrated).save(
STORAGE_FOLDER / "enhanced_frs_2023_24.h5"
)
update_dataset("Save final dataset", "completed")

# Create tiny (n=1000 households) versions for testing
Expand All @@ -225,7 +233,10 @@ def main():
tiny_frs = subsample_dataset(frs_base, TINY_SIZE)
tiny_frs.save(STORAGE_FOLDER / "frs_2023_24_tiny.h5")

tiny_enhanced = subsample_dataset(frs_calibrated, TINY_SIZE)
tiny_enhanced = subsample_dataset(
strip_internal_disability_reported_amounts(frs_calibrated),
TINY_SIZE,
)
tiny_enhanced.save(STORAGE_FOLDER / "enhanced_frs_2023_24_tiny.h5")
update_dataset("Create tiny datasets", "completed")

Expand Down
204 changes: 204 additions & 0 deletions policyengine_uk_data/datasets/disability_benefits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""Dataset-side disability benefit category mapping.

PolicyEngine UK models PIP, DLA, and Attendance Allowance from category
inputs. The FRS observes reported amounts, so the data pipeline keeps those
amounts as internal build intermediates and converts them to model inputs
before datasets are published.
"""

from __future__ import annotations

from functools import lru_cache

import numpy as np
import pandas as pd
from policyengine_uk import CountryTaxBenefitSystem
from policyengine_uk.data import UKSingleYearDataset
from policyengine_uk.model_api import WEEKS_IN_YEAR as MODEL_WEEKS_IN_YEAR


# Reported-amount columns held on the person table as build intermediates.
# These are the columns removed by drop_internal_disability_reported_amounts.
DISABILITY_REPORTED_AMOUNT_COLUMNS = (
    "attendance_allowance_reported",
    "dla_sc_reported",
    "dla_m_reported",
    "pip_m_reported",
    "pip_dl_reported",
)

# Category input columns; the names match the columns written by
# add_disability_benefit_categories_from_reported_amounts.
DISABILITY_CATEGORY_COLUMNS = (
    "aa_category",
    "dla_sc_category",
    "dla_m_category",
    "pip_m_category",
    "pip_dl_category",
)

# Fractional tolerance for category matching: an amount qualifies for a
# category once it reaches (1 - SAFETY_MARGIN) of that category's weekly rate.
SAFETY_MARGIN = 0.1
# Weeks per year (365.25 / 7) used to scale weekly DWP rates when deriving the
# disability flags from reported amounts.
SURVEY_REPORTED_AMOUNT_WEEKS_IN_YEAR = 365.25 / 7


@lru_cache(maxsize=None)
def _dwp_category_threshold_parameters(year: int):
    """Return the baseline ``gov.dwp`` parameter node for ``year``.

    This mirrors the category formulas removed from policyengine-uk, which
    thresholded reported amounts against the baseline DWP rates. Results are
    memoised per ``year`` by ``lru_cache``.
    """
    parameters_for_year = CountryTaxBenefitSystem().parameters(year)
    return parameters_for_year.baseline.gov.dwp


@lru_cache(maxsize=None)
def _dwp_flag_parameters(year: int):
    """Return the ``gov.dwp`` parameter node for ``year``.

    This matches the FRS disability flag derivation that already lived in
    uk-data. Results are memoised per ``year`` by ``lru_cache``.
    """
    parameters_for_year = CountryTaxBenefitSystem().parameters(year)
    return parameters_for_year.gov.dwp


def _reported_amount(person: pd.DataFrame, column: str) -> pd.Series:
if column not in person.columns:
return pd.Series(0.0, index=person.index)
return pd.to_numeric(person[column], errors="coerce").fillna(0.0)


def _category_from_reported_amount(
    reported_amount: pd.Series,
    thresholds: tuple[tuple[str, float], ...],
) -> np.ndarray:
    """Assign a category label to each reported amount.

    The amount is divided by ``MODEL_WEEKS_IN_YEAR`` and compared against each
    weekly rate scaled by ``1 - SAFETY_MARGIN``. Thresholds are applied in the
    order given, so a later matching entry overwrites an earlier one.

    Args:
        reported_amount: Reported amounts; non-numeric or missing entries are
            treated as zero.
        thresholds: ``(category_name, weekly_rate)`` pairs.

    Returns:
        An object array of labels, ``"NONE"`` where no threshold is reached.
    """
    cleaned = pd.to_numeric(reported_amount, errors="coerce").fillna(0)
    per_week = cleaned.to_numpy(dtype=float) / MODEL_WEEKS_IN_YEAR
    labels = np.full(len(per_week), "NONE", dtype=object)
    for label, rate in thresholds:
        # Rows at or above the tolerance-adjusted rate take this label.
        reached = per_week >= float(rate) * (1 - SAFETY_MARGIN)
        labels[reached] = label
    return labels


def add_disability_benefit_categories_from_reported_amounts(
    person: pd.DataFrame,
    year: int,
    *,
    inplace: bool = False,
) -> pd.DataFrame:
    """Derive disability benefit category columns from reported amounts.

    For every reported-amount column present in ``person``, the matching
    category column is written using the baseline DWP rates for ``year``.
    Reported columns that are absent are skipped.

    Args:
        person: Person-level table.
        year: Year whose DWP parameters supply the weekly rates.
        inplace: When true, modify ``person`` directly; otherwise work on a
            copy.

    Returns:
        The table carrying the category columns (``person`` itself when
        ``inplace`` is true, a copy otherwise).
    """
    frame = person if inplace else person.copy()

    rates = _dwp_category_threshold_parameters(int(year))
    attendance = rates.attendance_allowance
    dla_care = rates.dla.self_care
    dla_mobility = rates.dla.mobility
    pip_mobility = rates.pip.mobility
    pip_living = rates.pip.daily_living

    # reported column -> (category column, ordered (label, weekly rate) pairs)
    category_rules = {
        "attendance_allowance_reported": (
            "aa_category",
            (
                ("LOWER", attendance.lower),
                ("HIGHER", attendance.higher),
            ),
        ),
        "dla_sc_reported": (
            "dla_sc_category",
            (
                ("LOWER", dla_care.lower),
                ("MIDDLE", dla_care.middle),
                ("HIGHER", dla_care.higher),
            ),
        ),
        "dla_m_reported": (
            "dla_m_category",
            (
                ("LOWER", dla_mobility.lower),
                ("HIGHER", dla_mobility.higher),
            ),
        ),
        "pip_m_reported": (
            "pip_m_category",
            (
                ("STANDARD", pip_mobility.standard),
                ("ENHANCED", pip_mobility.enhanced),
            ),
        ),
        "pip_dl_reported": (
            "pip_dl_category",
            (
                ("STANDARD", pip_living.standard),
                ("ENHANCED", pip_living.enhanced),
            ),
        ),
    }

    for source_column, (target_column, bands) in category_rules.items():
        if source_column not in frame.columns:
            continue
        frame[target_column] = _category_from_reported_amount(
            frame[source_column],
            bands,
        )

    return frame


def add_disability_benefit_flags_from_reported_amounts(
    person: pd.DataFrame,
    year: int,
    *,
    inplace: bool = False,
) -> pd.DataFrame:
    """Rebuild the disability flag columns from reported benefit amounts.

    Writes ``is_disabled_for_benefits``, ``is_enhanced_disabled_for_benefits``
    and ``is_severely_disabled_for_benefits``. Reported-amount columns that
    are absent are treated as zero.

    Args:
        person: Person-level table.
        year: Year whose DWP parameters supply the weekly rates.
        inplace: When true, modify ``person`` directly; otherwise work on a
            copy.

    Returns:
        The table carrying the flag columns (``person`` itself when
        ``inplace`` is true, a copy otherwise).
    """
    frame = person if inplace else person.copy()
    rates = _dwp_flag_parameters(int(year))

    dla_care = _reported_amount(frame, "dla_sc_reported")
    dla_mobility = _reported_amount(frame, "dla_m_reported")
    pip_mobility = _reported_amount(frame, "pip_m_reported")
    pip_living = _reported_amount(frame, "pip_dl_reported")
    armed_forces = _reported_amount(frame, "afcs_reported")

    combined = dla_care + dla_mobility + pip_mobility + pip_living
    frame["is_disabled_for_benefits"] = combined > 0

    # Cutoffs are the top weekly rate scaled by weeks per year, lowered by a
    # gap of 1 * weeks per year.
    gap = 1 * SURVEY_REPORTED_AMOUNT_WEEKS_IN_YEAR
    dla_care_cutoff = (
        rates.dla.self_care.higher * SURVEY_REPORTED_AMOUNT_WEEKS_IN_YEAR - gap
    )
    pip_living_cutoff = (
        rates.pip.daily_living.enhanced * SURVEY_REPORTED_AMOUNT_WEEKS_IN_YEAR
        - gap
    )

    # NOTE(review): the enhanced flag uses a strict ">" while the severe flag
    # uses ">=" against the same cutoff; kept as-is, confirm this is intended.
    frame["is_enhanced_disabled_for_benefits"] = dla_care > dla_care_cutoff

    meets_dla_care = dla_care >= dla_care_cutoff
    meets_pip_living = pip_living >= pip_living_cutoff
    receives_afcs = armed_forces > 0
    frame["is_severely_disabled_for_benefits"] = (
        meets_dla_care | meets_pip_living | receives_afcs
    )

    return frame


def drop_internal_disability_reported_amounts(
    person: pd.DataFrame,
    *,
    inplace: bool = False,
) -> pd.DataFrame:
    """Remove the reported disability amount columns from ``person``.

    Columns listed in ``DISABILITY_REPORTED_AMOUNT_COLUMNS`` are dropped;
    any that are not present are ignored.

    Args:
        person: Person-level table.
        inplace: When true, drop the columns from ``person`` itself and
            return it; otherwise return a new frame without them.

    Returns:
        The table without the reported-amount columns.
    """
    internal_columns = list(DISABILITY_REPORTED_AMOUNT_COLUMNS)
    if not inplace:
        return person.drop(columns=internal_columns, errors="ignore")
    person.drop(columns=internal_columns, errors="ignore", inplace=True)
    return person


def strip_internal_disability_reported_amounts(
    dataset: UKSingleYearDataset,
) -> UKSingleYearDataset:
    """Return a copy of ``dataset`` whose person table lacks the internal
    reported disability amount columns.

    The copy is made with ``dataset.copy()`` and its ``person`` attribute is
    then reassigned.
    """
    stripped = dataset.copy()
    stripped.person = drop_internal_disability_reported_amounts(
        stripped.person
    )
    return stripped
34 changes: 5 additions & 29 deletions policyengine_uk_data/datasets/enhanced_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,13 @@
"yearly-average-currency-exchange-rates"
)

# 2025/26 reported-benefit mapping assumptions used only to populate UK input
# leaves from U.S. source records. PolicyEngine UK applies its own parameters
# when calculating derived tax and benefit outputs.
# 2025/26 benefit mapping assumptions used only to populate UK input leaves from
# U.S. source records. PolicyEngine UK applies its own parameters when
# calculating derived tax and benefit outputs.
NEW_STATE_PENSION_2025 = 224.96 * 52
DIVIDEND_YIELD_FOR_WEALTH_IMPUTATION = 0.03
RENTAL_YIELD_FOR_WEALTH_IMPUTATION = 0.04

PIP_2025_WEEKLY_RATES = {
"daily_living": {
"NONE": 0.0,
"STANDARD": 73.89,
"ENHANCED": 110.40,
},
"mobility": {
"NONE": 0.0,
"STANDARD": 29.19,
"ENHANCED": 77.04,
},
}

REGION_SHARES = (
("NORTH_EAST", 0.04),
("NORTH_WEST", 0.11),
Expand Down Expand Up @@ -248,11 +235,6 @@ def _pip_category(person: dict) -> str:
return "ENHANCED" if severe_signal or low_earnings else "STANDARD"


def _pip_reported_amount(category: str, component: str) -> float:
    """Return the PIP amount for ``component`` at ``category``.

    The weekly rate is looked up in ``PIP_2025_WEEKLY_RATES``, multiplied by
    52 and rounded to two decimal places.
    """
    component_rates = PIP_2025_WEEKLY_RATES[component]
    scaled_amount = component_rates[category] * 52
    return round(scaled_amount, 2)


def _household_cash_income(people: list[dict], exchange_rate: float) -> float:
total = 0.0
for person in people:
Expand Down Expand Up @@ -688,14 +670,8 @@ def _build_base_dataset(
if bool(inputs.get("is_blind", False))
else 0.0,
"is_disabled_for_benefits": bool(inputs.get("is_disabled", False)),
"pip_dl_reported": _pip_reported_amount(
pip_category,
"daily_living",
),
"pip_m_reported": _pip_reported_amount(
pip_category,
"mobility",
),
"pip_dl_category": pip_category,
"pip_m_category": pip_category,
"hours_worked": float(
inputs.get(
"weekly_hours_worked",
Expand Down
Loading