Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

<<<<<<< fix/asyncpg-silence-cleanup
=======
## Version 1.41.0/0.62b0 (2026-04-09)

>>>>>>> main
### Added

- `opentelemetry-instrumentation-asgi`: Respect `suppress_http_instrumentation` context in ASGI middleware to skip server span creation when HTTP instrumentation is suppressed
Expand All @@ -27,6 +30,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4049](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4049))
- `opentelemetry-instrumentation-sqlalchemy`: implement new semantic convention opt-in migration
([#4110](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4110))
- `opentelemetry-instrumentation-asyncpg`: Add `capture_connection_cleanup` option to `AsyncPGInstrumentor` to silence connection pool cleanup spans
([#4373](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4373))


### Fixed

Expand All @@ -45,7 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'`
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))
- `opentelemetry-instrumentation-aiokafka`: fix `Unclosed AIOKafkaProducer` warning and `RuntimeWarning: coroutine was never awaited` in tests
([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384))
([#4384](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4384))
- `opentelemetry-instrumentation-aiokafka`: Fix compatibility with aiokafka 0.13 by calling
`_key_serializer`/`_value_serializer` directly instead of the internal `_serialize` method
whose signature changed in 0.13 from `(topic, key, value)` to `(key, value, headers)`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
# You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument()
AsyncPGInstrumentor().instrument()

# OR opt into not tracing connection pool cleanup queries
AsyncPGInstrumentor(exclude_queries=AsyncPGInstrumentor._CLEANUP_QUERIES).instrument()

async def main():
conn = await asyncpg.connect(user='user', password='password')

Expand Down Expand Up @@ -114,11 +117,25 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict:

class AsyncPGInstrumentor(BaseInstrumentor):
_leading_comment_remover = re.compile(r"^/\*.*?\*/")
_CLEANUP_QUERIES = frozenset(
[
"SELECT pg_advisory_unlock_all()",
"CLOSE ALL",
"UNLISTEN *",
"RESET ALL",
]
)
_tracer = None

def __init__(self, capture_parameters=False):
def _is_excluded_query(self, query: str) -> bool:
if not query:
return False
return any(q in query for q in self.exclude_queries)

def __init__(self, capture_parameters=False, exclude_queries=None):
super().__init__()
self.capture_parameters = capture_parameters
self.exclude_queries = exclude_queries or set()

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Expand Down Expand Up @@ -184,6 +201,8 @@ async def _do_execute(self, func, instance, args, kwargs):
args[0],
args[1:] if self.capture_parameters else None,
)
if self._is_excluded_query(args[0]):
return await func(*args, **kwargs)

with self._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT, attributes=span_attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,84 @@ async def exec_mock(*args, **kwargs):

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)

def test_capture_connection_cleanup_false(self):
"""Test that cleanup queries are not traced when exclude_queries=AsyncPGInstrumentor._CLEANUP_QUERIES."""
AsyncPGInstrumentor().uninstrument()
apg = AsyncPGInstrumentor(
exclude_queries=AsyncPGInstrumentor._CLEANUP_QUERIES
)
apg.instrument(tracer_provider=self.tracer_provider)

async def mock_execute(*args, **kwargs):
return None

conn = mock.Mock()
conn._params = mock.Mock()
conn._params.database = "testdb"
conn._params.user = "testuser"
conn._addr = ("localhost", 5432)

for cleanup_query in [
"SELECT pg_advisory_unlock_all()",
"CLOSE ALL",
"UNLISTEN *",
"RESET ALL",
]:
asyncio.run(
apg._do_execute(mock_execute, conn, (cleanup_query,), {})
)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
AsyncPGInstrumentor().uninstrument()

def test_capture_connection_cleanup_true_by_default(self):
"""Test that cleanup queries are traced by default."""
AsyncPGInstrumentor().uninstrument()
apg = AsyncPGInstrumentor()
apg.instrument(tracer_provider=self.tracer_provider)

async def mock_execute(*args, **kwargs):
return None

conn = mock.Mock()
conn._params = mock.Mock()
conn._params.database = "testdb"
conn._params.user = "testuser"
conn._addr = ("localhost", 5432)

asyncio.run(
apg._do_execute(
mock_execute, conn, ("SELECT pg_advisory_unlock_all()",), {}
)
)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
AsyncPGInstrumentor().uninstrument()

def test_capture_connection_cleanup_explicitly_true(self):
"""Test that cleanup queries are traced when exclude_queries=set()."""
AsyncPGInstrumentor().uninstrument()
apg = AsyncPGInstrumentor(exclude_queries=set())
apg.instrument(tracer_provider=self.tracer_provider)

async def mock_execute(*args, **kwargs):
return None

conn = mock.Mock()
conn._params = mock.Mock()
conn._params.database = "testdb"
conn._params.user = "testuser"
conn._addr = ("localhost", 5432)

asyncio.run(
apg._do_execute(
mock_execute, conn, ("SELECT pg_advisory_unlock_all()",), {}
)
)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
AsyncPGInstrumentor().uninstrument()
Loading