Skip to content
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4321](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4321))
- `opentelemetry-instrumentation-logging`: Add recursion guard in LoggingHandler.emit to prevent deadlock
([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302))
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'`
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def callback(response_future):
code = response_future.code()
if code != grpc.StatusCode.OK:
rpc_info.error = code
span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0])
Comment thread
juandelacruz-calvo marked this conversation as resolved.
span.set_status(
Status(
status_code=StatusCode.ERROR,
)
)
return
response = response_future.result()
rpc_info.response = response
Expand Down Expand Up @@ -228,7 +234,7 @@ def intercept_stream(
if self._filter is not None and not self._filter(client_info):
return invoker(request_or_iterator, metadata)

if client_info.is_server_stream:
if client_info.is_server_stream and not client_info.is_client_stream:
return self._intercept_server_stream(
request_or_iterator, metadata, client_info, invoker
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ async def test_error_simple(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

async def test_error_unary_stream(self):
with self.assertRaises(grpc.RpcError):
Expand All @@ -237,6 +241,10 @@ async def test_error_unary_stream(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

async def test_error_stream_unary(self):
with self.assertRaises(grpc.RpcError):
Expand All @@ -249,6 +257,10 @@ async def test_error_stream_unary(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

async def test_error_stream_stream(self):
with self.assertRaises(grpc.RpcError):
Expand All @@ -264,6 +276,10 @@ async def test_error_stream_stream(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

# pylint:disable=no-self-use
async def test_client_interceptor_trace_context_propagation(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,51 @@ def test_stream_stream(self):
},
)

def test_stream_stream_preserves_call_interface(self):
"""Regression test for issue #1180.

Bidirectional streaming RPCs must return an object that implements
grpc.Call (add_done_callback, cancel, is_active, etc.) rather than
a bare generator. Before the fix, bidi streams were routed through
the generator-based _intercept_server_stream, which stripped the
grpc.Call interface and caused downstream code to crash with:
AttributeError: 'generator' object has no attribute 'add_done_callback'
"""

def request_messages():
for _ in range(5):
yield Request(client_id=1, request_data="data")

response_iterator = self._stub.BidirectionalStreamingMethod(
request_messages(), metadata=(("key", "value"),)
)

for attr in ("add_done_callback", "cancel", "is_active"):
self.assertTrue(
hasattr(response_iterator, attr),
f"bidi stream response missing grpc.Call method '{attr}'",
)

list(response_iterator)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]

self.assertEqual(
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
)
self.assertIs(span.kind, trace.SpanKind.CLIENT)
self.assertSpanHasAttributes(
span,
{
RPC_METHOD: "BidirectionalStreamingMethod",
RPC_SERVICE: "GRPCTestServer",
RPC_SYSTEM: "grpc",
RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0],
},
)

def test_error_simple(self):
with self.assertRaises(grpc.RpcError):
simple_method(self._stub, error=True)
Expand All @@ -232,6 +277,10 @@ def test_error_simple(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

def test_error_stream_unary(self):
with self.assertRaises(grpc.RpcError):
Expand All @@ -244,6 +293,10 @@ def test_error_stream_unary(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

def test_error_unary_stream(self):
with self.assertRaises(grpc.RpcError):
Expand All @@ -256,6 +309,10 @@ def test_error_unary_stream(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

def test_error_stream_stream(self):
with self.assertRaises(grpc.RpcError):
Expand All @@ -268,6 +325,10 @@ def test_error_stream_stream(self):
span.status.status_code,
trace.StatusCode.ERROR,
)
self.assertEqual(
span.attributes[RPC_GRPC_STATUS_CODE],
grpc.StatusCode.INVALID_ARGUMENT.value[0],
)

def test_client_interceptor_falsy_response(
self,
Expand Down
Loading