From ffb0095755edab3eecad7a4497a442ee4500c525 Mon Sep 17 00:00:00 2001 From: "Wojciech Mackowiak (Nokia)" Date: Tue, 31 Mar 2026 14:31:17 +0200 Subject: [PATCH 1/4] Simple first step --- pypop7/benchmarks/base_functions.py | 3 + pypop7/optimizers/pso/__init__.py | 4 +- pypop7/optimizers/pso/hpso.py | 126 ++++++++++++++++++++++++++++ pypop7/optimizers/pso/test_hpso.py | 14 ++++ 4 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 pypop7/optimizers/pso/hpso.py create mode 100644 pypop7/optimizers/pso/test_hpso.py diff --git a/pypop7/benchmarks/base_functions.py b/pypop7/benchmarks/base_functions.py index 0a6000b6b..7e8a36e88 100644 --- a/pypop7/benchmarks/base_functions.py +++ b/pypop7/benchmarks/base_functions.py @@ -492,3 +492,6 @@ def step(x): x = squeeze_and_check(x) y = np.sum(np.square(np.floor(x + 0.5))) return y + + +from pypop7.benchmarks.class_of_base_functions import BaseFunction # noqa: F401, E402 diff --git a/pypop7/optimizers/pso/__init__.py b/pypop7/optimizers/pso/__init__.py index 7664b0f48..4215a59ed 100644 --- a/pypop7/optimizers/pso/__init__.py +++ b/pypop7/optimizers/pso/__init__.py @@ -5,6 +5,7 @@ from pypop7.optimizers.pso.clpso import CLPSO from pypop7.optimizers.pso.ipso import IPSO from pypop7.optimizers.pso.ccpso2 import CCPSO2 +from pypop7.optimizers.pso.hpso import HPSO __all__ = [PSO, # Particle Swarm Optimizer [1995] @@ -13,4 +14,5 @@ CPSO, # Cooperative Particle Swarm Optimizer (CPSO) CLPSO, # Comprehensive Learning Particle Swarm Optimizer (CLPSO) IPSO, # Incremental Particle Swarm Optimizer (IPSO) - CCPSO2] # Cooperative Coevolving Particle Swarm Optimizer (CCPSO2) + CCPSO2, # Cooperative Coevolving Particle Swarm Optimizer (CCPSO2) + HPSO] # History-based Particle Swarm Optimizer (HPSO) diff --git a/pypop7/optimizers/pso/hpso.py b/pypop7/optimizers/pso/hpso.py new file mode 100644 index 000000000..b5f135158 --- /dev/null +++ b/pypop7/optimizers/pso/hpso.py @@ -0,0 +1,126 @@ +import numpy as np # engine for numerical 
computing +from collections import deque + +from pypop7.optimizers.pso.pso import PSO # abstract class of all particle swarm optimizer (PSO) classes + + +class HPSO(PSO): + """History-based Particle Swarm Optimizer (HPSO). + + Variant of standard global-topology PSO with an extra velocity term pulling each particle toward the + mean of the swarm global-best positions recorded over the last ``n_history`` completed generations. + + Parameters + ---------- + problem : dict + problem arguments with the following common settings (`keys`): + * 'fitness_function' - objective function to be **minimized** (`func`), + * 'ndim_problem' - number of dimensionality (`int`), + * 'upper_boundary' - upper boundary of search range (`array_like`), + * 'lower_boundary' - lower boundary of search range (`array_like`). + options : dict + optimizer options with the following common settings (`keys`): + * 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`), + * 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`), + * 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`); + and with the following particular settings (`keys`): + * 'n_individuals' - swarm (population) size, aka number of particles (`int`, default: `20`), + * 'cognition' - cognitive learning rate (`float`, default: `2.0`), + * 'society' - social learning rate (`float`, default: `2.0`), + * 'max_ratio_v' - maximal ratio of velocities w.r.t. search range (`float`, default: `0.2`), + * 'n_history' - number of past generations whose global-best positions are averaged (`int`, + default: `10`), + * 'history_weight' - acceleration weight for the history-mean term (`float`, default: `0.5`). + + Examples + -------- + Use the optimizer `HPSO` to minimize the well-known test function + `Rosenbrock `_: + + .. 
code-block:: python + :linenos: + + >>> import numpy + >>> from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized + >>> from pypop7.optimizers.pso.hpso import HPSO + >>> problem = {'fitness_function': rosenbrock, # define problem arguments + ... 'ndim_problem': 2, + ... 'lower_boundary': -5.0*numpy.ones((2,)), + ... 'upper_boundary': 5.0*numpy.ones((2,))} + >>> options = {'max_function_evaluations': 5000, # set optimizer options + ... 'seed_rng': 2022} + >>> hpso = HPSO(problem, options) # initialize the optimizer class + >>> results = hpso.optimize() # run the optimization process + >>> # return the number of function evaluations and best-so-far fitness + >>> print(f"HPSO: {results['n_function_evaluations']}, {results['best_so_far_y']}") + HPSO: 5000, 3.111603118059686e-06 + + Attributes + ---------- + cognition : `float` + cognitive learning rate, aka acceleration coefficient. + history_weight : `float` + weight on the velocity term toward the mean of recent global-best positions. + max_ratio_v : `float` + maximal ratio of velocities w.r.t. search range. + n_history : `int` + length of the global-best history buffer (generations). + n_individuals : `int` + swarm (population) size, aka number of particles. + society : `float` + social learning rate, aka acceleration coefficient. + + References + ---------- + Kennedy, J. and Eberhart, R., 1995, November. + Particle swarm optimization. + In Proceedings of International Conference on Neural Networks (pp. 1942-1948). IEEE. 
+ https://ieeexplore.ieee.org/document/488968 + """ + def __init__(self, problem, options): + PSO.__init__(self, problem, options) + self.n_history = options.get('n_history', 10) + assert self.n_history >= 1 + self.history_weight = options.get('history_weight', 0.5) + assert self.history_weight >= 0.0 + self._g_history = deque(maxlen=self.n_history) + + def initialize(self, args=None): + v, x, y, p_x, p_y, n_x = PSO.initialize(self, args) + self._g_history.clear() + self._g_history.append(np.copy(p_x[np.argmin(p_y)])) + return v, x, y, p_x, p_y, n_x + + def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None): + # Mean of stored global-best positions from prior completed generations (HPSO extra term). + # If history was cleared unexpectedly, fall back to the current global best (same as n_x target). + if len(self._g_history) == 0: + g_bar = p_x[np.argmin(p_y)] + else: + g_bar = np.mean(np.stack(tuple(self._g_history), axis=0), axis=0) + # One synchronous generation: update every particle once (same order as SPSO). + for i in range(self.n_individuals): + if self._check_terminations(): + return v, x, y, p_x, p_y, n_x + n_x[i] = p_x[np.argmin(p_y)] # online update within global topology + # Stochastic weights per dimension, same style as standard PSO. + cognition_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) + society_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) + history_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) + # Linear inertia schedule (Shi & Eberhart); index matches SPSO (incremented after the generation). 
+ w = self._w[min(self._n_generations, len(self._w) - 1)] + v[i] = (w*v[i] + # inertia + self.cognition*cognition_rand*(p_x[i] - x[i]) + # cognitive (personal best) + self.society*society_rand*(n_x[i] - x[i]) + # social (global best this step) + self.history_weight*history_rand*(g_bar - x[i])) # pull toward mean of past global bests + v[i] = np.clip(v[i], self._min_v, self._max_v) + x[i] += v[i] # position update + if self.is_bound: + x[i] = np.clip(x[i], self.lower_boundary, self.upper_boundary) + y[i] = self._evaluate_fitness(x[i], args) # fitness evaluation + if y[i] < p_y[i]: # online update + p_x[i], p_y[i] = x[i], y[i] + # Record this generation's global best for future g_bar averages (deque drops oldest beyond n_history). + self._g_history.append(np.copy(p_x[np.argmin(p_y)])) + self._n_generations += 1 + return v, x, y, p_x, p_y, n_x diff --git a/pypop7/optimizers/pso/test_hpso.py b/pypop7/optimizers/pso/test_hpso.py new file mode 100644 index 000000000..ee997d1a3 --- /dev/null +++ b/pypop7/optimizers/pso/test_hpso.py @@ -0,0 +1,14 @@ +def test_optimize(): + import numpy # engine for numerical computing + from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized + from pypop7.optimizers.pso.hpso import HPSO + problem = {'fitness_function': rosenbrock, # to define problem arguments + 'ndim_problem': 2, + 'lower_boundary': -5.0 * numpy.ones((2,)), + 'upper_boundary': 5.0 * numpy.ones((2,))} + options = {'max_function_evaluations': 5000, # to set optimizer options + 'seed_rng': 2022} # global step-size may need to be tuned for optimality + hpso = HPSO(problem, options) # to initialize the black-box optimizer class + results = hpso.optimize() # to run its optimization/evolution process + assert results['n_function_evaluations'] == 5000 + assert results['best_so_far_y'] < 1.0 From b32e907d30cc40d6e8089274e4a04755666862c4 Mon Sep 17 00:00:00 2001 From: "Wojciech Mackowiak (Nokia)" Date: Tue, 31 Mar 2026 14:50:13 +0200 Subject: [PATCH 2/4] 
Second better functions --- pypop7/optimizers/pso/__init__.py | 4 +- pypop7/optimizers/pso/mdpso.py | 160 ++++++++++++++++++++++++++++ pypop7/optimizers/pso/test_mdpso.py | 14 +++ 3 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 pypop7/optimizers/pso/mdpso.py create mode 100644 pypop7/optimizers/pso/test_mdpso.py diff --git a/pypop7/optimizers/pso/__init__.py b/pypop7/optimizers/pso/__init__.py index 4215a59ed..71d8909c0 100644 --- a/pypop7/optimizers/pso/__init__.py +++ b/pypop7/optimizers/pso/__init__.py @@ -6,6 +6,7 @@ from pypop7.optimizers.pso.ipso import IPSO from pypop7.optimizers.pso.ccpso2 import CCPSO2 from pypop7.optimizers.pso.hpso import HPSO +from pypop7.optimizers.pso.mdpso import MDPSO __all__ = [PSO, # Particle Swarm Optimizer [1995] @@ -15,4 +16,5 @@ CLPSO, # Comprehensive Learning Particle Swarm Optimizer (CLPSO) IPSO, # Incremental Particle Swarm Optimizer (IPSO) CCPSO2, # Cooperative Coevolving Particle Swarm Optimizer (CCPSO2) - HPSO] # History-based Particle Swarm Optimizer (HPSO) + HPSO, # History-based Particle Swarm Optimizer (HPSO) + MDPSO] # Multimodal Delayed Particle Swarm Optimizer (MDPSO) diff --git a/pypop7/optimizers/pso/mdpso.py b/pypop7/optimizers/pso/mdpso.py new file mode 100644 index 000000000..2c7d88cf3 --- /dev/null +++ b/pypop7/optimizers/pso/mdpso.py @@ -0,0 +1,160 @@ +import numpy as np # engine for numerical computing + +from pypop7.optimizers.pso.pso import PSO # abstract class of all particle swarm optimizer (PSO) classes + + +class MDPSO(PSO): + """Multimodal Delayed Particle Swarm Optimizer (MDPSO). + + Following the velocity model in Song, Wang and Zou (multimodal delayed PSO), the update adds two + stochastic *delayed* terms built from randomly chosen past personal-best and global-best positions + stored over previous generations. 
The strengths ``s_i``, ``s_g`` of those terms depend on the + evolutionary factor (swarm dispersion) and the evolutionary state; acceleration coefficients follow + the PSO-TVAC schedule as in the paper (optional fixed coefficients via ``use_tvac``). + + Parameters + ---------- + problem : dict + problem arguments with the following common settings (`keys`): + * 'fitness_function' - objective function to be **minimized** (`func`), + * 'ndim_problem' - number of dimensionality (`int`), + * 'upper_boundary' - upper boundary of search range (`array_like`), + * 'lower_boundary' - lower boundary of search range (`array_like`). + options : dict + optimizer options with the following common settings (`keys`): + * 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`), + * 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`), + * 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`); + and with the following particular settings (`keys`): + * 'n_individuals' - swarm (population) size, aka number of particles (`int`, default: `20`), + * 'cognition' - cognitive learning rate when ``use_tvac`` is ``False`` (`float`, default: `2.0`), + * 'society' - social learning rate when ``use_tvac`` is ``False`` (`float`, default: `2.0`), + * 'max_ratio_v' - maximal ratio of velocities w.r.t. search range (`float`, default: `0.2`), + * 'use_tvac' - if ``True``, use time-varying ``c1``, ``c2`` (PSO-TVAC) as in the paper + (`bool`, default: ``True``). + + References + ---------- + Song, B., Wang, Z. and Zou, L., On Global Smooth Path Planning for Mobile Robots Using A Novel + Multimodal Delayed PSO Algorithm. Brunel University Research Archive: + https://bura.brunel.ac.uk/bitstream/2438/14201/1/FullText.pdf + + Kennedy, J. and Eberhart, R., 1995. Particle swarm optimization. IEEE ICNN. 
+ """ + def __init__(self, problem, options): + PSO.__init__(self, problem, options) + self.use_tvac = options.get('use_tvac', True) + self._pg_hist = [] # list of global-best position vectors, one per stored generation + self._pp_hist = [] # list of (n_individuals, ndim) personal-best snapshots + + def initialize(self, args=None): + v, x, y, p_x, p_y, n_x = PSO.initialize(self, args) + self._pg_hist.clear() + self._pp_hist.clear() + g_idx = int(np.argmin(p_y)) + self._pg_hist.append(np.copy(p_x[g_idx])) + self._pp_hist.append(np.copy(p_x)) + return v, x, y, p_x, p_y, n_x + + def _mean_distances(self, x): + """Mean distance from each particle position to all other positions (eq. (6) structure).""" + s = self.n_individuals + if s <= 1: + return np.zeros((s,)) + d = np.empty((s,)) + for i in range(s): + idx = np.concatenate((np.arange(0, i), np.arange(i + 1, s))) + d[i] = np.mean(np.linalg.norm(x[i] - x[idx], axis=1)) + return d + + def _evolutionary_factor(self, x, p_y): + """Return Ef in [0, 1] (eq. (6)-(7)); degenerate spreads yield 0.5.""" + d = self._mean_distances(x) + d_min, d_max = float(np.min(d)), float(np.max(d)) + g_idx = int(np.argmin(p_y)) + idx_others = np.concatenate((np.arange(0, g_idx), np.arange(g_idx + 1, self.n_individuals))) + if len(idx_others) == 0: + return 0.5 + d_g = float(np.mean(np.linalg.norm(x[g_idx] - x[idx_others], axis=1))) + span = d_max - d_min + if span <= 1e-30: + return 0.5 + ef = (d_g - d_min) / span + return float(np.clip(ef, 0.0, 1.0)) + + @staticmethod + def _state_from_ef(ef): + """Map evolutionary factor to xi in {1,2,3,4} (eq. (8)).""" + if ef < 0.25: + return 1 + if ef < 0.5: + return 2 + if ef < 0.75: + return 3 + return 4 + + @staticmethod + def _delay_intensity(xi, ef): + """Table I: local/global delay strengths s_i, s_g.""" + if xi == 1: + return 0.0, 0.0 + if xi == 2: + return ef, 0.0 + if xi == 3: + return 0.0, ef + return ef, ef + + def _tvac_coefficients(self, k_iter): + """PSO-TVAC c1, c2 (eq. 
(3)-(4)) with c1i=2.5, c2i=0.5, c1f=0.5, c2f=2.5.""" + t_max = max(float(self._max_generations), 1.0) + t = float(k_iter) + c1 = (2.5 - 0.5) * (t_max - t) / t_max + 0.5 + c2 = (0.5 - 2.5) * (t_max - t) / t_max + 2.5 + return c1, c2 + + def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None): + # Eq. (5): standard PSO terms + s_i*c1*r3*(p_i(k-tau_i)-x) + s_g*c2*r4*(p_g(k-tau_g)-x); + # delayed positions are drawn from stored pbest / gbest histories (random integer tau in [0, k]). + k = self._n_generations + ef = self._evolutionary_factor(x, p_y) + xi = self._state_from_ef(ef) + s_i, s_g = self._delay_intensity(xi, ef) + + if self.use_tvac: + c1, c2 = self._tvac_coefficients(k) + else: + c1, c2 = self.cognition, self.society + + w = self._w[min(k, len(self._w) - 1)] + + for i in range(self.n_individuals): + if self._check_terminations(): + return v, x, y, p_x, p_y, n_x + n_x[i] = p_x[np.argmin(p_y)] + r1 = self.rng_optimization.uniform(size=(self.ndim_problem,)) + r2 = self.rng_optimization.uniform(size=(self.ndim_problem,)) + r3 = self.rng_optimization.uniform(size=(self.ndim_problem,)) + r4 = self.rng_optimization.uniform(size=(self.ndim_problem,)) + # Random integer delays in [0, k] inclusive (paper: uniform on [0, k]). 
+ tau_i = int(self.rng_optimization.integers(0, k + 1)) + tau_g = int(self.rng_optimization.integers(0, k + 1)) + p_delayed = self._pp_hist[k - tau_i][i] + g_delayed = self._pg_hist[k - tau_g] + v[i] = (w*v[i] + + c1*r1*(p_x[i] - x[i]) + + c2*r2*(n_x[i] - x[i]) + + s_i*c1*r3*(p_delayed - x[i]) + + s_g*c2*r4*(g_delayed - x[i])) + v[i] = np.clip(v[i], self._min_v, self._max_v) + x[i] += v[i] + if self.is_bound: + x[i] = np.clip(x[i], self.lower_boundary, self.upper_boundary) + y[i] = self._evaluate_fitness(x[i], args) + if y[i] < p_y[i]: + p_x[i], p_y[i] = x[i], y[i] + + g_idx = int(np.argmin(p_y)) + self._pg_hist.append(np.copy(p_x[g_idx])) + self._pp_hist.append(np.copy(p_x)) + self._n_generations += 1 + return v, x, y, p_x, p_y, n_x diff --git a/pypop7/optimizers/pso/test_mdpso.py b/pypop7/optimizers/pso/test_mdpso.py new file mode 100644 index 000000000..56aab8649 --- /dev/null +++ b/pypop7/optimizers/pso/test_mdpso.py @@ -0,0 +1,14 @@ +def test_optimize(): + import numpy # engine for numerical computing + from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized + from pypop7.optimizers.pso.mdpso import MDPSO + problem = {'fitness_function': rosenbrock, # to define problem arguments + 'ndim_problem': 2, + 'lower_boundary': -5.0 * numpy.ones((2,)), + 'upper_boundary': 5.0 * numpy.ones((2,))} + options = {'max_function_evaluations': 5000, # to set optimizer options + 'seed_rng': 2022} + mdpso = MDPSO(problem, options) + results = mdpso.optimize() + assert results['n_function_evaluations'] == 5000 + assert results['best_so_far_y'] < 1.0 From 446e9c055786830704b2f51dd525ae4b884bcec0 Mon Sep 17 00:00:00 2001 From: mateuszlampert Date: Wed, 8 Apr 2026 00:37:50 +0200 Subject: [PATCH 3/4] fix: hpso -> ghepso & velocity eq update --- pypop7/optimizers/pso/__init__.py | 4 +- pypop7/optimizers/pso/ghepso.py | 148 ++++++++++++++++++ pypop7/optimizers/pso/hpso.py | 126 --------------- .../pso/{test_hpso.py => test_ghepso.py} | 6 +- 4 files changed, 
153 insertions(+), 131 deletions(-) create mode 100644 pypop7/optimizers/pso/ghepso.py delete mode 100644 pypop7/optimizers/pso/hpso.py rename pypop7/optimizers/pso/{test_hpso.py => test_ghepso.py} (75%) diff --git a/pypop7/optimizers/pso/__init__.py b/pypop7/optimizers/pso/__init__.py index 71d8909c0..52fdad796 100644 --- a/pypop7/optimizers/pso/__init__.py +++ b/pypop7/optimizers/pso/__init__.py @@ -5,7 +5,7 @@ from pypop7.optimizers.pso.clpso import CLPSO from pypop7.optimizers.pso.ipso import IPSO from pypop7.optimizers.pso.ccpso2 import CCPSO2 -from pypop7.optimizers.pso.hpso import HPSO +from pypop7.optimizers.pso.ghepso import GHEPSO from pypop7.optimizers.pso.mdpso import MDPSO @@ -16,5 +16,5 @@ CLPSO, # Comprehensive Learning Particle Swarm Optimizer (CLPSO) IPSO, # Incremental Particle Swarm Optimizer (IPSO) CCPSO2, # Cooperative Coevolving Particle Swarm Optimizer (CCPSO2) - HPSO, # History-based Particle Swarm Optimizer (HPSO) + GHEPSO, # Group Historical Experience Particle Swarm Optimizer (GHEPSO) MDPSO] # Multimodal Delayed Particle Swarm Optimizer (MDPSO) diff --git a/pypop7/optimizers/pso/ghepso.py b/pypop7/optimizers/pso/ghepso.py new file mode 100644 index 000000000..1c5d5ba46 --- /dev/null +++ b/pypop7/optimizers/pso/ghepso.py @@ -0,0 +1,148 @@ +import numpy as np +from collections import deque + +from pypop7.optimizers.pso.pso import PSO + + +class GHEPSO(PSO): + """Group Historical Experience Particle Swarm Optimizer (GHEPSO). + + Implements the algorithm proposed in Yan, Li & Deng (2012). In standard + PSO the social term pulls each particle toward the *current* generation's + global best (gbest). GHEPSO replaces that single attractor with ``p_vd``, + the running mean of the last ``n_history`` completed-generation gbest + positions, so that particles are guided by accumulated group experience + rather than just the most recent optimum. + + Velocity update (eq. 
5 in the paper): + + v_i(k+1) = w * v_i(k) + + c1 * r1 * (p_i(k) - x_i(k)) # cognitive: personal best + + c2 * r2 * (p_vd(k) - x_i(k)) # social: history mean + + where: + + p_vd(k) = [ p_g(k) + p_g(k-1) + ... + p_g(k-N+1) ] / N + + and ``p_g(k)`` is the gbest position recorded at the end of generation k. + When N=1 the algorithm reduces to standard PSO. + + The paper reports best results with N equal to the swarm size (default 20), + fixed acceleration coefficients c1 = c2 = 2.0, and fixed inertia weight + w = 0.7. The base-class linear inertia schedule (Shi & Eberhart, 1998) is + *not* part of GHEPSO; pass ``inertia_weight=0.7`` in ``options`` to match + the paper exactly. + + Parameters + ---------- + problem : dict + problem arguments with the following common settings (`keys`): + * 'fitness_function' - objective function to be **minimized** (`func`), + * 'ndim_problem' - number of dimensionality (`int`), + * 'upper_boundary' - upper boundary of search range (`array_like`), + * 'lower_boundary' - lower boundary of search range (`array_like`). + options : dict + optimizer options with the following common settings (`keys`): + * 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`), + * 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`), + * 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`); + and with the following particular settings (`keys`): + * 'n_individuals' - swarm (population) size, aka number of particles (`int`, default: `20`), + * 'cognition' - cognitive learning rate c1 (`float`, default: `2.0`), + * 'society' - social learning rate c2 applied to history mean (`float`, default: `2.0`), + * 'max_ratio_v' - maximal ratio of velocities w.r.t. search range (`float`, default: `0.2`), + * 'n_history' - number of past generations whose global-best positions are averaged, + N in the paper (`int`, default: `20`). 
+ * 'inertia_weight' - fixed inertia weight w (`float`, default: `None`). + When None the base-class linear decay schedule is used. + Original GHEPSO does not use linear inertia schedule. + + Examples + -------- + Use the optimizer `GHEPSO` to minimize the well-known test function + `Rosenbrock `_: + + .. code-block:: python + :linenos: + + >>> import numpy + >>> from pypop7.benchmarks.base_functions import rosenbrock + >>> from pypop7.optimizers.pso.ghepso import GHEPSO + >>> problem = {'fitness_function': rosenbrock, + ... 'ndim_problem': 2, + ... 'lower_boundary': -5.0*numpy.ones((2,)), + ... 'upper_boundary': 5.0*numpy.ones((2,))} + >>> options = {'max_function_evaluations': 5000, + ... 'seed_rng': 2022, + ... 'inertia_weight': 0.7} + >>> ghepso = GHEPSO(problem, options) + >>> results = ghepso.optimize() + >>> print(f"GHEPSO: {results['n_function_evaluations']}, {results['best_so_far_y']}") + + Attributes + ---------- + cognition : `float` + Cognitive learning rate c1. + society : `float` + Social learning rate c2, applied to the history-mean target p_vd. + max_ratio_v : `float` + Maximal velocity as a fraction of the search range. + n_history : `int` + Length N of the gbest history buffer (number of generations). + n_individuals : `int` + Swarm size. + inertia_weight : `float` or None + Fixed inertia weight, or None to use the base-class schedule. + + References + ---------- + Yan, Z., Li, B. and Deng, C., 2012. + A PSO algorithm based on group history experience. + In Proceedings of the 10th World Congress on Intelligent Control and + Automation (pp. 4108-4112). IEEE. 
+ https://ieeexplore.ieee.org/document/6359163 + """ + + + def __init__(self, problem, options): + PSO.__init__(self, problem, options) + self.n_history = options.get('n_history', 20) + assert self.n_history >= 1 + self.inertia_weight = options.get('inertia_weight', None) + self._g_history = deque(maxlen=self.n_history) + + def initialize(self, args=None): + v, x, y, p_x, p_y, n_x = PSO.initialize(self, args) + self._g_history.clear() + self._g_history.append(np.copy(p_x[np.argmin(p_y)])) + return v, x, y, p_x, p_y, n_x + + def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None): + # mean of the last N global-best positions + p_vd = np.mean(np.stack(tuple(self._g_history), axis=0), axis=0) + + for i in range(self.n_individuals): + if self._check_terminations(): + return v, x, y, p_x, p_y, n_x + # Stochastic weights per dimension, same style as standard PSO. + cognition_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) + social_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) + # Linear inertia schedule (Shi & Eberhart); index matches SPSO (incremented after the generation). + w = self._w[min(self._n_generations, len(self._w) - 1)] + if self.inertia_weight is not None: + w = self.inertia_weight + + v[i] = (w*v[i] + # inertia + self.cognition*cognition_rand*(p_x[i] - x[i]) + # cognitive (personal best) + self.society*social_rand*(p_vd - x[i])) # pull toward mean of past global bests + v[i] = np.clip(v[i], self._min_v, self._max_v) + x[i] += v[i] # position update + if self.is_bound: + x[i] = np.clip(x[i], self.lower_boundary, self.upper_boundary) + y[i] = self._evaluate_fitness(x[i], args) # fitness evaluation + if y[i] < p_y[i]: # online update + p_x[i], p_y[i] = x[i], y[i] + # Record this generation's global best for future g_bar averages (deque drops oldest beyond n_history). 
+ self._g_history.append(np.copy(p_x[np.argmin(p_y)])) + self._n_generations += 1 + return v, x, y, p_x, p_y, n_x diff --git a/pypop7/optimizers/pso/hpso.py b/pypop7/optimizers/pso/hpso.py deleted file mode 100644 index b5f135158..000000000 --- a/pypop7/optimizers/pso/hpso.py +++ /dev/null @@ -1,126 +0,0 @@ -import numpy as np # engine for numerical computing -from collections import deque - -from pypop7.optimizers.pso.pso import PSO # abstract class of all particle swarm optimizer (PSO) classes - - -class HPSO(PSO): - """History-based Particle Swarm Optimizer (HPSO). - - Variant of standard global-topology PSO with an extra velocity term pulling each particle toward the - mean of the swarm global-best positions recorded over the last ``n_history`` completed generations. - - Parameters - ---------- - problem : dict - problem arguments with the following common settings (`keys`): - * 'fitness_function' - objective function to be **minimized** (`func`), - * 'ndim_problem' - number of dimensionality (`int`), - * 'upper_boundary' - upper boundary of search range (`array_like`), - * 'lower_boundary' - lower boundary of search range (`array_like`). - options : dict - optimizer options with the following common settings (`keys`): - * 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`), - * 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`), - * 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`); - and with the following particular settings (`keys`): - * 'n_individuals' - swarm (population) size, aka number of particles (`int`, default: `20`), - * 'cognition' - cognitive learning rate (`float`, default: `2.0`), - * 'society' - social learning rate (`float`, default: `2.0`), - * 'max_ratio_v' - maximal ratio of velocities w.r.t. 
search range (`float`, default: `0.2`), - * 'n_history' - number of past generations whose global-best positions are averaged (`int`, - default: `10`), - * 'history_weight' - acceleration weight for the history-mean term (`float`, default: `0.5`). - - Examples - -------- - Use the optimizer `HPSO` to minimize the well-known test function - `Rosenbrock `_: - - .. code-block:: python - :linenos: - - >>> import numpy - >>> from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized - >>> from pypop7.optimizers.pso.hpso import HPSO - >>> problem = {'fitness_function': rosenbrock, # define problem arguments - ... 'ndim_problem': 2, - ... 'lower_boundary': -5.0*numpy.ones((2,)), - ... 'upper_boundary': 5.0*numpy.ones((2,))} - >>> options = {'max_function_evaluations': 5000, # set optimizer options - ... 'seed_rng': 2022} - >>> hpso = HPSO(problem, options) # initialize the optimizer class - >>> results = hpso.optimize() # run the optimization process - >>> # return the number of function evaluations and best-so-far fitness - >>> print(f"HPSO: {results['n_function_evaluations']}, {results['best_so_far_y']}") - HPSO: 5000, 3.111603118059686e-06 - - Attributes - ---------- - cognition : `float` - cognitive learning rate, aka acceleration coefficient. - history_weight : `float` - weight on the velocity term toward the mean of recent global-best positions. - max_ratio_v : `float` - maximal ratio of velocities w.r.t. search range. - n_history : `int` - length of the global-best history buffer (generations). - n_individuals : `int` - swarm (population) size, aka number of particles. - society : `float` - social learning rate, aka acceleration coefficient. - - References - ---------- - Kennedy, J. and Eberhart, R., 1995, November. - Particle swarm optimization. - In Proceedings of International Conference on Neural Networks (pp. 1942-1948). IEEE. 
- https://ieeexplore.ieee.org/document/488968 - """ - def __init__(self, problem, options): - PSO.__init__(self, problem, options) - self.n_history = options.get('n_history', 10) - assert self.n_history >= 1 - self.history_weight = options.get('history_weight', 0.5) - assert self.history_weight >= 0.0 - self._g_history = deque(maxlen=self.n_history) - - def initialize(self, args=None): - v, x, y, p_x, p_y, n_x = PSO.initialize(self, args) - self._g_history.clear() - self._g_history.append(np.copy(p_x[np.argmin(p_y)])) - return v, x, y, p_x, p_y, n_x - - def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None): - # Mean of stored global-best positions from prior completed generations (HPSO extra term). - # If history was cleared unexpectedly, fall back to the current global best (same as n_x target). - if len(self._g_history) == 0: - g_bar = p_x[np.argmin(p_y)] - else: - g_bar = np.mean(np.stack(tuple(self._g_history), axis=0), axis=0) - # One synchronous generation: update every particle once (same order as SPSO). - for i in range(self.n_individuals): - if self._check_terminations(): - return v, x, y, p_x, p_y, n_x - n_x[i] = p_x[np.argmin(p_y)] # online update within global topology - # Stochastic weights per dimension, same style as standard PSO. - cognition_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) - society_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) - history_rand = self.rng_optimization.uniform(size=(self.ndim_problem,)) - # Linear inertia schedule (Shi & Eberhart); index matches SPSO (incremented after the generation). 
- w = self._w[min(self._n_generations, len(self._w) - 1)] - v[i] = (w*v[i] + # inertia - self.cognition*cognition_rand*(p_x[i] - x[i]) + # cognitive (personal best) - self.society*society_rand*(n_x[i] - x[i]) + # social (global best this step) - self.history_weight*history_rand*(g_bar - x[i])) # pull toward mean of past global bests - v[i] = np.clip(v[i], self._min_v, self._max_v) - x[i] += v[i] # position update - if self.is_bound: - x[i] = np.clip(x[i], self.lower_boundary, self.upper_boundary) - y[i] = self._evaluate_fitness(x[i], args) # fitness evaluation - if y[i] < p_y[i]: # online update - p_x[i], p_y[i] = x[i], y[i] - # Record this generation's global best for future g_bar averages (deque drops oldest beyond n_history). - self._g_history.append(np.copy(p_x[np.argmin(p_y)])) - self._n_generations += 1 - return v, x, y, p_x, p_y, n_x diff --git a/pypop7/optimizers/pso/test_hpso.py b/pypop7/optimizers/pso/test_ghepso.py similarity index 75% rename from pypop7/optimizers/pso/test_hpso.py rename to pypop7/optimizers/pso/test_ghepso.py index ee997d1a3..616238555 100644 --- a/pypop7/optimizers/pso/test_hpso.py +++ b/pypop7/optimizers/pso/test_ghepso.py @@ -1,14 +1,14 @@ def test_optimize(): import numpy # engine for numerical computing from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized - from pypop7.optimizers.pso.hpso import HPSO + from pypop7.optimizers.pso.ghepso import GHEPSO problem = {'fitness_function': rosenbrock, # to define problem arguments 'ndim_problem': 2, 'lower_boundary': -5.0 * numpy.ones((2,)), 'upper_boundary': 5.0 * numpy.ones((2,))} options = {'max_function_evaluations': 5000, # to set optimizer options 'seed_rng': 2022} # global step-size may need to be tuned for optimality - hpso = HPSO(problem, options) # to initialize the black-box optimizer class - results = hpso.optimize() # to run its optimization/evolution process + ghepso = GHEPSO(problem, options) # to initialize the black-box optimizer class + 
results = ghepso.optimize() # to run its optimization/evolution process assert results['n_function_evaluations'] == 5000 assert results['best_so_far_y'] < 1.0 From db0dfee4e686ae19db8415d1439a9a103ac89675 Mon Sep 17 00:00:00 2001 From: mateuszlampert Date: Wed, 8 Apr 2026 01:27:18 +0200 Subject: [PATCH 4/4] chore: parametrize tvac (MDPSO), enhance descriptions --- pypop7/optimizers/pso/mdpso.py | 55 +++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/pypop7/optimizers/pso/mdpso.py b/pypop7/optimizers/pso/mdpso.py index 2c7d88cf3..206dfebfb 100644 --- a/pypop7/optimizers/pso/mdpso.py +++ b/pypop7/optimizers/pso/mdpso.py @@ -6,11 +6,19 @@ class MDPSO(PSO): """Multimodal Delayed Particle Swarm Optimizer (MDPSO). - Following the velocity model in Song, Wang and Zou (multimodal delayed PSO), the update adds two - stochastic *delayed* terms built from randomly chosen past personal-best and global-best positions - stored over previous generations. The strengths ``s_i``, ``s_g`` of those terms depend on the - evolutionary factor (swarm dispersion) and the evolutionary state; acceleration coefficients follow - the PSO-TVAC schedule as in the paper (optional fixed coefficients via ``use_tvac``). + Following the velocity model of Song, Wang and Zou, the update adds two stochastic *delayed* terms + built from randomly chosen past personal-best and global-best positions stored over previous + generations. Concretely, at generation ``k`` the velocity update reads: + + v_i(k+1) = w*v_i(k) + + c1*r1*(p_i(k) - x_i(k)) + + c2*r2*(p_g(k) - x_i(k)) + + s_i*c1*r3*(p_i(k - tau_i) - x_i(k)) + + s_g*c2*r4*(p_g(k - tau_g) - x_i(k)) + + where ``tau_i``, ``tau_g`` are random integer delays drawn uniformly from ``[0, k]``, and the + intensity factors ``s_i``, ``s_g`` are determined by the evolutionary state (swarm dispersion). + Acceleration coefficients follow the PSO-TVAC schedule by default (Ratnaweera et al., 2004). 
Parameters ---------- @@ -30,8 +38,13 @@ class MDPSO(PSO): * 'cognition' - cognitive learning rate when ``use_tvac`` is ``False`` (`float`, default: `2.0`), * 'society' - social learning rate when ``use_tvac`` is ``False`` (`float`, default: `2.0`), * 'max_ratio_v' - maximal ratio of velocities w.r.t. search range (`float`, default: `0.2`), - * 'use_tvac' - if ``True``, use time-varying ``c1``, ``c2`` (PSO-TVAC) as in the paper - (`bool`, default: ``True``). + * 'use_tvac' - if ``True``, use time-varying acceleration coefficients (PSO-TVAC) + as described in the paper, with ``c1`` decreasing from 2.5 to 0.5 + and ``c2`` increasing from 0.5 to 2.5 (`bool`, default: ``True``), + * 'c1i' - initial value of ``c1`` for PSO-TVAC (`float`, default: `2.5`), + * 'c1f' - final value of ``c1`` for PSO-TVAC (`float`, default: `0.5`), + * 'c2i' - initial value of ``c2`` for PSO-TVAC (`float`, default: `0.5`), + * 'c2f' - final value of ``c2`` for PSO-TVAC (`float`, default: `2.5`). References ---------- @@ -44,8 +57,12 @@ class MDPSO(PSO): def __init__(self, problem, options): PSO.__init__(self, problem, options) self.use_tvac = options.get('use_tvac', True) - self._pg_hist = [] # list of global-best position vectors, one per stored generation - self._pp_hist = [] # list of (n_individuals, ndim) personal-best snapshots + self.c1i = options.get('c1i', 2.5) + self.c1f = options.get('c1f', 0.5) + self.c2i = options.get('c2i', 0.5) + self.c2f = options.get('c2f', 2.5) + self._pg_hist = [] # list of global-best position vectors, one per generation + self._pp_hist = [] # list of (n_individuals, ndim_problem) personal-best snapshots, one per generation def initialize(self, args=None): v, x, y, p_x, p_y, n_x = PSO.initialize(self, args) @@ -96,25 +113,28 @@ def _state_from_ef(ef): @staticmethod def _delay_intensity(xi, ef): """Table I: local/global delay strengths s_i, s_g.""" - if xi == 1: + if xi == 1: # convergence: no delayed information return 0.0, 0.0 - if xi == 2: + if xi == 2: 
# exploitation: only local delayed term return ef, 0.0 - if xi == 3: + if xi == 3: # exploration: only global delayed term return 0.0, ef - return ef, ef + return ef, ef # jumping-out: both delayed terms def _tvac_coefficients(self, k_iter): - """PSO-TVAC c1, c2 (eq. (3)-(4)) with c1i=2.5, c2i=0.5, c1f=0.5, c2f=2.5.""" + """PSO-TVAC c1, c2 (eq. (3)-(4)) with configurable endpoints c1i, c1f, c2i, c2f.""" t_max = max(float(self._max_generations), 1.0) t = float(k_iter) - c1 = (2.5 - 0.5) * (t_max - t) / t_max + 0.5 - c2 = (0.5 - 2.5) * (t_max - t) / t_max + 2.5 + c1 = (self.c1i - self.c1f) * (t_max - t) / t_max + self.c1f + c2 = (self.c2i - self.c2f) * (t_max - t) / t_max + self.c2f return c1, c2 def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None): - # Eq. (5): standard PSO terms + s_i*c1*r3*(p_i(k-tau_i)-x) + s_g*c2*r4*(p_g(k-tau_g)-x); - # delayed positions are drawn from stored pbest / gbest histories (random integer tau in [0, k]). + # Velocity update (eq. 5): + # v_i += c1*r1*(pbest_i - x_i) + c2*r2*(gbest - x_i) + # + s_i*c1*r3*(pbest_i(k-tau_i) - x_i) + # + s_g*c2*r4*(gbest(k-tau_g) - x_i) + # Delays tau_i, tau_g are random integers uniform on [0, k] (paper, Section III-A). k = self._n_generations ef = self._evolutionary_factor(x, p_y) xi = self._state_from_ef(ef) @@ -135,7 +155,6 @@ def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=Non r2 = self.rng_optimization.uniform(size=(self.ndim_problem,)) r3 = self.rng_optimization.uniform(size=(self.ndim_problem,)) r4 = self.rng_optimization.uniform(size=(self.ndim_problem,)) - # Random integer delays in [0, k] inclusive (paper: uniform on [0, k]). tau_i = int(self.rng_optimization.integers(0, k + 1)) tau_g = int(self.rng_optimization.integers(0, k + 1)) p_delayed = self._pp_hist[k - tau_i][i]