Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ jobs:
# flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
cd src/praisonai && python -m pytest
cd src/praisonai && python tests/test_runner.py --pattern fast
144 changes: 1 addition & 143 deletions src/praisonai-agents/.cursorrules
Original file line number Diff line number Diff line change
Expand Up @@ -5,146 +5,4 @@
5. Make it minimal change as possible
6. Firstly try not to make any modification to the existing code as possible
7. Only modify the existing code if its highly required, without that if it cant be done, then add new code section.
8. If you are adding new code, make sure to add it in a way that it can be easily integrated with the existing codebase.


Below is a **detailed technical overview** of the issues that have been coming up in the workflow execution (specifically around loops, resetting tasks, and continuing on to subsequent tasks). It includes:

1. **How the workflow currently flows**
2. **What logic marks a task as “completed”**
3. **What issues arose in `loop` tasks**
4. **Why the workflow can end up “stuck” or “looping indefinitely”**

---

## 1. Overall Workflow Flow

### a. Building relationships among tasks

- At startup, the code iterates through all `tasks`.
- For each `task`:
- It looks at `task.next_tasks`, and for each `next_task_name`:
- Finds the corresponding `Task` object
- Appends the current task’s name to the found `next_task`'s `previous_tasks` list.
- This means if Task A has `next_tasks=["B"]`, then Task B’s `previous_tasks` will include `"A"`.

### b. Finding and starting with a “start task”

- The workflow code tries to locate a task with `is_start=True`.
- If no such task is found, it uses the first item in the tasks dictionary instead.
- That “start task” is what the workflow tries to run first.

### c. Execution loop in the method (e.g., `workflow()` or `aworkflow()`)

- There is a `while current_task:` loop that processes tasks in sequence, or conditionally, based on `task.condition`.
- Each time a task runs (if non-loop), it yields the `task_id` or triggers an agent to run. Once the agent finishes (with or without a final result), the workflow picks up again to see what to do next:
- If the `task` is a `loop` type, the code tries to create or manage sub-tasks for each row/line in an “input_file.”
- If the `task` is a normal (“decision” or “task” or “some-other-type”), it just executes once, sets `status="completed"`, and the code moves on.

### d. Condition-based branching

- If a task has a result that includes a “decision” (like `{"decision":"more"}` or `"done"`), the code checks `task.condition`. For example:
```python
condition = {
"more": "generate_task",
"done": "evaluate_total_questions"
}
```
- If the result’s decision is `"done"`, it jumps to the task named `"evaluate_total_questions"`.
- If the result’s decision is `"more"`, it jumps right back to `"generate_task"`.
- If no condition matches, it can fallback to the first item in `task.next_tasks`.

### e. Marking tasks as “completed”

- After a task’s execution (like a typical “non-loop” task), the code sets `task.status = "completed"`.
- Then the code has a snippet that says:
```python
if self.tasks[task_id].status == "completed":
# Possibly reset to "not started" so we can re-run if needed
```
- By default, the system tries to “reset” tasks to `"not started"`, **unless** it is a loop task or a subtask of a loop.

---

## 2. How a “completed” task is marked

Generally, tasks are marked `status="completed"` in two primary ways:

1. **Non-Loop Execution**
A normal task (like a “decision” or “task”) is executed once the code calls the agent, the agent returns a final result, and the system sets `task.status = "completed"`.
2. **Loop Execution**
A loop-type task is *programmatically* set to `status="completed"` when all of its sub-tasks have finished. That is:
- The code checks: “Have all child tasks of this loop finished?”
- If `True`, the loop task is set to `completed`.

---

## 3. Issues Specifically in `loop` Tasks

### a. Re-Entering the Loop

- Before, the same snippet that “resets completed tasks to ‘not started’ so they can re-run if needed” **also** tried to reset loop tasks or their subtasks.
- If a loop task got reset to `"not started"`, the code would eventually pick it back up again, leading to repeated creation of sub-tasks (or repeated attempts to re-run them).
- This caused an **infinite loop** or repeating the same steps in the workflow, never truly exiting the loop stage.

### b. Subtasks Not Marked or Overwritten

- Another tricky scenario: If sub-tasks themselves got reset, the parent loop would see them as “not started” again, and might wait for them to “complete,” or might re-run them. That can lead to indefinite re-running of sub-tasks.

### c. Not Proceeding to Next (e.g., “upload_to_huggingface”)

- If the loop kept “restarting,” the workflow never ended up hitting the next tasks. For example, if your workflow is:
1. `generate_task`
2. `evaluate_total_questions`
3. `generate_cot` (loop)
4. `upload_to_huggingface`
- The system might get stuck in step #3 indefinitely (the sub-tasks keep getting reset, so it never actually transitions to step #4).

### d. Condition Logic vs. Next Tasks

- Another subtlety: If the loop tasks had a `condition` that pointed them back to a prior step, it might cause unintentional re-entry. Typically, you only want loop tasks to proceed once, **unless** you explicitly want to re-visit. But if it’s a data ingestion process, you usually want to do it once, then move on to the next step.

