From 406cb9a31997f100a3d1da899db98282a2ec5214 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Thu, 26 Mar 2026 13:47:56 -0700 Subject: [PATCH 1/8] Add support for workflow invocation in genAI utils handler --- .../src/opentelemetry/util/genai/handler.py | 76 ++++- .../opentelemetry/util/genai/span_utils.py | 75 +++++ .../tests/test_handler_workflow.py | 299 ++++++++++++++++++ 3 files changed, 449 insertions(+), 1 deletion(-) create mode 100644 util/opentelemetry-util-genai/tests/test_handler_workflow.py diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 80b801e9a1..12cca8f5a3 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -60,6 +60,7 @@ from __future__ import annotations +import logging import timeit from contextlib import contextmanager from typing import Iterator, TypeVar @@ -83,8 +84,10 @@ _apply_embedding_finish_attributes, _apply_error_attributes, _apply_llm_finish_attributes, + _apply_workflow_finish_attributes, _get_embedding_span_name, _get_llm_span_name, + _get_workflow_span_name, _maybe_emit_llm_event, ) from opentelemetry.util.genai.types import ( @@ -92,9 +95,27 @@ Error, GenAIInvocation, LLMInvocation, + WorkflowInvocation, ) from opentelemetry.util.genai.version import __version__ +_logger = logging.getLogger(__name__) + + +def _safe_detach(invocation: GenAIInvocation) -> None: + """Detach the context token if still present, as a safety net.""" + if invocation.context_token is not None: + try: + otel_context.detach(invocation.context_token) + except Exception: # pylint: disable=broad-except + pass + if invocation.span is not None: + try: + invocation.span.end() + except Exception: # pylint: disable=broad-except + pass + + _T = TypeVar("_T", bound=GenAIInvocation) @@ -160,13 +181,19 @@ def _start(self, invocation: _T) -> _T: """Start a GenAI invocation and create a pending span entry.""" if isinstance(invocation, LLMInvocation): span_name = _get_llm_span_name(invocation) + kind = SpanKind.CLIENT elif isinstance(invocation, EmbeddingInvocation): span_name = _get_embedding_span_name(invocation) + kind = SpanKind.CLIENT + elif isinstance(invocation, WorkflowInvocation): + span_name = _get_workflow_span_name(invocation) + kind = SpanKind.CLIENT else: span_name = "" + kind = "" span = self._tracer.start_span( name=span_name, - kind=SpanKind.CLIENT, + kind=kind, ) # Record a monotonic start timestamp (seconds) for duration # calculation using timeit.default_timer. @@ -192,6 +219,9 @@ def _stop(self, invocation: _T) -> _T: elif isinstance(invocation, EmbeddingInvocation): _apply_embedding_finish_attributes(span, invocation) self._record_embedding_metrics(invocation, span) + elif isinstance(invocation, WorkflowInvocation): + _apply_workflow_finish_attributes(span, invocation) + # TODO: Add workflow metrics when supported finally: # Detach context and end span even if finishing fails otel_context.detach(invocation.context_token) @@ -222,6 +252,10 @@ def _fail(self, invocation: _T, error: Error) -> _T: self._record_embedding_metrics( invocation, span, error_type=error_type ) + elif isinstance(invocation, WorkflowInvocation): + _apply_workflow_finish_attributes(span, invocation) + _apply_error_attributes(span, error, error_type) + # TODO: Add workflow metrics when supported finally: # Detach context and end span even if finishing fails otel_context.detach(invocation.context_token) @@ -304,6 +338,46 @@ def embedding( raise self.stop(invocation) + @contextmanager + def workflow( + self, invocation: WorkflowInvocation | None = None + ) -> Iterator[WorkflowInvocation]: + """Context manager for Workflow invocations. + + Only set data attributes on the invocation object, do not modify the span or context. + + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. + If an exception occurs inside the context, marks the span as error, ends it, and + re-raises the original exception. + """ + if invocation is None: + invocation = WorkflowInvocation() + + try: + self.start(invocation) + except Exception: # pylint: disable=broad-except + _logger.warning( + "Failed to start workflow telemetry", exc_info=True + ) + + try: + yield invocation + except Exception as exc: + try: + self.fail(invocation, Error(message=str(exc), type=type(exc))) + except Exception: # pylint: disable=broad-except + _logger.warning( + "Failed to record workflow failure", exc_info=True + ) + _safe_detach(invocation) + raise + + try: + self.stop(invocation) + except Exception: # pylint: disable=broad-except + _logger.warning("Failed to stop workflow telemetry", exc_info=True) + _safe_detach(invocation) + def get_telemetry_handler( tracer_provider: TracerProvider | None = None, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index ac099e3dae..c61f49d425 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -39,6 +39,7 @@ LLMInvocation, MessagePart, OutputMessage, + WorkflowInvocation, ) from opentelemetry.util.genai.utils import ( ContentCapturingMode, @@ -108,6 +109,13 @@ def _get_embedding_span_name(invocation: EmbeddingInvocation) -> str: return _get_span_name(invocation) +def _get_workflow_span_name(invocation: WorkflowInvocation) -> str: + """Get the span name for an Workflow invocation.""" + operation_name = getattr(invocation, "operation_name", None) or "" + name = getattr(invocation, "name", None) or "" + return f"{operation_name} {name}".strip() + + def _get_llm_messages_attributes_for_span( input_messages: list[InputMessage], output_messages: list[OutputMessage], @@ -345,6 +353,71 @@ def _get_llm_response_attributes( return {key: value for key, value in optional_attrs if value is not None} +def _apply_workflow_finish_attributes( + span: Span, invocation: WorkflowInvocation +) -> None: + """Apply attributes/messages common to finish() paths.""" + + # Build all attributes by reusing the attribute getter functions + attributes: dict[str, Any] = {} + attributes.update(_get_workflow_common_attributes(invocation)) + attributes.update( + _get_workflow_messages_attributes_for_span( + invocation.input_messages, + invocation.output_messages, + ) + ) + attributes.update(invocation.attributes) + + # Set all attributes on the span + if attributes: + span.set_attributes(attributes) + + +def _get_workflow_common_attributes( + invocation: WorkflowInvocation, +) -> dict[str, Any]: + """Get common Workflow attributes shared by finish() and error() paths. + + Returns a dictionary of attributes. + """ + return { + GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name, + } + + +def _get_workflow_messages_attributes_for_span( + input_messages: list[InputMessage], + output_messages: list[OutputMessage], +) -> dict[str, Any]: + """Get message attributes formatted for span (JSON string format). + + Returns empty dict if not in experimental mode or content capturing is disabled. + """ + if not is_experimental_mode() or get_content_capturing_mode() not in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + return {} + + optional_attrs = ( + ( + GenAI.GEN_AI_INPUT_MESSAGES, + gen_ai_json_dumps([asdict(m) for m in input_messages]) + if input_messages + else None, + ), + ( + GenAI.GEN_AI_OUTPUT_MESSAGES, + gen_ai_json_dumps([asdict(m) for m in output_messages]) + if output_messages + else None, + ), + ) + + return {key: value for key, value in optional_attrs if value is not None} + + def _get_embedding_response_attributes( invocation: EmbeddingInvocation, ) -> dict[str, Any]: @@ -365,6 +438,8 @@ def _get_embedding_response_attributes( "_get_llm_response_attributes", "_get_llm_span_name", "_maybe_emit_llm_event", + "_get_workflow_common_attributes", + "_apply_workflow_finish_attributes", "_apply_embedding_finish_attributes", "_get_embedding_common_attributes", "_get_embedding_request_attributes", diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py new file mode 100644 index 0000000000..752c2eb85a --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -0,0 +1,299 @@ +from __future__ import annotations + +from unittest import TestCase +from unittest.mock import patch + +import pytest + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.trace import SpanKind +from opentelemetry.trace.status import StatusCode +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + Error, + InputMessage, + OutputMessage, + Text, + WorkflowInvocation, +) + + +class _WorkflowTestBase(TestCase): + """Shared setUp for workflow handler tests.""" + + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + self.tracer_provider = TracerProvider() + self.tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + self.handler = TelemetryHandler( + tracer_provider=self.tracer_provider, + ) + + def _get_finished_spans(self): + return self.span_exporter.get_finished_spans() + + +class TelemetryHandlerWorkflowTest(_WorkflowTestBase): + # ------------------------------------------------------------------ + # start_workflow + # ------------------------------------------------------------------ + + def test_start_workflow_creates_span(self) -> None: + invocation = WorkflowInvocation(name="my_workflow") + self.handler.start(invocation) + + self.assertIsNotNone(invocation.span) + self.assertIsNotNone(invocation.context_token) + self.assertIsNotNone(invocation.monotonic_start_s) + + # Clean up + self.handler.stop(invocation) + + def test_start_workflow_span_name(self) -> None: + invocation = WorkflowInvocation( + name="my_pipeline", operation_name="run_pipeline" + ) + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "run_pipeline my_pipeline") + + def test_start_workflow_span_kind_is_internal(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].kind, SpanKind.INTERNAL) + + def test_start_workflow_records_monotonic_start(self) -> None: + invocation = WorkflowInvocation(name="wf") + with patch("timeit.default_timer", return_value=500.0): + self.handler.start(invocation) + self.assertEqual(invocation.monotonic_start_s, 500.0) + self.handler.stop(invocation) + + # ------------------------------------------------------------------ + # stop_workflow + # ------------------------------------------------------------------ + + def test_stop_workflow_ends_span(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + + def test_stop_workflow_sets_operation_name_attribute(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual( + spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_workflow", + ) + + def test_stop_workflow_sets_custom_attributes(self) -> None: + invocation = WorkflowInvocation(name="wf") + invocation.attributes["custom.key"] = "custom_value" + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(spans[0].attributes["custom.key"], "custom_value") + + def test_stop_workflow_noop_when_not_started(self) -> None: + invocation = WorkflowInvocation(name="wf") + # Not started — span and context_token are None + result = self.handler.stop(invocation) + self.assertIs(result, invocation) + self.assertEqual(len(self._get_finished_spans()), 0) + + def test_stop_workflow_returns_invocation(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + result = self.handler.stop(invocation) + self.assertIs(result, invocation) + + # ------------------------------------------------------------------ + # fail_workflow + # ------------------------------------------------------------------ + + def test_fail_workflow_sets_error_status(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="something broke", type=RuntimeError) + self.handler.fail(invocation, error) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].status.status_code, StatusCode.ERROR) + self.assertEqual(spans[0].status.description, "something broke") + + def test_fail_workflow_sets_error_type_attribute(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="bad", type=ValueError) + self.handler.fail(invocation, error) + + spans = self._get_finished_spans() + self.assertEqual(spans[0].attributes["error.type"], "ValueError") + + def test_fail_workflow_sets_operation_name_attribute(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="fail", type=TypeError) + self.handler.fail(invocation, error) + + spans = self._get_finished_spans() + self.assertEqual( + spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_workflow", + ) + + def test_fail_workflow_noop_when_not_started(self) -> None: + invocation = WorkflowInvocation(name="wf") + error = Error(message="fail", type=RuntimeError) + result = self.handler.fail(invocation, error) + self.assertIs(result, invocation) + self.assertEqual(len(self._get_finished_spans()), 0) + + def test_fail_workflow_returns_invocation(self) -> None: + invocation = WorkflowInvocation(name="wf") + self.handler.start(invocation) + error = Error(message="err", type=RuntimeError) + result = self.handler.fail(invocation, error) + self.assertIs(result, invocation) + + +class TelemetryHandlerWorkflowContextManagerTest(_WorkflowTestBase): + # ------------------------------------------------------------------ + # workflow context manager + # ------------------------------------------------------------------ + + def test_workflow_context_manager_creates_and_ends_span(self) -> None: + invocation = WorkflowInvocation(name="ctx_wf") + with self.handler.workflow(invocation) as inv: + self.assertIsNotNone(inv.span) + self.assertIsNotNone(inv.context_token) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "invoke_workflow ctx_wf") + + def test_workflow_context_manager_default_invocation(self) -> None: + with self.handler.workflow() as inv: + self.assertIsInstance(inv, WorkflowInvocation) + self.assertEqual(inv.name, "") + self.assertEqual(inv.operation_name, "invoke_workflow") + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + + def test_workflow_context_manager_sets_attributes_on_span(self) -> None: + invocation = WorkflowInvocation(name="wf") + with self.handler.workflow(invocation) as inv: + inv.attributes["my.attr"] = "hello" + + spans = self._get_finished_spans() + self.assertEqual(spans[0].attributes["my.attr"], "hello") + + def test_workflow_context_manager_reraises_exception(self) -> None: + invocation = WorkflowInvocation(name="wf") + with pytest.raises(ValueError, match="test error"): + with self.handler.workflow(invocation): + raise ValueError("test error") + + def test_workflow_context_manager_marks_error_on_exception(self) -> None: + invocation = WorkflowInvocation(name="wf") + with pytest.raises(ValueError): + with self.handler.workflow(invocation): + raise ValueError("boom") + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].status.status_code, StatusCode.ERROR) + self.assertEqual(spans[0].status.description, "boom") + self.assertEqual(spans[0].attributes["error.type"], "ValueError") + + def test_workflow_context_manager_success_has_unset_status(self) -> None: + invocation = WorkflowInvocation(name="wf") + with self.handler.workflow(invocation): + pass + + spans = self._get_finished_spans() + self.assertEqual(spans[0].status.status_code, StatusCode.UNSET) + + def test_workflow_context_manager_with_messages(self) -> None: + inp = InputMessage(role="user", parts=[Text(content="hello")]) + out = OutputMessage( + role="assistant", parts=[Text(content="hi")], finish_reason="stop" + ) + invocation = WorkflowInvocation( + name="msg_wf", + input_messages=[inp], + output_messages=[out], + ) + with self.handler.workflow(invocation): + pass + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes[GenAI.GEN_AI_OPERATION_NAME], + "invoke_workflow", + ) + + def test_workflow_context_manager_swallows_start_failure(self) -> None: + """workflow() should yield even if start_workflow raises.""" + invocation = WorkflowInvocation(name="wf") + with patch.object( + self.handler, + "start", + side_effect=RuntimeError("start failed"), + ): + # Should not raise — the exception is swallowed with a warning + with self.handler.workflow(invocation) as inv: + self.assertIs(inv, invocation) + + def test_workflow_context_manager_swallows_stop_failure(self) -> None: + """workflow() should not raise if stop_workflow fails.""" + invocation = WorkflowInvocation(name="wf") + with patch.object( + self.handler, + "stop", + side_effect=RuntimeError("stop failed"), + ): + # Should not raise + with self.handler.workflow(invocation): + pass + + def test_workflow_context_manager_swallows_fail_workflow_failure( + self, + ) -> None: + """workflow() should still re-raise the original exception even if + fail_workflow itself raises.""" + invocation = WorkflowInvocation(name="wf") + with patch.object( + self.handler, + "fail", + side_effect=RuntimeError("fail broke"), + ): + with pytest.raises(ValueError, match="original"): + with self.handler.workflow(invocation): + raise ValueError("original") From bcc077534adca00c672f75a70fbc5be8fa89ef43 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Thu, 26 Mar 2026 13:53:21 -0700 Subject: [PATCH 2/8] fix changelog --- util/opentelemetry-util-genai/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index d779ac8efe..dfecb433f5 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add support for workflow in genAI utils handler. + ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4366](#4366)) - Add EmbeddingInvocation span lifecycle support ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4219](#4219)) - Populate schema_url on metrics From bbbd405d22c5bbc93299d8d3d52b1d1c0a924f76 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Thu, 26 Mar 2026 13:58:54 -0700 Subject: [PATCH 3/8] fixed errors --- .../src/opentelemetry/util/genai/handler.py | 4 ++-- .../src/opentelemetry/util/genai/span_utils.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 12cca8f5a3..858e8a8237 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -187,10 +187,10 @@ def _start(self, invocation: _T) -> _T: kind = SpanKind.CLIENT elif isinstance(invocation, WorkflowInvocation): span_name = _get_workflow_span_name(invocation) - kind = SpanKind.CLIENT + kind = SpanKind.INTERNAL else: span_name = "" - kind = "" + kind = SpanKind.CLIENT span = self._tracer.start_span( name=span_name, kind=kind, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index c61f49d425..7377b55187 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -445,4 +445,5 @@ def _get_embedding_response_attributes( "_get_embedding_request_attributes", "_get_embedding_response_attributes", "_get_embedding_span_name", + "_get_workflow_span_name", ] From 30e252eb1e8c21f0838a2b83a6569807ae1b47a8 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Mon, 30 Mar 2026 17:36:07 -0700 Subject: [PATCH 4/8] addressed comments --- .../src/opentelemetry/util/genai/span_utils.py | 6 +++--- .../tests/test_handler_workflow.py | 13 +++++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 7377b55187..49f4ef803f 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -111,9 +111,9 @@ def _get_embedding_span_name(invocation: EmbeddingInvocation) -> str: def _get_workflow_span_name(invocation: WorkflowInvocation) -> str: """Get the span name for an Workflow invocation.""" - operation_name = getattr(invocation, "operation_name", None) or "" - name = getattr(invocation, "name", None) or "" - return f"{operation_name} {name}".strip() + operation_name = invocation.operation_name + name = invocation.name + return f"{operation_name} {name}" if name else operation_name def _get_llm_messages_attributes_for_span( diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py index 752c2eb85a..9603ed9dba 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_workflow.py +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -60,14 +60,23 @@ def test_start_workflow_creates_span(self) -> None: def test_start_workflow_span_name(self) -> None: invocation = WorkflowInvocation( - name="my_pipeline", operation_name="run_pipeline" + name="my_pipeline" ) self.handler.start(invocation) self.handler.stop(invocation) spans = self._get_finished_spans() self.assertEqual(len(spans), 1) - self.assertEqual(spans[0].name, "run_pipeline my_pipeline") + self.assertEqual(spans[0].name, "invoke_workflow my_pipeline") + + def test_start_workflow_span_name_without_name(self) -> None: + invocation = WorkflowInvocation() + self.handler.start(invocation) + self.handler.stop(invocation) + + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual(spans[0].name, "invoke_workflow") def test_start_workflow_span_kind_is_internal(self) -> None: invocation = WorkflowInvocation(name="wf") From 733b5ef2f5677aed50d5657344c5e83d6d7867cd Mon Sep 17 00:00:00 2001 From: Wrisa Date: Mon, 30 Mar 2026 17:41:46 -0700 Subject: [PATCH 5/8] fixed precommit --- util/opentelemetry-util-genai/tests/test_handler_workflow.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py index 9603ed9dba..cb538ea507 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_workflow.py +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -59,9 +59,7 @@ def test_start_workflow_creates_span(self) -> None: self.handler.stop(invocation) def test_start_workflow_span_name(self) -> None: - invocation = WorkflowInvocation( - name="my_pipeline" - ) + invocation = WorkflowInvocation(name="my_pipeline") self.handler.start(invocation) self.handler.stop(invocation) From a7cdf87d397871883264b4f6918af0ecab60a1a3 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Mon, 30 Mar 2026 18:09:45 -0700 Subject: [PATCH 6/8] made operation name immutable and added test --- .../src/opentelemetry/util/genai/types.py | 3 +++ util/opentelemetry-util-genai/tests/test_handler_workflow.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 538d61490b..6d59f03bf5 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -285,6 +285,9 @@ class WorkflowInvocation(GenAIInvocation): default_factory=_new_output_messages ) + def __post_init__(self) -> None: + self.operation_name = "invoke_workflow" + @dataclass class LLMInvocation(GenAIInvocation): diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py index cb538ea507..d3af4e9ec7 100644 --- a/util/opentelemetry-util-genai/tests/test_handler_workflow.py +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -47,6 +47,10 @@ class TelemetryHandlerWorkflowTest(_WorkflowTestBase): # start_workflow # ------------------------------------------------------------------ + def test_operation_name_is_immutable(self) -> None: + invocation = WorkflowInvocation(name="wf", operation_name="custom_op") + self.assertEqual(invocation.operation_name, "invoke_workflow") + def test_start_workflow_creates_span(self) -> None: invocation = WorkflowInvocation(name="my_workflow") self.handler.start(invocation) From 90b5fc7bace8fe516cfdeb37b24ee611bcabc2e8 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Mon, 30 Mar 2026 18:14:59 -0700 Subject: [PATCH 7/8] removed obsolete test --- .../tests/test_workflow_invocation.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/util/opentelemetry-util-genai/tests/test_workflow_invocation.py b/util/opentelemetry-util-genai/tests/test_workflow_invocation.py index c6f9bcdaec..b9c0e756da 100644 --- a/util/opentelemetry-util-genai/tests/test_workflow_invocation.py +++ b/util/opentelemetry-util-genai/tests/test_workflow_invocation.py @@ -21,10 +21,6 @@ def test_custom_name(self): invocation = WorkflowInvocation(name="customer_support_pipeline") assert invocation.name == "customer_support_pipeline" - def test_custom_operation_name(self): - invocation = WorkflowInvocation(operation_name="run_pipeline") - assert invocation.operation_name == "run_pipeline" - def test_with_input_messages(self): msg = InputMessage(role="user", parts=[Text(content="hello")]) invocation = WorkflowInvocation(input_messages=[msg]) From 5aa34af413e06698c64a68f36e426e467cc0ff21 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Mon, 30 Mar 2026 18:23:59 -0700 Subject: [PATCH 8/8] removed workflow specific method --- .../opentelemetry/util/genai/span_utils.py | 38 ++----------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 49f4ef803f..0a82462c1b 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -116,7 +116,7 @@ def _get_workflow_span_name(invocation: WorkflowInvocation) -> str: return f"{operation_name} {name}" if name else operation_name -def _get_llm_messages_attributes_for_span( +def _get_messages_attributes_for_span( input_messages: list[InputMessage], output_messages: list[OutputMessage], system_instruction: list[MessagePart] | None = None, @@ -248,7 +248,7 @@ def _apply_llm_finish_attributes( attributes.update(_get_llm_request_attributes(invocation)) attributes.update(_get_llm_response_attributes(invocation)) attributes.update( - _get_llm_messages_attributes_for_span( + _get_messages_attributes_for_span( invocation.input_messages, invocation.output_messages, invocation.system_instruction, @@ -362,7 +362,7 @@ def _apply_workflow_finish_attributes( attributes: dict[str, Any] = {} attributes.update(_get_workflow_common_attributes(invocation)) attributes.update( - _get_workflow_messages_attributes_for_span( + _get_messages_attributes_for_span( invocation.input_messages, invocation.output_messages, ) @@ -386,38 +386,6 @@ def _get_workflow_common_attributes( } -def _get_workflow_messages_attributes_for_span( - input_messages: list[InputMessage], - output_messages: list[OutputMessage], -) -> dict[str, Any]: - """Get message attributes formatted for span (JSON string format). - - Returns empty dict if not in experimental mode or content capturing is disabled. - """ - if not is_experimental_mode() or get_content_capturing_mode() not in ( - ContentCapturingMode.SPAN_ONLY, - ContentCapturingMode.SPAN_AND_EVENT, - ): - return {} - - optional_attrs = ( - ( - GenAI.GEN_AI_INPUT_MESSAGES, - gen_ai_json_dumps([asdict(m) for m in input_messages]) - if input_messages - else None, - ), - ( - GenAI.GEN_AI_OUTPUT_MESSAGES, - gen_ai_json_dumps([asdict(m) for m in output_messages]) - if output_messages - else None, - ), - ) - - return {key: value for key, value in optional_attrs if value is not None} - - def _get_embedding_response_attributes( invocation: EmbeddingInvocation, ) -> dict[str, Any]: