Skip to content

LCORE-2311: Relocated stream interruption utilities#1918

Merged
tisnik merged 1 commit into
lightspeed-core:mainfrom
asimurka:relocate_streaming_utils
Jun 12, 2026
Merged

LCORE-2311: Relocated stream interruption utilities#1918
tisnik merged 1 commit into
lightspeed-core:mainfrom
asimurka:relocate_streaming_utils

Conversation

@asimurka

@asimurka asimurka commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Description

Relocates stream interruption utilities from streaming query endpoint to dedicated utils file.

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement
  • Benchmarks improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: (e.g., Claude, CodeRabbit, Ollama, etc., N/A if not used)
  • Generated by: (e.g., tool name and version; N/A if not used)

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of interrupted streaming responses to ensure graceful cleanup and persistence.
  • Chores

    • Refactored internal stream interruption and persistence logic into shared utilities for better maintainability and consistency.
    • Enhanced background topic summary generation during stream interruptions with improved error handling and timeout management.

@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Walkthrough

This PR refactors streaming interruption and interrupted-turn persistence logic from local helpers in the endpoint module into shared utilities. The endpoint and existing tests are updated to import and use the extracted functions, and new tests cover the shared utilities in isolation.

Changes

Stream interrupt persistence refactoring

Layer / File(s) Summary
Shared stream interrupt utilities
src/utils/stream_interrupts.py
New deregister_stream(), background_update_topic_summary(), persist_interrupted_turn(), and register_interrupt_callback() helpers manage stream registration, background topic summaries, and interrupted-turn persistence with double-interrupt guards.
Refactor streaming_query to use shared utilities
src/app/endpoints/streaming_query.py
Remove local interrupt helpers (_background_update_topic_summary, _persist_interrupted_turn, _register_interrupt_callback) and import shared functions; update generate_response to register interrupts and handle CancelledError via shared persistence and deregistration.
Update endpoint tests to patch from shared utilities
tests/unit/app/endpoints/test_streaming_query.py
Adjust all interrupt test patches to mock from utils.stream_interrupts.* instead of the local endpoint module; add missing @pytest.mark.asyncio decorator; consolidate interrupted-turn test coverage into cancellation/interrupt scenarios.
Add tests for shared interrupt utilities
tests/unit/utils/test_stream_interrupts.py
New test module verifies persist_interrupted_turn respects original_input for compacted turns, schedules background topic-summary tasks when enabled, and register_interrupt_callback registers handlers with double-interrupt guards.

Sequence Diagram

sequenceDiagram
  participant StreamingEndpoint
  participant RegisterCallback as register_interrupt_callback
  participant Persist as persist_interrupted_turn
  participant Background as background_update_topic_summary
  participant Registry as StreamInterruptRegistry
  StreamingEndpoint->>RegisterCallback: invoke with guard list
  RegisterCallback->>Registry: register callback for current task
  RegisterCallback-->>StreamingEndpoint: return guard
  Note over StreamingEndpoint: on CancelledError
  StreamingEndpoint->>Persist: call with turn data
  Persist->>Persist: append interrupted message to conversation
  Persist->>Persist: store query results (topic_summary=None)
  alt topic summary enabled
    Persist->>Background: schedule as background task
    Background->>Background: generate topic summary with timeout
    Background->>Persist: update conversation topic summary
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • lightspeed-core/lightspeed-stack#1910: Both PRs refactor src/app/endpoints/streaming_query.py's streaming-cancellation path to use shared interrupt utilities, removing local interrupted-turn persistence helpers.
  • lightspeed-core/lightspeed-stack#1888: Both PRs are tied to the stream interrupt registry: this PR extracts and uses deregister_stream/register_interrupt_callback utilities, while the retrieved PR adds integration tests for double-interrupt behavior and the interrupt endpoint.
  • lightspeed-core/lightspeed-stack#1796: This PR wires shared persist_interrupted_turn into the endpoint with original_input support for compacted turns, building on the retrieved PR's compaction-aware interrupt flow.

Suggested reviewers

  • tisnik
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: relocating stream interruption utilities from the streaming query endpoint to a dedicated utilities file, which is the core refactoring objective.
Docstring Coverage ✅ Passed Docstring coverage is 87.50% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/unit/utils/test_stream_interrupts.py (1)

117-161: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Consider using @pytest.mark.asyncio for consistency.

The test uses asyncio.run() to execute async code, while the other tests in this file use @pytest.mark.asyncio and run in pytest's asyncio event loop. Making this test async would be more consistent with the file's testing style.

