Skip to content

Commit 44af572

Browse files
committed
feat(responses): add Splunk HEC telemetry to responses endpoint
Add telemetry events for the responses endpoint covering the completed, shield-blocked, and error paths. Error paths use asyncio.create_task() instead of BackgroundTasks, since FastAPI discards background tasks on non-2xx responses.

RSPEED-2867

Signed-off-by: Major Hayden <major@redhat.com>
1 parent 2b947a9 commit 44af572

4 files changed

Lines changed: 827 additions & 10 deletions

File tree

src/app/endpoints/responses.py

Lines changed: 204 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
# pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks, too-many-arguments,too-many-positional-arguments
1+
# pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks,too-many-arguments,too-many-positional-arguments,too-many-lines,too-many-statements
22

33
"""Handler for REST API call to provide answer using Responses API (LCORE specification)."""
44

5+
import asyncio
56
import json
67
from collections.abc import AsyncIterator
78
from datetime import UTC, datetime
89
from typing import Annotated, Any, Optional, cast
910

10-
from fastapi import APIRouter, Depends, HTTPException, Request
11+
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
1112
from fastapi.responses import StreamingResponse
1213
from llama_stack_api import (
1314
OpenAIResponseObject,
@@ -53,6 +54,7 @@
5354
UnauthorizedResponse,
5455
UnprocessableEntityResponse,
5556
)
57+
from observability import ResponsesEventData, build_responses_event, send_splunk_event
5658
from utils.conversations import append_turn_items_to_conversation
5759
from utils.endpoints import (
5860
check_configuration_loaded,
@@ -89,6 +91,7 @@
8991
resolve_tool_choice,
9092
select_model_for_responses,
9193
)
94+
from utils.rh_identity import get_rh_identity_context
9295
from utils.shields import run_shield_moderation
9396
from utils.suid import (
9497
normalize_conversation_id,
@@ -132,6 +135,60 @@
132135
}
133136

134137

138+
# Strong references to in-flight fire-and-forget telemetry tasks. The event
# loop only keeps weak references to tasks, so a task that nothing else refers
# to may be garbage-collected before it finishes. Tasks remove themselves from
# this set through a done-callback.
_PENDING_SPLUNK_TASKS: set[asyncio.Task[Any]] = set()


def _queue_responses_splunk_event(  # pylint: disable=too-many-arguments,too-many-positional-arguments
    background_tasks: Optional[BackgroundTasks],
    input_text: str,
    response_text: str,
    conversation_id: str,
    model: str,
    rh_identity_context: tuple[str, str],
    inference_time: float,
    sourcetype: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    fire_and_forget: bool = False,
) -> None:
    """Build and queue a Splunk telemetry event for the responses endpoint.

    No-op when background_tasks is None and fire_and_forget is False
    (Splunk telemetry disabled).

    Args:
        background_tasks: FastAPI background task manager, or None if disabled.
        input_text: User input text.
        response_text: Response text from LLM or shield.
        conversation_id: Conversation identifier.
        model: Model name used for inference.
        rh_identity_context: Tuple of (org_id, system_id) from RH identity.
        inference_time: Request processing duration in seconds.
        sourcetype: Splunk sourcetype for the event.
        input_tokens: Number of prompt tokens consumed.
        output_tokens: Number of completion tokens produced.
        fire_and_forget: When True, dispatch via asyncio.create_task() instead
            of background_tasks. Use for error paths where an HTTPException
            follows, since FastAPI discards BackgroundTasks on non-2xx responses.
            Requires a running event loop. The created task is held in a
            module-level set until it completes.
    """
    if not fire_and_forget and background_tasks is None:
        return

    org_id, system_id = rh_identity_context
    event_data = ResponsesEventData(
        input_text=input_text,
        response_text=response_text,
        conversation_id=conversation_id,
        model=model,
        org_id=org_id,
        system_id=system_id,
        inference_time=inference_time,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
    )
    event = build_responses_event(event_data)

    if fire_and_forget:
        task = asyncio.create_task(send_splunk_event(event, sourcetype))
        # Hold a strong reference until the task is done; otherwise it can be
        # garbage-collected mid-flight and the event silently lost.
        _PENDING_SPLUNK_TASKS.add(task)
        task.add_done_callback(_PENDING_SPLUNK_TASKS.discard)
    elif background_tasks is not None:
        background_tasks.add_task(send_splunk_event, event, sourcetype)
