Skip to content

Commit 98fc0fd

Browse files
committed
Logic to handle completed rqs move to arr-gen
1 parent c5a8a52 commit 98fc0fd

23 files changed

Lines changed: 258 additions & 204 deletions

asyncflow_queue_limit/asyncflow_mmc.ipynb

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
},
2121
{
2222
"cell_type": "code",
23-
"execution_count": 96,
23+
"execution_count": 111,
2424
"id": "3e168d4a",
2525
"metadata": {},
2626
"outputs": [],
@@ -45,7 +45,7 @@
4545
},
4646
{
4747
"cell_type": "code",
48-
"execution_count": 97,
48+
"execution_count": 112,
4949
"id": "dd39a8e3",
5050
"metadata": {},
5151
"outputs": [
@@ -112,7 +112,7 @@
112112
},
113113
{
114114
"cell_type": "code",
115-
"execution_count": 98,
115+
"execution_count": 113,
116116
"id": "d2937e5e",
117117
"metadata": {},
118118
"outputs": [],
@@ -201,7 +201,7 @@
201201
},
202202
{
203203
"cell_type": "code",
204-
"execution_count": 99,
204+
"execution_count": 114,
205205
"id": "d0634bc8",
206206
"metadata": {},
207207
"outputs": [
@@ -339,27 +339,27 @@
339339
},
340340
{
341341
"cell_type": "code",
342-
"execution_count": 100,
342+
"execution_count": 115,
343343
"id": "ccd7379b",
344344
"metadata": {},
345345
"outputs": [
346346
{
347347
"name": "stdout",
348348
"output_type": "stream",
349349
"text": [
350-
"=================================================================\n",
350+
"===================================================================\n",
351351
"MMc (FCFS/Erlang-C) — Theory vs Observed\n",
352-
"-----------------------------------------------------------------\n",
353-
"sym metric theory observed abs rel%\n",
354-
"-----------------------------------------------------------------\n",
355-
"λ Arrival rate (1/s) 270.000000 270.258333 0.258333 0.10\n",
356-
"μ Service rate (1/s) 100.000000 100.036707 0.036707 0.04\n",
357-
"rho Utilization 0.900000 0.900531 0.000531 0.06\n",
358-
"L Mean items in sys 10.053549 10.073544 0.019994 0.20\n",
359-
"Lq Mean items in queue 7.353549 7.371934 0.018385 0.25\n",
360-
"W Mean time in sys (s) 0.037235 0.037274 0.000038 0.10\n",
361-
"Wq Mean waiting (s) 0.027235 0.027277 0.000042 0.15\n",
362-
"=================================================================\n"
352+
"-------------------------------------------------------------------\n",
353+
"sym metric theory observed abs rel%\n",
354+
"-------------------------------------------------------------------\n",
355+
"λ Arrival rate (1/s) 270.000000 269.997500 -0.002500 -0.00\n",
356+
"μ Service rate (1/s) 100.000000 99.925514 -0.074486 -0.07\n",
357+
"rho Utilization 0.900000 0.900663 0.000663 0.07\n",
358+
"L Mean items in sys 10.053549 9.991820 -0.061730 -0.61\n",
359+
"Lq Mean items in queue 7.353549 7.289848 -0.063701 -0.87\n",
360+
"W Mean time in sys (s) 0.037235 0.037007 -0.000228 -0.61\n",
361+
"Wq Mean waiting (s) 0.027235 0.027000 -0.000236 -0.87\n",
362+
"===================================================================\n"
363363
]
364364
}
365365
],

src/asyncflow/metrics/collector.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ def _build_time_series(self) -> Generator[simpy.Event, None, None]:
6767

6868

6969

70-
def start(self) -> simpy.Process:
70+
def start(self) -> None:
7171
"""Definition of the process to collect sampled metrics"""
72-
return self.env.process(self._build_time_series())
72+
self.env.process(self._build_time_series())
7373

7474

7575

src/asyncflow/metrics/simulation_analyzer.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from matplotlib.axes import Axes
2929
from matplotlib.lines import Line2D
3030

31-
from asyncflow.runtime.actors.client import ClientRuntime
31+
from asyncflow.runtime.actors.arrivals_generator import ArrivalsGeneratorRuntime
3232
from asyncflow.runtime.actors.edge import EdgeRuntime
3333
from asyncflow.runtime.actors.load_balancer import LoadBalancerRuntime
3434
from asyncflow.runtime.actors.server import ServerRuntime
@@ -69,14 +69,14 @@ class ResultsAnalyzer:
6969
def __init__(
7070
self,
7171
*,
72-
client: ClientRuntime,
72+
generator: ArrivalsGeneratorRuntime,
7373
servers: list[ServerRuntime],
7474
edges: list[EdgeRuntime],
7575
settings: SimulationSettings,
7676
lb: LoadBalancerRuntime | None = None,
7777
) -> None:
7878
"""Initialize with the runtime objects and original settings."""
79-
self._client = client
79+
self._generator = generator
8080
self._servers = servers
8181
self._edges = edges
8282
self._settings = settings
@@ -103,8 +103,8 @@ def __init__(
103103
# ─────────────────────────────────────────────
104104
def process_all_metrics(self) -> None:
105105
"""Compute all aggregated and sampled metrics if not already done."""
106-
# Client-side: end-to-end latencies + 1s throughput
107-
if self.latency_stats is None and self._client.rqs_clock:
106+
# generator-side: end-to-end latencies + 1s throughput
107+
if self.latency_stats is None and self._generator.rqs_clock:
108108
self._process_event_metrics()
109109

110110
# Sampled time series from servers/edges (RAM, queues, etc.)
@@ -209,7 +209,7 @@ def _process_event_metrics(self) -> None:
209209
# 1) Latencies
210210
self.latencies = [
211211
clock.finish - clock.start
212-
for clock in self._client.rqs_clock
212+
for clock in self._generator.rqs_clock
213213
]
214214

215215
# 2) Summary stats
@@ -229,7 +229,7 @@ def _process_event_metrics(self) -> None:
229229
self.latency_stats = {}
230230

231231
# 3) Throughput per 1s window (cached)
232-
completion_times = sorted(clock.finish for clock in self._client.rqs_clock)
232+
completion_times = sorted(clock.finish for clock in self._generator.rqs_clock)
233233
end_time = self._settings.total_simulation_time
234234

235235
timestamps: list[float] = []
@@ -320,7 +320,7 @@ def get_throughput_series(
320320
return self.throughput_series or ([], [])
321321

322322
# Recompute with a custom window size.
323-
completion_times = sorted(clock.finish for clock in self._client.rqs_clock)
323+
completion_times = sorted(clock.finish for clock in self._generator.rqs_clock)
324324
end_time = self._settings.total_simulation_time
325325

326326
timestamps: list[float] = []

src/asyncflow/runner/simulation.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
class Startable(Protocol):
4545
"""A protocol for runtime actors that can be started."""
4646

47-
def start(self) -> simpy.Process:
47+
def start(self) -> None:
4848
"""Starts the main process loop for the actor."""
4949
...
5050

@@ -141,6 +141,9 @@ def _build_rqs_generator(self) -> None:
141141
arrivals=self.arrivals,
142142
sim_settings=self.simulation_settings,
143143
rng=self.rng,
144+
arrivals_generator_box=self._make_inbox(),
145+
completed_box=self._make_inbox(),
146+
144147
)
145148

146149

@@ -155,7 +158,6 @@ def _build_client(self) -> None:
155158
self._client_runtime[self.client.id] = ClientRuntime(
156159
env=self.env,
157160
out_edge=None,
158-
completed_box=self._make_inbox(),
159161
client_box=self._make_inbox(),
160162
client_config=self.client,
161163
)
@@ -227,6 +229,8 @@ def _build_edges(self) -> None:
227229
target_box = target_object.client_box
228230
elif isinstance(target_object, LoadBalancerRuntime):
229231
target_box = target_object.lb_box
232+
elif isinstance(target_object, ArrivalsGeneratorRuntime):
233+
target_box = target_object.arrivals_generator_box
230234

231235

232236
else:
@@ -404,7 +408,7 @@ def run(self) -> ResultsAnalyzer:
404408
self.env.run(until=self.simulation_settings.total_simulation_time)
405409

406410
return ResultsAnalyzer(
407-
client=next(iter(self._client_runtime.values())),
411+
generator=next(iter(self._arrivals_runtime.values())),
408412
servers=list(self._servers_runtime.values()),
409413
edges=list(self._edges_runtime.values()),
410414
settings=self.simulation_settings,

src/asyncflow/runtime/actors/arrivals_generator.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import numpy as np
1111

1212
from asyncflow.config.enums import SystemNodes
13+
from asyncflow.metrics.client import RqsClock
1314
from asyncflow.runtime.rqs_state import RequestState
1415
from asyncflow.samplers.arrivals import general_interarrivals
1516

@@ -25,18 +26,20 @@
2526

2627
class ArrivalsGeneratorRuntime:
2728
"""
28-
A node that produces request contexts at stochastic inter-arrival times
29+
A node that produces request contexts at stochastic inter-arrival times
2930
and immediately pushes them down the pipeline via an EdgeRuntime.
3031
"""
3132

32-
def __init__(
33+
def __init__( # noqa: PLR0913
3334
self,
3435
*,
3536
env: simpy.Environment,
3637
out_edge: EdgeRuntime | None,
3738
arrivals: ArrivalsGenerator,
3839
sim_settings: SimulationSettings,
3940
rng: np.random.Generator | None = None,
41+
arrivals_generator_box: simpy.Store,
42+
completed_box: simpy.Store,
4043
) -> None:
4144
"""
4245
Definition of the instance attributes for the ArrivalsGeneratorRuntime
@@ -47,14 +50,19 @@ def __init__(
4750
arrivals (ArrivalsGenerator): data do define the sampler
4851
sim_settings (SimulationSettings): settings to start the simulation
4952
rng (np.random.Generator | None, optional): random variable generator.
53+
arrivals_generator_box: simpy box to collect request when they come back
54+
completed_box: box to collect all satisfied requests
5055
5156
"""
5257
self.arrivals = arrivals
5358
self.sim_settings = sim_settings
5459
self.rng = rng or np.random.default_rng()
5560
self.out_edge = out_edge
5661
self.env = env
62+
self.arrivals_generator_box = arrivals_generator_box
63+
self.completed_box = completed_box
5764
self.id_counter = 0
65+
self._rqs_clock: list[RqsClock] = []
5866

5967

6068
def _next_id(self) -> int:
@@ -90,6 +98,28 @@ def _event_arrival(self) -> Generator[simpy.Event, None, None]:
9098
# from one node to another
9199
self.out_edge.transport(state)
92100

101+
def _collector(self) -> Generator[simpy.Event, None, None]:
102+
"""The request has been satisfied"""
103+
while True:
104+
state: RequestState = yield self.arrivals_generator_box.get() # type: ignore[assignment]
105+
106+
state.finish_time = self.env.now
107+
clock_data = RqsClock(
108+
start=state.initial_time,
109+
finish=state.finish_time,
110+
)
111+
self._rqs_clock.append(clock_data)
112+
yield self.completed_box.put(state)
113+
114+
93115
def start(self) -> simpy.Process:
94116
"""Passing the structure as a simpy process"""
95-
return self.env.process(self._event_arrival())
117+
self.env.process(self._event_arrival())
118+
self.env.process(self._collector())
119+
120+
121+
122+
@property
123+
def rqs_clock(self) -> list[RqsClock]:
124+
"""Readable version to compute aggregate metrics"""
125+
return self._rqs_clock

src/asyncflow/runtime/actors/client.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import simpy
77

88
from asyncflow.config.enums import SystemNodes
9-
from asyncflow.metrics.client import RqsClock
109
from asyncflow.runtime.actors.edge import EdgeRuntime
1110
from asyncflow.schemas.topology.nodes import Client
1211

@@ -24,20 +23,13 @@ def __init__(
2423
env: simpy.Environment,
2524
out_edge: EdgeRuntime | None,
2625
client_box: simpy.Store,
27-
completed_box: simpy.Store,
2826
client_config: Client,
2927
) -> None:
3028
"""Definition of attributes for the client"""
3129
self.env = env
3230
self.out_edge = out_edge
3331
self.client_config = client_config
3432
self.client_box = client_box
35-
self.completed_box = completed_box
36-
# This list will be enough to calculate at the end
37-
# of the simulation both the throughput and the
38-
# latency distribution
39-
40-
self._rqs_clock: list[RqsClock] = []
4133

4234

4335
def _forwarder(self) -> Generator[simpy.Event, None, None]:
@@ -53,31 +45,9 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
5345
self.env.now,
5446
)
5547

56-
# if the length of the list is bigger than two
57-
# it means that the state is coming back to the
58-
# client after being elaborated, since if the value
59-
# would be equal to two would mean that the state
60-
# went through the mandatory path to be generated
61-
# rqs generator and client registration
62-
if len(state.history) > 3:
63-
state.finish_time = self.env.now
64-
clock_data = RqsClock(
65-
start=state.initial_time,
66-
finish=state.finish_time,
67-
)
68-
self._rqs_clock.append(clock_data)
69-
yield self.completed_box.put(state)
70-
else:
71-
self.out_edge.transport(state)
48+
self.out_edge.transport(state)
7249

73-
def start(self) -> simpy.Process:
50+
def start(self) -> None:
7451
"""Initialization of the process"""
75-
return self.env.process(self._forwarder())
52+
self.env.process(self._forwarder())
7653

77-
@property
78-
def rqs_clock(self) -> list[RqsClock]:
79-
"""
80-
Expose the value of the private list of the starting
81-
and arrival time for each rqs just for reading purpose
82-
"""
83-
return self._rqs_clock

src/asyncflow/runtime/actors/load_balancer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,10 @@ def _forwarder(self) -> Generator[simpy.Event, None, None]:
149149
edge_rt = LB_TABLE[self.lb_config.algorithms](self.lb_out_edges)
150150
edge_rt.transport(state)
151151

152-
def start(self) -> simpy.Process:
152+
def start(self) -> None:
153153
"""Start the process and populate FIFO"""
154154
self._prime_free_edges()
155-
return self.env.process(self._forwarder())
155+
self.env.process(self._forwarder())
156156

157157
@property
158158
def lb_waiting_times(self) -> Sequence[float]:

src/asyncflow/runtime/actors/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,9 @@ def _dispatcher(self) -> Generator[simpy.Event, None, None]:
189189
# Spawn a new, independent process to handle this request
190190
self.env.process(self._handle_request(request_state))
191191

192-
def start(self) -> simpy.Process:
192+
def start(self) -> None:
193193
"""Generate the process to simulate the server inside simpy env"""
194-
return self.env.process(self._dispatcher())
194+
self.env.process(self._dispatcher())
195195

196196
# right now we disable the warnings but a refactor will be done soon
197197
def _handle_request( # noqa: PLR0915, PLR0912, C901

src/asyncflow/runtime/events/injection.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,11 +254,11 @@ def _assign_server_state(self) -> Generator[simpy.Event, None, None]:
254254

255255

256256

257-
def start(self) -> tuple[simpy.Process, simpy.Process]:
257+
def start(self) -> None:
258258
"""Start both edge-spike and server-outage timelines."""
259-
p1 = self.env.process(self._assign_edges_spike())
260-
p2 = self.env.process(self._assign_server_state())
261-
return p1, p2
259+
self.env.process(self._assign_edges_spike())
260+
self.env.process(self._assign_server_state())
261+
262262

263263
@property
264264
def edges_spike(self) -> dict[str, float]:

0 commit comments

Comments
 (0)