Commit d1c79ab

LCORE-1572: Integrate conversation compaction into the query flow (#1796)
* LCORE-1572: add conversation compaction and wire it into /v1/query

  Introduce runtime conversation compaction (Option A): once a conversation approaches the model's context window, lightspeed-stack summarizes older turns and owns the LLM context itself instead of letting Llama Stack reload the full history.

  - src/utils/conversation_compaction.py: apply_compaction() async generator and apply_compaction_blocking() wrapper. Holds a per-conversation lock (R11), estimates tokens (LCORE-1569), partitions and summarizes old turns (LCORE-1570), writes the summary into the conversation as a marker item, and rebuilds the request as explicit input (summaries + recent verbatim turns + new query). Marker items track the boundary; the conversation_id is preserved and the full history stays in Llama Stack items for audit.
  - models/common/responses/responses_api_params.py: omit_conversation flag so the conversation parameter is dropped from the request body in compacted mode while remaining on the object for identity.
  - configuration.py: AppConfig.compaction accessor.
  - app/endpoints/query.py: apply compaction after preparing params; in compacted mode store the completed turn against the original user query (the conversation parameter is no longer sent, so Llama Stack does not persist the turn automatically).

  Background: the spec's original marker-keeps-conversation-parameter approach was found unimplementable on llama-stack 0.6.0, which always reloads the full conversation history when the conversation parameter is set. This restores the spike's original explicit-input approach.

* LCORE-1572: unit tests for conversation compaction core and /v1/query

  Cover marker detection and boundary selection, explicit-input assembly, the trigger threshold, the disabled / no-context-window / existing-marker / triggered paths of apply_compaction, the streaming CompactionStartedEvent ordering, and compacted-turn storage.

* LCORE-1572: apply conversation compaction in the A2A endpoint

  The A2A executor uses the same prepare_responses_params + Responses API flow as /v1/query and persists conversation_id for multi-turn contexts, so it accumulates context and must compact too.

  - Run apply_compaction_blocking before responses.create (A2A is not a browser SSE stream, so no progress event is emitted).
  - In compacted mode, persist the completed turn from the response.completed stream event, since the conversation parameter is no longer sent and Llama Stack therefore does not store the turn automatically.

* LCORE-1572: apply conversation compaction in the streaming_query endpoint

  Stream /v1/streaming_query through the compaction-aware path only when the conversation actually compacts, so non-compacting requests are unaffected (byte-for-byte the existing flow, including HTTP error handling).

  - conversation_compaction: add needs_compaction_path(), a cheap pre-stream predicate (no LLM, no lock) that is true only when the conversation already has a summary marker or would trigger a new compaction.
  - streaming_query: when the predicate is true, stream via the new generate_response_with_compaction(), which emits the compaction progress event before the summarization LLM call (R12) and creates the response inside the stream, surfacing create-time errors as SSE error events. generate_response gains emit_start/compacted parameters and, in compacted mode, appends the completed turn to the conversation (the conversation parameter is not sent, so Llama Stack does not store it automatically).
  - a2a: silence too-many-lines after the earlier compaction wiring.

* LCORE-1572: tests for the streaming compaction gate

  Cover needs_compaction_path: disabled, existing-marker, over-threshold, and under-threshold (the gate that keeps non-compacting requests on the unchanged streaming path).

* LCORE-1572: apply conversation compaction in the /v1/responses endpoint

  /v1/responses is the OpenAI-compatible Responses API, so compaction is silent: no custom SSE event is injected (preserving wire compatibility) and create-time error handling is unchanged. Summarization runs before the response is created, on both the streaming and non-streaming paths.

  - responses_endpoint_handler: run apply_compaction_blocking before the streaming/non-streaming split, gated to stateful single-conversation requests (store=True, a conversation present, no previous_response_id).
  - ResponsesContext: carry compacted_original_input so the finalization can store the turn against the original user input.
  - _append_previous_response_turn: generalized to also append the turn in compacted mode (the conversation parameter is dropped, so Llama Stack does not store the turn automatically) using the original input.

* LCORE-1572: tests for /v1/responses compacted-turn storage

  Verify _append_previous_response_turn stores the turn against the original input in compacted mode, and stores nothing when store is disabled.

* LCORE-1572: update spec doc to the as-built compaction design

  Revise R10, R12, the architecture flow, the changed-request-flow section, and the implementation guidance to match what was built: in compacted mode lightspeed-stack builds explicit input and omits the Llama Stack conversation parameter (which always reloads full history), preserving conversation_id and the full item history. Record the redesign and the four affected endpoints (query, streaming_query, A2A, /v1/responses) in a new Changelog section.

* LCORE-1572: fix needs_compaction_path docstring (pydocstyle D400)

* LCORE-1572: build compacted input as typed messages (silence Pydantic warning)

  The explicit compacted input was assembled as plain dicts, which produced PydanticSerializationUnexpectedValue warnings when ResponsesApiParams was dumped (its input field is typed ResponseInput). Build the summary, recent verbatim, and query items as typed OpenAIResponseMessage objects instead. Verified end-to-end against a live stack: the serializer warning is gone and compaction still triggers, preserves conversation identity, and recalls earlier context correctly.

* LCORE-1572: raise instead of assert on the drained compaction result

  apply_compaction_blocking asserted that the generator yielded a result. Under python -O asserts are stripped, so the guard would vanish and a None result could propagate to callers. Replace it with an explicit None check that raises RuntimeError. Clears a GitHub code-scanning (CodeQL) "use of assert" finding. The repository's Bandit configuration skips B101, so this only surfaced via code scanning, not the Bandit CI job.

* LCORE-1572: wire persisted recursive fold (R3) via the summary cache

  Make the conversation summary cache the preferred source of truth for compaction summaries and the home of the persisted recursive fold.

  - apply_compaction / apply_compaction_blocking gain cache + user_id + skip_user_id_check. Summaries are read from the cache (get_summaries) and each new chunk is written to it (store_summary); the Llama Stack marker texts remain an authoritative fallback when no persisting cache is configured (marker-only mode, additive summaries, no fold).
  - When the persisted summaries themselves exceed the threshold, they are folded via recursively_resummarize and the fold is persisted with replace_summaries, so it is computed once and reused rather than recomputed per request (R3).
  - configured_conversation_cache() resolves the configured cache (or None) for the endpoints.
  - Wired into /v1/query, /v1/streaming_query, and /v1/responses. The A2A executor stays marker-only: it has no resolved user_id for the (user_id, conversation_id) cache key.

  Adds 7 unit tests: cache-preferred reads, store-on-compaction, fold trigger and persistence, no-fold-without-cache, marker fallback, and the cache resolver.

* LCORE-1572: address CodeRabbit review: list-form input tokens + clarity rename

  - Count tokens for list-form ResponseInput (e.g. /v1/responses), not only the string form, so compaction is not skipped on large item-list inputs that could otherwise still hit HTTP 413. Adds _estimate_response_input_tokens and a regression test.
  - Rename CompactionResult.summarized to compacted: the flag means "served in compacted / explicit-input mode" (set whenever the conversation has any summary, reused or fresh), not "a summary was created this request". The old name caused reviewer confusion about turn-persistence gating, which is correct as written.

* LCORE-1572: persist compacted streaming turns with structure (CodeRabbit #4)

  In compacted mode the streaming endpoint persisted the completed turn as flattened strings via append_turn_to_conversation, dropping attachments and non-text output items, and double-storing for shield-blocked requests. Persist the structured turn instead:

  - Capture the response's structured output items onto TurnSummary.output_items (set at response.completed, and to the refusal item on a shield block).
  - generate_response now takes original_input and persists via store_compacted_turn with the original input plus structured output items, matching the /v1/query and A2A paths.
  - The shield-blocked branch no longer stores the turn when the conversation parameter was omitted (compacted mode); generate_response stores it once with the correct original input, avoiding the duplicate refusal turn.

  Adds tests for the structured compacted persistence and the shield dedup (compacted and non-compacted).

* LCORE-1572: do not initialize the conversation cache when compaction is disabled

  configured_conversation_cache() is evaluated eagerly as a call argument in the query endpoint, so it ran on every request and accessed configuration.conversation_cache unconditionally, forcing the (SQLite) cache to initialize even when compaction is disabled. On configurations whose cache file could not be opened that raised and returned HTTP 500, which failed the e2e suites (where compaction is off). Return None without touching the cache when compaction is disabled; the cache is only used by compaction on this path. Adds a regression test.

* LCORE-1572: address CodeRabbit round 2 (compacted-mode persistence edges)

  Follow-ups to the streaming-persistence work, all for non-happy-path terminals in compacted mode (conversation parameter omitted), so the persisted turn uses the original user input + structured output rather than the explicit rewrite or flattened strings:

  - /v1/responses: shield-blocked turns persist against compacted_original_input, not api_params.input (the explicit rewrite).
  - streaming: interrupted (CancelledError) turns thread original_input through the interrupt callback and persist structured items, fixing the wrong-input storage and the cast(str, input) break on list inputs.
  - streaming: capture output_items on response.failed / response.incomplete terminals too, not only response.completed, so compacted persistence keeps partial output.
  - TurnSummary.output_items typed as list[OpenAIResponseOutput] instead of list[Any].

  Also documents that disabling compaction mid-conversation on an already-compacted conversation reverts it to full-history replay (unsupported transition); the enabled flag stays a full off-switch (CodeRabbit E, declined by design). Adds unit tests for the blocked /responses path, the interrupted compacted path, and output_items capture on a failed terminal.

* LCORE-1572: document the disable-after-compaction limitation in the spec doc (CodeRabbit E)

* LCORE-1572: document as-built divergences in spec doc (cache source-of-truth, persisted fold)

  The spec still described the earlier design (cache as a parallel/best-effort layer, markers as the summary source). Update Summary storage, Additive summarization, and Changed request flow to the as-built design, and add a Changelog entry: the cache is the preferred source of truth for summaries (marker texts as fallback + audit/boundary), the recursive fold is persisted via replace_summaries (in-memory fold rejected), A2A is marker-only, and the enabled flag stays a full off-switch.

* LCORE-1572: fix line-too-long (C0301) in interrupted-turn test docstring

* LCORE-1572: harden disabled-cache regression test to fail on eager cache access (CodeRabbit)

* LCORE-1572: ref-count per-conversation lock + extract apply_compaction helpers (review)

  Addresses two inline review nits from tisnik on the LCORE-1572 PR.

  Per-conversation lock cleanup (R11): Replace the bare ``dict[str, asyncio.Lock]`` registry with a ref-counted ``_LockEntry`` and an ``@asynccontextmanager`` helper guarded by a registry mutex. Entries are removed once the last waiter exits, so the registry no longer grows unbounded with the set of conversation_ids ever seen by the process. Adds tests for serialization, deletion-after-last-release, entry-kept-while-waiters-queued, and cleanup-on-cancellation.

  apply_compaction refactor: Extract five helpers (``_load_compaction_state``, ``_estimate_total_tokens``, ``_persist_new_summary_chunk``, ``_maybe_persist_fold``, ``_compacted_result``), leaving the orchestrating generator linear and roughly one screen long. The state-loading, token-estimation, persistence-side-effects, and result-building concerns are now each named and individually testable.

* LCORE-1572: tighten typed-item handling in compaction helpers (review)

  Addresses asimurka's review nit about the dual dict-or-model branches in ``_verbatim_input_message`` and the surrounding token-estimator helpers.

  Llama Stack's ``client.conversations.items.list`` returns items as typed Pydantic models (the ``ItemListResponse`` discriminated union). The dict branches in ``is_message_item``, ``extract_message_text``, ``estimate_conversation_tokens``, ``format_conversation_for_summary`` and ``_verbatim_input_message`` were defensive code for a shape that never arrives from production code paths; they only kept the dict-using test fixtures alive. Drop the dict branches and tighten the docstrings to state the typed-item contract.

  Update the compaction test fixtures (``_msg``, ``_marker``) to return ``OpenAIResponseMessage`` instances instead of dicts. Remove the token-estimator and compaction tests that explicitly asserted dict-shape acceptance; replace with single tests verifying that dicts are now ignored.

* LCORE-1572: soften R12 doc on silent /v1/responses compaction (review)

  Addresses asimurka's review note: emitting a compaction event on the ``/v1/responses`` endpoint would itself be spec-compliant under the OpenResponses extension-events convention, so framing silent compaction as a forced choice for "wire compatibility" overstated the constraint. Reword R12 and the changelog entry to acknowledge the spec-compliant option and to frame silent as the *initial* choice, kept to preserve drop-in compatibility with clients written against the upstream OpenAI Responses API; emitting the event on this endpoint is left open as a follow-up. Lightspeed's own clients can already use ``/v1/streaming_query`` to receive the event.
1 parent 2606037 commit d1c79ab

18 files changed

Lines changed: 2325 additions & 183 deletions

docs/design/conversation-compaction/conversation-compaction.md

Lines changed: 102 additions & 50 deletions
Large diffs are not rendered by default.
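
The design document's diff is not shown, but the commit message describes the core idea: once the estimated token count nears the context window, the request is rebuilt as explicit input made of summaries, recent verbatim turns, and the new query. A minimal sketch of that shape follows. Every name here is hypothetical, and the four-characters-per-token estimate, the 0.7 threshold, the `keep_recent` default, and the "system" role for summaries are assumptions made for illustration, not values taken from the commit.

```python
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for a typed conversation message item."""

    role: str
    text: str


def estimate_tokens(texts: list[str]) -> int:
    """Rough estimate, assuming about four characters per token."""
    return sum(len(text) for text in texts) // 4


def should_compact(
    history: list[Message], query: str, context_window: int, threshold: float = 0.7
) -> bool:
    """Trigger once the estimated usage reaches a fraction of the window."""
    used = estimate_tokens([m.text for m in history] + [query])
    return used >= threshold * context_window


def build_explicit_input(
    summaries: list[str], history: list[Message], query: str, keep_recent: int = 2
) -> list[Message]:
    """Assemble summaries, then recent verbatim turns, then the new query."""
    # Guard keep_recent == 0: history[-0:] would return the whole list.
    recent = history[-keep_recent:] if keep_recent > 0 else []
    items = [Message("system", "Summary of earlier turns: " + s) for s in summaries]
    items.extend(recent)
    items.append(Message("user", query))
    return items
```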

src/app/endpoints/a2a.py

Lines changed: 48 additions & 5 deletions
@@ -1,5 +1,7 @@
 """Handler for A2A (Agent-to-Agent) protocol endpoints using Responses API."""
 
+# pylint: disable=too-many-lines
+
 import asyncio
 import json
 import uuid
@@ -32,7 +34,7 @@
 from llama_stack_api.openai_responses import (
     OpenAIResponseObjectStream,
 )
-from llama_stack_client import APIConnectionError
+from llama_stack_client import APIConnectionError, AsyncLlamaStackClient
 from starlette.responses import Response, StreamingResponse
 
 from a2a_storage import A2AContextStore, A2AStorageFactory
@@ -45,13 +47,18 @@
 from constants import MEDIA_TYPE_EVENT_STREAM
 from log import get_logger
 from models.api.requests import QueryRequest
+from models.common.responses.types import ResponseInput
 from models.config import Action
+from utils.conversation_compaction import (
+    apply_compaction_blocking,
+    store_compacted_turn,
+)
 from utils.mcp_headers import McpHeaders, mcp_headers_dependency
 from utils.responses import (
     extract_text_from_response_item,
     prepare_responses_params,
 )
-from utils.suid import normalize_conversation_id
+from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id
 from version import __version__
 
 logger = get_logger(__name__)
@@ -336,6 +343,19 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
                 store=True,
                 request_headers=self.request_headers,
             )
+            # Compact the conversation if it is approaching the context window
+            # limit. A2A is not a browser SSE stream, so no progress event is
+            # emitted; the blocking variant summarizes inline before the call.
+            # No conversation cache is passed: the A2A executor has no resolved
+            # user_id for the (user_id, conversation_id) cache key, so A2A runs
+            # in marker-only mode (additive summaries, no persisted fold).
+            compaction = await apply_compaction_blocking(
+                client,
+                responses_params,
+                configuration.inference,
+                configuration.compaction,
+            )
+            responses_params = compaction.params
             # Stream response from LLM using the Responses API
             stream = await client.responses.create(**responses_params.model_dump())
         except APIConnectionError as e:
@@ -392,9 +412,16 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
             )
         )
 
-        # Process stream using generator and aggregator pattern
+        # Process stream using generator and aggregator pattern. In compacted
+        # mode the conversation parameter is not sent, so the turn is stored
+        # explicitly once the response completes (see _convert_stream_to_events).
         async for a2a_event in self._convert_stream_to_events(
-            stream, task_id, context_id, conversation_id
+            stream,
+            task_id,
+            context_id,
+            conversation_id,
+            client,
+            compaction.original_input if compaction.compacted else None,
         ):
             aggregator.process_event(a2a_event)
             await event_queue.enqueue_event(a2a_event)
@@ -414,12 +441,14 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
             final=True,
         )
 
-    async def _convert_stream_to_events(  # pylint: disable=too-many-branches,too-many-locals
+    async def _convert_stream_to_events(  # pylint: disable=too-many-branches,too-many-locals,too-many-arguments,too-many-positional-arguments
         self,
         stream: AsyncIterator[OpenAIResponseObjectStream],
         task_id: str,
         context_id: str,
         conversation_id: Optional[str],
+        client: Optional[AsyncLlamaStackClient] = None,
+        compacted_original_input: Optional[ResponseInput] = None,
     ) -> AsyncIterator[Any]:
         """Convert Responses API stream chunks to A2A events.
 
@@ -508,6 +537,20 @@ async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-ma
 
                 if response_obj:
                     output = getattr(response_obj, "output", [])
+                    # In compacted mode the conversation parameter was not sent,
+                    # so persist this turn ourselves to keep the recent-turn
+                    # buffer and audit history intact for the next request.
+                    if (
+                        compacted_original_input is not None
+                        and client is not None
+                        and conversation_id
+                    ):
+                        await store_compacted_turn(
+                            client,
+                            to_llama_stack_conversation_id(conversation_id),
+                            compacted_original_input,
+                            output,
+                        )
                     a2a_parts = _convert_responses_content_to_a2a_parts(output)
                     if not a2a_parts and final_text:
                         a2a_parts = [Part(root=TextPart(text=final_text))]
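
The blocking variant called in this file is described in the commit message as a wrapper that drains the `apply_compaction()` async generator and raises `RuntimeError` rather than asserting when no result arrives. A rough sketch of that pattern is below. `StartedEvent`, `Result`, `compaction_steps`, and `drain` are hypothetical stand-ins, not the real types or functions.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Union


@dataclass
class StartedEvent:
    """Hypothetical progress event emitted before summarization."""


@dataclass
class Result:
    """Hypothetical final result of a compaction run."""

    compacted: bool


async def compaction_steps(trigger: bool) -> AsyncIterator[Union[StartedEvent, Result]]:
    """Yield an optional progress event, then exactly one result."""
    if trigger:
        yield StartedEvent()
    yield Result(compacted=trigger)


async def drain(steps: AsyncIterator[Union[StartedEvent, Result]]) -> Result:
    """Consume the generator, ignore progress events, return the result."""
    result: Optional[Result] = None
    async for item in steps:
        if isinstance(item, Result):
            result = item
    # An explicit check survives `python -O`, which strips assert statements.
    if result is None:
        raise RuntimeError("compaction generator finished without a result")
    return result
```

A streaming caller would iterate the generator itself and forward the progress event, while a non-streaming caller such as the A2A executor only needs the drained result.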

src/app/endpoints/query.py

Lines changed: 48 additions & 2 deletions
@@ -39,8 +39,14 @@
 from models.api.responses.successful import QueryResponse
 from models.common.moderation import ShieldModerationResult
 from models.common.responses.responses_api_params import ResponsesApiParams
+from models.common.responses.types import ResponseInput
 from models.common.turn_summary import TurnSummary
 from models.config import Action
+from utils.conversation_compaction import (
+    apply_compaction_blocking,
+    configured_conversation_cache,
+    store_compacted_turn,
+)
 from utils.conversations import append_turn_items_to_conversation
 from utils.endpoints import (
     check_configuration_loaded,
@@ -196,6 +202,20 @@ async def query_endpoint_handler(
         inline_rag_context=inline_rag_context.context_text,
     )
 
+    # Compact the conversation if it is approaching the context window limit.
+    # When compaction is active, params carry explicit input and the
+    # conversation parameter is dropped (lightspeed-stack owns the context).
+    compaction = await apply_compaction_blocking(
+        client,
+        responses_params,
+        configuration.inference,
+        configuration.compaction,
+        cache=configured_conversation_cache(),
+        user_id=user_id,
+        skip_user_id_check=_skip_userid_check,
+    )
+    responses_params = compaction.params
+
     # Handle Azure token refresh if needed
     if (
         responses_params.model.startswith("azure")
@@ -207,7 +227,11 @@ async def query_endpoint_handler(
 
     # Retrieve response using Responses API
     turn_summary = await retrieve_response(
-        client, responses_params, moderation_result, endpoint_path
+        client,
+        responses_params,
+        moderation_result,
+        endpoint_path,
+        original_input=compaction.original_input if compaction.compacted else None,
     )
 
     if moderation_result.decision == "passed":
@@ -282,6 +306,7 @@ async def retrieve_response(
     responses_params: ResponsesApiParams,
     moderation_result: ShieldModerationResult,
     endpoint_path: str = "",
+    original_input: Optional[ResponseInput] = None,
 ) -> TurnSummary:
     """
     Retrieve response from LLMs and agents.
@@ -294,17 +319,28 @@ async def retrieve_response(
         client: The AsyncLlamaStackClient to use for the request.
         responses_params: The Responses API parameters.
         moderation_result: The moderation result.
+        endpoint_path: The request path, for metrics/telemetry.
+        original_input: Set only in compacted mode (LCORE-1572). It is the new
+            user query before the explicit-input rewrite. When provided, the
+            turn is appended to the conversation here, because the conversation
+            parameter is no longer passed to Llama Stack and so the turn is not
+            stored automatically.
 
     Returns:
     -------
         TurnSummary: Summary of the LLM response content
     """
     response: Optional[OpenAIResponseObject] = None
+    # In compacted mode, the new turn must be stored against the original user
+    # query, not the explicit summaries-plus-recent input we send to inference.
+    turn_input = (
+        original_input if original_input is not None else responses_params.input
+    )
     if moderation_result.decision == "blocked":
         await append_turn_items_to_conversation(
             client,
             responses_params.conversation,
-            responses_params.input,
+            turn_input,
             [moderation_result.refusal_response],
         )
         return TurnSummary(
@@ -331,6 +367,16 @@ async def retrieve_response(
         error_response = handle_known_apistatus_errors(e, responses_params.model)
         raise HTTPException(**error_response.model_dump()) from e
 
+    # In compacted mode, store the completed turn ourselves (the conversation
+    # parameter was not sent, so Llama Stack did not persist it).
+    if original_input is not None:
+        await store_compacted_turn(
+            client,
+            responses_params.conversation,
+            original_input,
+            response.output,
+        )
+
     vector_store_ids = extract_vector_store_ids_from_tools(responses_params.tools)
     rag_id_mapping = configuration.rag_id_mapping
     return build_turn_summary(
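
The handler above passes `cache=configured_conversation_cache()` as a call argument, so Python evaluates it on every request before `apply_compaction_blocking` decides anything. The commit message notes that the resolver therefore has to return `None` itself when compaction is off, without touching the cache. A small sketch of that guard follows. `FakeConfiguration` and `resolve_cache` are hypothetical, and the counter stands in for the cost of initializing a real cache.

```python
from typing import Optional


class FakeConfiguration:
    """Hypothetical configuration whose cache is created on first access."""

    def __init__(self, compaction_enabled: bool) -> None:
        self.compaction_enabled = compaction_enabled
        self.cache_initializations = 0

    @property
    def conversation_cache(self) -> object:
        # Stands in for opening a cache backend, which may fail or be slow.
        self.cache_initializations += 1
        return object()


def resolve_cache(config: FakeConfiguration) -> Optional[object]:
    """Return the cache only when compaction can actually use it."""
    if not config.compaction_enabled:
        # Early return: the cache property is never read on this path.
        return None
    return config.conversation_cache
```

With the check inside the resolver, the call site can stay a plain keyword argument and still avoid initializing the cache for deployments that never compact.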

src/app/endpoints/responses.py

Lines changed: 60 additions & 4 deletions
@@ -62,8 +62,13 @@
 from models.common.moderation import ShieldModerationBlocked
 from models.common.responses.contexts import ResponsesContext
 from models.common.responses.responses_api_params import ResponsesApiParams
+from models.common.responses.types import ResponseInput
 from models.common.turn_summary import TurnSummary
 from models.config import Action
+from utils.conversation_compaction import (
+    apply_compaction_blocking,
+    configured_conversation_cache,
+)
 from utils.conversations import append_turn_items_to_conversation
 from utils.endpoints import (
     check_configuration_loaded,
@@ -225,10 +230,18 @@ async def _persist_blocked_response_turn(
     """
     if api_params.store:
         moderation_result = cast(ShieldModerationBlocked, context.moderation_result)
+        # In compacted mode the conversation parameter was dropped and
+        # api_params.input is the explicit-input rewrite, so persist the turn
+        # against the original user input instead (LCORE-1572).
+        user_input = (
+            context.compacted_original_input
+            if context.compacted_original_input is not None
+            else api_params.input
+        )
         await append_turn_items_to_conversation(
             client=context.client,
             conversation_id=api_params.conversation,
-            user_input=api_params.input,
+            user_input=user_input,
             llm_output=[moderation_result.refusal_response],
         )
 
@@ -238,14 +251,30 @@ async def _append_previous_response_turn(
     context: ResponsesContext,
     output: Sequence[OpenAIResponseOutput],
 ) -> None:
-    """Append response output when continuing from a previous response id.
+    """Append the completed turn when Llama Stack did not store it automatically.
+
+    Llama Stack stores the turn itself only when the conversation parameter is
+    sent. Two cases bypass that and require an explicit append: continuing from
+    a ``previous_response_id``, and conversation compaction (LCORE-1572), where
+    the conversation parameter is dropped in favor of explicit input. In the
+    compaction case the turn is stored against the original user input (before
+    the explicit-input rewrite), carried on the context.
 
     Args:
         api_params: Responses API parameters containing conversation details.
         context: Request-scoped Responses API context.
         output: Final output items from the Responses API object.
     """
-    if api_params.store and api_params.previous_response_id:
+    if not api_params.store:
+        return
+    if context.compacted_original_input is not None:
+        await append_turn_items_to_conversation(
+            context.client,
+            api_params.conversation,
+            context.compacted_original_input,
+            output,
+        )
+    elif api_params.previous_response_id:
         await append_turn_items_to_conversation(
             context.client,
             api_params.conversation,
@@ -337,7 +366,7 @@ async def responses_endpoint_handler(
     check_configuration_loaded(configuration)
     started_at = datetime.now(UTC)
     rh_identity_context = get_rh_identity_context(request)
-    user_id, _, _, token = auth
+    user_id, _, skip_userid_check, token = auth
 
     await check_mcp_auth(configuration, mcp_headers, token, request.headers)
 
@@ -436,6 +465,32 @@ async def responses_endpoint_handler(
     )
 
     api_params = ResponsesApiParams.model_validate(updated_request.model_dump())
+
+    # Compact the conversation if it is approaching the context window limit.
+    # /v1/responses is OpenAI-compatible, so compaction is silent (no custom SSE
+    # event): summarization happens before the response is created, and the turn
+    # is appended explicitly afterward (the conversation parameter is dropped).
+    # Only stateful single-conversation requests are eligible.
+    compacted_original_input: Optional[ResponseInput] = None
+    if (
+        configuration.compaction.enabled
+        and api_params.store
+        and api_params.conversation
+        and not api_params.previous_response_id
+    ):
+        compaction = await apply_compaction_blocking(
+            client,
+            api_params,
+            configuration.inference,
+            configuration.compaction,
+            cache=configured_conversation_cache(),
+            user_id=user_id,
+            skip_user_id_check=skip_userid_check,
+        )
+        api_params = compaction.params
+        if compaction.compacted:
+            compacted_original_input = compaction.original_input
+
     context = ResponsesContext(
         client=client,
         auth=auth,
@@ -449,6 +504,7 @@ async def responses_endpoint_handler(
         user_agent=_get_user_agent(request),
         endpoint_path=endpoint_path,
         generate_topic_summary=updated_request.generate_topic_summary,
+        compacted_original_input=compacted_original_input,
     )
     response_handler = (
         handle_streaming_response