Comprehensive agent framework for the Document-Analyzer-Operator Platform supporting dynamic agent creation, registration, and execution.
The Agent Framework Core provides:
- Base Agent System: Abstract base classes with lifecycle management, state management, and task execution
- Agent Registry: Type registration, instance management, and capability-based discovery
- Agent Communication Protocol: Message format, routing, and event publishing/subscribing
- Agent Types: Pre-built agent categories (Cognitive, Content, Engineering, Programming, Operational, Validation)
- Dynamic Agent Creation: Factory pattern with templates and configuration validation
- Agent Orchestration: Task assignment, load balancing, and scaling
┌─────────────────────────────────────────────────────────────────┐
│                      Agent Framework Core                       │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │   Base Agent    │  │    Registry     │  │  Orchestration  │  │
│  │                 │  │                 │  │                 │  │
│  │  - Lifecycle    │  │  - Type Mgmt    │  │  - Load Balance │  │
│  │  - State Mgmt   │  │  - Discovery    │  │  - Task Assign  │  │
│  │  - Messaging    │  │  - Health       │  │  - Scaling      │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
├─────────────────────────────────────────────────────────────────┤
│                           Agent Types                           │
├────────────┬────────────┬────────────┬────────────┬─────────────┤
│ Cognitive  │ Content    │ Engineering│ Programming│ Operational │
│            │            │            │            │             │
│ - Research │ - Architect│ - Analyst  │ - Generator│ - Workflow  │
│ - Doc Intel│ - Writer   │ - Selector │ - Reviewer │ - Files     │
│ - Synthesis│ - Editor   │ - Debater  │ - Debugger │ - Automation│
└────────────┴────────────┴────────────┴────────────┴─────────────┘
from typing import Any, Dict

from app.agents.core.base import BaseAgent


class MyCustomAgent(BaseAgent):
    """Custom agent implementation."""

    async def _execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a task."""
        # Your task execution logic here
        return {"result": "success"}

    def get_capabilities(self) -> Dict[str, Any]:
        """Get agent capabilities."""
        return {
            "category": "custom",
            "skills": ["skill1", "skill2"],
        }

# Usage (these calls are awaited, so run them inside an async function)
agent = MyCustomAgent("agent-1", "MyAgent", "custom")
await agent.initialize()
await agent.start()
result = await agent.execute({"type": "my_task", "data": "..."})

from app.agents.registry.agent_registry import AgentRegistry
registry = AgentRegistry()

# Register agent type
registry.register_type(
    type_name="my_agent",
    agent_class=MyCustomAgent,
    description="My custom agent",
    capabilities={"skill1": "Description"},
)

# Create instance
agent = registry.create_instance(
    type_name="my_agent",
    agent_id="my-agent-1",
    name="My Agent",
    config={"key": "value"},
)

# Register instance
registry.register_instance(agent)

from app.agents.orchestration.orchestrator import AgentOrchestrator
from app.agents.orchestration.task_assigner import Task

orchestrator = AgentOrchestrator()
await orchestrator.initialize()

# Register agent type
orchestrator.register_agent_type("my_agent", MyCustomAgent)

# Create and start agent
agent = orchestrator.create_agent("my_agent", name="MyAgent")
await orchestrator.start_agent(agent.agent_id)

# Submit task
task = Task(
    type="my_task",
    payload={"data": "..."},
    timeout_seconds=60.0,
)
await orchestrator.submit_task(task)

# Shutdown
await orchestrator.shutdown()

CREATED → INITIALIZED → STARTED → RUNNING → STOPPED → TERMINATED
                                     ↓
                                   PAUSED
                                     ↓
                                  RESUMED
| From State | To State | Method |
|---|---|---|
| CREATED | INITIALIZING | initialize() |
| INITIALIZING | IDLE | (auto on success) |
| IDLE | RUNNING | start() or execute() |
| RUNNING | PAUSED | pause() |
| PAUSED | RUNNING | resume() |
| RUNNING | STOPPED | stop() |
| STOPPED | TERMINATED | terminate() |
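
The transition table above can be encoded as a lookup keyed by the current state and the requested action. The following is a minimal sketch rather than the contents of `states.py`: the state names mirror the table, while `AgentState`, `TRANSITIONS`, `next_state`, `InvalidTransition`, and the `"initialized"` action label for the automatic step are hypothetical names introduced for illustration.

```python
from enum import Enum


class AgentState(Enum):
    """States taken from the transition table (hypothetical enum name)."""
    CREATED = "created"
    INITIALIZING = "initializing"
    IDLE = "idle"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPED = "stopped"
    TERMINATED = "terminated"


class InvalidTransition(Exception):
    """Raised when an action is not allowed in the current state."""


# (current state, action) -> next state, one entry per table row.
TRANSITIONS = {
    (AgentState.CREATED, "initialize"): AgentState.INITIALIZING,
    (AgentState.INITIALIZING, "initialized"): AgentState.IDLE,  # auto on success
    (AgentState.IDLE, "start"): AgentState.RUNNING,
    (AgentState.IDLE, "execute"): AgentState.RUNNING,
    (AgentState.RUNNING, "pause"): AgentState.PAUSED,
    (AgentState.PAUSED, "resume"): AgentState.RUNNING,
    (AgentState.RUNNING, "stop"): AgentState.STOPPED,
    (AgentState.STOPPED, "terminate"): AgentState.TERMINATED,
}


def next_state(current: AgentState, action: str) -> AgentState:
    """Return the state reached by `action`, or raise InvalidTransition."""
    try:
        return TRANSITIONS[(current, action)]
    except KeyError:
        raise InvalidTransition(
            f"{action!r} is not allowed in state {current.name}"
        ) from None
```

Keeping the rules in one dictionary means an illegal call such as pausing a freshly created agent fails with a clear error instead of silently changing state.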
- RequestMessage: Request expecting a response
- ResponseMessage: Response to a request
- EventMessage: One-way event notification
- CommandMessage: Command to execute
- ResultMessage: Result of command execution
- ErrorMessage: Error notification
- CRITICAL (3): Processed immediately
- HIGH (2): High priority
- NORMAL (1): Default priority
- LOW (0): Low priority
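
One way to honor these priority levels is a heap ordered by priority and then by arrival order. This is a minimal sketch that assumes only the numeric values listed above; `MessagePriority`, `PriorityMailbox`, and its methods are hypothetical names, not the API of `messages.py`, and "processed immediately" is approximated here as "dequeued first".

```python
import heapq
import itertools
from enum import IntEnum
from typing import Any, List, Tuple


class MessagePriority(IntEnum):
    """Numeric levels as listed above (hypothetical enum name)."""
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


class PriorityMailbox:
    """Hypothetical in-memory queue: highest priority first, FIFO within a level."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._counter = itertools.count()  # tie-breaker that preserves arrival order

    def put(self, payload: Any, priority: MessagePriority = MessagePriority.NORMAL) -> None:
        # heapq is a min-heap, so negate the priority to pop CRITICAL first.
        heapq.heappush(self._heap, (-int(priority), next(self._counter), payload))

    def get(self) -> Any:
        """Remove and return the next payload; raises IndexError when empty."""
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```

The counter guarantees that two messages with the same priority never fall back to comparing payloads, which may not be comparable.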
# Send request
response = await agent.send_request(
    receiver_id="target-agent",
    payload={"query": "data"},
    timeout_seconds=30.0,
)

# Send event
await agent.send_event(
    event_name="task_completed",
    payload={"task_id": "123"},
    broadcast=True,
)

# Send command
result = await agent.send_command(
    receiver_id="worker-agent",
    command="process",
    args=["arg1"],
    kwargs={"key": "value"},
)

For analysis and reasoning tasks:
- BaseCognitiveAgent: Base for cognitive agents
- ResearchAgent: Web research and information gathering
- DocumentIntelligenceAgent: Document parsing and analysis
- KnowledgeSynthesisAgent: Knowledge integration
For content creation tasks:
- BaseContentAgent: Base for content agents
- ContentArchitectAgent: Content structure planning
- WritingAgent: Content generation
- EditingAgent: Content refinement
For technical decision making:
- BaseEngineeringAgent: Base for engineering agents
- ArchitectureAnalystAgent: System analysis
- TechnologySelectorAgent: Tech stack selection
- DebateModeratorAgent: Technical debate coordination
For code-related tasks:
- BaseProgrammingAgent: Base for programming agents
- CodeGeneratorAgent: Code creation
- CodeReviewerAgent: Code analysis
- DebuggerAgent: Bug fixing
For task execution:
- BaseOperationalAgent: Base for operational agents
- WorkflowExecutorAgent: Workflow execution
- FileOperationsAgent: File management
- AutomationAgent: Task automation
For quality assurance:
- BaseValidationAgent: Base for validation agents
- OutputValidatorAgent: Output validation
- ConsistencyCheckerAgent: Consistency verification
- FactVerifierAgent: Fact checking
- ROUND_ROBIN: Distribute evenly across agents
- LEAST_BUSY: Send to agent with fewest tasks
- FASTEST: Send to agent with best performance
- CAPABILITY_BASED: Match task requirements to agent skills
- WEIGHTED: Distribute based on agent weights
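
Three of these strategies can be sketched as small selection functions. This is an illustration under stated assumptions, not the code in `load_balancer.py`: `AgentInfo` and the function names are hypothetical, and breaking capability ties by picking the least busy eligible agent is a choice made for this example. FASTEST and WEIGHTED are omitted because the performance metric and weight source are not described here.

```python
import itertools
from dataclasses import dataclass, field
from typing import Iterator, List, Optional, Set


@dataclass
class AgentInfo:
    """Hypothetical snapshot of an agent as seen by a load balancer."""
    agent_id: str
    active_tasks: int = 0
    skills: Set[str] = field(default_factory=set)


def round_robin(agents: List[AgentInfo]) -> Iterator[AgentInfo]:
    """ROUND_ROBIN: cycle through the agents in a fixed order."""
    return itertools.cycle(agents)


def least_busy(agents: List[AgentInfo]) -> AgentInfo:
    """LEAST_BUSY: pick the agent with the fewest active tasks."""
    return min(agents, key=lambda a: a.active_tasks)


def capability_based(agents: List[AgentInfo], required: Set[str]) -> Optional[AgentInfo]:
    """CAPABILITY_BASED: pick the least busy agent that has every required skill."""
    eligible = [a for a in agents if required <= a.skills]
    return least_busy(eligible) if eligible else None
```

Returning `None` when no agent qualifies leaves the caller free to queue the task or report an error.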
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/agent-management/ | List agents |
| GET | /api/v1/agent-management/{id} | Get agent |
| GET | /api/v1/agent-management/{id}/info | Get agent info |
| POST | /api/v1/agent-management/ | Create agent |
| PATCH | /api/v1/agent-management/{id} | Update agent |
| DELETE | /api/v1/agent-management/{id} | Delete agent |
| POST | /api/v1/agent-management/{id}/start | Start agent |
| POST | /api/v1/agent-management/{id}/stop | Stop agent |
| GET | /api/v1/agent-management/{id}/metrics | Get metrics |
| WS | /api/v1/agent-management/ws/{id} | WebSocket updates |
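
The endpoint paths above can be turned into requests with the standard library alone. The following sketch only builds the requests and does not send them; `BASE_URL` is an assumed local address, the helper names are hypothetical, and authentication, request bodies, and response schemas are not covered because they are not documented here.

```python
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # assumption: adjust to your deployment
PREFIX = "/api/v1/agent-management"


def build_request(method: str, path: str = "/") -> Request:
    """Build (but do not send) a request for an agent-management endpoint."""
    return Request(f"{BASE_URL}{PREFIX}{path}", method=method)


def list_agents() -> Request:
    """GET /api/v1/agent-management/"""
    return build_request("GET")


def start_agent(agent_id: str) -> Request:
    """POST /api/v1/agent-management/{id}/start"""
    return build_request("POST", f"/{quote(agent_id, safe='')}/start")


def get_metrics(agent_id: str) -> Request:
    """GET /api/v1/agent-management/{id}/metrics"""
    return build_request("GET", f"/{quote(agent_id, safe='')}/metrics")
```

Passing one of these objects to `urllib.request.urlopen` would send it to a running backend.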
Core agent instance data with relationships to sessions and metrics.
Agent type definitions with capabilities and default configurations.
Tracks agent execution sessions with context and metadata.
Stores performance metrics for monitoring and analysis.
Run tests with pytest:
cd backend
pytest tests/test_agents/ -v

- test_base_agent.py: Base agent functionality
- test_registry.py: Agent registry operations
- test_orchestrator.py: Orchestrator and load balancing
The agent framework integrates with the existing backend through:
- Database Models: Extended agent models with sessions and metrics
- API Routes: New /agent-management endpoints
- Services: AgentService for CRUD operations
- WebSocket: Real-time agent status updates
- Events: Integration with existing event system
- Always initialize agents before starting them
- Handle errors gracefully using the built-in error hierarchy
- Use telemetry to monitor agent performance
- Implement proper cleanup in _on_terminate()
- Use async/await consistently throughout
- Register capabilities for capability-based routing
- Configure timeouts for long-running tasks
- Monitor health using the orchestrator's health checks
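
Several of these practices (initialize before start, configure timeouts, always clean up) fit into one small wrapper. This is a sketch under stated assumptions: `StubAgent` is a hypothetical stand-in so the example runs without the framework, and `run_task` is an illustrative helper, not part of the platform.

```python
import asyncio
from typing import Any, Dict, List


class StubAgent:
    """Hypothetical stand-in for a BaseAgent subclass; records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def initialize(self) -> None:
        self.calls.append("initialize")

    async def start(self) -> None:
        self.calls.append("start")

    async def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        await asyncio.sleep(task.get("delay", 0))  # simulate work
        return {"result": "success"}

    async def stop(self) -> None:
        self.calls.append("stop")

    async def terminate(self) -> None:
        self.calls.append("terminate")


async def run_task(agent: StubAgent, task: Dict[str, Any], timeout_seconds: float) -> Dict[str, Any]:
    """Initialize, start, run one task with a timeout, and always clean up."""
    await agent.initialize()  # always initialize before starting
    await agent.start()
    try:
        return await asyncio.wait_for(agent.execute(task), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return {"result": "timeout"}
    finally:
        # Cleanup runs on success, timeout, and unexpected errors alike.
        await agent.stop()
        await agent.terminate()
```

Calling `asyncio.run(run_task(StubAgent(), {"delay": 0}, 1.0))` exercises the full lifecycle; with a real agent, `stop()` and `terminate()` are where `_on_terminate()` cleanup would take effect.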
- Message persistence: Messages are not persisted by default
- Distributed agents: Framework assumes single-process deployment
- LLM integration: Agent implementations are placeholders for LLM integration
- Resource limits: No built-in resource quota management
- Add message persistence layer
- Implement distributed agent support
- Add LLM provider integration
- Implement resource quota management
- Add agent versioning and migration
- Implement agent templates library
- Add comprehensive monitoring dashboard
- Implement agent collaboration patterns
- Add retry policies configuration
- Implement circuit breaker pattern
backend/app/agents/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── base.py              # Base agent class
│   ├── states.py            # State management
│   ├── messages.py          # Communication protocol
│   ├── errors.py            # Error hierarchy
│   └── telemetry.py         # Metrics and telemetry
├── registry/
│   ├── __init__.py
│   ├── agent_registry.py    # Type and instance registry
│   └── agent_factory.py     # Factory pattern
├── orchestration/
│   ├── __init__.py
│   ├── orchestrator.py      # Main orchestrator
│   ├── load_balancer.py     # Load balancing
│   └── task_assigner.py     # Task assignment
└── types/
    ├── __init__.py
    ├── cognitive/           # Cognitive agents
    ├── content/             # Content agents
    ├── engineering/         # Engineering agents
    ├── programming/         # Programming agents
    ├── operational/         # Operational agents
    └── validation/          # Validation agents
Part of the Document-Analyzer-Operator Platform.