2424
2525import grpc
2626
27- from opentelemetry import propagators , trace
27+ from opentelemetry import metrics , propagators , trace
28+ from opentelemetry .sdk .metrics .export .controller import PushController
2829from opentelemetry .trace .status import Status , StatusCanonicalCode
2930
3031from . import grpcext
31- from ._utilities import RpcInfo
32+ from ._utilities import RpcInfo , TimedMetricRecorder
3233
3334
3435class _GuardedSpan :
@@ -63,7 +64,7 @@ def append_metadata(
6364 propagators .inject (append_metadata , metadata )
6465
6566
66- def _make_future_done_callback (span , rpc_info ):
67+ def _make_future_done_callback (span , rpc_info , client_info , metrics_recorder ):
6768 def callback (response_future ):
6869 with span :
6970 code = response_future .code ()
@@ -72,28 +73,45 @@ def callback(response_future):
7273 return
7374 response = response_future .result ()
7475 rpc_info .response = response
76+ if "ByteSize" in dir (response ):
77+ metrics_recorder .record_bytes_in (
78+ response .ByteSize (), client_info .full_method
79+ )
7580
7681 return callback
7782
7883
7984class OpenTelemetryClientInterceptor (
8085 grpcext .UnaryClientInterceptor , grpcext .StreamClientInterceptor
8186):
82- def __init__ (self , tracer ):
87+ def __init__ (self , tracer , exporter , interval ):
8388 self ._tracer = tracer
8489
90+ self ._meter = None
91+ if exporter and interval :
92+ self ._meter = metrics .get_meter (__name__ )
93+ self .controller = PushController (
94+ meter = self ._meter , exporter = exporter , interval = interval
95+ )
96+ self ._metrics_recorder = TimedMetricRecorder (self ._meter , "client" )
97+
8598 def _start_span (self , method ):
8699 return self ._tracer .start_as_current_span (
87100 name = method , kind = trace .SpanKind .CLIENT
88101 )
89102
90103 # pylint:disable=no-self-use
91- def _trace_result (self , guarded_span , rpc_info , result ):
104+ def _trace_result (self , guarded_span , rpc_info , result , client_info ):
92105 # If the RPC is called asynchronously, release the guard and add a
93106 # callback so that the span can be finished once the future is done.
94107 if isinstance (result , grpc .Future ):
95108 result .add_done_callback (
96- _make_future_done_callback (guarded_span .release (), rpc_info )
109+ _make_future_done_callback (
110+ guarded_span .release (),
111+ rpc_info ,
112+ client_info ,
113+ self ._metrics_recorder ,
114+ )
97115 )
98116 return result
99117 response = result
@@ -104,37 +122,62 @@ def _trace_result(self, guarded_span, rpc_info, result):
104122 if isinstance (result , tuple ):
105123 response = result [0 ]
106124 rpc_info .response = response
125+
126+ if "ByteSize" in dir (response ):
127+ self ._metrics_recorder .record_bytes_in (
128+ response .ByteSize (), client_info .full_method
129+ )
107130 return result
108131
109132 def _start_guarded_span (self , * args , ** kwargs ):
110133 return _GuardedSpan (self ._start_span (* args , ** kwargs ))
111134
135+ def _bytes_out_iterator_wrapper (self , iterator , client_info ):
136+ for request in iterator :
137+ if "ByteSize" in dir (request ):
138+ self ._metrics_recorder .record_bytes_out (
139+ request .ByteSize (), client_info .full_method
140+ )
141+ yield request
142+
112143 def intercept_unary (self , request , metadata , client_info , invoker ):
113144 if not metadata :
114145 mutable_metadata = OrderedDict ()
115146 else :
116147 mutable_metadata = OrderedDict (metadata )
117148
118149 with self ._start_guarded_span (client_info .full_method ) as guarded_span :
119- _inject_span_context (mutable_metadata )
120- metadata = tuple (mutable_metadata .items ())
121-
122- rpc_info = RpcInfo (
123- full_method = client_info .full_method ,
124- metadata = metadata ,
125- timeout = client_info .timeout ,
126- request = request ,
127- )
128-
129- try :
130- result = invoker (request , metadata )
131- except grpc .RpcError as exc :
132- guarded_span .generated_span .set_status (
133- Status (StatusCanonicalCode (exc .code ().value [0 ]))
150+ with self ._metrics_recorder .record_latency (
151+ client_info .full_method
152+ ):
153+ _inject_span_context (mutable_metadata )
154+ metadata = tuple (mutable_metadata .items ())
155+
156+ # If protobuf is used, we can record the bytes in/out. Otherwise, we have no way
157+ # to get the size of the request/response properly, so don't record anything
158+ if "ByteSize" in dir (request ):
159+ self ._metrics_recorder .record_bytes_out (
160+ request .ByteSize (), client_info .full_method
161+ )
162+
163+ rpc_info = RpcInfo (
164+ full_method = client_info .full_method ,
165+ metadata = metadata ,
166+ timeout = client_info .timeout ,
167+ request = request ,
134168 )
135- raise
136169
137- return self ._trace_result (guarded_span , rpc_info , result )
170+ try :
171+ result = invoker (request , metadata )
172+ except grpc .RpcError as exc :
173+ guarded_span .generated_span .set_status (
174+ Status (StatusCanonicalCode (exc .code ().value [0 ]))
175+ )
176+ raise
177+
178+ return self ._trace_result (
179+ guarded_span , rpc_info , result , client_info
180+ )
138181
139182 # For RPCs that stream responses, the result can be a generator. To record
140183 # the span across the generated responses and detect any errors, we wrap
@@ -148,25 +191,44 @@ def _intercept_server_stream(
148191 mutable_metadata = OrderedDict (metadata )
149192
150193 with self ._start_span (client_info .full_method ) as span :
151- _inject_span_context (mutable_metadata )
152- metadata = tuple (mutable_metadata .items ())
153- rpc_info = RpcInfo (
154- full_method = client_info .full_method ,
155- metadata = metadata ,
156- timeout = client_info .timeout ,
157- )
158- if client_info .is_client_stream :
159- rpc_info .request = request_or_iterator
160-
161- try :
162- result = invoker (request_or_iterator , metadata )
163- for response in result :
164- yield response
165- except grpc .RpcError as exc :
166- span .set_status (
167- Status (StatusCanonicalCode (exc .code ().value [0 ]))
194+ with self ._metrics_recorder .record_latency (
195+ client_info .full_method
196+ ):
197+ _inject_span_context (mutable_metadata )
198+ metadata = tuple (mutable_metadata .items ())
199+ rpc_info = RpcInfo (
200+ full_method = client_info .full_method ,
201+ metadata = metadata ,
202+ timeout = client_info .timeout ,
168203 )
169- raise
204+
205+ if client_info .is_client_stream :
206+ rpc_info .request = request_or_iterator
207+ request_or_iterator = self ._bytes_out_iterator_wrapper (
208+ request_or_iterator , client_info
209+ )
210+ else :
211+ if "ByteSize" in dir (request_or_iterator ):
212+ self ._metrics_recorder .record_bytes_out (
213+ request_or_iterator .ByteSize (),
214+ client_info .full_method ,
215+ )
216+
217+ try :
218+ result = invoker (request_or_iterator , metadata )
219+
220+ # Rewrap the result stream into a generator, and record the bytes received
221+ for response in result :
222+ if "ByteSize" in dir (response ):
223+ self ._metrics_recorder .record_bytes_in (
224+ response .ByteSize (), client_info .full_method
225+ )
226+ yield response
227+ except grpc .RpcError as exc :
228+ span .set_status (
229+ Status (StatusCanonicalCode (exc .code ().value [0 ]))
230+ )
231+ raise
170232
171233 def intercept_stream (
172234 self , request_or_iterator , metadata , client_info , invoker
@@ -182,21 +244,32 @@ def intercept_stream(
182244 mutable_metadata = OrderedDict (metadata )
183245
184246 with self ._start_guarded_span (client_info .full_method ) as guarded_span :
185- _inject_span_context (mutable_metadata )
186- metadata = tuple (mutable_metadata .items ())
187- rpc_info = RpcInfo (
188- full_method = client_info .full_method ,
189- metadata = metadata ,
190- timeout = client_info .timeout ,
191- request = request_or_iterator ,
192- )
247+ with self ._metrics_recorder .record_latency (
248+ client_info .full_method
249+ ):
250+ _inject_span_context (mutable_metadata )
251+ metadata = tuple (mutable_metadata .items ())
252+ rpc_info = RpcInfo (
253+ full_method = client_info .full_method ,
254+ metadata = metadata ,
255+ timeout = client_info .timeout ,
256+ request = request_or_iterator ,
257+ )
258+
259+ rpc_info .request = request_or_iterator
193260
194- try :
195- result = invoker (request_or_iterator , metadata )
196- except grpc .RpcError as exc :
197- guarded_span .generated_span .set_status (
198- Status (StatusCanonicalCode (exc .code ().value [0 ]))
261+ request_or_iterator = self ._bytes_out_iterator_wrapper (
262+ request_or_iterator , client_info
199263 )
200- raise
201264
202- return self ._trace_result (guarded_span , rpc_info , result )
265+ try :
266+ result = invoker (request_or_iterator , metadata )
267+ except grpc .RpcError as exc :
268+ guarded_span .generated_span .set_status (
269+ Status (StatusCanonicalCode (exc .code ().value [0 ]))
270+ )
271+ raise
272+
273+ return self ._trace_result (
274+ guarded_span , rpc_info , result , client_info
275+ )
0 commit comments