Skip to content

Commit 1a420fd

Browse files
ehsan6shaclaude
andcommitted
routes: detached generator + per-session buffer + /resume endpoint
Mid-session resume support. When the phone backgrounds (OS suspends JS, SSE socket drops) the server used to cancel the generator — asyncio.CancelledError propagated through stream_troubleshoot and the model run died. On foreground the chat had no real recovery beyond "Start a new chat" + the Try-Again button shipped in b2a7dcb. New flow: - POST /troubleshoot mints (or reuses) a SessionState and starts the bridge in an asyncio.create_task that's detached from the SSE handler. The handler is now a buffer reader. - GET /troubleshoot/resume?session_id=X&from=N reattaches to the same task's output by streaming buffered events with seq > N, then awaits the per-session asyncio.Condition for new events. - Each SSE frame carries an `id: <seq>` line so the client's EventSource records lastEventId; the client supplies that as `?from=` on the next reconnect. - Buffer cap is 500 events per session; on overflow the oldest is dropped + dropped_count increments. When a resume's `from` is older than the oldest buffered seq, the SSE stream injects a synthetic `thought` event explaining the gap (no schema change — reusing thought for a transport concern per advisor). Concurrency: - Last-wins consumer policy via SessionState.consumer_generation. Each new SSE handler bumps the token; older consumer loops check the token and exit cleanly. Auto-resume on foreground can't end up with two SSE streams pumping duplicate frames. - POST /troubleshoot on a session with a still-running task returns 409 (callers must use /resume instead). Once the prior task is done, POST starts a fresh generator and resets the buffer. SessionState additions: - event_buffer, next_seq, dropped_count, generator_done, generator_task, consumer_generation, cond - append_event(): single writer entry point (notify_all after append) - mark_done(): idempotent termination + wake-all Test parsers across 4 files updated to handle the new `id: N\\ndata: {...}` SSE block format. All 256 tests pass. Client-side wiring (AsyncStorage persistence + AppState foreground subscriber) ships in apps/box separately. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent c919a1d commit 1a420fd

6 files changed

Lines changed: 517 additions & 49 deletions

File tree

src/routes/troubleshoot.py