♻️ Suggested refactor for consistency
+@pytest.mark.asyncio
-def test_register_interrupt_callback_registers_current_task(
+async def test_register_interrupt_callback_registers_current_task(
     mocker: MockerFixture,
 ) -> None:
     """register_interrupt_callback binds the current asyncio task to the registry."""
     ...
-    async def run() -> list[bool]:
-        return register_interrupt_callback(
-            context,
-            responses_params,
-            turn_summary,
-            background_tasks,
-        )
-
-    guard = asyncio.run(run())
+    guard = register_interrupt_callback(
+        context,
+        responses_params,
+        turn_summary,
+        background_tasks,
+    )
 
     assert guard == [False]
     ...
     on_interrupt = registry.register_stream.call_args.kwargs["on_interrupt"]
 
-    async def invoke_callback() -> None:
-        await on_interrupt()
-
-    asyncio.run(invoke_callback())
+    await on_interrupt()
     persist_mock.assert_awaited_once()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/utils/test_stream_interrupts.py` around lines 117 - 161, Convert
the test_register_interrupt_callback_registers_current_task to an async test
using pytest.mark.asyncio: change the function signature to async def
test_register_interrupt_callback_registers_current_task(...), add
`@pytest.mark.asyncio`, replace the asyncio.run(run()) call by directly awaiting
register_interrupt_callback(context, responses_params, turn_summary,
background_tasks) and replace asyncio.run(invoke_callback()) by awaiting
on_interrupt() (or await invoke_callback()), keeping the mocks
(get_stream_interrupt_registry, persist_interrupted_turn) and assertions the
same so persist_interrupted_turn stays an AsyncMock and
registry.register_stream/on_interrupt are exercised under pytest's event loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@tests/unit/utils/test_stream_interrupts.py`:
- Around line 117-161: Convert the
test_register_interrupt_callback_registers_current_task to an async test using
pytest.mark.asyncio: change the function signature to async def
test_register_interrupt_callback_registers_current_task(...), add
`@pytest.mark.asyncio`, replace the asyncio.run(run()) call by directly awaiting
register_interrupt_callback(context, responses_params, turn_summary,
background_tasks) and replace asyncio.run(invoke_callback()) by awaiting
on_interrupt() (or await invoke_callback()), keeping the mocks
(get_stream_interrupt_registry, persist_interrupted_turn) and assertions the
same so persist_interrupted_turn stays an AsyncMock and
registry.register_stream/on_interrupt are exercised under pytest's event loop.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3869a605-0654-4cf2-a3de-9f95be669aa5

📥 Commits

Reviewing files that changed from the base of the PR and between 273c0eb and eafb386.

📒 Files selected for processing (4)
  • src/app/endpoints/streaming_query.py
  • src/utils/stream_interrupts.py
  • tests/unit/app/endpoints/test_streaming_query.py
  • tests/unit/utils/test_stream_interrupts.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
  • GitHub Check: Pyright
  • GitHub Check: integration_tests (3.13)
  • GitHub Check: E2E: library mode / ci / group 2
  • GitHub Check: E2E: server mode / ci / group 3
  • GitHub Check: E2E: library mode / ci / group 3
  • GitHub Check: E2E: library mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 2
🧰 Additional context used
📓 Path-based instructions (3)
tests/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use pytest.mark.asyncio marker for async tests

Files:

  • tests/unit/utils/test_stream_interrupts.py
  • tests/unit/app/endpoints/test_streaming_query.py
src/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/**/*.py: Use absolute imports for internal modules: from authentication import get_auth_dependency
Llama Stack imports: Use from llama_stack_client import AsyncLlamaStackClient
Check constants.py for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use logger = get_logger(__name__) from log.py for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use async def for I/O operations and external API calls
Use standard log levels with clear purposes: debug() for diagnostic info, info() for program execution, warning() for unexpected events, error() for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: Configuration, Error/Exception, Resolver, Interface
Abstract classes must use ABC with @abstractmethod decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes

Files:

  • src/utils/stream_interrupts.py
  • src/app/endpoints/streaming_query.py
src/app/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/app/**/*.py: FastAPI dependencies: Import from fastapi module for APIRouter, HTTPException, Request, status, Depends
Use FastAPI HTTPException with appropriate status codes for API endpoints and handle APIConnectionError from Llama Stack

Files:

  • src/app/endpoints/streaming_query.py
🧠 Learnings (1)
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.

Applied to files:

  • src/app/endpoints/streaming_query.py
🔇 Additional comments (14)
src/utils/stream_interrupts.py (5)

1-28: LGTM!


166-173: LGTM!


176-215: LGTM!


218-303: LGTM!


306-369: LGTM!

src/app/endpoints/streaming_query.py (4)

119-123: LGTM!


581-587: LGTM!


620-635: LGTM!


636-637: LGTM!

tests/unit/app/endpoints/test_streaming_query.py (3)

956-963: LGTM!


1325-1325: LGTM!


1361-1367: LGTM!

Also applies to: 1439-1452, 1519-1529, 1586-1596, 1645-1652, 1704-1710

tests/unit/utils/test_stream_interrupts.py (2)

19-67: LGTM!


69-115: LGTM!

@asimurka asimurka requested a review from tisnik June 12, 2026 09:16

@tisnik tisnik left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@tisnik tisnik merged commit 42bf698 into lightspeed-core:main Jun 12, 2026
33 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants