Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion packages/uipath-core/src/uipath/core/tracing/processors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Custom span processors for UiPath execution tracing."""

from typing import cast
from typing import Callable, cast

from opentelemetry import context as context_api
from opentelemetry import trace
Expand All @@ -13,6 +13,21 @@

from uipath.core.tracing.types import UiPathTraceSettings

# Hooks called synchronously in on_start (same thread as span creation, where
# ContextVar values are still live). Use register_span_start_hook to stamp
# span attributes that must survive the BatchSpanProcessor thread boundary.
_span_start_hooks: list[Callable[[Span], None]] = []


def register_span_start_hook(hook: Callable[[Span], None]) -> None:
"""Register a callable invoked for every span at creation time.

The hook receives the live, writable Span so it can call set_attribute.
Runs in the same thread/context as the span creator — ContextVar values
are available here but NOT in the BatchSpanProcessor export thread.
"""
_span_start_hooks.append(hook)


class UiPathExecutionTraceProcessorMixin:
"""Mixin that propagates execution.id and optionally filters spans."""
Expand All @@ -32,6 +47,9 @@ def on_start(self, span: Span, parent_context: context_api.Context | None = None
if execution_id:
span.set_attribute("execution.id", execution_id)

for hook in _span_start_hooks:
hook(span)
Comment on lines +50 to +51

def on_end(self, span: ReadableSpan):
"""Called when a span ends. Filters before delegating to parent."""
span_filter = self._settings.span_filter if self._settings else None
Expand Down Expand Up @@ -73,4 +91,5 @@ def __init__(
__all__ = [
"UiPathExecutionBatchTraceProcessor",
"UiPathExecutionSimpleTraceProcessor",
"register_span_start_hook",
]
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
from ._http_config import get_ca_bundle_path, get_httpx_client_kwargs
from ._models import Endpoint, RequestSpec
from ._service_url_overrides import inject_routing_headers, resolve_service_url
from ._reference_context import (
ReferenceContext,
ReferenceContextAccessor,
ReferenceEntry,
)
from ._span_utils import UiPathSpan, _SpanUtils
from ._url import UiPathUrl
from ._user_agent import user_agent_value
Expand Down Expand Up @@ -108,6 +113,9 @@
"ResourceOverwrite",
"ResourceOverwriteParser",
"ResourceOverwritesContext",
"ReferenceEntry",
"ReferenceContext",
"ReferenceContextAccessor",
"UiPathSpan",
"_SpanUtils",
"resolve_service_url",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"""Immutable reference-hierarchy context for span propagation.

Follows the same design as service-common BaggageContext:
- Immutable, copy-on-write — each mutating call returns a NEW instance so
sibling spans cannot bleed context into each other.
- ContextVar-backed accessor — flows across await boundaries without
threading the value through every function signature.
- Wire format compatible with the ``ref.*`` keys in ``x-uipath-tracebaggage``
so context parsed by service-common middleware is understood here and
vice-versa.
"""
from __future__ import annotations

import contextvars
import uuid
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


__all__ = [
"ReferenceEntry",
"ReferenceContext",
"ReferenceContextAccessor",
"BAGGAGE_HEADER_NAME",
"BAGGAGE_KEY_TYPE",
"BAGGAGE_KEY_ID",
"BAGGAGE_KEY_VERSION",
]

BAGGAGE_HEADER_NAME = "x-uipath-tracebaggage"

# Key names — matches service-common ReferenceHierarchyKeys
BAGGAGE_KEY_TYPE = "ref.type"
BAGGAGE_KEY_ID = "ref.id"
BAGGAGE_KEY_VERSION = "ref.v"


@dataclass(frozen=True)
class ReferenceEntry:
"""A single node in the reference hierarchy call chain."""

service_type: str
reference_id: str # UUID string
version: Optional[str] = None


class ReferenceContext:
"""Immutable, copy-on-write ordered list of reference entries.

Outermost caller first, current service appended last.
Each mutating call returns a new instance — the original is never
modified, preventing sibling spans from sharing context.

Usage::

ctx = ReferenceContext.Empty
ctx = ctx.add("maestro", process_id, "2.1.0")
ctx = ctx.add("agent", agent_id)
token = ReferenceContextAccessor.set(ctx)
try:
...
finally:
ReferenceContextAccessor.reset(token)
"""

__slots__ = ("_entries",)

def __init__(self, entries: Tuple[ReferenceEntry, ...] = ()) -> None:
self._entries: Tuple[ReferenceEntry, ...] = entries

@property
def entries(self) -> Tuple[ReferenceEntry, ...]:
return self._entries

def __len__(self) -> int:
return len(self._entries)

def __iter__(self) -> Iterator[ReferenceEntry]:
return iter(self._entries)

def __bool__(self) -> bool:
return len(self._entries) > 0

def __eq__(self, other: object) -> bool:
if not isinstance(other, ReferenceContext):
return NotImplemented
return self._entries == other._entries

def __hash__(self) -> int:
return hash(self._entries)

def add(
self,
service_type: str,
reference_id: str | uuid.UUID,
version: Optional[str] = None,
) -> "ReferenceContext":
"""Returns a new context with this entry appended (copy-on-write).

