Skip to content

Commit 42788ed

Browse files
authored
Harden long-run production metadata (#997)
1 parent 64b3e4b commit 42788ed

6 files changed

Lines changed: 329 additions & 64 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Stamp long-run H5 metadata with exact policyengine-us build provenance.

policyengine_us_data/datasets/cps/long_term/calibration_artifacts.py

Lines changed: 49 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22

33
from datetime import datetime, timezone
44
import hashlib
5-
from importlib import metadata as importlib_metadata
65
import json
76
from pathlib import Path
8-
import subprocess
97
from typing import Any
108

9+
from policyengine_us_data.utils.policyengine import (
10+
PolicyEngineUSBuildInfo,
11+
get_policyengine_us_build_info,
12+
)
13+
1114
try:
1215
from .calibration_profiles import (
1316
classify_calibration_quality,
@@ -27,6 +30,38 @@
2730
SUPPORT_AUGMENTATION_REPORT_FILENAME = "support_augmentation_report.json"
2831

2932

33+
def _coerce_policyengine_us_metadata(
34+
policyengine_us: PolicyEngineUSBuildInfo | dict[str, Any] | None,
35+
) -> dict[str, Any]:
36+
if policyengine_us is None:
37+
return get_policyengine_us_build_info().to_metadata_dict()
38+
if isinstance(policyengine_us, PolicyEngineUSBuildInfo):
39+
return policyengine_us.to_metadata_dict()
40+
coerced = json.loads(json.dumps(policyengine_us))
41+
git_head = coerced.get("git_head")
42+
if git_head is not None:
43+
coerced.setdefault("git_commit", git_head)
44+
coerced.setdefault("commit_id", git_head)
45+
coerced.setdefault(
46+
"direct_url",
47+
{"vcs_info": {"commit_id": git_head, "vcs": "git"}},
48+
)
49+
return coerced
50+
51+
52+
def _metadata_policyengine_us(
53+
metadata_path: str | Path,
54+
) -> dict[str, Any] | None:
55+
path = Path(metadata_path)
56+
if not path.exists():
57+
return None
58+
metadata = json.loads(path.read_text(encoding="utf-8"))
59+
policyengine_us = metadata.get("policyengine_us")
60+
if policyengine_us is None:
61+
return None
62+
return json.loads(json.dumps(policyengine_us))
63+
64+
3065
def metadata_path_for(h5_path: str | Path) -> Path:
3166
return Path(f"{Path(h5_path)}.metadata.json")
3267

@@ -39,55 +74,8 @@ def _sha256_file(path: Path) -> str:
3974
return hashlib.sha256(path.read_bytes()).hexdigest()
4075

4176

42-
def _find_git_repo_root(path: Path) -> Path | None:
43-
current = path if path.is_dir() else path.parent
44-
for candidate in (current, *current.parents):
45-
if (candidate / ".git").exists():
46-
return candidate
47-
return None
48-
49-
5077
def capture_policyengine_us_provenance() -> dict[str, Any]:
51-
import policyengine_us
52-
53-
package_file = Path(policyengine_us.__file__).resolve()
54-
version = getattr(policyengine_us, "__version__", None)
55-
if version is None:
56-
try:
57-
version = importlib_metadata.version("policyengine-us")
58-
except importlib_metadata.PackageNotFoundError:
59-
version = None
60-
provenance: dict[str, Any] = {
61-
"package_file": str(package_file),
62-
"package_file_sha256": _sha256_file(package_file),
63-
"package_mtime_ns": package_file.stat().st_mtime_ns,
64-
"package_size": package_file.stat().st_size,
65-
"version": version,
66-
}
67-
repo_root = _find_git_repo_root(package_file)
68-
if repo_root is None:
69-
return provenance
70-
71-
provenance["repo_root"] = str(repo_root)
72-
head = subprocess.run(
73-
["git", "rev-parse", "HEAD"],
74-
cwd=repo_root,
75-
check=False,
76-
capture_output=True,
77-
text=True,
78-
)
79-
if head.returncode == 0:
80-
provenance["git_head"] = head.stdout.strip()
81-
status = subprocess.run(
82-
["git", "status", "--porcelain=v1"],
83-
cwd=repo_root,
84-
check=False,
85-
capture_output=True,
86-
text=True,
87-
)
88-
if status.returncode == 0:
89-
provenance["git_dirty"] = bool(status.stdout.strip())
90-
return provenance
78+
return get_policyengine_us_build_info().to_metadata_dict()
9179

9280

9381
def _resolve_base_dataset_path(base_dataset_path: str) -> Path | None:
@@ -237,7 +225,7 @@ def write_year_metadata(
237225
target_source: dict[str, Any] | None = None,
238226
tax_assumption: dict[str, Any] | None = None,
239227
support_augmentation: dict[str, Any] | None = None,
240-
policyengine_us: dict[str, Any] | None = None,
228+
policyengine_us: PolicyEngineUSBuildInfo | dict[str, Any] | None = None,
241229
base_dataset_snapshot: dict[str, Any] | None = None,
242230
) -> Path:
243231
metadata = {
@@ -246,15 +234,14 @@ def write_year_metadata(
246234
"base_dataset_path": base_dataset_path,
247235
"profile": profile,
248236
"calibration_audit": calibration_audit,
237+
"policyengine_us": _coerce_policyengine_us_metadata(policyengine_us),
249238
}
250239
if target_source is not None:
251240
metadata["target_source"] = target_source
252241
if tax_assumption is not None:
253242
metadata["tax_assumption"] = tax_assumption
254243
if support_augmentation is not None:
255244
metadata["support_augmentation"] = support_augmentation
256-
if policyengine_us is not None:
257-
metadata["policyengine_us"] = policyengine_us
258245
if base_dataset_snapshot is not None:
259246
metadata["base_dataset_snapshot"] = base_dataset_snapshot
260247
metadata = normalize_metadata(metadata)
@@ -294,7 +281,7 @@ def update_dataset_manifest(
294281
target_source: dict[str, Any] | None = None,
295282
tax_assumption: dict[str, Any] | None = None,
296283
support_augmentation: dict[str, Any] | None = None,
297-
policyengine_us: dict[str, Any] | None = None,
284+
policyengine_us: PolicyEngineUSBuildInfo | dict[str, Any] | None = None,
298285
base_dataset_snapshot: dict[str, Any] | None = None,
299286
) -> Path:
300287
output_dir = Path(output_dir)
@@ -303,7 +290,13 @@ def update_dataset_manifest(
303290
target_source = _json_clone(target_source)
304291
tax_assumption = _json_clone(tax_assumption)
305292
support_augmentation = _json_clone(support_augmentation)
306-
policyengine_us = _json_clone(policyengine_us)
293+
policyengine_us = (
294+
_coerce_policyengine_us_metadata(policyengine_us)
295+
if policyengine_us is not None
296+
else _metadata_policyengine_us(metadata_path)
297+
)
298+
if policyengine_us is None:
299+
policyengine_us = get_policyengine_us_build_info().to_metadata_dict()
307300
base_dataset_snapshot = _json_clone(base_dataset_snapshot)
308301

309302
if manifest_path.exists():
@@ -402,6 +395,7 @@ def update_dataset_manifest(
402395
"negative_weight_household_pct": calibration_audit.get(
403396
"negative_weight_household_pct"
404397
),
398+
"policyengine_us_version": policyengine_us.get("version"),
405399
"validation_passed": calibration_audit.get("validation_passed"),
406400
"validation_issue_count": len(calibration_audit.get("validation_issues", [])),
407401
}

policyengine_us_data/datasets/cps/long_term/prototype_synthetic_2100_support.py

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,45 @@
110110
)
111111
PAYROLL_UPRATING_FACTOR_COLUMN = "__pe_payroll_uprating_factor"
112112
SS_UPRATING_FACTOR_COLUMN = "__pe_ss_uprating_factor"
113+
NON_TARGET_CLONE_INCOME_COMPONENTS = (
114+
"alimony_income",
115+
"disability_benefits",
116+
"estate_income",
117+
"farm_income",
118+
"farm_operations_income",
119+
"farm_rent_income",
120+
"keogh_distributions",
121+
"long_term_capital_gains_before_response",
122+
"long_term_capital_gains_on_collectibles",
123+
"miscellaneous_income",
124+
"non_qualified_dividend_income",
125+
"non_sch_d_capital_gains",
126+
"partnership_s_corp_income",
127+
"partnership_se_income",
128+
"qualified_bdc_income",
129+
"qualified_dividend_income",
130+
"qualified_reit_and_ptp_income",
131+
"rental_income",
132+
"salt_refund_income",
133+
"short_term_capital_gains",
134+
"strike_benefits",
135+
"tax_exempt_401k_distributions",
136+
"tax_exempt_403b_distributions",
137+
"tax_exempt_interest_income",
138+
"tax_exempt_ira_distributions",
139+
"tax_exempt_private_pension_income",
140+
"tax_exempt_sep_distributions",
141+
"taxable_401k_distributions",
142+
"taxable_403b_distributions",
143+
"taxable_interest_income",
144+
"taxable_ira_distributions",
145+
"taxable_private_pension_income",
146+
"taxable_sep_distributions",
147+
"unemployment_compensation",
148+
"unrecaptured_section_1250_gain",
149+
"veterans_benefits",
150+
"workers_compensation",
151+
)
113152

114153

115154
@dataclass(frozen=True)
@@ -685,14 +724,24 @@ def _scale_levels(levels: list[float], scale: float) -> list[float]:
685724

686725

687726
@lru_cache(maxsize=None)
688-
def load_policyengine_social_security_cap(year: int) -> float:
727+
def _load_policyengine_social_security_cap_without_reform(year: int) -> float:
689728
sim = Microsimulation(dataset=DEFAULT_DATASET)
690729
return validate_projected_social_security_cap(
691730
sim.tax_benefit_system.parameters,
692731
year,
693732
)
694733

695734

735+
def load_policyengine_social_security_cap(year: int, *, reform=None) -> float:
736+
if reform is None:
737+
return _load_policyengine_social_security_cap_without_reform(year)
738+
sim = Microsimulation(dataset=DEFAULT_DATASET, reform=reform)
739+
return validate_projected_social_security_cap(
740+
sim.tax_benefit_system.parameters,
741+
year,
742+
)
743+
744+
696745
def allocate_taxable_payroll_wages(
697746
total_taxable_payroll: float,
698747
payroll_split: tuple[float, float],
@@ -1868,6 +1917,21 @@ def _zero_period_columns(
18681917
return available_columns
18691918

18701919

1920+
def _zero_clone_non_target_income(
1921+
cloned: pd.DataFrame,
1922+
*,
1923+
base_year: int,
1924+
) -> pd.DataFrame:
1925+
columns = [
1926+
_period_column(component, base_year)
1927+
for component in NON_TARGET_CLONE_INCOME_COMPONENTS
1928+
if _period_column(component, base_year) in cloned.columns
1929+
]
1930+
if columns:
1931+
cloned.loc[:, columns] = 0.0
1932+
return cloned
1933+
1934+
18711935
def _target_base_total_for_row(
18721936
row: pd.Series,
18731937
*,
@@ -1964,6 +2028,7 @@ def _clone_tax_unit_rows_to_target(
19642028
_period_column(component, base_year) for component in SS_COMPONENTS
19652029
)
19662030
qbi_col = _period_column("w2_wages_from_qualified_business", base_year)
2031+
cloned = _zero_clone_non_target_income(cloned, base_year=base_year)
19672032

19682033
target_head_payroll = _target_base_total_for_row(
19692034
cloned.loc[head_idx],
@@ -2351,7 +2416,10 @@ def build_donor_backed_augmented_input_dataframe(
23512416
ss_scale=ss_scale,
23522417
earnings_scale=earnings_scale,
23532418
)
2354-
payroll_cap = load_policyengine_social_security_cap(target_year)
2419+
payroll_cap = load_policyengine_social_security_cap(
2420+
target_year,
2421+
reform=reform,
2422+
)
23552423
candidates = generate_synthetic_candidates(
23562424
pools,
23572425
payroll_cap=payroll_cap,
@@ -2499,7 +2567,10 @@ def build_role_composite_augmented_input_dataframe(
24992567
ss_scale=ss_scale,
25002568
earnings_scale=earnings_scale,
25012569
)
2502-
payroll_cap = load_policyengine_social_security_cap(target_year)
2570+
payroll_cap = load_policyengine_social_security_cap(
2571+
target_year,
2572+
reform=reform,
2573+
)
25032574
candidates = generate_synthetic_candidates(
25042575
pools,
25052576
payroll_cap=payroll_cap,

policyengine_us_data/datasets/cps/long_term/run_long_term_production.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
parse_years,
1616
)
1717
from policyengine_us_data.utils.data_upload import upload_to_staging_hf
18+
from policyengine_us_data.utils.policyengine import (
19+
PolicyEngineUSBuildInfo,
20+
assert_locked_policyengine_us_version,
21+
)
1822
from policyengine_us_data.utils.run_context import resolve_run_id, staging_prefix
1923

2024

@@ -176,6 +180,7 @@ def write_manifest(
176180
years: list[int],
177181
run_id: str,
178182
source_sha: str,
183+
policyengine_us_build: PolicyEngineUSBuildInfo,
179184
artifacts: list[Path],
180185
) -> Path:
181186
manifest_path = output_dir / "long_run_production_manifest.json"
@@ -197,6 +202,7 @@ def write_manifest(
197202
"policyengine-us": _package_version("policyengine-us"),
198203
"policyengine-core": _package_version("policyengine-core"),
199204
},
205+
"policyengine_us": policyengine_us_build.to_metadata_dict(),
200206
"projection": {
201207
"years_spec": args.years,
202208
"years": years,
@@ -329,6 +335,7 @@ def main() -> int:
329335
source_sha = args.source_sha or os.environ.get("GITHUB_SHA", "") or _git_sha()
330336

331337
command = build_projection_command(args, output_dir)
338+
policyengine_us_build = assert_locked_policyengine_us_version()
332339
print("Running long-run projection command:")
333340
print(" ".join(command))
334341
subprocess.run(command, check=True)
@@ -341,6 +348,7 @@ def main() -> int:
341348
years=years,
342349
run_id=run_id,
343350
source_sha=source_sha,
351+
policyengine_us_build=policyengine_us_build,
344352
artifacts=artifacts,
345353
)
346354
artifacts = collect_artifacts(output_dir, args.artifact_prefix)

0 commit comments

Comments
 (0)