LCORE-2311: Relocated stream interruption utilities#1918
Conversation
WalkthroughThis PR refactors streaming interruption and interrupted-turn persistence logic from local helpers in the endpoint module into shared utilities. The endpoint and existing tests are updated to import and use the extracted functions, and new tests cover the shared utilities in isolation. ChangesStream interrupt persistence refactoring
Sequence DiagramsequenceDiagram
participant StreamingEndpoint
participant RegisterCallback as register_interrupt_callback
participant Persist as persist_interrupted_turn
participant Background as background_update_topic_summary
participant Registry as StreamInterruptRegistry
StreamingEndpoint->>RegisterCallback: invoke with guard list
RegisterCallback->>Registry: register callback for current task
RegisterCallback-->>StreamingEndpoint: return guard
Note over StreamingEndpoint: on CancelledError
StreamingEndpoint->>Persist: call with turn data
Persist->>Persist: append interrupted message to conversation
Persist->>Persist: store query results (topic_summary=None)
alt topic summary enabled
Persist->>Background: schedule as background task
Background->>Background: generate topic summary with timeout
Background->>Persist: update conversation topic summary
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/unit/utils/test_stream_interrupts.py (1)
117-161: 🧹 Nitpick | 🔵 Trivial | 💤 Low valueConsider using
@pytest.mark.asynciofor consistency.The test uses
asyncio.run()to execute async code, while the other tests in this file use@pytest.mark.asyncioand run in pytest's asyncio event loop. Making this test async would be more consistent with the file's testing style.♻️ Suggested refactor for consistency
+@pytest.mark.asyncio -def test_register_interrupt_callback_registers_current_task( +async def test_register_interrupt_callback_registers_current_task( mocker: MockerFixture, ) -> None: """register_interrupt_callback binds the current asyncio task to the registry.""" ... - async def run() -> list[bool]: - return register_interrupt_callback( - context, - responses_params, - turn_summary, - background_tasks, - ) - - guard = asyncio.run(run()) + guard = register_interrupt_callback( + context, + responses_params, + turn_summary, + background_tasks, + ) assert guard == [False] ... on_interrupt = registry.register_stream.call_args.kwargs["on_interrupt"] - async def invoke_callback() -> None: - await on_interrupt() - - asyncio.run(invoke_callback()) + await on_interrupt() persist_mock.assert_awaited_once()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/utils/test_stream_interrupts.py` around lines 117 - 161, Convert the test_register_interrupt_callback_registers_current_task to an async test using pytest.mark.asyncio: change the function signature to async def test_register_interrupt_callback_registers_current_task(...), add `@pytest.mark.asyncio`, replace the asyncio.run(run()) call by directly awaiting register_interrupt_callback(context, responses_params, turn_summary, background_tasks) and replace asyncio.run(invoke_callback()) by awaiting on_interrupt() (or await invoke_callback()), keeping the mocks (get_stream_interrupt_registry, persist_interrupted_turn) and assertions the same so persist_interrupted_turn stays an AsyncMock and registry.register_stream/on_interrupt are exercised under pytest's event loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@tests/unit/utils/test_stream_interrupts.py`:
- Around line 117-161: Convert the
test_register_interrupt_callback_registers_current_task to an async test using
pytest.mark.asyncio: change the function signature to async def
test_register_interrupt_callback_registers_current_task(...), add
`@pytest.mark.asyncio`, replace the asyncio.run(run()) call by directly awaiting
register_interrupt_callback(context, responses_params, turn_summary,
background_tasks) and replace asyncio.run(invoke_callback()) by awaiting
on_interrupt() (or await invoke_callback()), keeping the mocks
(get_stream_interrupt_registry, persist_interrupted_turn) and assertions the
same so persist_interrupted_turn stays an AsyncMock and
registry.register_stream/on_interrupt are exercised under pytest's event loop.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3869a605-0654-4cf2-a3de-9f95be669aa5
📒 Files selected for processing (4)
src/app/endpoints/streaming_query.pysrc/utils/stream_interrupts.pytests/unit/app/endpoints/test_streaming_query.pytests/unit/utils/test_stream_interrupts.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
- GitHub Check: Pyright
- GitHub Check: integration_tests (3.13)
- GitHub Check: E2E: library mode / ci / group 2
- GitHub Check: E2E: server mode / ci / group 3
- GitHub Check: E2E: library mode / ci / group 3
- GitHub Check: E2E: library mode / ci / group 1
- GitHub Check: E2E: server mode / ci / group 1
- GitHub Check: E2E: server mode / ci / group 2
🧰 Additional context used
📓 Path-based instructions (3)
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Usepytest.mark.asynciomarker for async tests
Files:
tests/unit/utils/test_stream_interrupts.pytests/unit/app/endpoints/test_streaming_query.py
src/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/**/*.py: Use absolute imports for internal modules:from authentication import get_auth_dependency
Llama Stack imports: Usefrom llama_stack_client import AsyncLlamaStackClient
Checkconstants.pyfor shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Uselogger = get_logger(__name__)fromlog.pyfor module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Useasync deffor I/O operations and external API calls
Use standard log levels with clear purposes:debug()for diagnostic info,info()for program execution,warning()for unexpected events,error()for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes:Configuration,Error/Exception,Resolver,Interface
Abstract classes must use ABC with@abstractmethoddecorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes
Files:
src/utils/stream_interrupts.pysrc/app/endpoints/streaming_query.py
src/app/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/app/**/*.py: FastAPI dependencies: Import fromfastapimodule forAPIRouter,HTTPException,Request,status,Depends
Use FastAPIHTTPExceptionwith appropriate status codes for API endpoints and handleAPIConnectionErrorfrom Llama Stack
Files:
src/app/endpoints/streaming_query.py
🧠 Learnings (1)
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.
Applied to files:
src/app/endpoints/streaming_query.py
🔇 Additional comments (14)
src/utils/stream_interrupts.py (5)
1-28: LGTM!
166-173: LGTM!
176-215: LGTM!
218-303: LGTM!
306-369: LGTM!src/app/endpoints/streaming_query.py (4)
119-123: LGTM!
581-587: LGTM!
620-635: LGTM!
636-637: LGTM!tests/unit/app/endpoints/test_streaming_query.py (3)
956-963: LGTM!
1325-1325: LGTM!
1361-1367: LGTM!Also applies to: 1439-1452, 1519-1529, 1586-1596, 1645-1652, 1704-1710
tests/unit/utils/test_stream_interrupts.py (2)
19-67: LGTM!
69-115: LGTM!
Description
Relocates stream interruption utilities from streaming query endpoint to dedicated utils file.
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
Bug Fixes
Chores