diff --git a/docs/nitpick-exceptions.ini b/docs/nitpick-exceptions.ini index 73febacaad..782cc1b952 100644 --- a/docs/nitpick-exceptions.ini +++ b/docs/nitpick-exceptions.ini @@ -47,6 +47,7 @@ py-class= fastapi.applications.FastAPI starlette.applications.Starlette _contextvars.Token + opentelemetry.util.genai.types._BaseAgent any= ; API diff --git a/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/README.rst b/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/README.rst new file mode 100644 index 0000000000..9ee1b5fd17 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/README.rst @@ -0,0 +1,83 @@ +OpenTelemetry GenAI Utils — invoke_agent Demo +================================================ + +This example shows how to combine ``VertexAIInstrumentor`` (automatic +instrumentation of Vertex AI SDK calls) with manual +``TelemetryHandler.agent()`` spans from ``opentelemetry-util-genai`` +to extend existing instrumentation with agent invocation lifecycle spans. + +The ``generate_content`` call is made inside the ``invoke_agent`` context, +so the auto-instrumented LLM span appears as a child of the ``invoke_agent`` +parent span — all within the same trace. 
+ +Sample Trace +------------ + +:: + + Trace ID: 0xe71e16deb2ecd162e3f4fc67c240818b + | + +-- invoke_agent Currency Exchange Agent [4.16s, root span] + | gen_ai.operation.name: invoke_agent + | gen_ai.agent.name: Currency Exchange Agent + | gen_ai.agent.description: Currency exchange agent demo + | gen_ai.provider.name: gcp_vertex_ai + | gen_ai.request.model: gemini-2.5-flash + | gen_ai.response.finish_reasons: ["stop"] + | gen_ai.usage.input_tokens: 12 + | gen_ai.usage.output_tokens: 87 + | server.address: us-central1-aiplatform.googleapis.com + | scope: opentelemetry.util.genai.handler + | + +-- chat gemini-2.5-flash [4.12s, child span] + gen_ai.operation.name: chat + gen_ai.system: vertex_ai + gen_ai.request.model: gemini-2.5-flash + gen_ai.response.model: gemini-2.5-flash + gen_ai.response.finish_reasons: ["stop"] + gen_ai.usage.input_tokens: 12 + gen_ai.usage.output_tokens: 87 + server.address: us-central1-aiplatform.googleapis.com + server.port: 443 + scope: opentelemetry.instrumentation.vertexai + +Prerequisites +------------- + +- A GCP project with the **Vertex AI API** enabled. +- **Application Default Credentials** (ADC) configured: + ``gcloud auth application-default login`` + +Setup +----- + +An OTLP-compatible endpoint should be listening for traces, metrics, and logs on +http://localhost:4317. If not, update ``OTEL_EXPORTER_OTLP_ENDPOINT``. + +Set up a virtual environment: + +:: + + python3 -m venv .venv + source .venv/bin/activate + pip install -r requirements.txt + pip install -e ../../../../util/opentelemetry-util-genai/ + +Run +--- + +:: + + export GCP_PROJECT="your-project-id" + python main.py + +You should see an ``invoke_agent`` span wrapping a ``chat`` child span, +both printed to the console and exported to your OTLP endpoint. + +Environment Variables +--------------------- + +- ``GCP_PROJECT`` — GCP project ID; set this to your own project (default: ``gcp-o11yinframon-nprd-81065``). +- ``GCP_LOCATION`` — GCP region (default: ``us-central1``). 
+- ``OTEL_EXPORTER_OTLP_ENDPOINT`` — OTLP endpoint + (default: ``http://localhost:4317``). diff --git a/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/main.py b/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/main.py new file mode 100644 index 0000000000..8751b91551 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/main.py @@ -0,0 +1,121 @@ +# pylint: skip-file +""" +invoke_agent instrumentation demo. + +Combines automatic instrumentation (``VertexAIInstrumentor``) with +manual ``TelemetryHandler.agent()`` spans from ``opentelemetry-util-genai`` +to show how genai-utils extends the existing Vertex AI instrumentation +with agent invocation lifecycle spans. + +The ``generate_content`` call is made inside the ``invoke_agent`` context, +so the auto-instrumented LLM span appears as a child of the ``invoke_agent`` +parent span. + +Set environment variables before running: + + export GCP_PROJECT="your-project-id" + python main.py +""" + +import os + +import vertexai +from vertexai.generative_models import GenerativeModel + +# NOTE: OpenTelemetry Python Logs and Events APIs are in beta +from opentelemetry import _logs, metrics, trace +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.instrumentation.vertexai import VertexAIInstrumentor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export 
import ( + BatchSpanProcessor, + ConsoleSpanExporter, +) +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import AgentInvocation + +OTLP_ENDPOINT = os.environ.get( + "OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317" +) +GCP_PROJECT = os.environ.get("GCP_PROJECT", "gcp-o11yinframon-nprd-81065") +GCP_LOCATION = os.environ.get("GCP_LOCATION", "us-central1") +MODEL = "gemini-2.5-flash" +AGENT_NAME = "Currency Exchange Agent" + +resource = Resource.create({"service.name": "invoke-agent-demo"}) + +# configure tracing +tracer_provider = TracerProvider(resource=resource) +tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) +tracer_provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint=OTLP_ENDPOINT, insecure=True)) +) +trace.set_tracer_provider(tracer_provider) + +# configure metrics +metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=OTLP_ENDPOINT, insecure=True) +) +meter_provider = MeterProvider( + resource=resource, metric_readers=[metric_reader] +) +metrics.set_meter_provider(meter_provider) + +# configure logging and events +logger_provider = LoggerProvider(resource=resource) +logger_provider.add_log_record_processor( + BatchLogRecordProcessor( + OTLPLogExporter(endpoint=OTLP_ENDPOINT, insecure=True) + ) +) +_logs.set_logger_provider(logger_provider) + +# Auto-instrument Vertex AI SDK calls (generate_content, etc.) +VertexAIInstrumentor().instrument() + + +def main(): + vertexai.init(project=GCP_PROJECT, location=GCP_LOCATION) + model = GenerativeModel(MODEL) + handler = TelemetryHandler() + + # ----- invoke_agent span wrapping an LLM call ----- + # The generate_content call is auto-instrumented by VertexAIInstrumentor + # and appears as a child span under the invoke_agent parent span. 
+ print("[invoke_agent] Starting agent invocation...") + with handler.agent( + AgentInvocation( + agent_name=AGENT_NAME, + provider="gcp_vertex_ai", + request_model=MODEL, + agent_description="Currency exchange agent demo", + server_address=f"{GCP_LOCATION}-aiplatform.googleapis.com", + ) + ) as invocation: + response = model.generate_content( + "What is the exchange rate from US dollars to SEK today?" + ) + # Populate response attributes on the invocation + usage = response.usage_metadata + invocation.input_tokens = usage.prompt_token_count + invocation.output_tokens = usage.candidates_token_count + invocation.finish_reasons = ["stop"] + + print(f"[invoke_agent] Response:\n{response.text}\n") + + +if __name__ == "__main__": + main() diff --git a/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/requirements.txt new file mode 100644 index 0000000000..724f8c719b --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/utils-demo/requirements.txt @@ -0,0 +1,9 @@ +google-cloud-aiplatform[agent_engines,langchain]>=1.64 +google-genai>=1.0 +langchain-google-vertexai +requests + +opentelemetry-sdk>=1.40 +opentelemetry-exporter-otlp-proto-grpc>=1.40 +opentelemetry-instrumentation-vertexai>=2.0b0 +opentelemetry-util-genai>=0.4b0.dev0 diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 1b0a444b95..d2b4fce24f 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add `AgentInvocation` type and agent invocation lifecycle support + ([#4274](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4274)) - Populate schema_url on metrics 
([#4320](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4320)) - Add workflow invocation type to genAI utils diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index c9d4d388c1..bac4191eee 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -26,9 +26,9 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-instrumentation ~= 0.58b0", - "opentelemetry-semantic-conventions ~= 0.58b0", - "opentelemetry-api>=1.31.0", + "opentelemetry-instrumentation ~= 0.61b0", + "opentelemetry-semantic-conventions ~= 0.61b0", + "opentelemetry-api>=1.40.0", ] [project.entry-points.opentelemetry_genai_completion_hook] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 4e85799ea1..c0dc040a6b 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -60,9 +60,10 @@ from __future__ import annotations +import logging import timeit from contextlib import contextmanager -from typing import Iterator +from typing import Callable, Iterator, TypeVar from opentelemetry import context as otel_context from opentelemetry._logs import ( @@ -80,13 +81,58 @@ ) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.span_utils import ( + _apply_agent_finish_attributes, _apply_error_attributes, _apply_llm_finish_attributes, + _get_base_agent_common_attributes, + _get_base_agent_span_name, _maybe_emit_llm_event, ) -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import ( + AgentInvocation, + Error, + GenAIInvocation, + LLMInvocation, +) from opentelemetry.util.genai.version import __version__ +_logger = 
logging.getLogger(__name__) + +_T = TypeVar("_T", bound=GenAIInvocation) + + +@contextmanager +def _lifecycle_context( + invocation: _T, + start: Callable[[_T], _T], + stop: Callable[[_T], _T], + fail: Callable[[_T, Error], _T], + label: str, +) -> Iterator[_T]: + """Shared lifecycle context manager for GenAI invocations. + + Wraps start/stop/fail calls with error handling so SDK-internal + errors never propagate to the caller. + """ + try: + start(invocation) + except Exception: # pylint: disable=broad-exception-caught + _logger.warning("Failed to start %s span", label, exc_info=True) + try: + yield invocation + except Exception as exc: + try: + fail(invocation, Error(message=str(exc), type=type(exc))) + except Exception: # pylint: disable=broad-exception-caught + _logger.warning( + "Failed to record %s failure", label, exc_info=True + ) + raise + try: + stop(invocation) + except Exception: # pylint: disable=broad-exception-caught + _logger.warning("Failed to stop %s span", label, exc_info=True) + class TelemetryHandler: """ @@ -159,13 +205,13 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab # TODO: Provide feedback that this invocation was not started return invocation - span = invocation.span - _apply_llm_finish_attributes(span, invocation) - self._record_llm_metrics(invocation, span) - _maybe_emit_llm_event(self._logger, span, invocation) - # Detach context and end span - otel_context.detach(invocation.context_token) - span.end() + try: + _apply_llm_finish_attributes(invocation.span, invocation) + self._record_llm_metrics(invocation, invocation.span) + _maybe_emit_llm_event(self._logger, invocation.span, invocation) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() return invocation def fail_llm( # pylint: disable=no-self-use @@ -176,15 +222,19 @@ def fail_llm( # pylint: disable=no-self-use # TODO: Provide feedback that this invocation was not started return invocation - span = invocation.span - 
_apply_llm_finish_attributes(invocation.span, invocation) - _apply_error_attributes(invocation.span, error) - error_type = getattr(error.type, "__qualname__", None) - self._record_llm_metrics(invocation, span, error_type=error_type) - _maybe_emit_llm_event(self._logger, span, invocation, error) - # Detach context and end span - otel_context.detach(invocation.context_token) - span.end() + try: + _apply_llm_finish_attributes(invocation.span, invocation) + _apply_error_attributes(invocation.span, error) + error_type = getattr(error.type, "__qualname__", None) + self._record_llm_metrics( + invocation, invocation.span, error_type=error_type + ) + _maybe_emit_llm_event( + self._logger, invocation.span, invocation, error + ) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() return invocation @contextmanager @@ -203,13 +253,79 @@ def llm( invocation = LLMInvocation( request_model="", ) - self.start_llm(invocation) + with _lifecycle_context( + invocation, self.start_llm, self.stop_llm, self.fail_llm, "llm" + ) as inv: + yield inv + + # ---- Agent invocation lifecycle ---- + + def start_agent( + self, + invocation: AgentInvocation, + ) -> AgentInvocation: + """Start an agent invocation and create a pending span entry.""" + span = self._tracer.start_span( + name=_get_base_agent_span_name(invocation), + kind=SpanKind.CLIENT, + attributes=_get_base_agent_common_attributes(invocation), + ) + invocation.monotonic_start_s = timeit.default_timer() + invocation.span = span + invocation.context_token = otel_context.attach( + set_span_in_context(span) + ) + return invocation + + def stop_agent(self, invocation: AgentInvocation) -> AgentInvocation: # pylint: disable=no-self-use + """Finalize an agent invocation successfully and end its span.""" + if invocation.context_token is None or invocation.span is None: + return invocation + + try: + _apply_agent_finish_attributes(invocation.span, invocation) + finally: + 
otel_context.detach(invocation.context_token) + invocation.span.end() + return invocation + + def fail_agent( # pylint: disable=no-self-use + self, invocation: AgentInvocation, error: Error + ) -> AgentInvocation: + """Fail an agent invocation and end its span with error status.""" + if invocation.context_token is None or invocation.span is None: + return invocation + try: - yield invocation - except Exception as exc: - self.fail_llm(invocation, Error(message=str(exc), type=type(exc))) - raise - self.stop_llm(invocation) + _apply_agent_finish_attributes(invocation.span, invocation) + _apply_error_attributes(invocation.span, error) + finally: + otel_context.detach(invocation.context_token) + invocation.span.end() + return invocation + + @contextmanager + def agent( + self, invocation: AgentInvocation | None = None + ) -> Iterator[AgentInvocation]: + """Context manager for agent invocations. + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. 
+ """ + if invocation is None: + invocation = AgentInvocation() + with _lifecycle_context( + invocation, + self.start_agent, + self.stop_agent, + self.fail_agent, + "agent", + ) as inv: + yield inv def get_telemetry_handler( diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 889994436f..cff1bc2afb 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -32,11 +32,13 @@ from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.trace.status import Status, StatusCode from opentelemetry.util.genai.types import ( + AgentInvocation, Error, InputMessage, LLMInvocation, MessagePart, OutputMessage, + _BaseAgent, ) from opentelemetry.util.genai.utils import ( ContentCapturingMode, @@ -73,10 +75,37 @@ def _get_llm_span_name(invocation: LLMInvocation) -> str: return f"{invocation.operation_name} {invocation.request_model}".strip() +def _get_system_instructions_for_span( + system_instruction: list[MessagePart] | None = None, +) -> dict[str, Any]: + """Get system instructions attribute formatted for span (JSON string format). + + Can be used with agent/llm/tool invocations. + Returns empty dict if not in experimental mode or content capturing is disabled. 
+ """ + if ( + not is_experimental_mode() + or get_content_capturing_mode() + not in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + or not system_instruction + ): + return {} + + return { + GenAI.GEN_AI_SYSTEM_INSTRUCTIONS: gen_ai_json_dumps( + [asdict(p) for p in system_instruction] + ) + } + + def _get_llm_messages_attributes_for_span( input_messages: list[InputMessage], output_messages: list[OutputMessage], system_instruction: list[MessagePart] | None = None, + tool_definitions: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Get message attributes formatted for span (JSON string format). @@ -107,6 +136,10 @@ def _get_llm_messages_attributes_for_span( if system_instruction else None, ), + ( + GenAI.GEN_AI_TOOL_DEFINITIONS, + gen_ai_json_dumps(tool_definitions) if tool_definitions else None, + ), ) return {key: value for key, value in optional_attrs if value is not None} @@ -279,6 +312,137 @@ def _get_llm_response_attributes( return {key: value for key, value in optional_attrs if value is not None} +def _get_base_agent_common_attributes( + agent: _BaseAgent, +) -> dict[str, Any]: + """Get common attributes shared by all agent operations (invoke_agent, create_agent).""" + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, agent.request_model), + (GenAI.GEN_AI_PROVIDER_NAME, agent.provider), + (GenAI.GEN_AI_AGENT_NAME, agent.agent_name), + (GenAI.GEN_AI_AGENT_ID, agent.agent_id), + (GenAI.GEN_AI_AGENT_DESCRIPTION, agent.agent_description), + (GenAI.GEN_AI_AGENT_VERSION, agent.agent_version), + (server_attributes.SERVER_ADDRESS, agent.server_address), + (server_attributes.SERVER_PORT, agent.server_port), + ) + + return { + GenAI.GEN_AI_OPERATION_NAME: agent.operation_name, + **{key: value for key, value in optional_attrs if value is not None}, + } + + +def _get_base_agent_span_name(agent: _BaseAgent) -> str: + """Get the span name for any agent operation.""" + if agent.agent_name: + return f"{agent.operation_name} 
{agent.agent_name}" + return agent.operation_name + + +def _get_agent_common_attributes( + invocation: AgentInvocation, +) -> dict[str, Any]: + """Get common agent invocation attributes shared by finish() and error() paths.""" + attrs = _get_base_agent_common_attributes(invocation) + + # Invoke-specific conditionally required attributes + invoke_attrs = ( + (GenAI.GEN_AI_CONVERSATION_ID, invocation.conversation_id), + (GenAI.GEN_AI_DATA_SOURCE_ID, invocation.data_source_id), + (GenAI.GEN_AI_OUTPUT_TYPE, invocation.output_type), + ) + attrs.update( + {key: value for key, value in invoke_attrs if value is not None} + ) + + return attrs + + +def _get_agent_request_attributes( + invocation: AgentInvocation, +) -> dict[str, Any]: + """Get GenAI request semantic convention attributes for agent invocation.""" + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_TEMPERATURE, invocation.temperature), + (GenAI.GEN_AI_REQUEST_TOP_P, invocation.top_p), + (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, invocation.frequency_penalty), + (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, invocation.presence_penalty), + (GenAI.GEN_AI_REQUEST_MAX_TOKENS, invocation.max_tokens), + (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, invocation.stop_sequences), + (GenAI.GEN_AI_REQUEST_SEED, invocation.seed), + (GenAI.GEN_AI_REQUEST_CHOICE_COUNT, invocation.choice_count), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + +def _get_agent_response_attributes( + invocation: AgentInvocation, +) -> dict[str, Any]: + """Get GenAI response semantic convention attributes for agent invocation.""" + finish_reasons: list[str] | None + if invocation.finish_reasons is not None: + finish_reasons = invocation.finish_reasons + elif invocation.output_messages: + finish_reasons = [ + message.finish_reason + for message in invocation.output_messages + if message.finish_reason + ] + else: + finish_reasons = None + + unique_finish_reasons = ( + sorted(set(finish_reasons)) if finish_reasons else None + ) + + 
optional_attrs = ( + ( + GenAI.GEN_AI_RESPONSE_FINISH_REASONS, + unique_finish_reasons if unique_finish_reasons else None, + ), + (GenAI.GEN_AI_RESPONSE_MODEL, invocation.response_model_name), + (GenAI.GEN_AI_RESPONSE_ID, invocation.response_id), + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, invocation.input_tokens), + (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, invocation.output_tokens), + ( + GenAI.GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS, + invocation.cache_creation_input_tokens, + ), + ( + GenAI.GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS, + invocation.cache_read_input_tokens, + ), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + +def _apply_agent_finish_attributes( + span: Span, invocation: AgentInvocation +) -> None: + """Apply attributes/messages common to agent finish() paths.""" + span.update_name(_get_base_agent_span_name(invocation)) + + attributes: dict[str, Any] = {} + attributes.update(_get_agent_common_attributes(invocation)) + attributes.update(_get_agent_request_attributes(invocation)) + attributes.update(_get_agent_response_attributes(invocation)) + attributes.update( + _get_llm_messages_attributes_for_span( + invocation.input_messages, + invocation.output_messages, + invocation.system_instruction, + invocation.tool_definitions, + ) + ) + attributes.update(invocation.attributes) + + if attributes: + span.set_attributes(attributes) + + __all__ = [ "_apply_llm_finish_attributes", "_apply_error_attributes", @@ -287,4 +451,11 @@ def _get_llm_response_attributes( "_get_llm_response_attributes", "_get_llm_span_name", "_maybe_emit_llm_event", + "_get_base_agent_common_attributes", + "_get_base_agent_span_name", + "_apply_agent_finish_attributes", + "_get_system_instructions_for_span", + "_get_agent_common_attributes", + "_get_agent_request_attributes", + "_get_agent_response_attributes", ] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 
fc5de2cdb0..a691a59883 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -273,6 +273,96 @@ class LLMInvocation(GenAIInvocation): monotonic_start_s: float | None = None +@dataclass +class _BaseAgent(GenAIInvocation): + """ + Shared base class for agent lifecycle types. + + Contains fields common to all agent operations: identity, provider, + model, system instructions, server info, and telemetry plumbing. + + Follows semconv for GenAI agent spans: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md + + Do not instantiate directly — use AgentInvocation. + """ + + agent_name: str | None = None + agent_id: str | None = None + agent_description: str | None = None + agent_version: str | None = None + + provider: str | None = None + + request_model: str | None = None + + system_instruction: list[MessagePart] = field( + default_factory=_new_system_instruction + ) + server_address: str | None = None + server_port: int | None = None + + attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) + """ + Additional attributes to set on spans and/or events. + """ + # Monotonic start time in seconds (from timeit.default_timer) used + # for duration calculations to avoid mixing clock sources. This is + # populated by the TelemetryHandler when starting an invocation. + monotonic_start_s: float | None = None + + +@dataclass +class AgentInvocation(_BaseAgent): + """ + Represents an agent invocation (invoke_agent operation). + + Follows semconv for GenAI agent spans: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-agent-spans.md#invoke-agent-span + + When creating an AgentInvocation object, only update the data attributes. + The span and context_token attributes are set by the TelemetryHandler. 
+ """ + + operation_name: str = GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + conversation_id: str | None = None + data_source_id: str | None = None + output_type: str | None = None + + temperature: float | None = None + top_p: float | None = None + frequency_penalty: float | None = None + presence_penalty: float | None = None + max_tokens: int | None = None + stop_sequences: list[str] | None = None + seed: int | None = None + choice_count: int | None = None + + response_model_name: str | None = None + response_id: str | None = None + finish_reasons: list[str] | None = None + input_tokens: int | None = None + output_tokens: int | None = None + cache_creation_input_tokens: int | None = None + cache_read_input_tokens: int | None = None + + input_messages: list[InputMessage] = field( + default_factory=_new_input_messages + ) + output_messages: list[OutputMessage] = field( + default_factory=_new_output_messages + ) + tool_definitions: list[dict[str, Any]] | None = None + + metric_attributes: dict[str, Any] = field( + default_factory=_new_str_any_dict + ) + """ + Additional attributes to set on metrics. Must be of a low cardinality. + These attributes will not be set on spans or events. 
+ """ + + @dataclass class Error: message: str diff --git a/util/opentelemetry-util-genai/tests/test_handler_agent.py b/util/opentelemetry-util-genai/tests/test_handler_agent.py new file mode 100644 index 0000000000..f011b2ac3c --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_agent.py @@ -0,0 +1,229 @@ +from __future__ import annotations + +from unittest import TestCase + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import SpanKind +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + AgentInvocation, + Error, + InputMessage, + OutputMessage, + Text, +) + + +class _AgentTestBase(TestCase): + """Shared setUp and helper for agent handler tests.""" + + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + + def _make_handler(self) -> TelemetryHandler: + return TelemetryHandler( + tracer_provider=self.tracer_provider, + ) + + +class TestAgentInvocationHandler(_AgentTestBase): + def test_start_stop_creates_span(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Math Tutor", + provider="openai", + request_model="gpt-4", + ) + handler.start_agent(invocation) + handler.stop_agent(invocation) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.name, "invoke_agent Math Tutor") + self.assertEqual( + span.attributes[GenAI.GEN_AI_OPERATION_NAME], "invoke_agent" + ) + self.assertEqual( + span.attributes[GenAI.GEN_AI_AGENT_NAME], "Math Tutor" + ) + 
self.assertEqual(span.attributes[GenAI.GEN_AI_PROVIDER_NAME], "openai") + self.assertEqual(span.attributes[GenAI.GEN_AI_REQUEST_MODEL], "gpt-4") + + def test_span_kind_client_by_default(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Agent") + handler.start_agent(invocation) + handler.stop_agent(invocation) + self.assertEqual( + self.span_exporter.get_finished_spans()[0].kind, SpanKind.CLIENT + ) + + def test_all_attributes(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Full Agent", + agent_id="agent-123", + agent_description="A test agent", + agent_version="1.0.0", + provider="openai", + request_model="gpt-4", + conversation_id="conv-456", + data_source_id="ds-789", + output_type="text", + temperature=0.7, + top_p=0.9, + max_tokens=1000, + seed=42, + server_address="api.openai.com", + server_port=443, + ) + handler.start_agent(invocation) + invocation.response_model_name = "gpt-4-0613" + invocation.response_id = "resp-abc" + invocation.input_tokens = 100 + invocation.output_tokens = 200 + invocation.finish_reasons = ["stop"] + handler.stop_agent(invocation) + + attrs = self.span_exporter.get_finished_spans()[0].attributes + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_NAME], "Full Agent") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_ID], "agent-123") + self.assertEqual(attrs[GenAI.GEN_AI_AGENT_DESCRIPTION], "A test agent") + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_MODEL], "gpt-4-0613") + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_ID], "resp-abc") + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS], 100) + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS], 200) + self.assertEqual( + tuple(attrs[GenAI.GEN_AI_RESPONSE_FINISH_REASONS]), ("stop",) + ) + self.assertEqual(attrs["gen_ai.agent.version"], "1.0.0") + + def test_cache_token_attributes(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="Cache Agent", 
provider="openai" + ) + handler.start_agent(invocation) + invocation.input_tokens = 100 + invocation.cache_creation_input_tokens = 25 + invocation.cache_read_input_tokens = 50 + handler.stop_agent(invocation) + + attrs = self.span_exporter.get_finished_spans()[0].attributes + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS], 100) + self.assertEqual( + attrs[GenAI.GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS], 25 + ) + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS], 50) + + def test_fail_sets_error_status(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Agent", provider="openai") + handler.start_agent(invocation) + handler.fail_agent( + invocation, Error(message="agent crashed", type=RuntimeError) + ) + + span = self.span_exporter.get_finished_spans()[0] + self.assertEqual(span.status.description, "agent crashed") + self.assertEqual(span.attributes.get("error.type"), "RuntimeError") + + def test_context_manager_success(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation( + agent_name="CM Agent", provider="openai", request_model="gpt-4" + ) + with handler.agent(invocation) as inv: + inv.input_tokens = 10 + inv.output_tokens = 20 + + self.assertEqual( + self.span_exporter.get_finished_spans()[0].name, + "invoke_agent CM Agent", + ) + + def test_context_manager_error(self) -> None: + handler = self._make_handler() + with self.assertRaises(ValueError): + with handler.agent(AgentInvocation(agent_name="Agent")): + raise ValueError("test error") + + self.assertEqual( + self.span_exporter.get_finished_spans()[0].attributes.get( + "error.type" + ), + "ValueError", + ) + + def test_context_manager_default_invocation(self) -> None: + handler = self._make_handler() + with handler.agent() as inv: + inv.agent_name = "Dynamic Agent" + inv.provider = "openai" + self.assertEqual(len(self.span_exporter.get_finished_spans()), 1) + + def test_stop_without_start_is_noop(self) -> None: + handler = 
self._make_handler() + invocation = AgentInvocation(agent_name="Not Started") + result = handler.stop_agent(invocation) + self.assertIs(result, invocation) + self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) + + def test_fail_without_start_is_noop(self) -> None: + handler = self._make_handler() + invocation = AgentInvocation(agent_name="Not Started") + result = handler.fail_agent( + invocation, Error(message="boom", type=RuntimeError) + ) + self.assertIs(result, invocation) + self.assertEqual(len(self.span_exporter.get_finished_spans()), 0) + + +class TestAgentInvocationType(TestCase): + def test_defaults(self) -> None: + inv = AgentInvocation() + self.assertEqual(inv.operation_name, "invoke_agent") + self.assertIsNone(inv.agent_name) + self.assertIsNone(inv.provider) + self.assertIsNone(inv.request_model) + self.assertEqual(inv.input_messages, []) + self.assertEqual(inv.output_messages, []) + self.assertIsNone(inv.tool_definitions) + self.assertIsNone(inv.cache_creation_input_tokens) + self.assertIsNone(inv.cache_read_input_tokens) + + def test_with_messages(self) -> None: + inv = AgentInvocation( + agent_name="Test", + input_messages=[ + InputMessage(role="user", parts=[Text(content="Hello")]) + ], + output_messages=[ + OutputMessage( + role="assistant", + parts=[Text(content="Hi there!")], + finish_reason="stop", + ) + ], + ) + self.assertEqual(len(inv.input_messages), 1) + self.assertEqual(inv.input_messages[0].role, "user") + + def test_custom_attributes(self) -> None: + inv = AgentInvocation( + agent_name="Custom", + attributes={"custom.key": "custom_value"}, + ) + self.assertEqual(inv.attributes["custom.key"], "custom_value")