@@ -101,10 +101,51 @@ def _run_greg(run_dir: Path):
101101 return weights_path , elapsed
102102
103103
def _collapse_ipf_rows_to_unit_weights(
    raw_weights: pd.DataFrame, n_units: int
) -> np.ndarray:
    """Validate a per-row IPF output and collapse it to a length-`n_units` vector.

    surveysd::ipf with `meanHH = TRUE` guarantees every row that shares a
    `unit_index` carries the same fitted weight; the spread check keeps that
    assumption honest.

    Args:
        raw_weights: Per-row runner output with `unit_index` and
            `fitted_weight` columns. The frame is not modified.
        n_units: Number of benchmark units; valid indices are
            `0 .. n_units - 1`.

    Returns:
        A float64 array of length `n_units` whose entry `i` is the fitted
        weight shared by all rows with `unit_index == i`.

    Raises:
        RuntimeError: If a required column is absent, any `unit_index` or
            `fitted_weight` is missing, an index is out of range, rows of one
            unit disagree by more than 1e-9, or some unit has no row at all.
    """
    for required in ("unit_index", "fitted_weight"):
        if required not in raw_weights.columns:
            raise RuntimeError(
                f"IPF runner output must include a {required} column"
            )
    if raw_weights["unit_index"].isna().any():
        raise RuntimeError("IPF runner output contains missing unit_index values")
    # groupby min/max/first all skip NaN, so a partly-missing unit would pass
    # the spread check below unnoticed; reject missing weights up front.
    if raw_weights["fitted_weight"].isna().any():
        raise RuntimeError("IPF runner output contains missing fitted_weight values")

    # Work on a converted key Series instead of copying the whole frame.
    unit_index = raw_weights["unit_index"].astype(np.int64)
    if ((unit_index < 0) | (unit_index >= n_units)).any():
        raise RuntimeError("IPF runner output contains out-of-range unit_index values")

    # One grouped pass yields both the consistency check and the collapsed value.
    per_unit = (
        raw_weights["fitted_weight"]
        .groupby(unit_index, sort=True)
        .agg(["min", "max", "first"])
    )
    if ((per_unit["max"] - per_unit["min"]) > 1e-9).any():
        raise RuntimeError(
            "IPF runner produced inconsistent fitted weights within the same unit_index"
        )

    # Reindexing onto the full unit range turns absent units into NaN.
    weights_by_unit = per_unit["first"].reindex(np.arange(n_units, dtype=np.int64))
    if weights_by_unit.isna().any():
        raise RuntimeError(
            "Aggregated IPF weights do not cover the full benchmark unit range"
        )
    return weights_by_unit.to_numpy(dtype=np.float64)
