Skip to content

LCORE-2311: Relocated streaming query utilities#1910

Draft
asimurka wants to merge 1 commit into
lightspeed-core:mainfrom
asimurka:relocate_streaming_utils
Draft

LCORE-2311: Relocated streaming query utilities#1910
asimurka wants to merge 1 commit into
lightspeed-core:mainfrom
asimurka:relocate_streaming_utils

Conversation

@asimurka

@asimurka asimurka commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Description

Relocating utility function for stream serialization and stream interruption to separate modules.

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement
  • Benchmarks improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: (e.g., Claude, CodeRabbit, Ollama, etc., N/A if not used)
  • Generated by: (e.g., tool name and version; N/A if not used)

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

Release Notes

  • Refactor

    • Internal code improvements: consolidated streaming interrupt handling and Server-Sent Events formatting logic into shared utility modules for better maintainability.
  • Tests

    • Added comprehensive test coverage for stream interruption persistence and event formatting to ensure reliability.

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Walkthrough

This PR refactors streaming SSE and stream-interruption logic from the streaming_query endpoint into reusable utility modules. Two new modules (streaming_sse.py and stream_interrupt.py) consolidate formatting and persistence helpers, the endpoint delegates to these utilities, and comprehensive tests cover both the new modules and the refactored endpoint.

Changes

Streaming and Interrupt Utilities Extraction

Layer / File(s) Summary
SSE utilities module
src/utils/streaming_sse.py, tests/unit/utils/test_streaming_sse.py
New module provides SSE event formatters for errors, start/end/interrupt markers, and per-event token/tool streaming with JSON and text media-type support; tests validate all event types and format variants.
Stream interrupt utilities module
src/utils/stream_interrupt.py, tests/unit/utils/test_stream_interrupt.py
New module provides interrupt callback registration, interrupted-turn persistence with optional async topic-summary background generation, and task shutdown; tests cover callback behavior, persistence edge cases, and topic-summary scheduling logic.
Streaming endpoint refactored to use utilities
src/app/endpoints/streaming_query.py
Removes 457 lines of local SSE/interrupt helpers and imports from new utility modules; SSE error rendering and interrupt callback registration call sites updated to use imported utilities; endpoint signatures unchanged.
Application imports updated
src/app/main.py
Updates import source for shutdown_background_topic_summary_tasks from endpoint to new utils.stream_interrupt module.
Endpoint test suite refactored
tests/unit/app/endpoints/test_streaming_query.py
Removes 775 lines of tests now covered by utility-specific test modules, including cancellation/interrupt-persistence tests and old SSE helper assertions; retains higher-level OLS compatibility checks.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • lightspeed-core/lightspeed-stack#1796: Both PRs modify src/app/endpoints/streaming_query.py's streaming SSE flow around generate_response and interrupt/cancellation persistence during compaction-aware streaming.

Suggested reviewers

  • tisnik
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main refactoring objective: relocating streaming query utilities from inline implementations to separate utility modules.
Docstring Coverage ✅ Passed Docstring coverage is 83.05% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@asimurka asimurka requested a review from tisnik June 11, 2026 16:03
@asimurka asimurka marked this pull request as draft June 11, 2026 16:10

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/unit/utils/test_streaming_sse.py (1)

268-278: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add content assertion to match the JSON test's thoroughness.

The JSON test at line 266 verifies that the violation message appears in the result, but this text test only checks that items were generated. For consistency and completeness, add a similar content assertion.

🧪 Proposed fix
         assert len(result) > 0
+        assert any("Violation message" in item for item in result)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/utils/test_streaming_sse.py` around lines 268 - 278, The test for
text media type currently only checks that shield_violation_generator produced
items; update the test_shield_violation_generator_text to also assert that the
generated content contains the expected violation message ("Violation message").
Locate the async test_shield_violation_generator_text and after collecting
result from shield_violation_generator(..., MEDIA_TYPE_TEXT) add a content
assertion (e.g., join or inspect result items) to confirm the string "Violation
message" appears, mirroring the JSON test's thoroughness.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/app/endpoints/streaming_query.py`:
- Around line 483-485: The compaction-path except block yields
http_exception_stream_event(e) without preserving the negotiated media type,
causing text-mode errors to lose plain-text formatting; update the except
handler in streaming_query.py to pass the negotiated media type into the
formatter (e.g., call http_exception_stream_event(e, media_type) or route this
branch through stream_http_error_event(e, media_type)) so MEDIA_TYPE_TEXT
requests get the same media-type-aware error shape as the non-compaction path.

