22
33import json
44import logging
5+ import weakref
56from contextlib import contextmanager
67from types import ModuleType
78from typing import Any , Iterator
@@ -184,6 +185,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
184185 self .commit ()
185186 return False
186187
188+ def pipeline (self ):
189+ """Return a mock pipeline context manager for REPLAY mode."""
190+ return MockPipeline (self )
191+
187192
188193class MockCursor :
189194 """Mock cursor for when we can't create a real cursor from base class.
@@ -302,6 +307,27 @@ def set_types(self, types) -> None:
302307 pass
303308
304309
310+ class MockPipeline :
311+ """Mock Pipeline for REPLAY mode.
312+
313+ In REPLAY mode, pipeline operations are no-ops since queries
314+ return mocked data immediately.
315+ """
316+
317+ def __init__ (self , connection : "MockConnection" ):
318+ self ._conn = connection
319+
320+ def __enter__ (self ):
321+ return self
322+
323+ def __exit__ (self , exc_type , exc_val , exc_tb ):
324+ return False
325+
326+ def sync (self ):
327+ """No-op sync for mock pipeline."""
328+ pass
329+
330+
305331class _TracedCopyWrapper :
306332 """Wrapper around psycopg's Copy object to capture data in RECORD mode.
307333
@@ -389,6 +415,8 @@ def __init__(self, enabled: bool = True) -> None:
389415 enabled = enabled ,
390416 )
391417 self ._original_connect = None
418+ # Track pending pipeline spans per connection for deferred finalization
419+ self ._pending_pipeline_spans : weakref .WeakKeyDictionary = weakref .WeakKeyDictionary ()
392420 _instance = self
393421
394422 def patch (self , module : ModuleType ) -> None :
@@ -445,6 +473,51 @@ def patched_connect(*args, **kwargs):
445473 module .connect = patched_connect # type: ignore[attr-defined]
446474 logger .debug ("psycopg.connect instrumented" )
447475
476+ # Patch Pipeline class for pipeline mode support
477+ self ._patch_pipeline_class (module )
478+
479+ def _patch_pipeline_class (self , module : ModuleType ) -> None :
480+ """Patch psycopg.Pipeline to finalize spans on sync/exit."""
481+ try :
482+ from psycopg import Pipeline
483+ except ImportError :
484+ logger .debug ("psycopg.Pipeline not available, skipping pipeline instrumentation" )
485+ return
486+
487+ instrumentation = self
488+
489+ # Store originals for potential unpatch
490+ self ._original_pipeline_sync = getattr (Pipeline , 'sync' , None )
491+ self ._original_pipeline_exit = getattr (Pipeline , '__exit__' , None )
492+
493+ if self ._original_pipeline_sync :
494+ def patched_sync (pipeline_self ):
495+ """Patched Pipeline.sync that finalizes pending spans."""
496+ result = instrumentation ._original_pipeline_sync (pipeline_self )
497+ # _conn is the connection associated with the pipeline
498+ conn = getattr (pipeline_self , '_conn' , None )
499+ if conn :
500+ instrumentation ._finalize_pending_pipeline_spans (conn )
501+ return result
502+
503+ Pipeline .sync = patched_sync
504+ logger .debug ("psycopg.Pipeline.sync instrumented" )
505+
506+ if self ._original_pipeline_exit :
507+ def patched_exit (pipeline_self , exc_type , exc_val , exc_tb ):
508+ """Patched Pipeline.__exit__ that finalizes any remaining spans."""
509+ result = instrumentation ._original_pipeline_exit (
510+ pipeline_self , exc_type , exc_val , exc_tb
511+ )
512+ # Finalize any remaining pending spans (handles implicit sync on exit)
513+ conn = getattr (pipeline_self , '_conn' , None )
514+ if conn :
515+ instrumentation ._finalize_pending_pipeline_spans (conn )
516+ return result
517+
518+ Pipeline .__exit__ = patched_exit
519+ logger .debug ("psycopg.Pipeline.__exit__ instrumented" )
520+
448521 def _create_cursor_factory (self , sdk : TuskDrift , base_factory = None ):
449522 """Create a cursor factory that wraps cursors with instrumentation.
450523
@@ -638,6 +711,9 @@ def _record_execute(
638711 error = None
639712 result = None
640713
714+ # Check if we're in pipeline mode BEFORE executing
715+ in_pipeline_mode = self ._is_in_pipeline_mode (cursor )
716+
641717 with SpanUtils .with_span (span_info ):
642718 try :
643719 result = original_execute (query , params , ** kwargs )
@@ -646,14 +722,24 @@ def _record_execute(
646722 error = e
647723 raise
648724 finally :
649- self ._finalize_query_span (
650- span_info .span ,
651- cursor ,
652- query_str ,
653- params ,
654- error ,
655- )
656- span_info .span .end ()
725+ if error is not None :
726+ # Always finalize immediately on error
727+ self ._finalize_query_span (span_info .span , cursor , query_str , params , error )
728+ span_info .span .end ()
729+ elif in_pipeline_mode :
730+ # Defer finalization until pipeline.sync()
731+ connection = self ._get_connection_from_cursor (cursor )
732+ if connection :
733+ self ._add_pending_pipeline_span (connection , span_info , cursor , query_str , params )
734+ # DON'T end span here - will be ended in _finalize_pending_pipeline_spans
735+ else :
736+ # Fallback: finalize immediately if we can't get connection
737+ self ._finalize_query_span (span_info .span , cursor , query_str , params , None )
738+ span_info .span .end ()
739+ else :
740+ # Normal mode: finalize immediately
741+ self ._finalize_query_span (span_info .span , cursor , query_str , params , None )
742+ span_info .span .end ()
657743
658744 def _traced_executemany (
659745 self , cursor : Any , original_executemany : Any , sdk : TuskDrift , query : str , params_seq , ** kwargs
@@ -1218,6 +1304,71 @@ def _query_to_string(self, query: Any, cursor: Any) -> str:
12181304
12191305 return str (query ) if not isinstance (query , str ) else query
12201306
1307+ def _is_in_pipeline_mode (self , cursor : Any ) -> bool :
1308+ """Check if the cursor's connection is currently in pipeline mode.
1309+
1310+ In psycopg3, when conn.pipeline() is active, connection._pipeline is set.
1311+ """
1312+ try :
1313+ conn = getattr (cursor , 'connection' , None )
1314+ if conn is None :
1315+ return False
1316+ # MockConnection doesn't have real pipeline mode
1317+ if isinstance (conn , MockConnection ):
1318+ return False
1319+ pipeline = getattr (conn , '_pipeline' , None )
1320+ return pipeline is not None
1321+ except Exception :
1322+ return False
1323+
1324+ def _get_connection_from_cursor (self , cursor : Any ) -> Any :
1325+ """Get the connection object from a cursor."""
1326+ return getattr (cursor , 'connection' , None )
1327+
1328+ def _add_pending_pipeline_span (
1329+ self ,
1330+ connection : Any ,
1331+ span_info : Any ,
1332+ cursor : Any ,
1333+ query : str ,
1334+ params : Any ,
1335+ ) -> None :
1336+ """Add a pending span to be finalized when pipeline syncs."""
1337+ if connection not in self ._pending_pipeline_spans :
1338+ self ._pending_pipeline_spans [connection ] = []
1339+
1340+ self ._pending_pipeline_spans [connection ].append ({
1341+ 'span_info' : span_info ,
1342+ 'cursor' : cursor ,
1343+ 'query' : query ,
1344+ 'params' : params ,
1345+ })
1346+ logger .debug (f"[PIPELINE] Deferred span for query: { query [:50 ]} ..." )
1347+
1348+ def _finalize_pending_pipeline_spans (self , connection : Any ) -> None :
1349+ """Finalize all pending spans for a connection after pipeline sync."""
1350+ if connection not in self ._pending_pipeline_spans :
1351+ return
1352+
1353+ pending = self ._pending_pipeline_spans .pop (connection , [])
1354+ logger .debug (f"[PIPELINE] Finalizing { len (pending )} pending pipeline spans" )
1355+
1356+ for item in pending :
1357+ span_info = item ['span_info' ]
1358+ cursor = item ['cursor' ]
1359+ query = item ['query' ]
1360+ params = item ['params' ]
1361+
1362+ try :
1363+ self ._finalize_query_span (span_info .span , cursor , query , params , error = None )
1364+ span_info .span .end ()
1365+ except Exception as e :
1366+ logger .error (f"[PIPELINE] Error finalizing deferred span: { e } " )
1367+ try :
1368+ span_info .span .end ()
1369+ except Exception :
1370+ pass
1371+
12211372 def _try_get_mock (
12221373 self ,
12231374 sdk : TuskDrift ,
0 commit comments