Skip to content

Commit f5d567f

Browse files
committed
RSPEED-2997: extract Splunk telemetry from responses endpoint
Move telemetry functions from responses.py into a dedicated responses_telemetry.py module. Centralize fire-and-forget dispatch logic in observability/splunk.py as dispatch_splunk_event(). No behavioral changes to request handling. Reduces responses.py from 1201 to 1057 lines. Signed-off-by: Major Hayden <major@redhat.com>
1 parent 6280606 commit f5d567f

6 files changed

Lines changed: 396 additions & 182 deletions

File tree

src/app/endpoints/responses.py

Lines changed: 10 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
"""Handler for REST API call to provide answer using Responses API (LCORE specification)."""
44

5-
import asyncio
65
import json
76
import time
87
from collections.abc import AsyncIterator, Sequence
@@ -32,6 +31,11 @@
3231
APIStatusError as OpenAIAPIStatusError,
3332
)
3433

34+
from app.endpoints.responses_telemetry import (
35+
queue_blocked_response_event,
36+
queue_completed_response_event,
37+
queue_responses_error_event,
38+
)
3539
from authentication import get_auth_dependency
3640
from authentication.interface import AuthTuple
3741
from authorization.azure_token_manager import AzureEntraIDManager
@@ -60,7 +64,6 @@
6064
from models.common.responses.responses_context import ResponsesContext
6165
from models.common.turn_summary import TurnSummary
6266
from models.config import Action
63-
from observability import ResponsesEventData, build_responses_event, send_splunk_event
6467
from utils.conversations import append_turn_items_to_conversation
6568
from utils.endpoints import (
6669
check_configuration_loaded,
@@ -159,96 +162,6 @@ def _get_user_agent(request: Request) -> Optional[str]:
159162
}
160163

161164

