Skip to content

Commit 80c64db

Browse files
committed
feat: add Responses API inference metrics
Record inference duration and outcome (success/failure) for both the streaming and non-streaming Responses API paths. The metric is recorded at the terminal-event boundary, before post-processing, to prevent spurious failure metrics when post-processing raises after a successful LLM inference. Recording is guarded by an `inference_metric_recorded` flag to avoid double-recording. Signed-off-by: Major Hayden <major@redhat.com>
1 parent f7b927a commit 80c64db

4 files changed

Lines changed: 268 additions & 61 deletions

File tree

src/app/endpoints/responses.py

Lines changed: 149 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import asyncio
66
import json
7+
import time
78
from collections.abc import AsyncIterator, Sequence
89
from datetime import UTC, datetime
910
from typing import Annotated, Any, Final, NoReturn, Optional, cast
@@ -39,6 +40,7 @@
3940
from configuration import configuration
4041
from constants import ENDPOINT_PATH_RESPONSES, SUBSTITUTED_INSTRUCTIONS_PLACEHOLDER
4142
from log import get_logger
43+
from metrics import recording
4244
from models.api.requests import ResponsesRequest
4345
from models.api.responses.constants import UNAUTHORIZED_OPENAPI_EXAMPLES_WITH_MCP_OAUTH
4446
from models.api.responses.error import (
@@ -630,6 +632,34 @@ async def responses_endpoint_handler(
630632
)
631633

632634

635+
def _record_response_inference_result(
    model_id: str,
    endpoint_path: str,
    result: str,
    duration: float,
    record_failure: bool = False,
) -> None:
    """Emit inference metrics for a single Responses API call.

    The composite model identifier is split into its provider and model
    parts, which together with the endpoint path label the metrics. The
    inference duration is always recorded; the LLM failure counter is
    incremented first when ``record_failure`` is set.

    Args:
        model_id: Composite model identifier in ``provider/model`` format.
        endpoint_path: API endpoint path used as a metric label.
        result: Result label such as ``success`` or ``failure``.
        duration: Inference call duration in seconds.
        record_failure: When True, also increment the LLM failure counter.
    """
    # Shared leading labels for both metric calls: (provider, model, path).
    labels = (*extract_provider_and_model_from_model_id(model_id), endpoint_path)

    if record_failure:
        recording.record_llm_failure(*labels)

    recording.record_llm_inference_duration(*labels, result, duration)
661+
662+
633663
async def handle_streaming_response(
634664
original_request: ResponsesRequest,
635665
api_params: ResponsesApiParams,
@@ -658,6 +688,7 @@ async def handle_streaming_response(
658688
context.moderation_result.message,
659689
)
660690
else:
691+
inference_start_time = time.monotonic()
661692
try:
662693
response = await context.client.responses.create(
663694
**api_params.model_dump(exclude_none=True)
@@ -668,13 +699,21 @@ async def handle_streaming_response(
668699
api_params=api_params,
669700
context=context,
670701
turn_summary=turn_summary,
702+
inference_start_time=inference_start_time,
671703
)
672704
except (
673705
RuntimeError,
674706
APIConnectionError,
675707
LLSApiStatusError,
676708
OpenAIAPIStatusError,
677709
) as e:
710+
_record_response_inference_result(
711+
api_params.model,
712+
context.endpoint_path,
713+
recording.LLM_INFERENCE_RESULT_FAILURE,
714+
time.monotonic() - inference_start_time,
715+
record_failure=True,
716+
)
678717
_raise_response_api_http_exception(e, api_params, context)
679718

680719
return StreamingResponse(
@@ -928,6 +967,7 @@ async def response_generator(
928967
api_params: ResponsesApiParams,
929968
context: ResponsesContext,
930969
turn_summary: TurnSummary,
970+
inference_start_time: float,
931971
) -> AsyncIterator[str]:
932972
"""Generate SSE-formatted streaming response with LCORE-enriched events.
933973
@@ -937,6 +977,7 @@ async def response_generator(
937977
api_params: ResponsesApiParams
938978
context: Responses context
939979
turn_summary: TurnSummary to populate during streaming
980+
inference_start_time: Monotonic timestamp taken before the inference call.
940981
Yields:
941982
SSE-formatted strings for streaming events, ending with [DONE]
942983
"""
@@ -947,76 +988,107 @@ async def response_generator(
947988
configured_mcp_labels = {s.name for s in configuration.mcp_servers}
948989
# Track output indices of server-deployed MCP calls to filter their events
949990
server_mcp_output_indices: set[int] = set()
991+
inference_metric_recorded = False
950992

951-
async for chunk in stream:
952-
logger.debug("Processing streaming chunk, type: %s", chunk.type)
993+
try:
994+
async for chunk in stream:
995+
logger.debug("Processing streaming chunk, type: %s", chunk.type)
953996

954-
# Filter out streaming events for server-deployed MCP tools.
955-
# These are handled internally by LCS and should not be forwarded
956-
# to clients that don't understand the mcp_call item type.
957-
if _should_filter_mcp_chunk(
958-
chunk, configured_mcp_labels, server_mcp_output_indices
959-
):
960-
continue
997+
# Filter out streaming events for server-deployed MCP tools.
998+
# These are handled internally by LCS and should not be forwarded
999+
# to clients that don't understand the mcp_call item type.
1000+
if _should_filter_mcp_chunk(
1001+
chunk, configured_mcp_labels, server_mcp_output_indices
1002+
):
1003+
continue
9611004

962-
chunk_dict = chunk.model_dump(exclude_none=True, by_alias=True)
1005+
chunk_dict = chunk.model_dump(exclude_none=True, by_alias=True)
9631006

964-
# Create own sequence number for chunks to maintain order
965-
chunk_dict["sequence_number"] = sequence_number
966-
sequence_number += 1
1007+
# Create own sequence number for chunks to maintain order
1008+
chunk_dict["sequence_number"] = sequence_number
1009+
sequence_number += 1
9671010

968-
if "response" in chunk_dict:
969-
chunk_dict["response"]["conversation"] = normalize_conversation_id(
970-
api_params.conversation
971-
)
972-
_sanitize_response_dict(
973-
chunk_dict["response"],
974-
configured_mcp_labels,
975-
original_request,
976-
)
977-
tools = chunk_dict["response"].get("tools")
978-
if tools is not None:
979-
chunk_dict["response"]["tools"] = (
980-
translate_vector_store_ids_to_user_facing(
981-
tools,
982-
configuration.rag_id_mapping,
1011+
if "response" in chunk_dict:
1012+
chunk_dict["response"]["conversation"] = normalize_conversation_id(
1013+
api_params.conversation
1014+
)
1015+
_sanitize_response_dict(
1016+
chunk_dict["response"],
1017+
configured_mcp_labels,
1018+
original_request,
1019+
)
1020+
tools = chunk_dict["response"].get("tools")
1021+
if tools is not None:
1022+
chunk_dict["response"]["tools"] = (
1023+
translate_vector_store_ids_to_user_facing(
1024+
tools,
1025+
configuration.rag_id_mapping,
1026+
)
9831027
)
1028+
# Intermediate response - no quota consumption and text yet
1029+
if chunk.type == "response.in_progress":
1030+
chunk_dict["response"]["available_quotas"] = {}
1031+
chunk_dict["response"]["output_text"] = ""
1032+
1033+
# Handle completion, incomplete, and failed events
1034+
if chunk.type in (
1035+
"response.completed",
1036+
"response.incomplete",
1037+
"response.failed",
1038+
):
1039+
latest_response_object = cast(
1040+
OpenAIResponseObject, cast(Any, chunk).response
9841041
)
985-
# Intermediate response - no quota consumption and text yet
986-
if chunk.type == "response.in_progress":
987-
chunk_dict["response"]["available_quotas"] = {}
988-
chunk_dict["response"]["output_text"] = ""
989-
990-
# Handle completion, incomplete, and failed events - only quota handling here
991-
if chunk.type in (
992-
"response.completed",
993-
"response.incomplete",
994-
"response.failed",
995-
):
996-
latest_response_object = cast(
997-
OpenAIResponseObject, cast(Any, chunk).response
998-
)
9991042

1000-
# Extract and consume tokens if any were used
1001-
turn_summary.token_usage = extract_token_usage(
1002-
latest_response_object.usage, api_params.model, context.endpoint_path
1003-
)
1004-
consume_query_tokens(
1005-
user_id=context.auth[0],
1006-
model_id=api_params.model,
1007-
token_usage=turn_summary.token_usage,
1008-
)
1043+
# Record inference duration metric at the terminal-event
1044+
# boundary, before post-processing that could raise.
1045+
result = (
1046+
recording.LLM_INFERENCE_RESULT_FAILURE
1047+
if chunk.type == "response.failed"
1048+
else recording.LLM_INFERENCE_RESULT_SUCCESS
1049+
)
1050+
_record_response_inference_result(
1051+
api_params.model,
1052+
context.endpoint_path,
1053+
result,
1054+
time.monotonic() - inference_start_time,
1055+
record_failure=(result == recording.LLM_INFERENCE_RESULT_FAILURE),
1056+
)
1057+
inference_metric_recorded = True
10091058

1010-
# Get available quotas after token consumption
1011-
chunk_dict["response"]["available_quotas"] = get_available_quotas(
1012-
quota_limiters=configuration.quota_limiters, user_id=context.auth[0]
1013-
)
1014-
turn_summary.llm_response = extract_text_from_response_items(
1015-
latest_response_object.output
1016-
)
1017-
chunk_dict["response"]["output_text"] = turn_summary.llm_response
1059+
# Extract and consume tokens if any were used
1060+
turn_summary.token_usage = extract_token_usage(
1061+
latest_response_object.usage,
1062+
api_params.model,
1063+
context.endpoint_path,
1064+
)
1065+
consume_query_tokens(
1066+
user_id=context.auth[0],
1067+
model_id=api_params.model,
1068+
token_usage=turn_summary.token_usage,
1069+
)
10181070

1019-
yield f"event: {chunk.type or 'error'}\ndata: {json.dumps(chunk_dict)}\n\n"
1071+
# Get available quotas after token consumption
1072+
chunk_dict["response"]["available_quotas"] = get_available_quotas(
1073+
quota_limiters=configuration.quota_limiters,
1074+
user_id=context.auth[0],
1075+
)
1076+
turn_summary.llm_response = extract_text_from_response_items(
1077+
latest_response_object.output
1078+
)
1079+
chunk_dict["response"]["output_text"] = turn_summary.llm_response
1080+
1081+
yield f"event: {chunk.type or 'error'}\ndata: {json.dumps(chunk_dict)}\n\n"
1082+
except Exception:
1083+
if not inference_metric_recorded:
1084+
_record_response_inference_result(
1085+
api_params.model,
1086+
context.endpoint_path,
1087+
recording.LLM_INFERENCE_RESULT_FAILURE,
1088+
time.monotonic() - inference_start_time,
1089+
record_failure=True,
1090+
)
1091+
raise
10201092

10211093
# Extract response metadata from final response object
10221094
if latest_response_object:
@@ -1108,13 +1180,22 @@ async def handle_non_streaming_response(
11081180
await _persist_blocked_response_turn(api_params, context)
11091181
_queue_blocked_response_event(api_params, context, output_text)
11101182
else:
1183+
inference_start_time = time.monotonic()
1184+
inference_metric_recorded = False
11111185
try:
11121186
api_response = cast(
11131187
OpenAIResponseObject,
11141188
await context.client.responses.create(
11151189
**api_params.model_dump(exclude_none=True)
11161190
),
11171191
)
1192+
_record_response_inference_result(
1193+
api_params.model,
1194+
context.endpoint_path,
1195+
recording.LLM_INFERENCE_RESULT_SUCCESS,
1196+
time.monotonic() - inference_start_time,
1197+
)
1198+
inference_metric_recorded = True
11181199
token_usage = extract_token_usage(
11191200
api_response.usage, api_params.model, context.endpoint_path
11201201
)
@@ -1138,6 +1219,14 @@ async def handle_non_streaming_response(
11381219
LLSApiStatusError,
11391220
OpenAIAPIStatusError,
11401221
) as e:
1222+
if not inference_metric_recorded:
1223+
_record_response_inference_result(
1224+
api_params.model,
1225+
context.endpoint_path,
1226+
recording.LLM_INFERENCE_RESULT_FAILURE,
1227+
time.monotonic() - inference_start_time,
1228+
record_failure=True,
1229+
)
11411230
_raise_response_api_http_exception(e, api_params, context)
11421231

11431232
# Get available quotas

src/metrics/recording.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from collections.abc import Iterator
99
from contextlib import contextmanager
10+
from typing import Final
1011

1112
import metrics
1213
from log import get_logger
@@ -111,6 +112,32 @@ def record_llm_token_usage(
111112
logger.warning("Failed to update token metrics", exc_info=True)
112113

113114

115+
LLM_INFERENCE_RESULT_SUCCESS: Final[str] = "success"
LLM_INFERENCE_RESULT_FAILURE: Final[str] = "failure"
# Closed set of values accepted for the ``result`` metric label.
ALLOWED_LLM_INFERENCE_RESULTS: Final[frozenset[str]] = frozenset(
    {LLM_INFERENCE_RESULT_SUCCESS, LLM_INFERENCE_RESULT_FAILURE}
)


def normalize_llm_inference_result(result: str) -> str:
    """Clamp an inference result string to the bounded label set.

    Unknown or unexpected values are mapped to ``failure`` so that the
    Prometheus label cardinality stays bounded. This includes non-string
    values (``None``, lists, dicts, ...): they are clamped instead of
    raising, so a bad label can never break metric recording.

    Args:
        result: Raw result label from the caller.

    Returns:
        A value guaranteed to be in ``ALLOWED_LLM_INFERENCE_RESULTS``.
    """
    # Check the type before the membership test: an unhashable value would
    # make ``in`` on a frozenset raise TypeError instead of returning False.
    if isinstance(result, str) and result in ALLOWED_LLM_INFERENCE_RESULTS:
        return result
    return LLM_INFERENCE_RESULT_FAILURE
139+
140+
114141
def record_llm_inference_duration(
115142
provider: str, model: str, endpoint_path: str, result: str, duration: float
116143
) -> None:
@@ -123,9 +150,10 @@ def record_llm_inference_duration(
123150
result: Bounded result label, such as ``success`` or ``failure``.
124151
duration: Inference call duration in seconds.
125152
"""
153+
bounded_result = normalize_llm_inference_result(result)
126154
try:
127155
metrics.llm_inference_duration_seconds.labels(
128-
provider, model, endpoint_path, result
156+
provider, model, endpoint_path, bounded_result
129157
).observe(duration)
130158
except (AttributeError, TypeError, ValueError):
131159
logger.warning("Failed to update LLM inference duration metric", exc_info=True)

0 commit comments

Comments
 (0)