44
55import asyncio
66import json
7+ import time
78from collections .abc import AsyncIterator , Sequence
89from datetime import UTC , datetime
910from typing import Annotated , Any , Final , NoReturn , Optional , cast
3940from configuration import configuration
4041from constants import ENDPOINT_PATH_RESPONSES , SUBSTITUTED_INSTRUCTIONS_PLACEHOLDER
4142from log import get_logger
43+ from metrics import recording
4244from models .api .responses .constants import UNAUTHORIZED_OPENAPI_EXAMPLES_WITH_MCP_OAUTH
4345from models .api .responses .error import (
4446 ConflictResponse ,
@@ -630,6 +632,34 @@ async def responses_endpoint_handler(
630632 )
631633
632634
def _record_response_inference_result(
    model_id: str,
    endpoint_path: str,
    result: str,
    duration: float,
    record_failure: bool = False,
) -> None:
    """Emit inference metrics for a single Responses API call.

    The composite model identifier is split into its provider and model
    parts via ``extract_provider_and_model_from_model_id``. When requested,
    the LLM failure counter is incremented first; the inference duration is
    then recorded unconditionally.

    Args:
        model_id: Composite model identifier in ``provider/model`` format.
        endpoint_path: API endpoint path used for metric labeling.
        result: Result label such as ``success`` or ``failure``.
        duration: Inference call duration in seconds.
        record_failure: When True, also increment the LLM failure counter.
    """
    provider_name, model_name = extract_provider_and_model_from_model_id(model_id)

    # Both recorders take the same leading labels, in this order.
    metric_labels = (provider_name, model_name, endpoint_path)

    if record_failure:
        recording.record_llm_failure(*metric_labels)

    recording.record_llm_inference_duration(*metric_labels, result, duration)
661+
662+
633663async def handle_streaming_response (
634664 original_request : ResponsesRequest ,
635665 api_params : ResponsesApiParams ,
@@ -658,6 +688,7 @@ async def handle_streaming_response(
658688 context .moderation_result .message ,
659689 )
660690 else :
691+ inference_start_time = time .monotonic ()
661692 try :
662693 response = await context .client .responses .create (
663694 ** api_params .model_dump (exclude_none = True )
@@ -668,13 +699,21 @@ async def handle_streaming_response(
668699 api_params = api_params ,
669700 context = context ,
670701 turn_summary = turn_summary ,
702+ inference_start_time = inference_start_time ,
671703 )
672704 except (
673705 RuntimeError ,
674706 APIConnectionError ,
675707 LLSApiStatusError ,
676708 OpenAIAPIStatusError ,
677709 ) as e :
710+ _record_response_inference_result (
711+ api_params .model ,
712+ context .endpoint_path ,
713+ "failure" ,
714+ time .monotonic () - inference_start_time ,
715+ record_failure = True ,
716+ )
678717 _raise_response_api_http_exception (e , api_params , context )
679718
680719 return StreamingResponse (
@@ -928,6 +967,7 @@ async def response_generator(
928967 api_params : ResponsesApiParams ,
929968 context : ResponsesContext ,
930969 turn_summary : TurnSummary ,
970+ inference_start_time : float ,
931971) -> AsyncIterator [str ]:
932972 """Generate SSE-formatted streaming response with LCORE-enriched events.
933973
@@ -937,6 +977,7 @@ async def response_generator(
937977 api_params: ResponsesApiParams
938978 context: Responses context
939979 turn_summary: TurnSummary to populate during streaming
980+ inference_start_time: Monotonic timestamp taken before the inference call.
940981 Yields:
941982 SSE-formatted strings for streaming events, ending with [DONE]
942983 """
@@ -947,76 +988,102 @@ async def response_generator(
947988 configured_mcp_labels = {s .name for s in configuration .mcp_servers }
948989 # Track output indices of server-deployed MCP calls to filter their events
949990 server_mcp_output_indices : set [int ] = set ()
991+ inference_metric_recorded = False
950992
951- async for chunk in stream :
952- logger .debug ("Processing streaming chunk, type: %s" , chunk .type )
993+ try :
994+ async for chunk in stream :
995+ logger .debug ("Processing streaming chunk, type: %s" , chunk .type )
953996
954- # Filter out streaming events for server-deployed MCP tools.
955- # These are handled internally by LCS and should not be forwarded
956- # to clients that don't understand the mcp_call item type.
957- if _should_filter_mcp_chunk (
958- chunk , configured_mcp_labels , server_mcp_output_indices
959- ):
960- continue
997+ # Filter out streaming events for server-deployed MCP tools.
998+ # These are handled internally by LCS and should not be forwarded
999+ # to clients that don't understand the mcp_call item type.
1000+ if _should_filter_mcp_chunk (
1001+ chunk , configured_mcp_labels , server_mcp_output_indices
1002+ ):
1003+ continue
9611004
962- chunk_dict = chunk .model_dump (exclude_none = True , by_alias = True )
1005+ chunk_dict = chunk .model_dump (exclude_none = True , by_alias = True )
9631006
964- # Create own sequence number for chunks to maintain order
965- chunk_dict ["sequence_number" ] = sequence_number
966- sequence_number += 1
1007+ # Create own sequence number for chunks to maintain order
1008+ chunk_dict ["sequence_number" ] = sequence_number
1009+ sequence_number += 1
9671010
968- if "response" in chunk_dict :
969- chunk_dict ["response" ]["conversation" ] = normalize_conversation_id (
970- api_params .conversation
971- )
972- _sanitize_response_dict (
973- chunk_dict ["response" ],
974- configured_mcp_labels ,
975- original_request ,
976- )
977- tools = chunk_dict ["response" ].get ("tools" )
978- if tools is not None :
979- chunk_dict ["response" ]["tools" ] = (
980- translate_vector_store_ids_to_user_facing (
981- tools ,
982- configuration .rag_id_mapping ,
1011+ if "response" in chunk_dict :
1012+ chunk_dict ["response" ]["conversation" ] = normalize_conversation_id (
1013+ api_params .conversation
1014+ )
1015+ _sanitize_response_dict (
1016+ chunk_dict ["response" ],
1017+ configured_mcp_labels ,
1018+ original_request ,
1019+ )
1020+ tools = chunk_dict ["response" ].get ("tools" )
1021+ if tools is not None :
1022+ chunk_dict ["response" ]["tools" ] = (
1023+ translate_vector_store_ids_to_user_facing (
1024+ tools ,
1025+ configuration .rag_id_mapping ,
1026+ )
9831027 )
1028+ # Intermediate response - no quota consumption and text yet
1029+ if chunk .type == "response.in_progress" :
1030+ chunk_dict ["response" ]["available_quotas" ] = {}
1031+ chunk_dict ["response" ]["output_text" ] = ""
1032+
1033+ # Handle completion, incomplete, and failed events
1034+ if chunk .type in (
1035+ "response.completed" ,
1036+ "response.incomplete" ,
1037+ "response.failed" ,
1038+ ):
1039+ latest_response_object = cast (
1040+ OpenAIResponseObject , cast (Any , chunk ).response
9841041 )
985- # Intermediate response - no quota consumption and text yet
986- if chunk .type == "response.in_progress" :
987- chunk_dict ["response" ]["available_quotas" ] = {}
988- chunk_dict ["response" ]["output_text" ] = ""
989-
990- # Handle completion, incomplete, and failed events - only quota handling here
991- if chunk .type in (
992- "response.completed" ,
993- "response.incomplete" ,
994- "response.failed" ,
995- ):
996- latest_response_object = cast (
997- OpenAIResponseObject , cast (Any , chunk ).response
998- )
9991042
1000- # Extract and consume tokens if any were used
1001- turn_summary .token_usage = extract_token_usage (
1002- latest_response_object .usage , api_params .model , context .endpoint_path
1003- )
1004- consume_query_tokens (
1005- user_id = context .auth [0 ],
1006- model_id = api_params .model ,
1007- token_usage = turn_summary .token_usage ,
1008- )
1043+ # Extract and consume tokens if any were used
1044+ turn_summary .token_usage = extract_token_usage (
1045+ latest_response_object .usage ,
1046+ api_params .model ,
1047+ context .endpoint_path ,
1048+ )
1049+ consume_query_tokens (
1050+ user_id = context .auth [0 ],
1051+ model_id = api_params .model ,
1052+ token_usage = turn_summary .token_usage ,
1053+ )
10091054
1010- # Get available quotas after token consumption
1011- chunk_dict ["response" ]["available_quotas" ] = get_available_quotas (
1012- quota_limiters = configuration .quota_limiters , user_id = context .auth [0 ]
1013- )
1014- turn_summary .llm_response = extract_text_from_response_items (
1015- latest_response_object .output
1055+ # Get available quotas after token consumption
1056+ chunk_dict ["response" ]["available_quotas" ] = get_available_quotas (
1057+ quota_limiters = configuration .quota_limiters ,
1058+ user_id = context .auth [0 ],
1059+ )
1060+ turn_summary .llm_response = extract_text_from_response_items (
1061+ latest_response_object .output
1062+ )
1063+ chunk_dict ["response" ]["output_text" ] = turn_summary .llm_response
1064+
1065+ # Record inference duration metric for terminal events
1066+ result = "failure" if chunk .type == "response.failed" else "success"
1067+ _record_response_inference_result (
1068+ api_params .model ,
1069+ context .endpoint_path ,
1070+ result ,
1071+ time .monotonic () - inference_start_time ,
1072+ record_failure = (result == "failure" ),
1073+ )
1074+ inference_metric_recorded = True
1075+
1076+ yield f"event: { chunk .type or 'error' } \n data: { json .dumps (chunk_dict )} \n \n "
1077+ except Exception :
1078+ if not inference_metric_recorded :
1079+ _record_response_inference_result (
1080+ api_params .model ,
1081+ context .endpoint_path ,
1082+ "failure" ,
1083+ time .monotonic () - inference_start_time ,
1084+ record_failure = True ,
10161085 )
1017- chunk_dict ["response" ]["output_text" ] = turn_summary .llm_response
1018-
1019- yield f"event: { chunk .type or 'error' } \n data: { json .dumps (chunk_dict )} \n \n "
1086+ raise
10201087
10211088 # Extract response metadata from final response object
10221089 if latest_response_object :
@@ -1108,13 +1175,22 @@ async def handle_non_streaming_response(
11081175 await _persist_blocked_response_turn (api_params , context )
11091176 _queue_blocked_response_event (api_params , context , output_text )
11101177 else :
1178+ inference_start_time = time .monotonic ()
1179+ inference_metric_recorded = False
11111180 try :
11121181 api_response = cast (
11131182 OpenAIResponseObject ,
11141183 await context .client .responses .create (
11151184 ** api_params .model_dump (exclude_none = True )
11161185 ),
11171186 )
1187+ _record_response_inference_result (
1188+ api_params .model ,
1189+ context .endpoint_path ,
1190+ "success" ,
1191+ time .monotonic () - inference_start_time ,
1192+ )
1193+ inference_metric_recorded = True
11181194 token_usage = extract_token_usage (
11191195 api_response .usage , api_params .model , context .endpoint_path
11201196 )
@@ -1138,6 +1214,14 @@ async def handle_non_streaming_response(
11381214 LLSApiStatusError ,
11391215 OpenAIAPIStatusError ,
11401216 ) as e :
1217+ if not inference_metric_recorded :
1218+ _record_response_inference_result (
1219+ api_params .model ,
1220+ context .endpoint_path ,
1221+ "failure" ,
1222+ time .monotonic () - inference_start_time ,
1223+ record_failure = True ,
1224+ )
11411225 _raise_response_api_http_exception (e , api_params , context )
11421226
11431227 # Get available quotas
0 commit comments