Skip to content

Commit 960ac2f

Browse files
committed
Instrument post-donor build stages
1 parent c042f27 commit 960ac2f

1 file changed

Lines changed: 79 additions & 0 deletions

File tree

  • src/microplex_us/pipelines

src/microplex_us/pipelines/us.py

Lines changed: 79 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1744,18 +1744,51 @@ def build_from_frames(
17441744
scaffold_input=scaffold_input,
17451745
)
17461746
seed_data = self._apply_dependent_tax_leaf_soft_caps(seed_data)
1747+
_emit_us_pipeline_progress(
1748+
"US microplex build: seed ready",
1749+
scaffold_source=scaffold_input.frame.source.name,
1750+
sources=_format_progress_values(fusion_plan.source_names),
1751+
rows=int(len(seed_data)),
1752+
columns=int(len(seed_data.columns)),
1753+
donor_integrated_variables=int(
1754+
len(donor_integration["integrated_variables"])
1755+
),
1756+
)
1757+
_emit_us_pipeline_progress(
1758+
"US microplex build: targets start",
1759+
rows=int(len(seed_data)),
1760+
)
17471761
targets = self.build_targets(seed_data)
1762+
_emit_us_pipeline_progress(
1763+
"US microplex build: targets complete",
1764+
marginal_targets=int(len(targets.marginal)),
1765+
continuous_targets=int(len(targets.continuous)),
1766+
)
17481767
synthesis_variables = self._resolve_synthesis_variables(
17491768
scaffold_input,
17501769
fusion_plan=fusion_plan,
17511770
include_all_observed_targets=len(source_inputs) > 1,
17521771
available_columns=set(seed_data.columns),
17531772
observed_frame=seed_data,
17541773
)
1774+
_emit_us_pipeline_progress(
1775+
"US microplex build: synthesis variables ready",
1776+
condition_vars=int(len(synthesis_variables.condition_vars)),
1777+
target_vars=int(len(synthesis_variables.target_vars)),
1778+
)
1779+
_emit_us_pipeline_progress(
1780+
"US microplex build: synthesis start",
1781+
rows=int(len(seed_data)),
1782+
)
17551783
synthetic_data, synthesizer, synthesis_metadata = self.synthesize(
17561784
seed_data,
17571785
synthesis_variables=synthesis_variables,
17581786
)
1787+
_emit_us_pipeline_progress(
1788+
"US microplex build: synthesis complete",
1789+
rows=int(len(synthetic_data)),
1790+
columns=int(len(synthetic_data.columns)),
1791+
)
17591792
synthesis_metadata = {
17601793
**synthesis_metadata,
17611794
"source_names": fusion_plan.source_names,
@@ -1776,19 +1809,65 @@ def build_from_frames(
17761809
set(seed_data.columns)
17771810
),
17781811
}
1812+
_emit_us_pipeline_progress(
1813+
"US microplex build: support enforcement start",
1814+
rows=int(len(synthetic_data)),
1815+
)
17791816
synthetic_data = self.ensure_target_support(synthetic_data, seed_data, targets)
1817+
_emit_us_pipeline_progress(
1818+
"US microplex build: support enforcement complete",
1819+
rows=int(len(synthetic_data)),
1820+
columns=int(len(synthetic_data.columns)),
1821+
)
17801822
if self.config.policyengine_targets_db is not None:
1823+
_emit_us_pipeline_progress(
1824+
"US microplex build: policyengine tables start",
1825+
rows=int(len(synthetic_data)),
1826+
)
17811827
synthetic_tables = self.build_policyengine_entity_tables(synthetic_data)
1828+
_emit_us_pipeline_progress(
1829+
"US microplex build: policyengine tables complete",
1830+
households=int(len(synthetic_tables.households)),
1831+
persons=int(len(synthetic_tables.persons)),
1832+
)
1833+
_emit_us_pipeline_progress(
1834+
"US microplex build: policyengine calibration start",
1835+
backend=self.config.calibration_backend,
1836+
)
17821837
(
17831838
policyengine_tables,
17841839
calibrated_data,
17851840
calibration_summary,
17861841
) = self.calibrate_policyengine_tables(synthetic_tables)
1842+
_emit_us_pipeline_progress(
1843+
"US microplex build: policyengine calibration complete",
1844+
backend=self.config.calibration_backend,
1845+
calibrated_rows=int(len(calibrated_data)),
1846+
)
17871847
else:
1848+
_emit_us_pipeline_progress(
1849+
"US microplex build: calibration start",
1850+
backend=self.config.calibration_backend,
1851+
rows=int(len(synthetic_data)),
1852+
)
17881853
calibrated_data, calibration_summary = self.calibrate(
17891854
synthetic_data, targets
17901855
)
1856+
_emit_us_pipeline_progress(
1857+
"US microplex build: calibration complete",
1858+
backend=self.config.calibration_backend,
1859+
calibrated_rows=int(len(calibrated_data)),
1860+
)
1861+
_emit_us_pipeline_progress(
1862+
"US microplex build: policyengine tables start",
1863+
rows=int(len(calibrated_data)),
1864+
)
17911865
policyengine_tables = self.build_policyengine_entity_tables(calibrated_data)
1866+
_emit_us_pipeline_progress(
1867+
"US microplex build: policyengine tables complete",
1868+
households=int(len(policyengine_tables.households)),
1869+
persons=int(len(policyengine_tables.persons)),
1870+
)
17921871

17931872
return USMicroplexBuildResult(
17941873
config=self.config,

0 commit comments

Comments
 (0)