Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
73f9258
feat(asyncpg): add capture_connection_cleanup option to silence pool …
RiyaChaturvedi37 Mar 28, 2026
c6a7702
fix: remove trailing whitespace from blank lines
RiyaChaturvedi37 Mar 28, 2026
d2c7ef7
fix: remove trailing whitespace and fix line endings
RiyaChaturvedi37 Mar 28, 2026
7ea55a3
fix: apply ruff formatting
RiyaChaturvedi37 Mar 28, 2026
b2e6ea8
fix: uninstrument after test to avoid state leakage
RiyaChaturvedi37 Mar 28, 2026
4c08069
fix: address reviewer feedback - add tests, docs and fix changelog
RiyaChaturvedi37 Mar 31, 2026
9c92064
docs: clarify capture_connection_cleanup usage in docstring
RiyaChaturvedi37 Apr 1, 2026
5c46842
Merge branch 'main' into fix/asyncpg-silence-cleanup
RiyaChaturvedi37 Apr 1, 2026
8085072
fix: resolve CHANGELOG.md conflict
RiyaChaturvedi37 Apr 4, 2026
bf3e817
Merge branch 'main' into fix/asyncpg-silence-cleanup
RiyaChaturvedi37 Apr 4, 2026
8646854
refactor(asyncpg): replace capture_connection_cleanup with exclude_qu…
RiyaChaturvedi37 Apr 10, 2026
a9e4c28
Merge branch 'main' into fix/asyncpg-silence-cleanup
RiyaChaturvedi37 Apr 10, 2026
e0b7918
fix: fix inconsistent return statements in _is_excluded_query
RiyaChaturvedi37 Apr 10, 2026
6a3dbde
fix: resolve CHANGELOG conflict
RiyaChaturvedi37 Apr 25, 2026
ae1b7cc
Merge branch 'main' into fix/asyncpg-silence-cleanup
RiyaChaturvedi37 Apr 25, 2026
7941182
Merge branch 'main' into fix/asyncpg-silence-cleanup
RiyaChaturvedi37 May 8, 2026
dfc1e42
fix: add CHANGELOG entry for exclude_queries feature
RiyaChaturvedi37 May 8, 2026
5f6463c
Merge branch 'main' into fix/asyncpg-silence-cleanup
RiyaChaturvedi37 May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Comment thread
tammy-baylis-swi marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
> Use [this search for a list of all CHANGELOG.md files in this repo](https://github.com/search?q=repo%3Aopen-telemetry%2Fopentelemetry-python-contrib+path%3A**%2FCHANGELOG.md&type=code).

## Unreleased

### Added

- `opentelemetry-instrumentation-confluent-kafka`: Loosen confluent-kafka upper bound to <3.0.0
Expand All @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4212))
- `opentelemetry-instrumentation-botocore`: Add support for instrumenting `aiobotocore`
([#4049](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4049))
- `opentelemetry-instrumentation-asyncpg`: Add `capture_connection_cleanup` option to `AsyncPGInstrumentor` to silence connection pool cleanup spans
([#4373](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4373))

### Fixed

Expand Down
Comment thread
tammy-baylis-swi marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

# You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument()
AsyncPGInstrumentor().instrument()
AsyncPGInstrumentor(capture_connection_cleanup=False).instrument()
Comment thread
RiyaChaturvedi37 marked this conversation as resolved.
Outdated
Comment thread
RiyaChaturvedi37 marked this conversation as resolved.
Outdated

async def main():
conn = await asyncpg.connect(user='user', password='password')
Expand Down Expand Up @@ -114,11 +115,27 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict:

class AsyncPGInstrumentor(BaseInstrumentor):
_leading_comment_remover = re.compile(r"^/\*.*?\*/")
_CLEANUP_QUERIES = frozenset(
[
"SELECT pg_advisory_unlock_all()",
"CLOSE ALL",
"UNLISTEN *",
"RESET ALL",
]
)
_tracer = None

def __init__(self, capture_parameters=False):
def _is_cleanup_query(self, query: str) -> bool:
if query is None:
return False
return any(q in query for q in self._CLEANUP_QUERIES)

def __init__(
self, capture_parameters=False, capture_connection_cleanup=True
):
super().__init__()
self.capture_parameters = capture_parameters
self.capture_connection_cleanup = capture_connection_cleanup

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Expand Down Expand Up @@ -184,6 +201,10 @@ async def _do_execute(self, func, instance, args, kwargs):
args[0],
args[1:] if self.capture_parameters else None,
)
if not self.capture_connection_cleanup and self._is_cleanup_query(
args[0]
):
return await func(*args, **kwargs)

with self._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT, attributes=span_attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,82 @@ async def exec_mock(*args, **kwargs):

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)

def test_capture_connection_cleanup_false(self):
Comment thread
tammy-baylis-swi marked this conversation as resolved.
"""Test that cleanup queries are not traced when capture_connection_cleanup=False."""
AsyncPGInstrumentor().uninstrument()
apg = AsyncPGInstrumentor(capture_connection_cleanup=False)
apg.instrument(tracer_provider=self.tracer_provider)

async def mock_execute(*args, **kwargs):
return None

conn = mock.Mock()
conn._params = mock.Mock()
conn._params.database = "testdb"
conn._params.user = "testuser"
conn._addr = ("localhost", 5432)

for cleanup_query in [
"SELECT pg_advisory_unlock_all()",
"CLOSE ALL",
"UNLISTEN *",
"RESET ALL",
]:
asyncio.run(
apg._do_execute(mock_execute, conn, (cleanup_query,), {})
)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
AsyncPGInstrumentor().uninstrument()

def test_capture_connection_cleanup_true_by_default(self):
"""Test that cleanup queries are traced by default."""
AsyncPGInstrumentor().uninstrument()
apg = AsyncPGInstrumentor()
apg.instrument(tracer_provider=self.tracer_provider)

async def mock_execute(*args, **kwargs):
return None

conn = mock.Mock()
conn._params = mock.Mock()
conn._params.database = "testdb"
conn._params.user = "testuser"
conn._addr = ("localhost", 5432)

asyncio.run(
apg._do_execute(
mock_execute, conn, ("SELECT pg_advisory_unlock_all()",), {}
)
)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
AsyncPGInstrumentor().uninstrument()

def test_capture_connection_cleanup_explicitly_true(self):
"""Test that cleanup queries are traced when capture_connection_cleanup=True."""
AsyncPGInstrumentor().uninstrument()
apg = AsyncPGInstrumentor(capture_connection_cleanup=True)
apg.instrument(tracer_provider=self.tracer_provider)

async def mock_execute(*args, **kwargs):
return None

conn = mock.Mock()
conn._params = mock.Mock()
conn._params.database = "testdb"
conn._params.user = "testuser"
conn._addr = ("localhost", 5432)

asyncio.run(
apg._do_execute(
mock_execute, conn, ("SELECT pg_advisory_unlock_all()",), {}
)
)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
AsyncPGInstrumentor().uninstrument()
Loading