Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/calibration-stage-heartbeats.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add periodic CI heartbeats around long calibration setup stages so dataset release builds do not get canceled while constituency target matrices are being prepared.
76 changes: 76 additions & 0 deletions policyengine_uk_data/tests/test_calibration_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import time

import numpy as np
import pandas as pd

from policyengine_uk_data.utils.calibrate import calibrate_local_areas


class _DummyValues:
def __init__(self, values):
self.values = np.array(values, dtype=float)


class _DummyHousehold:
def __init__(self, weights):
self.household_weight = _DummyValues(weights)


class _DummyDataset:
def __init__(self, weights):
self.household = _DummyHousehold(weights)

def copy(self):
return _DummyDataset(self.household.household_weight.values.copy())


def test_calibrate_local_areas_logs_setup_stage_heartbeats_in_ci(
monkeypatch, capsys, tmp_path
):
monkeypatch.setenv("CI", "true")
monkeypatch.setenv("POLICYENGINE_PROGRESS_HEARTBEAT_SECONDS", "0.01")
monkeypatch.setattr(
"policyengine_uk_data.utils.calibrate.STORAGE_FOLDER",
tmp_path,
)

dataset = _DummyDataset([10.0, 20.0, 30.0])

def matrix_fn(_dataset):
time.sleep(0.03)
matrix = pd.DataFrame({"metric": [1.0, 0.0, 1.0]})
targets = pd.DataFrame({"metric": [2.0]})
mask = np.ones((1, 3))
return matrix, targets, mask

def national_matrix_fn(_dataset):
time.sleep(0.03)
matrix = pd.DataFrame({"national_metric": [1.0, 1.0, 1.0]})
targets = pd.Series({"national_metric": 3.0})
return matrix, targets

calibrate_local_areas(
dataset=dataset,
matrix_fn=matrix_fn,
national_matrix_fn=national_matrix_fn,
area_count=1,
weight_file="weights.h5",
epochs=1,
verbose=True,
area_name="Constituency",
)

output = capsys.readouterr().out
assert "[calibration] starting: Constituency: build local target matrix" in output
assert "[calibration] heartbeat: Constituency: build local target matrix" in output
assert "[calibration] completed: Constituency: build local target matrix" in output
assert (
"[calibration] starting: Constituency: build national target matrix" in output
)
assert (
"[calibration] heartbeat: Constituency: build national target matrix" in output
)
assert (
"[calibration] completed: Constituency: build national target matrix" in output
)
assert "[calibration] epoch 1/1: calculating loss" in output
34 changes: 34 additions & 0 deletions policyengine_uk_data/tests/test_progress.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import time

import pytest

from policyengine_uk_data.utils.progress import ProcessingProgress


Expand Down Expand Up @@ -36,3 +40,33 @@ def test_track_calibration_logs_heartbeats_in_ci(monkeypatch, capsys):
assert "[calibration] epoch 1/12: calculating loss" in output
assert "[calibration] epoch 10/12: loss=1.000000" in output
assert "[calibration] epoch 12/12: loss=1.200000" in output


def test_track_stage_logs_periodic_heartbeats_in_ci(monkeypatch, capsys):
monkeypatch.setenv("CI", "true")
monkeypatch.setenv("POLICYENGINE_PROGRESS_HEARTBEAT_SECONDS", "0.01")

progress = ProcessingProgress()

with progress.track_stage("Constituency: build local target matrix"):
time.sleep(0.03)

output = capsys.readouterr().out
assert "[calibration] starting: Constituency: build local target matrix" in output
assert "[calibration] heartbeat: Constituency: build local target matrix" in output
assert "[calibration] completed: Constituency: build local target matrix" in output


def test_track_stage_logs_failures_in_ci(monkeypatch, capsys):
monkeypatch.setenv("CI", "true")
monkeypatch.setenv("POLICYENGINE_PROGRESS_HEARTBEAT_SECONDS", "0.01")

progress = ProcessingProgress()

with pytest.raises(RuntimeError, match="boom"):
with progress.track_stage("Constituency: build local target matrix"):
time.sleep(0.02)
raise RuntimeError("boom")

output = capsys.readouterr().out
assert "[calibration] failed: Constituency: build local target matrix" in output
136 changes: 76 additions & 60 deletions policyengine_uk_data/utils/calibrate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from contextlib import nullcontext

