Skip to content

Commit ebbc114

Browse files
GWealecopybara-github
authored andcommitted
fix: Validate session before streaming instead of eagerly advancing the runner generator
Co-authored-by: George Weale <gweale@google.com> PiperOrigin-RevId: 875892569
1 parent d55afed commit ebbc114

File tree

1 file changed

+28
-35
lines changed

1 file changed

+28
-35
lines changed

src/google/adk/cli/adk_web_server.py

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,46 +1668,39 @@ async def run_agent(req: RunAgentRequest) -> list[Event]:
16681668
async def run_agent_sse(req: RunAgentRequest) -> StreamingResponse:
16691669
stream_mode = StreamingMode.SSE if req.streaming else StreamingMode.NONE
16701670
runner = await self.get_runner_async(req.app_name)
1671-
agen = runner.run_async(
1672-
user_id=req.user_id,
1673-
session_id=req.session_id,
1674-
new_message=req.new_message,
1675-
state_delta=req.state_delta,
1676-
run_config=RunConfig(streaming_mode=stream_mode),
1677-
invocation_id=req.invocation_id,
1678-
)
16791671

1680-
# Eagerly advance the generator to trigger session validation
1681-
# before the streaming response is created. This lets us return
1682-
# a proper HTTP 404 for missing sessions without a redundant
1683-
# get_session call — the Runner's single _get_or_create_session
1684-
# call is the only one that runs.
1685-
first_event = None
1686-
first_error = None
1687-
try:
1688-
first_event = await anext(agen)
1689-
except SessionNotFoundError as e:
1690-
await agen.aclose()
1691-
raise HTTPException(status_code=404, detail=str(e)) from e
1692-
except StopAsyncIteration:
1693-
await agen.aclose()
1694-
except Exception as e:
1695-
first_error = e
1672+
# Validate session existence before starting the stream.
1673+
# We check directly here instead of eagerly advancing the
1674+
# runner's async generator with anext(), because splitting
1675+
# generator consumption across two asyncio Tasks (request
1676+
# handler vs StreamingResponse) breaks OpenTelemetry context
1677+
# detachment.
1678+
if not runner.auto_create_session:
1679+
session = await self.session_service.get_session(
1680+
app_name=req.app_name,
1681+
user_id=req.user_id,
1682+
session_id=req.session_id,
1683+
)
1684+
if not session:
1685+
raise HTTPException(
1686+
status_code=404,
1687+
detail=f"Session not found: {req.session_id}",
1688+
)
16961689

16971690
# Convert the events to properly formatted SSE
16981691
async def event_generator():
1699-
async with Aclosing(agen):
1692+
async with Aclosing(
1693+
runner.run_async(
1694+
user_id=req.user_id,
1695+
session_id=req.session_id,
1696+
new_message=req.new_message,
1697+
state_delta=req.state_delta,
1698+
run_config=RunConfig(streaming_mode=stream_mode),
1699+
invocation_id=req.invocation_id,
1700+
)
1701+
) as agen:
17001702
try:
1701-
if first_error:
1702-
raise first_error
1703-
1704-
async def all_events():
1705-
if first_event is not None:
1706-
yield first_event
1707-
async for event in agen:
1708-
yield event
1709-
1710-
async for event in all_events():
1703+
async for event in agen:
17111704
# ADK Web renders artifacts from `actions.artifactDelta`
17121705
# during part processing *and* during action processing
17131706
# 1) the original event with `artifactDelta` cleared (content)

0 commit comments

Comments
 (0)