Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
STAT_XPLORE_API_KEY=65794a30655841694f694a4b563151694c434a68624763694f694a49557a49314e694a392e65794a7063334d694f694a7a644849756333526c6247786863694973496e4e3159694936496d357061326870624542776232787059336c6c626d6470626d557562334a6e4969776961574630496a6f784e7a59334f5459794f4455324c434a68645751694f694a7a6448497562325268496e302e6c47647841313333512d49544968645a6467384865365275324275494153664b7570304c6f506b42704c77
30 changes: 21 additions & 9 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
*.egg-info
**/__pycache__
**/.DS_STORE
**/.DS_Store
**/*.h5
**/*.csv
**/*.zip
**/*.pkl
**/*.tab
!uprating_factors.csv
!uprating_growth_factors.csv
!incomes.csv
!tax_benefit.csv
!demographics.csv
!incomes_projection.csv
!policyengine_uk_data/datasets/local_areas/**/*.csv
**/_build
!policyengine_uk_data/storage/*.csv
**/version.json

# Databases
**/*.db

# Local data bucket (Dagster)
policyengine_uk_data/data/

# Dagster
.tmp_dagster_home*
storage/

# Build artifacts
dist/
build/
*.egg

# IDE
.idea/
.vscode/
*.swp
80 changes: 69 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,36 @@ test:
install:
uv pip install -e ".[dev]" --config-settings editable_mode=compat

download:
python policyengine_uk_data/storage/download_private_prerequisites.py
# Dagster commands
ui:
dagster dev -m policyengine_uk_data.definitions

upload:
python policyengine_uk_data/storage/upload_completed_datasets.py

documentation:
pip install --pre "jupyter-book>=2"
jb clean docs && jb build docs
python docs/add_plotly_to_book.py docs
dev: ui

data:
python policyengine_uk_data/datasets/create_datasets.py
dagster asset materialize --select "*" -m policyengine_uk_data.definitions

data-test:
TESTING=1 dagster asset materialize --select "*" -m policyengine_uk_data.definitions

# Materialise specific asset groups
raw:
dagster asset materialize --select "group:raw_data" -m policyengine_uk_data.definitions

models:
dagster asset materialize --select "group:models" -m policyengine_uk_data.definitions

imputations:
dagster asset materialize --select "group:imputations" -m policyengine_uk_data.definitions

calibration:
dagster asset materialize --select "group:calibration" -m policyengine_uk_data.definitions

output:
dagster asset materialize --select "enhanced_frs" -m policyengine_uk_data.definitions

targets:
dagster asset materialize --select "targets_db" -m policyengine_uk_data.definitions

build:
python -m build
Expand All @@ -31,7 +48,48 @@ publish:

changelog:
build-changelog changelog.yaml --output changelog.yaml --update-last-date --start-from 1.0.0 --append-file changelog_entry.yaml
build-changelog changelog.yaml --org PolicyEngine --repo policyengine-us-data --output CHANGELOG.md --template .github/changelog_template.md
build-changelog changelog.yaml --org PolicyEngine --repo policyengine-uk-data --output CHANGELOG.md --template .github/changelog_template.md
bump-version changelog.yaml pyproject.toml
rm changelog_entry.yaml || true
touch changelog_entry.yaml

# Dashboard commands (local - faster)
dashboard:
@echo "Starting dashboard locally..."
@cd dashboard/api && DATABASE_PATH=../../policyengine_uk_data/targets/targets.db CORS_ORIGINS=http://localhost:3000 uvicorn main:app --host 0.0.0.0 --port 8000 > /tmp/api.log 2>&1 & \
cd dashboard/frontend && bun dev > /tmp/frontend.log 2>&1 & \
sleep 3 && \
echo "✓ Frontend: http://localhost:3000" && \
echo "✓ API: http://localhost:8000" && \
echo "✓ API Docs: http://localhost:8000/docs"

dashboard-stop:
@lsof -ti:8000 | xargs kill -9 2>/dev/null || true
@lsof -ti:3000 | xargs kill -9 2>/dev/null || true
@echo "Dashboard stopped"

# Dashboard commands (Docker - slower but isolated)
dashboard-docker:
cd dashboard && docker-compose up --build

dashboard-docker-down:
cd dashboard && docker-compose down

# Help
help:
@echo "Available targets:"
@echo " make ui - Start Dagster web UI"
@echo " make data - Materialise all assets"
@echo " make data-test - Materialise all assets (testing mode, reduced epochs)"
@echo " make raw - Materialise raw data assets only"
@echo " make models - Materialise imputation models only"
@echo " make imputations - Materialise imputation assets only"
@echo " make calibration - Materialise calibration assets only"
@echo " make output - Materialise final enhanced_frs only"
@echo " make targets - Materialise targets database only"
@echo " make dashboard - Start dashboard locally (fast)"
@echo " make dashboard-stop - Stop dashboard"
@echo " make dashboard-docker - Start dashboard with Docker (slower)"
@echo " make test - Run pytest"
@echo " make format - Format with black"
@echo " make install - Install package with dev deps"
24 changes: 16 additions & 8 deletions policyengine_uk_data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from .datasets import *
from .storage.download_private_prerequisites import (
download_prerequisites,
check_prerequisites,
)

# Check prerequisites on import and warn if missing
check_prerequisites()
"""PolicyEngine UK data pipeline.

This package builds representative microdata for UK tax-benefit modelling.
Orchestrated with Dagster for reproducibility and observability.

Usage:
# Run Dagster UI
dagster dev -m policyengine_uk_data.definitions

# Or materialise assets programmatically
from policyengine_uk_data.definitions import defs
"""

from policyengine_uk_data.definitions import defs

__all__ = ["defs"]
74 changes: 74 additions & 0 deletions policyengine_uk_data/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Dagster assets for the policyengine-uk-data pipeline."""

from policyengine_uk_data.assets.raw_surveys import (
raw_frs,
raw_was,
raw_lcfs,
raw_etb,
raw_spi,
)
from policyengine_uk_data.assets.frs import base_frs
from policyengine_uk_data.assets.imputations import (
frs_with_wealth,
frs_with_consumption,
frs_with_vat,
frs_with_services,
frs_with_income,
frs_with_capital_gains,
frs_with_salary_sacrifice,
frs_with_student_loans,
uprated_frs,
)
from policyengine_uk_data.assets.calibration import (
constituency_weights,
la_weights,
)
from policyengine_uk_data.assets.outputs import enhanced_frs
from policyengine_uk_data.assets.targets import (
targets_areas,
targets_metrics,
dwp_benefit_observations,
ons_demographics_observations,
observations_from_official_stats,
observations_council_tax,
targets_db,
)
from policyengine_uk_data.assets.sources.obr import obr_receipts_observations
from policyengine_uk_data.assets.sources.dwp_stat_xplore import (
dwp_stat_xplore_observations,
)

__all__ = [
# Raw data
"raw_frs",
"raw_was",
"raw_lcfs",
"raw_etb",
"raw_spi",
# Dataset pipeline
"base_frs",
"frs_with_wealth",
"frs_with_consumption",
"frs_with_vat",
"frs_with_services",
"frs_with_income",
"frs_with_capital_gains",
"frs_with_salary_sacrifice",
"frs_with_student_loans",
"uprated_frs",
# Calibration
"constituency_weights",
"la_weights",
# Outputs
"enhanced_frs",
# Targets
"targets_areas",
"targets_metrics",
"obr_receipts_observations",
"dwp_stat_xplore_observations",
"dwp_benefit_observations",
"ons_demographics_observations",
"observations_from_official_stats",
"observations_council_tax",
"targets_db",
]
131 changes: 131 additions & 0 deletions policyengine_uk_data/assets/calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Weight calibration assets using original calibration code with real targets."""

import os
from pathlib import Path

import h5py
import numpy as np
from dagster import asset, AssetExecutionContext, Config
from policyengine_uk.data import UKSingleYearDataset
from pydantic import Field

from policyengine_uk_data.resources.bucket import BucketResource
from policyengine_uk_data.resources.compute import ComputeResource
from policyengine_uk_data.storage import STORAGE_FOLDER


class CalibrationConfig(Config):
    """Settings accepted by the calibration assets in this module.

    Both ``constituency_weights`` and ``la_weights`` take an instance of this
    class as their ``config`` argument.
    """

    # Optional epoch override. The assets use it only when it is truthy;
    # otherwise they fall back to ``compute.epochs``.
    epochs: int | None = Field(
        description="Override epochs (None uses compute default)",
        default=None,
    )


def _load_dataset(data: dict) -> UKSingleYearDataset:
    """Rebuild a ``UKSingleYearDataset`` from its dictionary form.

    Args:
        data: Mapping holding the ``person``, ``benunit`` and ``household``
            entries, plus an optional ``fiscal_year`` entry.

    Returns:
        A dataset constructed from those three entries. The fiscal year
        falls back to 2025 when the mapping does not provide one.

    Raises:
        KeyError: If ``person``, ``benunit`` or ``household`` is missing.
    """
    # Look the tables up first so a missing key fails before anything else,
    # in the same order as the constructor arguments.
    tables = {
        name: data[name] for name in ("person", "benunit", "household")
    }
    fiscal_year = data.get("fiscal_year", 2025)
    return UKSingleYearDataset(fiscal_year=fiscal_year, **tables)


@asset(group_name="calibration")
def constituency_weights(
    context: AssetExecutionContext,
    config: CalibrationConfig,
    uprated_frs: dict,
    compute: ComputeResource,
) -> np.ndarray:
    """Calibrate and return weights for the 650 parliamentary constituencies.

    Args:
        context: Execution context, used for logging and output metadata.
        config: Calibration settings; a truthy ``epochs`` value overrides
            the compute default.
        uprated_frs: Dictionary form of the uprated FRS dataset, converted
            with ``_load_dataset``.
        compute: Resource supplying the default number of epochs.

    Returns:
        The array stored under the ``"2025"`` key of the constituency weight
        file in ``STORAGE_FOLDER``.
    """
    # Function-scope imports: these modules are only needed when the asset
    # actually runs.
    from policyengine_uk_data.utils.calibrate import calibrate_local_areas
    from policyengine_uk_data.datasets.local_areas.constituencies.loss import (
        create_constituency_target_matrix,
        create_national_target_matrix,
    )
    from policyengine_uk_data.datasets.local_areas.constituencies.calibrate import (
        get_performance,
    )

    area_count = 650
    weight_file = "parliamentary_constituency_weights.h5"

    frs_dataset = _load_dataset(uprated_frs)

    # Any falsy override (None or 0) defers to the compute resource.
    epochs = config.epochs
    if not epochs:
        epochs = compute.epochs

    context.log.info(f"Calibrating constituency weights ({epochs} epochs)")

    calibrate_local_areas(
        dataset=frs_dataset,
        epochs=epochs,
        matrix_fn=create_constituency_target_matrix,
        national_matrix_fn=create_national_target_matrix,
        area_count=area_count,
        weight_file=weight_file,
        excluded_training_targets=[],
        log_csv="constituency_calibration_log.csv",
        verbose=True,
        area_name="Constituency",
        get_performance=get_performance,
    )

    # The weights are not taken from the call above; they are read back
    # from the weight file (presumably written by calibrate_local_areas —
    # confirm against its implementation).
    weight_path = STORAGE_FOLDER / weight_file
    with h5py.File(weight_path, "r") as h5_file:
        calibrated = h5_file["2025"][:]

    metadata = {
        "shape": list(calibrated.shape),
        "epochs": epochs,
        "areas": area_count,
    }
    context.add_output_metadata(metadata)

    return calibrated


@asset(group_name="calibration")
def la_weights(
    context: AssetExecutionContext,
    config: CalibrationConfig,
    uprated_frs: dict,
    compute: ComputeResource,
) -> np.ndarray:
    """Calibrate and return weights for the 360 local authorities.

    Args:
        context: Execution context, used for logging and output metadata.
        config: Calibration settings; a truthy ``epochs`` value overrides
            the compute default.
        uprated_frs: Dictionary form of the uprated FRS dataset, converted
            with ``_load_dataset``.
        compute: Resource supplying the default number of epochs.

    Returns:
        The array stored under the ``"2025"`` key of the local authority
        weight file in ``STORAGE_FOLDER``.
    """
    # Function-scope imports: these modules are only needed when the asset
    # actually runs. The national target matrix comes from the
    # constituencies package, as in the original code.
    from policyengine_uk_data.utils.calibrate import calibrate_local_areas
    from policyengine_uk_data.datasets.local_areas.local_authorities.loss import (
        create_local_authority_target_matrix,
    )
    from policyengine_uk_data.datasets.local_areas.constituencies.loss import (
        create_national_target_matrix,
    )
    from policyengine_uk_data.datasets.local_areas.local_authorities.calibrate import (
        get_performance,
    )

    area_count = 360
    weight_file = "local_authority_weights.h5"

    frs_dataset = _load_dataset(uprated_frs)

    # Any falsy override (None or 0) defers to the compute resource.
    epochs = config.epochs
    if not epochs:
        epochs = compute.epochs

    context.log.info(f"Calibrating local authority weights ({epochs} epochs)")

    calibrate_local_areas(
        dataset=frs_dataset,
        epochs=epochs,
        matrix_fn=create_local_authority_target_matrix,
        national_matrix_fn=create_national_target_matrix,
        area_count=area_count,
        weight_file=weight_file,
        excluded_training_targets=[],
        log_csv="la_calibration_log.csv",
        verbose=True,
        area_name="Local Authority",
        get_performance=get_performance,
    )

    # The weights are not taken from the call above; they are read back
    # from the weight file (presumably written by calibrate_local_areas —
    # confirm against its implementation).
    weight_path = STORAGE_FOLDER / weight_file
    with h5py.File(weight_path, "r") as h5_file:
        calibrated = h5_file["2025"][:]

    metadata = {
        "shape": list(calibrated.shape),
        "epochs": epochs,
        "areas": area_count,
    }
    context.add_output_metadata(metadata)

    return calibrated
Loading
Loading