Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pypop7/benchmarks/base_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,6 @@ def step(x):
x = squeeze_and_check(x)
y = np.sum(np.square(np.floor(x + 0.5)))
return y


from pypop7.benchmarks.class_of_base_functions import BaseFunction # noqa: F401, E402
6 changes: 5 additions & 1 deletion pypop7/optimizers/pso/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from pypop7.optimizers.pso.clpso import CLPSO
from pypop7.optimizers.pso.ipso import IPSO
from pypop7.optimizers.pso.ccpso2 import CCPSO2
from pypop7.optimizers.pso.ghepso import GHEPSO
from pypop7.optimizers.pso.mdpso import MDPSO


__all__ = [PSO, # Particle Swarm Optimizer [1995]
Expand All @@ -13,4 +15,6 @@
CPSO, # Cooperative Particle Swarm Optimizer (CPSO)
CLPSO, # Comprehensive Learning Particle Swarm Optimizer (CLPSO)
IPSO, # Incremental Particle Swarm Optimizer (IPSO)
CCPSO2] # Cooperative Coevolving Particle Swarm Optimizer (CCPSO2)
CCPSO2, # Cooperative Coevolving Particle Swarm Optimizer (CCPSO2)
GHEPSO, # Group Historical Experience Particle Swarm Optimizer (GHEPSO)
MDPSO] # Multimodal Delayed Particle Swarm Optimizer (MDPSO)
148 changes: 148 additions & 0 deletions pypop7/optimizers/pso/ghepso.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import numpy as np
from collections import deque

from pypop7.optimizers.pso.pso import PSO


class GHEPSO(PSO):
"""Group Historical Experience Particle Swarm Optimizer (GHEPSO).

Implements the algorithm proposed in Yan, Li & Deng (2012). In standard
PSO the social term pulls each particle toward the *current* generation's
global best (gbest). GHEPSO replaces that single attractor with ``p_vd``,
the running mean of the last ``n_history`` completed-generation gbest
positions, so that particles are guided by accumulated group experience
rather than just the most recent optimum.

Velocity update (eq. 5 in the paper):

v_i(k+1) = w * v_i(k)
+ c1 * r1 * (p_i(k) - x_i(k)) # cognitive: personal best
+ c2 * r2 * (p_vd(k) - x_i(k)) # social: history mean

where:

p_vd(k) = [ p_g(k) + p_g(k-1) + ... + p_g(k-N+1) ] / N

and ``p_g(k)`` is the gbest position recorded at the end of generation k.
When N=1 the algorithm reduces to standard PSO.

The paper reports best results with N equal to the swarm size (default 20),
fixed acceleration coefficients c1 = c2 = 2.0, and fixed inertia weight
w = 0.7. The base-class linear inertia schedule (Shi & Eberhart, 1998) is
*not* part of GHEPSO; pass ``inertia_weight=0.7`` in ``options`` to match
the paper exactly.

Parameters
----------
problem : dict
problem arguments with the following common settings (`keys`):
* 'fitness_function' - objective function to be **minimized** (`func`),
* 'ndim_problem' - number of dimensionality (`int`),
* 'upper_boundary' - upper boundary of search range (`array_like`),
* 'lower_boundary' - lower boundary of search range (`array_like`).
options : dict
optimizer options with the following common settings (`keys`):
* 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`),
* 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`),
* 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`);
and with the following particular settings (`keys`):
* 'n_individuals' - swarm (population) size, aka number of particles (`int`, default: `20`),
* 'cognition' - cognitive learning rate c1 (`float`, default: `2.0`),
* 'society' - social learning rate c2 applied to history mean (`float`, default: `2.0`),
* 'max_ratio_v' - maximal ratio of velocities w.r.t. search range (`float`, default: `0.2`),
* 'n_history' - number of past generations whose global-best positions are averaged,
N in the paper (`int`, default: `20`).
* 'inertia_weight' - fixed inertia weight w (`float`, default: `None`).
When None the base-class linear decay schedule is used.
Original GHEPSO does not use linear inertia schedule.

Examples
--------
Use the optimizer `GHEPSO` to minimize the well-known test function
`Rosenbrock <http://en.wikipedia.org/wiki/Rosenbrock_function>`_:

.. code-block:: python
:linenos:

>>> import numpy
>>> from pypop7.benchmarks.base_functions import rosenbrock
>>> from pypop7.optimizers.pso.ghepso import GHEPSO
>>> problem = {'fitness_function': rosenbrock,
... 'ndim_problem': 2,
... 'lower_boundary': -5.0*numpy.ones((2,)),
... 'upper_boundary': 5.0*numpy.ones((2,))}
>>> options = {'max_function_evaluations': 5000,
... 'seed_rng': 2022,
... 'inertia_weight': 0.7}
>>> ghepso = GHEPSO(problem, options)
>>> results = ghepso.optimize()
>>> print(f"GHEPSO: {results['n_function_evaluations']}, {results['best_so_far_y']}")

Attributes
----------
cognition : `float`
Cognitive learning rate c1.
society : `float`
Social learning rate c2, applied to the history-mean target p_vd.
max_ratio_v : `float`
Maximal velocity as a fraction of the search range.
n_history : `int`
Length N of the gbest history buffer (number of generations).
n_individuals : `int`
Swarm size.
inertia_weight : `float` or None
Fixed inertia weight, or None to use the base-class schedule.

References
----------
Yan, Z., Li, B. and Deng, C., 2012.
A PSO algorithm based on group history experience.
In Proceedings of the 10th World Congress on Intelligent Control and
Automation (pp. 4108-4112). IEEE.
https://ieeexplore.ieee.org/document/6359163
"""


def __init__(self, problem, options):
PSO.__init__(self, problem, options)
self.n_history = options.get('n_history', 20)
assert self.n_history >= 1
self.inertia_weight = options.get('inertia_weight', None)
self._g_history = deque(maxlen=self.n_history)

def initialize(self, args=None):
v, x, y, p_x, p_y, n_x = PSO.initialize(self, args)
self._g_history.clear()
self._g_history.append(np.copy(p_x[np.argmin(p_y)]))
return v, x, y, p_x, p_y, n_x

def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None):
# mean of the last N global-best positions
p_vd = np.mean(np.stack(tuple(self._g_history), axis=0), axis=0)

for i in range(self.n_individuals):
if self._check_terminations():
return v, x, y, p_x, p_y, n_x
# Stochastic weights per dimension, same style as standard PSO.
cognition_rand = self.rng_optimization.uniform(size=(self.ndim_problem,))
social_rand = self.rng_optimization.uniform(size=(self.ndim_problem,))
# Linear inertia schedule (Shi & Eberhart); index matches SPSO (incremented after the generation).
w = self._w[min(self._n_generations, len(self._w) - 1)]
if self.inertia_weight is not None:
w = self.inertia_weight

v[i] = (w*v[i] + # inertia
self.cognition*cognition_rand*(p_x[i] - x[i]) + # cognitive (personal best)
self.society*social_rand*(p_vd - x[i])) # pull toward mean of past global bests
v[i] = np.clip(v[i], self._min_v, self._max_v)
x[i] += v[i] # position update
if self.is_bound:
x[i] = np.clip(x[i], self.lower_boundary, self.upper_boundary)
y[i] = self._evaluate_fitness(x[i], args) # fitness evaluation
if y[i] < p_y[i]: # online update
p_x[i], p_y[i] = x[i], y[i]
# Record this generation's global best for future g_bar averages (deque drops oldest beyond n_history).
self._g_history.append(np.copy(p_x[np.argmin(p_y)]))
self._n_generations += 1
return v, x, y, p_x, p_y, n_x
179 changes: 179 additions & 0 deletions pypop7/optimizers/pso/mdpso.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import numpy as np # engine for numerical computing

from pypop7.optimizers.pso.pso import PSO # abstract class of all particle swarm optimizer (PSO) classes


