Skip to content

Commit 49f83ad

Browse files
committed
Add load and benchmark testing utilities
Introduce testing helpers for rate limiters under ratelink/testing/load.py. Adds LoadTestResult (with latency percentiles and summary) and functions to simulate concurrent load (simulate_load, simulate_load_async), run stress tests (stress_test), and benchmark/compare algorithms (benchmark_algorithm, compare_algorithms). Uses ThreadPoolExecutor and asyncio for concurrency, measures throughput, latencies, allowed/denied counts and errors, and supports synchronous and async limiter interfaces (check / acheck). Intended to help simulate traffic and evaluate limiter performance.
1 parent bb57279 commit 49f83ad

1 file changed

Lines changed: 342 additions & 0 deletions

File tree

ratelink/testing/load.py

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
"""
2+
Load and benchmark helpers for rate limiters.
3+
4+
Provides tools to simulate concurrent load and measure performance.
5+
"""
6+
7+
import asyncio
8+
import time
9+
from concurrent.futures import ThreadPoolExecutor, as_completed
10+
from dataclasses import dataclass
11+
from typing import Any, Callable, Dict, List, Optional
12+
13+
14+
@dataclass
15+
class LoadTestResult:
16+
total_requests: int
17+
allowed: int
18+
denied: int
19+
duration_seconds: float
20+
requests_per_second: float
21+
latencies: List[float]
22+
errors: int = 0
23+
24+
@property
25+
def min_latency(self) -> float:
26+
return min(self.latencies) * 1000 if self.latencies else 0.0
27+
28+
@property
29+
def max_latency(self) -> float:
30+
return max(self.latencies) * 1000 if self.latencies else 0.0
31+
32+
@property
33+
def avg_latency(self) -> float:
34+
return (sum(self.latencies) / len(self.latencies) * 1000) if self.latencies else 0.0
35+
36+
@property
37+
def p50_latency(self) -> float:
38+
return self._percentile(50)
39+
40+
@property
41+
def p95_latency(self) -> float:
42+
return self._percentile(95)
43+
44+
@property
45+
def p99_latency(self) -> float:
46+
return self._percentile(99)
47+
48+
def _percentile(self, p: float) -> float:
49+
if not self.latencies:
50+
return 0.0
51+
52+
sorted_latencies = sorted(self.latencies)
53+
index = int(len(sorted_latencies) * (p / 100))
54+
return sorted_latencies[min(index, len(sorted_latencies) - 1)] * 1000
55+
56+
def summary(self) -> str:
57+
return f"""
58+
Load Test Results:
59+
Total Requests: {self.total_requests:,}
60+
Allowed: {self.allowed:,} ({self.allowed/self.total_requests*100:.1f}%)
61+
Denied: {self.denied:,} ({self.denied/self.total_requests*100:.1f}%)
62+
Errors: {self.errors:,}
63+
Duration: {self.duration_seconds:.2f}s
64+
Throughput: {self.requests_per_second:.2f} req/s
65+
66+
Latency (ms):
67+
Min: {self.min_latency:.2f}
68+
Avg: {self.avg_latency:.2f}
69+
P50: {self.p50_latency:.2f}
70+
P95: {self.p95_latency:.2f}
71+
P99: {self.p99_latency:.2f}
72+
Max: {self.max_latency:.2f}
73+
""".strip()
74+
75+
76+
def simulate_load(
77+
limiter: Any,
78+
num_users: int,
79+
requests_per_user: int,
80+
key_generator: Optional[Callable[[int], str]] = None,
81+
weight: int = 1,
82+
max_workers: Optional[int] = None
83+
) -> LoadTestResult:
84+
if key_generator is None:
85+
key_generator = lambda user_id: f"user:{user_id}"
86+
87+
if max_workers is None:
88+
max_workers = min(num_users, 100)
89+
90+
total_requests = num_users * requests_per_user
91+
allowed = 0
92+
denied = 0
93+
errors = 0
94+
latencies = []
95+
96+
def make_requests(user_id: int) -> tuple:
97+
user_allowed = 0
98+
user_denied = 0
99+
user_errors = 0
100+
user_latencies = []
101+
102+
key = key_generator(user_id)
103+
104+
for _ in range(requests_per_user):
105+
start = time.time()
106+
try:
107+
is_allowed, state = limiter.check(key, weight)
108+
latency = time.time() - start
109+
110+
user_latencies.append(latency)
111+
112+
if is_allowed:
113+
user_allowed += 1
114+
else:
115+
user_denied += 1
116+
117+
except Exception as e:
118+
user_errors += 1
119+
latency = time.time() - start
120+
user_latencies.append(latency)
121+
122+
return user_allowed, user_denied, user_errors, user_latencies
123+
124+
start_time = time.time()
125+
126+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
127+
futures = [executor.submit(make_requests, user_id) for user_id in range(num_users)]
128+
129+
for future in as_completed(futures):
130+
user_allowed, user_denied, user_errors, user_latencies = future.result()
131+
allowed += user_allowed
132+
denied += user_denied
133+
errors += user_errors
134+
latencies.extend(user_latencies)
135+
136+
duration = time.time() - start_time
137+
138+
return LoadTestResult(
139+
total_requests=total_requests,
140+
allowed=allowed,
141+
denied=denied,
142+
duration_seconds=duration,
143+
requests_per_second=total_requests / duration if duration > 0 else 0,
144+
latencies=latencies,
145+
errors=errors
146+
)
147+
148+
149+
async def simulate_load_async(
150+
limiter: Any,
151+
num_users: int,
152+
requests_per_user: int,
153+
key_generator: Optional[Callable[[int], str]] = None,
154+
weight: int = 1
155+
) -> LoadTestResult:
156+
if key_generator is None:
157+
key_generator = lambda user_id: f"user:{user_id}"
158+
159+
total_requests = num_users * requests_per_user
160+
allowed = 0
161+
denied = 0
162+
errors = 0
163+
latencies = []
164+
165+
async def make_requests(user_id: int) -> tuple:
166+
user_allowed = 0
167+
user_denied = 0
168+
user_errors = 0
169+
user_latencies = []
170+
171+
key = key_generator(user_id)
172+
173+
for _ in range(requests_per_user):
174+
start = time.time()
175+
try:
176+
if hasattr(limiter, 'acheck'):
177+
is_allowed, state = await limiter.acheck(key, weight)
178+
else:
179+
is_allowed, state = limiter.check(key, weight)
180+
181+
latency = time.time() - start
182+
user_latencies.append(latency)
183+
184+
if is_allowed:
185+
user_allowed += 1
186+
else:
187+
user_denied += 1
188+
189+
except Exception as e:
190+
user_errors += 1
191+
latency = time.time() - start
192+
user_latencies.append(latency)
193+
194+
return user_allowed, user_denied, user_errors, user_latencies
195+
196+
start_time = time.time()
197+
198+
tasks = [make_requests(user_id) for user_id in range(num_users)]
199+
results = await asyncio.gather(*tasks)
200+
201+
for user_allowed, user_denied, user_errors, user_latencies in results:
202+
allowed += user_allowed
203+
denied += user_denied
204+
errors += user_errors
205+
latencies.extend(user_latencies)
206+
207+
duration = time.time() - start_time
208+
209+
return LoadTestResult(
210+
total_requests=total_requests,
211+
allowed=allowed,
212+
denied=denied,
213+
duration_seconds=duration,
214+
requests_per_second=total_requests / duration if duration > 0 else 0,
215+
latencies=latencies,
216+
errors=errors
217+
)
218+
219+
220+
def benchmark_algorithm(
221+
algorithm_name: str,
222+
limit: int,
223+
window: int,
224+
num_requests: int = 10000,
225+
backend: Optional[Any] = None
226+
) -> LoadTestResult:
227+
try:
228+
from ratelink import RateLimiter
229+
230+
limiter = RateLimiter(
231+
algorithm=algorithm_name,
232+
limit=limit,
233+
window=window,
234+
backend=backend
235+
)
236+
237+
return simulate_load(
238+
limiter,
239+
num_users=1,
240+
requests_per_user=num_requests,
241+
key_generator=lambda _: "benchmark:user"
242+
)
243+
244+
except ImportError:
245+
raise ImportError("RateLimiter not available for benchmarking")
246+
247+
248+
def compare_algorithms(
249+
limit: int = 100,
250+
window: int = 60,
251+
num_requests: int = 1000
252+
) -> Dict[str, LoadTestResult]:
253+
algorithms = [
254+
'token_bucket',
255+
'leaky_bucket',
256+
'fixed_window',
257+
'sliding_window',
258+
'sliding_window_log',
259+
]
260+
261+
results = {}
262+
263+
for algo in algorithms:
264+
try:
265+
result = benchmark_algorithm(algo, limit, window, num_requests)
266+
results[algo] = result
267+
except Exception as e:
268+
print(f"Failed to benchmark {algo}: {e}")
269+
270+
return results
271+
272+
def stress_test(
273+
limiter: Any,
274+
duration_seconds: float = 10.0,
275+
num_workers: int = 10,
276+
key_generator: Optional[Callable[[int], str]] = None
277+
) -> LoadTestResult:
278+
if key_generator is None:
279+
key_generator = lambda worker_id: f"worker:{worker_id}"
280+
281+
allowed = 0
282+
denied = 0
283+
errors = 0
284+
latencies = []
285+
total_requests = 0
286+
287+
def worker(worker_id: int, stop_time: float) -> tuple:
288+
worker_allowed = 0
289+
worker_denied = 0
290+
worker_errors = 0
291+
worker_latencies = []
292+
worker_requests = 0
293+
294+
key = key_generator(worker_id)
295+
296+
while time.time() < stop_time:
297+
start = time.time()
298+
try:
299+
is_allowed, state = limiter.check(key, weight=1)
300+
latency = time.time() - start
301+
302+
worker_latencies.append(latency)
303+
worker_requests += 1
304+
305+
if is_allowed:
306+
worker_allowed += 1
307+
else:
308+
worker_denied += 1
309+
310+
except Exception as e:
311+
worker_errors += 1
312+
latency = time.time() - start
313+
worker_latencies.append(latency)
314+
worker_requests += 1
315+
316+
return worker_allowed, worker_denied, worker_errors, worker_latencies, worker_requests
317+
318+
start_time = time.time()
319+
stop_time = start_time + duration_seconds
320+
321+
with ThreadPoolExecutor(max_workers=num_workers) as executor:
322+
futures = [executor.submit(worker, worker_id, stop_time) for worker_id in range(num_workers)]
323+
324+
for future in as_completed(futures):
325+
w_allowed, w_denied, w_errors, w_latencies, w_requests = future.result()
326+
allowed += w_allowed
327+
denied += w_denied
328+
errors += w_errors
329+
latencies.extend(w_latencies)
330+
total_requests += w_requests
331+
332+
duration = time.time() - start_time
333+
334+
return LoadTestResult(
335+
total_requests=total_requests,
336+
allowed=allowed,
337+
denied=denied,
338+
duration_seconds=duration,
339+
requests_per_second=total_requests / duration if duration > 0 else 0,
340+
latencies=latencies,
341+
errors=errors
342+
)

0 commit comments

Comments
 (0)