diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 1ddd22cee8..7951d64d6a 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add `AgentCreation` and `AgentInvocation` invocation types with factory methods (`start_create_agent`, `start_invoke_local_agent`, `start_invoke_remote_agent`), context managers, metrics recording, and event emission. + ([#4329](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4329)) - Add metrics support for EmbeddingInvocation ([#4377](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4377)) - Add support for workflow in genAI utils handler. diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_creation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_creation.py new file mode 100644 index 0000000000..a506548792 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_creation.py @@ -0,0 +1,197 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Agent creation invocation type. + +Represents a ``create_agent`` operation as defined by the OpenTelemetry +GenAI semantic conventions: +https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#create-agent +""" + +from __future__ import annotations + +from dataclasses import asdict +from typing import Any + +from opentelemetry._logs import Logger, LogRecord +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.trace import SpanKind, Tracer +from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.types import MessagePart +from opentelemetry.util.genai.utils import ( + ContentCapturingMode, + gen_ai_json_dumps, + get_content_capturing_mode, + is_experimental_mode, + should_emit_event, +) + +_GEN_AI_AGENT_VERSION: str = getattr( + GenAI, "GEN_AI_AGENT_VERSION", "gen_ai.agent.version" +) + + +class AgentCreation(GenAIInvocation): + """Represents an agent creation/initialization. + + Use ``handler.start_create_agent()`` or ``handler.create_agent()`` + context manager rather than constructing this directly. + + Spec: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#create-agent + """ + + def __init__( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_create_agent() or handler.create_agent() instead of calling this directly.""" + _operation_name = GenAI.GenAiOperationNameValues.CREATE_AGENT.value + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=_operation_name, + span_kind=SpanKind.CLIENT, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.provider = provider + self.request_model = request_model + self.server_address = server_address + self.server_port = server_port + + self.agent_name: str | None = None + self.agent_id: str | None = None + self.agent_description: str | None = None + self.agent_version: str | None = None + + self.system_instruction: list[MessagePart] = [] + + self._start() + + def _get_common_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + (GenAI.GEN_AI_AGENT_NAME, self.agent_name), + (GenAI.GEN_AI_AGENT_ID, self.agent_id), + (GenAI.GEN_AI_AGENT_DESCRIPTION, self.agent_description), + (_GEN_AI_AGENT_VERSION, self.agent_version), + ) + return { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + GenAI.GEN_AI_PROVIDER_NAME: self.provider, + **{k: v for k, v in optional_attrs if v is not None}, + } + + def _get_system_instructions_for_span(self) -> dict[str, Any]: + if ( + not is_experimental_mode() + or get_content_capturing_mode() + not in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + or not self.system_instruction + ): + return {} + return { + GenAI.GEN_AI_SYSTEM_INSTRUCTIONS: gen_ai_json_dumps( + [asdict(p) for p in self.system_instruction] + ), + } + + def _get_metric_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_PROVIDER_NAME, self.provider), + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + ) + attrs: dict[str, Any] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + **{k: v for k, v in optional_attrs if v is not None}, + } + attrs.update(self.metric_attributes) + return attrs + + def _get_system_instructions_for_event(self) -> dict[str, Any]: + if ( + not is_experimental_mode() + or get_content_capturing_mode() + not in ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + or not self.system_instruction + ): + return {} + return { + GenAI.GEN_AI_SYSTEM_INSTRUCTIONS: [ + asdict(p) for p in self.system_instruction + ], + } + + def _apply_finish(self, error: Error | None = None) -> None: + if error is not None: + self._apply_error_attributes(error) + + # Update span name if agent_name was set after construction + if self.agent_name: + self.span.update_name(f"{self._operation_name} {self.agent_name}") + + attributes: dict[str, Any] = {} + attributes.update(self._get_common_attributes()) + attributes.update(self._get_system_instructions_for_span()) + attributes.update(self.attributes) + self.span.set_attributes(attributes) + self._metrics_recorder.record(self) + self._emit_event() + + def _emit_event(self) -> None: + """Emit a gen_ai.client.inference.operation.details event. + + For more details, see the semantic convention documentation: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aiclientinferenceoperationdetails + """ + if not is_experimental_mode() or not should_emit_event(): + return + + attributes: dict[str, Any] = {} + attributes.update(self._get_common_attributes()) + attributes.update(self._get_system_instructions_for_event()) + attributes.update(self.attributes) + self._logger.emit( + LogRecord( + event_name="gen_ai.client.inference.operation.details", + attributes=attributes, + context=self._span_context, + ) + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py new file mode 100644 index 0000000000..8ea7f455ee --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py @@ -0,0 +1,255 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Any + +from opentelemetry._logs import Logger, LogRecord +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.trace import SpanKind, Tracer +from opentelemetry.util.genai._content import _get_content_attributes +from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.types import ( + InputMessage, + MessagePart, + OutputMessage, + ToolDefinition, +) +from opentelemetry.util.genai.utils import ( + is_experimental_mode, + should_emit_event, +) + +_GEN_AI_AGENT_VERSION: str = getattr( + GenAI, "GEN_AI_AGENT_VERSION", "gen_ai.agent.version" +) +_GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS: str = getattr( + GenAI, + "GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS", + "gen_ai.usage.cache_creation_input_tokens", +) +_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS: str = getattr( + GenAI, + "GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS", + "gen_ai.usage.cache_read_input_tokens", +) + + +class AgentInvocation(GenAIInvocation): + """Represents a single agent invocation (invoke_agent span). + + Use handler.start_invoke_local_agent() / handler.start_invoke_remote_agent() + or the handler.invoke_local_agent() / handler.invoke_remote_agent() context + managers rather than constructing this directly. + + Reference: + Client span: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-client-span + Internal span: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-internal-span + """ + + def __init__( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + provider: str, + *, + span_kind: SpanKind = SpanKind.INTERNAL, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_invoke_local_agent() or handler.start_invoke_remote_agent() instead of calling this directly.""" + _operation_name = GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=_operation_name, + span_kind=span_kind, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.provider = provider + self.request_model = request_model + self.server_address = server_address + self.server_port = server_port + + self.agent_name: str | None = None + self.agent_id: str | None = None + self.agent_description: str | None = None + self.agent_version: str | None = None + + self.conversation_id: str | None = None + self.data_source_id: str | None = None + self.output_type: str | None = None + + self.temperature: float | None = None + self.top_p: float | None = None + self.frequency_penalty: float | None = None + self.presence_penalty: float | None = None + self.max_tokens: int | None = None + self.stop_sequences: list[str] | None = None + self.seed: int | None = None + self.choice_count: int | None = None + + self.input_tokens: int | None = None + self.output_tokens: int | None = None + self.cache_creation_input_tokens: int | None = None + self.cache_read_input_tokens: int | None = None + + self.input_messages: list[InputMessage] = [] + self.output_messages: list[OutputMessage] = [] + self.system_instruction: list[MessagePart] = [] + self.tool_definitions: list[ToolDefinition] | None = None + + self._start() + + def _get_common_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + (GenAI.GEN_AI_AGENT_NAME, self.agent_name), + (GenAI.GEN_AI_AGENT_ID, self.agent_id), + (GenAI.GEN_AI_AGENT_DESCRIPTION, self.agent_description), + (_GEN_AI_AGENT_VERSION, self.agent_version), + ) + return { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + GenAI.GEN_AI_PROVIDER_NAME: self.provider, + **{k: v for k, v in optional_attrs if v is not None}, + } + + def _get_request_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_CONVERSATION_ID, self.conversation_id), + (GenAI.GEN_AI_DATA_SOURCE_ID, self.data_source_id), + (GenAI.GEN_AI_OUTPUT_TYPE, self.output_type), + (GenAI.GEN_AI_REQUEST_TEMPERATURE, self.temperature), + (GenAI.GEN_AI_REQUEST_TOP_P, self.top_p), + (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, self.frequency_penalty), + (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, self.presence_penalty), + (GenAI.GEN_AI_REQUEST_MAX_TOKENS, self.max_tokens), + (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, self.stop_sequences), + (GenAI.GEN_AI_REQUEST_SEED, self.seed), + (GenAI.GEN_AI_REQUEST_CHOICE_COUNT, self.choice_count), + ) + return {k: v for k, v in optional_attrs if v is not None} + + def _get_usage_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens), + (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, self.output_tokens), + ( + _GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS, + self.cache_creation_input_tokens, + ), + ( + _GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS, + self.cache_read_input_tokens, + ), + ) + return {k: v for k, v in optional_attrs if v is not None} + + def _get_content_attributes_for_span(self) -> dict[str, Any]: + return _get_content_attributes( + input_messages=self.input_messages, + output_messages=self.output_messages, + system_instruction=self.system_instruction, + tool_definitions=self.tool_definitions, + for_span=True, + ) + + def _get_metric_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_PROVIDER_NAME, self.provider), + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + ) + attrs: dict[str, Any] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + **{k: v for k, v in optional_attrs if v is not None}, + } + attrs.update(self.metric_attributes) + return attrs + + def _get_metric_token_counts(self) -> dict[str, int]: + counts: dict[str, int] = {} + if self.input_tokens is not None: + counts[GenAI.GenAiTokenTypeValues.INPUT.value] = self.input_tokens + if self.output_tokens is not None: + counts[GenAI.GenAiTokenTypeValues.OUTPUT.value] = ( + self.output_tokens + ) + return counts + + def _get_content_attributes_for_event(self) -> dict[str, Any]: + return _get_content_attributes( + input_messages=self.input_messages, + output_messages=self.output_messages, + system_instruction=self.system_instruction, + tool_definitions=self.tool_definitions, + for_span=False, + ) + + def _apply_finish(self, error: Error | None = None) -> None: + if error is not None: + self._apply_error_attributes(error) + + # Update span name if agent_name was set after construction + if self.agent_name: + self.span.update_name(f"{self._operation_name} {self.agent_name}") + + attributes: dict[str, Any] = {} + attributes.update(self._get_common_attributes()) + attributes.update(self._get_request_attributes()) + attributes.update(self._get_usage_attributes()) + attributes.update(self._get_content_attributes_for_span()) + attributes.update(self.attributes) + self.span.set_attributes(attributes) + self._metrics_recorder.record(self) + self._emit_event() + + def _emit_event(self) -> None: + """Emit a gen_ai.client.inference.operation.details event. + + For more details, see the semantic convention documentation: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aiclientinferenceoperationdetails + """ + if not is_experimental_mode() or not should_emit_event(): + return + + attributes: dict[str, Any] = {} + attributes.update(self._get_common_attributes()) + attributes.update(self._get_request_attributes()) + attributes.update(self._get_usage_attributes()) + attributes.update(self._get_content_attributes_for_event()) + attributes.update(self.attributes) + self._logger.emit( + LogRecord( + event_name="gen_ai.client.inference.operation.details", + attributes=attributes, + context=self._span_context, + ) + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_content.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_content.py new file mode 100644 index 0000000000..27f883ad62 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_content.py @@ -0,0 +1,102 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared content serialization helpers for invocation types. + +This module provides shared logic for serializing input/output messages, +system instructions, and tool definitions into span and event attributes. +Used by both InferenceInvocation and AgentInvocation to avoid duplication. +""" + +from __future__ import annotations + +from dataclasses import asdict +from typing import Any, Sequence + +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.util.genai.types import ( + InputMessage, + MessagePart, + OutputMessage, + ToolDefinition, +) +from opentelemetry.util.genai.utils import ( + ContentCapturingMode, + gen_ai_json_dumps, + get_content_capturing_mode, + is_experimental_mode, +) + + +def _get_content_attributes( + *, + input_messages: Sequence[InputMessage], + output_messages: Sequence[OutputMessage], + system_instruction: Sequence[MessagePart], + tool_definitions: Sequence[ToolDefinition] | None, + for_span: bool, +) -> dict[str, Any]: + """Serialize messages, system instructions, and tool definitions into attributes. + + Args: + input_messages: Input messages to serialize. + output_messages: Output messages to serialize. + system_instruction: System instructions to serialize. + tool_definitions: Tool definitions to serialize (may be None). + for_span: If True, serialize for span attributes (JSON string); + if False, serialize for event attributes (list of dicts). + """ + if not is_experimental_mode(): + return {} + + mode = get_content_capturing_mode() + allowed_modes = ( + ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + if for_span + else ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + ) + if mode not in allowed_modes: + return {} + + def serialize(items: Sequence[Any]) -> Any: + dicts = [asdict(item) for item in items] + return gen_ai_json_dumps(dicts) if for_span else dicts + + optional_attrs = ( + ( + GenAI.GEN_AI_INPUT_MESSAGES, + serialize(input_messages) if input_messages else None, + ), + ( + GenAI.GEN_AI_OUTPUT_MESSAGES, + serialize(output_messages) if output_messages else None, + ), + ( + GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, + serialize(system_instruction) if system_instruction else None, + ), + ( + GenAI.GEN_AI_TOOL_DEFINITIONS, + serialize(tool_definitions) if tool_definitions else None, + ), + ) + return {key: value for key, value in optional_attrs if value is not None} diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 9ef4a5592d..7d8f084e14 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -57,9 +57,12 @@ from opentelemetry.metrics import MeterProvider, get_meter from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import ( + SpanKind, TracerProvider, get_tracer, ) +from opentelemetry.util.genai._agent_creation import AgentCreation +from opentelemetry.util.genai._agent_invocation import AgentInvocation from opentelemetry.util.genai._inference_invocation import ( LLMInvocation, ) @@ -299,6 +302,146 @@ def tool( tool_description=tool_description, )._managed() + def start_create_agent( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> AgentCreation: + """Create and start an agent creation span (CLIENT span kind). + + Set remaining attributes (agent_name, etc.) on the returned invocation, + then call invocation.stop() or invocation.fail(). + """ + return AgentCreation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) + + def create_agent( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> AbstractContextManager[AgentCreation]: + """Context manager for agent creation (CLIENT span kind). + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + return self.start_create_agent( + provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + )._managed() + + def start_invoke_local_agent( + self, + provider: str, + *, + request_model: str | None = None, + ) -> AgentInvocation: + """Create and start a local agent invocation (INTERNAL span kind). + + Use for agents running within the same process (e.g. LangChain, CrewAI). + + Set remaining attributes (agent_name, etc.) on the returned invocation, + then call invocation.stop() or invocation.fail(). + """ + return AgentInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + span_kind=SpanKind.INTERNAL, + request_model=request_model, + ) + + def start_invoke_remote_agent( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> AgentInvocation: + """Create and start a remote agent invocation (CLIENT span kind). + + Use for agents invoked over a remote service (e.g. OpenAI Assistants, AWS Bedrock). + + Set remaining attributes (agent_name, etc.) on the returned invocation, + then call invocation.stop() or invocation.fail(). + """ + return AgentInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + span_kind=SpanKind.CLIENT, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) + + def invoke_local_agent( + self, + provider: str, + *, + request_model: str | None = None, + ) -> AbstractContextManager[AgentInvocation]: + """Context manager for local agent invocations (INTERNAL span kind). + + Use for agents running within the same process (e.g. LangChain, CrewAI). + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + return self.start_invoke_local_agent( + provider, + request_model=request_model, + )._managed() + + def invoke_remote_agent( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> AbstractContextManager[AgentInvocation]: + """Context manager for remote agent invocations (CLIENT span kind). + + Use for agents invoked over a remote service (e.g. OpenAI Assistants, AWS Bedrock). + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + return self.start_invoke_remote_agent( + provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + )._managed() + def workflow( self, name: str | None = None, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py index 4ac6426ce1..9f02a15ac8 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py @@ -26,6 +26,8 @@ ) """ +from opentelemetry.util.genai._agent_creation import AgentCreation +from opentelemetry.util.genai._agent_invocation import AgentInvocation from opentelemetry.util.genai._embedding_invocation import EmbeddingInvocation from opentelemetry.util.genai._inference_invocation import InferenceInvocation from opentelemetry.util.genai._invocation import ( @@ -37,6 +39,8 @@ from opentelemetry.util.genai._workflow_invocation import WorkflowInvocation __all__ = [ + "AgentCreation", + "AgentInvocation", "ContextToken", "Error", "GenAIInvocation", diff --git a/util/opentelemetry-util-genai/tests/test_handler_agent.py b/util/opentelemetry-util-genai/tests/test_handler_agent.py new file mode 100644 index 0000000000..2c239bb4ea --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_agent.py @@ -0,0 +1,481 @@ +from __future__ import annotations + +import unittest +from typing import Any +from unittest.mock import patch + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.trace import SpanKind +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.utils import ContentCapturingMode + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_handler( + span_exporter: InMemorySpanExporter, + metric_reader: InMemoryMetricReader | None = None, +) -> TelemetryHandler: + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + kwargs: dict[str, Any] = {"tracer_provider": tracer_provider} + if metric_reader is not None: + kwargs["meter_provider"] = MeterProvider( + metric_readers=[metric_reader] + ) + return TelemetryHandler(**kwargs) + + +def _harvest_metrics( + meter_provider: MeterProvider, + metric_reader: InMemoryMetricReader, +) -> dict[str, list[Any]]: + try: + meter_provider.force_flush() + except Exception: + pass + metric_reader.collect() + metrics_by_name: dict[str, list[Any]] = {} + data = metric_reader.get_metrics_data() + for resource_metric in (data and data.resource_metrics) or []: + for scope_metric in resource_metric.scope_metrics or []: + for metric in scope_metric.metrics or []: + points = metric.data.data_points or [] + metrics_by_name.setdefault(metric.name, []).extend(points) + return metrics_by_name + + +# ============================================================================ +# AgentCreation tests +# ============================================================================ + + +class TestAgentCreation(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + self.handler = _make_handler(self.span_exporter) + + def test_start_stop_creates_span(self): + creation = self.handler.start_create_agent( + "openai", + request_model="gpt-4", + ) + creation.agent_name = "New Agent" + creation.agent_id = "agent-new-1" + creation.stop() + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "create_agent New Agent" + assert span.attributes[GenAI.GEN_AI_OPERATION_NAME] == "create_agent" + assert span.attributes[GenAI.GEN_AI_AGENT_NAME] == "New Agent" + assert span.attributes[GenAI.GEN_AI_AGENT_ID] == "agent-new-1" + assert span.attributes[GenAI.GEN_AI_PROVIDER_NAME] == "openai" + assert span.attributes[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4" + + def test_span_kind_is_client(self): + creation = self.handler.start_create_agent("openai") + creation.stop() + assert ( + self.span_exporter.get_finished_spans()[0].kind == SpanKind.CLIENT + ) + + def test_all_attributes(self): + creation = self.handler.start_create_agent( + "openai", + request_model="gpt-4", + server_address="api.openai.com", + server_port=443, + ) + creation.agent_name = "Full Agent" + creation.agent_id = "agent-123" + creation.agent_description = "A test agent" + creation.agent_version = "1.0.0" + creation.stop() + + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs[GenAI.GEN_AI_OPERATION_NAME] == "create_agent" + assert attrs[GenAI.GEN_AI_AGENT_NAME] == "Full Agent" + assert attrs[GenAI.GEN_AI_AGENT_ID] == "agent-123" + assert attrs[GenAI.GEN_AI_AGENT_DESCRIPTION] == "A test agent" + assert attrs["gen_ai.agent.version"] == "1.0.0" + assert attrs[GenAI.GEN_AI_PROVIDER_NAME] == "openai" + assert attrs[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4" + assert attrs[server_attributes.SERVER_ADDRESS] == "api.openai.com" + assert attrs[server_attributes.SERVER_PORT] == 443 + + def test_no_server_attributes_when_not_provided(self): + creation = self.handler.start_create_agent("openai") + creation.stop() + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert server_attributes.SERVER_ADDRESS not in attrs + assert server_attributes.SERVER_PORT not in attrs + + def test_fail_create_agent(self): + creation = self.handler.start_create_agent("openai") + creation.agent_name = "Bad Agent" + creation.fail(RuntimeError("creation failed")) + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.description == "creation failed" + assert spans[0].attributes.get("error.type") == "RuntimeError" + + def test_context_manager(self): + with self.handler.create_agent( + "openai", request_model="gpt-4" + ) as creation: + creation.agent_name = "CM Agent" + creation.agent_id = "assigned-id" + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "create_agent CM Agent" + assert spans[0].attributes[GenAI.GEN_AI_AGENT_ID] == "assigned-id" + + def test_context_manager_error(self): + with self.assertRaises(TypeError): + with self.handler.create_agent("openai") as creation: + creation.agent_name = "Err" + raise TypeError("bad type") + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes.get("error.type") == "TypeError" + + def test_custom_attributes(self): + creation = self.handler.start_create_agent( + "openai", request_model="gpt-4" + ) + creation.attributes["custom.key"] = "custom_value" + creation.stop() + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs["custom.key"] == "custom_value" + + def test_span_name_without_agent_name(self): + creation = self.handler.start_create_agent("openai") + creation.stop() + assert ( + self.span_exporter.get_finished_spans()[0].name == "create_agent" + ) + + +# ============================================================================ +# AgentInvocation tests +# ============================================================================ + + +class TestAgentInvocation(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + self.handler = _make_handler(self.span_exporter) + + # ---- local (INTERNAL) ---- + + def test_local_agent_creates_span(self): + inv = self.handler.start_invoke_local_agent( + "openai", request_model="gpt-4" + ) + inv.agent_name = "Math Tutor" + inv.stop() + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "invoke_agent Math Tutor" + assert span.kind == SpanKind.INTERNAL + assert span.attributes[GenAI.GEN_AI_OPERATION_NAME] == "invoke_agent" + assert span.attributes[GenAI.GEN_AI_AGENT_NAME] == "Math Tutor" + assert span.attributes[GenAI.GEN_AI_PROVIDER_NAME] == "openai" + + # ---- remote (CLIENT) ---- + + def test_remote_agent_creates_client_span(self): + inv = self.handler.start_invoke_remote_agent( + "openai", + request_model="gpt-4", + server_address="api.openai.com", + server_port=443, + ) + inv.agent_name = "Remote Agent" + inv.stop() + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.kind == SpanKind.CLIENT + assert ( + span.attributes[server_attributes.SERVER_ADDRESS] + == "api.openai.com" + ) + assert span.attributes[server_attributes.SERVER_PORT] == 443 + + def test_all_attributes(self): + inv = self.handler.start_invoke_remote_agent( + "openai", + request_model="gpt-4", + server_address="api.openai.com", + server_port=443, + ) + inv.agent_name = "Full Agent" + inv.agent_id = "agent-123" + inv.agent_description = "A test agent" + inv.agent_version = "1.0.0" + inv.conversation_id = "conv-456" + inv.data_source_id = "ds-789" + inv.output_type = "text" + inv.temperature = 0.7 + inv.top_p = 0.9 + inv.max_tokens = 1000 + inv.seed = 42 + inv.input_tokens = 100 + inv.output_tokens = 200 + inv.stop() + + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs[GenAI.GEN_AI_AGENT_NAME] == "Full Agent" + assert attrs[GenAI.GEN_AI_AGENT_ID] == "agent-123" + assert attrs[GenAI.GEN_AI_AGENT_DESCRIPTION] == "A test agent" + assert attrs["gen_ai.agent.version"] == "1.0.0" + assert attrs[GenAI.GEN_AI_CONVERSATION_ID] == "conv-456" + assert attrs[GenAI.GEN_AI_DATA_SOURCE_ID] == "ds-789" + assert attrs[GenAI.GEN_AI_OUTPUT_TYPE] == "text" + assert attrs[GenAI.GEN_AI_REQUEST_TEMPERATURE] == 0.7 + assert attrs[GenAI.GEN_AI_REQUEST_TOP_P] == 0.9 + assert attrs[GenAI.GEN_AI_REQUEST_MAX_TOKENS] == 1000 + assert attrs[GenAI.GEN_AI_REQUEST_SEED] == 42 + assert attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 100 + assert attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 200 + + def test_fail_agent(self): + inv = self.handler.start_invoke_local_agent("openai") + inv.agent_name = "Failing Agent" + inv.fail(RuntimeError("agent crashed")) + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.description == "agent crashed" + assert spans[0].attributes.get("error.type") == "RuntimeError" + + # ---- context managers ---- + + def test_invoke_local_agent_context_manager(self): + with self.handler.invoke_local_agent( + "openai", request_model="gpt-4" + ) as inv: + inv.agent_name = "CM Agent" + inv.input_tokens = 10 + inv.output_tokens = 20 + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "invoke_agent CM Agent" + assert spans[0].kind == SpanKind.INTERNAL + + def test_invoke_remote_agent_context_manager(self): + with self.handler.invoke_remote_agent( + "openai", + request_model="gpt-4", + server_address="api.openai.com", + server_port=443, + ) as inv: + inv.agent_name = "Remote CM" + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].kind == SpanKind.CLIENT + + def test_context_manager_error(self): + with self.assertRaises(ValueError): + with self.handler.invoke_local_agent("openai") as inv: + inv.agent_name = "Error Agent" + raise ValueError("test error") + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes.get("error.type") == "ValueError" + + def test_custom_attributes(self): + inv = self.handler.start_invoke_local_agent("openai") + inv.attributes["custom.key"] = "custom_value" + inv.stop() + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs["custom.key"] == "custom_value" + + def test_span_name_without_agent_name(self): + inv = self.handler.start_invoke_local_agent("openai") + inv.stop() + assert ( + self.span_exporter.get_finished_spans()[0].name == "invoke_agent" + ) + + +# ============================================================================ +# Agent Metrics tests +# ============================================================================ + + +class TestAgentMetrics(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + self.metric_reader = InMemoryMetricReader() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + self.handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + def _harvest(self) -> dict[str, list[Any]]: + return _harvest_metrics(self.meter_provider, self.metric_reader) + + def test_invoke_agent_records_duration(self): + with patch( + "opentelemetry.util.genai._invocation.timeit.default_timer", + return_value=1000.0, + ): + inv = self.handler.start_invoke_local_agent( + "openai", request_model="gpt-4" + ) + inv.agent_name = "Metrics Agent" + inv.input_tokens = 50 + inv.output_tokens = 100 + + with patch( + "opentelemetry.util.genai.metrics.timeit.default_timer", + return_value=1003.0, + ): + inv.stop() + + metrics = self._harvest() + assert "gen_ai.client.operation.duration" in metrics + duration_points = metrics["gen_ai.client.operation.duration"] + assert len(duration_points) == 1 + self.assertAlmostEqual(duration_points[0].sum, 3.0, places=3) + assert ( + duration_points[0].attributes[GenAI.GEN_AI_OPERATION_NAME] + == "invoke_agent" + ) + + def test_invoke_agent_records_token_usage(self): + inv = self.handler.start_invoke_local_agent( + "openai", request_model="gpt-4" + ) + inv.input_tokens = 50 + inv.output_tokens = 100 + inv.stop() + + metrics = self._harvest() + assert "gen_ai.client.token.usage" in metrics + token_points = metrics["gen_ai.client.token.usage"] + assert len(token_points) == 2 + token_map = { + p.attributes[GenAI.GEN_AI_TOKEN_TYPE]: p.sum for p in token_points + } + assert token_map["input"] == 50 + assert token_map["output"] == 100 + + def test_create_agent_records_duration(self): + with patch( + "opentelemetry.util.genai._invocation.timeit.default_timer", + return_value=2000.0, + ): + creation = self.handler.start_create_agent( + "openai", request_model="gpt-4" + ) + creation.agent_name = "Created" + + with patch( + "opentelemetry.util.genai.metrics.timeit.default_timer", + return_value=2005.0, + ): + creation.stop() + + metrics = self._harvest() + assert "gen_ai.client.operation.duration" in metrics + duration_points = metrics["gen_ai.client.operation.duration"] + assert len(duration_points) == 1 + self.assertAlmostEqual(duration_points[0].sum, 5.0, places=3) + assert ( + duration_points[0].attributes[GenAI.GEN_AI_OPERATION_NAME] + == "create_agent" + ) + + def test_create_agent_no_token_metrics(self): + creation = self.handler.start_create_agent("openai") + creation.stop() + + metrics = self._harvest() + assert "gen_ai.client.token.usage" not in metrics + + +# ============================================================================ +# Agent Events tests +# ============================================================================ + + +class TestAgentEvents(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + self.handler = _make_handler(self.span_exporter) + + @patch( + "opentelemetry.util.genai._agent_invocation.should_emit_event", + return_value=True, + ) + @patch( + "opentelemetry.util.genai._agent_invocation.is_experimental_mode", + return_value=True, + ) + def test_invoke_agent_emits_event(self, _mock_exp, _mock_emit): + inv = self.handler.start_invoke_local_agent( + "openai", request_model="gpt-4" + ) + inv.agent_name = "Event Agent" + inv.stop() + + # The logger.emit was called — verify via span attributes at minimum + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAI.GEN_AI_AGENT_NAME] == "Event Agent" + + @patch( + "opentelemetry.util.genai._agent_creation.should_emit_event", + return_value=True, + ) + @patch( + "opentelemetry.util.genai._agent_creation.get_content_capturing_mode", + return_value=ContentCapturingMode.NO_CONTENT, + ) + @patch( + "opentelemetry.util.genai._agent_creation.is_experimental_mode", + return_value=True, + ) + def test_create_agent_emits_event( + self, _mock_exp, _mock_capture, _mock_emit + ): + creation = self.handler.start_create_agent("openai") + creation.agent_name = "Event Creation" + creation.stop() + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAI.GEN_AI_AGENT_NAME] == "Event Creation"