Skip to content

Commit b092d7d

Browse files
committed
feat(responses): add Splunk HEC telemetry to responses endpoint
Wires BackgroundTasks and RH identity through the responses endpoint handler and adds telemetry hooks for all 6 code paths (streaming and non-streaming success, error, and shield-blocked events). - Add background_tasks and rh_identity_context parameters to responses_endpoint_handler(), handle_streaming_response(), handle_non_streaming_response(), and generate_response() - Extract RH identity in responses_endpoint_handler() before sub-handler dispatch (sub-handlers receive ResponsesRequest, not FastAPI Request) - Add _queue_responses_splunk_event() helper using BackgroundTasks - Fire responses_completed, responses_error, and responses_shield_blocked sourcetype events on all paths - Add unit tests covering all 6 telemetry paths Part 3/3 of RSPEED-2867 Splunk HEC telemetry for /responses. Signed-off-by: Major Hayden <mhayden@redhat.com> Signed-off-by: Major Hayden <major@redhat.com>
1 parent a4cb698 commit b092d7d

2 files changed

Lines changed: 771 additions & 2 deletions

File tree

src/app/endpoints/responses.py

Lines changed: 183 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks, too-many-arguments,too-many-positional-arguments
1+
# pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks,too-many-arguments,too-many-positional-arguments,too-many-lines,too-many-statements
22

33
"""Handler for REST API call to provide answer using Responses API (LCORE specification)."""
44

@@ -7,7 +7,7 @@
77
from datetime import UTC, datetime
88
from typing import Annotated, Any, Optional, cast
99

10-
from fastapi import APIRouter, Depends, HTTPException, Request
10+
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
1111
from fastapi.responses import StreamingResponse
1212
from llama_stack_api import (
1313
OpenAIResponseObject,
@@ -51,6 +51,7 @@
5151
UnauthorizedResponse,
5252
UnprocessableEntityResponse,
5353
)
54+
from observability import ResponsesEventData, build_responses_event, send_splunk_event
5455
from utils.conversations import append_turn_items_to_conversation
5556
from utils.endpoints import (
5657
check_configuration_loaded,
@@ -87,6 +88,7 @@
8788
resolve_tool_choice,
8889
select_model_for_responses,
8990
)
91+
from utils.rh_identity import get_rh_identity_context
9092
from utils.shields import run_shield_moderation
9193
from utils.suid import (
9294
normalize_conversation_id,
@@ -130,6 +132,52 @@
130132
}
131133

132134

