Skip to content

Commit 828138a

Browse files
authored
Add support for workflow invocation in genAI utils handler (#4366)
* Add support for workflow invocation in genAI utils handler * fix changelog * fixed errors * addressed comments * fixed precommit * made operation name immutable and added test * removed obsolete test * removed workflow specific method
1 parent a428585 commit 828138a

File tree

6 files changed

+437
-7
lines changed

6 files changed

+437
-7
lines changed

util/opentelemetry-util-genai/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
11+
- Add support for workflow in genAI utils handler.
12+
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4366](#4366))
1013
- Enrich ToolCall type, breaking change: usage of ToolCall class renamed to ToolCallRequest
1114
([#4218](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4218))
1215
- Add EmbeddingInvocation span lifecycle support

util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060

6161
from __future__ import annotations
6262

63+
import logging
6364
import timeit
6465
from contextlib import contextmanager
6566
from typing import Iterator, TypeVar
@@ -83,18 +84,38 @@
8384
_apply_embedding_finish_attributes,
8485
_apply_error_attributes,
8586
_apply_llm_finish_attributes,
87+
_apply_workflow_finish_attributes,
8688
_get_embedding_span_name,
8789
_get_llm_span_name,
90+
_get_workflow_span_name,
8891
_maybe_emit_llm_event,
8992
)
9093
from opentelemetry.util.genai.types import (
9194
EmbeddingInvocation,
9295
Error,
9396
GenAIInvocation,
9497
LLMInvocation,
98+
WorkflowInvocation,
9599
)
96100
from opentelemetry.util.genai.version import __version__
97101

102+
_logger = logging.getLogger(__name__)
103+
104+
105+
def _safe_detach(invocation: GenAIInvocation) -> None:
106+
"""Detach the context token if still present, as a safety net."""
107+
if invocation.context_token is not None:
108+
try:
109+
otel_context.detach(invocation.context_token)
110+
except Exception: # pylint: disable=broad-except
111+
pass
112+
if invocation.span is not None:
113+
try:
114+
invocation.span.end()
115+
except Exception: # pylint: disable=broad-except
116+
pass
117+
118+
98119
_T = TypeVar("_T", bound=GenAIInvocation)
99120

100121

@@ -160,13 +181,19 @@ def _start(self, invocation: _T) -> _T:
160181
"""Start a GenAI invocation and create a pending span entry."""
161182
if isinstance(invocation, LLMInvocation):
162183
span_name = _get_llm_span_name(invocation)
184+
kind = SpanKind.CLIENT
163185
elif isinstance(invocation, EmbeddingInvocation):
164186
span_name = _get_embedding_span_name(invocation)
187+
kind = SpanKind.CLIENT
188+
elif isinstance(invocation, WorkflowInvocation):
189+
span_name = _get_workflow_span_name(invocation)
190+
kind = SpanKind.INTERNAL
165191
else:
166192
span_name = ""
193+
kind = SpanKind.CLIENT
167194
span = self._tracer.start_span(
168195
name=span_name,
169-
kind=SpanKind.CLIENT,
196+
kind=kind,
170197
)
171198
# Record a monotonic start timestamp (seconds) for duration
172199
# calculation using timeit.default_timer.
@@ -192,6 +219,9 @@ def _stop(self, invocation: _T) -> _T:
192219
elif isinstance(invocation, EmbeddingInvocation):
193220
_apply_embedding_finish_attributes(span, invocation)
194221
self._record_embedding_metrics(invocation, span)
222+
elif isinstance(invocation, WorkflowInvocation):
223+
_apply_workflow_finish_attributes(span, invocation)
224+
# TODO: Add workflow metrics when supported
195225
finally:
196226
# Detach context and end span even if finishing fails
197227
otel_context.detach(invocation.context_token)
@@ -222,6 +252,10 @@ def _fail(self, invocation: _T, error: Error) -> _T:
222252
self._record_embedding_metrics(
223253
invocation, span, error_type=error_type
224254
)
255+
elif isinstance(invocation, WorkflowInvocation):
256+
_apply_workflow_finish_attributes(span, invocation)
257+
_apply_error_attributes(span, error, error_type)
258+
# TODO: Add workflow metrics when supported
225259
finally:
226260
# Detach context and end span even if finishing fails
227261
otel_context.detach(invocation.context_token)
@@ -304,6 +338,46 @@ def embedding(
304338
raise
305339
self.stop(invocation)
306340

341+
@contextmanager
342+
def workflow(
343+
self, invocation: WorkflowInvocation | None = None
344+
) -> Iterator[WorkflowInvocation]:
345+
"""Context manager for Workflow invocations.
346+
347+
Only set data attributes on the invocation object, do not modify the span or context.
348+
349+
Starts the span on entry. On normal exit, finalizes the invocation and ends the span.
350+
If an exception occurs inside the context, marks the span as error, ends it, and
351+
re-raises the original exception.
352+
"""
353+
if invocation is None:
354+
invocation = WorkflowInvocation()
355+
356+
try:
357+
self.start(invocation)
358+
except Exception: # pylint: disable=broad-except
359+
_logger.warning(
360+
"Failed to start workflow telemetry", exc_info=True
361+
)
362+
363+
try:
364+
yield invocation
365+
except Exception as exc:
366+
try:
367+
self.fail(invocation, Error(message=str(exc), type=type(exc)))
368+
except Exception: # pylint: disable=broad-except
369+
_logger.warning(
370+
"Failed to record workflow failure", exc_info=True
371+
)
372+
_safe_detach(invocation)
373+
raise
374+
375+
try:
376+
self.stop(invocation)
377+
except Exception: # pylint: disable=broad-except
378+
_logger.warning("Failed to stop workflow telemetry", exc_info=True)
379+
_safe_detach(invocation)
380+
307381

308382
def get_telemetry_handler(
309383
tracer_provider: TracerProvider | None = None,

util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
LLMInvocation,
4040
MessagePart,
4141
OutputMessage,
42+
WorkflowInvocation,
4243
)
4344
from opentelemetry.util.genai.utils import (
4445
ContentCapturingMode,
@@ -108,7 +109,14 @@ def _get_embedding_span_name(invocation: EmbeddingInvocation) -> str:
108109
return _get_span_name(invocation)
109110

110111

111-
def _get_llm_messages_attributes_for_span(
112+
def _get_workflow_span_name(invocation: WorkflowInvocation) -> str:
113+
"""Get the span name for an Workflow invocation."""
114+
operation_name = invocation.operation_name
115+
name = invocation.name
116+
return f"{operation_name} {name}" if name else operation_name
117+
118+
119+
def _get_messages_attributes_for_span(
112120
input_messages: list[InputMessage],
113121
output_messages: list[OutputMessage],
114122
system_instruction: list[MessagePart] | None = None,
@@ -240,7 +248,7 @@ def _apply_llm_finish_attributes(
240248
attributes.update(_get_llm_request_attributes(invocation))
241249
attributes.update(_get_llm_response_attributes(invocation))
242250
attributes.update(
243-
_get_llm_messages_attributes_for_span(
251+
_get_messages_attributes_for_span(
244252
invocation.input_messages,
245253
invocation.output_messages,
246254
invocation.system_instruction,
@@ -345,6 +353,39 @@ def _get_llm_response_attributes(
345353
return {key: value for key, value in optional_attrs if value is not None}
346354

347355

356+
def _apply_workflow_finish_attributes(
357+
span: Span, invocation: WorkflowInvocation
358+
) -> None:
359+
"""Apply attributes/messages common to finish() paths."""
360+
361+
# Build all attributes by reusing the attribute getter functions
362+
attributes: dict[str, Any] = {}
363+
attributes.update(_get_workflow_common_attributes(invocation))
364+
attributes.update(
365+
_get_messages_attributes_for_span(
366+
invocation.input_messages,
367+
invocation.output_messages,
368+
)
369+
)
370+
attributes.update(invocation.attributes)
371+
372+
# Set all attributes on the span
373+
if attributes:
374+
span.set_attributes(attributes)
375+
376+
377+
def _get_workflow_common_attributes(
378+
invocation: WorkflowInvocation,
379+
) -> dict[str, Any]:
380+
"""Get common Workflow attributes shared by finish() and error() paths.
381+
382+
Returns a dictionary of attributes.
383+
"""
384+
return {
385+
GenAI.GEN_AI_OPERATION_NAME: invocation.operation_name,
386+
}
387+
388+
348389
def _get_embedding_response_attributes(
349390
invocation: EmbeddingInvocation,
350391
) -> dict[str, Any]:
@@ -365,9 +406,12 @@ def _get_embedding_response_attributes(
365406
"_get_llm_response_attributes",
366407
"_get_llm_span_name",
367408
"_maybe_emit_llm_event",
409+
"_get_workflow_common_attributes",
410+
"_apply_workflow_finish_attributes",
368411
"_apply_embedding_finish_attributes",
369412
"_get_embedding_common_attributes",
370413
"_get_embedding_request_attributes",
371414
"_get_embedding_response_attributes",
372415
"_get_embedding_span_name",
416+
"_get_workflow_span_name",
373417
]

util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,9 @@ class WorkflowInvocation(GenAIInvocation):
285285
default_factory=_new_output_messages
286286
)
287287

288+
def __post_init__(self) -> None:
289+
self.operation_name = "invoke_workflow"
290+
288291

289292
@dataclass
290293
class LLMInvocation(GenAIInvocation):

0 commit comments

Comments
 (0)