Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/spi-2022-23.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update SPI private prerequisites and income imputation to the 2022-23 Public Use Tape.
110 changes: 72 additions & 38 deletions policyengine_uk_data/datasets/imputations/income.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@

import pandas as pd
import numpy as np
import os
from policyengine_uk_data.storage import STORAGE_FOLDER
from policyengine_uk.data import UKSingleYearDataset
from policyengine_uk import Microsimulation
from policyengine_uk_data.datasets.spi import (
AGE_RANGES,
REGION_MAP,
SPI_RELEASE_NAME,
SPI_TAB_FILENAME,
)
from policyengine_uk_data.utils.stack import stack_datasets
from policyengine_uk_data.utils.subsample import subsample_dataset

SPI_TAB_FOLDER = STORAGE_FOLDER / "spi_2020_21"
SPI_TAB_FOLDER = STORAGE_FOLDER / SPI_RELEASE_NAME
SPI_RENAMES = dict(
private_pension_income="PENSION",
self_employment_income="PROFITS",
Expand All @@ -37,7 +44,18 @@
)


def generate_spi_table(spi: pd.DataFrame):
def _spi_age_bounds(age_code) -> tuple[int, int]:
    """Look up the age bounds for an SPI age-range code.

    Args:
        age_code: Value to look up; it is converted with ``int`` before
            indexing ``AGE_RANGES``.

    Returns:
        ``AGE_RANGES[int(age_code)]`` when the conversion and the lookup both
        succeed, otherwise the fallback entry ``AGE_RANGES[-1]``.
    """
    try:
        bounds = AGE_RANGES[int(age_code)]
    except (TypeError, ValueError, KeyError):
        # Unconvertible or unknown codes share the fallback entry at key -1.
        bounds = AGE_RANGES[-1]
    return bounds


def generate_spi_table(
spi: pd.DataFrame,
seed: int = 0,
sample_size: int | None = 100_000,
):
"""
Clean and transform SPI data for income imputation model training.

Expand All @@ -47,29 +65,12 @@ def generate_spi_table(spi: pd.DataFrame):
Returns:
Cleaned DataFrame with age and region mappings applied.
"""
LOWER = np.array([0, 16, 25, 35, 45, 55, 65, 75])
UPPER = np.array([16, 25, 35, 45, 55, 65, 75, 80])
rng = np.random.default_rng(seed)
age_range = spi.AGERANGE
spi["age"] = LOWER[age_range] + np.random.rand(len(spi)) * (
UPPER[age_range] - LOWER[age_range]
)
bounds = np.array([_spi_age_bounds(age) for age in age_range])
spi["age"] = bounds[:, 0] + rng.random(len(spi)) * (bounds[:, 1] - bounds[:, 0])

REGIONS = {
1: "NORTH_EAST",
2: "NORTH_WEST",
3: "YORKSHIRE",
4: "EAST_MIDLANDS",
5: "WEST_MIDLANDS",
6: "EAST_OF_ENGLAND",
7: "LONDON",
8: "SOUTH_EAST",
9: "SOUTH_WEST",
10: "WALES",
11: "SCOTLAND",
12: "NORTHERN_IRELAND",
}

spi["region"] = np.array([REGIONS.get(x, "LONDON") for x in spi.GORCODE])
spi["region"] = spi.GORCODE.map(REGION_MAP).fillna("UNKNOWN")

spi["gender"] = np.where(spi.SEX == 1, "MALE", "FEMALE")

Expand All @@ -78,11 +79,17 @@ def generate_spi_table(spi: pd.DataFrame):

spi["employment_income"] = spi[["PAY", "EPB", "TAXTERM"]].sum(axis=1)

spi = pd.concat(
[
spi.sample(100_000, weights=spi.person_weight, replace=True),
]
)
if sample_size is not None:
spi = pd.concat(
[
spi.sample(
sample_size,
weights=spi.person_weight,
replace=True,
random_state=seed,
),
]
)

return spi

Expand Down Expand Up @@ -119,7 +126,35 @@ def generate_spi_table(spi: pd.DataFrame):
IMPUTATIONS = INCOME_COMPONENTS + ["gift_aid", "charitable_investment_gifts"]