135+
def _queue_responses_splunk_event( # pylint: disable=too-many-arguments,too-many-positional-arguments
136+
background_tasks: Optional[BackgroundTasks],
137+
input_text: str,
138+
response_text: str,
139+
conversation_id: str,
140+
model: str,
141+
rh_identity_context: tuple[str, str],
142+
inference_time: float,
143+
sourcetype: str,
144+
input_tokens: int = 0,
145+
output_tokens: int = 0,
146+
) -> None:
147+
"""Build and queue a Splunk telemetry event for the responses endpoint.
148+
149+
No-op when background_tasks is None (Splunk telemetry disabled).
150+
151+
Args:
152+
background_tasks: FastAPI background task manager, or None if disabled.
153+
input_text: User input text.
154+
response_text: Response text from LLM or shield.
155+
conversation_id: Conversation identifier.
156+
model: Model name used for inference.
157+
rh_identity_context: Tuple of (org_id, system_id) from RH identity.
158+
inference_time: Request processing duration in seconds.
159+
sourcetype: Splunk sourcetype for the event.
160+
input_tokens: Number of prompt tokens consumed.
161+
output_tokens: Number of completion tokens produced.
162+
"""
163+
if background_tasks is None:
164+
return
165+
org_id, system_id = rh_identity_context
166+
event_data = ResponsesEventData(
167+
input_text=input_text,
168+
response_text=response_text,
169+
conversation_id=conversation_id,
170+
model=model,
171+
org_id=org_id,
172+
system_id=system_id,
173+
inference_time=inference_time,
174+
input_tokens=input_tokens,
175+
output_tokens=output_tokens,
176+
)
177+
event = build_responses_event(event_data)
178+
background_tasks.add_task(send_splunk_event, event, sourcetype)
179+
180+
133181
@router.post(
134182
"/responses",
135183
responses=responses_response,
@@ -142,6 +190,7 @@ async def responses_endpoint_handler(
142190
responses_request: ResponsesRequest,
143191
auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
144192
mcp_headers: dict[str, dict[str, str]] = Depends(mcp_headers_dependency),
193+
background_tasks: BackgroundTasks = cast(BackgroundTasks, None),
145194
) -> ResponsesResponse | StreamingResponse:
146195
"""
147196
Handle request to the /responses endpoint using Responses API (LCORE specification).
@@ -183,6 +232,7 @@ async def responses_endpoint_handler(
183232
responses_request.instructions, field_name="instructions"
184233
)
185234
started_at = datetime.now(UTC)
235+
rh_identity_context = get_rh_identity_context(request)
186236
user_id = auth[0]
187237

188238
await check_mcp_auth(configuration, mcp_headers)
@@ -302,6 +352,8 @@ async def responses_endpoint_handler(
302352
moderation_result=moderation_result,
303353
inline_rag_context=inline_rag_context,
304354
filter_server_tools=filter_server_tools,
355+
background_tasks=background_tasks,
356+
rh_identity_context=rh_identity_context,
305357
)
306358

307359

@@ -314,6 +366,8 @@ async def handle_streaming_response(
314366
moderation_result: ShieldModerationResult,
315367
inline_rag_context: RAGContext,
316368
filter_server_tools: bool = False,
369+
background_tasks: BackgroundTasks | None = None,
370+
rh_identity_context: tuple[str, str] = ("", ""),
317371
) -> StreamingResponse:
318372
"""Handle streaming response from Responses API.
319373
@@ -326,6 +380,8 @@ async def handle_streaming_response(
326380
moderation_result: Result of shield moderation check
327381
inline_rag_context: Inline RAG context to be used for the response
328382
filter_server_tools: Whether to filter server-deployed MCP tool events from the stream
383+
background_tasks: FastAPI background task manager for telemetry events
384+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
329385
Returns:
330386
StreamingResponse with SSE-formatted events
331387
"""
@@ -352,6 +408,16 @@ async def handle_streaming_response(
352408
user_input=request.input,
353409
llm_output=[moderation_result.refusal_response],
354410
)
411+
_queue_responses_splunk_event(
412+
background_tasks=background_tasks,
413+
input_text=input_text,
414+
response_text=moderation_result.message,
415+
conversation_id=normalize_conversation_id(api_params.conversation),
416+
model=api_params.model,
417+
rh_identity_context=rh_identity_context,
418+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
419+
sourcetype="responses_shield_blocked",
420+
)
355421
else:
356422
try:
357423
response = await client.responses.create(
@@ -368,16 +434,46 @@ async def handle_streaming_response(
368434
)
369435
except RuntimeError as e: # library mode wraps 413 into runtime error
370436
if is_context_length_error(str(e)):
437+
_queue_responses_splunk_event(
438+
background_tasks=background_tasks,
439+
input_text=input_text,
440+
response_text=str(e),
441+
conversation_id=normalize_conversation_id(api_params.conversation),
442+
model=api_params.model,
443+
rh_identity_context=rh_identity_context,
444+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
445+
sourcetype="responses_error",
446+
)
371447
error_response = PromptTooLongResponse(model=api_params.model)
372448
raise HTTPException(**error_response.model_dump()) from e
373449
raise e
374450
except APIConnectionError as e:
451+
_queue_responses_splunk_event(
452+
background_tasks=background_tasks,
453+
input_text=input_text,
454+
response_text=str(e),
455+
conversation_id=normalize_conversation_id(api_params.conversation),
456+
model=api_params.model,
457+
rh_identity_context=rh_identity_context,
458+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
459+
sourcetype="responses_error",
460+
)
375461
error_response = ServiceUnavailableResponse(
376462
backend_name="Llama Stack",
377463
cause=str(e),
378464
)
379465
raise HTTPException(**error_response.model_dump()) from e
380466
except (LLSApiStatusError, OpenAIAPIStatusError) as e:
467+
_queue_responses_splunk_event(
468+
background_tasks=background_tasks,
469+
input_text=input_text,
470+
response_text=str(e),
471+
conversation_id=normalize_conversation_id(api_params.conversation),
472+
model=api_params.model,
473+
rh_identity_context=rh_identity_context,
474+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
475+
sourcetype="responses_error",
476+
)
381477
error_response = handle_known_apistatus_errors(e, api_params.model)
382478
raise HTTPException(**error_response.model_dump()) from e
383479

@@ -391,6 +487,8 @@ async def handle_streaming_response(
391487
started_at=started_at,
392488
api_params=api_params,
393489
generate_topic_summary=request.generate_topic_summary or False,
490+
background_tasks=background_tasks,
491+
rh_identity_context=rh_identity_context,
394492
),
395493
media_type="text/event-stream",
396494
)
@@ -742,6 +840,8 @@ async def generate_response(
742840
started_at: datetime,
743841
api_params: ResponsesApiParams,
744842
generate_topic_summary: bool,
843+
background_tasks: BackgroundTasks | None = None,
844+
rh_identity_context: tuple[str, str] = ("", ""),
745845
) -> AsyncIterator[str]:
746846
"""Stream the response from the generator and persist conversation details.
747847
@@ -756,6 +856,8 @@ async def generate_response(
756856
started_at: Timestamp when the conversation started
757857
api_params: ResponsesApiParams
758858
generate_topic_summary: Whether to generate topic summary for new conversations
859+
background_tasks: FastAPI background task manager for telemetry events
860+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
759861
Yields:
760862
SSE-formatted strings from the generator
761863
"""
@@ -783,6 +885,22 @@ async def generate_response(
783885
skip_userid_check=skip_userid_check,
784886
topic_summary=topic_summary,
785887
)
888+
_queue_responses_splunk_event(
889+
background_tasks=background_tasks,
890+
input_text=input_text,
891+
response_text=turn_summary.llm_response,
892+
conversation_id=normalize_conversation_id(api_params.conversation),
893+
model=api_params.model,
894+
rh_identity_context=rh_identity_context,
895+
inference_time=(completed_at - started_at).total_seconds(),
896+
sourcetype="responses_completed",
897+
input_tokens=(
898+
turn_summary.token_usage.input_tokens if turn_summary.token_usage else 0
899+
),
900+
output_tokens=(
901+
turn_summary.token_usage.output_tokens if turn_summary.token_usage else 0
902+
),
903+
)
786904

787905

788906
async def handle_non_streaming_response(
@@ -794,6 +912,8 @@ async def handle_non_streaming_response(
794912
moderation_result: ShieldModerationResult,
795913
inline_rag_context: RAGContext,
796914
filter_server_tools: bool = False,
915+
background_tasks: BackgroundTasks | None = None,
916+
rh_identity_context: tuple[str, str] = ("", ""),
797917
) -> ResponsesResponse:
798918
"""Handle non-streaming response from Responses API.
799919
@@ -806,6 +926,8 @@ async def handle_non_streaming_response(
806926
moderation_result: Result of shield moderation check
807927
inline_rag_context: Inline RAG context to be used for the response
808928
filter_server_tools: Whether to filter server-deployed MCP tool output
929+
background_tasks: FastAPI background task manager for telemetry events
930+
rh_identity_context: Tuple of (org_id, system_id) from RH identity
809931
Returns:
810932
ResponsesResponse with the completed response
811933
"""
@@ -830,6 +952,16 @@ async def handle_non_streaming_response(
830952
user_input=request.input,
831953
llm_output=[moderation_result.refusal_response],
832954
)
955+
_queue_responses_splunk_event(
956+
background_tasks=background_tasks,
957+
input_text=input_text,
958+
response_text=output_text,
959+
conversation_id=normalize_conversation_id(api_params.conversation),
960+
model=api_params.model,
961+
rh_identity_context=rh_identity_context,
962+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
963+
sourcetype="responses_shield_blocked",
964+
)
833965
else:
834966
try:
835967
api_response = cast(
@@ -854,16 +986,46 @@ async def handle_non_streaming_response(
854986

855987
except RuntimeError as e:
856988
if is_context_length_error(str(e)):
989+
_queue_responses_splunk_event(
990+
background_tasks=background_tasks,
991+
input_text=input_text,
992+
response_text=str(e),
993+
conversation_id=normalize_conversation_id(api_params.conversation),
994+
model=api_params.model,
995+
rh_identity_context=rh_identity_context,
996+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
997+
sourcetype="responses_error",
998+
)
857999
error_response = PromptTooLongResponse(model=api_params.model)
8581000
raise HTTPException(**error_response.model_dump()) from e
8591001
raise e
8601002
except APIConnectionError as e:
1003+
_queue_responses_splunk_event(
1004+
background_tasks=background_tasks,
1005+
input_text=input_text,
1006+
response_text=str(e),
1007+
conversation_id=normalize_conversation_id(api_params.conversation),
1008+
model=api_params.model,
1009+
rh_identity_context=rh_identity_context,
1010+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1011+
sourcetype="responses_error",
1012+
)
8611013
error_response = ServiceUnavailableResponse(
8621014
backend_name="Llama Stack",
8631015
cause=str(e),
8641016
)
8651017
raise HTTPException(**error_response.model_dump()) from e
8661018
except (LLSApiStatusError, OpenAIAPIStatusError) as e:
1019+
_queue_responses_splunk_event(
1020+
background_tasks=background_tasks,
1021+
input_text=input_text,
1022+
response_text=str(e),
1023+
conversation_id=normalize_conversation_id(api_params.conversation),
1024+
model=api_params.model,
1025+
rh_identity_context=rh_identity_context,
1026+
inference_time=(datetime.now(UTC) - started_at).total_seconds(),
1027+
sourcetype="responses_error",
1028+
)
8671029
error_response = handle_known_apistatus_errors(e, api_params.model)
8681030
raise HTTPException(**error_response.model_dump()) from e
8691031

@@ -891,6 +1053,25 @@ async def handle_non_streaming_response(
8911053
)
8921054
turn_summary.rag_chunks.extend(inline_rag_context.rag_chunks)
8931055
completed_at = datetime.now(UTC)
1056+
if moderation_result.decision != "blocked":
1057+
_queue_responses_splunk_event(
1058+
background_tasks=background_tasks,
1059+
input_text=input_text,
1060+
response_text=output_text,
1061+
conversation_id=normalize_conversation_id(api_params.conversation),
1062+
model=api_params.model,
1063+
rh_identity_context=rh_identity_context,
1064+
inference_time=(completed_at - started_at).total_seconds(),
1065+
sourcetype="responses_completed",
1066+
input_tokens=(
1067+
turn_summary.token_usage.input_tokens if turn_summary.token_usage else 0
1068+
),
1069+
output_tokens=(
1070+
turn_summary.token_usage.output_tokens
1071+
if turn_summary.token_usage
1072+
else 0
1073+
),
1074+
)
8941075
if api_params.store:
8951076
store_query_results(
8961077
user_id=user_id,

0 commit comments

Comments
 (0)