Skip to content

Commit d68a37b

Browse files
committed
Logic + docs for the event injection to simulate a server down
1 parent b7d85be commit d68a37b

10 files changed

Lines changed: 413 additions & 97 deletions

File tree

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
# Server Event Injection — End-to-End Design & Rationale
2+
3+
This document explains how **server-level events** (planned outages) are modeled and executed across all layers of the simulation stack. It complements the Edge Event Injection design.
4+
5+
---
6+
7+
## 1) Goals
8+
9+
* Hide outage semantics from the load balancer algorithms: **they see only the current set of edges**.
10+
* Keep **runtime cost O(1)** per transition (down/up).
11+
* Preserve determinism and fairness when servers rejoin.
12+
* Centralize event logic; avoid per-server coroutines and ad-hoc flags.
13+
14+
---
15+
16+
## 2) Participants (layers)
17+
18+
* **Schema / Validation (Pydantic)**: validates `EventInjection` objects (pairing, order, target existence).
19+
* **SimulationRunner**: builds runtimes; owns the **single shared** `OrderedDict[str, EdgeRuntime]` used by the LB (`_lb_out_edges`).
20+
* **EventInjectionRuntime**: central event engine; builds the **server timeline** and a **reverse index** `server_id → (edge_id, EdgeRuntime)`; mutates `_lb_out_edges` at runtime.
21+
* **LoadBalancerRuntime**: reads `_lb_out_edges` to select the next edge (RR / least-connections). **No outage logic inside.**
22+
* **EdgeRuntime (LB→Server edges)**: unaffected by server outages; disappears from the LB’s choice set while the server is down.
23+
* **ServerRuntime**: unaffected structurally; no extra checks for “am I down?”.
24+
* **SimPy Environment**: schedules the central outage coroutine.
25+
* **Metric Collector**: optional; observes effects but is not part of the mechanism.
26+
27+
---
28+
29+
## 3) Data & Structures
30+
31+
* **`_lb_out_edges: OrderedDict[str, EdgeRuntime]`**
32+
Single shared map of **currently routable** LB→server edges.
33+
34+
* Removal/Insertion/Move are **O(1)**.
35+
* Aliased into both `LoadBalancerRuntime` and `EventInjectionRuntime`.
36+
37+
* **`_servers_timeline: list[tuple[time, event_id, server_id, mark]]`**
38+
Absolute timestamps, sorted by `(time, mark == start, event_id, server_id)` so **END precedes START** when equal.
39+
40+
* **`_edge_by_server: dict[str, tuple[str, EdgeRuntime]]`**
41+
Reverse index built from `_lb_out_edges` at initialization.
42+
43+
---
44+
45+
## 4) Build-time Responsibilities
46+
47+
* **SimulationRunner**
48+
49+
1. Build LB and pass it `_lb_out_edges` (empty at first).
50+
2. Build edges; when wiring LB→Server, insert that edge into `_lb_out_edges`.
51+
3. Build `EventInjectionRuntime`, passing:
52+
53+
* validated `events`
54+
* `servers` and `edges` (IDs for sanity checks)
55+
* aliased `_lb_out_edges`
56+
57+
* **EventInjectionRuntime.**init****
58+
59+
* Partition events; construct ` _servers_timeline`.
60+
* Sort timeline (END before START at equal `time`).
61+
* Build ` _edge_by_server` by scanning `_lb_out_edges` (edge target → server\_id).
62+
63+
---
64+
65+
## 5) Run-time Responsibilities
66+
67+
* **EventInjectionRuntime.\_assign\_server\_state()**
68+
69+
* Iterate the server timeline with absolute→relative waits: `dt = t_event − last_t`, then `yield env.timeout(dt)`.
70+
* On `SERVER_DOWN` (START):
71+
`lb_out_edges.pop(edge_id, None)`
72+
* On `SERVER_UP` (END):
73+
74+
```
75+
lb_out_edges[edge_id] = edge_runtime
76+
lb_out_edges.move_to_end(edge_id) # fairness on rejoin
77+
```
78+
79+
* **LoadBalancerRuntime**
80+
81+
* For each request, read `_lb_out_edges` and apply the chosen algorithm. If a server is down, its edge simply **isn’t there**.
82+
83+
* **EdgeRuntime & ServerRuntime**
84+
85+
* No additional work: outage is reflected entirely by presence/absence of the LB→server edge.
86+
87+
---
88+
89+
## 6) Sequence Overview (all layers)
90+
91+
```
92+
User YAML ──► Schema/Validation
93+
│ (pairing, ordering, target checks)
94+
95+
SimulationRunner
96+
_lb_out_edges: OrderedDict[...] (shared object)
97+
│ build LB, edges (LB→S inserted into _lb_out_edges)
98+
│ build EventInjectionRuntime(..., lb_out_edges=alias)
99+
100+
├─ _start_events()
101+
│ └─ EventInjectionRuntime.start()
102+
│ └─ start _assign_server_state() (SimPy proc)
103+
104+
├─ _start_all_processes()
105+
│ ├─ LoadBalancerRuntime.start()
106+
│ ├─ EdgeRuntime.start() (if any process)
107+
│ └─ ServerRuntime.start()
108+
109+
└─ env.run(until=T)
110+
111+
Runtime progression (example):
112+
t=5s EventInjectionRuntime: SERVER_DOWN(S1)
113+
└─ _edge_by_server[S1] -> (edge-S1, edge_rt)
114+
└─ _lb_out_edges.pop("edge-S1") # O(1)
115+
116+
t=7s LoadBalancerRuntime picks next edge
117+
└─ "edge-S1" not present → never selected
118+
119+
t=10s EventInjectionRuntime: SERVER_UP(S1)
120+
└─ _lb_out_edges["edge-S1"] = edge_rt # O(1)
121+
└─ _lb_out_edges.move_to_end("edge-S1") # fairness
122+
123+
t>10s LoadBalancerRuntime now sees edge-S1 again
124+
└─ RR/LC proceeds as usual
125+
```
126+
127+
---
128+
129+
## 7) Correctness & Determinism
130+
131+
* **Exact timing**: absolute→relative conversion ensures transitions happen at precise timestamps.
132+
* **END before START** at identical times prevents spuriously “stuck down” outcomes for back-to-back events.
133+
* **Fair rejoin**: `move_to_end` reintroduces the server in a predictable RR position (least recently used).
134+
(Least-connections remains deterministic because the edge reappears with its current connection count.)
135+
* **Availability constraint**: schema can enforce “at least one server up,” avoiding degenerate LB states.
136+
137+
---
138+
139+
## 8) Design Choices & Rationale
140+
141+
* **Mutate the edge set, not the algorithm**
142+
Removing/adding the LB→server edge keeps LB code **pure** and reusable; no conditional branches for “down servers”.
143+
* **Single shared `OrderedDict`**
144+
145+
* O(1) for remove/insert/rotate.
146+
* Aliasing between LB and injector removes the need for signaling or copies.
147+
* **Centralized coroutine**
148+
One SimPy process for server outages scales better than per-server processes; simpler mental model.
149+
* **Reverse index `server_id → edge`**
150+
Constant-time resolution; avoids coupling servers to LB or vice-versa.
151+
152+
---
153+
154+
## 9) Performance
155+
156+
* **Build**:
157+
158+
* Timeline construction: O(#server-events)
159+
* Sort: O(#server-events · log #server-events)
160+
* **Run**:
161+
162+
* Each transition: O(1) (pop/set/move)
163+
* LB pick: unchanged (RR O(1), LC O(n))
164+
* **Space**:
165+
166+
* Reverse index: O(#servers with LB edges)
167+
* Timeline: O(#server-events)
168+
169+
---
170+
171+
## 10) Failure Modes & Guards
172+
173+
* Unknown server in an event → rejected by schema (or ignored with a log if you prefer leniency).
174+
* Concurrent DOWN/UP at same timestamp → resolved by timeline ordering (END first).
175+
* All servers down → disallowed by schema (or handled by LB guard if you opt in later).
176+
* Missing reverse mapping (no LB) → injector safely no-ops.
177+
178+
---
179+
180+
## 11) Extensibility
181+
182+
* **Multiple LB instances**: make the reverse index `(lb_id, server_id) → edge_id`, or pass per-LB `lb_out_edges`.
183+
* **Partial capacity**: instead of removing edges, attach capacity/weight and have the LB respect it (requires extending LB policy).
184+
* **Dynamic scale-out**: adding new servers at runtime is the same operation as “UP” with a previously unseen edge.
185+
186+
---
187+
188+
## 12) Operational Notes
189+
190+
* Start the **event coroutine** before LB to avoid off-by-one delivery at `t_start`.
191+
* Keep `_lb_out_edges` the **only source of truth** for routable edges.
192+
* If you also use edge-level spikes, both coroutines can run concurrently; they are independent.
193+
194+
---
195+
196+
## 13) Summary
197+
198+
We model server outages by **mutating the LB’s live edge set** via a centralized event runtime:
199+
200+
* **O(1)** down/up transitions by `pop`/`set` on a shared `OrderedDict`.
201+
* LB algorithms remain untouched and deterministic.
202+
* A single SimPy coroutine drives the timeline; a reverse index resolves targets in constant time.
203+
* The design is minimal, performant, and easy to extend to richer failure models.

src/asyncflow/runtime/actors/edge.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def __init__( # Noqa: PLR0913
5656
self.edges_affected = edges_affected
5757
self.target_box = target_box
5858
self.rng = rng or np.random.default_rng()
59-
self.setting = settings
59+
self.settings = settings
6060
self._edge_enabled_metrics = build_edge_metrics(
6161
settings.enabled_sample_metrics,
6262
)
@@ -93,8 +93,8 @@ def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
9393
# Logic to add if exists the event injection for the given edge
9494
spike = 0.0
9595
if (
96-
self.edges_affected
97-
and self.edges_spike
96+
self.edges_spike
97+
and self.edges_affected
9898
and self.edge_config.id in self.edges_affected
9999
):
100100
spike = self.edges_spike.get(self.edge_config.id, 0.0)

src/asyncflow/runtime/actors/load_balancer.py

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
"""Definition of the node represented by the LB in the simulation"""
22

3+
4+
from collections import OrderedDict
35
from collections.abc import Generator
4-
from typing import TYPE_CHECKING
6+
from typing import (
7+
TYPE_CHECKING,
8+
)
59

610
import simpy
711

8-
from asyncflow.config.constants import LbAlgorithmsName, SystemNodes
12+
from asyncflow.config.constants import SystemNodes
913
from asyncflow.runtime.actors.edge import EdgeRuntime
10-
from asyncflow.runtime.actors.routing.lb_algorithms import (
11-
least_connections,
12-
round_robin,
13-
)
14+
from asyncflow.runtime.actors.routing.lb_algorithms import LB_TABLE
1415
from asyncflow.schemas.topology.nodes import LoadBalancer
1516

1617
if TYPE_CHECKING:
@@ -26,29 +27,38 @@ def __init__(
2627
*,
2728
env: simpy.Environment,
2829
lb_config: LoadBalancer,
29-
out_edges: list[EdgeRuntime] | None,
30+
31+
# We use an OrderedDict because, for the RR algorithm,
32+
# we rotate elements in O(1) by moving the selected key to the end.
33+
# An OrderedDict also lets us remove an element by key in O(1)
34+
# without implementing a custom doubly linked list + hashmap.
35+
# Keys are the unique edge IDs that connect the LB to the servers.
36+
# If multiple LBs are present, the SimulationRunner assigns
37+
# the correct dict to each LB. Removals/insertions are performed
38+
# by the EventInjectionRuntime.
39+
40+
lb_out_edges: OrderedDict[str, EdgeRuntime],
3041
lb_box: simpy.Store,
3142
) -> None:
3243
"""
3344
Descriprion of the instance attributes for the class
3445
Args:
35-
env (simpy.Environment): env of the simulation
36-
lb_config (LoadBalancer): input to define the lb in the runtime
37-
rqs_state (RequestState): state of the simulation
38-
out_edges (list[EdgeRuntime]): list of edges that connects lb with servers
39-
lb_box (simpy.Store): store to add the state
40-
46+
env (simpy.Environment): Simulation environment.
47+
lb_config (LoadBalancer): LB configuration for the runtime.
48+
out_edges (OrderedDict[str, EdgeRuntime]): Edges connecting
49+
the LB to servers.
50+
lb_box (simpy.Store): Queue (mailbox) from which the LB
51+
consumes request states.
4152
"""
4253
self.env = env
4354
self.lb_config = lb_config
44-
self.out_edges = out_edges
55+
self.lb_out_edges = lb_out_edges
4556
self.lb_box = lb_box
46-
self._round_robin_index: int = 0
57+
4758

4859

4960
def _forwarder(self) -> Generator[simpy.Event, None, None]:
5061
"""Updtate the state before passing it to another node"""
51-
assert self.out_edges is not None
5262
while True:
5363
state: RequestState = yield self.lb_box.get() # type: ignore[assignment]
5464

@@ -58,14 +68,7 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
5868
self.env.now,
5969
)
6070

61-
if self.lb_config.algorithms == LbAlgorithmsName.ROUND_ROBIN:
62-
out_edge, self._round_robin_index = round_robin(
63-
self.out_edges,
64-
self._round_robin_index,
65-
)
66-
else:
67-
out_edge = least_connections(self.out_edges)
68-
71+
out_edge = LB_TABLE[self.lb_config.algorithms](self.lb_out_edges)
6972
out_edge.transport(state)
7073

7174
def start(self) -> simpy.Process:
Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,45 @@
11
"""algorithms to simulate the load balancer during the simulation"""
22

3+
from collections import OrderedDict
4+
from collections.abc import Callable
35

4-
6+
from asyncflow.config.constants import LbAlgorithmsName
57
from asyncflow.runtime.actors.edge import EdgeRuntime
68

79

8-
def least_connections(list_edges: list[EdgeRuntime]) -> EdgeRuntime:
9-
"""We send the state to the edge with less concurrent connections"""
10-
concurrent_connections = [edge.concurrent_connections for edge in list_edges]
11-
12-
idx_min = concurrent_connections.index(min(concurrent_connections))
13-
14-
return list_edges[idx_min]
15-
16-
def round_robin(edges: list[EdgeRuntime], idx: int) -> tuple[EdgeRuntime, int]:
10+
def least_connections(
11+
edges: OrderedDict[str, EdgeRuntime],
12+
) -> EdgeRuntime:
13+
"""Return the edge with the fewest concurrent connections"""
14+
# Here we use a O(n) operation, considering the amount of edges
15+
# for the average simulation it should be ok, however, in the
16+
# future we might consider to implement an heap structure to
17+
# reduce the time complexity, especially if we will see
18+
# during the Montecarlo analysis not good performances
19+
name = min(edges, key=lambda k: edges[k].concurrent_connections)
20+
return edges[name]
21+
22+
def round_robin(
23+
edges: OrderedDict[str, EdgeRuntime],
24+
) -> EdgeRuntime:
1725
"""
1826
We send states to different server in uniform way by
19-
rotating the list of edges that should transport the state
20-
to the correct server, we rotate the index and not the list
21-
to avoid aliasing since the list is shared by many components
27+
rotating the ordered dict, given the pydantic validation
28+
we don't have to manage the edge case where the dict
29+
is empty
2230
"""
23-
idx %= len(edges)
24-
chosen = edges[idx]
25-
idx = (idx + 1) % len(edges)
26-
return chosen, idx
31+
# we use iter next creating all time a new iterator
32+
# to be sure that we return always the first element
33+
key, value = next(iter(edges.items()))
34+
edges.move_to_end(key)
35+
36+
return value
2737

2838

39+
LB_TABLE: dict[LbAlgorithmsName,
40+
Callable[[OrderedDict[str, EdgeRuntime]], EdgeRuntime]] = {
41+
LbAlgorithmsName.LEAST_CONNECTIONS: least_connections,
42+
LbAlgorithmsName.ROUND_ROBIN: round_robin,
43+
}
2944

3045

0 commit comments

Comments
 (0)