Skip to content

Commit 14683da

Browse files
author
jobell
committed
improve session resilience
Signed-off-by: Josh Bell <joshadambell@me.com>
1 parent fcad888 commit 14683da

File tree

8 files changed

+696
-152
lines changed

8 files changed

+696
-152
lines changed

python/packages/kagent-adk/src/kagent/adk/_session_service.py

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ async def get_session(
114114
session = Session(
115115
id=session_data["id"],
116116
user_id=session_data["user_id"],
117-
events=[],
117+
events=events,
118118
app_name=app_name,
119119
state={},
120120
)
@@ -157,6 +157,94 @@ async def delete_session(self, *, app_name: str, user_id: str, session_id: str)
157157
)
158158
response.raise_for_status()
159159

160+
async def _recreate_session(self, session: Session) -> None:
    """Recreate a session that the server reported as not found (404).

    Intended for the case where a session expires or is cleaned up while a
    long-running operation is still appending events to it.

    What is sent to the server on recreation:
    - id: ``session.id`` (the same ID is reused)
    - user_id: ``session.user_id``
    - agent_ref: ``session.app_name``
    - name: ``session.state["session_name"]``, only when present and truthy

    Every other key in ``session.state`` is NOT sent and is therefore lost.
    A warning listing those keys is logged whenever at least one exists; if
    such a key becomes important, it must be added to the request here.

    After the create call, the session's tasks are fetched purely for
    diagnostics: in-flight tasks ("working" / "submitted") are logged so
    operators can see that a client may need to resubscribe.

    Args:
        session: The session object to recreate.

    Raises:
        httpx.HTTPStatusError: If the create request returns an error status
            other than 409 (raised by ``response.raise_for_status()``).
    """
    state = session.state or {}

    request_data = {
        "id": session.id,
        "user_id": session.user_id,
        "agent_ref": session.app_name,
    }
    session_name = state.get("session_name")
    if session_name:
        request_data["name"] = session_name

    # Warn about every state key that will be dropped. Deciding on the list
    # itself (rather than on len(state) > 1) also covers a state whose only
    # key is something other than "session_name".
    extra_fields = [key for key in state if key != "session_name"]
    if extra_fields:
        logger.warning(
            "Session %s has additional state fields that will not be preserved during recreation: %s. "
            "Update _recreate_session() if these fields are critical.",
            session.id,
            extra_fields,
        )

    response = await self.client.post(
        "/api/sessions",
        json=request_data,
        headers={"X-User-ID": session.user_id},
    )
    if response.status_code == 409:
        # Session was already recreated by a concurrent call - treat as success.
        logger.info(
            "Session %s already exists (409 Conflict) during recreation, "
            "likely recreated by a concurrent request. Proceeding with retry.",
            session.id,
        )
    else:
        response.raise_for_status()
        logger.info("Successfully recreated session %s", session.id)

    # Fetch existing tasks for this session to check for in-flight work.
    tasks_response = await self.client.get(
        f"/api/sessions/{session.id}/tasks?user_id={session.user_id}",
        headers={"X-User-ID": session.user_id},
    )
    if tasks_response.status_code != 200:
        logger.warning(
            "Failed to fetch tasks for recreated session %s (HTTP %d). "
            "In-flight task detection unavailable - UI may not auto-reconnect to active tasks.",
            session.id,
            tasks_response.status_code,
        )
        return

    tasks = tasks_response.json().get("data")
    if not tasks:
        return

    logger.info(
        "Session %s has %d existing task(s) after recreation",
        session.id,
        len(tasks),
    )
    for task in tasks:
        # "status" may be present but null in the payload; fall back to an
        # empty mapping so this diagnostic loop cannot raise AttributeError.
        task_status = task.get("status") or {}
        task_state = task_status.get("state", "unknown")
        if task_state in ("working", "submitted"):
            logger.info(
                "Found in-flight task %s in state '%s' - UI should resubscribe to continue receiving updates",
                task.get("id"),
                task_state,
            )
247+
160248
@override
161249
async def append_event(self, session: Session, event: Event) -> Event:
162250
if event.partial:
@@ -174,6 +262,24 @@ async def append_event(self, session: Session, event: Event) -> Event:
174262
json=event_data,
175263
headers={"X-User-ID": session.user_id},
176264
)
265+
266+
# Handle 404 by recreating session and retrying once
267+
if response.status_code == 404:
268+
logger.warning(
269+
"Session %s not found (404), attempting to recreate before retry",
270+
session.id,
271+
)
272+
await self._recreate_session(session)
273+
274+
# Retry the append ONCE. If this retry also fails (including another 404),
275+
# raise_for_status() below will propagate the error without further attempts.
276+
# This prevents infinite recursion while allowing recovery from transient deletion.
277+
response = await self.client.post(
278+
f"/api/sessions/{session.id}/events?user_id={session.user_id}",
279+
json=event_data,
280+
headers={"X-User-ID": session.user_id},
281+
)
282+
177283
response.raise_for_status()
178284

179285
# TODO: potentially pull and update the session from the server

0 commit comments

Comments
 (0)