Skip to content

Commit 2596e0a

Browse files
authored
Merge pull request #1971 from Jdubrick/interrupt-message-persistence
RHIDP-12952: persist interrupted conversation
2 parents ab86328 + 458b1d6 commit 2596e0a

10 files changed

Lines changed: 1059 additions & 19 deletions

File tree

src/app/endpoints/streaming_query.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
from configuration import configuration
4747
from constants import (
4848
ENDPOINT_PATH_STREAMING_QUERY,
49-
INTERRUPTED_RESPONSE_MESSAGE,
5049
LLM_TOKEN_EVENT,
5150
LLM_TOOL_CALL_EVENT,
5251
LLM_TOOL_RESULT_EVENT,
@@ -122,6 +121,7 @@
122121
validate_shield_ids_override,
123122
)
124123
from utils.stream_interrupts import (
124+
build_interrupted_response,
125125
deregister_stream,
126126
persist_interrupted_turn,
127127
register_interrupt_callback,
@@ -634,16 +634,22 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi
634634
current_task = asyncio.current_task()
635635
if current_task is not None:
636636
current_task.uncancel()
637+
full_text, suffix = build_interrupted_response(turn_summary.partial_tokens)
637638
if not persist_guard[0]:
638639
persist_guard[0] = True
639-
turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE
640+
turn_summary.llm_response = full_text
640641
await persist_interrupted_turn(
641642
context,
642643
responses_params,
643644
turn_summary,
644645
_background_topic_summary_tasks,
645646
original_input,
646647
)
648+
yield stream_event(
649+
{"id": turn_summary.next_chunk_id, "token": suffix},
650+
LLM_TOKEN_EVENT,
651+
context.query_request.media_type or MEDIA_TYPE_JSON,
652+
)
647653
yield stream_interrupted_event(context.request_id)
648654
finally:
649655
deregister_stream(context.request_id)
@@ -765,15 +771,17 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
765771

766772
# Content part started - emit an empty token to kick off UI streaming
767773
if event_type == "response.content_part.added":
774+
event_id = chunk_id
775+
chunk_id += 1
776+
turn_summary.next_chunk_id = chunk_id
768777
yield stream_event(
769778
{
770-
"id": chunk_id,
779+
"id": event_id,
771780
"token": "",
772781
},
773782
LLM_TOKEN_EVENT,
774783
media_type,
775784
)
776-
chunk_id += 1
777785

778786
# Store MCP call item info for later lookup when arguments.done event occurs
779787
elif event_type == "response.output_item.added":
@@ -789,15 +797,18 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
789797
elif event_type == "response.output_text.delta":
790798
delta_chunk = cast(TextDeltaChunk, chunk)
791799
text_parts.append(delta_chunk.delta)
800+
turn_summary.partial_tokens.append(delta_chunk.delta)
801+
event_id = chunk_id
802+
chunk_id += 1
803+
turn_summary.next_chunk_id = chunk_id
792804
yield stream_event(
793805
{
794-
"id": chunk_id,
806+
"id": event_id,
795807
"token": delta_chunk.delta,
796808
},
797809
LLM_TOKEN_EVENT,
798810
media_type,
799811
)
800-
chunk_id += 1
801812

802813
# Final text of the output (capture, but emit at response.completed)
803814
elif event_type == "response.output_text.done":
@@ -877,15 +888,17 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
877888
# (LCORE-1572), so the persisted turn keeps non-text output items
878889
# rather than being flattened to the response text.
879890
turn_summary.output_items = list(latest_response_object.output or [])
891+
event_id = chunk_id
892+
chunk_id += 1
893+
turn_summary.next_chunk_id = chunk_id
880894
yield stream_event(
881895
{
882-
"id": chunk_id,
896+
"id": event_id,
883897
"token": turn_summary.llm_response,
884898
},
885899
LLM_TURN_COMPLETE_EVENT,
886900
media_type,
887901
)
888-
chunk_id += 1
889902

890903
# Incomplete or failed response - emit error
891904
elif event_type in ("response.incomplete", "response.failed"):

src/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
UNABLE_TO_PROCESS_RESPONSE: Final[str] = "Unable to process this request"
2626

2727
# Response stored in the conversation when the user interrupts a streaming request
28-
INTERRUPTED_RESPONSE_MESSAGE: Final[str] = "You interrupted this request."
28+
INTERRUPTED_RESPONSE_MESSAGE: Final[str] = "Response stopped by the user."
2929

3030
# Max seconds to wait for topic summary in background task after interrupt persist.
3131
TOPIC_SUMMARY_INTERRUPT_TIMEOUT_SECONDS: Final[float] = 30.0

src/models/common/turn_summary.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,16 @@ class TurnSummary(BaseModel):
114114
description="Structured response output items, captured for compacted-mode "
115115
"turn persistence (LCORE-1572). Empty on the non-compacted path.",
116116
)
117+
partial_tokens: list[str] = Field(
118+
default_factory=list,
119+
description="Accumulated text deltas during streaming, used to reconstruct "
120+
"partial content on interruption.",
121+
)
122+
next_chunk_id: int = Field(
123+
default=0,
124+
description="Next monotonic SSE chunk index, kept in sync with the inner "
125+
"generator so the interrupt handler can emit a sequentially valid id.",
126+
)
117127

118128

119129
class ToolInfoSummary(BaseModel):

src/utils/agents/streaming.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
)
2626

2727
from configuration import configuration
28-
from constants import INTERRUPTED_RESPONSE_MESSAGE, MEDIA_TYPE_JSON
28+
from constants import MEDIA_TYPE_JSON
2929
from log import get_logger
3030
from models.common.agents import (
3131
AgentTurnAccumulator,
@@ -65,6 +65,7 @@
6565
maybe_get_topic_summary,
6666
)
6767
from utils.stream_interrupts import (
68+
build_interrupted_response,
6869
deregister_stream,
6970
persist_interrupted_turn,
7071
register_interrupt_callback,
@@ -197,16 +198,23 @@ async def generate_agent_response(
197198
current_task = asyncio.current_task()
198199
if current_task is not None:
199200
current_task.uncancel()
201+
full_text, suffix = build_interrupted_response(turn_summary.partial_tokens)
200202
if not persist_guard[0]:
201203
persist_guard[0] = True
202-
turn_summary.llm_response = INTERRUPTED_RESPONSE_MESSAGE
204+
turn_summary.llm_response = full_text
203205
await persist_interrupted_turn(
204206
context,
205207
responses_params,
206208
turn_summary,
207209
background_topic_summary_tasks,
208210
original_input,
209211
)
212+
yield serialize_event(
213+
TokenStreamPayload.create(
214+
chunk_id=turn_summary.next_chunk_id, token=suffix
215+
),
216+
media_type,
217+
)
210218
yield serialize_event(
211219
InterruptedStreamPayload.create(request_id=context.request_id),
212220
media_type,
@@ -347,11 +355,13 @@ def _process_token(
347355
Token stream payload containing the emitted token chunk.
348356
"""
349357
state.text_parts.append(text)
358+
state.turn_summary.partial_tokens.append(text)
350359
payload = TokenStreamPayload.create(
351360
chunk_id=state.chunk_id,
352361
token=text,
353362
)
354363
state.chunk_id += 1
364+
state.turn_summary.next_chunk_id = state.chunk_id
355365
return payload
356366

357367

@@ -402,6 +412,7 @@ def _(
402412
token=final_text,
403413
)
404414
state.chunk_id += 1
415+
state.turn_summary.next_chunk_id = state.chunk_id
405416
return payload
406417

407418

0 commit comments

Comments
 (0)