162-
# Strong references for fire-and-forget telemetry tasks so they aren't
163-
# garbage-collected before completion (the event loop only holds weak refs).
164-
_background_splunk_tasks: set[asyncio.Task[None]] = set()
165-
166-
167-
def _queue_responses_splunk_event( # pylint: disable=too-many-arguments,too-many-positional-arguments
168-
background_tasks: Optional[BackgroundTasks],
169-
input_text: str,
170-
response_text: str,
171-
conversation_id: str,
172-
model: str,
173-
rh_identity_context: tuple[str, str],
174-
inference_time: float,
175-
sourcetype: str,
176-
input_tokens: int = 0,
177-
output_tokens: int = 0,
178-
fire_and_forget: bool = False,
179-
user_agent: Optional[str] = None,
180-
) -> None:
181-
"""Build and queue a Splunk telemetry event for the responses endpoint.
182-
183-
No-op when background_tasks is None and fire_and_forget is False
184-
(Splunk telemetry disabled).
185-
186-
Args:
187-
background_tasks: FastAPI background task manager, or None if disabled.
188-
input_text: User input text.
189-
response_text: Response text from LLM or shield.
190-
conversation_id: Conversation identifier.
191-
model: Model name used for inference.
192-
rh_identity_context: Tuple of (org_id, system_id) from RH identity.
193-
inference_time: Request processing duration in seconds.
194-
sourcetype: Splunk sourcetype for the event.
195-
input_tokens: Number of prompt tokens consumed.
196-
output_tokens: Number of completion tokens produced.
197-
fire_and_forget: When True, dispatch via asyncio.create_task() instead
198-
of background_tasks. Use for error paths where an HTTPException
199-
follows, since FastAPI discards BackgroundTasks on non-2xx responses.
200-
user_agent: Sanitized User-Agent string from the request header, or None.
201-
"""
202-
if not fire_and_forget and background_tasks is None:
203-
return
204-
org_id, system_id = rh_identity_context
205-
event_data = ResponsesEventData(
206-
input_text=input_text,
207-
response_text=response_text,
208-
conversation_id=conversation_id,
209-
model=model,
210-
org_id=org_id,
211-
system_id=system_id,
212-
inference_time=inference_time,
213-
input_tokens=input_tokens,
214-
output_tokens=output_tokens,
215-
user_agent=user_agent,
216-
)
217-
event = build_responses_event(event_data)
218-
if fire_and_forget:
219-
task = asyncio.create_task(send_splunk_event(event, sourcetype))
220-
_background_splunk_tasks.add(task)
221-
task.add_done_callback(_background_splunk_tasks.discard)
222-
elif background_tasks is not None:
223-
background_tasks.add_task(send_splunk_event, event, sourcetype)
224-
225-
226-
def _queue_responses_error_event(
227-
error: Exception,
228-
api_params: ResponsesApiParams,
229-
context: ResponsesContext,
230-
) -> None:
231-
"""Queue fire-and-forget Splunk telemetry for a Responses API error.
232-
233-
Args:
234-
error: The backend exception being converted into an HTTP error.
235-
api_params: Responses API parameters for the failed request.
236-
context: Request-scoped Responses API context.
237-
"""
238-
_queue_responses_splunk_event(
239-
background_tasks=context.background_tasks,
240-
input_text=context.input_text,
241-
response_text=str(error),
242-
conversation_id=normalize_conversation_id(api_params.conversation),
243-
model=api_params.model,
244-
rh_identity_context=context.rh_identity_context,
245-
inference_time=(datetime.now(UTC) - context.started_at).total_seconds(),
246-
sourcetype="responses_error",
247-
fire_and_forget=True,
248-
user_agent=context.user_agent,
249-
)
250-
251-
252165
def _http_exception_for_response_api_error(
253166
error: Exception,
254167
api_params: ResponsesApiParams,
@@ -297,7 +210,7 @@ def _raise_response_api_http_exception(
297210
http_exception = _http_exception_for_response_api_error(error, api_params)
298211
if http_exception is None:
299212
raise error
300-
_queue_responses_error_event(error, api_params, context)
213+
queue_responses_error_event(error, api_params, context)
301214
raise http_exception from error
302215

303216

@@ -321,31 +234,6 @@ async def _persist_blocked_response_turn(
321234
)
322235

323236

324-
def _queue_blocked_response_event(
325-
api_params: ResponsesApiParams,
326-
context: ResponsesContext,
327-
response_text: str,
328-
) -> None:
329-
"""Queue Splunk telemetry for a shield-blocked Responses API request.
330-
331-
Args:
332-
api_params: Responses API parameters for the blocked request.
333-
context: Request-scoped Responses API context.
334-
response_text: Refusal text sent to the client.
335-
"""
336-
_queue_responses_splunk_event(
337-
background_tasks=context.background_tasks,
338-
input_text=context.input_text,
339-
response_text=response_text,
340-
conversation_id=normalize_conversation_id(api_params.conversation),
341-
model=api_params.model,
342-
rh_identity_context=context.rh_identity_context,
343-
inference_time=(datetime.now(UTC) - context.started_at).total_seconds(),
344-
sourcetype="responses_shield_blocked",
345-
user_agent=context.user_agent,
346-
)
347-
348-
349237
async def _append_previous_response_turn(
350238
api_params: ResponsesApiParams,
351239
context: ResponsesContext,
@@ -419,39 +307,6 @@ def _store_response_query_results(
419307
)
420308

421309

422-
def _queue_completed_response_event(
423-
api_params: ResponsesApiParams,
424-
context: ResponsesContext,
425-
turn_summary: TurnSummary,
426-
completed_at: datetime,
427-
response_text: str,
428-
) -> None:
429-
"""Queue Splunk telemetry for a completed Responses API request.
430-
431-
Args:
432-
api_params: Responses API parameters for the completed request.
433-
context: Request-scoped Responses API context.
434-
turn_summary: Summary containing token usage for telemetry.
435-
completed_at: Time when response handling completed.
436-
response_text: Final text sent to the client.
437-
"""
438-
if context.moderation_result.decision != "passed":
439-
return
440-
_queue_responses_splunk_event(
441-
background_tasks=context.background_tasks,
442-
input_text=context.input_text,
443-
response_text=response_text,
444-
conversation_id=normalize_conversation_id(api_params.conversation),
445-
model=api_params.model,
446-
rh_identity_context=context.rh_identity_context,
447-
inference_time=(completed_at - context.started_at).total_seconds(),
448-
sourcetype="responses_completed",
449-
input_tokens=turn_summary.token_usage.input_tokens,
450-
output_tokens=turn_summary.token_usage.output_tokens,
451-
user_agent=context.user_agent,
452-
)
453-
454-
455310
@router.post(
456311
"/responses",
457312
responses=responses_response,
@@ -682,7 +537,7 @@ async def handle_streaming_response(
682537
turn_summary.llm_response = context.moderation_result.message
683538
generator = shield_violation_generator(api_params, context)
684539
await _persist_blocked_response_turn(api_params, context)
685-
_queue_blocked_response_event(
540+
queue_blocked_response_event(
686541
api_params,
687542
context,
688543
context.moderation_result.message,
@@ -1141,7 +996,7 @@ async def generate_response(
1141996
completed_at,
1142997
topic_summary,
1143998
)
1144-
_queue_completed_response_event(
999+
queue_completed_response_event(
11451000
api_params,
11461001
context,
11471002
turn_summary,
@@ -1178,7 +1033,7 @@ async def handle_non_streaming_response(
11781033
**api_params.echoed_params(configuration.rag_id_mapping),
11791034
)
11801035
await _persist_blocked_response_turn(api_params, context)
1181-
_queue_blocked_response_event(api_params, context, output_text)
1036+
queue_blocked_response_event(api_params, context, output_text)
11821037
else:
11831038
inference_start_time = time.monotonic()
11841039
inference_metric_recorded = False
@@ -1258,7 +1113,7 @@ async def handle_non_streaming_response(
12581113
completed_at,
12591114
topic_summary,
12601115
)
1261-
_queue_completed_response_event(
1116+
queue_completed_response_event(
12621117
api_params,
12631118
context,
12641119
turn_summary,

0 commit comments

Comments
 (0)