From c13b9a748958ec69e4f6f7538839dd132b35b174 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Sun, 12 Apr 2026 09:36:32 +0000 Subject: [PATCH 1/2] fix: address critical concurrency, memory, and resource lifecycle gaps - Fix DualLock async lock creation race condition with thread lock protection - Implement double-checked locking for Process state lock initialization - Add atomic session state operations with RLock protection - Make retry counter increments atomic to prevent lost updates - Preserve injection context in tool execution with contextvars.copy_context() - Remove dual persistence in memory storage (only fallback when primary fails) - Enforce checkpoint limits with proper pruning logic - Add workflow cancellation flag that propagates through execution - Improve agent cleanup with LLM client connection closing These fixes address the 3 critical architecture gaps identified in issue #1365: Gap 1: Concurrency & Async Safety - multiple race conditions fixed Gap 2: Unbounded Memory Growth - dual storage and checkpoint limits fixed Gap 3: Resource Lifecycle - timeout cancellation and cleanup improved Co-authored-by: MervinPraison --- .../praisonaiagents/agent/agent.py | 9 ++++++ .../praisonaiagents/agent/async_safety.py | 20 +++++++----- .../praisonaiagents/agent/tool_execution.py | 32 ++++++++++++------- .../praisonaiagents/checkpoints/service.py | 12 +++++-- .../praisonaiagents/memory/core.py | 12 +++---- .../praisonaiagents/process/process.py | 25 +++++++++++++-- .../praisonaiagents/session.py | 23 ++++++++----- 7 files changed, 93 insertions(+), 40 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 45b193d7b..6ac738892 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -4501,6 +4501,15 @@ def close(self) -> None: except Exception as e: logger.warning(f"Memory cleanup failed: {e}") + # LLM client cleanup + try: + if hasattr(self, 'llm') and self.llm: + llm_client = getattr(self.llm, '_client', None) + if llm_client and hasattr(llm_client, 'close'): + llm_client.close() + except Exception as e: + logger.warning(f"LLM client cleanup failed: {e}") + # MCP cleanup try: if hasattr(self, '_mcp_clients') and self._mcp_clients: diff --git a/src/praisonai-agents/praisonaiagents/agent/async_safety.py b/src/praisonai-agents/praisonaiagents/agent/async_safety.py index 3336595df..e575657b0 100644 --- a/src/praisonai-agents/praisonaiagents/agent/async_safety.py +++ b/src/praisonai-agents/praisonaiagents/agent/async_safety.py @@ -45,16 +45,20 @@ def _get_async_lock(self) -> asyncio.Lock: current_loop = asyncio.get_running_loop() current_loop_id = id(current_loop) - # Create new lock if loop changed or first time - if self._loop_id != current_loop_id: - self._async_lock = asyncio.Lock() - self._loop_id = current_loop_id - - return self._async_lock + # Atomic check and create: use thread lock to protect async lock creation + with self._thread_lock: + # Create new lock if loop changed or first time + if self._loop_id != current_loop_id: + self._async_lock = asyncio.Lock() + self._loop_id = current_loop_id + + return self._async_lock except RuntimeError: # No event loop running, fall back to thread lock in a new loop - self._async_lock = asyncio.Lock() - return self._async_lock + with self._thread_lock: + if self._async_lock is None: + self._async_lock = asyncio.Lock() + return self._async_lock @contextmanager def sync(self): diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index e437eb447..83f8a71cb 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -190,18 +190,26 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_ if res.output and res.output.modified_data: arguments.update(res.output.modified_data) - with with_injection_context(state): - # P8/G11: Apply tool timeout if configured - tool_timeout = getattr(self, '_tool_timeout', None) - if tool_timeout and tool_timeout > 0: - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(self._execute_tool_impl, function_name, arguments) - try: - result = future.result(timeout=tool_timeout) - except concurrent.futures.TimeoutError: - logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") - result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} - else: + # P8/G11: Apply tool timeout if configured + tool_timeout = getattr(self, '_tool_timeout', None) + if tool_timeout and tool_timeout > 0: + # Use copy_context to preserve injection context in executor thread + import contextvars + ctx = contextvars.copy_context() + + def execute_with_context(): + with with_injection_context(state): + return self._execute_tool_impl(function_name, arguments) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(ctx.run, execute_with_context) + try: + result = future.result(timeout=tool_timeout) + except concurrent.futures.TimeoutError: + logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") + result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} + else: + with with_injection_context(state): result = self._execute_tool_impl(function_name, arguments) # Apply tool output truncation to prevent context overflow diff --git a/src/praisonai-agents/praisonaiagents/checkpoints/service.py b/src/praisonai-agents/praisonaiagents/checkpoints/service.py index 70c4c6dc0..573bd5816 100644 --- a/src/praisonai-agents/praisonaiagents/checkpoints/service.py +++ b/src/praisonai-agents/praisonaiagents/checkpoints/service.py @@ -485,9 +485,17 @@ async def _prune_checkpoints(self): if len(self._checkpoints) <= self.config.max_checkpoints: return - # Keep only the most recent checkpoints - # Note: This doesn't actually delete git history, just our tracking + # Calculate how many to remove + num_to_remove = len(self._checkpoints) - self.config.max_checkpoints + checkpoints_to_remove = self._checkpoints[-num_to_remove:] # Remove oldest ones + + # Keep only the most recent checkpoints in memory self._checkpoints = self._checkpoints[:self.config.max_checkpoints] + + logger.info(f"Pruned {num_to_remove} old checkpoints to stay under limit of {self.config.max_checkpoints}") + + # Emit pruning event for any cleanup hooks + self._emit(CheckpointEvent.ERROR, {"action": "pruned", "removed_count": num_to_remove}) async def get_checkpoint(self, checkpoint_id: str) -> Optional[Checkpoint]: """Get a specific checkpoint by ID.""" diff --git a/src/praisonai-agents/praisonaiagents/memory/core.py b/src/praisonai-agents/praisonaiagents/memory/core.py index 4be7e391c..85278af56 100644 --- a/src/praisonai-agents/praisonaiagents/memory/core.py +++ b/src/praisonai-agents/praisonaiagents/memory/core.py @@ -62,16 +62,14 @@ def store_short_term(self, content: str, metadata: Optional[Dict] = None, qualit except Exception as e: self._log_verbose(f"Failed to store in {self.provider} STM: {e}", logging.WARNING) - # Backward compatibility: Also store in SQLite if not using SQLite adapter - if hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): + # Only use SQLite fallback if primary storage failed completely + if not memory_id and hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): try: - fallback_id = self._sqlite_adapter.store_short_term(content, metadata=clean_metadata, **kwargs) - if not memory_id: - memory_id = fallback_id + memory_id = self._sqlite_adapter.store_short_term(content, metadata=clean_metadata, **kwargs) + self._log_verbose(f"Stored in SQLite STM as fallback: {content[:100]}...") except Exception as e: logging.error(f"Failed to store in SQLite STM fallback: {e}") - if not memory_id: - return "" + return "" # Auto-promote to long-term memory if quality is high if auto_promote and quality_score >= 7.5: # High quality threshold diff --git a/src/praisonai-agents/praisonaiagents/process/process.py b/src/praisonai-agents/praisonaiagents/process/process.py index 9036d20a5..bfbfdebbf 100644 --- a/src/praisonai-agents/praisonaiagents/process/process.py +++ b/src/praisonai-agents/praisonaiagents/process/process.py @@ -45,6 +45,8 @@ def __init__( self.workflow_timeout = workflow_timeout self.task_retry_counter: Dict[str, int] = {} # Initialize retry counter self.workflow_finished = False # ADDED: Workflow finished flag + self.workflow_cancelled = False # ADDED: Workflow cancellation flag for timeout + self._state_lock_init = threading.Lock() # Thread lock for async lock creation self._state_lock = None # Lazy-initialized async lock for shared state protection # Resolve verbose from output= param (takes precedence) or legacy verbose= param @@ -255,7 +257,10 @@ def _find_next_not_started_task(self) -> Optional[Task]: continue # Skip if no valid path exists if self.task_retry_counter.get(task_candidate.id, 0) < self.max_retries: - self.task_retry_counter[task_candidate.id] = self.task_retry_counter.get(task_candidate.id, 0) + 1 + # Atomic increment using thread lock to prevent race conditions + with self._state_lock_init: + current_count = self.task_retry_counter.get(task_candidate.id, 0) + self.task_retry_counter[task_candidate.id] = current_count + 1 temp_current_task = task_candidate logging.debug(f"Fallback attempt {fallback_attempts}: Found 'not started' task: {temp_current_task.name}, retry count: {self.task_retry_counter[temp_current_task.id]}") return temp_current_task # Return the found task immediately @@ -429,13 +434,19 @@ async def aworkflow(self) -> AsyncGenerator[str, None]: if self.workflow_timeout is not None: elapsed = time.monotonic() - workflow_start if elapsed > self.workflow_timeout: - logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded after {elapsed:.1f}s, stopping.") + logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded after {elapsed:.1f}s, cancelling workflow.") + self.workflow_cancelled = True break # ADDED: Check workflow finished flag at the start of each cycle if self.workflow_finished: logging.info("Workflow finished early as all tasks are completed.") break + + # ADDED: Check workflow cancellation flag + if self.workflow_cancelled: + logging.warning("Workflow has been cancelled, stopping task execution.") + break # Add task summary at start of each cycle logging.debug(f""" @@ -597,8 +608,11 @@ async def aworkflow(self) -> AsyncGenerator[str, None]: break # Reset completed task to "not started" so it can run again (atomic operation) + # Atomic state lock initialization if self._state_lock is None: - self._state_lock = asyncio.Lock() + with self._state_lock_init: + if self._state_lock is None: # Double-checked locking pattern + self._state_lock = asyncio.Lock() async with self._state_lock: if self.tasks[task_id].status == "completed": # Never reset loop tasks, decision tasks, or their subtasks if rerun is False @@ -1031,6 +1045,11 @@ def workflow(self): if self.workflow_finished: logging.info("Workflow finished early as all tasks are completed.") break + + # ADDED: Check workflow cancellation flag + if self.workflow_cancelled: + logging.warning("Workflow has been cancelled, stopping task execution.") + break # Add task summary at start of each cycle logging.debug(f""" diff --git a/src/praisonai-agents/praisonaiagents/session.py b/src/praisonai-agents/praisonaiagents/session.py index 244d6e73b..f5689c0e9 100644 --- a/src/praisonai-agents/praisonaiagents/session.py +++ b/src/praisonai-agents/praisonaiagents/session.py @@ -11,6 +11,7 @@ import requests import json import time +import threading from typing import Any, Dict, List, Optional, TYPE_CHECKING if TYPE_CHECKING: @@ -69,6 +70,7 @@ def __init__( self.agent_url = agent_url self.timeout = timeout self.is_remote = agent_url is not None + self._state_lock = threading.RLock() # Reentrant lock for thread-safe state operations # Validate agent_url format if self.is_remote: @@ -384,19 +386,24 @@ def _save_agent_chat_histories(self) -> None: def get_state(self, key: str, default: Any = None) -> Any: """Get a specific state value""" - state = self.restore_state() - return state.get(key, default) + with self._state_lock: + state = self.restore_state() + return state.get(key, default) def set_state(self, key: str, value: Any) -> None: """Set a specific state value""" - current_state = self.restore_state() - current_state[key] = value - self.save_state(current_state) + with self._state_lock: + current_state = self.restore_state() + current_state[key] = value + self.save_state(current_state) def increment_state(self, key: str, increment: int = 1, default: int = 0) -> None: - """Increment a numeric state value""" - current_value = self.get_state(key, default) - self.set_state(key, current_value + increment) + """Increment a numeric state value atomically""" + with self._state_lock: + current_state = self.restore_state() + current_value = current_state.get(key, default) + current_state[key] = current_value + increment + self.save_state(current_state) def add_memory(self, text: str, memory_type: str = "long", **metadata) -> None: """ From 1277a7a08b5bc4e2c1b8dfb127f6cf0bbf8b64d5 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:06:48 +0000 Subject: [PATCH 2/2] fix: resolve critical architecture issues identified in code reviews - Fix checkpoint pruning logic reversal (use newest-last semantics) - Add CHECKPOINTS_PRUNED event type to replace ERROR for normal operations - Fix ThreadPoolExecutor timeout bypass with explicit executor lifecycle - Unify AsyncSafeState to use single thread lock across sync/async contexts - Fix agent cleanup to target actual live clients (llm_instance, openai_client) - Sync memory fallback logic across async/structured STM entry points - Move contextvars import to module level for better performance Addresses critical concurrency, security, and data integrity issues. Co-authored-by: Mervin Praison --- .../praisonaiagents/agent/agent.py | 28 +++++++- .../praisonaiagents/agent/async_safety.py | 41 +++--------- .../praisonaiagents/agent/tool_execution.py | 16 ++++- .../praisonaiagents/checkpoints/service.py | 8 +-- .../praisonaiagents/checkpoints/types.py | 1 + .../praisonaiagents/memory/core.py | 67 +++++++++++-------- 6 files changed, 95 insertions(+), 66 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 6ac738892..3fd5fc351 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -4501,9 +4501,33 @@ def close(self) -> None: except Exception as e: logger.warning(f"Memory cleanup failed: {e}") - # LLM client cleanup + # LLM client cleanup - target actual live clients, not model strings try: - if hasattr(self, 'llm') and self.llm: + # Primary cleanup targets - actual live clients + if hasattr(self, 'llm_instance') and self.llm_instance: + if hasattr(self.llm_instance, 'aclose'): + # Try async close first + try: + import asyncio + if asyncio.iscoroutinefunction(self.llm_instance.aclose): + # We're in sync context, so use asyncio.run() for the cleanup + asyncio.run(self.llm_instance.aclose()) + else: + self.llm_instance.aclose() + except Exception: + # Fall back to sync close if async fails + if hasattr(self.llm_instance, 'close'): + self.llm_instance.close() + elif hasattr(self.llm_instance, 'close'): + self.llm_instance.close() + + # Check for OpenAI client (common pattern in agents) + if hasattr(self, '_Agent__openai_client') and self._Agent__openai_client: + if hasattr(self._Agent__openai_client, 'close'): + self._Agent__openai_client.close() + + # Legacy fallback - check self.llm._client (but less likely to work) + if hasattr(self, 'llm') and self.llm and not isinstance(self.llm, str): llm_client = getattr(self.llm, '_client', None) if llm_client and hasattr(llm_client, 'close'): llm_client.close() diff --git a/src/praisonai-agents/praisonaiagents/agent/async_safety.py b/src/praisonai-agents/praisonaiagents/agent/async_safety.py index e575657b0..2cea20e86 100644 --- a/src/praisonai-agents/praisonaiagents/agent/async_safety.py +++ b/src/praisonai-agents/praisonaiagents/agent/async_safety.py @@ -35,30 +35,8 @@ class DualLock: """ def __init__(self): - self._thread_lock = threading.Lock() - self._async_lock: Optional[asyncio.Lock] = None - self._loop_id: Optional[int] = None - - def _get_async_lock(self) -> asyncio.Lock: - """Get or create asyncio.Lock for current event loop.""" - try: - current_loop = asyncio.get_running_loop() - current_loop_id = id(current_loop) - - # Atomic check and create: use thread lock to protect async lock creation - with self._thread_lock: - # Create new lock if loop changed or first time - if self._loop_id != current_loop_id: - self._async_lock = asyncio.Lock() - self._loop_id = current_loop_id - - return self._async_lock - except RuntimeError: - # No event loop running, fall back to thread lock in a new loop - with self._thread_lock: - if self._async_lock is None: - self._async_lock = asyncio.Lock() - return self._async_lock + """Initialize with unified thread-safe locking.""" + self._thread_lock = threading.Lock() # Single canonical lock for all contexts @contextmanager def sync(self): @@ -68,10 +46,13 @@ def sync(self): @asynccontextmanager async def async_lock(self): - """Acquire lock in asynchronous context using asyncio.Lock.""" - async_lock = self._get_async_lock() - async with async_lock: + """Acquire lock in asynchronous context using threading.Lock via asyncio.to_thread().""" + # Use asyncio.to_thread to acquire the thread lock without blocking the event loop + await asyncio.to_thread(self._thread_lock.acquire) + try: yield + finally: + self._thread_lock.release() def is_async_context(self) -> bool: """Check if we're currently in an async context.""" @@ -133,14 +114,12 @@ def __exit__(self, exc_type, exc_val, exc_tb): async def __aenter__(self): """Support for asynchronous context manager protocol.""" - async_lock = self._lock._get_async_lock() - await async_lock.acquire() + await asyncio.to_thread(self._lock._thread_lock.acquire) return self.value async def __aexit__(self, exc_type, exc_val, exc_tb): """Support for asynchronous context manager protocol.""" - async_lock = self._lock._get_async_lock() - async_lock.release() + self._lock._thread_lock.release() return None def get(self) -> Any: diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index 83f8a71cb..e9f63df80 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -12,6 +12,7 @@ import logging import asyncio import inspect +import contextvars import concurrent.futures from typing import List, Optional, Any, Dict, Union, TYPE_CHECKING @@ -194,20 +195,31 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_ tool_timeout = getattr(self, '_tool_timeout', None) if tool_timeout and tool_timeout > 0: # Use copy_context to preserve injection context in executor thread - import contextvars ctx = contextvars.copy_context() def execute_with_context(): with with_injection_context(state): return self._execute_tool_impl(function_name, arguments) - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + # Use explicit executor lifecycle to actually bound execution time + executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + try: future = executor.submit(ctx.run, execute_with_context) try: result = future.result(timeout=tool_timeout) except concurrent.futures.TimeoutError: + # Cancel and shutdown immediately to avoid blocking + future.cancel() + executor.shutdown(wait=False, cancel_futures=True) logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} + else: + # Normal completion - shutdown gracefully + executor.shutdown(wait=False) + finally: + # Ensure executor is always cleaned up + if not executor._shutdown: + executor.shutdown(wait=False) else: with with_injection_context(state): result = self._execute_tool_impl(function_name, arguments) diff --git a/src/praisonai-agents/praisonaiagents/checkpoints/service.py b/src/praisonai-agents/praisonaiagents/checkpoints/service.py index 573bd5816..c6d47762a 100644 --- a/src/praisonai-agents/praisonaiagents/checkpoints/service.py +++ b/src/praisonai-agents/praisonaiagents/checkpoints/service.py @@ -487,15 +487,15 @@ async def _prune_checkpoints(self): # Calculate how many to remove num_to_remove = len(self._checkpoints) - self.config.max_checkpoints - checkpoints_to_remove = self._checkpoints[-num_to_remove:] # Remove oldest ones - # Keep only the most recent checkpoints in memory - self._checkpoints = self._checkpoints[:self.config.max_checkpoints] + # Keep only the most recent checkpoints in memory (newest-last semantics) + # Since save() appends (newest last), keep the last N entries + self._checkpoints = self._checkpoints[-self.config.max_checkpoints:] logger.info(f"Pruned {num_to_remove} old checkpoints to stay under limit of {self.config.max_checkpoints}") # Emit pruning event for any cleanup hooks - self._emit(CheckpointEvent.ERROR, {"action": "pruned", "removed_count": num_to_remove}) + self._emit(CheckpointEvent.CHECKPOINTS_PRUNED, {"action": "pruned", "removed_count": num_to_remove}) async def get_checkpoint(self, checkpoint_id: str) -> Optional[Checkpoint]: """Get a specific checkpoint by ID.""" diff --git a/src/praisonai-agents/praisonaiagents/checkpoints/types.py b/src/praisonai-agents/praisonaiagents/checkpoints/types.py index c65b2ed4f..f5e6ce14d 100644 --- a/src/praisonai-agents/praisonaiagents/checkpoints/types.py +++ b/src/praisonai-agents/praisonaiagents/checkpoints/types.py @@ -26,6 +26,7 @@ class CheckpointEvent(str, Enum): INITIALIZED = "initialized" CHECKPOINT_CREATED = "checkpoint_created" CHECKPOINT_RESTORED = "checkpoint_restored" + CHECKPOINTS_PRUNED = "checkpoints_pruned" ERROR = "error" diff --git a/src/praisonai-agents/praisonaiagents/memory/core.py b/src/praisonai-agents/praisonaiagents/memory/core.py index 85278af56..bfb1137e0 100644 --- a/src/praisonai-agents/praisonaiagents/memory/core.py +++ b/src/praisonai-agents/praisonaiagents/memory/core.py @@ -120,38 +120,39 @@ def store_short_term_structured(self, content: str, metadata: Optional[Dict] = N clean_metadata = self._sanitize_metadata(metadata) # Protocol-driven storage: Try primary adapter first + memory_id = "" primary_error = None - memory_id = None - try: if hasattr(self, 'memory_adapter') and self.memory_adapter: memory_id = self.memory_adapter.store_short_term(content, metadata=clean_metadata, **kwargs) self._log_verbose(f"Stored in {self.provider} STM via adapter: {content[:100]}...") - - # Auto-promote to long-term memory if quality is high - if auto_promote and quality_score >= 7.5: - try: - self.store_long_term(content, clean_metadata, quality_score, user_id, **kwargs) - self._log_verbose(f"Auto-promoted STM content to LTM (score: {quality_score:.2f})") - except Exception as e: - # Auto-promotion failure doesn't affect the primary storage result - logging.warning(f"Failed to auto-promote to LTM: {e}") - - # Emit memory event for successful storage - self._emit_memory_event("store", "short_term", content, clean_metadata) - - return MemoryResult.success_result( - memory_id=memory_id, - adapter_used=self.provider, - context={ - "quality_score": quality_score, - "auto_promoted": auto_promote and quality_score >= 7.5 - } - ) except Exception as e: primary_error = str(e) self._log_verbose(f"Failed to store in {self.provider} STM: {e}", logging.WARNING) + # Only proceed with success if we got a valid memory_id + if memory_id: + # Auto-promote to long-term memory if quality is high + if auto_promote and quality_score >= 7.5: + try: + self.store_long_term(content, clean_metadata, quality_score, user_id, **kwargs) + self._log_verbose(f"Auto-promoted STM content to LTM (score: {quality_score:.2f})") + except Exception as e: + # Auto-promotion failure doesn't affect the primary storage result + logging.warning(f"Failed to auto-promote to LTM: {e}") + + # Emit memory event for successful storage + self._emit_memory_event("store", "short_term", content, clean_metadata) + + return MemoryResult.success_result( + memory_id=memory_id, + adapter_used=self.provider, + context={ + "quality_score": quality_score, + "auto_promoted": auto_promote and quality_score >= 7.5 + } + ) + # Fallback to SQLite if available and different from primary adapter fallback_error = None if hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): @@ -448,13 +449,25 @@ async def store_short_term_async(self, content: str, metadata: Optional[Dict] = raw_metadata["user_id"] = user_id clean_metadata = self._sanitize_metadata(raw_metadata) - # Store in SQLite STM + # Try primary adapter first (async version) memory_id = "" try: - memory_id = await asyncio.to_thread(self._store_sqlite_stm, content, clean_metadata, quality_score) + if hasattr(self, 'memory_adapter') and self.memory_adapter: + memory_id = await asyncio.to_thread( + self.memory_adapter.store_short_term, content, metadata=clean_metadata, **kwargs + ) + self._log_verbose(f"Stored in {self.provider} async STM via adapter: {content[:100]}...") except Exception as e: - logging.error(f"Failed to store in SQLite STM: {e}") - return "" + self._log_verbose(f"Failed to store in {self.provider} async STM: {e}", logging.WARNING) + + # Only use SQLite fallback if primary storage failed completely + if not memory_id and hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): + try: + memory_id = await asyncio.to_thread(self._store_sqlite_stm, content, clean_metadata, quality_score) + self._log_verbose(f"Stored in SQLite async STM as fallback: {content[:100]}...") + except Exception as e: + logging.error(f"Failed to store in SQLite async STM fallback: {e}") + return "" # Auto-promote to long-term memory if quality is high (async) if auto_promote and quality_score >= 7.5: # High quality threshold