Skip to content

Commit b638d43

Browse files
committed
Added pydantic validation + unit + int tests for eventinjection
1 parent d68a37b commit b638d43

10 files changed

Lines changed: 1281 additions & 11 deletions

File tree

src/asyncflow/schemas/payload.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,55 @@ def ensure_not_all_servers_are_down_simultaneously(
200200
return model
201201

202202

203+
@model_validator(mode="after")  # type: ignore[arg-type]
def forbid_overlapping_server_outages(
    cls,  # noqa: N805
    model: "SimulationPayload",
) -> "SimulationPayload":
    """
    Forbid overlapping SERVER_DOWN intervals targeting the same server.

    Only events whose ``target_id`` is one of the topology's server ids and
    whose start kind is ``EventDescription.SERVER_DOWN`` are considered;
    every other event is ignored by this check.

    Rationale:
    - Keeps runtime simple (no reference counting).
    - Allows back-to-back windows (END at t and START at t) thanks to sorting
      END before START at the same timestamp.

    Returns:
        The same ``model`` instance, unchanged, when no overlap is found
        (or when the payload carries no events).

    Raises:
        ValueError: if, for some server, an outage window starts while
            another outage window on that same server is still open.

    """
    events = model.events
    if not events:
        return model

    servers_ids = {s.id for s in model.topology_graph.nodes.servers}

    # Build per-server timelines with (time, kind) marks only for server outages
    per_server: dict[str, list[tuple[float, str]]] = {}
    for ev in events:
        if (
            ev.target_id in servers_ids
            and ev.start.kind == EventDescription.SERVER_DOWN
        ):
            # Bind the list once; each outage contributes exactly two marks,
            # so a timeline stored in the dict is never empty.
            marks = per_server.setdefault(ev.target_id, [])
            marks.append((ev.start.t_start, "start"))
            marks.append((ev.end.t_end, "end"))

    # Sweep-line per server: sort by (time, END first), ensure active<=1
    for srv_id, timeline in per_server.items():
        # False sorts before True, so at equal t an "end" precedes a "start"
        timeline.sort(key=lambda x: (x[0], x[1] == "start"))
        active = 0
        for t, mark in timeline:
            if mark == "end":
                # Never let the counter go negative
                if active > 0:
                    active -= 1
            else:  # START
                if active >= 1:
                    msg = (f"Overlapping events for server '{srv_id}' at t={t:.6f};"
                           " server outage windows must not overlap.")
                    raise ValueError(msg)
                active += 1

    return model
253+
254+
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
"""Integration test: LB with two servers and concurrent event injections.
2+
3+
Topology:
4+
5+
rqs-1 → client-1 → lb-1 → {srv-1, srv-2}
6+
srv-* → client-1
7+
8+
Events:
9+
- NETWORK_SPIKE on 'client-to-lb' in [0.20, 0.35].
10+
- SERVER_DOWN/UP on 'srv-1' in [0.40, 0.55].
11+
12+
Assertions:
13+
- Simulation completes.
14+
- Latency stats and throughput exist.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
from typing import TYPE_CHECKING
20+
21+
import simpy
22+
23+
from asyncflow.config.constants import Distribution, EventDescription, LatencyKey
24+
from asyncflow.runtime.simulation_runner import SimulationRunner
25+
from asyncflow.schemas.common.random_variables import RVConfig
26+
from asyncflow.schemas.events.injection import EventInjection
27+
from asyncflow.schemas.payload import SimulationPayload
28+
from asyncflow.schemas.settings.simulation import SimulationSettings
29+
from asyncflow.schemas.topology.edges import Edge
30+
from asyncflow.schemas.topology.graph import TopologyGraph
31+
from asyncflow.schemas.topology.nodes import (
32+
Client,
33+
LoadBalancer,
34+
Server,
35+
ServerResources,
36+
TopologyNodes,
37+
)
38+
from asyncflow.schemas.workload.rqs_generator import RqsGenerator
39+
40+
if TYPE_CHECKING:
41+
from asyncflow.metrics.analyzer import ResultsAnalyzer
42+
43+
44+
def _server(sid: str) -> Server:
    """Build a server with id ``sid``, default resources and no endpoints."""
    resources = ServerResources()
    return Server(
        id=sid,
        server_resources=resources,
        endpoints=[],
    )
46+
47+
48+
def _edge(eid: str, src: str, tgt: str, mean: float = 0.002) -> Edge:
    """Build edge ``eid`` from ``src`` to ``tgt`` with a Poisson latency of ``mean``."""
    latency_rv = RVConfig(mean=mean, distribution=Distribution.POISSON)
    return Edge(id=eid, source=src, target=tgt, latency=latency_rv)
55+
56+
57+
def test_lb_two_servers_with_events_end_to_end() -> None:
    """LB in front of two servers, with a network spike and an srv-1 outage.

    Only checks that the run completes and that latency stats and a
    throughput series are produced.
    """
    # --- workload and settings -------------------------------------------
    generator = RqsGenerator(
        id="rqs-1",
        avg_active_users=RVConfig(mean=1.0),
        avg_request_per_minute_per_user=RVConfig(mean=2.0),
        user_sampling_window=10.0,
    )
    settings = SimulationSettings(total_simulation_time=0.8)

    # --- topology: rqs-1 -> client-1 -> lb-1 -> {srv-1, srv-2} -> client-1
    topology_nodes = TopologyNodes(
        servers=[_server("srv-1"), _server("srv-2")],
        client=Client(id="client-1"),
        load_balancer=LoadBalancer(id="lb-1"),
    )
    topology_edges = [
        _edge("gen-to-client", "rqs-1", "client-1"),
        _edge("client-to-lb", "client-1", "lb-1"),
        _edge("lb-to-srv1", "lb-1", "srv-1"),
        _edge("lb-to-srv2", "lb-1", "srv-2"),
        _edge("srv1-to-client", "srv-1", "client-1"),
        _edge("srv2-to-client", "srv-2", "client-1"),
    ]
    topology = TopologyGraph(nodes=topology_nodes, edges=topology_edges)

    # --- events: spike on the client->LB edge, then an outage on srv-1 ----
    network_spike = EventInjection(
        event_id="spike",
        target_id="client-to-lb",
        start={
            "kind": EventDescription.NETWORK_SPIKE_START,
            "t_start": 0.20,
            "spike_s": 0.02,
        },
        end={"kind": EventDescription.NETWORK_SPIKE_END, "t_end": 0.35},
    )
    srv1_outage = EventInjection(
        event_id="outage-srv1",
        target_id="srv-1",
        start={"kind": EventDescription.SERVER_DOWN, "t_start": 0.40},
        end={"kind": EventDescription.SERVER_UP, "t_end": 0.55},
    )

    payload = SimulationPayload(
        rqs_input=generator,
        topology_graph=topology,
        sim_settings=settings,
    )
    # Events are attached after construction, not passed to the constructor.
    # NOTE(review): presumably this assignment is not re-checked by the
    # payload's model validators - confirm against the model config.
    payload.events = [network_spike, srv1_outage]

    runner = SimulationRunner(env=simpy.Environment(), simulation_input=payload)
    results: ResultsAnalyzer = runner.run()

    latency_stats = results.get_latency_stats()
    assert latency_stats
    assert latency_stats[LatencyKey.TOTAL_REQUESTS] > 0

    timestamps, rps_values = results.get_throughput_series()
    assert len(timestamps) == len(rps_values)
    assert len(rps_values) > 0
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Integration test: single server with edge spike and server outage.
2+
3+
Topology:
4+
5+
rqs-1 → client-1 → lb-1 → srv-1
6+
srv-1 → client-1
7+
8+
Events:
9+
- NETWORK_SPIKE on 'client-to-lb' during a small window.
10+
- SERVER_DOWN/UP on 'srv-1' during a small window.
11+
12+
Assertions focus on end-to-end KPIs; the fine-grained event sequencing is
13+
covered by unit tests in the event injection suite.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
from typing import TYPE_CHECKING
19+
20+
import simpy
21+
22+
from asyncflow.config.constants import Distribution, EventDescription, LatencyKey
23+
from asyncflow.runtime.simulation_runner import SimulationRunner
24+
from asyncflow.schemas.common.random_variables import RVConfig
25+
from asyncflow.schemas.events.injection import EventInjection
26+
from asyncflow.schemas.payload import SimulationPayload
27+
from asyncflow.schemas.settings.simulation import SimulationSettings
28+
from asyncflow.schemas.topology.edges import Edge
29+
from asyncflow.schemas.topology.graph import TopologyGraph
30+
from asyncflow.schemas.topology.nodes import (
31+
Client,
32+
LoadBalancer,
33+
Server,
34+
ServerResources,
35+
TopologyNodes,
36+
)
37+
from asyncflow.schemas.workload.rqs_generator import RqsGenerator
38+
39+
if TYPE_CHECKING:
40+
from asyncflow.metrics.analyzer import ResultsAnalyzer
41+
42+
43+
def _server(sid: str) -> Server:
    """Build a server with id ``sid``, default resources and no endpoints."""
    resources = ServerResources()
    return Server(
        id=sid,
        server_resources=resources,
        endpoints=[],
    )
45+
46+
47+
def _edge(eid: str, src: str, tgt: str, mean: float = 0.002) -> Edge:
    """Build edge ``eid`` from ``src`` to ``tgt`` with a Poisson latency of ``mean``."""
    latency_rv = RVConfig(mean=mean, distribution=Distribution.POISSON)
    return Edge(id=eid, source=src, target=tgt, latency=latency_rv)
54+
55+
56+
def test_single_server_with_spike_and_outage_end_to_end() -> None:
    """Single server behind an LB, with an edge spike and a server outage.

    Only checks that the run completes and that latency stats report at
    least one request.
    """
    # --- workload and settings -------------------------------------------
    generator = RqsGenerator(
        id="rqs-1",
        avg_active_users=RVConfig(mean=1.0),
        avg_request_per_minute_per_user=RVConfig(mean=2.0),
        user_sampling_window=10.0,
    )
    settings = SimulationSettings(total_simulation_time=1.0)

    # --- topology: rqs-1 -> client-1 -> lb-1 -> srv-1 -> client-1 ---------
    topology_nodes = TopologyNodes(
        servers=[_server("srv-1")],
        client=Client(id="client-1"),
        load_balancer=LoadBalancer(id="lb-1"),
    )
    topology_edges = [
        _edge("gen-to-client", "rqs-1", "client-1"),
        _edge("client-to-lb", "client-1", "lb-1"),
        _edge("lb-to-srv1", "lb-1", "srv-1"),
        _edge("srv1-to-client", "srv-1", "client-1"),
    ]
    topology = TopologyGraph(nodes=topology_nodes, edges=topology_edges)

    # Events in a short (but disjoint) schedule to avoid cross-process ties
    network_spike = EventInjection(
        event_id="spike",
        target_id="client-to-lb",
        start={
            "kind": EventDescription.NETWORK_SPIKE_START,
            "t_start": 0.2,
            "spike_s": 0.01,
        },
        end={"kind": EventDescription.NETWORK_SPIKE_END, "t_end": 0.4},
    )
    server_outage = EventInjection(
        event_id="outage",
        target_id="srv-1",
        start={"kind": EventDescription.SERVER_DOWN, "t_start": 0.5},
        end={"kind": EventDescription.SERVER_UP, "t_end": 0.7},
    )

    payload = SimulationPayload(
        rqs_input=generator,
        topology_graph=topology,
        sim_settings=settings,
    )
    # Events are attached after construction, not passed to the constructor.
    # NOTE(review): presumably this assignment is not re-checked by the
    # payload's model validators - confirm against the model config.
    payload.events = [network_spike, server_outage]

    runner = SimulationRunner(env=simpy.Environment(), simulation_input=payload)
    results: ResultsAnalyzer = runner.run()

    latency_stats = results.get_latency_stats()
    assert latency_stats
    assert latency_stats[LatencyKey.TOTAL_REQUESTS] > 0

0 commit comments

Comments
 (0)