Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Record streaming timing metrics (time to first chunk, time per output chunk) for chat
completion streams.
([#13](https://github.com/open-telemetry/opentelemetry-python-genai/pull/13))
- Refactor chat completion stream wrappers to use shared GenAI stream lifecycle helpers.
([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500))
- Pass tool definitions from `tools` kwarg to `InferenceInvocation.tool_definitions`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def __init__(
invocation: InferenceInvocation,
capture_content: bool,
) -> None:
super().__init__(stream)
super().__init__(stream, start_time_s=invocation.monotonic_start_s, timing_target=invocation)
self._self_invocation = invocation
self._self_choice_buffers = []
self._self_capture_content = capture_content
Expand All @@ -203,7 +203,7 @@ def __init__(
invocation: InferenceInvocation,
capture_content: bool,
) -> None:
super().__init__(stream)
super().__init__(stream, start_time_s=invocation.monotonic_start_s, timing_target=invocation)
self._self_invocation = invocation
self._self_choice_buffers = []
self._self_capture_content = capture_content
Expand Down
3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add streaming timing metrics (TTFC histogram, time_per_output_chunk histogram, span attribute)
to stream wrapper base classes.
([#13](https://github.com/open-telemetry/opentelemetry-python-genai/pull/13))
- Add shared sync and async stream wrapper base classes for GenAI instrumentations.
([#4500](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4500))
- Change `InferenceInvocation` init params to only accept base params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

# TODO: Migrate to GenAI constants once available in semconv package
_GEN_AI_REASONING_OUTPUT_TOKENS = "gen_ai.usage.reasoning.output_tokens"
_GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK = "gen_ai.response.time_to_first_chunk"


class InferenceInvocation(GenAIInvocation):
Expand Down Expand Up @@ -94,6 +95,9 @@ def __init__(
self.cache_creation_input_tokens: int | None = None
self.cache_read_input_tokens: int | None = None
self.tool_definitions: list[ToolDefinition] | None = None
# Streaming timing fields (populated by stream wrappers)
self.ttfc_seconds: float | None = None
self.chunk_gap_seconds: list[float] = []
self._start(self._get_base_attributes())

def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]:
Expand Down Expand Up @@ -162,6 +166,10 @@ def _get_attributes(self) -> dict[str, Any]:
_GEN_AI_REASONING_OUTPUT_TOKENS,
self.thinking_tokens,
),
(
_GEN_AI_RESPONSE_TIME_TO_FIRST_CHUNK,
self.ttfc_seconds,
Comment thread
Nik-Reddy marked this conversation as resolved.
),
)
attrs.update({k: v for k, v in optional_attrs if v is not None})
return attrs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def __init__(
self._context_token: ContextToken | None = None
self._monotonic_start_s: float | None = None

@property
def monotonic_start_s(self) -> float | None:
    """Monotonic timestamp (seconds) when this invocation started.

    Returns ``None`` if the invocation has not been started yet; the value
    is set from ``timeit.default_timer()`` by the invocation lifecycle and
    is meant for computing durations (TTFC, operation duration), not for
    wall-clock conversion.
    """
    return self._monotonic_start_s

def _start(self, attributes: dict[str, Any] | None = None) -> None:
"""Start the invocation span and attach it to the current context.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics

# TODO: Migrate to GenAI constants once available in semconv package
_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK = (
"gen_ai.client.operation.time_to_first_chunk"
)
_GEN_AI_CLIENT_OPERATION_TIME_PER_OUTPUT_CHUNK = (
"gen_ai.client.operation.time_per_output_chunk"
)

_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
0.01,
0.02,
Expand Down Expand Up @@ -55,3 +63,21 @@ def create_token_histogram(meter: Meter) -> Histogram:
unit="{token}",
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
)


def create_ttfc_histogram(meter: Meter) -> Histogram:
    """Create the time-to-first-chunk histogram on *meter*.

    Records, in seconds, the delay between the start of a streaming GenAI
    request and the arrival of its first output chunk. Reuses the operation
    duration bucket advisory so both metrics share a bucket layout.
    """
    histogram_kwargs = {
        "name": _GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK,
        "description": "Time from request start to first output chunk",
        "unit": "s",
        "explicit_bucket_boundaries_advisory": _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
    }
    return meter.create_histogram(**histogram_kwargs)


def create_time_per_chunk_histogram(meter: Meter) -> Histogram:
    """Create the per-output-chunk timing histogram on *meter*.

    Records, in seconds, the gap between the end of one streamed output
    chunk and the end of the next. Shares the operation duration bucket
    advisory with the other streaming timing metrics.
    """
    histogram_kwargs = {
        "name": _GEN_AI_CLIENT_OPERATION_TIME_PER_OUTPUT_CHUNK,
        "description": "Duration from the end of the previous output chunk to the end of the current output chunk",
        "unit": "s",
        "explicit_bucket_boundaries_advisory": _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
    }
    return meter.create_histogram(**histogram_kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
)
from opentelemetry.util.genai.instruments import (
create_duration_histogram,
create_time_per_chunk_histogram,
create_token_histogram,
create_ttfc_histogram,
)

from ._invocation import GenAIInvocation
Expand All @@ -26,16 +28,20 @@ class InvocationMetricsRecorder:
def __init__(self, meter: Meter):
self._duration_histogram: Histogram = create_duration_histogram(meter)
self._token_histogram: Histogram = create_token_histogram(meter)
self._ttfc_histogram: Histogram = create_ttfc_histogram(meter)
self._time_per_chunk_histogram: Histogram = (
create_time_per_chunk_histogram(meter)
)

def record(self, invocation: GenAIInvocation) -> None:
"""Record duration and token metrics for an invocation if possible."""
attributes = invocation._get_metric_attributes()
token_counts = invocation._get_metric_token_counts()

duration_seconds: Optional[float] = None
if invocation._monotonic_start_s is not None:
if invocation.monotonic_start_s is not None:
duration_seconds = max(
timeit.default_timer() - invocation._monotonic_start_s,
timeit.default_timer() - invocation.monotonic_start_s,
0.0,
)

Expand All @@ -53,5 +59,25 @@ def record(self, invocation: GenAIInvocation) -> None:
context=invocation._span_context,
)

# Streaming timing metrics
Comment thread
Nik-Reddy marked this conversation as resolved.
ttfc_seconds = getattr(invocation, "ttfc_seconds", None)
if ttfc_seconds is not None:
self._ttfc_histogram.record(
ttfc_seconds,
attributes=attributes,
context=invocation._span_context,
)

chunk_gap_seconds = getattr(invocation, "chunk_gap_seconds", None)
if chunk_gap_seconds:
for gap in chunk_gap_seconds:
self._time_per_chunk_histogram.record(
gap,
attributes=attributes,
context=invocation._span_context,
)
# Clear after recording to avoid holding timing data beyond finalization
chunk_gap_seconds.clear()


__all__ = ["InvocationMetricsRecorder"]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import logging
import timeit
from abc import ABCMeta, abstractmethod
from types import TracebackType
from typing import (
Expand All @@ -30,6 +31,13 @@ def __init__(self, wrapped: object) -> None: ...
_logger = logging.getLogger(__name__)


class _TimingTarget(Protocol):
    """Any object that can receive streaming timing measurements.

    Structural protocol satisfied by invocation objects; the stream
    wrappers write their collected timing data onto a target with these
    two attributes when the stream is finalized.
    """

    # Seconds from request start to the first output chunk; None until
    # the first chunk has been observed (or if no start time was given).
    ttfc_seconds: float | None
    # Per-chunk gap measurements (seconds) for every chunk after the first.
    chunk_gap_seconds: list[float]


class _StreamWrapperMeta(ABCMeta, type(_ObjectProxy)):
"""Metaclass compatible with wrapt's proxy type and ABC hooks."""

Expand Down Expand Up @@ -64,11 +72,21 @@ class SyncStreamWrapper(
internally by the wrapper lifecycle and are not part of the public API.
"""

def __init__(self, stream: _SyncStream[ChunkT]):
def __init__(
self,
stream: _SyncStream[ChunkT],
start_time_s: float | None = None,
timing_target: _TimingTarget | None = None,
):
super().__init__(stream)
self._self_stream = stream
self._self_iterator = iter(stream)
self._self_finalized = False
self._self_start_time_s = start_time_s
self._self_timing_target = timing_target
self._self_ttfc_seconds: float | None = None
self._self_first_chunk_at: float | None = None
self._self_chunk_gaps: list[float] = []
Comment thread
Nik-Reddy marked this conversation as resolved.

def __enter__(self):
return self
Expand Down Expand Up @@ -108,6 +126,7 @@ def __iter__(self):
return self

def __next__(self) -> ChunkT:
t_before_read = timeit.default_timer()
try:
chunk = next(self._self_iterator)
except StopIteration:
Expand All @@ -116,22 +135,38 @@ def __next__(self) -> ChunkT:
except Exception as error:
self._safe_finalize_failure(error)
raise
t_after_read = timeit.default_timer()

if self._self_first_chunk_at is None:
self._self_first_chunk_at = t_after_read
if self._self_start_time_s is not None:
self._self_ttfc_seconds = t_after_read - self._self_start_time_s
else:
self._self_chunk_gaps.append(t_after_read - t_before_read)

try:
self._process_chunk(chunk)
except Exception as error: # pylint: disable=broad-exception-caught
self._handle_process_chunk_error(error)
return chunk

def _sync_timing_to_target(self) -> None:
    """Copy collected timing measurements onto the timing target, if any.

    Called on finalization (success or failure). Copies the chunk-gap list
    so later mutation of the wrapper's internal buffer cannot affect the
    target's recorded data.
    """
    if self._self_timing_target is not None:
        self._self_timing_target.ttfc_seconds = self._self_ttfc_seconds
        self._self_timing_target.chunk_gap_seconds = list(self._self_chunk_gaps)

def _finalize_success(self) -> None:
    """Finalize the stream after normal completion (idempotent).

    Publishes timing data to the timing target before invoking the
    subclass end hook, so the hook observes the final measurements.
    """
    if self._self_finalized:
        # Already finalized (e.g. exhausted stream also closed) — no-op.
        return
    self._self_finalized = True
    self._sync_timing_to_target()
    self._on_stream_end()

def _finalize_failure(self, error: BaseException) -> None:
    """Finalize the stream after an error (idempotent).

    Publishes whatever timing data was collected before the failure, then
    hands *error* to the subclass error hook.
    """
    if self._self_finalized:
        # Success/failure finalization already ran — no-op.
        return
    self._self_finalized = True
    self._sync_timing_to_target()
    self._on_stream_error(error)

def _safe_finalize_success(self) -> None:
Expand Down Expand Up @@ -191,11 +226,21 @@ class AsyncStreamWrapper(
are owned by this base class.
"""

def __init__(self, stream: _AsyncStream[ChunkT]):
def __init__(
self,
stream: _AsyncStream[ChunkT],
start_time_s: float | None = None,
timing_target: _TimingTarget | None = None,
):
super().__init__(stream)
self._self_stream = stream
self._self_aiter = aiter(stream)
self._self_finalized = False
self._self_start_time_s = start_time_s
self._self_timing_target = timing_target
self._self_ttfc_seconds: float | None = None
self._self_first_chunk_at: float | None = None
self._self_chunk_gaps: list[float] = []

async def __aenter__(self):
return self
Expand Down Expand Up @@ -237,6 +282,7 @@ def __aiter__(self):
return self

async def __anext__(self) -> ChunkT:
t_before_read = timeit.default_timer()
try:
chunk = await anext(self._self_aiter)
except StopAsyncIteration:
Expand All @@ -245,22 +291,38 @@ async def __anext__(self) -> ChunkT:
except Exception as error:
self._safe_finalize_failure(error)
raise
t_after_read = timeit.default_timer()

if self._self_first_chunk_at is None:
self._self_first_chunk_at = t_after_read
if self._self_start_time_s is not None:
self._self_ttfc_seconds = t_after_read - self._self_start_time_s
else:
self._self_chunk_gaps.append(t_after_read - t_before_read)

try:
self._process_chunk(chunk)
except Exception as error: # pylint: disable=broad-exception-caught
self._handle_process_chunk_error(error)
return chunk

def _sync_timing_to_target(self) -> None:
    """Copy collected timing measurements onto the timing target, if any.

    Async-wrapper counterpart of the sync version; copies the chunk-gap
    list so the target owns an independent snapshot of the data.
    """
    if self._self_timing_target is not None:
        self._self_timing_target.ttfc_seconds = self._self_ttfc_seconds
        self._self_timing_target.chunk_gap_seconds = list(self._self_chunk_gaps)

def _finalize_success(self) -> None:
    """Finalize the async stream after normal completion (idempotent).

    Publishes timing data to the timing target before invoking the
    subclass end hook, so the hook observes the final measurements.
    """
    if self._self_finalized:
        # Already finalized — no-op.
        return
    self._self_finalized = True
    self._sync_timing_to_target()
    self._on_stream_end()

def _finalize_failure(self, error: BaseException) -> None:
    """Finalize the async stream after an error (idempotent).

    Publishes whatever timing data was collected before the failure, then
    hands *error* to the subclass error hook.
    """
    if self._self_finalized:
        # Success/failure finalization already ran — no-op.
        return
    self._self_finalized = True
    self._sync_timing_to_target()
    self._on_stream_error(error)

def _safe_finalize_success(self) -> None:
Expand Down
Loading