LCORE-2311: Relocated streaming query utilities #1910
Walkthrough
This PR refactors streaming SSE and stream-interruption logic from the
Changes
Streaming and Interrupt Utilities Extraction
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks: ✅ 5 passed
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/unit/utils/test_streaming_sse.py (1)
268-278: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Add content assertion to match the JSON test's thoroughness.
The JSON test at line 266 verifies that the violation message appears in the result, but this text test only checks that items were generated. For consistency and completeness, add a similar content assertion.
🧪 Proposed fix
  assert len(result) > 0
+ assert any("Violation message" in item for item in result)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/utils/test_streaming_sse.py` around lines 268 - 278, The test for text media type currently only checks that shield_violation_generator produced items; update the test_shield_violation_generator_text to also assert that the generated content contains the expected violation message ("Violation message"). Locate the async test_shield_violation_generator_text and after collecting result from shield_violation_generator(..., MEDIA_TYPE_TEXT) add a content assertion (e.g., join or inspect result items) to confirm the string "Violation message" appears, mirroring the JSON test's thoroughness.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/app/endpoints/streaming_query.py`:
- Around line 483-485: The compaction-path except block yields
http_exception_stream_event(e) without preserving the negotiated media type,
causing text-mode errors to lose plain-text formatting; update the except
handler in streaming_query.py to pass the negotiated media type into the
formatter (e.g., call http_exception_stream_event(e, media_type) or route this
branch through stream_http_error_event(e, media_type)) so MEDIA_TYPE_TEXT
requests get the same media-type-aware error shape as the non-compaction path.
In `@src/utils/stream_interrupt.py`:
- Around line 35-43: Update type hints and docstrings in
src/utils/stream_interrupt.py: replace all uses of Optional[ResponseInput] with
the modern union ResponseInput | None (including signatures of
persist_interrupted_turn and register_interrupt_callback) and remove the
Optional import; add Google-style docstring sections (Parameters, Returns,
Raises) for background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn, and
register_interrupt_callback (ensure register_interrupt_callback documents the
Raises clause), and add a short docstring for the nested async def
on_interrupt() -> None explaining its purpose and exceptions; keep identifiers
intact (background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn,
register_interrupt_callback, on_interrupt) so reviewers can locate the changes.
In `@src/utils/streaming_sse.py`:
- Around line 27-29: The type annotation for the media_type parameter in
stream_http_error_event uses Optional[str]; update it to the modern union syntax
str | None (i.e., change "media_type: Optional[str] = MEDIA_TYPE_JSON" to
"media_type: str | None = MEDIA_TYPE_JSON") and remove any now-unnecessary
Optional import if present; keep the function name stream_http_error_event and
default value MEDIA_TYPE_JSON unchanged.
In `@tests/unit/utils/test_stream_interrupt.py`:
- Around line 114-115: Replace the fragile fixed sleep (await
asyncio.sleep(0.01)) with an explicit completion mechanism: have the test wait
for the actual work to finish by either awaiting the spawned task, waiting on an
asyncio.Event set by the background callback, or draining
utils.stream_interrupt.background_topic_summary_tasks (e.g., await all tasks in
that set) so the assertion only runs after the consumer/callback has completed;
update the spots mentioned (around lines with await asyncio.sleep, and similarly
at the other locations noted) to use one of these explicit waits and ensure the
cancelled consumer task is awaited/joined instead of relying on time-based
sleeps.
- Around line 188-193: The test uses AsyncMock instances but asserts them with
sync assertions; change append_turn_mock.assert_called_once_with(...) to
await-based assertion append_turn_mock.assert_awaited_once_with(...) and
likewise change get_topic_summary_mock.assert_called_once_with(...) to
get_topic_summary_mock.assert_awaited_once_with(...). Also remove timing-based
await asyncio.sleep(0.1) and replace with deterministic synchronization: await
the spawned background task or wait on an asyncio.Event/Task.result() that the
test triggers so the AsyncMocks are guaranteed to have been awaited before
asserting.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 89bec62d-b14f-488a-a6d2-a07ef1281999
📒 Files selected for processing (7)
src/app/endpoints/streaming_query.py
src/app/main.py
src/utils/stream_interrupt.py
src/utils/streaming_sse.py
tests/unit/app/endpoints/test_streaming_query.py
tests/unit/utils/test_stream_interrupt.py
tests/unit/utils/test_streaming_sse.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: integration_tests (3.13)
- GitHub Check: integration_tests (3.12)
- GitHub Check: list_outdated_dependencies
- GitHub Check: build-pr
- GitHub Check: E2E: library mode / ci / group 3
- GitHub Check: unit_tests (3.13)
- GitHub Check: unit_tests (3.12)
- GitHub Check: E2E: library mode / ci / group 1
- GitHub Check: E2E: server mode / ci / group 1
- GitHub Check: E2E: server mode / ci / group 3
- GitHub Check: E2E: server mode / ci / group 2
- GitHub Check: E2E: library mode / ci / group 2
- GitHub Check: Pylinter
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
- GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🧰 Additional context used
📓 Path-based instructions (3)
src/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/**/*.py: Use absolute imports for internal modules: `from authentication import get_auth_dependency`
Llama Stack imports: Use `from llama_stack_client import AsyncLlamaStackClient`
Check `constants.py` for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use `logger = get_logger(__name__)` from `log.py` for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use `async def` for I/O operations and external API calls
Use standard log levels with clear purposes: `debug()` for diagnostic info, `info()` for program execution, `warning()` for unexpected events, `error()` for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: `Configuration`, `Error`/`Exception`, `Resolver`, `Interface`
Abstract classes must use ABC with `@abstractmethod` decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes
Files:
src/app/main.py
src/utils/stream_interrupt.py
src/utils/streaming_sse.py
src/app/endpoints/streaming_query.py
src/app/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/app/**/*.py: FastAPI dependencies: Import from `fastapi` module for `APIRouter`, `HTTPException`, `Request`, `status`, `Depends`
Use FastAPI `HTTPException` with appropriate status codes for API endpoints and handle `APIConnectionError` from Llama Stack
Files:
src/app/main.py
src/app/endpoints/streaming_query.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use `pytest.mark.asyncio` marker for async tests
Files:
tests/unit/utils/test_streaming_sse.py
tests/unit/utils/test_stream_interrupt.py
tests/unit/app/endpoints/test_streaming_query.py
🧠 Learnings (1)
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.
Applied to files:
src/app/endpoints/streaming_query.py
🪛 ast-grep (0.43.0)
src/utils/streaming_sse.py
[info] 65-65: use jsonify instead of json.dumps for JSON output
Context: json.dumps(d)
Note: Security best practice.
(use-jsonify)
🔇 Additional comments (11)
src/utils/streaming_sse.py (6)
1-24: LGTM!
57-67: LGTM!
70-139: LGTM!
185-212: LGTM!
215-259: LGTM!
176-176: Keep the SSE `"end"` event `truncated` field as `null`.
`EndEventData.truncated` is `Optional[bool]`, the `"end"` payload is constructed with `truncated=None`, and unit tests plus `docs/openapi.json` expect `"truncated": null` for the end event, so it shouldn't be removed.
tests/unit/utils/test_streaming_sse.py (5)
1-26: LGTM!
28-104: LGTM!
106-204: LGTM!
206-239: LGTM!
241-251: LGTM!
  except HTTPException as e:
-     yield _http_exception_stream_event(e)
+     yield http_exception_stream_event(e)
      return
Preserve text-mode error formatting in the compaction path.
Line 484 drops media_type when converting a streamed HTTPException. For MEDIA_TYPE_TEXT requests, that helper has no way to know the caller expects plain-text errors, so compaction-path failures can return a different error shape than the non-compaction path, which still threads media_type through stream_http_error_event(...). Please pass the negotiated media type through here as well, or route this branch through the same media-type-aware formatter.
"""Generate topic summary and update DB/cache in the background.

Runs as a fire-and-forget task after an interrupted turn is persisted.
All errors are caught and logged.

Args:
    context: The response generator context.
    model: Model identifier used for topic summary generation.
"""
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and print the relevant parts of the reviewed file
FILE="src/utils/stream_interrupt.py"
if [ ! -f "$FILE" ]; then
echo "Missing file: $FILE"
exit 1
fi
echo "=== File: $FILE (head) ==="
wc -l "$FILE"
sed -n '1,120p' "$FILE" | cat -n
echo "=== File: $FILE (middle) ==="
sed -n '120,260p' "$FILE" | cat -n
echo "=== File: $FILE (tail) ==="
sed -n '260,420p' "$FILE" | cat -n
# Also search for other occurrences of this file's helper names to ensure no other patterns exist
rg -n "background_update_topic_summary|shutdown_background_topic_summary_tasks|persist_interrupted_turn|register_interrupt_callback" -S "$FILE" || true
Repository: lightspeed-core/lightspeed-stack
Length of output: 10245
Update src/utils/stream_interrupt.py helper typings/docstrings to match repo conventions.
- Replace `Optional[ResponseInput]` with modern union syntax (`ResponseInput | None`) in `persist_interrupted_turn` and `register_interrupt_callback`, and remove the `Optional` import.
- Adjust docstrings to Google-style required sections (`Parameters`, `Returns`, `Raises`) across new public helpers:
  - `background_update_topic_summary`: currently only `Args:`
  - `shutdown_background_topic_summary_tasks`: currently no `Parameters`/`Returns`/`Raises`
  - `persist_interrupted_turn`: currently only `Args:`
  - `register_interrupt_callback`: has `Returns` but missing `Raises`
- Add a docstring for the nested `async def on_interrupt() -> None` helper (currently undocumented).
Source: Coding guidelines
def stream_http_error_event(
    error: AbstractErrorResponse, media_type: Optional[str] = MEDIA_TYPE_JSON
) -> str:
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Prefer modern union syntax for optional parameters.
The coding guidelines specify using modern syntax (str | None) instead of Optional[str] for type annotations.
♻️ Proposed refactor
 def stream_http_error_event(
-    error: AbstractErrorResponse, media_type: Optional[str] = MEDIA_TYPE_JSON
+    error: AbstractErrorResponse, media_type: str | None = MEDIA_TYPE_JSON
 ) -> str:
Source: Coding guidelines
await asyncio.sleep(0.01)
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Replace these fixed sleeps with explicit completion signals.
Each of these assertions races the event loop. A slower CI run can hit the assertion before the callback/background task finishes, and the last test never actually joins the cancelled consumer task. Wait on an asyncio.Event, await the spawned task, or explicitly drain utils.stream_interrupt.background_topic_summary_tasks instead of sleeping for an assumed duration.
Also applies to: 265-265, 342-342, 529-531
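One of the suggested alternatives, draining the tracked background tasks, might look like the sketch below. The names `background_tasks`, `update_summary`, and `spawn_background_update` are hypothetical stand-ins for the real module's task set and helpers; only the waiting pattern is the point.

```python
import asyncio

# Hypothetical stand-in for the module-level set of background tasks.
background_tasks: set[asyncio.Task[None]] = set()
results: list[str] = []


async def update_summary() -> None:
    """Simulated background work whose duration the test cannot predict."""
    await asyncio.sleep(0)  # yield to the event loop at least once
    results.append("summary updated")


def spawn_background_update() -> None:
    """Fire-and-forget: schedule the work and track the task until it is done."""
    task = asyncio.create_task(update_summary())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)


async def run_check() -> list[str]:
    """Trigger the background work, then wait for it deterministically."""
    spawn_background_update()
    # Drain the tracked tasks instead of sleeping for an assumed duration.
    await asyncio.gather(*list(background_tasks))
    return results


outcome = asyncio.run(run_check())
```

Because the check awaits the tasks themselves, the assertion that follows cannot run before the work completes, regardless of how slow the CI runner is. A cancelled consumer task can be joined the same way, by awaiting it and handling `asyncio.CancelledError`.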
append_turn_mock.assert_called_once_with(
    mock_context.client,
    existing_conv_id,
    "test",
    "You interrupted this request.",
)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tests/unit/utils/test_stream_interrupt.py"
# Show the exact sections referenced in the review comment
sed -n '160,220p' "$FILE" | nl -ba | sed -n '1,120p'
echo "-----"
sed -n '240,310p' "$FILE" | nl -ba | sed -n '1,120p'
# Identify AsyncMock declarations/usages in the file
rg -n "AsyncMock|assert_awaited_once_with|assert_called_once_with|asyncio\.sleep|pytest\.mark\.asyncio" "$FILE"
# Show any nearby context around the specific assertion lines
rg -n "append_turn_mock\.assert_called_once_with|get_topic_summary_mock\.assert_called_once_with" "$FILE" -n
Repository: lightspeed-core/lightspeed-stack
Length of output: 119
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tests/unit/utils/test_stream_interrupt.py"
if [ ! -f "$FILE" ]; then
echo "Missing file: $FILE"
exit 1
fi
echo "== excerpt: 170-210 =="
awk 'NR>=170 && NR<=210 {printf "%5d %s\n", NR, $0}' "$FILE"
echo "== excerpt: 250-290 =="
awk 'NR>=250 && NR<=290 {printf "%5d %s\n", NR, $0}' "$FILE"
echo "== rg summary (AsyncMock/assertions/sleep/asyncio marker) =="
rg -n "AsyncMock|assert_awaited_once_with|assert_called_once_with|asyncio\.sleep|pytest\.mark\.asyncio" "$FILE" || true
echo "== rg for the specific mocks referenced in the review comment =="
rg -n "append_turn_mock\.assert_called_once_with|get_topic_summary_mock\.assert_called_once_with|append_turn_mock\.assert_awaited_once_with|get_topic_summary_mock\.assert_awaited_once_with" "$FILE" || true
Repository: lightspeed-core/lightspeed-stack
Length of output: 5967
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tests/unit/utils/test_stream_interrupt.py"
echo "== excerpt: 130-200 =="
awk 'NR>=120 && NR<=210 {printf "%5d %s\n", NR, $0}' "$FILE"
echo "== excerpt: 220-280 =="
awk 'NR>=210 && NR<=290 {printf "%5d %s\n", NR, $0}' "$FILE"
echo "== rg for get_topic_summary_mock patch/creation =="
rg -n "get_topic_summary_mock|get_topic_summary" "$FILE" | head -n 80
echo "== rg for append_turn_mock patch/creation =="
rg -n "append_turn_mock|append_turn_to_conversation" "$FILE" | head -n 80
Repository: lightspeed-core/lightspeed-stack
Length of output: 9196
Use awaited assertions for AsyncMock mocks in tests/unit/utils/test_stream_interrupt.py
- `append_turn_mock` (lines 188-193) is an `AsyncMock` asserted with `assert_called_once_with`; switch to `assert_awaited_once_with`.
- `get_topic_summary_mock` (lines 270-274) is an `AsyncMock` asserted with `assert_called_once_with`; switch to `assert_awaited_once_with`.
- Avoid `await asyncio.sleep(0.1)` timing-based waits; use deterministic synchronization (e.g., await the spawned task / an `Event`) to reduce CI flakiness.
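The difference between the two assertion families can be seen in a small standalone sketch using the standard library's `unittest.mock.AsyncMock`. The `demo` function and its return value are illustrative only and are not taken from the test file under review.

```python
import asyncio
from unittest.mock import AsyncMock


async def demo() -> tuple[bool, bool]:
    """Show that a call is recorded before the resulting coroutine is awaited."""
    mock = AsyncMock()

    # Calling the mock records the call and returns a coroutine object.
    coro = mock("test")
    mock.assert_called_once_with("test")  # passes although nothing was awaited

    try:
        mock.assert_awaited_once_with("test")
        awaited_before = True
    except AssertionError:
        awaited_before = False  # the await has not happened yet

    await coro
    mock.assert_awaited_once_with("test")  # passes only after the await
    return awaited_before, True


state = asyncio.run(demo())
```

This is why `assert_called_once_with` can pass for code that forgets to `await` an async dependency, while `assert_awaited_once_with` would catch it.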
Suggested assertion change
- append_turn_mock.assert_called_once_with(
+ append_turn_mock.assert_awaited_once_with(
mock_context.client,
existing_conv_id,
"test",
"You interrupted this request.",
)
@@
- get_topic_summary_mock.assert_called_once_with(
+ get_topic_summary_mock.assert_awaited_once_with(
"What is Kubernetes?",
mock_context.client,
"provider1/model1",
)
Description
Relocates the utility functions for stream serialization and stream interruption into separate modules.
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
Release Notes
Refactor
Tests