---

## 4. Why the System Can End Up Stuck or “Looping Indefinitely”

1. **Reset Mechanism**
- The code tries to “reset tasks to ‘not started’ once they complete,” so they can be re-run in some dynamic multi-run scenario.
- But that same logic can cause loop tasks to revert back to “not started” the moment they end. The system sees “Oh, a task is ‘not started’? Let’s run it!” and you’re in a cycle.

2. **No Condition for Exit**
- If the loop has a condition that leads back to a prior step (like `"more" -> generate_cot`), it can keep re-running.

3. **Subtasks Not Marked**
- If the subtask or the loop tries to “reactivate” each other, it never exits.

---

## Summary of the “Core Problem”

1. We want to keep the resetting mechanism for **non-loop tasks** – because in some advanced workflows, we like re-running them from a different path or after some condition.
2. But we want **loop tasks** to remain `"completed"` once all sub-tasks are done, so the code can seamlessly proceed to the next major step.
3. Before the fix, loop tasks or their sub-tasks got reset. This triggered the system to re-enter the loop, re-run the sub-tasks, etc., causing an infinite loop and preventing the workflow from reaching tasks like “upload_to_huggingface.”

---

## Technical Highlights to Pass On

- **In the reset snippet**:
```python
if self.tasks[task_id].status == "completed":
# never reset if loop or subtask-of-loop
# else reset to "not started"
```
This is crucial to skipping re-runs on loop tasks.
- **Ensure** that once a loop’s sub-tasks are all “completed,” the loop’s status is set to “completed,” and it transitions to the next major tasks (like `upload_to_huggingface`).
- **Check** if the loop’s condition is correct. If you want a single pass, do not implement a condition that leads back to the same loop.
- Also, you can check you do not have “overlapping conditions” that cause re-entry.

---

### Conclusion

**Hence,** the main challenge is that the reset logic (meant to let normal tasks be re-run) conflicts with a loop task’s one-pass usage. Once you avoid resetting the loop tasks or sub-tasks, you can finish them once, mark them “completed,” and properly proceed to the next stage.

Dont remove any logging or debug statements, as it will help you to understand the flow of the code.
8. If you are adding new code, make sure to add it in a way that it can be easily integrated with the existing codebase.
237 changes: 237 additions & 0 deletions src/praisonai-agents/CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

PraisonAI Agents is a hierarchical AI agent framework for completing complex tasks with self-reflection capabilities. It supports multi-agent collaboration, tool integration, and various execution patterns (sequential, hierarchical, parallel).

## Development Commands

### Installation and Setup
```bash
# Install core package
pip install -e .

# Install with specific features
pip install -e .[all] # All features
pip install -e .[memory] # Memory capabilities
pip install -e .[knowledge] # Document processing
pip install -e .[mcp] # MCP server support
pip install -e .[llm] # Extended LLM support
pip install -e .[api] # API server capabilities
```

### Testing
```bash
# Run individual test examples (no formal test runner configured)
python tests/basic-agents.py
python tests/async_example.py
python tests/knowledge-agents.py

# Test specific features
python tests/mcp-agents.py # MCP integration
python tests/memory_example.py # Memory functionality
python tests/tools_example.py # Tool system
```

### Running Examples
```bash
# Basic agent usage
python tests/single-agent.py

# Multi-agent workflows
python tests/multi-agents-api.py

# Async operations
python tests/async_example_full.py

# MCP server examples
python tests/mcp-sse-direct-server.py # Start MCP server
python tests/mcp-sse-direct-client.py # Connect to server
```

## Core Architecture

### Agent System (`praisonaiagents/agent/`)
- **Agent**: Core agent class with LLM integration, tool calling, and self-reflection
- **ImageAgent**: Specialized multimodal agent for image processing
- Self-reflection with configurable min/max iterations (default: 1-3)
- Delegation support for hierarchical agent structures

### Multi-Agent Orchestration (`praisonaiagents/agents/`)
- **PraisonAIAgents**: Main orchestrator for managing multiple agents and tasks
- **AutoAgents**: Automatic agent creation and management
- Process types: `sequential`, `hierarchical`, `parallel`
- Context passing between agents and task dependency management

### Task System (`praisonaiagents/task/`)
- **Task**: Core task definition with context, callbacks, and output specifications
- Supports file output, JSON/Pydantic structured output, async execution
- Conditional logic with `condition` parameter for task flow control
- Context passing via `context` parameter for task dependencies
- **Guardrails**: Built-in validation and safety mechanisms for task outputs
- Function-based guardrails for custom validation logic
- LLM-based guardrails using natural language descriptions
- Automatic retry with configurable `max_retries` parameter
- Compatible with CrewAI guardrail patterns

