
fix: git middleware breaks SSE disconnect handling#1319

Closed
leonardmq wants to merge 1 commit into main from
leonard/kil-582-evals-eval-run-keeps-going-on-server-after-hard-refresh-user

Conversation

@leonardmq
Collaborator

@leonardmq leonardmq commented Apr 22, 2026

What does this PR do?

Problem: a started Eval or Rag job does not terminate when the user refreshes their browser. The job continues running in the background - potentially at high concurrency.

Checklists

  • Tests have been run locally and passed
  • New tests have been added to any work in /lib

The cause is quite in the weeds and I don't fully understand all of it yet, but here is the breakdown:

Fix: SSE runs (eval/extractor/RAG) don't cancel on browser hard-refresh

The bug

Running an eval via GET /api/projects/{p}/tasks/{t}/evals/{e}/eval_config/{c}/run_comparison
and then hard-refreshing the browser left the eval running server-side indefinitely. New EvalRun
rows kept being created, LLM calls kept being made, and the app became unresponsive while the
worker pool continued to churn. Previously, refresh cancelled the in-flight eval; that behavior
regressed when git-sync (PR a46ff424e, "Phase 3: SSE endpoint wiring") landed.

Same bug applied to run_extractor_config and run_rag_config in document_api.py — all three
SSE endpoints share the same pattern.

Root cause

The regression had two compounding problems, both caused by GitSyncMiddleware extending
starlette.middleware.base.BaseHTTPMiddleware:

1. request.is_disconnected() silently returned False under BaseHTTPMiddleware

BaseHTTPMiddleware.__call__ replaces the ASGI receive callable with its own
receive_or_disconnect, which opens an anyio.create_task_group internally. Request.is_disconnected()
polls with a pre-cancelled anyio.CancelScope; under the middleware, the nested task group
unwinds before any message can be dequeued, so the method returned False even when the client
had disconnected. No disconnect signal ever reached the endpoint.

2. No other disconnect detection fired during long LLM calls

Starlette 0.52.1 (ASGI spec ≥ 2.4) removed StreamingResponse.listen_for_disconnect. Modern
Starlette only notices a disconnect when await send(...) raises OSError. Since our SSE
generator only called send() when AsyncJobRunner yielded a new progress event — which could
be 30+ seconds during a slow LLM call — the TCP close went undetected for the whole run.

Why the worker pool then stayed alive

AsyncJobRunner.run() already has correct cancellation wiring: its finally block cancels all
worker tasks. But that finally only fires if the caller's async for progress in runner.run():
exits. Since nothing was exiting the outer SSE generator, that finally never ran. Workers kept
pulling new jobs off the queue → "keeps running new eval runs" as reported.
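The stalled-`finally` behavior can be shown with a toy generator (hypothetical names; in the real code the `finally` cancels AsyncJobRunner's worker tasks). An async generator's `finally` does not run while the generator sits suspended at a `yield` — only exiting or closing the iteration fires it:

```python
import asyncio

cancelled = []

async def runner():
    # Stands in for AsyncJobRunner.run(); its finally mirrors the
    # worker-cancellation cleanup described above.
    try:
        yield "progress"
        yield "progress"
    finally:
        cancelled.append(True)

async def demo():
    gen = runner()
    await gen.__anext__()    # consumer got one item, then stalls forever
    assert cancelled == []   # finally has NOT fired; "workers" stay alive
    await gen.aclose()       # only exiting/closing the iteration fires it
    assert cancelled == [True]

asyncio.run(demo())
```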

The fix

Three layers, each addressing one link in the chain.

1. Bypass BaseHTTPMiddleware for @no_write_lock endpoints

app/desktop/git_sync/middleware.py — override GitSyncMiddleware.__call__ so endpoints marked
@no_write_lock (the SSE ones) get a pure ASGI pass-through. The real receive and send
reach the endpoint directly; request.is_disconnected() now works. Other endpoints still use
the normal BaseHTTPMiddleware.dispatch path.

The bypass still does what the normal path did:

  • resolves the git-sync manager for the project,
  • calls ensure_fresh_for_read(),
  • attaches the manager to scope["state"]["git_sync_manager"] so build_save_context(request)
    works inside the endpoint,
  • notifies the background sync task.

It just skips the anyio task-group wrapping that breaks streaming.
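As a rough sketch of the bypass shape (hypothetical names throughout; the real `GitSyncMiddleware` also resolves the manager, calls `ensure_fresh_for_read()`, and populates `scope["state"]`, and checks the `@no_write_lock` marker rather than the URL path):

```python
import asyncio

class PassThroughForStreaming:
    """Sketch of the bypass idea, not the actual implementation."""

    def __init__(self, app):
        self.app = app

    def _is_streaming_endpoint(self, scope) -> bool:
        # Hypothetical predicate; the real code checks the resolved
        # endpoint for the `@no_write_lock` marker.
        return scope.get("path", "").startswith("/sse")

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http" and self._is_streaming_endpoint(scope):
            # Pure ASGI pass-through: the endpoint gets the REAL receive,
            # so Request.is_disconnected() can see http.disconnect.
            await self.app(scope, receive, send)
            return

        async def wrapped_receive():
            # Stands in for BaseHTTPMiddleware's receive proxy, which is
            # what hid the disconnect message from the endpoint.
            return await receive()

        await self.app(scope, wrapped_receive, send)

seen = {}

async def endpoint(scope, receive, send):
    seen[scope["path"]] = receive  # record the channel the endpoint saw

async def demo():
    async def receive():
        return {"type": "http.disconnect"}
    async def send(message):
        pass
    mw = PassThroughForStreaming(endpoint)
    await mw({"type": "http", "path": "/sse/run"}, receive, send)
    await mw({"type": "http", "path": "/other"}, receive, send)
    assert seen["/sse/run"] is receive      # bypass: real channel
    assert seen["/other"] is not receive    # normal path: proxied

asyncio.run(demo())
```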

2. Active disconnect polling in the SSE helper

libs/server/kiln_server/sse.py — new stream_with_heartbeat() helper wraps an async source
and:

  • polls is_disconnected() at each iteration (default 3s). If it returns True, the
    generator returns, triggering the source's aclose() in its own finally block.
  • emits SSE keepalive comments (: keepalive\n\n) after heartbeat_seconds of idleness.
    Keepalives force send() to fire, which raises OSError or ClientDisconnect if the
    connection is closed — belt-and-suspenders in case polling misses.
  • always aclose()s the source in its finally block, regardless of exit reason.

Default heartbeat is 3s (DEFAULT_HEARTBEAT_SECONDS).
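A condensed sketch of the helper's loop, reconstructed from the behavior described above (the real module is libs/server/kiln_server/sse.py and also logs at INFO/DEBUG; this sketch closes the iterator rather than the source, as one of the review comments on this PR suggests):

```python
import asyncio
import contextlib

HEARTBEAT_COMMENT = ": keepalive\n\n"

async def stream_with_heartbeat(source, format_chunk, *, is_disconnected=None, timeout=3.0):
    it = source.__aiter__()
    pending = None
    try:
        while True:
            # Poll for client disconnect at each iteration.
            if is_disconnected is not None and await is_disconnected():
                return  # falls through to finally, which closes the source
            if pending is None:
                pending = asyncio.ensure_future(it.__anext__())
            done, _ = await asyncio.wait({pending}, timeout=timeout)
            if pending in done:
                try:
                    item = pending.result()
                except StopAsyncIteration:
                    return
                pending = None
                yield format_chunk(item)
            else:
                # Idle past the timeout: force a send() so a closed
                # socket raises promptly on the caller's side.
                yield HEARTBEAT_COMMENT
    finally:
        if pending is not None and not pending.done():
            pending.cancel()
            with contextlib.suppress(BaseException):
                await pending
        if hasattr(it, "aclose"):
            with contextlib.suppress(BaseException):
                await it.aclose()

async def slow_source():
    yield 1
    await asyncio.sleep(0.05)  # stands in for a slow LLM call
    yield 2

async def demo():
    chunks = []
    async for c in stream_with_heartbeat(slow_source(), str, timeout=0.01):
        chunks.append(c)
    return chunks

# chunks: "1", then at least one keepalive during the idle gap, then "2"
chunks = asyncio.run(demo())
```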

3. Prompt cleanup via contextlib.aclosing

Each SSE endpoint now wraps the heartbeat stream in contextlib.aclosing(hb):

async def event_generator():
    hb = stream_with_heartbeat(
        eval_runner.run(),
        _format_progress_sse,
        is_disconnected=request.is_disconnected,
    )
    async with contextlib.aclosing(hb) as stream:
        async for chunk in stream:
            yield chunk
    if not await request.is_disconnected():
        yield "data: complete\n\n"

Why aclosing: when Starlette cancels event_generator on client disconnect, a plain
async for does not automatically close the inner generator — cleanup would wait for GC,
leaving workers alive for an indeterminate time. aclosing guarantees hb.aclose() runs
synchronously, which in turn runs stream_with_heartbeat's finally, which calls
eval_runner.run().aclose(), which fires AsyncJobRunner's finally that cancels all workers.

Cancellation chain (end to end)

  1. Browser hard-refresh → TCP close.
  2. uvicorn sends {"type": "http.disconnect"} on the receive channel.
  3. Under the ASGI bypass, the endpoint's request._receive is the real channel (not the
    middleware's wrapped proxy), so on the next poll tick, request.is_disconnected() returns
    True.
  4. stream_with_heartbeat returns; its finally runs eval_runner.run().aclose().
  5. aclose() raises GeneratorExit inside AsyncJobRunner.run() at its current yield point.
  6. AsyncJobRunner.run()'s finally runs: for w in workers: w.cancel().
  7. Each worker's current await run_job_fn(job) raises CancelledError. CancelledError
    inherits from BaseException (not Exception), so the worker's except Exception clauses
    don't catch it. The worker coroutine unwinds and the task completes.
  8. asyncio.gather(*workers) in AsyncJobRunner.run()'s finally completes.
  9. event_generator's aclosing block exits.
  10. Response generator ends. The SSE connection is already closed client-side.

Worst-case latency is one heartbeat interval (3s) between refresh and all workers having
cancel() called.
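Step 7 hinges on CancelledError deriving from BaseException. A minimal demonstration (the sleep stands in for the awaited job function):

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(60)  # stands in for `await run_job_fn(job)`
    except Exception:
        # Broad handlers like this do NOT swallow cancellation:
        # CancelledError subclasses BaseException, not Exception.
        return "swallowed"

async def demo():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker reach its await point
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return task.result()

result = asyncio.run(demo())
```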

Files changed

  • app/desktop/git_sync/middleware.py — ASGI bypass for @no_write_lock.
  • libs/server/kiln_server/sse.py — new module with stream_with_heartbeat.
  • app/desktop/studio_server/eval_api.py — use stream_with_heartbeat + aclosing.
  • libs/server/kiln_server/document_api.py — same for extractor and RAG workflow helpers.

Tests added

  • libs/server/kiln_server/test_sse.py — unit tests for stream_with_heartbeat (forwards
    items, emits heartbeat on idle, closes source on normal exit / on aclose / on
    is_disconnected=True).
  • app/desktop/git_sync/test_middleware.py — three new tests: bypass skips dispatch(),
    bypass still attaches git_sync_manager to request.state, and an ASGI-level test
    proving http.disconnect reaches the endpoint's receive channel under the bypass.
  • app/desktop/studio_server/test_eval_api.py and
    libs/server/kiln_server/test_document_api.py — regression tests exercising disconnect
    (aclose), is_disconnected=True polling, the happy path, and idle-heartbeat emission.

All 297 touched-module tests pass; ./checks.sh --agent-mode exits 0.

Logging

When the disconnect path triggers, you'll see:

SSE stream: client disconnected, cancelling workers

at INFO level from the kiln_server.sse logger. Enable DEBUG on that logger if you want
to see heartbeat emissions (SSE stream: idle for 3.0s, emitting heartbeat).
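To surface the DEBUG heartbeat lines, configure that logger (logger name taken from the text above):

```python
import logging

# Route records to stderr; the child logger set to DEBUG lets both the
# INFO disconnect message and DEBUG heartbeat messages through.
logging.basicConfig(level=logging.INFO)
logging.getLogger("kiln_server.sse").setLevel(logging.DEBUG)
```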

Open follow-ups (not in this fix)

  • UI reconnect-to-in-flight-eval. After a refresh, the page has no way to rejoin an
    already-running eval — it starts from zero on the next click. Fixing this needs persistent
    server-side run state and a GET …/current_run endpoint. Separate PR.
  • Cancel button. With the SSE stream gone, there's no user-facing way to abort a run that
    was started intentionally (e.g. accidental "Run All Evals"). A kill-switch endpoint keyed by
    eval/run id would be a small, robust addition.
  • Concurrency tuning. Default is 25 workers; each eval-save serializes through the
    single-threaded git executor and a threading.Lock. Fine at small scale, but large eval
    sets may bottleneck on atomic_write. Worth profiling separately.

Summary by CodeRabbit

  • New Features

    • Added SSE heartbeat support to maintain connections during long-running operations like evaluations and document processing.
    • Improved progress reporting with formatted status updates for streaming operations.
  • Bug Fixes

    • Enhanced client disconnection handling to properly detect and respond when users disconnect from streaming endpoints.
    • Ensured proper cleanup of resources when streams are terminated.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 22, 2026

📝 Walkthrough

Walkthrough

This change adds SSE heartbeat support to prevent stream timeouts during long-idle transfers, introduces disconnect-aware streaming that suppresses final messages when clients disconnect, refactors multiple streaming endpoints to use a unified heartbeat wrapper, and optimizes Git Sync middleware to bypass BaseHTTPMiddleware buffering for @no_write_lock endpoints via direct ASGI pass-through.

Changes

Cohort / File(s) Summary
Git Sync Middleware ASGI Optimization
app/desktop/git_sync/middleware.py
Added custom __call__ method to intercept ASGI http requests and route @no_write_lock endpoints through new _handle_self_managed for direct pass-through without BaseHTTPMiddleware buffering; includes git sync manager attachment and fresh-state checks.
Git Sync Middleware Tests
app/desktop/git_sync/test_middleware.py
Added three test cases verifying ASGI bypass behavior: dispatch() is not invoked for @no_write_lock, git sync manager is attached to request state, and real ASGI receive channel is forwarded for disconnect observation.
SSE Heartbeat Infrastructure
libs/server/kiln_server/sse.py
New module providing stream_with_heartbeat async generator wrapper that emits periodic SSE heartbeat comments (: keepalive\n\n) during idle periods, supports optional disconnect detection, and ensures cleanup of underlying source generators.
SSE Heartbeat Tests
libs/server/kiln_server/test_sse.py
New test suite validating heartbeat emission during idle streams, resource cleanup on normal/cancelled completion, disconnect-triggered early termination, and correct forwarding of source items via format callback.
Studio Eval API Streaming
app/desktop/studio_server/eval_api.py
Refactored run_eval_runner_with_status to wrap eval runner with stream_with_heartbeat, added disconnect-aware finalization (suppress "data: complete" if client disconnected), and introduced _format_progress_sse helper for SSE payload formatting.
Studio Eval API Tests
app/desktop/studio_server/test_eval_api.py
Added four async regression tests covering client disconnect cancellation, exit behavior on disconnection, normal completion with completion sentinel, and heartbeat emission during idle generator periods.
Document API Streaming (Extractor & RAG)
libs/server/kiln_server/document_api.py
Updated run_extractor_runner_with_status and run_rag_workflow_runner_with_status to use heartbeat wrapper, added disconnect-aware completion message suppression, and introduced _format_extractor_progress_sse formatter; endpoints now pass request object to runners.
Document API Tests
libs/server/kiln_server/test_document_api.py
Updated existing test calls with mock request parameter; added comprehensive async tests for client disconnect handling, disconnection-triggered stream exit, and heartbeat emission in extractor and RAG workflow streaming.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Git Auto Sync #1252: Directly related as part of the same Git Auto Sync feature implementation, including middleware ASGI bypass optimization, decorator handling, and SSE/no-write-lock integration.

Poem

🐰 Whisper heartbeats through the streams so long,
No timeouts now—we keep them strong!
When clients slip away unseen,
We know, we pause, we stay serene.
ASGI paths bypass with grace,
Sync flows swift to the right place!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 34.21%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check (✅ Passed): the title accurately captures the main fix: addressing a git middleware issue that broke SSE disconnect handling for streaming endpoints.
  • Linked Issues check (✅ Passed): skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check (✅ Passed): skipped because no linked issues were found for this pull request.
  • Description check (✅ Passed): the PR description is comprehensive and well-structured, covering the problem, root cause, fix layers, cancellation chain, files changed, tests added, logging, and follow-ups.


@github-actions

📊 Coverage Report

Overall Coverage: 92%

Diff: origin/main...HEAD

  • app/desktop/git_sync/middleware.py (75.0%): Missing lines 110-111,115-117,122-123
  • app/desktop/studio_server/eval_api.py (100%)
  • libs/server/kiln_server/document_api.py (95.7%): Missing lines 1824
  • libs/server/kiln_server/sse.py (100%)

Summary

  • Total: 105 lines
  • Missing: 8 lines
  • Coverage: 92%

Line-by-line

View line-by-line diff coverage

app/desktop/git_sync/middleware.py

Lines 106-127

  106         save_context per write inside its worker loop.
  107         """
  108         manager = self._get_manager_for_request(request)
  109         if manager is None:
! 110             await self.app(scope, receive, send)
! 111             return
  112 
  113         try:
  114             await manager.ensure_fresh_for_read()
! 115         except GitSyncError as e:
! 116             status, message = self._map_error(e)
! 117             response = Response(
  118                 content=json.dumps({"detail": message}, ensure_ascii=False),
  119                 status_code=status,
  120                 media_type="application/json",
  121             )
! 122             await response(scope, receive, send)
! 123             return
  124 
  125         self._notify_background_sync(manager)
  126         # Attach the manager via scope["state"] so build_save_context(request)
  127         # can find it via request.state.git_sync_manager.

libs/server/kiln_server/document_api.py

Lines 1820-1828

  1820             documents=[document],
  1821             save_context=save_context,
  1822         )
  1823 
! 1824         return await run_extractor_runner_with_status(extractor_runner, request)
  1825 
  1826     @app.delete(
  1827         "/api/projects/{project_id}/documents/{document_id}/extractions/{extraction_id}",
  1828         tags=["Documents"],


Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements a mechanism to ensure client disconnects are correctly propagated to background workers, particularly for Server-Sent Events (SSE) endpoints. It introduces a bypass in GitSyncMiddleware for @no_write_lock endpoints and a new stream_with_heartbeat utility to handle idle timeouts and disconnect polling. Feedback suggests refining the cleanup logic in the SSE utility to target the iterator rather than the source to prevent resource leaks, and ensuring the 'complete' sentinel is only sent when appropriate to avoid misleading the frontend after fatal errors.

Comment on lines +83 to +85
if hasattr(source, "aclose"):
    with contextlib.suppress(BaseException):
        await _call_aclose(source)

medium

The cleanup logic should target the iterator it rather than the iterable source. While they are often the same object (e.g., for async generators), they can differ if a custom class implements __aiter__. Closing the iterator ensures that the actual producer is terminated, which is critical for stopping background workers in this context.

Comment on lines +271 to +273
if not await request.is_disconnected():
    # Send the final complete message the app expects, and uses to stop listening
    yield "data: complete\n\n"

medium

The 'complete' sentinel is yielded even if a fatal exception occurred and was caught in the preceding blocks. While this might be intended to signal the frontend to stop listening, it could be misleading if the job actually failed. Consider only yielding 'complete' if no exception was raised or if the error was non-fatal.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
libs/server/kiln_server/document_api.py (1)

1441-1472: ⚠️ Potential issue | 🟠 Major

Hold the extractor lock during streaming body iteration.

The return statement at line 1472 constructs and returns the StreamingResponse immediately, exiting the async with shared_async_lock_manager.acquire(...) context. However, the async generator event_generator() does not execute until FastAPI iterates the response body after the route handler returns. This means extractor_runner.run() executes after the lock is released, allowing concurrent extractor/RAG runs to overlap despite the lock's intent.

Move document selection and runner execution into the generator, or manually manage the lock lifetime (acquire before constructing the runner, release in the generator's finally block).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/server/kiln_server/document_api.py` around lines 1441 - 1472, The lock
acquired via shared_async_lock_manager.acquire(...) is released before the
returned StreamingResponse's async generator runs, so move the critical work
into the generator or manually manage the lock across streaming: acquire the
lock before constructing and yielding responses, run extractor_runner.run() (or
call run_extractor_runner_with_status) from inside the async generator (e.g.,
event_generator) so the work executes while the lock is held, and ensure the
lock is released in the generator's finally block; reference
shared_async_lock_manager.acquire, ExtractorRunner,
run_extractor_runner_with_status, and event_generator to locate and move the
document selection and extractor_runner execution into the streaming generator
or wrap acquisition/release around the generator lifecycle.
🧹 Nitpick comments (3)
app/desktop/git_sync/test_middleware.py (1)

989-1013: Assert the full bypass contract here.

This test proves state is populated, but it would still pass if the bypass stopped calling ensure_fresh_for_read() or attached an unexpected manager. Please assert the expected manager identity and freshness call.

🧪 Proposed test strengthening
 def test_no_write_lock_bypass_still_attaches_manager_to_state(git_repos):
@@
     local_path, _ = git_repos
     config = _auto_config(str(local_path))
+    expected_manager = GitSyncRegistry.get_or_create(
+        local_path, auth_mode="system_keys"
+    )
 
     captured_manager = {}
@@
-    with mock_git_sync_config(config):
+    with (
+        mock_git_sync_config(config),
+        patch.object(
+            expected_manager,
+            "ensure_fresh_for_read",
+            wraps=expected_manager.ensure_fresh_for_read,
+        ) as mock_fresh,
+    ):
         client = TestClient(app)
         resp = client.get(f"/api/projects/{PROJECT_ID}/items")
 
     assert resp.status_code == 200
     assert captured_manager["value"] != "MISSING"
-    assert captured_manager["value"] is not None
+    assert captured_manager["value"] is expected_manager
+    mock_fresh.assert_called_once()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/desktop/git_sync/test_middleware.py` around lines 989 - 1013, The test
currently only checks that request.state.git_sync_manager is present; update
test_no_write_lock_bypass_still_attaches_manager_to_state to assert the exact
manager identity and that its freshness method was invoked: create a
concrete/mock GitSyncManager instance before building the app, inject/attach
that instance into the request context used by the no_write_lock bypass (so
request.state.git_sync_manager should be that instance), spy or mock the
manager.ensure_fresh_for_read method and assert it was called exactly once
during the endpoint invocation; reference the no_write_lock decorator,
request.state.git_sync_manager, and ensure_fresh_for_read when making these
assertions.
libs/server/kiln_server/document_api.py (1)

162-168: Type the extractor progress formatter input.

The formatter expects Progress from ExtractorRunner.run(); make that contract explicit for type checking. As per coding guidelines, "Ensure all Python code is strongly typed."

♻️ Proposed typing improvement
 from kiln_ai.utils import shared_async_lock_manager
+from kiln_ai.utils.async_job_runner import Progress
 from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
@@
-def _format_extractor_progress_sse(progress) -> str:
+def _format_extractor_progress_sse(progress: Progress) -> str:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/server/kiln_server/document_api.py` around lines 162 - 168, Annotate the
_format_extractor_progress_sse function to accept the Progress type returned by
ExtractorRunner.run() and ensure Progress is imported so type checkers can
validate it; specifically update the function signature to use progress:
Progress and add the appropriate import for Progress from the module that
defines it (the ExtractorRunner or its Progress class), leaving the body
unchanged.
app/desktop/studio_server/eval_api.py (1)

128-134: Type the progress formatter input.

progress is part of the streaming boundary; annotating it lets type checking catch drift in the EvalRunner.run() progress shape. As per coding guidelines, "Ensure all Python code is strongly typed."

♻️ Proposed typing improvement
 from kiln_ai.utils.name_generator import generate_memorable_name
+from kiln_ai.utils.async_job_runner import Progress
 from kiln_server.git_sync_decorators import build_save_context, no_write_lock
 from kiln_server.sse import stream_with_heartbeat
@@
-def _format_progress_sse(progress) -> str:
+def _format_progress_sse(progress: Progress) -> str:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/desktop/studio_server/eval_api.py` around lines 128 - 134, Annotate the
_format_progress_sse(progress) parameter with a concrete type that matches the
progress shape emitted by EvalRunner.run() (e.g., define a TypedDict or
dataclass like EvalProgress with fields complete: int, total: int, errors:
list[str] and use progress: EvalProgress), update the import/type definition,
and use that type on both _format_progress_sse and the progress objects produced
in EvalRunner.run() so mypy/type-checking will validate the progress shape.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@libs/server/kiln_server/sse.py`:
- Around line 53-85: The iterator returned by source.__aiter__ (variable it) may
have its own aclose and must be closed instead of closing source; in the finally
block, after cancelling pending (as done now), call and await it.aclose() (use
contextlib.suppress(BaseException) around the await) if hasattr(it, "aclose") so
the actual iterator blocked in __anext__ is cleaned up; remove or stop using
_call_aclose for source since closing the returned iterator covers it.

---

Outside diff comments:
In `@libs/server/kiln_server/document_api.py`:
- Around line 1441-1472: The lock acquired via
shared_async_lock_manager.acquire(...) is released before the returned
StreamingResponse's async generator runs, so move the critical work into the
generator or manually manage the lock across streaming: acquire the lock before
constructing and yielding responses, run extractor_runner.run() (or call
run_extractor_runner_with_status) from inside the async generator (e.g.,
event_generator) so the work executes while the lock is held, and ensure the
lock is released in the generator's finally block; reference
shared_async_lock_manager.acquire, ExtractorRunner,
run_extractor_runner_with_status, and event_generator to locate and move the
document selection and extractor_runner execution into the streaming generator
or wrap acquisition/release around the generator lifecycle.

---

Nitpick comments:
In `@app/desktop/git_sync/test_middleware.py`:
- Around line 989-1013: The test currently only checks that
request.state.git_sync_manager is present; update
test_no_write_lock_bypass_still_attaches_manager_to_state to assert the exact
manager identity and that its freshness method was invoked: create a
concrete/mock GitSyncManager instance before building the app, inject/attach
that instance into the request context used by the no_write_lock bypass (so
request.state.git_sync_manager should be that instance), spy or mock the
manager.ensure_fresh_for_read method and assert it was called exactly once
during the endpoint invocation; reference the no_write_lock decorator,
request.state.git_sync_manager, and ensure_fresh_for_read when making these
assertions.

In `@app/desktop/studio_server/eval_api.py`:
- Around line 128-134: Annotate the _format_progress_sse(progress) parameter
with a concrete type that matches the progress shape emitted by EvalRunner.run()
(e.g., define a TypedDict or dataclass like EvalProgress with fields complete:
int, total: int, errors: list[str] and use progress: EvalProgress), update the
import/type definition, and use that type on both _format_progress_sse and the
progress objects produced in EvalRunner.run() so mypy/type-checking will
validate the progress shape.

In `@libs/server/kiln_server/document_api.py`:
- Around line 162-168: Annotate the _format_extractor_progress_sse function to
accept the Progress type returned by ExtractorRunner.run() and ensure Progress
is imported so type checkers can validate it; specifically update the function
signature to use progress: Progress and add the appropriate import for Progress
from the module that defines it (the ExtractorRunner or its Progress class),
leaving the body unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 79a37506-914f-4918-820b-ace510c5daf9

📥 Commits

Reviewing files that changed from the base of the PR and between c5ac509 and 6cbc3df.

📒 Files selected for processing (8)
  • app/desktop/git_sync/middleware.py
  • app/desktop/git_sync/test_middleware.py
  • app/desktop/studio_server/eval_api.py
  • app/desktop/studio_server/test_eval_api.py
  • libs/server/kiln_server/document_api.py
  • libs/server/kiln_server/sse.py
  • libs/server/kiln_server/test_document_api.py
  • libs/server/kiln_server/test_sse.py

Comment on lines +53 to +85
it: AsyncIterator[T] = source.__aiter__()

async def next_item() -> T:
    return await it.__anext__()

pending: asyncio.Task[T] | None = None
try:
    while True:
        if is_disconnected is not None and await is_disconnected():
            logger.info("SSE stream: client disconnected, cancelling workers")
            return

        if pending is None:
            pending = asyncio.create_task(next_item())
        done, _ = await asyncio.wait({pending}, timeout=timeout)
        if pending in done:
            try:
                item = pending.result()
            except StopAsyncIteration:
                return
            pending = None
            yield format_chunk(item)
        else:
            logger.debug("SSE stream: idle for %.1fs, emitting heartbeat", timeout)
            yield HEARTBEAT_COMMENT
finally:
    if pending is not None and not pending.done():
        pending.cancel()
        with contextlib.suppress(BaseException):
            await pending
    if hasattr(source, "aclose"):
        with contextlib.suppress(BaseException):
            await _call_aclose(source)

⚠️ Potential issue | 🟡 Minor

Close the iterator returned by __aiter__(), not just source.

source is typed as AsyncIterable, so source.__aiter__() may return a separate async iterator with its own aclose(). In that case, lines 83-85 skip the object that is actually blocked in __anext__(), so its finally may not run on disconnect.

Proposed cleanup adjustment
-        if hasattr(source, "aclose"):
+        aclose = getattr(it, "aclose", None) or getattr(source, "aclose", None)
+        if aclose is not None:
             with contextlib.suppress(BaseException):
-                await _call_aclose(source)
+                await aclose()

This also makes _call_aclose() unnecessary.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@libs/server/kiln_server/sse.py` around lines 53 - 85, The iterator returned
by source.__aiter__ (variable it) may have its own aclose and must be closed
instead of closing source; in the finally block, after cancelling pending (as
done now), call and await it.aclose() (use contextlib.suppress(BaseException)
around the await) if hasattr(it, "aclose") so the actual iterator blocked in
__anext__ is cleaned up; remove or stop using _call_aclose for source since
closing the returned iterator covers it.


@scosman scosman left a comment


I have a lower blast radius fix coming for this.

@leonardmq
Collaborator Author

Closing - replaced with smaller blast radius one: #1322

@leonardmq leonardmq closed this Apr 23, 2026