Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
from logging import getLogger
from re import split
from typing import Iterable, List, Mapping, Optional, Set
from typing import Iterable, List, Mapping, Optional, Protocol, Sequence, Set
from urllib.parse import quote_plus, unquote_plus

from opentelemetry.baggage import _is_valid_pair, get_all, set_baggage
Expand Down Expand Up @@ -144,3 +144,178 @@ def _extract_first_element(
if items is None:
return None
return next(iter(items), None)


class PredicateT(Protocol):
def __call__(
self,
name: str,
value: str,
) -> bool: ...

def __str__(self) -> str: ...


class OnKeyPresence(PredicateT):
def __init__(self, key: str):
self._key = key

def __call__(
self,
name: str,
value: str,
) -> bool:
return name == self._key

def __str__(self):
return f"{self._key}=*"


class OnKeyValue(PredicateT):
def __init__(self, key: str, value: str):
self._key = key
self._value = value

def __call__(
self,
name: str,
value: str,
) -> bool:
return name == self._key and value == self._value

def __str__(self):
return f"{self._key}={self._value}"


class AlwaysPredicate(PredicateT):
def __call__(
self,
name: str,
value: str,
) -> bool:
return True

def __str__(self):
return "*"


RulesT = Sequence[tuple[PredicateT, bool]]


class RuleBasedW3CBaggagePropagator(textmap.TextMapPropagator):
"""Extracts and injects Baggage which is used to annotate telemetry.

Baggage entries are injected depending on the rules."""

_MAX_HEADER_LENGTH = 8192
_MAX_PAIR_LENGTH = 4096
_MAX_PAIRS = 180
_BAGGAGE_HEADER_NAME = "baggage"

def __init__(self, rules: RulesT):
self._rules = rules

def extract(
self,
carrier: textmap.CarrierT,
context: Optional[Context] = None,
getter: textmap.Getter[textmap.CarrierT] = textmap.default_getter,
) -> Context:
"""Extract Baggage from the carrier.

See
`opentelemetry.propagators.textmap.TextMapPropagator.extract`
"""

if context is None:
context = get_current()

header = _extract_first_element(
getter.get(carrier, self._BAGGAGE_HEADER_NAME)
)

if not header:
return context

if len(header) > self._MAX_HEADER_LENGTH:
_logger.warning(
"Baggage header `%s` exceeded the maximum number of bytes per baggage-string",
header,
)
return context

baggage_entries: List[str] = split(_DELIMITER_PATTERN, header)
total_baggage_entries = self._MAX_PAIRS

if len(baggage_entries) > self._MAX_PAIRS:
_logger.warning(
"Baggage header `%s` exceeded the maximum number of list-members",
header,
)

for entry in baggage_entries:
if len(entry) > self._MAX_PAIR_LENGTH:
_logger.warning(
"Baggage entry `%s` exceeded the maximum number of bytes per list-member",
entry,
)
continue
if not entry: # empty string
continue
try:
name, value = entry.split("=", 1)
except Exception: # pylint: disable=broad-exception-caught
_logger.warning(
"Baggage list-member `%s` doesn't match the format", entry
)
continue

if not _is_valid_pair(name, value):
_logger.warning("Invalid baggage entry: `%s`", entry)
continue

name = unquote_plus(name).strip()
value = unquote_plus(value).strip()

skip_entry = False
for predicate, outcome in self._rules:
if predicate(name, value):
skip_entry = outcome
break

if skip_entry:
continue

context = set_baggage(
name,
value,
context=context,
)
total_baggage_entries -= 1
if total_baggage_entries == 0:
break

return context

def inject(
self,
carrier: textmap.CarrierT,
context: Optional[Context] = None,
setter: textmap.Setter[textmap.CarrierT] = textmap.default_setter,
) -> None:
"""Injects Baggage into the carrier.

See
`opentelemetry.propagators.textmap.TextMapPropagator.inject`
"""
baggage_entries = get_all(context=context)
if not baggage_entries:
return

baggage_string = _format_baggage(baggage_entries)
setter.set(carrier, self._BAGGAGE_HEADER_NAME, baggage_string)

@property
def fields(self) -> Set[str]:
"""Returns a set with the fields set in `inject`."""
return {self._BAGGAGE_HEADER_NAME}
50 changes: 6 additions & 44 deletions opentelemetry-api/src/opentelemetry/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@
from typing_extensions import deprecated

from opentelemetry import context as context_api
from opentelemetry.attributes import BoundedAttributes
from opentelemetry.context.context import Context
from opentelemetry.environment_variables import OTEL_PYTHON_TRACER_PROVIDER
from opentelemetry.trace.link import Link
from opentelemetry.trace.propagation import (
_SPAN_KEY,
get_current_link,
get_current_span,
set_link_in_context,
set_span_in_context,
)
from opentelemetry.trace.span import (
Expand All @@ -115,47 +116,6 @@
logger = getLogger(__name__)


class _LinkBase(ABC):
def __init__(self, context: "SpanContext") -> None:
self._context = context

@property
def context(self) -> "SpanContext":
return self._context

@property
@abstractmethod
def attributes(self) -> types.Attributes:
pass


class Link(_LinkBase):
"""A link to a `Span`. The attributes of a Link are immutable.

Args:
context: `SpanContext` of the `Span` to link to.
attributes: Link's attributes.
"""

def __init__(
self,
context: "SpanContext",
attributes: types.Attributes = None,
) -> None:
super().__init__(context)
self._attributes = attributes

@property
def attributes(self) -> types.Attributes:
return self._attributes

@property
def dropped_attributes(self) -> int:
if isinstance(self._attributes, BoundedAttributes):
return self._attributes.dropped
return 0


_Links = Optional[Sequence[Link]]


Expand Down Expand Up @@ -614,7 +574,7 @@ def use_span(
this mechanism if it was previously set manually.
"""
try:
token = context_api.attach(context_api.set_value(_SPAN_KEY, span))
token = context_api.attach(set_span_in_context(span))
try:
yield span
finally:
Expand Down Expand Up @@ -673,6 +633,8 @@ def use_span(
"set_tracer_provider",
"set_span_in_context",
"use_span",
"get_current_link",
"set_link_in_context",
"Status",
"StatusCode",
]
60 changes: 60 additions & 0 deletions opentelemetry-api/src/opentelemetry/trace/link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod

from opentelemetry.attributes import BoundedAttributes
from opentelemetry.trace.span import SpanContext
from opentelemetry.util import types


class _LinkBase(ABC):
def __init__(self, context: SpanContext) -> None:
self._context = context

@property
def context(self) -> SpanContext:
return self._context

@property
@abstractmethod
def attributes(self) -> types.Attributes:
pass


class Link(_LinkBase):
"""A link to a `Span`. The attributes of a Link are immutable.

Args:
context: `SpanContext` of the `Span` to link to.
attributes: Link's attributes.
"""

def __init__(
self,
context: SpanContext,
attributes: types.Attributes = None,
) -> None:
super().__init__(context)
self._attributes = attributes

@property
def attributes(self) -> types.Attributes:
return self._attributes

@property
def dropped_attributes(self) -> int:
if isinstance(self._attributes, BoundedAttributes):
return self._attributes.dropped
return 0
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

from opentelemetry.context import create_key, get_value, set_value
from opentelemetry.context.context import Context
from opentelemetry.trace.link import Link
from opentelemetry.trace.span import INVALID_SPAN, Span

SPAN_KEY = "current-span"
_SPAN_KEY = create_key("current-span")
_LINK_KEY = create_key("current-link")


def set_span_in_context(
Expand All @@ -32,7 +34,8 @@ def set_span_in_context(
default current context is used instead.
"""
ctx = set_value(_SPAN_KEY, span, context=context)
return ctx
# A pending restart link is a one-shot hint for the next activated span.
return set_value(_LINK_KEY, None, context=ctx)


def get_current_span(context: Optional[Context] = None) -> Span:
Expand All @@ -49,3 +52,33 @@ def get_current_span(context: Optional[Context] = None) -> Span:
if span is None or not isinstance(span, Span):
return INVALID_SPAN
return span


def set_link_in_context(
link: Link, context: Optional[Context] = None
) -> Context:
"""Set the link in the given context.

Args:
link: The Link to set.
context: a Context object. if one is not passed, the
default current context is used instead.
"""
ctx = set_value(_LINK_KEY, link, context=context)
return ctx


def get_current_link(context: Optional[Context] = None) -> Optional[Link]:
"""Retrieve the current link.

Args:
context: A Context object. If one is not passed, the
default current context is used instead.

Returns:
The Link set in the context if it exists. None otherwise.
"""
link = get_value(_LINK_KEY, context=context)
if link is None or not isinstance(link, Link):
return None
return link
Loading
Loading