|
3 | 3 | import logging |
4 | 4 | import os |
5 | 5 | import re |
| 6 | +from typing import Optional |
6 | 7 | from urllib.parse import parse_qs, unquote, urlencode, urlparse, urlunparse |
7 | 8 |
|
8 | 9 | import pytest |
9 | 10 |
|
10 | 11 | logger = logging.getLogger(__name__) |
11 | 12 |
|
12 | 13 |
|
13 | | -def _worker_schema_name(worker_id: str) -> str | None: |
| 14 | +def _worker_schema_name(worker_id: str) -> Optional[str]: |
14 | 15 | """Return a safe SQL schema name for an xdist worker ID.""" |
15 | 16 | match = re.fullmatch(r"gw(\d+)", worker_id) |
16 | 17 | if match is None: |
17 | 18 | return None |
18 | 19 | return f"test_worker_{match.group(1)}" |
19 | 20 |
|
20 | 21 |
|
21 | | -@pytest.fixture(autouse=True) |
22 | | -def inject_worker_schema_for_sql_tests(monkeypatch, request): |
23 | | - """Automatically inject worker-specific schema into SQL connection string. |
| 22 | +def _build_worker_url(original_url: str, schema_name: str) -> str: |
| 23 | + """Return a copy of original_url with search_path set to schema_name.""" |
| 24 | + parsed = urlparse(original_url) |
| 25 | + query_params = parse_qs(parsed.query) |
| 26 | + |
| 27 | + if "options" in query_params: |
| 28 | + current_options = unquote(query_params["options"][0]) |
| 29 | + new_options = f"{current_options} -csearch_path={schema_name}" |
| 30 | + else: |
| 31 | + new_options = f"-csearch_path={schema_name}" |
| 32 | + |
| 33 | + query_params["options"] = [new_options] |
| 34 | + new_query = urlencode(query_params, doseq=True) |
| 35 | + return urlunparse( |
| 36 | + ( |
| 37 | + parsed.scheme, |
| 38 | + parsed.netloc, |
| 39 | + parsed.path, |
| 40 | + parsed.params, |
| 41 | + new_query, |
| 42 | + parsed.fragment, |
| 43 | + ) |
| 44 | + ) |
| 45 | + |
24 | 46 |
|
25 | | - This fixture enables parallel SQL test execution by giving each pytest-xdist worker its own PostgreSQL schema, |
26 | | - preventing table creation conflicts. |
| 47 | +@pytest.fixture(scope="session") |
| 48 | +def worker_sql_connection(request: pytest.FixtureRequest) -> Optional[str]: |
| 49 | + """Create the worker-specific PostgreSQL schema once per xdist worker session. |
| 50 | +
|
| 51 | + Returns the worker-specific connection URL, or None when schema isolation is not |
| 52 | + needed (serial run or non-PostgreSQL backend). The schema is created with |
| 53 | + ``CREATE SCHEMA IF NOT EXISTS`` so this fixture is safe to run even if the schema |
| 54 | + already exists from a previous interrupted run. |
| 55 | +
|
| 56 | + A non-None return value means "use this URL"; schema creation is attempted but may |
| 57 | + fail silently (e.g. if SQLAlchemy is not installed or the DB is unreachable). Tests |
| 58 | + that depend on the schema will fail at the DB level with a diagnostic error. |
27 | 59 |
|
28 | 60 | """ |
29 | | - # Only apply to SQL tests |
30 | | - if "sql" not in request.node.keywords: |
31 | | - yield |
32 | | - return |
| 61 | + # Avoid touching SQL backends entirely when no SQL tests are collected. |
| 62 | + has_sql_tests = any("sql" in item.keywords for item in request.session.items) |
| 63 | + if not has_sql_tests: |
| 64 | + return None |
33 | 65 |
|
34 | 66 | worker_id = os.environ.get("PYTEST_XDIST_WORKER", "master") |
35 | | - |
36 | 67 | if worker_id == "master": |
37 | | - # Not running in parallel, no schema isolation needed |
38 | | - yield |
39 | | - return |
| 68 | + return None |
40 | 69 |
|
41 | | - # Get the original SQL connection string |
42 | 70 | original_url = os.environ.get("SQLALCHEMY_DATABASE_URL", "sqlite:///:memory:") |
| 71 | + if "postgresql" not in original_url: |
| 72 | + return None |
43 | 73 |
|
44 | | - if "postgresql" in original_url: |
45 | | - # Create worker-specific schema name |
46 | | - schema_name = _worker_schema_name(worker_id) |
47 | | - if schema_name is None: |
48 | | - logger.warning("Unexpected worker ID for SQL schema isolation: %s", worker_id) |
49 | | - yield |
50 | | - return |
51 | | - |
52 | | - # Parse the URL |
53 | | - parsed = urlparse(original_url) |
54 | | - |
55 | | - # Get existing query parameters |
56 | | - query_params = parse_qs(parsed.query) |
57 | | - |
58 | | - # Add or update the options parameter to set search_path |
59 | | - if "options" in query_params: |
60 | | - # Append to existing options |
61 | | - current_options = unquote(query_params["options"][0]) |
62 | | - new_options = f"{current_options} -csearch_path={schema_name}" |
63 | | - else: |
64 | | - # Create new options |
65 | | - new_options = f"-csearch_path={schema_name}" |
66 | | - |
67 | | - query_params["options"] = [new_options] |
68 | | - |
69 | | - # Rebuild the URL with updated query parameters |
70 | | - new_query = urlencode(query_params, doseq=True) |
71 | | - new_url = urlunparse( |
72 | | - ( |
73 | | - parsed.scheme, |
74 | | - parsed.netloc, |
75 | | - parsed.path, |
76 | | - parsed.params, |
77 | | - new_query, |
78 | | - parsed.fragment, |
79 | | - ) |
80 | | - ) |
| 74 | + schema_name = _worker_schema_name(worker_id) |
| 75 | + if schema_name is None: |
| 76 | + logger.warning("Unexpected worker ID for SQL schema isolation: %s", worker_id) |
| 77 | + return None |
| 78 | + |
| 79 | + new_url = _build_worker_url(original_url, schema_name) |
81 | 80 |
|
82 | | - # Override both the environment variable and the module constant |
83 | | - monkeypatch.setenv("SQLALCHEMY_DATABASE_URL", new_url) |
| 81 | + engine = None |
| 82 | + try: |
| 83 | + from sqlalchemy import create_engine, text |
84 | 84 |
|
85 | | - # Also patch the SQL_CONN_STR constant used in tests |
86 | | - import tests.sql_tests.test_sql_core |
| 85 | + engine = create_engine(original_url) |
| 86 | + with engine.connect() as conn: |
| 87 | + conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) |
| 88 | + conn.commit() |
| 89 | + except Exception as e: |
| 90 | + logger.debug("Failed to create schema %s: %s", schema_name, e) |
| 91 | + finally: |
| 92 | + if engine is not None: |
| 93 | + engine.dispose() |
87 | 94 |
|
88 | | - monkeypatch.setattr(tests.sql_tests.test_sql_core, "SQL_CONN_STR", new_url) |
| 95 | + return new_url |
89 | 96 |
|
90 | | - # Ensure schema creation by creating it before tests run |
91 | | - try: |
92 | | - from sqlalchemy import create_engine, text |
93 | 97 |
|
94 | | - # Use original URL to create schema (without search_path) |
95 | | - engine = create_engine(original_url) |
96 | | - with engine.connect() as conn: |
97 | | - conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) |
98 | | - conn.commit() |
99 | | - engine.dispose() |
100 | | - except Exception as e: |
101 | | - # If we can't create the schema, the test will fail anyway |
102 | | - logger.debug(f"Failed to create schema {schema_name}: {e}") |
| 98 | +@pytest.fixture(autouse=True) |
| 99 | +def inject_worker_schema_for_sql_tests(monkeypatch, request, worker_sql_connection): |
| 100 | + """Automatically inject worker-specific schema into SQL connection string. |
| 101 | +
|
| 102 | + This fixture enables parallel SQL test execution by giving each pytest-xdist worker |
| 103 | + its own PostgreSQL schema, preventing table creation conflicts. |
| 104 | +
|
| 105 | + Schema creation is handled once per worker session by |
| 106 | + :func:`worker_sql_connection`. This fixture only performs lightweight |
| 107 | + per-test monkeypatching of the environment variable and module constant. |
| 108 | +
|
| 109 | + """ |
| 110 | + if "sql" not in request.node.keywords or worker_sql_connection is None: |
| 111 | + yield |
| 112 | + return |
| 113 | + |
| 114 | + monkeypatch.setenv("SQLALCHEMY_DATABASE_URL", worker_sql_connection) |
| 115 | + |
| 116 | + import tests.sql_tests.test_sql_core |
| 117 | + |
| 118 | + monkeypatch.setattr(tests.sql_tests.test_sql_core, "SQL_CONN_STR", worker_sql_connection) |
103 | 119 |
|
104 | 120 | yield |
105 | 121 |
|
@@ -193,4 +209,4 @@ def cleanup_test_schemas(request): |
193 | 209 | engine.dispose() |
194 | 210 | except Exception as e: |
195 | 211 | # If cleanup fails, it's not critical |
196 | | - logger.debug(f"Failed to cleanup schema {schema_name}: {e}") |
| 212 | + logger.debug("Failed to cleanup schema %s: %s", schema_name, e) |
0 commit comments