Skip to content

Commit 79d8f46

Browse files
committed
defined edge runtime and more central logic for sampler
1 parent 52bff29 commit 79d8f46

14 files changed

Lines changed: 316 additions & 118 deletions

File tree

src/app/api/simulation.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,4 @@
11
""""Api to simulate the process"""
22

3-
import numpy as np
4-
from fastapi import APIRouter
5-
6-
from app.core.simulation.simulation_run import run_simulation
7-
from app.schemas.full_simulation_input import SimulationPayload
8-
from app.schemas.simulation_output import SimulationOutput
9-
10-
router = APIRouter()
11-
12-
@router.post("/simulation")
13-
async def event_loop_simulation(input_data: SimulationPayload) -> SimulationOutput:
14-
"""Run the simulation and return aggregate KPIs."""
15-
rng = np.random.default_rng()
16-
return run_simulation(input_data, rng=rng)
173

184

src/app/config/constants.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Distribution(StrEnum):
4949
NORMAL = "normal"
5050
LOG_NORMAL = "log_normal"
5151
EXPONENTIAL = "exponential"
52+
UNIFORM = "uniform"
5253

5354
# ======================================================================
5455
# CONSTANTS FOR ENDPOINT STEP DEFINITION (REQUEST-HANDLER)
@@ -123,7 +124,7 @@ class EndpointStepRAM(StrEnum):
123124
RAM = "ram"
124125

125126

126-
class Metrics(StrEnum):
127+
class StepOperation(StrEnum):
127128
"""
128129
Keys used inside the ``metrics`` dictionary of a *step*.
129130
@@ -153,6 +154,15 @@ class ServerResourcesDefaults:
153154
MINIMUM_RAM_MB = 256
154155
DB_CONNECTION_POOL = None
155156

157+
# ======================================================================
158+
# CONSTANTS FOR NETWORK PARAMETERS
159+
# ======================================================================
160+
161+
class NetworkParameters:
162+
"""parameters for the network"""
163+
164+
DROPOUT_RATE = 0.01
165+
156166
# ======================================================================
157167
# CONSTANTS FOR THE MACRO-TOPOLOGY GRAPH
158168
# ======================================================================
@@ -164,7 +174,7 @@ class SystemNodes(StrEnum):
164174
Each member represents a *macro-component* that may have its own SimPy
165175
resources (CPU cores, DB pool, etc.).
166176
"""
167-
177+
168178
GENERATOR = "generator"
169179
SERVER = "server"
170180
CLIENT = "client"

src/app/config/rqs_state.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66

77
from __future__ import annotations
8+
89
from dataclasses import dataclass, field
910

1011

@@ -17,27 +18,32 @@ class RequestState:
1718
id: Unique identifier of the request.
1819
t0: Timestamp (simulated env.now) when the request was generated.
1920
history: List of hop records, each noting a node/edge visit.
21+
finish_time: Timestamp when the requests is satisfied
22+
2023
"""
24+
2125
id: int # Unique request identifier
2226
initial_time: float # Generation timestamp (env.now)
2327
finish_time: float | None = None # a requests might be dropped
2428
history: list[str] = field(default_factory=list) # Trace of hops
2529

26-
def record_hop(self, node_name: str) -> None:
30+
def record_hop(self, node_name: str, now: float) -> None:
2731
"""
2832
Append a record of visiting a node or edge.
2933
3034
Args:
3135
node_name: Name of the node or edge being recorded.
36+
now: register the time of the operation
37+
3238
"""
3339
# Record hop as "NodeName@Timestamp"
34-
self.history.append(f"{node_name}@{self.t0:.3f}")
35-
40+
self.history.append(f"{node_name}@{now:.3f}")
41+
3642
@property
3743
def latency(self) -> float | None:
3844
"""
3945
Return the total time in the system (finish_time - initial_time),
40-
or None if the request hasnt completed yet.
46+
or None if the request hasn't completed yet.
4147
"""
4248
if self.finish_time is None:
4349
return None

src/app/core/event_samplers/common_helpers.py

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,74 @@
33

44
import numpy as np
55

6+
from app.config.constants import Distribution
7+
from app.schemas.random_variables_config import RVConfig
68

7-
def uniform_variable_generator(rng: np.random.Generator | None = None) -> float:
9+
10+
def uniform_variable_generator(rng: np.random.Generator) -> float:
811
"""Return U~Uniform(0, 1)."""
912
rng = rng or np.random.default_rng()
10-
return float(rng.random())
11-
13+
return rng.random()
1214

1315
def poisson_variable_generator(
1416
mean: float,
15-
rng: np.random.Generator | None = None,
16-
) -> int:
17-
"""Return a Poisson-distributed integer with expectation *mean*."""
18-
rng = rng or np.random.default_rng()
19-
return int(rng.poisson(mean))
20-
17+
rng: np.random.Generator,
18+
) -> float:
19+
"""Return a Poisson-distributed floateger with expectation *mean*."""
20+
return rng.poisson(mean)
2121

2222
def truncated_gaussian_generator(
2323
mean: float,
2424
variance: float,
2525
rng: np.random.Generator,
26-
) -> int:
26+
) -> float:
2727
"""
2828
Generate a Normal-distributed variable
2929
with mean and variance
3030
"""
31-
rng = rng or np.random.default_rng()
3231
value = rng.normal(mean, variance)
33-
return max(0, int(value))
32+
return max(0.0, value)
33+
34+
def lognormal_variable_generator(
35+
mean: float,
36+
variance: float,
37+
rng: np.random.Generator,
38+
) -> float:
39+
"""Return a Poisson-distributed floateger with expectation *mean*."""
40+
return rng.lognormal(mean, variance)
41+
42+
def exponential_variable_generator(
43+
mean: float,
44+
rng: np.random.Generator,
45+
) -> float:
46+
"""Return a Poisson-distributed floateger with expectation *mean*."""
47+
return float(rng.exponential(mean))
48+
49+
def general_sampler(random_variable: RVConfig, rng: np.random.Generator) -> float:
50+
"""Sample a number according to the distribution described in `random_variable`."""
51+
dist = random_variable.distribution
52+
mean = random_variable.mean
53+
54+
match dist:
55+
case Distribution.UNIFORM:
56+
57+
assert random_variable.variance is None
58+
return uniform_variable_generator(rng)
59+
60+
case _:
61+
62+
variance = random_variable.variance
63+
assert variance is not None
64+
65+
match dist:
66+
case Distribution.NORMAL:
67+
return truncated_gaussian_generator(mean, variance, rng)
68+
case Distribution.LOG_NORMAL:
69+
return lognormal_variable_generator(mean, variance, rng)
70+
case Distribution.POISSON:
71+
return float(poisson_variable_generator(mean, rng))
72+
case Distribution.EXPONENTIAL:
73+
return exponential_variable_generator(mean, rng)
74+
case _:
75+
msg = f"Unsupported distribution: {dist}"
76+
raise ValueError(msg)

src/app/core/event_samplers/gaussian_poisson.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ def gaussian_poisson_sampling(
3939
Δt ~ Exponential(Λ) using inverse-CDF.
4040
4. Stop once the virtual clock exceeds *total_simulation_time*.
4141
"""
42-
4342
simulation_time = sim_settings.total_simulation_time
4443
user_sampling_window = input_data.user_sampling_window
4544

