Commit 6a8fdf3

feat(iorails): Telemetry - token-level metrics (#1846)
1 parent: a2b7880

8 files changed: 1,380 additions & 77 deletions

nemoguardrails/guardrails/engine_registry.py

Lines changed: 103 additions & 7 deletions

@@ -20,16 +20,27 @@
 """

 import logging
+import time

 from collections.abc import AsyncGenerator
+from contextlib import nullcontext
 from typing import TYPE_CHECKING, Any, Optional, TypeVar

 from nemoguardrails.guardrails.api_engine import APIEngine
 from nemoguardrails.guardrails.base_engine import BaseEngine
 from nemoguardrails.guardrails.guardrails_types import get_request_id, truncate
 from nemoguardrails.guardrails.model_engine import ModelEngine
-from nemoguardrails.guardrails.telemetry import api_call_span, llm_call_span
+from nemoguardrails.guardrails.telemetry import (
+    api_call_span,
+    llm_call_span,
+)
 from nemoguardrails.rails.llm.config import Model, RailsConfigData
-from nemoguardrails.types import LLMResponse, LLMResponseChunk
+from nemoguardrails.tracing.constants import (
+    llm_operation_duration,
+    record_time_per_output_chunk,
+    record_time_to_first_chunk,
+    record_token_usage,
+)
+from nemoguardrails.types import LLMResponse, LLMResponseChunk, UsageInfo

 if TYPE_CHECKING:
     from opentelemetry.trace import Tracer

@@ -51,15 +62,24 @@ def __init__(
         models: list[Model],
         rails_config_data: RailsConfigData,
         tracer: Optional["Tracer"] = None,
+        metrics_enabled: bool = False,
     ) -> None:
         """Build one engine per configured model and API service.

         When *tracer* is provided, LLM and API calls produce OTEL spans; when
         ``None`` the span helpers become no-ops.
+
+        When *metrics_enabled* is True, LLM calls emit the OTEL GenAI
+        client-side metrics (``gen_ai.client.token.usage``,
+        ``gen_ai.client.operation.duration``, plus the streaming
+        chunk-timing metrics). Defaults to False so callers that don't
+        opt in get no metric emissions even if a MeterProvider is
+        configured globally.
         """
         self._engines: dict[str, BaseEngine] = {}
         self._running = False
         self._tracer = tracer
+        self._metrics_enabled = metrics_enabled

         for model_config in models:
             engine = ModelEngine(model_config)

@@ -147,6 +167,11 @@ async def model_call(self, model_type: str, messages: list[dict], **kwargs: Any)
         reasoning (when the provider exposes it), usage, finish reason.
         Callers that only want the assistant text should access ``.content``.

+        When metrics are enabled, emits ``gen_ai.client.operation.duration``
+        (with ``error.type`` on exception) and ``gen_ai.client.token.usage``
+        (one observation each for ``input`` and ``output`` token types,
+        only when ``LLMResponse.usage`` is populated).
+
         Raises:
             KeyError: If no engine is registered with the given name.
             TypeError: If the named engine is not a ModelEngine.

@@ -155,8 +180,26 @@ async def model_call(self, model_type: str, messages: list[dict], **kwargs: Any)
         log.debug("[%s] Model engine '%s' messages: %s", req_id, model_type, truncate(messages))

         engine = self._get_engine(model_type, ModelEngine)
-        with llm_call_span(self._tracer, engine.model_name, engine.model_config.engine or "unknown"):
-            result = await engine.chat_completion(messages, **kwargs)
+        # TODO: Replace with LLMModel.provider_name after refactoring
+        provider_name = engine.model_config.engine or "unknown"
+        operation_name = "chat"
+
+        # Compose: span (always created — no-op when tracer is None) and
+        # duration metric (only when metrics enabled). Token usage is
+        # emitted after the call returns since it depends on
+        # ``result.usage`` — exception path skips it because control
+        # never reaches the line below.
+        duration_ctx = (
+            llm_operation_duration(engine.model_name, provider_name, operation_name)
+            if self._metrics_enabled
+            else nullcontext()
+        )
+        with llm_call_span(self._tracer, engine.model_name, provider_name, operation_name):
+            with duration_ctx:
+                result = await engine.chat_completion(messages, **kwargs)
+
+        if self._metrics_enabled:
+            record_token_usage(engine.model_name, provider_name, operation_name, result.usage)

         log.debug("[%s] Model engine '%s' response: %s", req_id, model_type, truncate(result))
         return result

@@ -171,6 +214,15 @@ async def stream_model_call(
         before the first chunk and closes when the generator exhausts or
         raises.

+        When metrics are enabled, emits ``gen_ai.client.operation.duration``
+        for the full stream lifetime (with ``error.type`` on exception)
+        and ``gen_ai.client.token.usage`` after stream completion using
+        the ``UsageInfo`` carried on the terminal SSE chunk (when the
+        provider returns one — controlled by ``include_usage_in_stream``,
+        defaults to True for OpenAI-compatible engines). No token
+        observation is emitted on early consumer cancellation or on
+        provider error mid-stream.
+
         Raises:
             KeyError: If no engine is registered with the given name.
             TypeError: If the named engine is not a ModelEngine.

@@ -179,9 +231,53 @@ async def stream_model_call(
         log.debug("[%s] Model engine '%s' stream messages: %s", req_id, model_type, truncate(messages))

         engine = self._get_engine(model_type, ModelEngine)
-        with llm_call_span(self._tracer, engine.model_name, engine.model_config.engine or "unknown"):
-            async for chunk in engine.stream_chat_completion(messages, **kwargs):
-                yield chunk
+        # TODO: Change to LLMModel.provider_name after refactor
+        provider_name = engine.model_config.engine or "unknown"
+        operation_name = "chat"
+
+        # Capture the most recent chunk's ``usage`` field so we can emit
+        # token metrics after the stream completes — providers (e.g.
+        # OpenAI-compatible) only populate ``usage`` on the terminal
+        # chunk when ``stream_options.include_usage=true``.
+        captured_usage: Optional["UsageInfo"] = None
+        duration_ctx = (
+            llm_operation_duration(engine.model_name, provider_name, operation_name)
+            if self._metrics_enabled
+            else nullcontext()
+        )
+        with llm_call_span(self._tracer, engine.model_name, provider_name, operation_name):
+            with duration_ctx:
+                # Gate timing-state setup on ``_metrics_enabled`` so the
+                # cold path skips ``time.monotonic()`` and the per-chunk
+                # bookkeeping entirely. ``t0`` defaults to ``0.0`` in
+                # the disabled path so the type stays a plain ``float``
+                # — it's never read in that branch.
+                t0 = time.monotonic() if self._metrics_enabled else 0.0
+                last_chunk_time: Optional[float] = None
+                async for chunk in engine.stream_chat_completion(messages, **kwargs):
+                    if self._metrics_enabled:
+                        # Per OTEL semconv, "first chunk" / "output chunk"
+                        # mean content-bearing chunks — gate on
+                        # ``delta_content`` / ``delta_reasoning`` to skip
+                        # the terminal usage frame and any other cosmetic
+                        # SSE events that the parser leaves in place.
+                        if chunk.delta_content or chunk.delta_reasoning:
+                            now = time.monotonic()
+                            if last_chunk_time is None:
+                                record_time_to_first_chunk(engine.model_name, provider_name, operation_name, now - t0)
+                            else:
+                                record_time_per_output_chunk(
+                                    engine.model_name, provider_name, operation_name, now - last_chunk_time
+                                )
+                            last_chunk_time = now
+                        if chunk.usage is not None:
+                            captured_usage = chunk.usage
+                    yield chunk
+
+        # Reached only on natural exhaustion (not on consumer cancellation
+        # or provider error — those raise out of the ``with`` blocks above).
+        if self._metrics_enabled:
+            record_token_usage(engine.model_name, provider_name, operation_name, captured_usage)

     async def api_call(self, api_name: str, message: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
         """Route an API request to the named API engine.

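The metric helpers imported from nemoguardrails.tracing.constants are not part of this diff. Below is a minimal sketch of what they plausibly look like under the OTel GenAI semantic conventions, assuming a module-level meter, semconv attribute keys, and UsageInfo fields named prompt_tokens/completion_tokens; only the metric names, ``error.type``, and the input/output token types are confirmed by the hunks above.

    # Sketch only — not the commit's actual tracing/constants.py.
    import time
    from contextlib import contextmanager

    from opentelemetry import metrics

    _meter = metrics.get_meter("nemoguardrails")
    _duration = _meter.create_histogram("gen_ai.client.operation.duration", unit="s")
    _tokens = _meter.create_histogram("gen_ai.client.token.usage", unit="{token}")


    def _attrs(model: str, provider: str, operation: str) -> dict:
        # Attribute keys assumed from the OTel GenAI semconv; not shown in the diff.
        return {
            "gen_ai.request.model": model,
            "gen_ai.provider.name": provider,
            "gen_ai.operation.name": operation,
        }


    @contextmanager
    def llm_operation_duration(model: str, provider: str, operation: str):
        attrs = _attrs(model, provider, operation)
        start = time.monotonic()
        try:
            yield
        except Exception as exc:
            attrs["error.type"] = type(exc).__qualname__  # recorded on failure
            raise
        finally:
            # Runs on success and on exception alike: exactly one duration
            # observation per call, matching "error.type on exception" above.
            _duration.record(time.monotonic() - start, attributes=attrs)


    def record_token_usage(model: str, provider: str, operation: str, usage) -> None:
        if usage is None:  # provider returned no usage payload
            return
        base = _attrs(model, provider, operation)
        _tokens.record(usage.prompt_tokens, attributes={**base, "gen_ai.token.type": "input"})
        _tokens.record(usage.completion_tokens, attributes={**base, "gen_ai.token.type": "output"})

record_time_to_first_chunk and record_time_per_output_chunk would follow the same pattern: one histogram each, recording the seconds value computed in stream_model_call.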
nemoguardrails/guardrails/iorails.py

Lines changed: 6 additions & 1 deletion

@@ -93,7 +93,12 @@ def __init__(self, config: RailsConfig) -> None:
         self._tracer = get_tracer() if self._tracing_enabled else None
         self._metrics_enabled = are_metrics_enabled(config.metrics)

-        self.engine_registry = EngineRegistry(config.models, config.rails.config, tracer=self._tracer)
+        self.engine_registry = EngineRegistry(
+            config.models,
+            config.rails.config,
+            tracer=self._tracer,
+            metrics_enabled=self._metrics_enabled,
+        )
         self.rails_manager = RailsManager(
             engine_registry=self.engine_registry,
             task_manager=LLMTaskManager(config),

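Given the EngineRegistry signature above, the two toggles are independent; a caller could, for instance, opt in to metrics with tracing off. A sketch, assuming config is a loaded RailsConfig:

    registry = EngineRegistry(
        config.models,
        config.rails.config,
        tracer=None,           # spans become no-ops
        metrics_enabled=True,  # GenAI client metrics are still emitted
    )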
nemoguardrails/guardrails/model_engine.py

Lines changed: 42 additions & 19 deletions

@@ -116,28 +116,38 @@ def _parse_chat_completion(response: dict) -> LLMResponse:
 def _parse_chat_completion_chunk(chunk: dict) -> Optional[LLMResponseChunk]:
     """Build an LLMResponseChunk from an SSE chunk dict.

-    Returns None for chunks that carry no content or reasoning delta —
-    role-only first events, finish-only events, or empty-choices events
-    are skipped, preserving current stream_call behavior.
+    Returns None for chunks without one of: content delta, reasoning
+    delta, or a usage payload. Role-only first events and finish-only
+    events with empty deltas map to None.
+
+    The last chunk from OpenAI-compatible providers carries a ``usage``
+    field when ``stream_options.include_usage=true``; it is passed
+    through to capture the token usage metadata.
     """
     choices = chunk.get("choices") or []
-    if not choices:
-        return None
-
-    choice = choices[0]
-    delta = choice.get("delta") or {}
-    delta_content = delta.get("content")
-    delta_reasoning = delta.get("reasoning_content") or None
-
-    if not delta_content and not delta_reasoning:
+    usage_dict = chunk.get("usage")
+
+    delta_content: Optional[str] = None
+    delta_reasoning: Optional[str] = None
+    finish_reason = None
+    if choices:
+        choice = choices[0]
+        delta = choice.get("delta") or {}
+        delta_content = delta.get("content")
+        delta_reasoning = delta.get("reasoning_content") or None
+        finish_reason = choice.get("finish_reason")
+
+    if not delta_content and not delta_reasoning and not usage_dict:
         return None

     return LLMResponseChunk(
         delta_content=delta_content,
         delta_reasoning=delta_reasoning,
         model=chunk.get("model"),
-        finish_reason=choice.get("finish_reason"),
+        finish_reason=finish_reason,
         request_id=chunk.get("id"),
+        usage=_parse_usage(usage_dict) if usage_dict else None,
     )

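To make the new contract concrete, here is how the reworked parser treats the three chunk shapes. The chunk payloads are invented for illustration, shaped like OpenAI-compatible SSE events; field values and the _parse_usage field names are assumptions.

    content_chunk = {
        "id": "chatcmpl-1",
        "model": "my-model",
        "choices": [{"delta": {"content": "Hello"}, "finish_reason": None}],
    }
    role_only_chunk = {
        "choices": [{"delta": {"role": "assistant"}, "finish_reason": None}],
    }
    terminal_usage_chunk = {
        "id": "chatcmpl-1",
        "choices": [],  # empty on the usage frame
        "usage": {"prompt_tokens": 12, "completion_tokens": 5, "total_tokens": 17},
    }

    assert _parse_chat_completion_chunk(content_chunk).delta_content == "Hello"
    assert _parse_chat_completion_chunk(role_only_chunk) is None  # still skipped
    terminal = _parse_chat_completion_chunk(terminal_usage_chunk)
    assert terminal is not None and terminal.delta_content is None
    assert terminal.usage is not None  # passed through for token metrics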
@@ -315,17 +325,28 @@ async def stream_call(
         """Make a streaming POST request to the /v1/chat/completions endpoint.

         Sends ``stream=True`` and yields one ``LLMResponseChunk`` per SSE
-        event that carries a content or reasoning delta. Role-only,
-        finish-only, and empty-choices events are skipped. Retries are
-        handled by the RetryClient (same as ``call()``).
+        event that carries a content delta, reasoning delta, or a
+        ``usage`` payload. Role-only, finish-only, and empty-choices
+        events without usage are skipped. Retries are handled by the
+        RetryClient (same as ``call()``).
+
+        Note: when the upstream payload includes
+        ``stream_options.include_usage=true`` (default for the
+        OpenAI-compatible client), the provider sends a final
+        usage-only chunk with empty ``choices`` after the last content
+        chunk. That terminal chunk is yielded as
+        ``LLMResponseChunk(usage=...)`` with both ``delta_content``
+        and ``delta_reasoning`` unset — callers that only care about
+        content should gate on ``chunk.delta_content`` rather than
+        assuming every yielded chunk carries one.

         Args:
             messages: List of message dicts in OpenAI format.
             **kwargs: Additional parameters for the request body (temperature, max_tokens, etc.)

         Yields:
-            ``LLMResponseChunk`` objects with ``delta_content`` and/or
-            ``delta_reasoning`` populated.
+            ``LLMResponseChunk`` objects with ``delta_content``,
+            ``delta_reasoning``, and/or ``usage`` populated.

         Raises:
             ModelEngineError: If the request fails after all retries.
@@ -413,7 +434,9 @@ async def stream_chat_completion(
     ) -> AsyncGenerator[LLMResponseChunk, None]:
         """Stream a chat completion and yield ``LLMResponseChunk`` objects.

-        Thin pass-through over ``stream_call``.
+        Thin pass-through over ``stream_call`` — see that method's
+        docstring for the contract, including the terminal usage-only
+        chunk emitted when ``stream_options.include_usage`` is on.

         Raises:
             ModelEngineError: If the request fails after all retries.