22E2E test fixtures with real PostgreSQL database.
33
44Uses a separate 'test_e2e' schema to isolate test data from production.
5- Supports two connection modes:
6- 1. Cloud SQL Connector (CI) - uses INSTANCE_CONNECTION_NAME
7- 2. Direct connection (local) - uses DATABASE_URL
5+ Tests are skipped if DATABASE_URL is not set or database is unreachable.
86
9- Tests are skipped if neither is configured or database is unreachable.
7+ Connection modes:
8+ - Local: Direct DATABASE_URL from .env
9+ - CI: DATABASE_URL via Cloud SQL Proxy (localhost:5432 -> Cloud SQL)
1010
1111Note: These tests must NOT be run with pytest-xdist parallelization
1212as multiple workers would conflict on the shared test_e2e schema.
3030TEST_IMAGE_URL = "https://storage.googleapis.com/pyplots-images/test/plot.png"
3131TEST_THUMB_URL = "https://storage.googleapis.com/pyplots-images/test/thumb.png"
3232
33- # Store Cloud SQL connectors for cleanup (can't attach to engine objects)
34- _connectors = []
3533
36-
37- def _get_connection_config ():
38- """Get database connection config.
39-
40- Prefers DATABASE_URL for local development (simpler, no event loop issues).
41- Falls back to Cloud SQL Connector for CI environments.
42- """
34+ def _get_database_url ():
35+ """Get DATABASE_URL from environment, loading .env if needed."""
4336 from dotenv import load_dotenv
4437
4538 load_dotenv ()
4639
47- # Prefer DATABASE_URL for local development
4840 database_url = os .environ .get ("DATABASE_URL" )
49- if database_url :
50- # Ensure async driver
51- if database_url .startswith ("postgresql://" ):
52- database_url = database_url .replace ("postgresql://" , "postgresql+asyncpg://" )
53- elif database_url .startswith ("postgres://" ):
54- database_url = database_url .replace ("postgres://" , "postgresql+asyncpg://" )
55- return {"mode" : "direct" , "url" : database_url }
56-
57- # Fall back to Cloud SQL Connector (for CI)
58- instance_conn = os .environ .get ("INSTANCE_CONNECTION_NAME" )
59- db_user = os .environ .get ("DB_USER" )
60- db_pass = os .environ .get ("DB_PASS" )
61- db_name = os .environ .get ("DB_NAME" )
62-
63- if instance_conn and db_user and db_pass and db_name :
64- return {
65- "mode" : "cloud_sql" ,
66- "instance" : instance_conn ,
67- "user" : db_user ,
68- "password" : db_pass ,
69- "database" : db_name ,
70- }
71-
72- return None
73-
74-
75- async def _create_cloud_sql_engine (config , search_path = None ):
76- """Create async engine using Cloud SQL Connector with asyncpg."""
77- from google .cloud .sql .connector import Connector , IPTypes
78-
79- connector = Connector ()
80- _connectors .append (connector ) # Store for cleanup
81-
82- async def getconn ():
83- conn = await connector .connect_async (
84- config ["instance" ],
85- "asyncpg" ,
86- user = config ["user" ],
87- password = config ["password" ],
88- db = config ["database" ],
89- ip_type = IPTypes .PUBLIC ,
90- )
91- if search_path :
92- await conn .execute (f"SET search_path TO { search_path } " )
93- return conn
94-
95- return create_async_engine ("postgresql+asyncpg://" , async_creator = getconn , echo = False )
96-
97-
98- async def _create_direct_engine (url , search_path = None ):
99- """Create async engine using direct DATABASE_URL connection."""
100- connect_args = {"timeout" : CONNECTION_TIMEOUT }
101- if search_path :
102- connect_args ["server_settings" ] = {"search_path" : search_path }
103-
104- return create_async_engine (url , echo = False , connect_args = connect_args )
105-
106-
107- async def _cleanup_connectors ():
108- """Clean up all Cloud SQL connectors."""
109- global _connectors
110- for connector in _connectors :
111- await connector .close_async ()
112- _connectors = []
41+ if not database_url :
42+ return None
43+
44+ # Ensure async driver
45+ if database_url .startswith ("postgresql://" ):
46+ database_url = database_url .replace ("postgresql://" , "postgresql+asyncpg://" )
47+ elif database_url .startswith ("postgres://" ):
48+ database_url = database_url .replace ("postgres://" , "postgresql+asyncpg://" )
49+
50+ return database_url
11351
11452
11553@pytest_asyncio .fixture (scope = "function" )
@@ -121,36 +59,31 @@ async def pg_engine():
12159 The schema is dropped and recreated for each test.
12260 Skips tests if database is unreachable.
12361 """
124- config = _get_connection_config ()
125- if not config :
126- pytest .skip ("No database configured - skipping PostgreSQL E2E tests" )
62+ database_url = _get_database_url ()
63+ if not database_url :
64+ pytest .skip ("DATABASE_URL not set - skipping PostgreSQL E2E tests" )
12765
12866 # Create temporary engine for schema setup
67+ temp_engine = create_async_engine (database_url , echo = False , connect_args = {"timeout" : CONNECTION_TIMEOUT })
12968 try :
130- if config ["mode" ] == "cloud_sql" :
131- temp_engine = await _create_cloud_sql_engine (config )
132- else :
133- temp_engine = await _create_direct_engine (config ["url" ])
134-
13569 async with asyncio .timeout (CONNECTION_TIMEOUT + 2 ):
13670 async with temp_engine .begin () as conn :
13771 await conn .execute (text (f"DROP SCHEMA IF EXISTS { TEST_SCHEMA } CASCADE" ))
13872 await conn .execute (text (f"CREATE SCHEMA { TEST_SCHEMA } " ))
139-
14073 await temp_engine .dispose ()
141-
14274 except (TimeoutError , asyncio .TimeoutError , OSError ) as e :
143- await _cleanup_connectors ()
75+ await temp_engine . dispose ()
14476 pytest .skip (f"Database unreachable (timeout) - skipping E2E tests: { e } " )
14577 except Exception as e :
146- await _cleanup_connectors ()
78+ await temp_engine . dispose ()
14779 pytest .skip (f"Database connection failed - skipping E2E tests: { e } " )
14880
149- # Create main engine with search_path set
150- if config ["mode" ] == "cloud_sql" :
151- engine = await _create_cloud_sql_engine (config , search_path = TEST_SCHEMA )
152- else :
153- engine = await _create_direct_engine (config ["url" ], search_path = TEST_SCHEMA )
81+ # Create main engine with search_path set at connection level
82+ engine = create_async_engine (
83+ database_url ,
84+ echo = False ,
85+ connect_args = {"server_settings" : {"search_path" : TEST_SCHEMA }, "timeout" : CONNECTION_TIMEOUT },
86+ )
15487
15588 # Create tables in test schema
15689 async with engine .begin () as conn :
@@ -161,9 +94,7 @@ async def pg_engine():
16194 # Cleanup: Drop entire test schema
16295 async with engine .begin () as conn :
16396 await conn .execute (text (f"DROP SCHEMA IF EXISTS { TEST_SCHEMA } CASCADE" ))
164-
16597 await engine .dispose ()
166- await _cleanup_connectors ()
16798
16899
169100@pytest_asyncio .fixture
0 commit comments