22E2E test fixtures with real PostgreSQL database.
33
44Uses a separate 'test_e2e' schema to isolate test data from production.
5- Tests are skipped if DATABASE_URL is not set or database is unreachable.
5+ Supports two connection modes:
6+ 1. Cloud SQL Connector (CI) - uses INSTANCE_CONNECTION_NAME
7+ 2. Direct connection (local) - uses DATABASE_URL
8+
9+ Tests are skipped if neither is configured or database is unreachable.
610
711Note: These tests must NOT be run with pytest-xdist parallelization
812as multiple workers would conflict on the shared test_e2e schema.
2024
2125
2226TEST_SCHEMA = "test_e2e"
23- CONNECTION_TIMEOUT = 5 # seconds - skip tests if DB unreachable
27+ CONNECTION_TIMEOUT = 10 # seconds - skip tests if DB unreachable
2428
2529# Test data constants
2630TEST_IMAGE_URL = "https://storage.googleapis.com/pyplots-images/test/plot.png"
2731TEST_THUMB_URL = "https://storage.googleapis.com/pyplots-images/test/thumb.png"
2832
33+ # Store Cloud SQL connectors for cleanup (can't attach to engine objects)
34+ _connectors = []
35+
2936
30- def _get_database_url ():
31- """Get DATABASE_URL from environment, loading .env if needed."""
37+ def _get_connection_config ():
38+ """Get database connection config.
39+
40+ Prefers DATABASE_URL for local development (simpler, no event loop issues).
41+ Falls back to Cloud SQL Connector for CI environments.
42+ """
3243 from dotenv import load_dotenv
3344
3445 load_dotenv ()
35- return os .environ .get ("DATABASE_URL" )
46+
47+ # Prefer DATABASE_URL for local development
48+ database_url = os .environ .get ("DATABASE_URL" )
49+ if database_url :
50+ # Ensure async driver
51+ if database_url .startswith ("postgresql://" ):
52+ database_url = database_url .replace ("postgresql://" , "postgresql+asyncpg://" )
53+ elif database_url .startswith ("postgres://" ):
54+ database_url = database_url .replace ("postgres://" , "postgresql+asyncpg://" )
55+ return {"mode" : "direct" , "url" : database_url }
56+
57+ # Fall back to Cloud SQL Connector (for CI)
58+ instance_conn = os .environ .get ("INSTANCE_CONNECTION_NAME" )
59+ db_user = os .environ .get ("DB_USER" )
60+ db_pass = os .environ .get ("DB_PASS" )
61+ db_name = os .environ .get ("DB_NAME" )
62+
63+ if instance_conn and db_user and db_pass and db_name :
64+ return {
65+ "mode" : "cloud_sql" ,
66+ "instance" : instance_conn ,
67+ "user" : db_user ,
68+ "password" : db_pass ,
69+ "database" : db_name ,
70+ }
71+
72+ return None
73+
74+
75+ async def _create_cloud_sql_engine (config , search_path = None ):
76+ """Create async engine using Cloud SQL Connector with asyncpg."""
77+ from google .cloud .sql .connector import Connector , IPTypes
78+
79+ connector = Connector ()
80+ _connectors .append (connector ) # Store for cleanup
81+
82+ async def getconn ():
83+ conn = await connector .connect_async (
84+ config ["instance" ],
85+ "asyncpg" ,
86+ user = config ["user" ],
87+ password = config ["password" ],
88+ db = config ["database" ],
89+ ip_type = IPTypes .PUBLIC ,
90+ )
91+ if search_path :
92+ await conn .execute (f"SET search_path TO { search_path } " )
93+ return conn
94+
95+ return create_async_engine ("postgresql+asyncpg://" , async_creator = getconn , echo = False )
96+
97+
98+ async def _create_direct_engine (url , search_path = None ):
99+ """Create async engine using direct DATABASE_URL connection."""
100+ connect_args = {"timeout" : CONNECTION_TIMEOUT }
101+ if search_path :
102+ connect_args ["server_settings" ] = {"search_path" : search_path }
103+
104+ return create_async_engine (url , echo = False , connect_args = connect_args )
105+
106+
107+ async def _cleanup_connectors ():
108+ """Clean up all Cloud SQL connectors."""
109+ global _connectors
110+ for connector in _connectors :
111+ await connector .close_async ()
112+ _connectors = []
36113
37114
38115@pytest_asyncio .fixture (scope = "function" )
@@ -42,34 +119,38 @@ async def pg_engine():
42119
43120 Creates a separate 'test_e2e' schema to isolate tests from production data.
44121 The schema is dropped and recreated for each test.
45- Skips tests if database is unreachable (e.g., in CI without DB access) .
122+ Skips tests if database is unreachable.
46123 """
47- database_url = _get_database_url ()
48- if not database_url :
49- pytest .skip ("DATABASE_URL not set - skipping PostgreSQL E2E tests" )
124+ config = _get_connection_config ()
125+ if not config :
126+ pytest .skip ("No database configured - skipping PostgreSQL E2E tests" )
50127
51- # First create schema with a temporary engine (no search_path yet)
52- # Use timeout to skip tests if DB is unreachable (e.g., CI without DB access)
53- temp_engine = create_async_engine (database_url , echo = False , connect_args = {"timeout" : CONNECTION_TIMEOUT })
128+ # Create temporary engine for schema setup
54129 try :
130+ if config ["mode" ] == "cloud_sql" :
131+ temp_engine = await _create_cloud_sql_engine (config )
132+ else :
133+ temp_engine = await _create_direct_engine (config ["url" ])
134+
55135 async with asyncio .timeout (CONNECTION_TIMEOUT + 2 ):
56136 async with temp_engine .begin () as conn :
57137 await conn .execute (text (f"DROP SCHEMA IF EXISTS { TEST_SCHEMA } CASCADE" ))
58138 await conn .execute (text (f"CREATE SCHEMA { TEST_SCHEMA } " ))
59- except ( TimeoutError , asyncio . TimeoutError , OSError ) as e :
139+
60140 await temp_engine .dispose ()
141+
142+ except (TimeoutError , asyncio .TimeoutError , OSError ) as e :
143+ await _cleanup_connectors ()
61144 pytest .skip (f"Database unreachable (timeout) - skipping E2E tests: { e } " )
62145 except Exception as e :
63- await temp_engine . dispose ()
146+ await _cleanup_connectors ()
64147 pytest .skip (f"Database connection failed - skipping E2E tests: { e } " )
65- await temp_engine .dispose ()
66148
67- # Create engine with search_path set at connection level (handles pooling correctly)
68- engine = create_async_engine (
69- database_url ,
70- echo = False ,
71- connect_args = {"server_settings" : {"search_path" : TEST_SCHEMA }, "timeout" : CONNECTION_TIMEOUT },
72- )
149+ # Create main engine with search_path set
150+ if config ["mode" ] == "cloud_sql" :
151+ engine = await _create_cloud_sql_engine (config , search_path = TEST_SCHEMA )
152+ else :
153+ engine = await _create_direct_engine (config ["url" ], search_path = TEST_SCHEMA )
73154
74155 # Create tables in test schema
75156 async with engine .begin () as conn :
@@ -80,7 +161,9 @@ async def pg_engine():
80161 # Cleanup: Drop entire test schema
81162 async with engine .begin () as conn :
82163 await conn .execute (text (f"DROP SCHEMA IF EXISTS { TEST_SCHEMA } CASCADE" ))
164+
83165 await engine .dispose ()
166+ await _cleanup_connectors ()
84167
85168
86169@pytest_asyncio .fixture
0 commit comments