Skip to content

Commit 7bb5911

Browse files
fix: Wire CircuitBreaker and FailoverManager into execution paths (#1539)
* fix: Wire CircuitBreaker and FailoverManager into execution paths (fixes #1535)

  - Add FailoverManager integration to LLM._call_with_retry() and _call_with_retry_async()
    * New failover_manager parameter in LLM constructor
    * _switch_to_profile() method for switching auth profiles
    * Mark success/failure and automatic profile switching on retries
  - Add CircuitBreaker integration to tool execution in AgentToolExecution
    * _execute_tool_with_circuit_breaker() wrapper method
    * 5-failure threshold, 60s recovery timeout, graceful degradation
    * Per-tool circuit breakers prevent cascading failures

  Both components were fully implemented but never called from execution paths. Now users who configure them get actual resilience protection.

  Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>

* fix: Address critical bugs in CircuitBreaker and FailoverManager implementation

  - Fix FailoverManager initialization: properly initialize _current_profile
  - Fix CircuitBreaker failure detection: convert error dicts to exceptions
  - Fix profile switching: update in-flight request parameters in retry loop
  - Add proper typing with FailoverManagerProtocol instead of Any
  - Fix import handling to prevent NameError on CircuitBreakerException
  - Add agent/session context to circuit breaker error returns
  - Allow failover on non-retryable provider failures (bad credentials, etc.)

  Addresses critical P1 issues identified by Greptile and CodeRabbit reviews. Makes FailoverManager and CircuitBreaker actually functional end-to-end.

  Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>

---------

Co-authored-by: praisonai-triage-agent[bot] <272766704+praisonai-triage-agent[bot]@users.noreply.github.com>
Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
1 parent 4faf056 commit 7bb5911

2 files changed

Lines changed: 168 additions & 13 deletions

File tree

src/praisonai-agents/praisonaiagents/agent/tool_execution.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_
202202

203203
def execute_with_context():
204204
with with_injection_context(state):
205-
return self._execute_tool_impl(function_name, arguments)
205+
return self._execute_tool_with_circuit_breaker(function_name, arguments)
206206

207207
# Use reusable executor to prevent resource leaks
208208
if not hasattr(self, '_tool_executor'):
@@ -219,7 +219,7 @@ def execute_with_context():
219219
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
220220
else:
221221
with with_injection_context(state):
222-
result = self._execute_tool_impl(function_name, arguments)
222+
result = self._execute_tool_with_circuit_breaker(function_name, arguments)
223223

224224
# Apply tool output truncation to prevent context overflow
225225
# Uses context manager budget if enabled, otherwise applies default limit
@@ -594,6 +594,73 @@ async def _check_tool_approval_async(self, function_name, arguments):
594594
logging.info(f"Using modified arguments: {arguments}")
595595
return None, arguments
596596

597+
def _execute_tool_with_circuit_breaker(self, function_name, arguments):
    """Execute a tool through a per-tool circuit breaker.

    The tool is run via ``self._execute_tool_impl``. When it returns an
    error dict (a dict with a truthy ``"error"`` key that is not flagged as
    an approval/permission outcome), the failure is reported to the breaker
    by raising a private sentinel exception inside the wrapped call; the
    original error dict is then handed back to the caller unchanged.

    Args:
        function_name: Name of the tool to execute. Also used to derive the
            breaker name (``tool_<function_name>``).
        arguments: Arguments forwarded to the tool.

    Returns:
        The tool's result, the tool's own error dict, or - when the breaker
        raises ``CircuitBreakerException`` - a dict with ``circuit_open``
        set to ``True`` plus agent/session context.

    Raises:
        Exception: Anything else raised while going through the breaker is
            propagated untouched.
    """
    # Import lazily; if the circuit breaker module cannot be imported the
    # tool is executed directly so behaviour degrades to "no protection".
    try:
        from ..tools.circuit_breaker import (
            CircuitBreakerConfig,
            CircuitBreakerException,
            get_circuit_breaker,
        )
    except ImportError:
        logging.debug("Circuit breaker not available, falling back to direct tool execution")
        return self._execute_tool_impl(function_name, arguments)

    class _ToolFailure(Exception):
        """Sentinel that carries a tool's error dict through the breaker."""

        def __init__(self, error_dict):
            self.error_dict = error_dict
            super().__init__(error_dict.get("error", "Tool execution failed"))

    # Error dicts carrying any of these flags are policy outcomes, not tool
    # faults, so they must not count towards opening the circuit.
    non_failure_flags = ("approval_denied", "permission_denied", "approval_error")

    def _tool_wrapper():
        result = self._execute_tool_impl(function_name, arguments)
        if (
            isinstance(result, dict)
            and result.get("error")
            and not any(result.get(flag) for flag in non_failure_flags)
        ):
            # Raise so the breaker registers the call as a failure.
            raise _ToolFailure(result)
        return result

    recovery_timeout = 60.0  # seconds to wait before trying half-open

    try:
        config = CircuitBreakerConfig(
            failure_threshold=5,  # open after 5 failures
            recovery_timeout=recovery_timeout,
            timeout=30.0,  # tool call timeout
            graceful_degradation=True,  # return error instead of raising
        )
        breaker = get_circuit_breaker(f"tool_{function_name}", config)
        return breaker.call(_tool_wrapper)
    except _ToolFailure as failure:
        # Caught by type, never by attribute sniffing: an unrelated exception
        # that happens to expose ``error_dict`` must not be swallowed here.
        return failure.error_dict
    except CircuitBreakerException as e:
        # Circuit is open - report it as an error dict instead of raising.
        logging.warning("Tool '%s' circuit breaker open: %s", function_name, e)
        return {
            "error": f"Tool '{function_name}' circuit breaker open - too many recent failures",
            "circuit_open": True,
            "agent_name": getattr(self, "name", None),
            "session_id": getattr(self, "_session_id", None),
            "remediation": (
                f"Wait for recovery_timeout ({recovery_timeout:.0f}s) "
                "or investigate recent tool failures."
            ),
        }
663+
597664
def _execute_tool_impl(self, function_name, arguments):
598665
"""Internal tool execution implementation."""
599666

src/praisonai-agents/praisonaiagents/llm/llm.py

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,17 @@
66
import inspect
77
import asyncio
88
from dataclasses import dataclass
9-
from typing import Any, Dict, List, Optional, Union, Literal, Callable, TYPE_CHECKING
9+
from typing import Any, Dict, List, Optional, Union, Literal, Callable, TYPE_CHECKING, Protocol
1010

1111
if TYPE_CHECKING:
1212
from rich.console import Console
1313
from rich.live import Live
14+
15+
class FailoverManagerProtocol(Protocol):
    """Structural interface the LLM expects from a failover manager.

    Any object exposing these three methods can be passed as
    ``failover_manager``; no inheritance is required.
    """

    def get_next_profile(self) -> Optional["AuthProfile"]:
        """Return the auth profile to use next, or ``None`` if there is none."""
        ...

    def mark_failure(
        self,
        profile: "AuthProfile",
        error: str,
        is_rate_limit: bool = False,
    ) -> None:
        """Record that a call made with ``profile`` failed.

        ``error`` is the failure text; ``is_rate_limit`` tells the manager
        whether the failure was classified as a rate limit.
        """
        ...

    def mark_success(self, profile: "AuthProfile") -> None:
        """Record that a call made with ``profile`` succeeded."""
        ...
1420
from pydantic import BaseModel
1521
import time
1622
import json
@@ -354,6 +360,7 @@ def __init__(
354360
web_fetch: Optional[Union[bool, Dict[str, Any]]] = None,
355361
prompt_caching: Optional[bool] = None,
356362
claude_memory: Optional[Union[bool, Any]] = None,
363+
failover_manager: Optional[FailoverManagerProtocol] = None,
357364
**extra_settings
358365
):
359366
# Configure logging only once at the class level
@@ -429,6 +436,14 @@ def __init__(
429436
self._rate_limiter = extra_settings.get('rate_limiter', None)
430437
self._max_retries = extra_settings.get('max_retries', 3)
431438
self._retry_delay = extra_settings.get('retry_delay', 60) # Default 60 seconds
439+
440+
# Failover management
441+
self._failover_manager = failover_manager
442+
self._current_profile = None # Track current auth profile for failover
443+
if self._failover_manager:
444+
self._current_profile = self._failover_manager.get_next_profile()
445+
if self._current_profile:
446+
self._switch_to_profile(self._current_profile)
432447

433448
# Cache for formatted tools and messages
434449
self._formatted_tools_cache = {}
@@ -685,8 +700,23 @@ def _classify_error_and_should_retry(self, error: Exception, attempt: int = 1) -
685700
delay = self._parse_retry_delay(str(error)) if is_rate_limit else 0.0
686701
return "rate_limit" if is_rate_limit else "unknown", is_rate_limit, delay
687702

703+
def _switch_to_profile(self, profile: "AuthProfile") -> None:
    """Adopt the credentials and model of ``profile`` for later calls.

    Only truthy fields on the profile overwrite the current settings, so a
    profile may override just the API key, just the base URL, just the
    model, or any combination of them.

    Args:
        profile: AuthProfile whose ``api_key``, ``base_url`` and ``model``
            are applied to this instance.
    """
    new_key = profile.api_key
    if new_key:
        self.api_key = new_key

    new_base_url = profile.base_url
    if new_base_url:
        self.base_url = new_base_url

    new_model = profile.model
    if not (new_model and new_model != self.model):
        # No model on the profile, or it matches the active one: nothing to
        # switch and nothing to log.
        return

    logging.info(f"Failover: switching from {self.model} to {new_model}")
    self.model = new_model
717+
688718
def _call_with_retry(self, func, *args, **kwargs):
689-
"""Call a function with automatic retry on rate limit errors.
719+
"""Call a function with automatic retry on rate limit errors and failover support.
690720
691721
Args:
692722
func: The function to call (e.g., litellm.completion)
@@ -707,16 +737,45 @@ def _call_with_retry(self, func, *args, **kwargs):
707737
if self._rate_limiter is not None:
708738
self._rate_limiter.acquire()
709739

710-
return func(*args, **kwargs)
740+
result = func(*args, **kwargs)
741+
742+
# Mark success if failover is configured
743+
if self._failover_manager and self._current_profile:
744+
self._failover_manager.mark_success(self._current_profile)
745+
746+
return result
711747

712748
except Exception as e:
713749
category, can_retry, retry_delay = self._classify_error_and_should_retry(e, attempt + 1)
714-
if not can_retry:
715-
raise
716-
750+
717751
last_error = e
718752
error_str = str(e)
719753

754+
# Failover: mark failure and try next profile (do this before early exit)
755+
if self._failover_manager and self._current_profile:
756+
is_rate_limit = (category == "rate_limit")
757+
self._failover_manager.mark_failure(
758+
self._current_profile, error_str, is_rate_limit=is_rate_limit
759+
)
760+
next_profile = self._failover_manager.get_next_profile()
761+
if next_profile and next_profile != self._current_profile:
762+
self._switch_to_profile(next_profile)
763+
self._current_profile = next_profile
764+
# Update the kwargs with new profile values for the next retry
765+
if "api_key" in kwargs:
766+
kwargs["api_key"] = self.api_key
767+
if "base_url" in kwargs:
768+
kwargs["base_url"] = self.base_url
769+
if "model" in kwargs:
770+
kwargs["model"] = self.model
771+
# Enable retry for profile switch even if originally non-retryable
772+
can_retry = True
773+
retry_delay = 0.0
774+
logging.info(f"Failover: switched to profile '{next_profile.name}'")
775+
776+
if not can_retry:
777+
raise
778+
720779
if attempt < self._max_retries:
721780
logging.warning(
722781
f"{category} error hit (attempt {attempt + 1}/{self._max_retries + 1}), "
@@ -746,7 +805,7 @@ def _call_with_retry(self, func, *args, **kwargs):
746805
raise last_error
747806

748807
async def _call_with_retry_async(self, func, *args, **kwargs):
749-
"""Async version of _call_with_retry.
808+
"""Async version of _call_with_retry with failover support.
750809
751810
Args:
752811
func: The async function to call
@@ -767,16 +826,45 @@ async def _call_with_retry_async(self, func, *args, **kwargs):
767826
if self._rate_limiter is not None:
768827
await self._rate_limiter.acquire_async()
769828

770-
return await func(*args, **kwargs)
829+
result = await func(*args, **kwargs)
830+
831+
# Mark success if failover is configured
832+
if self._failover_manager and self._current_profile:
833+
self._failover_manager.mark_success(self._current_profile)
834+
835+
return result
771836

772837
except Exception as e:
773838
category, can_retry, retry_delay = self._classify_error_and_should_retry(e, attempt + 1)
774-
if not can_retry:
775-
raise
776-
839+
777840
last_error = e
778841
error_str = str(e)
779842

843+
# Failover: mark failure and try next profile (do this before early exit)
844+
if self._failover_manager and self._current_profile:
845+
is_rate_limit = (category == "rate_limit")
846+
self._failover_manager.mark_failure(
847+
self._current_profile, error_str, is_rate_limit=is_rate_limit
848+
)
849+
next_profile = self._failover_manager.get_next_profile()
850+
if next_profile and next_profile != self._current_profile:
851+
self._switch_to_profile(next_profile)
852+
self._current_profile = next_profile
853+
# Update the kwargs with new profile values for the next retry
854+
if "api_key" in kwargs:
855+
kwargs["api_key"] = self.api_key
856+
if "base_url" in kwargs:
857+
kwargs["base_url"] = self.base_url
858+
if "model" in kwargs:
859+
kwargs["model"] = self.model
860+
# Enable retry for profile switch even if originally non-retryable
861+
can_retry = True
862+
retry_delay = 0.0
863+
logging.info(f"Failover: switched to profile '{next_profile.name}'")
864+
865+
if not can_retry:
866+
raise
867+
780868
if attempt < self._max_retries:
781869
logging.warning(
782870
f"{category} error hit (attempt {attempt + 1}/{self._max_retries + 1}), "

0 commit comments

Comments
 (0)