fix: git middleware breaks SSE disconnect handling #1319
📝 Walkthrough: This change adds SSE heartbeat support to prevent stream timeouts during long-idle transfers, introduces disconnect-aware streaming that suppresses final messages when clients disconnect, refactors multiple streaming endpoints to use a unified heartbeat wrapper, and optimizes Git Sync middleware to bypass `BaseHTTPMiddleware` wrapping for `@no_write_lock` endpoints.
📊 Coverage Report: overall coverage 92% (diff: origin/main...HEAD)
Summary
Line-by-line diff coverage

app/desktop/git_sync/middleware.py, lines 106-127 (`!` marks uncovered lines):

```
  106          save_context per write inside its worker loop.
  107          """
  108          manager = self._get_manager_for_request(request)
  109          if manager is None:
! 110              await self.app(scope, receive, send)
! 111              return
  112
  113          try:
  114              await manager.ensure_fresh_for_read()
! 115          except GitSyncError as e:
! 116              status, message = self._map_error(e)
! 117              response = Response(
  118                  content=json.dumps({"detail": message}, ensure_ascii=False),
  119                  status_code=status,
  120                  media_type="application/json",
  121              )
! 122              await response(scope, receive, send)
! 123              return
  124
  125          self._notify_background_sync(manager)
  126          # Attach the manager via scope["state"] so build_save_context(request)
  127          # can find it via request.state.git_sync_manager.
```

libs/server/kiln_server/document_api.py, lines 1820-1828:

```
  1820          documents=[document],
  1821          save_context=save_context,
  1822      )
  1823
! 1824      return await run_extractor_runner_with_status(extractor_runner, request)
  1825
  1826  @app.delete(
  1827      "/api/projects/{project_id}/documents/{document_id}/extractions/{extraction_id}",
  1828      tags=["Documents"],
```
Code Review
This pull request implements a mechanism to ensure client disconnects are correctly propagated to background workers, particularly for Server-Sent Events (SSE) endpoints. It introduces a bypass in GitSyncMiddleware for @no_write_lock endpoints and a new stream_with_heartbeat utility to handle idle timeouts and disconnect polling. Feedback suggests refining the cleanup logic in the SSE utility to target the iterator rather than the source to prevent resource leaks, and ensuring the 'complete' sentinel is only sent when appropriate to avoid misleading the frontend after fatal errors.
```python
if hasattr(source, "aclose"):
    with contextlib.suppress(BaseException):
        await _call_aclose(source)
```
The cleanup logic should target the iterator `it` rather than the iterable `source`. While they are often the same object (e.g., for async generators), they can differ if a custom class implements `__aiter__`. Closing the iterator ensures that the actual producer is terminated, which is critical for stopping background workers in this context.
```python
if not await request.is_disconnected():
    # Send the final complete message the app expects, and uses to stop listening
    yield "data: complete\n\n"
```
The 'complete' sentinel is yielded even if a fatal exception occurred and was caught in the preceding blocks. While this might be intended to signal the frontend to stop listening, it could be misleading if the job actually failed. Consider only yielding 'complete' if no exception was raised or if the error was non-fatal.
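One way to apply this suggestion, sketched with hypothetical names (`event_stream`, the `_ok_run`/`_bad_run` sources) rather than the actual endpoint code: track whether a fatal error occurred, and only emit the `complete` sentinel on a clean finish.

```python
import asyncio


async def event_stream(run, is_disconnected):
    # Hypothetical SSE generator sketch: remember whether a fatal error
    # occurred, and only emit the "complete" sentinel on a clean finish.
    fatal = False
    try:
        async for progress in run():
            yield f"data: {progress}\n\n"
    except Exception as e:
        fatal = True
        yield f"data: error: {e}\n\n"
    if not fatal and not await is_disconnected():
        yield "data: complete\n\n"  # frontend uses this to stop listening


async def _never_disconnected():
    return False


async def _ok_run():
    yield "1/2"
    yield "2/2"


async def _bad_run():
    yield "1/2"
    raise RuntimeError("boom")


async def _collect(run):
    return [chunk async for chunk in event_stream(run, _never_disconnected)]


ok_chunks = asyncio.run(_collect(_ok_run))
bad_chunks = asyncio.run(_collect(_bad_run))
```

With this shape, a failed run ends on the error frame and never misleads the frontend with a `complete` sentinel.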
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
libs/server/kiln_server/document_api.py (1)
1441-1472: ⚠️ Potential issue | 🟠 Major: Hold the extractor lock during streaming body iteration.

The return statement at line 1472 constructs and returns the `StreamingResponse` immediately, exiting the `async with shared_async_lock_manager.acquire(...)` context. However, the async generator `event_generator()` does not execute until FastAPI iterates the response body after the route handler returns. This means `extractor_runner.run()` executes after the lock is released, allowing concurrent extractor/RAG runs to overlap despite the lock's intent.

Move document selection and runner execution into the generator, or manually manage the lock lifetime (acquire before constructing the runner, release in the generator's `finally` block).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/server/kiln_server/document_api.py` around lines 1441 - 1472, The lock acquired via shared_async_lock_manager.acquire(...) is released before the returned StreamingResponse's async generator runs, so move the critical work into the generator or manually manage the lock across streaming: acquire the lock before constructing and yielding responses, run extractor_runner.run() (or call run_extractor_runner_with_status) from inside the async generator (e.g., event_generator) so the work executes while the lock is held, and ensure the lock is released in the generator's finally block; reference shared_async_lock_manager.acquire, ExtractorRunner, run_extractor_runner_with_status, and event_generator to locate and move the document selection and extractor_runner execution into the streaming generator or wrap acquisition/release around the generator lifecycle.
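A minimal sketch of the suggested restructuring, using a plain `asyncio.Lock` as a stand-in for `shared_async_lock_manager` (all names here are illustrative, not the project's actual API): acquiring inside the generator keeps the lock held while the streaming body runs, and the `finally` releases it on any exit.

```python
import asyncio

# Hypothetical stand-in for shared_async_lock_manager.acquire(...).
extractor_lock = asyncio.Lock()


async def run_with_lock_held(run_job):
    # Acquire inside the generator so the lock is held while the
    # streaming body executes, and released in `finally` even if the
    # client disconnects mid-stream.
    await extractor_lock.acquire()
    try:
        async for progress in run_job():
            yield f"data: {progress}\n\n"
    finally:
        extractor_lock.release()


async def _job():
    assert extractor_lock.locked()  # held while the body runs
    yield "1/1"


async def _main():
    return [c async for c in run_with_lock_held(_job)]


result = asyncio.run(_main())
```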
🧹 Nitpick comments (3)
app/desktop/git_sync/test_middleware.py (1)
989-1013: Assert the full bypass contract here.

This test proves state is populated, but it would still pass if the bypass stopped calling `ensure_fresh_for_read()` or attached an unexpected manager. Please assert the expected manager identity and freshness call.

🧪 Proposed test strengthening
```diff
 def test_no_write_lock_bypass_still_attaches_manager_to_state(git_repos):
@@
     local_path, _ = git_repos
     config = _auto_config(str(local_path))
+    expected_manager = GitSyncRegistry.get_or_create(
+        local_path, auth_mode="system_keys"
+    )
     captured_manager = {}
@@
-    with mock_git_sync_config(config):
+    with (
+        mock_git_sync_config(config),
+        patch.object(
+            expected_manager,
+            "ensure_fresh_for_read",
+            wraps=expected_manager.ensure_fresh_for_read,
+        ) as mock_fresh,
+    ):
         client = TestClient(app)
         resp = client.get(f"/api/projects/{PROJECT_ID}/items")
         assert resp.status_code == 200
         assert captured_manager["value"] != "MISSING"
-        assert captured_manager["value"] is not None
+        assert captured_manager["value"] is expected_manager
+        mock_fresh.assert_called_once()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/desktop/git_sync/test_middleware.py` around lines 989-1013: The test currently only checks that request.state.git_sync_manager is present; update test_no_write_lock_bypass_still_attaches_manager_to_state to assert the exact manager identity and that its freshness method was invoked: create a concrete/mock GitSyncManager instance before building the app, inject/attach that instance into the request context used by the no_write_lock bypass (so request.state.git_sync_manager should be that instance), spy or mock the manager.ensure_fresh_for_read method and assert it was called exactly once during the endpoint invocation; reference the no_write_lock decorator, request.state.git_sync_manager, and ensure_fresh_for_read when making these assertions.

libs/server/kiln_server/document_api.py (1)
162-168: Type the extractor progress formatter input.

The formatter expects `Progress` from `ExtractorRunner.run()`; make that contract explicit for type checking. As per coding guidelines, "Ensure all Python code is strongly typed."

♻️ Proposed typing improvement
```diff
 from kiln_ai.utils import shared_async_lock_manager
+from kiln_ai.utils.async_job_runner import Progress
 from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
@@
-def _format_extractor_progress_sse(progress) -> str:
+def _format_extractor_progress_sse(progress: Progress) -> str:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@libs/server/kiln_server/document_api.py` around lines 162-168: Annotate the _format_extractor_progress_sse function to accept the Progress type returned by ExtractorRunner.run() and ensure Progress is imported so type checkers can validate it; specifically update the function signature to use progress: Progress and add the appropriate import for Progress from the module that defines it (the ExtractorRunner or its Progress class), leaving the body unchanged.

app/desktop/studio_server/eval_api.py (1)
128-134: Type the progress formatter input.
`progress` is part of the streaming boundary; annotating it lets type checking catch drift in the `EvalRunner.run()` progress shape. As per coding guidelines, "Ensure all Python code is strongly typed."

♻️ Proposed typing improvement
```diff
 from kiln_ai.utils.name_generator import generate_memorable_name
+from kiln_ai.utils.async_job_runner import Progress
 from kiln_server.git_sync_decorators import build_save_context, no_write_lock
 from kiln_server.sse import stream_with_heartbeat
@@
-def _format_progress_sse(progress) -> str:
+def _format_progress_sse(progress: Progress) -> str:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/desktop/studio_server/eval_api.py` around lines 128 - 134, Annotate the _format_progress_sse(progress) parameter with a concrete type that matches the progress shape emitted by EvalRunner.run() (e.g., define a TypedDict or dataclass like EvalProgress with fields complete: int, total: int, errors: list[str] and use progress: EvalProgress), update the import/type definition, and use that type on both _format_progress_sse and the progress objects produced in EvalRunner.run() so mypy/type-checking will validate the progress shape.
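A sketch of the suggested annotation, assuming a simple `Progress` shape; the real class lives in `kiln_ai.utils.async_job_runner` and its fields may differ from the illustrative ones used here.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Progress:
    # Illustrative fields only; the real Progress class in
    # kiln_ai.utils.async_job_runner may have a different shape.
    complete: int
    total: int
    errors: list[str] = field(default_factory=list)


def _format_progress_sse(progress: Progress) -> str:
    # SSE frames are "data: <payload>\n\n".
    payload = json.dumps(
        {"complete": progress.complete, "total": progress.total, "errors": progress.errors}
    )
    return f"data: {payload}\n\n"


frame = _format_progress_sse(Progress(complete=1, total=4))
```

With the parameter typed, mypy/pyright will flag any drift between the runner's emitted progress shape and the formatter.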
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 79a37506-914f-4918-820b-ace510c5daf9
📒 Files selected for processing (8)
- app/desktop/git_sync/middleware.py
- app/desktop/git_sync/test_middleware.py
- app/desktop/studio_server/eval_api.py
- app/desktop/studio_server/test_eval_api.py
- libs/server/kiln_server/document_api.py
- libs/server/kiln_server/sse.py
- libs/server/kiln_server/test_document_api.py
- libs/server/kiln_server/test_sse.py
```python
it: AsyncIterator[T] = source.__aiter__()

async def next_item() -> T:
    return await it.__anext__()

pending: asyncio.Task[T] | None = None
try:
    while True:
        if is_disconnected is not None and await is_disconnected():
            logger.info("SSE stream: client disconnected, cancelling workers")
            return

        if pending is None:
            pending = asyncio.create_task(next_item())
        done, _ = await asyncio.wait({pending}, timeout=timeout)
        if pending in done:
            try:
                item = pending.result()
            except StopAsyncIteration:
                return
            pending = None
            yield format_chunk(item)
        else:
            logger.debug("SSE stream: idle for %.1fs, emitting heartbeat", timeout)
            yield HEARTBEAT_COMMENT
finally:
    if pending is not None and not pending.done():
        pending.cancel()
        with contextlib.suppress(BaseException):
            await pending
    if hasattr(source, "aclose"):
        with contextlib.suppress(BaseException):
            await _call_aclose(source)
```
Close the iterator returned by __aiter__(), not just source.
`source` is typed as `AsyncIterable`, so `source.__aiter__()` may return a separate async iterator with its own `aclose()`. In that case, lines 83-85 skip the object that is actually blocked in `__anext__()`, so its `finally` may not run on disconnect.
Proposed cleanup adjustment
```diff
-        if hasattr(source, "aclose"):
+        aclose = getattr(it, "aclose", None) or getattr(source, "aclose", None)
+        if aclose is not None:
             with contextlib.suppress(BaseException):
-                await _call_aclose(source)
+                await aclose()
```

This also makes `_call_aclose()` unnecessary.
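A minimal case illustrating why the iterator, not the iterable, must be closed (all names hypothetical): here `__aiter__` returns a fresh async generator, so the object holding the `finally` block is the iterator, and `source` itself has no `aclose()` at all.

```python
import asyncio
import contextlib

closed = []


class Source:
    # An AsyncIterable whose __aiter__ returns a fresh async generator:
    # the object with aclose() is the iterator, not the source itself.
    def __aiter__(self):
        return self._gen()

    async def _gen(self):
        try:
            yield 1
            yield 2
        finally:
            closed.append("iterator finally ran")


async def consume_one():
    source = Source()
    it = source.__aiter__()
    await it.__anext__()
    # `source` has no aclose(); only the iterator does.
    assert not hasattr(source, "aclose")
    aclose = getattr(it, "aclose", None) or getattr(source, "aclose", None)
    if aclose is not None:
        with contextlib.suppress(BaseException):
            await aclose()


asyncio.run(consume_one())
```

Closing only `source` here would be a no-op, and the generator's `finally` (the stand-in for stopping background workers) would never run.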
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@libs/server/kiln_server/sse.py` around lines 53 - 85, The iterator returned
by source.__aiter__ (variable it) may have its own aclose and must be closed
instead of closing source; in the finally block, after cancelling pending (as
done now), call and await it.aclose() (use contextlib.suppress(BaseException)
around the await) if hasattr(it, "aclose") so the actual iterator blocked in
__anext__ is cleaned up; remove or stop using _call_aclose for source since
closing the returned iterator covers it.
scosman left a comment
I have a lower blast radius fix coming for this.
Closing - replaced with smaller blast radius one: #1322
What does this PR do?
Problem: a started Eval or Rag job does not terminate when the user refreshes their browser. The job continues running in the background - potentially at high concurrency.
The cause is quite in the weeds, so I do not understand all of it yet; but here is the slop breakdown:
Fix: SSE runs (eval/extractor/RAG) don't cancel on browser hard-refresh
The bug
Running an eval via `GET /api/projects/{p}/tasks/{t}/evals/{e}/eval_config/{c}/run_comparison` and then hard-refreshing the browser left the eval running server-side indefinitely. New `EvalRun` rows kept being created, LLM calls kept being made, and the app became unresponsive while the worker pool continued to churn. Previously, refresh cancelled the in-flight eval; that behavior regressed when git-sync (PR a46ff424e, "Phase 3: SSE endpoint wiring") landed.

The same bug applied to `run_extractor_config` and `run_rag_config` in `document_api.py`; all three SSE endpoints share the same pattern.
Root cause
The regression had two compounding problems, both caused by `GitSyncMiddleware` extending `starlette.middleware.base.BaseHTTPMiddleware`:

1. `request.is_disconnected()` silently returned False under `BaseHTTPMiddleware`

`BaseHTTPMiddleware.__call__` replaces the ASGI `receive` callable with its own `receive_or_disconnect`, which opens an `anyio.create_task_group` internally. `Request.is_disconnected()` polls with a pre-cancelled `anyio.CancelScope`; under the middleware, the nested task group unwinds before any message can be dequeued, so the method returned False even when the client had disconnected. No disconnect signal ever reached the endpoint.

2. No other disconnect detection fired during long LLM calls

Starlette 0.52.1 (ASGI spec ≥ 2.4) removed `StreamingResponse.listen_for_disconnect`. Modern Starlette only notices a disconnect when `await send(...)` raises `OSError`. Since our SSE generator only called `send()` when `AsyncJobRunner` yielded a new progress event, which could be 30+ seconds apart during a slow LLM call, the TCP close went undetected for the whole run.
Why the worker pool then stayed alive
`AsyncJobRunner.run()` already has correct cancellation wiring: its `finally` block cancels all worker tasks. But that `finally` only fires if the caller's `async for progress in runner.run():` exits. Since nothing was exiting the outer SSE generator, that `finally` never ran. Workers kept pulling new jobs off the queue → "keeps running new eval runs" as reported.
The fix
Three layers, each addressing one link in the chain.
1. Bypass `BaseHTTPMiddleware` for `@no_write_lock` endpoints

`app/desktop/git_sync/middleware.py`: override `GitSyncMiddleware.__call__` so endpoints marked `@no_write_lock` (the SSE ones) get a pure ASGI pass-through. The real `receive` and `send` reach the endpoint directly; `request.is_disconnected()` now works. Other endpoints still use the normal `BaseHTTPMiddleware.dispatch` path.

The bypass still does what the normal path did: it calls `ensure_fresh_for_read()` and attaches the manager via `scope["state"]["git_sync_manager"]` so `build_save_context(request)` works inside the endpoint. It just skips the anyio task-group wrapping that breaks streaming.
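A dependency-free sketch of the bypass described above (the helper and parameter names are assumptions, not the actual middleware API): `@no_write_lock` requests skip the wrapped dispatch path entirely, while still refreshing and attaching the manager via `scope["state"]`.

```python
import asyncio


class GitSyncBypassMiddleware:
    # Hypothetical pure-ASGI wrapper: no_write_lock endpoints skip the
    # BaseHTTPMiddleware machinery so the real receive/send reach them.
    def __init__(self, app, dispatch_app, is_no_write_lock, get_manager):
        self.app = app                    # the inner ASGI app
        self.dispatch_app = dispatch_app  # the BaseHTTPMiddleware-wrapped path
        self.is_no_write_lock = is_no_write_lock
        self.get_manager = get_manager

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http" or not self.is_no_write_lock(scope):
            await self.dispatch_app(scope, receive, send)
            return
        manager = self.get_manager(scope)
        if manager is not None:
            await manager.ensure_fresh_for_read()
            # Attach via scope["state"] so request.state.git_sync_manager works.
            scope.setdefault("state", {})["git_sync_manager"] = manager
        # Pure pass-through: the endpoint gets the real receive/send.
        await self.app(scope, receive, send)


class FakeManager:
    def __init__(self):
        self.fresh = 0

    async def ensure_fresh_for_read(self):
        self.fresh += 1


seen = {}


async def inner_app(scope, receive, send):
    seen["manager"] = scope["state"]["git_sync_manager"]


async def dispatch_app(scope, receive, send):
    seen["dispatched"] = True


mgr = FakeManager()
mw = GitSyncBypassMiddleware(
    inner_app,
    dispatch_app,
    is_no_write_lock=lambda scope: scope["path"].endswith("/run"),
    get_manager=lambda scope: mgr,
)

asyncio.run(mw({"type": "http", "path": "/api/run"}, None, None))
asyncio.run(mw({"type": "http", "path": "/api/items"}, None, None))
```

The design choice here is that only streaming endpoints take the bare path; everything else keeps the existing dispatch behavior.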
2. Active disconnect polling in the SSE helper

`libs/server/kiln_server/sse.py`: a new `stream_with_heartbeat()` helper wraps an async source and:

- Polls `is_disconnected()` at each iteration (default 3s). If it returns True, the generator returns, triggering the source's `aclose()` in its own `finally` block.
- Emits a keepalive comment (`: keepalive\n\n`) after `heartbeat_seconds` of idleness. Keepalives force `send()` to fire, which raises `OSError` → `ClientDisconnect` if the connection is closed: belt-and-suspenders in case polling misses.
- `aclose()`s the source in its `finally` block, regardless of exit reason.

Default heartbeat is 3s (
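A simplified, runnable sketch of the described control flow: it follows the excerpt shown earlier but trims the logging, and any names outside the PR text are assumptions. The demo wraps a slow source so the idle timeout fires a few keepalives before the real item arrives.

```python
import asyncio
import contextlib

HEARTBEAT_COMMENT = ": keepalive\n\n"


async def stream_with_heartbeat(source, format_chunk, heartbeat_seconds, is_disconnected=None):
    # Simplified: race the next item against an idle timeout; on timeout,
    # emit a keepalive comment so send() fires and disconnects surface.
    it = source.__aiter__()
    pending = None
    try:
        while True:
            if is_disconnected is not None and await is_disconnected():
                return
            if pending is None:
                pending = asyncio.create_task(it.__anext__())
            done, _ = await asyncio.wait({pending}, timeout=heartbeat_seconds)
            if pending in done:
                try:
                    item = pending.result()
                except StopAsyncIteration:
                    return
                pending = None
                yield format_chunk(item)
            else:
                yield HEARTBEAT_COMMENT
    finally:
        if pending is not None and not pending.done():
            pending.cancel()
            with contextlib.suppress(BaseException):
                await pending
        aclose = getattr(it, "aclose", None)
        if aclose is not None:
            with contextlib.suppress(BaseException):
                await aclose()


async def demo():
    async def slow_source():
        await asyncio.sleep(0.05)  # idle long enough to trigger heartbeats
        yield "done"

    chunks = []
    async for c in stream_with_heartbeat(slow_source(), lambda x: f"data: {x}\n\n", 0.01):
        chunks.append(c)
    return chunks


chunks = asyncio.run(demo())
```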
`DEFAULT_HEARTBEAT_SECONDS`).

3. Prompt cleanup via `contextlib.aclosing`

Each SSE endpoint now wraps the heartbeat stream in `contextlib.aclosing(hb)`.

Why `aclosing`: when Starlette cancels `event_generator` on client disconnect, a plain `async for` does not automatically close the inner generator; cleanup would wait for GC, leaving workers alive for an indeterminate time. `aclosing` guarantees `hb.aclose()` runs synchronously, which in turn runs `stream_with_heartbeat`'s `finally`, which calls `eval_runner.run().aclose()`, which fires `AsyncJobRunner`'s `finally` that cancels all workers.

Cancellation chain (end to end)

- The ASGI server puts `{"type": "http.disconnect"}` on the receive channel.
- Under the bypass, `request._receive` is the real channel (not the middleware's wrapped proxy), so on the next poll tick, `request.is_disconnected()` returns `True`.
- `stream_with_heartbeat` returns; its `finally` runs `eval_runner.run().aclose()`.
- `aclose()` raises `GeneratorExit` inside `AsyncJobRunner.run()` at its current yield point.
- `AsyncJobRunner.run()`'s `finally` runs: `for w in workers: w.cancel()`.
- Each worker's `await run_job_fn(job)` raises `CancelledError`. `CancelledError` inherits from `BaseException` (not `Exception`), so the worker's `except Exception` clauses don't catch it. The worker coroutine unwinds and the task completes.
- `asyncio.gather(*workers)` in `AsyncJobRunner.run()`'s `finally` completes.
- `event_generator`'s `aclosing` block exits.

Worst-case latency is one heartbeat interval (3s) between refresh and all workers having `cancel()` called.

Files changed

- `app/desktop/git_sync/middleware.py`: ASGI bypass for `@no_write_lock`.
- `libs/server/kiln_server/sse.py`: new module with `stream_with_heartbeat`.
- `app/desktop/studio_server/eval_api.py`: use `stream_with_heartbeat` + `aclosing`.
- `libs/server/kiln_server/document_api.py`: same for the extractor and RAG workflow helpers.

Tests added

- `libs/server/kiln_server/test_sse.py`: unit tests for `stream_with_heartbeat` (forwards items, emits heartbeat on idle, closes source on normal exit / on `aclose` / on `is_disconnected=True`).
- `app/desktop/git_sync/test_middleware.py`: three new tests: bypass skips `dispatch()`, bypass still attaches `git_sync_manager` to `request.state`, and an ASGI-level test proving `http.disconnect` reaches the endpoint's receive channel under the bypass.
- `app/desktop/studio_server/test_eval_api.py` and `libs/server/kiln_server/test_document_api.py`: regression tests exercising disconnect (`aclose`), `is_disconnected=True` polling, the happy path, and idle-heartbeat emission.

All 297 touched-module tests pass; `./checks.sh --agent-mode` exits 0.

Logging

When the disconnect path triggers, you'll see:

SSE stream: client disconnected, cancelling workers

at INFO level from the `kiln_server.sse` logger. Enable DEBUG on that logger if you want to see heartbeat emissions (`SSE stream: idle for 3.0s, emitting heartbeat`).

Open follow-ups (not in this fix)

- Refreshing does not reattach the UI to an already-running eval; it starts from zero on the next click. Fixing this needs persistent server-side run state and a `GET …/current_run` endpoint. Separate PR.
- There is no way to cancel a run that was started intentionally (e.g. an accidental "Run All Evals"). A kill-switch endpoint keyed by eval/run id would be a small, robust addition.
- Git sync serializes writes through a single-threaded git executor and a `threading.Lock`. Fine at small scale, but large eval sets may bottleneck on `atomic_write`. Worth profiling separately.