Lines changed: 217 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,28 @@
77
C5 adds:
88
- POST /troubleshoot/user-reply (matches Phase 11 contract)
99
- POST /troubleshoot/phone-context (matches Phase 11 contract)
10+
11+
2026-05-28 resume support:
12+
- POST /troubleshoot now starts the generator as a detached
13+
`asyncio.create_task`; the SSE handler is just a buffer reader.
14+
Disconnecting the SSE consumer no longer cancels the model run.
15+
- GET /troubleshoot/resume?session_id=X&from=N reattaches to the
16+
same task's buffered output. App-side persistence (lastEventSeq
17+
in AsyncStorage) plus an AppState foreground subscriber makes
18+
background-then-resume seamless.
1019
"""
1120
from __future__ import annotations
1221

22+
import asyncio
1323
import json
1424
import logging
25+
from typing import AsyncIterator
1526

16-
from fastapi import APIRouter, HTTPException, Request, Response
27+
from fastapi import APIRouter, HTTPException, Query, Request, Response
1728
from fastapi.responses import JSONResponse, StreamingResponse
1829
from pydantic import BaseModel, ConfigDict, Field
1930

20-
from src.session.manager import sanitize_for_log
31+
from src.session.manager import SessionState, sanitize_for_log
2132
from src.session.tool_call_loop import stream_troubleshoot
2233

2334

@@ -61,6 +72,124 @@ def _error_body(code: str, detail: str = "") -> dict:
6172
return out
6273

6374

75+
async def _drive_generator_into_buffer(
76+
session: SessionState,
77+
backend,
78+
tool_executor,
79+
validator,
80+
prompt: str,
81+
) -> None:
82+
"""Run the bridge generator detached from any SSE consumer, writing
83+
each event into the session's buffer. SSE consumers come and go via
84+
`_stream_from_buffer`; this task is the SOLE writer. Survives
85+
consumer disconnect — that's the whole point of the resume feature.
86+
87+
On exception we still try to append a synthetic error event so the
88+
consumer can render the failure instead of staring at a hanging
89+
chat. `mark_done` is in the finally so even hard exceptions release
90+
the consumers' `cond.wait()`.
91+
"""
92+
backend_handles_tools = getattr(backend, "consumes_tool_results", False)
93+
try:
94+
backend_events = backend.run_troubleshoot(
95+
prompt=prompt,
96+
session_id=session.session_id,
97+
)
98+
async for event in stream_troubleshoot(
99+
backend_events, tool_executor, validator,
100+
session=session,
101+
backend_handles_tools=backend_handles_tools,
102+
):
103+
await session.append_event(event)
104+
except asyncio.CancelledError:
105+
# Container shutdown or explicit cancel — propagate after
106+
# marking done so consumers exit.
107+
await session.mark_done()
108+
raise
109+
except Exception as e: # noqa: BLE001
110+
logger.exception("background generator failed session=%s", session.session_id)
111+
try:
112+
await session.append_event({
113+
"type": "error",
114+
"code": "INTERNAL_ERROR",
115+
"message": str(e)[:200],
116+
"recoverable": False,
117+
})
118+
except Exception:
119+
pass # last-ditch; buffer write itself shouldn't crash
120+
finally:
121+
await session.mark_done()
122+
123+
124+
async def _stream_from_buffer(
125+
session: SessionState,
126+
from_seq: int,
127+
) -> AsyncIterator[str]:
128+
"""Yield SSE-formatted strings from the session's event buffer
129+
starting at `from_seq`, blocking on `session.cond` for new events
130+
until the generator marks itself done OR a newer consumer claims
131+
this session (last-wins policy).
132+
133+
SSE `id:` field carries the seq number so the client's
134+
EventSource records it on lastEventId; the client persists this to
135+
AsyncStorage and supplies it as `?from=` on the next reconnect.
136+
137+
Truncation marker: if `from_seq` is less than the oldest buffered
138+
seq (the consumer was away long enough for events to fall off the
139+
cap), inject a synthetic `thought` event with the dropped count.
140+
Per advisor input we don't grow the SSE schema for a flow-control
141+
concern — a `thought` event with the marker text is enough; the
142+
chat surface renders it as italic gray prose."""
143+
my_generation = session.consumer_generation
144+
last_yielded_seq = from_seq - 1
145+
146+
# Truncation detection — fires when the consumer is asking for an
147+
# event older than what's still in the buffer. Includes the
148+
# from_seq=0 case (fresh resume after the head of the buffer
149+
# already overflowed).
150+
if (
151+
session.event_buffer
152+
and session.event_buffer[0][0] > from_seq
153+
):
154+
gap = session.event_buffer[0][0] - from_seq
155+
marker = {
156+
"type": "thought",
157+
"payload": (
158+
f"[resume] {gap} earlier event(s) dropped from the on-device "
159+
f"buffer (cap 500). Newer events from this session continue below."
160+
),
161+
}
162+
# Emit the marker with a special id=-1 (no real seq) so the
163+
# client's lastEventId tracker doesn't try to use it as a
164+
# resume offset later.
165+
yield f"id: -1\ndata: {json.dumps(marker, separators=(',', ':'))}\n\n"
166+
167+
while True:
168+
# Yield any buffered events strictly newer than last_yielded.
169+
# Materialize so we don't hold the buffer reference across the
170+
# await (the buffer is mutated by the producer task).
171+
new_events = [
172+
(s, e) for s, e in session.event_buffer if s > last_yielded_seq
173+
]
174+
for s, e in new_events:
175+
yield f"id: {s}\ndata: {json.dumps(e, separators=(',', ':'))}\n\n"
176+
last_yielded_seq = s
177+
178+
# Check exit conditions BEFORE waiting (covers the case where
179+
# mark_done already fired or a new consumer already took over).
180+
if session.generator_done:
181+
return
182+
if session.consumer_generation != my_generation:
183+
# Last-wins: another /troubleshoot or /resume call took
184+
# over this session. Quietly exit so the network frame
185+
# stream isn't doubled. The newer consumer reads the same
186+
# buffer and picks up from from_seq.
187+
return
188+
189+
async with session.cond:
190+
await session.cond.wait()
191+
192+
64193
@router.post("/troubleshoot")
65194
async def troubleshoot(req: TroubleshootRequest, request: Request) -> Response:
66195
backend = request.app.state.backend
@@ -69,7 +198,7 @@ async def troubleshoot(req: TroubleshootRequest, request: Request) -> Response:
69198
session_mgr = request.app.state.session_manager
70199

71200
# Resolve session: caller-supplied wins; else mint new.
72-
session = None
201+
session: SessionState | None = None
73202
if req.session_id:
74203
session = session_mgr.get(req.session_id)
75204
if session is None:
@@ -80,30 +209,47 @@ async def troubleshoot(req: TroubleshootRequest, request: Request) -> Response:
80209
else:
81210
session = session_mgr.create()
82211

83-
backend_events = backend.run_troubleshoot(
84-
prompt=req.prompt,
85-
session_id=session.session_id,
86-
)
212+
# If this session already has a running generator (caller retried
213+
# POST on the same session_id while a previous task is still
214+
# active), reject. Resume via GET /troubleshoot/resume instead so
215+
# we don't spawn duplicate writers into one buffer.
216+
if (
217+
session.generator_task is not None
218+
and not getattr(session.generator_task, "done", lambda: True)()
219+
and not session.generator_done
220+
):
221+
return JSONResponse(
222+
status_code=409,
223+
content=_error_body(
224+
"session_already_active",
225+
"use GET /troubleshoot/resume?session_id=...&from=N to reattach",
226+
),
227+
)
87228

88-
backend_handles_tools = getattr(backend, "consumes_tool_results", False)
229+
# Reset per-session buffer state for the new generator. We keep the
230+
# SessionState (so phone_context, reply_queue, etc. survive) but
231+
# wipe the conversation buffer so the new prompt starts at seq 0.
232+
session.event_buffer = []
233+
session.next_seq = 0
234+
session.dropped_count = 0
235+
session.generator_done = False
236+
session.consumer_generation += 1
237+
my_generation = session.consumer_generation
238+
239+
session.generator_task = asyncio.create_task(
240+
_drive_generator_into_buffer(
241+
session, backend, tool_executor, validator, req.prompt,
242+
),
243+
name=f"blox-ai-generator-{session.session_id}",
244+
)
89245

90246
async def sse_stream():
247+
# consumer_generation was bumped above; capture it for the
248+
# last-wins check in _stream_from_buffer.
249+
session.consumer_generation = my_generation
91250
try:
92-
async for event in stream_troubleshoot(
93-
backend_events, tool_executor, validator,
94-
session=session,
95-
backend_handles_tools=backend_handles_tools,
96-
):
97-
yield f"data: {json.dumps(event, separators=(',', ':'))}\n\n"
98-
except Exception:
99-
logger.exception("unexpected bridge failure")
100-
fallback = {
101-
"type": "error",
102-
"code": "INTERNAL_ERROR",
103-
"message": "unexpected bridge failure",
104-
"recoverable": False,
105-
}
106-
yield f"data: {json.dumps(fallback, separators=(',', ':'))}\n\n"
251+
async for chunk in _stream_from_buffer(session, from_seq=0):
252+
yield chunk
107253
finally:
108254
# Slide TTL on stream completion so a session that just
109255
# finished a turn doesn't expire instantly.
@@ -119,6 +265,54 @@ async def sse_stream():
119265
)
120266

121267

268+
@router.get("/troubleshoot/resume")
269+
async def troubleshoot_resume(
270+
request: Request,
271+
session_id: str = Query(min_length=1, max_length=128),
272+
from_seq: int = Query(alias="from", default=0, ge=0),
273+
) -> Response:
274+
"""Reattach to a /troubleshoot session's existing generator output.
275+
Returns 404 if the session was evicted (TTL, LRU, or container
276+
restart); the client clears its persisted state on 404 + offers
277+
Start-new-chat.
278+
279+
Replays buffered events newer than `from_seq`, injects a
280+
truncation marker if `from_seq` is older than the oldest buffered
281+
event, then blocks on the session's cond for new events until the
282+
generator marks itself done or a newer consumer takes over.
283+
284+
Idempotent: multiple consumers can call /resume; last-wins kicks
285+
the older consumer (it exits cleanly without re-emitting events).
286+
"""
287+
session_mgr = request.app.state.session_manager
288+
session = session_mgr.get(session_id)
289+
if session is None:
290+
return JSONResponse(
291+
status_code=404,
292+
content=_error_body("session_not_found"),
293+
)
294+
# Slide TTL — resume IS user activity.
295+
session_mgr.touch(session.session_id)
296+
# Last-wins: bump generation so any prior consumer's loop exits.
297+
session.consumer_generation += 1
298+
299+
async def sse_stream():
300+
try:
301+
async for chunk in _stream_from_buffer(session, from_seq=from_seq):
302+
yield chunk
303+
finally:
304+
session_mgr.touch(session.session_id)
305+
306+
return StreamingResponse(
307+
sse_stream(),
308+
media_type="text/event-stream",
309+
headers={
310+
"Cache-Control": "no-cache",
311+
"X-Accel-Buffering": "no",
312+
},
313+
)
314+
315+
122316
@router.post("/troubleshoot/user-reply")
123317
async def user_reply(req: UserReplyRequest, request: Request) -> Response:
124318
"""Phase 11 contract: the app submits this when the user answers a

