|
| 1 | +"""Legacy-compatible tax-benefit cliff macro output.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from pydantic import BaseModel |
| 6 | + |
| 7 | +from policyengine.core import Output, Simulation |
| 8 | +from policyengine.outputs.aggregate import ( |
| 9 | + get_aggregate_variable, |
| 10 | + get_output_entity_data, |
| 11 | + require_output_column, |
| 12 | +) |
| 13 | +from policyengine.outputs.extra_variables import add_extra_variables |
| 14 | + |
| 15 | +CLIFF_IMPACT_VARIABLES = ("cliff_gap", "is_on_cliff", "is_adult") |
| 16 | + |
| 17 | + |
| 18 | +class CliffImpactInSimulation(BaseModel): |
| 19 | + cliff_gap: float |
| 20 | + cliff_share: float |
| 21 | + |
| 22 | + |
| 23 | +class CliffImpact(Output): |
| 24 | + baseline: CliffImpactInSimulation |
| 25 | + reform: CliffImpactInSimulation |
| 26 | + |
| 27 | + |
| 28 | +def _cliff_variables_by_entity( |
| 29 | + simulation: Simulation, |
| 30 | +) -> dict[str, list[str]]: |
| 31 | + variables_by_entity: dict[str, list[str]] = {} |
| 32 | + for variable_name in CLIFF_IMPACT_VARIABLES: |
| 33 | + variable = get_aggregate_variable( |
| 34 | + simulation, |
| 35 | + variable_name, |
| 36 | + "CliffImpact.extra_variables", |
| 37 | + ) |
| 38 | + variables_by_entity.setdefault(variable.entity, []).append(variable_name) |
| 39 | + return variables_by_entity |
| 40 | + |
| 41 | + |
| 42 | +def configure_cliff_impact_variables(*simulations: Simulation) -> None: |
| 43 | + """Materialize cliff columns only for analyses that request them.""" |
| 44 | + for simulation in simulations: |
| 45 | + add_extra_variables( |
| 46 | + simulation, |
| 47 | + _cliff_variables_by_entity(simulation), |
| 48 | + ) |
| 49 | + |
| 50 | + |
| 51 | +def _sum_output_variable( |
| 52 | + simulation: Simulation, |
| 53 | + variable_name: str, |
| 54 | +) -> float: |
| 55 | + context = f"CliffImpact.{variable_name}" |
| 56 | + variable = get_aggregate_variable(simulation, variable_name, context) |
| 57 | + data = get_output_entity_data(simulation, variable.entity, context) |
| 58 | + require_output_column( |
| 59 | + data, |
| 60 | + variable_name, |
| 61 | + variable.entity, |
| 62 | + simulation, |
| 63 | + context, |
| 64 | + ) |
| 65 | + return float(data[variable_name].sum()) |
| 66 | + |
| 67 | + |
| 68 | +def _calculate_cliff_impact_in_simulation( |
| 69 | + simulation: Simulation, |
| 70 | +) -> CliffImpactInSimulation: |
| 71 | + cliff_gap = _sum_output_variable(simulation, "cliff_gap") |
| 72 | + people_on_cliffs = _sum_output_variable(simulation, "is_on_cliff") |
| 73 | + adults = _sum_output_variable(simulation, "is_adult") |
| 74 | + |
| 75 | + return CliffImpactInSimulation( |
| 76 | + cliff_gap=cliff_gap, |
| 77 | + cliff_share=float(people_on_cliffs / adults), |
| 78 | + ) |
| 79 | + |
| 80 | + |
| 81 | +def calculate_cliff_impact( |
| 82 | + baseline_simulation: Simulation, |
| 83 | + reform_simulation: Simulation, |
| 84 | +) -> CliffImpact: |
| 85 | + """Calculate legacy macro cliff output from materialized simulations.""" |
| 86 | + return CliffImpact( |
| 87 | + baseline=_calculate_cliff_impact_in_simulation(baseline_simulation), |
| 88 | + reform=_calculate_cliff_impact_in_simulation(reform_simulation), |
| 89 | + ) |
0 commit comments