Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/4621.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-instrumentation-grpc`: add rpc.client.call.duration and rpc.server.call.duration metrics for both sync and async components
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ async def serve():
from opentelemetry.instrumentation.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
Expand Down Expand Up @@ -457,6 +458,8 @@ def instrumentation_dependencies(self) -> Collection[str]:
def _instrument(self, **kwargs):
self._request_hook = kwargs.get("request_hook")
self._response_hook = kwargs.get("response_hook")
self._tracer_provider = kwargs.get("tracer_provider")
self._meter_provider = kwargs.get("meter_provider")
for ctype in self._which_channel(kwargs):
_wrap(
"grpc",
Expand All @@ -470,16 +473,16 @@ def _uninstrument(self, **kwargs):

def wrapper_fn(self, original_func, instance, args, kwargs):
channel = original_func(*args, **kwargs)
tracer_provider = kwargs.get("tracer_provider")
request_hook = self._request_hook
response_hook = self._response_hook
target = args[0] if args else None
return intercept_channel(
channel,
client_interceptor(
tracer_provider=tracer_provider,
tracer_provider=self._tracer_provider,
filter_=self._filter,
request_hook=request_hook,
response_hook=response_hook,
request_hook=self._request_hook,
response_hook=self._response_hook,
meter_provider=self._meter_provider,
target=target,
),
Comment on lines 474 to 486
)

Expand Down Expand Up @@ -511,25 +514,21 @@ def __init__(self, filter_=None):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _add_interceptors(self, tracer_provider, kwargs):
def _add_interceptors(self, tracer_provider, meter_provider, target, kwargs):
interceptors = aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
meter_provider=meter_provider,
target=target,
)
if "interceptors" in kwargs and kwargs["interceptors"]:
kwargs["interceptors"] = list(kwargs["interceptors"])
kwargs["interceptors"] = (
aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
)
+ kwargs["interceptors"]
kwargs["interceptors"] = interceptors + list(
kwargs["interceptors"]
)
else:
kwargs["interceptors"] = aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
)
kwargs["interceptors"] = interceptors

return kwargs

Expand All @@ -539,15 +538,20 @@ def _instrument(self, **kwargs):
self._request_hook = kwargs.get("request_hook")
self._response_hook = kwargs.get("response_hook")
tracer_provider = kwargs.get("tracer_provider")
meter_provider = kwargs.get("meter_provider")

def insecure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

target = args[0] if args else None
kwargs = self._add_interceptors(
tracer_provider, meter_provider, target, kwargs
)
return self._original_insecure(*args, **kwargs)

def secure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

target = args[0] if args else None
kwargs = self._add_interceptors(
tracer_provider, meter_provider, target, kwargs
)
return self._original_secure(*args, **kwargs)

Comment on lines 543 to 556
grpc.aio.insecure_channel = insecure
Expand All @@ -559,7 +563,12 @@ def _uninstrument(self, **kwargs):


def client_interceptor(
tracer_provider=None, filter_=None, request_hook=None, response_hook=None
tracer_provider=None,
filter_=None,
request_hook=None,
response_hook=None,
meter_provider=None,
target=None,
):
"""Create a gRPC client channel interceptor.

Expand All @@ -570,6 +579,10 @@ def client_interceptor(
matches the condition. Default is None and intercept
all requests.

meter_provider: The meter provider to use for metrics.

target: The target address of the channel (e.g. "host:port").

Returns:
An invocation-side interceptor object.
"""
Expand All @@ -582,15 +595,23 @@ def client_interceptor(
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter = get_meter(
__name__,
__version__,
meter_provider,
)

return _client.OpenTelemetryClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
meter=meter,
target=target,
)


def server_interceptor(tracer_provider=None, filter_=None):
def server_interceptor(tracer_provider=None, filter_=None, meter_provider=None):
"""Create a gRPC server interceptor.

Args:
Expand All @@ -600,6 +621,8 @@ def server_interceptor(tracer_provider=None, filter_=None):
matches the condition. Default is None and intercept
all requests.

meter_provider: The meter provider to use for metrics.

Returns:
A service-side interceptor object.
"""
Expand All @@ -612,17 +635,34 @@ def server_interceptor(tracer_provider=None, filter_=None):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)
meter = get_meter(
__name__,
__version__,
meter_provider,
)
Comment on lines +638 to +642

return _server.OpenTelemetryServerInterceptor(
tracer, filter_=filter_, meter=meter
)


def aio_client_interceptors(
tracer_provider=None, filter_=None, request_hook=None, response_hook=None
tracer_provider=None,
filter_=None,
request_hook=None,
response_hook=None,
meter_provider=None,
target=None,
):
"""Create a gRPC client channel interceptor.

Args:
tracer: The tracer to use to create client-side spans.

meter_provider: The meter provider to use for metrics.

target: The target address of the channel (e.g. "host:port").

Returns:
An invocation-side interceptor object.
"""
Expand All @@ -635,40 +675,38 @@ def aio_client_interceptors(
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter = get_meter(
__name__,
__version__,
meter_provider,
)

common_kwargs = {
"filter_": filter_,
"request_hook": request_hook,
"response_hook": response_hook,
"meter": meter,
"target": target,
}

return [
_aio_client.UnaryUnaryAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.UnaryStreamAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.StreamUnaryAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.UnaryUnaryAioClientInterceptor(tracer, **common_kwargs),
_aio_client.UnaryStreamAioClientInterceptor(tracer, **common_kwargs),
_aio_client.StreamUnaryAioClientInterceptor(tracer, **common_kwargs),
_aio_client.StreamStreamAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
tracer, **common_kwargs
),
]


def aio_server_interceptor(tracer_provider=None, filter_=None):
def aio_server_interceptor(tracer_provider=None, filter_=None, meter_provider=None):
"""Create a gRPC aio server interceptor.

Args:
tracer: The tracer to use to create server-side spans.

meter_provider: The meter provider to use for metrics.

Returns:
A service-side interceptor object.
"""
Expand All @@ -681,8 +719,14 @@ def aio_server_interceptor(tracer_provider=None, filter_=None):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter = get_meter(
__name__,
__version__,
meter_provider,
)

return _aio_server.OpenTelemetryAioServerInterceptor(
tracer, filter_=filter_
tracer, filter_=filter_, meter=meter
)


Expand Down
Loading
Loading