### LLM Integration (`praisonaiagents/llm/`)
- Unified wrapper for multiple LLM providers via LiteLLM
- Supports OpenAI, Anthropic, Gemini, DeepSeek, local models (Ollama)
- Context length management and tool calling capabilities
- Set via `llm` parameter on agents or global `OPENAI_API_KEY`/`ANTHROPIC_API_KEY`

### Tool System (`praisonaiagents/tools/`)
Two implementation patterns:
1. **Function-based**: Simple tools using `@tool` decorator
2. **Class-based**: Complex tools inheriting from `BaseTool`

Built-in tools include: DuckDuckGo search, file operations, calculator, Wikipedia, arXiv, data analysis tools, shell execution.

### Memory & Knowledge Systems
- **Memory** (`praisonaiagents/memory/`): Multi-layered memory with RAG support
- Types: short-term, long-term, entity, user memory
- Providers: ChromaDB, Mem0, custom implementations
- **Knowledge** (`praisonaiagents/knowledge/`): Document processing with chunking
- Chunking strategies via `chonkie` library
- Embedding and retrieval capabilities

### MCP (Model Context Protocol) Integration
- **MCP Server**: Server-side tool protocol for distributed execution
- **SSE Support**: Server-sent events for real-time communication
- Tool discovery and dynamic registration

## Development Patterns

### Agent Creation
```python
agent = Agent(
name="Agent Name",
role="Agent Role",
goal="Agent Goal",
backstory="Agent Background",
llm="gpt-4o-mini", # or other LLM
self_reflect=True, # Enable self-reflection
min_reflect=1, # Minimum reflection iterations
max_reflect=3, # Maximum reflection iterations
tools=[tool1, tool2], # Optional tools
guardrail=validate_function, # Agent-level guardrail (function or string)
max_guardrail_retries=3 # Retry limit for guardrail failures
)
```

### Task Definition
```python
task = Task(
name="task_name",
description="Task description",
expected_output="Expected output format",
agent=agent,
context=[previous_task], # Task dependencies
output_pydantic=ResponseModel, # Structured output
condition="condition_function" # Conditional execution
)
```

### Guardrails Usage

#### Task-Level Guardrails
```python
from typing import Tuple, Any

# Function-based guardrail
def validate_output(task_output: TaskOutput) -> Tuple[bool, Any]:
"""Custom validation function."""
if "error" in task_output.raw.lower():
return False, "Output contains errors"
if len(task_output.raw) < 10:
return False, "Output is too short"
return True, task_output

task = Task(
description="Write a professional email",
expected_output="A well-formatted email",
agent=agent,
guardrail=validate_output, # Function-based guardrail
max_retries=3 # Retry up to 3 times if guardrail fails
)

# LLM-based guardrail
task = Task(
description="Generate marketing copy",
expected_output="Professional marketing content",
agent=agent,
guardrail="Ensure the content is professional, engaging, and free of errors", # String description
max_retries=2
)
```

#### Agent-Level Guardrails
```python
# Agent guardrails apply to ALL outputs from that agent
def validate_professional_tone(task_output: TaskOutput) -> Tuple[bool, Any]:
"""Ensure professional tone in all agent responses."""
content = task_output.raw.lower()
casual_words = ['yo', 'dude', 'awesome', 'cool']
for word in casual_words:
if word in content:
return False, f"Unprofessional language detected: {word}"
return True, task_output

# Agent with function-based guardrail
agent = Agent(
name="BusinessWriter",
instructions="You are a professional business writer",
guardrail=validate_professional_tone, # Function guardrail
max_guardrail_retries=3
)

# Agent with LLM-based guardrail
agent = Agent(
name="ContentWriter",
instructions="You are a content writer",
guardrail="Ensure all responses are professional, accurate, and appropriate for business use", # String guardrail
max_guardrail_retries=2
)
```

### Multi-Agent Workflow
```python
workflow = PraisonAIAgents(
agents=[agent1, agent2],
tasks=[task1, task2],
process="sequential", # or "hierarchical", "parallel"
verbose=True,
manager_agent=manager_agent # For hierarchical process
)
result = workflow.start()
```

### Async Support
All major components support async execution:
```python
result = await workflow.astart()
result = await agent.aexecute(task)
```

## Key Dependencies

- **Core**: `pydantic`, `rich`, `openai`, `mcp`
- **Memory**: `chromadb`, `mem0ai`
- **Knowledge**: `markitdown`, `chonkie`
- **LLM**: `litellm` for unified provider access
- **API**: `fastapi`, `uvicorn` for server capabilities

## Error Handling

- Global error logging via `error_logs` list
- Callback system for real-time error reporting
- Context length exception handling with automatic retry
- Graceful degradation for optional dependencies

## Testing Strategy

The project uses example-driven testing with 100+ test files in `tests/` directory. Each test file demonstrates specific usage patterns and serves as both test and documentation. Run individual examples to test functionality rather than using a formal test runner.

Use conda activate praisonai-agents to activate the environment.
Loading
Loading