Skip to content

Commit 45337b0

Browse files
committed
RSPEED-2997: extract Splunk telemetry from responses endpoint
Move telemetry functions from responses.py into a dedicated responses_telemetry.py module. Centralize fire-and-forget dispatch logic in observability/splunk.py as dispatch_splunk_event().

No behavioral changes to request handling. Reduces responses.py from 1201 to 1057 lines.

Signed-off-by: Major Hayden <major@redhat.com>
1 parent c91efce commit 45337b0

6 files changed

Lines changed: 326 additions & 182 deletions

File tree

src/app/endpoints/responses.py

Lines changed: 10 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
"""Handler for REST API call to provide answer using Responses API (LCORE specification)."""
44

5-
import asyncio
65
import json
76
from collections.abc import AsyncIterator, Sequence
87
from datetime import UTC, datetime
@@ -31,6 +30,11 @@
3130
APIStatusError as OpenAIAPIStatusError,
3231
)
3332

33+
from app.endpoints.responses_telemetry import (
34+
queue_blocked_response_event,
35+
queue_completed_response_event,
36+
queue_responses_error_event,
37+
)
3438
from authentication import get_auth_dependency
3539
from authentication.interface import AuthTuple
3640
from authorization.azure_token_manager import AzureEntraIDManager
@@ -58,7 +62,6 @@
5862
from models.common.turn_summary import TurnSummary
5963
from models.config import Action
6064
from models.requests import ResponsesRequest
61-
from observability import ResponsesEventData, build_responses_event, send_splunk_event
6265
from utils.conversations import append_turn_items_to_conversation
6366
from utils.endpoints import (
6467
check_configuration_loaded,
@@ -157,96 +160,6 @@ def _get_user_agent(request: Request) -> Optional[str]:
157160
}
158161

159162

