Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## Updating data

If your changes present a non-bugfix change to one or more datasets which are cloud-hosted (CPS, ECPS and PUF), then please change both the filename and URL (in both the class definition file and in `storage/upload_completed_datasets.py`). This enables us to store historical versions of datasets separately and reproducibly.

## Opening PRs

Push PR branches to the upstream `PolicyEngine/policyengine-us-data` repository, not to a personal fork. From the repo root, run:

`make push-pr-branch`

This avoids the fork-only CI failure path and sets the upstream tracking branch correctly before opening the PR.
6 changes: 4 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
- **CRITICAL**: NEVER create PRs from personal forks - ALL PRs MUST be created from branches pushed to the upstream PolicyEngine repository
- CI requires access to secrets that are not available to fork PRs for security reasons
- Fork PRs will fail on data download steps and cannot be merged
- Before opening a PR, always run `make push-pr-branch` from the repo root. This pushes the current branch to the `upstream` remote and sets the upstream tracking branch correctly for PR creation.
- Do not prefix PR titles with `[codex]` or any other agent label. Use the plain descriptive title.
- Always create branches directly on the upstream repository:
```bash
git checkout main
git pull upstream main
git checkout -b your-branch-name
git push -u upstream your-branch-name
make push-pr-branch
```
- Use descriptive branch names like `fix-issue-123` or `add-feature-name`
- Always run `make format` before committing
Expand Down Expand Up @@ -62,4 +64,4 @@
- Blacklisting from future publications
- Damage to institutional reputation
- Legal consequences in funded research
- Career-ending academic misconduct charges
- Career-ending academic misconduct charges
19 changes: 18 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
.PHONY: all format test install download upload docker documentation data validate-data calibrate calibrate-build publish-local-area upload-calibration upload-dataset upload-database push-to-modal build-data-modal build-matrices calibrate-modal calibrate-modal-national calibrate-both stage-h5s stage-national-h5 stage-all-h5s pipeline validate-staging validate-staging-full upload-validation check-staging check-sanity clean build paper clean-paper presentations database database-refresh promote-database promote-dataset promote build-h5s validate-local
.PHONY: all format test install download upload docker documentation data validate-data calibrate calibrate-build publish-local-area upload-calibration upload-dataset upload-database push-to-modal build-data-modal build-matrices calibrate-modal calibrate-modal-national calibrate-both stage-h5s stage-national-h5 stage-all-h5s pipeline validate-staging validate-staging-full upload-validation check-staging check-sanity clean build paper clean-paper presentations database database-refresh promote-database promote-dataset promote build-h5s validate-local refresh-soi-targets push-pr-branch

SOI_SOURCE_YEAR ?= 2021
SOI_TARGET_YEAR ?= 2023

GPU ?= T4
EPOCHS ?= 1000
Expand All @@ -8,6 +11,8 @@ BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD)
NUM_WORKERS ?= 8
N_CLONES ?= 430
VERSION ?=
SOI_SOURCE_YEAR ?= 2021
SOI_TARGET_YEAR ?= 2023

HF_CLONE_DIR ?= $(HOME)/huggingface/policyengine-us-data

Expand Down Expand Up @@ -139,6 +144,18 @@ validate-local:
validate-data:
python -c "from policyengine_us_data.storage.upload_completed_datasets import validate_all_datasets; validate_all_datasets()"

# Run refresh_soi_table_targets.py, passing SOI_SOURCE_YEAR as --source-year
# and SOI_TARGET_YEAR as --target-year. Both variables are assigned with `?=`
# above, so they can be overridden from the environment or the command line.
refresh-soi-targets:
python policyengine_us_data/storage/calibration_targets/refresh_soi_table_targets.py \
--source-year $(SOI_SOURCE_YEAR) \
--target-year $(SOI_TARGET_YEAR)

# Push $(BRANCH) to the `upstream` remote and set it as the tracking branch
# (`git push -u`). BRANCH defaults to the currently checked-out branch name.
# Exits with status 1 without pushing when BRANCH is `main`.
push-pr-branch:
@if [ "$(BRANCH)" = "main" ]; then \
echo "Refusing to push main as a PR branch."; \
exit 1; \
fi
@git push -u upstream $(BRANCH)

