Skip to content

Commit bd01dcd

Browse files
juandelacruz-calvocursoragentxrmx
authored
Fix gRPC client interceptor breaking bidirectional streaming (open-telemetry#1180) (open-telemetry#4259)
* Fix gRPC client interceptor breaking bidirectional streaming (open-telemetry#1180) Route bidi (stream-stream) RPCs through `_intercept` instead of the generator-based `_intercept_server_stream`. The generator wrapper strips the grpc.Call/grpc.Future interface, causing downstream code (e.g. google.api_core.bidi.BidiRpc) to crash with: AttributeError: 'generator' object has no attribute 'add_done_callback' The fix adds `and not client_info.is_client_stream` to the condition in `intercept_stream()` so only unary-stream RPCs use the generator path. Includes a regression test verifying the bidi stream response preserves the grpc.Call interface (add_done_callback, cancel, is_active). Co-authored-by: Cursor <cursoragent@cursor.com> * Add gRPC status code attributes to error handling in client * Remove redundant status description from span error status The gRPC status code is already captured in the RPC_GRPC_STATUS_CODE attribute, making the description in span status unnecessary. * Update CHANGELOG.md * Update CHANGELOG.md * Add RPC_GRPC_STATUS_CODE assertions to client error tests Verify that error spans include the correct gRPC status code attribute (INVALID_ARGUMENT) in both sync and async client interceptor tests. --------- Co-authored-by: Cursor <cursoragent@cursor.com> Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 7930a15 commit bd01dcd

4 files changed

Lines changed: 86 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2828
([#4321](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4321))
2929
- `opentelemetry-instrumentation-logging`: Add recursion guard in LoggingHandler.emit to prevent deadlock
3030
([#4302](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4302))
31+
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'`
32+
([#4259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4259))
3133

3234
### Breaking changes
3335

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ def callback(response_future):
6060
code = response_future.code()
6161
if code != grpc.StatusCode.OK:
6262
rpc_info.error = code
63+
span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0])
64+
span.set_status(
65+
Status(
66+
status_code=StatusCode.ERROR,
67+
)
68+
)
6369
return
6470
response = response_future.result()
6571
rpc_info.response = response
@@ -228,7 +234,7 @@ def intercept_stream(
228234
if self._filter is not None and not self._filter(client_info):
229235
return invoker(request_or_iterator, metadata)
230236

231-
if client_info.is_server_stream:
237+
if client_info.is_server_stream and not client_info.is_client_stream:
232238
return self._intercept_server_stream(
233239
request_or_iterator, metadata, client_info, invoker
234240
)

instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,10 @@ async def test_error_simple(self):
224224
span.status.status_code,
225225
trace.StatusCode.ERROR,
226226
)
227+
self.assertEqual(
228+
span.attributes[RPC_GRPC_STATUS_CODE],
229+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
230+
)
227231

228232
async def test_error_unary_stream(self):
229233
with self.assertRaises(grpc.RpcError):
@@ -237,6 +241,10 @@ async def test_error_unary_stream(self):
237241
span.status.status_code,
238242
trace.StatusCode.ERROR,
239243
)
244+
self.assertEqual(
245+
span.attributes[RPC_GRPC_STATUS_CODE],
246+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
247+
)
240248

241249
async def test_error_stream_unary(self):
242250
with self.assertRaises(grpc.RpcError):
@@ -249,6 +257,10 @@ async def test_error_stream_unary(self):
249257
span.status.status_code,
250258
trace.StatusCode.ERROR,
251259
)
260+
self.assertEqual(
261+
span.attributes[RPC_GRPC_STATUS_CODE],
262+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
263+
)
252264

253265
async def test_error_stream_stream(self):
254266
with self.assertRaises(grpc.RpcError):
@@ -264,6 +276,10 @@ async def test_error_stream_stream(self):
264276
span.status.status_code,
265277
trace.StatusCode.ERROR,
266278
)
279+
self.assertEqual(
280+
span.attributes[RPC_GRPC_STATUS_CODE],
281+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
282+
)
267283

268284
# pylint:disable=no-self-use
269285
async def test_client_interceptor_trace_context_propagation(self):

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,51 @@ def test_stream_stream(self):
221221
},
222222
)
223223

224+
def test_stream_stream_preserves_call_interface(self):
225+
"""Regression test for issue #1180.
226+
227+
Bidirectional streaming RPCs must return an object that implements
228+
grpc.Call (add_done_callback, cancel, is_active, etc.) rather than
229+
a bare generator. Before the fix, bidi streams were routed through
230+
the generator-based _intercept_server_stream, which stripped the
231+
grpc.Call interface and caused downstream code to crash with:
232+
AttributeError: 'generator' object has no attribute 'add_done_callback'
233+
"""
234+
235+
def request_messages():
236+
for _ in range(5):
237+
yield Request(client_id=1, request_data="data")
238+
239+
response_iterator = self._stub.BidirectionalStreamingMethod(
240+
request_messages(), metadata=(("key", "value"),)
241+
)
242+
243+
for attr in ("add_done_callback", "cancel", "is_active"):
244+
self.assertTrue(
245+
hasattr(response_iterator, attr),
246+
f"bidi stream response missing grpc.Call method '{attr}'",
247+
)
248+
249+
list(response_iterator)
250+
251+
spans = self.memory_exporter.get_finished_spans()
252+
self.assertEqual(len(spans), 1)
253+
span = spans[0]
254+
255+
self.assertEqual(
256+
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
257+
)
258+
self.assertIs(span.kind, trace.SpanKind.CLIENT)
259+
self.assertSpanHasAttributes(
260+
span,
261+
{
262+
RPC_METHOD: "BidirectionalStreamingMethod",
263+
RPC_SERVICE: "GRPCTestServer",
264+
RPC_SYSTEM: "grpc",
265+
RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0],
266+
},
267+
)
268+
224269
def test_error_simple(self):
225270
with self.assertRaises(grpc.RpcError):
226271
simple_method(self._stub, error=True)
@@ -232,6 +277,10 @@ def test_error_simple(self):
232277
span.status.status_code,
233278
trace.StatusCode.ERROR,
234279
)
280+
self.assertEqual(
281+
span.attributes[RPC_GRPC_STATUS_CODE],
282+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
283+
)
235284

236285
def test_error_stream_unary(self):
237286
with self.assertRaises(grpc.RpcError):
@@ -244,6 +293,10 @@ def test_error_stream_unary(self):
244293
span.status.status_code,
245294
trace.StatusCode.ERROR,
246295
)
296+
self.assertEqual(
297+
span.attributes[RPC_GRPC_STATUS_CODE],
298+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
299+
)
247300

248301
def test_error_unary_stream(self):
249302
with self.assertRaises(grpc.RpcError):
@@ -256,6 +309,10 @@ def test_error_unary_stream(self):
256309
span.status.status_code,
257310
trace.StatusCode.ERROR,
258311
)
312+
self.assertEqual(
313+
span.attributes[RPC_GRPC_STATUS_CODE],
314+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
315+
)
259316

260317
def test_error_stream_stream(self):
261318
with self.assertRaises(grpc.RpcError):
@@ -268,6 +325,10 @@ def test_error_stream_stream(self):
268325
span.status.status_code,
269326
trace.StatusCode.ERROR,
270327
)
328+
self.assertEqual(
329+
span.attributes[RPC_GRPC_STATUS_CODE],
330+
grpc.StatusCode.INVALID_ARGUMENT.value[0],
331+
)
271332

272333
def test_client_interceptor_falsy_response(
273334
self,

0 commit comments

Comments
 (0)