class MDPSO(PSO):
"""Multimodal Delayed Particle Swarm Optimizer (MDPSO).

Following the velocity model of Song, Wang and Zou, the update adds two stochastic *delayed* terms
built from randomly chosen past personal-best and global-best positions stored over previous
generations. Concretely, at generation ``k`` the velocity update reads:

v_i(k+1) = w*v_i(k)
+ c1*r1*(p_i(k) - x_i(k))
+ c2*r2*(p_g(k) - x_i(k))
+ s_i*c1*r3*(p_i(k - tau_i) - x_i(k))
+ s_g*c2*r4*(p_g(k - tau_g) - x_i(k))

where ``tau_i``, ``tau_g`` are random integer delays drawn uniformly from ``[0, k]``, and the
intensity factors ``s_i``, ``s_g`` are determined by the evolutionary state (swarm dispersion).
Acceleration coefficients follow the PSO-TVAC schedule by default (Ratnaweera et al., 2004).

Parameters
----------
problem : dict
problem arguments with the following common settings (`keys`):
* 'fitness_function' - objective function to be **minimized** (`func`),
* 'ndim_problem' - number of dimensionality (`int`),
* 'upper_boundary' - upper boundary of search range (`array_like`),
* 'lower_boundary' - lower boundary of search range (`array_like`).
options : dict
optimizer options with the following common settings (`keys`):
* 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`),
* 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`),
* 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`);
and with the following particular settings (`keys`):
* 'n_individuals' - swarm (population) size, aka number of particles (`int`, default: `20`),
* 'cognition' - cognitive learning rate when ``use_tvac`` is ``False`` (`float`, default: `2.0`),
* 'society' - social learning rate when ``use_tvac`` is ``False`` (`float`, default: `2.0`),
* 'max_ratio_v' - maximal ratio of velocities w.r.t. search range (`float`, default: `0.2`),
* 'use_tvac' - if ``True``, use time-varying acceleration coefficients (PSO-TVAC)
as described in the paper, with ``c1`` decreasing from 2.5 to 0.5
and ``c2`` increasing from 0.5 to 2.5 (`bool`, default: ``True``),
* 'c1i' - initial value of ``c1`` for PSO-TVAC (`float`, default: `2.5`),
* 'c1f' - final value of ``c1`` for PSO-TVAC (`float`, default: `0.5`),
* 'c2i' - initial value of ``c2`` for PSO-TVAC (`float`, default: `0.5`),
* 'c2f' - final value of ``c2`` for PSO-TVAC (`float`, default: `2.5`).

References
----------
Song, B., Wang, Z. and Zou, L., On Global Smooth Path Planning for Mobile Robots Using A Novel
Multimodal Delayed PSO Algorithm. Brunel University Research Archive:
https://bura.brunel.ac.uk/bitstream/2438/14201/1/FullText.pdf

Kennedy, J. and Eberhart, R., 1995. Particle swarm optimization. IEEE ICNN.
"""
def __init__(self, problem, options):
PSO.__init__(self, problem, options)
self.use_tvac = options.get('use_tvac', True)
self.c1i = options.get('c1i', 2.5)
self.c1f = options.get('c1f', 0.5)
self.c2i = options.get('c2i', 0.5)
self.c2f = options.get('c2f', 2.5)
self._pg_hist = [] # list of global-best position vectors, one per generation
self._pp_hist = [] # list of (n_individuals, ndim_problem) personal-best snapshots, one per generation

def initialize(self, args=None):
v, x, y, p_x, p_y, n_x = PSO.initialize(self, args)
self._pg_hist.clear()
self._pp_hist.clear()
g_idx = int(np.argmin(p_y))
self._pg_hist.append(np.copy(p_x[g_idx]))
self._pp_hist.append(np.copy(p_x))
return v, x, y, p_x, p_y, n_x

def _mean_distances(self, x):
"""Mean distance from each particle position to all other positions (eq. (6) structure)."""
s = self.n_individuals
if s <= 1:
return np.zeros((s,))
d = np.empty((s,))
for i in range(s):
idx = np.concatenate((np.arange(0, i), np.arange(i + 1, s)))
d[i] = np.mean(np.linalg.norm(x[i] - x[idx], axis=1))
return d

def _evolutionary_factor(self, x, p_y):
"""Return Ef in [0, 1] (eq. (6)-(7)); degenerate spreads yield 0.5."""
d = self._mean_distances(x)
d_min, d_max = float(np.min(d)), float(np.max(d))
g_idx = int(np.argmin(p_y))
idx_others = np.concatenate((np.arange(0, g_idx), np.arange(g_idx + 1, self.n_individuals)))
if len(idx_others) == 0:
return 0.5
d_g = float(np.mean(np.linalg.norm(x[g_idx] - x[idx_others], axis=1)))
span = d_max - d_min
if span <= 1e-30:
return 0.5
ef = (d_g - d_min) / span
return float(np.clip(ef, 0.0, 1.0))

@staticmethod
def _state_from_ef(ef):
"""Map evolutionary factor to xi in {1,2,3,4} (eq. (8))."""
if ef < 0.25:
return 1
if ef < 0.5:
return 2
if ef < 0.75:
return 3
return 4

@staticmethod
def _delay_intensity(xi, ef):
"""Table I: local/global delay strengths s_i, s_g."""
if xi == 1: # convergence: no delayed information
return 0.0, 0.0
if xi == 2: # exploitation: only local delayed term
return ef, 0.0
if xi == 3: # exploration: only global delayed term
return 0.0, ef
return ef, ef # jumping-out: both delayed terms

