diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c4d7ebddd..2e2eea8927 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4321](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4321)) - `opentelemetry-instrumentation-logging`: Add recursion guard in LoggingHandler.emit to prevent deadlock ([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302)) +- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` + ([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259)) ### Breaking changes diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index bbeb4ec1d9..a9264b56f4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -60,6 +60,12 @@ def callback(response_future): code = response_future.code() if code != grpc.StatusCode.OK: rpc_info.error = code + span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0]) + span.set_status( + Status( + status_code=StatusCode.ERROR, + ) + ) return response = response_future.result() rpc_info.response = response @@ -228,7 +234,7 @@ def intercept_stream( if self._filter is not None and not self._filter(client_info): return invoker(request_or_iterator, metadata) - if client_info.is_server_stream: + if client_info.is_server_stream and not client_info.is_client_stream: return self._intercept_server_stream( request_or_iterator, metadata, client_info, invoker ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py index 850ad79661..daa4d68dc3 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py @@ -224,6 +224,10 @@ async def test_error_simple(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) async def test_error_unary_stream(self): with self.assertRaises(grpc.RpcError): @@ -237,6 +241,10 @@ async def test_error_unary_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) async def test_error_stream_unary(self): with self.assertRaises(grpc.RpcError): @@ -249,6 +257,10 @@ async def test_error_stream_unary(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) async def test_error_stream_stream(self): with self.assertRaises(grpc.RpcError): @@ -264,6 +276,10 @@ async def test_error_stream_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) # pylint:disable=no-self-use async def test_client_interceptor_trace_context_propagation(self): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index e001d2ed57..3830bf00c4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -221,6 +221,51 @@ def test_stream_stream(self): }, ) + def test_stream_stream_preserves_call_interface(self): + """Regression test for issue #1180. + + Bidirectional streaming RPCs must return an object that implements + grpc.Call (add_done_callback, cancel, is_active, etc.) rather than + a bare generator. Before the fix, bidi streams were routed through + the generator-based _intercept_server_stream, which stripped the + grpc.Call interface and caused downstream code to crash with: + AttributeError: 'generator' object has no attribute 'add_done_callback' + """ + + def request_messages(): + for _ in range(5): + yield Request(client_id=1, request_data="data") + + response_iterator = self._stub.BidirectionalStreamingMethod( + request_messages(), metadata=(("key", "value"),) + ) + + for attr in ("add_done_callback", "cancel", "is_active"): + self.assertTrue( + hasattr(response_iterator, attr), + f"bidi stream response missing grpc.Call method '{attr}'", + ) + + list(response_iterator) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "BidirectionalStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + }, + ) + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) @@ -232,6 +277,10 @@ def test_error_simple(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_error_stream_unary(self): with self.assertRaises(grpc.RpcError): @@ -244,6 +293,10 @@ def test_error_stream_unary(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_error_unary_stream(self): with self.assertRaises(grpc.RpcError): @@ -256,6 +309,10 @@ def test_error_unary_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_error_stream_stream(self): with self.assertRaises(grpc.RpcError): @@ -268,6 +325,10 @@ def test_error_stream_stream(self): span.status.status_code, trace.StatusCode.ERROR, ) + self.assertEqual( + span.attributes[RPC_GRPC_STATUS_CODE], + grpc.StatusCode.INVALID_ARGUMENT.value[0], + ) def test_client_interceptor_falsy_response( self,