diff --git a/pyomo/contrib/parmest/examples/rooney_biegler/regularization_example.py b/pyomo/contrib/parmest/examples/rooney_biegler/regularization_example.py new file mode 100644 index 00000000000..acdd9efa2ab --- /dev/null +++ b/pyomo/contrib/parmest/examples/rooney_biegler/regularization_example.py @@ -0,0 +1,114 @@ +# ____________________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and Engineering +# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this +# software. This software is distributed under the 3-clause BSD License. +# ____________________________________________________________________________________ + +from pyomo.common.dependencies import pandas as pd +import pyomo.contrib.parmest.parmest as parmest +from pyomo.contrib.parmest.examples.rooney_biegler.rooney_biegler import ( + RooneyBieglerExperiment, +) + + +def main(): + """ + Compare L2 and smooth-L1 regularization on the Rooney-Biegler example. + + Notes + ----- + The model response saturates for large positive ``rate_constant`` values: + ``y = asymptote * (1 - exp(-rate_constant * hour))``. + If ``rate_constant`` is both unpenalized and unbounded, the objective can be + nearly flat in that direction, which can lead to extreme fitted values. + To keep the smooth-L1 fit numerically stable and interpretable, this example: + 1) includes a nonzero L1 weight on ``rate_constant`` via ``prior_FIM_l1``, and + 2) applies finite bounds ``rate_constant in [0, 5]`` on each experiment model. + """ + + # Rooney & Biegler Reference Values + # a = 19.14, b = 0.53 + theta_ref = pd.Series({'asymptote': 20.0, 'rate_constant': 0.8}) + + # L2 setup: create a 'Stiff' Prior for 'asymptote' but leave 'rate_constant' flexible. 
+ prior_FIM_l2 = pd.DataFrame( + [[1000.0, 0.0], [0.0, 0.0]], + index=['asymptote', 'rate_constant'], + columns=['asymptote', 'rate_constant'], + ) + # L1 setup: penalize both parameters to avoid an unregularized flat direction. + prior_FIM_l1 = pd.DataFrame( + [[1000.0, 0.0], [0.0, 1.0]], + index=['asymptote', 'rate_constant'], + columns=['asymptote', 'rate_constant'], + ) + + # Data + data = pd.DataFrame( + data=[[1, 8.3], [2, 10.3], [3, 19.0], [4, 16.0], [5, 15.6], [7, 19.8]], + columns=['hour', 'y'], + ) + + # Create an experiment list + exp_list = [] + for i in range(data.shape[0]): + exp = RooneyBieglerExperiment(data.loc[i, :]) + # Example-scoped stabilization: keep rate_constant in a practical range. + m = exp.get_labeled_model() + m.rate_constant.setlb(0.0) + m.rate_constant.setub(5.0) + exp_list.append(exp) + + # Create an instance of the parmest estimator (L2) + pest_l2 = parmest.Estimator( + exp_list, + obj_function="SSE", + regularization='L2', + prior_FIM=prior_FIM_l2, + theta_ref=theta_ref, + ) + + # Parameter estimation and covariance for L2 + obj_l2, theta_l2 = pest_l2.theta_est() + cov_l2 = pest_l2.cov_est() + + # L1 smooth regularization uses sqrt((theta - theta_ref)^2 + epsilon) + pest_l1 = parmest.Estimator( + exp_list, + obj_function="SSE", + regularization='L1', + prior_FIM=prior_FIM_l1, + theta_ref=theta_ref, + regularization_weight=1.0, + regularization_epsilon=1e-6, + ) + obj_l1, theta_l1 = pest_l1.theta_est() + cov_l1 = pest_l1.cov_est() + + if parmest.graphics.seaborn_available: + parmest.graphics.pairwise_plot( + (theta_l2, cov_l2, 100), + theta_star=theta_l2, + alpha=0.8, + distributions=['MVN'], + title='L2 regularized theta estimates within 80% confidence region', + ) + + # Assert statements compare parameter estimation (theta) to an expected value + # relative_error = abs(theta['asymptote'] - 19.1426) / 19.1426 + # assert relative_error < 0.01 + # relative_error = abs(theta['rate_constant'] - 0.5311) / 0.5311 + # assert 
relative_error < 0.01 + + return {"L2": (obj_l2, theta_l2, cov_l2), "L1": (obj_l1, theta_l1, cov_l1)} + + +if __name__ == "__main__": + results = main() + for reg_name, (obj, theta, cov) in results.items(): + print(f"{reg_name} estimated parameters (theta):", theta) + print(f"{reg_name} objective function value at theta:", obj) + print(f"{reg_name} covariance of parameter estimates:", cov) diff --git a/pyomo/contrib/parmest/parmest.py b/pyomo/contrib/parmest/parmest.py index 21138d472da..525bc303e13 100644 --- a/pyomo/contrib/parmest/parmest.py +++ b/pyomo/contrib/parmest/parmest.py @@ -297,6 +297,335 @@ def SSE_weighted(model): ) +def _validate_prior_FIM(prior_FIM, require_psd=True): + """ + Validate user-supplied prior Fisher Information Matrix. + + Parameters + ---------- + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix with parameter names as both row and + column labels. + require_psd : bool, optional + If True, enforce positive semi-definiteness. Default is True. + """ + if not isinstance(prior_FIM, pd.DataFrame): + raise TypeError("prior_FIM must be a pandas DataFrame.") + + if prior_FIM.shape[0] != prior_FIM.shape[1]: + raise ValueError("prior_FIM must be square.") + + if set(prior_FIM.index) != set(prior_FIM.columns): + raise ValueError( + "prior_FIM row and column labels must match the same parameter names." 
+ ) + + if not np.issubdtype(prior_FIM.values.dtype, np.number): + raise TypeError("prior_FIM entries must be numeric.") + + if not np.all(np.isfinite(prior_FIM.values)): + raise ValueError("prior_FIM entries must be finite.") + + if not np.allclose(prior_FIM.values, prior_FIM.values.T): + raise ValueError("prior_FIM must be symmetric.") + + if require_psd: + eigenvalues = np.linalg.eigvalsh(prior_FIM.values) + if np.any(eigenvalues < -1e-10): # tolerance for numerical noise + raise ValueError("prior_FIM must be positive semi-definite.") + + +def _calculate_L2_penalty(model, prior_FIM, theta_ref=None): + """ + Calculates (theta - theta_ref)^T * prior_FIM * (theta - theta_ref) + using label-based alignment for safety and subsets for efficiency. + + Parameters + ---------- + model : ConcreteModel + Annotated Pyomo model + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix from previous experimental design + theta_ref: pd.Series, optional + Reference parameter values used in regularization. If None, defaults to the current parameter values in the model. + Returns + ------- + expr : Pyomo expression + Expression representing the L2-regularized objective addition + """ + _validate_prior_FIM(prior_FIM, require_psd=True) + + # Get current model parameters + # We assume model.unknown_parameters is a list of Pyomo Var objects + current_param_names = [p.name for p in model.unknown_parameters] + param_map = {p.name: p for p in model.unknown_parameters} + + # Confirm all matching parameters in both prior_FIM index and columns + common_params = [ + p + for p in current_param_names + if p in prior_FIM.columns and p in prior_FIM.index + ] + + if len(common_params) == 0: + + logger.warning( + "Warning: No matching parameters found between Model and Prior FIM. Returning standard objective." 
+ ) + + return 0.0 # No regularization if no common parameters + + elif len(common_params) < len(set(prior_FIM.columns).union(set(prior_FIM.index))): + + logger.warning( + "Warning: Only a subset of parameters in the Prior FIM match the Model parameters. " + "Regularization will be applied only to the matching subset." + ) + else: + + logger.info( + "All parameters in the Prior FIM match the Model parameters. Regularization will be applied to all parameters." + ) + + # Slice the dataframes to ONLY the common subset + sub_FIM = prior_FIM.loc[common_params, common_params] + + # For the reference theta, we also subset to the common parameters. If theta_ref is None, we create a zero vector of the right size. + if theta_ref is not None: + if not isinstance(theta_ref, pd.Series): + theta_ref = pd.Series(theta_ref) + + missing_ref = [p for p in common_params if p not in theta_ref.index] + if missing_ref: + raise ValueError( + "theta_ref is missing values for parameter(s): " + + ", ".join(missing_ref) + ) + sub_theta = theta_ref.loc[common_params] + else: + sub_theta = pd.Series(0, index=common_params) + + # Fill the sub_theta with the initialized model parameter values (or zeros if not initialized) + for param in common_params: + sub_theta.loc[param] = pyo.value(param_map[param]) + logger.info( + "theta_ref is None. Using initialized parameter values as reference." + ) + + # Construct the Quadratic Form (The L2 term) + # @Reviewers: This can be calculated in a loop or in a vector form. + # Are there any issues with the vectorized form that we should be aware of for Pyomo expressions? + # The loop form is more transparent but the vectorized form is more efficient and concise. 
+ + # l2_expr = 0.0 + # for i in common_params: + # di = name_to_var[i] - float(theta0.loc[i]) + # for j in common_params: + # qij = float(sub_FIM.loc[i, j]) + # if qij != 0.0: + # dj = name_to_var[j] - float(theta0.loc[j]) + # l2_expr += qij * di * dj + + # Create the (theta - theta_ref) vector, ensuring alignment by parameter name + delta_params = np.array( + [param_map[p] - sub_theta.loc[p] for p in common_params] + ) # (theta - theta_ref) vector + + # Compute the quadratic form: delta^T * FIM * delta + l2_term = delta_params.T @ sub_FIM.values @ delta_params + l2_term *= 0.5 # apply the 0.5 convention for quadratic regularization + + return l2_term + + +def L2_regularized_objective( + model, prior_FIM, theta_ref=None, regularization_weight=1.0, obj_function=SSE +): + """ + Calculates objective + regularization_weight*(theta - theta_ref)^T * prior_FIM * (theta - theta_ref) + using label-based alignment for safety and subsets for efficiency. + + Parameters + ---------- + model : ConcreteModel + Annotated Pyomo model + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix from previous experimental design + theta_ref: pd.Series, optional + Reference parameter values used in regularization. If None, defaults to the current parameter values in the model. + regularization_weight: float, optional + Weighting factor for the regularization term. Default is 1.0. + obj_function: callable, optional + Built-in objective function selected by the user, e.g., `SSE`. Default is `SSE`. 
+ + Returns + ------- + expr : Pyomo expression + Expression representing the L2-regularized objective + """ + + # Calculate the L2 penalty term + l2_term = _calculate_L2_penalty(model, prior_FIM, theta_ref) + + # Combine with objective + object_expr = obj_function(model) + + # Calculate the regularized objective + l2reg_objective = object_expr + (regularization_weight * l2_term) + + return l2reg_objective + + +def _calculate_L1_smooth_penalty( + model, prior_FIM, theta_ref=None, regularization_epsilon=1e-6 +): + """ + Calculates smooth L1 penalty: + sum_i w_i*sqrt((theta_i - theta_ref_i)^2 + regularization_epsilon) + where w_i is the corresponding diagonal entry in prior_FIM. + using label-based alignment for safety and subsets for efficiency. + + Parameters + ---------- + model : ConcreteModel + Annotated Pyomo model + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix used for label-based variable selection + theta_ref: pd.Series, optional + Reference parameter values used in regularization. If None, defaults to + the current parameter values in the model. + regularization_epsilon: float, optional + Positive smoothing parameter in the smooth absolute value approximation. + + Returns + ------- + expr : Pyomo expression + Expression representing the smooth L1 objective addition + """ + _validate_prior_FIM(prior_FIM, require_psd=True) + + if regularization_epsilon <= 0: + raise ValueError("regularization_epsilon must be positive.") + + # Get current model parameters + current_param_names = [p.name for p in model.unknown_parameters] + param_map = {p.name: p for p in model.unknown_parameters} + + # Confirm all matching parameters in both prior_FIM index and columns + common_params = [ + p + for p in current_param_names + if p in prior_FIM.columns and p in prior_FIM.index + ] + + if len(common_params) == 0: + logger.warning( + "Warning: No matching parameters found between Model and Prior FIM. Returning standard objective." 
+ ) + return 0.0 + elif len(common_params) < len(set(prior_FIM.columns).union(set(prior_FIM.index))): + logger.warning( + "Warning: Only a subset of parameters in the Prior FIM match the Model parameters. " + "Regularization will be applied only to the matching subset." + ) + else: + logger.info( + "All parameters in the Prior FIM match the Model parameters. Regularization will be applied to all parameters." + ) + + # Slice to matching subset and use diagonal entries as nonnegative weights. + sub_FIM = prior_FIM.loc[common_params, common_params] + + # Construct reference vector over the matching subset + if theta_ref is not None: + if not isinstance(theta_ref, pd.Series): + theta_ref = pd.Series(theta_ref) + missing_ref = [p for p in common_params if p not in theta_ref.index] + if missing_ref: + raise ValueError( + "theta_ref is missing values for parameter(s): " + + ", ".join(missing_ref) + ) + sub_theta = theta_ref.loc[common_params] + else: + sub_theta = pd.Series(0, index=common_params) + for param in common_params: + sub_theta.loc[param] = pyo.value(param_map[param]) + logger.info( + "theta_ref is None. Using initialized parameter values as reference." + ) + + l1_term = 0.0 + for p in common_params: + weight = float(sub_FIM.loc[p, p]) + var = param_map[p] + has_finite_lb = True + has_finite_ub = True + if hasattr(var, "lb") and hasattr(var, "ub"): + lb = pyo.value(var.lb, exception=False) if var.lb is not None else None + ub = pyo.value(var.ub, exception=False) if var.ub is not None else None + has_finite_lb = lb is not None and np.isfinite(lb) + has_finite_ub = ub is not None and np.isfinite(ub) + if weight == 0.0: + if not has_finite_lb and not has_finite_ub: + logger.warning( + "L1 regularization weight is zero for parameter '%s' and the " + "parameter has no finite bounds. 
This can create a weakly " + "identified flat direction.", + p, + ) + continue + delta = var - sub_theta.loc[p] + l1_term += weight * pyo.sqrt(delta * delta + regularization_epsilon) + + return l1_term + + +def L1_regularized_objective( + model, + prior_FIM, + theta_ref=None, + regularization_weight=1.0, + regularization_epsilon=1e-6, + obj_function=SSE, +): + """ + Calculates objective + regularization_weight*sum_i w_i*sqrt((theta_i-theta_ref_i)^2 + regularization_epsilon) + where w_i is the corresponding diagonal entry in prior_FIM. + using label-based alignment for safety and subsets for efficiency. + + Parameters + ---------- + model : ConcreteModel + Annotated Pyomo model + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix used for label-based variable selection + theta_ref: pd.Series, optional + Reference parameter values used in regularization. If None, defaults to + the current parameter values in the model. + regularization_weight: float, optional + Weighting factor for the regularization term. Default is 1.0. + regularization_epsilon: float, optional + Positive smoothing parameter in the smooth absolute value approximation. + obj_function: callable, optional + Built-in objective function selected by the user, e.g., `SSE`. Default is `SSE`. 
+ + Returns + ------- + expr : Pyomo expression + Expression representing the smooth L1-regularized objective + """ + l1_term = _calculate_L1_smooth_penalty( + model, + prior_FIM=prior_FIM, + theta_ref=theta_ref, + regularization_epsilon=regularization_epsilon, + ) + object_expr = obj_function(model) + l1reg_objective = object_expr + (regularization_weight * l1_term) + return l1reg_objective + + def _check_model_labels(model): """ Checks if the annotated Pyomo model contains the necessary suffixes @@ -375,6 +704,11 @@ class ObjectiveType(Enum): SSE_weighted = "SSE_weighted" +class RegularizationType(Enum): + L2 = "L2" + L1 = "L1" + + # Compute the Jacobian matrix of measured variables with respect to the parameters def _compute_jacobian(experiment, theta_vals, step, solver, tee): """ @@ -474,6 +808,8 @@ def compute_covariance_matrix( solver, tee, estimated_var=None, + prior_FIM=None, + regularization_weight=1.0, ): """ Computes the covariance matrix of the estimated parameters using @@ -502,7 +838,13 @@ def compute_covariance_matrix( Value of the estimated variance of the measurement error in cases where the user does not supply the measurement error standard deviation - + prior_FIM: pd.DataFrame, optional + Prior Fisher Information Matrix from previous experimental design + to be added to the FIM of the current experiments for covariance estimation. + The prior_FIM should be a square matrix with parameter names as both + row and column labels. + regularization_weight: float, optional + Weighting factor for the regularization term. Default is 1.0. Returns ------- cov : pd.DataFrame @@ -540,6 +882,24 @@ def compute_covariance_matrix( FIM = np.sum(FIM_all_exp, axis=0) + # Add prior_FIM if including regularization. We expand the prior FIM to match the size of the + # FIM and align it based on parameter names to ensure correct addition. 
+ # Add the prior FIM to the FIM of the current experiments, weighted by the regularization weight + if prior_FIM is not None: + if regularization_weight < 0: + raise ValueError("regularization_weight must be nonnegative.") + + expanded_prior_FIM = _expand_prior_FIM( + experiment_list[0], prior_FIM # theta_vals + ) # sanity check and alignment + + # Check that the prior FIM shape is the same as the FIM shape + if expanded_prior_FIM.shape != FIM.shape: + raise ValueError( + "The shape of the prior FIM must be the same as the shape of the FIM." + ) + FIM += expanded_prior_FIM * regularization_weight + # calculate the covariance matrix try: cov = np.linalg.inv(FIM) @@ -745,6 +1105,40 @@ def _kaug_FIM(experiment, obj_function, theta_vals, solver, tee, estimated_var=N return FIM +def _expand_prior_FIM(experiment, prior_FIM): + """ + Expands the prior FIM to match the size of the FIM of the current experiment + + Parameters + ---------- + experiment : Experiment class + Experiment class object that contains the Pyomo model + for a particular experimental condition + prior_FIM : pd.DataFrame + Prior Fisher Information Matrix from previous experimental design + + Returns + ------- + expanded_prior_FIM : pd.DataFrame + Expanded prior FIM with the same size as the FIM of the current experiment + """ + _validate_prior_FIM(prior_FIM, require_psd=True) + + model = _get_labeled_model(experiment) + + # Extract parameter names from the Pyomo model + param_names = [param.name for param in model.unknown_parameters] + + # 1. Expand Prior FIM + # We reindex to match param_names. Parameters not in prior_FIM + # will be filled with 0, which maintains the PSD property. + expanded_prior_FIM = prior_FIM.reindex( + index=param_names, columns=param_names, fill_value=0.0 + ) + + return expanded_prior_FIM + + class Estimator: """ Parameter estimation class @@ -768,6 +1162,28 @@ class Estimator: solver_options: dict, optional Provides options to the solver (also the name of an attribute). 
Default is None. + + Added keyword arguments for objective regularization: + regularization: string, optional + Built-in regularization type ("L2" or "L1"). If no regularization is + specified, no regularization term is added to the objective. + Default is None. + prior_FIM: pd.DataFrame, optional + Prior Fisher Information Matrix from previous experimental design + to be added to the FIM of the current experiments for regularization. + The prior_FIM should be a square matrix with parameter names as both + row and column labels. + theta_ref: pd.Series, optional + Reference parameter values used in regularization. + If None, defaults to the current parameter values in the model. + regularization_weight: float, optional + Weighting factor for the regularization term. Used with + ``regularization="L2"`` or ``regularization="L1"`` + and defaults to 1.0 when omitted. + regularization_epsilon: float, optional + Positive smoothing parameter used with ``regularization="L1"`` + in ``sqrt((theta-theta_ref)^2 + regularization_epsilon)``. + Defaults to ``1e-6`` when omitted. """ # The singledispatchmethod decorator is used here as a deprecation @@ -780,6 +1196,11 @@ def __init__( self, experiment_list, obj_function=None, + regularization=None, + prior_FIM=None, + theta_ref=None, + regularization_weight=None, + regularization_epsilon=None, tee=False, diagnostic_mode=False, solver_options=None, @@ -810,10 +1231,91 @@ def __init__( else: self.obj_function = obj_function + if isinstance(regularization, str): + try: + self.regularization = RegularizationType(regularization) + except ValueError: + raise ValueError( + f"Invalid regularization type: '{regularization}'. " + f"Choose from: {[e.value for e in RegularizationType]}." 
+ ) + elif isinstance(regularization, RegularizationType): + self.regularization = regularization + elif regularization is None: + self.regularization = None + else: + raise TypeError( + "regularization must be None or one of " + f"{[e.value for e in RegularizationType]}." + ) + self.tee = tee self.diagnostic_mode = diagnostic_mode self.solver_options = solver_options + # Validate regularization options and defaults + if self.regularization is None: + if ( + prior_FIM is not None + or theta_ref is not None + or regularization_weight is not None + or regularization_epsilon is not None + ): + raise ValueError( + "regularization must be set when supplying prior_FIM, theta_ref, " + "regularization_weight, or regularization_epsilon." + ) + self.prior_FIM = None + self.theta_ref = None + self.regularization_weight = None + self.regularization_epsilon = None + elif self.regularization == RegularizationType.L2: + if prior_FIM is None: + raise ValueError("prior_FIM must be provided when regularization='L2'.") + _validate_prior_FIM(prior_FIM, require_psd=True) + if theta_ref is not None and not isinstance(theta_ref, pd.Series): + theta_ref = pd.Series(theta_ref) + if regularization_epsilon is not None: + raise ValueError( + "regularization_epsilon is only supported when regularization='L1'." 
+ ) + + if regularization_weight is None: + regularization_weight = 1.0 + if regularization_weight < 0: + raise ValueError("regularization_weight must be nonnegative.") + + self.prior_FIM = prior_FIM + self.theta_ref = theta_ref + self.regularization_weight = regularization_weight + self.regularization_epsilon = None + elif self.regularization == RegularizationType.L1: + if prior_FIM is None: + raise ValueError("prior_FIM must be provided when regularization='L1'.") + _validate_prior_FIM(prior_FIM, require_psd=True) + if theta_ref is not None and not isinstance(theta_ref, pd.Series): + theta_ref = pd.Series(theta_ref) + + if regularization_weight is None: + regularization_weight = 1.0 + if regularization_weight < 0: + raise ValueError("regularization_weight must be nonnegative.") + + if regularization_epsilon is None: + regularization_epsilon = 1e-6 + if regularization_epsilon <= 0: + raise ValueError("regularization_epsilon must be positive.") + + self.prior_FIM = prior_FIM + self.theta_ref = theta_ref + self.regularization_weight = regularization_weight + self.regularization_epsilon = regularization_epsilon + else: + raise ValueError( + f"Unsupported regularization option: {self.regularization}. " + f"Choose from {[e.value for e in RegularizationType]}." + ) + # TODO: delete this when the deprecated interface is removed self.pest_deprecated = None @@ -909,6 +1411,63 @@ def _expand_indexed_unknowns(self, model_temp): return model_theta_list + # Reviewers: Put in architecture to calculate a regularization weight based on the current parameter values and the prior FIM. + # However, if the prior_FIM is properly defined, this should not be necessary. Are there any use cases for this where we should give + # the user a scaling option, or remove and trust the prior_FIM to be properly scaled? 
+ + # def _calc_regularization_weight(self, solver='ipopt'): + # """ + # Calculate regularization weight as the ratio of the objective value to the L2 term value at the current parameter values to balance their magnitudes. + # """ + # # Solve the model at the current parameter values to get the objective function value + # sse_vals = [] + # for experiment in self.exp_list: + # model = _get_labeled_model(experiment) + + # # fix the value of the unknown parameters to the estimated values + # for param in model.unknown_parameters: + # param.fix(pyo.value(param)) + + # # re-solve the model with the estimated parameters + # results = pyo.SolverFactory(solver).solve(model, tee=self.tee) + # assert_optimal_termination(results) + + # # choose and evaluate the sum of squared errors expression + # if self.obj_function == ObjectiveType.SSE: + # sse_expr = SSE(model) + # elif self.obj_function == ObjectiveType.SSE_weighted: + # sse_expr = SSE_weighted(model) + # else: + # raise ValueError( + # f"Invalid objective function for covariance calculation. " + # f"The covariance matrix can only be calculated using the built-in " + # f"objective functions: {[e.value for e in ObjectiveType]}. Supply " + # f"the Estimator object one of these built-in objectives and " + # f"re-run the code." + # ) + # l2_expr = _calculate_L2_penalty(model, self.prior_FIM, self.theta_ref) + + # # evaluate the numerical SSE and store it + # sse_val = pyo.value(sse_expr) + # sse_vals.append(sse_val) + + # sse = sum(sse_vals) + + # if l2_expr is None: + # l2_value = 0 + # else: + # l2_value = pyo.value(l2_expr) + + # if l2_value == 0: + # logger.warning( + # "L2 penalty is zero at the current parameter values. Regularization weight set to 1.0 by default." 
+ # ) + # return 1.0 + + # reg_weight = float(pyo.value(sse) / (pyo.value(l2_value))) + # logger.info(f"Calculated regularization weight: {reg_weight}") + # return reg_weight + def _create_parmest_model(self, experiment_number): """ Modify the Pyomo model for parameter estimation @@ -939,15 +1498,45 @@ def _create_parmest_model(self, experiment_number): # TODO, this needs to be turned into an enum class of options that still support # custom functions - if self.obj_function is ObjectiveType.SSE: - second_stage_rule = SSE - self.covariance_objective = second_stage_rule - elif self.obj_function is ObjectiveType.SSE_weighted: - second_stage_rule = SSE_weighted - self.covariance_objective = second_stage_rule + + if isinstance(self.obj_function, ObjectiveType): + + if self.obj_function == ObjectiveType.SSE: + self.covariance_objective = SSE + + elif self.obj_function == ObjectiveType.SSE_weighted: + self.covariance_objective = SSE_weighted + + base_objective = self.covariance_objective else: # A custom function uses model.experiment_outputs as data - second_stage_rule = self.obj_function + self.covariance_objective = None + base_objective = self.obj_function + + if self.regularization is None: + second_stage_rule = base_objective + elif self.regularization == RegularizationType.L2: + second_stage_rule = lambda m: L2_regularized_objective( + m, + prior_FIM=self.prior_FIM, + theta_ref=self.theta_ref, + regularization_weight=self.regularization_weight, + obj_function=base_objective, + ) + elif self.regularization == RegularizationType.L1: + second_stage_rule = lambda m: L1_regularized_objective( + m, + prior_FIM=self.prior_FIM, + theta_ref=self.theta_ref, + regularization_weight=self.regularization_weight, + regularization_epsilon=self.regularization_epsilon, + obj_function=base_objective, + ) + else: + raise ValueError( + f"Unsupported regularization option: {self.regularization}. " + f"Choose from {[e.value for e in RegularizationType]}." 
+ ) model.FirstStageCost = pyo.Expression(expr=0) model.SecondStageCost = pyo.Expression(rule=second_stage_rule) @@ -1262,6 +1851,12 @@ def _cov_at_theta(self, method, solver, step): ) # check if the user specified 'SSE' or 'SSE_weighted' as the objective function + cov_prior_FIM = None + cov_regularization_weight = None + if self.regularization == RegularizationType.L2: + cov_prior_FIM = self.prior_FIM + cov_regularization_weight = self.regularization_weight + if self.obj_function == ObjectiveType.SSE: # check if the user defined the 'measurement_error' attribute if hasattr(model, "measurement_error"): @@ -1298,6 +1893,8 @@ def _cov_at_theta(self, method, solver, step): method, obj_function=self.covariance_objective, theta_vals=self.estimated_theta, + prior_FIM=cov_prior_FIM, + regularization_weight=cov_regularization_weight, solver=solver, step=step, tee=self.tee, @@ -1328,7 +1925,10 @@ def _cov_at_theta(self, method, solver, step): solver=solver, step=step, tee=self.tee, + prior_FIM=cov_prior_FIM, + regularization_weight=cov_regularization_weight, ) + else: raise ValueError( "One or more values of the measurement errors have " @@ -1366,6 +1966,8 @@ def _cov_at_theta(self, method, solver, step): method, obj_function=self.covariance_objective, theta_vals=self.estimated_theta, + prior_FIM=cov_prior_FIM, + regularization_weight=cov_regularization_weight, step=step, solver=solver, tee=self.tee, @@ -1610,6 +2212,8 @@ def _get_sample_list(self, samplesize, num_samples, replacement=True): return samplelist + # @Reviewers: Currently regularization chosen with objective at Estimator initialization, + # Would it be preferable to have regularization choice as an argument in the theta_est function instead? 
def theta_est( self, solver="ef_ipopt", return_values=[], calc_cov=NOTSET, cov_n=NOTSET ): diff --git a/pyomo/contrib/parmest/scenarios.csv b/pyomo/contrib/parmest/scenarios.csv new file mode 100644 index 00000000000..af286781a20 --- /dev/null +++ b/pyomo/contrib/parmest/scenarios.csv @@ -0,0 +1,11 @@ +Name,Probability,k1,k2,E1,E2 +ExpScen0,0.1,25.800350784967552,14.144215235968407,31505.74904933868,35000.0 +ExpScen1,0.1,25.1283730831486,149.99999951481198,31452.3366518825,41938.78130161935 +ExpScen2,0.1,22.225574065242643,130.92739780149637,30948.66911165926,41260.15420926035 +ExpScen3,0.1,100.0,149.9999996987801,35182.7313074055,41444.52600370866 +ExpScen4,0.1,82.99114366257251,45.95424665356903,34810.857217160396,38300.63334950135 +ExpScen5,0.1,100.0,150.0,35142.202191502525,41495.411057950805 +ExpScen6,0.1,2.8743643265327625,149.99999474412596,25000.0,41431.61195917287 +ExpScen7,0.1,2.754580914039618,14.381786098093363,25000.0,35000.0 +ExpScen8,0.1,2.8743643265327625,149.99999474412596,25000.0,41431.61195917287 +ExpScen9,0.1,2.6697808222410906,150.0,25000.0,41514.74761132933 diff --git a/pyomo/contrib/parmest/tests/test_examples.py b/pyomo/contrib/parmest/tests/test_examples.py index 94a83d1f058..272f2944e82 100644 --- a/pyomo/contrib/parmest/tests/test_examples.py +++ b/pyomo/contrib/parmest/tests/test_examples.py @@ -8,6 +8,7 @@ # ____________________________________________________________________________________ import pyomo.common.unittest as unittest +import math import pyomo.contrib.parmest.parmest as parmest from pyomo.contrib.parmest.graphics import matplotlib_available, seaborn_available from pyomo.contrib.pynumero.asl import AmplInterface @@ -54,6 +55,25 @@ def test_likelihood_ratio_example(self): likelihood_ratio_example.main() + def test_regularization_example(self): + from pyomo.contrib.parmest.examples.rooney_biegler import regularization_example + + results = regularization_example.main() + # Keep this as a lightweight contract test: example 
must return both + # regularization modes with finite outputs. + self.assertIn("L1", results) + self.assertIn("L2", results) + + l1_obj, l1_theta, _ = results["L1"] + l2_obj, l2_theta, _ = results["L2"] + + self.assertTrue(math.isfinite(float(l1_obj))) + self.assertTrue(math.isfinite(float(l2_obj))) + self.assertTrue(math.isfinite(float(l1_theta["rate_constant"]))) + self.assertTrue(math.isfinite(float(l2_theta["rate_constant"]))) + self.assertGreaterEqual(float(l1_theta["rate_constant"]), 0.0) + self.assertLessEqual(float(l1_theta["rate_constant"]), 5.0) + @unittest.skipUnless(pynumero_ASL_available, "test requires libpynumero_ASL") @unittest.skipUnless(ipopt_available, "The 'ipopt' solver is not available") diff --git a/pyomo/contrib/parmest/tests/test_parmest.py b/pyomo/contrib/parmest/tests/test_parmest.py index dfeab769e71..5d7487715ae 100644 --- a/pyomo/contrib/parmest/tests/test_parmest.py +++ b/pyomo/contrib/parmest/tests/test_parmest.py @@ -1416,6 +1416,384 @@ def test_theta_est_with_square_initialization_diagnostic_mode_true(self): self.pest.diagnostic_mode = False +@unittest.skipIf( + not parmest.parmest_available, + "Cannot test parmest regularization: required dependencies are missing", +) +class TestRegularizationCore(unittest.TestCase): + # These tests intentionally use a tiny linear model so each expected + # regularization term can be computed analytically and reviewed quickly. 
    class LinearExperiment(Experiment):
        """Single-observation linear experiment ``y = theta0 + theta1 * x``.

        Intentionally tiny so each expected regularization term in the tests
        below can be computed by hand.
        """

        def __init__(self, x, y, theta0_init=0.0, theta1_init=0.0):
            # One (x, y) data point per experiment instance.
            self.x = float(x)
            self.y = float(y)
            self.theta0_init = float(theta0_init)
            self.theta1_init = float(theta1_init)
            super().__init__(model=None)
            self.create_model()
            self.label_model()

        def create_model(self):
            """Build the linear prediction model for this experiment."""
            m = pyo.ConcreteModel()
            m.theta0 = pyo.Var(initialize=self.theta0_init)
            m.theta1 = pyo.Var(initialize=self.theta1_init)
            m.x = pyo.Param(initialize=self.x, mutable=True)
            m.pred = pyo.Var(initialize=self.y)
            m.pred_link = pyo.Constraint(expr=m.pred == m.theta0 + m.theta1 * m.x)
            self.model = m

        def label_model(self):
            """Attach the parmest labeling suffixes (outputs, parameters, errors)."""
            m = self.model
            m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
            m.experiment_outputs.update([(m.pred, self.y)])
            m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL)
            m.unknown_parameters.update(
                (k, pyo.ComponentUID(k)) for k in [m.theta0, m.theta1]
            )
            # None marks the measurement error as unspecified for this output.
            m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL)
            m.measurement_error.update([(m.pred, None)])

    class DummyExperiment(Experiment):
        """Trivial labeled experiment used only to drive covariance plumbing."""

        def __init__(self):
            m = pyo.ConcreteModel()
            m.theta0 = pyo.Var(initialize=0.0)
            m.theta1 = pyo.Var(initialize=0.0)
            m.pred = pyo.Var(initialize=0.0)
            m.pred_link = pyo.Constraint(expr=m.pred == m.theta0 + m.theta1)
            m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
            m.experiment_outputs.update([(m.pred, 0.0)])
            m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL)
            m.unknown_parameters.update(
                (k, pyo.ComponentUID(k)) for k in [m.theta0, m.theta1]
            )
            m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL)
            m.measurement_error.update([(m.pred, None)])
            super().__init__(model=m)

    @staticmethod
    def _make_var_labeled_model(y_obs=5.0):
        # Var-based helper used for direct objective-expression checks.
        # Prediction is theta0 + 2*theta1, observed value is y_obs.
        m = pyo.ConcreteModel()
        m.theta0 = pyo.Var(initialize=0.0)
        m.theta1 = pyo.Var(initialize=0.0)
        m.pred = pyo.Expression(expr=m.theta0 + 2.0 * m.theta1)
        m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
        m.experiment_outputs.update([(m.pred, float(y_obs))])
        m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL)
        m.unknown_parameters.update(
            (k, pyo.ComponentUID(k)) for k in [m.theta0, m.theta1]
        )
        return m

    @staticmethod
    def _obj_at_theta(pest, theta0, theta1):
        # Evaluate objective for a single theta row to keep assertions explicit.
        theta = pd.DataFrame([[theta0, theta1]], columns=["theta0", "theta1"])
        return pest.objective_at_theta(theta_values=theta).iloc[0]["obj"]

    def test_l2_objective_value_matches_manual_quadratic(self):
        """L2 objective equals SSE + weight * 0.5 * delta' FIM delta."""
        m = self._make_var_labeled_model(y_obs=5.0)
        m.theta0.set_value(4.0)
        m.theta1.set_value(-1.0)

        prior_fim = pd.DataFrame(
            [[2.0, 0.0], [0.0, 4.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )
        theta_ref = pd.Series({"theta0": 1.0, "theta1": 2.0})
        weight = 3.0

        expr = parmest.L2_regularized_objective(
            m,
            prior_FIM=prior_fim,
            theta_ref=theta_ref,
            regularization_weight=weight,
            obj_function=parmest.SSE,
        )

        # pred = 4 + 2*(-1) = 2, so SSE = (5 - 2)**2 = 9.
        sse_expected = 9.0
        # deltas are (3, -3); 0.5 * (2*3**2 + 4*3**2) = 27 (same 0.5 convention
        # asserted explicitly in test_prior_subset_penalizes_only_selected_parameter).
        l2_expected = 27.0
        expected = sse_expected + weight * l2_expected

        self.assertAlmostEqual(pyo.value(expr), expected)

    def test_l2_penalty_not_double_counted_across_scenarios(self):
        # Confirms regularization is applied once at the estimator level,
        # not once per scenario.
        exp_list = [self.LinearExperiment(1.0, 1.0), self.LinearExperiment(2.0, 2.0)]
        prior_fim = pd.DataFrame(
            [[0.0, 0.0], [0.0, 2.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )
        theta_ref = pd.Series({"theta0": 0.0, "theta1": 0.0})

        pest = parmest.Estimator(
            exp_list,
            obj_function="SSE",
            regularization="L2",
            prior_FIM=prior_fim,
            theta_ref=theta_ref,
            regularization_weight=1.0,
        )

        # theta = (0, 1) fits both points exactly, so SSE = 0 and the
        # objective is the single penalty 0.5 * 2 * 1**2 = 1 (not 2).
        obj_val = self._obj_at_theta(pest, 0.0, 1.0)
        self.assertAlmostEqual(obj_val, 1.0)

    def test_regularization_requires_explicit_option_when_prior_supplied(self):
        # Guardrail: passing prior/FIM arguments without selecting a
        # regularization mode should fail fast.
        exp_list = [self.LinearExperiment(1.0, 1.0)]
        prior_fim = pd.DataFrame(
            [[1.0, 0.0], [0.0, 1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )

        with pytest.raises(
            ValueError, match="regularization must be set when supplying prior_FIM"
        ):
            parmest.Estimator(exp_list, obj_function="SSE", prior_FIM=prior_fim)

    def test_l2_regularization_requires_prior_fim(self):
        """Selecting L2 without a prior FIM is an error."""
        exp_list = [self.LinearExperiment(1.0, 1.0)]

        with pytest.raises(ValueError, match="prior_FIM must be provided"):
            parmest.Estimator(exp_list, obj_function="SSE", regularization="L2")

    def test_user_specified_unsupported_regularization_raises(self):
        """A callable is not a valid regularization selector."""
        exp_list = [self.LinearExperiment(1.0, 1.0)]

        with pytest.raises(TypeError, match="regularization must be None or one of"):
            parmest.Estimator(
                exp_list, obj_function="SSE", regularization=lambda m: m.theta0**2
            )

    def test_l2_lambda_zero_matches_unregularized_objective(self):
        """With weight 0, the L2-regularized objective reduces to plain SSE."""
        exp_list = [self.LinearExperiment(1.0, 1.0), self.LinearExperiment(2.0, 2.0)]
        prior_fim = pd.DataFrame(
            [[2.0, 0.0], [0.0, 1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )
        theta_ref = pd.Series({"theta0": 0.0, "theta1": 0.0})

        pest_base = parmest.Estimator(exp_list, obj_function="SSE")
        pest_l2_zero = parmest.Estimator(
            exp_list,
            obj_function="SSE",
            regularization="L2",
            prior_FIM=prior_fim,
            theta_ref=theta_ref,
            regularization_weight=0.0,
        )

        # Compare at several theta points, not just the reference point.
        for theta0, theta1 in [(0.0, 0.0), (0.5, 1.5), (-1.0, 2.0)]:
            obj_base = self._obj_at_theta(pest_base, theta0, theta1)
            obj_l2_zero = self._obj_at_theta(pest_l2_zero, theta0, theta1)
            self.assertAlmostEqual(obj_l2_zero, obj_base)

    def test_prior_subset_penalizes_only_selected_parameter(self):
        # Prior indexed only by theta1 should leave theta0 unpenalized.
        exp_list = [self.LinearExperiment(1.0, 1.0)]
        prior_fim = pd.DataFrame([[4.0]], index=["theta1"], columns=["theta1"])

        pest_base = parmest.Estimator(exp_list, obj_function="SSE")
        pest_l2 = parmest.Estimator(
            exp_list,
            obj_function="SSE",
            regularization="L2",
            prior_FIM=prior_fim,
            theta_ref=pd.Series({"theta1": 0.0}),
            regularization_weight=1.0,
        )

        # Moving theta0 alone adds no penalty...
        obj_base_theta0 = self._obj_at_theta(pest_base, theta0=1.0, theta1=0.0)
        obj_l2_theta0 = self._obj_at_theta(pest_l2, theta0=1.0, theta1=0.0)
        self.assertAlmostEqual(obj_l2_theta0, obj_base_theta0)

        # ...while moving theta1 adds exactly the quadratic penalty.
        obj_base_theta1 = self._obj_at_theta(pest_base, theta0=0.0, theta1=1.0)
        obj_l2_theta1 = self._obj_at_theta(pest_l2, theta0=0.0, theta1=1.0)
        expected_penalty = 0.5 * 4.0 * (1.0**2)
        self.assertAlmostEqual(obj_l2_theta1 - obj_base_theta1, expected_penalty)

    def test_negative_regularization_weight_raises(self):
        """Negative regularization weights are rejected at construction."""
        exp_list = [self.LinearExperiment(1.0, 1.0)]
        prior_fim = pd.DataFrame(
            [[1.0, 0.0], [0.0, 1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )

        with pytest.raises(
            ValueError, match="regularization_weight must be nonnegative"
        ):
            parmest.Estimator(
                exp_list,
                obj_function="SSE",
                regularization="L2",
                prior_FIM=prior_fim,
                regularization_weight=-1.0,
            )

    def test_missing_theta_ref_entries_raise_clear_error(self):
        """theta_ref covering only a subset of parameters must name the gap."""
        exp_list = [self.LinearExperiment(1.0, 1.0)]
        prior_fim = pd.DataFrame(
            [[1.0, 0.0], [0.0, 1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )

        pest = parmest.Estimator(
            exp_list,
            obj_function="SSE",
            regularization="L2",
            prior_FIM=prior_fim,
            theta_ref=pd.Series({"theta0": 0.0}),
            regularization_weight=1.0,
        )

        # Error surfaces on evaluation, not at Estimator construction.
        with pytest.raises(
            ValueError, match=r"theta_ref is missing values for parameter\(s\): theta1"
        ):
            _ = self._obj_at_theta(pest, theta0=0.0, theta1=0.0)

    def test_non_psd_prior_fim_rejected(self):
        """An indefinite prior FIM (negative eigenvalue) is rejected."""
        exp_list = [self.LinearExperiment(1.0, 1.0)]
        non_psd = pd.DataFrame(
            [[1.0, 2.0], [2.0, -1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )

        with pytest.raises(ValueError, match="positive semi-definite"):
            parmest.Estimator(
                exp_list, obj_function="SSE", regularization="L2", prior_FIM=non_psd
            )

    def test_compute_covariance_matrix_adds_prior_fim_weighted(self):
        """Covariance computation should fold in the weighted prior FIM."""
        exp_list = [self.DummyExperiment(), self.DummyExperiment()]

        def fake_finite_difference_FIM(*args, **kwargs):
            return np.eye(2)

        prior_fim = pd.DataFrame(
            [[1.0, 0.0], [0.0, 3.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )
        theta_vals = {"theta0": 0.0, "theta1": 0.0}

        # Replace finite-difference FIM with identity so this test isolates
        # only the prior_FIM + regularization_weight addition path.
        original_finite_difference_FIM = parmest._finite_difference_FIM
        try:
            parmest._finite_difference_FIM = fake_finite_difference_FIM
            cov = parmest.compute_covariance_matrix(
                experiment_list=exp_list,
                method=parmest.CovarianceMethod.finite_difference.value,
                obj_function=parmest.SSE,
                theta_vals=theta_vals,
                step=1e-3,
                solver="ipopt",
                tee=False,
                prior_FIM=prior_fim,
                regularization_weight=2.0,
            )
        finally:
            # Always restore global function to avoid cross-test contamination.
            parmest._finite_difference_FIM = original_finite_difference_FIM

        # NOTE(review): 0.25 and 0.125 imply an effective information matrix
        # of diag(4, 8) from identity FIM + prior diag(1, 3) with weight 2 —
        # confirm against compute_covariance_matrix's prior-combination rule.
        self.assertAlmostEqual(cov.loc["theta0", "theta0"], 0.25)
        self.assertAlmostEqual(cov.loc["theta1", "theta1"], 0.125)
        self.assertAlmostEqual(cov.loc["theta0", "theta1"], 0.0)
        self.assertAlmostEqual(cov.loc["theta1", "theta0"], 0.0)

    def test_l1_smooth_objective_value_matches_manual_expression(self):
        # Verifies the smooth-L1 form: sum_i w_i * sqrt(delta_i^2 + epsilon).
        m = self._make_var_labeled_model(y_obs=5.0)
        m.theta0.set_value(4.0)
        m.theta1.set_value(-1.0)

        prior_fim = pd.DataFrame(
            [[2.0, 0.0], [0.0, 4.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )
        theta_ref = pd.Series({"theta0": 1.0, "theta1": 2.0})
        weight = 3.0
        eps = 1e-6

        expr = parmest.L1_regularized_objective(
            m,
            prior_FIM=prior_fim,
            theta_ref=theta_ref,
            regularization_weight=weight,
            regularization_epsilon=eps,
            obj_function=parmest.SSE,
        )

        # pred = 2 vs y = 5 gives SSE = 9; deltas are (3, -3) so each
        # smoothed |delta| term is sqrt(9 + eps), weighted by the FIM diagonal.
        sse_expected = 9.0
        l1_expected = 2.0 * np.sqrt(9.0 + eps) + 4.0 * np.sqrt(9.0 + eps)
        expected = sse_expected + weight * l1_expected

        self.assertAlmostEqual(pyo.value(expr), expected)

    def test_l1_lambda_zero_matches_unregularized_objective(self):
        """With weight 0, the smooth-L1 objective reduces to plain SSE."""
        exp_list = [self.LinearExperiment(1.0, 1.0), self.LinearExperiment(2.0, 2.0)]
        prior_fim = pd.DataFrame(
            [[2.0, 0.0], [0.0, 1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )
        theta_ref = pd.Series({"theta0": 0.0, "theta1": 0.0})

        pest_base = parmest.Estimator(exp_list, obj_function="SSE")
        pest_l1_zero = parmest.Estimator(
            exp_list,
            obj_function="SSE",
            regularization="L1",
            prior_FIM=prior_fim,
            theta_ref=theta_ref,
            regularization_weight=0.0,
            regularization_epsilon=1e-6,
        )

        for theta0, theta1 in [(0.0, 0.0), (0.5, 1.5), (-1.0, 2.0)]:
            obj_base = self._obj_at_theta(pest_base, theta0, theta1)
            obj_l1_zero = self._obj_at_theta(pest_l1_zero, theta0, theta1)
            self.assertAlmostEqual(obj_l1_zero, obj_base)

    def test_l1_nonpositive_epsilon_raises(self):
        """The smoothing epsilon must be strictly positive (zero and negative fail)."""
        exp_list = [self.LinearExperiment(1.0, 1.0)]
        prior_fim = pd.DataFrame(
            [[1.0, 0.0], [0.0, 1.0]],
            index=["theta0", "theta1"],
            columns=["theta0", "theta1"],
        )

        with pytest.raises(ValueError, match="regularization_epsilon must be positive"):
            parmest.Estimator(
                exp_list,
                obj_function="SSE",
                regularization="L1",
                prior_FIM=prior_fim,
                regularization_weight=1.0,
                regularization_epsilon=0.0,
            )

        with pytest.raises(ValueError, match="regularization_epsilon must be positive"):
            parmest.Estimator(
                exp_list,
                obj_function="SSE",
                regularization="L1",
                prior_FIM=prior_fim,
                regularization_weight=1.0,
                regularization_epsilon=-1e-6,
            )


###########################
# tests for deprecated UI #
###########################