def _tvac_coefficients(self, k_iter):
"""PSO-TVAC c1, c2 (eq. (3)-(4)) with c1i=2.5, c2i=0.5, c1f=0.5, c2f=2.5."""
t_max = max(float(self._max_generations), 1.0)
t = float(k_iter)
c1 = (self.c1i - self.c1f) * (t_max - t) / t_max + self.c1f
c2 = (self.c2i - self.c2f) * (t_max - t) / t_max + self.c2f
return c1, c2

def iterate(self, v=None, x=None, y=None, p_x=None, p_y=None, n_x=None, args=None):
# Velocity update (eq. 5):
# v_i += c1*r1*(pbest_i - x_i) + c2*r2*(gbest - x_i)
# + s_i*c1*r3*(pbest_i(k-tau_i) - x_i)
# + s_g*c2*r4*(gbest(k-tau_g) - x_i)
# Delays tau_i, tau_g are random integers uniform on [0, k] (paper, Section III-A).
k = self._n_generations
ef = self._evolutionary_factor(x, p_y)
xi = self._state_from_ef(ef)
s_i, s_g = self._delay_intensity(xi, ef)

if self.use_tvac:
c1, c2 = self._tvac_coefficients(k)
else:
c1, c2 = self.cognition, self.society

w = self._w[min(k, len(self._w) - 1)]

for i in range(self.n_individuals):
if self._check_terminations():
return v, x, y, p_x, p_y, n_x
n_x[i] = p_x[np.argmin(p_y)]
r1 = self.rng_optimization.uniform(size=(self.ndim_problem,))
r2 = self.rng_optimization.uniform(size=(self.ndim_problem,))
r3 = self.rng_optimization.uniform(size=(self.ndim_problem,))
r4 = self.rng_optimization.uniform(size=(self.ndim_problem,))
tau_i = int(self.rng_optimization.integers(0, k + 1))
tau_g = int(self.rng_optimization.integers(0, k + 1))
p_delayed = self._pp_hist[k - tau_i][i]
g_delayed = self._pg_hist[k - tau_g]
v[i] = (w*v[i] +
c1*r1*(p_x[i] - x[i]) +
c2*r2*(n_x[i] - x[i]) +
s_i*c1*r3*(p_delayed - x[i]) +
s_g*c2*r4*(g_delayed - x[i]))
v[i] = np.clip(v[i], self._min_v, self._max_v)
x[i] += v[i]
if self.is_bound:
x[i] = np.clip(x[i], self.lower_boundary, self.upper_boundary)
y[i] = self._evaluate_fitness(x[i], args)
if y[i] < p_y[i]:
p_x[i], p_y[i] = x[i], y[i]

g_idx = int(np.argmin(p_y))
self._pg_hist.append(np.copy(p_x[g_idx]))
self._pp_hist.append(np.copy(p_x))
self._n_generations += 1
return v, x, y, p_x, p_y, n_x
14 changes: 14 additions & 0 deletions pypop7/optimizers/pso/test_ghepso.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
def test_optimize():
import numpy # engine for numerical computing
from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized
from pypop7.optimizers.pso.ghepso import GHEPSO
problem = {'fitness_function': rosenbrock, # to define problem arguments
'ndim_problem': 2,
'lower_boundary': -5.0 * numpy.ones((2,)),
'upper_boundary': 5.0 * numpy.ones((2,))}
options = {'max_function_evaluations': 5000, # to set optimizer options
'seed_rng': 2022} # global step-size may need to be tuned for optimality
ghepso = GHEPSO(problem, options) # to initialize the black-box optimizer class
results = ghepso.optimize() # to run its optimization/evolution process
assert results['n_function_evaluations'] == 5000
assert results['best_so_far_y'] < 1.0
14 changes: 14 additions & 0 deletions pypop7/optimizers/pso/test_mdpso.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
def test_optimize():
import numpy # engine for numerical computing
from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized
from pypop7.optimizers.pso.mdpso import MDPSO
problem = {'fitness_function': rosenbrock, # to define problem arguments
'ndim_problem': 2,
'lower_boundary': -5.0 * numpy.ones((2,)),
'upper_boundary': 5.0 * numpy.ones((2,))}
options = {'max_function_evaluations': 5000, # to set optimizer options
'seed_rng': 2022}
mdpso = MDPSO(problem, options)
results = mdpso.optimize()
assert results['n_function_evaluations'] == 5000
assert results['best_so_far_y'] < 1.0