In `@src/utils/stream_interrupt.py`:
- Around line 35-43: Update type hints and docstrings in
src/utils/stream_interrupt.py: replace all uses of Optional[ResponseInput] with
the modern union ResponseInput | None (including signatures of
persist_interrupted_turn and register_interrupt_callback) and remove the
Optional import; add Google-style docstring sections (Parameters, Returns,
Raises) for background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn, and
register_interrupt_callback (ensure register_interrupt_callback documents the
Raises clause), and add a short docstring for the nested async def
on_interrupt() -> None explaining its purpose and exceptions; keep identifiers
intact (background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn,
register_interrupt_callback, on_interrupt) so reviewers can locate the changes.

In `@src/utils/streaming_sse.py`:
- Around line 27-29: The type annotation for the media_type parameter in
stream_http_error_event uses Optional[str]; update it to the modern union syntax
str | None (i.e., change "media_type: Optional[str] = MEDIA_TYPE_JSON" to
"media_type: str | None = MEDIA_TYPE_JSON") and remove any now-unnecessary
Optional import if present; keep the function name stream_http_error_event and
default value MEDIA_TYPE_JSON unchanged.

In `@tests/unit/utils/test_stream_interrupt.py`:
- Around line 114-115: Replace the fragile fixed sleep (await
asyncio.sleep(0.01)) with an explicit completion mechanism: have the test wait
for the actual work to finish by either awaiting the spawned task, waiting on an
asyncio.Event set by the background callback, or draining
utils.stream_interrupt.background_topic_summary_tasks (e.g., await all tasks in
that set) so the assertion only runs after the consumer/callback has completed;
update the spots mentioned (around lines with await asyncio.sleep, and similarly
at the other locations noted) to use one of these explicit waits and ensure the
cancelled consumer task is awaited/joined instead of relying on time-based
sleeps.
- Around line 188-193: The test uses AsyncMock instances but asserts them with
sync assertions; change append_turn_mock.assert_called_once_with(...) to
await-based assertion append_turn_mock.assert_awaited_once_with(...) and
likewise change get_topic_summary_mock.assert_called_once_with(...) to
get_topic_summary_mock.assert_awaited_once_with(...). Also remove timing-based
await asyncio.sleep(0.1) and replace with deterministic synchronization: await
the spawned background task or wait on an asyncio.Event/Task.result() that the
test triggers so the AsyncMocks are guaranteed to have been awaited before
asserting.

---

Outside diff comments:
In `@tests/unit/utils/test_streaming_sse.py`:
- Around line 268-278: The test for text media type currently only checks that
shield_violation_generator produced items; update the
test_shield_violation_generator_text to also assert that the generated content
contains the expected violation message ("Violation message"). Locate the async
test_shield_violation_generator_text and after collecting result from
shield_violation_generator(..., MEDIA_TYPE_TEXT) add a content assertion (e.g.,
join or inspect result items) to confirm the string "Violation message" appears,
mirroring the JSON test's thoroughness.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 89bec62d-b14f-488a-a6d2-a07ef1281999

📥 Commits

Reviewing files that changed from the base of the PR and between 6116ef7 and a6cd9eb.

📒 Files selected for processing (7)
  • src/app/endpoints/streaming_query.py
  • src/app/main.py
  • src/utils/stream_interrupt.py
  • src/utils/streaming_sse.py
  • tests/unit/app/endpoints/test_streaming_query.py
  • tests/unit/utils/test_stream_interrupt.py
  • tests/unit/utils/test_streaming_sse.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: integration_tests (3.13)
  • GitHub Check: integration_tests (3.12)
  • GitHub Check: list_outdated_dependencies
  • GitHub Check: build-pr
  • GitHub Check: E2E: library mode / ci / group 3
  • GitHub Check: unit_tests (3.13)
  • GitHub Check: unit_tests (3.12)
  • GitHub Check: E2E: library mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 3
  • GitHub Check: E2E: server mode / ci / group 2
  • GitHub Check: E2E: library mode / ci / group 2
  • GitHub Check: Pylinter
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🧰 Additional context used
📓 Path-based instructions (3)
src/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/**/*.py: Use absolute imports for internal modules: from authentication import get_auth_dependency
Llama Stack imports: Use from llama_stack_client import AsyncLlamaStackClient
Check constants.py for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use logger = get_logger(__name__) from log.py for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use async def for I/O operations and external API calls
Use standard log levels with clear purposes: debug() for diagnostic info, info() for program execution, warning() for unexpected events, error() for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: Configuration, Error/Exception, Resolver, Interface
Abstract classes must use ABC with @abstractmethod decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes

Files:

  • src/app/main.py
  • src/utils/stream_interrupt.py
  • src/utils/streaming_sse.py
  • src/app/endpoints/streaming_query.py
src/app/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/app/**/*.py: FastAPI dependencies: Import from fastapi module for APIRouter, HTTPException, Request, status, Depends
Use FastAPI HTTPException with appropriate status codes for API endpoints and handle APIConnectionError from Llama Stack

Files:

  • src/app/main.py
  • src/app/endpoints/streaming_query.py
tests/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use pytest.mark.asyncio marker for async tests

Files:

  • tests/unit/utils/test_streaming_sse.py
  • tests/unit/utils/test_stream_interrupt.py
  • tests/unit/app/endpoints/test_streaming_query.py
🧠 Learnings (1)
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.

Applied to files:

  • src/app/endpoints/streaming_query.py
🪛 ast-grep (0.43.0)
src/utils/streaming_sse.py

[info] 65-65: use jsonify instead of json.dumps for JSON output
Context: json.dumps(d)
Note: Security best practice.

(use-jsonify)

🔇 Additional comments (11)
src/utils/streaming_sse.py (6)

1-24: LGTM!


57-67: LGTM!


70-139: LGTM!


185-212: LGTM!


215-259: LGTM!


176-176: Keep the SSE "end" event truncated field as null. EndEventData.truncated is Optional[bool], the "end" payload is constructed with truncated=None, and unit tests plus docs/openapi.json expect "truncated": null for the end event—so it shouldn’t be removed.

tests/unit/utils/test_streaming_sse.py (5)

1-26: LGTM!


28-104: LGTM!


106-204: LGTM!


206-239: LGTM!


241-251: LGTM!

Comment on lines 483 to 485
except HTTPException as e:
yield _http_exception_stream_event(e)
yield http_exception_stream_event(e)
return

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve text-mode error formatting in the compaction path.

Line 484 drops media_type when converting a streamed HTTPException. For MEDIA_TYPE_TEXT requests, that helper has no way to know the caller expects plain-text errors, so compaction-path failures can return a different error shape than the non-compaction path, which still threads media_type through stream_http_error_event(...). Please pass the negotiated media type through here as well, or route this branch through the same media-type-aware formatter.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/app/endpoints/streaming_query.py` around lines 483 - 485, The
compaction-path except block yields http_exception_stream_event(e) without
preserving the negotiated media type, causing text-mode errors to lose
plain-text formatting; update the except handler in streaming_query.py to pass
the negotiated media type into the formatter (e.g., call
http_exception_stream_event(e, media_type) or route this branch through
stream_http_error_event(e, media_type)) so MEDIA_TYPE_TEXT requests get the same
media-type-aware error shape as the non-compaction path.

Comment on lines +35 to +43
"""Generate topic summary and update DB/cache in the background.

Runs as a fire-and-forget task after an interrupted turn is persisted.
All errors are caught and logged.

Args:
context: The response generator context.
model: Model identifier used for topic summary generation.
"""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate and print the relevant parts of the reviewed file
FILE="src/utils/stream_interrupt.py"
if [ ! -f "$FILE" ]; then
  echo "Missing file: $FILE"
  exit 1
fi

echo "=== File: $FILE (head) ==="
wc -l "$FILE"
sed -n '1,120p' "$FILE" | cat -n

echo "=== File: $FILE (middle) ==="
sed -n '120,260p' "$FILE" | cat -n

echo "=== File: $FILE (tail) ==="
sed -n '260,420p' "$FILE" | cat -n

# Also search for other occurrences of this file's helper names to ensure no other patterns exist
rg -n "background_update_topic_summary|shutdown_background_topic_summary_tasks|persist_interrupted_turn|register_interrupt_callback" -S "$FILE" || true

Repository: lightspeed-core/lightspeed-stack

Length of output: 10245


Update src/utils/stream_interrupt.py helper typings/docstrings to match repo conventions.

  • Replace Optional[ResponseInput] with modern union syntax (ResponseInput | None) in persist_interrupted_turn and register_interrupt_callback, and remove the Optional import.
  • Adjust docstrings to Google-style required sections (Parameters, Returns, Raises) across new public helpers:
    • background_update_topic_summary: currently only Args:
    • shutdown_background_topic_summary_tasks: currently no Parameters/Returns/Raises
    • persist_interrupted_turn: currently only Args:
    • register_interrupt_callback: has Returns but missing Raises
  • Add a docstring for the nested async def on_interrupt() -> None helper (currently undocumented).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/stream_interrupt.py` around lines 35 - 43, Update type hints and
