diff --git a/instrumentation-genai/AGENTS.md b/instrumentation-genai/AGENTS.md index 5e602be269..6977c61514 100644 --- a/instrumentation-genai/AGENTS.md +++ b/instrumentation-genai/AGENTS.md @@ -19,7 +19,32 @@ This layer is responsible only for: Everything else (span creation, metric recording, event emission, context propagation) belongs in `util/opentelemetry-util-genai`. -## 2. Invocation Pattern +## 2. TelemetryHandler Initialization + +Construct `TelemetryHandler` once inside `_instrument()`, passing all OTel providers and the +completion hook. Always prefer an explicitly injected hook (`kwargs.get("completion_hook")`) +over the entry-point hook loaded by `load_completion_hook()`, so test code can override the +hook without touching the environment. + +```python +from opentelemetry.util.genai.completion_hook import load_completion_hook +from opentelemetry.util.genai.handler import TelemetryHandler + +def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + logger_provider = kwargs.get("logger_provider") + + handler = TelemetryHandler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + completion_hook=kwargs.get("completion_hook") or load_completion_hook(), + ) + # pass handler to each patch/wrapper function +``` + +## 3. 
Invocation Pattern Use `start_*()` and control span lifetime manually: diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md index 87dc9461e7..2cbe75facb 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md @@ -28,6 +28,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add strongly typed Responses API extractors with validation and content extraction improvements ([#4337](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4337)) +- Add completion hook support. + ([#4315](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4315)) +- Fix `response_format` handling: map `json_object`/`json_schema` to `json` output type. + ([#4315](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4315)) +- Skip attribute values with `openai.Omit` value. 
+ ([#4315](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4315)) ## Version 2.3b0 (2025-12-24) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py index f001812def..0ecd87013f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py @@ -52,14 +52,11 @@ from opentelemetry.metrics import get_meter from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import get_tracer +from opentelemetry.util.genai.completion_hook import load_completion_hook from opentelemetry.util.genai.handler import ( TelemetryHandler, ) -from opentelemetry.util.genai.types import ContentCapturingMode -from opentelemetry.util.genai.utils import ( - get_content_capturing_mode, - is_experimental_mode, -) +from opentelemetry.util.genai.utils import is_experimental_mode from .instruments import Instruments from .patch import ( @@ -107,22 +104,19 @@ def _instrument(self, **kwargs): instruments = Instruments(self._meter) - content_mode = ( - get_content_capturing_mode() - if latest_experimental_enabled - else ContentCapturingMode.NO_CONTENT - ) handler = TelemetryHandler( tracer_provider=tracer_provider, meter_provider=meter_provider, logger_provider=logger_provider, + completion_hook=kwargs.get("completion_hook") + or load_completion_hook(), ) wrap_function_wrapper( "openai.resources.chat.completions", "Completions.create", ( - chat_completions_create_v_new(handler, content_mode) + chat_completions_create_v_new(handler) if latest_experimental_enabled else chat_completions_create_v_old( tracer, logger, instruments, is_content_enabled() @@ -134,7 +128,7 @@ def 
_instrument(self, **kwargs): "openai.resources.chat.completions", "AsyncCompletions.create", ( - async_chat_completions_create_v_new(handler, content_mode) + async_chat_completions_create_v_new(handler) if latest_experimental_enabled else async_chat_completions_create_v_old( tracer, logger, instruments, is_content_enabled() diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 30f74cfab1..5793ecfc35 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -34,7 +34,6 @@ from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( - ContentCapturingMode, Error, LLMInvocation, # pylint: disable=no-name-in-module # TODO: migrate to InferenceInvocation OutputMessage, @@ -121,11 +120,9 @@ def traced_method(wrapped, instance, args, kwargs): def chat_completions_create_v_new( handler: TelemetryHandler, - content_capturing_mode: ContentCapturingMode, ): """Wrap the `create` method of the `ChatCompletion` class to trace it.""" - - capture_content = content_capturing_mode != ContentCapturingMode.NO_CONTENT + capture_content = handler.should_capture_content() def traced_method(wrapped, instance, args, kwargs): chat_invocation = handler.start_llm( @@ -226,10 +223,9 @@ async def traced_method(wrapped, instance, args, kwargs): def async_chat_completions_create_v_new( handler: TelemetryHandler, - content_capturing_mode: ContentCapturingMode, ): """Wrap the `create` method of the `AsyncChatCompletion` class to trace it.""" - capture_content = content_capturing_mode != 
ContentCapturingMode.NO_CONTENT + capture_content = handler.should_capture_content() async def traced_method(wrapped, instance, args, kwargs): chat_invocation = handler.start_llm( diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py index 128697fe12..307fe08110 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/utils.py @@ -19,6 +19,7 @@ from typing import Any, Iterable, List, Mapping from urllib.parse import urlparse +import openai from httpx import URL from openai import NotGiven @@ -48,6 +49,8 @@ ToolCallResponse, ) +_OpenAIOmit = getattr(openai, "Omit", None) + def is_content_enabled() -> bool: capture_content = environ.get( @@ -201,9 +204,17 @@ def non_numerical_value_is_set(value: bool | str | NotGiven | None): def value_is_set(value): + if _OpenAIOmit is not None and isinstance(value, _OpenAIOmit): + return False return value is not None and not isinstance(value, NotGiven) +def _openai_response_format_to_output_type(response_format_type: str) -> str: + if response_format_type in ("json_object", "json_schema"): + return "json" + return response_format_type + + def get_llm_request_attributes( kwargs, client_instance, @@ -282,7 +293,7 @@ def get_llm_request_attributes( attributes[request_response_format_attr_key] = ( response_format_type ) - else: + elif isinstance(response_format, str): attributes[request_response_format_attr_key] = response_format # service_tier can be passed directly or in extra_body (in SDK 1.26.0 it's via extra_body) @@ -374,12 +385,14 @@ def create_chat_invocation( response_format_type := get_value(response_format.get("type")) ) is not None: 
attributes[GenAIAttributes.GEN_AI_OUTPUT_TYPE] = ( - response_format_type + _openai_response_format_to_output_type( + response_format_type + ) ) - else: - attributes[ - GenAIAttributes.GEN_AI_OPENAI_REQUEST_RESPONSE_FORMAT - ] = response_format + elif isinstance(response_format, str): + attributes[GenAIAttributes.GEN_AI_OUTPUT_TYPE] = ( + _openai_response_format_to_output_type(response_format) + ) # service_tier can be passed directly or in extra_body (in SDK 1.26.0 it's via extra_body) service_tier = get_value(kwargs.get("service_tier")) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/conftest.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/conftest.py index 9c8f87c943..ab1142cec9 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/conftest.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/conftest.py @@ -130,7 +130,10 @@ def fixture_content_mode(request): @pytest.fixture(scope="function") def instrument_no_content( - tracer_provider, logger_provider, meter_provider, content_mode + tracer_provider, + logger_provider, + meter_provider, + content_mode, ): _OpenTelemetrySemanticConventionStability._initialized = False latest_experimental_enabled, _ = content_mode diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.latest.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.latest.txt index 6ed0658634..63de54f8ee 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.latest.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.latest.txt @@ -36,7 +36,7 @@ # This variant of the requirements aims to test the system using # the newest supported version of external dependencies. 
-openai==1.109.1 +openai==2.26.0 pydantic==2.12.5 httpx==0.27.2 # older jiter is required for PyPy < 3.11 diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_completion_hook.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_completion_hook.py new file mode 100644 index 0000000000..ff104078a1 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_completion_hook.py @@ -0,0 +1,129 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +from unittest.mock import MagicMock, patch + +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) +from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, +) + +from .test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT + + +@patch.dict( + os.environ, + {OTEL_SEMCONV_STABILITY_OPT_IN: "gen_ai_latest_experimental"}, +) +def test_custom_hook_is_called( + span_exporter, + log_exporter, + tracer_provider, + logger_provider, + meter_provider, + openai_client, + vcr, +): + """A hook passed to instrument() is called after each chat completion.""" + hook = MagicMock() + instrumentor = OpenAIInstrumentor() + _OpenTelemetrySemanticConventionStability._initialized = False + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + completion_hook=hook, + ) + + try: + with vcr.use_cassette("test_chat_completion_with_content.yaml"): + openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, + model=DEFAULT_MODEL, + stream=False, + ) + finally: + instrumentor.uninstrument() + + hook.on_completion.assert_called_once() + kwargs = hook.on_completion.call_args.kwargs + assert kwargs["inputs"] + assert kwargs["outputs"] + assert kwargs["span"] is not None + + # Content goes to the hook only — not to span attributes or log records + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span_attrs = spans[0].attributes or {} + assert "gen_ai.input.messages" not in span_attrs + assert "gen_ai.output.messages" not in span_attrs + + assert log_exporter.get_finished_logs() == () + + +@patch.dict( + os.environ, + { + OTEL_SEMCONV_STABILITY_OPT_IN: "gen_ai_latest_experimental", + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "span_only", + }, +) +def 
test_default_hook_loaded_from_env( + span_exporter, + tracer_provider, + logger_provider, + meter_provider, + openai_client, + vcr, +): + """When no hook kwarg is given, load_completion_hook() provides the default.""" + default_hook = MagicMock() + instrumentor = OpenAIInstrumentor() + with patch( + "opentelemetry.instrumentation.openai_v2.load_completion_hook", + return_value=default_hook, + ): + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + # no completion_hook kwarg — should fall back to load_completion_hook() + ) + + try: + with vcr.use_cassette("test_chat_completion_with_content.yaml"): + openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, + model=DEFAULT_MODEL, + stream=False, + ) + finally: + instrumentor.uninstrument() + + default_hook.on_completion.assert_called_once() + kwargs = default_hook.on_completion.call_args.kwargs + assert kwargs["inputs"] + assert kwargs["outputs"] + assert kwargs["span"] is not None + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span_attrs = spans[0].attributes or {} + assert "gen_ai.input.messages" in span_attrs + assert "gen_ai.output.messages" in span_attrs diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index a24aca39aa..18ef11f778 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 of repeatedly failing on every upload ([#4390](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4390)). - Refactor public API: add factory methods (`start_inference`, `start_embedding`, `start_tool`, `start_workflow`) and invocation-owned lifecycle (`invocation.stop()` / `invocation.fail(exc)`); rename `LLMInvocation` → `InferenceInvocation` and `ToolCall` → `ToolInvocation`. 
Existing usages remain fully functional via deprecated aliases. ([#4391](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4391)) +- `TelemetryHandler` now accepts a `completion_hook` parameter and calls it after each LLM invocation, passing inputs, outputs, the active span, and the log record. Content capture is enabled automatically when a real hook is configured. + ([#4315](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4315)) - Add metrics to ToolInvocations ([#4443](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4443)) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py index c4a2f2f427..82829d97b2 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_embedding_invocation.py @@ -23,6 +23,7 @@ from opentelemetry.semconv.attributes import server_attributes from opentelemetry.trace import SpanKind, Tracer from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.completion_hook import CompletionHook from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.types import AttributeValue @@ -34,11 +35,12 @@ class EmbeddingInvocation(GenAIInvocation): context manager rather than constructing this directly. 
""" - def __init__( + def __init__( # pylint: disable=too-many-locals self, tracer: Tracer, metrics_recorder: InvocationMetricsRecorder, logger: Logger, + completion_hook: CompletionHook, provider: str, *, request_model: str | None = None, @@ -57,6 +59,7 @@ def __init__( tracer, metrics_recorder, logger, + completion_hook, operation_name=_operation_name, span_name=f"{_operation_name} {request_model}" if request_model diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py index a1bd55811c..377df8706f 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py @@ -24,6 +24,9 @@ from opentelemetry.semconv.attributes import server_attributes from opentelemetry.trace import INVALID_SPAN, Span, SpanKind, Tracer from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.completion_hook import ( + CompletionHook, +) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.types import ( InputMessage, @@ -51,6 +54,7 @@ def __init__( # pylint: disable=too-many-locals tracer: Tracer, metrics_recorder: InvocationMetricsRecorder, logger: Logger, + completion_hook: CompletionHook, provider: str, *, request_model: str | None = None, @@ -80,6 +84,7 @@ def __init__( # pylint: disable=too-many-locals tracer, metrics_recorder, logger, + completion_hook, operation_name=_operation_name, span_name=f"{_operation_name} {request_model}" if request_model @@ -221,6 +226,18 @@ def _get_metric_token_counts(self) -> dict[str, int]: ) return counts + def _call_completion_hook( + self, + log_record: LogRecord | None, + ) -> None: + self._completion_hook.on_completion( + inputs=self.input_messages, + outputs=self.output_messages, + 
system_instruction=self.system_instruction, + span=self.span, + log_record=log_record, + ) + def _apply_finish(self, error: Error | None = None) -> None: if error is not None: self._apply_error_attributes(error) @@ -229,26 +246,27 @@ def _apply_finish(self, error: Error | None = None) -> None: attributes.update(self.attributes) self.span.set_attributes(attributes) self._metrics_recorder.record(self) - self._emit_event() + log_record = self._maybe_create_event() + self._call_completion_hook(log_record) + if log_record is not None: + self._logger.emit(log_record) - def _emit_event(self) -> None: + def _maybe_create_event(self) -> LogRecord | None: """Emit a gen_ai.client.inference.operation.details event. For more details, see the semantic convention documentation: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aiclientinferenceoperationdetails """ if not is_experimental_mode() or not should_emit_event(): - return + return None attributes = self._get_attributes() attributes.update(self._get_message_attributes(for_span=False)) attributes.update(self.attributes) - self._logger.emit( - LogRecord( - event_name="gen_ai.client.inference.operation.details", - attributes=attributes, - context=self._span_context, - ) + return LogRecord( + event_name="gen_ai.client.inference.operation.details", + attributes=attributes, + context=self._span_context, ) @@ -293,12 +311,14 @@ def _start_with_handler( tracer: Tracer, metrics_recorder: InvocationMetricsRecorder, logger: Logger, + completion_hook: CompletionHook, ) -> None: """Create and start an InferenceInvocation from this data container. 
Called by handler.start_llm().""" self._inference_invocation = InferenceInvocation( tracer, metrics_recorder, logger, + completion_hook, self.provider or "", request_model=self.request_model, input_messages=self.input_messages, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py index b63d9a32b4..4c8eb70f2e 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_invocation.py @@ -28,6 +28,7 @@ from opentelemetry.trace import INVALID_SPAN as _INVALID_SPAN from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.genai.completion_hook import CompletionHook from opentelemetry.util.genai.types import Error if TYPE_CHECKING: @@ -52,6 +53,7 @@ def __init__( tracer: Tracer, metrics_recorder: InvocationMetricsRecorder, logger: Logger, + completion_hook: CompletionHook, operation_name: str, span_name: str, span_kind: SpanKind = SpanKind.CLIENT, @@ -61,6 +63,7 @@ def __init__( self._tracer = tracer self._metrics_recorder = metrics_recorder self._logger = logger + self._completion_hook = completion_hook self._operation_name: str = operation_name self.attributes: dict[str, Any] = ( {} if attributes is None else attributes diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py index 39edd6b57b..2bb937097c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_tool_invocation.py @@ -22,6 +22,7 @@ ) from opentelemetry.trace import Tracer from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.completion_hook 
import CompletionHook from opentelemetry.util.genai.metrics import InvocationMetricsRecorder @@ -50,6 +51,7 @@ def __init__( tracer: Tracer, metrics_recorder: InvocationMetricsRecorder, logger: Logger, + completion_hook: CompletionHook, name: str, *, arguments: Any = None, @@ -66,6 +68,7 @@ def __init__( tracer, metrics_recorder, logger, + completion_hook, operation_name=_operation_name, span_name=f"{_operation_name} {name}" if name else _operation_name, attributes=attributes, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py index e3b45535d2..8315624733 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_workflow_invocation.py @@ -23,6 +23,7 @@ ) from opentelemetry.trace import SpanKind, Tracer from opentelemetry.util.genai._invocation import Error, GenAIInvocation +from opentelemetry.util.genai.completion_hook import CompletionHook from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.types import ( InputMessage, @@ -51,6 +52,7 @@ def __init__( tracer: Tracer, metrics_recorder: InvocationMetricsRecorder, logger: Logger, + completion_hook: CompletionHook, name: str | None, *, input_messages: list[InputMessage] | None = None, @@ -64,6 +66,7 @@ def __init__( tracer, metrics_recorder, logger, + completion_hook, operation_name=_operation_name, span_name=f"{_operation_name} {name}" if name else _operation_name, span_kind=SpanKind.INTERNAL, @@ -103,6 +106,15 @@ def _get_messages_for_span(self) -> dict[str, Any]: key: value for key, value in optional_attrs if value is not None } + def _call_completion_hook(self) -> None: + self._completion_hook.on_completion( + inputs=self.input_messages, + outputs=self.output_messages, + system_instruction=[], + span=self.span, + log_record=None, + ) + 
def _apply_finish(self, error: Error | None = None) -> None: attributes: dict[str, Any] = { GenAI.GEN_AI_OPERATION_NAME: self._operation_name @@ -112,4 +124,5 @@ def _apply_finish(self, error: Error | None = None) -> None: self._apply_error_attributes(error) attributes.update(self.attributes) self.span.set_attributes(attributes) + self._call_completion_hook() # TODO: Add workflow metrics when supported diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 9ef4a5592d..27fa0d0650 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -48,7 +48,9 @@ from __future__ import annotations +import os from contextlib import AbstractContextManager +from typing import TYPE_CHECKING from opentelemetry._logs import ( LoggerProvider, @@ -60,10 +62,14 @@ TracerProvider, get_tracer, ) -from opentelemetry.util.genai._inference_invocation import ( - LLMInvocation, -) from opentelemetry.util.genai._invocation import Error +from opentelemetry.util.genai.completion_hook import ( + CompletionHook, + _NoOpCompletionHook, +) +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, +) from opentelemetry.util.genai.invocation import ( EmbeddingInvocation, InferenceInvocation, @@ -71,8 +77,16 @@ WorkflowInvocation, ) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.types import ContentCapturingMode +from opentelemetry.util.genai.utils import ( + get_content_capturing_mode, + is_experimental_mode, +) from opentelemetry.util.genai.version import __version__ +if TYPE_CHECKING: + from opentelemetry.util.genai.types import LLMInvocation + class TelemetryHandler: """ @@ -85,6 +99,7 @@ def __init__( tracer_provider: TracerProvider | None = None, meter_provider: MeterProvider | 
None = None, logger_provider: LoggerProvider | None = None, + completion_hook: CompletionHook | None = None, ): schema_url = Schemas.V1_37_0.value self._tracer = get_tracer( @@ -103,6 +118,31 @@ def __init__( logger_provider, schema_url=schema_url, ) + self._completion_hook = completion_hook or _NoOpCompletionHook() + if is_experimental_mode(): + content_enabled = ( + get_content_capturing_mode() != ContentCapturingMode.NO_CONTENT + ) + else: + content_enabled = os.environ.get( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "" + ).lower() in ( + "true", + "span_only", + "event_only", + "span_and_event", + ) + self._capture_content = content_enabled or not isinstance( + self._completion_hook, _NoOpCompletionHook + ) + + def should_capture_content(self) -> bool: + """Returns True if content should be captured. + + Content is captured when the content capturing mode requires it, or + when a real completion hook is configured (not a no-op). + """ + return self._capture_content # New-style factory methods: construct + start in one call, handler stored on invocation @@ -123,6 +163,7 @@ def start_inference( self._tracer, self._metrics_recorder, self._logger, + self._completion_hook, provider, request_model=request_model, server_address=server_address, @@ -136,7 +177,10 @@ def start_llm(self, invocation: LLMInvocation) -> LLMInvocation: Use ``handler.start_inference()`` instead. 
""" invocation._start_with_handler( - self._tracer, self._metrics_recorder, self._logger + self._tracer, + self._metrics_recorder, + self._logger, + self._completion_hook, ) return invocation @@ -157,6 +201,7 @@ def start_embedding( self._tracer, self._metrics_recorder, self._logger, + self._completion_hook, provider, request_model=request_model, server_address=server_address, @@ -181,6 +226,7 @@ def start_tool( self._tracer, self._metrics_recorder, self._logger, + self._completion_hook, name, arguments=arguments, tool_call_id=tool_call_id, @@ -199,7 +245,11 @@ def start_workflow( invocation.stop() or invocation.fail(). """ return WorkflowInvocation( - self._tracer, self._metrics_recorder, self._logger, name + self._tracer, + self._metrics_recorder, + self._logger, + self._completion_hook, + name, ) def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disable=no-self-use @@ -318,6 +368,7 @@ def get_telemetry_handler( tracer_provider: TracerProvider | None = None, meter_provider: MeterProvider | None = None, logger_provider: LoggerProvider | None = None, + completion_hook: CompletionHook | None = None, ) -> TelemetryHandler: """ Returns a singleton TelemetryHandler instance. @@ -330,6 +381,7 @@ def get_telemetry_handler( tracer_provider=tracer_provider, meter_provider=meter_provider, logger_provider=logger_provider, + completion_hook=completion_hook, ) setattr(get_telemetry_handler, "_default_handler", handler) return handler diff --git a/util/opentelemetry-util-genai/tests/test_handler_completion_hook.py b/util/opentelemetry-util-genai/tests/test_handler_completion_hook.py new file mode 100644 index 0000000000..696bb2c6ec --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_completion_hook.py @@ -0,0 +1,318 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from tests.test_utils import patch_env_vars + +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.util.genai.completion_hook import _NoOpCompletionHook +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, + OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT, +) +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + InputMessage, + OutputMessage, + Text, +) + +_EXPERIMENTAL_ENV = { + OTEL_SEMCONV_STABILITY_OPT_IN: "gen_ai_latest_experimental", + OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "EVENT_ONLY", + OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT: "true", +} + + +class TestHandlerCompletionHook(TestCase): # pylint: disable=too-many-public-methods + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + _OpenTelemetrySemanticConventionStability._initialized = False + _OpenTelemetrySemanticConventionStability._initialize() + + def tearDown(self) -> None: + # Reset semconv stability state between tests + 
# NOTE(review): the mangled source begins mid-method with
# `_OpenTelemetrySemanticConventionStability._initialized = False`
# (presumably the tail of a setUp/tearDown that resets the cached
# semconv stability mode); its enclosing `def` is outside this view —
# confirm upstream before applying.

def _make_handler(self, hook=None):
    """Build a TelemetryHandler bound to the test tracer provider.

    Args:
        hook: optional completion hook forwarded to the handler;
            ``None`` exercises the handler's default (no hook) path.
    """
    return TelemetryHandler(
        tracer_provider=self.tracer_provider,
        completion_hook=hook,
    )

def test_hook_called_on_stop(self):
    """stop() invokes the hook once with inputs/outputs/system msg/span."""
    hook = MagicMock()
    handler = self._make_handler(hook)

    input_messages = [
        InputMessage(role="user", parts=[Text(content="hello")])
    ]
    output_messages = [
        OutputMessage(
            role="assistant",
            parts=[Text(content="hi")],
            finish_reason="stop",
        )
    ]
    system_instruction = [Text(content="be helpful")]

    invocation = handler.start_inference("openai", request_model="gpt-4o")
    invocation.input_messages = input_messages
    invocation.output_messages = output_messages
    invocation.system_instruction = system_instruction
    invocation.stop()

    hook.on_completion.assert_called_once()
    kwargs = hook.on_completion.call_args.kwargs
    self.assertEqual(kwargs["inputs"], input_messages)
    self.assertEqual(kwargs["outputs"], output_messages)
    self.assertEqual(kwargs["system_instruction"], system_instruction)
    self.assertIsNotNone(kwargs["span"])

def test_hook_called_on_fail(self):
    """fail() still invokes the hook with the recorded inputs and span."""
    hook = MagicMock()
    handler = self._make_handler(hook)

    input_messages = [
        InputMessage(role="user", parts=[Text(content="hello")])
    ]

    invocation = handler.start_inference("openai", request_model="gpt-4o")
    invocation.input_messages = input_messages
    invocation.fail(ValueError("boom"))

    hook.on_completion.assert_called_once()
    kwargs = hook.on_completion.call_args.kwargs
    self.assertEqual(kwargs["inputs"], input_messages)
    self.assertIsNotNone(kwargs["span"])

def test_hook_not_called_when_not_set(self):
    # No hook — stop should not raise
    handler = self._make_handler()
    handler.start_inference("openai", request_model="gpt-4o").stop()

def test_log_record_is_none_when_events_disabled(self):
    # Default env: no experimental mode, so log_record should be None
    hook = MagicMock()
    handler = self._make_handler(hook)

    handler.start_inference("openai", request_model="gpt-4o").stop()

    kwargs = hook.on_completion.call_args.kwargs
    self.assertIsNone(kwargs["log_record"])

@patch.dict(os.environ, _EXPERIMENTAL_ENV)
def test_log_record_passed_when_events_enabled(self):
    """With experimental semconv opted in, the hook receives a log_record."""
    # Force re-evaluation of the (cached) stability mode under the
    # patched environment.
    _OpenTelemetrySemanticConventionStability._initialized = False
    _OpenTelemetrySemanticConventionStability._initialize()

    hook = MagicMock()
    handler = self._make_handler(hook)

    handler.start_inference("openai", request_model="gpt-4o").stop()

    kwargs = hook.on_completion.call_args.kwargs
    self.assertIsNotNone(kwargs["log_record"])

@patch.dict(os.environ, _EXPERIMENTAL_ENV)
def test_hook_can_stamp_attrs_on_log_record(self):
    # Verify that attrs stamped by the hook are on the same log_record that gets emitted
    _OpenTelemetrySemanticConventionStability._initialized = False
    _OpenTelemetrySemanticConventionStability._initialize()

    stamped_record = None

    def stamp_ref(*, log_record, **kwargs):
        # Capture the record object the hook saw and stamp a ref attr on it.
        nonlocal stamped_record
        stamped_record = log_record
        if log_record is not None:
            log_record.attributes = {
                **(log_record.attributes or {}),
                "gen_ai.input_messages_ref": "s3://bucket/inputs.json",
            }

    hook = MagicMock(on_completion=stamp_ref)
    handler = self._make_handler(hook)

    handler.start_inference("openai", request_model="gpt-4o").stop()

    # The record the hook stamped is the same one that would be emitted
    self.assertIsNotNone(stamped_record)
    self.assertEqual(
        stamped_record.attributes.get("gen_ai.input_messages_ref"),
        "s3://bucket/inputs.json",
    )

@patch.dict(
    os.environ, {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: ""}
)
def test_should_capture_content_false_by_default(self):
    """With no env opt-in and no hook, content capture is off."""
    handler = self._make_handler()
    self.assertFalse(handler.should_capture_content())

def test_should_capture_content_true_when_real_hook_set(self):
    # A real (non-noop) hook forces content capture regardless of env vars
    hook = MagicMock()
    handler = self._make_handler(hook)
    self.assertTrue(handler.should_capture_content())

@patch.dict(
    os.environ, {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: ""}
)
def test_should_capture_content_false_when_noop_hook(self):
    """The no-op hook does not force content capture."""
    handler = self._make_handler(_NoOpCompletionHook())
    self.assertFalse(handler.should_capture_content())

@patch.dict(
    os.environ,
    {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "true"},
)
def test_should_capture_content_true_in_legacy_mode_when_content_env_true(
    self,
):
    handler = self._make_handler()
    self.assertTrue(handler.should_capture_content())

@patch.dict(
    os.environ,
    {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "false"},
)
def test_should_capture_content_false_in_legacy_mode_when_content_env_false(
    self,
):
    handler = self._make_handler()
    self.assertFalse(handler.should_capture_content())

@patch.dict(
    os.environ,
    {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "span_only"},
)
def test_should_capture_content_true_in_legacy_mode_when_content_env_span_only(
    self,
):
    handler = self._make_handler()
    self.assertTrue(handler.should_capture_content())

@patch.dict(
    os.environ,
    {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "event_only"},
)
def test_should_capture_content_true_in_legacy_mode_when_content_env_event_only(
    self,
):
    handler = self._make_handler()
    self.assertTrue(handler.should_capture_content())

@patch_env_vars("gen_ai_latest_experimental", "span_only", "false")
def test_should_capture_content_true_in_experimental_mode_with_content(
    self,
):
    handler = self._make_handler()
    self.assertTrue(handler.should_capture_content())

@patch_env_vars("gen_ai_latest_experimental", "no_content", "false")
def test_should_capture_content_false_in_experimental_mode_with_no_content(
    self,
):
    handler = self._make_handler()
    self.assertFalse(handler.should_capture_content())
@patch_env_vars("gen_ai_latest_experimental", "no_content", "false")
def test_should_capture_content_true_in_experimental_mode_no_content_but_hook_set(
    self,
):
    # Hook overrides no_content mode
    hook = MagicMock()
    handler = self._make_handler(hook)
    self.assertTrue(handler.should_capture_content())

@patch_env_vars("gen_ai_latest_experimental", "no_content", "false")
@patch.dict(
    os.environ,
    {OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT: "true"},
)
def test_should_capture_content_false_in_experimental_mode_ignores_legacy_env(
    self,
):
    # Legacy CAPTURE_MESSAGE_CONTENT=true should NOT override NO_CONTENT in experimental mode
    handler = self._make_handler()
    self.assertFalse(handler.should_capture_content())

def test_workflow_hook_called_on_stop_with_messages(self):
    """Workflow stop() passes messages to the hook; no log_record emitted."""
    hook = MagicMock()
    handler = self._make_handler(hook)

    input_messages = [
        InputMessage(role="user", parts=[Text(content="what is 2+2?")])
    ]
    output_messages = [
        OutputMessage(
            role="assistant",
            parts=[Text(content="4")],
            finish_reason="stop",
        )
    ]

    invocation = handler.start_workflow(name="my-workflow")
    invocation.input_messages = input_messages
    invocation.output_messages = output_messages
    invocation.stop()

    hook.on_completion.assert_called_once()
    kwargs = hook.on_completion.call_args.kwargs
    self.assertEqual(kwargs["inputs"], input_messages)
    self.assertEqual(kwargs["outputs"], output_messages)
    self.assertEqual(kwargs["system_instruction"], [])
    self.assertIsNotNone(kwargs["span"])
    self.assertIsNone(kwargs["log_record"])

def test_workflow_hook_called_on_fail(self):
    """Workflow fail() still invokes the hook with a span."""
    hook = MagicMock()
    handler = self._make_handler(hook)

    invocation = handler.start_workflow(name="my-workflow")
    invocation.input_messages = [
        InputMessage(role="user", parts=[Text(content="hello")])
    ]
    invocation.fail(RuntimeError("workflow failed"))

    hook.on_completion.assert_called_once()
    kwargs = hook.on_completion.call_args.kwargs
    self.assertIsNotNone(kwargs["span"])

def test_workflow_hook_called_with_empty_messages_when_none_set(self):
    """Unset message fields surface to the hook as empty lists, not None."""
    hook = MagicMock()
    handler = self._make_handler(hook)

    handler.start_workflow(name="my-workflow").stop()

    hook.on_completion.assert_called_once()
    kwargs = hook.on_completion.call_args.kwargs
    self.assertEqual(kwargs["inputs"], [])
    self.assertEqual(kwargs["outputs"], [])