Skip to content

Commit 4ece6ce

Browse files
committed
Make rate limiting easier to consume
1 parent 03d4b0d commit 4ece6ce

14 files changed

Lines changed: 857 additions & 87 deletions

README.rst

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,65 @@ Examples
172172
173173
See the full documentation for more details on concurrent fixtures and shared state.
174174

175+
176+
Rate Limiting
177+
~~~~~~~~~~~~~
178+
179+
The plugin provides a ``rate_limiter_fixture_factory`` for enforcing rate limits across workers:
180+
181+
.. code-block:: python
182+
183+
import pytest
184+
from pytest_load_testing import weight, stop_load_testing
185+
from pytest_load_testing.token_bucket_rate_limiter import RateLimit
186+
187+
@pytest.fixture(scope="session")
188+
def api_limiter(rate_limiter_fixture_factory, request):
189+
"""Rate limiter that stops tests if rate drift exceeds 20%."""
190+
191+
def on_drift(limiter_id, current_rate, target_rate, drift):
192+
message = (
193+
f"Rate drift for {limiter_id}: "
194+
f"current={current_rate:.2f}/hr, target={target_rate}/hr, "
195+
f"drift={drift:.2%}"
196+
)
197+
stop_load_testing(request, message)
198+
199+
return rate_limiter_fixture_factory(
200+
name="api_limiter",
201+
hourly_rate=RateLimit.per_second(10), # 10 calls/second
202+
max_drift=0.2, # 20% tolerance
203+
on_drift_callback=on_drift
204+
)
205+
206+
@weight(1)
207+
def test_api_call(api_limiter):
208+
with api_limiter.rate_limited_context() as ctx:
209+
# Context entry waits if rate limit would be exceeded
210+
response = api.get("/data")
211+
assert response.status_code == 200
212+
assert ctx.call_count >= 1
213+
214+
**Key Features:**
215+
216+
* **Token Bucket Algorithm**: Allows controlled bursts while maintaining average rate
217+
* **Shared State**: Rate limiting coordinated across all workers
218+
* **Drift Detection**: Monitors actual vs. target rate and triggers callbacks
219+
* **Max Calls**: Optional limit on total calls with callback
220+
* **Dynamic Rates**: Support for callable rate functions
221+
222+
**Rate Limit Helpers:**
223+
224+
.. code-block:: python
225+
226+
RateLimit.per_second(10) # 10 calls per second
227+
RateLimit.per_minute(600) # 600 calls per minute
228+
RateLimit.per_hour(3600) # 3600 calls per hour
229+
RateLimit.per_day(86400) # 86400 calls per day
230+
231+
See the full documentation for more examples and advanced usage.
232+
233+
175234
License
176235
-------
177236

docs/index.md

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,209 @@ The factory handles worker coordination automatically:
246246
3. **Last Worker**: Executes `on_last_worker` callback during teardown
247247
4. **Cleanup**: Last worker removes all temporary files
248248

