Skip to content
Open
6 changes: 5 additions & 1 deletion .github/workflows/test_pr_and_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,11 @@ jobs:
mpisppy/tests/test_solver_spec.py \
mpisppy/tests/test_extensions.py \
mpisppy/tests/test_jensens.py \
mpisppy/tests/test_proper_bundler.py
mpisppy/tests/test_proper_bundler.py \
mpisppy/tests/test_grad_rho_bundles.py \
mpisppy/tests/test_sensi_rho_bundles.py \
mpisppy/tests/test_reduced_costs_rho_bundles.py \
mpisppy/tests/test_rho_deprecations.py

- name: Upload coverage data
if: always()
Expand Down
106 changes: 103 additions & 3 deletions mpisppy/cylinders/reduced_costs_spoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,75 @@
from mpisppy.cylinders.lagrangian_bounder import LagrangianOuterBound
from mpisppy.cylinders.spwindow import Field
from mpisppy.utils.sputils import is_persistent
from mpisppy.utils.nonant_sensitivities import _bundle_consensus_groups
from mpisppy import MPI, global_toc


def _assert_consensus_rc_loaded(scenario, consensus_groups):
"""Pre-aggregation guard: every Var in every consensus group must have a
reduced cost on ``scenario.rc`` before _consensus_rc_sum reads it.

This catches a future regression where ``vars_to_load`` is restricted
back to bundle ref Vars (or any other path) that leaves per-sub-scenario
nonants without an rc entry — the consensus sum would then silently
read stale or zero values. No-op for unbundled scenarios.
"""
if consensus_groups is None:
return
for ndn_i, group in consensus_groups.items():
for v in group:
# Plain `assert` would be stripped under `python -O`, disabling
# the guard exactly where silent stale-rc reads would do real
# damage. Raise unconditionally instead.
if v not in scenario.rc:
raise RuntimeError(
f"reduced cost not loaded for {v.name}; vars_to_load did "
f"not cover the consensus group at bundle position {ndn_i}"
)


def _consensus_rc_sum(scenario, ndn_i, ref_var, consensus_groups):
"""Aggregated reduced cost at a nonant position.

For an unbundled scenario this is just ``scenario.rc[ref_var]`` — the
single nonant Var's reduced cost.

For a proper bundle the bundle objective references each sub-scenario's
*own* nonant Vars (the bundle ref is the first sub-scenario's Var; the
rest are tied by NA equality constraints). Probing only the ref Var's
reduced cost captures one representative sub-scenario's contribution
plus the NA constraint dual; summing the reduced costs of every
per-sub-scenario nonant in the consensus group cancels the NA
multipliers and gives the rate of bundle-objective change per unit
consensus shift — the correct "expected reduced cost" input to
rc-rho/rc-fixing. See issue #673.

Note on probability weighting (no explicit weighting needed):

``sputils.create_EF`` builds the bundle objective as
``(sum_s prob_s * obj_s) / EF_prob = sum_s cond_prob_s * obj_s``,
where ``cond_prob_s = prob_s / EF_prob``. Pyomo's reduced cost for
each per-sub-scenario nonant is therefore
``bundle.rc[x_s,k] = cond_prob_s * unbundled_rc_s,k`` (the per-
sub-scenario constraint duals scale with ``cond_prob_s`` too because
the objective scaling carries through to the dual LP), modulo the NA
multipliers that cancel in the consensus sum. So the consensus sum
already equals
``sum_s cond_prob_s * unbundled_rc_s,k = (1/EF_prob) * sum_s prob_s * unbundled_rc_s,k``
— the ``cond_prob_s`` is baked in by ``create_EF``.

The caller multiplies the result by ``sub._mpisppy_probability``
(= ``EF_prob`` for a bundle), recovering ``sum_s prob_s * unbundled_rc_s,k``
— exactly the unbundled formula. Adding an explicit ``prob_s``
weighting inside this sum would double-count.
"""
if consensus_groups is None:
return scenario.rc[ref_var]
return sum(
scenario.rc[v]
for v in consensus_groups.get(ndn_i, (ref_var,))
)

class ReducedCostsSpoke(LagrangianOuterBound):