src/session/manager.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@
2727

2828
DEFAULT_TTL_SEC = 30 * 60 # 30 minutes
2929
DEFAULT_MAX_SESSIONS = 50
30+
# Resume support — per-session event buffer cap. ~1 KB/event × 500 cap ×
31+
# 50 sessions ≈ 25 MB worst-case; comfortable on a device with 7-8 GB
32+
# RAM. On overflow we drop the oldest event AND inject a synthetic
33+
# `thought` truncation marker (see SessionState.append_event) so the
34+
# resuming consumer sees a plain-text note about the gap. Reusing
35+
# `thought` avoids touching the SSE schema for a flow-control concern
36+
# (advisor input: don't grow the schema for transport metadata).
37+
DEFAULT_BUFFER_CAP = 500
3038

3139

3240
@dataclass
@@ -52,6 +60,58 @@ class SessionState:
5260
# for the executor; the HMAC token binds to action_id, the
5361
# dispatch table needs action_name + args.
5462
issued_recommendations: dict[str, dict] = field(default_factory=dict)
63+
# ---- Resume support (added 2026-05-28) -----------------------------
64+
# event_buffer holds (seq, event) tuples in arrival order. seq is
65+
# monotonic per-session — even on truncation we keep counting up so
66+
# gaps in seq let the consumer detect dropped events.
67+
event_buffer: list[tuple[int, dict]] = field(default_factory=list)
68+
next_seq: int = 0
69+
# Total events dropped from the head of the buffer due to cap.
70+
# Cumulative across the session lifetime. Surfaced to the consumer
71+
# as the marker payload when a resume's `from` is older than the
72+
# oldest buffered seq.
73+
dropped_count: int = 0
74+
# Set True once the generator task finishes (verdict reached, error,
75+
# or session_cancelled). Consumers exit their wait loop on this.
76+
generator_done: bool = False
77+
# The detached generator task. Created on the first /troubleshoot
78+
# POST for this session; subsequent /resume calls reattach to the
79+
# same task's output (via the buffer). NEVER awaited from the SSE
80+
# handler — that's the whole point of the resume feature (SSE
81+
# consumer disconnect must NOT cancel the generator).
82+
generator_task: object | None = None
83+
# Last-wins consumer policy: each new SSE consumer increments this
84+
# token; consumers loop while their captured token equals the
85+
# current value, exit when a newer consumer has taken over. Avoids
86+
# multiple SSE streams writing duplicate frames to the network.
87+
consumer_generation: int = 0
88+
# Producer/consumer wake — asyncio.Condition (not Event) per
89+
# advisor input: Event has the "lost wake between clear and wait"
90+
# race. Condition's notify_all + acquire pattern is race-free for
91+
# multiple consumers, and the migration from single-consumer to
92+
# last-wins doesn't require touching wake logic.
93+
cond: asyncio.Condition = field(default_factory=asyncio.Condition)
94+
95+
async def append_event(self, event: dict) -> None:
96+
"""Append an event to the buffer + wake any waiting consumer.
97+
Drops the oldest event on overflow (incrementing dropped_count
98+
so the next resume can synthesize a truncation marker)."""
99+
seq = self.next_seq
100+
self.next_seq += 1
101+
self.event_buffer.append((seq, event))
102+
if len(self.event_buffer) > DEFAULT_BUFFER_CAP:
103+
self.event_buffer.pop(0)
104+
self.dropped_count += 1
105+
async with self.cond:
106+
self.cond.notify_all()
107+
108+
async def mark_done(self) -> None:
109+
"""Mark the generator as finished + wake all consumers so they
110+
exit their stream loop. Idempotent — safe to call from the
111+
generator wrapper's try/except/finally branches."""
112+
self.generator_done = True
113+
async with self.cond:
114+
self.cond.notify_all()
55115

56116

57117
class SessionManager:

tests/test_c6_routes.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ def client_with_tmp_feedback_log(client, tmp_path):
2323

2424
def _open_session(client) -> str:
2525
r = client.post("/troubleshoot", json={"prompt": "just diagnose"})
26-
first = r.text.split("\n\n")[0]
27-
return json.loads(first[len("data: "):])["session_id"]
26+
first_block = r.text.split("\n\n")[0]
27+
# 2026-05-28 resume support: SSE events now have `id:` then `data:`.
28+
data_line = next(L for L in first_block.split("\n") if L.startswith("data: "))
29+
return json.loads(data_line[len("data: "):])["session_id"]
2830

2931

3032
def test_feedback_happy_path_writes_log_line(client_with_tmp_feedback_log):

0 commit comments

Comments
 (0)