src/app/core/event_samplers/poisson_poisson.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ def poisson_poisson_sampling(
3636
Δt ~ Exponential(Λ) using inverse-CDF.
3737
4. Stop once the virtual clock exceeds *total_simulation_time*.
3838
"""
39-
4039
simulation_time = sim_settings.total_simulation_time
4140
user_sampling_window = input_data.user_sampling_window
4241

src/app/core/helpers/requests_generator.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
"""
2-
Continuous-time event sampling for the Poisson-Poisson
3-
and Gaussian-Poisson workload model.
2+
SimPy process that generates user requests at stochastic intervals.
3+
4+
This node samples inter-arrival times according to the configured
5+
distribution (Gaussian-Poisson or Poisson-Poisson), constructs a
6+
RequestState for each new request, records its origin hop, and
7+
immediately pushes it into the next pipeline stage via an EdgeRuntime.
48
"""
59

610
from __future__ import annotations
@@ -24,7 +28,7 @@ def requests_generator(
2428
input_data: RqsGeneratorInput,
2529
sim_settings: SimulationSettings,
2630
*,
27-
rng: np.random.Generator | None = None,
31+
rng: np.random.Generator,
2832
) -> Generator[float, None, None]:
2933
"""
3034
Return an iterator of inter-arrival gaps (seconds) according to the model

src/app/core/runtime/edge.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""
2+
Unidirectional link that simulates message transmission between nodes.
3+
Encapsulates network behavior—latency sampling (LogNormal, Exponential, etc.),
4+
drop probability, and optional connection-pool contention—by exposing a
5+
`send(msg)` method. Each `send` call schedules a SimPy subprocess that
6+
waits the sampled delay (and any resource wait) before delivering the
7+
message to the target node's inbox.
8+
"""
9+
10+
from collections.abc import Generator
11+
12+
import numpy as np
13+
import simpy
14+
15+
from app.config.constants import NetworkParameters
16+
from app.config.rqs_state import RequestState
17+
from app.core.event_samplers.common_helpers import general_sampler
18+
from app.schemas.system_topology_schema.full_system_topology_schema import Edge
19+
20+
21+
class EdgeRuntime:
22+
"""definining the logic to handle the edges during the simulation"""
23+
24+
def __init__(
25+
self,
26+
*,
27+
env: simpy.Environment,
28+
edge_config: Edge,
29+
rng: np.random.Generator | None = None,
30+
target_box: simpy.Store,
31+
) -> None:
32+
"""Definition of the instance attributes"""
33+
self.env = env
34+
self.edge_config = edge_config
35+
self.target_box = target_box
36+
self.rng = rng or np.random.default_rng()
37+
38+
def _deliver(self, state: RequestState) -> Generator[simpy.Event, None, None]:
39+
"""Function to deliver the state to the next node"""
40+
# assign a probability that the network will drop the request
41+
distribution = self.edge_config.latency.distribution
42+
43+
uniform_variable = self.rng.uniform()
44+
if uniform_variable < NetworkParameters.DROPOUT_RATE:
45+
state.finish_time = self.env.now
46+
state.record_hop(f"{self.edge_config.id}-dropped")
47+
return
48+
49+
transit_time = general_sampler(distribution, self.rng)
50+
yield self.env.timeout(transit_time)
51+
state.record_hop(self.edge_config.id, self.env.now)
52+
self.target_box.put(state)
53+
54+
55+
def transport(self, state: RequestState) -> simpy.Process:
56+
"""
57+
Called by the upstream node. Immediately spins off a SimPy process
58+
that will handle drop + delay + delivery of `state`.
59+
"""
60+
return self.env.process(self._deliver(state))
61+
62+
63+
64+
65+
66+

0 commit comments

Comments
 (0)