Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', 'pypy-3.8']
python-version: ['3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v4

Expand Down
2 changes: 0 additions & 2 deletions examples/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
"""Conftest for examples."""

# No special fixtures needed for basic load testing examples
15 changes: 10 additions & 5 deletions examples/test_load_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@

TEST_CODE:
```python
result = pytester.runpytest('--load-test', '-n', '2', '-v')
result = run_with_timeout(pytester, '--load-test', '-n', '2', '-v')
# Should complete successfully with interrupt
assert result.ret == pytest.ExitCode.INTERRUPTED
result.stdout.fnmatch_lines([
'*Interrupted: Test session completed*',
])
assert result.ret == pytest.ExitCode.INTERRUPTED
```
"""

import pytest

from pytest_load_testing import stop_load_testing, weight

# Simple counter without shared state
# Simple counter - note this won't work across xdist workers
# but is sufficient for demonstration purposes
_iteration_count = 0


Expand All @@ -31,8 +33,8 @@ def iteration_counter(request):
global _iteration_count
_iteration_count += 1

# Stop after 100 total test executions
if _iteration_count >= 100:
# Stop after 50 total test executions (reduced for faster testing)
if _iteration_count >= 50:
stop_load_testing(request, "Test session completed")


Expand All @@ -58,3 +60,6 @@ def test_admin_operations(iteration_counter):
def test_health_check(iteration_counter):
"""1% of requests - simulates health check."""
assert True


# Made with Bob
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ classifiers = [
"License :: OSI Approved :: MIT License",
]
dependencies = [
"pytest>=6.2.0",
"pytest>=8.4.2",
"pytest-xdist>=2.0.0",
]
[project.urls]
Expand Down
58 changes: 58 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,67 @@
import pytest
from _pytest.pytester import Pytester

pytest_plugins = "pytester"

PYTESTER_TIMEOUT = 10


@pytest.fixture(autouse=True)
def wide_terminal(monkeypatch):
"""Make all pytester tests use wider terminal for better output visibility."""
monkeypatch.setenv("COLUMNS", "120")


@pytest.fixture
def run_with_timeout():
"""Fixture that provides a helper to run pytester with timeout

Returns:
A callable that runs pytester.runpytest_subprocess with timeout handling
"""

def _run(pytester, *args, timeout=PYTESTER_TIMEOUT, **kwargs):
"""Run pytester with timeout and proper error handling.

Args:
pytester: The pytester fixture
*args: Arguments to pass to runpytest_subprocess
timeout: Timeout in seconds (default: PYTESTER_TIMEOUT)
**kwargs: Keyword arguments to pass to runpytest_subprocess

Returns:
The result object from runpytest_subprocess

Raises:
pytest.fail: If the subprocess times out
"""
try:
return pytester.runpytest_subprocess(*args, timeout=timeout, **kwargs)
except Pytester.TimeoutExpired as e:
# Read stdout/stderr from pytester path
stdout_path = pytester.path.joinpath("stdout")
stderr_path = pytester.path.joinpath("stderr")

stdout = stdout_path.read_text() if stdout_path.exists() else "<no stdout>"
stderr = stderr_path.read_text() if stderr_path.exists() else "<no stderr>"

# Truncate long output (first 100 + last 100 lines if > 200 lines)
def truncate_output(text: str) -> str:
lines = text.splitlines()
if len(lines) > 200:
first_100 = "\n".join(lines[:100])
last_100 = "\n".join(lines[-100:])
return f"{first_100}\n\n... ({len(lines) - 200} lines omitted) ...\n\n{last_100}"
return text

stdout = truncate_output(stdout)
stderr = truncate_output(stderr)

pytest.fail(
f"Test timed out after {timeout} seconds - load test did not complete\n"
f"Error: {e}\n"
f"STDOUT:\n{stdout}\n"
f"STDERR:\n{stderr}"
)

return _run
4 changes: 2 additions & 2 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ def test_missing_code(pytester):
return test_missing_code

# Create test function that executes the test code
def test_from_code(pytester):
def test_from_code(pytester, run_with_timeout):
# Copy example and conftest
pytester.copy_example(f"examples/{file_name}")
pytester.copy_example("examples/conftest.py")

# Execute the test code
exec(test_code, {"pytester": pytester, "pytest": pytest})
exec(test_code, {"pytester": pytester, "pytest": pytest, "run_with_timeout": run_with_timeout})

test_from_code.__name__ = test_name
test_from_code.__doc__ = f"Auto-generated test for {file_name}"
Expand Down
13 changes: 8 additions & 5 deletions tests/test_failure_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def test_dummy():
assert "test_example.py::test_one" in scheduler.last_success_time


def test_failure_tracking_integration(pytester):
def test_failure_tracking_integration(pytester, run_with_timeout):
"""Integration test: verify failure tracking works during actual test execution."""
# Create a conftest that will capture the scheduler for inspection
pytester.makeconftest("""
Expand Down Expand Up @@ -246,20 +246,23 @@ def pytest_sessionfinish(session):

@pytest.fixture(scope="session")
def run_count(tmp_path_factory):
counts_file = tmp_path_factory.mktemp("data") / "counts.json"
root_tmp_dir = tmp_path_factory.getbasetemp().parent
counts_file = root_tmp_dir / "counts.json"
lock_file = counts_file.with_suffix('.lock')

# Initialize file with a lock
with FileLock(str(lock_file)):
if not counts_file.exists():
counts_file.write_text(json.dumps({'failing': 0, 'passing': 0}))

class Counter:
def __init__(self, file_path, lock_path):
self.file = file_path
self.lock = lock_path
self.lock_path = lock_path

def increment(self, key):
with FileLock(str(self.lock)):
# Create new FileLock for each operation (xdist-safe)
with FileLock(str(self.lock_path)):
data = json.loads(self.file.read_text())
data[key] += 1
self.file.write_text(json.dumps(data))
Expand All @@ -286,7 +289,7 @@ def test_passing(request, run_count):
assert True
""")

result = pytester.runpytest("--load-test", "-n", "2", "-v")
result = run_with_timeout(pytester, "--load-test", "-n", "2", "-v")

# Verify tests ran and load testing stopped
result.stdout.fnmatch_lines(
Expand Down
38 changes: 22 additions & 16 deletions tests/test_fixture_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest


def test_request_fixture_available(pytester):
def test_request_fixture_available(pytester, run_with_timeout):
"""
Test that the request fixture is available on each test execution.

Expand All @@ -22,18 +22,17 @@ def test_request_fixture_available(pytester):
def test_request_fixture_available(request):
global count

count += 1
if count >= 5:
stop_load_testing(request, "Request fixture verified")

# The request fixture should be available
assert request is not None
assert hasattr(request, 'node')
assert hasattr(request, 'session')

if count < 5:
count += 1
else:
stop_load_testing(request, "Request fixture verified")
""")

result = pytester.runpytest("--load-test", "-n", "2", "-v")
result = run_with_timeout(pytester, "--load-test", "-n", "2", "-v", timeout=5)

# Should stop gracefully
result.stdout.fnmatch_lines(
Expand All @@ -44,7 +43,7 @@ def test_request_fixture_available(request):
assert result.ret == pytest.ExitCode.INTERRUPTED


def test_all_fixture_scopes(pytester):
def test_all_fixture_scopes(pytester, run_with_timeout):
"""Test that fixtures with different scopes provide correct data."""
pytester.makepyfile("""
import pytest
Expand All @@ -55,9 +54,11 @@ def test_all_fixture_scopes(pytester):

@pytest.fixture(scope="session")
def fixture_counts(tmp_path_factory):
counts_file = tmp_path_factory.mktemp("data") / "fixture_counts.json"
root_tmp_dir = tmp_path_factory.getbasetemp().parent
counts_file = root_tmp_dir / "fixture_counts.json"
lock_file = counts_file.with_suffix('.lock')

# Initialize file with a lock
with FileLock(str(lock_file)):
if not counts_file.exists():
counts_file.write_text(json.dumps({
Expand All @@ -69,17 +70,19 @@ def fixture_counts(tmp_path_factory):
class Counter:
def __init__(self, file_path, lock_path):
self.file = file_path
self.lock = lock_path
self.lock_path = lock_path

def increment(self, key):
with FileLock(str(self.lock)):
# Create new FileLock for each operation (xdist-safe)
with FileLock(str(self.lock_path)):
data = json.loads(self.file.read_text())
data[key] += 1
self.file.write_text(json.dumps(data))
return data[key]

def read(self):
with FileLock(str(self.lock)):
# Create new FileLock for each operation (xdist-safe)
with FileLock(str(self.lock_path)):
return json.loads(self.file.read_text())

counter = Counter(counts_file, lock_file)
Expand All @@ -90,20 +93,23 @@ def read(self):

@pytest.fixture(scope="session")
def execution_counts(tmp_path_factory):
counts_file = tmp_path_factory.mktemp("data") / "execution_counts.json"
root_tmp_dir = tmp_path_factory.getbasetemp().parent
counts_file = root_tmp_dir / "execution_counts.json"
lock_file = counts_file.with_suffix('.lock')

# Initialize file with a lock
with FileLock(str(lock_file)):
if not counts_file.exists():
counts_file.write_text(json.dumps({"test1": 0, "test2": 0}))

class Counter:
def __init__(self, file_path, lock_path):
self.file = file_path
self.lock = lock_path
self.lock_path = lock_path

def increment(self, key):
with FileLock(str(self.lock)):
# Create new FileLock for each operation (xdist-safe)
with FileLock(str(self.lock_path)):
data = json.loads(self.file.read_text())
data[key] += 1
self.file.write_text(json.dumps(data))
Expand Down Expand Up @@ -175,7 +181,7 @@ def test_function_fixture_reinitialization(request, function_fixture, execution_
stop_load_testing(request, f"All fixture scopes verified (test1: {execution_count_test1}, test2: {execution_count_test2})")
""")

result = pytester.runpytest("--load-test", "-n", "1", "-v")
result = run_with_timeout(pytester, "--load-test", "-n", "1", "-v")

# Should stop gracefully
result.stdout.fnmatch_lines(
Expand Down
Loading