Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/praisonai-agents/praisonaiagents/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,11 @@ async def arun_task(self, task_id):
except Exception as e:
logger.error(f"Error executing memory callback for task {task_id}: {e}")
logger.exception(e)
# Respect task failure policies - re-raise if configured
if hasattr(task, 'fail_on_callback_error') and task.fail_on_callback_error:
raise
if hasattr(task, 'fail_on_memory_error') and task.fail_on_memory_error:
raise

# Run task callback if exists
if task.callback:
Expand Down
26 changes: 15 additions & 11 deletions src/praisonai-agents/praisonaiagents/llm/error_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import re
import random
from enum import Enum
from typing import Dict, Tuple, List, Optional

Expand Down Expand Up @@ -145,6 +146,9 @@ def should_retry(category: ErrorCategory) -> bool:
def get_retry_delay(category: ErrorCategory, attempt: int = 1, base_delay: float = 1.0) -> float:
"""Get the appropriate delay before retrying based on error category.

Uses full jitter to prevent thundering herd problems in multi-agent setups
where multiple agents hit rate limits simultaneously.

Args:
category: Error category
attempt: Current attempt number (1-based)
Expand All @@ -154,29 +158,29 @@ def get_retry_delay(category: ErrorCategory, attempt: int = 1, base_delay: float
Delay in seconds, or 0 if should not retry

Examples:
>>> get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1)
3.0
>>> get_retry_delay(ErrorCategory.TRANSIENT, attempt=3)
8.0
>>> get_retry_delay(ErrorCategory.AUTH, attempt=1)
0
>>> # With jitter, these will return random values in range:
>>> get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1) # 0.0 to 3.0
>>> get_retry_delay(ErrorCategory.TRANSIENT, attempt=3) # 0.0 to 8.0
>>> get_retry_delay(ErrorCategory.AUTH, attempt=1) # Always 0
"""
attempt = max(1, attempt)

if not should_retry(category):
return 0

if category == ErrorCategory.RATE_LIMIT:
Comment thread
greptile-apps[bot] marked this conversation as resolved.
# Longer delay for rate limits to avoid hitting limits again
return min(base_delay * (3 ** attempt), 60.0)
# Exponential backoff with equal jitter for rate limits (minimum floor to prevent instant retries)
max_delay = min(base_delay * (3 ** attempt), 60.0)
return base_delay + random.uniform(0, max_delay - base_delay)

elif category == ErrorCategory.CONTEXT_LIMIT:
# Short delay for context limits (compression should be tried)
# Short delay for context limits (no jitter needed - not a contention issue)
return base_delay * 0.5

elif category == ErrorCategory.TRANSIENT:
# Exponential backoff for transient errors
return min(base_delay * (2 ** attempt), 30.0)
# Exponential backoff with equal jitter for transient errors (minimum floor to prevent instant retries)
max_delay = min(base_delay * (2 ** attempt), 30.0)
return base_delay + random.uniform(0, max_delay - base_delay)

return 0

Expand Down
9 changes: 9 additions & 0 deletions src/praisonai-agents/praisonaiagents/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ def workflow(self):
stacklevel=3
)
current_iter = 0 # Track how many times we've looped
workflow_start = time.monotonic() # For timeout enforcement
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Timeout does not cover pre-loop CSV/file expansion.

workflow_start is captured at line 943, but the timeout is first checked inside the main while current_task: loop at line 1073. The block at lines 968-1060 (CSV/text file expansion for a loop start task) runs synchronously between those points and is not subject to the timeout. With a large input file, the workflow can spend significant time before the first timeout check.

Consider adding an early timeout check immediately before entering the main loop, or moving workflow_start = time.monotonic() to just before file expansion if you want full coverage. Same nit applies to aworkflow() (lines 430 vs 467). Low impact in practice.

