What Happens When an Agent Requests a Task

Internal Systems Architecture Deep Dive

When an AI agent calls request_next_task("dev-001"), it triggers an 8-stage orchestration across 15+ interconnected systems. That process turns a simple "give me work" request into a task assignment with contextual guidance, dependency awareness, risk analysis, and learning integration. This document explains the internal complexity behind Marcus's AI-powered task coordination.


🎯 The Complete Flow Overview

Agent Request → Conversation Log → State Refresh → Safety Filtering → AI Analysis → Context Building → Assignment Lease → Instruction Generation
     ↓              ↓              ↓               ↓              ↓              ↓               ↓               ↓
 [Task Tool]   [Logging Sys]   [Project Mgmt]  [Phase Control] [AI Engine]   [Context Sys]  [Lease Mgmt]   [Memory Sys]
               [Event Sys]      [Kanban Integ]  [Safety Checks] [Task Match]   [Dependencies] [Persistence]  [Predictions]

Result: An intelligently selected task with comprehensive context, dependency awareness, risk predictions, time estimates, and adaptive instructions tailored to the specific agent and project state.


📋 Stage 1: Request Intake & System Coordination

System: 21-agent-coordination.md (Agent Coordination) + 02-logging-system.md (Conversation Logging)

What Is a Task Request?

A task request isn't just "give me work" - it's an agent announcing availability and asking Marcus to make an optimal assignment decision based on current project state, agent capabilities, and intelligent coordination.

What Happens:

1. Conversation & Event Logging

Marcus logs this as a multi-directional conversation:

# Agent β†’ Marcus communication
conversation_logger.log_worker_message(
    agent_id="dev-001",
    direction="to_pm",
    message="Requesting next task",
    metadata={"worker_info": "Worker dev-001 requesting task"}
)

# System event for coordination
state.log_event(
    event_type="task_request",
    data={
        "worker_id": "dev-001",
        "source": "dev-001",
        "target": "marcus",
        "timestamp": "2025-09-05T14:30:00Z"
    }
)

# Visualization event for real-time monitoring
log_agent_event("task_request", {"worker_id": "dev-001"})

Why this triple logging exists:

  • Conversation Log: Tracks the communication flow for debugging coordination issues
  • System Event: Triggers other systems (monitoring, analytics, capacity planning)
  • Visualization Event: Updates real-time dashboards showing agent activity

2. Kanban System Initialization

Marcus ensures the project board is ready:

await state.initialize_kanban()  # Connect to Planka/Linear/GitHub Projects

What initialization does:

  • Connects to the configured Kanban provider (Planka, Linear, GitHub Projects)
  • Validates API credentials and board access
  • Syncs any external changes to the board since last check
  • Ensures Marcus has current task status and assignments

Data Created:

{
  "request_id": "req_2025_1452_dev001",
  "agent_id": "dev-001",
  "request_type": "task_request",
  "timestamp": "2025-09-05T14:30:00Z",
  "system_events": ["conversation_logged", "system_event_fired", "visualization_updated"],
  "kanban_status": "connected"
}

🔄 Stage 2: Project State Analysis & Refresh

System: 16-project-management.md (Project Management) + 04-kanban-integration.md (Kanban Integration)

What Is Project State Refresh?

Before making assignment decisions, Marcus needs a current snapshot of the entire project: what tasks exist, what's completed, what's in progress, who's working on what, and what dependencies have changed.

What Happens:

1. Marcus Internal Reasoning

Marcus logs its own "thinking" process:

log_thinking("marcus", "Need to check current project state")

# Then analyzes the specific agent:
agent = state.agent_status.get("dev-001")
if agent:
    log_thinking(
        "marcus",
        f"Finding optimal task for {agent.name}",
        {
            "agent_skills": ["Python", "FastAPI", "PostgreSQL"],
            "current_workload": len(agent.current_tasks),  # How many tasks already assigned
            "performance_history": agent.performance_score
        }
    )

What "Marcus thinking" captures: These logs record the reasoning steps behind each decision, so developers debugging an assignment later can see why Marcus made it.

2. Project State Synchronization

await state.refresh_project_state()

What refresh_project_state() does:

  • Pulls latest task status from Kanban board (in case external changes happened)
  • Updates internal task cache with current assignments
  • Identifies any tasks that may have been completed outside Marcus
  • Checks for new tasks added externally
  • Validates that dependencies are still accurate
  • Updates project completion metrics

Internal State Updated:

project_state = {
    "total_tasks": 47,
    "completed_tasks": 23,
    "in_progress_tasks": 8,
    "blocked_tasks": 3,
    "available_tasks": 13,
    "active_agents": ["dev-001", "dev-002", "design-001"],
    "last_synced": "2025-09-05T14:30:15Z"
}
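The counts in a snapshot like this can be derived directly from task statuses. The following is a minimal sketch, not Marcus's actual code: the `Status` enum and the `summarize` helper are hypothetical, and treating "available" as "still TODO" is an assumption made for illustration.

```python
from collections import Counter
from enum import Enum


class Status(Enum):  # hypothetical status values, for illustration only
    DONE = "done"
    IN_PROGRESS = "in_progress"
    BLOCKED = "blocked"
    TODO = "todo"


def summarize(statuses: list[Status]) -> dict[str, int]:
    """Count tasks per status and return a snapshot-style summary."""
    counts = Counter(statuses)
    return {
        "total_tasks": len(statuses),
        "completed_tasks": counts[Status.DONE],
        "in_progress_tasks": counts[Status.IN_PROGRESS],
        "blocked_tasks": counts[Status.BLOCKED],
        "available_tasks": counts[Status.TODO],
    }


# Rebuild the example snapshot: 23 done, 8 in progress, 3 blocked, 13 available.
statuses = (
    [Status.DONE] * 23
    + [Status.IN_PROGRESS] * 8
    + [Status.BLOCKED] * 3
    + [Status.TODO] * 13
)
summary = summarize(statuses)
print(summary["total_tasks"])  # 47
```

The four categories in the example add up to the reported total (23 + 8 + 3 + 13 = 47), which makes a cheap invariant to check after every refresh.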

🛡️ Stage 3: Multi-Layer Safety Filtering

System: Custom Task Assignment Logic + 23-task-management-intelligence.md (Task Intelligence)

What Is Safety Filtering?

Marcus doesn't just assign any available task - it applies multiple intelligence layers to prevent illogical assignments that could break the project or waste agent time.

What Happens:

1. Assignment Conflict Prevention

# Get all currently assigned tasks to prevent duplicates
all_assigned_ids = set()
for other_agent_id, assignment in state.assignment_persistence.get_all_assignments().items():
    all_assigned_ids.add(assignment["task_id"])

# Filter out already assigned tasks
available_tasks = [
    task for task in state.project_tasks
    if task.status == TaskStatus.TODO and task.id not in all_assigned_ids
]

Why this prevents chaos: Without this check, multiple agents could work on the same task simultaneously, creating conflicts and wasted effort.

2. Phase-Based Constraint Enforcement

Marcus uses an Enhanced Task Classifier and Phase Enforcer to prevent illogical task ordering:

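# Classify each candidate task by type
task_type = classifier.classify(task)  # DESIGN, IMPLEMENTATION, TESTING, DEPLOYMENT
task_phase = phase_enforcer._get_task_phase(task_type)

# Compare against every in-progress task (ip_task) and its phase (ip_phase).
# in_progress_tasks is an assumed name for the tasks currently being worked on.
phase_allowed = True
for ip_task in in_progress_tasks:
    ip_phase = phase_enforcer._get_task_phase(classifier.classify(ip_task))
    if task.labels and ip_task.labels:  # Both tasks carry feature labels
        shared_labels = set(task.labels) & set(ip_task.labels)
        if shared_labels and phase_enforcer._should_depend_on_phase(task_phase, ip_phase):
            # Same feature: this phase should wait for the in-progress phase to complete
            phase_allowed = False
            break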

Real example: If someone is working on "User Authentication API Implementation" (IMPLEMENTATION phase), Marcus won't assign "User Authentication Testing" (TESTING phase) to another agent until the implementation is done.
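The rule in this example can be reproduced in a few lines. This is a simplified sketch under stated assumptions: the `Phase` ordering, the `Task` dataclass, and `is_phase_allowed` are hypothetical stand-ins, not the real Enhanced Task Classifier or Phase Enforcer.

```python
from dataclasses import dataclass, field
from enum import IntEnum


class Phase(IntEnum):  # hypothetical ordering; lower values come first
    DESIGN = 1
    IMPLEMENTATION = 2
    TESTING = 3
    DEPLOYMENT = 4


@dataclass
class Task:  # simplified stand-in for the real task model
    name: str
    phase: Phase
    labels: set[str] = field(default_factory=set)


def is_phase_allowed(candidate: Task, in_progress: list[Task]) -> bool:
    """Return False if an earlier phase of the same feature is still in progress."""
    for active in in_progress:
        same_feature = candidate.labels & active.labels
        if same_feature and candidate.phase > active.phase:
            return False
    return True


impl = Task("User Authentication API Implementation", Phase.IMPLEMENTATION, {"auth"})
test = Task("User Authentication Testing", Phase.TESTING, {"auth"})
other = Task("Billing Page Testing", Phase.TESTING, {"billing"})

print(is_phase_allowed(test, [impl]))   # False: auth implementation is unfinished
print(is_phase_allowed(other, [impl]))  # True: different feature, no shared label
```

Scoping the check to shared labels is what keeps unrelated features from blocking each other.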

3. Deployment Task Deprioritization

Marcus actively avoids assigning deployment tasks unless no other work is available:

deployment_keywords = ["deploy", "release", "production", "launch", "rollout"]

# Separate tasks into deployment vs non-deployment
deployment_tasks = []
non_deployment_tasks = []
for task in available_tasks:
    # Name check is a case-insensitive substring match;
    # label check requires a label that equals the keyword exactly.
    is_deployment = any(
        keyword in task.name.lower() or keyword in task.labels
        for keyword in deployment_keywords
    )

    if is_deployment:
        deployment_tasks.append(task)
    else:
        non_deployment_tasks.append(task)

# Prefer non-deployment tasks; fall back to deployment tasks only if nothing else is left
available_tasks = non_deployment_tasks if non_deployment_tasks else deployment_tasks

Why deployment tasks are deferred: They are typically reserved for when most development is complete, and they require special coordination.

Safety Filter Results:

{
  "initial_tasks": 13,
  "after_assignment_check": 11,  # 2 already assigned to other agents
  "after_phase_filtering": 8,    # 3 blocked by phase constraints
  "after_deployment_filter": 7,  # 1 deployment task deprioritized
  "final_eligible_tasks": 7
}

🧠 Stage 4: AI-Powered Task Selection

System: 07-ai-intelligence-engine.md (AI Engine) + Custom AI Task Assignment Engine

What Is AI-Powered Task Selection?

Instead of simple skill matching, Marcus uses a 4-phase AI analysis to find the truly optimal task based on complex factors that humans would struggle to evaluate simultaneously.

What Happens:

Phase 1: Safety Validation

AI validates that tasks are logically safe to assign:

safe_tasks = await self._filter_safe_tasks(available_tasks)

# Inside _filter_safe_tasks, each candidate is checked in turn:
safe_tasks = []
for task in available_tasks:
    if self._is_deployment_task(task):
        if not await self._are_dependencies_complete(task):
            # AI analysis: "Dependencies incomplete"
            continue

        # Additional AI safety check
        safety_check = await ai_engine.check_deployment_safety(task, project_tasks)
        if not safety_check.get("safe", False):
            # AI reasoning: "Database migrations not tested"
            continue

    safe_tasks.append(task)

Phase 2: Dependency Impact Analysis

AI analyzes how completing this task affects the rest of the project:

dependency_scores = await self._analyze_dependencies(safe_tasks)

for task in safe_tasks:
    # Count tasks this would unblock
    unblocked_count = sum(
        1 for other_task in project_tasks
        if task.id in other_task.dependencies and other_task.status == TaskStatus.TODO
    )

    # Check if task is on critical path
    critical_path = dependency_graph.get_critical_path()
    is_critical = task.id in critical_path

    # Calculate dependency score
    score = unblocked_count * 0.5 + (0.5 if is_critical else 0)

What this achieves: Prioritizes tasks that will unblock the most other work and keep the project moving forward efficiently.
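A worked example makes the formula concrete. Note that the raw score is unbounded (a task that unblocks three others and sits on the critical path scores 2.0). The source does not say how raw scores are scaled before they appear as values between 0 and 1, so `normalized_score` below is a hypothetical normalization, shown only as one possibility.

```python
def dependency_score(unblocked_count: int, is_critical: bool) -> float:
    """Score from the formula above: 0.5 per unblocked task, plus 0.5 if critical."""
    return unblocked_count * 0.5 + (0.5 if is_critical else 0.0)


def normalized_score(raw: float, max_raw: float) -> float:
    """Hypothetical normalization: scale a raw score into [0, 1]."""
    return raw / max_raw if max_raw > 0 else 0.0


raw_scores = {
    "task_015": dependency_score(3, True),   # unblocks 3 tasks, on critical path
    "task_016": dependency_score(1, False),  # unblocks 1 task
    "task_017": dependency_score(0, False),  # unblocks nothing
}
top = max(raw_scores.values())
scaled = {tid: normalized_score(s, top) for tid, s in raw_scores.items()}

print(raw_scores)  # {'task_015': 2.0, 'task_016': 0.5, 'task_017': 0.0}
print(scaled)      # {'task_015': 1.0, 'task_016': 0.25, 'task_017': 0.0}
```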

Phase 3: Agent-Task Matching Intelligence

AI evaluates agent suitability for each task:

ai_scores = await self._get_ai_recommendations(safe_tasks, agent_info)

context = AssignmentContext(
    task=task,
    agent_id="dev-001",
    agent_status=agent_info,
    available_tasks=safe_tasks,
    project_context={
        "total_tasks": 47,
        "completed_tasks": 23,
        "project_phase": "development"  # vs planning, testing, deployment
    }
)

ai_analysis = await ai_engine.analyze_task_assignment(context)
score = ai_analysis.get("suitability_score", 0.5) * ai_analysis.get("confidence", 1.0)

AI evaluates:

  • Skill alignment with task requirements
  • Agent's current workload and capacity
  • Task complexity vs agent experience level
  • Project phase appropriateness
  • Historical performance on similar tasks
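The scoring line in the Phase 3 snippet multiplies suitability by confidence and falls back to defaults when the AI response omits a field. A small sketch of that behavior, using a hypothetical `assignment_score` helper and made-up input values:

```python
def assignment_score(ai_analysis: dict) -> float:
    """Suitability weighted by confidence, with the same defaults as above."""
    suitability = ai_analysis.get("suitability_score", 0.5)
    confidence = ai_analysis.get("confidence", 1.0)
    return suitability * confidence


confident = assignment_score({"suitability_score": 0.9, "confidence": 0.87})
unsure = assignment_score({"suitability_score": 0.9, "confidence": 0.4})
missing = assignment_score({})

print(round(confident, 3))  # 0.783
print(round(unsure, 2))     # 0.36
print(missing)              # 0.5
```

With these defaults, an empty response yields a neutral 0.5 rather than zero, and a low-confidence analysis is discounted even when the suitability itself is high.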

Phase 4: Predictive Impact Assessment

AI predicts the consequences of this assignment:

impact_scores = await self._predict_task_impact(safe_tasks)

# Predictions include:
# - Timeline impact if this task is delayed
# - Resource utilization optimization
# - Risk of blocking critical path
# - Effect on team productivity patterns

AI Decision Result:

{
  "selected_task": "task_015_user_auth_api",
  "ai_confidence": 0.87,
  "selection_reasoning": "High skill match, unblocks 3 frontend tasks, on critical path",
  "phase_scores": {
    "safety": 1.0,      # Completely safe to assign
    "dependency": 0.8,  # High impact on project flow
    "agent_match": 0.9, # Excellent skill alignment
    "impact": 0.7       # Good timeline optimization
  },
  "alternatives_considered": 6
}

📊 Stage 5: Context & Dependency Analysis

System: 03-context-dependency-system.md (Context & Dependency) + 42-code-analysis-system.md (Code Analysis)

What Is Context Building?

Marcus doesn't just assign a task - it provides rich contextual information to help the agent understand how their work fits into the bigger picture and what decisions will affect future tasks.

What Happens:

1. Implementation Context Gathering

For GitHub-integrated projects, Marcus analyzes existing code:

if state.provider == "github" and state.code_analyzer:
    owner = os.getenv("GITHUB_OWNER")
    repo = os.getenv("GITHUB_REPO")
    impl_details = await state.code_analyzer.get_implementation_details(
        optimal_task.dependencies, owner, repo
    )

What this discovers:

  • How similar features were implemented previously
  • Existing API patterns and interfaces to follow
  • Code structure and naming conventions
  • Integration points with existing systems

2. Dependency Relationship Analysis

Marcus builds a complete picture of how this task relates to others:

# Analyze dependencies across the entire project
dep_map = await state.context.analyze_dependencies(state.project_tasks)

# For each task that depends on the current task
for dep_task_id in dep_map[optimal_task.id]:
    dep_task = find_task_by_id(dep_task_id)

    # AI infers what the dependent task needs
    expected_interface = state.context.infer_needed_interface(dep_task, optimal_task.id)

    # Add to context
    state.context.add_dependency(optimal_task.id, DependentTask(
        task_id=dep_task.id,
        task_name=dep_task.name,
        expected_interface=expected_interface  # "REST API with /auth endpoints"
    ))

3. Dependency Awareness Generation

Marcus creates human-readable guidance about future impact:

if task_context.dependent_tasks:
    dep_count = len(task_context.dependent_tasks)
    dep_list = "\n".join([
        f"- {dt['task_name']} (needs: {dt['expected_interface']})"
        for dt in task_context.dependent_tasks[:3]  # Show first 3
    ])
    dependency_awareness = f"{dep_count} future tasks depend on your work:\n{dep_list}"

Context Data Structure:

context_data = {
    "previous_implementations": [
        {
            "file_path": "src/auth/oauth.py",
            "pattern": "OAuth2 implementation with JWT tokens",
            "interfaces": ["POST /auth/login", "GET /auth/verify"]
        }
    ],
    "dependent_tasks": [
        {
            "task_id": "task_020",
            "task_name": "Frontend Login Component",
            "expected_interface": "POST /auth/login endpoint with email/password"
        },
        {
            "task_id": "task_025",
            "task_name": "Mobile Authentication",
            "expected_interface": "JWT token response format"
        }
    ],
    "architectural_decisions": [
        {
            "decision": "Use JWT for stateless auth",
            "rationale": "Mobile apps need offline token validation",
            "impact": "All API endpoints must validate JWT"
        }
    ]
}
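Using the dependent_tasks entries from the structure above, the dependency-awareness text can be generated as follows. This is a self-contained sketch: `build_dependency_awareness` is a hypothetical helper name, and the three-item limit mirrors the "Show first 3" slice in the earlier snippet.

```python
def build_dependency_awareness(dependent_tasks: list[dict], limit: int = 3) -> str:
    """Format a short note listing the first `limit` dependent tasks."""
    if not dependent_tasks:
        return ""
    lines = [
        f"- {dt['task_name']} (needs: {dt['expected_interface']})"
        for dt in dependent_tasks[:limit]
    ]
    header = f"{len(dependent_tasks)} future tasks depend on your work:"
    return "\n".join([header, *lines])


dependent_tasks = [
    {
        "task_id": "task_020",
        "task_name": "Frontend Login Component",
        "expected_interface": "POST /auth/login endpoint with email/password",
    },
    {
        "task_id": "task_025",
        "task_name": "Mobile Authentication",
        "expected_interface": "JWT token response format",
    },
]
note = build_dependency_awareness(dependent_tasks)
print(note)
# 2 future tasks depend on your work:
# - Frontend Login Component (needs: POST /auth/login endpoint with email/password)
# - Mobile Authentication (needs: JWT token response format)
```

The header reports the full count while the list is capped, so an agent still sees the true scale of downstream impact when more than three tasks depend on its work.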

🔐 Stage 6: Assignment Lease Creation

System: 35-assignment-lease-system.md (Assignment Lease System)

What Is an Assignment Lease?

An assignment lease is a time-bound contract between Marcus and the agent. It prevents tasks from getting "stuck" with unresponsive agents and provides automatic recovery mechanisms.

What Happens:

1. Adaptive Duration Calculation

Marcus calculates how long this agent should have to complete this task:

def calculate_adaptive_duration(self, task: Task) -> float:
    base_hours = 2.0  # Default lease time

    # Apply priority multiplier
    priority_mult = {
        "urgent": 0.5,   # Urgent tasks get shorter leases (faster escalation)
        "high": 0.8,
        "medium": 1.0,
        "low": 1.5       # Low priority gets more time
    }.get(task.priority.value.lower(), 1.0)

    base_hours *= priority_mult

    # Apply complexity multiplier based on labels
    if "complex" in task.labels:
        base_hours *= 2.0  # Complex tasks get more time
    if "simple" in task.labels:
        base_hours *= 0.5  # Simple tasks should be quick

    return max(0.5, min(base_hours, 24.0))  # Between 30 minutes and 24 hours
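To see how the multipliers combine, here is a standalone version of the same arithmetic that takes plain values instead of a Task object. The function name `adaptive_duration` and its signature are illustrative assumptions; the multipliers and clamping bounds are copied from the method above.

```python
PRIORITY_MULTIPLIER = {"urgent": 0.5, "high": 0.8, "medium": 1.0, "low": 1.5}


def adaptive_duration(priority: str, labels: set[str], base_hours: float = 2.0) -> float:
    """Standalone version of the lease calculation above, using plain values."""
    hours = base_hours * PRIORITY_MULTIPLIER.get(priority.lower(), 1.0)
    if "complex" in labels:
        hours *= 2.0  # complex tasks get more time
    if "simple" in labels:
        hours *= 0.5  # simple tasks should be quick
    return max(0.5, min(hours, 24.0))  # clamp to 30 minutes .. 24 hours


print(adaptive_duration("high", {"complex"}))   # 2.0 * 0.8 * 2.0 = 3.2
print(adaptive_duration("urgent", {"simple"}))  # 2.0 * 0.5 * 0.5 = 0.5
print(adaptive_duration("low", set()))          # 2.0 * 1.5 = 3.0
```

A high-priority task labeled "complex" is one combination that produces the 3.2-hour lease used in this document's examples.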

2. Lease Object Creation

now = datetime.now(timezone.utc)  # Capture once so created_at and expires_at agree (UTC)
duration_hours = 3.2              # Calculated duration

assignment_lease = AssignmentLease(
    task_id="task_015_user_auth_api",
    agent_id="dev-001",
    created_at=now,
    expires_at=now + timedelta(hours=duration_hours),
    duration_hours=duration_hours,
    renewal_count=0,
    metadata={
        "task_complexity": "medium",
        "agent_performance": 1.0,
        "priority_factor": 0.8
    }
)

3. Background Monitoring Setup

Marcus starts monitoring this lease:

# LeaseMonitor checks every 60 seconds for:
# - Tasks approaching expiration (30 min warning)
# - Expired tasks needing recovery
# - Stuck tasks requiring intervention

Why leases matter:

  • Automatic Recovery: If an agent goes offline, their tasks get automatically reassigned
  • Progress Accountability: Agents must show progress or lose the assignment
  • Capacity Planning: Marcus knows exactly what's assigned and for how long
  • Quality Control: Tasks can't sit indefinitely without progress
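The checks the lease monitor performs on each pass can be sketched as a small classification function. This is an illustration under assumptions: the status names and the `lease_status` helper are hypothetical, and the timestamps are arbitrary; only the 30-minute warning window comes from the description above.

```python
from datetime import datetime, timedelta, timezone

WARNING_WINDOW = timedelta(minutes=30)  # matches the 30-minute warning above


def lease_status(expires_at: datetime, now: datetime) -> str:
    """Classify a lease as 'active', 'expiring_soon', or 'expired'."""
    if now >= expires_at:
        return "expired"
    if now >= expires_at - WARNING_WINDOW:
        return "expiring_soon"
    return "active"


created_at = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)  # illustrative time
expires_at = created_at + timedelta(hours=3.2)                 # 3 h 12 min lease

print(lease_status(expires_at, created_at + timedelta(hours=1)))  # active
print(lease_status(expires_at, created_at + timedelta(hours=3)))  # expiring_soon
print(lease_status(expires_at, created_at + timedelta(hours=4)))  # expired
```

Passing `now` in as an argument, rather than reading the clock inside the function, keeps the check deterministic and easy to test.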

Assignment Lease Data:

{
  "task_id": "task_015_user_auth_api",
  "agent_id": "dev-001",
  "created_at": "2025-09-05T14:30:45Z",
  "expires_at": "2025-09-05T17:42:45Z",
  "duration_hours": 3.2,
  "renewable": true,
  "warning_threshold": "2025-09-05T17:12:45Z",
  "monitoring_active": true
}

🧠 Stage 7: Memory Integration & Predictive Analysis

System: 01-memory-system.md (Multi-Tier Memory) + 17-learning-systems.md (Learning Systems)

What Is Memory Integration?

Marcus uses its four-tier memory system to learn from every task assignment and provide predictions about how this assignment will go.

What Happens:

1. Task Outcome Prediction

Marcus predicts how likely this assignment is to succeed:

basic_prediction = await state.memory.predict_task_outcome("dev-001", optimal_task)

# Based on:
# - Agent's historical performance on similar tasks
# - Task complexity vs agent skill level
# - Current project phase and pressure
# - Similar task outcomes in memory

prediction = {
    "success_probability": 0.85,  # 85% chance of successful completion
    "confidence": 0.92,           # 92% confident in this prediction
    "risk_factors": ["complex_integration", "tight_timeline"]
}

2. Completion Time Estimation

Marcus predicts how long this will actually take:

completion_time = await state.memory.predict_completion_time("dev-001", optimal_task)

time_estimate = {
    "expected_hours": 2.8,
    "confidence_interval": {"lower": 1.5, "upper": 4.2},
    "factors": ["authentication complexity", "API design decisions", "testing requirements"],
    "historical_basis": "Similar auth tasks took 2.1-3.4 hours for this agent"
}

3. Blockage Risk Analysis

Marcus predicts what might go wrong:

blockage_analysis = await state.memory.predict_blockage_probability("dev-001", optimal_task)

blockage_risk = {
    "overall_risk": 0.3,  # 30% chance of encountering blockers
    "risk_breakdown": {
        "technical_dependencies": 0.15,    # External API issues
        "requirement_clarity": 0.08,       # Unclear specifications
        "integration_complexity": 0.12     # Existing system conflicts
    },
    "preventive_measures": [
        "Review existing OAuth implementation patterns",
        "Validate API endpoints with frontend team early"
    ]
}
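The source does not state how overall_risk is derived from the breakdown. The three components sum to 0.35, while treating them as independent events gives roughly 0.31, which is close to the reported 0.3. The sketch below shows that second calculation; the independence assumption and the `combined_risk` helper are illustrative, not confirmed behavior of the memory system.

```python
import math


def combined_risk(risks: dict[str, float]) -> float:
    """Probability that at least one independent risk occurs."""
    none_occur = math.prod(1.0 - p for p in risks.values())
    return 1.0 - none_occur


risk_breakdown = {
    "technical_dependencies": 0.15,
    "requirement_clarity": 0.08,
    "integration_complexity": 0.12,
}
overall = combined_risk(risk_breakdown)
print(round(overall, 2))  # 0.31, i.e. 1 - (0.85 * 0.92 * 0.88)
```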

4. Cascade Effect Prediction

Marcus predicts how delays in this task would affect others:

cascade_effects = await state.memory.predict_cascade_effects(optimal_task.id, potential_delay=0.6)

cascade_impact = {
    "critical_path_impact": True,
    "affected_tasks": ["task_020_frontend_login", "task_025_mobile_auth"],
    "delay_multiplier": 1.3,  # 1 hour delay causes 1.3 hours total project delay
    "mitigation_options": ["Parallel frontend mockup development"]
}

5. Agent Performance Trajectory

Marcus analyzes how this assignment fits the agent's development:

performance_trajectory = await state.memory.calculate_agent_performance_trajectory("dev-001")

trajectory = {
    "improving_skills": {"Python": 0.15, "API_design": 0.23},  # Getting better
    "skill_trends": "Strong upward trend in backend development",
    "recommendations": ["Good opportunity to cement authentication expertise"]
}

6. Memory Recording

Marcus records this assignment for future learning:

await state.memory.record_task_start("dev-001", optimal_task)

# Updates:
# - Working Memory: Current agent assignments
# - Episodic Memory: This specific assignment event
# - Semantic Memory: Patterns about auth tasks
# - Procedural Memory: Assignment process effectiveness

Memory & Prediction Data:

{
  "predictions": {
    "success_probability": 0.85,
    "expected_completion_hours": 2.8,
    "blockage_risk": 0.3,
    "cascade_risk": "medium",
    "skill_development_opportunity": "high"
  },
  "memory_updates": {
    "working_memory": "agent_dev-001_assigned_task_015",
    "episodic_event": "task_assignment_2025_09_05_1430",
    "semantic_pattern": "auth_task_backend_developer",
    "procedural_reinforcement": "ai_assignment_success"
  }
}

📝 Stage 8: Intelligent Instruction Generation

System: 07-ai-intelligence-engine.md (AI Engine) + Custom Tiered Instruction Builder

What Is Intelligent Instruction Generation?

Marcus doesn't just say "do this task" - it generates contextually adaptive instructions with 6 layers of intelligence based on the task complexity, dependencies, and agent needs.

What Happens:

1. Base Instruction Generation

Marcus uses AI to create task-specific instructions:

base_instructions = await state.ai_engine.generate_task_instructions(
    optimal_task,
    state.agent_status.get("dev-001")
)

# AI considers:
# - Task description and requirements
# - Agent's skill level and experience
# - Current project context and standards
# - Previous implementations and patterns

2. Tiered Instruction Building

Marcus builds 6 layers of increasingly sophisticated guidance:

Layer 1: Base Instructions (always included)

"Implement user authentication API endpoints using OAuth2 with JWT tokens..."

Layer 2: Implementation Context (if previous work exists)

"📚 IMPLEMENTATION CONTEXT:
2 relevant implementations found. Use these patterns and interfaces to maintain consistency."

Layer 3: Dependency Awareness (if other tasks depend on this)

"🔗 DEPENDENCY AWARENESS:
3 future tasks depend on your work:
- Frontend Login Component (needs: POST /auth/login endpoint)
- Mobile Authentication (needs: JWT token response format)
- User Profile Service (needs: User ID extraction from JWT)"

Layer 4: Decision Logging (if task has high impact)

"📝 ARCHITECTURAL DECISIONS:
This task has significant downstream impact. When making technical choices:
Use: 'Marcus, log decision: I chose [WHAT] because [WHY]. This affects [IMPACT].'
Example: 'I chose JWT tokens because mobile apps need stateless auth. This affects all API endpoints.'"

Layer 5: Predictions & Insights (if available)

"⚡ PREDICTIONS & INSIGHTS:
⏱️ Expected duration: 2.8 hours (1.5-4.2 hours)
⚠️ High blockage risk: 30%
   • technical_dependencies: 15% chance
   • integration_complexity: 12% chance
💡 Prevention tips:
   • Review existing OAuth implementation patterns
   • Validate API endpoints with frontend team early
📈 You're improving in API_design - great opportunity to excel!"

Layer 6: Task-Specific Guidance (based on task labels)

"💡 TASK-SPECIFIC GUIDANCE:
🔒 Security Guidelines: Follow OWASP best practices, implement proper validation, use secure defaults
🌐 API Guidelines: Follow RESTful conventions, include proper error handling, document response formats"
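The layering logic can be summarized as "always include the base, then append each layer whose condition holds." The sketch below illustrates that pattern only; `InstructionInputs`, `LABEL_GUIDANCE`, and `build_instructions` are hypothetical names, and the layer texts are abbreviated from the examples above.

```python
from dataclasses import dataclass, field


@dataclass
class InstructionInputs:  # hypothetical container for the builder's inputs
    base: str
    implementations: list[str] = field(default_factory=list)
    dependency_note: str = ""
    high_impact: bool = False
    predictions: str = ""
    labels: set[str] = field(default_factory=set)


# Hypothetical label-to-guidance table for Layer 6.
LABEL_GUIDANCE = {
    "security": "Security Guidelines: Follow OWASP best practices",
    "api": "API Guidelines: Follow RESTful conventions",
}


def build_instructions(inputs: InstructionInputs) -> list[str]:
    """Return the layers that apply, in order; Layer 1 is always present."""
    layers = [inputs.base]
    if inputs.implementations:
        count = len(inputs.implementations)
        layers.append(f"IMPLEMENTATION CONTEXT: {count} relevant implementations found.")
    if inputs.dependency_note:
        layers.append(f"DEPENDENCY AWARENESS: {inputs.dependency_note}")
    if inputs.high_impact:
        layers.append("ARCHITECTURAL DECISIONS: log each technical choice and its impact.")
    if inputs.predictions:
        layers.append(f"PREDICTIONS & INSIGHTS: {inputs.predictions}")
    guidance = [
        LABEL_GUIDANCE[label]
        for label in sorted(inputs.labels)
        if label in LABEL_GUIDANCE
    ]
    if guidance:
        layers.append("TASK-SPECIFIC GUIDANCE: " + "; ".join(guidance))
    return layers


simple = build_instructions(InstructionInputs(base="Fix typo in README"))
full = build_instructions(InstructionInputs(
    base="Implement user authentication API endpoints",
    implementations=["src/auth/oauth.py"],
    dependency_note="3 future tasks depend on your work",
    high_impact=True,
    predictions="Expected duration: 2.8 hours",
    labels={"security", "api"},
))
print(len(simple), len(full))  # 1 6
```

A trivial task receives only the base instruction, while a high-impact task with dependents, predictions, and matching labels receives all six layers.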

Final Instruction Package:

{
  "task_assignment": {
    "task_id": "task_015_user_auth_api",
    "task_name": "User Authentication API",
    "instructions": "Implement user authentication...[6 layers of guidance]",
    "context_layers": 6,
    "estimated_reading_time": "3-5 minutes",
    "instruction_complexity": "comprehensive"
  },
  "assignment_metadata": {
    "lease_duration": "3.2 hours",
    "renewable": true,
    "progress_expected": "25% increments",
    "decision_logging_encouraged": true
  }
}

💾 Data Persistence Across Systems

What Gets Stored Where:

data/assignments/assignments.json          ← Active task assignments with leases
data/marcus_state/memory/                 ← Learning patterns from task assignments
data/marcus_state/context/                ← Task dependencies and relationships
data/audit_logs/                          ← Complete audit trail of assignment process
data/token_usage.json                     ← AI costs for assignment analysis

System State Changes:

  • Assignment Registry: 35-assignment-lease-system.md creates time-bound assignment
  • Agent Status: 21-agent-coordination.md updates agent's current_tasks list
  • Memory System: 01-memory-system.md records assignment for learning
  • Context System: 03-context-dependency-system.md tracks task relationships
  • Monitoring: 41-assignment-monitor.md begins tracking assignment health

🔄 Why This Complexity Matters

Without This Orchestration:

  • Simple task queue: "Here's the next task in the list"
  • No coordination: Multiple agents working on conflicting tasks
  • No context: Agents making decisions in isolation
  • No learning: Same mistakes repeated on every project
  • No recovery: Tasks stuck forever if agents go offline

With Marcus:

  • AI-Powered Selection: Best task for this specific agent at this specific time
  • Conflict Prevention: Sophisticated safety checks prevent logical errors
  • Rich Context: Agents understand how their work affects the broader project
  • Predictive Intelligence: Risk analysis and time estimation based on learned patterns
  • Automatic Recovery: Time-bound leases with automatic reassignment
  • Continuous Learning: Every assignment improves future coordination

The Result:

A single request_next_task() call triggers sophisticated AI analysis, multi-system coordination, predictive modeling, contextual guidance generation, and learning integration, transforming a simple "give me work" request into an intelligent project coordination decision that optimizes for both immediate productivity and long-term project success.


🎯 Key Takeaway

Task assignment isn't just "match skills to work"; it's a sophisticated AI-powered coordination process involving safety analysis, dependency intelligence, contextual guidance, predictive modeling, time-bound accountability, and continuous learning. This is why Marcus can effectively coordinate complex multi-agent work: every task assignment is an intelligent decision that considers the agent, the project, the dependencies, the risks, and the learning opportunities all simultaneously.