diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index a24aca39aa..a6c1536c0d 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add `AgentInvocation` type with `invoke_agent` span lifecycle + ([#4274](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4274)) - Add metrics support for EmbeddingInvocation ([#4377](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4377)) - Add support for workflow in genAI utils handler. diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index 3e95fb3039..3968c684dd 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ dependencies = [ "opentelemetry-instrumentation ~= 0.60b0", "opentelemetry-semantic-conventions ~= 0.60b0", - "opentelemetry-api>=1.39", + "opentelemetry-api ~= 1.39", ] [project.entry-points.opentelemetry_genai_completion_hook] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py new file mode 100644 index 0000000000..e01ed1e40a --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py @@ -0,0 +1,222 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Any + +from opentelemetry._logs import Logger +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.trace import SpanKind, Tracer +from opentelemetry.util.genai._invocation import ( + Error, + GenAIInvocation, + get_content_attributes, +) +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.types import ( + InputMessage, + MessagePart, + OutputMessage, + ToolDefinition, +) + +# TODO: Migrate to GenAI constants once available in semconv package +_GEN_AI_AGENT_VERSION = "gen_ai.agent.version" +_GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS = ( + "gen_ai.usage.cache_creation.input_tokens" +) +_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens" + + +class AgentInvocation(GenAIInvocation): + """Represents a single agent invocation (invoke_agent span). + + Use handler.start_invoke_local_agent() / handler.start_invoke_remote_agent() + or the handler.invoke_local_agent() / handler.invoke_remote_agent() context + managers rather than constructing this directly. 
+ + Reference: + Client span: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-client-span + Internal span: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-internal-span + """ + + def __init__( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + provider: str, + *, + span_kind: SpanKind = SpanKind.INTERNAL, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_invoke_local_agent() or handler.start_invoke_remote_agent() instead of calling this directly.""" + _operation_name = GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=_operation_name, + span_kind=span_kind, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.provider = provider + self.request_model = request_model + self.server_address = server_address + self.server_port = server_port + + self.agent_name: str | None = None + self.agent_id: str | None = None + self.agent_description: str | None = None + self.agent_version: str | None = None + + self.conversation_id: str | None = None + self.data_source_id: str | None = None + self.output_type: str | None = None + + self.temperature: float | None = None + self.top_p: float | None = None + self.frequency_penalty: float | None = None + self.presence_penalty: float | None = None + self.max_tokens: int | None = None + self.stop_sequences: list[str] | None = None + self.seed: int | None = None + self.choice_count: int | None = None + + self.finish_reasons: list[str] | None = None + + self.input_tokens: int | None = None + self.output_tokens: int | None = None + self.cache_creation_input_tokens: 
int | None = None + self.cache_read_input_tokens: int | None = None + + self.input_messages: list[InputMessage] = [] + self.output_messages: list[OutputMessage] = [] + self.system_instruction: list[MessagePart] = [] + self.tool_definitions: list[ToolDefinition] | None = None + + self._start() + + def _get_common_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + (GenAI.GEN_AI_AGENT_NAME, self.agent_name), + (GenAI.GEN_AI_AGENT_ID, self.agent_id), + (GenAI.GEN_AI_AGENT_DESCRIPTION, self.agent_description), + (_GEN_AI_AGENT_VERSION, self.agent_version), + ) + return { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + GenAI.GEN_AI_PROVIDER_NAME: self.provider, + **{k: v for k, v in optional_attrs if v is not None}, + } + + def _get_request_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_CONVERSATION_ID, self.conversation_id), + (GenAI.GEN_AI_DATA_SOURCE_ID, self.data_source_id), + (GenAI.GEN_AI_OUTPUT_TYPE, self.output_type), + (GenAI.GEN_AI_REQUEST_TEMPERATURE, self.temperature), + (GenAI.GEN_AI_REQUEST_TOP_P, self.top_p), + (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, self.frequency_penalty), + (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, self.presence_penalty), + (GenAI.GEN_AI_REQUEST_MAX_TOKENS, self.max_tokens), + (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, self.stop_sequences), + (GenAI.GEN_AI_REQUEST_SEED, self.seed), + (GenAI.GEN_AI_REQUEST_CHOICE_COUNT, self.choice_count), + ) + return {k: v for k, v in optional_attrs if v is not None} + + def _get_response_attributes(self) -> dict[str, Any]: + if self.finish_reasons: + return {GenAI.GEN_AI_RESPONSE_FINISH_REASONS: self.finish_reasons} + return {} + + def _get_usage_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens), + (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, 
self.output_tokens), + ( + _GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS, + self.cache_creation_input_tokens, + ), + ( + _GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS, + self.cache_read_input_tokens, + ), + ) + return {k: v for k, v in optional_attrs if v is not None} + + def _get_content_attributes_for_span(self) -> dict[str, Any]: + return get_content_attributes( + input_messages=self.input_messages, + output_messages=self.output_messages, + system_instruction=self.system_instruction, + tool_definitions=self.tool_definitions, + for_span=True, + ) + + def _get_metric_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_PROVIDER_NAME, self.provider), + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + ) + attrs: dict[str, Any] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + **{k: v for k, v in optional_attrs if v is not None}, + } + attrs.update(self.metric_attributes) + return attrs + + def _get_metric_token_counts(self) -> dict[str, int]: + counts: dict[str, int] = {} + if self.input_tokens is not None: + counts[GenAI.GenAiTokenTypeValues.INPUT.value] = self.input_tokens + if self.output_tokens is not None: + counts[GenAI.GenAiTokenTypeValues.OUTPUT.value] = ( + self.output_tokens + ) + return counts + + def _apply_finish(self, error: Error | None = None) -> None: + if error is not None: + self._apply_error_attributes(error) + + # Update span name if agent_name was set after construction + if self.agent_name: + self.span.update_name(f"{self._operation_name} {self.agent_name}") + + attributes: dict[str, Any] = {} + attributes.update(self._get_common_attributes()) + attributes.update(self._get_request_attributes()) + attributes.update(self._get_response_attributes()) + attributes.update(self._get_usage_attributes()) + attributes.update(self._get_content_attributes_for_span()) + attributes.update(self.attributes) + 
self.span.set_attributes(attributes) + self._metrics_recorder.record(self) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py index a1bd55811c..f0351eaa51 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py @@ -14,7 +14,7 @@ from __future__ import annotations -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass, field from typing import Any from opentelemetry._logs import Logger, LogRecord @@ -23,21 +23,29 @@ ) from opentelemetry.semconv.attributes import server_attributes from opentelemetry.trace import INVALID_SPAN, Span, SpanKind, Tracer -from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai._invocation import ( + Error, + GenAIInvocation, + get_content_attributes, +) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.types import ( InputMessage, MessagePart, OutputMessage, + ToolDefinition, ) from opentelemetry.util.genai.utils import ( - ContentCapturingMode, - gen_ai_json_dumps, - get_content_capturing_mode, is_experimental_mode, should_emit_event, ) +# TODO: Migrate to GenAI constants once available in semconv package +_GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS = ( + "gen_ai.usage.cache_creation.input_tokens" +) +_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens" + class InferenceInvocation(GenAIInvocation): """Represents a single LLM chat/completion call. 
@@ -113,53 +121,19 @@ def __init__( # pylint: disable=too-many-locals self.seed = seed self.server_address = server_address self.server_port = server_port + self.cache_creation_input_tokens: int | None = None + self.cache_read_input_tokens: int | None = None + self.tool_definitions: list[ToolDefinition] | None = None self._start() def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: - if not is_experimental_mode(): - return {} - mode = get_content_capturing_mode() - allowed_modes = ( - ( - ContentCapturingMode.SPAN_ONLY, - ContentCapturingMode.SPAN_AND_EVENT, - ) - if for_span - else ( - ContentCapturingMode.EVENT_ONLY, - ContentCapturingMode.SPAN_AND_EVENT, - ) - ) - if mode not in allowed_modes: - return {} - - def serialize(items: list[Any]) -> Any: - dicts = [asdict(item) for item in items] - return gen_ai_json_dumps(dicts) if for_span else dicts - - optional_attrs = ( - ( - GenAI.GEN_AI_INPUT_MESSAGES, - serialize(self.input_messages) - if self.input_messages - else None, - ), - ( - GenAI.GEN_AI_OUTPUT_MESSAGES, - serialize(self.output_messages) - if self.output_messages - else None, - ), - ( - GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, - serialize(self.system_instruction) - if self.system_instruction - else None, - ), + return get_content_attributes( + input_messages=self.input_messages, + output_messages=self.output_messages, + system_instruction=self.system_instruction, + tool_definitions=self.tool_definitions, + for_span=for_span, ) - return { - key: value for key, value in optional_attrs if value is not None - } def _get_finish_reasons(self) -> list[str] | None: if self.finish_reasons is not None: @@ -200,6 +174,14 @@ def _get_attributes(self) -> dict[str, Any]: (GenAI.GEN_AI_RESPONSE_ID, self.response_id), (GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens), (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, self.output_tokens), + ( + _GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS, + self.cache_creation_input_tokens, + ), + ( + 
_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS, + self.cache_read_input_tokens, + ), ) attrs.update({k: v for k, v in optional_attrs if v is not None}) return attrs diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py index b63d9a32b4..a9f3711c5c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py @@ -18,17 +18,33 @@ from abc import ABC, abstractmethod from contextlib import contextmanager from contextvars import Token -from typing import TYPE_CHECKING, Any, Iterator +from dataclasses import asdict +from typing import TYPE_CHECKING, Any, Iterator, Sequence from typing_extensions import Self, TypeAlias from opentelemetry._logs import Logger from opentelemetry.context import Context, attach, detach +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) from opentelemetry.semconv.attributes import error_attributes from opentelemetry.trace import INVALID_SPAN as _INVALID_SPAN from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context from opentelemetry.trace.status import Status, StatusCode -from opentelemetry.util.genai.types import Error +from opentelemetry.util.genai.types import ( + Error, + InputMessage, + MessagePart, + OutputMessage, + ToolDefinition, +) +from opentelemetry.util.genai.utils import ( + ContentCapturingMode, + gen_ai_json_dumps, + get_content_capturing_mode, + is_experimental_mode, +) if TYPE_CHECKING: from opentelemetry.util.genai.metrics import InvocationMetricsRecorder @@ -138,3 +154,64 @@ def _managed(self) -> Iterator[Self]: self.fail(exc) raise self.stop() + + +def get_content_attributes( + *, + input_messages: Sequence[InputMessage], + output_messages: Sequence[OutputMessage], + system_instruction: Sequence[MessagePart], + tool_definitions: Sequence[ToolDefinition] | None, + 
for_span: bool, +) -> dict[str, Any]: + """Serialize messages, system instructions, and tool definitions into attributes. + + Args: + input_messages: Input messages to serialize. + output_messages: Output messages to serialize. + system_instruction: System instructions to serialize. + tool_definitions: Tool definitions to serialize (may be None). + for_span: If True, serialize for span attributes (JSON string); + if False, serialize for event attributes (list of dicts). + """ + if not is_experimental_mode(): + return {} + + mode = get_content_capturing_mode() + allowed_modes = ( + ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + if for_span + else ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + ) + if mode not in allowed_modes: + return {} + + def serialize(items: Sequence[Any]) -> Any: + dicts = [asdict(item) for item in items] + return gen_ai_json_dumps(dicts) if for_span else dicts + + optional_attrs = ( + ( + GenAI.GEN_AI_INPUT_MESSAGES, + serialize(input_messages) if input_messages else None, + ), + ( + GenAI.GEN_AI_OUTPUT_MESSAGES, + serialize(output_messages) if output_messages else None, + ), + ( + GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, + serialize(system_instruction) if system_instruction else None, + ), + ( + GenAI.GEN_AI_TOOL_DEFINITIONS, + serialize(tool_definitions) if tool_definitions else None, + ), + ) + return {key: value for key, value in optional_attrs if value is not None} diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py index f2b9254f49..5f43637e67 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py @@ -52,9 +52,8 @@ gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref" ) -GEN_AI_TOOL_DEFINITIONS = getattr( - 
gen_ai_attributes, "GEN_AI_TOOL_DEFINITIONS", "gen_ai.tool.definitions" -) +# TODO: Migrate to gen_ai_attributes constant once available in semconv package +GEN_AI_TOOL_DEFINITIONS = "gen_ai.tool.definitions" GEN_AI_TOOL_DEFINITIONS_REF: Final = GEN_AI_TOOL_DEFINITIONS + "_ref" _MESSAGE_INDEX_KEY = "index" diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 9ef4a5592d..393b3140f5 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -57,9 +57,11 @@ from opentelemetry.metrics import MeterProvider, get_meter from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import ( + SpanKind, TracerProvider, get_tracer, ) +from opentelemetry.util.genai._agent_invocation import AgentInvocation from opentelemetry.util.genai._inference_invocation import ( LLMInvocation, ) @@ -299,6 +301,100 @@ def tool( tool_description=tool_description, )._managed() + def start_invoke_local_agent( + self, + provider: str, + *, + request_model: str | None = None, + ) -> AgentInvocation: + """Create and start a local agent invocation (INTERNAL span kind). + + Use for agents running within the same process (e.g. LangChain, CrewAI). + + Set remaining attributes (agent_name, etc.) on the returned invocation, + then call invocation.stop() or invocation.fail(). + """ + return AgentInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + span_kind=SpanKind.INTERNAL, + request_model=request_model, + ) + + def start_invoke_remote_agent( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> AgentInvocation: + """Create and start a remote agent invocation (CLIENT span kind). + + Use for agents invoked over a remote service (e.g. 
OpenAI Assistants, AWS Bedrock). + + Set remaining attributes (agent_name, etc.) on the returned invocation, + then call invocation.stop() or invocation.fail(). + """ + return AgentInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + span_kind=SpanKind.CLIENT, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) + + def invoke_local_agent( + self, + provider: str, + *, + request_model: str | None = None, + ) -> AbstractContextManager[AgentInvocation]: + """Context manager for local agent invocations (INTERNAL span kind). + + Use for agents running within the same process (e.g. LangChain, CrewAI). + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + return self.start_invoke_local_agent( + provider, + request_model=request_model, + )._managed() + + def invoke_remote_agent( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> AbstractContextManager[AgentInvocation]: + """Context manager for remote agent invocations (CLIENT span kind). + + Use for agents invoked over a remote service (e.g. OpenAI Assistants, AWS Bedrock). + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. 
+ """ + return self.start_invoke_remote_agent( + provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + )._managed() + def workflow( self, name: str | None = None, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py index 4ac6426ce1..174d14bdc3 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py @@ -26,6 +26,7 @@ ) """ +from opentelemetry.util.genai._agent_invocation import AgentInvocation from opentelemetry.util.genai._embedding_invocation import EmbeddingInvocation from opentelemetry.util.genai._inference_invocation import InferenceInvocation from opentelemetry.util.genai._invocation import ( @@ -37,6 +38,7 @@ from opentelemetry.util.genai._workflow_invocation import WorkflowInvocation __all__ = [ + "AgentInvocation", "ContextToken", "Error", "GenAIInvocation", diff --git a/util/opentelemetry-util-genai/tests/test_handler_agent.py b/util/opentelemetry-util-genai/tests/test_handler_agent.py new file mode 100644 index 0000000000..9d611234d2 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_agent.py @@ -0,0 +1,533 @@ +from __future__ import annotations + +import unittest +from unittest.mock import patch + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import INVALID_SPAN, SpanKind +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + 
ContentCapturingMode, + Error, + FunctionToolDefinition, + InputMessage, + OutputMessage, + Text, +) + + +class TestLocalAgentInvocation(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + self.handler = TelemetryHandler(tracer_provider=tracer_provider) + + def test_start_stop_creates_span(self): + invocation = self.handler.start_invoke_local_agent( + "openai", + request_model="gpt-4", + ) + invocation.agent_name = "Math Tutor" + invocation.stop() + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "invoke_agent Math Tutor" + assert span.attributes[GenAI.GEN_AI_OPERATION_NAME] == "invoke_agent" + assert span.attributes[GenAI.GEN_AI_AGENT_NAME] == "Math Tutor" + assert span.attributes[GenAI.GEN_AI_PROVIDER_NAME] == "openai" + assert span.attributes[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4" + + def test_span_kind_internal(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.stop() + assert ( + self.span_exporter.get_finished_spans()[0].kind + == SpanKind.INTERNAL + ) + + def test_no_server_attributes(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.stop() + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert server_attributes.SERVER_ADDRESS not in attrs + assert server_attributes.SERVER_PORT not in attrs + + def test_all_attributes(self): + invocation = self.handler.start_invoke_local_agent( + "openai", + request_model="gpt-4", + ) + invocation.agent_name = "Full Agent" + invocation.agent_id = "agent-123" + invocation.agent_description = "A test agent" + invocation.agent_version = "1.0.0" + invocation.conversation_id = "conv-456" + invocation.data_source_id = "ds-789" + invocation.output_type = "text" + invocation.temperature = 0.7 + invocation.top_p = 0.9 + 
invocation.frequency_penalty = 0.5 + invocation.presence_penalty = 0.3 + invocation.max_tokens = 1000 + invocation.stop_sequences = ["END", "STOP"] + invocation.seed = 42 + invocation.choice_count = 3 + invocation.finish_reasons = ["stop"] + invocation.input_tokens = 100 + invocation.output_tokens = 200 + invocation.stop() + + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs[GenAI.GEN_AI_AGENT_NAME] == "Full Agent" + assert attrs[GenAI.GEN_AI_AGENT_ID] == "agent-123" + assert attrs[GenAI.GEN_AI_AGENT_DESCRIPTION] == "A test agent" + assert attrs[GenAI.GEN_AI_AGENT_VERSION] == "1.0.0" + assert attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 100 + assert attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 200 + assert attrs[GenAI.GEN_AI_CONVERSATION_ID] == "conv-456" + assert attrs[GenAI.GEN_AI_DATA_SOURCE_ID] == "ds-789" + assert attrs[GenAI.GEN_AI_OUTPUT_TYPE] == "text" + assert attrs[GenAI.GEN_AI_REQUEST_TEMPERATURE] == 0.7 + assert attrs[GenAI.GEN_AI_REQUEST_TOP_P] == 0.9 + assert attrs[GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY] == 0.5 + assert attrs[GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY] == 0.3 + assert attrs[GenAI.GEN_AI_REQUEST_MAX_TOKENS] == 1000 + assert attrs[GenAI.GEN_AI_REQUEST_STOP_SEQUENCES] == ("END", "STOP") + assert attrs[GenAI.GEN_AI_REQUEST_SEED] == 42 + assert attrs[GenAI.GEN_AI_REQUEST_CHOICE_COUNT] == 3 + assert attrs[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] == ("stop",) + + def test_finish_reasons_multiple(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.finish_reasons = ["stop", "length"] + invocation.stop() + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] == ( + "stop", + "length", + ) + + def test_finish_reasons_empty_list_omitted(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.finish_reasons = [] + invocation.stop() + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert 
GenAI.GEN_AI_RESPONSE_MODEL not in attrs + assert GenAI.GEN_AI_RESPONSE_FINISH_REASONS not in attrs + + def test_cache_token_attributes(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.input_tokens = 100 + invocation.cache_creation_input_tokens = 25 + invocation.cache_read_input_tokens = 50 + invocation.stop() + + attrs = self.span_exporter.get_finished_spans()[0].attributes + assert attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 100 + assert attrs[GenAI.GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS] == 25 + assert attrs[GenAI.GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS] == 50 + + def test_fail_sets_error_status(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.fail(RuntimeError("agent crashed")) + + span = self.span_exporter.get_finished_spans()[0] + assert span.status.description == "agent crashed" + assert span.attributes.get("error.type") == "RuntimeError" + + def test_context_manager_success(self): + with self.handler.invoke_local_agent( + "openai", request_model="gpt-4" + ) as inv: + inv.agent_name = "CM Agent" + inv.input_tokens = 10 + inv.output_tokens = 20 + + assert ( + self.span_exporter.get_finished_spans()[0].name + == "invoke_agent CM Agent" + ) + + def test_context_manager_error(self): + with self.assertRaises(ValueError): + with self.handler.invoke_local_agent("openai"): + raise ValueError("test error") + + assert ( + self.span_exporter.get_finished_spans()[0].attributes.get( + "error.type" + ) + == "ValueError" + ) + + def test_context_manager_default_invocation(self): + with self.handler.invoke_local_agent("openai") as inv: + inv.agent_name = "Dynamic Agent" + assert len(self.span_exporter.get_finished_spans()) == 1 + + def test_default_values(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.stop() + assert invocation._operation_name == "invoke_agent" + assert invocation.agent_name is None + assert invocation.provider == "openai" + assert invocation.request_model 
is None + assert not invocation.input_messages + assert not invocation.output_messages + assert invocation.tool_definitions is None + assert invocation.cache_creation_input_tokens is None + assert invocation.cache_read_input_tokens is None + assert invocation.span is not INVALID_SPAN + assert not invocation.attributes + + def test_with_messages(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.input_messages = [ + InputMessage(role="user", parts=[Text(content="Hello")]) + ] + invocation.output_messages = [ + OutputMessage( + role="assistant", + parts=[Text(content="Hi there!")], + finish_reason="stop", + ) + ] + invocation.stop() + assert len(invocation.input_messages) == 1 + assert invocation.input_messages[0].role == "user" + + def test_custom_attributes(self): + invocation = self.handler.start_invoke_local_agent("openai") + invocation.attributes["custom.key"] = "custom_value" + invocation.stop() + spans = self.span_exporter.get_finished_spans() + assert spans[0].attributes["custom.key"] == "custom_value" + + def test_tool_definitions_type(self): + tool = FunctionToolDefinition( + name="get_weather", + description="Get the weather", + parameters={"type": "object", "properties": {}}, + ) + invocation = self.handler.start_invoke_local_agent("openai") + invocation.tool_definitions = [tool] + invocation.stop() + assert len(invocation.tool_definitions) == 1 + assert invocation.tool_definitions[0].name == "get_weather" + assert invocation.tool_definitions[0].type == "function" + + def test_default_lists_are_independent(self): + inv1 = self.handler.start_invoke_local_agent("openai") + inv2 = self.handler.start_invoke_local_agent("openai") + inv1.input_messages.append(InputMessage(role="user", parts=[])) + assert len(inv2.input_messages) == 0 + inv2.stop() + inv1.stop() + + def test_default_attributes_are_independent(self): + inv1 = self.handler.start_invoke_local_agent("openai") + inv2 = self.handler.start_invoke_local_agent("openai") + 
        inv1.attributes["foo"] = "bar"
        assert "foo" not in inv2.attributes
        inv2.stop()
        inv1.stop()

    def test_agent_name_set_after_construction(self):
        """agent_name assigned after start is reflected in the span name and attrs."""
        invocation = self.handler.start_invoke_local_agent("openai")
        invocation.agent_name = "Named Agent"
        invocation.stop()
        span = self.span_exporter.get_finished_spans()[0]
        assert span.name == "invoke_agent Named Agent"
        assert span.attributes[GenAI.GEN_AI_AGENT_NAME] == "Named Agent"


class TestAgentInvocationContent(unittest.TestCase):
    """Content-capture behavior: message/instruction/tool payloads on spans.

    The capturing-mode and experimental-mode helpers are patched per test so
    these cases do not depend on process-wide environment configuration.
    """

    def setUp(self):
        # Fresh in-memory exporter per test so each case inspects only its
        # own finished spans.
        self.span_exporter = InMemorySpanExporter()
        tracer_provider = TracerProvider()
        tracer_provider.add_span_processor(
            SimpleSpanProcessor(self.span_exporter)
        )
        self.handler = TelemetryHandler(tracer_provider=tracer_provider)

    @patch(
        "opentelemetry.util.genai._invocation.get_content_capturing_mode",
        return_value=ContentCapturingMode.SPAN_AND_EVENT,
    )
    @patch(
        "opentelemetry.util.genai._invocation.is_experimental_mode",
        return_value=True,
    )
    def test_system_instruction_on_span(self, _mock_exp, _mock_cap):
        """With capture enabled, system instructions land on the span."""
        invocation = self.handler.start_invoke_local_agent("openai")
        invocation.system_instruction = [
            Text(content="You are a helpful assistant."),
        ]
        invocation.stop()

        attrs = self.span_exporter.get_finished_spans()[0].attributes
        assert GenAI.GEN_AI_SYSTEM_INSTRUCTIONS in attrs

    @patch(
        "opentelemetry.util.genai._invocation.get_content_capturing_mode",
        return_value=ContentCapturingMode.SPAN_AND_EVENT,
    )
    @patch(
        "opentelemetry.util.genai._invocation.is_experimental_mode",
        return_value=True,
    )
    def test_tool_definitions_on_span(self, _mock_exp, _mock_cap):
        """With capture enabled, tool definitions land on the span."""
        tool = FunctionToolDefinition(
            name="get_weather",
            description="Get the weather",
            parameters={"type": "object", "properties": {}},
        )
        invocation = self.handler.start_invoke_local_agent("openai")
        invocation.tool_definitions = [tool]
        invocation.stop()

        attrs = self.span_exporter.get_finished_spans()[0].attributes
        assert GenAI.GEN_AI_TOOL_DEFINITIONS in attrs

    @patch(
        "opentelemetry.util.genai._invocation.get_content_capturing_mode",
        return_value=ContentCapturingMode.SPAN_AND_EVENT,
    )
    @patch(
        "opentelemetry.util.genai._invocation.is_experimental_mode",
        return_value=True,
    )
    def test_messages_on_span(self, _mock_exp, _mock_cap):
        """With capture enabled, input and output messages land on the span."""
        invocation = self.handler.start_invoke_local_agent("openai")
        invocation.input_messages = [
            InputMessage(role="user", parts=[Text(content="Hello")])
        ]
        invocation.output_messages = [
            OutputMessage(
                role="assistant",
                parts=[Text(content="Hi!")],
                finish_reason="stop",
            )
        ]
        invocation.stop()

        attrs = self.span_exporter.get_finished_spans()[0].attributes
        assert GenAI.GEN_AI_INPUT_MESSAGES in attrs
        assert GenAI.GEN_AI_OUTPUT_MESSAGES in attrs

    def test_content_not_on_span_by_default(self):
        """Without patching capture mode on, content must NOT leak onto spans."""
        invocation = self.handler.start_invoke_local_agent("openai")
        invocation.system_instruction = [
            Text(content="You are a helpful assistant."),
        ]
        invocation.input_messages = [
            InputMessage(role="user", parts=[Text(content="Hello")])
        ]
        invocation.stop()

        attrs = self.span_exporter.get_finished_spans()[0].attributes
        assert GenAI.GEN_AI_SYSTEM_INSTRUCTIONS not in attrs
        assert GenAI.GEN_AI_INPUT_MESSAGES not in attrs


class TestRemoteAgentInvocation(unittest.TestCase):
    """invoke_remote_agent lifecycle: CLIENT span kind, server attrs, errors."""

    def setUp(self):
        # Fresh in-memory exporter per test so each case inspects only its
        # own finished spans.
        self.span_exporter = InMemorySpanExporter()
        tracer_provider = TracerProvider()
        tracer_provider.add_span_processor(
            SimpleSpanProcessor(self.span_exporter)
        )
        self.handler = TelemetryHandler(tracer_provider=tracer_provider)

    def test_span_kind_client(self):
        """Remote agent invocations produce CLIENT spans (local ones do not)."""
        invocation = self.handler.start_invoke_remote_agent("openai")
        invocation.stop()
        assert (
            self.span_exporter.get_finished_spans()[0].kind == SpanKind.CLIENT
        )

    def test_server_attributes(self):
        """server_address/server_port kwargs become server.* span attributes."""
        invocation = self.handler.start_invoke_remote_agent(
            "openai",
            server_address="api.openai.com",
            server_port=443,
        )
        invocation.stop()
        attrs = self.span_exporter.get_finished_spans()[0].attributes
        assert attrs[server_attributes.SERVER_ADDRESS] == "api.openai.com"
        assert attrs[server_attributes.SERVER_PORT] == 443

    def test_all_attributes(self):
        """All agent identity, usage, and request fields map to span attributes."""
        invocation = self.handler.start_invoke_remote_agent(
            "openai",
            request_model="gpt-4",
            server_address="api.openai.com",
            server_port=443,
        )
        invocation.agent_name = "Remote Agent"
        invocation.agent_id = "agent-123"
        invocation.agent_description = "A remote test agent"
        invocation.agent_version = "1.0.0"
        invocation.input_tokens = 100
        invocation.output_tokens = 200
        invocation.stop()

        attrs = self.span_exporter.get_finished_spans()[0].attributes
        assert attrs[GenAI.GEN_AI_AGENT_NAME] == "Remote Agent"
        assert attrs[GenAI.GEN_AI_AGENT_ID] == "agent-123"
        assert attrs[GenAI.GEN_AI_AGENT_DESCRIPTION] == "A remote test agent"
        assert attrs[GenAI.GEN_AI_AGENT_VERSION] == "1.0.0"
        assert attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 100
        assert attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 200
        assert attrs[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4"

    def test_fail_sets_error_status(self):
        """fail() ends the span with the error message and error.type attribute."""
        invocation = self.handler.start_invoke_remote_agent("openai")
        invocation.fail(RuntimeError("remote agent crashed"))

        span = self.span_exporter.get_finished_spans()[0]
        assert span.status.description == "remote agent crashed"
        assert span.attributes.get("error.type") == "RuntimeError"

    def test_context_manager_success(self):
        """The context-manager form stops the invocation on clean exit."""
        with self.handler.invoke_remote_agent(
            "openai",
            request_model="gpt-4",
            server_address="api.openai.com",
        ) as inv:
            inv.agent_name = "CM Remote Agent"

        span = self.span_exporter.get_finished_spans()[0]
        assert span.name == "invoke_agent CM Remote Agent"
        assert span.kind == SpanKind.CLIENT

    def test_context_manager_error(self):
        """An exception inside the context manager re-raises and marks the span."""
        with self.assertRaises(ValueError):
            with self.handler.invoke_remote_agent("openai"):
                raise ValueError("remote error")

        assert (
            self.span_exporter.get_finished_spans()[0].attributes.get(
                "error.type"
            )
            == "ValueError"
        )


class TestAgentInvocationMetrics(TestBase):
    """Duration and token-usage metrics emitted by agent invocations."""

    def test_local_agent_records_duration_and_tokens(self) -> None:
        """Duration histogram reflects wall-clock delta; token usage split by type."""
        handler = TelemetryHandler(
            tracer_provider=self.tracer_provider,
            meter_provider=self.meter_provider,
        )
        # Pin the clock at start and stop so the recorded duration is
        # exactly 1002.0 - 1000.0 = 2 seconds.
        with patch("timeit.default_timer", return_value=1000.0):
            invocation = handler.start_invoke_local_agent(
                "prov", request_model="model"
            )
            invocation.input_tokens = 5
            invocation.output_tokens = 7

        with patch("timeit.default_timer", return_value=1002.0):
            invocation.stop()

        metrics = self._harvest_metrics()
        self.assertIn("gen_ai.client.operation.duration", metrics)
        duration_points = metrics["gen_ai.client.operation.duration"]
        self.assertEqual(len(duration_points), 1)
        duration_point = duration_points[0]
        self.assertEqual(
            duration_point.attributes[GenAI.GEN_AI_OPERATION_NAME],
            GenAI.GenAiOperationNameValues.INVOKE_AGENT.value,
        )
        self.assertEqual(
            duration_point.attributes[GenAI.GEN_AI_REQUEST_MODEL], "model"
        )
        self.assertEqual(
            duration_point.attributes[GenAI.GEN_AI_PROVIDER_NAME], "prov"
        )
        self.assertAlmostEqual(duration_point.sum, 2.0, places=3)

        self.assertIn("gen_ai.client.token.usage", metrics)
        token_points = metrics["gen_ai.client.token.usage"]
        # Index the histogram points by token type (input/output) so each
        # sum can be asserted independently of point ordering.
        token_by_type = {
            point.attributes[GenAI.GEN_AI_TOKEN_TYPE]: point
            for point in token_points
        }
        self.assertEqual(len(token_by_type), 2)
        self.assertAlmostEqual(
            token_by_type[GenAI.GenAiTokenTypeValues.INPUT.value].sum,
            5.0,
            places=3,
        )
        self.assertAlmostEqual(
            token_by_type[GenAI.GenAiTokenTypeValues.OUTPUT.value].sum,
            7.0,
            places=3,
        )

    def test_remote_agent_records_duration_with_server_attrs(self) -> None:
        """Remote invocations carry server.address/server.port on duration points."""
        handler = TelemetryHandler(
            tracer_provider=self.tracer_provider,
            meter_provider=self.meter_provider,
        )
        invocation = handler.start_invoke_remote_agent(
            "prov",
            request_model="model",
            server_address="agent.example.com",
            server_port=443,
        )
        invocation.input_tokens = 10
        invocation.stop()

        metrics = self._harvest_metrics()
        self.assertIn("gen_ai.client.operation.duration", metrics)
        duration_point = metrics["gen_ai.client.operation.duration"][0]
        self.assertEqual(
            duration_point.attributes["server.address"], "agent.example.com"
        )
        self.assertEqual(duration_point.attributes["server.port"], 443)

    def test_fail_agent_records_error_metric(self) -> None:
        """fail() still records a duration point, tagged with error.type."""
        handler = TelemetryHandler(
            tracer_provider=self.tracer_provider,
            meter_provider=self.meter_provider,
        )
        # Pin the clock so the failed invocation's duration is exactly 1s.
        with patch("timeit.default_timer", return_value=2000.0):
            # NOTE(review): provider name is "" here while other metrics tests
            # use "prov" — presumably exercising the empty-provider path;
            # confirm this is intentional.
            invocation = handler.start_invoke_local_agent(
                "", request_model="err-model"
            )
            invocation.input_tokens = 11

        error = Error(message="boom", type=ValueError)
        with patch("timeit.default_timer", return_value=2001.0):
            invocation.fail(error)

        metrics = self._harvest_metrics()
        self.assertIn("gen_ai.client.operation.duration", metrics)
        duration_point = metrics["gen_ai.client.operation.duration"][0]
        self.assertEqual(
            duration_point.attributes.get("error.type"), "ValueError"
        )
        self.assertAlmostEqual(duration_point.sum, 1.0, places=3)

    def _harvest_metrics(self):
        """Collect exported metrics and group their data points by metric name.

        Returns a dict mapping metric name -> list of data points, flattening
        points across all metrics that share a name.
        """
        metrics = self.get_sorted_metrics()
        metrics_by_name = {}
        for metric in metrics or []:
            points = metric.data.data_points or []
            metrics_by_name.setdefault(metric.name, []).extend(points)
        return metrics_by_name