Skip to content

Commit 5d31795

Browse files
Fix gRPC client interceptor breaking bidirectional streaming (#1180)
Route bidi (stream-stream) RPCs through `_intercept` instead of the generator-based `_intercept_server_stream`. The generator wrapper strips the grpc.Call/grpc.Future interface, causing downstream code (e.g. google.api_core.bidi.BidiRpc) to crash with: AttributeError: 'generator' object has no attribute 'add_done_callback' The fix adds `and not client_info.is_client_stream` to the condition in `intercept_stream()` so only unary-stream RPCs use the generator path. Includes a regression test verifying the bidi stream response preserves the grpc.Call interface (add_done_callback, cancel, is_active). Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 2b8ca97 commit 5d31795

3 files changed

Lines changed: 48 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5959
([#4139](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4139))
6060

6161
### Fixed
62+
- `opentelemetry-instrumentation-grpc`: Fix bidirectional streaming RPCs raising `AttributeError: 'generator' object has no attribute 'add_done_callback'` by routing bidi streams through `_intercept` instead of the generator-based `_intercept_server_stream`
63+
([#1180](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1180))
6264
- `opentelemetry-instrumentation-mysql`: Refactor MySQL integration test mocks to use concrete DBAPI connection attributes, reducing noisy attribute type warnings.
6365
([#4116](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4116))
6466
- `opentelemetry-instrumentation-cassandra`: Use `_instruments_any` instead of `_instruments` for driver dependencies so that having either `cassandra-driver` or `scylla-driver` installed is sufficient

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def intercept_stream(
228228
if self._filter is not None and not self._filter(client_info):
229229
return invoker(request_or_iterator, metadata)
230230

231-
if client_info.is_server_stream:
231+
if client_info.is_server_stream and not client_info.is_client_stream:
232232
return self._intercept_server_stream(
233233
request_or_iterator, metadata, client_info, invoker
234234
)

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,51 @@ def test_stream_stream(self):
221221
},
222222
)
223223

224+
def test_stream_stream_preserves_call_interface(self):
225+
"""Regression test for issue #1180.
226+
227+
Bidirectional streaming RPCs must return an object that implements
228+
grpc.Call (add_done_callback, cancel, is_active, etc.) rather than
229+
a bare generator. Before the fix, bidi streams were routed through
230+
the generator-based _intercept_server_stream, which stripped the
231+
grpc.Call interface and caused downstream code to crash with:
232+
AttributeError: 'generator' object has no attribute 'add_done_callback'
233+
"""
234+
235+
def request_messages():
236+
for _ in range(5):
237+
yield Request(client_id=1, request_data="data")
238+
239+
response_iterator = self._stub.BidirectionalStreamingMethod(
240+
request_messages(), metadata=(("key", "value"),)
241+
)
242+
243+
for attr in ("add_done_callback", "cancel", "is_active"):
244+
self.assertTrue(
245+
hasattr(response_iterator, attr),
246+
f"bidi stream response missing grpc.Call method '{attr}'",
247+
)
248+
249+
list(response_iterator)
250+
251+
spans = self.memory_exporter.get_finished_spans()
252+
self.assertEqual(len(spans), 1)
253+
span = spans[0]
254+
255+
self.assertEqual(
256+
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
257+
)
258+
self.assertIs(span.kind, trace.SpanKind.CLIENT)
259+
self.assertSpanHasAttributes(
260+
span,
261+
{
262+
RPC_METHOD: "BidirectionalStreamingMethod",
263+
RPC_SERVICE: "GRPCTestServer",
264+
RPC_SYSTEM: "grpc",
265+
RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0],
266+
},
267+
)
268+
224269
def test_error_simple(self):
225270
with self.assertRaises(grpc.RpcError):
226271
simple_method(self._stub, error=True)

0 commit comments

Comments
 (0)