-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy path_invocation.py
More file actions
249 lines (219 loc) · 8.81 KB
/
_invocation.py
File metadata and controls
249 lines (219 loc) · 8.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import timeit
from abc import ABC, abstractmethod
from contextlib import contextmanager
from contextvars import Token
from dataclasses import asdict
from typing import TYPE_CHECKING, Any, Generator, Sequence
from typing_extensions import Self, TypeAlias
from opentelemetry._logs import Logger, LogRecord
from opentelemetry.context import Context, attach, detach
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.semconv.attributes import error_attributes
from opentelemetry.trace import INVALID_SPAN as _INVALID_SPAN
from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.genai.completion_hook import CompletionHook
from opentelemetry.util.genai.types import (
Error,
InputMessage,
MessagePart,
OutputMessage,
ToolDefinition,
)
from opentelemetry.util.genai.utils import (
ContentCapturingMode,
gen_ai_json_dumps,
get_content_capturing_mode,
is_experimental_mode,
)
if TYPE_CHECKING:
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
# Type of the context token stored on an invocation: it is obtained from
# ``attach()`` when the span is started and handed back to ``detach()`` on finish.
ContextToken: TypeAlias = Token[Context]
class GenAIInvocation(ABC):
    """
    Base class for all GenAI invocation types. Manages the lifecycle of a single
    GenAI operation (LLM call, embedding, tool execution, workflow, etc.).

    Use the factory methods on TelemetryHandler (start_inference, start_embedding,
    start_workflow, start_tool) rather than constructing invocations directly.

    Lifecycle: ``_start()`` creates the span and attaches it to the current
    context; ``stop()`` / ``fail()`` run the subclass' ``_apply_finish()``,
    detach the context and end the span. Finishing is idempotent: once an
    invocation has been finished, further ``stop()`` / ``fail()`` calls are
    no-ops until ``_start()`` is called again.
    """

    def __init__(
        self,
        # Individual components instead of TelemetryHandler to avoid a circular
        # import between handler.py and the invocation modules.
        tracer: Tracer,
        metrics_recorder: InvocationMetricsRecorder,
        logger: Logger,
        completion_hook: CompletionHook,
        operation_name: str,
        span_name: str,
        span_kind: SpanKind = SpanKind.CLIENT,
        attributes: dict[str, Any] | None = None,
        metric_attributes: dict[str, Any] | None = None,
    ) -> None:
        """Store the telemetry components and span settings; no span is started here.

        Args:
            tracer: Tracer used by ``_start()`` to create the span.
            metrics_recorder: Recorder kept for use by subclasses.
            logger: Logger kept for use by subclasses.
            completion_hook: Hook invoked by ``_call_completion_hook()``.
            operation_name: Name of the GenAI operation.
            span_name: Name given to the span created by ``_start()``.
            span_kind: Kind of the span created by ``_start()``.
            attributes: Initial span/event attributes. The dict is stored by
                reference (not copied) and is mutated on error.
            metric_attributes: Initial metric attributes. Stored by reference
                (not copied) and mutated on error.
        """
        self._tracer = tracer
        self._metrics_recorder = metrics_recorder
        self._logger = logger
        self._completion_hook = completion_hook
        self._operation_name: str = operation_name
        self.attributes: dict[str, Any] = (
            {} if attributes is None else attributes
        )
        """Additional attributes to set on spans and/or events. Not set on metrics."""
        self.metric_attributes: dict[str, Any] = (
            {} if metric_attributes is None else metric_attributes
        )
        """Additional attributes to set on metrics. Must be low cardinality. Not set on spans or events."""
        # Placeholder until _start() replaces it with the real span.
        self.span: Span = _INVALID_SPAN
        # Declared only; assigned in _start().
        self._span_context: Context
        self._span_name: str = span_name
        self._span_kind: SpanKind = span_kind
        # None means "not started" or "already finished"; see _finish().
        self._context_token: ContextToken | None = None
        self._monotonic_start_s: float | None = None

    @property
    def monotonic_start_s(self) -> float | None:
        """Monotonic timestamp (seconds) when this invocation started.

        ``None`` until ``_start()`` has been called.
        """
        return self._monotonic_start_s

    def _start(self, attributes: dict[str, Any] | None = None) -> None:
        """Start the invocation span and attach it to the current context.

        Args:
            attributes: Initial span attributes available for sampling decisions.
        """
        self.span = self._tracer.start_span(
            name=self._span_name,
            kind=self._span_kind,
            attributes=attributes,
        )
        self._span_context = set_span_in_context(self.span)
        self._monotonic_start_s = timeit.default_timer()
        # The token is kept so _finish() can restore the previous context.
        self._context_token = attach(self._span_context)

    def _get_metric_attributes(self) -> dict[str, Any]:
        """Return low-cardinality attributes for metric recording.

        The returned dict is ``self.metric_attributes`` itself, not a copy.
        """
        return self.metric_attributes

    def _get_metric_token_counts(self) -> dict[str, int]:  # pylint: disable=no-self-use
        """Return {token_type: count} for token histogram recording.

        The base implementation returns an empty dict.
        """
        return {}

    def _apply_error_attributes(self, error: Error) -> None:
        """Apply error status and error.type attribute to the span, events, and metrics.

        Sets the span status to ERROR with ``error.message`` and stores the
        qualified name of ``error.type`` under the ``error.type`` key in both
        ``self.attributes`` and ``self.metric_attributes``.
        """
        error_type = error.type.__qualname__
        self.span.set_status(Status(StatusCode.ERROR, error.message))
        self.attributes[error_attributes.ERROR_TYPE] = error_type
        self.metric_attributes[error_attributes.ERROR_TYPE] = error_type

    def _call_completion_hook(
        self,
        *,
        inputs: list[InputMessage] | None = None,
        outputs: list[OutputMessage] | None = None,
        system_instruction: list[MessagePart] | None = None,
        tool_definitions: list[ToolDefinition] | None = None,
        log_record: LogRecord | None = None,
    ) -> None:
        """Invoke the completion hook with the invocation's content.

        Subclasses pass whichever content fields they carry. Missing (or
        empty) ``inputs``, ``outputs`` and ``system_instruction`` are passed
        to the hook as ``[]``; ``tool_definitions`` and ``log_record`` are
        forwarded unchanged and may be ``None``.
        """
        self._completion_hook.on_completion(
            inputs=inputs or [],
            outputs=outputs or [],
            system_instruction=system_instruction or [],
            tool_definitions=tool_definitions,
            span=self.span,
            log_record=log_record,
        )

    @abstractmethod
    def _apply_finish(self, error: Error | None = None) -> None:
        """Apply finish telemetry (attributes, metrics, events)."""

    def _finish(self, error: Error | None = None) -> None:
        """Apply finish telemetry and end the span.

        Does nothing when the invocation was never started or has already
        been finished, so repeated ``stop()`` / ``fail()`` calls neither
        record finish telemetry twice nor end the span twice.

        Args:
            error: The failure to report, or ``None`` for a successful finish.
        """
        token = self._context_token
        if token is None:
            return
        try:
            self._apply_finish(error)
        finally:
            # Clear the token so a later stop()/fail() is a no-op instead of
            # re-running the finish telemetry and re-detaching a used token.
            self._context_token = None
            try:
                detach(token)
            except Exception:  # pylint: disable=broad-except
                # Best effort: a detach failure must not prevent span.end().
                pass
            self.span.end()

    def stop(self) -> None:
        """Finalize the invocation successfully and end its span."""
        self._finish()

    def fail(self, error: Error | BaseException) -> None:
        """Fail the invocation and end its span with error status.

        Args:
            error: An ``Error`` or a raised exception; an exception is
                converted to ``Error(type=type(exc), message=str(exc))``.
        """
        if isinstance(error, BaseException):
            error = Error(type=type(error), message=str(error))
        self._finish(error)

    @contextmanager
    def _managed(self) -> Generator[Self, None, None]:
        """Context manager that calls stop() on success or fail() on exception.

        Only ``Exception`` subclasses are routed to ``fail()`` (and then
        re-raised). Other ``BaseException`` types propagate without this
        context manager finishing the invocation.
        """
        try:
            yield self
        except Exception as exc:
            self.fail(exc)
            raise
        self.stop()
