Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/4621.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-instrumentation-grpc`: add rpc.client.call.duration and rpc.server.call.duration metrics for both sync and async components
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ async def serve():
from opentelemetry.instrumentation.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
Expand Down Expand Up @@ -457,6 +458,8 @@ def instrumentation_dependencies(self) -> Collection[str]:
def _instrument(self, **kwargs):
self._request_hook = kwargs.get("request_hook")
self._response_hook = kwargs.get("response_hook")
self._tracer_provider = kwargs.get("tracer_provider")
self._meter_provider = kwargs.get("meter_provider")
for ctype in self._which_channel(kwargs):
_wrap(
"grpc",
Expand All @@ -470,16 +473,16 @@ def _uninstrument(self, **kwargs):

def wrapper_fn(self, original_func, instance, args, kwargs):
channel = original_func(*args, **kwargs)
tracer_provider = kwargs.get("tracer_provider")
request_hook = self._request_hook
response_hook = self._response_hook
target = args[0] if args else None
return intercept_channel(
channel,
client_interceptor(
tracer_provider=tracer_provider,
tracer_provider=self._tracer_provider,
filter_=self._filter,
request_hook=request_hook,
response_hook=response_hook,
request_hook=self._request_hook,
response_hook=self._response_hook,
meter_provider=self._meter_provider,
target=target,
),
Comment on lines 474 to 486
)

Expand Down Expand Up @@ -511,25 +514,21 @@ def __init__(self, filter_=None):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _add_interceptors(self, tracer_provider, kwargs):
def _add_interceptors(self, tracer_provider, meter_provider, target, kwargs):
interceptors = aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
meter_provider=meter_provider,
target=target,
)
if "interceptors" in kwargs and kwargs["interceptors"]:
kwargs["interceptors"] = list(kwargs["interceptors"])
kwargs["interceptors"] = (
aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
)
+ kwargs["interceptors"]
kwargs["interceptors"] = interceptors + list(
kwargs["interceptors"]
)
else:
kwargs["interceptors"] = aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
)
kwargs["interceptors"] = interceptors

return kwargs

Expand All @@ -539,15 +538,20 @@ def _instrument(self, **kwargs):
self._request_hook = kwargs.get("request_hook")
self._response_hook = kwargs.get("response_hook")
tracer_provider = kwargs.get("tracer_provider")
meter_provider = kwargs.get("meter_provider")

def insecure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

target = args[0] if args else None
kwargs = self._add_interceptors(
tracer_provider, meter_provider, target, kwargs
)
return self._original_insecure(*args, **kwargs)

def secure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

target = args[0] if args else None
kwargs = self._add_interceptors(
tracer_provider, meter_provider, target, kwargs
)
return self._original_secure(*args, **kwargs)

Comment on lines 543 to 556
grpc.aio.insecure_channel = insecure
Expand All @@ -559,7 +563,12 @@ def _uninstrument(self, **kwargs):


def client_interceptor(
tracer_provider=None, filter_=None, request_hook=None, response_hook=None
tracer_provider=None,
filter_=None,
request_hook=None,
response_hook=None,
meter_provider=None,
target=None,
):
"""Create a gRPC client channel interceptor.

Expand All @@ -570,6 +579,10 @@ def client_interceptor(
matches the condition. Default is None and intercept
all requests.

meter_provider: The meter provider to use for metrics.

target: The target address of the channel (e.g. "host:port").

Returns:
An invocation-side interceptor object.
"""
Expand All @@ -582,15 +595,23 @@ def client_interceptor(
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter = get_meter(
__name__,
__version__,
meter_provider,
)

return _client.OpenTelemetryClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
meter=meter,
target=target,
)


def server_interceptor(tracer_provider=None, filter_=None):
def server_interceptor(tracer_provider=None, filter_=None, meter_provider=None):
"""Create a gRPC server interceptor.

Args:
Expand All @@ -600,6 +621,8 @@ def server_interceptor(tracer_provider=None, filter_=None):
matches the condition. Default is None and intercept
all requests.

meter_provider: The meter provider to use for metrics.

Returns:
A service-side interceptor object.
"""
Expand All @@ -612,17 +635,34 @@ def server_interceptor(tracer_provider=None, filter_=None):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)
meter = get_meter(
__name__,
__version__,
meter_provider,
)
Comment on lines +638 to +642

return _server.OpenTelemetryServerInterceptor(
tracer, filter_=filter_, meter=meter
)


def aio_client_interceptors(
tracer_provider=None, filter_=None, request_hook=None, response_hook=None
tracer_provider=None,
filter_=None,
request_hook=None,
response_hook=None,
meter_provider=None,
target=None,
):
"""Create a gRPC client channel interceptor.

Args:
tracer: The tracer to use to create client-side spans.

meter_provider: The meter provider to use for metrics.

target: The target address of the channel (e.g. "host:port").

Returns:
An invocation-side interceptor object.
"""
Expand All @@ -635,40 +675,38 @@ def aio_client_interceptors(
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter = get_meter(
__name__,
__version__,
meter_provider,
)

common_kwargs = {
"filter_": filter_,
"request_hook": request_hook,
"response_hook": response_hook,
"meter": meter,
"target": target,
}

return [
_aio_client.UnaryUnaryAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.UnaryStreamAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.StreamUnaryAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.UnaryUnaryAioClientInterceptor(tracer, **common_kwargs),
_aio_client.UnaryStreamAioClientInterceptor(tracer, **common_kwargs),
_aio_client.StreamUnaryAioClientInterceptor(tracer, **common_kwargs),
_aio_client.StreamStreamAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
tracer, **common_kwargs
),
]


def aio_server_interceptor(tracer_provider=None, filter_=None):
def aio_server_interceptor(tracer_provider=None, filter_=None, meter_provider=None):
"""Create a gRPC aio server interceptor.

Args:
tracer: The tracer to use to create server-side spans.

meter_provider: The meter provider to use for metrics.

Returns:
A service-side interceptor object.
"""
Expand All @@ -681,8 +719,14 @@ def aio_server_interceptor(tracer_provider=None, filter_=None):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)

meter = get_meter(
__name__,
__version__,
meter_provider,
)

return _aio_server.OpenTelemetryAioServerInterceptor(
tracer, filter_=filter_
tracer, filter_=filter_, meter=meter
)


Expand Down
Loading