INCOME_MODEL_PATH = STORAGE_FOLDER / "income.pkl"
# Static part of the metadata attached to the trained income model.
# get_income_model_metadata() adds the "sample_size" entry on top of these.
INCOME_MODEL_METADATA = {
"spi_release_name": SPI_RELEASE_NAME,
"spi_tab_filename": SPI_TAB_FILENAME,
"imputations": tuple(IMPUTATIONS),
}
# Pickle path of the cached income model; the SPI release name is part of the
# file name.
INCOME_MODEL_PATH = STORAGE_FOLDER / f"income_{SPI_RELEASE_NAME}.pkl"
# Sample size returned by get_income_model_sample_size() by default.
INCOME_MODEL_SAMPLE_SIZE = 100_000
# Smaller sample size returned when the TESTING environment variable is "1".
TESTING_INCOME_MODEL_SAMPLE_SIZE = 10_000


def get_income_model_sample_size() -> int:
    """Return the SPI sample size to use when training the income model.

    Returns:
        ``TESTING_INCOME_MODEL_SAMPLE_SIZE`` when the ``TESTING`` environment
        variable is exactly ``"1"``; ``INCOME_MODEL_SAMPLE_SIZE`` otherwise
        (including when the variable is unset).
    """
    testing_flag = os.environ.get("TESTING", "0")
    return (
        TESTING_INCOME_MODEL_SAMPLE_SIZE
        if testing_flag == "1"
        else INCOME_MODEL_SAMPLE_SIZE
    )


def get_income_model_metadata() -> dict:
    """Build the metadata describing the current income-model configuration.

    Returns:
        A new dict holding every entry of ``INCOME_MODEL_METADATA`` plus a
        ``"sample_size"`` key set to ``get_income_model_sample_size()``.
    """
    # Copy first so the module-level constant is never mutated.
    metadata = dict(INCOME_MODEL_METADATA)
    metadata["sample_size"] = get_income_model_sample_size()
    return metadata


def _income_model_matches_current_release(model) -> bool:
    """Check whether a loaded model was trained under the current settings.

    Args:
        model: Object whose optional ``metadata`` attribute and whose
            ``model.imputed_variables`` attribute are inspected.

    Returns:
        ``True`` only if ``model.metadata`` equals
        ``get_income_model_metadata()`` and the set of
        ``model.model.imputed_variables`` equals the set of ``IMPUTATIONS``.
    """
    # A model without a ``metadata`` attribute is compared as ``{}``.
    stored_metadata = getattr(model, "metadata", {})
    if stored_metadata != get_income_model_metadata():
        return False

    trained_outputs = getattr(model.model, "imputed_variables", [])
    return set(trained_outputs) == set(IMPUTATIONS)


def save_imputation_models():
Expand All @@ -132,8 +167,9 @@ def save_imputation_models():
from policyengine_uk_data.utils import QRF

