This repository was archived by the owner on Jun 14, 2026. It is now read-only.

Commit 49f31d0

MaxGhenis and claude authored
Adopt microunit for tax-unit reconstruction (scoped, behavior-preserving; part of #113) (#114)
* Delegate PE tax-unit reconstruction to microunit (part of #113)

  Add microunit as a dependency and route the reconstruction-from-scratch tax-unit path through microunit.construct_tax_units when the person frame carries microunit's raw CPS input columns (PH_SEQ, A_LINENO, A_AGE, A_MARITL, A_SPOUSE, PEPAR1, PEPAR2, A_EXPRRP). When those columns are absent (the current production case, since microplex's reconstruction frame collapses relationship_to_head and drops the spouse/parent pointers), the new USPipeline._build_policyengine_tax_units_via_microunit returns None and the legacy role-flag reconstruction runs unchanged. The authoritative-ID path (#112) is never routed here.

  Net effect is behavior-preserving on today's data: the delegation stays inert until an upstream change threads CPS columns through to entity construction.

  microunit IS eCPS's tax-unit engine, so activating the delegation converges microplex's tax units toward eCPS's; any resulting loss movement is an entity-convergence effect and must be interpreted as such, not as a quality win (see #113).

  Adds tests/pipelines/test_us_microunit_delegation.py (4 passing); ruff clean.

  Implementation produced by the parallel wire-microunit agent; verified (ruff + delegation tests) and committed here.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Depend on microunit from PyPI (>=0.1.0) instead of the git pin

  microunit 0.1.0 is now published to PyPI, so drop the pre-PyPI git+https commit pin in favor of a standard version constraint.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Harden microunit delegation: defensive filing-status normalize + order test

  - Route microunit's filing_status through _normalize_policyengine_filing_status so the delegated path cannot diverge from the legacy paths if microunit ever changes its spelling/casing (today the vocabularies already match).
  - Add a regression test that feeds rows out of PH_SEQ/A_LINENO order and asserts correct unit/role/filing assignment. This locks in microunit's input-row-order contract, which the positional TAX_ID mapping relies on.

  Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 76fb8e6 commit 49f31d0
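
The gating described in the commit message (delegate only when the raw CPS columns are present, otherwise return None so the legacy path runs) can be sketched roughly as follows. This is a minimal illustration, not the pipeline's code: rows are plain dicts rather than a pandas frame, and `build_via_delegate`, `build_legacy`, and `build_tax_units` are hypothetical names. Only the column names are taken from `_MICROUNIT_REQUIRED_CPS_COLUMNS` in the diff.

```python
from typing import Optional

# Column names as listed in _MICROUNIT_REQUIRED_CPS_COLUMNS in the diff.
REQUIRED_COLUMNS = frozenset(
    {
        "PH_SEQ",
        "A_LINENO",
        "A_AGE",
        "A_MARITL",
        "A_SPOUSE",
        "PEPAR1",
        "PEPAR2",
        "A_EXPRRP",
    }
)

Row = dict[str, object]


def build_via_delegate(rows: list[Row]) -> Optional[list[str]]:
    """Hypothetical delegate: returns None unless every row has all columns."""
    if not rows or not all(REQUIRED_COLUMNS.issubset(row) for row in rows):
        return None  # signal "defer to the caller's fallback"
    # Stand-in for the real engine call; labels each row by its CPS key.
    return [f"delegated:{row['PH_SEQ']}-{row['A_LINENO']}" for row in rows]


def build_legacy(rows: list[Row]) -> list[str]:
    """Hypothetical legacy path that needs no CPS columns."""
    return [f"legacy:{index}" for index, _ in enumerate(rows)]


def build_tax_units(rows: list[Row]) -> list[str]:
    """Try the delegate first; fall through to the legacy path on None."""
    delegated = build_via_delegate(rows)
    if delegated is not None:
        return delegated
    return build_legacy(rows)
```

Returning None rather than raising keeps the delegation inert on frames that lack the columns, which is what makes the change behavior-preserving on today's data.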

4 files changed

Lines changed: 481 additions & 0 deletions


pyproject.toml

Lines changed: 4 additions & 0 deletions
@@ -35,6 +35,10 @@ policyengine = [
     "microimpute==1.15.1 ; python_full_version >= '3.12' and python_full_version < '3.15'",
     "policyengine-us==1.715.2; python_version >= '3.11' and python_version < '3.15'",
     "spm-calculator>=0.3.1",
+    # Standalone tax-unit construction engine (the extraction of eCPS's
+    # tax-unit logic), used by the PolicyEngine pipeline to reconstruct tax
+    # units from CPS-like person frames (issue #113).
+    "microunit>=0.1.0",
 ]

 [project.urls]
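
Because microunit ships in the `policyengine` extra, the pipeline module imports it inside the function that needs it rather than at module top level. A rough sketch of that lazy-import pattern follows. It uses the standard-library `json` module as a stand-in engine, and the function name and the `ModuleNotFoundError` fallback are assumptions made for illustration; the code in this commit imports microunit lazily but does not catch the import error.

```python
import importlib
from typing import Any, Optional


def decode_with_optional_engine(
    payload: str, engine_name: str = "json"
) -> Optional[Any]:
    """Import the engine only when called, so importing this module never
    requires the optional dependency to be installed."""
    try:
        engine = importlib.import_module(engine_name)
    except ModuleNotFoundError:
        # Assumption for this sketch: a missing engine means "use the fallback".
        return None
    return engine.loads(payload)
```

With this shape, a test suite that never calls the function can import the module without the extra installed.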

src/microplex_us/pipelines/us.py

Lines changed: 204 additions & 0 deletions
@@ -6262,12 +6262,198 @@ def _build_policyengine_tax_units(
         tax_units = pd.DataFrame(tax_unit_rows)
         return tax_units, person_rows

+    # Raw CPS ASEC columns that ``microunit.construct_tax_units`` consumes to
+    # reconstruct tax units. ``microunit`` is the standalone extraction of
+    # eCPS's tax-unit logic (issue #113); it is *source-agnostic* and expects
+    # this normalized CPS-like contract rather than microplex's collapsed
+    # ``relationship_to_head`` coding. We only delegate when the frame actually
+    # carries these columns, so the delegation is behavior-preserving on
+    # today's frames (which do not carry them) and only becomes active once an
+    # upstream change threads CPS columns through to entity construction.
+    _MICROUNIT_REQUIRED_CPS_COLUMNS = (
+        "PH_SEQ",
+        "A_LINENO",
+        "A_AGE",
+        "A_MARITL",
+        "A_SPOUSE",
+        "PEPAR1",
+        "PEPAR2",
+        "A_EXPRRP",
+    )
+
+    def _build_policyengine_tax_units_via_microunit(
+        self,
+        persons: pd.DataFrame,
+        *,
+        start_tax_unit_id: int = 0,
+    ) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None:
+        """Reconstruct tax units by delegating to ``microunit`` (issue #113).
+
+        This is the *reconstruction-from-scratch* path. The authoritative-ID
+        path (#112, :meth:`_build_policyengine_tax_units_from_existing_ids`) is
+        handled separately and is never routed here.
+
+        Delegation only happens when ``persons`` carries the raw CPS columns in
+        :attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`. ``microunit``'s logic genuinely
+        depends on marital status (``A_MARITL``), spouse/parent line pointers
+        (``A_SPOUSE``/``PEPAR1``/``PEPAR2``) and the CPS relationship recode
+        (``A_EXPRRP``); microplex's reconstruction-stage frame collapses
+        relationship into a 0/1/2/3 coding and drops the pointer columns, so a
+        *faithful* mapping is not possible from that frame. Rather than fabricate
+        microunit inputs (which would silently change behavior), we return
+        ``None`` when the columns are absent and let the caller fall back to the
+        legacy role-flag reconstruction.
+
+        .. warning::
+            ``microunit`` *is* eCPS's tax-unit construction. Routing microplex
+            through it makes microplex's constructed tax units **converge toward
+            eCPS's**. Any loss change from enabling this delegation is an
+            *entity-convergence* effect and must be interpreted as such, not as
+            a quality improvement. See issue #113.
+
+        Returns the same ``(tax_units, person_rows, households)`` triple shape as
+        :meth:`_build_policyengine_tax_units_from_role_flags`, or ``None`` to
+        defer to the caller's fallback.
+        """
+        if "person_id" not in persons.columns or "household_id" not in persons.columns:
+            return None
+        if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns):
+            return None
+
+        # Imported lazily to match this module's optional-dependency convention:
+        # ``microunit`` ships in the ``policyengine`` extra, and the base test
+        # suite must import this module without that extra installed.
+        from microunit import POLICYENGINE_MODE, construct_tax_units
+
+        # microunit keys its CPS-style frame on (PH_SEQ, A_LINENO); resetting the
+        # index keeps row order so the returned per-person TAX_ID and role align
+        # positionally back onto person_rows.
+        person_rows = persons.reset_index(drop=True).copy()
+        person_assignments, tax_unit = construct_tax_units(
+            person_rows.copy(),
+            year=self._microunit_reference_year(person_rows),
+            mode=POLICYENGINE_MODE,
+        )
+
+        tax_id = pd.to_numeric(person_assignments["TAX_ID"], errors="coerce")
+        person_rows["tax_unit_id"] = (
+            tax_id.to_numpy() + int(start_tax_unit_id)
+        ).astype(np.int64)
+        # microunit emits an authoritative per-person HEAD/SPOUSE/DEPENDENT role;
+        # use it directly for the filer/dependent split rather than re-deriving
+        # from the (possibly absent) collapsed relationship_to_head coding.
+        person_rows["_microunit_role"] = [
+            self._decode_microunit_bytes(role)
+            for role in person_assignments["tax_unit_role_input"].tolist()
+        ]
+
+        # microunit emits the canonical filing-status vocabulary already, but
+        # normalize defensively so this path can never diverge from the legacy
+        # paths if microunit ever changes its spelling/casing.
+        filing_status_by_unit = {
+            int(row_tax_id) + int(start_tax_unit_id): (
+                self._normalize_policyengine_filing_status(
+                    self._decode_microunit_bytes(filing_value)
+                )
+            )
+            for row_tax_id, filing_value in zip(
+                tax_unit["TAX_ID"].tolist(),
+                tax_unit["filing_status_input"].tolist(),
+                strict=True,
+            )
+        }
+
+        tax_unit_rows: list[dict[str, Any]] = []
+        for unit_id, unit_persons in person_rows.groupby("tax_unit_id", sort=False):
+            ordered = unit_persons.sort_values(
+                ["_microunit_role", "age", "person_id"],
+                ascending=[True, False, True],
+            ).reset_index(drop=True)
+            is_filer = ordered["_microunit_role"].isin(["HEAD", "SPOUSE"])
+            filer_ids = [
+                int(person_id) for person_id in ordered.loc[is_filer, "person_id"]
+            ]
+            dependent_ids = [
+                int(person_id) for person_id in ordered.loc[~is_filer, "person_id"]
+            ]
+            if not filer_ids:
+                filer_ids = [int(ordered.iloc[0]["person_id"])]
+                dependent_ids = [
+                    int(person_id)
+                    for person_id in ordered["person_id"].tolist()
+                    if int(person_id) not in filer_ids
+                ]
+            tax_unit_rows.append(
+                {
+                    "tax_unit_id": int(unit_id),
+                    "household_id": int(ordered.iloc[0]["household_id"]),
+                    "filing_status": filing_status_by_unit.get(int(unit_id), "SINGLE"),
+                    "member_ids": [
+                        int(person_id) for person_id in ordered["person_id"]
+                    ],
+                    "filer_ids": filer_ids,
+                    "dependent_ids": dependent_ids,
+                    "n_dependents": len(dependent_ids),
+                    "total_income": float(
+                        pd.to_numeric(ordered.get("income", 0.0), errors="coerce")
+                        .fillna(0.0)
+                        .sum()
+                    ),
+                    "tax_liability": 0.0,
+                    **self._aggregate_policyengine_tax_unit_input_columns(ordered),
+                }
+            )
+
+        if not tax_unit_rows:
+            return None
+
+        households = set(person_rows["household_id"].drop_duplicates().tolist())
+        person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore")
+        return pd.DataFrame(tax_unit_rows), person_rows, households
+
+    @staticmethod
+    def _decode_microunit_bytes(value: Any) -> str:
+        """Decode a ``microunit`` bytes-typed status/role into a ``str``."""
+        if isinstance(value, bytes):
+            return value.decode()
+        return str(value)
6421+
def _microunit_reference_year(self, persons: pd.DataFrame) -> int:
6422+
"""Year passed to ``microunit`` for its dependency income thresholds.
6423+
6424+
Prefers an explicit ``year``/``tax_year`` column when the frame carries
6425+
one; otherwise falls back to the pipeline's configured reference year so
6426+
the only year-dependent behavior (the qualifying-relative gross income
6427+
limit) matches the rest of the pipeline. TODO(#113): thread the dataset
6428+
reference year through entity construction explicitly.
6429+
"""
6430+
for column in ("year", "tax_year"):
6431+
if column in persons.columns:
6432+
values = pd.to_numeric(persons[column], errors="coerce").dropna()
6433+
if not values.empty:
6434+
return int(values.iloc[0])
6435+
configured = getattr(self.config, "reference_year", None)
6436+
if configured is not None:
6437+
return int(configured)
6438+
return 2024
6439+
62656440
def _build_policyengine_tax_units_from_role_flags(
62666441
self,
62676442
persons: pd.DataFrame,
62686443
*,
62696444
start_tax_unit_id: int = 0,
62706445
) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None:
6446+
# Issue #113: when the frame carries microunit's CPS-style input
6447+
# columns, delegate the reconstruction to microunit. Otherwise fall
6448+
# through to the legacy role-flag reconstruction below (the current
6449+
# production path, since these columns are not yet threaded through).
6450+
microunit_result = self._build_policyengine_tax_units_via_microunit(
6451+
persons,
6452+
start_tax_unit_id=start_tax_unit_id,
6453+
)
6454+
if microunit_result is not None:
6455+
return microunit_result
6456+
62716457
role_columns = {
62726458
"is_tax_unit_head",
62736459
"is_tax_unit_spouse",
@@ -7326,6 +7512,16 @@ def _coerce_policyengine_status_code(self, value: Any) -> int | None:
         return int(numeric)

     def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
+        """Assign family and SPM units, preserving authoritative IDs when present.
+
+        NOT delegated to ``microunit`` in this pass (issue #113). At the pinned
+        commit ``microunit.units.spm.assign_spm_partition`` is documented as "a
+        conservative adapter, not yet the full Census-parity constructor" and is
+        not exported from microunit's public API, and microunit has no
+        family-unit constructor. The authoritative-ID fast path is preserved
+        here. TODO(#113): delegate once microunit grows a Census-parity
+        SPM/family constructor.
+        """
         result = persons.copy()
         preserved_family_ids = self._normalized_complete_existing_group_ids(
             result,
@@ -7399,6 +7595,14 @@ def _assign_marital_units(
         self,
         persons: pd.DataFrame,
     ) -> pd.DataFrame:
+        """Assign marital units, preserving authoritative IDs when present.
+
+        NOT delegated to ``microunit`` in this pass (issue #113): microunit does
+        not construct marital units at the pinned commit (filing status is its
+        only marital-related output; there is no ``construct_marital_units``).
+        The authoritative-ID fast path is preserved here. TODO(#113): revisit if
+        microunit grows marital-unit support.
+        """
         result = persons.copy()
         preserved_marital_unit_ids = self._normalized_complete_existing_group_ids(
             result,
