From 5d31795346e9318a34d943a7af97c9db9aaa9519 Mon Sep 17 00:00:00 2001 From: Juan Calvo Pozo Date: Wed, 25 Feb 2026 12:07:19 +0000 Subject: [PATCH 1/6] Fix gRPC client interceptor breaking bidirectional streaming (#1180) Route bidi (stream-stream) RPCs through `_intercept` instead of the generator-based `_intercept_server_stream`. The generator wrapper strips the grpc.Call/grpc.Future interface, causing downstream code (e.g. google.api_core.bidi.BidiRpc) to crash with: AttributeError: 'generator' object has no attribute 'add_done_callback' The fix adds `and not client_info.is_client_stream` to the condition in `intercept_stream()` so only unary-stream RPCs use the generator path. Includes a regression test verifying the bidi stream response preserves the grpc.Call interface (add_done_callback, cancel, is_active). Co-authored-by: Cursor --- CHANGELOG.md | 2 + .../instrumentation/grpc/_client.py | 2 +- .../tests/test_client_interceptor.py | 45 +++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bbaae300f..c98c5b2be5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4139](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4139)) ### Fixed +- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` by routing bidi streams through `_intercept` instead of the generator-based `_intercept_server_stream` + ([#1180](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180)) - `opentelemetry-instrumentation-mysql`: Refactor MySQL integration test mocks to use concrete DBAPI connection attributes, reducing noisy attribute type warnings. ([#4116](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4116)) - `opentelemetry-instrumentation-cassandra`: Use `_instruments_any` instead of `_instruments` for driver dependencies so that having either `cassandra-driver` or `scylla-driver` installed is sufficient diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index bbeb4ec1d9..5e805a5479 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -228,7 +228,7 @@ def intercept_stream( if self._filter is not None and not self._filter(client_info): return invoker(request_or_iterator, metadata) - if client_info.is_server_stream: + if client_info.is_server_stream and not client_info.is_client_stream: return self._intercept_server_stream( request_or_iterator, metadata, client_info, invoker ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index e001d2ed57..c8c79c947c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -221,6 +221,51 @@ def test_stream_stream(self): }, ) + def test_stream_stream_preserves_call_interface(self): + """Regression test for issue #1180. + + Bidirectional streaming RPCs must return an object that implements + grpc.Call (add_done_callback, cancel, is_active, etc.) rather than + a bare generator. Before the fix, bidi streams were routed through + the generator-based _intercept_server_stream, which stripped the + grpc.Call interface and caused downstream code to crash with: + AttributeError: 'generator' object has no attribute 'add_done_callback' + """ + + def request_messages(): + for _ in range(5): + yield Request(client_id=1, request_data="data") + + response_iterator = self._stub.BidirectionalStreamingMethod( + request_messages(), metadata=(("key", "value"),) + ) + + for attr in ("add_done_callback", "cancel", "is_active"): + self.assertTrue( + hasattr(response_iterator, attr), + f"bidi stream response missing grpc.Call method '{attr}'", + ) + + list(response_iterator) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "BidirectionalStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + }, + ) + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) From d2fc6ed30e742bb594c5307c7c17815c26b9786f Mon Sep 17 00:00:00 2001 From: Juan Calvo Pozo Date: Mon, 2 Mar 2026 11:27:30 +0000 Subject: [PATCH 2/6] Add gRPC status code attributes to error handling in client --- .../src/opentelemetry/instrumentation/grpc/_client.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 5e805a5479..0aa7262903 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -60,6 +60,13 @@ def callback(response_future): code = response_future.code() if code != grpc.StatusCode.OK: rpc_info.error = code + span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0]) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=str(code), + ) + ) return response = response_future.result() rpc_info.response = response From 82bffea3cb8b8c22b2e9b686ea5de9cda0763fd9 Mon Sep 17 00:00:00 2001 From: Juan Calvo Pozo Date: Wed, 11 Mar 2026 17:16:41 +0000 Subject: [PATCH 3/6] Remove redundant status description from span error status The gRPC status code is already captured in the RPC_GRPC_STATUS_CODE attribute, making the description in span status unnecessary. --- .../src/opentelemetry/instrumentation/grpc/_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 0aa7262903..a9264b56f4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -64,7 +64,6 @@ def callback(response_future): span.set_status( Status( status_code=StatusCode.ERROR, - description=str(code), ) ) return From d476cf13cca4d170d5e95a8d0cd404c57c8b7d2a Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 17 Mar 2026 14:46:58 +0100 Subject: [PATCH 4/6] Update CHANGELOG.md --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15dbbe5be2..6819547b35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4321](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4321)) - `opentelemetry-instrumentation-logging`: Add recursion guard in LoggingHandler.emit to prevent deadlock ([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302)) +- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` + ([#1180](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180)) ### Breaking changes @@ -93,8 +95,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4094](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4094)) - `opentelemetry-instrumentation-asyncio`: Fix environment variables not appearing in Read the Docs documentation ([#4256](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/4256)) -- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` by routing bidi streams through `_intercept` instead of the generator-based `_intercept_server_stream` - ([#1180](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180)) - `opentelemetry-instrumentation-mysql`: Refactor MySQL integration test mocks to use concrete DBAPI connection attributes, reducing noisy attribute type warnings. ([#4116](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4116)) - `opentelemetry-instrumentation-cassandra`: Use `_instruments_any` instead of `_instruments` for driver dependencies so that having either `cassandra-driver` or `scylla-driver` installed is sufficient From b634deea7d65786f391bad7a07a607890b12aa01 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 17 Mar 2026 14:47:27 +0100 Subject: [PATCH 5/6] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6819547b35..2e2eea8927 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-logging`: Add recursion guard in LoggingHandler.emit to prevent deadlock ([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302)) - `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` - ([#1180](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180)) + ([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259)) ### Breaking changes From e8139f600fe18dea605193b5ab6d28236f8ab617 Mon Sep 17 00:00:00 2001 From: Juan Calvo Pozo Date: Wed, 18 Mar 2026 11:24:36 +0000 Subject: [PATCH 6/6] Add RPC_GRPC_STATUS_CODE assertions to client error tests Verify that error spans include the correct gRPC status code attribute (INVALID_ARGUMENT) in both sync and async client interceptor tests. --- .../tests/test_aio_client_interceptor.py | 16 ++++++++++++++++ .../tests/test_client_interceptor.py | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py index 850ad79661..daa4d68dc3 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py @@ -224,6 +224,10 @@ async def test_error_simple(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) async def test_error_unary_stream(self): with self.assertRaises(grpc.RpcError): @@ -237,6 +241,10 @@ async def test_error_unary_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) async def test_error_stream_unary(self): with self.assertRaises(grpc.RpcError): @@ -249,6 +257,10 @@ async def test_error_stream_unary(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) async def test_error_stream_stream(self): with self.assertRaises(grpc.RpcError): @@ -264,6 +276,10 @@ async def test_error_stream_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) # pylint:disable=no-self-use async def test_client_interceptor_trace_context_propagation(self): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index c8c79c947c..3830bf00c4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -277,6 +277,10 @@ def test_error_simple(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_error_stream_unary(self): with self.assertRaises(grpc.RpcError): @@ -289,6 +293,10 @@ def test_error_stream_unary(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_error_unary_stream(self): with self.assertRaises(grpc.RpcError): @@ -301,6 +309,10 @@ def test_error_unary_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_error_stream_stream(self): with self.assertRaises(grpc.RpcError): @@ -313,6 +325,10 @@ def test_error_stream_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_client_interceptor_falsy_response( self,