forked from openai/openai-agents-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_session.py
More file actions
135 lines (115 loc) · 5.08 KB
/
file_session.py
File metadata and controls
135 lines (115 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Simple file-backed session implementation for examples.
Persists conversation history as JSON on disk so runs can resume across processes.
"""
from __future__ import annotations
import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Any
from uuid import uuid4
from agents.memory.session import Session
from agents.memory.session_settings import SessionSettings
from agents.run_context import RunContextWrapper
class FileSession(Session):
    """Persist session items to a JSON file on disk.

    Every session uses up to two files inside the session directory:

    * ``<session_id>.json`` -- a JSON list holding the conversation items.
    * ``<session_id>-state.json`` -- an optional JSON object holding a
      serialized run state (see ``save_state_json`` / ``load_state_json``).

    Blocking filesystem calls are dispatched through ``asyncio.to_thread`` so
    they do not run on the event loop thread.
    """

    session_settings: SessionSettings | None = None

    def __init__(self, *, dir: str | Path | None = None, session_id: str | None = None) -> None:
        """Configure where the session lives on disk.

        Args:
            dir: Directory that stores the session files. Defaults to
                ``.agents-sessions`` under the current working directory.
            session_id: Identifier of an existing session to resume. When
                omitted or empty, an id is generated on first use.
        """
        self._dir = Path(dir) if dir is not None else Path.cwd() / ".agents-sessions"
        self.session_id = session_id or ""
        # Ensure the directory exists up front so subsequent file operations do not race.
        self._dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _create_items_file(file_path: Path) -> None:
        """Create ``file_path`` containing an empty JSON list unless it already exists."""
        try:
            # Mode "x" fails if the file exists, so the existence check and the
            # creation are one step and an existing history is never overwritten.
            with file_path.open("x", encoding="utf-8") as handle:
                handle.write("[]")
        except FileExistsError:
            pass

    async def _ensure_session_id(self) -> str:
        """Return the session id, generating it and its items file when missing."""
        if not self.session_id:
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            # Prefix with wall-clock time so recent sessions are easy to spot on disk.
            self.session_id = f"{timestamp}-{uuid4().hex[:12]}"
        await asyncio.to_thread(self._dir.mkdir, parents=True, exist_ok=True)
        await asyncio.to_thread(self._create_items_file, self._items_path(self.session_id))
        return self.session_id

    async def get_session_id(self) -> str:
        """Return the session id, creating one if needed."""
        return await self._ensure_session_id()

    async def get_items(
        self,
        limit: int | None = None,
        *,
        wrapper: RunContextWrapper[Any] | None = None,
    ) -> list[Any]:
        """Return the stored items, oldest first.

        Args:
            limit: Maximum number of most recent items to return. ``None`` or a
                negative value returns everything; ``0`` returns an empty list.
            wrapper: Accepted for interface compatibility; not used here.

        Returns:
            The stored items, truncated to the last ``limit`` entries if requested.
        """
        session_id = await self._ensure_session_id()
        items = await self._read_items(session_id)
        if limit is None or limit < 0:
            return items
        if limit == 0:
            # ``items[-0:]`` is ``items[0:]`` (the whole list), so handle zero explicitly.
            return []
        return items[-limit:]

    async def add_items(
        self,
        items: list[Any],
        *,
        wrapper: RunContextWrapper[Any] | None = None,
    ) -> None:
        """Append ``items`` to the stored history.

        Args:
            items: JSON-serializable items to append. An empty list is a no-op.
            wrapper: Accepted for interface compatibility; not used here.
        """
        if not items:
            return
        session_id = await self._ensure_session_id()
        current = await self._read_items(session_id)
        # Deep-copy via JSON to avoid persisting live references that might mutate later.
        cloned = json.loads(json.dumps(items))
        await self._write_items(session_id, current + cloned)

    async def pop_item(self) -> Any | None:
        """Remove and return the most recent item, or ``None`` when there is none."""
        session_id = await self._ensure_session_id()
        items = await self._read_items(session_id)
        if not items:
            return None
        popped = items.pop()
        await self._write_items(session_id, items)
        return popped

    async def clear_session(self) -> None:
        """Delete the items and state files and forget the current session id.

        Does nothing when no session id has been assigned yet.
        """
        if not self.session_id:
            return
        for path in (self._items_path(self.session_id), self._state_path(self.session_id)):
            # Either file may legitimately be absent; a missing file is not an error.
            await asyncio.to_thread(path.unlink, missing_ok=True)
        self.session_id = ""

    def _items_path(self, session_id: str) -> Path:
        """Return the path of the JSON file holding the items of ``session_id``."""
        return self._dir / f"{session_id}.json"

    def _state_path(self, session_id: str) -> Path:
        """Return the path of the JSON file holding the saved state of ``session_id``."""
        return self._dir / f"{session_id}-state.json"

    async def _read_items(self, session_id: str) -> list[Any]:
        """Load the item list of ``session_id``.

        Returns an empty list when the file is missing or its top-level JSON
        value is not a list. Invalid JSON propagates as ``json.JSONDecodeError``.
        """
        file_path = self._items_path(session_id)
        try:
            data = await asyncio.to_thread(file_path.read_text, "utf-8")
        except FileNotFoundError:
            return []
        parsed = json.loads(data)
        return parsed if isinstance(parsed, list) else []

    async def _write_items(self, session_id: str, items: list[Any]) -> None:
        """Overwrite the items file of ``session_id`` with ``items`` as indented JSON."""
        file_path = self._items_path(session_id)
        payload = json.dumps(items, indent=2, ensure_ascii=False)
        # The directory may have been removed since construction; recreate it if so.
        await asyncio.to_thread(self._dir.mkdir, parents=True, exist_ok=True)
        await asyncio.to_thread(file_path.write_text, payload, encoding="utf-8")

    async def load_state_json(self) -> dict[str, Any] | None:
        """Load a previously saved RunState JSON payload, if present.

        Returns ``None`` when no state file exists or its top-level JSON value
        is not an object.
        """
        session_id = await self._ensure_session_id()
        state_path = self._state_path(session_id)
        try:
            data = await asyncio.to_thread(state_path.read_text, "utf-8")
        except FileNotFoundError:
            return None
        parsed = json.loads(data)
        return parsed if isinstance(parsed, dict) else None

    async def save_state_json(self, state: dict[str, Any]) -> None:
        """Persist the serialized RunState JSON payload alongside session items."""
        session_id = await self._ensure_session_id()
        state_path = self._state_path(session_id)
        payload = json.dumps(state, indent=2, ensure_ascii=False)
        await asyncio.to_thread(self._dir.mkdir, parents=True, exist_ok=True)
        await asyncio.to_thread(state_path.write_text, payload, encoding="utf-8")