Skip to content

Commit d22f895

Browse files
committed
Adding diffusion.
1 parent 5884544 commit d22f895

3 files changed

Lines changed: 460 additions & 0 deletions

File tree

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Public API of the diffusion (adapt-then-combine) algorithm package.

Re-exports the core algorithm, its message type, the actor plug-in hierarchy,
and the factory helpers, plus the concrete economic-dispatch actors.
"""

from .diffusion import (
    DiffusionActor,
    DiffusionAlgorithm,
    DiffusionMessage,
    NoDiffusionActor,
    create_diffusion_participant,
    create_diffusion_start,
)
from .economic_dispatch import (
    LinearCostEconomicDispatchDiffusionActor,
    ReservoirStorageDiffusionActor,
)

__all__ = [
    "DiffusionActor",
    "NoDiffusionActor",
    "DiffusionMessage",
    "DiffusionAlgorithm",
    "create_diffusion_participant",
    "create_diffusion_start",
    "LinearCostEconomicDispatchDiffusionActor",
    "ReservoirStorageDiffusionActor",
]
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
"""Diffusion algorithm (adapt-then-combine).
2+
3+
Each participant maintains a local price estimate λ and a power iterate φ over a
4+
scheduling horizon. At every iteration a participant
5+
6+
1. **adapts** its power iterate via a local gradient step
7+
``φ = λ - ε · ∇J(λ, data)``,
8+
2. broadcasts ``φ`` to its neighbours, and
9+
3. **combines** its own φ with all received φ's by unweighted averaging to form
10+
the next λ.
11+
12+
The update rule is:
13+
14+
.. math::
15+
16+
\\lambda^{k+1} = \\frac{1}{N+1} \\left( \\varphi^k +
17+
\\sum_{j \\in \\mathcal{N}} \\varphi_j^k \\right),
18+
\\qquad
19+
\\varphi^{k+1} = \\lambda^{k+1} - \\varepsilon \\, \\nabla J(\\lambda^{k+1}, \\text{data}).
20+
21+
The optional :class:`DiffusionActor` plug-in supplies ``∇J``; the default
22+
:class:`NoDiffusionActor` returns zero.
23+
"""
24+
25+
from __future__ import annotations
26+
27+
from dataclasses import dataclass
28+
from typing import TYPE_CHECKING, Any, Callable
29+
30+
import numpy as np
31+
32+
from ..core import DistributedAlgorithm, OptimizationMessage
33+
34+
if TYPE_CHECKING:
35+
from ...carrier.core import Carrier
36+
37+
38+
# ---------------------------------------------------------------------------
39+
# DiffusionActor hierarchy
40+
# ---------------------------------------------------------------------------
41+
42+
43+
class DiffusionActor:
    """Extension point supplying the gradient used in the diffusion adapt step.

    Derive from this class to steer the diffusion iterates toward a local
    objective (for instance economic dispatch or storage scheduling). The base
    implementation contributes nothing, so plain averaging results.
    """

    def gradient_term(self, lam: np.ndarray, data: Any) -> np.ndarray | float:
        """Evaluate ``∇J(λ, data)`` at the current price estimate *lam*.

        :param lam: Local price/λ estimate over the scheduling horizon.
        :param data: Auxiliary payload carried along with the start message.
        :returns: The additive gradient term; the base class yields 0.
        """
        return 0
58+
59+
60+
class NoDiffusionActor(DiffusionActor):
    """Default actor whose gradient is always zero (pure-averaging diffusion)."""
62+
63+
64+
# ---------------------------------------------------------------------------
65+
# Message types
66+
# ---------------------------------------------------------------------------
67+
68+
69+
@dataclass
class DiffusionMessage(OptimizationMessage):
    """Payload exchanged between diffusion participants each iteration.

    Carries the sender's current power iterate, the iteration counter, and an
    opaque payload that each recipient forwards to its actor's gradient call.
    """

    # Sender's current power iterate φ over the schedule.
    phi: np.ndarray
    # Sender's iteration counter k.
    k: int
    # Opaque payload handed to DiffusionActor.gradient_term on receipt.
    data: Any
    # True only on the kick-off message; makes recipients (re-)initialise.
    initial: bool = False
84+
85+
86+
# ---------------------------------------------------------------------------
87+
# DiffusionAlgorithm
88+
# ---------------------------------------------------------------------------
89+
90+
91+
class DiffusionAlgorithm(DistributedAlgorithm):
    """Distributed adapt-then-combine diffusion over a scheduling horizon.

    Each participant keeps a price estimate λ and a power iterate φ. On every
    iteration it adapts (gradient step), broadcasts φ, and combines received
    φ's by unweighted averaging into the next λ.

    :param finish_callback: Called with ``(algorithm, carrier)`` when
        :attr:`max_iter` is reached.
    :param diffusion_actor: Optional :class:`DiffusionActor` supplying the
        gradient. ``None`` → :class:`NoDiffusionActor`.
    :param initial_lam: Starting scalar (broadcast to all λ dimensions).
    :param epsilon: Gradient step size (ε).
    :param max_iter: Maximum number of diffusion iterations.
    :param horizon: Number of time steps in the schedule.
    """

    def __init__(
        self,
        finish_callback: Callable,
        diffusion_actor: DiffusionActor | None = None,
        initial_lam: float = 10.0,
        epsilon: float = 0.1,
        max_iter: int = 300,
        horizon: int = 24,
    ) -> None:
        self.finish_callback = finish_callback
        # Fall back to the zero-gradient actor when none is supplied.
        self.actor: DiffusionActor = (
            diffusion_actor if diffusion_actor is not None else NoDiffusionActor()
        )
        self.initial_lam = initial_lam
        self.epsilon = epsilon
        self.max_iter = max_iter
        # NOTE(review): horizon is stored but never read below — the λ/φ
        # dimension is taken from len(message_data.phi) instead. Confirm
        # whether horizon should size the initial λ vector.
        self.horizon = horizon

        # Per-iteration inbox: iteration counter k -> messages received for k.
        self._message_queue: dict[int, list[DiffusionMessage]] = {}
        # True until the first (or a fresh initial) message arrives.
        self._first_message: bool = True
        # Local iteration counter.
        self._k: int = 0
        # Placeholder 1-element iterates; re-sized on the first message.
        self._lam: np.ndarray = np.array([1.0])
        self._phi: np.ndarray = np.array([1.0])

    async def on_exchange_message(
        self,
        carrier: "Carrier",
        message_data: DiffusionMessage,
        meta: Any,
    ) -> None:
        """Process one incoming diffusion message.

        Handles three paths in order: termination (k past max_iter),
        (re-)initialisation (first/initial message), and the regular
        queue-then-combine-then-adapt step.
        """
        # NOTE(review): assumed to return the addresses of all neighbours;
        # the meaning of the "" argument is defined in carrier.core — verify.
        neighbours = carrier.others("")

        # --- Termination path ---
        if message_data.k >= self.max_iter:
            # Already reset (or never started): nothing to finish.
            if self._first_message:
                return
            self.finish_callback(self, carrier)
            # Reset so a later initial message can start a fresh run.
            self._first_message = True
            self._message_queue.clear()
            return

        # --- Initialisation path ---
        if self._first_message or message_data.initial:
            self._first_message = False
            self._k = 0
            # λ dimension follows the incoming φ length, filled with the
            # configured starting scalar.
            self._lam = np.ones(len(message_data.phi)) * self.initial_lam

            # First adapt step: φ = λ - ε·∇J(λ, data).
            grad_J = self.actor.gradient_term(self._lam, message_data.data)
            self._phi = self._lam - self.epsilon * np.asarray(grad_J)

            # Broadcast the iteration-0 iterate to every neighbour.
            for addr in neighbours:
                carrier.send_to_other(
                    DiffusionMessage(
                        phi=self._phi.copy(),
                        k=0,
                        data=message_data.data,
                    ),
                    addr,
                )

        # --- Queue the message ---
        # NOTE(review): the kick-off (initial=True) message also falls through
        # to here and is queued under k=0 alongside genuine neighbour
        # messages; confirm the starter counts as one of the neighbours,
        # otherwise iteration 0 combines one message too early.
        queue = self._message_queue.setdefault(message_data.k, [])
        queue.append(message_data)

        # --- Advance if all neighbours have reported for this iteration ---
        if len(queue) == len(neighbours):
            # Combination: unweighted average of own φ and all received φ's.
            n = len(queue) + 1
            lam_new = self._phi.copy()
            for m in queue:
                lam_new = lam_new + m.phi
            self._lam = lam_new / n

            # This iteration's inbox is consumed; drop it.
            del self._message_queue[message_data.k]

            # Adaptation: φ = λ - ε·∇J(λ, data).
            grad_J = self.actor.gradient_term(self._lam, message_data.data)
            self._phi = self._lam - self.epsilon * np.asarray(grad_J)

            self._k += 1

            # Broadcast the next iterate; when self._k reaches max_iter the
            # recipients take the termination path above.
            for addr in neighbours:
                carrier.send_to_other(
                    DiffusionMessage(
                        phi=self._phi.copy(),
                        k=self._k,
                        data=message_data.data,
                    ),
                    addr,
                )
195+
196+
197+
# ---------------------------------------------------------------------------
198+
# Factories
199+
# ---------------------------------------------------------------------------
200+
201+
202+
def create_diffusion_participant(
    finish_callback: Callable,
    diffusion_actor: DiffusionActor | None = None,
    initial_lam: float = 10.0,
    epsilon: float = 0.1,
    max_iter: int = 300,
    horizon: int = 24,
) -> DiffusionAlgorithm:
    """Build a ready-to-run :class:`DiffusionAlgorithm` participant.

    :param finish_callback: ``(algorithm, carrier) -> None`` — invoked on completion.
    :param diffusion_actor: Gradient plug-in; ``None`` means zero gradient.
    :param initial_lam: Starting scalar for every λ dimension.
    :param epsilon: Gradient step size ε.
    :param max_iter: Iteration budget.
    :param horizon: Length of the schedule in time steps.
    """
    participant = DiffusionAlgorithm(
        finish_callback,
        diffusion_actor,
        initial_lam,
        epsilon,
        max_iter,
        horizon,
    )
    return participant
227+
228+
229+
def create_diffusion_start(
    initial_lam: float,
    data: Any = None,
) -> DiffusionMessage:
    """Build the kick-off message that starts a diffusion run.

    :param initial_lam: Starting scalar; recipients broadcast it to all λ
        dimensions on initialisation.
    :param data: Auxiliary payload forwarded to each participant's
        :meth:`DiffusionActor.gradient_term`.
    :returns: A :class:`DiffusionMessage` flagged with ``initial=True``.
    """
    start_message = DiffusionMessage(
        phi=np.array([initial_lam]),
        k=0,
        data=data,
        initial=True,
    )
    return start_message

0 commit comments

Comments
 (0)