Also applies to: 1072-1078

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/process/process.py` at line 943, The
timeout start time (workflow_start) is set too early relative to the CSV/text
file expansion, so that the expansion block (executed before the first timeout
check in the main while current_task: loop) isn't covered; fix by either moving
the workflow_start = time.monotonic() assignment to immediately before the
CSV/file expansion block or by adding an early timeout check using
workflow_start right before the expansion (the latter requires the same timeout
logic used inside the main loop), and apply the same change to aworkflow() where
its workflow_start is similarly out of place (ensure references to
workflow_start, the CSV/text file expansion block, and the main while
current_task: loop are updated consistently).

# Build workflow relationships first
for task in self.tasks.values():
if task.next_tasks:
Expand Down Expand Up @@ -1068,6 +1069,14 @@ def workflow(self):
logging.info(f"Max iteration limit {self.max_iter} reached, ending workflow.")
break

# Enforce workflow timeout if set
if self.workflow_timeout is not None:
elapsed = time.monotonic() - workflow_start
if elapsed > self.workflow_timeout:
logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded after {elapsed:.1f}s, ending workflow.")
self.workflow_cancelled = True
break

# ADDED: Check workflow finished flag at the start of each cycle
if self.workflow_finished:
logging.info("Workflow finished early as all tasks are completed.")
Expand Down
35 changes: 29 additions & 6 deletions src/praisonai-agents/praisonaiagents/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ def __init__(
caching: Optional[Any] = None,
# Output variable name for workflow variable assignment
output_variable: Optional[str] = None,
# Failure handling policy configuration
fail_on_callback_error: bool = False,
fail_on_memory_error: bool = False,
):
# Add check if memory config is provided
if memory is not None or (config and config.get('memory_config')):
Expand Down Expand Up @@ -222,6 +225,10 @@ def __init__(
self.agent_config = agent_config # Per-task agent configuration {role, goal, backstory, llm}
self.variables = variables if variables else {} # Variables for substitution in description
self.non_fatal_errors = [] # Accumulate non-fatal errors for visibility

# Failure handling policy configuration
self.fail_on_callback_error = fail_on_callback_error
self.fail_on_memory_error = fail_on_memory_error

# ============================================================
# ROBUSTNESS PARAMS (graceful degradation & retry control)
Expand Down Expand Up @@ -615,9 +622,13 @@ def _verify_memory_ready(self) -> bool:
# Also check for SQLite fallback
has_sqlite = hasattr(self.memory, '_sqlite_adapter') and self.memory._sqlite_adapter is not None

if not (has_adapter or has_sqlite):
logger.warning(f"Task {self.id}: Memory initialized but no adapter available — check memory configuration")

return has_adapter or has_sqlite
except Exception:
# If any error occurs during readiness check, consider memory not ready
except Exception as e:
# Surface configuration errors instead of hiding them
logger.error(f"Task {self.id}: Memory readiness check failed: {e}")
return False

def store_in_memory(self, content: str, agent_name: str = None, task_id: str = None):
Expand All @@ -635,8 +646,12 @@ def store_in_memory(self, content: str, agent_name: str = None, task_id: str = N
)
logger.info(f"Task {self.id}: Content stored in memory")
except Exception as e:
error_msg = f"store_in_memory: {e}"
self.non_fatal_errors.append(error_msg)
logger.error(f"Task {self.id}: Failed to store content in memory: {e}")
logger.exception(e)
if self.fail_on_memory_error:
raise

async def execute_callback(self, task_output: TaskOutput) -> None:
"""Execute callback and store quality metrics if enabled"""
Expand Down Expand Up @@ -665,6 +680,9 @@ async def execute_callback(self, task_output: TaskOutput) -> None:
except Exception as e:
logger.error(f"Task {self.id}: Failed to store task output in memory: {e}")
logger.exception(e)
# store_in_memory already appended to non_fatal_errors; respect policy
if self.fail_on_memory_error:
raise

logger.info(f"Task output: {task_output.raw[:100]}...")

Expand Down Expand Up @@ -751,10 +769,15 @@ async def execute_callback(self, task_output: TaskOutput) -> None:
logger.exception(e)
# Attach error to output for workflow orchestrator visibility
task_output.callback_error = str(e)
# TODO: Consider raising if callback is marked as critical
# if getattr(self, 'callback_critical', False):
# raise
if self.non_fatal_errors:
if self.fail_on_callback_error:
# Attach errors before re-raising
if self.non_fatal_errors:
task_output.non_fatal_errors = list(self.non_fatal_errors)
raise
Comment thread
greptile-apps[bot] marked this conversation as resolved.
# Attach non_fatal_errors to output if not already attached
# (TaskOutput.non_fatal_errors is a Pydantic field that defaults to None,
# so check the value, not field existence)
if self.non_fatal_errors and getattr(task_output, 'non_fatal_errors', None) is None:
task_output.non_fatal_errors = list(self.non_fatal_errors)

task_prompt = f"""
Expand Down
53 changes: 29 additions & 24 deletions src/praisonai-agents/tests/unit/llm/test_error_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,38 +113,43 @@ def test_unknown_errors(self):
class TestRetryLogic:

def test_retry_delays(self):
"""Test retry delay calculation for different categories."""
# Rate limit delays (exponential with factor of 3)
assert get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1) == 3.0
assert get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=2) == 9.0
assert get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=3) == 27.0

# Context limit delays (short, for immediate retry with compression)
"""Test retry delay ranges for different categories.

After issue #1553, RATE_LIMIT and TRANSIENT use equal-jitter to prevent
thundering-herd retry storms across multi-agent workflows. Delays are
bounded in [base_delay, exp_max] rather than deterministic.
"""
# Rate limit delays (exponential factor 3, jittered floor=base_delay=1.0)
assert 1.0 <= get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1) <= 3.0
assert 1.0 <= get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=2) <= 9.0
assert 1.0 <= get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=3) <= 27.0

# Context limit delays remain deterministic (no contention concern)
assert get_retry_delay(ErrorCategory.CONTEXT_LIMIT, attempt=1) == 0.5
assert get_retry_delay(ErrorCategory.CONTEXT_LIMIT, attempt=2) == 0.5
# Transient delays (exponential with factor of 2)
assert get_retry_delay(ErrorCategory.TRANSIENT, attempt=1) == 2.0
assert get_retry_delay(ErrorCategory.TRANSIENT, attempt=2) == 4.0
assert get_retry_delay(ErrorCategory.TRANSIENT, attempt=3) == 8.0

# Transient delays (exponential factor 2, jittered floor=base_delay=1.0)
assert 1.0 <= get_retry_delay(ErrorCategory.TRANSIENT, attempt=1) <= 2.0
assert 1.0 <= get_retry_delay(ErrorCategory.TRANSIENT, attempt=2) <= 4.0
assert 1.0 <= get_retry_delay(ErrorCategory.TRANSIENT, attempt=3) <= 8.0

# No retry for permanent errors
assert get_retry_delay(ErrorCategory.AUTH, attempt=1) == 0
assert get_retry_delay(ErrorCategory.INVALID_REQUEST, attempt=1) == 0
assert get_retry_delay(ErrorCategory.PERMANENT, attempt=1) == 0

def test_retry_delay_caps(self):
"""Test that retry delays have appropriate caps."""
# Rate limit cap at 60 seconds
assert get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=10) == 60.0
# Transient cap at 30 seconds
assert get_retry_delay(ErrorCategory.TRANSIENT, attempt=10) == 30.0
"""Test that jittered retry delays remain within their respective caps."""
# Rate limit cap at 60 seconds (jittered floor=base_delay=1.0)
assert 1.0 <= get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=10) <= 60.0

# Transient cap at 30 seconds (jittered floor=base_delay=1.0)
assert 1.0 <= get_retry_delay(ErrorCategory.TRANSIENT, attempt=10) <= 30.0

def test_base_delay_scaling(self):
"""Test custom base delay scaling."""
assert get_retry_delay(ErrorCategory.TRANSIENT, attempt=1, base_delay=2.0) == 4.0
assert get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1, base_delay=2.0) == 6.0
"""Test custom base delay scaling — base_delay sets the jitter floor."""
assert 2.0 <= get_retry_delay(ErrorCategory.TRANSIENT, attempt=1, base_delay=2.0) <= 4.0
assert 2.0 <= get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1, base_delay=2.0) <= 6.0


class TestRetryAfterExtraction:
Expand Down
73 changes: 73 additions & 0 deletions src/praisonai-agents/tests/unit/test_error_classifier_jitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Test for retry jitter in error classifier (Issue #1553 Gap 2)
"""
import pytest
from praisonaiagents.llm.error_classifier import ErrorCategory, get_retry_delay


def test_rate_limit_jitter():
    """Test that RATE_LIMIT errors use jitter with minimum floor"""
    # Sample the jittered delay repeatedly; each draw is independent.
    delays = [get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1) for _ in range(20)]

    # Every sample must land in [base_delay=1.0, max_delay=3.0].
    assert all(1.0 <= delay <= 3.0 for delay in delays), f"Some delays out of range: {delays}"

    # Jitter must produce spread across samples, not a constant value.
    unique_delays = len(set(delays))
    assert unique_delays >= 5, f"Not enough variation in delays (got {unique_delays} unique out of 20)"

    # The floor guarantees no instant (zero-delay) retries.
    assert all(delay >= 1.0 for delay in delays), f"Some delays below minimum: {min(delays)}"


def test_transient_jitter():
    """Test that TRANSIENT errors use jitter with minimum floor"""
    # Sample the jittered delay repeatedly; each draw is independent.
    delays = [get_retry_delay(ErrorCategory.TRANSIENT, attempt=1) for _ in range(20)]

    # Every sample must land in [base_delay=1.0, max_delay=2.0].
    assert all(1.0 <= delay <= 2.0 for delay in delays), f"Some delays out of range: {delays}"

    # Jitter must produce spread across samples, not a constant value.
    unique_delays = len(set(delays))
    assert unique_delays >= 5, f"Not enough variation in delays (got {unique_delays} unique out of 20)"

    # The floor guarantees no instant (zero-delay) retries.
    assert all(delay >= 1.0 for delay in delays), f"Some delays below minimum: {min(delays)}"


def test_context_limit_deterministic():
    """Test that CONTEXT_LIMIT delays remain deterministic (no jitter needed)"""
    # Two identical calls plus one at a higher attempt number.
    delay1, delay2, delay3 = (
        get_retry_delay(ErrorCategory.CONTEXT_LIMIT, attempt=a) for a in (1, 1, 2)
    )

    # No randomness: repeated calls agree exactly.
    assert delay1 == delay2, "Context limit delays should be deterministic"
    # Fixed short delay (base_delay * 0.5), independent of attempt count.
    assert delay1 == 0.5, f"Context limit delay should be 0.5, got {delay1}"
    assert delay3 == 0.5, f"Context limit delay should be 0.5 regardless of attempt, got {delay3}"


def test_exponential_backoff_with_jitter():
    """Test that exponential backoff still works with jitter.

    For RATE_LIMIT the exponential ceiling is min(base_delay * 3**attempt, 60.0)
    with a jitter floor of base_delay (1.0), so the per-attempt delay ranges are
    [1.0, 3.0], [1.0, 9.0], and [1.0, 27.0] for attempts 1-3.
    """
    delay_attempt1 = get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=1)  # range: [1.0, 3.0]
    delay_attempt2 = get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=2)  # range: [1.0, 9.0]
    delay_attempt3 = get_retry_delay(ErrorCategory.RATE_LIMIT, attempt=3)  # range: [1.0, 27.0]

    # Check both the jitter floor and the per-attempt exponential ceiling.
    # NOTE: attempt=3 is bounded by 27.0, not the 60.0 global cap — the cap
    # only engages from attempt 4 onward (3**4 = 81 > 60).
    assert 1.0 <= delay_attempt1 <= 3.0, f"Attempt 1 delay should be in [1.0, 3.0], got {delay_attempt1}"
    assert 1.0 <= delay_attempt2 <= 9.0, f"Attempt 2 delay should be in [1.0, 9.0], got {delay_attempt2}"
    assert 1.0 <= delay_attempt3 <= 27.0, f"Attempt 3 delay should be in [1.0, 27.0], got {delay_attempt3}"


def test_no_retry_categories():
    """Test that AUTH and other non-retryable categories return 0"""
    # Non-retryable categories return 0 regardless of the attempt number.
    for attempt in (1, 5):
        assert get_retry_delay(ErrorCategory.AUTH, attempt=attempt) == 0
Loading