Skip to content

Commit 97de845

Browse files
committed
add streaming_query/interrupt for stopping in-flight requests
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
1 parent d5b791c commit 97de845

12 files changed

Lines changed: 729 additions & 26 deletions

File tree

docs/openapi.json

Lines changed: 175 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,7 +1661,7 @@
16611661
"type": "string",
16621662
"format": "text/event-stream"
16631663
},
1664-
"example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19}, \"available_quotas\": {}}\n\n"
1664+
"example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\", \"request_id\": \"123e4567-e89b-12d3-a456-426614174001\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19}, \"available_quotas\": {}}\n\n"
16651665
}
16661666
}
16671667
},
@@ -1912,6 +1912,111 @@
19121912
}
19131913
}
19141914
},
1915+
"/v1/streaming_query/interrupt": {
1916+
"post": {
1917+
"tags": [
1918+
"streaming_query_interrupt"
1919+
],
1920+
"summary": "Streaming Query Interrupt Endpoint Handler",
1921+
"description": "Interrupt an in-progress streaming query by request identifier.",
1922+
"operationId": "stream_interrupt_endpoint_handler_v1_streaming_query_interrupt_post",
1923+
"requestBody": {
1924+
"content": {
1925+
"application/json": {
1926+
"schema": {
1927+
"$ref": "#/components/schemas/StreamingInterruptRequest"
1928+
}
1929+
}
1930+
},
1931+
"required": true
1932+
},
1933+
"responses": {
1934+
"200": {
1935+
"description": "Successful response",
1936+
"content": {
1937+
"application/json": {
1938+
"schema": {
1939+
"$ref": "#/components/schemas/StreamingInterruptResponse"
1940+
},
1941+
"example": {
1942+
"interrupted": true,
1943+
"message": "Streaming request interrupted",
1944+
"request_id": "123e4567-e89b-12d3-a456-426614174000"
1945+
}
1946+
}
1947+
}
1948+
},
1949+
"401": {
1950+
"description": "Unauthorized",
1951+
"content": {
1952+
"application/json": {
1953+
"schema": {
1954+
"$ref": "#/components/schemas/UnauthorizedResponse"
1955+
},
1956+
"examples": {
1957+
"missing header": {
1958+
"value": {
1959+
"detail": {
1960+
"cause": "No Authorization header found",
1961+
"response": "Missing or invalid credentials provided by client"
1962+
}
1963+
}
1964+
},
1965+
"missing token": {
1966+
"value": {
1967+
"detail": {
1968+
"cause": "No token found in Authorization header",
1969+
"response": "Missing or invalid credentials provided by client"
1970+
}
1971+
}
1972+
}
1973+
}
1974+
}
1975+
}
1976+
},
1977+
"403": {
1978+
"description": "Permission denied",
1979+
"content": {
1980+
"application/json": {
1981+
"schema": {
1982+
"$ref": "#/components/schemas/ForbiddenResponse"
1983+
},
1984+
"examples": {
1985+
"endpoint": {
1986+
"value": {
1987+
"detail": {
1988+
"cause": "User 6789 is not authorized to access this endpoint.",
1989+
"response": "User does not have permission to access this endpoint"
1990+
}
1991+
}
1992+
}
1993+
}
1994+
}
1995+
}
1996+
},
1997+
"404": {
1998+
"description": "Resource not found",
1999+
"content": {
2000+
"application/json": {
2001+
"schema": {
2002+
"$ref": "#/components/schemas/NotFoundResponse"
2003+
}
2004+
}
2005+
}
2006+
},
2007+
"422": {
2008+
"description": "Validation Error",
2009+
"content": {
2010+
"application/json": {
2011+
"schema": {
2012+
"$ref": "#/components/schemas/HTTPValidationError"
2013+
}
2014+
}
2015+
}
2016+
}
2017+
}
2018+
}
2019+
},
19152020
"/v1/config": {
19162021
"get": {
19172022
"tags": [
@@ -4312,7 +4417,7 @@
43124417
],
43134418
"summary": "Handle A2A Jsonrpc",
43144419
"description": "Handle A2A JSON-RPC requests following the A2A protocol specification.\n\nThis endpoint uses the DefaultRequestHandler from the A2A SDK to handle\nall JSON-RPC requests including message/send, message/stream, etc.\n\nThe A2A SDK application is created per-request to include authentication\ncontext while still leveraging FastAPI's authorization middleware.\n\nAutomatically detects streaming requests (message/stream JSON-RPC method)\nand returns a StreamingResponse to enable real-time chunk delivery.\n\nArgs:\n request: FastAPI request object\n auth: Authentication tuple\n mcp_headers: MCP headers for context propagation\n\nReturns:\n JSON-RPC response or streaming response",
4315-
"operationId": "handle_a2a_jsonrpc_a2a_post",
4420+
"operationId": "handle_a2a_jsonrpc_a2a_get",
43164421
"responses": {
43174422
"200": {
43184423
"description": "Successful Response",
@@ -4330,7 +4435,7 @@
43304435
],
43314436
"summary": "Handle A2A Jsonrpc",
43324437
"description": "Handle A2A JSON-RPC requests following the A2A protocol specification.\n\nThis endpoint uses the DefaultRequestHandler from the A2A SDK to handle\nall JSON-RPC requests including message/send, message/stream, etc.\n\nThe A2A SDK application is created per-request to include authentication\ncontext while still leveraging FastAPI's authorization middleware.\n\nAutomatically detects streaming requests (message/stream JSON-RPC method)\nand returns a StreamingResponse to enable real-time chunk delivery.\n\nArgs:\n request: FastAPI request object\n auth: Authentication tuple\n mcp_headers: MCP headers for context propagation\n\nReturns:\n JSON-RPC response or streaming response",
4333-
"operationId": "handle_a2a_jsonrpc_a2a_post",
4438+
"operationId": "handle_a2a_jsonrpc_a2a_get",
43344439
"responses": {
43354440
"200": {
43364441
"description": "Successful Response",
@@ -9177,6 +9282,73 @@
91779282
}
91789283
]
91799284
},
9285+
"StreamingInterruptRequest": {
9286+
"properties": {
9287+
"request_id": {
9288+
"type": "string",
9289+
"title": "Request Id",
9290+
"description": "The active streaming request ID to interrupt",
9291+
"examples": [
9292+
"123e4567-e89b-12d3-a456-426614174000"
9293+
]
9294+
}
9295+
},
9296+
"additionalProperties": false,
9297+
"type": "object",
9298+
"required": [
9299+
"request_id"
9300+
],
9301+
"title": "StreamingInterruptRequest",
9302+
"description": "Model representing a request to interrupt an active streaming query.",
9303+
"examples": [
9304+
{
9305+
"request_id": "123e4567-e89b-12d3-a456-426614174000"
9306+
}
9307+
]
9308+
},
9309+
"StreamingInterruptResponse": {
9310+
"properties": {
9311+
"request_id": {
9312+
"type": "string",
9313+
"title": "Request Id",
9314+
"description": "The streaming request ID targeted by the interrupt call",
9315+
"examples": [
9316+
"123e4567-e89b-12d3-a456-426614174000"
9317+
]
9318+
},
9319+
"interrupted": {
9320+
"type": "boolean",
9321+
"title": "Interrupted",
9322+
"description": "Whether an in-progress stream was interrupted",
9323+
"examples": [
9324+
true
9325+
]
9326+
},
9327+
"message": {
9328+
"type": "string",
9329+
"title": "Message",
9330+
"description": "Human-readable interruption status message",
9331+
"examples": [
9332+
"Streaming request interrupted"
9333+
]
9334+
}
9335+
},
9336+
"type": "object",
9337+
"required": [
9338+
"request_id",
9339+
"interrupted",
9340+
"message"
9341+
],
9342+
"title": "StreamingInterruptResponse",
9343+
"description": "Model representing a response to a streaming interrupt request.",
9344+
"examples": [
9345+
{
9346+
"interrupted": true,
9347+
"message": "Streaming request interrupted",
9348+
"request_id": "123e4567-e89b-12d3-a456-426614174000"
9349+
}
9350+
]
9351+
},
91809352
"TLSConfiguration": {
91819353
"properties": {
91829354
"tls_certificate_path": {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Endpoint for interrupting in-progress streaming query requests."""
2+
3+
from typing import Annotated, Any
4+
5+
from fastapi import APIRouter, Depends, HTTPException
6+
7+
from authentication import get_auth_dependency
8+
from authentication.interface import AuthTuple
9+
from authorization.middleware import authorize
10+
from models.config import Action
11+
from models.requests import StreamingInterruptRequest
12+
from models.responses import (
13+
ForbiddenResponse,
14+
NotFoundResponse,
15+
StreamingInterruptResponse,
16+
UnauthorizedResponse,
17+
)
18+
from utils.stream_interrupts import (
19+
StreamInterruptRegistry,
20+
get_stream_interrupt_registry,
21+
)
22+
23+
router = APIRouter(tags=["streaming_query_interrupt"])
24+
25+
stream_interrupt_responses: dict[int | str, dict[str, Any]] = {
26+
200: StreamingInterruptResponse.openapi_response(),
27+
401: UnauthorizedResponse.openapi_response(
28+
examples=["missing header", "missing token"]
29+
),
30+
403: ForbiddenResponse.openapi_response(examples=["endpoint"]),
31+
404: NotFoundResponse.openapi_response(examples=["streaming request"]),
32+
}
33+
34+
35+
@router.post(
36+
"/streaming_query/interrupt",
37+
responses=stream_interrupt_responses,
38+
summary="Streaming Query Interrupt Endpoint Handler",
39+
)
40+
@authorize(Action.STREAMING_QUERY)
41+
async def stream_interrupt_endpoint_handler(
42+
interrupt_request: StreamingInterruptRequest,
43+
auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
44+
registry: Annotated[
45+
StreamInterruptRegistry, Depends(get_stream_interrupt_registry)
46+
],
47+
) -> StreamingInterruptResponse:
48+
"""Interrupt an in-progress streaming query by request identifier."""
49+
user_id, _, _, _ = auth
50+
request_id = interrupt_request.request_id
51+
interrupted = registry.cancel_stream(request_id, user_id)
52+
if not interrupted:
53+
response = NotFoundResponse(
54+
resource="streaming request",
55+
resource_id=request_id,
56+
)
57+
raise HTTPException(**response.model_dump())
58+
59+
return StreamingInterruptResponse(
60+
request_id=request_id,
61+
interrupted=True,
62+
message="Streaming request interrupted",
63+
)

0 commit comments

Comments
 (0)