Skip to content

Commit c042f27

Browse files
committed
Improve donor entity ID preparation
1 parent e6a009b commit c042f27

2 files changed

Lines changed: 75 additions & 2 deletions

File tree

src/microplex_us/pipelines/us.py

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import importlib.util
66
import logging
77
import sys
8+
import time
89
import warnings
910
from collections import Counter
1011
from collections.abc import Iterable
@@ -4059,13 +4060,26 @@ def _integrate_donor_sources(
40594060
if donor_block_spec.native_entity is not EntityType.PERSON
40604061
}
40614062
if required_entities:
4063+
_emit_us_pipeline_progress(
4064+
"US microplex donor integration: entity ids required",
4065+
donor_source=donor_source_name,
4066+
entities=_format_progress_values(
4067+
sorted(entity.value for entity in required_entities)
4068+
),
4069+
current_rows=len(current),
4070+
donor_rows=len(donor_seed),
4071+
)
40624072
current = self._ensure_seed_entity_ids(
40634073
current,
40644074
entities=required_entities,
4075+
frame_role="current",
4076+
donor_source_name=donor_source_name,
40654077
)
40664078
donor_seed = self._ensure_seed_entity_ids(
40674079
donor_seed,
40684080
entities=required_entities,
4081+
frame_role="donor",
4082+
donor_source_name=donor_source_name,
40694083
)
40704084

40714085
for donor_block_spec in donor_block_specs:
@@ -4960,6 +4974,8 @@ def _ensure_seed_entity_ids(
49604974
frame: pd.DataFrame,
49614975
*,
49624976
entities: set[EntityType],
4977+
frame_role: str | None = None,
4978+
donor_source_name: str | None = None,
49634979
) -> pd.DataFrame:
49644980
missing_columns = [
49654981
self._entity_key_column(entity)
@@ -4968,27 +4984,77 @@ def _ensure_seed_entity_ids(
49684984
and self._entity_key_column(entity) not in frame.columns
49694985
]
49704986
if not missing_columns:
4987+
_emit_us_pipeline_progress(
4988+
"US microplex donor integration: entity ids ready",
4989+
donor_source=donor_source_name,
4990+
frame=frame_role,
4991+
rows=len(frame),
4992+
status="already_present",
4993+
columns=_format_progress_values(
4994+
sorted(
4995+
self._entity_key_column(entity) or ""
4996+
for entity in entities
4997+
if entity is not EntityType.PERSON
4998+
)
4999+
),
5000+
)
49715001
return frame
5002+
started_at = time.perf_counter()
5003+
missing_column_set = set(missing_columns)
5004+
can_use_group_only_path = missing_column_set <= {"family_id", "spm_unit_id"}
5005+
method = (
5006+
"family_spm_only"
5007+
if can_use_group_only_path
5008+
else "policyengine_entity_bundle"
5009+
)
5010+
_emit_us_pipeline_progress(
5011+
"US microplex donor integration: entity ids start",
5012+
donor_source=donor_source_name,
5013+
frame=frame_role,
5014+
rows=len(frame),
5015+
missing_columns=_format_progress_values(missing_columns),
5016+
method=method,
5017+
)
49725018
working = frame.copy()
49735019
original_person_ids = working["person_id"].copy()
49745020
working["person_id"] = np.arange(len(working), dtype=np.int64)
49755021
if "household_id" in working.columns:
49765022
working["household_id"] = pd.factorize(working["household_id"])[0].astype(
49775023
np.int64
49785024
)
4979-
persons = self.build_policyengine_entity_tables(working).persons.copy()
5025+
else:
5026+
working["household_id"] = np.arange(len(working), dtype=np.int64)
5027+
if "age" not in working.columns:
5028+
working["age"] = 0
5029+
if can_use_group_only_path:
5030+
working["relationship_to_head"] = self._normalize_relationship_to_head(
5031+
working
5032+
)
5033+
persons = self._assign_family_and_spm_units(working).copy()
5034+
else:
5035+
persons = self.build_policyengine_entity_tables(working).persons.copy()
49805036
persons["source_person_id"] = original_person_ids.to_numpy()
49815037
mapping = persons[["source_person_id", *missing_columns]]
49825038
if mapping["source_person_id"].duplicated().any():
49835039
raise ValueError(
49845040
"PolicyEngine entity table build produced duplicate person mappings"
49855041
)
4986-
return frame.merge(
5042+
result = frame.merge(
49875043
mapping,
49885044
left_on="person_id",
49895045
right_on="source_person_id",
49905046
how="left",
49915047
).drop(columns=["source_person_id"])
5048+
_emit_us_pipeline_progress(
5049+
"US microplex donor integration: entity ids complete",
5050+
donor_source=donor_source_name,
5051+
frame=frame_role,
5052+
rows=len(result),
5053+
added_columns=_format_progress_values(missing_columns),
5054+
method=method,
5055+
elapsed_seconds=f"{time.perf_counter() - started_at:.3f}",
5056+
)
5057+
return result
49925058

49935059
def _strip_generated_entity_ids(
49945060
self,

tests/pipelines/test_us.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8069,6 +8069,13 @@ def generate(self, frame, seed=None):
80698069
cps_input = pipeline.prepare_source_input(cps_frame)
80708070
donor_input = pipeline.prepare_source_input(donor_frame)
80718071
seed_data = pipeline.prepare_seed_data_from_source(cps_input)
8072+
monkeypatch.setattr(
8073+
pipeline,
8074+
"build_policyengine_entity_tables",
8075+
lambda _population: pytest.fail(
8076+
"SPM-only donor projection should not build full entity tables"
8077+
),
8078+
)
80728079

80738080
integration = pipeline._integrate_donor_sources(
80748081
seed_data,

0 commit comments

Comments
 (0)