diff --git a/.gitignore b/.gitignore index 09141b12eb..ee31ca801a 100644 --- a/.gitignore +++ b/.gitignore @@ -64,3 +64,5 @@ target # opentelemetry-admin jobs opentelemetry-admin-jobs.txt + +.claude/settings.local.json \ No newline at end of file diff --git a/instrumentation-genai/AGENTS.md b/instrumentation-genai/AGENTS.md new file mode 100644 index 0000000000..6e8dd1eca9 --- /dev/null +++ b/instrumentation-genai/AGENTS.md @@ -0,0 +1,41 @@ +# GenAI Instrumentation — Agent and Contributor Guidelines + +Instrumentation packages here wrap specific libraries (OpenAI, Anthropic, etc.) and bridge +them to the shared telemetry layer in `util/opentelemetry-util-genai`. + +## 1. Instrumentation Layer Boundary + +Do not call OpenTelemetry APIs (`tracer`, `meter`, `span`, event APIs) directly. +Always go through `TelemetryHandler` and the invocation objects it returns. + +This layer is responsible only for: + +- Patching the library +- Parsing library-specific input/output into invocation fields + +Everything else (span creation, metric recording, event emission, context propagation) +belongs in `util/opentelemetry-util-genai`. + +## 2. Invocation Pattern + +Use `start_*()` and control span lifetime manually: + +```python +invocation = handler.start_inference(provider, request_model, server_address=..., server_port=...) +invocation.temperature = ... +try: + response = client.call(...) + invocation.response_model_name = response.model + invocation.finish_reasons = response.finish_reasons + invocation.stop() +except Exception as exc: + invocation.fail(exc) + raise +``` + +## 3. Exception Handling + +- Do not add `raise {Error}` statements in instrumentation/telemetry code — validation belongs in + tests and callers, not in the instrumentation layer. +- When catching exceptions from the underlying library to record telemetry, always re-raise + the original exception unmodified. 
diff --git a/instrumentation-genai/CLAUDE.md b/instrumentation-genai/CLAUDE.md new file mode 120000 index 0000000000..47dc3e3d86 --- /dev/null +++ b/instrumentation-genai/CLAUDE.md @@ -0,0 +1 @@ +AGENTS.md \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py index 4dc8b55d38..845e10de3f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py @@ -26,7 +26,10 @@ gen_ai_attributes as GenAIAttributes, ) from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import ( + Error, + LLMInvocation, # TODO: migrate to InferenceInvocation +) from opentelemetry.util.genai.utils import ( should_capture_content_on_spans_in_experimental_mode, ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py index 52e2e68582..1a9d5319a5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py @@ -31,7 +31,7 @@ from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( Error, - LLMInvocation, + LLMInvocation, # TODO: migrate to InferenceInvocation ) from .messages_extractors import set_invocation_response_attributes diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index d694857da4..5235af3c7f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -28,7 +28,7 @@ from opentelemetry.util.genai.types import ( Error, InputMessage, - LLMInvocation, + LLMInvocation, # TODO: migrate to InferenceInvocation MessagePart, OutputMessage, Text, @@ -158,7 +158,7 @@ def on_chat_model_start( self._invocation_manager.add_invocation_state( run_id=run_id, parent_run_id=parent_run_id, - invocation=llm_invocation, + invocation=llm_invocation, # pyright: ignore[reportArgumentType] ) def on_llm_end( @@ -171,7 +171,8 @@ def on_llm_end( ) -> None: llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) if llm_invocation is None or not isinstance( - llm_invocation, LLMInvocation + llm_invocation, + LLMInvocation, ): # If the invocation does not exist, we cannot set attributes or end it return @@ -262,7 +263,8 @@ def on_llm_error( ) -> None: llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) if llm_invocation is None or not isinstance( - llm_invocation, LLMInvocation + llm_invocation, + LLMInvocation, ): # If the invocation does not exist, we cannot set attributes or end it return diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml index 61a63e60bf..9c0086bd57 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml +++ 
b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml @@ -26,10 +26,10 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-api >= 1.37", + "opentelemetry-api >= 1.39", "opentelemetry-instrumentation >= 0.58b0", - "opentelemetry-semantic-conventions >= 0.58b0", - "opentelemetry-util-genai" + "opentelemetry-semantic-conventions >= 0.60b0", + "opentelemetry-util-genai >= 0.4b0.dev" ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.latest.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.latest.txt index e224e4349b..2bd34ba349 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.latest.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.latest.txt @@ -50,4 +50,5 @@ grpcio>=1.60.0; implementation_name != "pypy" grpcio<1.60.0; implementation_name == "pypy" -e opentelemetry-instrumentation +-e util/opentelemetry-util-genai -e instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2 diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.oldest.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.oldest.txt index 2e935a63b6..537c2b9ba2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.oldest.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/requirements.oldest.txt @@ -26,10 +26,11 @@ pytest-asyncio==0.21.0 wrapt==1.16.0 opentelemetry-exporter-otlp-proto-grpc~=1.30 opentelemetry-exporter-otlp-proto-http~=1.30 -opentelemetry-api==1.37 # when updating, also update in pyproject.toml -opentelemetry-sdk==1.37 # when updating, also update in pyproject.toml -opentelemetry-semantic-conventions==0.58b0 # 
when updating, also update in pyproject.toml +opentelemetry-api==1.39 # when updating, also update in pyproject.toml +opentelemetry-sdk==1.39 # when updating, also update in pyproject.toml +opentelemetry-semantic-conventions==0.60b0 # when updating, also update in pyproject.toml grpcio>=1.60.0; implementation_name != "pypy" grpcio<1.60.0; implementation_name == "pypy" +-e util/opentelemetry-util-genai -e instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2 diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml index 7b4fcce224..7727d06f23 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "opentelemetry-api ~= 1.39", "opentelemetry-instrumentation ~= 0.60b0", "opentelemetry-semantic-conventions ~= 0.60b0", - "opentelemetry-util-genai", + "opentelemetry-util-genai >= 0.4b0.dev", ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 996c91c04d..30f74cfab1 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -36,7 +36,7 @@ from opentelemetry.util.genai.types import ( ContentCapturingMode, Error, - LLMInvocation, + LLMInvocation, # pylint: disable=no-name-in-module # TODO: migrate to InferenceInvocation OutputMessage, Text, ToolCallRequest, diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/response_wrappers.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/response_wrappers.py index ce3375b8d5..402e5c84c2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/response_wrappers.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/response_wrappers.py @@ -8,7 +8,10 @@ from typing import TYPE_CHECKING, Callable, Generator, Generic, TypeVar from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import ( + Error, + LLMInvocation, # pylint: disable=no-name-in-module # TODO: migrate to InferenceInvocation +) # OpenAI Responses internals are version-gated (added in openai>=1.66.0), so # pylint may not resolve them in all lint environments even though we guard diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py index 6afca130b6..bd566ddba5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py @@ -38,7 +38,7 @@ from opentelemetry.trace.status import Status, StatusCode from opentelemetry.util.genai.types import ( InputMessage, - LLMInvocation, + LLMInvocation, # pylint: disable=no-name-in-module # TODO: migrate to InferenceInvocation OutputMessage, Text, ToolCallRequest, diff --git a/util/opentelemetry-util-genai/AGENTS.md b/util/opentelemetry-util-genai/AGENTS.md new file mode 100644 
index 0000000000..355d8b7c08 --- /dev/null +++ b/util/opentelemetry-util-genai/AGENTS.md @@ -0,0 +1,80 @@ +# GenAI Utils — Agent and Contributor Guidelines + +This package provides shared telemetry utilities for OpenTelemetry GenAI instrumentation. + +## 1. Semantic Convention Compliance + +All attributes, operation names, and span names must match the +[OpenTelemetry GenAI semantic conventions](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/) +exactly. + +Use the appropriate semconv attribute modules — do not hardcode attribute name strings: + +- `gen_ai.*` attributes: `opentelemetry.semconv._incubating.attributes.gen_ai_attributes` +- `server.*` attributes: `opentelemetry.semconv.attributes.server_attributes` +- `error.*` attributes: `opentelemetry.semconv.attributes.error_attributes` +- Other namespaces: use the corresponding module from `opentelemetry.semconv` + +## 2. Invocation Lifecycle Pattern + +Every new operation type must follow this pattern: + +```python +invocation = handler.start_inference(provider, request_model, server_address=..., server_port=...) +invocation.temperature = ... +try: + response = client.call(...) + invocation.response_model_name = response.model + invocation.finish_reasons = response.finish_reasons + invocation.stop() +except Exception as exc: + invocation.fail(exc) + raise +``` + +Factory methods on `TelemetryHandler` (`handler.py`): + +- `start_inference(provider, request_model, *, server_address, server_port)` → `InferenceInvocation` +- `start_embedding(provider, request_model, *, server_address, server_port)` → `EmbeddingInvocation` +- `start_tool(name, *, arguments, tool_call_id, tool_type, tool_description)` → `ToolInvocation` +- `start_workflow(name)` → `WorkflowInvocation` + +Context manager equivalents (`handler.inference()`, `handler.embedding()`, `handler.tool()`, +`handler.workflow()`) are available when the span lifetime maps cleanly to a `with` block. 
+ +### Anti-patterns + +**Never construct invocation types directly** (`InferenceInvocation(...)`, `ToolInvocation(...)`, +etc.) in instrumentation or production code — direct construction skips span creation and context +propagation, so all telemetry calls become no-ops. Always use `handler.start_*()`. + +## 3. Exception Handling + +- Do not add `raise {Error}` statements to `handler.py` or `types.py` — validation belongs in + tests and callers, not telemetry internals. +- When catching exceptions from the underlying library to record telemetry, always re-raise + the original exception unmodified. + +## 4. Documentation + +- Docstrings for invocation types and span/event helpers must include a link to the + corresponding operation in the semconv spec. +- When adding or changing attributes, update the docstring to describe what is set and under + what conditions (e.g., "set only when `server_address` is provided"). + +## 5. Tests + +- Every new operation type or attribute change must have tests verifying the exact attribute + names and values produced, checked against the semconv spec. +- Cover all paths: success (`invocation.stop()`), failure (`invocation.fail(exc)`), and any + conditional attribute logic (e.g., attributes set only when optional fields are populated). +- Tests live in `tests/` — follow existing patterns there. +- Don't call internal API in tests when the public API is available. + +## 6. Python API Conventions + +- Mark private modules with an underscore. +- Objects inside of a private module should be prefixed with underscore if they + are not used outside that module. +- Before removing or renaming an object exposed publicly, deprecate it first with + `@deprecated("... 
Use X instead.")` pointing to the replacement; diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 40b05bb359..15914b8d61 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,10 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased - - Add support for workflow in genAI utils handler. ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4366](#4366)) -- Enrich ToolCall type, breaking change: usage of ToolCall class renamed to ToolCallRequest +- Enrich ToolCall type, breaking change: usage of ToolCall class renamed to ToolCallRequest ([#4218](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4218)) - Add EmbeddingInvocation span lifecycle support ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4219](#4219)) @@ -20,6 +19,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4310](#4310)) - Check if upload works at startup in initializer of the `UploadCompletionHook`, instead of repeatedly failing on every upload ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4390](#4390)). +- Refactor public API: add factory methods (`start_inference`, `start_embedding`, `start_tool`, `start_workflow`) and invocation-owned lifecycle (`invocation.stop()` / `invocation.fail(exc)`); rename `LLMInvocation` → `InferenceInvocation` and `ToolCall` → `ToolInvocation`. Existing usages remain fully functional via deprecated aliases. 
+ ([#4391](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4391)) + ## Version 0.3b0 (2026-02-20) diff --git a/util/opentelemetry-util-genai/CLAUDE.md b/util/opentelemetry-util-genai/CLAUDE.md new file mode 120000 index 0000000000..47dc3e3d86 --- /dev/null +++ b/util/opentelemetry-util-genai/CLAUDE.md @@ -0,0 +1 @@ +AGENTS.md \ No newline at end of file diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index f8705369c2..3f81719f7b 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -50,3 +50,4 @@ include = ["/src", "/tests"] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] + diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py new file mode 100644 index 0000000000..e7fa59b600 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py @@ -0,0 +1,124 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +from typing import Any + +from opentelemetry._logs import Logger +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.trace import SpanKind, Tracer +from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.types import AttributeValue + + +class EmbeddingInvocation(GenAIInvocation): + """Represents a single embedding model invocation. + + Use handler.start_embedding(provider) or the handler.embedding(provider) + context manager rather than constructing this directly. + """ + + def __init__( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + encoding_formats: list[str] | None = None, + input_tokens: int | None = None, + dimension_count: int | None = None, + response_model_name: str | None = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_embedding(provider) or handler.embedding(provider) instead of calling this directly.""" + _operation_name = GenAI.GenAiOperationNameValues.EMBEDDINGS.value + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=f"{_operation_name} {request_model}" + if request_model + else _operation_name, + span_kind=SpanKind.CLIENT, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.provider = provider # e.g., azure.ai.openai, openai, aws.bedrock + self.request_model = request_model + self.server_address = server_address + self.server_port = server_port + # encoding_formats can be multi-value -> combinational cardinality risk. 
+ # Keep on spans/events only. + self.encoding_formats = encoding_formats + self.input_tokens = input_tokens + self.dimension_count = dimension_count + self.response_model_name = response_model_name + self._start() + + def _get_metric_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_PROVIDER_NAME, self.provider), + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (GenAI.GEN_AI_RESPONSE_MODEL, self.response_model_name), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + ) + attrs: dict[str, AttributeValue] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + **{k: v for k, v in optional_attrs if v is not None}, + } + attrs.update(self.metric_attributes) + return attrs + + def _get_metric_token_counts(self) -> dict[str, int]: + if self.input_tokens is not None: + return {GenAI.GenAiTokenTypeValues.INPUT.value: self.input_tokens} + return {} + + def _apply_finish(self, error: Error | None = None) -> None: + optional_attrs = ( + (GenAI.GEN_AI_PROVIDER_NAME, self.provider), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (GenAI.GEN_AI_EMBEDDINGS_DIMENSION_COUNT, self.dimension_count), + (GenAI.GEN_AI_REQUEST_ENCODING_FORMATS, self.encoding_formats), + (GenAI.GEN_AI_RESPONSE_MODEL, self.response_model_name), + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens), + ) + attributes: dict[str, Any] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + **{ + key: value + for key, value in optional_attrs + if value is not None + }, + } + if error is not None: + self._apply_error_attributes(error) + attributes.update(self.attributes) + self.span.set_attributes(attributes) + # Metrics recorder currently supports InferenceInvocation fields only. + # No-op until dedicated embedding metric support is added. 
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py new file mode 100644 index 0000000000..a1bd55811c --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py @@ -0,0 +1,358 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from typing import Any + +from opentelemetry._logs import Logger, LogRecord +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import server_attributes +from opentelemetry.trace import INVALID_SPAN, Span, SpanKind, Tracer +from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.types import ( + InputMessage, + MessagePart, + OutputMessage, +) +from opentelemetry.util.genai.utils import ( + ContentCapturingMode, + gen_ai_json_dumps, + get_content_capturing_mode, + is_experimental_mode, + should_emit_event, +) + + +class InferenceInvocation(GenAIInvocation): + """Represents a single LLM chat/completion call. + + Use handler.start_inference(provider) or the handler.inference(provider) + context manager rather than constructing this directly. 
+ """ + + def __init__( # pylint: disable=too-many-locals + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + provider: str, + *, + request_model: str | None = None, + input_messages: list[InputMessage] | None = None, + output_messages: list[OutputMessage] | None = None, + system_instruction: list[MessagePart] | None = None, + response_model_name: str | None = None, + response_id: str | None = None, + finish_reasons: list[str] | None = None, + input_tokens: int | None = None, + output_tokens: int | None = None, + temperature: float | None = None, + top_p: float | None = None, + frequency_penalty: float | None = None, + presence_penalty: float | None = None, + max_tokens: int | None = None, + stop_sequences: list[str] | None = None, + seed: int | None = None, + server_address: str | None = None, + server_port: int | None = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_inference(provider) or handler.inference(provider) instead of calling this directly.""" + _operation_name = GenAI.GenAiOperationNameValues.CHAT.value + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=f"{_operation_name} {request_model}" + if request_model + else _operation_name, + span_kind=SpanKind.CLIENT, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.provider = provider + self.request_model = request_model + self.input_messages: list[InputMessage] = ( + [] if input_messages is None else input_messages + ) + self.output_messages: list[OutputMessage] = ( + [] if output_messages is None else output_messages + ) + self.system_instruction: list[MessagePart] = ( + [] if system_instruction is None else system_instruction + ) + self.response_model_name = response_model_name + self.response_id = response_id + self.finish_reasons = finish_reasons + self.input_tokens = input_tokens + 
self.output_tokens = output_tokens + self.temperature = temperature + self.top_p = top_p + self.frequency_penalty = frequency_penalty + self.presence_penalty = presence_penalty + self.max_tokens = max_tokens + self.stop_sequences = stop_sequences + self.seed = seed + self.server_address = server_address + self.server_port = server_port + self._start() + + def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: + if not is_experimental_mode(): + return {} + mode = get_content_capturing_mode() + allowed_modes = ( + ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + if for_span + else ( + ContentCapturingMode.EVENT_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + ) + if mode not in allowed_modes: + return {} + + def serialize(items: list[Any]) -> Any: + dicts = [asdict(item) for item in items] + return gen_ai_json_dumps(dicts) if for_span else dicts + + optional_attrs = ( + ( + GenAI.GEN_AI_INPUT_MESSAGES, + serialize(self.input_messages) + if self.input_messages + else None, + ), + ( + GenAI.GEN_AI_OUTPUT_MESSAGES, + serialize(self.output_messages) + if self.output_messages + else None, + ), + ( + GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, + serialize(self.system_instruction) + if self.system_instruction + else None, + ), + ) + return { + key: value for key, value in optional_attrs if value is not None + } + + def _get_finish_reasons(self) -> list[str] | None: + if self.finish_reasons is not None: + return self.finish_reasons or None + if self.output_messages: + reasons = [ + msg.finish_reason + for msg in self.output_messages + if msg.finish_reason + ] + return reasons or None + return None + + def _get_base_attributes(self) -> dict[str, Any]: + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, self.request_model), + (GenAI.GEN_AI_PROVIDER_NAME, self.provider), + (server_attributes.SERVER_ADDRESS, self.server_address), + (server_attributes.SERVER_PORT, self.server_port), + ) + return { + GenAI.GEN_AI_OPERATION_NAME: 
self._operation_name, + **{k: v for k, v in optional_attrs if v is not None}, + } + + def _get_attributes(self) -> dict[str, Any]: + attrs = self._get_base_attributes() + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_TEMPERATURE, self.temperature), + (GenAI.GEN_AI_REQUEST_TOP_P, self.top_p), + (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, self.frequency_penalty), + (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, self.presence_penalty), + (GenAI.GEN_AI_REQUEST_MAX_TOKENS, self.max_tokens), + (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, self.stop_sequences), + (GenAI.GEN_AI_REQUEST_SEED, self.seed), + (GenAI.GEN_AI_RESPONSE_FINISH_REASONS, self._get_finish_reasons()), + (GenAI.GEN_AI_RESPONSE_MODEL, self.response_model_name), + (GenAI.GEN_AI_RESPONSE_ID, self.response_id), + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens), + (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, self.output_tokens), + ) + attrs.update({k: v for k, v in optional_attrs if v is not None}) + return attrs + + def _get_metric_attributes(self) -> dict[str, Any]: + attrs = self._get_base_attributes() + if self.response_model_name is not None: + attrs[GenAI.GEN_AI_RESPONSE_MODEL] = self.response_model_name + attrs.update(self.metric_attributes) + return attrs + + def _get_metric_token_counts(self) -> dict[str, int]: + counts: dict[str, int] = {} + if self.input_tokens is not None: + counts[GenAI.GenAiTokenTypeValues.INPUT.value] = self.input_tokens + if self.output_tokens is not None: + counts[GenAI.GenAiTokenTypeValues.OUTPUT.value] = ( + self.output_tokens + ) + return counts + + def _apply_finish(self, error: Error | None = None) -> None: + if error is not None: + self._apply_error_attributes(error) + attributes = self._get_attributes() + attributes.update(self._get_message_attributes(for_span=True)) + attributes.update(self.attributes) + self.span.set_attributes(attributes) + self._metrics_recorder.record(self) + self._emit_event() + + def _emit_event(self) -> None: + """Emit a gen_ai.client.inference.operation.details event. 
+ + For more details, see the semantic convention documentation: + https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aiclientinferenceoperationdetails + """ + if not is_experimental_mode() or not should_emit_event(): + return + + attributes = self._get_attributes() + attributes.update(self._get_message_attributes(for_span=False)) + attributes.update(self.attributes) + self._logger.emit( + LogRecord( + event_name="gen_ai.client.inference.operation.details", + attributes=attributes, + context=self._span_context, + ) + ) + + +@dataclass +class LLMInvocation: + """Deprecated. Use InferenceInvocation instead. + + Data container for an LLM invocation. Pass to handler.start_llm() to start + the span, then update fields and call handler.stop_llm() or handler.fail_llm(). + """ + + request_model: str | None = None + input_messages: list[InputMessage] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType] + output_messages: list[OutputMessage] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType] + system_instruction: list[MessagePart] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType] + provider: str | None = None + response_model_name: str | None = None + response_id: str | None = None + finish_reasons: list[str] | None = None + input_tokens: int | None = None + output_tokens: int | None = None + attributes: dict[str, Any] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType] + """Additional attributes to set on spans and/or events. Not set on metrics.""" + metric_attributes: dict[str, Any] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType] + """Additional attributes to set on metrics. Must be low cardinality. 
Not set on spans or events.""" + temperature: float | None = None + top_p: float | None = None + frequency_penalty: float | None = None + presence_penalty: float | None = None + max_tokens: int | None = None + stop_sequences: list[str] | None = None + seed: int | None = None + server_address: str | None = None + server_port: int | None = None + + _inference_invocation: InferenceInvocation | None = field( + default=None, init=False, repr=False + ) + + def _start_with_handler( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + ) -> None: + """Create and start an InferenceInvocation from this data container. Called by handler.start_llm().""" + self._inference_invocation = InferenceInvocation( + tracer, + metrics_recorder, + logger, + self.provider or "", + request_model=self.request_model, + input_messages=self.input_messages, + output_messages=self.output_messages, + system_instruction=self.system_instruction, + response_model_name=self.response_model_name, + response_id=self.response_id, + finish_reasons=self.finish_reasons, + input_tokens=self.input_tokens, + output_tokens=self.output_tokens, + temperature=self.temperature, + top_p=self.top_p, + frequency_penalty=self.frequency_penalty, + presence_penalty=self.presence_penalty, + max_tokens=self.max_tokens, + stop_sequences=self.stop_sequences, + seed=self.seed, + server_address=self.server_address, + server_port=self.server_port, + attributes=self.attributes, + metric_attributes=self.metric_attributes, + ) + + def _sync_to_invocation(self) -> None: + inv = self._inference_invocation + if inv is None: + return + inv.provider = self.provider or "" + inv.request_model = self.request_model + inv.input_messages = self.input_messages + inv.output_messages = self.output_messages + inv.system_instruction = self.system_instruction + inv.response_model_name = self.response_model_name + inv.response_id = self.response_id + inv.finish_reasons = self.finish_reasons + inv.input_tokens = 
self.input_tokens + inv.output_tokens = self.output_tokens + inv.temperature = self.temperature + inv.top_p = self.top_p + inv.frequency_penalty = self.frequency_penalty + inv.presence_penalty = self.presence_penalty + inv.max_tokens = self.max_tokens + inv.stop_sequences = self.stop_sequences + inv.seed = self.seed + inv.server_address = self.server_address + inv.server_port = self.server_port + inv.attributes = self.attributes + inv.metric_attributes = self.metric_attributes + + @property + def span(self) -> Span: + """The underlying span, for back-compat with code that checks span.is_recording().""" + return ( + self._inference_invocation.span + if self._inference_invocation is not None + else INVALID_SPAN + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py new file mode 100644 index 0000000000..624a69160c --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py @@ -0,0 +1,129 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import timeit +from abc import ABC, abstractmethod +from contextvars import Token +from typing import TYPE_CHECKING, Any + +from typing_extensions import TypeAlias + +from opentelemetry._logs import Logger +from opentelemetry.context import Context, attach, detach +from opentelemetry.semconv.attributes import error_attributes +from opentelemetry.trace import INVALID_SPAN as _INVALID_SPAN +from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.genai.types import Error + +if TYPE_CHECKING: + from opentelemetry.util.genai.metrics import InvocationMetricsRecorder + +ContextToken: TypeAlias = Token[Context] + + +class GenAIInvocation(ABC): + """ + Base class for all GenAI invocation types. Manages the lifecycle of a single + GenAI operation (LLM call, embedding, tool execution, workflow, etc.). + + Use the factory methods on TelemetryHandler (start_inference, start_embedding, + start_workflow, start_tool) rather than constructing invocations directly. + """ + + def __init__( + self, + # Individual components instead of TelemetryHandler to avoid a circular + # import between handler.py and the invocation modules. + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + operation_name: str, + span_name: str, + span_kind: SpanKind = SpanKind.CLIENT, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + self._tracer = tracer + self._metrics_recorder = metrics_recorder + self._logger = logger + self._operation_name: str = operation_name + self.attributes: dict[str, Any] = ( + {} if attributes is None else attributes + ) + """Additional attributes to set on spans and/or events. Not set on metrics.""" + self.metric_attributes: dict[str, Any] = ( + {} if metric_attributes is None else metric_attributes + ) + """Additional attributes to set on metrics. 
Must be low cardinality. Not set on spans or events.""" + self.span: Span = _INVALID_SPAN + self._span_context: Context + self._span_name: str = span_name + self._span_kind: SpanKind = span_kind + self._context_token: ContextToken | None = None + self._monotonic_start_s: float | None = None + + def _start(self) -> None: + """Start the invocation span and attach it to the current context.""" + self.span = self._tracer.start_span( + name=self._span_name, + kind=self._span_kind, + ) + self._span_context = set_span_in_context(self.span) + self._monotonic_start_s = timeit.default_timer() + self._context_token = attach(self._span_context) + + def _get_metric_attributes(self) -> dict[str, Any]: + """Return low-cardinality attributes for metric recording.""" + return self.metric_attributes + + def _get_metric_token_counts(self) -> dict[str, int]: # pylint: disable=no-self-use + """Return {token_type: count} for token histogram recording.""" + return {} + + def _apply_error_attributes(self, error: Error) -> None: + """Apply error status and error.type attribute to the span, events, and metrics.""" + error_type = error.type.__qualname__ + self.span.set_status(Status(StatusCode.ERROR, error.message)) + self.attributes[error_attributes.ERROR_TYPE] = error_type + self.metric_attributes[error_attributes.ERROR_TYPE] = error_type + + @abstractmethod + def _apply_finish(self, error: Error | None = None) -> None: + """Apply finish telemetry (attributes, metrics, events).""" + + def _finish(self, error: Error | None = None) -> None: + """Apply finish telemetry and end the span.""" + if self._context_token is None: + return + try: + self._apply_finish(error) + finally: + try: + detach(self._context_token) + except Exception: # pylint: disable=broad-except + pass + self.span.end() + + def stop(self) -> None: + """Finalize the invocation successfully and end its span.""" + self._finish() + + def fail(self, error: Error | BaseException) -> None: + """Fail the invocation and end its span 
with error status.""" + if isinstance(error, BaseException): + error = Error(type=type(error), message=str(error)) + self._finish(error) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py new file mode 100644 index 0000000000..1ebbd85bc6 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py @@ -0,0 +1,96 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Any + +from opentelemetry._logs import Logger +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import Tracer +from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder + + +class ToolInvocation(GenAIInvocation): + """Represents a tool call invocation for execute_tool span tracking. + + Not used as a message part — use ToolCallRequest for that purpose. + + Use handler.start_tool(name) rather than constructing this directly. 
+ + Reference: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-spans.md#execute-tool-span + + Semantic convention attributes for execute_tool spans: + - gen_ai.operation.name: "execute_tool" (Required) + - gen_ai.tool.name: Name of the tool (Recommended) + - gen_ai.tool.call.id: Tool call identifier (Recommended if available) + - gen_ai.tool.type: Type classification - "function", "extension", or "datastore" (Recommended if available) + - gen_ai.tool.description: Tool description (Recommended if available) + - gen_ai.tool.call.arguments: Parameters passed to tool (Opt-In, may contain sensitive data) + - gen_ai.tool.call.result: Result returned by tool (Opt-In, may contain sensitive data) + - error.type: Error type if operation failed (Conditionally Required) + """ + + def __init__( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + name: str, + *, + arguments: Any = None, + tool_call_id: str | None = None, + tool_type: str | None = None, + tool_description: str | None = None, + tool_result: Any = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_tool(name) or handler.tool(name) instead of calling this directly.""" + _operation_name = GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=f"{_operation_name} {name}" if name else _operation_name, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.name = name + self.arguments = arguments + self.tool_call_id = tool_call_id + self.tool_type = tool_type + self.tool_description = tool_description + self.tool_result = tool_result + self._start() + + def _apply_finish(self, error: Error | None = None) -> None: + if error is not None: + self._apply_error_attributes(error) + optional_attrs = ( + (GenAI.GEN_AI_TOOL_NAME, self.name), + 
(GenAI.GEN_AI_TOOL_CALL_ID, self.tool_call_id), + (GenAI.GEN_AI_TOOL_TYPE, self.tool_type), + (GenAI.GEN_AI_TOOL_DESCRIPTION, self.tool_description), + ) + attributes: dict[str, Any] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name, + **{k: v for k, v in optional_attrs if v is not None}, + } + attributes.update(self.attributes) + self.span.set_attributes(attributes) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py new file mode 100644 index 0000000000..e3b45535d2 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py @@ -0,0 +1,115 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +from dataclasses import asdict +from typing import Any + +from opentelemetry._logs import Logger +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import SpanKind, Tracer +from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.types import ( + InputMessage, + OutputMessage, +) +from opentelemetry.util.genai.utils import ( + ContentCapturingMode, + gen_ai_json_dumps, + get_content_capturing_mode, + is_experimental_mode, +) + + +class WorkflowInvocation(GenAIInvocation): + """ + Represents a predetermined sequence of operations (e.g. agent, LLM, tool, + and retrieval invocations). A workflow groups multiple operations together, + accepting input(s) and producing final output(s). + + Use handler.start_workflow(name) or the handler.workflow(name) context + manager rather than constructing this directly. 
+ """ + + def __init__( + self, + tracer: Tracer, + metrics_recorder: InvocationMetricsRecorder, + logger: Logger, + name: str | None, + *, + input_messages: list[InputMessage] | None = None, + output_messages: list[OutputMessage] | None = None, + attributes: dict[str, Any] | None = None, + metric_attributes: dict[str, Any] | None = None, + ) -> None: + """Use handler.start_workflow(name) or handler.workflow(name) instead of calling this directly.""" + _operation_name = "invoke_workflow" + super().__init__( + tracer, + metrics_recorder, + logger, + operation_name=_operation_name, + span_name=f"{_operation_name} {name}" if name else _operation_name, + span_kind=SpanKind.INTERNAL, + attributes=attributes, + metric_attributes=metric_attributes, + ) + self.name = name + self.input_messages: list[InputMessage] = ( + [] if input_messages is None else input_messages + ) + self.output_messages: list[OutputMessage] = ( + [] if output_messages is None else output_messages + ) + self._start() + + def _get_messages_for_span(self) -> dict[str, Any]: + if not is_experimental_mode() or get_content_capturing_mode() not in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + return {} + optional_attrs = ( + ( + GenAI.GEN_AI_INPUT_MESSAGES, + gen_ai_json_dumps([asdict(m) for m in self.input_messages]) + if self.input_messages + else None, + ), + ( + GenAI.GEN_AI_OUTPUT_MESSAGES, + gen_ai_json_dumps([asdict(m) for m in self.output_messages]) + if self.output_messages + else None, + ), + ) + return { + key: value for key, value in optional_attrs if value is not None + } + + def _apply_finish(self, error: Error | None = None) -> None: + attributes: dict[str, Any] = { + GenAI.GEN_AI_OPERATION_NAME: self._operation_name + } + attributes.update(self._get_messages_for_span()) + if error is not None: + self._apply_error_attributes(error) + attributes.update(self.attributes) + self.span.set_attributes(attributes) + # TODO: Add workflow metrics when supported diff 
--git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 858e8a8237..b3fe7e91cf 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -17,7 +17,6 @@ This module exposes the `TelemetryHandler` class, which manages the lifecycle of GenAI (Generative AI) invocations and emits telemetry data (spans and related attributes). -It supports starting, stopping, and failing LLM invocations. Classes: - TelemetryHandler: Manages GenAI invocation lifecycles and emits telemetry. @@ -28,44 +27,30 @@ Usage: handler = get_telemetry_handler() - # Create an invocation object with your request data - # The span and context_token attributes are set by the TelemetryHandler, and - # managed by the TelemetryHandler during the lifecycle of the span. - - # Use the context manager to manage the lifecycle of an LLM invocation. - with handler.llm(invocation) as invocation: - # Populate outputs and any additional attributes + # Factory method: construct and start in one call, then stop or fail. + invocation = handler.start_inference("my-provider", request_model="my-model") + invocation.input_messages = [...] + invocation.temperature = 0.7 + try: + # ... call the underlying library ... + invocation.output_messages = [...] + invocation.stop() + except Exception as exc: + invocation.fail(exc) + raise + + # Or use the context manager form — exception handling is automatic. + with handler.inference("my-provider", request_model="my-model") as invocation: + invocation.input_messages = [...] + # ... call the underlying library ... invocation.output_messages = [...] 
- invocation.attributes.update({"more": "attrs"}) - - # Or, if you prefer to manage the lifecycle manually - invocation = LLMInvocation( - request_model="my-model", - input_messages=[...], - provider="my-provider", - attributes={"custom": "attr"}, - ) - - # Start the invocation (opens a span) - handler.start_llm(invocation) - - # Populate outputs and any additional attributes, then stop (closes the span) - invocation.output_messages = [...] - invocation.attributes.update({"more": "attrs"}) - handler.stop_llm(invocation) - - # Or, in case of error - handler.fail_llm(invocation, Error(type="...", message="...")) """ from __future__ import annotations -import logging -import timeit from contextlib import contextmanager -from typing import Iterator, TypeVar +from typing import Iterator -from opentelemetry import context as otel_context from opentelemetry._logs import ( LoggerProvider, get_logger, @@ -73,51 +58,22 @@ from opentelemetry.metrics import MeterProvider, get_meter from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import ( - Span, - SpanKind, TracerProvider, get_tracer, - set_span_in_context, ) -from opentelemetry.util.genai.metrics import InvocationMetricsRecorder -from opentelemetry.util.genai.span_utils import ( - _apply_embedding_finish_attributes, - _apply_error_attributes, - _apply_llm_finish_attributes, - _apply_workflow_finish_attributes, - _get_embedding_span_name, - _get_llm_span_name, - _get_workflow_span_name, - _maybe_emit_llm_event, +from opentelemetry.util.genai._inference_invocation import ( + LLMInvocation, ) -from opentelemetry.util.genai.types import ( +from opentelemetry.util.genai._invocation import Error +from opentelemetry.util.genai.invocation import ( EmbeddingInvocation, - Error, - GenAIInvocation, - LLMInvocation, + InferenceInvocation, + ToolInvocation, WorkflowInvocation, ) +from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.version import __version__ 
-_logger = logging.getLogger(__name__) - - -def _safe_detach(invocation: GenAIInvocation) -> None: - """Detach the context token if still present, as a safety net.""" - if invocation.context_token is not None: - try: - otel_context.detach(invocation.context_token) - except Exception: # pylint: disable=broad-except - pass - if invocation.span is not None: - try: - invocation.span.end() - except Exception: # pylint: disable=broad-except - pass - - -_T = TypeVar("_T", bound=GenAIInvocation) - class TelemetryHandler: """ @@ -138,7 +94,6 @@ def __init__( tracer_provider, schema_url=schema_url, ) - self._metrics_recorder: InvocationMetricsRecorder | None = None meter = get_meter( __name__, meter_provider=meter_provider, schema_url=schema_url ) @@ -150,153 +105,140 @@ def __init__( schema_url=schema_url, ) - def _record_llm_metrics( - self, - invocation: LLMInvocation, - span: Span | None = None, - *, - error_type: str | None = None, - ) -> None: - if self._metrics_recorder is None or span is None: - return - self._metrics_recorder.record( - span, - invocation, - error_type=error_type, - ) + # New-style factory methods: construct + start in one call, handler stored on invocation - @staticmethod - def _record_embedding_metrics( - invocation: EmbeddingInvocation, - span: Span | None = None, + def start_inference( + self, + provider: str, *, - error_type: str | None = None, - ) -> None: - # Metrics recorder currently supports LLMInvocation fields only. - # Keep embedding metrics as a no-op until dedicated embedding - # metric support is added. 
- return - - def _start(self, invocation: _T) -> _T: - """Start a GenAI invocation and create a pending span entry.""" - if isinstance(invocation, LLMInvocation): - span_name = _get_llm_span_name(invocation) - kind = SpanKind.CLIENT - elif isinstance(invocation, EmbeddingInvocation): - span_name = _get_embedding_span_name(invocation) - kind = SpanKind.CLIENT - elif isinstance(invocation, WorkflowInvocation): - span_name = _get_workflow_span_name(invocation) - kind = SpanKind.INTERNAL - else: - span_name = "" - kind = SpanKind.CLIENT - span = self._tracer.start_span( - name=span_name, - kind=kind, - ) - # Record a monotonic start timestamp (seconds) for duration - # calculation using timeit.default_timer. - invocation.monotonic_start_s = timeit.default_timer() - invocation.span = span - invocation.context_token = otel_context.attach( - set_span_in_context(span) + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> InferenceInvocation: + """Create and start an LLM inference invocation. + + Set remaining attributes (input_messages, temperature, etc.) on the + returned invocation, then call invocation.stop() or invocation.fail(). + """ + return InferenceInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, ) - return invocation - def _stop(self, invocation: _T) -> _T: - """Finalize a GenAI invocation successfully and end its span.""" - if invocation.context_token is None or invocation.span is None: - # TODO: Provide feedback that this invocation was not started - return invocation + def start_llm(self, invocation: LLMInvocation) -> LLMInvocation: + """Start an LLM invocation. 
- span = invocation.span - try: - if isinstance(invocation, LLMInvocation): - _apply_llm_finish_attributes(span, invocation) - self._record_llm_metrics(invocation, span) - _maybe_emit_llm_event(self._logger, span, invocation) - elif isinstance(invocation, EmbeddingInvocation): - _apply_embedding_finish_attributes(span, invocation) - self._record_embedding_metrics(invocation, span) - elif isinstance(invocation, WorkflowInvocation): - _apply_workflow_finish_attributes(span, invocation) - # TODO: Add workflow metrics when supported - finally: - # Detach context and end span even if finishing fails - otel_context.detach(invocation.context_token) - span.end() + .. deprecated:: + Use ``handler.start_inference()`` instead. + """ + invocation._start_with_handler( + self._tracer, self._metrics_recorder, self._logger + ) return invocation - def _fail(self, invocation: _T, error: Error) -> _T: - """Fail a GenAI invocation and end its span with error status.""" - if invocation.context_token is None or invocation.span is None: - # TODO: Provide feedback that this invocation was not started - return invocation - - span = invocation.span - error_type = error.type.__qualname__ - try: - if isinstance(invocation, LLMInvocation): - _apply_llm_finish_attributes(span, invocation) - _apply_error_attributes(span, error, error_type) - self._record_llm_metrics( - invocation, span, error_type=error_type - ) - _maybe_emit_llm_event( - self._logger, span, invocation, error_type - ) - elif isinstance(invocation, EmbeddingInvocation): - _apply_embedding_finish_attributes(span, invocation) - _apply_error_attributes(span, error, error_type) - self._record_embedding_metrics( - invocation, span, error_type=error_type - ) - elif isinstance(invocation, WorkflowInvocation): - _apply_workflow_finish_attributes(span, invocation) - _apply_error_attributes(span, error, error_type) - # TODO: Add workflow metrics when supported - finally: - # Detach context and end span even if finishing fails - 
otel_context.detach(invocation.context_token) - span.end() - return invocation + def start_embedding( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> EmbeddingInvocation: + """Create and start an Embedding invocation. + + Set remaining attributes (encoding_formats, etc.) on the returned + invocation, then call invocation.stop() or invocation.fail(). + """ + return EmbeddingInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) - def start( + def start_tool( self, - invocation: _T, - ) -> _T: - """Start a GenAI invocation and create a pending span entry.""" - return self._start(invocation) + name: str, + *, + arguments: object = None, + tool_call_id: str | None = None, + tool_type: str | None = None, + tool_description: str | None = None, + ) -> ToolInvocation: + """Create and start a tool invocation. + + Set tool_result on the returned invocation when done, then call + invocation.stop() or invocation.fail(). + """ + return ToolInvocation( + self._tracer, + self._metrics_recorder, + self._logger, + name, + arguments=arguments, + tool_call_id=tool_call_id, + tool_type=tool_type, + tool_description=tool_description, + ) - def stop(self, invocation: _T) -> _T: - """Finalize a GenAI invocation successfully and end its span.""" - return self._stop(invocation) + def start_workflow( + self, + *, + name: str | None = None, + ) -> WorkflowInvocation: + """Create and start a workflow invocation. - def fail(self, invocation: _T, error: Error) -> _T: - """Fail a GenAI invocation and end its span with error status.""" - return self._fail(invocation, error) + Set remaining attributes on the returned invocation, then call + invocation.stop() or invocation.fail(). 
+ """ + return WorkflowInvocation( + self._tracer, self._metrics_recorder, self._logger, name + ) - # LLM-specific convenience methods - def start_llm(self, invocation: LLMInvocation) -> LLMInvocation: - """Start an LLM invocation and create a pending span entry.""" - return self._start(invocation) + def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disable=no-self-use + """Finalize an LLM invocation successfully and end its span. - def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: - """Finalize an LLM invocation successfully and end its span.""" - return self._stop(invocation) + .. deprecated:: + Use ``handler.start_inference()`` and then ``inference.stop()`` instead. + """ + invocation._sync_to_invocation() + if invocation._inference_invocation is not None: + invocation._inference_invocation.stop() + return invocation - def fail_llm( - self, invocation: LLMInvocation, error: Error + def fail_llm( # pylint: disable=no-self-use + self, + invocation: LLMInvocation, + error: Error, ) -> LLMInvocation: - """Fail an LLM invocation and end its span with error status.""" - return self._fail(invocation, error) + """Fail an LLM invocation and end its span with error status. + + .. deprecated:: + Use ``handler.start_inference()`` and then ``inference.fail()`` instead. + """ + invocation._sync_to_invocation() + if invocation._inference_invocation is not None: + invocation._inference_invocation.fail(error) + return invocation @contextmanager - def llm( - self, invocation: LLMInvocation | None = None - ) -> Iterator[LLMInvocation]: - """Context manager for LLM invocations. + def inference( + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, + ) -> Iterator[InferenceInvocation]: + """Context manager for LLM inference invocations. Only set data attributes on the invocation object, do not modify the span or context. 
@@ -304,21 +246,27 @@ def llm( If an exception occurs inside the context, marks the span as error, ends it, and re-raises the original exception. """ - if invocation is None: - invocation = LLMInvocation( - request_model="", - ) - self.start_llm(invocation) + invocation = self.start_inference( + provider=provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) try: yield invocation except Exception as exc: - self.fail_llm(invocation, Error(message=str(exc), type=type(exc))) + invocation.fail(exc) raise - self.stop_llm(invocation) + invocation.stop() @contextmanager def embedding( - self, invocation: EmbeddingInvocation | None = None + self, + provider: str, + *, + request_model: str | None = None, + server_address: str | None = None, + server_port: int | None = None, ) -> Iterator[EmbeddingInvocation]: """Context manager for Embedding invocations. @@ -328,19 +276,55 @@ def embedding( If an exception occurs inside the context, marks the span as error, ends it, and re-raises the original exception. """ - if invocation is None: - invocation = EmbeddingInvocation() - self.start(invocation) + invocation = self.start_embedding( + provider=provider, + request_model=request_model, + server_address=server_address, + server_port=server_port, + ) try: yield invocation except Exception as exc: - self.fail(invocation, Error(message=str(exc), type=type(exc))) + invocation.fail(exc) raise - self.stop(invocation) + invocation.stop() + + @contextmanager + def tool( + self, + name: str, + *, + arguments: object = None, + tool_call_id: str | None = None, + tool_type: str | None = None, + tool_description: str | None = None, + ) -> Iterator[ToolInvocation]: + """Context manager for Tool invocations. + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. 
+ If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + invocation = self.start_tool( + name, + arguments=arguments, + tool_call_id=tool_call_id, + tool_type=tool_type, + tool_description=tool_description, + ) + try: + yield invocation + except Exception as exc: + invocation.fail(exc) + raise + invocation.stop() @contextmanager def workflow( - self, invocation: WorkflowInvocation | None = None + self, + name: str | None = None, ) -> Iterator[WorkflowInvocation]: """Context manager for Workflow invocations. @@ -350,33 +334,14 @@ def workflow( If an exception occurs inside the context, marks the span as error, ends it, and re-raises the original exception. """ - if invocation is None: - invocation = WorkflowInvocation() - - try: - self.start(invocation) - except Exception: # pylint: disable=broad-except - _logger.warning( - "Failed to start workflow telemetry", exc_info=True - ) + invocation = self.start_workflow(name=name) try: yield invocation except Exception as exc: - try: - self.fail(invocation, Error(message=str(exc), type=type(exc))) - except Exception: # pylint: disable=broad-except - _logger.warning( - "Failed to record workflow failure", exc_info=True - ) - _safe_detach(invocation) + invocation.fail(exc) raise - - try: - self.stop(invocation) - except Exception: # pylint: disable=broad-except - _logger.warning("Failed to stop workflow telemetry", exc_info=True) - _safe_detach(invocation) + invocation.stop() def get_telemetry_handler( diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py new file mode 100644 index 0000000000..4ac6426ce1 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/invocation.py @@ -0,0 +1,47 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file 
except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Public re-export of all GenAI invocation types. + +Users can import everything from this single module: + + from opentelemetry.util.genai.invocation import ( + Error, + GenAIInvocation, + InferenceInvocation, + EmbeddingInvocation, + ToolInvocation, + WorkflowInvocation, + ) +""" + +from opentelemetry.util.genai._embedding_invocation import EmbeddingInvocation +from opentelemetry.util.genai._inference_invocation import InferenceInvocation +from opentelemetry.util.genai._invocation import ( + ContextToken, + Error, + GenAIInvocation, +) +from opentelemetry.util.genai._tool_invocation import ToolInvocation +from opentelemetry.util.genai._workflow_invocation import WorkflowInvocation + +__all__ = [ + "ContextToken", + "Error", + "GenAIInvocation", + "InferenceInvocation", + "EmbeddingInvocation", + "ToolInvocation", + "WorkflowInvocation", +] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 075cbe60a1..144f4663b6 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -3,23 +3,18 @@ from __future__ import annotations import timeit -from typing import Dict, Optional +from typing import Optional from opentelemetry.metrics import Histogram, Meter from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) -from opentelemetry.semconv.attributes import ( - error_attributes, - 
server_attributes, -) -from opentelemetry.trace import Span, set_span_in_context from opentelemetry.util.genai.instruments import ( create_duration_histogram, create_token_histogram, ) -from opentelemetry.util.genai.types import LLMInvocation -from opentelemetry.util.types import AttributeValue + +from ._invocation import GenAIInvocation class InvocationMetricsRecorder: @@ -29,80 +24,30 @@ def __init__(self, meter: Meter): self._duration_histogram: Histogram = create_duration_histogram(meter) self._token_histogram: Histogram = create_token_histogram(meter) - def record( - self, - span: Optional[Span], - invocation: LLMInvocation, - *, - error_type: Optional[str] = None, - ) -> None: + def record(self, invocation: GenAIInvocation) -> None: """Record duration and token metrics for an invocation if possible.""" + attributes = invocation._get_metric_attributes() + token_counts = invocation._get_metric_token_counts() - # pylint: disable=too-many-branches - - if span is None: - return - - token_counts: list[tuple[int, str]] = [] - if invocation.input_tokens is not None: - token_counts.append( - ( - invocation.input_tokens, - GenAI.GenAiTokenTypeValues.INPUT.value, - ) - ) - if invocation.output_tokens is not None: - token_counts.append( - ( - invocation.output_tokens, - GenAI.GenAiTokenTypeValues.OUTPUT.value, - ) - ) - - attributes: Dict[str, AttributeValue] = { - GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value - } - if invocation.request_model: - attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model - if invocation.provider: - attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider - if invocation.response_model_name: - attributes[GenAI.GEN_AI_RESPONSE_MODEL] = ( - invocation.response_model_name - ) - if invocation.server_address: - attributes[server_attributes.SERVER_ADDRESS] = ( - invocation.server_address - ) - if invocation.server_port is not None: - attributes[server_attributes.SERVER_PORT] = invocation.server_port - if 
invocation.metric_attributes: - attributes.update(invocation.metric_attributes) - - # Calculate duration from span timing or invocation monotonic start duration_seconds: Optional[float] = None - if invocation.monotonic_start_s is not None: + if invocation._monotonic_start_s is not None: duration_seconds = max( - timeit.default_timer() - invocation.monotonic_start_s, + timeit.default_timer() - invocation._monotonic_start_s, 0.0, ) - span_context = set_span_in_context(span) - if error_type: - attributes[error_attributes.ERROR_TYPE] = error_type - if duration_seconds is not None: self._duration_histogram.record( duration_seconds, attributes=attributes, - context=span_context, + context=invocation._span_context, ) - for token_count, token_type in token_counts: + for token_type, token_count in token_counts.items(): self._token_histogram.record( token_count, attributes=attributes | {GenAI.GEN_AI_TOKEN_TYPE: token_type}, - context=span_context, + context=invocation._span_context, ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py deleted file mode 100644 index 0a82462c1b..0000000000 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ /dev/null @@ -1,417 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from __future__ import annotations - -from dataclasses import asdict -from typing import Any - -from opentelemetry._logs import Logger, LogRecord -from opentelemetry.context import get_current -from opentelemetry.semconv._incubating.attributes import ( - gen_ai_attributes as GenAI, -) -from opentelemetry.semconv.attributes import ( - error_attributes, - server_attributes, -) -from opentelemetry.trace import ( - Span, -) -from opentelemetry.trace.propagation import set_span_in_context -from opentelemetry.trace.status import Status, StatusCode -from opentelemetry.util.genai.types import ( - EmbeddingInvocation, - Error, - GenAIInvocation, - InputMessage, - LLMInvocation, - MessagePart, - OutputMessage, - WorkflowInvocation, -) -from opentelemetry.util.genai.utils import ( - ContentCapturingMode, - gen_ai_json_dumps, - get_content_capturing_mode, - is_experimental_mode, - should_emit_event, -) - - -def _get_llm_common_attributes( - invocation: LLMInvocation, -) -> dict[str, Any]: - """Get common LLM attributes shared by finish() and error() paths. - - Returns a dictionary of attributes. - """ - # TODO: clean provider name to match GenAiProviderNameValues? - optional_attrs = ( - (GenAI.GEN_AI_REQUEST_MODEL, invocation.request_model), - (GenAI.GEN_AI_PROVIDER_NAME, invocation.provider), - (server_attributes.SERVER_ADDRESS, invocation.server_address), - (server_attributes.SERVER_PORT, invocation.server_port), - ) - - return { - GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name, - **{key: value for key, value in optional_attrs if value is not None}, - } - - -def _get_embedding_common_attributes( - invocation: EmbeddingInvocation, -) -> dict[str, Any]: - """Get common Embedding attributes shared by finish() and error() paths. - - Returns a dictionary of attributes. 
- """ - optional_attrs = ( - (server_attributes.SERVER_ADDRESS, invocation.server_address), - (server_attributes.SERVER_PORT, invocation.server_port), - ) - - return { - GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name, - GenAI.GEN_AI_PROVIDER_NAME: invocation.provider, - **{key: value for key, value in optional_attrs if value is not None}, - } - - -def _get_span_name( - invocation: GenAIInvocation, -) -> str: - """Get the span name for a GenAI invocation.""" - operation_name = getattr(invocation, "operation_name", None) or "" - request_model = getattr(invocation, "request_model", None) or "" - return f"{operation_name} {request_model}".strip() - - -def _get_llm_span_name(invocation: LLMInvocation) -> str: - """Get the span name for an LLM invocation.""" - return _get_span_name(invocation) - - -def _get_embedding_span_name(invocation: EmbeddingInvocation) -> str: - """Get the span name for an Embedding invocation.""" - return _get_span_name(invocation) - - -def _get_workflow_span_name(invocation: WorkflowInvocation) -> str: - """Get the span name for an Workflow invocation.""" - operation_name = invocation.operation_name - name = invocation.name - return f"{operation_name} {name}" if name else operation_name - - -def _get_messages_attributes_for_span( - input_messages: list[InputMessage], - output_messages: list[OutputMessage], - system_instruction: list[MessagePart] | None = None, -) -> dict[str, Any]: - """Get message attributes formatted for span (JSON string format). - - Returns empty dict if not in experimental mode or content capturing is disabled. 
- """ - if not is_experimental_mode() or get_content_capturing_mode() not in ( - ContentCapturingMode.SPAN_ONLY, - ContentCapturingMode.SPAN_AND_EVENT, - ): - return {} - - optional_attrs = ( - ( - GenAI.GEN_AI_INPUT_MESSAGES, - gen_ai_json_dumps([asdict(m) for m in input_messages]) - if input_messages - else None, - ), - ( - GenAI.GEN_AI_OUTPUT_MESSAGES, - gen_ai_json_dumps([asdict(m) for m in output_messages]) - if output_messages - else None, - ), - ( - GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, - gen_ai_json_dumps([asdict(p) for p in system_instruction]) - if system_instruction - else None, - ), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - -def _get_llm_messages_attributes_for_event( - input_messages: list[InputMessage], - output_messages: list[OutputMessage], - system_instruction: list[MessagePart] | None = None, -) -> dict[str, Any]: - """Get message attributes formatted for event (structured format). - - Returns empty dict if not in experimental mode or content capturing is disabled. - """ - if not is_experimental_mode() or get_content_capturing_mode() not in ( - ContentCapturingMode.EVENT_ONLY, - ContentCapturingMode.SPAN_AND_EVENT, - ): - return {} - - optional_attrs = ( - ( - GenAI.GEN_AI_INPUT_MESSAGES, - [asdict(m) for m in input_messages] if input_messages else None, - ), - ( - GenAI.GEN_AI_OUTPUT_MESSAGES, - [asdict(m) for m in output_messages] if output_messages else None, - ), - ( - GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, - [asdict(p) for p in system_instruction] - if system_instruction - else None, - ), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - -def _maybe_emit_llm_event( - logger: Logger | None, - span: Span, - invocation: LLMInvocation, - error_type: str | None = None, -) -> None: - """Emit a gen_ai.client.inference.operation.details event to the logger. 
- - This function creates a LogRecord event following the semantic convention - for gen_ai.client.inference.operation.details as specified in the GenAI - event semantic conventions. - - For more details, see the semantic convention documentation: - https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aiclientinferenceoperationdetails - """ - if not is_experimental_mode() or not should_emit_event() or logger is None: - return - - # Build event attributes by reusing the attribute getter functions - attributes: dict[str, Any] = {} - attributes.update(_get_llm_common_attributes(invocation)) - attributes.update(_get_llm_request_attributes(invocation)) - attributes.update(_get_llm_response_attributes(invocation)) - attributes.update( - _get_llm_messages_attributes_for_event( - invocation.input_messages, - invocation.output_messages, - invocation.system_instruction, - ) - ) - - # Add error.type if operation ended in error - if error_type is not None: - attributes[error_attributes.ERROR_TYPE] = error_type - - # Create and emit the event - context = set_span_in_context(span, get_current()) - event = LogRecord( - event_name="gen_ai.client.inference.operation.details", - attributes=attributes, - context=context, - ) - logger.emit(event) - - -def _apply_llm_finish_attributes( - span: Span, invocation: LLMInvocation -) -> None: - """Apply attributes/messages common to finish() paths.""" - # Update span name - span.update_name(_get_llm_span_name(invocation)) - - # Build all attributes by reusing the attribute getter functions - attributes: dict[str, Any] = {} - attributes.update(_get_llm_common_attributes(invocation)) - attributes.update(_get_llm_request_attributes(invocation)) - attributes.update(_get_llm_response_attributes(invocation)) - attributes.update( - _get_messages_attributes_for_span( - invocation.input_messages, - invocation.output_messages, - invocation.system_instruction, - ) - ) - 
attributes.update(invocation.attributes) - - # Set all attributes on the span - if attributes: - span.set_attributes(attributes) - - -def _apply_embedding_finish_attributes( - span: Span, invocation: EmbeddingInvocation -) -> None: - """Apply attributes common to embedding finish() paths.""" - # Update span name - span.update_name(_get_embedding_span_name(invocation)) - - # Build all attributes by reusing the attribute getter functions - attributes: dict[str, Any] = {} - attributes.update(_get_embedding_common_attributes(invocation)) - attributes.update(_get_embedding_request_attributes(invocation)) - attributes.update(_get_embedding_response_attributes(invocation)) - - attributes.update(invocation.attributes) - - # Set all attributes on the span - if attributes: - span.set_attributes(attributes) - - -def _apply_error_attributes(span: Span, error: Error, error_type: str) -> None: - """Apply status and error attributes common to error() paths.""" - span.set_status(Status(StatusCode.ERROR, error.message)) - if span.is_recording(): - span.set_attribute(error_attributes.ERROR_TYPE, error_type) - - -def _get_llm_request_attributes( - invocation: LLMInvocation, -) -> dict[str, Any]: - """Get GenAI request semantic convention attributes.""" - optional_attrs = ( - (GenAI.GEN_AI_REQUEST_TEMPERATURE, invocation.temperature), - (GenAI.GEN_AI_REQUEST_TOP_P, invocation.top_p), - (GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, invocation.frequency_penalty), - (GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, invocation.presence_penalty), - (GenAI.GEN_AI_REQUEST_MAX_TOKENS, invocation.max_tokens), - (GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, invocation.stop_sequences), - (GenAI.GEN_AI_REQUEST_SEED, invocation.seed), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - -def _get_embedding_request_attributes( - invocation: EmbeddingInvocation, -) -> dict[str, Any]: - """Get GenAI request semantic convention attributes.""" - optional_attrs = ( - 
(GenAI.GEN_AI_REQUEST_MODEL, invocation.request_model), - (GenAI.GEN_AI_EMBEDDINGS_DIMENSION_COUNT, invocation.dimension_count), - (GenAI.GEN_AI_REQUEST_ENCODING_FORMATS, invocation.encoding_formats), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - -def _get_llm_response_attributes( - invocation: LLMInvocation, -) -> dict[str, Any]: - """Get GenAI response semantic convention attributes.""" - finish_reasons: list[str] | None - if invocation.finish_reasons is not None: - finish_reasons = invocation.finish_reasons - elif invocation.output_messages: - finish_reasons = [ - message.finish_reason - for message in invocation.output_messages - if message.finish_reason - ] - else: - finish_reasons = None - - # De-duplicate finish reasons - unique_finish_reasons = ( - sorted(set(finish_reasons)) if finish_reasons else None - ) - - optional_attrs = ( - ( - GenAI.GEN_AI_RESPONSE_FINISH_REASONS, - unique_finish_reasons if unique_finish_reasons else None, - ), - (GenAI.GEN_AI_RESPONSE_MODEL, invocation.response_model_name), - (GenAI.GEN_AI_RESPONSE_ID, invocation.response_id), - (GenAI.GEN_AI_USAGE_INPUT_TOKENS, invocation.input_tokens), - (GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, invocation.output_tokens), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - -def _apply_workflow_finish_attributes( - span: Span, invocation: WorkflowInvocation -) -> None: - """Apply attributes/messages common to finish() paths.""" - - # Build all attributes by reusing the attribute getter functions - attributes: dict[str, Any] = {} - attributes.update(_get_workflow_common_attributes(invocation)) - attributes.update( - _get_messages_attributes_for_span( - invocation.input_messages, - invocation.output_messages, - ) - ) - attributes.update(invocation.attributes) - - # Set all attributes on the span - if attributes: - span.set_attributes(attributes) - - -def _get_workflow_common_attributes( - invocation: WorkflowInvocation, -) -> 
dict[str, Any]: - """Get common Workflow attributes shared by finish() and error() paths. - - Returns a dictionary of attributes. - """ - return { - GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name, - } - - -def _get_embedding_response_attributes( - invocation: EmbeddingInvocation, -) -> dict[str, Any]: - """Get GenAI response semantic convention attributes.""" - optional_attrs = ( - (GenAI.GEN_AI_RESPONSE_MODEL, invocation.response_model_name), - (GenAI.GEN_AI_USAGE_INPUT_TOKENS, invocation.input_tokens), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - -__all__ = [ - "_apply_llm_finish_attributes", - "_apply_error_attributes", - "_get_llm_common_attributes", - "_get_llm_request_attributes", - "_get_llm_response_attributes", - "_get_llm_span_name", - "_maybe_emit_llm_event", - "_get_workflow_common_attributes", - "_apply_workflow_finish_attributes", - "_apply_embedding_finish_attributes", - "_get_embedding_common_attributes", - "_get_embedding_request_attributes", - "_get_embedding_response_attributes", - "_get_embedding_span_name", - "_get_workflow_span_name", -] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 6d59f03bf5..103c168023 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -14,20 +14,28 @@ from __future__ import annotations -from contextvars import Token -from dataclasses import dataclass, field +from dataclasses import dataclass from enum import Enum -from typing import Any, Literal, Type, Union +from typing import TYPE_CHECKING, Any, Literal, Type, Union -from typing_extensions import TypeAlias +if TYPE_CHECKING: + from opentelemetry.util.genai._inference_invocation import ( # pylint: disable=useless-import-alias + LLMInvocation as LLMInvocation, # noqa: PLC0414 + ) + from 
opentelemetry.util.genai._invocation import ( # pylint: disable=useless-import-alias + GenAIInvocation as GenAIInvocation, # noqa: PLC0414 + ) -from opentelemetry.context import Context -from opentelemetry.semconv._incubating.attributes import ( - gen_ai_attributes as GenAI, -) -from opentelemetry.trace import Span -ContextToken: TypeAlias = Token[Context] +class ContentCapturingMode(Enum): + # Do not capture content (default). + NO_CONTENT = 0 + # Only capture content in spans. + SPAN_ONLY = 1 + # Only capture content in events. + EVENT_ONLY = 2 + # Capture content in both spans and events. + SPAN_AND_EVENT = 3 @dataclass() @@ -41,23 +49,12 @@ class GenericPart: type: Literal["generic"] = "generic" -class ContentCapturingMode(Enum): - # Do not capture content (default). - NO_CONTENT = 0 - # Only capture content in spans. - SPAN_ONLY = 1 - # Only capture content in events. - EVENT_ONLY = 2 - # Capture content in both spans and events. - SPAN_AND_EVENT = 3 - - @dataclass() class ToolCallRequest: """Represents a tool call requested by the model (message part only). Use this for tool calls in message history. For execution tracking with spans - and metrics, use ToolCall instead. + and metrics, use ToolInvocation instead. This model is specified as part of semconv in `GenAI messages Python models - ToolCallRequestPart `__. @@ -84,7 +81,7 @@ class ToolCallResponse: @dataclass() class ServerToolCall: - """Represents a server-side tool call invocation. + """Represents a server-side tool call. Server tool calls are executed by the model provider on the server side rather than by the client application. 
Provider-specific tools (e.g., code_interpreter, @@ -238,187 +235,21 @@ class OutputMessage: finish_reason: str | FinishReason -def _new_input_messages() -> list[InputMessage]: - return [] - - -def _new_output_messages() -> list[OutputMessage]: - return [] - - -def _new_system_instruction() -> list[MessagePart]: - return [] - - -def _new_str_any_dict() -> dict[str, Any]: - return {} - - -@dataclass -class GenAIInvocation: - context_token: ContextToken | None = None - span: Span | None = None - attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) - error_type: str | None = None - - monotonic_start_s: float | None = None - """ - Monotonic start time in seconds (from timeit.default_timer) used for - duration calculations to avoid mixing clock sources. This is populated - by the TelemetryHandler when starting an invocation. - """ - - -@dataclass -class WorkflowInvocation(GenAIInvocation): - """ - Represents predetermined static sequence of operations eg: Agent, LLM, tool, and retrieval invocations. - A workflow groups multiple operations together, accepting input(s) and producing final output(s). - """ - - name: str = "" - operation_name: str = "invoke_workflow" - input_messages: list[InputMessage] = field( - default_factory=_new_input_messages - ) - output_messages: list[OutputMessage] = field( - default_factory=_new_output_messages - ) - - def __post_init__(self) -> None: - self.operation_name = "invoke_workflow" - - @dataclass -class LLMInvocation(GenAIInvocation): - """ - Represents a single LLM call invocation. When creating an LLMInvocation object, - only update the data attributes. The span and context_token attributes are - set by the TelemetryHandler. 
- """ - - operation_name: str = GenAI.GenAiOperationNameValues.CHAT.value - request_model: str | None = None - input_messages: list[InputMessage] = field( - default_factory=_new_input_messages - ) - output_messages: list[OutputMessage] = field( - default_factory=_new_output_messages - ) - system_instruction: list[MessagePart] = field( - default_factory=_new_system_instruction - ) - provider: str | None = None - response_model_name: str | None = None - response_id: str | None = None - finish_reasons: list[str] | None = None - input_tokens: int | None = None - output_tokens: int | None = None - attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) - """ - Additional attributes to set on spans and/or events. These attributes - will not be set on metrics. - """ - metric_attributes: dict[str, Any] = field( - default_factory=_new_str_any_dict - ) - """ - Additional attributes to set on metrics. Must be of a low cardinality. - These attributes will not be set on spans or events. - """ - temperature: float | None = None - top_p: float | None = None - frequency_penalty: float | None = None - presence_penalty: float | None = None - max_tokens: int | None = None - stop_sequences: list[str] | None = None - seed: int | None = None - server_address: str | None = None - server_port: int | None = None - - -@dataclass -class EmbeddingInvocation(GenAIInvocation): - """ - Represents a single embedding model invocation. When creating an - EmbeddingInvocation object, only update the data attributes. The span - and context_token attributes are set by the TelemetryHandler. - """ - - operation_name: str = GenAI.GenAiOperationNameValues.EMBEDDINGS.value - request_model: str | None = None - provider: str | None = None # e.g., azure.ai.openai, openai, aws.bedrock - server_address: str | None = None - server_port: int | None = None - - # encoding_formats can be multi-value -> combinational cardinality risk. - # Keep on spans/events only. 
- encoding_formats: list[str] | None = None - input_tokens: int | None = None - dimension_count: int | None = None - response_model_name: str | None = None - - attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) - """ - Additional attributes to set on spans and/or events. These attributes - will not be set on metrics. - """ - - metric_attributes: dict[str, Any] = field( - default_factory=_new_str_any_dict - ) - """ - Additional attributes to set on metrics. Must be of a low cardinality. - These attributes will not be set on spans or events. - """ - - -@dataclass() -class ToolCall(GenAIInvocation): - """Represents a tool call for execution tracking with spans and metrics. - - This type extends GenAIInvocation (like LLMInvocation) for consistent lifecycle - management across all invocation types. It is NOT used as a MessagePart directly - - use ToolCallRequest for that purpose. - - Inherits from GenAIInvocation: - - context_token: Context tracking for span lifecycle - - span: Active span reference - - attributes: Custom attributes dict for extensibility - - Reference: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-spans.md#execute-tool-span - - Semantic convention attributes for execute_tool spans: - - gen_ai.operation.name: "execute_tool" (Required) - - gen_ai.tool.name: Name of the tool (Recommended) - - gen_ai.tool.call.id: Tool call identifier (Recommended if available) - - gen_ai.tool.type: Type classification - "function", "extension", or "datastore" (Recommended if available) - - gen_ai.tool.description: Tool description (Recommended if available) - - gen_ai.tool.call.arguments: Parameters passed to tool (Opt-In, may contain sensitive data) - - gen_ai.tool.call.result: Result returned by tool (Opt-In, may contain sensitive data) - - error.type: Error type if operation failed (Conditionally Required) - """ - - # Message identification fields (same as ToolCallRequest) - # Note: These are required fields but 
must have defaults due to dataclass inheritance - name: str = "" - arguments: Any = None - id: str | None = None - type: Literal["tool_call"] = "tool_call" +class Error: + message: str + type: Type[BaseException] - # Execution tracking fields (used for execute_tool spans): - # gen_ai.tool.type - Tool type: "function", "extension", or "datastore" - tool_type: str | None = None - # gen_ai.tool.description - Description of what the tool does - tool_description: str | None = None - # gen_ai.tool.call.result - Result returned by the tool (Opt-In, may contain sensitive data) - tool_result: Any = None - # Timing field (not inherited from GenAIInvocation, matches LLMInvocation pattern) - monotonic_start_s: float | None = None +def __getattr__(name: str) -> object: + if name == "GenAIInvocation": + import opentelemetry.util.genai.invocation as _inv # noqa: PLC0415 # pylint: disable=import-outside-toplevel + return _inv.GenAIInvocation + if name == "LLMInvocation": + from opentelemetry.util.genai._inference_invocation import ( # noqa: PLC0415 # pylint: disable=import-outside-toplevel + LLMInvocation, + ) -@dataclass -class Error: - message: str - type: Type[BaseException] + return LLMInvocation + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/util/opentelemetry-util-genai/tests/test_handler_metrics.py b/util/opentelemetry-util-genai/tests/test_handler_metrics.py index b45383abfe..8c758faea7 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_metrics.py +++ b/util/opentelemetry-util-genai/tests/test_handler_metrics.py @@ -9,7 +9,7 @@ from opentelemetry.semconv.schemas import Schemas from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import Error _DEFAULT_SCHEMA_URL = Schemas.V1_37_0.value @@ -22,19 +22,18 @@ def test_stop_llm_records_duration_and_tokens(self) -> None: 
tracer_provider=self.tracer_provider, meter_provider=self.meter_provider, ) - invocation = LLMInvocation(request_model="model", provider="prov") - invocation.input_tokens = 5 - invocation.output_tokens = 7 # Patch default_timer during start to ensure monotonic_start_s with patch("timeit.default_timer", return_value=1000.0): - handler.start_llm(invocation) + invocation = handler.start_inference("prov", request_model="model") + invocation.input_tokens = 5 + invocation.output_tokens = 7 # Simulate 2 seconds of elapsed monotonic time (seconds) with patch( "timeit.default_timer", return_value=1002.0, ): - handler.stop_llm(invocation) + invocation.stop() self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) metrics = self._harvest_metrics() @@ -80,17 +79,19 @@ def test_stop_llm_records_duration_and_tokens_with_additional_attributes( meter_provider=self.meter_provider, ) - invocation = LLMInvocation(request_model="model", provider="prov") + invocation = handler.start_inference( + "prov", + request_model="model", + server_address="custom.server.com", + server_port=42, + ) invocation.input_tokens = 5 invocation.output_tokens = 7 - invocation.server_address = "custom.server.com" - invocation.server_port = 42 - handler.start_llm(invocation) invocation.metric_attributes = { "custom.attribute": "custom_value", } invocation.attributes = {"should not be on metrics": "value"} - handler.stop_llm(invocation) + invocation.stop() self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) metrics = self._harvest_metrics() @@ -115,18 +116,17 @@ def test_fail_llm_records_error_and_available_tokens(self) -> None: tracer_provider=self.tracer_provider, meter_provider=self.meter_provider, ) - invocation = LLMInvocation(request_model="err-model", provider=None) - invocation.input_tokens = 11 # Patch default_timer during start to ensure monotonic_start_s with patch("timeit.default_timer", return_value=2000.0): - handler.start_llm(invocation) + invocation = handler.start_inference("", 
request_model="err-model") + invocation.input_tokens = 11 error = Error(message="boom", type=ValueError) with patch( "timeit.default_timer", return_value=2001.0, ): - handler.fail_llm(invocation, error) + invocation.fail(error) self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) metrics = self._harvest_metrics() @@ -162,7 +162,7 @@ def _harvest_metrics( resource_metrics is the raw ResourceMetrics list for scope-level assertions (e.g. schema_url). """ - metrics = self.get_sorted_metrics(SCOPE) + metrics = self.get_sorted_metrics() metrics_by_name: Dict[str, List[Any]] = {} for metric in metrics or []: points = metric.data.data_points or [] diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py index d3af4e9ec7..3bf689a14b 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_workflow.py +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -13,15 +13,15 @@ from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) -from opentelemetry.trace import SpanKind +from opentelemetry.trace import INVALID_SPAN, SpanKind from opentelemetry.trace.status import StatusCode from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.invocation import WorkflowInvocation from opentelemetry.util.genai.types import ( Error, InputMessage, OutputMessage, Text, - WorkflowInvocation, ) @@ -47,71 +47,55 @@ class TelemetryHandlerWorkflowTest(_WorkflowTestBase): # start_workflow # ------------------------------------------------------------------ - def test_operation_name_is_immutable(self) -> None: - invocation = WorkflowInvocation(name="wf", operation_name="custom_op") - self.assertEqual(invocation.operation_name, "invoke_workflow") - def test_start_workflow_creates_span(self) -> None: - invocation = WorkflowInvocation(name="my_workflow") - self.handler.start(invocation) - - self.assertIsNotNone(invocation.span) - 
self.assertIsNotNone(invocation.context_token) - self.assertIsNotNone(invocation.monotonic_start_s) - - # Clean up - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name="my_workflow") + self.assertIsNot(invocation.span, INVALID_SPAN) + invocation.stop() def test_start_workflow_span_name(self) -> None: - invocation = WorkflowInvocation(name="my_pipeline") - self.handler.start(invocation) - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name="my_pipeline") + invocation.stop() spans = self._get_finished_spans() self.assertEqual(len(spans), 1) self.assertEqual(spans[0].name, "invoke_workflow my_pipeline") def test_start_workflow_span_name_without_name(self) -> None: - invocation = WorkflowInvocation() - self.handler.start(invocation) - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name=None) + invocation.stop() spans = self._get_finished_spans() self.assertEqual(len(spans), 1) self.assertEqual(spans[0].name, "invoke_workflow") def test_start_workflow_span_kind_is_internal(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name="wf") + invocation.stop() spans = self._get_finished_spans() self.assertEqual(len(spans), 1) self.assertEqual(spans[0].kind, SpanKind.INTERNAL) def test_start_workflow_records_monotonic_start(self) -> None: - invocation = WorkflowInvocation(name="wf") with patch("timeit.default_timer", return_value=500.0): - self.handler.start(invocation) - self.assertEqual(invocation.monotonic_start_s, 500.0) - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name="wf") + self.assertEqual(invocation._monotonic_start_s, 500.0) + invocation.stop() # ------------------------------------------------------------------ # stop_workflow # ------------------------------------------------------------------ def test_stop_workflow_ends_span(self) -> None: - 
invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name="wf") + invocation.stop() spans = self._get_finished_spans() self.assertEqual(len(spans), 1) def test_stop_workflow_sets_operation_name_attribute(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) - self.handler.stop(invocation) + invocation = self.handler.start_workflow(name="wf") + invocation.stop() spans = self._get_finished_spans() self.assertEqual( @@ -120,36 +104,27 @@ def test_stop_workflow_sets_operation_name_attribute(self) -> None: ) def test_stop_workflow_sets_custom_attributes(self) -> None: - invocation = WorkflowInvocation(name="wf") + invocation = self.handler.start_workflow(name="wf") invocation.attributes["custom.key"] = "custom_value" - self.handler.start(invocation) - self.handler.stop(invocation) + invocation.stop() spans = self._get_finished_spans() self.assertEqual(spans[0].attributes["custom.key"], "custom_value") - def test_stop_workflow_noop_when_not_started(self) -> None: - invocation = WorkflowInvocation(name="wf") - # Not started — span and context_token are None - result = self.handler.stop(invocation) - self.assertIs(result, invocation) - self.assertEqual(len(self._get_finished_spans()), 0) - def test_stop_workflow_returns_invocation(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) - result = self.handler.stop(invocation) - self.assertIs(result, invocation) + invocation = self.handler.start_workflow(name="wf") + invocation.stop() + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) # ------------------------------------------------------------------ # fail_workflow # ------------------------------------------------------------------ def test_fail_workflow_sets_error_status(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) + invocation = 
self.handler.start_workflow(name="wf") error = Error(message="something broke", type=RuntimeError) - self.handler.fail(invocation, error) + invocation.fail(error) spans = self._get_finished_spans() self.assertEqual(len(spans), 1) @@ -157,19 +132,17 @@ def test_fail_workflow_sets_error_status(self) -> None: self.assertEqual(spans[0].status.description, "something broke") def test_fail_workflow_sets_error_type_attribute(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) + invocation = self.handler.start_workflow(name="wf") error = Error(message="bad", type=ValueError) - self.handler.fail(invocation, error) + invocation.fail(error) spans = self._get_finished_spans() self.assertEqual(spans[0].attributes["error.type"], "ValueError") def test_fail_workflow_sets_operation_name_attribute(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) + invocation = self.handler.start_workflow(name="wf") error = Error(message="fail", type=TypeError) - self.handler.fail(invocation, error) + invocation.fail(error) spans = self._get_finished_spans() self.assertEqual( @@ -177,19 +150,12 @@ def test_fail_workflow_sets_operation_name_attribute(self) -> None: "invoke_workflow", ) - def test_fail_workflow_noop_when_not_started(self) -> None: - invocation = WorkflowInvocation(name="wf") - error = Error(message="fail", type=RuntimeError) - result = self.handler.fail(invocation, error) - self.assertIs(result, invocation) - self.assertEqual(len(self._get_finished_spans()), 0) - - def test_fail_workflow_returns_invocation(self) -> None: - invocation = WorkflowInvocation(name="wf") - self.handler.start(invocation) - error = Error(message="err", type=RuntimeError) - result = self.handler.fail(invocation, error) - self.assertIs(result, invocation) + def test_fail_workflow_ends_span(self) -> None: + invocation = self.handler.start_workflow(name="wf") + invocation.fail(Error(message="err", type=RuntimeError)) + spans = 
self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].status.status_code, StatusCode.ERROR) class TelemetryHandlerWorkflowContextManagerTest(_WorkflowTestBase): @@ -198,10 +164,8 @@ class TelemetryHandlerWorkflowContextManagerTest(_WorkflowTestBase): # ------------------------------------------------------------------ def test_workflow_context_manager_creates_and_ends_span(self) -> None: - invocation = WorkflowInvocation(name="ctx_wf") - with self.handler.workflow(invocation) as inv: - self.assertIsNotNone(inv.span) - self.assertIsNotNone(inv.context_token) + with self.handler.workflow(name="ctx_wf") as inv: + self.assertIsNot(inv.span, INVALID_SPAN) spans = self._get_finished_spans() self.assertEqual(len(spans), 1) @@ -210,30 +174,27 @@ def test_workflow_context_manager_creates_and_ends_span(self) -> None: def test_workflow_context_manager_default_invocation(self) -> None: with self.handler.workflow() as inv: self.assertIsInstance(inv, WorkflowInvocation) - self.assertEqual(inv.name, "") - self.assertEqual(inv.operation_name, "invoke_workflow") + self.assertIsNone(inv.name) + self.assertEqual(inv._operation_name, "invoke_workflow") spans = self._get_finished_spans() self.assertEqual(len(spans), 1) def test_workflow_context_manager_sets_attributes_on_span(self) -> None: - invocation = WorkflowInvocation(name="wf") - with self.handler.workflow(invocation) as inv: + with self.handler.workflow("wf") as inv: inv.attributes["my.attr"] = "hello" spans = self._get_finished_spans() self.assertEqual(spans[0].attributes["my.attr"], "hello") def test_workflow_context_manager_reraises_exception(self) -> None: - invocation = WorkflowInvocation(name="wf") with pytest.raises(ValueError, match="test error"): - with self.handler.workflow(invocation): + with self.handler.workflow("wf"): raise ValueError("test error") def test_workflow_context_manager_marks_error_on_exception(self) -> None: - invocation = WorkflowInvocation(name="wf") with 
pytest.raises(ValueError): - with self.handler.workflow(invocation): + with self.handler.workflow("wf"): raise ValueError("boom") spans = self._get_finished_spans() @@ -243,8 +204,7 @@ def test_workflow_context_manager_marks_error_on_exception(self) -> None: self.assertEqual(spans[0].attributes["error.type"], "ValueError") def test_workflow_context_manager_success_has_unset_status(self) -> None: - invocation = WorkflowInvocation(name="wf") - with self.handler.workflow(invocation): + with self.handler.workflow("wf"): pass spans = self._get_finished_spans() @@ -255,13 +215,9 @@ def test_workflow_context_manager_with_messages(self) -> None: out = OutputMessage( role="assistant", parts=[Text(content="hi")], finish_reason="stop" ) - invocation = WorkflowInvocation( - name="msg_wf", - input_messages=[inp], - output_messages=[out], - ) - with self.handler.workflow(invocation): - pass + with self.handler.workflow("msg_wf") as inv: + inv.input_messages = [inp] + inv.output_messages = [out] spans = self._get_finished_spans() self.assertEqual(len(spans), 1) @@ -269,42 +225,3 @@ def test_workflow_context_manager_with_messages(self) -> None: spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], "invoke_workflow", ) - - def test_workflow_context_manager_swallows_start_failure(self) -> None: - """workflow() should yield even if start_workflow raises.""" - invocation = WorkflowInvocation(name="wf") - with patch.object( - self.handler, - "start", - side_effect=RuntimeError("start failed"), - ): - # Should not raise — the exception is swallowed with a warning - with self.handler.workflow(invocation) as inv: - self.assertIs(inv, invocation) - - def test_workflow_context_manager_swallows_stop_failure(self) -> None: - """workflow() should not raise if stop_workflow fails.""" - invocation = WorkflowInvocation(name="wf") - with patch.object( - self.handler, - "stop", - side_effect=RuntimeError("stop failed"), - ): - # Should not raise - with self.handler.workflow(invocation): - pass - - def 
test_workflow_context_manager_swallows_fail_workflow_failure( - self, - ) -> None: - """workflow() should still re-raise the original exception even if - fail_workflow itself raises.""" - invocation = WorkflowInvocation(name="wf") - with patch.object( - self.handler, - "fail", - side_effect=RuntimeError("fail broke"), - ): - with pytest.raises(ValueError, match="original"): - with self.handler.workflow(invocation): - raise ValueError("original") diff --git a/util/opentelemetry-util-genai/tests/test_toolcall.py b/util/opentelemetry-util-genai/tests/test_toolcall.py index d076f5e72b..abf7f637ae 100644 --- a/util/opentelemetry-util-genai/tests/test_toolcall.py +++ b/util/opentelemetry-util-genai/tests/test_toolcall.py @@ -12,20 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for ToolCallRequest and ToolCall inheritance structure""" +"""Tests for ToolCallRequest and ToolInvocation inheritance structure""" import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.invocation import GenAIInvocation from opentelemetry.util.genai.types import ( - GenAIInvocation, InputMessage, ServerToolCall, ServerToolCallResponse, - ToolCall, ToolCallRequest, ) +def _make_handler() -> TelemetryHandler: + return TelemetryHandler(tracer_provider=TracerProvider()) + + def test_toolcallrequest_is_message_part(): """ToolCallRequest is for message parts only""" tcr = ToolCallRequest( @@ -36,25 +41,40 @@ def test_toolcallrequest_is_message_part(): def test_toolcall_inherits_from_genaiinvocation(): - """ToolCall inherits from GenAIInvocation for lifecycle management""" - tc = ToolCall(name="get_weather", arguments={"city": "Paris"}) + """ToolInvocation inherits from GenAIInvocation for lifecycle management""" + handler = _make_handler() + tc = handler.start_tool("get_weather", arguments={"city": "Paris"}) assert 
isinstance(tc, GenAIInvocation) assert not isinstance(tc, ToolCallRequest) + tc.stop() def test_toolcall_has_attributes_dict(): - """ToolCall inherits attributes dict from GenAIInvocation""" - tc = ToolCall(name="test") + """ToolInvocation inherits attributes dict from GenAIInvocation""" + handler = _make_handler() + tc = handler.start_tool("test") tc.attributes["custom.key"] = "value" assert tc.attributes["custom.key"] == "value" + tc.stop() -def test_toolcall_in_message_part_union(): - """ToolCall can be used in messages despite not inheriting from ToolCallRequest""" - tc = ToolCall(name="get_weather", arguments={"city": "Paris"}) +def test_toolcallrequest_in_message_part_union(): + """ToolCallRequest (not ToolInvocation) is the correct type for message parts""" + tc = ToolCallRequest( + name="get_weather", arguments={"city": "Paris"}, id="call_123" + ) msg = InputMessage(role="assistant", parts=[tc]) assert len(msg.parts) == 1 - assert isinstance(msg.parts[0], GenAIInvocation) + assert isinstance(msg.parts[0], ToolCallRequest) + assert not isinstance(msg.parts[0], GenAIInvocation) + + +def test_toolcall_operation_name(): + """ToolInvocation operation_name is fixed to execute_tool""" + handler = _make_handler() + tc = handler.start_tool("my_tool") + assert tc._operation_name == "execute_tool" + tc.stop() def test_server_tool_call_basic(): diff --git a/util/opentelemetry-util-genai/tests/test_utils.py b/util/opentelemetry-util-genai/tests/test_utils.py index ee0e63d852..31629c5b0c 100644 --- a/util/opentelemetry-util-genai/tests/test_utils.py +++ b/util/opentelemetry-util-genai/tests/test_utils.py @@ -49,9 +49,7 @@ from opentelemetry.util.genai.handler import get_telemetry_handler from opentelemetry.util.genai.types import ( ContentCapturingMode, - EmbeddingInvocation, InputMessage, - LLMInvocation, MessagePart, OutputMessage, Text, @@ -248,25 +246,23 @@ def test_llm_start_and_stop_creates_span(self): # pylint: disable=no-self-use chat_generation = 
_create_output_message("hello back") system_instruction = _create_system_instruction() - with self.telemetry_handler.llm() as invocation: - for attr, value in { - "request_model": "test-model", - "input_messages": [message], - "system_instruction": system_instruction, - "provider": "test-provider", - "attributes": {"custom_attr": "value"}, - "temperature": 0.5, - "top_p": 0.9, - "stop_sequences": ["stop"], - "finish_reasons": ["stop"], - "response_model_name": "test-response-model", - "response_id": "response-id", - "input_tokens": 321, - "output_tokens": 654, - "server_address": "custom.server.com", - "server_port": 42, - }.items(): - setattr(invocation, attr, value) + with self.telemetry_handler.inference( + "test-provider", + request_model="test-model", + server_address="custom.server.com", + server_port=42, + ) as invocation: + invocation.input_messages = [message] + invocation.system_instruction = system_instruction + invocation.attributes = {"custom_attr": "value"} + invocation.temperature = 0.5 + invocation.top_p = 0.9 + invocation.stop_sequences = ["stop"] + invocation.finish_reasons = ["stop"] + invocation.response_model_name = "test-response-model" + invocation.response_id = "response-id" + invocation.input_tokens = 321 + invocation.output_tokens = 654 assert invocation.span is not None invocation.output_messages = [chat_generation] invocation.attributes.update({"extra": "info"}) @@ -331,18 +327,15 @@ def test_llm_manual_start_and_stop_creates_span(self): message = _create_input_message("hi") chat_generation = _create_output_message("ok") - invocation = LLMInvocation( - request_model="manual-model", - input_messages=[message], - provider="test-provider", - attributes={"manual": True}, + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="manual-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [message] + invocation.attributes["manual"] = True assert invocation.span is not None 
invocation.output_messages = [chat_generation] invocation.attributes.update({"extra_manual": "yes"}) - self.telemetry_handler.stop_llm(invocation) + invocation.stop() span = _get_single_span(self.span_exporter) assert span.name == "chat manual-model" @@ -365,19 +358,16 @@ def test_llm_manual_start_and_stop_creates_span(self): ) def test_llm_span_finish_reasons_without_output_messages(self): - invocation = LLMInvocation( - request_model="model-without-output", - provider="test-provider", - finish_reasons=["length"], - response_model_name="alt-model", - response_id="resp-001", - input_tokens=12, - output_tokens=34, - ) - - self.telemetry_handler.start_llm(invocation) + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="model-without-output" + ) + invocation.finish_reasons = ["length"] + invocation.response_model_name = "alt-model" + invocation.response_id = "resp-001" + invocation.input_tokens = 12 + invocation.output_tokens = 34 assert invocation.span is not None - self.telemetry_handler.stop_llm(invocation) + invocation.stop() span = _get_single_span(self.span_exporter) _assert_span_time_order(span) @@ -396,55 +386,46 @@ def test_llm_span_finish_reasons_without_output_messages(self): }, ) - def test_llm_span_finish_reasons_deduplicated_from_invocation(self): - invocation = LLMInvocation( - request_model="model-dedup", - provider="test-provider", - finish_reasons=["stop", "length", "stop"], + def test_llm_span_finish_reasons_from_invocation(self): + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="model-reasons" ) - - self.telemetry_handler.start_llm(invocation) + invocation.finish_reasons = ["stop", "length", "stop"] assert invocation.span is not None - self.telemetry_handler.stop_llm(invocation) + invocation.stop() span = _get_single_span(self.span_exporter) attrs = _get_span_attributes(span) self.assertEqual( attrs[GenAI.GEN_AI_RESPONSE_FINISH_REASONS], - ("length", "stop"), + ("stop", 
"length", "stop"), ) - def test_llm_span_finish_reasons_deduplicated_from_output_messages(self): - invocation = LLMInvocation( - request_model="model-output-dedup", - provider="test-provider", + def test_llm_span_finish_reasons_from_output_messages(self): + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="model-output-reasons" ) - - self.telemetry_handler.start_llm(invocation) assert invocation.span is not None invocation.output_messages = [ _create_output_message("response-1", finish_reason="stop"), _create_output_message("response-2", finish_reason="length"), _create_output_message("response-3", finish_reason="stop"), ] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() span = _get_single_span(self.span_exporter) attrs = _get_span_attributes(span) self.assertEqual( attrs[GenAI.GEN_AI_RESPONSE_FINISH_REASONS], - ("length", "stop"), + ("stop", "length", "stop"), ) def test_llm_span_uses_expected_schema_url(self): - invocation = LLMInvocation( - request_model="schema-model", - provider="schema-provider", + invocation = self.telemetry_handler.start_inference( + "schema-provider", request_model="schema-model" ) - - self.telemetry_handler.start_llm(invocation) assert invocation.span is not None - self.telemetry_handler.stop_llm(invocation) + invocation.stop() span = _get_single_span(self.span_exporter) instrumentation = getattr(span, "instrumentation_scope", None) @@ -463,14 +444,11 @@ def test_llm_span_uses_expected_schema_url(self): emit_event="true", ) def test_llm_log_uses_expected_schema_url(self): - invocation = LLMInvocation( - request_model="schema-model", - provider="schema-provider", + invocation = self.telemetry_handler.start_inference( + "schema-provider", request_model="schema-model" ) - - self.telemetry_handler.start_llm(invocation) invocation.output_messages = [_create_output_message()] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() logs = self.log_exporter.get_finished_logs() 
self.assertEqual(len(logs), 1) @@ -487,20 +465,14 @@ def test_parent_child_span_relationship(self): message = _create_input_message("hi") chat_generation = _create_output_message("ok") - with self.telemetry_handler.llm() as parent_invocation: - for attr, value in { - "request_model": "parent-model", - "input_messages": [message], - "provider": "test-provider", - }.items(): - setattr(parent_invocation, attr, value) - with self.telemetry_handler.llm() as child_invocation: - for attr, value in { - "request_model": "child-model", - "input_messages": [message], - "provider": "test-provider", - }.items(): - setattr(child_invocation, attr, value) + with self.telemetry_handler.inference( + "test-provider", request_model="parent-model" + ) as parent_invocation: + parent_invocation.input_messages = [message] + with self.telemetry_handler.inference( + "test-provider", request_model="child-model" + ) as child_invocation: + child_invocation.input_messages = [message] # Stop child first by exiting inner context child_invocation.output_messages = [chat_generation] # Then stop parent by exiting outer context @@ -527,23 +499,18 @@ def test_parent_child_span_relationship(self): emit_event="", ) def test_embedding_parent_child_span_relationship(self): - parent_invocation = EmbeddingInvocation( - request_model="embed-parent-model", - provider="test-provider", - input_tokens=10, - ) - child_invocation = EmbeddingInvocation( - request_model="embed-child-model", - provider="test-provider", - input_tokens=5, + parent_invocation = self.telemetry_handler.start_embedding( + "test-provider", request_model="embed-parent-model" ) - - self.telemetry_handler.start(parent_invocation) + parent_invocation.input_tokens = 10 assert parent_invocation.span is not None - self.telemetry_handler.start(child_invocation) + child_invocation = self.telemetry_handler.start_embedding( + "test-provider", request_model="embed-child-model" + ) + child_invocation.input_tokens = 5 assert child_invocation.span is not 
None - self.telemetry_handler.stop(child_invocation) - self.telemetry_handler.stop(parent_invocation) + child_invocation.stop() + parent_invocation.stop() spans = self.span_exporter.get_finished_spans() assert len(spans) == 2 @@ -567,22 +534,17 @@ def test_embedding_parent_child_span_relationship(self): def test_llm_parent_embedding_child_span_relationship(self): message = _create_input_message("hi") chat_generation = _create_output_message("ok") - child_invocation = EmbeddingInvocation( - request_model="embed-child-model", - provider="test-provider", - input_tokens=3, - ) - with self.telemetry_handler.llm() as parent_invocation: - for attr, value in { - "request_model": "parent-model", - "input_messages": [message], - "provider": "test-provider", - }.items(): - setattr(parent_invocation, attr, value) - self.telemetry_handler.start(child_invocation) + with self.telemetry_handler.inference( + "test-provider", request_model="parent-model" + ) as parent_invocation: + parent_invocation.input_messages = [message] + child_invocation = self.telemetry_handler.start_embedding( + "test-provider", request_model="embed-child-model" + ) + child_invocation.input_tokens = 3 assert child_invocation.span is not None - self.telemetry_handler.stop(child_invocation) + child_invocation.stop() parent_invocation.output_messages = [chat_generation] spans = self.span_exporter.get_finished_spans() @@ -604,14 +566,12 @@ class BoomError(RuntimeError): pass message = _create_input_message("hi", role="user") - invocation = LLMInvocation( - request_model="test-model", - input_messages=[message], - provider="test-provider", - ) with self.assertRaises(BoomError): - with self.telemetry_handler.llm(invocation): + with self.telemetry_handler.inference( + "test-provider", request_model="test-model" + ) as invocation: + invocation.input_messages = [message] for attr, value in { "max_tokens": 128, "seed": 123, @@ -651,18 +611,16 @@ def 
test_embedding_context_manager_error_path_records_error_status_and_attrs( class BoomError(RuntimeError): pass - invocation = EmbeddingInvocation( - request_model="embed-model", - provider="test-provider", - dimension_count=1536, - input_tokens=7, - server_address="embed.example.com", - server_port=443, - attributes={"custom_embed_attr": "value"}, - ) - with self.assertRaises(BoomError): - with self.telemetry_handler.embedding(invocation): + with self.telemetry_handler.embedding( + "test-provider", + request_model="embed-model", + server_address="embed.example.com", + server_port=443, + ) as invocation: + invocation.dimension_count = 1536 + invocation.input_tokens = 7 + invocation.attributes["custom_embed_attr"] = "value" invocation.response_model_name = "embed-response-model" raise BoomError("embedding boom") @@ -692,22 +650,20 @@ class BoomError(RuntimeError): emit_event="", ) def test_embedding_manual_start_and_stop_creates_span(self): - invocation = EmbeddingInvocation( + invocation = self.telemetry_handler.start_embedding( + "test-provider", request_model="embed-model", - provider="test-provider", - dimension_count=1536, - encoding_formats=["float"], - input_tokens=123, server_address="custom.server.com", server_port=42, - attributes={"custom_embed_attr": "value"}, ) - - self.telemetry_handler.start(invocation) + invocation.dimension_count = 1536 + invocation.encoding_formats = ["float"] + invocation.input_tokens = 123 + invocation.attributes["custom_embed_attr"] = "value" assert invocation.span is not None invocation.attributes.update({"extra_embed": "info"}) invocation.metric_attributes = {"should not be on span": "value"} - self.telemetry_handler.stop(invocation) + invocation.stop() span = _get_single_span(self.span_exporter) self.assertEqual(span.name, "embeddings embed-model") @@ -731,6 +687,23 @@ def test_embedding_manual_start_and_stop_creates_span(self): }, ) + def test_fail_with_exception_sets_error_status_and_type(self): + class 
BoomError(RuntimeError): + pass + + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="test-model" + ) + invocation.fail(BoomError("boom")) + + span = _get_single_span(self.span_exporter) + assert span.status.status_code == StatusCode.ERROR + assert span.status.description == "boom" + assert ( + _get_span_attributes(span)[error_attributes.ERROR_TYPE] + == BoomError.__qualname__ + ) + class AnyNonNone: def __eq__(self, other): diff --git a/util/opentelemetry-util-genai/tests/test_utils_events.py b/util/opentelemetry-util-genai/tests/test_utils_events.py index 20b3300c62..7febea44c6 100644 --- a/util/opentelemetry-util-genai/tests/test_utils_events.py +++ b/util/opentelemetry-util-genai/tests/test_utils_events.py @@ -30,7 +30,7 @@ ) from opentelemetry.semconv.attributes import error_attributes from opentelemetry.util.genai.handler import get_telemetry_handler -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import Error from .test_utils import ( _create_input_message, @@ -72,22 +72,19 @@ def tearDown(self): emit_event="true", ) def test_emits_llm_event(self): - invocation = LLMInvocation( - request_model="event-model", - input_messages=[_create_input_message("test query")], - system_instruction=_create_system_instruction(), - provider="test-provider", - temperature=0.7, - max_tokens=100, - response_model_name="response-model", - response_id="event-response-id", - input_tokens=10, - output_tokens=20, + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="event-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [_create_input_message("test query")] + invocation.system_instruction = _create_system_instruction() + invocation.temperature = 0.7 + invocation.max_tokens = 100 + invocation.response_model_name = "response-model" + invocation.response_id = "event-response-id" + invocation.input_tokens = 10 + 
invocation.output_tokens = 20 invocation.output_messages = [_create_output_message("test response")] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check that event was emitted logs = self.log_exporter.get_finished_logs() @@ -157,16 +154,13 @@ def test_emits_llm_event_and_span(self): chat_generation = _create_output_message("combined response") system_instruction = _create_system_instruction("System prompt here") - invocation = LLMInvocation( - request_model="combined-model", - input_messages=[message], - system_instruction=system_instruction, - provider="test-provider", + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="combined-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [message] + invocation.system_instruction = system_instruction invocation.output_messages = [chat_generation] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check span was created span = _get_single_span(self.span_exporter) @@ -217,15 +211,12 @@ class TestError(RuntimeError): pass message = _create_input_message("error test") - invocation = LLMInvocation( - request_model="error-model", - input_messages=[message], - provider="test-provider", + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="error-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [message] error = Error(message="Test error occurred", type=TestError) - self.telemetry_handler.fail_llm(invocation, error) + invocation.fail(error) # Check event was emitted logs = self.log_exporter.get_finished_logs() @@ -256,15 +247,12 @@ def test_does_not_emit_llm_event_when_emit_event_false(self): message = _create_input_message("emit false test") chat_generation = _create_output_message("emit false response") - invocation = LLMInvocation( - request_model="emit-false-model", - input_messages=[message], - provider="test-provider", + invocation = 
self.telemetry_handler.start_inference( + "test-provider", request_model="emit-false-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [message] invocation.output_messages = [chat_generation] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check no event was emitted logs = self.log_exporter.get_finished_logs() @@ -277,17 +265,14 @@ def test_does_not_emit_llm_event_when_emit_event_false(self): ) def test_does_not_emit_llm_event_by_default_for_no_content(self): """Test that event is not emitted by default when content_capturing is NO_CONTENT and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - invocation = LLMInvocation( - request_model="default-model", - input_messages=[_create_input_message("default test")], - provider="test-provider", + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="default-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [_create_input_message("default test")] invocation.output_messages = [ _create_output_message("default response") ] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check that no event was emitted (NO_CONTENT defaults to False) logs = self.log_exporter.get_finished_logs() @@ -300,17 +285,14 @@ def test_does_not_emit_llm_event_by_default_for_no_content(self): ) def test_does_not_emit_llm_event_by_default_for_span_only(self): """Test that event is not emitted by default when content_capturing is SPAN_ONLY and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - invocation = LLMInvocation( - request_model="default-model", - input_messages=[_create_input_message("default test")], - provider="test-provider", + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="default-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [_create_input_message("default test")] invocation.output_messages = [ 
_create_output_message("default response") ] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check that no event was emitted (SPAN_ONLY defaults to False) logs = self.log_exporter.get_finished_logs() @@ -323,17 +305,14 @@ def test_does_not_emit_llm_event_by_default_for_span_only(self): ) def test_emits_llm_event_by_default_for_event_only(self): """Test that event is emitted by default when content_capturing is EVENT_ONLY and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - invocation = LLMInvocation( - request_model="default-model", - input_messages=[_create_input_message("default test")], - provider="test-provider", + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="default-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [_create_input_message("default test")] invocation.output_messages = [ _create_output_message("default response") ] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check that event was emitted (EVENT_ONLY defaults to True) logs = self.log_exporter.get_finished_logs() @@ -354,16 +333,13 @@ def test_emits_llm_event_by_default_for_span_and_event(self): chat_generation = _create_output_message("span and event response") system_instruction = _create_system_instruction("System prompt") - invocation = LLMInvocation( - request_model="span-and-event-model", - input_messages=[message], - system_instruction=system_instruction, - provider="test-provider", + invocation = self.telemetry_handler.start_inference( + "test-provider", request_model="span-and-event-model" ) - - self.telemetry_handler.start_llm(invocation) + invocation.input_messages = [message] + invocation.system_instruction = system_instruction invocation.output_messages = [chat_generation] - self.telemetry_handler.stop_llm(invocation) + invocation.stop() # Check span was created span = _get_single_span(self.span_exporter) diff --git 
a/util/opentelemetry-util-genai/tests/test_workflow_invocation.py b/util/opentelemetry-util-genai/tests/test_workflow_invocation.py index b9c0e756da..3a80f1c214 100644 --- a/util/opentelemetry-util-genai/tests/test_workflow_invocation.py +++ b/util/opentelemetry-util-genai/tests/test_workflow_invocation.py @@ -1,29 +1,50 @@ +import unittest + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.trace import INVALID_SPAN +from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( InputMessage, OutputMessage, Text, - WorkflowInvocation, ) -class TestWorkflowInvocation: # pylint: disable=no-self-use +class TestWorkflowInvocation(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + self.handler = TelemetryHandler(tracer_provider=tracer_provider) + def test_default_values(self): - invocation = WorkflowInvocation() - assert invocation.name == "" - assert invocation.operation_name == "invoke_workflow" + invocation = self.handler.start_workflow(name=None) + invocation.stop() + assert invocation.name is None + assert invocation._operation_name == "invoke_workflow" assert not invocation.input_messages assert not invocation.output_messages - assert invocation.span is None - assert invocation.context_token is None + assert invocation.span is not INVALID_SPAN assert not invocation.attributes def test_custom_name(self): - invocation = WorkflowInvocation(name="customer_support_pipeline") + invocation = self.handler.start_workflow( + name="customer_support_pipeline" + ) + invocation.stop() assert invocation.name == "customer_support_pipeline" def test_with_input_messages(self): msg = 
InputMessage(role="user", parts=[Text(content="hello")]) - invocation = WorkflowInvocation(input_messages=[msg]) + invocation = self.handler.start_workflow(name="test") + invocation.input_messages = [msg] + invocation.stop() assert len(invocation.input_messages) == 1 assert invocation.input_messages[0].role == "user" @@ -31,26 +52,36 @@ def test_with_output_messages(self): msg = OutputMessage( role="assistant", parts=[Text(content="hi")], finish_reason="stop" ) - invocation = WorkflowInvocation(output_messages=[msg]) + invocation = self.handler.start_workflow(name="test") + invocation.output_messages = [msg] + invocation.stop() assert len(invocation.output_messages) == 1 assert invocation.output_messages[0].finish_reason == "stop" def test_inherits_genai_invocation(self): - invocation = WorkflowInvocation(attributes={"key": "value"}) - assert invocation.attributes == {"key": "value"} + invocation = self.handler.start_workflow(name="test") + invocation.attributes["key"] = "value" + invocation.stop() + spans = self.span_exporter.get_finished_spans() + assert spans[0].attributes is not None + assert spans[0].attributes["key"] == "value" def test_default_lists_are_independent(self): - """Ensure default factory creates separate list instances.""" - inv1 = WorkflowInvocation() - inv2 = WorkflowInvocation() + """Ensure separate invocations get separate list instances.""" + inv1 = self.handler.start_workflow(name=None) + inv2 = self.handler.start_workflow(name=None) inv1.input_messages.append(InputMessage(role="user", parts=[])) assert len(inv2.input_messages) == 0 + inv1.stop() + inv2.stop() def test_default_attributes_are_independent(self): - inv1 = WorkflowInvocation() - inv2 = WorkflowInvocation() + inv1 = self.handler.start_workflow(name=None) + inv2 = self.handler.start_workflow(name=None) inv1.attributes["foo"] = "bar" assert "foo" not in inv2.attributes + inv1.stop() + inv2.stop() def test_full_construction(self): inp = InputMessage(role="user", 
parts=[Text(content="query")]) @@ -59,12 +90,10 @@ def test_full_construction(self): parts=[Text(content="answer")], finish_reason="stop", ) - invocation = WorkflowInvocation( - name="my_workflow", - operation_name="invoke_workflow", - input_messages=[inp], - output_messages=[out], - ) + invocation = self.handler.start_workflow(name="my_workflow") + invocation.input_messages = [inp] + invocation.output_messages = [out] + invocation.stop() assert invocation.name == "my_workflow" assert len(invocation.input_messages) == 1 assert len(invocation.output_messages) == 1