160-
# Strong references for fire-and-forget telemetry tasks so they aren't
161-
# garbage-collected before completion (the event loop only holds weak refs).
162-
_background_splunk_tasks: set[asyncio.Task[None]] = set()
163-
164-
165-
def _queue_responses_splunk_event( # pylint: disable=too-many-arguments,too-many-positional-arguments
166-
background_tasks: Optional[BackgroundTasks],
167-
input_text: str,
168-
response_text: str,
169-
conversation_id: str,
170-
model: str,
171-
rh_identity_context: tuple[str, str],
172-
inference_time: float,
173-
sourcetype: str,
174-
input_tokens: int = 0,
175-
output_tokens: int = 0,
176-
fire_and_forget: bool = False,
177-
user_agent: Optional[str] = None,
178-
) -> None:
179-
"""Build and queue a Splunk telemetry event for the responses endpoint.
180-
181-
No-op when background_tasks is None and fire_and_forget is False
182-
(Splunk telemetry disabled).
183-
184-
Args:
185-
background_tasks: FastAPI background task manager, or None if disabled.
186-
input_text: User input text.
187-
response_text: Response text from LLM or shield.
188-
conversation_id: Conversation identifier.
189-
model: Model name used for inference.
190-
rh_identity_context: Tuple of (org_id, system_id) from RH identity.
191-
inference_time: Request processing duration in seconds.
192-
sourcetype: Splunk sourcetype for the event.
193-
input_tokens: Number of prompt tokens consumed.
194-
output_tokens: Number of completion tokens produced.
195-
fire_and_forget: When True, dispatch via asyncio.create_task() instead
196-
of background_tasks. Use for error paths where an HTTPException
197-
follows, since FastAPI discards BackgroundTasks on non-2xx responses.
198-
user_agent: Sanitized User-Agent string from the request header, or None.
199-
"""
200-
if not fire_and_forget and background_tasks is None:
201-
return
202-
org_id, system_id = rh_identity_context
203-
event_data = ResponsesEventData(
204-
input_text=input_text,
205-
response_text=response_text,
206-
conversation_id=conversation_id,
207-
model=model,
208-
org_id=org_id,
209-
system_id=system_id,
210-
inference_time=inference_time,
211-
input_tokens=input_tokens,
212-
output_tokens=output_tokens,
213-
user_agent=user_agent,
214-
)
215-
event = build_responses_event(event_data)
216-
if fire_and_forget:
217-
task = asyncio.create_task(send_splunk_event(event, sourcetype))
218-
_background_splunk_tasks.add(task)
219-
task.add_done_callback(_background_splunk_tasks.discard)
220-
elif background_tasks is not None:
221-
background_tasks.add_task(send_splunk_event, event, sourcetype)
222-
223-
224-
def _queue_responses_error_event(
225-
error: Exception,
226-
api_params: ResponsesApiParams,
227-
context: ResponsesContext,
228-
) -> None:
229-
"""Queue fire-and-forget Splunk telemetry for a Responses API error.
230-
231-
Args:
232-
error: The backend exception being converted into an HTTP error.
233-
api_params: Responses API parameters for the failed request.
234-
context: Request-scoped Responses API context.
235-
"""
236-
_queue_responses_splunk_event(
237-
background_tasks=context.background_tasks,
238-
input_text=context.input_text,
239-
response_text=str(error),
240-
conversation_id=normalize_conversation_id(api_params.conversation),
241-
model=api_params.model,
242-
rh_identity_context=context.rh_identity_context,
243-
inference_time=(datetime.now(UTC) - context.started_at).total_seconds(),
244-
sourcetype="responses_error",
245-
fire_and_forget=True,
246-
user_agent=context.user_agent,
247-
)
248-
249-
250163
def _http_exception_for_response_api_error(
251164
error: Exception,
252165
api_params: ResponsesApiParams,
@@ -295,7 +208,7 @@ def _raise_response_api_http_exception(
295208
http_exception = _http_exception_for_response_api_error(error, api_params)
296209
if http_exception is None:
297210
raise error
298-
_queue_responses_error_event(error, api_params, context)
211+
queue_responses_error_event(error, api_params, context)
299212
raise http_exception from error
300213

301214

@@ -319,31 +232,6 @@ async def _persist_blocked_response_turn(
319232
)
320233

321234

322-
def _queue_blocked_response_event(
323-
api_params: ResponsesApiParams,
324-
context: ResponsesContext,
325-
response_text: str,
326-
) -> None:
327-
"""Queue Splunk telemetry for a shield-blocked Responses API request.
328-
329-
Args:
330-
api_params: Responses API parameters for the blocked request.
331-
context: Request-scoped Responses API context.
332-
response_text: Refusal text sent to the client.
333-
"""
334-
_queue_responses_splunk_event(
335-
background_tasks=context.background_tasks,
336-
input_text=context.input_text,
337-
response_text=response_text,
338-
conversation_id=normalize_conversation_id(api_params.conversation),
339-
model=api_params.model,
340-
rh_identity_context=context.rh_identity_context,
341-
inference_time=(datetime.now(UTC) - context.started_at).total_seconds(),
342-
sourcetype="responses_shield_blocked",
343-
user_agent=context.user_agent,
344-
)
345-
346-
347235
async def _append_previous_response_turn(
348236
api_params: ResponsesApiParams,
349237
context: ResponsesContext,
@@ -417,39 +305,6 @@ def _store_response_query_results(
417305
)
418306

419307

420-
def _queue_completed_response_event(
421-
api_params: ResponsesApiParams,
422-
context: ResponsesContext,
423-
turn_summary: TurnSummary,
424-
completed_at: datetime,
425-
response_text: str,
426-
) -> None:
427-
"""Queue Splunk telemetry for a completed Responses API request.
428-
429-
Args:
430-
api_params: Responses API parameters for the completed request.
431-
context: Request-scoped Responses API context.
432-
turn_summary: Summary containing token usage for telemetry.
433-
completed_at: Time when response handling completed.
434-
response_text: Final text sent to the client.
435-
"""
436-
if context.moderation_result.decision != "passed":
437-
return
438-
_queue_responses_splunk_event(
439-
background_tasks=context.background_tasks,
440-
input_text=context.input_text,
441-
response_text=response_text,
442-
conversation_id=normalize_conversation_id(api_params.conversation),
443-
model=api_params.model,
444-
rh_identity_context=context.rh_identity_context,
445-
inference_time=(completed_at - context.started_at).total_seconds(),
446-
sourcetype="responses_completed",
447-
input_tokens=turn_summary.token_usage.input_tokens,
448-
output_tokens=turn_summary.token_usage.output_tokens,
449-
user_agent=context.user_agent,
450-
)
451-
452-
453308
@router.post(
454309
"/responses",
455310
responses=responses_response,
@@ -652,7 +507,7 @@ async def handle_streaming_response(
652507
turn_summary.llm_response = context.moderation_result.message
653508
generator = shield_violation_generator(api_params, context)
654509
await _persist_blocked_response_turn(api_params, context)
655-
_queue_blocked_response_event(
510+
queue_blocked_response_event(
656511
api_params,
657512
context,
658513
context.moderation_result.message,
@@ -1069,7 +924,7 @@ async def generate_response(
1069924
completed_at,
1070925
topic_summary,
1071926
)
1072-
_queue_completed_response_event(
927+
queue_completed_response_event(
1073928
api_params,
1074929
context,
1075930
turn_summary,
@@ -1106,7 +961,7 @@ async def handle_non_streaming_response(
1106961
**api_params.echoed_params(configuration.rag_id_mapping),
1107962
)
1108963
await _persist_blocked_response_turn(api_params, context)
1109-
_queue_blocked_response_event(api_params, context, output_text)
964+
queue_blocked_response_event(api_params, context, output_text)
1110965
else:
1111966
try:
1112967
api_response = cast(
@@ -1169,7 +1024,7 @@ async def handle_non_streaming_response(
11691024
completed_at,
11701025
topic_summary,
11711026
)
1172-
_queue_completed_response_event(
1027+
queue_completed_response_event(
11731028
api_params,
11741029
context,
11751030
turn_summary,

0 commit comments

Comments
 (0)