143+
144+
104145def _run_ipf (run_dir : Path ):
146+ """Run one coherent IPF problem in a single `surveysd::ipf` call."""
105147 inputs = run_dir / "inputs"
106148 outputs = run_dir / "outputs"
107- temp_csv = outputs / "_ipf_weights.csv"
108149
109150 with open (inputs / "benchmark_manifest.json" ) as f :
110151 manifest = json .load (f )
@@ -116,69 +157,97 @@ def _run_ipf(run_dir: Path):
116157 "IPF run requires inputs/ipf_target_metadata.csv. "
117158 "Provide external_inputs.ipf_target_metadata_csv in the manifest."
118159 )
160+ unit_metadata_path = inputs / "unit_metadata.csv"
161+ if not unit_metadata_path .exists ():
162+ raise FileNotFoundError ("IPF run requires inputs/unit_metadata.csv." )
163+
164+ full_targets = pd .read_csv (target_metadata_path )
165+ if full_targets .empty :
166+ raise RuntimeError ("IPF target metadata is empty; nothing to run." )
167+ unit_metadata = pd .read_csv (unit_metadata_path )
168+ if "unit_index" not in unit_metadata .columns :
169+ raise RuntimeError ("Unit metadata must include a unit_index column for IPF" )
170+
171+ weight_col = str (options .get ("weight_col" , "base_weight" ))
172+ household_id_col = str (options .get ("household_id_col" , "household_id" ))
173+
174+ initial_weights = np .load (inputs / "initial_weights.npy" ).astype (np .float64 )
175+ n_units = len (initial_weights )
176+ unit_indices = unit_metadata ["unit_index" ].astype (np .int64 ).to_numpy ()
177+ if unit_indices .min () < 0 or unit_indices .max () >= n_units :
178+ raise RuntimeError (
179+ "Unit metadata unit_index values fall outside the initial weight vector"
180+ )
181+ temp_csv = outputs / "_ipf_weights.csv"
182+ unit_with_weights = unit_metadata .copy ()
183+ unit_with_weights [weight_col ] = initial_weights [unit_indices ]
184+ temp_unit_csv = outputs / "_ipf_unit_metadata.csv"
185+ unit_with_weights .to_csv (temp_unit_csv , index = False )
119186
120187 cmd = [
121188 "Rscript" ,
122189 str (RUNNERS_DIR / "ipf_runner.R" ),
123- str (inputs / "unit_metadata.csv" ),
190+ str (temp_unit_csv ),
124191 str (target_metadata_path ),
125192 str (inputs / "initial_weights.npy" ),
126193 str (temp_csv ),
127194 str (int (options .get ("max_iter" , 200 ))),
128195 str (float (options .get ("bound" , 4.0 ))),
129196 str (float (options .get ("epsP" , 1e-6 ))),
130197 str (float (options .get ("epsH" , 1e-2 ))),
131- str ( options . get ( " household_id_col" , "household_id" )) ,
132- str ( options . get ( " weight_col" , "base_weight" )) ,
198+ household_id_col ,
199+ weight_col ,
133200 ]
134201 proc , elapsed = _run_subprocess (cmd )
135202 if proc .returncode != 0 :
136203 raise RuntimeError (f"IPF runner failed with exit code { proc .returncode } " )
137204
138205 raw_weights = pd .read_csv (temp_csv )
139- if "unit_index" not in raw_weights .columns :
140- raise RuntimeError ("IPF runner output must include a unit_index column" )
141- if raw_weights ["unit_index" ].isna ().any ():
142- raise RuntimeError ("IPF runner output contains missing unit_index values" )
143-
144- raw_weights ["unit_index" ] = raw_weights ["unit_index" ].astype (np .int64 )
145- n_units = len (np .load (inputs / "initial_weights.npy" ))
146- if (raw_weights ["unit_index" ] < 0 ).any () or (
147- raw_weights ["unit_index" ] >= n_units
148- ).any ():
149- raise RuntimeError ("IPF runner output contains out-of-range unit_index values" )
150-
151- per_unit_spread = raw_weights .groupby ("unit_index" , sort = True )["fitted_weight" ].agg (
152- lambda series : float (series .max () - series .min ())
153- )
154- inconsistent_units = per_unit_spread [per_unit_spread > 1e-9 ]
155- if not inconsistent_units .empty :
156- raise RuntimeError (
157- "IPF runner produced inconsistent fitted weights within the same unit_index"
158- )
159-
160- weights_by_unit = (
161- raw_weights .groupby ("unit_index" , sort = True )["fitted_weight" ]
162- .first ()
163- .reindex (np .arange (n_units , dtype = np .int64 ))
164- )
165- if weights_by_unit .isna ().any ():
166- raise RuntimeError (
167- "Aggregated IPF weights do not cover the full benchmark unit range"
168- )
169- weights = weights_by_unit .to_numpy (dtype = np .float64 )
206+ current_weights = _collapse_ipf_rows_to_unit_weights (raw_weights , n_units )
170207 weights_path = outputs / "fitted_weights.npy"
171- np .save (weights_path , weights )
208+ np .save (weights_path , current_weights )
172209 temp_csv .unlink (missing_ok = True )
210+ temp_unit_csv .unlink (missing_ok = True )
173211 return weights_path , elapsed
174212
175213
def _select_scoring_inputs(
    run_dir: Path, method: str, score_on: str
) -> tuple[Path, Path, str]:
    """Pick the target metadata and matrix files a run is scored against.

    Args:
        run_dir: Benchmark bundle directory; files are looked up under
            `run_dir / "inputs"`.
        method: Calibration method name; only `"ipf"` changes the `"auto"`
            choice.
        score_on: One of `"auto"`, `"shared_requested"` or
            `"ipf_retained_authored"`.

    Returns:
        `(targets_csv_path, matrix_path, scoring_target_set)` where the label
        is `"shared_requested"` or `"ipf_retained_authored"`.

    Raises:
        ValueError: If `score_on` is not a recognised option.
        FileNotFoundError: If `ipf_retained_authored` is requested but the two
            IPF scoring files are not both present.
    """
    valid_choices = ("auto", "shared_requested", "ipf_retained_authored")
    # Fail loudly on typos rather than scoring on the shared set and labelling
    # the summary as if that had been requested.
    if score_on not in valid_choices:
        raise ValueError(
            f"Unknown score_on={score_on!r}; expected one of {', '.join(valid_choices)}"
        )

    inputs = run_dir / "inputs"
    shared_selection = (
        inputs / "target_metadata.csv",
        inputs / "X_targets_by_units.mtx",
        "shared_requested",
    )
    if score_on == "shared_requested":
        # Explicit request: no need to probe for the IPF scoring files.
        return shared_selection

    ipf_targets = inputs / "ipf_scoring_target_metadata.csv"
    ipf_matrix = inputs / "ipf_scoring_X_targets_by_units.mtx"
    has_ipf_scoring = ipf_targets.exists() and ipf_matrix.exists()
    ipf_selection = (ipf_targets, ipf_matrix, "ipf_retained_authored")

    if score_on == "ipf_retained_authored":
        if not has_ipf_scoring:
            raise FileNotFoundError(
                "Requested score_on=ipf_retained_authored, but "
                "inputs/ipf_scoring_target_metadata.csv and "
                "inputs/ipf_scoring_X_targets_by_units.mtx are not both present."
            )
        return ipf_selection

    # score_on == "auto": prefer the IPF-specific set only for IPF runs that
    # actually ship both scoring files.
    if method == "ipf" and has_ipf_scoring:
        return ipf_selection
    return shared_selection
