The Marcus logging system is a comprehensive, structured logging infrastructure designed to capture all conversations, decisions, and interactions between Workers, Marcus (PM), and the Kanban Board components. It consists of two primary modules that work in tandem to provide both detailed conversation tracking and lightweight event logging.
-
ConversationLogger (
src/logging/conversation_logger.py)- Full-featured structured logging system
- JSON-based log storage with automatic rotation
- Hierarchical conversation categorization
- Rich metadata capture and analysis capabilities
-
Agent Events Logger (
src/logging/agent_events.py)- Lightweight event logging without dependencies
- Fast, safe logging for core Marcus operations
- Simple JSON Lines format for event storage
- Designed for visualization pipeline integration
The system categorizes all interactions into distinct conversation types:
class ConversationType(Enum):
WORKER_TO_PM = "worker_to_pm" # Worker → Marcus communications
PM_TO_WORKER = "pm_to_worker" # Marcus → Worker communications
PM_TO_KANBAN = "pm_to_kanban" # Marcus → Kanban interactions
KANBAN_TO_PM = "kanban_to_pm" # Kanban → Marcus notifications
INTERNAL_THINKING = "internal_thinking" # Internal reasoning processes
DECISION = "decision" # Formal decisions with rationale
ERROR = "error" # Error conditions and failuresWhen an agent registers via register_agent:
# Step 1: Log incoming request
conversation_logger.log_worker_message(agent_id, "to_pm", message, metadata)
# Step 2: Log Marcus thinking process
log_thinking("marcus", thought, context)
# Step 3: Log decision with confidence
conversation_logger.log_pm_decision(decision, rationale, confidence_score)
# Step 4: Log event for visualization
log_agent_event("worker_registration", event_data)
# Step 5: Log response
conversation_logger.log_worker_message(agent_id, "from_pm", response)When an agent requests a task via request_next_task:
# Log request
conversation_logger.log_worker_message(agent_id, "to_pm", "Requesting task")
# Log thinking about state
log_thinking("marcus", "Need to check current project state")
# Log task analysis
log_thinking("marcus", f"Finding optimal task for {agent.name}", context)
# Log assignment decision
conversation_logger.log_task_assignment(task_id, worker_id, details, score)
# Log kanban interaction
conversation_logger.log_kanban_interaction("move_task", "to_kanban", data)When workers report progress via report_task_progress:
# Log progress update
conversation_logger.log_progress_update(
worker_id, task_id, progress, status, message, metrics
)
# Log kanban synchronization
conversation_logger.log_kanban_interaction("update_task", "to_kanban", data)
# Log system state snapshot
conversation_logger.log_system_state(
active_workers, tasks_in_progress, tasks_completed, tasks_blocked, metrics
)When workers encounter blockers via report_blocker:
# Log blocker with severity
conversation_logger.log_blocker(
worker_id, task_id, description, severity,
suggested_solutions, resolution_attempts
)
# Log Marcus analysis
log_thinking("marcus", "Analyzing blocker impact", context)
# Log resolution decision
conversation_logger.log_pm_decision(resolution, rationale, alternatives)The ConversationLogger uses structlog with comprehensive processors:
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)- Conversation logs:
logs/conversations/conversations_{timestamp}.jsonl - Decision logs:
logs/conversations/decisions_{timestamp}.jsonl - Agent events:
logs/agent_events/agent_events_{date}.jsonl
- DEBUG: Internal thinking, reasoning processes
- INFO: Normal operations, task assignments, progress
- WARNING: Blockers, degraded performance
- ERROR: System failures, critical issues
Each log entry includes comprehensive metadata:
- Timestamps (ISO format)
- Participant identifiers
- Conversation types
- Structured context data
- Performance metrics
- Decision factors and confidence scores
The system captures detailed decision-making processes:
log_pm_decision(
decision="Assign critical task to worker_senior_1",
rationale="Worker has security expertise and availability",
alternatives_considered=[{
"option": "Assign to junior",
"score": 0.4,
"reason_rejected": "Task criticality requires experience"
}],
confidence_score=0.85,
decision_factors={
"skill_match": 0.95,
"availability": 0.80,
"task_criticality": "high"
}
)Progress updates include detailed performance data:
metrics={
"time_spent_hours": 12,
"estimated_remaining_hours": 6,
"code_lines_added": 450,
"tests_written": 8,
"test_coverage": 85,
"performance_improvement_percent": 40
}Comprehensive blocker tracking with resolution context:
resolution_attempts=[{
"attempt": "Checked API status",
"timestamp": "2024-01-15T14:30:00Z",
"outcome": "No reported issues",
"time_spent": 0.5,
"lessons_learned": "Status page not always current"
}]Regular snapshots of system health:
system_metrics={
"cpu_utilization": 0.65,
"memory_usage_gb": 12.3,
"avg_task_completion_time_hours": 4.2,
"worker_efficiency": 0.87,
"error_rate": 0.02,
"tasks_per_hour": 12.3
}- Comprehensive Coverage: Captures all system interactions with rich context
- Structured Format: JSON formatting enables easy parsing and analysis
- Performance Tracking: Detailed metrics enable optimization
- Decision Auditing: Full decision tracking with rationale and alternatives
- Visualization Support: Direct integration with visualization pipeline
- Error Resilience: Logging failures don't break core functionality
- Automatic Rotation: Timestamp-based file organization prevents log bloat
- Storage Overhead: Comprehensive logging generates significant data
- Performance Impact: Structured logging adds processing overhead
- Complexity: Multiple log types and processors increase system complexity
- No Built-in Cleanup: Requires external log rotation management
- Memory Usage: In-memory structlog configuration consumes resources
-
Observability First: Marcus is an AI-powered system requiring deep observability for debugging and optimization
-
AI Training Data: Logged conversations and decisions provide valuable training data for improving Marcus' algorithms
-
Compliance & Auditing: Detailed decision tracking enables compliance with transparency requirements
-
Performance Analysis: Rich metrics enable identification of bottlenecks and optimization opportunities
-
Visualization Requirements: Structured logging directly feeds the visualization pipeline for real-time monitoring
-
Debugging Capabilities: Comprehensive context capture enables rapid issue identification and resolution
- Log Aggregation: Implement centralized log aggregation (ELK stack or similar)
- Real-time Streaming: Add WebSocket support for live log streaming
- Compression: Implement log compression for older entries
- Retention Policies: Automated cleanup based on age and importance
- ML-based Analysis: Use logged data to train Marcus' decision-making
- Predictive Analytics: Identify patterns to predict blockers and delays
- Distributed Logging: Support for multi-node Marcus deployments
- Custom Dashboards: Task-specific logging views and analytics
For simple tasks, the logging system:
- Captures basic assignment and completion events
- Minimal metadata collection
- Lower frequency of progress updates
- Streamlined decision logging
For complex tasks, the system provides:
- Detailed dependency tracking and analysis
- Frequent progress updates with granular metrics
- Comprehensive blocker documentation
- Multi-step decision processes with alternatives
- Performance trajectory analysis
The logging system adapts to different board types:
- Basic column transitions logged
- Simple task state changes
- Standard progress metrics
- Integration with code analysis results
- Commit and PR linking in logs
- Implementation context capture
- Prediction accuracy tracking
- Risk assessment logging
- Pattern recognition results
While not directly integrated with Cato, the logging system provides:
- Decision History: Cato can analyze logged decisions for pattern extraction
- Performance Data: Metrics feed into Cato's optimization algorithms
- Learning Material: Conversation logs provide training data for Cato's models
- Feedback Loops: Logged outcomes enable Cato to refine predictions
Here's how logging works through a complete task lifecycle:
- create_project → Logs project initialization, configuration decisions
- register_agent → Logs agent capabilities, role assignment rationale
- request_next_task → Logs task analysis, skill matching, assignment decision
- report_progress → Logs completion percentage, performance metrics, status changes
- report_blocker → Logs blocker details, severity assessment, resolution attempts
- finish_task → Logs completion metrics, quality assessment, lessons learned
Each step generates multiple log entries across different categories, creating a complete audit trail of the project execution.
The Marcus logging system is a sophisticated infrastructure that goes beyond simple event recording. It provides deep insights into system behavior, enables performance optimization, and creates a foundation for continuous improvement through data-driven analysis. Its structured approach and comprehensive coverage make it an essential component of the Marcus ecosystem, supporting both operational excellence and future AI enhancements.