From 56443bf3f8811dcec06272f42bcec2c33272f0a6 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Wed, 8 Apr 2026 07:35:57 +0000 Subject: [PATCH 1/2] feat: fix three architectural gaps identified in issue #1288 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. DRY violation in AgentTeam execute_task/aexecute_task - Extracted shared helpers: _prepare_task_prompt(), _execute_with_agent_sync/async() - Fixed missing stream=self.stream parameter in async path - Eliminated ~120 lines of duplicated logic 2. Structured exception hierarchy with ErrorContextProtocol - Created praisonaiagents/errors.py with protocol-driven error handling - Added error categories: tool, llm, budget, validation, network, handoff - Migrated existing ad-hoc errors to structured hierarchy - Updated lazy imports for backward compatibility 3. Streaming feature parity for YAML/CLI - Enhanced YAML schema to support streaming: configuration block - Added CLI flags: --stream and --stream-metrics - Extended agents_generator.py to wire streaming through OutputConfig - Achieves complete CLI + YAML + Python feature parity Fixes violations of stated pillars: DRY, protocol-driven core, 3-way feature parity. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../praisonaiagents/__init__.py | 18 +- .../praisonaiagents/agent/agent.py | 19 +- .../praisonaiagents/agent/chat_mixin.py | 8 +- .../praisonaiagents/agent/handoff.py | 48 ++- .../praisonaiagents/agents/agents.py | 196 ++++++------ .../praisonaiagents/errors.py | 290 ++++++++++++++++++ src/praisonai/praisonai/agents_generator.py | 31 ++ src/praisonai/praisonai/cli/main.py | 10 + 8 files changed, 475 insertions(+), 145 deletions(-) create mode 100644 src/praisonai-agents/praisonaiagents/errors.py diff --git a/src/praisonai-agents/praisonaiagents/__init__.py b/src/praisonai-agents/praisonaiagents/__init__.py index 2d678a16b..71a4eb803 100644 --- a/src/praisonai-agents/praisonaiagents/__init__.py +++ b/src/praisonai-agents/praisonaiagents/__init__.py @@ -176,10 +176,10 @@ def _get_lazy_cache(): 'HandoffResult': ('praisonaiagents.agent.handoff', 'HandoffResult'), 'HandoffInputData': ('praisonaiagents.agent.handoff', 'HandoffInputData'), 'ContextPolicy': ('praisonaiagents.agent.handoff', 'ContextPolicy'), - 'HandoffError': ('praisonaiagents.agent.handoff', 'HandoffError'), - 'HandoffCycleError': ('praisonaiagents.agent.handoff', 'HandoffCycleError'), - 'HandoffDepthError': ('praisonaiagents.agent.handoff', 'HandoffDepthError'), - 'HandoffTimeoutError': ('praisonaiagents.agent.handoff', 'HandoffTimeoutError'), + 'HandoffError': ('praisonaiagents.errors', 'HandoffError'), + 'HandoffCycleError': ('praisonaiagents.errors', 'HandoffCycleError'), + 'HandoffDepthError': ('praisonaiagents.errors', 'HandoffDepthError'), + 'HandoffTimeoutError': ('praisonaiagents.errors', 'HandoffTimeoutError'), # Embedding API (Note: embedding/embeddings handled in custom_handler to override subpackage) 'aembedding': ('praisonaiagents.embedding.embed', 'aembedding'), @@ -203,7 +203,15 @@ def _get_lazy_cache(): # Agent classes 'Agent': ('praisonaiagents.agent.agent', 'Agent'), - 'BudgetExceededError': ('praisonaiagents.agent.agent', 'BudgetExceededError'), + 'BudgetExceededError': ('praisonaiagents.errors', 'BudgetExceededError'), + + # Error hierarchy - structured exception handling + 'PraisonAIError': ('praisonaiagents.errors', 'PraisonAIError'), + 'ToolExecutionError': ('praisonaiagents.errors', 'ToolExecutionError'), + 'LLMError': ('praisonaiagents.errors', 'LLMError'), + 'ValidationError': ('praisonaiagents.errors', 'ValidationError'), + 'NetworkError': ('praisonaiagents.errors', 'NetworkError'), + 'ErrorContextProtocol': ('praisonaiagents.errors', 'ErrorContextProtocol'), 'Heartbeat': ('praisonaiagents.agent.heartbeat', 'Heartbeat'), 'HeartbeatConfig': ('praisonaiagents.agent.heartbeat', 'HeartbeatConfig'), 'ImageAgent': ('praisonaiagents.agent.image_agent', 'ImageAgent'), diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 762a435b5..8d6b4f1fb 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -187,23 +187,8 @@ def _is_file_path(value: str) -> bool: from ..rag.models import RAGResult, ContextPack from ..eval.results import EvaluationLoopResult -class BudgetExceededError(Exception): - """Raised when an agent exceeds its max_budget. - - Usage: - try: - agent.start("...") - except BudgetExceededError as e: - print(f"Agent '{e.agent_name}' spent ${e.total_cost:.4f} of ${e.max_budget:.4f}") - """ - def __init__(self, agent_name: str, total_cost: float, max_budget: float): - self.agent_name = agent_name - self.total_cost = total_cost - self.max_budget = max_budget - super().__init__( - f"Agent '{agent_name}' exceeded budget: " - f"${total_cost:.4f} >= ${max_budget:.4f}" - ) +# Import structured error from central errors module +from ..errors import BudgetExceededError class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin): # Class-level counter for generating unique display names for nameless agents diff --git a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py index facab40fc..6572a4c8e 100644 --- a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py +++ b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py @@ -666,7 +666,13 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=True, r self._llm_call_count += 1 if self._max_budget and self._total_cost >= self._max_budget: if self._on_budget_exceeded == "stop": - raise BudgetExceededError(self.name, self._total_cost, self._max_budget) + raise BudgetExceededError( + f"Agent '{self.name}' exceeded budget: ${self._total_cost:.4f} >= ${self._max_budget:.4f}", + budget_type="tokens", + limit=self._max_budget, + used=self._total_cost, + agent_id=self.name + ) elif self._on_budget_exceeded == "warn": logging.warning( f"[budget] {self.name}: ${self._total_cost:.4f} exceeded " diff --git a/src/praisonai-agents/praisonaiagents/agent/handoff.py b/src/praisonai-agents/praisonaiagents/agent/handoff.py index b9fe5f26e..897aa8255 100644 --- a/src/praisonai-agents/praisonaiagents/agent/handoff.py +++ b/src/praisonai-agents/praisonaiagents/agent/handoff.py @@ -97,29 +97,13 @@ def from_dict(cls, data: Dict[str, Any]) -> 'HandoffConfig': data["context_policy"] = ContextPolicy(data["context_policy"]) return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}) -class HandoffError(Exception): - """Base exception for handoff errors.""" - pass - -class HandoffCycleError(HandoffError): - """Raised when a cycle is detected in handoff chain.""" - def __init__(self, chain: List[str]): - self.chain = chain - super().__init__(f"Handoff cycle detected: {' -> '.join(chain)}") - -class HandoffDepthError(HandoffError): - """Raised when max handoff depth is exceeded.""" - def __init__(self, depth: int, max_depth: int): - self.depth = depth - self.max_depth = max_depth - super().__init__(f"Max handoff depth exceeded: {depth} > {max_depth}") - -class HandoffTimeoutError(HandoffError): - """Raised when handoff times out.""" - def __init__(self, timeout: float, agent_name: str): - self.timeout = timeout - self.agent_name = agent_name - super().__init__(f"Handoff to {agent_name} timed out after {timeout}s") +# Import structured error hierarchy from central errors module +from ..errors import ( + HandoffError, + HandoffCycleError, + HandoffDepthError, + HandoffTimeoutError +) # Thread-local storage for tracking handoff chains _handoff_context = threading.local() @@ -267,12 +251,26 @@ def _check_safety(self, source_agent: 'Agent') -> None: if self.config.detect_cycles: chain = _get_handoff_chain() if target_name in chain: - raise HandoffCycleError(chain + [target_name]) + cycle_path = chain + [target_name] + raise HandoffCycleError( + f"Handoff cycle detected: {' -> '.join(cycle_path)}", + source_agent=source_agent.name if hasattr(source_agent, 'name') else 'unknown', + target_agent=target_name, + agent_id=source_agent.name if hasattr(source_agent, 'name') else 'unknown', + cycle_path=cycle_path + ) # Check depth current_depth = _get_handoff_depth() if current_depth >= self.config.max_depth: - raise HandoffDepthError(current_depth + 1, self.config.max_depth) + raise HandoffDepthError( + f"Max handoff depth exceeded: {current_depth + 1} > {self.config.max_depth}", + source_agent=source_agent.name if hasattr(source_agent, 'name') else 'unknown', + target_agent=target_name, + agent_id=source_agent.name if hasattr(source_agent, 'name') else 'unknown', + max_depth=self.config.max_depth, + current_depth=current_depth + 1 + ) def _prepare_context(self, source_agent: 'Agent', kwargs: Dict[str, Any]) -> HandoffInputData: """ diff --git a/src/praisonai-agents/praisonaiagents/agents/agents.py b/src/praisonai-agents/praisonaiagents/agents/agents.py index 413ac30fa..4420b2bed 100644 --- a/src/praisonai-agents/praisonaiagents/agents/agents.py +++ b/src/praisonai-agents/praisonaiagents/agents/agents.py @@ -108,6 +108,93 @@ def get_multimodal_message(text_prompt: str, images: list) -> list: }) return content +def _prepare_task_prompt(task, task_description, context_text=None): + """ + Prepare task prompt with context and memory (DRY helper). + """ + # Build task prompt - only use "User Input/Topic" format if there's actual content + if context_text and context_text.strip(): + task_prompt = f""" +User Input/Topic: {context_text} + +Task: {task_description} +Expected Output: {task.expected_output} + +IMPORTANT: Your response must be about the user's input/topic above. Incorporate it into your task.""" + else: + task_prompt = f""" +You need to do the following task: {task_description}. +Expected Output: {task.expected_output}.""" + + # Add memory context if available + if task.memory: + try: + memory_context = task.memory.build_context_for_task(task.description) + if memory_context: + # Log detailed memory context for debugging + logger.debug(f"Memory context for task '{task.description}': {memory_context}") + # Include actual memory content without verbose headers (essential for AI agent functionality) + task_prompt += f"\n\n{memory_context}" + except Exception as e: + logger.error(f"Error getting memory context: {e}") + + task_prompt += "\nPlease provide only the final result of your work. Do not add any conversation or extra explanation." + return task_prompt + +def _execute_with_agent_sync(executor_agent, task_prompt, task, tools, stream=None): + """ + Execute task with agent synchronously (DRY helper). + """ + if task.images: + return executor_agent.chat( + get_multimodal_message(task_prompt, task.images), + tools=tools, + output_json=task.output_json, + output_pydantic=task.output_pydantic, + stream=stream, + task_name=task.name, + task_description=task.description, + task_id=task.id + ) + else: + return executor_agent.chat( + task_prompt, + tools=tools, + output_json=task.output_json, + output_pydantic=task.output_pydantic, + stream=stream, + task_name=task.name, + task_description=task.description, + task_id=task.id + ) + +async def _execute_with_agent_async(executor_agent, task_prompt, task, tools, stream=None): + """ + Execute task with agent asynchronously (DRY helper). + """ + if task.images: + return await executor_agent.achat( + get_multimodal_message(task_prompt, task.images), + tools=tools, + output_json=task.output_json, + output_pydantic=task.output_pydantic, + stream=stream, + task_name=task.name, + task_description=task.description, + task_id=task.id + ) + else: + return await executor_agent.achat( + task_prompt, + tools=tools, + output_json=task.output_json, + output_pydantic=task.output_pydantic, + stream=stream, + task_name=task.name, + task_description=task.description, + task_id=task.id + ) + def process_task_context(context_item, verbose=0, user_id=None): """ Process a single context item for task execution. @@ -703,59 +790,17 @@ async def aexecute_task(self, task_id): context_separator = '\n\n' context_text = context_separator.join(unique_contexts) - # Build task prompt - only use "User Input/Topic" format if there's actual content - if context_text and context_text.strip(): - task_prompt = f""" -User Input/Topic: {context_text} - -Task: {task_description} -Expected Output: {task.expected_output} - -IMPORTANT: Your response must be about the user's input/topic above. Incorporate it into your task.""" - else: - task_prompt = f""" -You need to do the following task: {task_description}. -Expected Output: {task.expected_output}.""" - - # Add memory context if available - if task.memory: - try: - memory_context = task.memory.build_context_for_task(task.description) - if memory_context: - # Log detailed memory context for debugging - logger.debug(f"Memory context for task '{task.description}': {memory_context}") - # Include actual memory content without verbose headers (essential for AI agent functionality) - task_prompt += f"\n\n{memory_context}" - except Exception as e: - logger.error(f"Error getting memory context: {e}") - - task_prompt += "\nPlease provide only the final result of your work. Do not add any conversation or extra explanation." + # Build task prompt using DRY helper + task_prompt = _prepare_task_prompt(task, task_description, context_text) if self.verbose >= 2: logger.info(f"Executing task {task_id}: {task_description} using {executor_agent.display_name}") logger.debug(f"Starting execution of task {task_id} with prompt:\n{task_prompt}") - if task.images: - # Use shared multimodal helper (DRY - defined at module level) - agent_output = await executor_agent.achat( - get_multimodal_message(task_prompt, task.images), - tools=tools, - output_json=task.output_json, - output_pydantic=task.output_pydantic, - task_name=task.name, - task_description=task.description, - task_id=task.id - ) - else: - agent_output = await executor_agent.achat( - task_prompt, - tools=tools, - output_json=task.output_json, - output_pydantic=task.output_pydantic, - task_name=task.name, - task_description=task.description, - task_id=task.id - ) + # Execute with agent using DRY helper (fixes missing stream parameter) + agent_output = await _execute_with_agent_async( + executor_agent, task_prompt, task, tools, stream=self.stream + ) if agent_output: # Store the response in memory @@ -1066,60 +1111,17 @@ def execute_task(self, task_id): context_separator = '\n\n' context_text = context_separator.join(unique_contexts) - # Build task prompt - only use "User Input/Topic" format if there's actual content - if context_text and context_text.strip(): - task_prompt = f""" -User Input/Topic: {context_text} - -Task: {task_description} -Expected Output: {task.expected_output} - -IMPORTANT: Your response must be about the user's input/topic above. Incorporate it into your task.""" - else: - task_prompt = f""" -You need to do the following task: {task_description}. -Expected Output: {task.expected_output}.""" - - # Add memory context if available - if task.memory: - try: - memory_context = task.memory.build_context_for_task(task.description) - if memory_context: - # Log detailed memory context for debugging - logger.debug(f"Memory context for task '{task.description}': {memory_context}") - # Include actual memory content without verbose headers (essential for AI agent functionality) - task_prompt += f"\n\n{memory_context}" - except Exception as e: - logger.error(f"Error getting memory context: {e}") - - task_prompt += "\nPlease provide only the final result of your work. Do not add any conversation or extra explanation." + # Build task prompt using DRY helper + task_prompt = _prepare_task_prompt(task, task_description, context_text) if self.verbose >= 2: logger.info(f"Executing task {task_id}: {task.description} using {executor_agent.display_name}") logger.debug(f"Starting execution of task {task_id} with prompt:\n{task_prompt}") - if task.images: - # Use shared multimodal helper (DRY - defined at module level) - agent_output = executor_agent.chat( - get_multimodal_message(task_prompt, task.images), - tools=tools, - output_json=task.output_json, - output_pydantic=task.output_pydantic, - task_name=task.name, - task_description=task.description, - task_id=task_id - ) - else: - agent_output = executor_agent.chat( - task_prompt, - tools=tools, - output_json=task.output_json, - output_pydantic=task.output_pydantic, - stream=self.stream, - task_name=task.name, - task_description=task.description, - task_id=task_id - ) + # Execute with agent using DRY helper + agent_output = _execute_with_agent_sync( + executor_agent, task_prompt, task, tools, stream=self.stream + ) if agent_output: # Store the response in memory diff --git a/src/praisonai-agents/praisonaiagents/errors.py b/src/praisonai-agents/praisonaiagents/errors.py new file mode 100644 index 000000000..5ce474a16 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/errors.py @@ -0,0 +1,290 @@ +""" +Structured exception hierarchy for PraisonAI SDK. + +Provides uniform error semantics for consistent handling across: +- Multi-agent orchestration +- Tool execution +- LLM interactions +- Memory operations +- External integrations +""" + +from typing import Literal, Protocol, runtime_checkable, Optional, Dict, Any +import uuid + + +@runtime_checkable +class ErrorContextProtocol(Protocol): + """Protocol for structured error context propagation.""" + + agent_id: str + run_id: str + is_retryable: bool + error_category: Literal["tool", "llm", "budget", "validation", "network", "handoff"] + + +class PraisonAIError(Exception): + """ + Base error class with structured context for PraisonAI SDK. + + All PraisonAI errors inherit from this to ensure uniform error handling, + context propagation, and observability hooks. + """ + + def __init__( + self, + message: str, + agent_id: str = "unknown", + run_id: Optional[str] = None, + error_category: Literal["tool", "llm", "budget", "validation", "network", "handoff"] = "validation", + is_retryable: bool = False, + context: Optional[Dict[str, Any]] = None + ): + super().__init__(message) + self.message = message + self.agent_id = agent_id + self.run_id = run_id or str(uuid.uuid4()) + self.error_category = error_category + self.is_retryable = is_retryable + self.context = context or {} + + def __str__(self) -> str: + return f"[{self.error_category}] {self.message} (agent: {self.agent_id}, run: {self.run_id})" + + +class ToolExecutionError(PraisonAIError): + """ + Tool execution failed. + + Includes tool name and execution context for better debugging + and selective retry policies. + """ + + def __init__( + self, + message: str, + tool_name: str = "unknown", + agent_id: str = "unknown", + run_id: Optional[str] = None, + is_retryable: bool = True, # Most tool errors are retryable + context: Optional[Dict[str, Any]] = None + ): + context = context or {} + context["tool_name"] = tool_name + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="tool", + is_retryable=is_retryable, + context=context + ) + self.tool_name = tool_name + + +class LLMError(PraisonAIError): + """ + LLM interaction failed. + + Distinguishes between rate limits (retryable) vs model errors (fatal). + """ + + def __init__( + self, + message: str, + model_name: str = "unknown", + agent_id: str = "unknown", + run_id: Optional[str] = None, + is_retryable: bool = False, # Default to non-retryable unless specified + context: Optional[Dict[str, Any]] = None + ): + context = context or {} + context["model_name"] = model_name + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="llm", + is_retryable=is_retryable, + context=context + ) + self.model_name = model_name + + +class BudgetExceededError(PraisonAIError): + """ + Budget limits exceeded (tokens, time, etc). + + Generally not retryable without intervention. + """ + + def __init__( + self, + message: str, + budget_type: str = "tokens", + limit: Optional[float] = None, + used: Optional[float] = None, + agent_id: str = "unknown", + run_id: Optional[str] = None, + context: Optional[Dict[str, Any]] = None + ): + context = context or {} + context.update({ + "budget_type": budget_type, + "limit": limit, + "used": used + }) + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="budget", + is_retryable=False, # Budget errors require intervention + context=context + ) + self.budget_type = budget_type + self.limit = limit + self.used = used + + +class ValidationError(PraisonAIError): + """ + Input validation failed. + + Usually indicates programming errors, not retryable. + """ + + def __init__( + self, + message: str, + field_name: Optional[str] = None, + agent_id: str = "unknown", + run_id: Optional[str] = None, + context: Optional[Dict[str, Any]] = None + ): + context = context or {} + if field_name: + context["field_name"] = field_name + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="validation", + is_retryable=False, # Validation errors need code fixes + context=context + ) + self.field_name = field_name + + +class NetworkError(PraisonAIError): + """ + Network/external service error. + + Often retryable with backoff. + """ + + def __init__( + self, + message: str, + service_name: str = "unknown", + status_code: Optional[int] = None, + agent_id: str = "unknown", + run_id: Optional[str] = None, + is_retryable: bool = True, # Most network errors are retryable + context: Optional[Dict[str, Any]] = None + ): + context = context or {} + context.update({ + "service_name": service_name, + "status_code": status_code + }) + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="network", + is_retryable=is_retryable, + context=context + ) + self.service_name = service_name + self.status_code = status_code + + +class HandoffError(PraisonAIError): + """ + Agent handoff/delegation failed. + + Includes source/target agent context for multi-agent debugging. + """ + + def __init__( + self, + message: str, + source_agent: str = "unknown", + target_agent: Optional[str] = None, + agent_id: str = "unknown", + run_id: Optional[str] = None, + is_retryable: bool = False, # Handoff errors usually need investigation + context: Optional[Dict[str, Any]] = None + ): + context = context or {} + context.update({ + "source_agent": source_agent, + "target_agent": target_agent + }) + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="handoff", + is_retryable=is_retryable, + context=context + ) + self.source_agent = source_agent + self.target_agent = target_agent + + +# Specialized handoff errors (maintain backward compatibility) +class HandoffCycleError(HandoffError): + """Circular handoff dependency detected.""" + + def __init__(self, message: str, cycle_path: Optional[list] = None, **kwargs): + super().__init__(message, **kwargs) + if cycle_path: + self.context["cycle_path"] = cycle_path + + +class HandoffDepthError(HandoffError): + """Maximum handoff depth exceeded.""" + + def __init__(self, message: str, max_depth: Optional[int] = None, current_depth: Optional[int] = None, **kwargs): + super().__init__(message, **kwargs) + self.context.update({ + "max_depth": max_depth, + "current_depth": current_depth + }) + + +class HandoffTimeoutError(HandoffError): + """Handoff operation timed out.""" + + def __init__(self, message: str, timeout_seconds: Optional[float] = None, **kwargs): + super().__init__(message, is_retryable=True, **kwargs) # Timeouts may be retryable + if timeout_seconds: + self.context["timeout_seconds"] = timeout_seconds + + +# Export all error types for easy importing +__all__ = [ + "ErrorContextProtocol", + "PraisonAIError", + "ToolExecutionError", + "LLMError", + "BudgetExceededError", + "ValidationError", + "NetworkError", + "HandoffError", + "HandoffCycleError", + "HandoffDepthError", + "HandoffTimeoutError" +] \ No newline at end of file diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 6ced9478b..49f9c91d7 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -1200,6 +1200,27 @@ def _run_praisonai(self, config, topic, tools_dict): autonomy_config = details.get('autonomy') guardrails_config = details.get('guardrails') + # Extract streaming configuration - YAML takes precedence over CLI + streaming_config = details.get('streaming') + stream_enabled = False + stream_metrics = False + + if streaming_config: + if isinstance(streaming_config, bool): + stream_enabled = streaming_config + elif isinstance(streaming_config, dict): + stream_enabled = streaming_config.get('enabled', False) + stream_metrics = streaming_config.get('emit_metrics', False) + # Future: can add callbacks, etc. from streaming_config + elif 'stream' in details: # Also support direct 'stream: true' format + stream_enabled = details.get('stream', False) + + # CLI streaming flags override if YAML doesn't specify + cli_config = getattr(self, 'cli_config', {}) or {} + if not streaming_config and not details.get('stream'): + stream_enabled = cli_config.get('stream', False) + stream_metrics = cli_config.get('stream_metrics', False) + # Reconstruct approval config from potentially scattered settings approval_val = details.get('approval') approve_all = details.get('approve_all_tools') @@ -1238,6 +1259,15 @@ def _run_praisonai(self, config, topic, tools_dict): # Last resort: disable approval for this agent approval_config = None + # Build output configuration with streaming support + output_config = None + if stream_enabled: + try: + from praisonaiagents.config import OutputConfig + output_config = OutputConfig(stream=True, metrics=stream_metrics) + except ImportError: + self.logger.warning("OutputConfig not available, streaming disabled") + agent = PraisonAgent( name=role_filled, role=role_filled, @@ -1253,6 +1283,7 @@ def _run_praisonai(self, config, topic, tools_dict): autonomy=autonomy_config, guardrails=guardrails_config, approval=approval_config, + output=output_config, ) if self.agent_callback: diff --git a/src/praisonai/praisonai/cli/main.py b/src/praisonai/praisonai/cli/main.py index 8a8ef8890..572be70b0 100644 --- a/src/praisonai/praisonai/cli/main.py +++ b/src/praisonai/praisonai/cli/main.py @@ -928,6 +928,8 @@ def parse_args(self): parser.add_argument("--web", "--web-search", action="store_true", help="Enable native web search (OpenAI, Gemini, Anthropic, xAI, Perplexity)") parser.add_argument("--web-fetch", action="store_true", help="Enable web fetch to retrieve URL content (Anthropic only)") parser.add_argument("--prompt-caching", action="store_true", help="Enable prompt caching to reduce costs (OpenAI, Anthropic, Bedrock, Deepseek)") + parser.add_argument("--stream", action="store_true", help="Enable real-time streaming for agent responses") + parser.add_argument("--stream-metrics", action="store_true", help="Enable streaming with token metrics display") # Planning Mode arguments parser.add_argument("--planning", action="store_true", help="Enable planning mode - create plan before execution") @@ -4017,6 +4019,14 @@ def _extract_cli_config_for_yaml(self): if approval_timeout is not None: cli_config['approval_timeout'] = approval_timeout + # Extract streaming configuration for YAML CLI parity + stream = getattr(self.args, 'stream', False) + stream_metrics = getattr(self.args, 'stream_metrics', False) + if stream or stream_metrics: + cli_config['stream'] = stream or stream_metrics + if stream_metrics: + cli_config['stream_metrics'] = True + return cli_config def handle_direct_prompt(self, prompt): From 55287d4cc35dab9ebfea6201c197e4b6cdeb27e3 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Wed, 8 Apr 2026 07:56:55 +0000 Subject: [PATCH 2/2] fix: address reviewer feedback - backward compatibility and bugs - Add missing BudgetExceededError import in chat_mixin.py - Fix budget_type from 'tokens' to 'cost' for USD values - Add backward compatibility constructor for BudgetExceededError - Add legacy attributes (agent_name, total_cost, max_budget) for compatibility - Fix HandoffTimeoutError constructor call in handoff.py - Add backward compatibility aliases for handoff errors - Fix streaming precedence logic to handle explicit false values - Add explicit BudgetExceededError re-raise to prevent generic exception swallowing Co-authored-by: Mervin Praison --- .../praisonaiagents/agent/chat_mixin.py | 5 +- .../praisonaiagents/agent/handoff.py | 8 +- .../praisonaiagents/errors.py | 91 +++++++++++++++---- src/praisonai/praisonai/agents_generator.py | 8 +- 4 files changed, 89 insertions(+), 23 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py index 6572a4c8e..df8ade3df 100644 --- a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py +++ b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py @@ -13,6 +13,7 @@ from praisonaiagents._logging import get_logger import asyncio import threading +from ..errors import BudgetExceededError # Fallback helpers to avoid circular imports def _get_console(): @@ -668,7 +669,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=True, r if self._on_budget_exceeded == "stop": raise BudgetExceededError( f"Agent '{self.name}' exceeded budget: ${self._total_cost:.4f} >= ${self._max_budget:.4f}", - budget_type="tokens", + budget_type="cost", limit=self._max_budget, used=self._total_cost, agent_id=self.name @@ -698,6 +699,8 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=True, r return final_response + except BudgetExceededError: + raise except Exception as e: error_str = str(e).lower() diff --git a/src/praisonai-agents/praisonaiagents/agent/handoff.py b/src/praisonai-agents/praisonaiagents/agent/handoff.py index 897aa8255..46bc0dab6 100644 --- a/src/praisonai-agents/praisonaiagents/agent/handoff.py +++ b/src/praisonai-agents/praisonaiagents/agent/handoff.py @@ -536,7 +536,13 @@ async def _execute(): handoff_depth=_get_handoff_depth(), ) self._execute_callback(self.config.on_error, source_agent, kwargs, result) - raise HandoffTimeoutError(self.config.timeout_seconds, self.agent.name) + raise HandoffTimeoutError( + f"Handoff to {self.agent.name} timed out after {self.config.timeout_seconds}s", + timeout_seconds=self.config.timeout_seconds, + source_agent=source_agent.name if hasattr(source_agent, "name") else "unknown", + target_agent=self.agent.name, + agent_id=source_agent.name if hasattr(source_agent, "name") else "unknown" + ) except Exception as e: result = HandoffResult( success=False, diff --git a/src/praisonai-agents/praisonaiagents/errors.py b/src/praisonai-agents/praisonaiagents/errors.py index 5ce474a16..c861ce148 100644 --- a/src/praisonai-agents/praisonaiagents/errors.py +++ b/src/praisonai-agents/praisonaiagents/errors.py @@ -120,7 +120,9 @@ class BudgetExceededError(PraisonAIError): def __init__( self, - message: str, + message_or_agent_name, + total_cost_or_budget_type = None, + max_budget_or_limit = None, budget_type: str = "tokens", limit: Optional[float] = None, used: Optional[float] = None, @@ -128,23 +130,69 @@ def __init__( run_id: Optional[str] = None, context: Optional[Dict[str, Any]] = None ): - context = context or {} - context.update({ - "budget_type": budget_type, - "limit": limit, - "used": used - }) - super().__init__( - message, - agent_id=agent_id, - run_id=run_id, - error_category="budget", - is_retryable=False, # Budget errors require intervention - context=context - ) - self.budget_type = budget_type - self.limit = limit - self.used = used + # Handle backward compatibility: old constructor BudgetExceededError(agent_name, total_cost, max_budget) + if (isinstance(message_or_agent_name, str) and + isinstance(total_cost_or_budget_type, (int, float)) and + isinstance(max_budget_or_limit, (int, float)) and + budget_type == "tokens"): # Default value indicates old constructor + + # Old constructor format + agent_name = message_or_agent_name + total_cost = float(total_cost_or_budget_type) + max_budget = float(max_budget_or_limit) + message = f"Agent '{agent_name}' exceeded budget: ${total_cost:.4f} >= ${max_budget:.4f}" + + context = context or {} + context.update({ + "budget_type": "cost", # Legacy errors are cost-based + "limit": max_budget, + "used": total_cost + }) + super().__init__( + message, + agent_id=agent_name, + run_id=run_id, + error_category="budget", + is_retryable=False, + context=context + ) + self.budget_type = "cost" + self.limit = max_budget + self.used = total_cost + self.agent_name = agent_name + self.total_cost = total_cost + self.max_budget = max_budget + else: + # New constructor format + message = str(message_or_agent_name) + if total_cost_or_budget_type is not None and isinstance(total_cost_or_budget_type, str): + budget_type = total_cost_or_budget_type + if max_budget_or_limit is not None: + if limit is None: + limit = max_budget_or_limit + + context = context or {} + context.update({ + "budget_type": budget_type, + "limit": limit, + "used": used + }) + super().__init__( + message, + agent_id=agent_id, + run_id=run_id, + error_category="budget", + is_retryable=False, + context=context + ) + self.budget_type = budget_type + self.limit = limit + self.used = used + + # Legacy attributes for backward compatibility + self.agent_name = agent_id + self.total_cost = used + self.max_budget = limit class ValidationError(PraisonAIError): @@ -252,6 +300,8 @@ def __init__(self, message: str, cycle_path: Optional[list] = None, **kwargs): super().__init__(message, **kwargs) if cycle_path: self.context["cycle_path"] = cycle_path + # Backward compatibility alias + self.chain = cycle_path class HandoffDepthError(HandoffError): @@ -263,6 +313,9 @@ def __init__(self, message: str, max_depth: Optional[int] = None, current_depth: "max_depth": max_depth, "current_depth": current_depth }) + # Backward compatibility aliases + self.max_depth = max_depth + self.depth = current_depth class HandoffTimeoutError(HandoffError): @@ -272,6 +325,8 @@ def __init__(self, message: str, timeout_seconds: Optional[float] = None, **kwar super().__init__(message, is_retryable=True, **kwargs) # Timeouts may be retryable if timeout_seconds: self.context["timeout_seconds"] = timeout_seconds + # Backward compatibility alias + self.timeout = timeout_seconds # Export all error types for easy importing diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 49f9c91d7..f00097197 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -1201,23 +1201,25 @@ def _run_praisonai(self, config, topic, tools_dict): guardrails_config = details.get('guardrails') # Extract streaming configuration - YAML takes precedence over CLI + has_streaming_config = 'streaming' in details + has_legacy_stream = 'stream' in details streaming_config = details.get('streaming') stream_enabled = False stream_metrics = False - if streaming_config: + if has_streaming_config: if isinstance(streaming_config, bool): stream_enabled = streaming_config elif isinstance(streaming_config, dict): stream_enabled = streaming_config.get('enabled', False) stream_metrics = streaming_config.get('emit_metrics', False) # Future: can add callbacks, etc. from streaming_config - elif 'stream' in details: # Also support direct 'stream: true' format + elif has_legacy_stream: # Also support direct 'stream: true' format stream_enabled = details.get('stream', False) # CLI streaming flags override if YAML doesn't specify cli_config = getattr(self, 'cli_config', {}) or {} - if not streaming_config and not details.get('stream'): + if not has_streaming_config and not has_legacy_stream: stream_enabled = cli_config.get('stream', False) stream_metrics = cli_config.get('stream_metrics', False)