Skip to content

Commit 218e298

Browse files
committed
feat(responses): add Splunk HEC telemetry to responses endpoint
Add telemetry events for responses endpoint covering completed, shield-blocked, and error paths. Error paths use asyncio.create_task() instead of BackgroundTasks since FastAPI discards background tasks on non-2xx responses. RSPEED-2867 Signed-off-by: Major Hayden <major@redhat.com>
1 parent 2b947a9 commit 218e298

4 files changed

Lines changed: 925 additions & 10 deletions

File tree

src/app/endpoints/responses.py

Lines changed: 211 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
# pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks, too-many-arguments,too-many-positional-arguments
1+
# pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks,too-many-arguments,too-many-positional-arguments,too-many-lines,too-many-statements
22

33
"""Handler for REST API call to provide answer using Responses API (LCORE specification)."""
44

5+
import asyncio
56
import json
67
from collections.abc import AsyncIterator
78
from datetime import UTC, datetime
89
from typing import Annotated, Any, Optional, cast
910

10-
from fastapi import APIRouter, Depends, HTTPException, Request
11+
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
1112
from fastapi.responses import StreamingResponse
1213
from llama_stack_api import (
1314
OpenAIResponseObject,
@@ -53,6 +54,7 @@
5354
UnauthorizedResponse,
5455
UnprocessableEntityResponse,
5556
)
57+
from observability import ResponsesEventData, build_responses_event, send_splunk_event
5658
from utils.conversations import append_turn_items_to_conversation
5759
from utils.endpoints import (
5860
check_configuration_loaded,
@@ -89,6 +91,7 @@
8991
resolve_tool_choice,
9092
select_model_for_responses,
9193
)
94+
from utils.rh_identity import get_rh_identity_context
9295
from utils.shields import run_shield_moderation
9396
from utils.suid import (
9497
normalize_conversation_id,
@@ -132,6 +135,67 @@
132135
}
133136

134137

138+
# Strong references for fire-and-forget telemetry tasks so they aren't
139+
# garbage-collected before completion (the event loop only holds weak refs).
140+
_background_splunk_tasks: set[asyncio.Task[None]] = set()
141+
142+
143+
def _queue_responses_splunk_event( # pylint: disable=too-many-arguments,too-many-positional-arguments
144+
background_tasks: Optional[BackgroundTasks],
145+
input_text: str,
146+
response_text: str,
147+
conversation_id: str,
148+
model: str,
149+
rh_identity_context: tuple[str, str],
150+
inference_time: float,
151+
sourcetype: str,
152+
input_tokens: int = 0,
153+
output_tokens: int = 0,
154+
fire_and_forget: bool = False,
155+
) -> None:
156+
"""Build and queue a Splunk telemetry event for the responses endpoint.
157+
158+
No-op when background_tasks is None and fire_and_forget is False
159+
(Splunk telemetry disabled).
160+
161+
Args:
162+
background_tasks: FastAPI background task manager, or None if disabled.
163+
input_text: User input text.
164+
response_text: Response text from LLM or shield.
165+
conversation_id: Conversation identifier.
166+
model: Model name used for inference.
167+
rh_identity_context: Tuple of (org_id, system_id) from RH identity.
168+
inference_time: Request processing duration in seconds.
169+
sourcetype: Splunk sourcetype for the event.
170+
input_tokens: Number of prompt tokens consumed.
171+
output_tokens: Number of completion tokens produced.
172+
fire_and_forget: When True, dispatch via asyncio.create_task() instead
173+
of background_tasks. Use for error paths where an HTTPException
174+
follows, since FastAPI discards BackgroundTasks on non-2xx responses.
175+
"""
176+
if not fire_and_forget and background_tasks is None:
177+
return
178+
org_id, system_id = rh_identity_context
179+
event_data = ResponsesEventData(
180+
input_text=input_text,
181+
response_text=response_text,
182+
conversation_id=conversation_id,
183+
model=model,
184+
org_id=org_id,
185+
system_id=system_id,
186+
inference_time=inference_time,
187+
input_tokens=input_tokens,
188+
output_tokens=output_tokens,
189+
)
190+
event = build_responses_event(event_data)
191+
if fire_and_forget:
192+
task = asyncio.create_task(send_splunk_event(event, sourcetype))
193+
_background_splunk_tasks.add(task)
194+
task.add_done_callback(_background_splunk_tasks.discard)
195+
elif background_tasks is not None:
196+
background_tasks.add_task(send_splunk_event, event, sourcetype)
197+
198+
135199
@router.post(
136200
"/responses",
137201
responses=responses_response,
@@ -144,6 +208,7 @@ async def responses_endpoint_handler(
144208
responses_request: ResponsesRequest,
145209
auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
146210
mcp_headers: dict[str, dict[str, str]] = Depends(mcp_headers_dependency),
211+
background_tasks: BackgroundTasks = BackgroundTasks(),
147212
) -> ResponsesResponse | StreamingResponse:
148213
"""
149214
Handle request to the /responses endpoint using Responses API (LCORE specification).
@@ -187,6 +252,7 @@ async def responses_endpoint_handler(
187252
)
188253
instructions_substituted = client_instructions is None
189254
started_at = datetime.now(UTC)
255+
rh_identity_context = get_rh_identity_context(request)
190256
user_id = auth[0]
191257

192258
await check_mcp_auth(configuration, mcp_headers)
@@ -307,6 +373,8 @@ async def responses_endpoint_handler(
307373
inline_rag_context=inline_rag_context,
308374
filter_server_tools=filter_server_tools,
309375
instructions_substituted=instructions_substituted,
376+
background_tasks=background_tasks,
377+
rh_identity_context=rh_identity_context,
310378
)
311379

312380

@@ -320,6 +388,8 @@ async def handle_streaming_response(
320388
inline_rag_context: RAGContext,
321389
filter_server_tools: bool = False,
322390
instructions_substituted: bool = False,
391+
background_tasks: BackgroundTasks | None = None,
392+
rh_identity_context: tuple[str, str] = ("", ""),
323393
) -> StreamingResponse:
324394
"""Handle streaming response from Responses API.
325395
@@ -333,6 +403,8 @@ async def handle_streaming_response(
333403
inline_rag_context: Inline RAG context to be used for the response
334404
filter_server_tools: Whether to filter server-deployed MCP tool events from the stream
335405
instructions_substituted: Whether the server substituted the instructions
406+
background_tasks: FastAPI background task manager for telemetry events
407+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
336408
Returns:
337409
StreamingResponse with SSE-formatted events
338410
"""
@@ -359,6 +431,16 @@ async def handle_streaming_response(
359431
user_input=request.input,
360432
llm_output=[moderation_result.refusal_response],
361433
)
434+
_queue_responses_splunk_event(
435+
background_tasks=background_tasks,
436+
input_text=input_text,
437+
response_text=moderation_result.message,
438+
conversation_id=normalize_conversation_id(api_params.conversation),
439+
model=api_params.model,
440+
rh_identity_context=rh_identity_context,
441+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
442+
sourcetype="responses_shield_blocked",
443+
)
362444
else:
363445
try:
364446
response = await client.responses.create(
@@ -376,16 +458,49 @@ async def handle_streaming_response(
376458
)
377459
except RuntimeError as e: # library mode wraps 413 into runtime error
378460
if is_context_length_error(str(e)):
461+
_queue_responses_splunk_event(
462+
background_tasks=background_tasks,
463+
input_text=input_text,
464+
response_text=str(e),
465+
conversation_id=normalize_conversation_id(api_params.conversation),
466+
model=api_params.model,
467+
rh_identity_context=rh_identity_context,
468+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
469+
sourcetype="responses_error",
470+
fire_and_forget=True,
471+
)
379472
error_response = PromptTooLongResponse(model=api_params.model)
380473
raise HTTPException(**error_response.model_dump()) from e
381474
raise e
382475
except APIConnectionError as e:
476+
_queue_responses_splunk_event(
477+
background_tasks=background_tasks,
478+
input_text=input_text,
479+
response_text=str(e),
480+
conversation_id=normalize_conversation_id(api_params.conversation),
481+
model=api_params.model,
482+
rh_identity_context=rh_identity_context,
483+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
484+
sourcetype="responses_error",
485+
fire_and_forget=True,
486+
)
383487
error_response = ServiceUnavailableResponse(
384488
backend_name="Llama Stack",
385489
cause=str(e),
386490
)
387491
raise HTTPException(**error_response.model_dump()) from e
388492
except (LLSApiStatusError, OpenAIAPIStatusError) as e:
493+
_queue_responses_splunk_event(
494+
background_tasks=background_tasks,
495+
input_text=input_text,
496+
response_text=str(e),
497+
conversation_id=normalize_conversation_id(api_params.conversation),
498+
model=api_params.model,
499+
rh_identity_context=rh_identity_context,
500+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
501+
sourcetype="responses_error",
502+
fire_and_forget=True,
503+
)
389504
error_response = handle_known_apistatus_errors(e, api_params.model)
390505
raise HTTPException(**error_response.model_dump()) from e
391506

@@ -399,6 +514,9 @@ async def handle_streaming_response(
399514
started_at=started_at,
400515
api_params=api_params,
401516
generate_topic_summary=request.generate_topic_summary or False,
517+
background_tasks=background_tasks,
518+
rh_identity_context=rh_identity_context,
519+
shield_blocked=(moderation_result.decision == "blocked"),
402520
),
403521
media_type="text/event-stream",
404522
)
@@ -794,6 +912,9 @@ async def generate_response(
794912
started_at: datetime,
795913
api_params: ResponsesApiParams,
796914
generate_topic_summary: bool,
915+
background_tasks: BackgroundTasks | None = None,
916+
rh_identity_context: tuple[str, str] = ("", ""),
917+
shield_blocked: bool = False,
797918
) -> AsyncIterator[str]:
798919
"""Stream the response from the generator and persist conversation details.
799920
@@ -808,6 +929,9 @@ async def generate_response(
808929
started_at: Timestamp when the conversation started
809930
api_params: ResponsesApiParams
810931
generate_topic_summary: Whether to generate topic summary for new conversations
932+
background_tasks: FastAPI background task manager for telemetry events
933+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
934+
shield_blocked: Whether the request was blocked by a shield
811935
Yields:
812936
SSE-formatted strings from the generator
813937
"""
@@ -835,6 +959,25 @@ async def generate_response(
835959
skip_userid_check=skip_userid_check,
836960
topic_summary=topic_summary,
837961
)
962+
if not shield_blocked:
963+
_queue_responses_splunk_event(
964+
background_tasks=background_tasks,
965+
input_text=input_text,
966+
response_text=turn_summary.llm_response,
967+
conversation_id=normalize_conversation_id(api_params.conversation),
968+
model=api_params.model,
969+
rh_identity_context=rh_identity_context,
970+
inference_time=(completed_at - started_at).total_seconds(),
971+
sourcetype="responses_completed",
972+
input_tokens=(
973+
turn_summary.token_usage.input_tokens if turn_summary.token_usage else 0
974+
),
975+
output_tokens=(
976+
turn_summary.token_usage.output_tokens
977+
if turn_summary.token_usage
978+
else 0
979+
),
980+
)
838981

839982

840983
async def handle_non_streaming_response(
@@ -847,6 +990,8 @@ async def handle_non_streaming_response(
847990
inline_rag_context: RAGContext,
848991
filter_server_tools: bool = False,
849992
instructions_substituted: bool = False,
993+
background_tasks: BackgroundTasks | None = None,
994+
rh_identity_context: tuple[str, str] = ("", ""),
850995
) -> ResponsesResponse:
851996
"""Handle non-streaming response from Responses API.
852997
@@ -860,6 +1005,8 @@ async def handle_non_streaming_response(
8601005
inline_rag_context: Inline RAG context to be used for the response
8611006
filter_server_tools: Whether to filter server-deployed MCP tool output
8621007
instructions_substituted: Whether the server substituted the instructions
1008+
background_tasks: FastAPI background task manager for telemetry events
1009+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
8631010
Returns:
8641011
ResponsesResponse with the completed response
8651012
"""
@@ -884,6 +1031,16 @@ async def handle_non_streaming_response(
8841031
user_input=request.input,
8851032
llm_output=[moderation_result.refusal_response],
8861033
)
1034+
_queue_responses_splunk_event(
1035+
background_tasks=background_tasks,
1036+
input_text=input_text,
1037+
response_text=output_text,
1038+
conversation_id=normalize_conversation_id(api_params.conversation),
1039+
model=api_params.model,
1040+
rh_identity_context=rh_identity_context,
1041+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1042+
sourcetype="responses_shield_blocked",
1043+
)
8871044
else:
8881045
try:
8891046
api_response = cast(
@@ -908,16 +1065,49 @@ async def handle_non_streaming_response(
9081065

9091066
except RuntimeError as e:
9101067
if is_context_length_error(str(e)):
1068+
_queue_responses_splunk_event(
1069+
background_tasks=background_tasks,
1070+
input_text=input_text,
1071+
response_text=str(e),
1072+
conversation_id=normalize_conversation_id(api_params.conversation),
1073+
model=api_params.model,
1074+
rh_identity_context=rh_identity_context,
1075+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1076+
sourcetype="responses_error",
1077+
fire_and_forget=True,
1078+
)
9111079
error_response = PromptTooLongResponse(model=api_params.model)
9121080
raise HTTPException(**error_response.model_dump()) from e
9131081
raise e
9141082
except APIConnectionError as e:
1083+
_queue_responses_splunk_event(
1084+
background_tasks=background_tasks,
1085+
input_text=input_text,
1086+
response_text=str(e),
1087+
conversation_id=normalize_conversation_id(api_params.conversation),
1088+
model=api_params.model,
1089+
rh_identity_context=rh_identity_context,
1090+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1091+
sourcetype="responses_error",
1092+
fire_and_forget=True,
1093+
)
9151094
error_response = ServiceUnavailableResponse(
9161095
backend_name="Llama Stack",
9171096
cause=str(e),
9181097
)
9191098
raise HTTPException(**error_response.model_dump()) from e
9201099
except (LLSApiStatusError, OpenAIAPIStatusError) as e:
1100+
_queue_responses_splunk_event(
1101+
background_tasks=background_tasks,
1102+
input_text=input_text,
1103+
response_text=str(e),
1104+
conversation_id=normalize_conversation_id(api_params.conversation),
1105+
model=api_params.model,
1106+
rh_identity_context=rh_identity_context,
1107+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1108+
sourcetype="responses_error",
1109+
fire_and_forget=True,
1110+
)
9211111
error_response = handle_known_apistatus_errors(e, api_params.model)
9221112
raise HTTPException(**error_response.model_dump()) from e
9231113

@@ -945,6 +1135,25 @@ async def handle_non_streaming_response(
9451135
)
9461136
turn_summary.rag_chunks.extend(inline_rag_context.rag_chunks)
9471137
completed_at = datetime.now(UTC)
1138+
if moderation_result.decision != "blocked":
1139+
_queue_responses_splunk_event(
1140+
background_tasks=background_tasks,
1141+
input_text=input_text,
1142+
response_text=output_text,
1143+
conversation_id=normalize_conversation_id(api_params.conversation),
1144+
model=api_params.model,
1145+
rh_identity_context=rh_identity_context,
1146+
inference_time=(completed_at - started_at).total_seconds(),
1147+
sourcetype="responses_completed",
1148+
input_tokens=(
1149+
turn_summary.token_usage.input_tokens if turn_summary.token_usage else 0
1150+
),
1151+
output_tokens=(
1152+
turn_summary.token_usage.output_tokens
1153+
if turn_summary.token_usage
1154+
else 0
1155+
),
1156+
)
9481157
if api_params.store:
9491158
store_query_results(
9501159
user_id=user_id,

src/app/endpoints/rlsapi_v1.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,6 @@ class TemplateRenderError(Exception):
7878
)
7979

8080

81-
# Backward-compatible alias so existing test imports continue to work.
82-
_get_rh_identity_context = get_rh_identity_context
83-
84-
8581
infer_responses: dict[int | str, dict[str, Any]] = {
8682
200: RlsapiV1InferResponse.openapi_response(),
8783
401: UnauthorizedResponse.openapi_response(examples=UNAUTHORIZED_OPENAPI_EXAMPLES),

0 commit comments

Comments
 (0)