docstrings in src/utils/stream_interrupt.py: replace all uses of
Optional[ResponseInput] with the modern union ResponseInput | None (including
signatures of persist_interrupted_turn and register_interrupt_callback) and
remove the Optional import; add Google-style docstring sections (Parameters,
Returns, Raises) for background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn, and
register_interrupt_callback (ensure register_interrupt_callback documents the
Raises clause), and add a short docstring for the nested async def
on_interrupt() -> None explaining its purpose and exceptions; keep identifiers
intact (background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn,
register_interrupt_callback, on_interrupt) so reviewers can locate the changes.

Source: Coding guidelines

Comment on lines +27 to +29
def stream_http_error_event(
error: AbstractErrorResponse, media_type: Optional[str] = MEDIA_TYPE_JSON
) -> str:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Prefer modern union syntax for optional parameters.

The coding guidelines specify using modern syntax (str | None) instead of Optional[str] for type annotations.

♻️ Proposed refactor
 def stream_http_error_event(
-    error: AbstractErrorResponse, media_type: Optional[str] = MEDIA_TYPE_JSON
+    error: AbstractErrorResponse, media_type: str | None = MEDIA_TYPE_JSON
 ) -> str:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def stream_http_error_event(
error: AbstractErrorResponse, media_type: Optional[str] = MEDIA_TYPE_JSON
) -> str:
def stream_http_error_event(
error: AbstractErrorResponse, media_type: str | None = MEDIA_TYPE_JSON
) -> str:
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/streaming_sse.py` around lines 27 - 29, The type annotation for the
media_type parameter in stream_http_error_event uses Optional[str]; update it to
the modern union syntax str | None (i.e., change "media_type: Optional[str] =
MEDIA_TYPE_JSON" to "media_type: str | None = MEDIA_TYPE_JSON") and remove any
now-unnecessary Optional import if present; keep the function name
stream_http_error_event and default value MEDIA_TYPE_JSON unchanged.

Source: Coding guidelines

Comment on lines +114 to +115
await asyncio.sleep(0.01)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Replace these fixed sleeps with explicit completion signals.

Each of these assertions races the event loop. A slower CI run can hit the assertion before the callback/background task finishes, and the last test never actually joins the cancelled consumer task. Wait on an asyncio.Event, await the spawned task, or explicitly drain utils.stream_interrupt.background_topic_summary_tasks instead of sleeping for an assumed duration.

Also applies to: 265-265, 342-342, 529-531

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/utils/test_stream_interrupt.py` around lines 114 - 115, Replace
the fragile fixed sleep (await asyncio.sleep(0.01)) with an explicit completion
mechanism: have the test wait for the actual work to finish by either awaiting
the spawned task, waiting on an asyncio.Event set by the background callback, or
draining utils.stream_interrupt.background_topic_summary_tasks (e.g., await all
tasks in that set) so the assertion only runs after the consumer/callback has
completed; update the spots mentioned (around lines with await asyncio.sleep,
and similarly at the other locations noted) to use one of these explicit waits
and ensure the cancelled consumer task is awaited/joined instead of relying on
time-based sleeps.

