Skip to content

Commit 5634235

Browse files
authored
Merge pull request #62 from usnavy13/session-fix
State size threshold for Redis with MinIO overflow
2 parents 5d30af2 + 24c4e14 commit 5634235

6 files changed

Lines changed: 152 additions & 19 deletions

File tree

src/api/files.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -115,18 +115,18 @@ async def upload_file(
115115
# Read file content
116116
content = await file.read()
117117

118-
# Store file directly
118+
# Sanitize filename to match what will be used in container
119+
sanitized_name = OutputProcessor.sanitize_filename(file.filename)
120+
121+
# Store with sanitized name so MinIO, sandbox, and cleanup all use the same name
119122
file_id = await file_service.store_uploaded_file(
120123
session_id=session_id,
121-
filename=file.filename,
124+
filename=sanitized_name,
122125
content=content,
123126
content_type=file.content_type,
124127
is_agent_file=is_agent_file,
125128
)
126129

127-
# Sanitize filename to match what will be used in container
128-
sanitized_name = OutputProcessor.sanitize_filename(file.filename)
129-
130130
uploaded_files.append(
131131
{
132132
"id": file_id,

src/config/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -201,6 +201,12 @@ class Settings(BaseSettings):
201201
state_capture_on_error: bool = Field(
202202
default=False, description="Capture and persist state even when execution fails"
203203
)
204+
state_max_redis_size_mb: int = Field(
205+
default=100,
206+
ge=1,
207+
le=500,
208+
description="Max state size (MB, raw bytes) for Redis storage. Larger states go directly to MinIO",
209+
)
204210

205211
# State Archival Configuration - Hybrid Redis + MinIO storage
206212
state_archive_enabled: bool = Field(

src/core/pool.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ def _initialize(self) -> None:
4040
redis_url,
4141
max_connections=20, # Shared across all services
4242
decode_responses=True,
43-
socket_timeout=5.0,
43+
socket_timeout=30.0,
4444
socket_connect_timeout=5.0,
4545
retry_on_timeout=True,
4646
)

src/services/orchestrator.py

Lines changed: 45 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -520,11 +520,51 @@ async def _save_state(self, ctx: ExecutionContext) -> None:
520520

521521
if ctx.new_state:
522522
try:
523-
success, state_hash = await self.state_service.save_state(
524-
ctx.session_id,
525-
ctx.new_state,
526-
ttl_seconds=settings.state_ttl_seconds,
527-
)
523+
import base64
524+
525+
raw_size = len(base64.b64decode(ctx.new_state))
526+
max_redis_bytes = settings.state_max_redis_size_mb * 1024 * 1024
527+
528+
if raw_size > max_redis_bytes:
529+
# Large state: store blob in MinIO, pointer in Redis
530+
logger.info(
531+
"State exceeds Redis threshold, storing in MinIO",
532+
session_id=ctx.session_id[:12],
533+
state_size_mb=round(raw_size / 1024 / 1024, 1),
534+
threshold_mb=settings.state_max_redis_size_mb,
535+
)
536+
archived = False
537+
if self.state_archival_service:
538+
archived = await self.state_archival_service.archive_state(
539+
ctx.session_id, ctx.new_state
540+
)
541+
if archived:
542+
success, state_hash = (
543+
await self.state_service.save_state_pointer(
544+
ctx.session_id,
545+
ctx.new_state,
546+
ttl_seconds=settings.state_ttl_seconds,
547+
)
548+
)
549+
else:
550+
# MinIO archival unavailable (no archival service configured) or failed; fall back to Redis anyway
551+
logger.warning(
552+
"MinIO archival failed, falling back to Redis",
553+
session_id=ctx.session_id[:12],
554+
)
555+
success, state_hash = await self.state_service.save_state(
556+
ctx.session_id,
557+
ctx.new_state,
558+
ttl_seconds=settings.state_ttl_seconds,
559+
)
560+
else:
561+
# Normal path: store in Redis
562+
success, state_hash = await self.state_service.save_state(
563+
ctx.session_id,
564+
ctx.new_state,
565+
ttl_seconds=settings.state_ttl_seconds,
566+
)
567+
528568
if success:
529569
ctx.new_state_hash = state_hash
530570

src/services/state.py

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -180,6 +180,71 @@ async def save_state(
180180
)
181181
return False, None
182182

183+
async def save_state_pointer(
184+
self,
185+
session_id: str,
186+
state_b64: str,
187+
ttl_seconds: Optional[int] = None,
188+
) -> Tuple[bool, Optional[str]]:
189+
"""Save only hash and metadata to Redis (state blob stored in MinIO).
190+
191+
Used when state exceeds the Redis size threshold. The full state
192+
is stored in MinIO; Redis only holds the hash and metadata for
193+
fast lookups. The orchestrator's _load_state MinIO fallback
194+
handles retrieval.
195+
196+
Args:
197+
session_id: Session identifier
198+
state_b64: Base64-encoded state (used to compute hash/size, not stored in Redis)
199+
ttl_seconds: TTL in seconds (default from settings)
200+
201+
Returns:
202+
Tuple of (success: bool, state_hash: Optional[str])
203+
"""
204+
if not state_b64:
205+
return True, None
206+
207+
if ttl_seconds is None:
208+
ttl_seconds = settings.state_ttl_seconds
209+
210+
try:
211+
raw_bytes = base64.b64decode(state_b64)
212+
state_hash = self.compute_hash(raw_bytes)
213+
now = datetime.now(timezone.utc)
214+
215+
pipe = self.redis.pipeline(transaction=True)
216+
217+
# Save hash (small — just the SHA256 string)
218+
pipe.setex(self._hash_key(session_id), ttl_seconds, state_hash)
219+
220+
# Save metadata with storage location marker
221+
meta = json.dumps(
222+
{
223+
"size_bytes": len(raw_bytes),
224+
"hash": state_hash,
225+
"created_at": now.isoformat(),
226+
"storage": "minio",
227+
}
228+
)
229+
pipe.setex(self._meta_key(session_id), ttl_seconds, meta)
230+
231+
await pipe.execute()
232+
233+
logger.info(
234+
"Saved state pointer to Redis (blob in MinIO)",
235+
session_id=session_id[:12],
236+
state_size=len(raw_bytes),
237+
hash=state_hash[:12],
238+
)
239+
return True, state_hash
240+
except Exception as e:
241+
logger.error(
242+
"Failed to save state pointer",
243+
session_id=session_id[:12],
244+
error=str(e),
245+
)
246+
return False, None
247+
183248
async def delete_state(self, session_id: str) -> bool:
184249
"""Delete state for a session.
185250

src/services/state_archival.py

Lines changed: 30 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -187,10 +187,26 @@ async def restore_state(self, session_id: str) -> Optional[str]:
187187

188188
state_data = state_bytes.decode("utf-8")
189189

190-
# Restore to Redis for fast access
191-
await self.state_service.save_state(
192-
session_id, state_data, ttl_seconds=settings.state_ttl_seconds
193-
)
190+
# Only restore to Redis if under the size threshold
191+
import base64 as _b64
192+
193+
raw_size = len(_b64.b64decode(state_data))
194+
max_redis_bytes = settings.state_max_redis_size_mb * 1024 * 1024
195+
196+
if raw_size <= max_redis_bytes:
197+
await self.state_service.save_state(
198+
session_id, state_data, ttl_seconds=settings.state_ttl_seconds
199+
)
200+
else:
201+
# Too large for Redis — save only pointer
202+
await self.state_service.save_state_pointer(
203+
session_id, state_data, ttl_seconds=settings.state_ttl_seconds
204+
)
205+
logger.info(
206+
"State too large for Redis, kept in MinIO only",
207+
session_id=session_id[:12],
208+
state_size_mb=round(raw_size / 1024 / 1024, 1),
209+
)
194210

195211
logger.info(
196212
"Restored state from MinIO",
@@ -504,10 +520,16 @@ async def restore_state_by_hash(self, state_hash: str) -> Optional[str]:
504520

505521
state_data = state_bytes.decode("utf-8")
506522

507-
# Restore to Redis for fast access
508-
await self.state_service.save_state_by_hash(
509-
state_hash, state_data, ttl_seconds=settings.state_ttl_seconds
510-
)
523+
# Only restore to Redis if under the size threshold
524+
import base64 as _b64
525+
526+
raw_size = len(_b64.b64decode(state_data))
527+
max_redis_bytes = settings.state_max_redis_size_mb * 1024 * 1024
528+
529+
if raw_size <= max_redis_bytes:
530+
await self.state_service.save_state_by_hash(
531+
state_hash, state_data, ttl_seconds=settings.state_ttl_seconds
532+
)
511533

512534
logger.debug(
513535
"Restored state by hash from MinIO",

0 commit comments

Comments
 (0)