249+
## Rate Limiting
250+
251+
The plugin provides a [`rate_limiter_fixture_factory`](../src/pytest_load_testing/concurrent_fixtures.py:305) for enforcing rate limits across pytest-xdist workers. This is essential for load testing scenarios where you need to respect API rate limits or simulate realistic traffic patterns.
252+
253+
### Overview
254+
255+
The rate limiter uses the **token bucket algorithm**, which allows controlled bursts of activity while maintaining an average rate over time. State is synchronized across all workers using file-based locking.
256+
257+
### Basic Usage
258+
259+
```python
260+
import pytest
261+
from pytest_load_testing import weight
262+
from pytest_load_testing.token_bucket_rate_limiter import RateLimit
263+
264+
@pytest.fixture(scope="session")
265+
def api_limiter(rate_limiter_fixture_factory):
266+
"""Rate limiter for API calls."""
267+
return rate_limiter_fixture_factory(
268+
name="api_limiter",
269+
hourly_rate=RateLimit.per_second(10) # 10 calls per second
270+
)
271+
272+
@weight(1)
273+
def test_api_call(api_limiter):
274+
with api_limiter.rate_limited_context() as ctx:
275+
# Context entry waits if rate limit would be exceeded
276+
response = api.get("/data")
277+
assert response.status_code == 200
278+
assert ctx.call_count >= 1
279+
```
280+
281+
### Rate Limit Helpers
282+
283+
The `RateLimit` class provides convenient factory methods:
284+
285+
```python
286+
RateLimit.per_second(10) # 10 calls per second (36,000/hour)
287+
RateLimit.per_minute(600) # 600 calls per minute (36,000/hour)
288+
RateLimit.per_hour(3600) # 3600 calls per hour
289+
RateLimit.per_day(86400) # 86400 calls per day (3,600/hour)
290+
```
291+
292+
### Factory Parameters
293+
294+
```python
295+
rate_limiter_fixture_factory(
296+
name: str, # Unique identifier
297+
hourly_rate: Union[RateLimit, Callable], # Rate limit specification
298+
max_drift: float = 0.1, # Max deviation (0-1)
299+
on_drift_callback: Optional[Callable] = None, # Drift detection callback
300+
num_calls_between_checks: int = 10, # Calls between rate checks
301+
seconds_before_first_check: float = 60.0, # Delay before first check
302+
burst_capacity: Optional[int] = None, # Max burst size
303+
max_calls: int = -1, # Total call limit
304+
max_call_callback: Optional[Callable] = None # Max calls callback
305+
)
306+
```
307+
308+
### Advanced Examples
309+
310+
#### Rate Limiting with Drift Detection
311+
312+
Stop tests if actual rate exceeds target by more than 20%:
313+
314+
```python
315+
import pytest
316+
from pytest_load_testing import weight, stop_load_testing
317+
from pytest_load_testing.token_bucket_rate_limiter import RateLimit
318+
319+
@pytest.fixture(scope="session")
320+
def monitored_api(rate_limiter_fixture_factory, request):
321+
"""API limiter with drift detection."""
322+
323+
def on_drift(limiter_id, current_rate, target_rate, drift):
324+
"""Stop testing when drift exceeds threshold."""
325+
message = (
326+
f"Rate drift for {limiter_id} exceeds maximum: "
327+
f"current={current_rate:.2f}/hr, target={target_rate}/hr, "
328+
f"drift={drift:.2%}"
329+
)
330+
stop_load_testing(request, message)
331+
332+
return rate_limiter_fixture_factory(
333+
name="monitored_api",
334+
hourly_rate=RateLimit.per_second(100),
335+
max_drift=0.2, # 20% tolerance
336+
num_calls_between_checks=50,
337+
seconds_before_first_check=5.0,
338+
on_drift_callback=on_drift
339+
)
340+
341+
@weight(1)
342+
def test_api_with_monitoring(monitored_api):
343+
with monitored_api.rate_limited_context() as ctx:
344+
response = api.get("/data")
345+
assert response.status_code == 200
346+
```
347+
348+
#### Rate Limiting with Max Calls
349+
350+
Limit total number of calls and stop when reached:
351+
352+
```python
353+
@pytest.fixture(scope="session")
354+
def limited_api(rate_limiter_fixture_factory, request):
355+
"""API limiter with max calls."""
356+
357+
def on_max_calls(limiter_id, count):
358+
"""Stop when max calls reached."""
359+
stop_load_testing(request, f"Max calls reached: {count}")
360+
361+
return rate_limiter_fixture_factory(
362+
name="limited_api",
363+
hourly_rate=RateLimit.per_minute(600),
364+
max_calls=1000,
365+
max_call_callback=on_max_calls
366+
)
367+
368+
@weight(1)
369+
def test_limited_api(limited_api):
370+
with limited_api.rate_limited_context():
371+
response = api.post("/data", json={"key": "value"})
372+
assert response.status_code == 201
373+
```
374+
375+
#### Dynamic Rate Limiting
376+
377+
Adjust rate limits dynamically during test execution:
378+
379+
```python
380+
@pytest.fixture(scope="session")
381+
def adaptive_limiter(rate_limiter_fixture_factory):
382+
"""Rate limiter with dynamic rate adjustment."""
383+
current_rate = [RateLimit.per_second(10)]
384+
385+
def get_rate():
386+
return current_rate[0]
387+
388+
limiter = rate_limiter_fixture_factory(
389+
name="adaptive",
390+
hourly_rate=get_rate
391+
)
392+
limiter.rate_control = current_rate
393+
return limiter
394+
395+
@weight(1)
396+
def test_with_rate_change(adaptive_limiter):
397+
# Increase rate for this test
398+
adaptive_limiter.rate_control[0] = RateLimit.per_second(20)
399+
400+
with adaptive_limiter.rate_limited_context():
401+
response = api.get("/data")
402+
assert response.status_code == 200
403+
```
404+
405+
#### Burst Capacity Control
406+
407+
Allow bursts above average rate:
408+
409+
```python
410+
@pytest.fixture(scope="session")
411+
def bursty_api(rate_limiter_fixture_factory):
412+
"""API limiter allowing bursts."""
413+
return rate_limiter_fixture_factory(
414+
name="bursty_api",
415+
hourly_rate=RateLimit.per_second(10),
416+
burst_capacity=50 # Allow bursts up to 50 calls
417+
)
418+
419+
@weight(1)
420+
def test_burst_handling(bursty_api):
421+
# Can make rapid calls up to burst capacity
422+
with bursty_api.rate_limited_context():
423+
response = api.get("/data")
424+
assert response.status_code == 200
425+
```
426+
427+
### Context Manager Details
428+
429+
The `rate_limited_context()` context manager:
430+
431+
- **Waits** if necessary to respect the rate limit before entering
432+
- **Tracks** call count, exceptions, and timing
433+
- **Yields** a progress object with attributes:
434+
- `id`: The limiter name
435+
- `call_count`: Total calls made
436+
- `exceptions`: Total exceptions encountered
437+
- `start_time`: When rate limiting started (timestamp)
438+
- `hourly_rate`: Current rate limit in calls per hour
439+
440+
```python
441+
with api_limiter.rate_limited_context() as ctx:
442+
print(f"Limiter: {ctx.id}")
443+
print(f"Call count: {ctx.call_count}")
444+
print(f"Rate: {ctx.hourly_rate} calls/hour")
445+
# Make your API call here
446+
```
447+
448+
### Thread Safety
449+
450+
All rate limiter state is synchronized across workers using file-based locking via `SharedJson`. Multiple workers can safely share the same rate limiter instance without race conditions.
451+
249452
## License
250453

251454
MIT License - see LICENSE file for details.

examples/conftest.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
"""Conftest for examples to ensure fixtures work with xdist."""
22

3-
from pytest_load_testing.concurrent_fixtures import shared_json_fixture_factory
3+
from pytest_load_testing.concurrent_fixtures import (
4+
rate_limiter_fixture_factory,
5+
shared_json_fixture_factory,
6+
)
47

5-
# Re-export the fixture so it's available in examples
6-
__all__ = ["shared_json_fixture_factory"]
8+
# Re-export the fixtures so they're available in examples
9+
__all__ = ["shared_json_fixture_factory", "rate_limiter_fixture_factory"]

examples/test_load_example.py

Lines changed: 0 additions & 73 deletions
This file was deleted.

0 commit comments

Comments
 (0)