income = QRF()
spi = pd.read_csv(SPI_TAB_FOLDER / "put2021uk.tab", delimiter="\t")
spi = generate_spi_table(spi)
income.metadata = get_income_model_metadata()
spi = pd.read_csv(SPI_TAB_FOLDER / SPI_TAB_FILENAME, delimiter="\t")
spi = generate_spi_table(spi, sample_size=get_income_model_sample_size())
spi = spi[PREDICTORS + IMPUTATIONS]
income.fit(spi[PREDICTORS], spi[IMPUTATIONS])
income.save(INCOME_MODEL_PATH)
Expand All @@ -144,10 +180,9 @@ def create_income_model(overwrite_existing: bool = False):
"""
Create or load income imputation model.

If a cached model exists and its trained output columns don't match the
current ``IMPUTATIONS`` list, the cache is discarded and the model is
retrained. This handles the case where ``IMPUTATIONS`` is extended in
code but an older pickle is still on disk.
If a cached model exists and its training metadata or output columns don't
match the current SPI release and ``IMPUTATIONS`` list, the cache is
discarded and the model is retrained.

Args:
overwrite_existing: Whether to retrain model if it exists.
Expand All @@ -159,10 +194,9 @@ def create_income_model(overwrite_existing: bool = False):

if INCOME_MODEL_PATH.exists() and not overwrite_existing:
cached = QRF(file_path=INCOME_MODEL_PATH)
cached_outputs = set(getattr(cached.model, "imputed_variables", []))
if cached_outputs == set(IMPUTATIONS):
if _income_model_matches_current_release(cached):
return cached
# Cached model was trained against a different output set; retrain.
# Cached model was trained against a different SPI release or output set.
return save_imputation_models()


Expand Down
20 changes: 13 additions & 7 deletions policyengine_uk_data/datasets/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import numpy as np
from policyengine_uk.data import UKSingleYearDataset

# Name of the SPI release; the script entry point uses it as the storage
# sub-folder that holds the `.tab` file.
SPI_RELEASE_NAME = "spi_2022_23"
# File name of the SPI microdata `.tab` file inside that folder.
SPI_TAB_FILENAME = "put2223uk.tab"
# Fiscal year passed to create_spi when this module is run as a script.
SPI_FISCAL_YEAR = 2022
# File name under which the script entry point saves the built dataset.
SPI_H5_FILENAME = "spi_2022_23.h5"


# Age-range (min, max) bounds for random age imputation.
# Key -1 covers records with no reported AGERANGE — use a broad working-age
Expand Down Expand Up @@ -86,8 +91,8 @@ def create_spi(
"""Build a :class:`UKSingleYearDataset` from an SPI microdata `.tab` file.

Args:
spi_data_file_path: Path to the SPI `.tab` file (e.g. `put2021uk.tab`).
fiscal_year: UK fiscal year for the dataset (e.g. 2020 for 2020-21).
spi_data_file_path: Path to the SPI `.tab` file (e.g. `put2223uk.tab`).
fiscal_year: UK fiscal year for the dataset (e.g. 2022 for 2022-23).
output_file_path: Unused here — callers may save the returned dataset
themselves with ``dataset.save(path)``. Kept as a kwarg so
existing call sites don't break.
Expand Down Expand Up @@ -142,8 +147,9 @@ def create_spi(
# generator so builds are reproducible (previously used the unseeded
# global np.random.rand).
percent_along_age_range = rng.random(len(df))
min_age = np.array([AGE_RANGES[age][0] for age in age_range])
max_age = np.array([AGE_RANGES[age][1] for age in age_range])
bounds = np.array([AGE_RANGES.get(int(age), AGE_RANGES[-1]) for age in age_range])
min_age = bounds[:, 0]
max_age = bounds[:, 1]
person["age"] = (min_age + (max_age - min_age) * percent_along_age_range).astype(
int
)
Expand Down Expand Up @@ -174,8 +180,8 @@ def create_spi(


if __name__ == "__main__":
spi_data_file_path = STORAGE_FOLDER / "spi_2020_21" / "put2021uk.tab"
fiscal_year = 2020
output_file_path = STORAGE_FOLDER / "spi_2020.h5"
spi_data_file_path = STORAGE_FOLDER / SPI_RELEASE_NAME / SPI_TAB_FILENAME
fiscal_year = SPI_FISCAL_YEAR
output_file_path = STORAGE_FOLDER / SPI_H5_FILENAME
spi = create_spi(spi_data_file_path, fiscal_year, output_file_path)
spi.save(output_file_path)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from policyengine_uk_data.datasets.frs_release import CURRENT_FRS_RELEASE
from policyengine_uk_data.datasets.spi import SPI_RELEASE_NAME
from policyengine_uk_data.utils.hf_destinations import PRIVATE_REPO
from policyengine_uk_data.utils.huggingface import download
from pathlib import Path
Expand All @@ -13,7 +14,7 @@
("lcfs_2021_22.zip", None),
("was_2006_20.zip", None),
("etb_1977_21.zip", None),
("spi_2020_21.zip", None),
(f"{SPI_RELEASE_NAME}.zip", None),
]


Expand Down
Loading