diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md
index 40b05bb359..c4d57c28b6 100644
--- a/util/opentelemetry-util-genai/CHANGELOG.md
+++ b/util/opentelemetry-util-genai/CHANGELOG.md
@@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
-
+- Add metrics support for EmbeddingInvocation
+  ([#4377](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4377), [#5022](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/5022))
 - Add support for workflow in genAI utils handler.
   ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4366](#4366))
 - Enrich ToolCall type, breaking change: usage of ToolCall class renamed to ToolCallRequest
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
index 858e8a8237..8207af977c 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -150,9 +150,9 @@ def __init__(
             schema_url=schema_url,
         )
 
-    def _record_llm_metrics(
+    def _record_metrics(
         self,
-        invocation: LLMInvocation,
+        invocation: GenAIInvocation,
         span: Span | None = None,
         *,
         error_type: str | None = None,
@@ -165,18 +165,6 @@ def _record_llm_metrics(
             error_type=error_type,
         )
 
-    @staticmethod
-    def _record_embedding_metrics(
-        invocation: EmbeddingInvocation,
-        span: Span | None = None,
-        *,
-        error_type: str | None = None,
-    ) -> None:
-        # Metrics recorder currently supports LLMInvocation fields only.
-        # Keep embedding metrics as a no-op until dedicated embedding
-        # metric support is added.
- return - def _start(self, invocation: _T) -> _T: """Start a GenAI invocation and create a pending span entry.""" if isinstance(invocation, LLMInvocation): @@ -214,11 +202,11 @@ def _stop(self, invocation: _T) -> _T: try: if isinstance(invocation, LLMInvocation): _apply_llm_finish_attributes(span, invocation) - self._record_llm_metrics(invocation, span) + self._record_metrics(invocation, span) _maybe_emit_llm_event(self._logger, span, invocation) elif isinstance(invocation, EmbeddingInvocation): _apply_embedding_finish_attributes(span, invocation) - self._record_embedding_metrics(invocation, span) + self._record_metrics(invocation, span) elif isinstance(invocation, WorkflowInvocation): _apply_workflow_finish_attributes(span, invocation) # TODO: Add workflow metrics when supported @@ -240,18 +228,14 @@ def _fail(self, invocation: _T, error: Error) -> _T: if isinstance(invocation, LLMInvocation): _apply_llm_finish_attributes(span, invocation) _apply_error_attributes(span, error, error_type) - self._record_llm_metrics( - invocation, span, error_type=error_type - ) + self._record_metrics(invocation, span, error_type=error_type) _maybe_emit_llm_event( self._logger, span, invocation, error_type ) elif isinstance(invocation, EmbeddingInvocation): _apply_embedding_finish_attributes(span, invocation) _apply_error_attributes(span, error, error_type) - self._record_embedding_metrics( - invocation, span, error_type=error_type - ) + self._record_metrics(invocation, span, error_type=error_type) elif isinstance(invocation, WorkflowInvocation): _apply_workflow_finish_attributes(span, invocation) _apply_error_attributes(span, error, error_type) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 075cbe60a1..e2ebad6409 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py 
@@ -1,4 +1,4 @@ -"""Helpers for emitting GenAI metrics from LLM invocations.""" +"""Helpers for emitting GenAI metrics from invocations.""" from __future__ import annotations @@ -18,7 +18,11 @@ create_duration_histogram, create_token_histogram, ) -from opentelemetry.util.genai.types import LLMInvocation +from opentelemetry.util.genai.types import ( + EmbeddingInvocation, + GenAIInvocation, + LLMInvocation, +) from opentelemetry.util.types import AttributeValue @@ -29,57 +33,65 @@ def __init__(self, meter: Meter): self._duration_histogram: Histogram = create_duration_histogram(meter) self._token_histogram: Histogram = create_token_histogram(meter) + @staticmethod + def _build_attributes( + invocation: GenAIInvocation, + error_type: Optional[str] = None, + ) -> Dict[str, AttributeValue]: + """Build metric attributes from an invocation.""" + attributes: Dict[str, AttributeValue] = {} + + if invocation.operation_name: + attributes[GenAI.GEN_AI_OPERATION_NAME] = invocation.operation_name + + request_model = getattr(invocation, "request_model", None) + if request_model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = request_model + + if invocation.provider: + attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider + + response_model_name = getattr(invocation, "response_model_name", None) + if response_model_name: + attributes[GenAI.GEN_AI_RESPONSE_MODEL] = response_model_name + + server_address = getattr(invocation, "server_address", None) + if server_address: + attributes[server_attributes.SERVER_ADDRESS] = server_address + + server_port = getattr(invocation, "server_port", None) + if server_port is not None: + attributes[server_attributes.SERVER_PORT] = server_port + + if invocation.metric_attributes: + attributes.update(invocation.metric_attributes) + + if error_type: + attributes[error_attributes.ERROR_TYPE] = error_type + + return attributes + def record( self, span: Optional[Span], - invocation: LLMInvocation, + invocation: GenAIInvocation, *, error_type: 
Optional[str] = None,
     ) -> None:
-        """Record duration and token metrics for an invocation if possible."""
+        """Record duration and token metrics for an invocation if possible.
+
+        For LLMInvocation: records duration and token (input/output) metrics.
+        For EmbeddingInvocation: records duration and input token metrics.
+        """
         # pylint: disable=too-many-branches
         if span is None:
             return
-        token_counts: list[tuple[int, str]] = []
-        if invocation.input_tokens is not None:
-            token_counts.append(
-                (
-                    invocation.input_tokens,
-                    GenAI.GenAiTokenTypeValues.INPUT.value,
-                )
-            )
-        if invocation.output_tokens is not None:
-            token_counts.append(
-                (
-                    invocation.output_tokens,
-                    GenAI.GenAiTokenTypeValues.OUTPUT.value,
-                )
-            )
-
-        attributes: Dict[str, AttributeValue] = {
-            GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value
-        }
-        if invocation.request_model:
-            attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
-        if invocation.provider:
-            attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
-        if invocation.response_model_name:
-            attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
-                invocation.response_model_name
-            )
-        if invocation.server_address:
-            attributes[server_attributes.SERVER_ADDRESS] = (
-                invocation.server_address
-            )
-        if invocation.server_port is not None:
-            attributes[server_attributes.SERVER_PORT] = invocation.server_port
-        if invocation.metric_attributes:
-            attributes.update(invocation.metric_attributes)
+        attributes = self._build_attributes(invocation, error_type)
 
-        # Calculate duration from span timing or invocation monotonic start
+        # Calculate duration from invocation monotonic start
         duration_seconds: Optional[float] = None
         if invocation.monotonic_start_s is not None:
             duration_seconds = max(
@@ -88,8 +100,6 @@ def record(
         )
 
         span_context = set_span_in_context(span)
-        if error_type:
-            attributes[error_attributes.ERROR_TYPE] = error_type
 
         if duration_seconds is not None:
             self._duration_histogram.record(
@@ -98,12 +108,29 @@
context=span_context, ) - for token_count, token_type in token_counts: - self._token_histogram.record( - token_count, - attributes=attributes | {GenAI.GEN_AI_TOKEN_TYPE: token_type}, - context=span_context, - ) + # Record token metrics for LLMInvocation and EmbeddingInvocation + if isinstance(invocation, (LLMInvocation, EmbeddingInvocation)): + if invocation.input_tokens is not None: + self._token_histogram.record( + invocation.input_tokens, + attributes=attributes + | { + GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.INPUT.value + }, + context=span_context, + ) + + # Only LLMInvocation has output tokens + if isinstance(invocation, LLMInvocation): + if invocation.output_tokens is not None: + self._token_histogram.record( + invocation.output_tokens, + attributes=attributes + | { + GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.OUTPUT.value + }, + context=span_context, + ) __all__ = ["InvocationMetricsRecorder"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 6d59f03bf5..35f8473f0c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -256,9 +256,18 @@ def _new_str_any_dict() -> dict[str, Any]: @dataclass class GenAIInvocation: + operation_name: str = "" + provider: str | None = None context_token: ContextToken | None = None span: Span | None = None attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) + metric_attributes: dict[str, Any] = field( + default_factory=_new_str_any_dict + ) + """ + Additional attributes to set on metrics. Must be of a low cardinality. + These attributes will not be set on spans or events. + """ error_type: str | None = None monotonic_start_s: float | None = None @@ -319,13 +328,6 @@ class LLMInvocation(GenAIInvocation): Additional attributes to set on spans and/or events. 
These attributes will not be set on metrics. """ - metric_attributes: dict[str, Any] = field( - default_factory=_new_str_any_dict - ) - """ - Additional attributes to set on metrics. Must be of a low cardinality. - These attributes will not be set on spans or events. - """ temperature: float | None = None top_p: float | None = None frequency_penalty: float | None = None @@ -364,14 +366,6 @@ class EmbeddingInvocation(GenAIInvocation): will not be set on metrics. """ - metric_attributes: dict[str, Any] = field( - default_factory=_new_str_any_dict - ) - """ - Additional attributes to set on metrics. Must be of a low cardinality. - These attributes will not be set on spans or events. - """ - @dataclass() class ToolCall(GenAIInvocation): @@ -399,6 +393,8 @@ class ToolCall(GenAIInvocation): - error.type: Error type if operation failed (Conditionally Required) """ + operation_name: str = GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + # Message identification fields (same as ToolCallRequest) # Note: These are required fields but must have defaults due to dataclass inheritance name: str = "" diff --git a/util/opentelemetry-util-genai/tests/test_handler_metrics.py b/util/opentelemetry-util-genai/tests/test_handler_metrics.py index b45383abfe..3b16a1ec1d 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_metrics.py +++ b/util/opentelemetry-util-genai/tests/test_handler_metrics.py @@ -9,7 +9,11 @@ from opentelemetry.semconv.schemas import Schemas from opentelemetry.test.test_base import TestBase from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import ( + EmbeddingInvocation, + Error, + LLMInvocation, +) _DEFAULT_SCHEMA_URL = Schemas.V1_37_0.value @@ -181,3 +185,149 @@ def _assert_metric_scope_schema_urls( self.assertEqual( scope_metric.scope.schema_url, expected_schema_url ) + + def test_stop_embedding_records_duration_and_tokens(self) -> None: + 
"""Verify embedding invocations record duration and input token metrics.""" + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + invocation = EmbeddingInvocation( + request_model="embed-model", provider="embed-prov" + ) + invocation.input_tokens = 100 + # Patch default_timer during start to ensure monotonic_start_s + with patch("timeit.default_timer", return_value=1000.0): + handler.start(invocation) + + # Simulate 1.5 seconds of elapsed monotonic time + with patch("timeit.default_timer", return_value=1001.5): + handler.stop(invocation) + + self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) + metrics = self._harvest_metrics() + + # Duration should be recorded + self.assertIn("gen_ai.client.operation.duration", metrics) + duration_points = metrics["gen_ai.client.operation.duration"] + self.assertEqual(len(duration_points), 1) + duration_point = duration_points[0] + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_OPERATION_NAME], + GenAI.GenAiOperationNameValues.EMBEDDINGS.value, + ) + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_REQUEST_MODEL], + "embed-model", + ) + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_PROVIDER_NAME], "embed-prov" + ) + self.assertAlmostEqual(duration_point.sum, 1.5, places=3) + + # Token metrics should be recorded for embedding (input only) + self.assertIn("gen_ai.client.token.usage", metrics) + token_points = metrics["gen_ai.client.token.usage"] + self.assertEqual(len(token_points), 1) # Only input tokens + token_point = token_points[0] + self.assertEqual( + token_point.attributes[GenAI.GEN_AI_TOKEN_TYPE], + GenAI.GenAiTokenTypeValues.INPUT.value, + ) + self.assertAlmostEqual(token_point.sum, 100.0, places=3) + + def test_stop_embedding_records_duration_with_additional_attributes( + self, + ) -> None: + """Verify embedding metrics include server and custom attributes.""" + handler = TelemetryHandler( + 
tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + invocation = EmbeddingInvocation( + request_model="embed-model", + provider="embed-prov", + server_address="embed.server.com", + server_port=8080, + ) + handler.start(invocation) + invocation.metric_attributes = {"custom.embed.attr": "embed_value"} + invocation.response_model_name = "embed-response-model" + handler.stop(invocation) + + self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) + metrics = self._harvest_metrics() + + self.assertIn("gen_ai.client.operation.duration", metrics) + duration_points = metrics["gen_ai.client.operation.duration"] + self.assertEqual(len(duration_points), 1) + duration_point = duration_points[0] + + self.assertEqual( + duration_point.attributes["server.address"], "embed.server.com" + ) + self.assertEqual(duration_point.attributes["server.port"], 8080) + self.assertEqual( + duration_point.attributes["custom.embed.attr"], "embed_value" + ) + self.assertEqual( + duration_point.attributes[GenAI.GEN_AI_RESPONSE_MODEL], + "embed-response-model", + ) + + def test_fail_embedding_records_error_and_duration(self) -> None: + """Verify embedding failure records error type and duration.""" + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + invocation = EmbeddingInvocation( + request_model="embed-err-model", provider="embed-prov" + ) + with patch("timeit.default_timer", return_value=3000.0): + handler.start(invocation) + + error = Error(message="embedding failed", type=RuntimeError) + with patch("timeit.default_timer", return_value=3002.5): + handler.fail(invocation, error) + + self._assert_metric_scope_schema_urls(_DEFAULT_SCHEMA_URL) + metrics = self._harvest_metrics() + + self.assertIn("gen_ai.client.operation.duration", metrics) + duration_points = metrics["gen_ai.client.operation.duration"] + self.assertEqual(len(duration_points), 1) + duration_point = duration_points[0] + + self.assertEqual( 
+ duration_point.attributes.get("error.type"), "RuntimeError" + ) + self.assertEqual( + duration_point.attributes.get(GenAI.GEN_AI_REQUEST_MODEL), + "embed-err-model", + ) + self.assertAlmostEqual(duration_point.sum, 2.5, places=3) + + # Token metrics should NOT be recorded when input_tokens is not set + self.assertNotIn("gen_ai.client.token.usage", metrics) + + def test_stop_embedding_without_tokens(self) -> None: + """Verify embedding without input_tokens does not record token metrics.""" + handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + invocation = EmbeddingInvocation( + request_model="embed-model", provider="embed-prov" + ) + # input_tokens is not set + handler.start(invocation) + handler.stop(invocation) + + metrics = self._harvest_metrics() + + # Duration should be recorded + self.assertIn("gen_ai.client.operation.duration", metrics) + + # Token metrics should NOT be recorded when input_tokens is not set + self.assertNotIn("gen_ai.client.token.usage", metrics)