diff --git a/CHANGELOG.md b/CHANGELOG.md index 564340ae26..3c9c67be4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-docker-tests`: Replace deprecated `SpanAttributes` from `opentelemetry.semconv.trace` with `opentelemetry.semconv._incubating.attributes` ([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339)) +- `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls + ([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349)) - Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone ([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305)) - Don't import module in unwrap if not already imported diff --git a/docs/performance/benchmarks.rst b/docs/performance/benchmarks.rst index 99859a27d8..9ef78c099d 100644 --- a/docs/performance/benchmarks.rst +++ b/docs/performance/benchmarks.rst @@ -1,4 +1,8 @@ Performance Tests - Benchmarks ============================== -Click `here `_ to view the latest performance benchmarks for packages in this repo. +For instructions on running and writing benchmarks locally, see the +`Benchmarks section of CONTRIBUTING.md `_. + +Live benchmark results are published at +`opentelemetry.io/docs/languages/python/benchmarks `_. 
diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-anthropic/pyproject.toml index 74c411a1a5..5cc6754ef7 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/pyproject.toml @@ -26,16 +26,14 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-api ~= 1.37", - "opentelemetry-instrumentation ~= 0.58b0", - "opentelemetry-semantic-conventions ~= 0.58b0", + "opentelemetry-api ~= 1.39", + "opentelemetry-instrumentation ~= 0.60b0", + "opentelemetry-semantic-conventions ~= 0.60b0", "opentelemetry-util-genai >= 0.2b0, <0.4b0", ] [project.optional-dependencies] -instruments = [ - "anthropic >= 0.51.0", -] +instruments = ["anthropic >= 0.51.0"] [project.entry-points.opentelemetry_instrumentor] anthropic = "opentelemetry.instrumentation.anthropic:AnthropicInstrumentor" @@ -48,15 +46,10 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" path = "src/opentelemetry/instrumentation/anthropic/version.py" [tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", - "/examples", -] +include = ["/src", "/tests", "/examples"] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] [tool.pytest.ini_options] testpaths = ["tests"] - diff --git a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/requirements.oldest.txt b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/requirements.oldest.txt index 0b77b1a7cb..1b1d2b1994 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/requirements.oldest.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/requirements.oldest.txt @@ -21,8 +21,8 @@ pytest==7.4.4 pytest-vcr==1.0.2 pytest-asyncio==0.21.0 wrapt==1.16.0 -opentelemetry-api==1.37 # when updating, also update in 
pyproject.toml -opentelemetry-sdk==1.37 # when updating, also update in pyproject.toml -opentelemetry-semantic-conventions==0.58b0 # when updating, also update in pyproject.toml +opentelemetry-api==1.39 # when updating, also update in pyproject.toml +opentelemetry-sdk==1.39 # when updating, also update in pyproject.toml +opentelemetry-semantic-conventions==0.60b0 # when updating, also update in pyproject.toml -e instrumentation-genai/opentelemetry-instrumentation-anthropic diff --git a/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/pyproject.toml index 3149629fac..fb38e61589 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/pyproject.toml @@ -24,16 +24,14 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] dependencies = [ - "opentelemetry-api ~= 1.37", - "opentelemetry-instrumentation ~= 0.58b0", - "opentelemetry-semantic-conventions ~= 0.58b0", + "opentelemetry-api ~= 1.39", + "opentelemetry-instrumentation ~= 0.60b0", + "opentelemetry-semantic-conventions ~= 0.60b0", "opentelemetry-util-genai >= 0.2b0, <0.4b0", ] [project.optional-dependencies] -instruments = [ - "claude-agent-sdk >= 0.1.14", -] +instruments = ["claude-agent-sdk >= 0.1.14"] [project.entry-points.opentelemetry_instrumentor] claude-agent-sdk = "opentelemetry.instrumentation.claude_agent_sdk:ClaudeAgentSDKInstrumentor" @@ -46,11 +44,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" path = "src/opentelemetry/instrumentation/claude_agent_sdk/version.py" [tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", - "/examples", -] +include = ["/src", "/tests", "/examples"] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/tests/requirements.oldest.txt b/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/tests/requirements.oldest.txt index adfadef283..833e05fb1d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/tests/requirements.oldest.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk/tests/requirements.oldest.txt @@ -21,8 +21,8 @@ pytest==7.4.4 pytest-vcr==1.0.2 pytest-asyncio==0.21.0 wrapt==1.16.0 -opentelemetry-api==1.37 # when updating, also update in pyproject.toml -opentelemetry-sdk==1.37 # when updating, also update in pyproject.toml -opentelemetry-semantic-conventions==0.58b0 # when updating, also update in pyproject.toml +opentelemetry-api==1.39 # when updating, also update in pyproject.toml +opentelemetry-sdk==1.39 # when updating, also update in pyproject.toml +opentelemetry-semantic-conventions==0.60b0 # when updating, also update in pyproject.toml -e instrumentation-genai/opentelemetry-instrumentation-claude-agent-sdk diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md index c75cdf2f27..76d2deabc4 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Fix `ChoiceBuffer` crash on streaming tool-call deltas with `arguments=None` + ([#4350](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4350)) - Fix `StreamWrapper` missing `.headers` and other attributes when using `with_raw_response` streaming ([#4113](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/4113)) - Add opt-in support for latest experimental semantic conventions (v1.37.0). 
Set diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml index fc5939985b..7b4fcce224 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/pyproject.toml @@ -26,16 +26,14 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-api ~= 1.37", - "opentelemetry-instrumentation ~= 0.58b0", - "opentelemetry-semantic-conventions ~= 0.58b0", + "opentelemetry-api ~= 1.39", + "opentelemetry-instrumentation ~= 0.60b0", + "opentelemetry-semantic-conventions ~= 0.60b0", "opentelemetry-util-genai", ] [project.optional-dependencies] -instruments = [ - "openai >= 1.26.0", -] +instruments = ["openai >= 1.26.0"] [project.entry-points.opentelemetry_instrumentor] openai = "opentelemetry.instrumentation.openai_v2:OpenAIInstrumentor" @@ -48,10 +46,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" path = "src/opentelemetry/instrumentation/openai_v2/version.py" [tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", -] +include = ["/src", "/tests"] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 3bc42f103c..2c8c1b4c71 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -582,7 +582,8 @@ def __init__(self, index, tool_call_id, function_name): self.arguments = [] def append_arguments(self, arguments): - 
self.arguments.append(arguments) + if arguments is not None: + self.arguments.append(arguments) class ChoiceBuffer: @@ -601,13 +602,16 @@ def append_tool_call(self, tool_call): for _ in range(len(self.tool_calls_buffers), idx + 1): self.tool_calls_buffers.append(None) + function = tool_call.function if not self.tool_calls_buffers[idx]: self.tool_calls_buffers[idx] = ToolCallBuffer( - idx, tool_call.id, tool_call.function.name + idx, + tool_call.id, + function.name if function else None, ) - self.tool_calls_buffers[idx].append_arguments( - tool_call.function.arguments - ) + + if function: + self.tool_calls_buffers[idx].append_arguments(function.arguments) class BaseStreamWrapper: diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.oldest.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.oldest.txt index 2644ba47e8..45339fb438 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.oldest.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/requirements.oldest.txt @@ -29,9 +29,9 @@ pytest-vcr==1.0.2 pytest-asyncio==0.21.0 wrapt==1.16.0 opentelemetry-exporter-otlp-proto-http~=1.30 -opentelemetry-api==1.37 # when updating, also update in pyproject.toml -opentelemetry-sdk==1.37 # when updating, also update in pyproject.toml -opentelemetry-semantic-conventions==0.58b0 # when updating, also update in pyproject.toml +opentelemetry-api==1.39 # when updating, also update in pyproject.toml +opentelemetry-sdk==1.39 # when updating, also update in pyproject.toml +opentelemetry-semantic-conventions==0.60b0 # when updating, also update in pyproject.toml -e instrumentation-genai/opentelemetry-instrumentation-openai-v2 -e util/opentelemetry-util-genai \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py 
b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py new file mode 100644 index 0000000000..7717ff73b2 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_choice_buffer.py @@ -0,0 +1,182 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for ChoiceBuffer and ToolCallBuffer classes.""" + +from openai.types.chat.chat_completion_chunk import ( + ChoiceDeltaToolCall, + ChoiceDeltaToolCallFunction, +) + +from opentelemetry.instrumentation.openai_v2.patch import ( + ChoiceBuffer, + ToolCallBuffer, +) + + +def test_toolcallbuffer_append_arguments_with_string(): + buf = ToolCallBuffer(0, "call_1", "get_weather") + buf.append_arguments('{"city":') + buf.append_arguments(' "NYC"}') + assert "".join(buf.arguments) == '{"city": "NYC"}' + + +def test_toolcallbuffer_append_arguments_with_none_is_skipped(): + """Regression test for issue #4344. + + Some OpenAI-compatible providers (vLLM, TGI, etc.) send + arguments=None on tool-call delta chunks instead of arguments="". + This must not crash when joining the arguments list. 
+ """ + buf = ToolCallBuffer(0, "call_1", "get_weather") + buf.append_arguments(None) + buf.append_arguments('{"city": "NYC"}') + buf.append_arguments(None) + assert "".join(buf.arguments) == '{"city": "NYC"}' + + +def test_toolcallbuffer_append_arguments_all_none(): + buf = ToolCallBuffer(0, "call_1", "get_weather") + buf.append_arguments(None) + buf.append_arguments(None) + assert "".join(buf.arguments) == "" + + +def test_toolcallbuffer_append_arguments_empty_string(): + buf = ToolCallBuffer(0, "call_1", "get_weather") + buf.append_arguments("") + buf.append_arguments('{"city": "NYC"}') + assert "".join(buf.arguments) == '{"city": "NYC"}' + + +def test_choicebuffer_append_tool_call_with_none_arguments(): + """End-to-end regression test for issue #4344. + + Simulates the exact scenario from the bug report where a provider + sends arguments=None on the first tool-call delta chunk. + """ + buf = ChoiceBuffer(0) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + id="call_1", + type="function", + function=ChoiceDeltaToolCallFunction( + name="get_weather", arguments=None + ), + ) + ) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + function=ChoiceDeltaToolCallFunction(arguments='{"city": "NYC"}'), + ) + ) + + # This must not raise TypeError + result = "".join(buf.tool_calls_buffers[0].arguments) + assert result == '{"city": "NYC"}' + + +def test_choicebuffer_append_tool_call_normal_flow(): + """Standard OpenAI flow where arguments="" on first delta.""" + buf = ChoiceBuffer(0) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + id="call_1", + type="function", + function=ChoiceDeltaToolCallFunction( + name="get_weather", arguments="" + ), + ) + ) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + function=ChoiceDeltaToolCallFunction(arguments='{"city": "NYC"}'), + ) + ) + + result = "".join(buf.tool_calls_buffers[0].arguments) + assert result == '{"city": "NYC"}' + + +def 
test_choicebuffer_append_multiple_tool_calls_with_none_arguments(): + """Multiple tool calls where some have arguments=None.""" + buf = ChoiceBuffer(0) + + # First tool call + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + id="call_1", + type="function", + function=ChoiceDeltaToolCallFunction( + name="get_weather", arguments=None + ), + ) + ) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + function=ChoiceDeltaToolCallFunction(arguments='{"city": "NYC"}'), + ) + ) + + # Second tool call + buf.append_tool_call( + ChoiceDeltaToolCall( + index=1, + id="call_2", + type="function", + function=ChoiceDeltaToolCallFunction( + name="get_time", arguments=None + ), + ) + ) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=1, + function=ChoiceDeltaToolCallFunction(arguments='{"tz": "EST"}'), + ) + ) + + assert "".join(buf.tool_calls_buffers[0].arguments) == '{"city": "NYC"}' + assert "".join(buf.tool_calls_buffers[1].arguments) == '{"tz": "EST"}' + + +def test_choicebuffer_append_tool_call_with_none_function(): + """Handle delta chunks where function is None.""" + buf = ChoiceBuffer(0) + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + id="call_1", + type="function", + function=ChoiceDeltaToolCallFunction( + name="get_weather", arguments='{"city": "NYC"}' + ), + ) + ) + # Subsequent delta with function=None should not crash + buf.append_tool_call( + ChoiceDeltaToolCall( + index=0, + function=None, + ) + ) + + result = "".join(buf.tool_calls_buffers[0].arguments) + assert result == '{"city": "NYC"}' diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index f4606bc4d9..ed390d7006 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ 
b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -383,11 +383,11 @@ def wrap_poll(func, instance, tracer, args, kwargs): if instance._current_consume_span: _end_current_consume_span(instance) - with tracer.start_as_current_span( - "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER - ): - record = func(*args, **kwargs) - if record: + record = func(*args, **kwargs) + if record: + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): _create_new_consume_span(instance, tracer, [record]) _enrich_span( instance._current_consume_span, @@ -396,9 +396,9 @@ def wrap_poll(func, instance, tracer, args, kwargs): record.offset(), operation=MessagingOperationTypeValues.PROCESS, ) - instance._current_context_token = context.attach( - trace.set_span_in_context(instance._current_consume_span) - ) + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) return record @@ -407,21 +407,20 @@ def wrap_consume(func, instance, tracer, args, kwargs): if instance._current_consume_span: _end_current_consume_span(instance) - with tracer.start_as_current_span( - "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER - ): - records = func(*args, **kwargs) - if len(records) > 0: + records = func(*args, **kwargs) + if len(records) > 0: + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): _create_new_consume_span(instance, tracer, records) _enrich_span( instance._current_consume_span, records[0].topic(), operation=MessagingOperationTypeValues.PROCESS, ) - - instance._current_context_token = context.attach( - trace.set_span_in_context(instance._current_consume_span) - ) + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) return records diff --git 
a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 365ac333d9..772ecd09ee 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -197,7 +197,6 @@ def test_poll(self) -> None: MESSAGING_MESSAGE_ID: "topic-30.1.3", }, }, - {"name": "recv", "attributes": {}}, ] consumer = MockConsumer( @@ -213,7 +212,7 @@ def test_poll(self) -> None: consumer.poll() consumer.poll() consumer.poll() - consumer.poll() + consumer.poll() # empty poll — must not produce a span span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) @@ -259,7 +258,6 @@ def test_consume(self) -> None: SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, }, - {"name": "recv", "attributes": {}}, ] consumer = MockConsumer( @@ -276,10 +274,86 @@ def test_consume(self) -> None: consumer.consume(3) consumer.consume(1) consumer.consume(2) - consumer.consume(1) + consumer.consume(1) # empty consume — must not produce a span span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) + def test_poll_empty_does_not_create_span(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() + consumer.poll() + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def test_consume_empty_does_not_create_span(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [], + { + 
"bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.consume(5) + consumer.consume(5) + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def test_poll_empty_cleans_up_previous_span_and_token(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [MockedMessage("topic-1", 0, 0, [])], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() # non-empty: sets _current_consume_span and _current_context_token + self.assertIsNotNone(consumer._current_consume_span) + self.assertIsNotNone(consumer._current_context_token) + + consumer.poll() # empty: should clean up both + self.assertIsNone(consumer._current_consume_span) + self.assertIsNone(consumer._current_context_token) + + def test_consume_empty_cleans_up_previous_span_and_token(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [MockedMessage("topic-1", 0, 0, [])], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + consumer = instrumentation.instrument_consumer(consumer) + consumer.consume( + 1 + ) # non-empty: sets _current_consume_span and _current_context_token + self.assertIsNotNone(consumer._current_consume_span) + self.assertIsNotNone(consumer._current_context_token) + + consumer.consume(1) # empty: should clean up both + self.assertIsNone(consumer._current_consume_span) + self.assertIsNone(consumer._current_context_token) + def test_close(self) -> None: instrumentation = ConfluentKafkaInstrumentor() mocked_messages = [ diff --git a/opamp/opentelemetry-opamp-client/CHANGELOG.md b/opamp/opentelemetry-opamp-client/CHANGELOG.md index 
f981e2dcf3..60e9f9ebcd 100644 --- a/opamp/opentelemetry-opamp-client/CHANGELOG.md +++ b/opamp/opentelemetry-opamp-client/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Breaking change: callback class `Callbacks` renamed to `OpAMPCallbacks` + ([#4355](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4355)) + +## Version 0.1b0 (2026-03-23) + - Initial implementation ([#3635](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3635)) - Update client to have additional callback methods diff --git a/opamp/opentelemetry-opamp-client/pyproject.toml b/opamp/opentelemetry-opamp-client/pyproject.toml index 31f10501c5..978878c620 100644 --- a/opamp/opentelemetry-opamp-client/pyproject.toml +++ b/opamp/opentelemetry-opamp-client/pyproject.toml @@ -23,6 +23,7 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", ] dependencies = [ "opentelemetry-api ~= 1.12", diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py index ed85131f04..0a9cefcd1f 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py @@ -35,7 +35,7 @@ Since OpAMP APIs, config options or environment variables are not standardizes the distros are required to provide code doing so. -OTel Python distros would need to provide their own Callbacks subclass that implements the actual +OTel Python distros would need to provide their own OpAMPCallbacks subclass that implements the actual change of whatever configuration their backends sends. Please note that the API is not finalized yet and so the name is called ``_opamp`` with the underscore. 
@@ -48,13 +48,13 @@ import os from opentelemetry._opamp.agent import OpAMPAgent - from opentelemetry._opamp.callbacks import Callbacks + from opentelemetry._opamp.callbacks import OpAMPCallbacks from opentelemetry._opamp.client import OpAMPClient from opentelemetry.sdk._configuration import _OTelSDKConfigurator from opentelemetry.sdk.resources import OTELResourceDetector - class MyCallbacks(Callbacks): + class MyCallbacks(OpAMPCallbacks): def on_message(self, agent, client, message): if message.remote_config is None: return @@ -93,7 +93,7 @@ def _configure(self, **kwargs): """ from opentelemetry._opamp.agent import OpAMPAgent -from opentelemetry._opamp.callbacks import Callbacks, MessageData +from opentelemetry._opamp.callbacks import MessageData, OpAMPCallbacks from opentelemetry._opamp.client import OpAMPClient -__all__ = ["Callbacks", "MessageData", "OpAMPAgent", "OpAMPClient"] +__all__ = ["MessageData", "OpAMPAgent", "OpAMPCallbacks", "OpAMPClient"] diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py index 2616726fcc..917f753a4f 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py @@ -21,7 +21,7 @@ import threading from typing import Any, Callable -from opentelemetry._opamp.callbacks import Callbacks, MessageData +from opentelemetry._opamp.callbacks import MessageData, OpAMPCallbacks from opentelemetry._opamp.client import OpAMPClient from opentelemetry._opamp.proto import opamp_pb2 @@ -85,7 +85,7 @@ def __init__( self, *, interval: float = 30, - callbacks: Callbacks, + callbacks: OpAMPCallbacks, max_retries: int = 10, heartbeat_max_retries: int = 1, initial_backoff: float = 1.0, @@ -93,7 +93,7 @@ def __init__( ): """ :param interval: seconds between heartbeat calls - :param callbacks: Callbacks instance for receiving client events + :param callbacks: OpAMPCallbacks 
instance for receiving client events :param max_retries: how many times to retry a failed job for ad-hoc messages :param heartbeat_max_retries: how many times to retry an heartbeat failed job :param initial_backoff: base seconds for exponential backoff diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py index 78bffca66b..c543793ded 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py @@ -47,7 +47,7 @@ def from_server_message( ) -class Callbacks(ABC): +class OpAMPCallbacks(ABC): """OpAMP client callbacks with no-op defaults. All methods have no-op defaults so that subclasses only need to diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/version.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/version.py index e7bf4a48eb..29e61950cc 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/version.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "0.1b0.dev" +__version__ = "0.2b0.dev" diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py b/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py index 9c7e7f34f0..e66598df73 100644 --- a/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py +++ b/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py @@ -18,11 +18,11 @@ from opentelemetry._opamp.agent import OpAMPAgent, _safe_invoke from opentelemetry._opamp.agent import _Job as Job -from opentelemetry._opamp.callbacks import Callbacks, MessageData +from opentelemetry._opamp.callbacks import MessageData, OpAMPCallbacks from opentelemetry._opamp.proto import opamp_pb2 -class _NoOpCallbacks(Callbacks): +class _NoOpCallbacks(OpAMPCallbacks): pass @@ -48,7 +48,7 @@ def test_agent_start_will_send_connection_and_disconnetion_messages(): mock_message.flags = 0 client_mock.send.return_value = mock_message - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) agent = OpAMPAgent(interval=30, client=client_mock, callbacks=cb) agent.start() # wait for the queue to be consumed @@ -100,7 +100,7 @@ def test_agent_send_warns_without_worker_thread(caplog): def test_agent_retries_before_max_attempts(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) client_mock = mock.Mock() connection_message = mock.Mock() connection_message.HasField.return_value = False @@ -136,7 +136,7 @@ def test_agent_retries_before_max_attempts(caplog): def test_agent_stops_after_max_attempts(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) client_mock = mock.Mock() connection_message = mock.Mock() connection_message.HasField.return_value = False @@ -171,7 +171,7 @@ def 
test_agent_stops_after_max_attempts(caplog): def test_agent_send_enqueues_job(): - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) client_mock = mock.Mock() msg = mock.Mock() msg.HasField.return_value = False @@ -194,7 +194,7 @@ def test_agent_send_enqueues_job(): def test_on_error_called_without_on_message_for_error_response(): - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) client_mock = mock.Mock() error_response = opamp_pb2.ServerErrorResponse( @@ -224,7 +224,7 @@ def test_on_error_called_without_on_message_for_error_response(): def test_on_error_not_called_without_error_response(): - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) client_mock = mock.Mock() server_msg = opamp_pb2.ServerToAgent() @@ -255,7 +255,7 @@ def test_dispatch_order_with_error(): error_response=error_response, ) - class OrderTrackingCallbacks(Callbacks): + class OrderTrackingCallbacks(OpAMPCallbacks): def on_connect(self, agent, client): call_order.append("on_connect") @@ -286,7 +286,7 @@ def test_dispatch_order_without_error(): server_msg = opamp_pb2.ServerToAgent() - class OrderTrackingCallbacks(Callbacks): + class OrderTrackingCallbacks(OpAMPCallbacks): def on_connect(self, agent, client): call_order.append("on_connect") @@ -311,7 +311,7 @@ def on_error(self, agent, client, error_response): def test_report_full_state_flag_triggers_full_state_send(): - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) client_mock = mock.Mock() conn_msg = opamp_pb2.ServerToAgent() diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py b/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py index 66ec8ef085..9493155af0 100644 --- a/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py +++ 
b/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py @@ -14,12 +14,12 @@ from unittest import mock -from opentelemetry._opamp.callbacks import Callbacks, MessageData +from opentelemetry._opamp.callbacks import MessageData, OpAMPCallbacks from opentelemetry._opamp.proto import opamp_pb2 def test_subclass_override_subset(): - class MyCallbacks(Callbacks): + class MyCallbacks(OpAMPCallbacks): def __init__(self): self.connected = False diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py b/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py index 56998dd255..1bb9df6a4c 100644 --- a/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py +++ b/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py @@ -20,7 +20,7 @@ import pytest from opentelemetry._opamp.agent import OpAMPAgent -from opentelemetry._opamp.callbacks import Callbacks +from opentelemetry._opamp.callbacks import OpAMPCallbacks from opentelemetry._opamp.client import OpAMPClient from opentelemetry._opamp.proto import opamp_pb2 @@ -33,7 +33,7 @@ def test_connection_remote_config_status_heartbeat_disconnection(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - class E2ECallbacks(Callbacks): + class E2ECallbacks(OpAMPCallbacks): def on_message(self, agent, client, message): logger = logging.getLogger( "opentelemetry._opamp.agent.opamp_handler" @@ -104,7 +104,7 @@ def on_message(self, agent, client, message): def test_with_server_not_responding(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - cb = mock.create_autospec(Callbacks, instance=True) + cb = mock.create_autospec(OpAMPCallbacks, instance=True) opamp_client = OpAMPClient( endpoint="https://localhost:4399/v1/opamp", diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 1b0a444b95..1dde90ebf8 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,10 +7,14 
@@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add EmbeddingInvocation span lifecycle support + ([#4219](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4219)) - Populate schema_url on metrics ([#4320](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4320)) - Add workflow invocation type to genAI utils ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4310](#4310)) +- Add support for workflow in genAI utils handler. + ([#4334](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4334)) ## Version 0.3b0 (2026-02-20) @@ -41,10 +45,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795)) - Make inputs / outputs / system instructions optional params to `on_completion`, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)). - - Use a SHA256 hash of the system instructions as it's upload filename, and check +- Use a SHA256 hash of the system instructions as its upload filename, and check if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)). - ## Version 0.1b0 (2025-09-25) - Add completion hook to genai utils to implement semconv v1.37. @@ -57,6 +60,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752)) ([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3759)) ([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763)) + - Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable. 
Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)). diff --git a/util/opentelemetry-util-genai/README.rst b/util/opentelemetry-util-genai/README.rst index bc4b348fcf..94806e0159 100644 --- a/util/opentelemetry-util-genai/README.rst +++ b/util/opentelemetry-util-genai/README.rst @@ -36,6 +36,18 @@ This package provides these span attributes: - `gen_ai.output.messages`: Str('[{"role": "AI", "parts": [{"content": "hello back", "type": "text"}], "finish_reason": "stop"}]') - `gen_ai.system_instructions`: Str('[{"content": "You are a helpful assistant.", "type": "text"}]') (when system instruction is provided) +This package also supports embedding invocation spans via the `embedding` context manager. +For embedding invocations, common attributes include: + +- `gen_ai.provider.name`: Str(openai) +- `gen_ai.operation.name`: Str(embeddings) +- `gen_ai.request.model`: Str(text-embedding-3-small) +- `gen_ai.embeddings.dimension.count`: Int(1536) +- `gen_ai.request.encoding_formats`: Slice(["float"]) +- `gen_ai.usage.input_tokens`: Int(24) +- `server.address`: Str(api.openai.com) +- `server.port`: Int(443) + When `EVENT_ONLY` or `SPAN_AND_EVENT` mode is enabled and a LoggerProvider is configured, the package also emits `gen_ai.client.inference.operation.details` events with structured message content (as dictionaries instead of JSON strings). 
Note that when using `EVENT_ONLY` diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index c9d4d388c1..f8705369c2 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -26,9 +26,9 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-instrumentation ~= 0.58b0", - "opentelemetry-semantic-conventions ~= 0.58b0", - "opentelemetry-api>=1.31.0", + "opentelemetry-instrumentation ~= 0.60b0", + "opentelemetry-semantic-conventions ~= 0.60b0", + "opentelemetry-api>=1.39", ] [project.entry-points.opentelemetry_genai_completion_hook] @@ -46,10 +46,7 @@ Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" path = "src/opentelemetry/util/genai/version.py" [tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", -] +include = ["/src", "/tests"] [tool.hatch.build.targets.wheel] packages = ["src/opentelemetry"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 4e85799ea1..12cca8f5a3 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -60,9 +60,10 @@ from __future__ import annotations +import logging import timeit from contextlib import contextmanager -from typing import Iterator +from typing import Iterator, TypeVar from opentelemetry import context as otel_context from opentelemetry._logs import ( @@ -80,13 +81,43 @@ ) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder from opentelemetry.util.genai.span_utils import ( + _apply_embedding_finish_attributes, _apply_error_attributes, _apply_llm_finish_attributes, + _apply_workflow_finish_attributes, + _get_embedding_span_name, + _get_llm_span_name, + _get_workflow_span_name, _maybe_emit_llm_event, ) -from 
opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import ( + EmbeddingInvocation, + Error, + GenAIInvocation, + LLMInvocation, + WorkflowInvocation, +) from opentelemetry.util.genai.version import __version__ +_logger = logging.getLogger(__name__) + + +def _safe_detach(invocation: GenAIInvocation) -> None: + """Detach the context token if still present, as a safety net.""" + if invocation.context_token is not None: + try: + otel_context.detach(invocation.context_token) + except Exception: # pylint: disable=broad-except + pass + if invocation.span is not None: + try: + invocation.span.end() + except Exception: # pylint: disable=broad-except + pass + + +_T = TypeVar("_T", bound=GenAIInvocation) + class TelemetryHandler: """ @@ -134,15 +165,35 @@ def _record_llm_metrics( error_type=error_type, ) - def start_llm( - self, - invocation: LLMInvocation, - ) -> LLMInvocation: - """Start an LLM invocation and create a pending span entry.""" - # Create a span and attach it as current; keep the token to detach later + @staticmethod + def _record_embedding_metrics( + invocation: EmbeddingInvocation, + span: Span | None = None, + *, + error_type: str | None = None, + ) -> None: + # Metrics recorder currently supports LLMInvocation fields only. + # Keep embedding metrics as a no-op until dedicated embedding + # metric support is added. 
+ return + + def _start(self, invocation: _T) -> _T: + """Start a GenAI invocation and create a pending span entry.""" + if isinstance(invocation, LLMInvocation): + span_name = _get_llm_span_name(invocation) + kind = SpanKind.CLIENT + elif isinstance(invocation, EmbeddingInvocation): + span_name = _get_embedding_span_name(invocation) + kind = SpanKind.CLIENT + elif isinstance(invocation, WorkflowInvocation): + span_name = _get_workflow_span_name(invocation) + kind = SpanKind.CLIENT + else: + span_name = "" + kind = "" span = self._tracer.start_span( - name=f"{invocation.operation_name} {invocation.request_model}", - kind=SpanKind.CLIENT, + name=span_name, + kind=kind, ) # Record a monotonic start timestamp (seconds) for duration # calculation using timeit.default_timer. @@ -153,40 +204,94 @@ def start_llm( ) return invocation - def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disable=no-self-use - """Finalize an LLM invocation successfully and end its span.""" + def _stop(self, invocation: _T) -> _T: + """Finalize a GenAI invocation successfully and end its span.""" if invocation.context_token is None or invocation.span is None: # TODO: Provide feedback that this invocation was not started return invocation span = invocation.span - _apply_llm_finish_attributes(span, invocation) - self._record_llm_metrics(invocation, span) - _maybe_emit_llm_event(self._logger, span, invocation) - # Detach context and end span - otel_context.detach(invocation.context_token) - span.end() + try: + if isinstance(invocation, LLMInvocation): + _apply_llm_finish_attributes(span, invocation) + self._record_llm_metrics(invocation, span) + _maybe_emit_llm_event(self._logger, span, invocation) + elif isinstance(invocation, EmbeddingInvocation): + _apply_embedding_finish_attributes(span, invocation) + self._record_embedding_metrics(invocation, span) + elif isinstance(invocation, WorkflowInvocation): + _apply_workflow_finish_attributes(span, invocation) + # TODO: Add 
workflow metrics when supported + finally: + # Detach context and end span even if finishing fails + otel_context.detach(invocation.context_token) + span.end() return invocation - def fail_llm( # pylint: disable=no-self-use - self, invocation: LLMInvocation, error: Error - ) -> LLMInvocation: - """Fail an LLM invocation and end its span with error status.""" + def _fail(self, invocation: _T, error: Error) -> _T: + """Fail a GenAI invocation and end its span with error status.""" if invocation.context_token is None or invocation.span is None: # TODO: Provide feedback that this invocation was not started return invocation span = invocation.span - _apply_llm_finish_attributes(invocation.span, invocation) - _apply_error_attributes(invocation.span, error) - error_type = getattr(error.type, "__qualname__", None) - self._record_llm_metrics(invocation, span, error_type=error_type) - _maybe_emit_llm_event(self._logger, span, invocation, error) - # Detach context and end span - otel_context.detach(invocation.context_token) - span.end() + error_type = error.type.__qualname__ + try: + if isinstance(invocation, LLMInvocation): + _apply_llm_finish_attributes(span, invocation) + _apply_error_attributes(span, error, error_type) + self._record_llm_metrics( + invocation, span, error_type=error_type + ) + _maybe_emit_llm_event( + self._logger, span, invocation, error_type + ) + elif isinstance(invocation, EmbeddingInvocation): + _apply_embedding_finish_attributes(span, invocation) + _apply_error_attributes(span, error, error_type) + self._record_embedding_metrics( + invocation, span, error_type=error_type + ) + elif isinstance(invocation, WorkflowInvocation): + _apply_workflow_finish_attributes(span, invocation) + _apply_error_attributes(span, error, error_type) + # TODO: Add workflow metrics when supported + finally: + # Detach context and end span even if finishing fails + otel_context.detach(invocation.context_token) + span.end() return invocation + def start( + self, + 
invocation: _T, + ) -> _T: + """Start a GenAI invocation and create a pending span entry.""" + return self._start(invocation) + + def stop(self, invocation: _T) -> _T: + """Finalize a GenAI invocation successfully and end its span.""" + return self._stop(invocation) + + def fail(self, invocation: _T, error: Error) -> _T: + """Fail a GenAI invocation and end its span with error status.""" + return self._fail(invocation, error) + + # LLM-specific convenience methods + def start_llm(self, invocation: LLMInvocation) -> LLMInvocation: + """Start an LLM invocation and create a pending span entry.""" + return self._start(invocation) + + def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: + """Finalize an LLM invocation successfully and end its span.""" + return self._stop(invocation) + + def fail_llm( + self, invocation: LLMInvocation, error: Error + ) -> LLMInvocation: + """Fail an LLM invocation and end its span with error status.""" + return self._fail(invocation, error) + @contextmanager def llm( self, invocation: LLMInvocation | None = None @@ -211,6 +316,68 @@ def llm( raise self.stop_llm(invocation) + @contextmanager + def embedding( + self, invocation: EmbeddingInvocation | None = None + ) -> Iterator[EmbeddingInvocation]: + """Context manager for Embedding invocations. + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. 
+ """ + if invocation is None: + invocation = EmbeddingInvocation() + self.start(invocation) + try: + yield invocation + except Exception as exc: + self.fail(invocation, Error(message=str(exc), type=type(exc))) + raise + self.stop(invocation) + + @contextmanager + def workflow( + self, invocation: WorkflowInvocation | None = None + ) -> Iterator[WorkflowInvocation]: + """Context manager for Workflow invocations. + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + if invocation is None: + invocation = WorkflowInvocation() + + try: + self.start(invocation) + except Exception: # pylint: disable=broad-except + _logger.warning( + "Failed to start workflow telemetry", exc_info=True + ) + + try: + yield invocation + except Exception as exc: + try: + self.fail(invocation, Error(message=str(exc), type=type(exc))) + except Exception: # pylint: disable=broad-except + _logger.warning( + "Failed to record workflow failure", exc_info=True + ) + _safe_detach(invocation) + raise + + try: + self.stop(invocation) + except Exception: # pylint: disable=broad-except + _logger.warning("Failed to stop workflow telemetry", exc_info=True) + _safe_detach(invocation) + def get_telemetry_handler( tracer_provider: TracerProvider | None = None, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 889994436f..c61f49d425 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -32,11 +32,14 @@ from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.trace.status import Status, StatusCode from 
opentelemetry.util.genai.types import ( + EmbeddingInvocation, Error, + GenAIInvocation, InputMessage, LLMInvocation, MessagePart, OutputMessage, + WorkflowInvocation, ) from opentelemetry.util.genai.utils import ( ContentCapturingMode, @@ -68,9 +71,49 @@ def _get_llm_common_attributes( } +def _get_embedding_common_attributes( + invocation: EmbeddingInvocation, +) -> dict[str, Any]: + """Get common Embedding attributes shared by finish() and error() paths. + + Returns a dictionary of attributes. + """ + optional_attrs = ( + (server_attributes.SERVER_ADDRESS, invocation.server_address), + (server_attributes.SERVER_PORT, invocation.server_port), + ) + + return { + GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name, + GenAI.GEN_AI_PROVIDER_NAME: invocation.provider, + **{key: value for key, value in optional_attrs if value is not None}, + } + + +def _get_span_name( + invocation: GenAIInvocation, +) -> str: + """Get the span name for a GenAI invocation.""" + operation_name = getattr(invocation, "operation_name", None) or "" + request_model = getattr(invocation, "request_model", None) or "" + return f"{operation_name} {request_model}".strip() + + def _get_llm_span_name(invocation: LLMInvocation) -> str: """Get the span name for an LLM invocation.""" - return f"{invocation.operation_name} {invocation.request_model}".strip() + return _get_span_name(invocation) + + +def _get_embedding_span_name(invocation: EmbeddingInvocation) -> str: + """Get the span name for an Embedding invocation.""" + return _get_span_name(invocation) + + +def _get_workflow_span_name(invocation: WorkflowInvocation) -> str: + """Get the span name for an Workflow invocation.""" + operation_name = getattr(invocation, "operation_name", None) or "" + name = getattr(invocation, "name", None) or "" + return f"{operation_name} {name}".strip() def _get_llm_messages_attributes_for_span( @@ -151,7 +194,7 @@ def _maybe_emit_llm_event( logger: Logger | None, span: Span, invocation: LLMInvocation, - error: 
Error | None = None, + error_type: str | None = None, ) -> None: """Emit a gen_ai.client.inference.operation.details event to the logger. @@ -179,8 +222,8 @@ def _maybe_emit_llm_event( ) # Add error.type if operation ended in error - if error is not None: - attributes[error_attributes.ERROR_TYPE] = error.type.__qualname__ + if error_type is not None: + attributes[error_attributes.ERROR_TYPE] = error_type # Create and emit the event context = set_span_in_context(span, get_current()) @@ -218,13 +261,31 @@ def _apply_llm_finish_attributes( span.set_attributes(attributes) -def _apply_error_attributes(span: Span, error: Error) -> None: +def _apply_embedding_finish_attributes( + span: Span, invocation: EmbeddingInvocation +) -> None: + """Apply attributes common to embedding finish() paths.""" + # Update span name + span.update_name(_get_embedding_span_name(invocation)) + + # Build all attributes by reusing the attribute getter functions + attributes: dict[str, Any] = {} + attributes.update(_get_embedding_common_attributes(invocation)) + attributes.update(_get_embedding_request_attributes(invocation)) + attributes.update(_get_embedding_response_attributes(invocation)) + + attributes.update(invocation.attributes) + + # Set all attributes on the span + if attributes: + span.set_attributes(attributes) + + +def _apply_error_attributes(span: Span, error: Error, error_type: str) -> None: """Apply status and error attributes common to error() paths.""" span.set_status(Status(StatusCode.ERROR, error.message)) if span.is_recording(): - span.set_attribute( - error_attributes.ERROR_TYPE, error.type.__qualname__ - ) + span.set_attribute(error_attributes.ERROR_TYPE, error_type) def _get_llm_request_attributes( @@ -244,6 +305,19 @@ def _get_llm_request_attributes( return {key: value for key, value in optional_attrs if value is not None} +def _get_embedding_request_attributes( + invocation: EmbeddingInvocation, +) -> dict[str, Any]: + """Get GenAI request semantic convention 
attributes.""" + optional_attrs = ( + (GenAI.GEN_AI_REQUEST_MODEL, invocation.request_model), + (GenAI.GEN_AI_EMBEDDINGS_DIMENSION_COUNT, invocation.dimension_count), + (GenAI.GEN_AI_REQUEST_ENCODING_FORMATS, invocation.encoding_formats), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + def _get_llm_response_attributes( invocation: LLMInvocation, ) -> dict[str, Any]: @@ -279,6 +353,83 @@ def _get_llm_response_attributes( return {key: value for key, value in optional_attrs if value is not None} +def _apply_workflow_finish_attributes( + span: Span, invocation: WorkflowInvocation +) -> None: + """Apply attributes/messages common to finish() paths.""" + + # Build all attributes by reusing the attribute getter functions + attributes: dict[str, Any] = {} + attributes.update(_get_workflow_common_attributes(invocation)) + attributes.update( + _get_workflow_messages_attributes_for_span( + invocation.input_messages, + invocation.output_messages, + ) + ) + attributes.update(invocation.attributes) + + # Set all attributes on the span + if attributes: + span.set_attributes(attributes) + + +def _get_workflow_common_attributes( + invocation: WorkflowInvocation, +) -> dict[str, Any]: + """Get common Workflow attributes shared by finish() and error() paths. + + Returns a dictionary of attributes. + """ + return { + GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name, + } + + +def _get_workflow_messages_attributes_for_span( + input_messages: list[InputMessage], + output_messages: list[OutputMessage], +) -> dict[str, Any]: + """Get message attributes formatted for span (JSON string format). + + Returns empty dict if not in experimental mode or content capturing is disabled. 
+ """ + if not is_experimental_mode() or get_content_capturing_mode() not in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + return {} + + optional_attrs = ( + ( + GenAI.GEN_AI_INPUT_MESSAGES, + gen_ai_json_dumps([asdict(m) for m in input_messages]) + if input_messages + else None, + ), + ( + GenAI.GEN_AI_OUTPUT_MESSAGES, + gen_ai_json_dumps([asdict(m) for m in output_messages]) + if output_messages + else None, + ), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + +def _get_embedding_response_attributes( + invocation: EmbeddingInvocation, +) -> dict[str, Any]: + """Get GenAI response semantic convention attributes.""" + optional_attrs = ( + (GenAI.GEN_AI_RESPONSE_MODEL, invocation.response_model_name), + (GenAI.GEN_AI_USAGE_INPUT_TOKENS, invocation.input_tokens), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + __all__ = [ "_apply_llm_finish_attributes", "_apply_error_attributes", @@ -287,4 +438,11 @@ def _get_llm_response_attributes( "_get_llm_response_attributes", "_get_llm_span_name", "_maybe_emit_llm_event", + "_get_workflow_common_attributes", + "_apply_workflow_finish_attributes", + "_apply_embedding_finish_attributes", + "_get_embedding_common_attributes", + "_get_embedding_request_attributes", + "_get_embedding_response_attributes", + "_get_embedding_span_name", ] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index fc5de2cdb0..5492b39f79 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -202,6 +202,13 @@ class GenAIInvocation: span: Span | None = None attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) + monotonic_start_s: float | None = None + """ + Monotonic start time in seconds (from timeit.default_timer) used for + 
duration calculations to avoid mixing clock sources. This is populated + by the TelemetryHandler when starting an invocation. + """ + @dataclass class WorkflowInvocation(GenAIInvocation): @@ -228,9 +235,8 @@ class LLMInvocation(GenAIInvocation): set by the TelemetryHandler. """ - request_model: str | None = None - # Chat by default operation_name: str = GenAI.GenAiOperationNameValues.CHAT.value + request_model: str | None = None input_messages: list[InputMessage] = field( default_factory=_new_input_messages ) @@ -267,10 +273,43 @@ class LLMInvocation(GenAIInvocation): seed: int | None = None server_address: str | None = None server_port: int | None = None - # Monotonic start time in seconds (from timeit.default_timer) used - # for duration calculations to avoid mixing clock sources. This is - # populated by the TelemetryHandler when starting an invocation. - monotonic_start_s: float | None = None + + +@dataclass +class EmbeddingInvocation(GenAIInvocation): + """ + Represents a single embedding model invocation. When creating an + EmbeddingInvocation object, only update the data attributes. The span + and context_token attributes are set by the TelemetryHandler. + """ + + operation_name: str = GenAI.GenAiOperationNameValues.EMBEDDINGS.value + request_model: str | None = None + provider: str | None = None # e.g., azure.ai.openai, openai, aws.bedrock + server_address: str | None = None + server_port: int | None = None + + # encoding_formats can be multi-value -> combinational cardinality risk. + # Keep on spans/events only. + encoding_formats: list[str] | None = None + input_tokens: int | None = None + dimension_count: int | None = None + response_model_name: str | None = None + + attributes: dict[str, Any] = field(default_factory=_new_str_any_dict) + """ + Additional attributes to set on spans and/or events. These attributes + will not be set on metrics. 
+ """ + + metric_attributes: dict[str, Any] = field( + default_factory=_new_str_any_dict + ) + """ + Additional attributes to set on metrics. Must be of a low cardinality. + These attributes will not be set on spans or events. + """ + + @dataclass diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py new file mode 100644 index 0000000000..aee8bb735c --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -0,0 +1,299 @@ +from __future__ import annotations + +from unittest import TestCase +from unittest.mock import patch + +import pytest + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import SpanKind +from opentelemetry.trace.status import StatusCode +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + Error, + InputMessage, + OutputMessage, + Text, + WorkflowInvocation, +) + + +class _WorkflowTestBase(TestCase): + """Shared setUp for workflow handler tests.""" + + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + self.handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + ) + + def _get_finished_spans(self): + return self.span_exporter.get_finished_spans() + + +class TelemetryHandlerWorkflowTest(_WorkflowTestBase): + # ------------------------------------------------------------------ + # start_workflow + # ------------------------------------------------------------------ + + def 
test_start_workflow_creates_span(self) -> None: + invocation = WorkflowInvocation(name="my_workflow") + self.handler.start(invocation) + + self.assertIsNotNone(invocation.span) + self.assertIsNotNone(invocation.context_token) + self.assertIsNotNone(invocation.monotonic_start_s) + + # Clean up + self.handler.stop(invocation) + + def test_start_workflow_span_name(self) -> None: + invocation = WorkflowInvocation( + name="my_pipeline", operation_name="run_pipeline" + ) + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "run_pipeline my_pipeline") + + def test_start_workflow_span_kind_is_internal(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].kind, SpanKind.INTERNAL) + + def test_start_workflow_records_monotonic_start(self) -> None: + invocation = WorkflowInvocation(name="wf") + with patch("timeit.default_timer", return_value=500.0): + self.handler.start(invocation) + self.assertEqual(invocation.monotonic_start_s, 500.0) + self.handler.stop(invocation) + + # ------------------------------------------------------------------ + # stop_workflow + # ------------------------------------------------------------------ + + def test_stop_workflow_ends_span(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + + def test_stop_workflow_sets_operation_name_attribute(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual( + spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_workflow", + ) + + def 
test_stop_workflow_sets_custom_attributes(self) -> None: + invocation = WorkflowInvocation(name="wf") + invocation.attributes["custom.key"] = "custom_value" + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(spans[0].attributes["custom.key"], "custom_value") + + def test_stop_workflow_noop_when_not_started(self) -> None: + invocation = WorkflowInvocation(name="wf") + # Not started — span and context_token are None + result = self.handler.stop(invocation) + self.assertIs(result, invocation) + self.assertEqual(len(self._get_finished_spans()), 0) + + def test_stop_workflow_returns_invocation(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + result = self.handler.stop(invocation) + self.assertIs(result, invocation) + + # ------------------------------------------------------------------ + # fail_workflow + # ------------------------------------------------------------------ + + def test_fail_workflow_sets_error_status(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="something broke", type=RuntimeError) + self.handler.fail(invocation, error) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].status.status_code, StatusCode.ERROR) + self.assertEqual(spans[0].status.description, "something broke") + + def test_fail_workflow_sets_error_type_attribute(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="bad", type=ValueError) + self.handler.fail(invocation, error) + + spans = self._get_finished_spans() + self.assertEqual(spans[0].attributes["error.type"], "ValueError") + + def test_fail_workflow_sets_operation_name_attribute(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="fail", type=TypeError) + 
self.handler.fail(invocation, error) + + spans = self._get_finished_spans() + self.assertEqual( + spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_workflow", + ) + + def test_fail_workflow_noop_when_not_started(self) -> None: + invocation = WorkflowInvocation(name="wf") + error = Error(message="fail", type=RuntimeError) + result = self.handler.fail(invocation, error) + self.assertIs(result, invocation) + self.assertEqual(len(self._get_finished_spans()), 0) + + def test_fail_workflow_returns_invocation(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="err", type=RuntimeError) + result = self.handler.fail(invocation, error) + self.assertIs(result, invocation) + + +class TelemetryHandlerWorkflowContextManagerTest(_WorkflowTestBase): + # ------------------------------------------------------------------ + # workflow context manager + # ------------------------------------------------------------------ + + def test_workflow_context_manager_creates_and_ends_span(self) -> None: + invocation = WorkflowInvocation(name="ctx_wf") + with self.handler.workflow(invocation) as inv: + self.assertIsNotNone(inv.span) + self.assertIsNotNone(inv.context_token) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "invoke_workflow ctx_wf") + + def test_workflow_context_manager_default_invocation(self) -> None: + with self.handler.workflow() as inv: + self.assertIsInstance(inv, WorkflowInvocation) + self.assertEqual(inv.name, "") + self.assertEqual(inv.operation_name, "invoke_workflow") + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + + def test_workflow_context_manager_sets_attributes_on_span(self) -> None: + invocation = WorkflowInvocation(name="wf") + with self.handler.workflow(invocation) as inv: + inv.attributes["my.attr"] = "hello" + + spans = self._get_finished_spans() + self.assertEqual(spans[0].attributes["my.attr"], 
"hello") + + def test_workflow_context_manager_reraises_exception(self) -> None: + invocation = WorkflowInvocation(name="wf") + with pytest.raises(ValueError, match="test error"): + with self.handler.workflow(invocation): + raise ValueError("test error") + + def test_workflow_context_manager_marks_error_on_exception(self) -> None: + invocation = WorkflowInvocation(name="wf") + with pytest.raises(ValueError): + with self.handler.workflow(invocation): + raise ValueError("boom") + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].status.status_code, StatusCode.ERROR) + self.assertEqual(spans[0].status.description, "boom") + self.assertEqual(spans[0].attributes["error.type"], "ValueError") + + def test_workflow_context_manager_success_has_unset_status(self) -> None: + invocation = WorkflowInvocation(name="wf") + with self.handler.workflow(invocation): + pass + + spans = self._get_finished_spans() + self.assertEqual(spans[0].status.status_code, StatusCode.UNSET) + + def test_workflow_context_manager_with_messages(self) -> None: + inp = InputMessage(role="user", parts=[Text(content="hello")]) + out = OutputMessage( + role="assistant", parts=[Text(content="hi")], finish_reason="stop" + ) + invocation = WorkflowInvocation( + name="msg_wf", + input_messages=[inp], + output_messages=[out], + ) + with self.handler.workflow(invocation): + pass + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_workflow", + ) + + def test_workflow_context_manager_swallows_start_failure(self) -> None: + """workflow() should yield even if start_workflow raises.""" + invocation = WorkflowInvocation(name="wf") + with patch.object( + self.handler, + "start_workflow", + side_effect=RuntimeError("start failed"), + ): + # Should not raise — the exception is swallowed with a warning + with self.handler.workflow(invocation) as inv: + self.assertIs(inv, 
invocation) + + def test_workflow_context_manager_swallows_stop_failure(self) -> None: + """workflow() should not raise if stop_workflow fails.""" + invocation = WorkflowInvocation(name="wf") + with patch.object( + self.handler, + "stop_workflow", + side_effect=RuntimeError("stop failed"), + ): + # Should not raise + with self.handler.workflow(invocation): + pass + + def test_workflow_context_manager_swallows_fail_workflow_failure( + self, + ) -> None: + """workflow() should still re-raise the original exception even if + fail_workflow itself raises.""" + invocation = WorkflowInvocation(name="wf") + with patch.object( + self.handler, + "fail_workflow", + side_effect=RuntimeError("fail_workflow broke"), + ): + with pytest.raises(ValueError, match="original"): + with self.handler.workflow(invocation): + raise ValueError("original") diff --git a/util/opentelemetry-util-genai/tests/test_utils.py b/util/opentelemetry-util-genai/tests/test_utils.py index d229d4b4b6..ee0e63d852 100644 --- a/util/opentelemetry-util-genai/tests/test_utils.py +++ b/util/opentelemetry-util-genai/tests/test_utils.py @@ -25,9 +25,7 @@ ) from opentelemetry.sdk._logs import LoggerProvider from opentelemetry.sdk._logs.export import ( - InMemoryLogExporter as InMemoryLogRecordExporter, -) -from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, SimpleLogRecordProcessor, ) from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -51,7 +49,7 @@ from opentelemetry.util.genai.handler import get_telemetry_handler from opentelemetry.util.genai.types import ( ContentCapturingMode, - Error, + EmbeddingInvocation, InputMessage, LLMInvocation, MessagePart, @@ -224,7 +222,7 @@ def setUp(self): tracer_provider.add_span_processor( SimpleSpanProcessor(self.span_exporter) ) - self.log_exporter = InMemoryLogRecordExporter() + self.log_exporter = InMemoryLogExporter() logger_provider = LoggerProvider() logger_provider.add_log_record_processor( SimpleLogRecordProcessor(self.log_exporter) @@ 
-523,6 +521,82 @@ def test_parent_child_span_relationship(self): # Parent should not have a parent (root) assert parent_span.parent is None + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_ONLY", + emit_event="", + ) + def test_embedding_parent_child_span_relationship(self): + parent_invocation = EmbeddingInvocation( + request_model="embed-parent-model", + provider="test-provider", + input_tokens=10, + ) + child_invocation = EmbeddingInvocation( + request_model="embed-child-model", + provider="test-provider", + input_tokens=5, + ) + + self.telemetry_handler.start(parent_invocation) + assert parent_invocation.span is not None + self.telemetry_handler.start(child_invocation) + assert child_invocation.span is not None + self.telemetry_handler.stop(child_invocation) + self.telemetry_handler.stop(parent_invocation) + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 2 + child_span = next( + s for s in spans if s.name == "embeddings embed-child-model" + ) + parent_span = next( + s for s in spans if s.name == "embeddings embed-parent-model" + ) + + assert child_span.context.trace_id == parent_span.context.trace_id + assert child_span.parent is not None + assert child_span.parent.span_id == parent_span.context.span_id + assert parent_span.parent is None + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_ONLY", + emit_event="", + ) + def test_llm_parent_embedding_child_span_relationship(self): + message = _create_input_message("hi") + chat_generation = _create_output_message("ok") + child_invocation = EmbeddingInvocation( + request_model="embed-child-model", + provider="test-provider", + input_tokens=3, + ) + + with self.telemetry_handler.llm() as parent_invocation: + for attr, value in { + "request_model": "parent-model", + "input_messages": [message], + "provider": "test-provider", + }.items(): + setattr(parent_invocation, attr, value) + 
self.telemetry_handler.start(child_invocation) + assert child_invocation.span is not None + self.telemetry_handler.stop(child_invocation) + parent_invocation.output_messages = [chat_generation] + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 2 + child_span = next( + s for s in spans if s.name == "embeddings embed-child-model" + ) + parent_span = next(s for s in spans if s.name == "chat parent-model") + + assert child_span.context.trace_id == parent_span.context.trace_id + assert child_span.parent is not None + assert child_span.parent.span_id == parent_span.context.span_id + assert parent_span.parent is None + def test_llm_context_manager_error_path_records_error_status_and_attrs( self, ): @@ -571,318 +645,91 @@ class BoomError(RuntimeError): }, ) - @patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="EVENT_ONLY", - emit_event="true", - ) - def test_emits_llm_event(self): - invocation = LLMInvocation( - request_model="event-model", - input_messages=[_create_input_message("test query")], - system_instruction=_create_system_instruction(), - provider="test-provider", - temperature=0.7, - max_tokens=100, - response_model_name="response-model", - response_id="event-response-id", - input_tokens=10, - output_tokens=20, - ) - - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [_create_output_message("test response")] - self.telemetry_handler.stop_llm(invocation) - - # Check that event was emitted - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 1) - log_record = logs[0].log_record - - # Verify event name - self.assertEqual( - log_record.event_name, "gen_ai.client.inference.operation.details" - ) - - # Verify event attributes - attrs = log_record.attributes - self.assertIsNotNone(attrs) - self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "chat") - self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "event-model") - 
self.assertEqual(attrs[GenAI.GEN_AI_PROVIDER_NAME], "test-provider") - self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_TEMPERATURE], 0.7) - self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MAX_TOKENS], 100) - self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_MODEL], "response-model") - self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_ID], "event-response-id") - self.assertEqual(attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS], 10) - self.assertEqual(attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS], 20) - - # Verify messages are in structured format (not JSON string) - # OpenTelemetry may convert lists to tuples, so we normalize - input_msg = _normalize_to_dict( - _normalize_to_list(attrs[GenAI.GEN_AI_INPUT_MESSAGES])[0] - ) - self.assertEqual(input_msg["role"], "Human") - self.assertEqual( - _normalize_to_list(input_msg["parts"])[0]["content"], "test query" - ) - - output_msg = _normalize_to_dict( - _normalize_to_list(attrs[GenAI.GEN_AI_OUTPUT_MESSAGES])[0] - ) - self.assertEqual(output_msg["role"], "AI") - self.assertEqual( - _normalize_to_list(output_msg["parts"])[0]["content"], - "test response", - ) - self.assertEqual(output_msg["finish_reason"], "stop") - - # Verify system instruction is present in event in structured format - sys_instr = _normalize_to_dict( - _normalize_to_list(attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS])[0] - ) - self.assertEqual(sys_instr["content"], "You are a helpful assistant.") - self.assertEqual(sys_instr["type"], "text") - - # Verify event context matches span context - span = _get_single_span(self.span_exporter) - self.assertIsNotNone(log_record.trace_id) - self.assertIsNotNone(log_record.span_id) - self.assertIsNotNone(span.context) - self.assertEqual(log_record.trace_id, span.context.trace_id) - self.assertEqual(log_record.span_id, span.context.span_id) - - @patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="SPAN_AND_EVENT", - emit_event="true", - ) - def test_emits_llm_event_and_span(self): - message = _create_input_message("combined 
test") - chat_generation = _create_output_message("combined response") - system_instruction = _create_system_instruction("System prompt here") - - invocation = LLMInvocation( - request_model="combined-model", - input_messages=[message], - system_instruction=system_instruction, - provider="test-provider", - ) - - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [chat_generation] - self.telemetry_handler.stop_llm(invocation) - - # Check span was created - span = _get_single_span(self.span_exporter) - span_attrs = _get_span_attributes(span) - self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, span_attrs) - - # Check event was emitted - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 1) - log_record = logs[0].log_record - self.assertEqual( - log_record.event_name, "gen_ai.client.inference.operation.details" - ) - self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, log_record.attributes) - # Verify system instruction in both span and event - self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, span_attrs) - span_system = json.loads(span_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS]) - self.assertEqual(span_system[0]["content"], "System prompt here") - event_attrs = log_record.attributes - self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, event_attrs) - event_system = event_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] - event_system_list = ( - list(event_system) - if isinstance(event_system, tuple) - else event_system - ) - event_sys_instr = ( - dict(event_system_list[0]) - if isinstance(event_system_list[0], tuple) - else event_system_list[0] - ) - self.assertEqual(event_sys_instr["content"], "System prompt here") - # Verify event context matches span context - span = _get_single_span(self.span_exporter) - self.assertIsNotNone(log_record.trace_id) - self.assertIsNotNone(log_record.span_id) - self.assertIsNotNone(span.context) - self.assertEqual(log_record.trace_id, span.context.trace_id) - self.assertEqual(log_record.span_id, span.context.span_id) - - 
@patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="EVENT_ONLY", - emit_event="true", - ) - def test_emits_llm_event_with_error(self): - class TestError(RuntimeError): + def test_embedding_context_manager_error_path_records_error_status_and_attrs( + self, + ): + class BoomError(RuntimeError): pass - message = _create_input_message("error test") - invocation = LLMInvocation( - request_model="error-model", - input_messages=[message], + invocation = EmbeddingInvocation( + request_model="embed-model", provider="test-provider", + dimension_count=1536, + input_tokens=7, + server_address="embed.example.com", + server_port=443, + attributes={"custom_embed_attr": "value"}, ) - self.telemetry_handler.start_llm(invocation) - error = Error(message="Test error occurred", type=TestError) - self.telemetry_handler.fail_llm(invocation, error) - - # Check event was emitted - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 1) - log_record = logs[0].log_record - attrs = log_record.attributes + with self.assertRaises(BoomError): + with self.telemetry_handler.embedding(invocation): + invocation.response_model_name = "embed-response-model" + raise BoomError("embedding boom") - # Verify error attribute is present - self.assertEqual( - attrs[error_attributes.ERROR_TYPE], TestError.__qualname__ - ) - self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "chat") - self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "error-model") - # Verify event context matches span context span = _get_single_span(self.span_exporter) - self.assertIsNotNone(log_record.trace_id) - self.assertIsNotNone(log_record.span_id) - self.assertIsNotNone(span.context) - self.assertEqual(log_record.trace_id, span.context.trace_id) - self.assertEqual(log_record.span_id, span.context.span_id) - - @patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="EVENT_ONLY", - emit_event="false", - ) - def 
test_does_not_emit_llm_event_when_emit_event_false(self): - message = _create_input_message("emit false test") - chat_generation = _create_output_message("emit false response") - - invocation = LLMInvocation( - request_model="emit-false-model", - input_messages=[message], - provider="test-provider", - ) - - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [chat_generation] - self.telemetry_handler.stop_llm(invocation) - - # Check no event was emitted - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 0) - - @patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="NO_CONTENT", - emit_event="", - ) - def test_does_not_emit_llm_event_by_default_for_no_content(self): - """Test that event is not emitted by default when content_capturing is NO_CONTENT and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - invocation = LLMInvocation( - request_model="default-model", - input_messages=[_create_input_message("default test")], - provider="test-provider", + assert span.status.status_code == StatusCode.ERROR + _assert_span_time_order(span) + span_attrs = _get_span_attributes(span) + _assert_span_attributes( + span_attrs, + { + GenAI.GEN_AI_OPERATION_NAME: "embeddings", + GenAI.GEN_AI_REQUEST_MODEL: "embed-model", + GenAI.GEN_AI_PROVIDER_NAME: "test-provider", + GenAI.GEN_AI_EMBEDDINGS_DIMENSION_COUNT: 1536, + GenAI.GEN_AI_USAGE_INPUT_TOKENS: 7, + GenAI.GEN_AI_RESPONSE_MODEL: "embed-response-model", + server_attributes.SERVER_ADDRESS: "embed.example.com", + server_attributes.SERVER_PORT: 443, + "custom_embed_attr": "value", + error_attributes.ERROR_TYPE: BoomError.__qualname__, + }, ) - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [ - _create_output_message("default response") - ] - self.telemetry_handler.stop_llm(invocation) - - # Check that no event was emitted (NO_CONTENT defaults to False) - logs = self.log_exporter.get_finished_logs() - 
self.assertEqual(len(logs), 0) - @patch_env_vars( stability_mode="gen_ai_latest_experimental", content_capturing="SPAN_ONLY", emit_event="", ) - def test_does_not_emit_llm_event_by_default_for_span_only(self): - """Test that event is not emitted by default when content_capturing is SPAN_ONLY and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - invocation = LLMInvocation( - request_model="default-model", - input_messages=[_create_input_message("default test")], - provider="test-provider", - ) - - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [ - _create_output_message("default response") - ] - self.telemetry_handler.stop_llm(invocation) - - # Check that no event was emitted (SPAN_ONLY defaults to False) - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 0) - - @patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="EVENT_ONLY", - emit_event="", - ) - def test_emits_llm_event_by_default_for_event_only(self): - """Test that event is emitted by default when content_capturing is EVENT_ONLY and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - invocation = LLMInvocation( - request_model="default-model", - input_messages=[_create_input_message("default test")], - provider="test-provider", - ) - - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [ - _create_output_message("default response") - ] - self.telemetry_handler.stop_llm(invocation) - - # Check that event was emitted (EVENT_ONLY defaults to True) - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 1) - log_record = logs[0].log_record - self.assertEqual( - log_record.event_name, "gen_ai.client.inference.operation.details" - ) - - @patch_env_vars( - stability_mode="gen_ai_latest_experimental", - content_capturing="SPAN_AND_EVENT", - emit_event="", - ) - def test_emits_llm_event_by_default_for_span_and_event(self): - """Test that event is emitted by default when 
content_capturing is SPAN_AND_EVENT and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" - message = _create_input_message("span and event test") - chat_generation = _create_output_message("span and event response") - system_instruction = _create_system_instruction("System prompt") - - invocation = LLMInvocation( - request_model="span-and-event-model", - input_messages=[message], - system_instruction=system_instruction, + def test_embedding_manual_start_and_stop_creates_span(self): + invocation = EmbeddingInvocation( + request_model="embed-model", provider="test-provider", + dimension_count=1536, + encoding_formats=["float"], + input_tokens=123, + server_address="custom.server.com", + server_port=42, + attributes={"custom_embed_attr": "value"}, ) - self.telemetry_handler.start_llm(invocation) - invocation.output_messages = [chat_generation] - self.telemetry_handler.stop_llm(invocation) + self.telemetry_handler.start(invocation) + assert invocation.span is not None + invocation.attributes.update({"extra_embed": "info"}) + invocation.metric_attributes = {"should not be on span": "value"} + self.telemetry_handler.stop(invocation) - # Check span was created span = _get_single_span(self.span_exporter) - span_attrs = _get_span_attributes(span) - self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, span_attrs) + self.assertEqual(span.name, "embeddings embed-model") + self.assertEqual(span.kind, trace.SpanKind.CLIENT) + _assert_span_time_order(span) - # Check that event was emitted (SPAN_AND_EVENT defaults to True) - logs = self.log_exporter.get_finished_logs() - self.assertEqual(len(logs), 1) - log_record = logs[0].log_record - self.assertEqual( - log_record.event_name, "gen_ai.client.inference.operation.details" + attrs = _get_span_attributes(span) + _assert_span_attributes( + attrs, + { + GenAI.GEN_AI_OPERATION_NAME: "embeddings", + GenAI.GEN_AI_REQUEST_MODEL: "embed-model", + GenAI.GEN_AI_PROVIDER_NAME: "test-provider", + GenAI.GEN_AI_EMBEDDINGS_DIMENSION_COUNT: 1536, + 
GenAI.GEN_AI_REQUEST_ENCODING_FORMATS: ("float",), + GenAI.GEN_AI_USAGE_INPUT_TOKENS: 123, + server_attributes.SERVER_ADDRESS: "custom.server.com", + server_attributes.SERVER_PORT: 42, + "custom_embed_attr": "value", + "extra_embed": "info", + }, ) - self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, log_record.attributes) class AnyNonNone: diff --git a/util/opentelemetry-util-genai/tests/test_utils_events.py b/util/opentelemetry-util-genai/tests/test_utils_events.py new file mode 100644 index 0000000000..20b3300c62 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_utils_events.py @@ -0,0 +1,380 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import unittest + +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import error_attributes +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import Error, LLMInvocation + +from .test_utils import ( + _create_input_message, + _create_output_message, + _create_system_instruction, + _get_single_span, + _get_span_attributes, + _normalize_to_dict, + _normalize_to_list, + patch_env_vars, +) + + +class TestTelemetryHandlerEvents(unittest.TestCase): + def setUp(self): + self.span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + self.log_exporter = InMemoryLogExporter() + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(self.log_exporter) + ) + self.telemetry_handler = get_telemetry_handler( + tracer_provider=tracer_provider, logger_provider=logger_provider + ) + + def tearDown(self): + self.span_exporter.clear() + self.log_exporter.clear() + if hasattr(get_telemetry_handler, "_default_handler"): + delattr(get_telemetry_handler, "_default_handler") + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="true", + ) + def test_emits_llm_event(self): + invocation = LLMInvocation( + request_model="event-model", + input_messages=[_create_input_message("test query")], + system_instruction=_create_system_instruction(), + provider="test-provider", + 
temperature=0.7, + max_tokens=100, + response_model_name="response-model", + response_id="event-response-id", + input_tokens=10, + output_tokens=20, + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [_create_output_message("test response")] + self.telemetry_handler.stop_llm(invocation) + + # Check that event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + + # Verify event name + self.assertEqual( + log_record.event_name, "gen_ai.client.inference.operation.details" + ) + + # Verify event attributes + attrs = log_record.attributes + self.assertIsNotNone(attrs) + self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "chat") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "event-model") + self.assertEqual(attrs[GenAI.GEN_AI_PROVIDER_NAME], "test-provider") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_TEMPERATURE], 0.7) + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MAX_TOKENS], 100) + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_MODEL], "response-model") + self.assertEqual(attrs[GenAI.GEN_AI_RESPONSE_ID], "event-response-id") + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS], 10) + self.assertEqual(attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS], 20) + + # Verify messages are in structured format (not JSON string) + # OpenTelemetry may convert lists to tuples, so we normalize + input_msg = _normalize_to_dict( + _normalize_to_list(attrs[GenAI.GEN_AI_INPUT_MESSAGES])[0] + ) + self.assertEqual(input_msg["role"], "Human") + self.assertEqual( + _normalize_to_list(input_msg["parts"])[0]["content"], "test query" + ) + + output_msg = _normalize_to_dict( + _normalize_to_list(attrs[GenAI.GEN_AI_OUTPUT_MESSAGES])[0] + ) + self.assertEqual(output_msg["role"], "AI") + self.assertEqual( + _normalize_to_list(output_msg["parts"])[0]["content"], + "test response", + ) + self.assertEqual(output_msg["finish_reason"], "stop") + + # Verify system instruction is present in 
event in structured format + sys_instr = _normalize_to_dict( + _normalize_to_list(attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS])[0] + ) + self.assertEqual(sys_instr["content"], "You are a helpful assistant.") + self.assertEqual(sys_instr["type"], "text") + + # Verify event context matches span context + span = _get_single_span(self.span_exporter) + self.assertIsNotNone(log_record.trace_id) + self.assertIsNotNone(log_record.span_id) + self.assertIsNotNone(span.context) + self.assertEqual(log_record.trace_id, span.context.trace_id) + self.assertEqual(log_record.span_id, span.context.span_id) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_AND_EVENT", + emit_event="true", + ) + def test_emits_llm_event_and_span(self): + message = _create_input_message("combined test") + chat_generation = _create_output_message("combined response") + system_instruction = _create_system_instruction("System prompt here") + + invocation = LLMInvocation( + request_model="combined-model", + input_messages=[message], + system_instruction=system_instruction, + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [chat_generation] + self.telemetry_handler.stop_llm(invocation) + + # Check span was created + span = _get_single_span(self.span_exporter) + span_attrs = _get_span_attributes(span) + self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, span_attrs) + + # Check event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + self.assertEqual( + log_record.event_name, "gen_ai.client.inference.operation.details" + ) + self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, log_record.attributes) + # Verify system instruction in both span and event + self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, span_attrs) + span_system = json.loads(span_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS]) + self.assertEqual(span_system[0]["content"], "System prompt here") + 
event_attrs = log_record.attributes + self.assertIn(GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, event_attrs) + event_system = event_attrs[GenAI.GEN_AI_SYSTEM_INSTRUCTIONS] + event_system_list = ( + list(event_system) + if isinstance(event_system, tuple) + else event_system + ) + event_sys_instr = ( + dict(event_system_list[0]) + if isinstance(event_system_list[0], tuple) + else event_system_list[0] + ) + self.assertEqual(event_sys_instr["content"], "System prompt here") + # Verify event context matches span context + span = _get_single_span(self.span_exporter) + self.assertIsNotNone(log_record.trace_id) + self.assertIsNotNone(log_record.span_id) + self.assertIsNotNone(span.context) + self.assertEqual(log_record.trace_id, span.context.trace_id) + self.assertEqual(log_record.span_id, span.context.span_id) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="true", + ) + def test_emits_llm_event_with_error(self): + class TestError(RuntimeError): + pass + + message = _create_input_message("error test") + invocation = LLMInvocation( + request_model="error-model", + input_messages=[message], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + error = Error(message="Test error occurred", type=TestError) + self.telemetry_handler.fail_llm(invocation, error) + + # Check event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + attrs = log_record.attributes + + # Verify error attribute is present + self.assertEqual( + attrs[error_attributes.ERROR_TYPE], TestError.__qualname__ + ) + self.assertEqual(attrs[GenAI.GEN_AI_OPERATION_NAME], "chat") + self.assertEqual(attrs[GenAI.GEN_AI_REQUEST_MODEL], "error-model") + # Verify event context matches span context + span = _get_single_span(self.span_exporter) + self.assertIsNotNone(log_record.trace_id) + self.assertIsNotNone(log_record.span_id) + 
self.assertIsNotNone(span.context) + self.assertEqual(log_record.trace_id, span.context.trace_id) + self.assertEqual(log_record.span_id, span.context.span_id) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="false", + ) + def test_does_not_emit_llm_event_when_emit_event_false(self): + message = _create_input_message("emit false test") + chat_generation = _create_output_message("emit false response") + + invocation = LLMInvocation( + request_model="emit-false-model", + input_messages=[message], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [chat_generation] + self.telemetry_handler.stop_llm(invocation) + + # Check no event was emitted + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 0) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="NO_CONTENT", + emit_event="", + ) + def test_does_not_emit_llm_event_by_default_for_no_content(self): + """Test that event is not emitted by default when content_capturing is NO_CONTENT and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" + invocation = LLMInvocation( + request_model="default-model", + input_messages=[_create_input_message("default test")], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [ + _create_output_message("default response") + ] + self.telemetry_handler.stop_llm(invocation) + + # Check that no event was emitted (NO_CONTENT defaults to False) + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 0) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_ONLY", + emit_event="", + ) + def test_does_not_emit_llm_event_by_default_for_span_only(self): + """Test that event is not emitted by default when content_capturing is SPAN_ONLY and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" + 
invocation = LLMInvocation( + request_model="default-model", + input_messages=[_create_input_message("default test")], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [ + _create_output_message("default response") + ] + self.telemetry_handler.stop_llm(invocation) + + # Check that no event was emitted (SPAN_ONLY defaults to False) + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 0) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="EVENT_ONLY", + emit_event="", + ) + def test_emits_llm_event_by_default_for_event_only(self): + """Test that event is emitted by default when content_capturing is EVENT_ONLY and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" + invocation = LLMInvocation( + request_model="default-model", + input_messages=[_create_input_message("default test")], + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [ + _create_output_message("default response") + ] + self.telemetry_handler.stop_llm(invocation) + + # Check that event was emitted (EVENT_ONLY defaults to True) + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + self.assertEqual( + log_record.event_name, "gen_ai.client.inference.operation.details" + ) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_AND_EVENT", + emit_event="", + ) + def test_emits_llm_event_by_default_for_span_and_event(self): + """Test that event is emitted by default when content_capturing is SPAN_AND_EVENT and OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT is not set.""" + message = _create_input_message("span and event test") + chat_generation = _create_output_message("span and event response") + system_instruction = _create_system_instruction("System prompt") + + invocation = LLMInvocation( + request_model="span-and-event-model", + 
input_messages=[message], + system_instruction=system_instruction, + provider="test-provider", + ) + + self.telemetry_handler.start_llm(invocation) + invocation.output_messages = [chat_generation] + self.telemetry_handler.stop_llm(invocation) + + # Check span was created + span = _get_single_span(self.span_exporter) + span_attrs = _get_span_attributes(span) + self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, span_attrs) + + # Check that event was emitted (SPAN_AND_EVENT defaults to True) + logs = self.log_exporter.get_finished_logs() + self.assertEqual(len(logs), 1) + log_record = logs[0].log_record + self.assertEqual( + log_record.event_name, "gen_ai.client.inference.operation.details" + ) + self.assertIn(GenAI.GEN_AI_INPUT_MESSAGES, log_record.attributes)