Skip to content

Commit 5224b86

Browse files
authored
fix: resolve stack test failures in replay mode for psycopg/psycopg2 (#53)
1 parent 237849d commit 5224b86

File tree

9 files changed

+1146
-84
lines changed

9 files changed

+1146
-84
lines changed

drift/instrumentation/psycopg/instrumentation.py

Lines changed: 782 additions & 8 deletions
Large diffs are not rendered by default.

drift/instrumentation/psycopg/mocks.py

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,3 +383,231 @@ def __exit__(self, exc_type, exc_val, exc_tb):
383383
def sync(self):
384384
"""No-op sync for mock pipeline."""
385385
pass
386+
387+
388+
# ==================== ASYNC MOCKS ====================
389+
390+
391+
class MockAsyncConnection:
392+
"""Mock async database connection for REPLAY mode when postgres is not available.
393+
394+
Provides minimal async interface for FastAPI/asyncio apps to work without a real database.
395+
All queries are mocked at the cursor.execute() level.
396+
"""
397+
398+
def __init__(
399+
self,
400+
sdk: TuskDrift,
401+
instrumentation: PsycopgInstrumentation,
402+
cursor_factory,
403+
row_factory=None,
404+
):
405+
self.sdk = sdk
406+
self.instrumentation = instrumentation
407+
self.cursor_factory = cursor_factory
408+
self.row_factory = row_factory
409+
self.closed = False
410+
self.autocommit = False
411+
412+
# psycopg3 async connection attributes
413+
self.isolation_level = None
414+
self.encoding = "UTF8"
415+
self.adapters = MockAdapters()
416+
self.pgconn = None
417+
418+
class MockInfo:
419+
vendor = "postgresql"
420+
server_version = 150000
421+
encoding = "UTF8"
422+
423+
def parameter_status(self, param):
424+
if param == "TimeZone":
425+
return "UTC"
426+
elif param == "server_version":
427+
return "15.0"
428+
return None
429+
430+
self.info = MockInfo()
431+
432+
logger.debug("[MOCK_ASYNC_CONNECTION] Created mock async connection for REPLAY mode (psycopg3)")
433+
434+
def cursor(self, name=None, *, cursor_factory=None, **kwargs):
435+
"""Create an async cursor using the instrumented cursor factory."""
436+
cursor = MockAsyncCursor(self)
437+
438+
instrumentation = self.instrumentation
439+
sdk = self.sdk
440+
441+
async def mock_execute(query, params=None, **kwargs):
442+
# Use async execute handler
443+
async def noop_execute(q, p, **kw):
444+
return cursor
445+
446+
return await instrumentation._traced_async_execute(cursor, noop_execute, sdk, query, params, **kwargs)
447+
448+
async def mock_executemany(query, params_seq, **kwargs):
449+
async def noop_executemany(q, ps, **kw):
450+
return cursor
451+
452+
return await instrumentation._traced_async_executemany(
453+
cursor, noop_executemany, sdk, query, params_seq, **kwargs
454+
)
455+
456+
cursor.execute = mock_execute # type: ignore[method-assign]
457+
cursor.executemany = mock_executemany # type: ignore[method-assign]
458+
459+
logger.debug("[MOCK_ASYNC_CONNECTION] Created async cursor (psycopg3)")
460+
return cursor
461+
462+
async def commit(self):
463+
"""Mock async commit - no-op in REPLAY mode."""
464+
logger.debug("[MOCK_ASYNC_CONNECTION] commit() called (no-op)")
465+
pass
466+
467+
async def rollback(self):
468+
"""Mock async rollback - no-op in REPLAY mode."""
469+
logger.debug("[MOCK_ASYNC_CONNECTION] rollback() called (no-op)")
470+
pass
471+
472+
async def close(self):
473+
"""Mock async close - no-op in REPLAY mode."""
474+
logger.debug("[MOCK_ASYNC_CONNECTION] close() called (no-op)")
475+
self.closed = True
476+
477+
async def __aenter__(self):
478+
return self
479+
480+
async def __aexit__(self, exc_type, exc_val, exc_tb):
481+
if exc_type is not None:
482+
await self.rollback()
483+
else:
484+
await self.commit()
485+
return False
486+
487+
def transaction(self):
488+
"""Return a mock async transaction context manager for REPLAY mode."""
489+
return MockAsyncTransaction(self)
490+
491+
def pipeline(self):
492+
"""Return a mock async pipeline context manager for REPLAY mode."""
493+
return MockAsyncPipeline(self)
494+
495+
496+
class MockAsyncCursor:
497+
"""Mock async cursor for when we can't create a real async cursor.
498+
499+
This is a fallback when the async connection is completely mocked.
500+
"""
501+
502+
def __init__(self, connection):
503+
self.connection = connection
504+
self.rowcount = -1
505+
self._tusk_description = None
506+
self.arraysize = 1
507+
self._mock_rows = []
508+
self._mock_index = 0
509+
self._mock_result_sets = []
510+
self._mock_result_set_index = 0
511+
self.adapters = MockAdapters()
512+
logger.debug("[MOCK_ASYNC_CURSOR] Created fallback mock async cursor (psycopg3)")
513+
514+
@property
515+
def description(self):
516+
return self._tusk_description
517+
518+
@property
519+
def rownumber(self):
520+
if self._mock_rows:
521+
return self._mock_index
522+
return None
523+
524+
@property
525+
def statusmessage(self):
526+
return getattr(self, "_mock_statusmessage", None)
527+
528+
async def execute(self, query, params=None, **kwargs):
529+
"""Will be replaced by instrumentation."""
530+
logger.debug(f"[MOCK_ASYNC_CURSOR] execute() called: {query[:100]}")
531+
return self
532+
533+
async def executemany(self, query, params_seq, **kwargs):
534+
"""Will be replaced by instrumentation."""
535+
logger.debug(f"[MOCK_ASYNC_CURSOR] executemany() called: {query[:100]}")
536+
return self
537+
538+
async def fetchone(self):
539+
if self._mock_index < len(self._mock_rows):
540+
row = self._mock_rows[self._mock_index]
541+
self._mock_index += 1
542+
return tuple(row) if isinstance(row, list) else row
543+
return None
544+
545+
async def fetchmany(self, size=None):
546+
if size is None:
547+
size = self.arraysize
548+
rows = []
549+
for _ in range(size):
550+
row = await self.fetchone()
551+
if row is None:
552+
break
553+
rows.append(row)
554+
return rows
555+
556+
async def fetchall(self):
557+
rows = self._mock_rows[self._mock_index :]
558+
self._mock_index = len(self._mock_rows)
559+
return [tuple(row) if isinstance(row, list) else row for row in rows]
560+
561+
def __aiter__(self):
562+
return self
563+
564+
async def __anext__(self):
565+
if self._mock_index < len(self._mock_rows):
566+
row = self._mock_rows[self._mock_index]
567+
self._mock_index += 1
568+
return tuple(row) if isinstance(row, list) else row
569+
raise StopAsyncIteration
570+
571+
async def close(self):
572+
pass
573+
574+
async def __aenter__(self):
575+
return self
576+
577+
async def __aexit__(self, exc_type, exc_val, exc_tb):
578+
await self.close()
579+
return False
580+
581+
582+
class MockAsyncTransaction:
583+
"""Mock async transaction context manager for REPLAY mode."""
584+
585+
def __init__(self, connection: MockAsyncConnection):
586+
self._conn = connection
587+
588+
async def __aenter__(self):
589+
return self
590+
591+
async def __aexit__(self, exc_type, exc_val, exc_tb):
592+
if exc_type is not None:
593+
await self._conn.rollback()
594+
else:
595+
await self._conn.commit()
596+
return False
597+
598+
599+
class MockAsyncPipeline:
600+
"""Mock async Pipeline for REPLAY mode."""
601+
602+
def __init__(self, connection: MockAsyncConnection):
603+
self._conn = connection
604+
605+
async def __aenter__(self):
606+
return self
607+
608+
async def __aexit__(self, exc_type, exc_val, exc_tb):
609+
return False
610+
611+
async def sync(self):
612+
"""No-op async sync for mock pipeline."""
613+
pass

drift/instrumentation/psycopg2/instrumentation.py

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ class MockCursor:
150150
"""Mock cursor for when we can't create a real cursor from base class.
151151
152152
This is a fallback when the connection is completely mocked.
153+
Provides all attributes that psycopg2 cursors have to ensure compatibility
154+
with frameworks like Django that access cursor properties.
153155
"""
154156

155157
def __init__(self, connection):
@@ -159,6 +161,14 @@ def __init__(self, connection):
159161
self.arraysize = 1
160162
self._mock_rows = []
161163
self._mock_index = 0
164+
# psycopg2 cursor attributes that Django/Flask may access
165+
self.query = None # Last executed query string
166+
self.statusmessage = None # Status message from last command
167+
self.lastrowid = None # OID of last inserted row (if applicable)
168+
self.closed = False
169+
self.name = None # Server-side cursor name (None for client-side)
170+
self.scrollable = None
171+
self.withhold = False
162172
logger.debug("[MOCK_CURSOR] Created fallback mock cursor")
163173

164174
def execute(self, query: Any, vars: Any = None) -> None:
@@ -210,6 +220,58 @@ def __init__(self, connection: Any, instrumentation: Psycopg2Instrumentation, sd
210220

211221
def cursor(self, name: str | None = None, cursor_factory: Any = None, *args: Any, **kwargs: Any) -> Any:
212222
"""Intercept cursor creation to wrap user-provided cursor_factory."""
223+
# In REPLAY mode, use MockCursor to have full control over cursor state
224+
# This is necessary because psycopg2's cursor.description is a read-only
225+
# C-level property that cannot be set after the cursor is created
226+
if self._sdk.mode == TuskDriftMode.REPLAY:
227+
cursor = MockCursor(self)
228+
instrumentation = self._instrumentation
229+
sdk = self._sdk
230+
231+
# Detect if user wants dict-style cursors (RealDictCursor, DictCursor)
232+
is_dict_cursor = False
233+
effective_cursor_factory = cursor_factory if cursor_factory is not None else self._default_cursor_factory
234+
if effective_cursor_factory is not None:
235+
try:
236+
import psycopg2.extras
237+
238+
if effective_cursor_factory in (
239+
psycopg2.extras.RealDictCursor,
240+
psycopg2.extras.DictCursor,
241+
) or (
242+
isinstance(effective_cursor_factory, type)
243+
and issubclass(
244+
effective_cursor_factory, (psycopg2.extras.RealDictCursor, psycopg2.extras.DictCursor)
245+
)
246+
):
247+
is_dict_cursor = True
248+
except (ImportError, AttributeError):
249+
pass
250+
251+
# Store cursor type info on the cursor for _mock_execute_with_data
252+
cursor._is_dict_cursor = is_dict_cursor # type: ignore[attr-defined]
253+
254+
def mock_execute(query, vars=None):
255+
def noop_execute(q, v):
256+
return None
257+
258+
return instrumentation._traced_execute(cursor, noop_execute, sdk, query, vars)
259+
260+
def mock_executemany(query, vars_list):
261+
def noop_executemany(q, vl):
262+
return None
263+
264+
return instrumentation._traced_executemany(cursor, noop_executemany, sdk, query, vars_list)
265+
266+
cursor.execute = mock_execute # type: ignore[method-assign]
267+
cursor.executemany = mock_executemany # type: ignore[method-assign]
268+
269+
logger.debug(
270+
f"[INSTRUMENTED_CONNECTION] Created MockCursor for REPLAY mode (is_dict_cursor={is_dict_cursor})"
271+
)
272+
return cursor
273+
274+
# In RECORD mode, use real cursor with instrumentation
213275
# Use cursor_factory from cursor() call, or fall back to connection's default
214276
base_factory = cursor_factory if cursor_factory is not None else self._default_cursor_factory
215277
# Create instrumented cursor factory (wrapping the base factory)
@@ -493,6 +555,9 @@ def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: A
493555
logger.warning("[PSYCOPG2_REPLAY] No mock found for pre-app-start query, returning empty result")
494556
empty_mock = {"rowcount": 0, "rows": [], "description": None}
495557
self._mock_execute_with_data(cursor, empty_mock)
558+
# Set cursor.query to the executed query (psycopg2 cursor attribute)
559+
if hasattr(cursor, "query"):
560+
cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str
496561
span_info.span.end()
497562
return None
498563

@@ -503,6 +568,9 @@ def _replay_execute(self, cursor: Any, sdk: TuskDrift, query_str: str, params: A
503568
)
504569

505570
self._mock_execute_with_data(cursor, mock_result)
571+
# Set cursor.query to the executed query (psycopg2 cursor attribute)
572+
if hasattr(cursor, "query"):
573+
cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str
506574
span_info.span.end()
507575
return None
508576

@@ -621,6 +689,9 @@ def _replay_executemany(self, cursor: Any, sdk: TuskDrift, query_str: str, param
621689
)
622690
empty_mock = {"rowcount": 0, "rows": [], "description": None}
623691
self._mock_execute_with_data(cursor, empty_mock)
692+
# Set cursor.query to the executed query (psycopg2 cursor attribute)
693+
if hasattr(cursor, "query"):
694+
cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str
624695
span_info.span.end()
625696
return None
626697

@@ -631,6 +702,9 @@ def _replay_executemany(self, cursor: Any, sdk: TuskDrift, query_str: str, param
631702
)
632703

633704
self._mock_execute_with_data(cursor, mock_result)
705+
# Set cursor.query to the executed query (psycopg2 cursor attribute)
706+
if hasattr(cursor, "query"):
707+
cursor.query = query_str.encode("utf-8") if isinstance(query_str, str) else query_str
634708
span_info.span.end()
635709
return None
636710

@@ -777,15 +851,17 @@ def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any]) -> Non
777851
# Deserialize datetime strings back to datetime objects for consistent Flask/Django serialization
778852
mock_rows = [deserialize_db_value(row) for row in mock_rows]
779853

780-
# Check if this is a dict-cursor (like RealDictCursor) by checking if cursor class
781-
# inherits from a dict-returning cursor type
782-
is_dict_cursor = False
783-
try:
784-
import psycopg2.extras
854+
# Check if this is a dict-cursor (like RealDictCursor)
855+
# First check if cursor has _is_dict_cursor attribute (set by InstrumentedConnection.cursor())
856+
# Then fall back to isinstance check for real dict cursors
857+
is_dict_cursor = getattr(cursor, "_is_dict_cursor", False)
858+
if not is_dict_cursor:
859+
try:
860+
import psycopg2.extras
785861

786-
is_dict_cursor = isinstance(cursor, (psycopg2.extras.RealDictCursor, psycopg2.extras.DictCursor))
787-
except (ImportError, AttributeError):
788-
pass
862+
is_dict_cursor = isinstance(cursor, (psycopg2.extras.RealDictCursor, psycopg2.extras.DictCursor))
863+
except (ImportError, AttributeError):
864+
pass
789865

790866
# If it's a dict cursor and we have description, convert rows to dicts
791867
if is_dict_cursor and description_data:

0 commit comments

Comments
 (0)