Skip to content

Commit b5edc38

Browse files
DLWoodruffclaude
andauthored
incumbent spokes: optionally write solution on each new best (#285) (#709)
* incumbent spokes: optionally write solution on each new best (#285) Previously the first-stage solution was written only at end-of-run via WheelSpinner.write_first_stage_solution. For long runs, users want a snapshot every time an xhat-finder spoke discovers a new best inner bound -- e.g., so downstream tooling can pick up a usable solution before the cylinder system finishes. - Add CLI flag --incumbent-on-improvement-filename-prefix <prefix> (mpisppy.utils.config.Config.popular_args). - Forward cfg.incumbent_on_improvement_filename_prefix through cfg_vanilla.shared_options so it lands in opt_kwargs.options. - In _BoundSpoke.update_if_improving, after a new best is broadcast, call self.opt.write_first_stage_solution to <prefix>_<NNNN>.csv and <prefix>_<NNNN>.npy, where <NNNN> is a per-spoke zero-padded counter starting at 0000. The writer self-gates on cylinder_rank == 0, so the helper is safe to call on every rank. - Fail soft: if write_first_stage_solution raises (e.g., for xhatter paths like xhatlooper / xhatshuffle that pass update_best_solution_cache=False and don't populate the spbase best-solution cache), warn once on rank 0 and disable further writes from that spoke. xhatxbar / xhatspecific (which use the default update_best_solution_cache=True) are the happy path. Fixes #285 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test: cover --incumbent-on-improvement-filename-prefix behavior Adds mpisppy/tests/test_incumbent_writing.py with 13 unit tests across three classes: - TestConfigRegistration: Config.popular_args() registers the new option with default None. - TestSharedOptionsForwarding: cfg_vanilla.shared_options forwards the prefix into the options dict on both hub and spoke surfaces. - TestMaybeWriteIncumbent: drives InnerBoundSpoke._maybe_write_incumbent_on_improvement against a SimpleNamespace stub to cover the no-op (prefix None / already disabled), happy-path (csv + npy writes, 4-digit zero-padded counter, counter advances across calls), and fail-soft branches (RuntimeError → rank-0 warning, disabled flag set, counter NOT incremented; silent on rank != 0; subsequent calls short-circuit). Wired into run_coverage.bash and .github/workflows/test_pr_and_main.yml unit-tests job so codecov/patch picks up the new patch lines. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bbe8fba commit b5edc38

6 files changed

Lines changed: 265 additions & 1 deletion

File tree

.github/workflows/test_pr_and_main.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,8 @@ jobs:
10631063
mpisppy/tests/test_solver_spec.py \
10641064
mpisppy/tests/test_extensions.py \
10651065
mpisppy/tests/test_jensens.py \
1066-
mpisppy/tests/test_proper_bundler.py
1066+
mpisppy/tests/test_proper_bundler.py \
1067+
mpisppy/tests/test_incumbent_writing.py
10671068
10681069
- name: Upload coverage data
10691070
if: always()

mpisppy/cylinders/spoke.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
import time
1212
import os
1313
import math
14+
import warnings
1415

1516
from mpisppy.cylinders.spcommunicator import SPCommunicator, SendCircularBuffer
1617
from mpisppy.cylinders.spwindow import Field
18+
from mpisppy.utils import sputils
1719

1820

1921
class Spoke(SPCommunicator):
@@ -161,6 +163,8 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, options=
161163
self.is_minimizing = self.opt.is_minimizing
162164
self.best_inner_bound = math.inf if self.is_minimizing else -math.inf
163165
self.solver_options = None # can be overwritten by derived classes
166+
self._incumbent_write_counter = 0
167+
self._incumbent_write_disabled = False
164168

165169
def register_send_fields(self):
166170
super().register_send_fields()
@@ -186,9 +190,38 @@ def update_if_improving(self, candidate_inner_bound, update_best_solution_cache=
186190
# send to hub
187191
self.send_bound(candidate_inner_bound)
188192
self.send_best_xhat()
193+
self._maybe_write_incumbent_on_improvement()
189194
return True
190195
return False
191196

197+
def _maybe_write_incumbent_on_improvement(self):
198+
""" If --incumbent-on-improvement-filename-prefix is set, write the
199+
current first-stage solution to <prefix>_<NNNN>.csv and
200+
<prefix>_<NNNN>.npy. Fails soft: if no first-stage solution is
201+
available (e.g. an xhatter spoke that bypasses the best-solution
202+
cache), warn once on rank 0 and stop trying.
203+
"""
204+
prefix = self.opt.options.get("incumbent_on_improvement_filename_prefix")
205+
if prefix is None or self._incumbent_write_disabled:
206+
return
207+
counter = self._incumbent_write_counter
208+
try:
209+
self.opt.write_first_stage_solution(f"{prefix}_{counter:04d}.csv")
210+
self.opt.write_first_stage_solution(
211+
f"{prefix}_{counter:04d}.npy",
212+
first_stage_solution_writer=sputils.first_stage_nonant_npy_serializer,
213+
)
214+
except RuntimeError as exc:
215+
if self.cylinder_rank == 0:
216+
warnings.warn(
217+
f"incumbent_on_improvement_filename_prefix is set but no "
218+
f"first-stage solution was available on this update: {exc}. "
219+
f"Disabling further per-improvement writes from this spoke."
220+
)
221+
self._incumbent_write_disabled = True
222+
return
223+
self._incumbent_write_counter = counter + 1
224+
192225
def send_best_xhat(self):
193226
best_xhat_buf = self.send_buffers[Field.BEST_XHAT]
194227
ci = 0
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
###############################################################################
2+
# mpi-sppy: MPI-based Stochastic Programming in PYthon
3+
#
4+
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
5+
# Sustainable Energy, LLC, The Regents of the University of California, et al.
6+
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
7+
# full copyright and license information.
8+
###############################################################################
9+
"""Tests for the --incumbent-on-improvement-filename-prefix feature (issue #285).
10+
11+
Covers three surfaces touched by the feature:
12+
13+
1. Config.popular_args() registers the option with default None.
14+
2. cfg_vanilla.shared_options forwards cfg.incumbent_on_improvement_filename_prefix
15+
into the options dict consumed by spokes.
16+
3. InnerBoundSpoke._maybe_write_incumbent_on_improvement does the right thing
17+
across the disabled / happy / fail-soft branches.
18+
19+
The spoke tests call the unbound method against a SimpleNamespace stub so we
20+
don't have to stand up MPI infrastructure to exercise pure file-IO control flow.
21+
"""
22+
23+
import types
24+
import unittest
25+
import warnings
26+
from unittest.mock import MagicMock
27+
28+
from mpisppy.cylinders.spoke import InnerBoundSpoke
29+
from mpisppy.utils import sputils
30+
from mpisppy.utils.config import Config
31+
32+
33+
class TestConfigRegistration(unittest.TestCase):
34+
"""Config.popular_args must register the new option and default it to None."""
35+
36+
def test_option_is_registered(self):
37+
cfg = Config()
38+
cfg.popular_args()
39+
self.assertIn("incumbent_on_improvement_filename_prefix", cfg)
40+
41+
def test_option_defaults_to_none(self):
42+
cfg = Config()
43+
cfg.popular_args()
44+
self.assertIsNone(cfg.incumbent_on_improvement_filename_prefix)
45+
46+
47+
class TestSharedOptionsForwarding(unittest.TestCase):
48+
"""cfg_vanilla.shared_options must forward the prefix into the options
49+
dict so InnerBoundSpoke can see it via self.opt.options.get(...)."""
50+
51+
def _make_cfg(self, prefix=None):
52+
cfg = Config()
53+
cfg.popular_args()
54+
cfg.solver_name = "gurobi"
55+
if prefix is not None:
56+
cfg.incumbent_on_improvement_filename_prefix = prefix
57+
return cfg
58+
59+
def test_prefix_forwarded_when_set(self):
60+
import mpisppy.utils.cfg_vanilla as vanilla
61+
cfg = self._make_cfg(prefix="/tmp/incumbent")
62+
opts = vanilla.shared_options(cfg, is_hub=False)
63+
self.assertEqual(
64+
opts["incumbent_on_improvement_filename_prefix"],
65+
"/tmp/incumbent",
66+
)
67+
68+
def test_prefix_none_by_default(self):
69+
import mpisppy.utils.cfg_vanilla as vanilla
70+
cfg = self._make_cfg(prefix=None)
71+
opts = vanilla.shared_options(cfg, is_hub=False)
72+
self.assertIsNone(opts["incumbent_on_improvement_filename_prefix"])
73+
74+
def test_prefix_present_on_hub_too(self):
75+
import mpisppy.utils.cfg_vanilla as vanilla
76+
cfg = self._make_cfg(prefix="/tmp/hub_inc")
77+
opts = vanilla.shared_options(cfg, is_hub=True)
78+
self.assertEqual(
79+
opts["incumbent_on_improvement_filename_prefix"],
80+
"/tmp/hub_inc",
81+
)
82+
83+
84+
class TestMaybeWriteIncumbent(unittest.TestCase):
85+
"""Behavioral tests for InnerBoundSpoke._maybe_write_incumbent_on_improvement.
86+
87+
We call the method against a minimal stub (SimpleNamespace) rather than
88+
construct a real spoke. The method only touches:
89+
90+
self.opt.options.get(...)
91+
self.opt.write_first_stage_solution(...)
92+
self.cylinder_rank
93+
self._incumbent_write_counter
94+
self._incumbent_write_disabled
95+
96+
so a stub with those attributes is enough.
97+
"""
98+
99+
def _stub(self, prefix=None, rank=0, disabled=False, counter=0):
100+
stub = types.SimpleNamespace()
101+
stub.cylinder_rank = rank
102+
stub._incumbent_write_counter = counter
103+
stub._incumbent_write_disabled = disabled
104+
stub.opt = types.SimpleNamespace()
105+
stub.opt.options = {"incumbent_on_improvement_filename_prefix": prefix}
106+
stub.opt.write_first_stage_solution = MagicMock()
107+
return stub
108+
109+
def _run(self, stub):
110+
return InnerBoundSpoke._maybe_write_incumbent_on_improvement(stub)
111+
112+
# ---- disabled branches ----
113+
114+
def test_noop_when_prefix_is_none(self):
115+
stub = self._stub(prefix=None)
116+
self._run(stub)
117+
stub.opt.write_first_stage_solution.assert_not_called()
118+
self.assertEqual(stub._incumbent_write_counter, 0)
119+
self.assertFalse(stub._incumbent_write_disabled)
120+
121+
def test_noop_when_already_disabled(self):
122+
stub = self._stub(prefix="/tmp/p", disabled=True)
123+
self._run(stub)
124+
stub.opt.write_first_stage_solution.assert_not_called()
125+
# counter untouched, disabled stays True
126+
self.assertEqual(stub._incumbent_write_counter, 0)
127+
self.assertTrue(stub._incumbent_write_disabled)
128+
129+
# ---- happy path ----
130+
131+
def test_happy_path_writes_csv_and_npy_and_bumps_counter(self):
132+
stub = self._stub(prefix="/tmp/inc", counter=0)
133+
self._run(stub)
134+
135+
# csv goes first, with default writer (no kwarg)
136+
call_csv, call_npy = stub.opt.write_first_stage_solution.call_args_list
137+
self.assertEqual(call_csv.args, ("/tmp/inc_0000.csv",))
138+
self.assertEqual(call_csv.kwargs, {})
139+
140+
# npy follows with the explicit npy serializer
141+
self.assertEqual(call_npy.args, ("/tmp/inc_0000.npy",))
142+
self.assertEqual(
143+
call_npy.kwargs,
144+
{"first_stage_solution_writer": sputils.first_stage_nonant_npy_serializer},
145+
)
146+
147+
self.assertEqual(stub._incumbent_write_counter, 1)
148+
self.assertFalse(stub._incumbent_write_disabled)
149+
150+
def test_counter_zero_pads_to_four_digits(self):
151+
stub = self._stub(prefix="/tmp/inc", counter=42)
152+
self._run(stub)
153+
names = [c.args[0] for c in stub.opt.write_first_stage_solution.call_args_list]
154+
self.assertEqual(names, ["/tmp/inc_0042.csv", "/tmp/inc_0042.npy"])
155+
self.assertEqual(stub._incumbent_write_counter, 43)
156+
157+
def test_repeated_calls_increment_counter(self):
158+
stub = self._stub(prefix="/tmp/inc", counter=0)
159+
self._run(stub)
160+
self._run(stub)
161+
self._run(stub)
162+
names = [c.args[0] for c in stub.opt.write_first_stage_solution.call_args_list]
163+
# 3 calls × 2 files each = 6 filenames, counters 0/0/1/1/2/2
164+
self.assertEqual(names, [
165+
"/tmp/inc_0000.csv", "/tmp/inc_0000.npy",
166+
"/tmp/inc_0001.csv", "/tmp/inc_0001.npy",
167+
"/tmp/inc_0002.csv", "/tmp/inc_0002.npy",
168+
])
169+
self.assertEqual(stub._incumbent_write_counter, 3)
170+
171+
# ---- fail-soft branches ----
172+
173+
def test_runtime_error_warns_on_rank0_and_disables(self):
174+
stub = self._stub(prefix="/tmp/inc", rank=0, counter=5)
175+
stub.opt.write_first_stage_solution.side_effect = RuntimeError(
176+
"No first stage solution available"
177+
)
178+
with warnings.catch_warnings(record=True) as w:
179+
warnings.simplefilter("always")
180+
# must not propagate
181+
self._run(stub)
182+
self.assertTrue(stub._incumbent_write_disabled)
183+
# counter must NOT be incremented — the increment is past the try block
184+
self.assertEqual(stub._incumbent_write_counter, 5)
185+
# exactly one warning, and it mentions the option name
186+
self.assertEqual(len(w), 1)
187+
self.assertIn("incumbent_on_improvement_filename_prefix", str(w[0].message))
188+
self.assertIn("Disabling", str(w[0].message))
189+
190+
def test_runtime_error_silent_on_nonzero_rank(self):
191+
stub = self._stub(prefix="/tmp/inc", rank=1, counter=0)
192+
stub.opt.write_first_stage_solution.side_effect = RuntimeError("boom")
193+
with warnings.catch_warnings(record=True) as w:
194+
warnings.simplefilter("always")
195+
self._run(stub)
196+
# disabled flag still set so future calls short-circuit on every rank
197+
self.assertTrue(stub._incumbent_write_disabled)
198+
# but no warning printed on non-rank-0
199+
self.assertEqual(len(w), 0)
200+
201+
def test_disabled_after_failure_makes_next_call_noop(self):
202+
stub = self._stub(prefix="/tmp/inc", rank=0)
203+
stub.opt.write_first_stage_solution.side_effect = RuntimeError("nope")
204+
with warnings.catch_warnings():
205+
warnings.simplefilter("ignore")
206+
self._run(stub) # first call: trips fail-soft
207+
stub.opt.write_first_stage_solution.reset_mock()
208+
self._run(stub) # second call: short-circuits before writer
209+
stub.opt.write_first_stage_solution.assert_not_called()
210+
211+
212+
if __name__ == "__main__":
213+
unittest.main()

mpisppy/utils/cfg_vanilla.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ def shared_options(cfg, is_hub=False):
8181
# Optional initial xhat candidate file (.npy); None disables.
8282
# Consumed by XhatInnerBoundBase._try_file_xhat.
8383
"xhat_from_file" : cfg.get("xhat_from_file", None),
84+
# Optional filename prefix; if set, _BoundSpoke.update_if_improving
85+
# writes a first-stage solution snapshot on each new best incumbent.
86+
"incumbent_on_improvement_filename_prefix" : cfg.get(
87+
"incumbent_on_improvement_filename_prefix", None),
8488
}
8589
# The options-file (--solver-options-file) sits at the bottom of
8690
# axis 2: any CLI flags below override file entries at the same

mpisppy/utils/config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,16 @@ def popular_args(self):
295295
domain=str,
296296
default=None)
297297

298+
self.add_to_config("incumbent_on_improvement_filename_prefix",
299+
description="If set, incumbent (xhat) bound spokes "
300+
"write the first-stage solution to "
301+
"<prefix>_<NNNN>.csv and <prefix>_<NNNN>.npy "
302+
"each time they find a new best inner bound. "
303+
"<NNNN> is a zero-padded counter starting at "
304+
"0000. Default None disables.",
305+
domain=str,
306+
default=None)
307+
298308
self.add_to_config("tee_rank0_solves",
299309
description="Some cylinders support tee of rank 0 solves."
300310
"(With multiple cylinders this could be confusing.)",

run_coverage.bash

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ run_phase "test_jensens (serial)" \
109109
run_phase "test_xhat_from_file (serial)" \
110110
coverage run --rcfile=.coveragerc -m pytest mpisppy/tests/test_xhat_from_file.py -v
111111

112+
run_phase "test_incumbent_writing (serial)" \
113+
coverage run --rcfile=.coveragerc -m pytest mpisppy/tests/test_incumbent_writing.py -v
114+
112115
run_phase "test_conf_int_farmer (spawns mpiexec)" \
113116
coverage run --rcfile=.coveragerc mpisppy/tests/test_conf_int_farmer.py
114117

0 commit comments

Comments
 (0)