Skip to content

Commit ce09c31

Browse files
UOndroAidan Jensen
authored andcommitted
Revert "Wrap strean response in proxy that manage span lifetime"
This reverts commit d0abacf
1 parent 58683c1 commit ce09c31

4 files changed

Lines changed: 49 additions & 132 deletions

File tree

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

Lines changed: 29 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121

2222
import logging
2323
from collections import OrderedDict
24+
from functools import partial
2425
from typing import Callable, MutableMapping
2526

2627
import grpc
27-
import wrapt
28-
29-
from opentelemetry import context, trace
28+
from opentelemetry import trace
3029
from opentelemetry.instrumentation.grpc import grpcext
3130
from opentelemetry.instrumentation.grpc._utilities import RpcInfo
3231
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
@@ -78,59 +77,11 @@ def _safe_invoke(function: Callable, *args):
7877
"Error when invoking function '%s'", function_name, exc_info=ex
7978
)
8079

81-
82-
class OpenTelemetryStreamWrapper(wrapt.ObjectProxy):
83-
def __init__(self, wrapped, span: trace.Span):
84-
super().__init__(wrapped)
85-
self._self_span = span
86-
87-
def _end_span_if_not_already_ended(self, status_code=None, status=None):
88-
if self._self_span.end_time is None:
89-
self._self_span.end()
90-
if status_code is not None:
91-
self._self_span.set_attribute(
92-
SpanAttributes.RPC_GRPC_STATUS_CODE, status_code
93-
)
94-
if status is not None:
95-
self._self_span.set_status(status)
96-
97-
def __del__(self):
98-
self._end_span_if_not_already_ended()
99-
self.__wrapped__.__del__()
100-
101-
def __iter__(self):
102-
return self
103-
104-
def cancel(self):
105-
self._end_span_if_not_already_ended(
106-
status_code=grpc.StatusCode.CANCELLED.value[0]
107-
)
108-
return self.__wrapped__.cancel()
109-
110-
def __next__(self):
111-
return self._next()
112-
113-
def next(self):
114-
return self._next()
115-
116-
def _next(self):
117-
try:
118-
return self.__wrapped__._next()
119-
except StopIteration:
120-
self._end_span_if_not_already_ended()
121-
raise
122-
except grpc.RpcError as err:
123-
self._end_span_if_not_already_ended(
124-
err.code().value[0], Status(StatusCode.ERROR)
125-
)
126-
raise err
127-
128-
12980
class OpenTelemetryClientInterceptor(
13081
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
13182
):
13283
def __init__(
133-
self, tracer, filter_=None, request_hook=None, response_hook=None
84+
self, tracer, filter_=None, request_hook=None, response_hook=None
13485
):
13586
self._tracer = tracer
13687
self._filter = filter_
@@ -184,10 +135,10 @@ def _intercept(self, request, metadata, client_info, invoker):
184135
else:
185136
mutable_metadata = OrderedDict(metadata)
186137
with self._start_span(
187-
client_info.full_method,
188-
end_on_exit=False,
189-
record_exception=False,
190-
set_status_on_exception=False,
138+
client_info.full_method,
139+
end_on_exit=False,
140+
record_exception=False,
141+
set_status_on_exception=False,
191142
) as span:
192143
result = None
193144
try:
@@ -241,15 +192,16 @@ def intercept_unary(self, request, metadata, client_info, invoker):
241192
# the span across the generated responses and detect any errors, we wrap
242193
# the result in a new generator that yields the response values.
243194
def _intercept_server_stream(
244-
self, request_or_iterator, metadata, client_info, invoker
195+
self, request_or_iterator, metadata, client_info, invoker
245196
):
246197
if not metadata:
247198
mutable_metadata = OrderedDict()
248199
else:
249200
mutable_metadata = OrderedDict(metadata)
250201

251202
with self._start_span(
252-
client_info.full_method, end_on_exit=False
203+
client_info.full_method,
204+
end_on_exit=False
253205
) as span:
254206
inject(mutable_metadata, setter=_carrier_setter)
255207
metadata = tuple(mutable_metadata.items())
@@ -264,10 +216,27 @@ def _intercept_server_stream(
264216

265217
stream = invoker(request_or_iterator, metadata)
266218

267-
return OpenTelemetryStreamWrapper(stream, span)
219+
def done_callback(future, span_):
220+
try:
221+
future.result()
222+
except grpc.FutureCancelledError:
223+
span_.set_status(Status(StatusCode.OK))
224+
span_.set_attribute(
225+
SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0]
226+
)
227+
except grpc.RpcError as err:
228+
span_.set_status(Status(StatusCode.ERROR))
229+
span_.set_attribute(
230+
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0]
231+
)
232+
finally:
233+
span_.end()
234+
235+
stream.add_done_callback(partial(done_callback, span_=span))
236+
return stream
268237

269238
def intercept_stream(
270-
self, request_or_iterator, metadata, client_info, invoker
239+
self, request_or_iterator, metadata, client_info, invoker
271240
):
272241
if not is_instrumentation_enabled():
273242
return invoker(request_or_iterator, metadata)

instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def server_streaming_method(stub, error=False, serialize=True):
5757
return response_iterator
5858

5959