Comment on lines +188 to +193
append_turn_mock.assert_called_once_with(
mock_context.client,
existing_conv_id,
"test",
"You interrupted this request.",
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="tests/unit/utils/test_stream_interrupt.py"

# Show the exact sections referenced in the review comment
sed -n '160,220p' "$FILE" | nl -ba | sed -n '1,120p'
echo "-----"
sed -n '240,310p' "$FILE" | nl -ba | sed -n '1,120p'

# Identify AsyncMock declarations/usages in the file
rg -n "AsyncMock|assert_awaited_once_with|assert_called_once_with|asyncio\.sleep|pytest\.mark\.asyncio" "$FILE"

# Show any nearby context around the specific assertion lines
rg -n "append_turn_mock\.assert_called_once_with|get_topic_summary_mock\.assert_called_once_with" "$FILE" -n

Repository: lightspeed-core/lightspeed-stack

Length of output: 119


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="tests/unit/utils/test_stream_interrupt.py"
if [ ! -f "$FILE" ]; then
  echo "Missing file: $FILE"
  exit 1
fi

echo "== excerpt: 170-210 =="
awk 'NR>=170 && NR<=210 {printf "%5d %s\n", NR, $0}' "$FILE"

echo "== excerpt: 250-290 =="
awk 'NR>=250 && NR<=290 {printf "%5d %s\n", NR, $0}' "$FILE"

echo "== rg summary (AsyncMock/assertions/sleep/asyncio marker) =="
rg -n "AsyncMock|assert_awaited_once_with|assert_called_once_with|asyncio\.sleep|pytest\.mark\.asyncio" "$FILE" || true

echo "== rg for the specific mocks referenced in the review comment =="
rg -n "append_turn_mock\.assert_called_once_with|get_topic_summary_mock\.assert_called_once_with|append_turn_mock\.assert_awaited_once_with|get_topic_summary_mock\.assert_awaited_once_with" "$FILE" || true

Repository: lightspeed-core/lightspeed-stack

Length of output: 5967


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="tests/unit/utils/test_stream_interrupt.py"

echo "== excerpt: 130-200 =="
awk 'NR>=120 && NR<=210 {printf "%5d %s\n", NR, $0}' "$FILE"

echo "== excerpt: 220-280 =="
awk 'NR>=210 && NR<=290 {printf "%5d %s\n", NR, $0}' "$FILE"

echo "== rg for get_topic_summary_mock patch/creation =="
rg -n "get_topic_summary_mock|get_topic_summary" "$FILE" | head -n 80

echo "== rg for append_turn_mock patch/creation =="
rg -n "append_turn_mock|append_turn_to_conversation" "$FILE" | head -n 80

Repository: lightspeed-core/lightspeed-stack

Length of output: 9196


Use awaited assertions for AsyncMock mocks in tests/unit/utils/test_stream_interrupt.py

  • append_turn_mock (lines 188-193) is an AsyncMock asserted with assert_called_once_with; switch to assert_awaited_once_with.
  • get_topic_summary_mock (lines 270-274) is an AsyncMock asserted with assert_called_once_with; switch to assert_awaited_once_with.
  • Avoid await asyncio.sleep(0.1) timing-based waits; use deterministic synchronization (e.g., await the spawned task / an Event) to reduce CI flakiness.
Suggested assertion change
-        append_turn_mock.assert_called_once_with(
+        append_turn_mock.assert_awaited_once_with(
             mock_context.client,
             existing_conv_id,
             "test",
             "You interrupted this request.",
         )
@@
-        get_topic_summary_mock.assert_called_once_with(
+        get_topic_summary_mock.assert_awaited_once_with(
             "What is Kubernetes?",
             mock_context.client,
             "provider1/model1",
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/utils/test_stream_interrupt.py` around lines 188 - 193, The test
uses AsyncMock instances but asserts them with sync assertions; change
append_turn_mock.assert_called_once_with(...) to await-based assertion
append_turn_mock.assert_awaited_once_with(...) and likewise change
get_topic_summary_mock.assert_called_once_with(...) to
get_topic_summary_mock.assert_awaited_once_with(...). Also remove timing-based
await asyncio.sleep(0.1) and replace with deterministic synchronization: await
the spawned background task or wait on an asyncio.Event/Task.result() that the
test triggers so the AsyncMocks are guaranteed to have been awaited before
asserting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant