Commit f042204

baogorek and claude committed
H5 lineage tracing, at-large CD fix, target pruning, and linting
- Copy all intermediate H5 datasets to pipeline volume for lineage tracing
- Add yearless source_imputed alias for downstream pipeline consumers
- Route source_imputed H5s to calibration/ path in HF staging for promote
- Normalize at-large congressional district GEOID 200→201 (AK, DE, etc.)
- Prune filer-gated and high-error calibration targets (67→32)
- Remove unused imports and normalize Unicode across ~58 files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 829dcd9 commit f042204

62 files changed

Lines changed: 86 additions & 223 deletions

docs/local_area_calibration_setup.ipynb

Lines changed: 17 additions & 18 deletions
@@ -9,7 +9,7 @@
 "\n",
 "This notebook demonstrates the clone-based calibration pipeline: how raw CPS records become a calibration matrix and, ultimately, CD-level stacked datasets.\n",
 "\n",
-"The paradigm shift from the old approach: instead of replicating every household into every congressional district, we **clone** each record N times and assign each clone a **random census block** drawn from a population-weighted distribution. Each clone inherits a state, CD, and block \u2014 and gets re-simulated under the rules of its assigned state.\n",
+"The paradigm shift from the old approach: instead of replicating every household into every congressional district, we **clone** each record N times and assign each clone a **random census block** drawn from a population-weighted distribution. Each clone inherits a state, CD, and block and gets re-simulated under the rules of its assigned state.\n",
 "\n",
 "We follow one household (`record_idx=8629`, household_id 128694, SNAP \\$18,396) through the entire pipeline:\n",
 "1. Clone and assign geography\n",
@@ -19,7 +19,7 @@
 "5. Build the calibration matrix\n",
 "6. Create stacked datasets from calibrated weights\n",
 "\n",
-"**Companion notebook:** [calibration_internals.ipynb](calibration_internals.ipynb) covers the *finished* matrix \u2014 row/column anatomy, target groups, sparsity. This notebook covers the *process* that creates it and what happens after (stacked datasets).\n",
+"**Companion notebook:** [calibration_internals.ipynb](calibration_internals.ipynb) covers the *finished* matrix row/column anatomy, target groups, sparsity. This notebook covers the *process* that creates it and what happens after (stacked datasets).\n",
 "\n",
 "**Requirements:** `policy_data.db`, `block_cd_distributions.csv.gz`, and the stratified CPS h5 file in `STORAGE_FOLDER`."
 ]
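
Reviewer sketch: the clone-and-assign idea described in the notebook intro can be illustrated roughly as follows. This is not the body of `assign_random_geography`; the helper name `assign_blocks`, the toy block GEOIDs, and the weights are all hypothetical, and the only detail taken from the diff is that the state FIPS code is the first two characters of a block GEOID.

```python
import numpy as np


def assign_blocks(n_records, n_clones, block_geoids, block_weights, seed=0):
    """Hypothetical helper: draw one census block per clone.

    Blocks are sampled with probability proportional to block_weights
    (standing in for block population).
    """
    rng = np.random.default_rng(seed)
    probs = np.asarray(block_weights, dtype=float)
    probs = probs / probs.sum()  # normalize to a probability vector
    idx = rng.choice(len(block_geoids), size=n_records * n_clones, p=probs)
    blocks = np.asarray(block_geoids)[idx]
    # State FIPS is the first two characters of the block GEOID.
    state_fips = np.array([int(b[:2]) for b in blocks])
    return blocks, state_fips


# Toy inputs: three made-up blocks in AK (02), DE (10), and ME (23).
toy_blocks = ["020200001001000", "100010401001000", "230010101001000"]
blocks, state_fips = assign_blocks(3, 2, toy_blocks, [0.2, 0.3, 0.5])
```

Each of the 3 x 2 clones ends up with one block, and the state follows from the block, which is what lets the later simulation step apply that state's rules.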
@@ -56,7 +56,6 @@
 "from policyengine_us_data.storage import STORAGE_FOLDER\n",
 "from policyengine_us_data.calibration.clone_and_assign import (\n",
 " assign_random_geography,\n",
-" GeographyAssignment,\n",
 " load_global_block_distribution,\n",
 ")\n",
 "from policyengine_us_data.calibration.unified_matrix_builder import (\n",
@@ -303,13 +302,13 @@
 "id": "cell-9",
 "metadata": {},
 "source": [
-"## Section 3: Inside `_simulate_clone` \u2014 State-Swap\n",
+"## Section 3: Inside `_simulate_clone` State-Swap\n",
 "\n",
 "For each clone, `_simulate_clone` does four things:\n",
 "1. Creates a **fresh** `Microsimulation` from the base dataset\n",
 "2. Overwrites `state_fips` with the clone's assigned states\n",
 "3. Optionally calls a `sim_modifier` (e.g., takeup re-randomization)\n",
-"4. **Clears cached formulas** via `get_calculated_variables` \u2014 preserving survey inputs and IDs while forcing recalculation of state-dependent variables like SNAP\n",
+"4. **Clears cached formulas** via `get_calculated_variables` preserving survey inputs and IDs while forcing recalculation of state-dependent variables like SNAP\n",
 "\n",
 "Let's reproduce this manually for clone 0."
 ]
@@ -476,7 +475,7 @@
 "\n",
 "When assembling the calibration matrix, each target row only \"sees\" columns (clones) whose geography matches the target's geography. This is implemented via `state_to_cols` and `cd_to_cols` dictionaries built from the `GeographyAssignment`.\n",
 "\n",
-"This is step 3 of `build_matrix` \u2014 reproduced here for transparency."
+"This is step 3 of `build_matrix` reproduced here for transparency."
 ]
 },
 {
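
Reviewer sketch: the geographic masking described in this cell amounts to an inverted index from geography code to clone column. The snippet below is an illustration under assumptions, not the code in `build_matrix`; the helper `build_col_index` and the toy per-clone arrays are hypothetical.

```python
from collections import defaultdict

import numpy as np


def build_col_index(geo_per_clone):
    """Hypothetical helper: map each geography code to the clone columns assigned to it."""
    index = defaultdict(list)
    # tolist() converts NumPy scalars to plain Python ints/strs for dict keys.
    for col, geo in enumerate(np.asarray(geo_per_clone).tolist()):
        index[geo].append(col)
    return {geo: np.array(cols) for geo, cols in index.items()}


# Toy assignment for four clones (columns 0..3).
clone_states = np.array([2, 10, 2, 23])
clone_cds = np.array(["201", "1001", "201", "2302"])

state_to_cols = build_col_index(clone_states)
cd_to_cols = build_col_index(clone_cds)
```

A target row for state 2 would then only be filled in at `state_to_cols[2]`, i.e. columns 0 and 2, and left at zero elsewhere, which keeps the matrix sparse.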
@@ -585,7 +584,7 @@
 "source": [
 "## Section 5: Takeup Re-randomization\n",
 "\n",
-"The base CPS has fixed takeup decisions (e.g., \"this household takes up SNAP\"). But when we clone a household into different census blocks, each block should have independently drawn takeup \u2014 otherwise every clone of a SNAP-participating household would still participate, regardless of geography.\n",
+"The base CPS has fixed takeup decisions (e.g., \"this household takes up SNAP\"). But when we clone a household into different census blocks, each block should have independently drawn takeup otherwise every clone of a SNAP-participating household would still participate, regardless of geography.\n",
 "\n",
 "`rerandomize_takeup` solves this: for each census block, it uses `seeded_rng(variable_name, salt=block_geoid)` to draw new takeup booleans. The seed is deterministic per (variable, block) pair, so results are reproducible."
 ]
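
Reviewer sketch: one way to get a seed that is deterministic per (variable, block) pair is to hash the pair. The real `seeded_rng` may be implemented differently; `demo_seeded_rng`, `draw_takeup`, the variable name `snap_takeup`, and the 0.5 rate below are hypothetical stand-ins.

```python
import hashlib

import numpy as np


def demo_seeded_rng(variable_name, salt):
    """Hypothetical stand-in for seeded_rng: derive a seed from (variable, salt)."""
    digest = hashlib.sha256(f"{variable_name}:{salt}".encode()).digest()
    seed = int.from_bytes(digest[:8], "little")  # first 8 bytes as an integer seed
    return np.random.default_rng(seed)


def draw_takeup(variable_name, block_geoid, n_units, rate):
    """Draw n_units takeup booleans for one block at the given takeup rate."""
    rng = demo_seeded_rng(variable_name, block_geoid)
    return rng.random(n_units) < rate


# Same (variable, block) pair gives the same draws; a different block gives new ones.
block_a = draw_takeup("snap_takeup", "020200001001000", 64, 0.5)
block_a_again = draw_takeup("snap_takeup", "020200001001000", 64, 0.5)
block_b = draw_takeup("snap_takeup", "100010401001000", 64, 0.5)
```

Hashing rather than using Python's built-in `hash()` matters here because string hashes are randomized per process, which would break reproducibility across runs.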
@@ -763,7 +762,7 @@
 "id": "cell-22",
 "metadata": {},
 "source": [
-"In the full pipeline, `rerandomize_takeup` is passed to `build_matrix` as a `sim_modifier` callback. For each clone, after `state_fips` is set but before formula caches are cleared, the callback draws new takeup booleans per census block. This means the same household in block A might take up SNAP while in block B it doesn't \u2014 matching the statistical reality that takeup varies by geography."
+"In the full pipeline, `rerandomize_takeup` is passed to `build_matrix` as a `sim_modifier` callback. For each clone, after `state_fips` is set but before formula caches are cleared, the callback draws new takeup booleans per census block. This means the same household in block A might take up SNAP while in block B it doesn't matching the statistical reality that takeup varies by geography."
 ]
 },
 {
@@ -871,9 +870,9 @@
 "source": [
 "## Section 7: From Weights to Datasets\n",
 "\n",
-"`create_sparse_cd_stacked_dataset` takes calibrated weights and builds an h5 file with only the non-zero-weight households, reindexed per CD. Internally it does its own state-swap simulation \u2014 loading the base dataset, assigning `state_fips` for the target CD's state, and recalculating benefits from scratch. This means SNAP values in the output reflect the destination state's rules (e.g., a $70 SNAP household from ME may get $0 under AK rules).\n",
+"`create_sparse_cd_stacked_dataset` takes calibrated weights and builds an h5 file with only the non-zero-weight households, reindexed per CD. Internally it does its own state-swap simulation loading the base dataset, assigning `state_fips` for the target CD's state, and recalculating benefits from scratch. This means SNAP values in the output reflect the destination state's rules (e.g., a $70 SNAP household from ME may get $0 under AK rules).\n",
 "\n",
-"**Format gap:** The calibration produces weights in clone layout `(n_records * n_clones,)` where each clone maps to one specific CD via the `GeographyAssignment`. The stacked dataset builder expects CD layout `(n_cds * n_households,)` where every CD has a weight slot for every household. Converting between these \u2014 accumulating clone weights into their assigned CDs \u2014 is a separate step not yet implemented. The demo below constructs artificial CD-layout weights directly to show how the builder works."
+"**Format gap:** The calibration produces weights in clone layout `(n_records * n_clones,)` where each clone maps to one specific CD via the `GeographyAssignment`. The stacked dataset builder expects CD layout `(n_cds * n_households,)` where every CD has a weight slot for every household. Converting between these accumulating clone weights into their assigned CDs is a separate step not yet implemented. The demo below constructs artificial CD-layout weights directly to show how the builder works."
 ]
 },
 {
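
Reviewer sketch: the notebook says the clone-layout to CD-layout conversion is not yet implemented. One possible shape for it is a scatter-add, shown below purely as an illustration. The function `clone_to_cd_layout`, its arguments, and the CD-major ordering of the flattened output are assumptions, not something the repository defines.

```python
import numpy as np


def clone_to_cd_layout(clone_weights, clone_record_idx, clone_cd_idx, n_cds, n_households):
    """Hypothetical conversion: accumulate clone weights into a (n_cds * n_households,) vector.

    clone_record_idx[i] is the household index of clone i and clone_cd_idx[i]
    is the position of its assigned CD. The output is assumed to be CD-major.
    """
    cd_weights = np.zeros((n_cds, n_households))
    # np.add.at accumulates correctly when several clones hit the same (CD, household) slot.
    np.add.at(cd_weights, (clone_cd_idx, clone_record_idx), clone_weights)
    return cd_weights.reshape(-1)


# Two households, two clones each, three CDs.
clone_weights = np.array([1.0, 2.0, 0.5, 4.0])
clone_record_idx = np.array([0, 1, 0, 1])
clone_cd_idx = np.array([0, 2, 0, 1])

flat = clone_to_cd_layout(clone_weights, clone_record_idx, clone_cd_idx, n_cds=3, n_households=2)
```

Here both clones of household 0 landed in CD 0, so their weights (1.0 and 0.5) sum into a single slot; plain fancy-index assignment would silently keep only one of them.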
@@ -1012,9 +1011,9 @@
 "\n",
 "Overflow check:\n",
 " Max person ID after reindexing: 5,025,365\n",
-" Max person ID \u00d7 100: 502,536,500\n",
+" Max person ID × 100: 502,536,500\n",
 " int32 max: 2,147,483,647\n",
-" \u2713 No overflow risk!\n",
+" No overflow risk!\n",
 "\n",
 "Creating Dataset from combined DataFrame...\n",
 "Building simulation from Dataset...\n",
@@ -1134,12 +1133,12 @@
 "\n",
 "The clone-based calibration pipeline has six stages:\n",
 "\n",
-"1. **Clone + assign geography** \u2014 `assign_random_geography()` creates N copies of each CPS record, each with a population-weighted random census block.\n",
-"2. **Simulate** \u2014 `_simulate_clone()` sets each clone's `state_fips` and recalculates state-dependent benefits.\n",
-"3. **Geographic masking** \u2014 `state_to_cols` / `cd_to_cols` restrict each target row to geographically relevant columns.\n",
-"4. **Re-randomize takeup** \u2014 `rerandomize_takeup()` draws new takeup per census block, breaking the fixed-takeup assumption.\n",
-"5. **Build matrix** \u2014 `UnifiedMatrixBuilder.build_matrix()` assembles the sparse CSR matrix from all clones.\n",
-"6. **Stacked datasets** \u2014 `create_sparse_cd_stacked_dataset()` converts calibrated weights into CD-level h5 files.\n",
+"1. **Clone + assign geography** `assign_random_geography()` creates N copies of each CPS record, each with a population-weighted random census block.\n",
+"2. **Simulate** `_simulate_clone()` sets each clone's `state_fips` and recalculates state-dependent benefits.\n",
+"3. **Geographic masking** `state_to_cols` / `cd_to_cols` restrict each target row to geographically relevant columns.\n",
+"4. **Re-randomize takeup** `rerandomize_takeup()` draws new takeup per census block, breaking the fixed-takeup assumption.\n",
+"5. **Build matrix** `UnifiedMatrixBuilder.build_matrix()` assembles the sparse CSR matrix from all clones.\n",
+"6. **Stacked datasets** `create_sparse_cd_stacked_dataset()` converts calibrated weights into CD-level h5 files.\n",
 "\n",
 "For matrix diagnostics (row/column anatomy, target groups, sparsity analysis), see [calibration_internals.ipynb](calibration_internals.ipynb)."
 ]

modal_app/data_build.py

Lines changed: 18 additions & 10 deletions
@@ -591,18 +591,26 @@ def build_datasets(

     # Copy pipeline artifacts to shared volume before tests so that a test
     # failure does not block downstream calibration steps.
-    # Files selected:
-    # - source_imputed H5: main dataset for calibration and local area builds
-    # - policy_data.db: calibration target database
-    # - calibration_weights.npy: pre-existing weights for re-runs (if present)
-    # - build_log.txt: persistent build log with provenance
     print("Copying pipeline artifacts to shared volume...")
     artifacts_dir = Path(PIPELINE_MOUNT) / "artifacts"
     artifacts_dir.mkdir(parents=True, exist_ok=True)
-    shutil.copy2(
-        "policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5",
-        artifacts_dir / "source_imputed_stratified_extended_cps.h5",
-    )
+
+    # Copy all intermediate H5 datasets for lineage tracing
+    for output in SCRIPT_OUTPUTS.values():
+        paths = output if isinstance(output, list) else [output]
+        for p in paths:
+            src = Path(p)
+            if src.suffix == ".h5" and src.exists():
+                shutil.copy2(src, artifacts_dir / src.name)
+                print(
+                    f" Copied {src.name} ({src.stat().st_size / 1024 / 1024:.1f} MB)"
+                )
+
+    # Yearless alias for pipeline consumers (remote_calibration_runner, local_area)
+    si = artifacts_dir / "source_imputed_stratified_extended_cps_2024.h5"
+    if si.exists():
+        shutil.copy2(si, artifacts_dir / "source_imputed_stratified_extended_cps.h5")
+
     shutil.copy2(
         "policyengine_us_data/storage/calibration/policy_data.db",
         artifacts_dir / "policy_data.db",
@@ -613,7 +621,7 @@ def build_datasets(
             cal_weights,
             artifacts_dir / "calibration_weights.npy",
         )
-        print("Copied existing calibration_weights.npy to pipeline volume")
+        print(" Copied calibration_weights.npy")
     shutil.copy2(log_path, artifacts_dir / "build_log.txt")
     log_file.close()
     pipeline_volume.commit()
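
Reviewer sketch: the lineage-copy loop in this hunk can be exercised in isolation with a temporary directory. The version below mirrors the loop's logic but is a standalone illustration; the wrapper `copy_h5_outputs`, the dictionary keys, and the file names are hypothetical, and the only assumption about `SCRIPT_OUTPUTS` is what the diff shows (values are either a path or a list of paths).

```python
import shutil
import tempfile
from pathlib import Path


def copy_h5_outputs(script_outputs, artifacts_dir):
    """Copy every existing .h5 path listed in script_outputs into artifacts_dir."""
    artifacts_dir = Path(artifacts_dir)
    artifacts_dir.mkdir(parents=True, exist_ok=True)
    copied = []
    for output in script_outputs.values():
        # Values may be a single path or a list of paths.
        paths = output if isinstance(output, list) else [output]
        for p in paths:
            src = Path(p)
            if src.suffix == ".h5" and src.exists():
                shutil.copy2(src, artifacts_dir / src.name)
                copied.append(src.name)
    return copied


with tempfile.TemporaryDirectory() as tmp:
    storage = Path(tmp) / "storage"
    storage.mkdir()
    (storage / "cps_2024.h5").write_bytes(b"demo")       # exists, will be copied
    (storage / "notes.txt").write_text("not a dataset")  # wrong suffix, skipped
    demo_outputs = {
        "cps.py": str(storage / "cps_2024.h5"),
        "extra.py": [str(storage / "notes.txt"), str(storage / "missing.h5")],
    }
    copied = copy_h5_outputs(demo_outputs, Path(tmp) / "artifacts")
```

Note that a missing `.h5` is skipped silently, so a build step that fails to produce its output will not be flagged by this loop.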

modal_app/pipeline.py

Lines changed: 11 additions & 13 deletions
@@ -305,21 +305,19 @@ def stage_base_datasets(
     """
     artifacts = Path(ARTIFACTS_DIR)

-    source_imputed = artifacts / "source_imputed_stratified_extended_cps.h5"
-    policy_db = artifacts / "policy_data.db"
-
     files_with_paths = []
-    if source_imputed.exists():
-        files_with_paths.append(
-            (
-                str(source_imputed),
-                "calibration/source_imputed_stratified_extended_cps.h5",
-            )
-        )
-        print(f" source_imputed: {source_imputed.stat().st_size:,} bytes")
-    else:
-        print(" WARNING: source_imputed not found, skipping")

+    # Stage all intermediate H5 datasets for lineage tracing
+    # source_imputed* goes to calibration/ (promote expects that path)
+    for h5_file in sorted(artifacts.glob("*.h5")):
+        if h5_file.name.startswith("source_imputed"):
+            repo_path = f"calibration/{h5_file.name}"
+        else:
+            repo_path = f"datasets/{h5_file.name}"
+        files_with_paths.append((str(h5_file), repo_path))
+        print(f" {h5_file.name} -> {repo_path}: {h5_file.stat().st_size:,} bytes")
+
+    policy_db = artifacts / "policy_data.db"
     if policy_db.exists():
         files_with_paths.append((str(policy_db), "calibration/policy_data.db"))
         print(f" policy_data.db: {policy_db.stat().st_size:,} bytes")
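
Reviewer sketch: the routing rule in this hunk is small enough to pull out and check on its own. The function name `repo_path_for` and the non-source_imputed file name are hypothetical; the prefixes and the `source_imputed` file names come from the diff.

```python
def repo_path_for(h5_name):
    """Return the staging path for an H5 artifact, following the rule in the hunk above."""
    if h5_name.startswith("source_imputed"):
        # promote expects source_imputed files under calibration/
        return f"calibration/{h5_name}"
    return f"datasets/{h5_name}"


staged = [
    repo_path_for(name)
    for name in sorted(
        [
            "source_imputed_stratified_extended_cps_2024.h5",
            "source_imputed_stratified_extended_cps.h5",
            "cps_2024.h5",
        ]
    )
]
```

Both the year-suffixed file and its yearless alias match the `source_imputed` prefix, so both are staged under `calibration/`.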

modal_app/worker_script.py

Lines changed: 0 additions & 1 deletion
@@ -250,7 +250,6 @@ def main():
     from policyengine_us_data.calibration.validate_staging import (
         _query_all_active_targets,
         _batch_stratum_constraints,
-        CSV_COLUMNS,
     )
     from policyengine_us_data.calibration.unified_calibration import (
         load_target_config,

paper/scripts/calculate_target_performance.py

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 import numpy as np
 from pathlib import Path
 import json
-from typing import Dict, List, Tuple
+from typing import Dict, List


 def calculate_target_achievement(

paper/scripts/generate_all_tables.py

Lines changed: 0 additions & 2 deletions
@@ -6,9 +6,7 @@
 """

 import pandas as pd
-import numpy as np
 from pathlib import Path
-import os


 def format_number(value, decimals=3):

paper/scripts/generate_validation_metrics.py

Lines changed: 0 additions & 1 deletion
@@ -7,7 +7,6 @@
 """

 import pandas as pd
-import numpy as np
 from policyengine_us import Microsimulation
 from policyengine_us_data.datasets.cps.enhanced_cps import EnhancedCPS
 from policyengine_us_data.datasets.cps.cps import CPS

paper/scripts/markdown_to_latex.py

Lines changed: 0 additions & 1 deletion
@@ -6,7 +6,6 @@
 """

 import re
-import os
 from pathlib import Path


policyengine_us_data/calibration/calibration_utils.py

Lines changed: 0 additions & 1 deletion
@@ -491,7 +491,6 @@ def get_cd_index_mapping(db_uri: str = None):
         tuple: (cd_to_index dict, index_to_cd dict, cds_ordered list)
     """
     from sqlalchemy import create_engine, text
-    from pathlib import Path
     from policyengine_us_data.storage import STORAGE_FOLDER

     if db_uri is None:

policyengine_us_data/calibration/clone_and_assign.py

Lines changed: 6 additions & 0 deletions
@@ -51,6 +51,12 @@ def load_global_block_distribution():

     df = pd.read_csv(csv_path, dtype={"block_geoid": str})

+    # Normalize at-large districts: Census uses 00 (and 98 for DC) → 01
+    district_num = df["cd_geoid"] % 100
+    state_fips_col = df["cd_geoid"] // 100
+    at_large = (district_num == 0) | ((state_fips_col == 11) & (district_num == 98))
+    df.loc[at_large, "cd_geoid"] = state_fips_col[at_large] * 100 + 1
+
     block_geoids = df["block_geoid"].values
     cd_geoids = np.array(df["cd_geoid"].astype(str).tolist())
     state_fips = np.array([int(b[:2]) for b in block_geoids])
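
Reviewer sketch: the at-large normalization added above can be checked on a handful of GEOIDs. This is a NumPy restatement of the same arithmetic rather than the pandas code in the diff; the function name `normalize_at_large` and the sample values are illustrative.

```python
import numpy as np


def normalize_at_large(cd_geoids):
    """Map at-large district codes (SS00, and 1198 for DC) to SS01; leave others unchanged."""
    cd = np.asarray(cd_geoids, dtype=np.int64)
    district = cd % 100   # last two digits: district number
    state = cd // 100     # leading digits: state FIPS
    at_large = (district == 0) | ((state == 11) & (district == 98))
    return np.where(at_large, state * 100 + 1, cd)


# AK (200), DE (1000), DC (1198), ME-02 (2302), CA-01 (601)
normalized = normalize_at_large([200, 1000, 1198, 2302, 601])
```

Only the first three values change (to 201, 1001, and 1101); multi-district codes such as 2302 and 601 pass through untouched.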