60-
def bidirectional_streaming_method(stub, error=False, serialize=True):
60+
def bidirectional_streaming_method(stub, error=False):
6161
def request_messages():
6262
for _ in range(5):
6363
request = Request(
@@ -68,6 +68,5 @@ def request_messages():
6868
response_iterator = stub.ServerStreamingMethod(
6969
request, metadata=(("key", "value"),)
7070
)
71-
if serialize:
72-
list(response_iterator)
73-
return response_iterator
71+
72+
list(response_iterator)

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

Lines changed: 14 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
# pylint:disable=cyclic-import
15+
from time import sleep
1516

1617
from unittest import mock
1718

@@ -61,32 +62,32 @@ def __init__(self):
6162
pass
6263

6364
def intercept_unary_unary(
64-
self, continuation, client_call_details, request
65+
self, continuation, client_call_details, request
6566
):
6667
return self._intercept_call(continuation, client_call_details, request)
6768

6869
def intercept_unary_stream(
69-
self, continuation, client_call_details, request
70+
self, continuation, client_call_details, request
7071
):
7172
return self._intercept_call(continuation, client_call_details, request)
7273

7374
def intercept_stream_unary(
74-
self, continuation, client_call_details, request_iterator
75+
self, continuation, client_call_details, request_iterator
7576
):
7677
return self._intercept_call(
7778
continuation, client_call_details, request_iterator
7879
)
7980

8081
def intercept_stream_stream(
81-
self, continuation, client_call_details, request_iterator
82+
self, continuation, client_call_details, request_iterator
8283
):
8384
return self._intercept_call(
8485
continuation, client_call_details, request_iterator
8586
)
8687

8788
@staticmethod
8889
def _intercept_call(
89-
continuation, client_call_details, request_or_iterator
90+
continuation, client_call_details, request_or_iterator
9091
):
9192
return continuation(client_call_details, request_or_iterator)
9293

@@ -99,7 +100,9 @@ def setUp(self):
99100
self.server.start()
100101
# use a user defined interceptor along with the opentelemetry client interceptor
101102
interceptors = [Interceptor()]
102-
self.channel = grpc.insecure_channel("localhost:25565")
103+
self.channel = grpc.insecure_channel("localhost:25565", options=[
104+
# (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
105+
])
103106
self.channel = grpc.intercept_channel(self.channel, *interceptors)
104107
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
105108

@@ -172,11 +175,14 @@ def test_unary_stream(self):
172175
)
173176

174177
def test_unary_stream_can_be_cancel(self):
175-
responses = server_streaming_method(self._stub, serialize=False)
178+
responses = server_streaming_method(self._stub)
176179
for i, _ in enumerate(responses):
177180
if i == 1:
178181
responses.cancel()
179182
break
183+
sleep(10)
184+
self.server.stop(None)
185+
self.channel.close()
180186
spans = self.memory_exporter.get_finished_spans()
181187
self.assertEqual(len(spans), 1)
182188
span = spans[0]
@@ -201,33 +207,6 @@ def test_unary_stream_can_be_cancel(self):
201207
},
202208
)
203209

204-
def test_finished_stream_cancel_does_not_change_status_of_span(self):
205-
responses = server_streaming_method(self._stub, serialize=True)
206-
responses.cancel()
207-
spans = self.memory_exporter.get_finished_spans()
208-
self.assertEqual(len(spans), 1)
209-
span = spans[0]
210-
211-
self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod")
212-
self.assertIs(span.kind, trace.SpanKind.CLIENT)
213-
214-
# Check version and name in span's instrumentation info
215-
self.assertEqualSpanInstrumentationInfo(
216-
span, opentelemetry.instrumentation.grpc
217-
)
218-
219-
self.assertSpanHasAttributes(
220-
span,
221-
{
222-
SpanAttributes.RPC_METHOD: "ServerStreamingMethod",
223-
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
224-
SpanAttributes.RPC_SYSTEM: "grpc",
225-
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
226-
0
227-
],
228-
},
229-
)
230-
231210
def test_stream_unary(self):
232211
client_streaming_method(self._stub)
233212
spans = self.memory_exporter.get_finished_spans()
@@ -278,38 +257,6 @@ def test_stream_stream(self):
278257
},
279258
)
280259

281-
def test_stream_stream_can_be_cancel(self):
282-
responses = bidirectional_streaming_method(self._stub, serialize=False)
283-
for i, _ in enumerate(responses):
284-
if i == 1:
285-
responses.cancel()
286-
break
287-
spans = self.memory_exporter.get_finished_spans()
288-
self.assertEqual(len(spans), 1)
289-
span = spans[0]
290-
291-
self.assertEqual(
292-
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
293-
)
294-
self.assertIs(span.kind, trace.SpanKind.CLIENT)
295-
296-
# Check version and name in span's instrumentation info
297-
self.assertEqualSpanInstrumentationInfo(
298-
span, opentelemetry.instrumentation.grpc
299-
)
300-
301-
self.assertSpanHasAttributes(
302-
span,
303-
{
304-
SpanAttributes.RPC_METHOD: "BidirectionalStreamingMethod",
305-
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
306-
SpanAttributes.RPC_SYSTEM: "grpc",
307-
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[
308-
0
309-
],
310-
},
311-
)
312-
313260
def test_error_simple(self):
314261
with self.assertRaises(grpc.RpcError):
315262
simple_method(self._stub, error=True)
@@ -385,7 +332,7 @@ def invoker(_request, _metadata):
385332
self.assertEqual(span_end_mock.call_count, 1)
386333

387334
def test_client_interceptor_trace_context_propagation(
388-
self,
335+
self,
389336
): # pylint: disable=no-self-use
390337
"""ensure that client interceptor correctly inject trace context into all outgoing requests."""
391338
previous_propagator = get_global_textmap()

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ def setUp(self):
101101
self.server.start()
102102
# use a user defined interceptor along with the opentelemetry client interceptor
103103
interceptors = [Interceptor()]
104-
self.channel = grpc.insecure_channel("localhost:25565")
104+
self.channel = grpc.insecure_channel("localhost:25565",options=[
105+
(grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
106+
])
105107
self.channel = grpc.intercept_channel(self.channel, *interceptors)
106108
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
107109

0 commit comments

Comments
 (0)