Skip to content

Commit c2b99cb

Browse files
committed
add streaming_query/interrupt for stopping in-flight requests
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
1 parent 5495f95 commit c2b99cb

12 files changed

Lines changed: 729 additions & 26 deletions

File tree

docs/openapi.json

Lines changed: 175 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,7 +1661,7 @@
16611661
"type": "string",
16621662
"format": "text/event-stream"
16631663
},
1664-
"example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19}, \"available_quotas\": {}}\n\n"
1664+
"example": "data: {\"event\": \"start\", \"data\": {\"conversation_id\": \"123e4567-e89b-12d3-a456-426614174000\", \"request_id\": \"123e4567-e89b-12d3-a456-426614174001\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 0, \"token\": \"No Violation\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 1, \"token\": \"\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 2, \"token\": \"Hello\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 3, \"token\": \"!\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 4, \"token\": \" How\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 5, \"token\": \" can\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 6, \"token\": \" I\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 7, \"token\": \" assist\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 8, \"token\": \" you\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 9, \"token\": \" today\"}}\n\ndata: {\"event\": \"token\", \"data\": {\"id\": 10, \"token\": \"?\"}}\n\ndata: {\"event\": \"turn_complete\", \"data\": {\"token\": \"Hello! How can I assist you today?\"}}\n\ndata: {\"event\": \"end\", \"data\": {\"referenced_documents\": [], \"truncated\": null, \"input_tokens\": 11, \"output_tokens\": 19}, \"available_quotas\": {}}\n\n"
16651665
}
16661666
}
16671667
},
@@ -1912,6 +1912,111 @@
19121912
}
19131913
}
19141914
},
1915+
"/v1/streaming_query/interrupt": {
1916+
"post": {
1917+
"tags": [
1918+
"streaming_query_interrupt"
1919+
],
1920+
"summary": "Streaming Query Interrupt Endpoint Handler",
1921+
"description": "Interrupt an in-progress streaming query by request identifier.",
1922+
"operationId": "stream_interrupt_endpoint_handler_v1_streaming_query_interrupt_post",
1923+
"requestBody": {
1924+
"content": {
1925+
"application/json": {
1926+
"schema": {
1927+
"$ref": "#/components/schemas/StreamingInterruptRequest"
1928+
}
1929+
}
1930+
},
1931+
"required": true
1932+
},
1933+
"responses": {
1934+
"200": {
1935+
"description": "Successful response",
1936+
"content": {
1937+
"application/json": {
1938+
"schema": {
1939+
"$ref": "#/components/schemas/StreamingInterruptResponse"
1940+
},
1941+
"example": {
1942+
"interrupted": true,
1943+
"message": "Streaming request interrupted",
1944+
"request_id": "123e4567-e89b-12d3-a456-426614174000"
1945+
}
1946+
}
1947+
}
1948+
},
1949+
"401": {
1950+
"description": "Unauthorized",
1951+
"content": {
1952+
"application/json": {
1953+
"schema": {
1954+
"$ref": "#/components/schemas/UnauthorizedResponse"
1955+
},
1956+
"examples": {
1957+
"missing header": {
1958+
"value": {
1959+
"detail": {
1960+
"cause": "No Authorization header found",
1961+
"response": "Missing or invalid credentials provided by client"
1962+
}
1963+
}
1964+
},
1965+
"missing token": {
1966+
"value": {
1967+
"detail": {
1968+
"cause": "No token found in Authorization header",
1969+
"response": "Missing or invalid credentials provided by client"
1970+
}
1971+
}
1972+
}
1973+
}
1974+
}
1975+
}
1976+
},
1977+
"403": {
1978+
"description": "Permission denied",
1979+
"content": {
1980+
"application/json": {
1981+
"schema": {
1982+
"$ref": "#/components/schemas/ForbiddenResponse"
1983+
},
1984+
"examples": {
1985+
"endpoint": {
1986+
"value": {
1987+
"detail": {
1988+
"cause": "User 6789 is not authorized to access this endpoint.",
1989+
"response": "User does not have permission to access this endpoint"
1990+
}
1991+
}
1992+
}
1993+
}
1994+
}
1995+
}
1996+
},
1997+
"404": {
1998+
"description": "Resource not found",
1999+
"content": {
2000+
"application/json": {
2001+
"schema": {
2002+
"$ref": "#/components/schemas/NotFoundResponse"
2003+
}
2004+
}
2005+
}
2006+
},
2007+
"422": {
2008+
"description": "Validation Error",
2009+
"content": {
2010+
"application/json": {
2011+
"schema": {
2012+
"$ref": "#/components/schemas/HTTPValidationError"
2013+
}
2014+
}
2015+
}
2016+
}
2017+
}
2018+
}
2019+
},
19152020
"/v1/config": {
19162021
"get": {
19172022
"tags": [
@@ -4332,7 +4437,7 @@
43324437
],
43334438
"summary": "Handle A2A Jsonrpc",
43344439
"description": "Handle A2A JSON-RPC requests following the A2A protocol specification.\n\nThis endpoint uses the DefaultRequestHandler from the A2A SDK to handle\nall JSON-RPC requests including message/send, message/stream, etc.\n\nThe A2A SDK application is created per-request to include authentication\ncontext while still leveraging FastAPI's authorization middleware.\n\nAutomatically detects streaming requests (message/stream JSON-RPC method)\nand returns a StreamingResponse to enable real-time chunk delivery.\n\nArgs:\n request: FastAPI request object\n auth: Authentication tuple\n mcp_headers: MCP headers for context propagation\n\nReturns:\n JSON-RPC response or streaming response",
4335-
"operationId": "handle_a2a_jsonrpc_a2a_post",
4440+
"operationId": "handle_a2a_jsonrpc_a2a_get",
43364441
"responses": {
43374442
"200": {
43384443
"description": "Successful Response",
@@ -4350,7 +4455,7 @@
43504455
],
43514456
"summary": "Handle A2A Jsonrpc",
43524457
"description": "Handle A2A JSON-RPC requests following the A2A protocol specification.\n\nThis endpoint uses the DefaultRequestHandler from the A2A SDK to handle\nall JSON-RPC requests including message/send, message/stream, etc.\n\nThe A2A SDK application is created per-request to include authentication\ncontext while still leveraging FastAPI's authorization middleware.\n\nAutomatically detects streaming requests (message/stream JSON-RPC method)\nand returns a StreamingResponse to enable real-time chunk delivery.\n\nArgs:\n request: FastAPI request object\n auth: Authentication tuple\n mcp_headers: MCP headers for context propagation\n\nReturns:\n JSON-RPC response or streaming response",
4353-
"operationId": "handle_a2a_jsonrpc_a2a_post",
4458+
"operationId": "handle_a2a_jsonrpc_a2a_get",
43544459
"responses": {
43554460
"200": {
43564461
"description": "Successful Response",
@@ -9224,6 +9329,73 @@
92249329
}
92259330
]
92269331
},
9332+
"StreamingInterruptRequest": {
9333+
"properties": {
9334+
"request_id": {
9335+
"type": "string",
9336+
"title": "Request Id",
9337+
"description": "The active streaming request ID to interrupt",
9338+
"examples": [
9339+
"123e4567-e89b-12d3-a456-426614174000"
9340+
]
9341+
}
9342+
},
9343+
"additionalProperties": false,
9344+
"type": "object",
9345+
"required": [
9346+
"request_id"
9347+
],
9348+
"title": "StreamingInterruptRequest",
9349+
"description": "Model representing a request to interrupt an active streaming query.",
9350+
"examples": [
9351+
{
9352+
"request_id": "123e4567-e89b-12d3-a456-426614174000"
9353+
}
9354+
]
9355+
},
9356+
"StreamingInterruptResponse": {
9357+
"properties": {
9358+
"request_id": {
9359+
"type": "string",
9360+
"title": "Request Id",
9361+
"description": "The streaming request ID targeted by the interrupt call",
9362+
"examples": [
9363+
"123e4567-e89b-12d3-a456-426614174000"
9364+
]
9365+
},
9366+
"interrupted": {
9367+
"type": "boolean",
9368+
"title": "Interrupted",
9369+
"description": "Whether an in-progress stream was interrupted",
9370+
"examples": [
9371+
true
9372+
]
9373+
},
9374+
"message": {
9375+
"type": "string",
9376+
"title": "Message",
9377+
"description": "Human-readable interruption status message",
9378+
"examples": [
9379+
"Streaming request interrupted"
9380+
]
9381+
}
9382+
},
9383+
"type": "object",
9384+
"required": [
9385+
"request_id",
9386+
"interrupted",
9387+
"message"
9388+
],
9389+
"title": "StreamingInterruptResponse",
9390+
"description": "Model representing a response to a streaming interrupt request.",
9391+
"examples": [
9392+
{
9393+
"interrupted": true,
9394+
"message": "Streaming request interrupted",
9395+
"request_id": "123e4567-e89b-12d3-a456-426614174000"
9396+
}
9397+
]
9398+
},
92279399
"TLSConfiguration": {
92289400
"properties": {
92299401
"tls_certificate_path": {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Endpoint for interrupting in-progress streaming query requests."""
2+
3+
from typing import Annotated, Any
4+
5+
from fastapi import APIRouter, Depends, HTTPException
6+
7+
from authentication import get_auth_dependency
8+
from authentication.interface import AuthTuple
9+
from authorization.middleware import authorize
10+
from models.config import Action
11+
from models.requests import StreamingInterruptRequest
12+
from models.responses import (
13+
ForbiddenResponse,
14+
NotFoundResponse,
15+
StreamingInterruptResponse,
16+
UnauthorizedResponse,
17+
)
18+
from utils.stream_interrupts import (
19+
StreamInterruptRegistry,
20+
get_stream_interrupt_registry,
21+
)
22+
23+
router = APIRouter(tags=["streaming_query_interrupt"])
24+
25+
# OpenAPI response documentation for the interrupt endpoint, keyed by HTTP
# status code. The mapping is handed to the route decorator via ``responses=``.
stream_interrupt_responses: dict[int | str, dict[str, Any]] = {
    # Success: the targeted stream was interrupted.
    200: StreamingInterruptResponse.openapi_response(),
    # Authentication failures (no Authorization header / no token in it).
    401: UnauthorizedResponse.openapi_response(
        examples=["missing header", "missing token"],
    ),
    # Caller is authenticated but not permitted to use this endpoint.
    403: ForbiddenResponse.openapi_response(examples=["endpoint"]),
    # NOTE(review): the generated docs/openapi.json for this route shows no
    # example under 404 (unlike 401/403) -- confirm that NotFoundResponse
    # actually defines an example labelled "streaming request".
    404: NotFoundResponse.openapi_response(examples=["streaming request"]),
}
33+
34+
35+
@router.post(
    "/streaming_query/interrupt",
    responses=stream_interrupt_responses,
    summary="Streaming Query Interrupt Endpoint Handler",
)
@authorize(Action.STREAMING_QUERY)
async def stream_interrupt_endpoint_handler(
    interrupt_request: StreamingInterruptRequest,
    auth: Annotated[AuthTuple, Depends(get_auth_dependency())],
    registry: Annotated[
        StreamInterruptRegistry, Depends(get_stream_interrupt_registry)
    ],
) -> StreamingInterruptResponse:
    """Interrupt an in-progress streaming query by request identifier."""
    # NOTE: the docstring above is deliberately left as a single line; it
    # appears verbatim as this operation's "description" in docs/openapi.json,
    # so rewording it would change the published schema.
    #
    # Parameters:
    #   interrupt_request: request body carrying the ``request_id`` to cancel.
    #   auth: authentication tuple; only its first element (the caller's user
    #       ID) is used here.
    #   registry: stream-interrupt registry supplied through dependency
    #       injection.
    # Returns a StreamingInterruptResponse with ``interrupted=True`` on
    # success; raises HTTPException built from a NotFoundResponse otherwise.
    caller_id, _, _, _ = auth
    target_id = interrupt_request.request_id

    # The registry is given both the request ID and the caller's user ID.
    # Presumably it refuses to cancel a stream owned by a different user --
    # confirm against utils.stream_interrupts.
    if registry.cancel_stream(target_id, caller_id):
        return StreamingInterruptResponse(
            request_id=target_id,
            interrupted=True,
            message="Streaming request interrupted",
        )

    # A falsy result from the registry is reported to the client as "not found".
    not_found = NotFoundResponse(
        resource="streaming request",
        resource_id=target_id,
    )
    raise HTTPException(**not_found.model_dump())

0 commit comments

Comments
 (0)