diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c108e893e..9c416b1271 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +<<<<<<< fix/asyncpg-silence-cleanup +======= ## Version 1.41.0/0.62b0 (2026-04-09) +>>>>>>> main ### Added - `opentelemetry-instrumentation-asgi`: Respect `suppress_http_instrumentation` context in ASGI middleware to skip server span creation when HTTP instrumentation is suppressed @@ -27,6 +30,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4049](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4049)) - `opentelemetry-instrumentation-sqlalchemy`: implement new semantic convention opt-in migration ([#4110](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4110)) +- `opentelemetry-instrumentation-asyncpg`: Add `capture_connection_cleanup` option to `AsyncPGInstrumentor` to silence connection pool cleanup spans + ([#4373](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4373)) + ### Fixed @@ -45,7 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` ([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259)) - `opentelemetry-instrumentation-aiokafka`: fix `Unclosed AIOKafkaProducer` warning and `RuntimeWarning: coroutine was never awaited` in tests - ([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384)) + ([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384)) - `opentelemetry-instrumentation-aiokafka`: Fix compatibility with aiokafka 0.13 by calling `_key_serializer`/`_value_serializer` directly instead of the internal `_serialize` method whose signature changed in 0.13 from `(topic, key, value)` to `(key, value, headers)` diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index c8aba9bbf3..f3f4c7fb40 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -36,6 +36,9 @@ # You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument() AsyncPGInstrumentor().instrument() + # OR opt into not tracing connection pool cleanup queries + AsyncPGInstrumentor(exclude_queries=AsyncPGInstrumentor._CLEANUP_QUERIES).instrument() + async def main(): conn = await asyncpg.connect(user='user', password='password') @@ -114,11 +117,25 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict: class AsyncPGInstrumentor(BaseInstrumentor): _leading_comment_remover = re.compile(r"^/\*.*?\*/") + _CLEANUP_QUERIES = frozenset( + [ + "SELECT pg_advisory_unlock_all()", + "CLOSE ALL", + "UNLISTEN *", + "RESET ALL", + ] + ) _tracer = None - def __init__(self, capture_parameters=False): + def _is_excluded_query(self, query: str) -> bool: + if not query: + return False + return any(q in query for q in self.exclude_queries) + + def __init__(self, capture_parameters=False, exclude_queries=None): super().__init__() self.capture_parameters = capture_parameters + self.exclude_queries = exclude_queries or set() def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -184,6 +201,8 @@ async def _do_execute(self, func, instance, args, kwargs): args[0], args[1:] if self.capture_parameters else None, ) + if self._is_excluded_query(args[0]): + return await func(*args, **kwargs) with self._tracer.start_as_current_span( name, kind=SpanKind.CLIENT, attributes=span_attributes diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/tests/test_asyncpg_wrapper.py b/instrumentation/opentelemetry-instrumentation-asyncpg/tests/test_asyncpg_wrapper.py index 21d4591e0e..f0a7f0a565 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/tests/test_asyncpg_wrapper.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/tests/test_asyncpg_wrapper.py @@ -144,3 +144,84 @@ async def exec_mock(*args, **kwargs): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 0) + + def test_capture_connection_cleanup_false(self): + """Test that cleanup queries are not traced when exclude_queries=AsyncPGInstrumentor._CLEANUP_QUERIES.""" + AsyncPGInstrumentor().uninstrument() + apg = AsyncPGInstrumentor( + exclude_queries=AsyncPGInstrumentor._CLEANUP_QUERIES + ) + apg.instrument(tracer_provider=self.tracer_provider) + + async def mock_execute(*args, **kwargs): + return None + + conn = mock.Mock() + conn._params = mock.Mock() + conn._params.database = "testdb" + conn._params.user = "testuser" + conn._addr = ("localhost", 5432) + + for cleanup_query in [ + "SELECT pg_advisory_unlock_all()", + "CLOSE ALL", + "UNLISTEN *", + "RESET ALL", + ]: + asyncio.run( + apg._do_execute(mock_execute, conn, (cleanup_query,), {}) + ) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + AsyncPGInstrumentor().uninstrument() + + def test_capture_connection_cleanup_true_by_default(self): + """Test that cleanup queries are traced by default.""" + AsyncPGInstrumentor().uninstrument() + apg = AsyncPGInstrumentor() + apg.instrument(tracer_provider=self.tracer_provider) + + async def mock_execute(*args, **kwargs): + return None + + conn = mock.Mock() + conn._params = mock.Mock() + conn._params.database = "testdb" + conn._params.user = "testuser" + conn._addr = ("localhost", 5432) + + asyncio.run( + apg._do_execute( + mock_execute, conn, ("SELECT pg_advisory_unlock_all()",), {} + ) + ) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + AsyncPGInstrumentor().uninstrument() + + def test_capture_connection_cleanup_explicitly_true(self): + """Test that cleanup queries are traced when exclude_queries=set().""" + AsyncPGInstrumentor().uninstrument() + apg = AsyncPGInstrumentor(exclude_queries=set()) + apg.instrument(tracer_provider=self.tracer_provider) + + async def mock_execute(*args, **kwargs): + return None + + conn = mock.Mock() + conn._params = mock.Mock() + conn._params.database = "testdb" + conn._params.user = "testuser" + conn._addr = ("localhost", 5432) + + asyncio.run( + apg._do_execute( + mock_execute, conn, ("SELECT pg_advisory_unlock_all()",), {} + ) + ) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + AsyncPGInstrumentor().uninstrument()