Skip to content
This repository was archived by the owner on Jun 14, 2026. It is now read-only.

Commit 76fb8e6

Browse files
authored
Restore eCPS export and entity ID parity
Derive is_household_head for PE export and preserve complete existing family/SPM/marital IDs during entity-table construction.
1 parent 188a940 commit 76fb8e6

4 files changed

Lines changed: 526 additions & 224 deletions

File tree

src/microplex_us/pipelines/us.py

Lines changed: 113 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,9 @@ def _select_ssi_takeup_by_age_amount(
502502
person_ids = person_ids.reindex(index)
503503
age_values = pd.to_numeric(ages.reindex(index), errors="coerce").fillna(0.0)
504504
weight_values = (
505-
pd.to_numeric(weights.reindex(index), errors="coerce").fillna(0.0).clip(lower=0.0)
505+
pd.to_numeric(weights.reindex(index), errors="coerce")
506+
.fillna(0.0)
507+
.clip(lower=0.0)
506508
)
507509
reported_values = (
508510
pd.to_numeric(reported_ssi.reindex(index), errors="coerce")
@@ -555,7 +557,9 @@ def _select_until_amount(candidate_mask: np.ndarray, amount: float) -> None:
555557
group_summary[group_name] = {
556558
"reported_amount": target_amount,
557559
"reported_recipients": float(
558-
weight_values.to_numpy(dtype=float)[group_mask & reported_positive].sum()
560+
weight_values.to_numpy(dtype=float)[
561+
group_mask & reported_positive
562+
].sum()
559563
),
560564
"formula_all_takeup_amount": float(full_amount[group_mask].sum()),
561565
"formula_all_takeup_recipients": float(
@@ -3980,9 +3984,7 @@ def _calibrate_policyengine_ssi_takeup_from_reported_amounts(
39803984
"missing_columns": missing_columns,
39813985
}
39823986
reported_ssi = (
3983-
pd.to_numeric(persons["ssi"], errors="coerce")
3984-
.fillna(0.0)
3985-
.clip(lower=0.0)
3987+
pd.to_numeric(persons["ssi"], errors="coerce").fillna(0.0).clip(lower=0.0)
39863988
)
39873989
if not reported_ssi.gt(0.0).any():
39883990
persons["takes_up_ssi_if_eligible"] = False
@@ -4126,6 +4128,7 @@ def build_policyengine_entity_tables(
41264128
).transform("sum")
41274129
persons = self._augment_policyengine_person_inputs(persons)
41284130
persons["relationship_to_head"] = self._normalize_relationship_to_head(persons)
4131+
persons = self._assign_policyengine_household_head_flag(persons)
41294132

41304133
households = self._build_policyengine_households(persons)
41314134
tax_units, persons = self._build_policyengine_tax_units(persons)
@@ -6270,7 +6273,10 @@ def _build_policyengine_tax_units_from_role_flags(
62706273
"is_tax_unit_spouse",
62716274
"is_tax_unit_dependent",
62726275
}
6273-
if not role_columns.issubset(persons.columns) or "person_id" not in persons.columns:
6276+
if (
6277+
not role_columns.issubset(persons.columns)
6278+
or "person_id" not in persons.columns
6279+
):
62746280
return None
62756281

62766282
person_rows = persons.copy()
@@ -6348,9 +6354,7 @@ def _build_policyengine_tax_units_from_role_flags(
63486354
"tax_unit_id": global_tax_unit_id,
63496355
"household_id": int(household_id),
63506356
"filing_status": filing_status,
6351-
"member_ids": [
6352-
int(person_id) for person_id in unit_person_ids
6353-
],
6357+
"member_ids": [int(person_id) for person_id in unit_person_ids],
63546358
"filer_ids": [head_id, *spouse_ids],
63556359
"dependent_ids": dependent_ids,
63566360
"n_dependents": len(dependent_ids),
@@ -6466,12 +6470,9 @@ def _build_policyengine_tax_units_from_existing_ids(
64666470
.nunique()
64676471
)
64686472
if bool((households_per_tax_unit > 1).any()):
6469-
normalized_tax_unit_id = (
6470-
pd.factorize(pd.MultiIndex.from_frame(tax_unit_key), sort=False)[
6471-
0
6472-
].astype(np.int64)
6473-
+ int(start_tax_unit_id)
6474-
)
6473+
normalized_tax_unit_id = pd.factorize(
6474+
pd.MultiIndex.from_frame(tax_unit_key), sort=False
6475+
)[0].astype(np.int64) + int(start_tax_unit_id)
64756476
person_rows["tax_unit_id"] = normalized_tax_unit_id
64766477
else:
64776478
raw_tax_unit_id = raw_tax_unit_id.astype(np.int64)
@@ -6562,9 +6563,7 @@ def _resolve_tax_unit_role_flags(
65626563
& (~head_flag | dependent_hint | ~head_hint)
65636564
)
65646565
resolved_spouse = (
6565-
spouse_flag
6566-
& ~resolved_dependent
6567-
& (~head_flag | spouse_hint | ~head_hint)
6566+
spouse_flag & ~resolved_dependent & (~head_flag | spouse_hint | ~head_hint)
65686567
)
65696568
resolved_head = head_flag & ~resolved_spouse & ~resolved_dependent
65706569
return resolved_head, resolved_spouse, resolved_dependent
@@ -6687,9 +6686,7 @@ def _cohere_tax_unit_role_flags_for_household(
66876686
},
66886687
index=spouse_pool,
66896688
)
6690-
.sort_values(
6691-
["source_spouse", "relationship", "age", "person_id"]
6692-
)
6689+
.sort_values(["source_spouse", "relationship", "age", "person_id"])
66936690
.index[0]
66946691
)
66956692
if spouse_index is not None:
@@ -6700,26 +6697,21 @@ def _cohere_tax_unit_role_flags_for_household(
67006697
available
67016698
& (
67026699
dependent_flag
6703-
| (
6704-
dependent_hint
6705-
& (age.lt(24) | income.le(0.0))
6706-
)
6700+
| (dependent_hint & (age.lt(24) | income.le(0.0)))
67076701
| (spouse_hint & income.le(0.0))
67086702
)
67096703
] = True
67106704

67116705
available = ~(coherent_head | coherent_spouse | coherent_dependent)
6712-
coherent_head.loc[
6713-
available
6714-
& age.ge(18)
6715-
& (head_flag | income.gt(0.0))
6716-
] = True
6706+
coherent_head.loc[available & age.ge(18) & (head_flag | income.gt(0.0))] = True
67176707

67186708
coherent_dependent.loc[
67196709
~(coherent_head | coherent_spouse | coherent_dependent)
67206710
& (age.lt(18) | dependent_hint | income.le(0.0))
67216711
] = True
6722-
coherent_head.loc[~(coherent_head | coherent_spouse | coherent_dependent)] = True
6712+
coherent_head.loc[~(coherent_head | coherent_spouse | coherent_dependent)] = (
6713+
True
6714+
)
67236715

67246716
result["_is_tax_unit_head_flag"] = coherent_head
67256717
result["_is_tax_unit_spouse_flag"] = coherent_spouse
@@ -6803,12 +6795,10 @@ def _assign_role_flag_spouses(
68036795
head_by_person_number = {
68046796
int(person_number.loc[index]): int(row["person_id"])
68056797
for index, row in household_persons.iterrows()
6806-
if int(row["person_id"]) in head_set
6807-
and int(person_number.loc[index]) > 0
6798+
if int(row["person_id"]) in head_set and int(person_number.loc[index]) > 0
68086799
}
68096800
row_by_person_id = {
6810-
int(row["person_id"]): index
6811-
for index, row in household_persons.iterrows()
6801+
int(row["person_id"]): index for index, row in household_persons.iterrows()
68126802
}
68136803
assigned_spouses: set[int] = set()
68146804

@@ -6969,9 +6959,7 @@ def _infer_policyengine_aca_takeup_for_tax_unit(
69696959
marketplace = pd.Series(False, index=unit_persons.index, dtype=bool)
69706960
for column in observed:
69716961
marketplace |= (
6972-
pd.to_numeric(unit_persons[column], errors="coerce")
6973-
.fillna(0.0)
6974-
.ne(0.0)
6962+
pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0).ne(0.0)
69756963
)
69766964
return bool(marketplace.any())
69776965

@@ -7339,6 +7327,19 @@ def _coerce_policyengine_status_code(self, value: Any) -> int | None:
73397327

73407328
def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
73417329
result = persons.copy()
7330+
preserved_family_ids = self._normalized_complete_existing_group_ids(
7331+
result,
7332+
"family_id",
7333+
)
7334+
preserved_spm_unit_ids = self._normalized_complete_existing_group_ids(
7335+
result,
7336+
"spm_unit_id",
7337+
)
7338+
if preserved_family_ids is not None and preserved_spm_unit_ids is not None:
7339+
result["family_id"] = preserved_family_ids
7340+
result["spm_unit_id"] = preserved_spm_unit_ids
7341+
return result
7342+
73427343
family_ids: dict[int, int] = {}
73437344
spm_unit_ids: dict[int, int] = {}
73447345
next_family_id = 0
@@ -7363,8 +7364,16 @@ def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
73637364
family_ids[int(row.name)] = next_family_id
73647365
next_family_id += 1
73657366

7366-
result["family_id"] = result.index.map(family_ids).astype(np.int64)
7367-
result["spm_unit_id"] = result.index.map(spm_unit_ids).astype(np.int64)
7367+
result["family_id"] = (
7368+
preserved_family_ids
7369+
if preserved_family_ids is not None
7370+
else result.index.map(family_ids).astype(np.int64)
7371+
)
7372+
result["spm_unit_id"] = (
7373+
preserved_spm_unit_ids
7374+
if preserved_spm_unit_ids is not None
7375+
else result.index.map(spm_unit_ids).astype(np.int64)
7376+
)
73687377
return result
73697378

73707379
def _primary_family_member_mask(
@@ -7373,9 +7382,7 @@ def _primary_family_member_mask(
73737382
) -> pd.Series:
73747383
"""Identify people who belong to the household's primary family."""
73757384

7376-
relationship_primary = household_persons["relationship_to_head"].isin(
7377-
{0, 1, 2}
7378-
)
7385+
relationship_primary = household_persons["relationship_to_head"].isin({0, 1, 2})
73797386
if "family_relationship" not in household_persons.columns:
73807387
return relationship_primary
73817388

@@ -7393,6 +7400,14 @@ def _assign_marital_units(
73937400
persons: pd.DataFrame,
73947401
) -> pd.DataFrame:
73957402
result = persons.copy()
7403+
preserved_marital_unit_ids = self._normalized_complete_existing_group_ids(
7404+
result,
7405+
"marital_unit_id",
7406+
)
7407+
if preserved_marital_unit_ids is not None:
7408+
result["marital_unit_id"] = preserved_marital_unit_ids
7409+
return result
7410+
73967411
marital_unit_by_person: dict[int, int] = {}
73977412
next_marital_unit_id = 0
73987413

@@ -7421,6 +7436,60 @@ def _assign_marital_units(
74217436
)
74227437
return result
74237438

7439+
def _assign_policyengine_household_head_flag(
7440+
self,
7441+
persons: pd.DataFrame,
7442+
) -> pd.DataFrame:
7443+
result = persons.copy()
7444+
derived = (
7445+
pd.to_numeric(result["relationship_to_head"], errors="coerce")
7446+
.fillna(-1)
7447+
.eq(0)
7448+
)
7449+
if "is_household_head" not in result.columns:
7450+
result["is_household_head"] = derived
7451+
return result
7452+
7453+
existing = pd.to_numeric(result["is_household_head"], errors="coerce")
7454+
result["is_household_head"] = existing.where(existing.notna(), derived).gt(0.5)
7455+
return result
7456+
7457+
def _normalized_complete_existing_group_ids(
7458+
self,
7459+
persons: pd.DataFrame,
7460+
id_column: str,
7461+
) -> pd.Series | None:
7462+
if id_column not in persons.columns:
7463+
return None
7464+
raw_ids = persons[id_column]
7465+
if raw_ids.isna().any():
7466+
return None
7467+
7468+
raw_key = raw_ids.astype("string")
7469+
key = pd.DataFrame(
7470+
{
7471+
"household_id": persons["household_id"],
7472+
id_column: raw_key,
7473+
},
7474+
index=persons.index,
7475+
)
7476+
raw_numeric = pd.to_numeric(raw_ids, errors="coerce")
7477+
households_per_raw_id = key.groupby(id_column, dropna=False)[
7478+
"household_id"
7479+
].nunique()
7480+
must_factorize = raw_numeric.isna().any() or bool(
7481+
households_per_raw_id.gt(1).any()
7482+
)
7483+
if must_factorize:
7484+
return pd.Series(
7485+
pd.factorize(pd.MultiIndex.from_frame(key), sort=False)[0].astype(
7486+
np.int64
7487+
),
7488+
index=persons.index,
7489+
name=id_column,
7490+
)
7491+
return raw_numeric.astype(np.int64).rename(id_column)
7492+
74247493
def _collapse_group_table(
74257494
self,
74267495
persons: pd.DataFrame,
@@ -7742,9 +7811,7 @@ def has_any(*columns: str) -> bool:
77427811
)
77437812
if "is_blind" in result.columns:
77447813
result["is_blind"] = (
7745-
pd.to_numeric(result["is_blind"], errors="coerce")
7746-
.fillna(0.0)
7747-
.ne(0.0)
7814+
pd.to_numeric(result["is_blind"], errors="coerce").fillna(0.0).ne(0.0)
77487815
)
77497816
elif "difficulty_seeing" in result.columns:
77507817
result["is_blind"] = first_present("difficulty_seeing").gt(0.0)

0 commit comments

Comments
 (0)