From 74691d4f4e752b710f991a7a40de304265791de9 Mon Sep 17 00:00:00 2001 From: Dave Woodruff Date: Sun, 17 May 2026 15:13:49 -0700 Subject: [PATCH 1/2] incumbent spokes: optionally write solution on each new best (#285) Previously the first-stage solution was written only at end-of-run via WheelSpinner.write_first_stage_solution. For long runs, users want a snapshot every time an xhat-finder spoke discovers a new best inner bound -- e.g., so downstream tooling can pick up a usable solution before the cylinder system finishes. - Add CLI flag --incumbent-on-improvement-filename-prefix (mpisppy.utils.config.Config.popular_args). - Forward cfg.incumbent_on_improvement_filename_prefix through cfg_vanilla.shared_options so it lands in opt_kwargs.options. - In _BoundSpoke.update_if_improving, after a new best is broadcast, call self.opt.write_first_stage_solution to _.csv and _.npy, where is a per-spoke zero-padded counter starting at 0000. The writer self-gates on cylinder_rank == 0, so the helper is safe to call on every rank. - Fail soft: if write_first_stage_solution raises (e.g., for xhatter paths like xhatlooper / xhatshuffle that pass update_best_solution_cache=False and don't populate the spbase best-solution cache), warn once on rank 0 and disable further writes from that spoke. xhatxbar / xhatspecific (which use the default update_best_solution_cache=True) are the happy path. Fixes #285 Co-Authored-By: Claude Opus 4.7 (1M context) --- mpisppy/cylinders/spoke.py | 33 +++++++++++++++++++++++++++++++++ mpisppy/utils/cfg_vanilla.py | 4 ++++ mpisppy/utils/config.py | 10 ++++++++++ 3 files changed, 47 insertions(+) diff --git a/mpisppy/cylinders/spoke.py b/mpisppy/cylinders/spoke.py index eada9624c..1c10db59f 100644 --- a/mpisppy/cylinders/spoke.py +++ b/mpisppy/cylinders/spoke.py @@ -11,9 +11,11 @@ import time import os import math +import warnings from mpisppy.cylinders.spcommunicator import SPCommunicator, SendCircularBuffer from mpisppy.cylinders.spwindow import Field +from mpisppy.utils import sputils class Spoke(SPCommunicator): @@ -161,6 +163,8 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, options= self.is_minimizing = self.opt.is_minimizing self.best_inner_bound = math.inf if self.is_minimizing else -math.inf self.solver_options = None # can be overwritten by derived classes + self._incumbent_write_counter = 0 + self._incumbent_write_disabled = False def register_send_fields(self): super().register_send_fields() @@ -186,9 +190,38 @@ def update_if_improving(self, candidate_inner_bound, update_best_solution_cache= # send to hub self.send_bound(candidate_inner_bound) self.send_best_xhat() + self._maybe_write_incumbent_on_improvement() return True return False + def _maybe_write_incumbent_on_improvement(self): + """ If --incumbent-on-improvement-filename-prefix is set, write the + current first-stage solution to _.csv and + _.npy. Fails soft: if no first-stage solution is + available (e.g. an xhatter spoke that bypasses the best-solution + cache), warn once on rank 0 and stop trying. + """ + prefix = self.opt.options.get("incumbent_on_improvement_filename_prefix") + if prefix is None or self._incumbent_write_disabled: + return + counter = self._incumbent_write_counter + try: + self.opt.write_first_stage_solution(f"{prefix}_{counter:04d}.csv") + self.opt.write_first_stage_solution( + f"{prefix}_{counter:04d}.npy", + first_stage_solution_writer=sputils.first_stage_nonant_npy_serializer, + ) + except RuntimeError as exc: + if self.cylinder_rank == 0: + warnings.warn( + f"incumbent_on_improvement_filename_prefix is set but no " + f"first-stage solution was available on this update: {exc}. " + f"Disabling further per-improvement writes from this spoke." + ) + self._incumbent_write_disabled = True + return + self._incumbent_write_counter = counter + 1 + def send_best_xhat(self): best_xhat_buf = self.send_buffers[Field.BEST_XHAT] ci = 0 diff --git a/mpisppy/utils/cfg_vanilla.py b/mpisppy/utils/cfg_vanilla.py index 4cea09835..838b9c6c3 100644 --- a/mpisppy/utils/cfg_vanilla.py +++ b/mpisppy/utils/cfg_vanilla.py @@ -81,6 +81,10 @@ def shared_options(cfg, is_hub=False): # Optional initial xhat candidate file (.npy); None disables. # Consumed by XhatInnerBoundBase._try_file_xhat. "xhat_from_file" : cfg.get("xhat_from_file", None), + # Optional filename prefix; if set, _BoundSpoke.update_if_improving + # writes a first-stage solution snapshot on each new best incumbent. + "incumbent_on_improvement_filename_prefix" : cfg.get( + "incumbent_on_improvement_filename_prefix", None), } # The options-file (--solver-options-file) sits at the bottom of # axis 2: any CLI flags below override file entries at the same diff --git a/mpisppy/utils/config.py b/mpisppy/utils/config.py index e948363a7..d5785b8d2 100644 --- a/mpisppy/utils/config.py +++ b/mpisppy/utils/config.py @@ -295,6 +295,16 @@ def popular_args(self): domain=str, default=None) + self.add_to_config("incumbent_on_improvement_filename_prefix", + description="If set, incumbent (xhat) bound spokes " + "write the first-stage solution to " + "_.csv and _.npy " + "each time they find a new best inner bound. " + " is a zero-padded counter starting at " + "0000. Default None disables.", + domain=str, + default=None) + self.add_to_config("tee_rank0_solves", description="Some cylinders support tee of rank 0 solves." "(With multiple cylinders this could be confusing.)", From 386837f48a7ff35d570e278431e5b83f9d0dc0a6 Mon Sep 17 00:00:00 2001 From: Dave Woodruff Date: Sun, 17 May 2026 16:23:00 -0700 Subject: [PATCH 2/2] test: cover --incumbent-on-improvement-filename-prefix behavior MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds mpisppy/tests/test_incumbent_writing.py with 13 unit tests across three classes: - TestConfigRegistration: Config.popular_args() registers the new option with default None. - TestSharedOptionsForwarding: cfg_vanilla.shared_options forwards the prefix into the options dict on both hub and spoke surfaces. - TestMaybeWriteIncumbent: drives InnerBoundSpoke._maybe_write_incumbent_on_improvement against a SimpleNamespace stub to cover the no-op (prefix None / already disabled), happy-path (csv + npy writes, 4-digit zero-padded counter, counter advances across calls), and fail-soft branches (RuntimeError → rank-0 warning, disabled flag set, counter NOT incremented; silent on rank != 0; subsequent calls short-circuit). Wired into run_coverage.bash and .github/workflows/test_pr_and_main.yml unit-tests job so codecov/patch picks up the new patch lines. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/test_pr_and_main.yml | 3 +- mpisppy/tests/test_incumbent_writing.py | 213 ++++++++++++++++++++++++ run_coverage.bash | 3 + 3 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 mpisppy/tests/test_incumbent_writing.py diff --git a/.github/workflows/test_pr_and_main.yml b/.github/workflows/test_pr_and_main.yml index 6d75192f6..b0804b50d 100644 --- a/.github/workflows/test_pr_and_main.yml +++ b/.github/workflows/test_pr_and_main.yml @@ -1063,7 +1063,8 @@ jobs: mpisppy/tests/test_solver_spec.py \ mpisppy/tests/test_extensions.py \ mpisppy/tests/test_jensens.py \ - mpisppy/tests/test_proper_bundler.py + mpisppy/tests/test_proper_bundler.py \ + mpisppy/tests/test_incumbent_writing.py - name: Upload coverage data if: always() diff --git a/mpisppy/tests/test_incumbent_writing.py b/mpisppy/tests/test_incumbent_writing.py new file mode 100644 index 000000000..a3dba90c3 --- /dev/null +++ b/mpisppy/tests/test_incumbent_writing.py @@ -0,0 +1,213 @@ +############################################################################### +# mpi-sppy: MPI-based Stochastic Programming in PYthon +# +# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for +# Sustainable Energy, LLC, The Regents of the University of California, et al. +# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for +# full copyright and license information. +############################################################################### +"""Tests for the --incumbent-on-improvement-filename-prefix feature (issue #285). + +Covers three surfaces touched by the feature: + + 1. Config.popular_args() registers the option with default None. + 2. cfg_vanilla.shared_options forwards cfg.incumbent_on_improvement_filename_prefix + into the options dict consumed by spokes. + 3. InnerBoundSpoke._maybe_write_incumbent_on_improvement does the right thing + across the disabled / happy / fail-soft branches. + +The spoke tests call the unbound method against a SimpleNamespace stub so we +don't have to stand up MPI infrastructure to exercise pure file-IO control flow. +""" + +import types +import unittest +import warnings +from unittest.mock import MagicMock + +from mpisppy.cylinders.spoke import InnerBoundSpoke +from mpisppy.utils import sputils +from mpisppy.utils.config import Config + + +class TestConfigRegistration(unittest.TestCase): + """Config.popular_args must register the new option and default it to None.""" + + def test_option_is_registered(self): + cfg = Config() + cfg.popular_args() + self.assertIn("incumbent_on_improvement_filename_prefix", cfg) + + def test_option_defaults_to_none(self): + cfg = Config() + cfg.popular_args() + self.assertIsNone(cfg.incumbent_on_improvement_filename_prefix) + + +class TestSharedOptionsForwarding(unittest.TestCase): + """cfg_vanilla.shared_options must forward the prefix into the options + dict so InnerBoundSpoke can see it via self.opt.options.get(...).""" + + def _make_cfg(self, prefix=None): + cfg = Config() + cfg.popular_args() + cfg.solver_name = "gurobi" + if prefix is not None: + cfg.incumbent_on_improvement_filename_prefix = prefix + return cfg + + def test_prefix_forwarded_when_set(self): + import mpisppy.utils.cfg_vanilla as vanilla + cfg = self._make_cfg(prefix="/tmp/incumbent") + opts = vanilla.shared_options(cfg, is_hub=False) + self.assertEqual( + opts["incumbent_on_improvement_filename_prefix"], + "/tmp/incumbent", + ) + + def test_prefix_none_by_default(self): + import mpisppy.utils.cfg_vanilla as vanilla + cfg = self._make_cfg(prefix=None) + opts = vanilla.shared_options(cfg, is_hub=False) + self.assertIsNone(opts["incumbent_on_improvement_filename_prefix"]) + + def test_prefix_present_on_hub_too(self): + import mpisppy.utils.cfg_vanilla as vanilla + cfg = self._make_cfg(prefix="/tmp/hub_inc") + opts = vanilla.shared_options(cfg, is_hub=True) + self.assertEqual( + opts["incumbent_on_improvement_filename_prefix"], + "/tmp/hub_inc", + ) + + +class TestMaybeWriteIncumbent(unittest.TestCase): + """Behavioral tests for InnerBoundSpoke._maybe_write_incumbent_on_improvement. + + We call the method against a minimal stub (SimpleNamespace) rather than + construct a real spoke. The method only touches: + + self.opt.options.get(...) + self.opt.write_first_stage_solution(...) + self.cylinder_rank + self._incumbent_write_counter + self._incumbent_write_disabled + + so a stub with those attributes is enough. + """ + + def _stub(self, prefix=None, rank=0, disabled=False, counter=0): + stub = types.SimpleNamespace() + stub.cylinder_rank = rank + stub._incumbent_write_counter = counter + stub._incumbent_write_disabled = disabled + stub.opt = types.SimpleNamespace() + stub.opt.options = {"incumbent_on_improvement_filename_prefix": prefix} + stub.opt.write_first_stage_solution = MagicMock() + return stub + + def _run(self, stub): + return InnerBoundSpoke._maybe_write_incumbent_on_improvement(stub) + + # ---- disabled branches ---- + + def test_noop_when_prefix_is_none(self): + stub = self._stub(prefix=None) + self._run(stub) + stub.opt.write_first_stage_solution.assert_not_called() + self.assertEqual(stub._incumbent_write_counter, 0) + self.assertFalse(stub._incumbent_write_disabled) + + def test_noop_when_already_disabled(self): + stub = self._stub(prefix="/tmp/p", disabled=True) + self._run(stub) + stub.opt.write_first_stage_solution.assert_not_called() + # counter untouched, disabled stays True + self.assertEqual(stub._incumbent_write_counter, 0) + self.assertTrue(stub._incumbent_write_disabled) + + # ---- happy path ---- + + def test_happy_path_writes_csv_and_npy_and_bumps_counter(self): + stub = self._stub(prefix="/tmp/inc", counter=0) + self._run(stub) + + # csv goes first, with default writer (no kwarg) + call_csv, call_npy = stub.opt.write_first_stage_solution.call_args_list + self.assertEqual(call_csv.args, ("/tmp/inc_0000.csv",)) + self.assertEqual(call_csv.kwargs, {}) + + # npy follows with the explicit npy serializer + self.assertEqual(call_npy.args, ("/tmp/inc_0000.npy",)) + self.assertEqual( + call_npy.kwargs, + {"first_stage_solution_writer": sputils.first_stage_nonant_npy_serializer}, + ) + + self.assertEqual(stub._incumbent_write_counter, 1) + self.assertFalse(stub._incumbent_write_disabled) + + def test_counter_zero_pads_to_four_digits(self): + stub = self._stub(prefix="/tmp/inc", counter=42) + self._run(stub) + names = [c.args[0] for c in stub.opt.write_first_stage_solution.call_args_list] + self.assertEqual(names, ["/tmp/inc_0042.csv", "/tmp/inc_0042.npy"]) + self.assertEqual(stub._incumbent_write_counter, 43) + + def test_repeated_calls_increment_counter(self): + stub = self._stub(prefix="/tmp/inc", counter=0) + self._run(stub) + self._run(stub) + self._run(stub) + names = [c.args[0] for c in stub.opt.write_first_stage_solution.call_args_list] + # 3 calls × 2 files each = 6 filenames, counters 0/0/1/1/2/2 + self.assertEqual(names, [ + "/tmp/inc_0000.csv", "/tmp/inc_0000.npy", + "/tmp/inc_0001.csv", "/tmp/inc_0001.npy", + "/tmp/inc_0002.csv", "/tmp/inc_0002.npy", + ]) + self.assertEqual(stub._incumbent_write_counter, 3) + + # ---- fail-soft branches ---- + + def test_runtime_error_warns_on_rank0_and_disables(self): + stub = self._stub(prefix="/tmp/inc", rank=0, counter=5) + stub.opt.write_first_stage_solution.side_effect = RuntimeError( + "No first stage solution available" + ) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + # must not propagate + self._run(stub) + self.assertTrue(stub._incumbent_write_disabled) + # counter must NOT be incremented — the increment is past the try block + self.assertEqual(stub._incumbent_write_counter, 5) + # exactly one warning, and it mentions the option name + self.assertEqual(len(w), 1) + self.assertIn("incumbent_on_improvement_filename_prefix", str(w[0].message)) + self.assertIn("Disabling", str(w[0].message)) + + def test_runtime_error_silent_on_nonzero_rank(self): + stub = self._stub(prefix="/tmp/inc", rank=1, counter=0) + stub.opt.write_first_stage_solution.side_effect = RuntimeError("boom") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + self._run(stub) + # disabled flag still set so future calls short-circuit on every rank + self.assertTrue(stub._incumbent_write_disabled) + # but no warning printed on non-rank-0 + self.assertEqual(len(w), 0) + + def test_disabled_after_failure_makes_next_call_noop(self): + stub = self._stub(prefix="/tmp/inc", rank=0) + stub.opt.write_first_stage_solution.side_effect = RuntimeError("nope") + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + self._run(stub) # first call: trips fail-soft + stub.opt.write_first_stage_solution.reset_mock() + self._run(stub) # second call: short-circuits before writer + stub.opt.write_first_stage_solution.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/run_coverage.bash b/run_coverage.bash index 2e7b166f5..01d63dfc6 100755 --- a/run_coverage.bash +++ b/run_coverage.bash @@ -109,6 +109,9 @@ run_phase "test_jensens (serial)" \ run_phase "test_xhat_from_file (serial)" \ coverage run --rcfile=.coveragerc -m pytest mpisppy/tests/test_xhat_from_file.py -v +run_phase "test_incumbent_writing (serial)" \ + coverage run --rcfile=.coveragerc -m pytest mpisppy/tests/test_incumbent_writing.py -v + run_phase "test_conf_int_farmer (spawns mpiexec)" \ coverage run --rcfile=.coveragerc mpisppy/tests/test_conf_int_farmer.py