import torch
from policyengine_uk import Microsimulation
import pandas as pd
import numpy as np
import h5py
Expand Down Expand Up @@ -39,59 +40,74 @@ def calibrate_local_areas(
verbose: Whether to print progress
area_name: Name of the area type for logging
"""
dataset = dataset.copy()
matrix, y, r = matrix_fn(dataset)
progress_tracker = ProcessingProgress() if verbose else None

def track_stage(stage_name: str):
if progress_tracker is None:
return nullcontext()
return progress_tracker.track_stage(stage_name)

with track_stage(f"{area_name}: copy dataset"):
dataset = dataset.copy()

with track_stage(f"{area_name}: build local target matrix"):
matrix, y, r = matrix_fn(dataset)
m_c, y_c = matrix.copy(), y.copy()
m_national, y_national = national_matrix_fn(dataset)

with track_stage(f"{area_name}: build national target matrix"):
m_national, y_national = national_matrix_fn(dataset)
m_n, y_n = m_national.copy(), y_national.copy()

# Weights - area_count x num_households
# Use country-aware initialization: divide each household's weight by the
# number of areas in its country, not the total area count. This ensures
# households start at approximately correct weight for their country's targets.
# The country_mask r[i,j]=1 iff household j is in same country as area i.
areas_per_household = r.sum(
axis=0
) # number of areas each household can contribute to
areas_per_household = np.maximum(areas_per_household, 1) # avoid division by zero
original_weights = np.log(
dataset.household.household_weight.values / areas_per_household
+ np.random.random(len(dataset.household.household_weight.values)) * 0.01
)
weights = torch.tensor(
np.ones((area_count, len(original_weights))) * original_weights,
dtype=torch.float32,
requires_grad=True,
)

# Set up validation targets if specified
validation_targets_local = (
matrix.columns.isin(excluded_training_targets)
if hasattr(matrix, "columns")
else None
)
validation_targets_national = (
m_national.columns.isin(excluded_training_targets)
if hasattr(m_national, "columns")
else None
)
dropout_targets = len(excluded_training_targets) > 0

# Convert to tensors
metrics = torch.tensor(
matrix.values if hasattr(matrix, "values") else matrix,
dtype=torch.float32,
)
y = torch.tensor(y.values if hasattr(y, "values") else y, dtype=torch.float32)
matrix_national = torch.tensor(
m_national.values if hasattr(m_national, "values") else m_national,
dtype=torch.float32,
)
y_national = torch.tensor(
y_national.values if hasattr(y_national, "values") else y_national,
dtype=torch.float32,
)
r = torch.tensor(r, dtype=torch.float32)
with track_stage(f"{area_name}: prepare tensors and optimizer"):
# Weights - area_count x num_households
# Use country-aware initialization: divide each household's weight by the
# number of areas in its country, not the total area count. This ensures
# households start at approximately correct weight for their country's targets.
# The country_mask r[i,j]=1 iff household j is in same country as area i.
areas_per_household = r.sum(
axis=0
) # number of areas each household can contribute to
areas_per_household = np.maximum(
areas_per_household, 1
) # avoid division by zero
original_weights = np.log(
dataset.household.household_weight.values / areas_per_household
+ np.random.random(len(dataset.household.household_weight.values)) * 0.01
)
weights = torch.tensor(
np.ones((area_count, len(original_weights))) * original_weights,
dtype=torch.float32,
requires_grad=True,
)

# Set up validation targets if specified
validation_targets_local = (
matrix.columns.isin(excluded_training_targets)
if hasattr(matrix, "columns")
else None
)
validation_targets_national = (
m_national.columns.isin(excluded_training_targets)
if hasattr(m_national, "columns")
else None
)
dropout_targets = len(excluded_training_targets) > 0

# Convert to tensors
metrics = torch.tensor(
matrix.values if hasattr(matrix, "values") else matrix,
dtype=torch.float32,
)
y = torch.tensor(y.values if hasattr(y, "values") else y, dtype=torch.float32)
matrix_national = torch.tensor(
m_national.values if hasattr(m_national, "values") else m_national,
dtype=torch.float32,
)
y_national = torch.tensor(
y_national.values if hasattr(y_national, "values") else y_national,
dtype=torch.float32,
)
r = torch.tensor(r, dtype=torch.float32)

def sre(x, y):
one_way = ((1 + x) / (1 + y) - 1) ** 2
Expand Down Expand Up @@ -160,8 +176,6 @@ def dropout_weights(weights, p):
final_weights = (torch.exp(weights) * r).detach().numpy()
performance = pd.DataFrame()

progress_tracker = ProcessingProgress() if verbose else None

if verbose and progress_tracker:
with progress_tracker.track_calibration(
epochs, nested_progress
Expand All @@ -171,8 +185,8 @@ def dropout_weights(weights, p):

optimizer.zero_grad()
weights_ = torch.exp(dropout_weights(weights, 0.05)) * r
l = loss(weights_)
l.backward()
loss_value = loss(weights_)
loss_value.backward()
optimizer.step()

local_close = pct_close(weights_, local=True, national=False)
Expand All @@ -187,7 +201,9 @@ def dropout_weights(weights, p):
)
else:
update_calibration(
epoch + 1, loss_value=l.item(), calculating_loss=False
epoch + 1,
loss_value=loss_value.item(),
calculating_loss=False,
)

if epoch % 10 == 0:
Expand Down Expand Up @@ -225,8 +241,8 @@ def dropout_weights(weights, p):
for epoch in range(epochs):
optimizer.zero_grad()
weights_ = torch.exp(dropout_weights(weights, 0.05)) * r
l = loss(weights_)
l.backward()
loss_value = loss(weights_)
loss_value.backward()
optimizer.step()

local_close = pct_close(weights_, local=True, national=False)
Expand All @@ -236,12 +252,12 @@ def dropout_weights(weights, p):
if dropout_targets:
validation_loss = loss(weights_, validation=True)
print(
f"Training loss: {l.item():,.3f}, Validation loss: {validation_loss.item():,.3f}, Epoch: {epoch}, "
f"Training loss: {loss_value.item():,.3f}, Validation loss: {validation_loss.item():,.3f}, Epoch: {epoch}, "
f"{area_name}<10%: {local_close:.1%}, National<10%: {national_close:.1%}"
)
else:
print(
f"Loss: {l.item()}, Epoch: {epoch}, {area_name}<10%: {local_close:.1%}, National<10%: {national_close:.1%}"
f"Loss: {loss_value.item()}, Epoch: {epoch}, {area_name}<10%: {local_close:.1%}, National<10%: {national_close:.1%}"
)

if epoch % 10 == 0:
Expand Down
41 changes: 41 additions & 0 deletions policyengine_uk_data/utils/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from contextlib import contextmanager
import os
import threading
import time
from typing import Any, Dict, List, Optional, Union

from rich.console import Console
Expand Down Expand Up @@ -190,10 +192,49 @@ def __init__(self, console: Optional[Console] = None):
self._plain_output = (
os.environ.get("GITHUB_ACTIONS") == "true" or os.environ.get("CI") == "true"
)
self._heartbeat_seconds = float(
os.environ.get("POLICYENGINE_PROGRESS_HEARTBEAT_SECONDS", "60")
)

def _emit(self, message: str):
print(message, flush=True)

@contextmanager
def track_stage(self, stage_name: str, category: str = "calibration"):
"""Track a long-running stage with periodic CI heartbeats."""
if not self._plain_output:
yield
return

self._emit(f"[{category}] starting: {stage_name}")
started_at = time.monotonic()
stop_event = threading.Event()

def emit_heartbeats():
while not stop_event.wait(self._heartbeat_seconds):
elapsed = int(time.monotonic() - started_at)
self._emit(f"[{category}] heartbeat: {stage_name} ({elapsed}s elapsed)")

heartbeat_thread = threading.Thread(
target=emit_heartbeats,
name=f"{category}-heartbeat",
daemon=True,
)
heartbeat_thread.start()

try:
yield
except Exception:
elapsed = int(time.monotonic() - started_at)
self._emit(f"[{category}] failed: {stage_name} ({elapsed}s elapsed)")
raise
else:
elapsed = int(time.monotonic() - started_at)
self._emit(f"[{category}] completed: {stage_name} ({elapsed}s elapsed)")
finally:
stop_event.set()
heartbeat_thread.join(timeout=1)

@contextmanager
def track_dataset_creation(self, datasets: List[str]):
"""Track dataset creation progress with stable display.
Expand Down
Loading