upload-calibration:
python -c "from policyengine_us_data.utils.huggingface import upload_calibration_artifacts; \
upload_calibration_artifacts()"
Expand Down
4 changes: 4 additions & 0 deletions changelog.d/refresh-soi-targets-2023.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Refresh tracked national SOI workbook targets through TY2023, backfill TY2022,
teach `get_soi()` to pick the best available source year per variable, and
overlay the national DB IRS-SOI targets that can now use the newer workbook
release instead of staying stuck on the TY2022 geography file.
126 changes: 85 additions & 41 deletions policyengine_us_data/calibration/unified_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ def __init__(
self.time_period = time_period
self.dataset_path = dataset_path
self._entity_rel_cache = None
self._target_overview_columns = None

# ---------------------------------------------------------------
# Entity relationships
Expand Down Expand Up @@ -959,8 +960,8 @@ def _build_state_values(
sim,
target_vars: set,
constraint_vars: set,
reform_vars: set,
geography,
reform_vars: set = None,
geography=None,
rerandomize_takeup: bool = True,
workers: int = 1,
) -> dict:
Expand Down Expand Up @@ -997,6 +998,9 @@ def _build_state_values(
TAKEUP_AFFECTED_TARGETS,
)

if geography is None:
raise ValueError("geography is required")

unique_states = sorted(set(int(s) for s in geography.state_fips))
n_hh = geography.n_records

Expand All @@ -1022,7 +1026,7 @@ def _build_state_values(
# Convert sets to sorted lists for deterministic iteration
target_vars_list = sorted(target_vars)
constraint_vars_list = sorted(constraint_vars)
reform_vars_list = sorted(reform_vars)
reform_vars_list = sorted(reform_vars or set())

state_values = {}

Expand Down Expand Up @@ -1518,63 +1522,103 @@ def _get_stratum_constraints(self, stratum_id: int) -> List[dict]:
)
return df.to_dict("records")

def _get_target_overview_columns(self) -> set:
if self._target_overview_columns is None:
with self.engine.connect() as conn:
rows = conn.execute(
text("PRAGMA table_info(target_overview)")
).fetchall()
self._target_overview_columns = {row[1] for row in rows}
return self._target_overview_columns

def _query_targets(self, target_filter: dict) -> pd.DataFrame:
"""Query targets via target_overview view with
best-period selection."""
or_conditions = []
and_conditions = []

if "domain_variables" in target_filter:
dvs = target_filter["domain_variables"]
ph = ",".join(f"'{dv}'" for dv in dvs)
or_conditions.append(f"tv.domain_variable IN ({ph})")
and_conditions.append(f"tv.domain_variable IN ({ph})")

if "variables" in target_filter:
vs = ",".join(f"'{v}'" for v in target_filter["variables"])
or_conditions.append(f"tv.variable IN ({vs})")
and_conditions.append(f"tv.variable IN ({vs})")

if "target_ids" in target_filter:
ids = ",".join(map(str, target_filter["target_ids"]))
or_conditions.append(f"tv.target_id IN ({ids})")
and_conditions.append(f"tv.target_id IN ({ids})")

if "stratum_ids" in target_filter:
ids = ",".join(map(str, target_filter["stratum_ids"]))
or_conditions.append(f"tv.stratum_id IN ({ids})")
and_conditions.append(f"tv.stratum_id IN ({ids})")

if not or_conditions:
if not and_conditions:
where_clause = "1=1"
else:
where_clause = " OR ".join(f"({c})" for c in or_conditions)

query = f"""
WITH filtered_targets AS (
SELECT tv.target_id, tv.stratum_id, tv.variable, tv.reform_id,
tv.value, tv.period, tv.geo_level,
tv.geographic_id, tv.domain_variable
FROM target_overview tv
WHERE tv.active = 1
AND ({where_clause})
),
best_periods AS (
SELECT stratum_id, variable, reform_id,
CASE
WHEN MAX(CASE WHEN period <= :time_period
THEN period END) IS NOT NULL
THEN MAX(CASE WHEN period <= :time_period
THEN period END)
ELSE MIN(period)
END as best_period
FROM filtered_targets
GROUP BY stratum_id, variable, reform_id
)
SELECT ft.*
FROM filtered_targets ft
JOIN best_periods bp
ON ft.stratum_id = bp.stratum_id
AND ft.variable = bp.variable
AND ft.reform_id = bp.reform_id
AND ft.period = bp.best_period
ORDER BY ft.target_id
"""
where_clause = " AND ".join(f"({c})" for c in and_conditions)

if "reform_id" in self._get_target_overview_columns():
query = f"""
WITH filtered_targets AS (
SELECT tv.target_id, tv.stratum_id, tv.variable, tv.reform_id,
tv.value, tv.period, tv.geo_level,
tv.geographic_id, tv.domain_variable
FROM target_overview tv
WHERE tv.active = 1
AND ({where_clause})
),
best_periods AS (
SELECT stratum_id, variable, reform_id,
CASE
WHEN MAX(CASE WHEN period <= :time_period
THEN period END) IS NOT NULL
THEN MAX(CASE WHEN period <= :time_period
THEN period END)
ELSE MIN(period)
END as best_period
FROM filtered_targets
GROUP BY stratum_id, variable, reform_id
)
SELECT ft.*
FROM filtered_targets ft
JOIN best_periods bp
ON ft.stratum_id = bp.stratum_id
AND ft.variable = bp.variable
AND ft.reform_id = bp.reform_id
AND ft.period = bp.best_period
ORDER BY ft.target_id
"""
else:
query = f"""
WITH filtered_targets AS (
SELECT tv.target_id, tv.stratum_id, tv.variable,
0 AS reform_id, tv.value, tv.period, tv.geo_level,
tv.geographic_id, tv.domain_variable
FROM target_overview tv
WHERE tv.active = 1
AND ({where_clause})
),
best_periods AS (
SELECT stratum_id, variable,
CASE
WHEN MAX(CASE WHEN period <= :time_period
THEN period END) IS NOT NULL
THEN MAX(CASE WHEN period <= :time_period
THEN period END)
ELSE MIN(period)
END as best_period
FROM filtered_targets
GROUP BY stratum_id, variable
)
SELECT ft.*
FROM filtered_targets ft
JOIN best_periods bp
ON ft.stratum_id = bp.stratum_id
AND ft.variable = bp.variable
AND ft.period = bp.best_period
ORDER BY ft.target_id
"""

with self.engine.connect() as conn:
return pd.read_sql(
Expand Down
Loading
Loading