diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index ee838a87ef..42886635c7 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -267,25 +267,47 @@ async def serve(): ``filter_`` option also applies to both global and manual client intrumentors. -Environment variable --------------------- +Environment variables +--------------------- -If you'd like to exclude specific services for the instrumentations, you can use -``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables. +``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` + Comma-separated list of service names to exclude from instrumentation. + For example, ``"GRPCTestServer,GRPCHealthServer"`` will exclude those services. -For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable, -then the global interceptor automatically adds the filters to exclude requests to -services ``GRPCTestServer`` and ``GRPCHealthServer``. +``OTEL_SEMCONV_STABILITY_OPT_IN`` + Controls which version of the RPC semantic conventions the instrumentation + emits. Accepted values (comma-separated): + + - ``rpc`` — emit only the stable (new) RPC conventions. Key changes: + + - ``rpc.system`` → ``rpc.system.name`` + - ``rpc.grpc.status_code`` (int) → ``rpc.response.status_code`` (string, + e.g. ``"OK"``, ``"UNAVAILABLE"``); non-OK codes also set ``error.type`` + - ``rpc.method`` now contains the fully-qualified name + (e.g. ``"helloworld.Greeter/SayHello"``); ``rpc.service`` is removed + - ``net.peer.ip`` / ``net.peer.name`` / ``net.peer.port`` (server spans) + → ``client.address`` / ``client.port`` + + - ``rpc/dup`` — emit both old and new RPC conventions simultaneously, + useful for a phased rollout. + + - *(default, no value)* — continue emitting the old RPC conventions. """ import os -from typing import Callable, Collection, List, Union +from typing import Callable, Collection, List, Optional, Tuple, Union import grpc # pylint:disable=import-self from wrapt import wrap_function_wrapper as _wrap from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + _StabilityMode, + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, +) +from opentelemetry.instrumentation.grpc._semconv import _parse_grpc_target from opentelemetry.instrumentation.grpc.filters import ( any_of, negate, @@ -296,6 +318,7 @@ async def serve(): from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.semconv.schemas import Schemas # pylint:disable=import-outside-toplevel # pylint:disable=import-self @@ -482,15 +505,17 @@ def _uninstrument(self, **kwargs): def wrapper_fn(self, original_func, instance, args, kwargs): channel = original_func(*args, **kwargs) tracer_provider = kwargs.get("tracer_provider") - request_hook = self._request_hook - response_hook = self._response_hook + target = args[0] if args else kwargs.get("target", "") + host, port = _parse_grpc_target(target) return intercept_channel( channel, client_interceptor( tracer_provider=tracer_provider, filter_=self._filter, - request_hook=request_hook, - response_hook=response_hook, + request_hook=self._request_hook, + response_hook=self._response_hook, + host=host, + port=port, ), ) @@ -522,26 +547,21 @@ def __init__(self, filter_=None): def instrumentation_dependencies(self) -> Collection[str]: return _instruments - def _add_interceptors(self, tracer_provider, kwargs): + def _add_interceptors(self, tracer_provider, args, kwargs): + target = args[0] if args else kwargs.get("target", "") + host, port = _parse_grpc_target(target) + ours = aio_client_interceptors( + tracer_provider=tracer_provider, + filter_=self._filter, + request_hook=self._request_hook, + response_hook=self._response_hook, + host=host, + port=port, + ) if "interceptors" in kwargs and kwargs["interceptors"]: - kwargs["interceptors"] = list(kwargs["interceptors"]) - kwargs["interceptors"] = ( - aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter, - request_hook=self._request_hook, - response_hook=self._response_hook, - ) - + kwargs["interceptors"] - ) + kwargs["interceptors"] = ours + list(kwargs["interceptors"]) else: - kwargs["interceptors"] = aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter, - request_hook=self._request_hook, - response_hook=self._response_hook, - ) - + kwargs["interceptors"] = ours return kwargs def _instrument(self, **kwargs): @@ -552,13 +572,11 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") def insecure(*args, **kwargs): - kwargs = self._add_interceptors(tracer_provider, kwargs) - + kwargs = self._add_interceptors(tracer_provider, args, kwargs) return self._original_insecure(*args, **kwargs) def secure(*args, **kwargs): - kwargs = self._add_interceptors(tracer_provider, kwargs) - + kwargs = self._add_interceptors(tracer_provider, args, kwargs) return self._original_secure(*args, **kwargs) grpc.aio.insecure_channel = insecure @@ -570,7 +588,8 @@ def _uninstrument(self, **kwargs): def client_interceptor( - tracer_provider=None, filter_=None, request_hook=None, response_hook=None + tracer_provider=None, filter_=None, request_hook=None, response_hook=None, + host=None, port=None, ): """Create a gRPC client channel interceptor. @@ -581,16 +600,27 @@ def client_interceptor( matches the condition. Default is None and intercept all requests. + host: Server hostname parsed from the channel target. Used to set + ``server.address`` / ``net.peer.name`` on client spans. + + port: Server port parsed from the channel target. Used to set + ``server.port`` / ``net.peer.port`` on client spans. + Returns: An invocation-side interceptor object. """ from . import _client # noqa: PLC0415 + _OpenTelemetrySemanticConventionStability._initialize() + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.RPC + ) + tracer = trace.get_tracer( __name__, __version__, tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), ) return _client.OpenTelemetryClientInterceptor( @@ -598,6 +628,9 @@ def client_interceptor( filter_=filter_, request_hook=request_hook, response_hook=response_hook, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + host=host, + port=port, ) @@ -616,61 +649,65 @@ def server_interceptor(tracer_provider=None, filter_=None): """ from . import _server # noqa: PLC0415 + _OpenTelemetrySemanticConventionStability._initialize() + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.RPC + ) + tracer = trace.get_tracer( __name__, __version__, tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), ) - return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) + return _server.OpenTelemetryServerInterceptor( + tracer, filter_=filter_, sem_conv_opt_in_mode=sem_conv_opt_in_mode + ) def aio_client_interceptors( - tracer_provider=None, filter_=None, request_hook=None, response_hook=None + tracer_provider=None, filter_=None, request_hook=None, response_hook=None, + host=None, port=None, ): """Create a gRPC client channel interceptor. Args: tracer: The tracer to use to create client-side spans. + host: Server hostname parsed from the channel target. + port: Server port parsed from the channel target. + Returns: An invocation-side interceptor object. """ from . import _aio_client # noqa: PLC0415 + _OpenTelemetrySemanticConventionStability._initialize() + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.RPC + ) + tracer = trace.get_tracer( __name__, __version__, tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), ) + interceptor_kwargs = dict( + filter_=filter_, + request_hook=request_hook, + response_hook=response_hook, + sem_conv_opt_in_mode=sem_conv_opt_in_mode, + host=host, + port=port, + ) return [ - _aio_client.UnaryUnaryAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.UnaryStreamAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.StreamUnaryAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.StreamStreamAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), + _aio_client.UnaryUnaryAioClientInterceptor(tracer, **interceptor_kwargs), + _aio_client.UnaryStreamAioClientInterceptor(tracer, **interceptor_kwargs), + _aio_client.StreamUnaryAioClientInterceptor(tracer, **interceptor_kwargs), + _aio_client.StreamStreamAioClientInterceptor(tracer, **interceptor_kwargs), ] @@ -685,15 +722,20 @@ def aio_server_interceptor(tracer_provider=None, filter_=None): """ from . import _aio_server # noqa: PLC0415 + _OpenTelemetrySemanticConventionStability._initialize() + sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.RPC + ) + tracer = trace.get_tracer( __name__, __version__, tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), ) return _aio_server.OpenTelemetryAioServerInterceptor( - tracer, filter_=filter_ + tracer, filter_=filter_, sem_conv_opt_in_mode=sem_conv_opt_in_mode ) @@ -715,3 +757,9 @@ def _parse_services(excluded_services: str) -> List[str]: else: excluded_service_list = [] return excluded_service_list + +def _get_rpc_schema_url(mode: _StabilityMode) -> str: + if mode is _StabilityMode.DEFAULT: + return "https://opentelemetry.io/schemas/1.11.0" + # TODO: update to 1.40.0 + return Schemas.V1_38_0.value diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index 684693bea0..fa7a31c08f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -18,36 +18,23 @@ import grpc from grpc.aio import ClientCallDetails, Metadata +from opentelemetry import trace +from opentelemetry.instrumentation.grpc._semconv import _apply_grpc_status from opentelemetry.instrumentation.grpc._client import ( OpenTelemetryClientInterceptor, _carrier_setter, ) from opentelemetry.instrumentation.utils import is_instrumentation_enabled from opentelemetry.propagate import inject -from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( - RPC_GRPC_STATUS_CODE, -) -from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) -def _unary_done_callback(span, code, details, response_hook): +def _unary_done_callback(span, code, details, response_hook, sem_conv_opt_in_mode): def callback(call): try: - span.set_attribute( - RPC_GRPC_STATUS_CODE, - code.value[0], - ) - if code != grpc.StatusCode.OK: - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=details, - ) - ) + _apply_grpc_status(span, code, trace.SpanKind.CLIENT, sem_conv_opt_in_mode, details) response_hook(span, details) - finally: span.end() @@ -73,21 +60,6 @@ def propagate_trace_in_details(client_call_details: ClientCallDetails): client_call_details.wait_for_ready, ) - @staticmethod - def add_error_details_to_span(span, exc): - if isinstance(exc, grpc.RpcError): - span.set_attribute( - RPC_GRPC_STATUS_CODE, - exc.code().value[0], - ) - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{type(exc).__name__}: {exc}", - ) - ) - span.record_exception(exc) - def _start_interceptor_span(self, method): # method _should_ be a string here but due to a bug in grpc, it is # populated with a bytes object. Handle both cases such that we @@ -116,7 +88,11 @@ async def _wrap_unary_response(self, continuation, span): call.add_done_callback( _unary_done_callback( - span, code, details, self._call_response_hook + span, + code, + details, + self._call_response_hook, + self._sem_conv_opt_in_mode, ) ) @@ -131,6 +107,8 @@ async def _wrap_stream_response(self, span, call): if self._response_hook: self._call_response_hook(span, response) yield response + code = await call.code() + _apply_grpc_status(span, code, trace.SpanKind.CLIENT, self._sem_conv_opt_in_mode) except Exception as exc: self.add_error_details_to_span(span, exc) raise exc diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 73a7c16c08..f460066ab5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -16,48 +16,34 @@ import grpc.aio import wrapt -from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( - RPC_GRPC_STATUS_CODE, +from opentelemetry import trace +from opentelemetry.instrumentation._semconv import _report_new +from opentelemetry.instrumentation.grpc._semconv import ( + _apply_grpc_status, + _apply_server_error, ) from ._server import OpenTelemetryServerInterceptor, _wrap_rpc_behavior -from ._utilities import _server_status # pylint:disable=abstract-method class _OpenTelemetryAioServicerContext(wrapt.ObjectProxy): - def __init__(self, servicer_context, active_span): + def __init__(self, servicer_context): super().__init__(servicer_context) - self._self_active_span = active_span - self._self_code = grpc.StatusCode.OK + self._self_code = None self._self_details = None async def abort(self, code, details="", trailing_metadata=tuple()): self._self_code = code self._self_details = details - self._self_active_span.set_attribute( - RPC_GRPC_STATUS_CODE, code.value[0] - ) - status = _server_status(code, details) - self._self_active_span.set_status(status) return await self.__wrapped__.abort(code, details, trailing_metadata) def set_code(self, code): self._self_code = code - details = self._self_details or code.value[1] - self._self_active_span.set_attribute( - RPC_GRPC_STATUS_CODE, code.value[0] - ) - if code != grpc.StatusCode.OK: - status = _server_status(code, details) - self._self_active_span.set_status(status) return self.__wrapped__.set_code(code) def set_details(self, details): self._self_details = details - if self._self_code != grpc.StatusCode.OK: - status = _server_status(self._self_code, details) - self._self_active_span.set_status(status) return self.__wrapped__.set_details(details) @@ -93,9 +79,16 @@ def telemetry_wrapper(behavior, request_streaming, response_streaming): handler_call_details, ) - next_handler = await continuation(handler_call_details) - - return _wrap_rpc_behavior(next_handler, telemetry_wrapper) + handler = await continuation(handler_call_details) + if handler is None: + if _report_new(self._sem_conv_opt_in_mode): + async def _unimplemented(_request, context): + self._handle_unimplemented(handler_call_details, context) + # TODO: I still don't like it, figure out how not to + # change server behavior. + return grpc.unary_unary_rpc_method_handler(_unimplemented) + return None + return _wrap_rpc_behavior(handler, telemetry_wrapper) def _intercept_aio_server_unary(self, behavior, handler_call_details): async def _unary_interceptor(request_or_iterator, context): @@ -105,20 +98,26 @@ async def _unary_interceptor(request_or_iterator, context): context, set_status_on_exception=False, ) as span: + self._set_peer_attributes(span, context) # wrap the context - context = _OpenTelemetryAioServicerContext(context, span) + context = _OpenTelemetryAioServicerContext( + context + ) # And now we run the actual RPC. try: - return await behavior(request_or_iterator, context) + result = await behavior(request_or_iterator, context) + _apply_grpc_status( + span, context._self_code, trace.SpanKind.SERVER, + self._sem_conv_opt_in_mode, context._self_details, + ) + return result except Exception as error: - # Bare exceptions are likely to be gRPC aborts, which - # we handle in our context wrapper. - # Here, we're interested in uncaught exceptions. - # pylint:disable=unidiomatic-typecheck - if type(error) != Exception: # noqa: E721 - span.record_exception(error) + _apply_server_error( + span, error, context._self_code, context._self_details, + self._sem_conv_opt_in_mode, + ) raise error return _unary_interceptor @@ -131,18 +130,26 @@ async def _stream_interceptor(request_or_iterator, context): context, set_status_on_exception=False, ) as span: - context = _OpenTelemetryAioServicerContext(context, span) + self._set_peer_attributes(span, context) + context = _OpenTelemetryAioServicerContext( + context + ) try: async for response in behavior( request_or_iterator, context ): yield response + _apply_grpc_status( + span, context._self_code, trace.SpanKind.SERVER, + self._sem_conv_opt_in_mode, context._self_details, + ) except Exception as error: - # pylint:disable=unidiomatic-typecheck - if type(error) != Exception: # noqa: E721 - span.record_exception(error) + _apply_server_error( + span, error, context._self_code, context._self_details, + self._sem_conv_opt_in_mode, + ) raise error return _stream_interceptor diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index bbeb4ec1d9..ba2a2e5a8c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -26,18 +26,18 @@ import grpc from opentelemetry import trace +from opentelemetry.instrumentation._semconv import _StabilityMode +from opentelemetry.instrumentation.grpc._semconv import ( + _add_error_details_to_span, + _apply_grpc_status, + _set_rpc_method, + _set_rpc_system, + _set_server_address_port, +) from opentelemetry.instrumentation.grpc import grpcext -from opentelemetry.instrumentation.grpc._utilities import RpcInfo from opentelemetry.instrumentation.utils import is_instrumentation_enabled from opentelemetry.propagate import inject from opentelemetry.propagators.textmap import Setter -from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( - RPC_GRPC_STATUS_CODE, - RPC_METHOD, - RPC_SERVICE, - RPC_SYSTEM, -) -from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) @@ -54,15 +54,12 @@ def set(self, carrier: MutableMapping[str, str], key: str, value: str): _carrier_setter = _CarrierSetter() -def _make_future_done_callback(span, rpc_info): +def _make_future_done_callback(span, sem_conv_opt_in_mode): def callback(response_future): with trace.use_span(span, end_on_exit=True): code = response_future.code() - if code != grpc.StatusCode.OK: - rpc_info.error = code - return - response = response_future.result() - rpc_info.response = response + details = response_future.details() + _apply_grpc_status(span, code, trace.SpanKind.CLIENT, sem_conv_opt_in_mode, details) return callback @@ -82,21 +79,31 @@ class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, + tracer, + filter_=None, + request_hook=None, + response_hook=None, + sem_conv_opt_in_mode=_StabilityMode.DEFAULT, + host=None, + port=None, ): self._tracer = tracer self._filter = filter_ self._request_hook = request_hook self._response_hook = response_hook + self._sem_conv_opt_in_mode = sem_conv_opt_in_mode + self._host = host + self._port = port + + def add_error_details_to_span(self, span, exc): + _add_error_details_to_span(span, exc, trace.SpanKind.CLIENT, self._sem_conv_opt_in_mode) def _start_span(self, method, **kwargs): - service, meth = method.lstrip("/").split("/", 1) - attributes = { - RPC_SYSTEM: "grpc", - RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], - RPC_METHOD: meth, - RPC_SERVICE: service, - } + attributes = {} + _set_rpc_system(attributes, "grpc", self._sem_conv_opt_in_mode) + _set_rpc_method(attributes, method, self._sem_conv_opt_in_mode) + _set_server_address_port(attributes, self._host, self._port, self._sem_conv_opt_in_mode) return self._tracer.start_as_current_span( name=method, @@ -105,24 +112,29 @@ def _start_span(self, method, **kwargs): **kwargs, ) - # pylint:disable=no-self-use - def _trace_result(self, span, rpc_info, result): + def _trace_result(self, span, result): # If the RPC is called asynchronously, add a callback to end the span # when the future is done, else end the span immediately if isinstance(result, grpc.Future): result.add_done_callback( - _make_future_done_callback(span, rpc_info) + _make_future_done_callback(span, self._sem_conv_opt_in_mode) ) return result - response = result # Handle the case when the RPC is initiated via the with_call - # method and the result is a tuple with the first element as the - # response. + # method and the result is a tuple of (response, call). # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call if isinstance(result, tuple): - response = result[0] - rpc_info.response = response - if self._response_hook: + response, call = result[0], result[1] + code = call.code() + details = call.details() + else: + # Defensive fallback: should not be reached when using grpcext + # interceptors (which always use with_call), keeping it just in case + response = result + code = grpc.StatusCode.OK + details = None + _apply_grpc_status(span, code, trace.SpanKind.CLIENT, self._sem_conv_opt_in_mode, details) + if self._response_hook and response is not None: self._call_response_hook(span, response) span.end() return result @@ -141,38 +153,17 @@ def _intercept(self, request, metadata, client_info, invoker): record_exception=False, set_status_on_exception=False, ) as span: - result = None try: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request, - ) if self._request_hook: self._call_request_hook(span, request) result = invoker(request, metadata) except Exception as exc: - if isinstance(exc, grpc.RpcError): - span.set_attribute( - RPC_GRPC_STATUS_CODE, - exc.code().value[0], - ) - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{type(exc).__name__}: {exc}", - ) - ) - span.record_exception(exc) - raise exc - finally: - if result is None: - span.end() - return self._trace_result(span, rpc_info, result) + self.add_error_details_to_span(span, exc) + span.end() + raise + return self._trace_result(span, result) def _call_request_hook(self, span, request): if not callable(self._request_hook): @@ -203,20 +194,13 @@ def _intercept_server_stream( with self._start_span(client_info.full_method) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - ) - - if client_info.is_client_stream: - rpc_info.request = request_or_iterator try: - yield from invoker(request_or_iterator, metadata) + call = invoker(request_or_iterator, metadata) + yield from call + _apply_grpc_status(span, call.code(), trace.SpanKind.CLIENT, self._sem_conv_opt_in_mode) except grpc.RpcError as err: - span.set_status(Status(StatusCode.ERROR)) - span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0]) + self.add_error_details_to_span(span, err) raise err def intercept_stream( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_semconv.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_semconv.py new file mode 100644 index 0000000000..26abfd0718 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_semconv.py @@ -0,0 +1,351 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""gRPC-specific semantic convention helpers for the stability migration.""" + +from typing import MutableMapping, Optional + +import grpc + +from opentelemetry import trace +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.instrumentation._semconv import ( + _StabilityMode, + _report_new, + _report_old, + set_int_attribute, + set_string_attribute, +) +from opentelemetry.semconv._incubating.attributes.net_attributes import ( + NET_PEER_IP, + NET_PEER_NAME, + NET_PEER_PORT, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_GRPC_STATUS_CODE, + RPC_METHOD, + RPC_SERVICE, + RPC_SYSTEM, +) +from opentelemetry.semconv.attributes.error_attributes import ( + ERROR_TYPE, + ErrorTypeValues, +) +from opentelemetry.semconv.attributes.network_attributes import ( + NETWORK_PEER_ADDRESS, + NETWORK_PEER_PORT, +) +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) + +from opentelemetry.util.types import AttributeValue + +# New stable RPC attribute names. Not yet published as stable constants in the +# opentelemetry-semantic-conventions package because the stable RPC conventions +# are still a work in progress. +RPC_SYSTEM_NAME = "rpc.system.name" +RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code" +RPC_METHOD_ORIGINAL = "rpc.method_original" + +# gRPC status codes that are considered errors on the server side under the +# new stable RPC conventions. See: +# https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md +_GRPC_SERVER_ERROR_STATUS_CODES = frozenset({ + grpc.StatusCode.UNKNOWN, + grpc.StatusCode.DEADLINE_EXCEEDED, + grpc.StatusCode.UNIMPLEMENTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.DATA_LOSS, +}) + + +def _apply_grpc_status( + span, + code: Optional[grpc.StatusCode], + span_kind: trace.SpanKind, + sem_conv_opt_in_mode: _StabilityMode, + description: Optional[str] = None, +) -> None: + """Set gRPC status code attributes and update the span status. + + Handles both old and new semantic convention modes. Error criterion: + + * Client (SpanKind.CLIENT): any non-OK code is an error. + * Server (SpanKind.SERVER): only the subset defined in + _GRPC_SERVER_ERROR_STATUS_CODES. + """ + if code is None: + code = grpc.StatusCode.OK + if _report_old(sem_conv_opt_in_mode): + span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0]) + if _report_new(sem_conv_opt_in_mode): + span.set_attribute(RPC_RESPONSE_STATUS_CODE, code.name) + + is_error = ( + code in _GRPC_SERVER_ERROR_STATUS_CODES + if span_kind == trace.SpanKind.SERVER + else code != grpc.StatusCode.OK + ) + + if is_error: + if _report_new(sem_conv_opt_in_mode): + span.set_attribute(ERROR_TYPE, code.name if code else ErrorTypeValues.OTHER) + status_description = f"{code}:{description}" if description else str(code) + span.set_status(Status(StatusCode.ERROR, description=status_description)) + + +def _add_error_details_to_span(span, exc, span_kind, sem_conv_opt_in_mode): + """Record exception details on a span. + + Sets the span status to ERROR with a description, sets error.type + (new semconv only), and records the exception (old semconv only). + + For grpc.RpcError, delegates to _apply_grpc_status which handles + the gRPC status code attributes. For all other exceptions, sets + error.type to the fully-qualified exception class name. + """ + if _report_new(sem_conv_opt_in_mode): + description = str(exc) + else: + description = f"{type(exc).__name__}: {exc}" + + if isinstance(exc, grpc.RpcError): + _apply_grpc_status(span, exc.code(), span_kind, sem_conv_opt_in_mode, description) + else: + span.set_status(Status(StatusCode.ERROR, description=description)) + if _report_new(sem_conv_opt_in_mode): + span.set_attribute(ERROR_TYPE, type(exc).__qualname__) + + if _report_old(sem_conv_opt_in_mode): + span.record_exception(exc) + + +def _apply_server_error(span, exc, code, details, sem_conv_opt_in_mode): + """Handle span status for a server-side exception. + + If a gRPC status code was explicitly set (via abort/set_code), apply it. + Otherwise record the unexpected exception details. + """ + if code is not None: + _apply_grpc_status(span, code, trace.SpanKind.SERVER, sem_conv_opt_in_mode, details) + else: + _add_error_details_to_span(span, exc, trace.SpanKind.SERVER, sem_conv_opt_in_mode) + + +def _set_rpc_system( + result: MutableMapping[str, AttributeValue], + system: str, + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set rpc.system (old) or rpc.system.name (new).""" + if _report_old(sem_conv_opt_in_mode): + result[RPC_SYSTEM] = system + if _report_new(sem_conv_opt_in_mode): + result[RPC_SYSTEM_NAME] = system + + +def _set_rpc_method( + result: MutableMapping[str, AttributeValue], + full_method: str, + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set rpc.method (and rpc.service in old mode) from a full method path. + + ``full_method`` is the raw gRPC path, e.g. ``/helloworld.Greeter/SayHello`` + (leading slash is stripped automatically). + + Old: rpc.method = bare method name, rpc.service = package-qualified service + New: rpc.method = fully-qualified "Service/Method" (no rpc.service) + """ + if _report_new(sem_conv_opt_in_mode): + set_string_attribute(result, RPC_METHOD, full_method) + if _report_old(sem_conv_opt_in_mode): + stripped = full_method.lstrip("/") + if "/" in stripped: + service, method = stripped.split("/", 1) + set_string_attribute(result, RPC_METHOD, method) + set_string_attribute(result, RPC_SERVICE, service) + else: + set_string_attribute(result, RPC_METHOD, stripped) + + +def _set_rpc_grpc_status_code( + result: MutableMapping[str, AttributeValue], + code_int: int, + code_str: str, + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set gRPC status code attribute. + + Old: rpc.grpc.status_code (integer, e.g. 0 for OK) + New: rpc.response.status_code (string, e.g. "OK"); non-OK also sets error.type + """ + if _report_old(sem_conv_opt_in_mode): + result[RPC_GRPC_STATUS_CODE] = code_int + if _report_new(sem_conv_opt_in_mode): + result[RPC_RESPONSE_STATUS_CODE] = code_str + if code_int != 0: # non-OK status + result[ERROR_TYPE] = code_str + + +def _set_rpc_peer_ip_server( + result: MutableMapping[str, AttributeValue], + ip: str, + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set the client's IP address on a gRPC server span. + + Old: net.peer.ip + New: network.peer.address + """ + if _report_old(sem_conv_opt_in_mode): + set_string_attribute(result, NET_PEER_IP, ip) + if _report_new(sem_conv_opt_in_mode): + set_string_attribute(result, NETWORK_PEER_ADDRESS, ip) + + +def _set_rpc_peer_port_server( + result: MutableMapping[str, AttributeValue], + port: str, + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set the client's port on a gRPC server span. + + Old: net.peer.port (string) + New: network.peer.port (int) + """ + if _report_old(sem_conv_opt_in_mode): + result[NET_PEER_PORT] = port + if _report_new(sem_conv_opt_in_mode): + set_int_attribute(result, NETWORK_PEER_PORT, port) + + +def _set_rpc_peer_name_server( + result: MutableMapping[str, AttributeValue], + name: str, + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set the client's hostname on a gRPC server span. + + Old: net.peer.name + New: network.peer.address (only if not already set by _set_rpc_peer_ip_server) + """ + if _report_old(sem_conv_opt_in_mode): + set_string_attribute(result, NET_PEER_NAME, name) + if _report_new(sem_conv_opt_in_mode): + if not result.get(NETWORK_PEER_ADDRESS): + set_string_attribute(result, NETWORK_PEER_ADDRESS, name) + + +def _parse_grpc_target(target: str) -> tuple: + """Parse a gRPC target string into a ``(server.address, server.port)`` 2-tuple. + + Follows the OTel RPC semantic convention for ``server.address``/ + ``server.port`` on client spans: + + * ``"host:port"`` / ``"[ipv6]:port"`` — bare DNS address, parse normally + * ``"dns://[resolver]/host:port"`` — DNS URI, skip resolver, parse endpoint + * ``"unix:path"`` / ``"unix:///path"`` — socket path becomes + ``server.address``, no port + * ``"ipv4:..."`` / ``"ipv6:..."`` — ``scheme:addr`` format (no ``//``), + may list multiple addresses; whole target becomes ``server.address``, no port + * ``"scheme://..."`` with an unknown scheme (e.g. ``"zk://..."``) — whole + target becomes ``server.address``, no port + + Returns ``(address, port)`` where *port* is an :class:`int` or ``None``. + """ + if not target: + return None, None + + # Unix domain sockets: server.address = the socket path, no port. + # Handles both "unix:path" and "unix:///path" (URI form). + if target.startswith("unix:") or target.startswith("unix-abstract:"): + prefix = "unix:" if target.startswith("unix:") else "unix-abstract:" + path = target[len(prefix):] + # Strip the authority component from URI form ("//[authority]/path") + if path.startswith("//"): + slash = path.find("/", 2) # first "/" after the leading "//" + path = path[slash:] if slash != -1 else "" + return path or None, None + + # ipv4/ipv6 scheme can list multiple addresses; no single low-cardinality + # identifier can be derived → return the whole target string, no port. + if target.startswith("ipv4:") or target.startswith("ipv6:"): + return target, None + + # URI form: scheme://[authority]/endpoint + if "://" in target: + scheme, _, rest = target.partition("://") + if scheme == "dns": + # dns://[resolver]/host:port — endpoint follows the authority + slash = rest.find("/") + endpoint = rest[slash + 1:] if slash != -1 else rest + return _parse_host_port(endpoint) + # Unknown URI scheme — cannot determine a low-cardinality identifier. + return target, None + + # Plain "host:port" (implicit DNS) + return _parse_host_port(target) + + +def _parse_host_port(target: str) -> tuple: + """Parse a bare ``"host:port"`` or ``"[ipv6]:port"`` string.""" + if not target: + return None, None + + # IPv6 literal: "[::1]:50051" + if target.startswith("["): + bracket_end = target.find("]") + if bracket_end == -1: + return None, None + host = target[1:bracket_end] + remainder = target[bracket_end + 1:] + if remainder.startswith(":"): + try: + return host, int(remainder[1:]) + except ValueError: + pass + return host, None + + # Plain "host:port" or bare "host" + if ":" in target: + host, _, port_str = target.rpartition(":") + try: + return host, int(port_str) + except ValueError: + pass + + return target, None + + +def _set_server_address_port( + result: MutableMapping[str, AttributeValue], + host: Optional[str], + port: Optional[int], + sem_conv_opt_in_mode: _StabilityMode, +) -> None: + """Set server address/port attributes on a gRPC client span. + + New only: server.address + server.port + (The old implementation does not set these.) + """ + if not _report_new(sem_conv_opt_in_mode): + return + if host: + result[SERVER_ADDRESS] = host + if port is not None: + result[SERVER_PORT] = port diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 1d744656de..408370bf05 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -29,20 +29,22 @@ from opentelemetry import trace from opentelemetry.context import attach, detach -from opentelemetry.propagate import extract -from opentelemetry.semconv._incubating.attributes.net_attributes import ( - NET_PEER_IP, - NET_PEER_NAME, - NET_PEER_PORT, +from opentelemetry.instrumentation._semconv import ( + _StabilityMode, + _report_old, + _report_new, ) -from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( - RPC_GRPC_STATUS_CODE, - RPC_METHOD, - RPC_SERVICE, - RPC_SYSTEM, +from opentelemetry.instrumentation.grpc._semconv import ( + RPC_METHOD_ORIGINAL, + _apply_grpc_status, + _apply_server_error, + _set_rpc_method, + _set_rpc_peer_ip_server, + _set_rpc_peer_name_server, + _set_rpc_peer_port_server, + _set_rpc_system, ) - -from ._utilities import _server_status +from opentelemetry.propagate import extract logger = logging.getLogger(__name__) @@ -77,10 +79,9 @@ def _wrap_rpc_behavior(handler, continuation): # pylint:disable=abstract-method class _OpenTelemetryServicerContext(grpc.ServicerContext): - def __init__(self, servicer_context, active_span): + def __init__(self, servicer_context): self._servicer_context = servicer_context - self._active_span = active_span - self._code = grpc.StatusCode.OK + self._code = None self._details = None super().__init__() @@ -132,12 +133,11 @@ def trailing_metadata(self): def abort(self, code, details): self._code = code self._details = details - self._active_span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0]) - status = _server_status(code, details) - self._active_span.set_status(status) return self._servicer_context.abort(code, details) def abort_with_status(self, status): + self._code = status.code + self._details = status.details return self._servicer_context.abort_with_status(status) def code(self): @@ -157,19 +157,10 @@ def details(self): def set_code(self, code): self._code = code - # use details if we already have it, otherwise the status description - details = self._details or code.value[1] - self._active_span.set_attribute(RPC_GRPC_STATUS_CODE, code.value[0]) - if code != grpc.StatusCode.OK: - status = _server_status(code, details) - self._active_span.set_status(status) return self._servicer_context.set_code(code) def set_details(self, details): self._details = details - if self._code != grpc.StatusCode.OK: - status = _server_status(self._code, details) - self._active_span.set_status(status) return self._servicer_context.set_details(details) @@ -195,9 +186,15 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): """ - def __init__(self, tracer, filter_=None): + def __init__( + self, + tracer, + filter_=None, + sem_conv_opt_in_mode=_StabilityMode.DEFAULT, + ): self._tracer = tracer self._filter = filter_ + self._sem_conv_opt_in_mode = sem_conv_opt_in_mode @contextmanager def _set_remote_context(self, servicer_context): @@ -217,59 +214,22 @@ def _set_remote_context(self, servicer_context): def _start_span( self, handler_call_details, context, set_status_on_exception=False ): - # standard attributes - attributes = { - RPC_SYSTEM: "grpc", - RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], - } + attributes = {} + + _set_rpc_system(attributes, "grpc", self._sem_conv_opt_in_mode) - # if we have details about the call, split into service and method if handler_call_details.method: - service, method = handler_call_details.method.lstrip("/").split( - "/", 1 + _set_rpc_method( + attributes, + handler_call_details.method, + self._sem_conv_opt_in_mode, ) - attributes.update( - { - RPC_METHOD: method, - RPC_SERVICE: service, - } - ) - - # add some attributes from the metadata - metadata = dict(context.invocation_metadata()) - if "user-agent" in metadata: - attributes["rpc.user_agent"] = metadata["user-agent"] - - # Split up the peer to keep with how other telemetry sources - # do it. This looks like: - # * ipv6:[::1]:57284 - # * ipv4:127.0.0.1:57284 - # * ipv4:10.2.1.1:57284,127.0.0.1:57284 - # - if not context.peer().startswith("unix:"): - try: - ip, port = ( - context.peer() - .split(",")[0] - .split(":", 1)[1] - .rsplit(":", 1) - ) - ip = unquote(ip) - attributes.update( - { - NET_PEER_IP: ip, - NET_PEER_PORT: port, - } - ) - # other telemetry sources add this, so we will too - if ip in ("[::1]", "127.0.0.1"): - attributes[NET_PEER_NAME] = "localhost" - - except IndexError: - logger.warning( - "Failed to parse peer address '%s'", context.peer() - ) + if _report_old(self._sem_conv_opt_in_mode): + # add some attributes from the metadata + metadata = dict(context.invocation_metadata()) + if "user-agent" in metadata: + attributes["rpc.user_agent"] = metadata["user-agent"] return self._tracer.start_as_current_span( name=handler_call_details.method, @@ -278,6 +238,51 @@ def _start_span( set_status_on_exception=set_status_on_exception, ) + def _handle_unimplemented(self, handler_call_details, context): + with self._set_remote_context(context): + attributes = {} + _set_rpc_system(attributes, "grpc", self._sem_conv_opt_in_mode) + attributes["rpc.method"] = "_OTHER" + if handler_call_details.method: + attributes[RPC_METHOD_ORIGINAL] = handler_call_details.method + with self._tracer.start_as_current_span( + name="_OTHER", + kind=trace.SpanKind.SERVER, + attributes=attributes, + ) as span: + self._set_peer_attributes(span, context) + _apply_grpc_status( + span, + grpc.StatusCode.UNIMPLEMENTED, + trace.SpanKind.SERVER, + self._sem_conv_opt_in_mode, + ) + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + + def _set_peer_attributes(self, span, context): + # Split up the peer to keep with how other telemetry sources + # do it. This looks like: + # * ipv6:[::1]:57284 + # * ipv4:127.0.0.1:57284 + # * ipv4:10.2.1.1:57284,127.0.0.1:57284 + if not span.is_recording(): + return + peer = context.peer() + if peer.startswith("unix:"): + return + try: + ip, port = peer.split(",")[0].split(":", 1)[1].rsplit(":", 1) + ip = unquote(ip) + attrs = {} + _set_rpc_peer_ip_server(attrs, ip, self._sem_conv_opt_in_mode) + _set_rpc_peer_port_server(attrs, port, self._sem_conv_opt_in_mode) + if ip in ("[::1]", "127.0.0.1"): + _set_rpc_peer_name_server(attrs, "localhost", self._sem_conv_opt_in_mode) + for k, v in attrs.items(): + span.set_attribute(k, v) + except IndexError: + logger.warning("Failed to parse peer address '%s'", peer) + def intercept_service(self, continuation, handler_call_details): if self._filter is not None and not self._filter(handler_call_details): return continuation(handler_call_details) @@ -299,27 +304,37 @@ def telemetry_interceptor(request_or_iterator, context): context, set_status_on_exception=False, ) as span: + self._set_peer_attributes(span, context) # wrap the context - context = _OpenTelemetryServicerContext(context, span) + context = _OpenTelemetryServicerContext( + context + ) # And now we run the actual RPC. try: - return behavior(request_or_iterator, context) - + result = behavior(request_or_iterator, context) + _apply_grpc_status( + span, context._code, trace.SpanKind.SERVER, + self._sem_conv_opt_in_mode, context._details, + ) + return result except Exception as error: - # Bare exceptions are likely to be gRPC aborts, which - # we handle in our context wrapper. - # Here, we're interested in uncaught exceptions. - # pylint:disable=unidiomatic-typecheck - if type(error) != Exception: # noqa: E721 - span.record_exception(error) + _apply_server_error( + span, error, context._code, context._details, + self._sem_conv_opt_in_mode, + ) raise error return telemetry_interceptor - return _wrap_rpc_behavior( - continuation(handler_call_details), telemetry_wrapper - ) + handler = continuation(handler_call_details) + if handler is None: + if _report_new(self._sem_conv_opt_in_mode): + def _unimplemented(_request, context): + self._handle_unimplemented(handler_call_details, context) + return grpc.unary_unary_rpc_method_handler(_unimplemented) + return None + return _wrap_rpc_behavior(handler, telemetry_wrapper) # Handle streaming responses separately - we have to do this # to return a *new* generator or various upstream things @@ -331,13 +346,21 @@ def _intercept_server_stream( with self._start_span( handler_call_details, context, set_status_on_exception=False ) as span: - context = _OpenTelemetryServicerContext(context, span) + self._set_peer_attributes(span, context) + context = _OpenTelemetryServicerContext( + context + ) try: yield from behavior(request_or_iterator, context) - + _apply_grpc_status( + span, context._code, trace.SpanKind.SERVER, + self._sem_conv_opt_in_mode, context._details, + ) except Exception as error: - # pylint:disable=unidiomatic-typecheck - if type(error) != Exception: # noqa: E721 - span.record_exception(error) + _apply_server_error( + span, error, context._code, context._details, + self._sem_conv_opt_in_mode, + ) raise error + diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py deleted file mode 100644 index 8a6365b742..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Internal utilities.""" - -import grpc - -from opentelemetry.trace.status import Status, StatusCode - - -class RpcInfo: - def __init__( - self, - full_method=None, - metadata=None, - timeout=None, - request=None, - response=None, - error=None, - ): - self.full_method = full_method - self.metadata = metadata - self.timeout = timeout - self.request = request - self.response = response - self.error = error - - -def _server_status(code, details): - error_status = Status( - status_code=StatusCode.ERROR, description=f"{code}:{details}" - ) - status_codes = { - grpc.StatusCode.UNKNOWN: error_status, - grpc.StatusCode.DEADLINE_EXCEEDED: error_status, - grpc.StatusCode.UNIMPLEMENTED: error_status, - grpc.StatusCode.INTERNAL: error_status, - grpc.StatusCode.UNAVAILABLE: error_status, - grpc.StatusCode.DATA_LOSS: error_status, - } - - return status_codes.get( - code, Status(status_code=StatusCode.UNSET, description="") - ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py index c7eec06c99..5e223a1121 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -165,7 +165,7 @@ def __call__( compression=None, ): def invoker(request_iterator, metadata): - return self._base_callable( + return self._base_callable.with_call( request_iterator, timeout, metadata, @@ -175,9 +175,10 @@ def invoker(request_iterator, metadata): ) client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( + result = self._interceptor.intercept_stream( request_iterator, metadata, client_info, invoker ) + return result[0] if isinstance(result, tuple) else result def with_call( self, diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py index 6c0b8eac21..1eda9002f9 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py @@ -36,22 +36,28 @@ def request_messages(): return await stub.ClientStreamingMethod(request_messages()) -def server_streaming_method(stub, error=False): - request = Request( - client_id=CLIENT_ID, request_data="error" if error else "data" - ) - +def server_streaming_method(stub, error=False, error_mid_stream=False): + if error: + request_data = "error" + elif error_mid_stream: + request_data = "error_mid_stream" + else: + request_data = "data" + request = Request(client_id=CLIENT_ID, request_data=request_data) return stub.ServerStreamingMethod(request, metadata=(("key", "value"),)) -def bidirectional_streaming_method(stub, error=False): - # create a generator +def bidirectional_streaming_method(stub, error=False, error_mid_stream=False): + if error: + request_data = "error" + elif error_mid_stream: + request_data = "error_mid_stream" + else: + request_data = "data" + def request_messages(): for _ in range(5): - request = Request( - client_id=CLIENT_ID, request_data="error" if error else "data" - ) - yield request + yield Request(client_id=CLIENT_ID, request_data=request_data) return stub.BidirectionalStreamingMethod( request_messages(), metadata=(("key", "value"),) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index 67e7d0a625..edb5213bba 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -45,23 +45,31 @@ def request_messages(): ) -def server_streaming_method(stub, error=False): - request = Request( - client_id=CLIENT_ID, request_data="error" if error else "data" - ) +def server_streaming_method(stub, error=False, error_mid_stream=False): + if error: + request_data = "error" + elif error_mid_stream: + request_data = "error_mid_stream" + else: + request_data = "data" + request = Request(client_id=CLIENT_ID, request_data=request_data) response_iterator = stub.ServerStreamingMethod( request, metadata=(("key", "value"),) ) list(response_iterator) -def bidirectional_streaming_method(stub, error=False): +def bidirectional_streaming_method(stub, error=False, error_mid_stream=False): + if error: + request_data = "error" + elif error_mid_stream: + request_data = "error_mid_stream" + else: + request_data = "data" + def request_messages(): for _ in range(5): - request = Request( - client_id=CLIENT_ID, request_data="error" if error else "data" - ) - yield request + yield Request(client_id=CLIENT_ID, request_data=request_data) response_iterator = stub.BidirectionalStreamingMethod( request_messages(), metadata=(("key", "value"),) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py index 3c2e7a09f9..e103a20401 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py @@ -50,17 +50,20 @@ def ServerStreamingMethod(self, request, context): code=grpc.StatusCode.INVALID_ARGUMENT, details="server stream error", ) - return test_server_pb2.Response() - # create a generator - def response_messages(): - for _ in range(5): - response = test_server_pb2.Response( - server_id=SERVER_ID, response_data="data" - ) - yield response + if request.request_data == "error_mid_stream": + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="server stream error mid-stream", + ) - return response_messages() + for _ in range(5): + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) def BidirectionalStreamingMethod(self, request_iterator, context): data = list(request_iterator) @@ -71,6 +74,16 @@ def BidirectionalStreamingMethod(self, request_iterator, context): ) return + if data[0].request_data == "error_mid_stream": + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + context.abort( + code=grpc.StatusCode.INVALID_ARGUMENT, + details="bidirectional error mid-stream", + ) + return + for _ in range(5): yield test_server_pb2.Response( server_id=SERVER_ID, response_data="data" diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py index 850ad79661..e3de0808f5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from unittest import IsolatedAsyncioTestCase +from unittest import IsolatedAsyncioTestCase, mock import grpc import opentelemetry.instrumentation.grpc from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) from opentelemetry.instrumentation.grpc import ( GrpcAioInstrumentorClient, aio_client_interceptors, @@ -24,6 +28,10 @@ from opentelemetry.instrumentation.grpc._aio_client import ( UnaryUnaryAioClientInterceptor, ) +from opentelemetry.instrumentation.grpc._semconv import ( + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, +) from opentelemetry.instrumentation.utils import suppress_instrumentation from opentelemetry.propagate import get_global_textmap, set_global_textmap from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( @@ -32,9 +40,27 @@ RPC_SERVICE, RPC_SYSTEM, ) +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE from opentelemetry.test.mock_textmap import MockTextMapPropagator from opentelemetry.test.test_base import TestBase + +def _new_aio_client_rpc_attrs(full_method, status_name="OK", error_type=None): + """Build expected new-semconv attributes for an aio client RPC span. + + Note: server.address / server.port are not asserted here because + aio_client_interceptors() is constructed without the channel target, + so those attributes are not available to the interceptor. + """ + attrs = { + RPC_SYSTEM_NAME: "grpc", + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_name, + } + if error_type: + attrs[ERROR_TYPE] = error_type + return attrs + from ._aio_client import ( bidirectional_streaming_method, client_streaming_method, @@ -43,6 +69,7 @@ ) from ._server import create_test_server from .protobuf import test_server_pb2_grpc # pylint: disable=no-name-in-module +from .protobuf.test_server_pb2 import Request class RecordingInterceptor(grpc.aio.UnaryUnaryClientInterceptor): @@ -58,6 +85,17 @@ async def intercept_unary_unary( class TestAioClientInterceptor(TestBase, IsolatedAsyncioTestCase): def setUp(self): super().setUp() + test_name = self._testMethodName if hasattr(self, "_testMethodName") else "" + sem_conv_mode = "default" + if "new_semconv" in test_name: + sem_conv_mode = "rpc" + elif "both_semconv" in test_name: + sem_conv_mode = "rpc/dup" + self.env_patch = mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: sem_conv_mode} + ) + self.env_patch.start() + _OpenTelemetrySemanticConventionStability._initialized = False self.server = create_test_server(25565) self.server.start() @@ -71,6 +109,8 @@ def setUp(self): def tearDown(self): super().tearDown() self.server.stop(1000) + self.env_patch.stop() + _OpenTelemetrySemanticConventionStability._initialized = False async def asyncTearDown(self): await self._channel.close() @@ -265,6 +305,279 @@ async def test_error_stream_stream(self): trace.StatusCode.ERROR, ) + async def test_error_unary_stream_mid_stream(self): + with self.assertRaises(grpc.RpcError): + async for _ in server_streaming_method( + self._stub, error_mid_stream=True + ): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "ServerStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.INVALID_ARGUMENT.value[0], + }, + ) + + async def test_error_stream_stream_mid_stream(self): + with self.assertRaises(grpc.RpcError): + async for _ in bidirectional_streaming_method( + self._stub, error_mid_stream=True + ): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "BidirectionalStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.INVALID_ARGUMENT.value[0], + }, + ) + + async def test_unimplemented(self): + """Check that calling an unregistered method creates a span with UNIMPLEMENTED status.""" + request = Request(client_id=1, request_data="data") + with self.assertRaises(grpc.RpcError) as cm: + await self._channel.unary_unary( + "/GRPCTestServer/UnimplementedMethod" + )(request.SerializeToString()) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "UnimplementedMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNIMPLEMENTED.value[0], + }, + ) + + # --- new semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc) tests --- + + async def test_unary_unary_new_semconv(self): + response = await simple_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + self.assertSpanHasAttributes( + span, _new_aio_client_rpc_attrs("/GRPCTestServer/SimpleMethod") + ) + + async def test_unary_stream_new_semconv(self): + async for _ in server_streaming_method(self._stub): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs("/GRPCTestServer/ServerStreamingMethod"), + ) + + async def test_stream_unary_new_semconv(self): + await client_streaming_method(self._stub) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs("/GRPCTestServer/ClientStreamingMethod"), + ) + + async def test_stream_stream_new_semconv(self): + async for _ in bidirectional_streaming_method(self._stub): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/BidirectionalStreamingMethod" + ), + ) + + async def test_error_simple_new_semconv(self): + with self.assertRaises(grpc.RpcError): + await simple_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/SimpleMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + async def test_error_unary_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + async for _ in server_streaming_method(self._stub, error=True): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/ServerStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + async def test_error_stream_unary_new_semconv(self): + with self.assertRaises(grpc.RpcError): + await client_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/ClientStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + async def test_error_stream_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + async for _ in bidirectional_streaming_method(self._stub, error=True): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/BidirectionalStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + async def test_error_unary_stream_mid_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + async for _ in server_streaming_method( + self._stub, error_mid_stream=True + ): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/ServerStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + async def test_error_stream_stream_mid_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + async for _ in bidirectional_streaming_method( + self._stub, error_mid_stream=True + ): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/BidirectionalStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + async def test_unimplemented_new_semconv(self): + """Check that calling an unregistered method records UNIMPLEMENTED in new semconv.""" + request = Request(client_id=1, request_data="data") + with self.assertRaises(grpc.RpcError) as cm: + await self._channel.unary_unary( + "/GRPCTestServer/UnimplementedMethod" + )(request.SerializeToString()) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_aio_client_rpc_attrs( + "/GRPCTestServer/UnimplementedMethod", + status_name="UNIMPLEMENTED", + error_type="UNIMPLEMENTED", + ), + ) + + # --- both semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc/dup) tests --- + + async def test_unary_unary_both_semconv(self): + """Verify that dup mode reports both old and new semconv attributes.""" + response = await simple_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertSpanHasAttributes( + span, + { + # old semconv attributes + RPC_SYSTEM: "grpc", + RPC_METHOD: "SimpleMethod", # old value wins in dup mode + RPC_SERVICE: "GRPCTestServer", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + # new semconv attributes + RPC_SYSTEM_NAME: "grpc", + RPC_RESPONSE_STATUS_CODE: "OK", + }, + ) + # pylint:disable=no-self-use async def test_client_interceptor_trace_context_propagation(self): """ensure that client interceptor correctly inject trace context into all outgoing requests.""" diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index e9565f8f8f..3f92ec81bf 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -12,17 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from unittest import IsolatedAsyncioTestCase +from unittest import IsolatedAsyncioTestCase, mock import grpc import grpc.aio import opentelemetry.instrumentation.grpc from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) from opentelemetry.instrumentation.grpc import ( GrpcAioInstrumentorServer, aio_server_interceptor, ) +from opentelemetry.instrumentation.grpc._semconv import ( + RPC_METHOD_ORIGINAL, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, +) from opentelemetry.sdk import trace as trace_sdk from opentelemetry.semconv._incubating.attributes.net_attributes import ( NET_PEER_IP, @@ -34,6 +43,10 @@ RPC_SERVICE, RPC_SYSTEM, ) +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE +from opentelemetry.semconv.attributes.network_attributes import ( + NETWORK_PEER_ADDRESS, +) from opentelemetry.test.test_base import TestBase from opentelemetry.trace import StatusCode @@ -87,6 +100,41 @@ async def run_with_test_server( class TestOpenTelemetryAioServerInterceptor(TestBase, IsolatedAsyncioTestCase): + new_semconv_net_peer_span_attributes = { + NETWORK_PEER_ADDRESS: "[::1]", + } + + def setUp(self): + super().setUp() + test_name = self._testMethodName if hasattr(self, "_testMethodName") else "" + sem_conv_mode = "default" + if "new_semconv" in test_name: + sem_conv_mode = "rpc" + elif "both_semconv" in test_name: + sem_conv_mode = "rpc/dup" + self.env_patch = mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: sem_conv_mode} + ) + self.env_patch.start() + _OpenTelemetrySemanticConventionStability._initialized = False + + def tearDown(self): + super().tearDown() + self.env_patch.stop() + _OpenTelemetrySemanticConventionStability._initialized = False + + def _new_server_rpc_attrs(self, full_method, status_name="OK", error_type=None): + """Build expected new-semconv attributes for a server RPC span.""" + attrs = { + **self.new_semconv_net_peer_span_attributes, + RPC_SYSTEM_NAME: "grpc", + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_name, + } + if error_type: + attrs[ERROR_TYPE] = error_type + return attrs + async def test_instrumentor(self): """Check that automatic instrumentation configures the interceptor""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -619,6 +667,98 @@ async def request(channel): }, ) + async def test_error_streaming_mid_stream(self): + """Check that an error raised mid-stream ends the span with error status.""" + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + + class MidStreamErrorServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def ServerStreamingMethod(self, request, context): + yield Response(server_id=1, response_data="first") + await context.abort(grpc.StatusCode.INTERNAL, "mid-stream error") + + testcase = self + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + with testcase.assertRaises(grpc.RpcError) as cm: + async for _ in channel.unary_stream(rpc_call)(msg): + pass + testcase.assertEqual(cm.exception.code(), grpc.StatusCode.INTERNAL) + + await run_with_test_server( + request, + servicer=MidStreamErrorServicer(), + interceptors=[aio_server_interceptor()], + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + NET_PEER_IP: "[::1]", + NET_PEER_NAME: "localhost", + RPC_METHOD: "ServerStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.INTERNAL.value[0], + }, + ) + + async def test_unimplemented(self): + """Check that calling an unimplemented method creates a span with UNIMPLEMENTED status.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + class UnimplementedServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + await context.abort( + grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!" + ) + + testcase = self + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + with testcase.assertRaises(grpc.RpcError) as cm: + await channel.unary_unary(rpc_call)(msg) + testcase.assertEqual( + cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED + ) + + await run_with_test_server( + request, + servicer=UnimplementedServicer(), + interceptors=[aio_server_interceptor()], + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + NET_PEER_IP: "[::1]", + NET_PEER_NAME: "localhost", + RPC_METHOD: "SimpleMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNIMPLEMENTED.value[0], + }, + ) + async def test_non_list_interceptors(self): """Check that we handle non-list interceptors correctly.""" @@ -672,6 +812,226 @@ async def intercept_service( ) + # --- new semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc) tests --- + + async def test_create_span_new_semconv(self): + """Check that the interceptor records new-semconv attributes on a unary span.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + async def request(channel): + req = Request(client_id=1, request_data="test") + msg = req.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + await run_with_test_server(request, interceptors=[aio_server_interceptor()]) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertSpanHasAttributes(span, self._new_server_rpc_attrs(rpc_call)) + + async def test_create_span_streaming_new_semconv(self): + """Check that the interceptor records new-semconv attributes on a streaming span.""" + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + + async def request(channel): + req = Request(client_id=1, request_data="test") + msg = req.SerializeToString() + async for _ in channel.unary_stream(rpc_call)(msg): + pass + + await run_with_test_server(request, interceptors=[aio_server_interceptor()]) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertSpanHasAttributes(span, self._new_server_rpc_attrs(rpc_call)) + + async def test_abort_new_semconv(self): + """Check that abort sets new-semconv error attributes correctly. + + INTERNAL is a server error → error.type set; FAILED_PRECONDITION is not → no error.type. + """ + rpc_call = "/GRPCTestServer/SimpleMethod" + failure_message = "failure message" + + class InternalAbortServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + await context.abort(grpc.StatusCode.INTERNAL, failure_message) + + testcase = self + + async def request(channel): + req = Request(client_id=1, request_data=failure_message) + msg = req.SerializeToString() + with testcase.assertRaises(grpc.RpcError): + await channel.unary_unary(rpc_call)(msg) + + await run_with_test_server( + request, + servicer=InternalAbortServicer(), + interceptors=[aio_server_interceptor()], + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + self._new_server_rpc_attrs( + rpc_call, + status_name="INTERNAL", + error_type="INTERNAL", + ), + ) + + async def test_error_streaming_mid_stream_new_semconv(self): + """Check that a mid-stream error sets new-semconv error attributes.""" + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + + class MidStreamErrorServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def ServerStreamingMethod(self, request, context): + yield Response(server_id=1, response_data="first") + await context.abort(grpc.StatusCode.INTERNAL, "mid-stream error") + + testcase = self + + async def request(channel): + req = Request(client_id=1, request_data="test") + msg = req.SerializeToString() + with testcase.assertRaises(grpc.RpcError) as cm: + async for _ in channel.unary_stream(rpc_call)(msg): + pass + testcase.assertEqual(cm.exception.code(), grpc.StatusCode.INTERNAL) + + await run_with_test_server( + request, + servicer=MidStreamErrorServicer(), + interceptors=[aio_server_interceptor()], + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + self._new_server_rpc_attrs( + rpc_call, + status_name="INTERNAL", + error_type="INTERNAL", + ), + ) + + async def test_unimplemented_new_semconv(self): + """Check that an unimplemented handler records UNIMPLEMENTED in new semconv.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + class UnimplementedServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + await context.abort(grpc.StatusCode.UNIMPLEMENTED, "not implemented") + + testcase = self + + async def request(channel): + req = Request(client_id=1, request_data="test") + msg = req.SerializeToString() + with testcase.assertRaises(grpc.RpcError) as cm: + await channel.unary_unary(rpc_call)(msg) + testcase.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + + await run_with_test_server( + request, + servicer=UnimplementedServicer(), + interceptors=[aio_server_interceptor()], + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + self._new_server_rpc_attrs( + rpc_call, + status_name="UNIMPLEMENTED", + error_type="UNIMPLEMENTED", + ), + ) + + async def test_unimplemented_unknown_method_new_semconv(self): + """In new-semconv mode, a totally unknown method produces an _OTHER span.""" + rpc_call = "/GRPCTestServer/UnknownMethod" + + testcase = self + + async def request(channel): + req = Request(client_id=1, request_data="test") + msg = req.SerializeToString() + with testcase.assertRaises(grpc.RpcError) as cm: + await channel.unary_unary(rpc_call)(msg) + testcase.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + + await run_with_test_server(request, interceptors=[aio_server_interceptor()]) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "_OTHER") + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + **self.new_semconv_net_peer_span_attributes, + RPC_SYSTEM_NAME: "grpc", + "rpc.method": "_OTHER", + RPC_METHOD_ORIGINAL: rpc_call, + RPC_RESPONSE_STATUS_CODE: "UNIMPLEMENTED", + ERROR_TYPE: "UNIMPLEMENTED", + }, + ) + + # --- both semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc/dup) tests --- + + async def test_create_span_both_semconv(self): + """Verify that dup mode reports both old and new semconv attributes.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + async def request(channel): + req = Request(client_id=1, request_data="test") + msg = req.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + await run_with_test_server(request, interceptors=[aio_server_interceptor()]) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertSpanHasAttributes( + span, + { + # old semconv + NET_PEER_IP: "[::1]", + NET_PEER_NAME: "localhost", + RPC_SYSTEM: "grpc", + RPC_METHOD: "SimpleMethod", # old value wins in dup mode + RPC_SERVICE: "GRPCTestServer", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + # new semconv + **self.new_semconv_net_peer_span_attributes, + RPC_SYSTEM_NAME: "grpc", + RPC_RESPONSE_STATUS_CODE: "OK", + }, + ) + + def get_latch(num): """Get a countdown latch function for use in n threads.""" cv = asyncio.Condition() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index e001d2ed57..5384d9e6e5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -19,10 +19,18 @@ import opentelemetry.instrumentation.grpc from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient from opentelemetry.instrumentation.grpc._client import ( OpenTelemetryClientInterceptor, ) +from opentelemetry.instrumentation.grpc._semconv import ( + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, +) from opentelemetry.instrumentation.grpc.grpcext._interceptor import ( _UnaryClientInfo, ) @@ -35,6 +43,11 @@ RPC_SERVICE, RPC_SYSTEM, ) +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) from opentelemetry.test.mock_textmap import MockTextMapPropagator from opentelemetry.test.test_base import TestBase @@ -49,6 +62,23 @@ from .protobuf import test_server_pb2_grpc from .protobuf.test_server_pb2 import Request +_CLIENT_SERVER_HOST = "localhost" +_CLIENT_SERVER_PORT = 25565 + + +def _new_client_rpc_attrs(full_method, status_name="OK", error_type=None): + """Build expected new-semconv attributes for a client RPC span.""" + attrs = { + RPC_SYSTEM_NAME: "grpc", + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_name, + SERVER_ADDRESS: _CLIENT_SERVER_HOST, + SERVER_PORT: _CLIENT_SERVER_PORT, + } + if error_type: + attrs[ERROR_TYPE] = error_type + return attrs + # User defined interceptor. Is used in the tests along with the opentelemetry client interceptor. class Interceptor( @@ -94,6 +124,17 @@ def _intercept_call( class TestClientProto(TestBase): def setUp(self): super().setUp() + test_name = self._testMethodName if hasattr(self, "_testMethodName") else "" + sem_conv_mode = "default" + if "new_semconv" in test_name: + sem_conv_mode = "rpc" + elif "both_semconv" in test_name: + sem_conv_mode = "rpc/dup" + self.env_patch = mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: sem_conv_mode} + ) + self.env_patch.start() + _OpenTelemetrySemanticConventionStability._initialized = False GrpcInstrumentorClient().instrument() self.server = create_test_server(25565) self.server.start() @@ -108,6 +149,8 @@ def tearDown(self): GrpcInstrumentorClient().uninstrument() self.server.stop(None) self.channel.close() + self.env_patch.stop() + _OpenTelemetrySemanticConventionStability._initialized = False def test_unary_unary_future(self): simple_method_future(self._stub).result() @@ -269,6 +312,65 @@ def test_error_stream_stream(self): trace.StatusCode.ERROR, ) + def test_error_unary_stream_mid_stream(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error_mid_stream=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "ServerStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.INVALID_ARGUMENT.value[0], + }, + ) + + def test_error_stream_stream_mid_stream(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error_mid_stream=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "BidirectionalStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.INVALID_ARGUMENT.value[0], + }, + ) + + def test_unimplemented(self): + """Check that calling an unregistered method creates a span with UNIMPLEMENTED status.""" + request = Request(client_id=1, request_data="data") + with self.assertRaises(grpc.RpcError) as cm: + self.channel.unary_unary("/GRPCTestServer/UnimplementedMethod")( + request.SerializeToString() + ) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "UnimplementedMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNIMPLEMENTED.value[0], + }, + ) + def test_client_interceptor_falsy_response( self, ): @@ -330,6 +432,192 @@ def invoker(request, metadata): finally: set_global_textmap(previous_propagator) + # --- new semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc) tests --- + + def test_unary_unary_new_semconv(self): + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + self.assertSpanHasAttributes( + span, _new_client_rpc_attrs("/GRPCTestServer/SimpleMethod") + ) + + def test_unary_stream_new_semconv(self): + server_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertSpanHasAttributes( + span, _new_client_rpc_attrs("/GRPCTestServer/ServerStreamingMethod") + ) + + def test_stream_unary_new_semconv(self): + client_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod") + self.assertSpanHasAttributes( + span, _new_client_rpc_attrs("/GRPCTestServer/ClientStreamingMethod") + ) + + def test_stream_stream_new_semconv(self): + bidirectional_streaming_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "/GRPCTestServer/BidirectionalStreamingMethod") + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs("/GRPCTestServer/BidirectionalStreamingMethod"), + ) + + def test_error_simple_new_semconv(self): + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/SimpleMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + def test_error_stream_unary_new_semconv(self): + with self.assertRaises(grpc.RpcError): + client_streaming_method(self._stub, error=True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/ClientStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + def test_error_unary_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error=True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/ServerStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + def test_error_stream_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error=True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/BidirectionalStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + def test_error_unary_stream_mid_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + server_streaming_method(self._stub, error_mid_stream=True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/ServerStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + def test_error_stream_stream_mid_stream_new_semconv(self): + with self.assertRaises(grpc.RpcError): + bidirectional_streaming_method(self._stub, error_mid_stream=True) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/BidirectionalStreamingMethod", + status_name="INVALID_ARGUMENT", + error_type="INVALID_ARGUMENT", + ), + ) + + def test_unimplemented_new_semconv(self): + """Check that calling an unregistered method records UNIMPLEMENTED in new semconv.""" + request = Request(client_id=1, request_data="data") + with self.assertRaises(grpc.RpcError) as cm: + self.channel.unary_unary("/GRPCTestServer/UnimplementedMethod")( + request.SerializeToString() + ) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs(span.status.status_code, trace.StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + _new_client_rpc_attrs( + "/GRPCTestServer/UnimplementedMethod", + status_name="UNIMPLEMENTED", + error_type="UNIMPLEMENTED", + ), + ) + + # --- both semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc/dup) tests --- + + def test_unary_unary_both_semconv(self): + """Verify that dup mode reports both old and new semconv attributes.""" + simple_method(self._stub) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertSpanHasAttributes( + span, + { + # old semconv attributes + RPC_SYSTEM: "grpc", + RPC_METHOD: "SimpleMethod", # old value wins in dup mode + RPC_SERVICE: "GRPCTestServer", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + # new semconv attributes + RPC_SYSTEM_NAME: "grpc", + RPC_RESPONSE_STATUS_CODE: "OK", + SERVER_ADDRESS: _CLIENT_SERVER_HOST, + SERVER_PORT: _CLIENT_SERVER_PORT, + }, + ) + def test_unary_unary_with_suppress_key(self): with suppress_instrumentation(): simple_method(self._stub) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 5f53b0a3ea..0f5fb5b927 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -25,10 +25,19 @@ import opentelemetry.instrumentation.grpc from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) from opentelemetry.instrumentation.grpc import ( GrpcInstrumentorServer, server_interceptor, ) +from opentelemetry.instrumentation.grpc._semconv import ( + RPC_METHOD_ORIGINAL, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, +) from opentelemetry.sdk import trace as trace_sdk from opentelemetry.semconv._incubating.attributes.net_attributes import ( NET_PEER_IP, @@ -40,6 +49,10 @@ RPC_SERVICE, RPC_SYSTEM, ) +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE +from opentelemetry.semconv.attributes.network_attributes import ( + NETWORK_PEER_ADDRESS, +) from opentelemetry.test.test_base import TestBase from opentelemetry.trace import StatusCode @@ -94,6 +107,40 @@ class TestOpenTelemetryServerInterceptor(TestBase): NET_PEER_IP: "[::1]", NET_PEER_NAME: "localhost", } + new_semconv_net_peer_span_attributes = { + NETWORK_PEER_ADDRESS: "[::1]", + } + + def setUp(self): + super().setUp() + test_name = self._testMethodName if hasattr(self, "_testMethodName") else "" + sem_conv_mode = "default" + if "new_semconv" in test_name: + sem_conv_mode = "rpc" + elif "both_semconv" in test_name: + sem_conv_mode = "rpc/dup" + self.env_patch = mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: sem_conv_mode} + ) + self.env_patch.start() + _OpenTelemetrySemanticConventionStability._initialized = False + + def tearDown(self): + super().tearDown() + self.env_patch.stop() + _OpenTelemetrySemanticConventionStability._initialized = False + + def _new_server_rpc_attrs(self, full_method, status_name="OK", error_type=None): + """Build expected new-semconv attributes for a server RPC span.""" + attrs = { + **self.new_semconv_net_peer_span_attributes, + RPC_SYSTEM_NAME: "grpc", + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_name, + } + if error_type: + attrs[ERROR_TYPE] = error_type + return attrs @contextlib.contextmanager def server(self, max_workers=1, interceptors=None): @@ -646,6 +693,93 @@ def unset_status_handler(request, context): }, ) + def test_error_streaming_mid_stream(self): + """Check that an error raised mid-stream ends the span with error status.""" + + class MidStreamErrorServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + def ServerStreamingMethod(self, request, context): + yield Response(server_id=1, response_data="first") + context.abort(grpc.StatusCode.INTERNAL, "mid-stream error") + + interceptor = server_interceptor() + + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): + add_GRPCTestServerServicer_to_server(MidStreamErrorServicer(), server) + + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError) as cm: + list(channel.unary_stream(rpc_call)(msg)) + self.assertEqual(cm.exception.code(), grpc.StatusCode.INTERNAL) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + **self.net_peer_span_attributes, + RPC_METHOD: "ServerStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.INTERNAL.value[0], + }, + ) + + def test_unimplemented(self): + """Check that calling an unimplemented method creates a span with UNIMPLEMENTED status.""" + + interceptor = server_interceptor() + + with self.server( + max_workers=1, + interceptors=[interceptor], + ) as (server, channel): + # The base servicer sets UNIMPLEMENTED and raises NotImplementedError + add_GRPCTestServerServicer_to_server(GRPCTestServerServicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError) as cm: + channel.unary_unary(rpc_call)(msg) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + **self.net_peer_span_attributes, + RPC_METHOD: "SimpleMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNIMPLEMENTED.value[0], + }, + ) + def test_non_list_interceptors(self): """Check that we handle non-list interceptors correctly.""" grpc_server_instrumentor = GrpcInstrumentorServer() @@ -694,10 +828,251 @@ def test_non_list_interceptors(self): ) + # --- new semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc) tests --- + + def test_create_span_new_semconv(self): + """Check that the interceptor records new-semconv attributes on a unary span.""" + interceptor = server_interceptor() + + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + add_GRPCTestServerServicer_to_server(Servicer(), server) + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertSpanHasAttributes( + span, self._new_server_rpc_attrs(rpc_call) + ) + + def test_create_span_streaming_new_semconv(self): + """Check that the interceptor records new-semconv attributes on a streaming span.""" + interceptor = server_interceptor() + + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + add_GRPCTestServerServicer_to_server(Servicer(), server) + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + list(channel.unary_stream(rpc_call)(msg)) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertSpanHasAttributes( + span, self._new_server_rpc_attrs(rpc_call) + ) + + def test_abort_new_semconv(self): + """Check that abort sets new-semconv error attributes correctly. + + INTERNAL is a server error → error.type set; FAILED_PRECONDITION is not → no error.type. + """ + interceptor = server_interceptor() + + failure_message = "This is a test failure" + + def internal_handler(request, context): + context.abort(grpc.StatusCode.INTERNAL, failure_message) + + def precondition_handler(request, context): + context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) + + for handler, expected_code, expect_error in [ + (internal_handler, grpc.StatusCode.INTERNAL, True), + (precondition_handler, grpc.StatusCode.FAILED_PRECONDITION, False), + ]: + self.memory_exporter.clear() + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + rpc_call = f"TestServicer/{handler.__name__}" + try: + server.start() + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(b"") + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + if expect_error: + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + RPC_SYSTEM_NAME: "grpc", + RPC_RESPONSE_STATUS_CODE: expected_code.name, + ERROR_TYPE: expected_code.name, + }, + ) + else: + self.assertEqual(span.status.status_code, StatusCode.UNSET) + self.assertSpanHasAttributes( + span, + { + RPC_SYSTEM_NAME: "grpc", + RPC_RESPONSE_STATUS_CODE: expected_code.name, + }, + ) + self.assertNotIn(ERROR_TYPE, span.attributes or {}) + + def test_error_streaming_mid_stream_new_semconv(self): + """Check that a mid-stream error sets new-semconv error attributes.""" + + class MidStreamErrorServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + def ServerStreamingMethod(self, request, context): + yield Response(server_id=1, response_data="first") + context.abort(grpc.StatusCode.INTERNAL, "mid-stream error") + + interceptor = server_interceptor() + + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + add_GRPCTestServerServicer_to_server(MidStreamErrorServicer(), server) + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError): + list(channel.unary_stream(rpc_call)(msg)) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + self._new_server_rpc_attrs( + rpc_call, + status_name="INTERNAL", + error_type="INTERNAL", + ), + ) + + def test_unimplemented_new_semconv(self): + """Check that an unimplemented handler records UNIMPLEMENTED in new semconv.""" + interceptor = server_interceptor() + + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + add_GRPCTestServerServicer_to_server(GRPCTestServerServicer(), server) + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError) as cm: + channel.unary_unary(rpc_call)(msg) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + self._new_server_rpc_attrs( + rpc_call, + status_name="UNIMPLEMENTED", + error_type="UNIMPLEMENTED", + ), + ) + + def test_unimplemented_unknown_method_new_semconv(self): + """In new-semconv mode, a totally unknown method produces an _OTHER span.""" + interceptor = server_interceptor() + + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + add_GRPCTestServerServicer_to_server(Servicer(), server) + rpc_call = "/GRPCTestServer/UnknownMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError) as cm: + channel.unary_unary(rpc_call)(msg) + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "_OTHER") + self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertSpanHasAttributes( + span, + { + **self.new_semconv_net_peer_span_attributes, + RPC_SYSTEM_NAME: "grpc", + "rpc.method": "_OTHER", + RPC_METHOD_ORIGINAL: rpc_call, + RPC_RESPONSE_STATUS_CODE: "UNIMPLEMENTED", + ERROR_TYPE: "UNIMPLEMENTED", + }, + ) + + # --- both semconv (OTEL_SEMCONV_STABILITY_OPT_IN=rpc/dup) tests --- + + def test_create_span_both_semconv(self): + """Verify that dup mode reports both old and new semconv attributes on a server span.""" + interceptor = server_interceptor() + + with self.server(max_workers=1, interceptors=[interceptor]) as (server, channel): + add_GRPCTestServerServicer_to_server(Servicer(), server) + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertSpanHasAttributes( + span, + { + # old semconv + **self.net_peer_span_attributes, + RPC_SYSTEM: "grpc", + RPC_METHOD: "SimpleMethod", # old value wins in dup mode + RPC_SERVICE: "GRPCTestServer", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + # new semconv + **self.new_semconv_net_peer_span_attributes, + RPC_SYSTEM_NAME: "grpc", + RPC_RESPONSE_STATUS_CODE: "OK", + }, + ) + + class TestOpenTelemetryServerInterceptorUnix( TestOpenTelemetryServerInterceptor, ): net_peer_span_attributes = {} + new_semconv_net_peer_span_attributes = {} @contextlib.contextmanager def server(self, max_workers=1, interceptors=None): diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py index e30cdf2dfb..dfc648826c 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py @@ -182,6 +182,7 @@ class _OpenTelemetryStabilitySignalType(Enum): HTTP = "http" DATABASE = "database" GEN_AI = "gen_ai" + RPC = "rpc" class _StabilityMode(Enum): @@ -191,6 +192,8 @@ class _StabilityMode(Enum): DATABASE = "database" DATABASE_DUP = "database/dup" GEN_AI_LATEST_EXPERIMENTAL = "gen_ai_latest_experimental" + RPC = "rpc" + RPC_DUP = "rpc/dup" def _report_new(mode: _StabilityMode): @@ -198,7 +201,11 @@ def _report_new(mode: _StabilityMode): def _report_old(mode: _StabilityMode): - return mode not in (_StabilityMode.HTTP, _StabilityMode.DATABASE) + return mode not in ( + _StabilityMode.HTTP, + _StabilityMode.DATABASE, + _StabilityMode.RPC, + ) class _OpenTelemetrySemanticConventionStability: @@ -222,6 +229,7 @@ def _initialize(cls): _OpenTelemetryStabilitySignalType.HTTP: _StabilityMode.DEFAULT, _OpenTelemetryStabilitySignalType.DATABASE: _StabilityMode.DEFAULT, _OpenTelemetryStabilitySignalType.GEN_AI: _StabilityMode.DEFAULT, + _OpenTelemetryStabilitySignalType.RPC: _StabilityMode.DEFAULT, } cls._initialized = True return @@ -249,6 +257,14 @@ def _initialize(cls): _StabilityMode.DATABASE, _StabilityMode.DATABASE_DUP, ) + + cls._OTEL_SEMCONV_STABILITY_SIGNAL_MAPPING[ + _OpenTelemetryStabilitySignalType.RPC + ] = cls._filter_mode( + opt_in_list, + _StabilityMode.RPC, + _StabilityMode.RPC_DUP, + ) cls._initialized = True @staticmethod diff --git a/tox.ini b/tox.ini index d1989a834b..ea4c7108c3 100644 --- a/tox.ini +++ b/tox.ini @@ -780,6 +780,7 @@ setenv = UV_CONFIG_FILE={toxinidir}/tox-uv.toml PIP_CONSTRAINTS={toxinidir}/test-constraints.txt UV_BUILD_CONSTRAINT={toxinidir}/test-constraints.txt + grpc: PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python commands_pre = ; In order to get a health coverage report,