Commit 8898b0c

ehsan6sha and claude committed
routes: wake parked consumers on generation bump (last-wins fix)
Advisor 2026-05-28 caught two bugs in the resume flow:

1. consumer_generation bump didn't wake parked consumers. _stream_from_buffer's last-wins check is "while my_generation == session.consumer_generation, sleep on cond". When a new consumer increments the generation, the old consumer is parked in cond.wait() — and stays parked until the next append_event fires notify. If the generator is mid-pause awaiting a user_question reply, no event ever appends so the old consumer sleeps forever. Fix: both POST and resume handlers now notify_all on the cond after bumping consumer_generation, so parked old loops wake up + check the token + exit cleanly.

2. Redundant `session.consumer_generation = my_generation` write inside the POST handler's sse_stream() coroutine. The value was already captured immediately after the increment, and echoing it back risked clobbering a newer consumer's bump if a /resume call arrived between the route handler running and the coroutine starting. Removed.

No new tests — both bugs only surface in narrow concurrent scenarios (old consumer mid-pause when new arrives; near-simultaneous POST + resume) that the TestClient's single-coroutine pattern doesn't reliably reproduce.

Manual verification: lab device, kill SSE mid-user_question, foreground auto-resume — old consumer log should show exit before the new consumer starts streaming.

30 existing tests still pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
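
The first bug can be reproduced in isolation. The following is a minimal sketch, not the project's code: `Session`, `consume`, `take_over`, and `demo` are hypothetical names, and the only detail taken from the commit is the loop shape ("while my_generation == session.consumer_generation, sleep on cond") plus the `notify_all` after the bump. It assumes `session.cond` is an `asyncio.Condition`.

```python
import asyncio


class Session:
    """Hypothetical stand-in for the real session object."""

    def __init__(self) -> None:
        self.cond = asyncio.Condition()
        self.consumer_generation = 0


async def consume(session: Session, my_generation: int) -> None:
    """Park on the condition until a newer consumer supersedes this one."""
    async with session.cond:
        while my_generation == session.consumer_generation:
            # Nothing re-checks the token until someone notifies the cond.
            await session.cond.wait()


async def take_over(session: Session, notify: bool) -> None:
    """Bump the generation; optionally wake parked consumers (the fix)."""
    session.consumer_generation += 1
    if notify:
        async with session.cond:
            session.cond.notify_all()


async def demo(notify: bool) -> bool:
    """Return True if the old consumer exits shortly after the bump."""
    session = Session()
    session.consumer_generation = 1
    old = asyncio.create_task(consume(session, 1))
    await asyncio.sleep(0)  # let the old consumer park in cond.wait()
    await take_over(session, notify=notify)
    try:
        await asyncio.wait_for(old, timeout=0.2)
        return True
    except asyncio.TimeoutError:
        # No event was appended, so the old consumer never woke up.
        return False
```

Under these assumptions, `asyncio.run(demo(notify=True))` should report that the old consumer exits, while `asyncio.run(demo(notify=False))` should time out, which mirrors the "sleeps forever" case when no `append_event` arrives.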
1 parent 1a420fd commit 8898b0c

1 file changed

Lines changed: 16 additions & 3 deletions

src/routes/troubleshoot.py

@@ -235,6 +235,13 @@ async def troubleshoot(req: TroubleshootRequest, request: Request) -> Response:
     session.generator_done = False
     session.consumer_generation += 1
     my_generation = session.consumer_generation
+    # Wake any consumer that was parked in cond.wait() for the prior
+    # generator — last-wins only works if the old loop actually wakes
+    # up to check the generation token. Without this notify, a
+    # consumer that's mid-pause (e.g. awaiting a user_question reply)
+    # sleeps forever after a new consumer arrives. advisor 2026-05-28.
+    async with session.cond:
+        session.cond.notify_all()
 
     session.generator_task = asyncio.create_task(
         _drive_generator_into_buffer(
@@ -244,9 +251,10 @@ async def troubleshoot(req: TroubleshootRequest, request: Request) -> Response:
     )
 
     async def sse_stream():
-        # consumer_generation was bumped above; capture it for the
-        # last-wins check in _stream_from_buffer.
-        session.consumer_generation = my_generation
+        # `my_generation` was captured immediately after the increment
+        # above; _stream_from_buffer re-reads `session.consumer_generation`
+        # internally so we don't echo the bump here (the duplicate write
+        # would clobber a newer consumer's increment, advisor 2026-05-28).
         try:
             async for chunk in _stream_from_buffer(session, from_seq=0):
                 yield chunk
@@ -294,7 +302,12 @@ async def troubleshoot_resume(
     # Slide TTL — resume IS user activity.
     session_mgr.touch(session.session_id)
     # Last-wins: bump generation so any prior consumer's loop exits.
+    # Also wake any parked old consumer so it wakes up + checks the
+    # generation token + exits cleanly (advisor 2026-05-28: old
+    # consumer mid-pause would otherwise sleep forever).
     session.consumer_generation += 1
+    async with session.cond:
+        session.cond.notify_all()
 
     async def sse_stream():
         try:
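
The removed write in `sse_stream()` is easiest to see with the interleaving forced by hand. This is an illustrative sketch only: `Session`, `post_handler`, `resume_handler`, and `race` are hypothetical names, and the handlers are reduced to the generation counter. It assumes, as the commit message describes, that a /resume call can run after the POST handler has bumped the counter but before the stream coroutine body starts.

```python
import asyncio


class Session:
    """Hypothetical stand-in; only the generation counter matters here."""

    def __init__(self) -> None:
        self.consumer_generation = 0


async def post_handler(session: Session, echo_back: bool):
    """Bump the generation and return a stream function that has not run yet."""
    session.consumer_generation += 1
    my_generation = session.consumer_generation

    async def sse_stream() -> int:
        if echo_back:
            # The write this commit removes: re-assigning a stale capture.
            session.consumer_generation = my_generation
        return session.consumer_generation

    return sse_stream


async def resume_handler(session: Session) -> int:
    """A newer consumer arrives and bumps the generation."""
    session.consumer_generation += 1
    return session.consumer_generation


async def race(echo_back: bool) -> tuple[int, int]:
    """Run POST, then resume, and only then start the POST stream body."""
    session = Session()
    stream = await post_handler(session, echo_back)
    newer = await resume_handler(session)
    seen = await stream()
    return newer, seen
```

With `echo_back=True` the counter falls back from 2 to 1, so the newer consumer would look superseded; with `echo_back=False` the newer consumer's bump survives.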