def get_content_attributes(
    *,
    input_messages: Sequence[InputMessage],
    output_messages: Sequence[OutputMessage],
    system_instruction: Sequence[MessagePart],
    tool_definitions: Sequence[ToolDefinition] | None,
    for_span: bool,
) -> dict[str, Any]:
    """Build the content attributes for a span or an event.

    Nothing is produced unless experimental mode is on. Tool definitions are
    emitted whenever they are non-empty; messages and system instructions
    are emitted only when the content capturing mode covers the requested
    target (span or event). Empty or missing inputs never produce a key.

    Args:
        input_messages: Input messages to serialize.
        output_messages: Output messages to serialize.
        system_instruction: System instructions to serialize.
        tool_definitions: Tool definitions to serialize (may be None).
        for_span: If True, values are JSON strings suitable for span
            attributes; if False, values are lists of dicts suitable for
            event attributes.

    Returns:
        A mapping from GenAI attribute key to the serialized value.
    """
    if not is_experimental_mode():
        return {}

    capture_mode = get_content_capturing_mode()
    if for_span:
        content_enabled = capture_mode in (
            ContentCapturingMode.SPAN_ONLY,
            ContentCapturingMode.SPAN_AND_EVENT,
        )
    else:
        content_enabled = capture_mode in (
            ContentCapturingMode.EVENT_ONLY,
            ContentCapturingMode.SPAN_AND_EVENT,
        )

    def _encode(entries: Sequence[Any]) -> Any:
        # Dataclasses -> plain dicts; spans additionally need a JSON string.
        as_dicts = [asdict(entry) for entry in entries]
        if for_span:
            return gen_ai_json_dumps(as_dicts)
        return as_dicts

    # (attribute key, payload) pairs in emission order. Message content is
    # only a candidate when the capture mode allows it for this target; tool
    # definitions are a candidate regardless of the capture mode.
    candidates: list[tuple[str, Sequence[Any] | None]] = []
    if content_enabled:
        candidates.append((GenAI.GEN_AI_INPUT_MESSAGES, input_messages))
        candidates.append((GenAI.GEN_AI_OUTPUT_MESSAGES, output_messages))
        candidates.append(
            (GenAI.GEN_AI_SYSTEM_INSTRUCTIONS, system_instruction)
        )
    candidates.append((GenAI.GEN_AI_TOOL_DEFINITIONS, tool_definitions))

    content_attributes: dict[str, Any] = {}
    for attribute_key, payload in candidates:
        # Skip None and empty sequences so no empty attribute is emitted.
        if payload:
            content_attributes[attribute_key] = _encode(payload)
    return content_attributes