190+
191+
135192
@router.post(
136193
"/responses",
137194
responses=responses_response,
@@ -144,6 +201,7 @@ async def responses_endpoint_handler(
144201
responses_request: ResponsesRequest,
145202
auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
146203
mcp_headers: dict[str, dict[str, str]] = Depends(mcp_headers_dependency),
204+
background_tasks: BackgroundTasks = cast(BackgroundTasks, None),
147205
) -> ResponsesResponse | StreamingResponse:
148206
"""
149207
Handle request to the /responses endpoint using Responses API (LCORE specification).
@@ -187,6 +245,7 @@ async def responses_endpoint_handler(
187245
)
188246
instructions_substituted = client_instructions is None
189247
started_at = datetime.now(UTC)
248+
rh_identity_context = get_rh_identity_context(request)
190249
user_id = auth[0]
191250

192251
await check_mcp_auth(configuration, mcp_headers)
@@ -307,6 +366,8 @@ async def responses_endpoint_handler(
307366
inline_rag_context=inline_rag_context,
308367
filter_server_tools=filter_server_tools,
309368
instructions_substituted=instructions_substituted,
369+
background_tasks=background_tasks,
370+
rh_identity_context=rh_identity_context,
310371
)
311372

312373

@@ -320,6 +381,8 @@ async def handle_streaming_response(
320381
inline_rag_context: RAGContext,
321382
filter_server_tools: bool = False,
322383
instructions_substituted: bool = False,
384+
background_tasks: BackgroundTasks | None = None,
385+
rh_identity_context: tuple[str, str] = ("", ""),
323386
) -> StreamingResponse:
324387
"""Handle streaming response from Responses API.
325388
@@ -333,6 +396,8 @@ async def handle_streaming_response(
333396
inline_rag_context: Inline RAG context to be used for the response
334397
filter_server_tools: Whether to filter server-deployed MCP tool events from the stream
335398
instructions_substituted: Whether the server substituted the instructions
399+
background_tasks: FastAPI background task manager for telemetry events
400+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
336401
Returns:
337402
StreamingResponse with SSE-formatted events
338403
"""
@@ -359,6 +424,16 @@ async def handle_streaming_response(
359424
user_input=request.input,
360425
llm_output=[moderation_result.refusal_response],
361426
)
427+
_queue_responses_splunk_event(
428+
background_tasks=background_tasks,
429+
input_text=input_text,
430+
response_text=moderation_result.message,
431+
conversation_id=normalize_conversation_id(api_params.conversation),
432+
model=api_params.model,
433+
rh_identity_context=rh_identity_context,
434+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
435+
sourcetype="responses_shield_blocked",
436+
)
362437
else:
363438
try:
364439
response = await client.responses.create(
@@ -376,16 +451,49 @@ async def handle_streaming_response(
376451
)
377452
except RuntimeError as e: # library mode wraps 413 into runtime error
378453
if is_context_length_error(str(e)):
454+
_queue_responses_splunk_event(
455+
background_tasks=background_tasks,
456+
input_text=input_text,
457+
response_text=str(e),
458+
conversation_id=normalize_conversation_id(api_params.conversation),
459+
model=api_params.model,
460+
rh_identity_context=rh_identity_context,
461+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
462+
sourcetype="responses_error",
463+
fire_and_forget=True,
464+
)
379465
error_response = PromptTooLongResponse(model=api_params.model)
380466
raise HTTPException(**error_response.model_dump()) from e
381467
raise e
382468
except APIConnectionError as e:
469+
_queue_responses_splunk_event(
470+
background_tasks=background_tasks,
471+
input_text=input_text,
472+
response_text=str(e),
473+
conversation_id=normalize_conversation_id(api_params.conversation),
474+
model=api_params.model,
475+
rh_identity_context=rh_identity_context,
476+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
477+
sourcetype="responses_error",
478+
fire_and_forget=True,
479+
)
383480
error_response = ServiceUnavailableResponse(
384481
backend_name="Llama Stack",
385482
cause=str(e),
386483
)
387484
raise HTTPException(**error_response.model_dump()) from e
388485
except (LLSApiStatusError, OpenAIAPIStatusError) as e:
486+
_queue_responses_splunk_event(
487+
background_tasks=background_tasks,
488+
input_text=input_text,
489+
response_text=str(e),
490+
conversation_id=normalize_conversation_id(api_params.conversation),
491+
model=api_params.model,
492+
rh_identity_context=rh_identity_context,
493+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
494+
sourcetype="responses_error",
495+
fire_and_forget=True,
496+
)
389497
error_response = handle_known_apistatus_errors(e, api_params.model)
390498
raise HTTPException(**error_response.model_dump()) from e
391499

@@ -399,6 +507,9 @@ async def handle_streaming_response(
399507
started_at=started_at,
400508
api_params=api_params,
401509
generate_topic_summary=request.generate_topic_summary or False,
510+
background_tasks=background_tasks,
511+
rh_identity_context=rh_identity_context,
512+
shield_blocked=(moderation_result.decision == "blocked"),
402513
),
403514
media_type="text/event-stream",
404515
)
@@ -794,6 +905,9 @@ async def generate_response(
794905
started_at: datetime,
795906
api_params: ResponsesApiParams,
796907
generate_topic_summary: bool,
908+
background_tasks: BackgroundTasks | None = None,
909+
rh_identity_context: tuple[str, str] = ("", ""),
910+
shield_blocked: bool = False,
797911
) -> AsyncIterator[str]:
798912
"""Stream the response from the generator and persist conversation details.
799913
@@ -808,6 +922,9 @@ async def generate_response(
808922
started_at: Timestamp when the conversation started
809923
api_params: ResponsesApiParams
810924
generate_topic_summary: Whether to generate topic summary for new conversations
925+
background_tasks: FastAPI background task manager for telemetry events
926+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
927+
shield_blocked: Whether the request was blocked by a shield
811928
Yields:
812929
SSE-formatted strings from the generator
813930
"""
@@ -835,6 +952,25 @@ async def generate_response(
835952
skip_userid_check=skip_userid_check,
836953
topic_summary=topic_summary,
837954
)
955+
if not shield_blocked:
956+
_queue_responses_splunk_event(
957+
background_tasks=background_tasks,
958+
input_text=input_text,
959+
response_text=turn_summary.llm_response,
960+
conversation_id=normalize_conversation_id(api_params.conversation),
961+
model=api_params.model,
962+
rh_identity_context=rh_identity_context,
963+
inference_time=(completed_at - started_at).total_seconds(),
964+
sourcetype="responses_completed",
965+
input_tokens=(
966+
turn_summary.token_usage.input_tokens if turn_summary.token_usage else 0
967+
),
968+
output_tokens=(
969+
turn_summary.token_usage.output_tokens
970+
if turn_summary.token_usage
971+
else 0
972+
),
973+
)
838974

839975

840976
async def handle_non_streaming_response(
@@ -847,6 +983,8 @@ async def handle_non_streaming_response(
847983
inline_rag_context: RAGContext,
848984
filter_server_tools: bool = False,
849985
instructions_substituted: bool = False,
986+
background_tasks: BackgroundTasks | None = None,
987+
rh_identity_context: tuple[str, str] = ("", ""),
850988
) -> ResponsesResponse:
851989
"""Handle non-streaming response from Responses API.
852990
@@ -860,6 +998,8 @@ async def handle_non_streaming_response(
860998
inline_rag_context: Inline RAG context to be used for the response
861999
filter_server_tools: Whether to filter server-deployed MCP tool output
8621000
instructions_substituted: Whether the server substituted the instructions
1001+
background_tasks: FastAPI background task manager for telemetry events
1002+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
8631003
Returns:
8641004
ResponsesResponse with the completed response
8651005
"""
@@ -884,6 +1024,16 @@ async def handle_non_streaming_response(
8841024
user_input=request.input,
8851025
llm_output=[moderation_result.refusal_response],
8861026
)
1027+
_queue_responses_splunk_event(
1028+
background_tasks=background_tasks,
1029+
input_text=input_text,
1030+
response_text=output_text,
1031+
conversation_id=normalize_conversation_id(api_params.conversation),
1032+
model=api_params.model,
1033+
rh_identity_context=rh_identity_context,
1034+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1035+
sourcetype="responses_shield_blocked",
1036+
)
8871037
else:
8881038
try:
8891039
api_response = cast(
@@ -908,16 +1058,49 @@ async def handle_non_streaming_response(
9081058

9091059
except RuntimeError as e:
9101060
if is_context_length_error(str(e)):
1061+
_queue_responses_splunk_event(
1062+
background_tasks=background_tasks,
1063+
input_text=input_text,
1064+
response_text=str(e),
1065+
conversation_id=normalize_conversation_id(api_params.conversation),
1066+
model=api_params.model,
1067+
rh_identity_context=rh_identity_context,
1068+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1069+
sourcetype="responses_error",
1070+
fire_and_forget=True,
1071+
)
9111072
error_response = PromptTooLongResponse(model=api_params.model)
9121073
raise HTTPException(**error_response.model_dump()) from e
9131074
raise e
9141075
except APIConnectionError as e:
1076+
_queue_responses_splunk_event(
1077+
background_tasks=background_tasks,
1078+
input_text=input_text,
1079+
response_text=str(e),
1080+
conversation_id=normalize_conversation_id(api_params.conversation),
1081+
model=api_params.model,
1082+
rh_identity_context=rh_identity_context,
1083+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1084+
sourcetype="responses_error",
1085+
fire_and_forget=True,
1086+
)
9151087
error_response = ServiceUnavailableResponse(
9161088
backend_name="Llama Stack",
9171089
cause=str(e),
9181090
)
9191091
raise HTTPException(**error_response.model_dump()) from e
9201092
except (LLSApiStatusError, OpenAIAPIStatusError) as e:
1093+
_queue_responses_splunk_event(
1094+
background_tasks=background_tasks,
1095+
input_text=input_text,
1096+
response_text=str(e),
1097+
conversation_id=normalize_conversation_id(api_params.conversation),
1098+
model=api_params.model,
1099+
rh_identity_context=rh_identity_context,
1100+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1101+
sourcetype="responses_error",
1102+
fire_and_forget=True,
1103+
)
9211104
error_response = handle_known_apistatus_errors(e, api_params.model)
9221105
raise HTTPException(**error_response.model_dump()) from e
9231106

@@ -945,6 +1128,25 @@ async def handle_non_streaming_response(
9451128
)
9461129
turn_summary.rag_chunks.extend(inline_rag_context.rag_chunks)
9471130
completed_at = datetime.now(UTC)
1131+
if moderation_result.decision != "blocked":
1132+
_queue_responses_splunk_event(
1133+
background_tasks=background_tasks,
1134+
input_text=input_text,
1135+
response_text=output_text,
1136+
conversation_id=normalize_conversation_id(api_params.conversation),
1137+
model=api_params.model,
1138+
rh_identity_context=rh_identity_context,
1139+
inference_time=(completed_at - started_at).total_seconds(),
1140+
sourcetype="responses_completed",
1141+
input_tokens=(
1142+
turn_summary.token_usage.input_tokens if turn_summary.token_usage else 0
1143+
),
1144+
output_tokens=(
1145+
turn_summary.token_usage.output_tokens
1146+
if turn_summary.token_usage
1147+
else 0
1148+
),
1149+
)
9481150
if api_params.store:
9491151
store_query_results(
9501152
user_id=user_id,

src/app/endpoints/rlsapi_v1.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,6 @@ class TemplateRenderError(Exception):
7878
)
7979

8080

81-
# Backward-compatible alias so existing test imports continue to work.
82-
_get_rh_identity_context = get_rh_identity_context
83-
84-
8581
infer_responses: dict[int | str, dict[str, Any]] = {
8682
200: RlsapiV1InferResponse.openapi_response(),
8783
401: UnauthorizedResponse.openapi_response(examples=UNAUTHORIZED_OPENAPI_EXAMPLES),

0 commit comments

Comments
 (0)