Skip to content

Commit 37574b9

Browse files
committed
opentelemetry-instrumentation-dbapi: instrument commit and rollback
1 parent 1cd2554 commit 37574b9

14 files changed

Lines changed: 389 additions & 43 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5959
- `opentelemetry-instrumentation-sqlalchemy`: implement new semantic convention opt-in migration
6060
([#4110](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4110))
6161

62+
- `opentelemetry-instrumentation-dbapi`: Add instrumentation for `commit()` and `rollback()` transaction operations
63+
([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964))
64+
- `opentelemetry-instrumentation-dbapi`: Add `enable_transaction_spans` configuration flag to control transaction span creation (default: `True`)
65+
([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964))
66+
- `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-mysqlclient`, `opentelemetry-instrumentation-psycopg`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-sqlite3`, `opentelemetry-instrumentation-pymssql`: Add support for transaction span instrumentation via `enable_transaction_spans` parameter
67+
([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964))
68+
6269
### Fixed
6370

6471
- `opentelemetry-docker-tests`: Replace deprecated `SpanAttributes` from `opentelemetry.semconv.trace` with `opentelemetry.semconv._incubating.attributes`

instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py

Lines changed: 131 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ def trace_integration(
227227
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
228228
enable_attribute_commenter: bool = False,
229229
commenter_options: dict[str, Any] | None = None,
230+
enable_transaction_spans: bool = True,
230231
):
231232
"""Integrate with DB API library.
232233
https://www.python.org/dev/peps/pep-0249/
@@ -246,6 +247,7 @@ def trace_integration(
246247
default one is used.
247248
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True.
248249
commenter_options: Configurations for tags to be appended at the sql query.
250+
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
249251
"""
250252
wrap_connect(
251253
__name__,
@@ -260,6 +262,7 @@ def trace_integration(
260262
db_api_integration_factory=db_api_integration_factory,
261263
enable_attribute_commenter=enable_attribute_commenter,
262264
commenter_options=commenter_options,
265+
enable_transaction_spans=enable_transaction_spans,
263266
)
264267

265268

@@ -276,6 +279,7 @@ def wrap_connect(
276279
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
277280
commenter_options: dict[str, Any] | None = None,
278281
enable_attribute_commenter: bool = False,
282+
enable_transaction_spans: bool = True,
279283
):
280284
"""Integrate with DB API library.
281285
https://www.python.org/dev/peps/pep-0249/
@@ -295,6 +299,7 @@ def wrap_connect(
295299
default one is used.
296300
commenter_options: Configurations for tags to be appended at the sql query.
297301
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True.
302+
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
298303
299304
"""
300305
db_api_integration_factory = (
@@ -319,6 +324,7 @@ def wrap_connect_(
319324
commenter_options=commenter_options,
320325
connect_module=connect_module,
321326
enable_attribute_commenter=enable_attribute_commenter,
327+
enable_transaction_spans=enable_transaction_spans,
322328
)
323329
return db_integration.wrapped_connection(wrapped, args, kwargs)
324330

@@ -356,6 +362,7 @@ def instrument_connection(
356362
connect_module: Callable[..., Any] | None = None,
357363
enable_attribute_commenter: bool = False,
358364
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
365+
enable_transaction_spans: bool = True,
359366
) -> TracedConnectionProxy[ConnectionT]:
360367
"""Enable instrumentation in a database connection.
361368
@@ -377,6 +384,7 @@ def instrument_connection(
377384
replacement for :class:`DatabaseApiIntegration`. Can be used to
378385
obtain connection attributes from the connect method instead of
379386
from the connection itself (as done by the pymssql intrumentor).
387+
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
380388
381389
Returns:
382390
An instrumented connection.
@@ -400,6 +408,7 @@ def instrument_connection(
400408
commenter_options=commenter_options,
401409
connect_module=connect_module,
402410
enable_attribute_commenter=enable_attribute_commenter,
411+
enable_transaction_spans=enable_transaction_spans,
403412
)
404413
db_integration.get_connection_attributes(connection)
405414
return get_traced_connection_proxy(connection, db_integration)
@@ -436,6 +445,7 @@ def __init__(
436445
commenter_options: dict[str, Any] | None = None,
437446
connect_module: Callable[..., Any] | None = None,
438447
enable_attribute_commenter: bool = False,
448+
enable_transaction_spans: bool = True,
439449
):
440450
if connection_attributes is None:
441451
self.connection_attributes = {
@@ -458,6 +468,7 @@ def __init__(
458468
self.enable_commenter = enable_commenter
459469
self.commenter_options = commenter_options
460470
self.enable_attribute_commenter = enable_attribute_commenter
471+
self.enable_transaction_spans = enable_transaction_spans
461472
self.database_system = database_system
462473
self.connection_props: dict[str, Any] = {}
463474
self.span_attributes: dict[str, Any] = {}
@@ -573,6 +584,17 @@ def get_connection_attributes(self, connection: object) -> None:
573584
if port is not None:
574585
self.span_attributes[NET_PEER_PORT] = port
575586

587+
def populate_common_span_attributes(self, span: trace_api.Span) -> None:
588+
"""Populate span with common database connection attributes."""
589+
if not span.is_recording():
590+
return
591+
592+
span.set_attribute(DB_SYSTEM, self.database_system)
593+
span.set_attribute(DB_NAME, self.database)
594+
595+
for attribute_key, attribute_value in self.span_attributes.items():
596+
span.set_attribute(attribute_key, attribute_value)
597+
576598

577599
# pylint: disable=abstract-method,no-member
578600
class TracedConnectionProxy(BaseObjectProxy, Generic[ConnectionT]):
@@ -581,23 +603,54 @@ def __init__(
581603
self,
582604
connection: ConnectionT,
583605
db_api_integration: DatabaseApiIntegration | None = None,
606+
wrap_cursors: bool = True,
584607
):
585608
BaseObjectProxy.__init__(self, connection)
586609
self._self_db_api_integration = db_api_integration
610+
self._self_wrap_cursors = wrap_cursors
587611

588612
def __getattribute__(self, name: str):
589-
if object.__getattribute__(self, name):
613+
# Try to get the attribute from the proxy first
614+
try:
590615
return object.__getattribute__(self, name)
591-
592-
return object.__getattribute__(
593-
object.__getattribute__(self, "_connection"), name
594-
)
616+
except AttributeError:
617+
# If not found on proxy, try the wrapped connection
618+
return object.__getattribute__(
619+
object.__getattribute__(self, "__wrapped__"), name
620+
)
595621

596622
def cursor(self, *args: Any, **kwargs: Any):
597-
return get_traced_cursor_proxy(
598-
self.__wrapped__.cursor(*args, **kwargs),
599-
self._self_db_api_integration,
600-
)
623+
cursor = self.__wrapped__.cursor(*args, **kwargs)
624+
625+
# For databases like psycopg/psycopg2 that use cursor_factory,
626+
# cursor tracing is already handled by the factory, so skip wrapping
627+
if not self._self_wrap_cursors:
628+
return cursor
629+
630+
# For standard dbapi connections, wrap the cursor
631+
return get_traced_cursor_proxy(cursor, self._self_db_api_integration)
632+
633+
def _traced_tx_operation(
634+
self, operation_name: str, operation_method: Callable[[], None]
635+
) -> None:
636+
"""Execute a traced transaction operation (commit, rollback)."""
637+
if not is_instrumentation_enabled():
638+
return operation_method()
639+
640+
if not self._self_db_api_integration.enable_transaction_spans:
641+
return operation_method()
642+
643+
with self._self_db_api_integration._tracer.start_as_current_span(
644+
operation_name, kind=trace_api.SpanKind.CLIENT
645+
) as span:
646+
self._self_db_api_integration.populate_common_span_attributes(span)
647+
return operation_method()
648+
649+
def commit(self):
650+
return self._traced_tx_operation("COMMIT", self.__wrapped__.commit)
651+
652+
def rollback(self):
653+
return self._traced_tx_operation("ROLLBACK", self.__wrapped__.rollback)
601654

602655
def __enter__(self):
603656
self.__wrapped__.__enter__()
@@ -607,13 +660,78 @@ def __exit__(self, *args: Any, **kwargs: Any):
607660
self.__wrapped__.__exit__(*args, **kwargs)
608661

609662

663+
class AsyncTracedConnectionProxy(TracedConnectionProxy[ConnectionT]):
664+
async def _traced_tx_operation_async(
665+
self, operation_name: str, operation_method: Callable[[], Awaitable[None]]
666+
) -> None:
667+
"""Execute a traced async transaction operation (commit, rollback)."""
668+
if not is_instrumentation_enabled():
669+
return await operation_method()
670+
671+
if not self._self_db_api_integration.enable_transaction_spans:
672+
return await operation_method()
673+
674+
with self._self_db_api_integration._tracer.start_as_current_span(
675+
operation_name, kind=trace_api.SpanKind.CLIENT
676+
) as span:
677+
self._self_db_api_integration.populate_common_span_attributes(span)
678+
return await operation_method()
679+
680+
async def commit(self):
681+
"""Async commit for async connections (e.g., psycopg.AsyncConnection)."""
682+
return await self._traced_tx_operation_async("COMMIT", self.__wrapped__.commit)
683+
684+
async def rollback(self):
685+
"""Async rollback for async connections (e.g., psycopg.AsyncConnection)."""
686+
return await self._traced_tx_operation_async("ROLLBACK", self.__wrapped__.rollback)
687+
688+
# Async context manager support
689+
async def __aenter__(self):
690+
if hasattr(self.__wrapped__, "__aenter__"):
691+
await self.__wrapped__.__aenter__()
692+
return self
693+
694+
async def __aexit__(self, *args: Any, **kwargs: Any):
695+
if hasattr(self.__wrapped__, "__aexit__"):
696+
return await self.__wrapped__.__aexit__(*args, **kwargs)
697+
698+
610699
def get_traced_connection_proxy(
611700
connection: ConnectionT,
612701
db_api_integration: DatabaseApiIntegration | None,
702+
wrap_cursors: bool = True,
613703
*args: Any,
614704
**kwargs: Any,
615705
) -> TracedConnectionProxy[ConnectionT]:
616-
return TracedConnectionProxy(connection, db_api_integration)
706+
"""Get a traced connection proxy for sync connections.
707+
708+
Args:
709+
connection: The database connection to wrap.
710+
db_api_integration: The database API integration instance.
711+
wrap_cursors: Whether to wrap cursors returned by connection.cursor().
712+
Set to False for databases like psycopg/psycopg2 that handle cursor
713+
tracing via cursor_factory. Defaults to True.
714+
"""
715+
return TracedConnectionProxy(connection, db_api_integration, wrap_cursors)
716+
717+
718+
def get_traced_async_connection_proxy(
719+
connection: ConnectionT,
720+
db_api_integration: DatabaseApiIntegration | None,
721+
wrap_cursors: bool = True,
722+
*args: Any,
723+
**kwargs: Any,
724+
) -> AsyncTracedConnectionProxy[ConnectionT]:
725+
"""Get a traced connection proxy for async connections.
726+
727+
Args:
728+
connection: The async database connection to wrap.
729+
db_api_integration: The database API integration instance.
730+
wrap_cursors: Whether to wrap cursors returned by connection.cursor().
731+
Set to False for databases like psycopg/psycopg2 that handle cursor
732+
tracing via cursor_factory. Defaults to True.
733+
"""
734+
return AsyncTracedConnectionProxy(connection, db_api_integration, wrap_cursors)
617735

618736

619737
class CursorTracer(Generic[CursorT]):
@@ -698,17 +816,12 @@ def _populate_span(
698816
):
699817
if not span.is_recording():
700818
return
819+
820+
self._db_api_integration.populate_common_span_attributes(span)
821+
701822
statement = self.get_statement(cursor, args)
702-
span.set_attribute(DB_SYSTEM, self._db_api_integration.database_system)
703-
span.set_attribute(DB_NAME, self._db_api_integration.database)
704823
span.set_attribute(DB_STATEMENT, statement)
705824

706-
for (
707-
attribute_key,
708-
attribute_value,
709-
) in self._db_api_integration.span_attributes.items():
710-
span.set_attribute(attribute_key, attribute_value)
711-
712825
if self._db_api_integration.capture_parameters and len(args) > 1:
713826
span.set_attribute("db.statement.parameters", str(args[1]))
714827

instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,62 @@ def test_callproc(self):
10611061
"Test stored procedure",
10621062
)
10631063

1064+
def test_commit(self):
1065+
db_integration = dbapi.DatabaseApiIntegration(
1066+
"instrumenting_module_test_name", "testcomponent"
1067+
)
1068+
mock_connection = db_integration.wrapped_connection(
1069+
mock_connect, {}, {}
1070+
)
1071+
mock_connection.commit()
1072+
spans_list = self.memory_exporter.get_finished_spans()
1073+
self.assertEqual(len(spans_list), 1)
1074+
span = spans_list[0]
1075+
self.assertEqual(span.name, "COMMIT")
1076+
1077+
def test_rollback(self):
1078+
db_integration = dbapi.DatabaseApiIntegration(
1079+
"instrumenting_module_test_name", "testcomponent"
1080+
)
1081+
mock_connection = db_integration.wrapped_connection(
1082+
mock_connect, {}, {}
1083+
)
1084+
mock_connection.rollback()
1085+
spans_list = self.memory_exporter.get_finished_spans()
1086+
self.assertEqual(len(spans_list), 1)
1087+
span = spans_list[0]
1088+
self.assertEqual(span.name, "ROLLBACK")
1089+
1090+
def test_commit_with_suppress_instrumentation(self):
1091+
"""Test that commit doesn't create a span when instrumentation is suppressed"""
1092+
db_integration = dbapi.DatabaseApiIntegration(
1093+
"instrumenting_module_test_name",
1094+
"testcomponent",
1095+
)
1096+
mock_connection = db_integration.wrapped_connection(
1097+
mock_connect, {}, {}
1098+
)
1099+
with suppress_instrumentation():
1100+
mock_connection.commit()
1101+
1102+
spans_list = self.memory_exporter.get_finished_spans()
1103+
self.assertEqual(len(spans_list), 0)
1104+
1105+
def test_rollback_with_suppress_instrumentation(self):
1106+
"""Test that rollback doesn't create a span when instrumentation is suppressed"""
1107+
db_integration = dbapi.DatabaseApiIntegration(
1108+
"instrumenting_module_test_name",
1109+
"testcomponent",
1110+
)
1111+
mock_connection = db_integration.wrapped_connection(
1112+
mock_connect, {}, {}
1113+
)
1114+
with suppress_instrumentation():
1115+
mock_connection.rollback()
1116+
1117+
spans_list = self.memory_exporter.get_finished_spans()
1118+
self.assertEqual(len(spans_list), 0)
1119+
10641120
@mock.patch("opentelemetry.instrumentation.dbapi")
10651121
def test_wrap_connect(self, mock_dbapi):
10661122
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
@@ -1298,6 +1354,14 @@ def __init__(self, database, server_port, server_host, user):
12981354
def cursor(self):
12991355
return MockCursor()
13001356

1357+
# pylint: disable=no-self-use
1358+
def commit(self):
1359+
pass
1360+
1361+
# pylint: disable=no-self-use
1362+
def rollback(self):
1363+
pass
1364+
13011365

13021366
class MockCursor:
13031367
def __init__(self) -> None:

0 commit comments

Comments
 (0)