Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased


- Add support for workflow in genAI utils handler.
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4366](#4366))
- Enrich ToolCall type, breaking change: usage of ToolCall class renamed to ToolCallRequest
([#4218](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4218))
- Add EmbeddingInvocation span lifecycle support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

from __future__ import annotations

import logging
import timeit
from contextlib import contextmanager
from typing import Iterator, TypeVar
Expand All @@ -83,18 +84,38 @@
_apply_embedding_finish_attributes,
_apply_error_attributes,
_apply_llm_finish_attributes,
_apply_workflow_finish_attributes,
_get_embedding_span_name,
_get_llm_span_name,
_get_workflow_span_name,
_maybe_emit_llm_event,
)
from opentelemetry.util.genai.types import (
EmbeddingInvocation,
Error,
GenAIInvocation,
LLMInvocation,
WorkflowInvocation,
)
from opentelemetry.util.genai.version import __version__

_logger = logging.getLogger(__name__)


def _safe_detach(invocation: GenAIInvocation) -> None:
"""Detach the context token if still present, as a safety net."""
if invocation.context_token is not None:
try:
otel_context.detach(invocation.context_token)
except Exception: # pylint: disable=broad-except
pass
if invocation.span is not None:
try:
invocation.span.end()
except Exception: # pylint: disable=broad-except
pass


_T = TypeVar("_T", bound=GenAIInvocation)


Expand Down Expand Up @@ -160,13 +181,19 @@ def _start(self, invocation: _T) -> _T:
"""Start a GenAI invocation and create a pending span entry."""
if isinstance(invocation, LLMInvocation):
span_name = _get_llm_span_name(invocation)
kind = SpanKind.CLIENT
elif isinstance(invocation, EmbeddingInvocation):
span_name = _get_embedding_span_name(invocation)
kind = SpanKind.CLIENT
elif isinstance(invocation, WorkflowInvocation):
span_name = _get_workflow_span_name(invocation)
kind = SpanKind.INTERNAL
else:
span_name = ""
kind = SpanKind.CLIENT
span = self._tracer.start_span(
name=span_name,
kind=SpanKind.CLIENT,
kind=kind,
)
# Record a monotonic start timestamp (seconds) for duration
# calculation using timeit.default_timer.
Expand All @@ -192,6 +219,9 @@ def _stop(self, invocation: _T) -> _T:
elif isinstance(invocation, EmbeddingInvocation):
_apply_embedding_finish_attributes(span, invocation)
self._record_embedding_metrics(invocation, span)
elif isinstance(invocation, WorkflowInvocation):
_apply_workflow_finish_attributes(span, invocation)
# TODO: Add workflow metrics when supported
finally:
# Detach context and end span even if finishing fails
otel_context.detach(invocation.context_token)
Expand Down Expand Up @@ -222,6 +252,10 @@ def _fail(self, invocation: _T, error: Error) -> _T:
self._record_embedding_metrics(
invocation, span, error_type=error_type
)
elif isinstance(invocation, WorkflowInvocation):
_apply_workflow_finish_attributes(span, invocation)
_apply_error_attributes(span, error, error_type)
# TODO: Add workflow metrics when supported
finally:
# Detach context and end span even if finishing fails
otel_context.detach(invocation.context_token)
Expand Down Expand Up @@ -304,6 +338,46 @@ def embedding(
raise
self.stop(invocation)

@contextmanager
def workflow(
self, invocation: WorkflowInvocation | None = None
) -> Iterator[WorkflowInvocation]:
"""Context manager for Workflow invocations.

Only set data attributes on the invocation object, do not modify the span or context.

Starts the span on entry. On normal exit, finalizes the invocation and ends the span.
If an exception occurs inside the context, marks the span as error, ends it, and
re-raises the original exception.
"""
if invocation is None:
invocation = WorkflowInvocation()

try:
self.start(invocation)
except Exception: # pylint: disable=broad-except
_logger.warning(
"Failed to start workflow telemetry", exc_info=True
)

try:
yield invocation
except Exception as exc:
try:
self.fail(invocation, Error(message=str(exc), type=type(exc)))
except Exception: # pylint: disable=broad-except
_logger.warning(
"Failed to record workflow failure", exc_info=True
)
_safe_detach(invocation)
raise

try:
self.stop(invocation)
except Exception: # pylint: disable=broad-except
_logger.warning("Failed to stop workflow telemetry", exc_info=True)
_safe_detach(invocation)


def get_telemetry_handler(
tracer_provider: TracerProvider | None = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
LLMInvocation,
MessagePart,
OutputMessage,
WorkflowInvocation,
)
from opentelemetry.util.genai.utils import (
ContentCapturingMode,
Expand Down Expand Up @@ -108,6 +109,13 @@ def _get_embedding_span_name(invocation: EmbeddingInvocation) -> str:
return _get_span_name(invocation)


def _get_workflow_span_name(invocation: WorkflowInvocation) -> str:
"""Get the span name for an Workflow invocation."""
operation_name = getattr(invocation, "operation_name", None) or ""
Comment thread
wrisa marked this conversation as resolved.
Outdated
name = getattr(invocation, "name", None) or ""
Comment thread
wrisa marked this conversation as resolved.
Outdated
return f"{operation_name} {name}".strip()
Comment thread
wrisa marked this conversation as resolved.
Outdated


def _get_llm_messages_attributes_for_span(
input_messages: list[InputMessage],
output_messages: list[OutputMessage],
Expand Down Expand Up @@ -345,6 +353,71 @@ def _get_llm_response_attributes(
return {key: value for key, value in optional_attrs if value is not None}


def _apply_workflow_finish_attributes(
span: Span, invocation: WorkflowInvocation
) -> None:
"""Apply attributes/messages common to finish() paths."""

# Build all attributes by reusing the attribute getter functions
attributes: dict[str, Any] = {}
attributes.update(_get_workflow_common_attributes(invocation))
attributes.update(
_get_workflow_messages_attributes_for_span(
invocation.input_messages,
invocation.output_messages,
)
)
attributes.update(invocation.attributes)

# Set all attributes on the span
if attributes:
span.set_attributes(attributes)


def _get_workflow_common_attributes(
invocation: WorkflowInvocation,
) -> dict[str, Any]:
"""Get common Workflow attributes shared by finish() and error() paths.

Returns a dictionary of attributes.
"""
return {
GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name,
}


def _get_workflow_messages_attributes_for_span(
Comment thread
wrisa marked this conversation as resolved.
Outdated
input_messages: list[InputMessage],
output_messages: list[OutputMessage],
) -> dict[str, Any]:
"""Get message attributes formatted for span (JSON string format).

Returns empty dict if not in experimental mode or content capturing is disabled.
"""
if not is_experimental_mode() or get_content_capturing_mode() not in (
ContentCapturingMode.SPAN_ONLY,
ContentCapturingMode.SPAN_AND_EVENT,
):
return {}

optional_attrs = (
(
GenAI.GEN_AI_INPUT_MESSAGES,
gen_ai_json_dumps([asdict(m) for m in input_messages])
if input_messages
else None,
),
(
GenAI.GEN_AI_OUTPUT_MESSAGES,
gen_ai_json_dumps([asdict(m) for m in output_messages])
if output_messages
else None,
),
)

return {key: value for key, value in optional_attrs if value is not None}


def _get_embedding_response_attributes(
invocation: EmbeddingInvocation,
) -> dict[str, Any]:
Expand All @@ -365,9 +438,12 @@ def _get_embedding_response_attributes(
"_get_llm_response_attributes",
"_get_llm_span_name",
"_maybe_emit_llm_event",
"_get_workflow_common_attributes",
"_apply_workflow_finish_attributes",
"_apply_embedding_finish_attributes",
"_get_embedding_common_attributes",
"_get_embedding_request_attributes",
"_get_embedding_response_attributes",
"_get_embedding_span_name",
"_get_workflow_span_name",
]
Loading
Loading