160160import functools
161161import logging
162162import re
163+ import time
163164from typing import Any , Awaitable , Callable , Generic , TypeVar
164165
165166from wrapt import wrap_function_wrapper
179180 _get_schema_url_for_signal_types ,
180181 _OpenTelemetrySemanticConventionStability ,
181182 _OpenTelemetryStabilitySignalType ,
183+ _report_new ,
182184 _set_db_name ,
183185 _set_db_statement ,
184186 _set_db_system ,
193195 is_instrumentation_enabled ,
194196 unwrap ,
195197)
198+ from opentelemetry .metrics import MeterProvider , get_meter
199+ from opentelemetry .semconv ._incubating .metrics .db_metrics import (
200+ create_db_client_operation_duration ,
201+ create_db_client_response_returned_rows ,
202+ )
203+ from opentelemetry .semconv .attributes .db_attributes import (
204+ DB_NAMESPACE ,
205+ DB_OPERATION_NAME ,
206+ DB_SYSTEM_NAME ,
207+ )
208+ from opentelemetry .semconv .attributes .error_attributes import ERROR_TYPE
209+ from opentelemetry .semconv .attributes .server_attributes import (
210+ SERVER_ADDRESS ,
211+ SERVER_PORT ,
212+ )
196213from opentelemetry .trace import SpanKind , TracerProvider , get_tracer
197214from opentelemetry .util ._importlib_metadata import version as util_version
198215
@@ -217,6 +234,7 @@ def trace_integration(
217234 db_api_integration_factory : type [DatabaseApiIntegration ] | None = None ,
218235 enable_attribute_commenter : bool = False ,
219236 commenter_options : dict [str , Any ] | None = None ,
237+ meter_provider : MeterProvider | None = None ,
220238):
221239 """Integrate with DB API library.
222240 https://www.python.org/dev/peps/pep-0249/
@@ -236,6 +254,8 @@ def trace_integration(
236254 default one is used.
237255 enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` and/or `db.query.text` span attribute. Only available if enable_commenter=True.
238256 commenter_options: Configurations for tags to be appended at the sql query.
257+ meter_provider: The :class:`opentelemetry.metrics.MeterProvider` to
258+ use. If omitted the current configured one is used.
239259 """
240260 wrap_connect (
241261 __name__ ,
@@ -250,9 +270,11 @@ def trace_integration(
250270 db_api_integration_factory = db_api_integration_factory ,
251271 enable_attribute_commenter = enable_attribute_commenter ,
252272 commenter_options = commenter_options ,
273+ meter_provider = meter_provider ,
253274 )
254275
255276
277+ # pylint: disable-next=too-many-positional-arguments
256278def wrap_connect (
257279 name : str ,
258280 connect_module : Callable [..., Any ],
@@ -266,6 +288,7 @@ def wrap_connect(
266288 db_api_integration_factory : type [DatabaseApiIntegration ] | None = None ,
267289 commenter_options : dict [str , Any ] | None = None ,
268290 enable_attribute_commenter : bool = False ,
291+ meter_provider : MeterProvider | None = None ,
269292):
270293 """Integrate with DB API library.
271294 https://www.python.org/dev/peps/pep-0249/
@@ -285,6 +308,8 @@ def wrap_connect(
285308 default one is used.
286309 commenter_options: Configurations for tags to be appended at the sql query.
287310 enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` and/or `db.query.text` span attribute. Only available if enable_commenter=True.
311+ meter_provider: The :class:`opentelemetry.metrics.MeterProvider` to
312+ use. If omitted the current configured one is used.
288313
289314 """
290315 db_api_integration_factory = (
@@ -309,6 +334,7 @@ def wrap_connect_(
309334 commenter_options = commenter_options ,
310335 connect_module = connect_module ,
311336 enable_attribute_commenter = enable_attribute_commenter ,
337+ meter_provider = meter_provider ,
312338 )
313339 return db_integration .wrapped_connection (wrapped , args , kwargs )
314340
@@ -333,6 +359,7 @@ def unwrap_connect(
333359 unwrap (connect_module , connect_method_name )
334360
335361
362+ # pylint: disable-next=too-many-positional-arguments
336363def instrument_connection (
337364 name : str ,
338365 connection : ConnectionT | TracedConnectionProxy [ConnectionT ],
@@ -346,6 +373,7 @@ def instrument_connection(
346373 connect_module : Callable [..., Any ] | None = None ,
347374 enable_attribute_commenter : bool = False ,
348375 db_api_integration_factory : type [DatabaseApiIntegration ] | None = None ,
376+ meter_provider : MeterProvider | None = None ,
349377) -> TracedConnectionProxy [ConnectionT ]:
350378 """Enable instrumentation in a database connection.
351379
@@ -367,6 +395,8 @@ def instrument_connection(
367395 replacement for :class:`DatabaseApiIntegration`. Can be used to
368396 obtain connection attributes from the connect method instead of
369397 from the connection itself (as done by the pymssql intrumentor).
398+ meter_provider: The :class:`opentelemetry.metrics.MeterProvider` to
399+ use. If omitted the current configured one is used.
370400
371401 Returns:
372402 An instrumented connection.
@@ -390,6 +420,7 @@ def instrument_connection(
390420 commenter_options = commenter_options ,
391421 connect_module = connect_module ,
392422 enable_attribute_commenter = enable_attribute_commenter ,
423+ meter_provider = meter_provider ,
393424 )
394425 db_integration .get_connection_attributes (connection )
395426 return get_traced_connection_proxy (connection , db_integration )
@@ -426,6 +457,7 @@ def __init__(
426457 commenter_options : dict [str , Any ] | None = None ,
427458 connect_module : Callable [..., Any ] | None = None ,
428459 enable_attribute_commenter : bool = False ,
460+ meter_provider : MeterProvider | None = None ,
429461 ):
430462 # Initialize semantic conventions opt-in if needed
431463 _OpenTelemetrySemanticConventionStability ._initialize ()
@@ -458,6 +490,24 @@ def __init__(
458490 ]
459491 ),
460492 )
493+ self ._meter = None
494+ self ._duration_histogram = None
495+ self ._returned_rows_histogram = None
496+ if _report_new (self ._sem_conv_opt_in_mode_db ):
497+ self ._meter = get_meter (
498+ self ._name ,
499+ self ._version ,
500+ meter_provider ,
501+ schema_url = _get_schema_url_for_signal_types (
502+ [_OpenTelemetryStabilitySignalType .DATABASE ]
503+ ),
504+ )
505+ self ._duration_histogram = create_db_client_operation_duration (
506+ self ._meter
507+ )
508+ self ._returned_rows_histogram = (
509+ create_db_client_response_returned_rows (self ._meter )
510+ )
461511 self .capture_parameters = capture_parameters
462512 self .enable_commenter = enable_commenter
463513 self .commenter_options = commenter_options
@@ -467,6 +517,8 @@ def __init__(
467517 self .span_attributes : dict [str , Any ] = {}
468518 self .name = ""
469519 self .database = ""
520+ self ._server_address : str | None = None
521+ self ._server_port : int | None = None
470522 self .connect_module = connect_module
471523 self .commenter_data = self .calculate_commenter_data ()
472524
@@ -579,11 +631,13 @@ def get_connection_attributes(self, connection: object) -> None:
579631 host ,
580632 self ._sem_conv_opt_in_mode_http ,
581633 )
634+ self ._server_address = host
582635 port = self .connection_props .get ("port" )
583636 if port is not None :
584637 _set_http_peer_port_client (
585638 self .span_attributes , port , self ._sem_conv_opt_in_mode_http
586639 )
640+ self ._server_port = port
587641
588642
589643# pylint: disable=abstract-method,no-member
@@ -754,6 +808,50 @@ def get_statement(self, cursor: CursorT, args: tuple[Any, ...]): # pylint: disa
754808 return statement .decode ("utf8" , "replace" )
755809 return statement
756810
811+ def _get_metric_attributes (
812+ self ,
813+ operation_name : str ,
814+ error : Exception | None ,
815+ ) -> dict [str , Any ]:
816+ attributes : dict [str , Any ] = {
817+ DB_SYSTEM_NAME : self ._db_api_integration .database_system ,
818+ }
819+ if self ._db_api_integration .database :
820+ attributes [DB_NAMESPACE ] = self ._db_api_integration .database
821+ if operation_name :
822+ attributes [DB_OPERATION_NAME ] = operation_name
823+ if self ._db_api_integration ._server_address is not None :
824+ attributes [SERVER_ADDRESS ] = (
825+ self ._db_api_integration ._server_address
826+ )
827+ if self ._db_api_integration ._server_port is not None :
828+ attributes [SERVER_PORT ] = self ._db_api_integration ._server_port
829+ if error is not None :
830+ attributes [ERROR_TYPE ] = type (error ).__qualname__
831+ return attributes
832+
833+ def _record_metrics (
834+ self ,
835+ cursor : CursorT ,
836+ operation_name : str ,
837+ start_time : float ,
838+ error : Exception | None ,
839+ ) -> None :
840+ if not _report_new (self ._db_api_integration ._sem_conv_opt_in_mode_db ):
841+ # DB Metrics are not supported without Database semconv opt-in
842+ return
843+ elapsed = time .perf_counter () - start_time
844+ attributes = self ._get_metric_attributes (operation_name , error )
845+ self ._db_api_integration ._duration_histogram .record (
846+ elapsed , attributes = attributes
847+ )
848+ if error is None :
849+ rowcount = getattr (cursor , "rowcount" , None )
850+ if isinstance (rowcount , int ) and rowcount >= 0 :
851+ self ._db_api_integration ._returned_rows_histogram .record (
852+ rowcount , attributes = attributes
853+ )
854+
757855 def traced_execution (
758856 self ,
759857 cursor : CursorT ,
@@ -764,7 +862,8 @@ def traced_execution(
764862 if not is_instrumentation_enabled ():
765863 return query_method (* args , ** kwargs )
766864
767- name = self .get_operation_name (cursor , args )
865+ operation_name = self .get_operation_name (cursor , args )
866+ name = operation_name
768867 if not name :
769868 name = (
770869 self ._db_api_integration .database
@@ -793,7 +892,15 @@ def traced_execution(
793892 else :
794893 # no sqlcomment anywhere
795894 self ._populate_span (span , cursor , * args )
796- return query_method (* args , ** kwargs )
895+ start_time = time .perf_counter ()
896+ error : Exception | None = None
897+ try :
898+ return query_method (* args , ** kwargs )
899+ except Exception as exc :
900+ error = exc
901+ raise
902+ finally :
903+ self ._record_metrics (cursor , operation_name , start_time , error )
797904
798905 async def traced_execution_async (
799906 self ,
@@ -802,7 +909,8 @@ async def traced_execution_async(
802909 * args : tuple [Any , ...],
803910 ** kwargs : dict [Any , Any ],
804911 ):
805- name = self .get_operation_name (cursor , args )
912+ operation_name = self .get_operation_name (cursor , args )
913+ name = operation_name
806914 if not name :
807915 name = (
808916 self ._db_api_integration .database
@@ -831,7 +939,15 @@ async def traced_execution_async(
831939 else :
832940 # no sqlcomment anywhere
833941 self ._populate_span (span , cursor , * args )
834- return await query_method (* args , ** kwargs )
942+ start_time = time .perf_counter ()
943+ error : Exception | None = None
944+ try :
945+ return await query_method (* args , ** kwargs )
946+ except Exception as exc :
947+ error = exc
948+ raise
949+ finally :
950+ self ._record_metrics (cursor , operation_name , start_time , error )
835951
836952
837953# pylint: disable=abstract-method,no-member
0 commit comments