Args:
service_type: Identifier for the calling service (e.g. ``"agent"``,
``"maestro"``).
reference_id: UUID of the referenced entity (UUID object or string).
version: Optional version string.

Returns:
A new :class:`ReferenceContext` with the entry appended.
"""
if not service_type or not service_type.strip():
raise ValueError("service_type must be a non-empty string.")
if isinstance(reference_id, uuid.UUID):
id_str = str(reference_id)
elif isinstance(reference_id, str):
id_str = reference_id
else:
raise TypeError("reference_id must be a UUID or string.")
Comment on lines +111 to +116
entry = ReferenceEntry(
service_type=service_type,
reference_id=id_str,
version=version if version and version.strip() else None,
)
return ReferenceContext(self._entries + (entry,))

def to_wire_list(self) -> List[dict]:
"""Serialize to the ``referenceHierarchy`` wire format.

Returns:
A list of dicts suitable for JSON serialization as
``Context.referenceHierarchy`` in the span payload.
"""
result = []
for e in self._entries:
item: dict = {
"serviceType": e.service_type,
"referenceId": e.reference_id,
}
if e.version:
item["version"] = e.version
result.append(item)
return result
Comment on lines +124 to +140

@staticmethod
def from_baggage_header(header_value: Optional[str]) -> "ReferenceContext":

Check failure on line 143 in packages/uipath-platform/src/uipath/platform/common/_reference_context.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=UiPath_uipath-python&issues=AZ6y9RZpCNFXwN9kyrtF&open=AZ6y9RZpCNFXwN9kyrtF&pullRequest=1708
"""Parse ``x-uipath-tracebaggage`` header value into a ReferenceContext.

Only entries that carry the ``ref.*`` shape (type + valid UUID id) are
included. Malformed or plain-KV entries are silently skipped so a bad
header from an upstream service cannot crash this one.

Args:
header_value: Raw header string, e.g.
``"ref.type=agent;ref.id=<uuid>;ref.v=1.0,ref.type=maestro;ref.id=<uuid>"``

Returns:
Parsed :class:`ReferenceContext`, or :attr:`ReferenceContext.Empty`
if the header is absent, empty, or contains no valid ref entries.
"""
if not header_value or not header_value.strip():
return ReferenceContext.Empty

entries: List[ReferenceEntry] = []
for raw_entry in header_value.split(","):
entry_text = raw_entry.strip()
if not entry_text:
continue
props: dict[str, str] = {}
for raw_pair in entry_text.split(";"):
pair_text = raw_pair.strip()
eq = pair_text.find("=")
if eq <= 0 or eq >= len(pair_text) - 1:
continue
key = pair_text[:eq].strip()
value = pair_text[eq + 1:].strip()
if key and value:
props[key] = value

type_v = props.get(BAGGAGE_KEY_TYPE)
id_v = props.get(BAGGAGE_KEY_ID)
if not type_v or not id_v:
continue
try:
parsed_uuid = uuid.UUID(id_v)
except (ValueError, AttributeError):
continue
entries.append(
ReferenceEntry(
service_type=type_v,
reference_id=str(parsed_uuid),
version=props.get(BAGGAGE_KEY_VERSION) or None,
)
)

if not entries:
return ReferenceContext.Empty
return ReferenceContext(tuple(entries))

def to_baggage_header_value(self) -> str:
"""Serialize to ``x-uipath-tracebaggage`` header value.

Returns:
Comma-separated entries; each is a semicolon-separated list of
``key=value`` pairs. Empty context returns ``""``.
"""
if not self._entries:
return ""
parts: List[str] = []
for e in self._entries:
kv = f"{BAGGAGE_KEY_TYPE}={e.service_type};{BAGGAGE_KEY_ID}={e.reference_id}"
if e.version:
kv += f";{BAGGAGE_KEY_VERSION}={e.version}"
parts.append(kv)
return ",".join(parts)


# Assigned after class body so ReferenceContext is fully bound.
ReferenceContext.Empty = ReferenceContext() # type: ignore[attr-defined]


class ReferenceContextAccessor:
"""Ambient accessor for the current :class:`ReferenceContext`.

Backed by :mod:`contextvars` so the value propagates across ``await``
boundaries without being threaded through every call signature.
Comment on lines +219 to +223

Usage::

token = ReferenceContextAccessor.set(ctx)
try:
... # code here sees ReferenceContextAccessor.get() == ctx
finally:
ReferenceContextAccessor.reset(token)
"""

_current: contextvars.ContextVar[Optional[ReferenceContext]] = (
contextvars.ContextVar("uipath_reference_context", default=None)
)

@classmethod
def get(cls) -> Optional[ReferenceContext]:
"""Return the current ambient context, or ``None`` if not set."""
return cls._current.get()

@classmethod
def set(cls, value: Optional[ReferenceContext]) -> contextvars.Token:
"""Set the ambient context. Returns a token for restoration.

Pass the token to :meth:`reset` in a ``finally`` block.
"""
return cls._current.set(value)

@classmethod
def reset(cls, token: contextvars.Token) -> None:
"""Restore the ambient context to its prior value."""
cls._current.reset(token)
Loading
Loading