238+
239+
176240def cmd_run (args ):
177241 run_dir = Path (args .run_dir )
178242 inputs = run_dir / "inputs"
179243 outputs = run_dir / "outputs"
180244 outputs .mkdir (parents = True , exist_ok = True )
181- targets_df = load_targets_csv (inputs / "target_metadata.csv" )
245+ targets_path , matrix_path , scoring_target_set = _select_scoring_inputs (
246+ run_dir ,
247+ args .method ,
248+ getattr (args , "score_on" , "auto" ),
249+ )
250+ targets_df = load_targets_csv (targets_path )
182251
183252 started = time .time ()
184253 if args .method == "l0" :
@@ -195,11 +264,12 @@ def cmd_run(args):
195264 summary = compute_common_metrics (
196265 weights = weights ,
197266 targets_df = targets_df ,
198- matrix_path = inputs / "X_targets_by_units.mtx" ,
267+ matrix_path = matrix_path ,
199268 )
200269 summary ["method" ] = args .method
201270 summary ["run_dir" ] = str (run_dir .resolve ())
202271 summary ["runtime_seconds" ] = elapsed
272+ summary ["scoring_target_set" ] = scoring_target_set
203273 write_method_summary (summary , outputs / f"{ args .method } _summary.json" )
204274 print (json .dumps (summary , indent = 2 , sort_keys = True ))
205275 return 0
@@ -225,6 +295,16 @@ def build_parser():
225295 run_parser .add_argument (
226296 "--run-dir" , required = True , help = "Exported benchmark bundle directory"
227297 )
298+ run_parser .add_argument (
299+ "--score-on" ,
300+ default = "auto" ,
301+ choices = ["auto" , "shared_requested" , "ipf_retained_authored" ],
302+ help = (
303+ "Scoring target set. 'auto' uses IPF-retained-authored targets only "
304+ "for method=ipf when available; the other methods default to the "
305+ "shared requested target set unless explicitly overridden."
306+ ),
307+ )
228308 run_parser .set_defaults (func = cmd_run )
229309
230310 return parser
0 commit comments