From 8f2f3e1436d114b5f6b6adc51857275b2783b03e Mon Sep 17 00:00:00 2001 From: etserend Date: Thu, 12 Mar 2026 13:11:55 -0500 Subject: [PATCH 1/2] GenAI Utils | Agent metrics and events --- .../src/opentelemetry/util/genai/handler.py | 260 +++++++++- .../src/opentelemetry/util/genai/metrics.py | 75 ++- .../opentelemetry/util/genai/span_utils.py | 268 +++++++++++ .../src/opentelemetry/util/genai/types.py | 122 +++++ .../tests/test_handler_agent.py | 449 ++++++++++++++++++ 5 files changed, 1149 insertions(+), 25 deletions(-) create mode 100644 util/opentelemetry-util-genai/tests/test_handler_agent.py diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 54e626deaa..352a2fa174 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -60,9 +60,10 @@ from __future__ import annotations +import logging import timeit from contextlib import contextmanager -from typing import Iterator +from typing import Callable, Iterator, TypeVar from opentelemetry import context as otel_context from opentelemetry._logs import ( @@ -80,13 +81,60 @@ ) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.span_utils import ( + _apply_agent_finish_attributes, + _apply_creation_finish_attributes, _apply_error_attributes, _apply_llm_finish_attributes, + _maybe_emit_agent_event, _maybe_emit_llm_event, ) -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import ( + AgentCreation, + AgentInvocation, + Error, + GenAIInvocation, + LLMInvocation, +) from opentelemetry.util.genai.version import __version__ +_logger = logging.getLogger(__name__) + +_T = TypeVar("_T", bound=GenAIInvocation) + + +@contextmanager +def _lifecycle_context( + invocation: _T, + start: Callable[[_T], _T], + stop: Callable[[_T], _T], + fail: Callable[[_T, Error], _T], + label: str, +) -> Iterator[_T]: + """Shared lifecycle context manager for GenAI invocations. + + Wraps start/stop/fail calls with error handling so SDK-internal + errors never propagate to the caller. + """ + try: + start(invocation) + except Exception: + _logger.warning("Failed to start %s span", label, exc_info=True) + try: + yield invocation + except Exception as exc: + try: + fail(invocation, Error(message=str(exc), type=type(exc))) + except Exception: + _logger.warning( + "Failed to record %s failure", label, exc_info=True + ) + raise + else: + try: + stop(invocation) + except Exception: + _logger.warning("Failed to stop %s span", label, exc_info=True) + class TelemetryHandler: """ @@ -156,13 +204,13 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab # TODO: Provide feedback that this invocation was not started return invocation - span = invocation.span - _apply_llm_finish_attributes(span, invocation) - self._record_llm_metrics(invocation, span) - _maybe_emit_llm_event(self._logger, span, invocation) - # Detach context and end span - otel_context.detach(invocation.context_token) - span.end() + try: + _apply_llm_finish_attributes(invocation.span, invocation) + self._record_llm_metrics(invocation, invocation.span) + _maybe_emit_llm_event(self._logger, invocation.span, invocation) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() return invocation def fail_llm( # pylint: disable=no-self-use @@ -173,15 +221,19 @@ def fail_llm( # pylint: disable=no-self-use # TODO: Provide feedback that this invocation was not started return invocation - span = invocation.span - _apply_llm_finish_attributes(invocation.span, invocation) - _apply_error_attributes(invocation.span, error) - error_type = getattr(error.type, "__qualname__", None) - self._record_llm_metrics(invocation, span, error_type=error_type) - _maybe_emit_llm_event(self._logger, span, invocation, error) - # Detach context and end span - otel_context.detach(invocation.context_token) - span.end() + try: + _apply_llm_finish_attributes(invocation.span, invocation) + _apply_error_attributes(invocation.span, error) + error_type = getattr(error.type, "__qualname__", None) + self._record_llm_metrics( + invocation, invocation.span, error_type=error_type + ) + _maybe_emit_llm_event( + self._logger, invocation.span, invocation, error + ) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() return invocation @contextmanager @@ -200,13 +252,173 @@ def llm( invocation = LLMInvocation( request_model="", ) - self.start_llm(invocation) + with _lifecycle_context( + invocation, self.start_llm, self.stop_llm, self.fail_llm, "llm" + ) as inv: + yield inv + + # ---- Agent invocation lifecycle ---- + + def start_agent( + self, + invocation: AgentInvocation, + ) -> AgentInvocation: + """Start an agent invocation and create a pending span entry.""" + span_name = ( + f"{invocation.operation_name} {invocation.agent_name}".strip() + ) + kind = SpanKind.CLIENT if invocation.is_remote else SpanKind.INTERNAL + span = self._tracer.start_span( + name=span_name, + kind=kind, + ) + invocation.monotonic_start_s = timeit.default_timer() + invocation.span = span + invocation.context_token = otel_context.attach( + set_span_in_context(span) + ) + return invocation + + def _record_agent_metrics( + self, + invocation: AgentInvocation, + span: Span | None = None, + *, + error_type: str | None = None, + ) -> None: + if self._metrics_recorder is None or span is None: + return + self._metrics_recorder.record_agent( + span, + invocation, + error_type=error_type, + ) + + def stop_agent(self, invocation: AgentInvocation) -> AgentInvocation: + """Finalize an agent invocation successfully and end its span.""" + if invocation.context_token is None or invocation.span is None: + return invocation + + try: + _apply_agent_finish_attributes(invocation.span, invocation) + self._record_agent_metrics(invocation, invocation.span) + _maybe_emit_agent_event(self._logger, invocation.span, invocation) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() + return invocation + + def fail_agent( + self, invocation: AgentInvocation, error: Error + ) -> AgentInvocation: + """Fail an agent invocation and end its span with error status.""" + if invocation.context_token is None or invocation.span is None: + return invocation + + try: + _apply_agent_finish_attributes(invocation.span, invocation) + _apply_error_attributes(invocation.span, error) + error_type = getattr(error.type, "__qualname__", None) + self._record_agent_metrics( + invocation, invocation.span, error_type=error_type + ) + _maybe_emit_agent_event( + self._logger, invocation.span, invocation, error + ) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() + return invocation + + @contextmanager + def agent( + self, invocation: AgentInvocation | None = None + ) -> Iterator[AgentInvocation]: + """Context manager for agent invocations. + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + if invocation is None: + invocation = AgentInvocation() + with _lifecycle_context( + invocation, + self.start_agent, + self.stop_agent, + self.fail_agent, + "agent", + ) as inv: + yield inv + + # ---- Agent creation lifecycle ---- + + def start_create_agent( + self, + creation: AgentCreation, + ) -> AgentCreation: + """Start an agent creation and create a pending span entry.""" + span_name = f"{creation.operation_name} {creation.agent_name}".strip() + span = self._tracer.start_span( + name=span_name, + kind=SpanKind.CLIENT, + ) + creation.monotonic_start_s = timeit.default_timer() + creation.span = span + creation.context_token = otel_context.attach(set_span_in_context(span)) + return creation + + def stop_create_agent(self, creation: AgentCreation) -> AgentCreation: # pylint: disable=no-self-use + """Finalize an agent creation successfully and end its span.""" + if creation.context_token is None or creation.span is None: + return creation + try: - yield invocation - except Exception as exc: - self.fail_llm(invocation, Error(message=str(exc), type=type(exc))) - raise - self.stop_llm(invocation) + _apply_creation_finish_attributes(creation.span, creation) + finally: + otel_context.detach(creation.context_token) + creation.span.end() + return creation + + def fail_create_agent( # pylint: disable=no-self-use + self, creation: AgentCreation, error: Error + ) -> AgentCreation: + """Fail an agent creation and end its span with error status.""" + if creation.context_token is None or creation.span is None: + return creation + + try: + _apply_creation_finish_attributes(creation.span, creation) + _apply_error_attributes(creation.span, error) + finally: + otel_context.detach(creation.context_token) + creation.span.end() + return creation + + @contextmanager + def create_agent( + self, creation: AgentCreation | None = None + ) -> Iterator[AgentCreation]: + """Context manager for agent creation. + + Only set data attributes on the creation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the creation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + if creation is None: + creation = AgentCreation() + with _lifecycle_context( + creation, + self.start_create_agent, + self.stop_create_agent, + self.fail_create_agent, + "create_agent", + ) as c: + yield c def get_telemetry_handler( diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 075cbe60a1..6fc2fe6dcc 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -18,7 +18,7 @@ create_duration_histogram, create_token_histogram, ) -from opentelemetry.util.genai.types import LLMInvocation +from opentelemetry.util.genai.types import AgentInvocation, LLMInvocation from opentelemetry.util.types import AttributeValue @@ -105,5 +105,78 @@ def record( context=span_context, ) + def record_agent( + self, + span: Optional[Span], + invocation: AgentInvocation, + *, + error_type: Optional[str] = None, + ) -> None: + """Record duration and token metrics for an agent invocation.""" + + if span is None: + return + + token_counts: list[tuple[int, str]] = [] + if invocation.input_tokens is not None: + token_counts.append( + ( + invocation.input_tokens, + GenAI.GenAiTokenTypeValues.INPUT.value, + ) + ) + if invocation.output_tokens is not None: + token_counts.append( + ( + invocation.output_tokens, + GenAI.GenAiTokenTypeValues.OUTPUT.value, + ) + ) + + attributes: Dict[str, AttributeValue] = { + GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name + } + if invocation.request_model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model + if invocation.provider: + attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider + if invocation.response_model_name: + attributes[GenAI.GEN_AI_RESPONSE_MODEL] = ( + invocation.response_model_name + ) + if invocation.server_address: + attributes[server_attributes.SERVER_ADDRESS] = ( + invocation.server_address + ) + if invocation.server_port is not None: + attributes[server_attributes.SERVER_PORT] = invocation.server_port + if invocation.metric_attributes: + attributes.update(invocation.metric_attributes) + + duration_seconds: Optional[float] = None + if invocation.monotonic_start_s is not None: + duration_seconds = max( + timeit.default_timer() - invocation.monotonic_start_s, + 0.0, + ) + + span_context = set_span_in_context(span) + if error_type: + attributes[error_attributes.ERROR_TYPE] = error_type + + if duration_seconds is not None: + self._duration_histogram.record( + duration_seconds, + attributes=attributes, + context=span_context, + ) + + for token_count, token_type in token_counts: + self._token_histogram.record( + token_count, + attributes=attributes | {GenAI.GEN_AI_TOKEN_TYPE: token_type}, + context=span_context, + ) + __all__ = ["InvocationMetricsRecorder"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 889994436f..64f98e4445 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -32,11 +32,14 @@ from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.trace.status import Status, StatusCode from opentelemetry.util.genai.types import ( + AgentCreation, + AgentInvocation, Error, InputMessage, LLMInvocation, MessagePart, OutputMessage, + _BaseAgent, ) from opentelemetry.util.genai.utils import ( ContentCapturingMode, @@ -47,6 +50,31 @@ ) +def _agent_attr(name: str, fallback: str) -> str: + """Get a semconv attribute, falling back to a string literal if not yet in the package.""" + return getattr(GenAI, name, fallback) + + +_GEN_AI_AGENT_NAME = _agent_attr("GEN_AI_AGENT_NAME", "gen_ai.agent.name") +_GEN_AI_AGENT_ID = _agent_attr("GEN_AI_AGENT_ID", "gen_ai.agent.id") +_GEN_AI_AGENT_DESCRIPTION = _agent_attr( + "GEN_AI_AGENT_DESCRIPTION", "gen_ai.agent.description" +) +_GEN_AI_AGENT_VERSION = _agent_attr( + "GEN_AI_AGENT_VERSION", "gen_ai.agent.version" +) +_GEN_AI_CONVERSATION_ID = _agent_attr( + "GEN_AI_CONVERSATION_ID", "gen_ai.conversation.id" +) +_GEN_AI_DATA_SOURCE_ID = _agent_attr( + "GEN_AI_DATA_SOURCE_ID", "gen_ai.data_source.id" +) +_GEN_AI_OUTPUT_TYPE = _agent_attr("GEN_AI_OUTPUT_TYPE", "gen_ai.output.type") +_GEN_AI_TOOL_DEFINITIONS = _agent_attr( + "GEN_AI_TOOL_DEFINITIONS", "gen_ai.tool.definitions" +) + + def _get_llm_common_attributes( invocation: LLMInvocation, ) -> dict[str, Any]: @@ -77,6 +105,7 @@ def _get_llm_messages_attributes_for_span( input_messages: list[InputMessage], output_messages: list[OutputMessage], system_instruction: list[MessagePart] | None = None, + tool_definitions: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Get message attributes formatted for span (JSON string format). @@ -107,6 +136,10 @@ def _get_llm_messages_attributes_for_span( if system_instruction else None, ), + ( + _GEN_AI_TOOL_DEFINITIONS, + gen_ai_json_dumps(tool_definitions) if tool_definitions else None, + ), ) return {key: value for key, value in optional_attrs if value is not None} @@ -279,6 +312,230 @@ def _get_llm_response_attributes( return {key: value for key, value in optional_attrs if value is not None} +def _get_base_agent_common_attributes( + agent: _BaseAgent, +) -> dict[str, Any]: + """Get common attributes shared by all agent operations (invoke_agent, create_agent).""" + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, agent.request_model), + (GenAI.GEN_AI_PROVIDER_NAME, agent.provider), + (_GEN_AI_AGENT_NAME, agent.agent_name), + (_GEN_AI_AGENT_ID, agent.agent_id), + (_GEN_AI_AGENT_DESCRIPTION, agent.agent_description), + (_GEN_AI_AGENT_VERSION, agent.agent_version), + (server_attributes.SERVER_ADDRESS, agent.server_address), + (server_attributes.SERVER_PORT, agent.server_port), + ) + + return { + GenAI.GEN_AI_OPERATION_NAME: agent.operation_name, + **{key: value for key, value in optional_attrs if value is not None}, + } + + +def _get_base_agent_span_name(agent: _BaseAgent) -> str: + """Get the span name for any agent operation.""" + if agent.agent_name: + return f"{agent.operation_name} {agent.agent_name}" + return agent.operation_name + + +def _get_agent_common_attributes( + invocation: AgentInvocation, +) -> dict[str, Any]: + """Get common agent invocation attributes shared by finish() and error() paths.""" + attrs = _get_base_agent_common_attributes(invocation) + + # Invoke-specific conditionally required attributes + invoke_attrs = ( + (_GEN_AI_CONVERSATION_ID, invocation.conversation_id), + (_GEN_AI_DATA_SOURCE_ID, invocation.data_source_id), + (_GEN_AI_OUTPUT_TYPE, invocation.output_type), + ) + attrs.update( + {key: value for key, value in invoke_attrs if value is not None} + ) + + return attrs + + +def _get_agent_span_name(invocation: AgentInvocation) -> str: + """Get the span name for an agent invocation.""" + return _get_base_agent_span_name(invocation) + + +def _get_agent_request_attributes( + invocation: AgentInvocation, +) -> dict[str, Any]: + """Get GenAI request semantic convention attributes for agent invocation.""" + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_TEMPERATURE, invocation.temperature), + (GenAI.GEN_AI_REQUEST_TOP_P, invocation.top_p), + (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, invocation.frequency_penalty), + (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, invocation.presence_penalty), + (GenAI.GEN_AI_REQUEST_MAX_TOKENS, invocation.max_tokens), + (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, invocation.stop_sequences), + (GenAI.GEN_AI_REQUEST_SEED, invocation.seed), + (GenAI.GEN_AI_REQUEST_CHOICE_COUNT, invocation.choice_count), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + +def _get_agent_response_attributes( + invocation: AgentInvocation, +) -> dict[str, Any]: + """Get GenAI response semantic convention attributes for agent invocation.""" + finish_reasons: list[str] | None + if invocation.finish_reasons is not None: + finish_reasons = invocation.finish_reasons + elif invocation.output_messages: + finish_reasons = [ + message.finish_reason + for message in invocation.output_messages + if message.finish_reason + ] + else: + finish_reasons = None + + unique_finish_reasons = ( + sorted(set(finish_reasons)) if finish_reasons else None + ) + + optional_attrs = ( + ( + GenAI.GEN_AI_RESPONSE_FINISH_REASONS, + unique_finish_reasons if unique_finish_reasons else None, + ), + (GenAI.GEN_AI_RESPONSE_MODEL, invocation.response_model_name), + (GenAI.GEN_AI_RESPONSE_ID, invocation.response_id), + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, invocation.input_tokens), + (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, invocation.output_tokens), + ( + GenAI.GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS, + invocation.cache_creation_input_tokens, + ), + ( + GenAI.GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS, + invocation.cache_read_input_tokens, + ), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + +def _apply_agent_finish_attributes( + span: Span, invocation: AgentInvocation +) -> None: + """Apply attributes/messages common to agent finish() paths.""" + span.update_name(_get_agent_span_name(invocation)) + + attributes: dict[str, Any] = {} + attributes.update(_get_agent_common_attributes(invocation)) + attributes.update(_get_agent_request_attributes(invocation)) + attributes.update(_get_agent_response_attributes(invocation)) + attributes.update( + _get_llm_messages_attributes_for_span( + invocation.input_messages, + invocation.output_messages, + invocation.system_instruction, + invocation.tool_definitions, + ) + ) + attributes.update(invocation.attributes) + + if attributes: + span.set_attributes(attributes) + + +def _get_creation_common_attributes( + creation: AgentCreation, +) -> dict[str, Any]: + """Get common agent creation attributes.""" + return _get_base_agent_common_attributes(creation) + + +def _get_creation_span_name(creation: AgentCreation) -> str: + """Get the span name for an agent creation.""" + return _get_base_agent_span_name(creation) + + +def _apply_creation_finish_attributes( + span: Span, creation: AgentCreation +) -> None: + """Apply attributes common to agent creation finish() paths.""" + span.update_name(_get_creation_span_name(creation)) + + attributes: dict[str, Any] = {} + attributes.update(_get_creation_common_attributes(creation)) + + # System instructions (Opt-In) + if ( + is_experimental_mode() + and get_content_capturing_mode() + in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + and creation.system_instruction + ): + attributes[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] = gen_ai_json_dumps( + [asdict(p) for p in creation.system_instruction] + ) + + attributes.update(creation.attributes) + + if attributes: + span.set_attributes(attributes) + + +def _maybe_emit_agent_event( + logger: Logger | None, + span: Span, + invocation: AgentInvocation, + error: Error | None = None, +) -> None: + """Emit a gen_ai.client.inference.operation.details event for agent invocation.""" + if not is_experimental_mode() or not should_emit_event() or logger is None: + return + + attributes: dict[str, Any] = {} + attributes.update(_get_agent_common_attributes(invocation)) + attributes.update(_get_agent_request_attributes(invocation)) + attributes.update(_get_agent_response_attributes(invocation)) + + # Event uses structured format for messages + if get_content_capturing_mode() in ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + if invocation.input_messages: + attributes[GenAI.GEN_AI_INPUT_MESSAGES] = [ + asdict(m) for m in invocation.input_messages + ] + if invocation.output_messages: + attributes[GenAI.GEN_AI_OUTPUT_MESSAGES] = [ + asdict(m) for m in invocation.output_messages + ] + if invocation.system_instruction: + attributes[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] = [ + asdict(p) for p in invocation.system_instruction + ] + if invocation.tool_definitions: + attributes[_GEN_AI_TOOL_DEFINITIONS] = invocation.tool_definitions + + if error is not None: + attributes[error_attributes.ERROR_TYPE] = error.type.__qualname__ + + context = set_span_in_context(span, get_current()) + event = LogRecord( + event_name="gen_ai.client.inference.operation.details", + attributes=attributes, + context=context, + ) + logger.emit(event) + + __all__ = [ "_apply_llm_finish_attributes", "_apply_error_attributes", @@ -287,4 +544,15 @@ def _get_llm_response_attributes( "_get_llm_response_attributes", "_get_llm_span_name", "_maybe_emit_llm_event", + "_get_base_agent_common_attributes", + "_get_base_agent_span_name", + "_apply_agent_finish_attributes", + "_apply_creation_finish_attributes", + "_get_agent_common_attributes", + "_get_agent_request_attributes", + "_get_agent_response_attributes", + "_get_agent_span_name", + "_get_creation_common_attributes", + "_get_creation_span_name", + "_maybe_emit_agent_event", ] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 0e86885f20..88b62e5b3b 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -236,6 +236,128 @@ class LLMInvocation(GenAIInvocation): monotonic_start_s: float | None = None +@dataclass +class _BaseAgent(GenAIInvocation): + """ + Shared base class for agent lifecycle types (AgentInvocation, AgentCreation). + + Contains fields common to all agent operations: identity, provider, + model, system instructions, server info, and telemetry plumbing. + + Follows semconv for GenAI agent spans: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md + + Do not instantiate directly — use AgentInvocation or AgentCreation. + """ + + # Agent identity + agent_name: str | None = None + agent_id: str | None = None + agent_description: str | None = None + agent_version: str | None = None + + # Operation + operation_name: str = "" + provider: str | None = None + + # Request + request_model: str | None = None + + # Content (Opt-In) + system_instruction: list[MessagePart] = field( + default_factory=_new_system_instruction + ) + + # Server + server_address: str | None = None + server_port: int | None = None + + attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) + """ + Additional attributes to set on spans and/or events. + """ + # Monotonic start time in seconds (from timeit.default_timer) used + # for duration calculations to avoid mixing clock sources. This is + # populated by the TelemetryHandler when starting an invocation. + monotonic_start_s: float | None = None + + +@dataclass +class AgentCreation(_BaseAgent): + """ + Represents agent creation/initialization (create_agent operation). + + Follows semconv for GenAI agent spans: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#create-agent-span + + When creating an AgentCreation object, only update the data attributes. + The span and context_token attributes are set by the TelemetryHandler. + """ + + # Override default operation name + operation_name: str = "create_agent" + + +@dataclass +class AgentInvocation(_BaseAgent): + """ + Represents an agent invocation (invoke_agent operation). + + Follows semconv for GenAI agent spans: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-span + + When creating an AgentInvocation object, only update the data attributes. + The span and context_token attributes are set by the TelemetryHandler. + """ + + # Override default operation name + operation_name: str = "invoke_agent" + + # Invoke-specific request attributes (Cond. Required) + conversation_id: str | None = None + data_source_id: str | None = None + output_type: str | None = None + + # Request parameters (Recommended) + temperature: float | None = None + top_p: float | None = None + frequency_penalty: float | None = None + presence_penalty: float | None = None + max_tokens: int | None = None + stop_sequences: list[str] | None = None + seed: int | None = None + choice_count: int | None = None + + # Response (Recommended) + response_model_name: str | None = None + response_id: str | None = None + finish_reasons: list[str] | None = None + input_tokens: int | None = None + output_tokens: int | None = None + cache_creation_input_tokens: int | None = None + cache_read_input_tokens: int | None = None + + # Content (Opt-In) — input/output messages and tool definitions + input_messages: list[InputMessage] = field( + default_factory=_new_input_messages + ) + output_messages: list[OutputMessage] = field( + default_factory=_new_output_messages + ) + tool_definitions: list[dict[str, Any]] | None = None + + # Span kind: CLIENT for remote agents, INTERNAL for in-process agents + is_remote: bool = True + + metric_attributes: dict[str, Any] = field( + default_factory=_new_str_any_dict + ) + """ + Additional attributes to set on metrics. Must be of a low cardinality. + These attributes will not be set on spans or events. + """ + + @dataclass class Error: message: str diff --git a/util/opentelemetry-util-genai/tests/test_handler_agent.py b/util/opentelemetry-util-genai/tests/test_handler_agent.py new file mode 100644 index 0000000000..19850c6a30 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_agent.py @@ -0,0 +1,449 @@ +from __future__ import annotations + +from typing import Any, Dict, List +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import SpanKind +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + AgentCreation, + AgentInvocation, + Error, + InputMessage, + OutputMessage, + Text, +) + + +class TestAgentInvocationHandler(TestCase): + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + def _make_handler(self) -> TelemetryHandler: + return TelemetryHandler( + tracer_provider=self.tracer_provider, + ) + + # ---- start/stop agent ---- + + def test_start_stop_agent_creates_span(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Math Tutor", + provider="openai", + request_model="gpt-4", + ) + handler.start_agent(invocation) + handler.stop_agent(invocation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "invoke_agent Math Tutor") + self.assertEqual( + span.attributes[GenAI.GEN_AI_OPERATION_NAME], "invoke_agent" + ) + self.assertEqual( + span.attributes[GenAI.GEN_AI_AGENT_NAME], "Math Tutor" + ) + self.assertEqual(span.attributes[GenAI.GEN_AI_PROVIDER_NAME], "openai") + self.assertEqual(span.attributes[GenAI.GEN_AI_REQUEST_MODEL], "gpt-4") + + def test_agent_span_kind_client_by_default(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Remote Agent", is_remote=True) + handler.start_agent(invocation) + handler.stop_agent(invocation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].kind, SpanKind.CLIENT) + + def test_agent_span_kind_internal_for_local(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Local Agent", is_remote=False) + handler.start_agent(invocation) + handler.stop_agent(invocation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].kind, SpanKind.INTERNAL) + + def test_agent_with_all_attributes(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Full Agent", + agent_id="agent-123", + agent_description="A test agent", + agent_version="1.0.0", + provider="openai", + request_model="gpt-4", + conversation_id="conv-456", + data_source_id="ds-789", + output_type="text", + temperature=0.7, + top_p=0.9, + max_tokens=1000, + seed=42, + server_address="api.openai.com", + server_port=443, + ) + handler.start_agent(invocation) + invocation.response_model_name = "gpt-4-0613" + invocation.response_id = "resp-abc" + invocation.input_tokens = 100 + invocation.output_tokens = 200 + invocation.finish_reasons = ["stop"] + handler.stop_agent(invocation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + attrs = spans[0].attributes + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_NAME], "Full Agent") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_ID], "agent-123") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_DESCRIPTION], "A test agent") + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_MODEL], "gpt-4-0613") + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_ID], "resp-abc") + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS], 100) + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS], 200) + self.assertEqual( + tuple(attrs[GenAI.GEN_AI_RESPONSE_FINISH_REASONS]), ("stop",) + ) + + # ---- fail agent ---- + + def test_fail_agent_sets_error_status(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Failing Agent", provider="openai" + ) + handler.start_agent(invocation) + error = Error(message="agent crashed", type=RuntimeError) + handler.fail_agent(invocation, error) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.status.description, "agent crashed") + self.assertEqual(span.attributes.get("error.type"), "RuntimeError") + + # ---- context manager ---- + + def test_agent_context_manager_success(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="CM Agent", provider="openai", request_model="gpt-4" + ) + with handler.agent(invocation) as inv: + inv.input_tokens = 10 + inv.output_tokens = 20 + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "invoke_agent CM Agent") + + def test_agent_context_manager_error(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Error Agent") + with self.assertRaises(ValueError): + with handler.agent(invocation): + raise ValueError("test error") + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].attributes.get("error.type"), "ValueError") + + def test_agent_context_manager_default_invocation(self) -> None: + handler = self._make_handler() + with handler.agent() as inv: + inv.agent_name = "Dynamic Agent" + inv.provider = "openai" + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + # ---- not started ---- + + def test_stop_agent_without_start_is_noop(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Not Started") + result = handler.stop_agent(invocation) + self.assertIs(result, invocation) + self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) + + def test_fail_agent_without_start_is_noop(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Not Started") + error = Error(message="boom", type=RuntimeError) + result = handler.fail_agent(invocation, error) + self.assertIs(result, invocation) + self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) + + +class TestAgentCreationHandler(TestCase): + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + def _make_handler(self) -> TelemetryHandler: + return TelemetryHandler( + tracer_provider=self.tracer_provider, + ) + + def test_start_stop_create_agent(self) -> None: + handler = self._make_handler() + creation = AgentCreation( + agent_name="New Agent", + agent_id="agent-new-1", + provider="openai", + request_model="gpt-4", + ) + handler.start_create_agent(creation) + handler.stop_create_agent(creation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "create_agent New Agent") + self.assertEqual( + span.attributes[GenAI.GEN_AI_OPERATION_NAME], "create_agent" + ) + self.assertEqual(span.attributes[GenAI.GEN_AI_AGENT_NAME], "New Agent") + + def test_create_agent_span_kind_is_client(self) -> None: + handler = self._make_handler() + creation = AgentCreation(agent_name="Client Agent") + handler.start_create_agent(creation) + handler.stop_create_agent(creation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(spans[0].kind, SpanKind.CLIENT) + + def test_create_agent_with_all_base_attributes(self) -> None: + handler = self._make_handler() + creation = AgentCreation( + agent_name="Full Agent", + agent_id="agent-123", + agent_description="A test agent", + agent_version="1.0.0", + provider="openai", + request_model="gpt-4", + server_address="api.openai.com", + server_port=443, + ) + handler.start_create_agent(creation) + handler.stop_create_agent(creation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + attrs = spans[0].attributes + self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "create_agent") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_NAME], "Full Agent") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_ID], "agent-123") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_DESCRIPTION], "A test agent") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_VERSION], "1.0.0") + self.assertEqual(attrs[GenAI.GEN_AI_PROVIDER_NAME], "openai") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "gpt-4") + + def test_fail_create_agent(self) -> None: + handler = self._make_handler() + creation = AgentCreation(agent_name="Bad Agent") + handler.start_create_agent(creation) + error = Error(message="creation failed", type=RuntimeError) + handler.fail_create_agent(creation, error) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].status.description, "creation failed") + self.assertEqual(spans[0].attributes.get("error.type"), "RuntimeError") + + def test_create_agent_context_manager(self) -> None: + handler = self._make_handler() + creation = AgentCreation( + agent_name="CM Agent", + provider="openai", + ) + with handler.create_agent(creation) as c: + c.agent_id = "assigned-id" + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "create_agent CM Agent") + + def test_create_agent_context_manager_error(self) -> None: + handler = self._make_handler() + with self.assertRaises(TypeError): + with handler.create_agent(AgentCreation(agent_name="Err")): + raise TypeError("bad type") + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].attributes.get("error.type"), "TypeError") + + def test_create_agent_context_manager_default(self) -> None: + handler = self._make_handler() + with handler.create_agent() as c: + c.agent_name = "Dynamic Agent" + c.provider = "openai" + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + def test_stop_create_agent_without_start_is_noop(self) -> None: + handler = self._make_handler() + creation = AgentCreation(agent_name="Not Started") + result = handler.stop_create_agent(creation) + self.assertIs(result, creation) + self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) + + def test_fail_create_agent_without_start_is_noop(self) -> None: + handler = self._make_handler() + creation = AgentCreation(agent_name="Not Started") + error = Error(message="boom", type=RuntimeError) + result = handler.fail_create_agent(creation, error) + self.assertIs(result, creation) + self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) + + +class TestAgentTypes(TestCase): + """Unit tests for the AgentInvocation and AgentCreation dataclasses.""" + + def test_agent_invocation_defaults(self) -> None: + inv = AgentInvocation() + self.assertEqual(inv.operation_name, "invoke_agent") + self.assertIsNone(inv.agent_name) + self.assertIsNone(inv.agent_id) + self.assertIsNone(inv.provider) + self.assertIsNone(inv.request_model) + self.assertTrue(inv.is_remote) + self.assertEqual(inv.input_messages, []) + self.assertEqual(inv.output_messages, []) + self.assertEqual(inv.system_instruction, []) + self.assertIsNone(inv.tool_definitions) + self.assertIsNone(inv.span) + self.assertIsNone(inv.context_token) + + def test_agent_creation_defaults(self) -> None: + creation = AgentCreation() + self.assertEqual(creation.operation_name, "create_agent") + self.assertIsNone(creation.agent_name) + self.assertIsNone(creation.agent_id) + self.assertIsNone(creation.agent_description) + self.assertIsNone(creation.agent_version) + self.assertIsNone(creation.provider) + self.assertIsNone(creation.request_model) + self.assertEqual(creation.system_instruction, []) + self.assertIsNone(creation.server_address) + self.assertIsNone(creation.server_port) + self.assertIsNone(creation.span) + self.assertIsNone(creation.context_token) + + def test_agent_invocation_with_messages(self) -> None: + inv = AgentInvocation( + agent_name="Test", + input_messages=[ + InputMessage(role="user", parts=[Text(content="Hello")]) + ], + output_messages=[ + OutputMessage( + role="assistant", + parts=[Text(content="Hi there!")], + finish_reason="stop", + ) + ], + ) + self.assertEqual(len(inv.input_messages), 1) + self.assertEqual(len(inv.output_messages), 1) + self.assertEqual(inv.input_messages[0].role, "user") + + def test_agent_invocation_custom_attributes(self) -> None: + inv = AgentInvocation( + agent_name="Custom", + attributes={"custom.key": "custom_value"}, + ) + self.assertEqual(inv.attributes["custom.key"], "custom_value") + + def test_agent_creation_custom_attributes(self) -> None: + creation = AgentCreation( + agent_name="Custom", + attributes={"custom.key": "custom_value"}, + ) + self.assertEqual(creation.attributes["custom.key"], "custom_value") + + +class TestAgentMetrics(TestCase): + def setUp(self) -> None: + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + def _make_handler(self) -> TelemetryHandler: + return TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + def _harvest_metrics(self) -> Dict[str, List[Any]]: + try: + self.meter_provider.force_flush() + except Exception: + pass + self.metric_reader.collect() + metrics_by_name: Dict[str, List[Any]] = {} + data = self.metric_reader.get_metrics_data() + for resource_metric in (data and data.resource_metrics) or []: + for scope_metric in resource_metric.scope_metrics or []: + for metric in scope_metric.metrics or []: + points = metric.data.data_points or [] + metrics_by_name.setdefault(metric.name, []).extend(points) + return metrics_by_name + + def test_agent_records_duration_and_tokens(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Metrics Agent", + provider="openai", + request_model="gpt-4", + ) + invocation.input_tokens = 50 + invocation.output_tokens = 100 + + with patch("timeit.default_timer", return_value=1000.0): + handler.start_agent(invocation) + + with patch("timeit.default_timer", return_value=1003.0): + handler.stop_agent(invocation) + + metrics = self._harvest_metrics() + self.assertIn("gen_ai.client.operation.duration", metrics) + duration_points = metrics["gen_ai.client.operation.duration"] + self.assertEqual(len(duration_points), 1) + self.assertAlmostEqual(duration_points[0].sum, 3.0, places=3) + self.assertEqual( + duration_points[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_agent", + ) From 6e23298f0d4afd955650be1e188dd338f691fbe3 Mon Sep 17 00:00:00 2001 From: etserend Date: Wed, 18 Mar 2026 12:03:33 -0500 Subject: [PATCH 2/2] update context handler --- .../src/opentelemetry/util/genai/handler.py | 131 ++++++------------ .../src/opentelemetry/util/genai/metrics.py | 52 +++---- .../opentelemetry/util/genai/span_utils.py | 95 ++++++------- .../src/opentelemetry/util/genai/types.py | 5 +- .../tests/test_handler_agent.py | 20 +-- 5 files changed, 126 insertions(+), 177 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 352a2fa174..5f256cc6f7 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -82,9 +82,9 @@ from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.span_utils import ( _apply_agent_finish_attributes, - _apply_creation_finish_attributes, _apply_error_attributes, _apply_llm_finish_attributes, + _get_base_agent_span_name, _maybe_emit_agent_event, _maybe_emit_llm_event, ) @@ -94,6 +94,7 @@ Error, GenAIInvocation, LLMInvocation, + _BaseAgent, ) from opentelemetry.util.genai.version import __version__ @@ -257,31 +258,31 @@ def llm( ) as inv: yield inv - # ---- Agent invocation lifecycle ---- + # ---- Unified agent lifecycle (invoke_agent / create_agent) ---- def start_agent( self, - invocation: AgentInvocation, - ) -> AgentInvocation: - """Start an agent invocation and create a pending span entry.""" - span_name = ( - f"{invocation.operation_name} {invocation.agent_name}".strip() - ) - kind = SpanKind.CLIENT if invocation.is_remote else SpanKind.INTERNAL + agent: _BaseAgent, + ) -> _BaseAgent: + """Start an agent operation and create a pending span entry. + + Accepts any _BaseAgent subclass (AgentInvocation, AgentCreation). + """ + span_name = _get_base_agent_span_name(agent) + is_remote = getattr(agent, "is_remote", True) + kind = SpanKind.CLIENT if is_remote else SpanKind.INTERNAL span = self._tracer.start_span( name=span_name, kind=kind, ) - invocation.monotonic_start_s = timeit.default_timer() - invocation.span = span - invocation.context_token = otel_context.attach( - set_span_in_context(span) - ) - return invocation + agent.monotonic_start_s = timeit.default_timer() + agent.span = span + agent.context_token = otel_context.attach(set_span_in_context(span)) + return agent def _record_agent_metrics( self, - invocation: AgentInvocation, + agent: _BaseAgent, span: Span | None = None, *, error_type: str | None = None, @@ -290,45 +291,41 @@ def _record_agent_metrics( return self._metrics_recorder.record_agent( span, - invocation, + agent, error_type=error_type, ) - def stop_agent(self, invocation: AgentInvocation) -> AgentInvocation: - """Finalize an agent invocation successfully and end its span.""" - if invocation.context_token is None or invocation.span is None: - return invocation + def stop_agent(self, agent: _BaseAgent) -> _BaseAgent: + """Finalize an agent operation successfully and end its span.""" + if agent.context_token is None or agent.span is None: + return agent try: - _apply_agent_finish_attributes(invocation.span, invocation) - self._record_agent_metrics(invocation, invocation.span) - _maybe_emit_agent_event(self._logger, invocation.span, invocation) + _apply_agent_finish_attributes(agent.span, agent) + self._record_agent_metrics(agent, agent.span) + _maybe_emit_agent_event(self._logger, agent.span, agent) finally: - otel_context.detach(invocation.context_token) - invocation.span.end() - return invocation + otel_context.detach(agent.context_token) + agent.span.end() + return agent - def fail_agent( - self, invocation: AgentInvocation, error: Error - ) -> AgentInvocation: - """Fail an agent invocation and end its span with error status.""" - if invocation.context_token is None or invocation.span is None: - return invocation + def fail_agent(self, agent: _BaseAgent, error: Error) -> _BaseAgent: + """Fail an agent operation and end its span with error status.""" + if agent.context_token is None or agent.span is None: + return agent try: - _apply_agent_finish_attributes(invocation.span, invocation) - _apply_error_attributes(invocation.span, error) + _apply_agent_finish_attributes(agent.span, agent) + _apply_error_attributes(agent.span, error) error_type = getattr(error.type, "__qualname__", None) self._record_agent_metrics( - invocation, invocation.span, error_type=error_type - ) - _maybe_emit_agent_event( - self._logger, invocation.span, invocation, error + agent, agent.span, error_type=error_type ) + _maybe_emit_agent_event(self._logger, agent.span, agent, error) finally: - otel_context.detach(invocation.context_token) - invocation.span.end() - return invocation + otel_context.detach(agent.context_token) + agent.span.end() + return agent @contextmanager def agent( @@ -353,50 +350,6 @@ def agent( ) as inv: yield inv - # ---- Agent creation lifecycle ---- - - def start_create_agent( - self, - creation: AgentCreation, - ) -> AgentCreation: - """Start an agent creation and create a pending span entry.""" - span_name = f"{creation.operation_name} {creation.agent_name}".strip() - span = self._tracer.start_span( - name=span_name, - kind=SpanKind.CLIENT, - ) - creation.monotonic_start_s = timeit.default_timer() - creation.span = span - creation.context_token = otel_context.attach(set_span_in_context(span)) - return creation - - def stop_create_agent(self, creation: AgentCreation) -> AgentCreation: # pylint: disable=no-self-use - """Finalize an agent creation successfully and end its span.""" - if creation.context_token is None or creation.span is None: - return creation - - try: - _apply_creation_finish_attributes(creation.span, creation) - finally: - otel_context.detach(creation.context_token) - creation.span.end() - return creation - - def fail_create_agent( # pylint: disable=no-self-use - self, creation: AgentCreation, error: Error - ) -> AgentCreation: - """Fail an agent creation and end its span with error status.""" - if creation.context_token is None or creation.span is None: - return creation - - try: - _apply_creation_finish_attributes(creation.span, creation) - _apply_error_attributes(creation.span, error) - finally: - otel_context.detach(creation.context_token) - creation.span.end() - return creation - @contextmanager def create_agent( self, creation: AgentCreation | None = None @@ -413,9 +366,9 @@ def create_agent( creation = AgentCreation() with _lifecycle_context( creation, - self.start_create_agent, - self.stop_create_agent, - self.fail_create_agent, + self.start_agent, + self.stop_agent, + self.fail_agent, "create_agent", ) as c: yield c diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 6fc2fe6dcc..738bae9d7c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -18,7 +18,7 @@ create_duration_histogram, create_token_histogram, ) -from opentelemetry.util.genai.types import AgentInvocation, LLMInvocation +from opentelemetry.util.genai.types import LLMInvocation, _BaseAgent from opentelemetry.util.types import AttributeValue @@ -108,55 +108,55 @@ def record( def record_agent( self, span: Optional[Span], - invocation: AgentInvocation, + agent: _BaseAgent, *, error_type: Optional[str] = None, ) -> None: - """Record duration and token metrics for an agent invocation.""" + """Record duration and token metrics for any agent operation.""" if span is None: return token_counts: list[tuple[int, str]] = [] - if invocation.input_tokens is not None: + input_tokens = getattr(agent, "input_tokens", None) + output_tokens = getattr(agent, "output_tokens", None) + if input_tokens is not None: token_counts.append( ( - invocation.input_tokens, + input_tokens, GenAI.GenAiTokenTypeValues.INPUT.value, ) ) - if invocation.output_tokens is not None: + if output_tokens is not None: token_counts.append( ( - invocation.output_tokens, + output_tokens, GenAI.GenAiTokenTypeValues.OUTPUT.value, ) ) attributes: Dict[str, AttributeValue] = { - GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name + GenAI.GEN_AI_OPERATION_NAME: agent.operation_name } - if invocation.request_model: - attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model - if invocation.provider: - attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider - if invocation.response_model_name: - attributes[GenAI.GEN_AI_RESPONSE_MODEL] = ( - invocation.response_model_name - ) - if invocation.server_address: - attributes[server_attributes.SERVER_ADDRESS] = ( - invocation.server_address - ) - if invocation.server_port is not None: - attributes[server_attributes.SERVER_PORT] = invocation.server_port - if invocation.metric_attributes: - attributes.update(invocation.metric_attributes) + if agent.request_model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = agent.request_model + if agent.provider: + attributes[GenAI.GEN_AI_PROVIDER_NAME] = agent.provider + response_model_name = getattr(agent, "response_model_name", None) + if response_model_name: + attributes[GenAI.GEN_AI_RESPONSE_MODEL] = response_model_name + if agent.server_address: + attributes[server_attributes.SERVER_ADDRESS] = agent.server_address + if agent.server_port is not None: + attributes[server_attributes.SERVER_PORT] = agent.server_port + metric_attributes = getattr(agent, "metric_attributes", None) + if metric_attributes: + attributes.update(metric_attributes) duration_seconds: Optional[float] = None - if invocation.monotonic_start_s is not None: + if agent.monotonic_start_s is not None: duration_seconds = max( - timeit.default_timer() - invocation.monotonic_start_s, + timeit.default_timer() - agent.monotonic_start_s, 0.0, ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 64f98e4445..51e326d776 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -359,11 +359,6 @@ def _get_agent_common_attributes( return attrs -def _get_agent_span_name(invocation: AgentInvocation) -> str: - """Get the span name for an agent invocation.""" - return _get_base_agent_span_name(invocation) - - def _get_agent_request_attributes( invocation: AgentInvocation, ) -> dict[str, Any]: @@ -424,12 +419,24 @@ def _get_agent_response_attributes( return {key: value for key, value in optional_attrs if value is not None} -def _apply_agent_finish_attributes( +def _apply_agent_finish_attributes(span: Span, agent: _BaseAgent) -> None: + """Apply attributes common to any agent finish() path. + + Dispatches to invocation-specific or creation-specific logic + based on the concrete type. + """ + span.update_name(_get_base_agent_span_name(agent)) + + if isinstance(agent, AgentInvocation): + _apply_invocation_finish_attributes(span, agent) + elif isinstance(agent, AgentCreation): + _apply_creation_finish_attributes(span, agent) + + +def _apply_invocation_finish_attributes( span: Span, invocation: AgentInvocation ) -> None: - """Apply attributes/messages common to agent finish() paths.""" - span.update_name(_get_agent_span_name(invocation)) - + """Apply attributes specific to agent invocation finish() paths.""" attributes: dict[str, Any] = {} attributes.update(_get_agent_common_attributes(invocation)) attributes.update(_get_agent_request_attributes(invocation)) @@ -448,26 +455,13 @@ def _apply_agent_finish_attributes( span.set_attributes(attributes) -def _get_creation_common_attributes( - creation: AgentCreation, -) -> dict[str, Any]: - """Get common agent creation attributes.""" - return _get_base_agent_common_attributes(creation) - - -def _get_creation_span_name(creation: AgentCreation) -> str: - """Get the span name for an agent creation.""" - return _get_base_agent_span_name(creation) - - def _apply_creation_finish_attributes( span: Span, creation: AgentCreation ) -> None: """Apply attributes common to agent creation finish() paths.""" - span.update_name(_get_creation_span_name(creation)) attributes: dict[str, Any] = {} - attributes.update(_get_creation_common_attributes(creation)) + attributes.update(_get_base_agent_common_attributes(creation)) # System instructions (Opt-In) if ( @@ -492,37 +486,40 @@ def _apply_creation_finish_attributes( def _maybe_emit_agent_event( logger: Logger | None, span: Span, - invocation: AgentInvocation, + agent: _BaseAgent, error: Error | None = None, ) -> None: - """Emit a gen_ai.client.inference.operation.details event for agent invocation.""" + """Emit a gen_ai.client.inference.operation.details event for any agent operation.""" if not is_experimental_mode() or not should_emit_event() or logger is None: return attributes: dict[str, Any] = {} - attributes.update(_get_agent_common_attributes(invocation)) - attributes.update(_get_agent_request_attributes(invocation)) - attributes.update(_get_agent_response_attributes(invocation)) + attributes.update(_get_base_agent_common_attributes(agent)) - # Event uses structured format for messages - if get_content_capturing_mode() in ( - ContentCapturingMode.EVENT_ONLY, - ContentCapturingMode.SPAN_AND_EVENT, - ): - if invocation.input_messages: - attributes[GenAI.GEN_AI_INPUT_MESSAGES] = [ - asdict(m) for m in invocation.input_messages - ] - if invocation.output_messages: - attributes[GenAI.GEN_AI_OUTPUT_MESSAGES] = [ - asdict(m) for m in invocation.output_messages - ] - if invocation.system_instruction: - attributes[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] = [ - asdict(p) for p in invocation.system_instruction - ] - if invocation.tool_definitions: - attributes[_GEN_AI_TOOL_DEFINITIONS] = invocation.tool_definitions + if isinstance(agent, AgentInvocation): + attributes.update(_get_agent_common_attributes(agent)) + attributes.update(_get_agent_request_attributes(agent)) + attributes.update(_get_agent_response_attributes(agent)) + + # Event uses structured format for messages + if get_content_capturing_mode() in ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + if agent.input_messages: + attributes[GenAI.GEN_AI_INPUT_MESSAGES] = [ + asdict(m) for m in agent.input_messages + ] + if agent.output_messages: + attributes[GenAI.GEN_AI_OUTPUT_MESSAGES] = [ + asdict(m) for m in agent.output_messages + ] + if agent.system_instruction: + attributes[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] = [ + asdict(p) for p in agent.system_instruction + ] + if agent.tool_definitions: + attributes[_GEN_AI_TOOL_DEFINITIONS] = agent.tool_definitions if error is not None: attributes[error_attributes.ERROR_TYPE] = error.type.__qualname__ @@ -547,12 +544,10 @@ def _maybe_emit_agent_event( "_get_base_agent_common_attributes", "_get_base_agent_span_name", "_apply_agent_finish_attributes", + "_apply_invocation_finish_attributes", "_apply_creation_finish_attributes", "_get_agent_common_attributes", "_get_agent_request_attributes", "_get_agent_response_attributes", - "_get_agent_span_name", - "_get_creation_common_attributes", - "_get_creation_span_name", "_maybe_emit_agent_event", ] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 88b62e5b3b..88b2c052ab 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -14,6 +14,7 @@ from __future__ import annotations +from abc import ABC from contextvars import Token from dataclasses import dataclass, field from enum import Enum @@ -237,9 +238,9 @@ class LLMInvocation(GenAIInvocation): @dataclass -class _BaseAgent(GenAIInvocation): +class _BaseAgent(GenAIInvocation, ABC): """ - Shared base class for agent lifecycle types (AgentInvocation, AgentCreation). + Shared abstract base class for agent lifecycle types (AgentInvocation, AgentCreation). Contains fields common to all agent operations: identity, provider, model, system instructions, server info, and telemetry plumbing. diff --git a/util/opentelemetry-util-genai/tests/test_handler_agent.py b/util/opentelemetry-util-genai/tests/test_handler_agent.py index 19850c6a30..c33690e382 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_agent.py +++ b/util/opentelemetry-util-genai/tests/test_handler_agent.py @@ -216,8 +216,8 @@ def test_start_stop_create_agent(self) -> None: provider="openai", request_model="gpt-4", ) - handler.start_create_agent(creation) - handler.stop_create_agent(creation) + handler.start_agent(creation) + handler.stop_agent(creation) spans = self.span_exporter.get_finished_spans() self.assertEqual(len(spans), 1) @@ -231,8 +231,8 @@ def test_start_stop_create_agent(self) -> None: def test_create_agent_span_kind_is_client(self) -> None: handler = self._make_handler() creation = AgentCreation(agent_name="Client Agent") - handler.start_create_agent(creation) - handler.stop_create_agent(creation) + handler.start_agent(creation) + handler.stop_agent(creation) spans = self.span_exporter.get_finished_spans() self.assertEqual(spans[0].kind, SpanKind.CLIENT) @@ -249,8 +249,8 @@ def test_create_agent_with_all_base_attributes(self) -> None: server_address="api.openai.com", server_port=443, ) - handler.start_create_agent(creation) - handler.stop_create_agent(creation) + handler.start_agent(creation) + handler.stop_agent(creation) spans = self.span_exporter.get_finished_spans() self.assertEqual(len(spans), 1) @@ -266,9 +266,9 @@ def test_create_agent_with_all_base_attributes(self) -> None: def test_fail_create_agent(self) -> None: handler = self._make_handler() creation = AgentCreation(agent_name="Bad Agent") - handler.start_create_agent(creation) + handler.start_agent(creation) error = Error(message="creation failed", type=RuntimeError) - handler.fail_create_agent(creation, error) + handler.fail_agent(creation, error) spans = self.span_exporter.get_finished_spans() self.assertEqual(len(spans), 1) @@ -310,7 +310,7 @@ def test_create_agent_context_manager_default(self) -> None: def test_stop_create_agent_without_start_is_noop(self) -> None: handler = self._make_handler() creation = AgentCreation(agent_name="Not Started") - result = handler.stop_create_agent(creation) + result = handler.stop_agent(creation) self.assertIs(result, creation) self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) @@ -318,7 +318,7 @@ def test_fail_create_agent_without_start_is_noop(self) -> None: handler = self._make_handler() creation = AgentCreation(agent_name="Not Started") error = Error(message="boom", type=RuntimeError) - result = handler.fail_create_agent(creation, error) + result = handler.fail_agent(creation, error) self.assertIs(result, creation) self.assertEqual(len(self.span_exporter.get_finished_spans()), 0)