The Agent Coordination System is the orchestration layer of Marcus that manages the lifecycle and task assignment of autonomous AI agents. It serves as the central nervous system for coordinating multiple AI workers, ensuring optimal task distribution, preventing conflicts, and maintaining system coherence across distributed work.
┌─────────────────────────────────────────────────────────────────┐
│ Agent Coordination System │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Agent Mgmt │ │ Task Assignment │ │ Assignment │ │
│ │ (register, │ │ (AI-powered │ │ Persistence │ │
│ │ status) │ │ matching) │ │ (durability) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Progress │ │ Blocker │ │ Assignment │ │
│ │ Tracking │ │ Resolution │ │ Monitoring │ │
│ │ (milestones) │ │ (AI suggestions)│ │ (health check) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Kanban Board │ │ AI Engine │ │ Memory System │
│ (planka/gh) │ │ (task match) │ │ (predictions) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
-
Agent Management (
src/marcus_mcp/tools/agent.py)- Agent registration and capability tracking
- Status monitoring and availability management
- Skill-based agent classification
-
Task Assignment Engine (
src/core/ai_powered_task_assignment.py)- AI-powered optimal task selection
- Multi-phase assignment algorithm
- Dependency-aware scheduling
-
Assignment Persistence (
src/core/assignment_persistence.py)- Atomic task assignment storage
- Cross-restart assignment recovery
- Conflict prevention mechanisms
-
Assignment Monitoring (
src/monitoring/assignment_monitor.py)- Real-time state reversion detection
- Health checking and reconciliation
- Assignment drift prevention
The Agent Coordination System sits at the intersection of several major subsystems:
- Above: MCP Server Layer (tool exposure)
- Peers: Context System, Memory System, AI Engine
- Below: Kanban Integration, Persistence Layer
- Integrates With: Communication Hub, Event System, Monitoring
Agent Request → MCP Tool → Coordination Engine → AI Analysis → Assignment Decision
↓ ↓ ↓ ↓ ↓
Registration → Capability → Task Matching → Optimization → Kanban Update
↓ ↓ ↓ ↓ ↓
Status Track → Progress Mon. → Context Inject → Memory Record → Persistence
-
Project Creation (
create_project)- Kanban board initialization
- Task decomposition and prioritization
- Dependency graph construction
-
Agent Registration (
register_agent)await register_agent( agent_id="dev-001", name="Alice Backend", role="Backend Developer", skills=["Python", "FastAPI", "PostgreSQL"] )
-
Task Request Loop (
request_next_task)- AI-powered task selection
- Context and dependency injection
- Implementation guidance generation
- Assignment persistence
-
Progress Reporting (
report_progress)- Milestone tracking (25%, 50%, 75%, 100%)
- Status synchronization with Kanban
- Memory system feedback
-
Blocker Resolution (
report_blocker)- AI-powered analysis and suggestions
- Severity-based escalation
- Alternative solution paths
-
Task Completion (
finish_task)- Code analysis (GitHub projects)
- Performance tracking
- Next task preparation
Unlike traditional task schedulers, Marcus uses multi-phase AI analysis:
Phase 1: Safety Filtering
- Prevents deployment before implementation
- Validates dependency completion
- Blocks unsafe task sequences
Phase 2: Dependency Analysis
- Critical path identification
- Unblocking task prioritization
- Cascade impact prediction
Phase 3: Agent-Task Matching
- Skill compatibility scoring
- Performance history analysis
- Context-aware recommendations
Phase 4: Predictive Impact
- Timeline optimization
- Risk mitigation assessment
- Resource allocation planning
The system builds context-aware instructions dynamically:
def build_tiered_instructions(
base_instructions: str,
task: Task,
context_data: Optional[Dict],
dependency_awareness: Optional[str],
predictions: Optional[Dict]
) -> str:
# Layer 1: Base instructions (always)
# Layer 2: Implementation context (if GitHub)
# Layer 3: Dependency awareness (if dependents exist)
# Layer 4: Decision logging (if high impact)
# Layer 5: Predictions and warnings (if Memory enabled)
# Layer 6: Task-specific guidance (based on labels)Persistent assignment tracking prevents:
- Double assignment across restarts
- Task state inconsistencies
- Agent confusion during system recovery
@dataclass
class WorkerStatus:
worker_id: str
name: str
role: str
email: Optional[str]
skills: List[str]
current_tasks: List[Task]
completed_tasks_count: int
capacity: int
performance_score: float
availability: Dict[str, bool]
@dataclass
class TaskAssignment:
task_id: str
task_name: str
description: str
instructions: str
assigned_to: str
assigned_at: datetime
estimated_hours: float
priority: Priority
dependencies: List[str]
due_date: Optional[datetime]
workspace_path: Optional[str]
forbidden_paths: List[str]find_optimal_task_for_agent is a method on the AITaskAssignmentEngine class in src/core/ai_powered_task_assignment.py.
# class AITaskAssignmentEngine (src/core/ai_powered_task_assignment.py)
async def find_optimal_task_for_agent(
self,
agent_id: str,
agent_info: Dict[str, Any],
available_tasks: List[Task],
assigned_task_ids: Set[str]
) -> Optional[Task]:
# Multi-phase scoring
safety_scores = await _filter_safe_tasks(tasks)
dependency_scores = await _analyze_dependencies(tasks)
ai_scores = await _get_ai_recommendations(tasks, agent_info)
impact_scores = await _predict_task_impact(tasks)
# Weighted combination
weights = {
"skill_match": 0.15,
"priority": 0.15,
"dependencies": 0.25,
"ai_recommendation": 0.30,
"impact": 0.15
}
return _select_best_task(tasks, scores, weights)class AssignmentMonitor:
"""Continuous monitoring for assignment consistency"""
async def _check_for_reversions(self):
# Detect state reversions
# Handle missing tasks
# Reconcile inconsistencies
# Alert on repeated issues- Intelligence: AI-powered matching vs. simple skill/priority scoring
- Durability: Persistent assignments survive system restarts
- Context-Aware: Integrates implementation history and dependencies
- Predictive: Forecasts completion times and potential blockers
- Self-Healing: Monitors and corrects assignment drift
- Scalable: Handles multiple agents and complex dependency graphs
Why AI-Powered Assignment?
- Traditional schedulers optimize for resource utilization
- Marcus optimizes for project success and agent development
- AI considers factors humans miss (performance trends, context patterns)
Why Persistent Assignments?
- Agents work across extended timeframes
- System restarts shouldn't lose work context
- Multiple Marcus instances need coordination
Why Real-Time Monitoring?
- External changes to Kanban boards create drift
- Human intervention can disrupt agent workflows
- Early detection prevents compound problems
- Direct skill matching
- Basic priority scoring
- Minimal context injection
- Full AI analysis pipeline
- Extensive context building
- Dependency cascade analysis
- Performance trajectory consideration
Task Complexity | Dependency Count | AI Analysis | Context Layers
─────────────────┼──────────────────┼─────────────┼───────────────
Simple | 0-1 | Basic | 1-2 layers
Moderate | 2-4 | Enhanced | 3-4 layers
Complex | 5+ | Full | 5-6 layers
- Real-time WebSocket updates
- Card status synchronization
- Comment-based progress tracking
- Label-based skill matching
- Issue-based task management
- Pull request workflow integration
- Code analysis for completed tasks
- Implementation context extraction
class KanbanInterface:
async def get_available_tasks(self) -> List[Task]
async def update_task(self, task_id: str, updates: Dict)
async def add_comment(self, task_id: str, comment: str)While the coordination system doesn't directly integrate with Cato (a philosophy AI), it provides the infrastructure for philosophical agents:
- Capability Registration: Cato agents register with specialized skills
- Task Type Routing: Philosophy-related tasks routed to appropriate agents
- Context Preservation: Maintains conversation context across philosophical discussions
- Progress Tracking: Monitors philosophical analysis milestones
- AI analysis adds latency to task assignment
- Multiple context lookups for complex tasks
- Synchronous assignment lock prevents parallelism
- Linear search through available tasks
- In-memory state doesn't scale beyond ~100 agents
- No agent load balancing across Marcus instances
- Agent skill learning from task outcomes
- Dynamic agent spawning based on workload
- Cross-project agent sharing
- Agent specialization recommendations
Phase 1: Performance Optimization
- Async assignment pipeline
- Task indexing and filtering
- Agent pool management
Phase 2: Advanced Intelligence
- Machine learning for assignment optimization
- Agent performance prediction
- Dynamic skill inference
Phase 3: Distributed Coordination
- Multi-instance agent coordination
- Cross-project agent migration
- Federated assignment consensus
Phase 4: Autonomous Management
- Self-organizing agent teams
- Automatic agent provisioning
- Adaptive assignment algorithms
The Agent Coordination System will evolve from a centralized scheduler to a distributed mesh of intelligent agents that self-organize around project goals, learn from each other's experiences, and adapt their coordination patterns to optimize for both individual growth and collective success.
- Assignment Latency: 200-500ms (with AI analysis)
- State Consistency: 99.9% (with monitoring)
- Recovery Time: < 30 seconds (after restart)
- Throughput: 50+ concurrent assignments
- Assignment Conflicts: < 0.1% (with persistence)
- State Drift: < 1% (with monitoring)
- AI Analysis Failures: < 5% (with fallback)
The Agent Coordination System represents Marcus's commitment to intelligent, context-aware task management that treats AI agents as sophisticated workers capable of growth, learning, and autonomous decision-making within a structured coordination framework.