send_fields = (*LagrangianOuterBound.send_fields, Field.EXPECTED_REDUCED_COST, Field.SCENARIO_REDUCED_COST,
Expand Down Expand Up @@ -160,17 +227,45 @@ def extract_and_store_reduced_costs(self):
# would probably need additional info about where scenarios disagree
rc = np.zeros(self.nonant_length)

# _bundle_consensus_groups walks every sub-scenario/node/var in the
# bundle; cache once per `sub` and reuse below in the second pass that
# populates _scenario_rc_buffer.
consensus_groups_by_sub = {
sub: (
_bundle_consensus_groups(sub)
if hasattr(sub, "_ef_scenario_names")
else None
)
for sub in self.opt.local_scenarios.values()
}

for sub in self.opt.local_scenarios.values():
consensus_groups = consensus_groups_by_sub[sub]
if is_persistent(sub._solver_plugin):
# Note: what happens with non-persistent solvers?
# - if rc is accepted as a model suffix by the solver (e.g. gurobi shell), it is loaded in postsolve
# - if not, the solver should throw an error
# - direct solvers seem to behave the same as persistent solvers
# GurobiDirect needs vars_to_load argument
# XpressDirect loads for all vars by default - TODO: should notify someone of this inconsistency
vars_to_load = [x for _, x in sub._mpisppy_data.nonant_indices.items()]
if consensus_groups is None:
vars_to_load = [
x for _, x in sub._mpisppy_data.nonant_indices.items()
]
else:
# Bundle: load rc for every per-sub-scenario nonant Var,
# not only the ref. The consensus rc sums over the whole
# group; loading just the ref would leave most entries
# at their stale pre-solve values.
vars_to_load = []
for ndn_i, ref in sub._mpisppy_data.nonant_indices.items():
vars_to_load.extend(
consensus_groups.get(ndn_i, (ref,))
)
sub._solver_plugin.load_rc(vars_to_load=vars_to_load)

_assert_consensus_rc_loaded(sub, consensus_groups)

for ci, (ndn_i, xvar) in enumerate(sub._mpisppy_data.nonant_indices.items()):
# fixed by modeler
if ndn_i in self._modeler_fixed_nonants[sub]:
Expand All @@ -187,7 +282,9 @@ def extract_and_store_reduced_costs(self):
# solver takes care of sign of rc, based on lb, ub and max,min
# rc can be of wrong sign if numerically 0 - accepted here, checked in extension
if (xvar.lb is not None and xb - xvar.lb <= self.bound_tol) or (xvar.ub is not None and xvar.ub - xb <= self.bound_tol):
rc[ci] += sub._mpisppy_probability * sub.rc[xvar]
rc[ci] += sub._mpisppy_probability * _consensus_rc_sum(
sub, ndn_i, xvar, consensus_groups,
)
# not close to either bound -> rc = nan
else:
rc[ci] = np.nan
Expand All @@ -196,12 +293,15 @@ def extract_and_store_reduced_costs(self):
assert self._scenario_rc_buffer.size == self.send_buffers[Field.SCENARIO_REDUCED_COST].data_len()
ci = 0 # buffer index
for sub in self.opt.local_scenarios.values():
consensus_groups = consensus_groups_by_sub[sub]
for ndn_i, xvar in sub._mpisppy_data.nonant_indices.items():
# fixed by modeler
if ndn_i in self._modeler_fixed_nonants[sub]:
self._scenario_rc_buffer[ci] = np.nan
else:
self._scenario_rc_buffer[ci] = sub.rc[xvar]
self._scenario_rc_buffer[ci] = _consensus_rc_sum(
sub, ndn_i, xvar, consensus_groups,
)
ci += 1
self.rc_scenario = self._scenario_rc_buffer
# print(f"In ReducedCostsSpoke; {self.rc_scenario=}")
Expand Down
94 changes: 80 additions & 14 deletions mpisppy/extensions/grad_rho.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import mpisppy.MPI as MPI
from mpisppy import global_toc
import mpisppy.utils.sputils as sputils
from mpisppy.utils.nonant_sensitivities import _bundle_consensus_groups
from mpisppy.cylinders.spwindow import Field

class GradRho(mpisppy.extensions.dyn_rho_base.Dyn_Rho_extension_base):
Expand Down Expand Up @@ -129,32 +130,97 @@ def _scen_indep_denom(self):
return scen_indep_denom

def _get_grad_exprs(self):
""" Grabs and caches the gradient expressions for each scenario's objective (without proximal term). """
""" Grabs and caches the gradient expressions for each scenario's objective (without proximal term).

Proper bundles need special handling: ``s._mpisppy_data.nonant_indices``
holds the bundle's ref Vars, but those ref Vars *are* one specific
sub-scenario's nonant Vars (sputils.create_EF picks the first scenario's
Var as the ref); other sub-scenarios' nonants are tied to the ref by
equality constraints. Differentiating the bundle objective w.r.t. the
ref Vars therefore captures only that "representative" sub-scenario's
contribution. Instead, differentiate w.r.t. every per-sub-scenario
nonant Var and sum the resulting partials per bundle position
``(ndn, k)``. This mirrors the bundle branch in
``sputils.nonant_cost_coeffs``.
"""

self.grad_exprs = dict()
self._bundle_per_scen_vars = dict()

for s in self.opt.local_scenarios.values():
self.grad_exprs[s] = differentiate(sputils.find_active_objective(s),
wrt_list=s._mpisppy_data.nonant_indices.values(),
mode=Modes.reverse_symbolic,
)
active_obj = sputils.find_active_objective(s)

if hasattr(s, "_ef_scenario_names"):
# Shared helper builds the bundle position -> per-sub-scenario
# Vars mapping (see mpisppy/utils/nonant_sensitivities.py); we
# then flatten it into parallel wrt_vars/wrt_keys so we can
# differentiate once and re-aggregate the partials by key.
per_scen_vars = _bundle_consensus_groups(s)

wrt_vars = []
wrt_keys = []
for bundle_key, vars_at_pos in per_scen_vars.items():
for v in vars_at_pos:
wrt_vars.append(v)
wrt_keys.append(bundle_key)

partials = differentiate(active_obj,
wrt_list=wrt_vars,
mode=Modes.reverse_symbolic,
)

grad_dict = {ndn_i: 0 for ndn_i in s._mpisppy_data.nonant_indices}
for bundle_key, partial in zip(wrt_keys, partials):
grad_dict[bundle_key] = grad_dict[bundle_key] + partial
self.grad_exprs[s] = grad_dict
self._bundle_per_scen_vars[s] = per_scen_vars
else:
partials = differentiate(active_obj,
wrt_list=s._mpisppy_data.nonant_indices.values(),
mode=Modes.reverse_symbolic,
)
self.grad_exprs[s] = {ndn_i: partials[i]
for i, ndn_i in enumerate(s._mpisppy_data.nonant_indices)}

self.grad_exprs[s] = {ndn_i : self.grad_exprs[s][i] for i, ndn_i in enumerate(s._mpisppy_data.nonant_indices)}

return
return

def _eval_grad_exprs(self, s, xhat):
""" Evaluates the gradient expressions of the objectives for scenario s at xhat (if available) or the current values. """

ci = 0

grads = {}

if self.eval_at_xhat:
if True not in np.isnan(self.best_xhat_buf.value_array()):
for ndn_i, var in s._mpisppy_data.nonant_indices.items():
var.value = xhat[ci]
ci += 1

ci = 0
if s in self._bundle_per_scen_vars:
# Bundle: the cached gradient expressions reference
# per-sub-scenario nonant Vars. All per-sub-scenario
# nonants tied to a given bundle position are equal at
# an NA-feasible xhat, so set every one of them to the
# bundle's xhat value at that position.
per_scen_vars = self._bundle_per_scen_vars[s]
for ndn_i, var in s._mpisppy_data.nonant_indices.items():
var.value = xhat[ci]
for v in per_scen_vars.get(ndn_i, ()):
v.value = xhat[ci]
ci += 1
else:
for ndn_i, var in s._mpisppy_data.nonant_indices.items():
var.value = xhat[ci]
ci += 1

# Current-values branch (the default path: cfg.eval_at_xhat is False,
# or eval_at_xhat is True but no best xhat has been received yet).
# Pyomo evaluates the cached gradient expressions against whatever
# values are currently on the Vars — the most recent post-solve
# state. For a bundle this is correct ONLY because the NA equality
# constraints leave every per-sub-scenario nonant at bundle
# position k equal to the bundle's consensus value at that
# position; the cached expression is a sum of partials over those
# Vars and therefore evaluates the consensus-direction gradient.
# If a future caller invokes _eval_grad_exprs while the bundle is
# in a non-NA-feasible intermediate state, this branch would
# silently return a non-consensus gradient.
for ndn_i, var in s._mpisppy_data.nonant_indices.items():
grads[ndn_i] = pyo.value(self.grad_exprs[s][ndn_i])

Expand Down
9 changes: 9 additions & 0 deletions mpisppy/extensions/reduced_costs_rho.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# full copyright and license information.
###############################################################################

import warnings

import numpy as np
from mpisppy import global_toc
from mpisppy.extensions.sensi_rho import _SensiRhoBase
Expand All @@ -20,6 +22,13 @@ class ReducedCostsRho(_SensiRhoBase):
"""

def __init__(self, ph):
warnings.warn(
"ReducedCostsRho is scheduled for deprecation; reduced-cost "
"rho has not been demonstrated to be effective in practice. "
"See https://github.com/Pyomo/mpi-sppy/issues/673.",
DeprecationWarning,
stacklevel=2,
)
cfg = ph.options["reduced_costs_rho_options"]["cfg"]
super().__init__(ph, cfg)
self.ph = ph
Expand Down
9 changes: 9 additions & 0 deletions mpisppy/extensions/sep_rho.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# full copyright and license information.
###############################################################################

import warnings

import mpisppy.extensions.dyn_rho_base
import numpy as np
import mpisppy.MPI as MPI
Expand All @@ -24,6 +26,13 @@ class SepRho(mpisppy.extensions.dyn_rho_base.Dyn_Rho_extension_base):
"""

def __init__(self, ph):
warnings.warn(
"SepRho is scheduled for deprecation; its functionality is "
"expected to be subsumed into GradRho. See "
"https://github.com/Pyomo/mpi-sppy/issues/673.",
DeprecationWarning,
stacklevel=2,
)
cfg = ph.options["sep_rho_options"]["cfg"]
super().__init__(ph, cfg)
self.ph = ph
Expand Down
Loading
Loading