---
description: Anthropic Stack Optimization Plan - Empathy Framework: **Version:** 1.0 **Created:** January 16, 2026 **Owner:** Engineering Team **Status:** Planning Phase **T
---
Version: 1.0 Created: January 16, 2026 Owner: Engineering Team Status: Planning Phase Timeline: 3-4 weeks (estimated)
This plan outlines optimizations to fully leverage Anthropic's capabilities in the Empathy Framework. While the framework already uses Anthropic as the default provider, several advanced features remain underutilized.
Current State:
- ✅ Anthropic is default provider
- ✅ Latest Claude models (Sonnet 4.5, Opus 4.5, Haiku 3.5)
- ✅ Official Anthropic SDK integrated
- ✅ Task-based tier routing
Goals:
- 💰 Reduce API costs by 30-50% through Batch API and prompt caching
- ⚡ Improve response quality with Thinking mode
- 🎨 Enable vision-based workflows
- 📊 Achieve precise cost tracking with accurate token counting
- 🔄 Add real-time streaming for better UX
Expected Impact:
- Cost Reduction: 30-50% for batch-eligible tasks
- Quality Improvement: 15-20% better reasoning on complex tasks
- New Capabilities: Vision analysis, real-time streaming
- Operational Excellence: Precise cost attribution per workflow
| Track | Priority | Effort | Impact | Timeline |
|---|---|---|---|---|
| 1. Batch API Integration | HIGH | Medium | 50% cost reduction | Week 1-2 |
| 2. Prompt Caching | HIGH | Low | 20-30% cost reduction | Week 1 |
| 3. Thinking Mode | MEDIUM | Low | Quality improvement | Week 2 |
| 4. Precise Token Counting | HIGH | Low | Better tracking | Week 1 |
| 5. Vision Workflows | MEDIUM | High | New capability | Week 3-4 |
| 6. Streaming Support | MEDIUM | Medium | Better UX | Week 2-3 |
| 7. Max Tokens Optimization | LOW | Low | Minor savings | Week 4 |
Enable Anthropic's Batch API for non-urgent tasks to achieve 50% cost reduction.
Anthropic's Batch API (announced 2024) processes requests asynchronously with:
- 50% cost reduction compared to real-time API
- 24-hour processing window (not suitable for interactive workflows)
- Same model quality as real-time API
- Bulk processing for analytics, data processing, batch evaluations
File: empathy_llm_toolkit/providers.py
Add new class:
from anthropic import Anthropic
from anthropic.types import BatchCreateParams, BatchStatus
from typing import List, Dict, Any
import asyncio
from datetime import datetime, timedelta
class AnthropicBatchProvider:
"""Provider for Anthropic Batch API (50% cost reduction)."""
def __init__(self, api_key: str | None = None):
"""Initialize batch provider.
Args:
api_key: Anthropic API key (defaults to ANTHROPIC_API_KEY env var)
"""
self.client = Anthropic(api_key=api_key)
self._batch_jobs: Dict[str, BatchStatus] = {}
def create_batch(
self,
requests: List[Dict[str, Any]],
job_id: str | None = None
) -> str:
"""Create a batch job.
Args:
requests: List of request dicts with 'model', 'messages', etc.
job_id: Optional job identifier for tracking
Returns:
Batch job ID for polling status
Example:
>>> requests = [
... {
... "model": "claude-sonnet-4-5",
... "messages": [{"role": "user", "content": "Analyze X"}],
... "max_tokens": 1024
... }
... ]
>>> job_id = provider.create_batch(requests)
"""
batch = self.client.batches.create(
requests=requests
)
self._batch_jobs[batch.id] = batch
return batch.id
def get_batch_status(self, batch_id: str) -> BatchStatus:
"""Get status of batch job.
Returns:
BatchStatus with status in ['processing', 'completed', 'failed']
"""
batch = self.client.batches.retrieve(batch_id)
self._batch_jobs[batch_id] = batch
return batch
def get_batch_results(self, batch_id: str) -> List[Dict[str, Any]]:
"""Get results from completed batch.
Args:
batch_id: Batch job ID
Returns:
List of result dicts matching input order
Raises:
ValueError: If batch is not completed
"""
status = self.get_batch_status(batch_id)
if status.status != "completed":
raise ValueError(
f"Batch {batch_id} not completed (status: {status.status})"
)
results = self.client.batches.results(batch_id)
return list(results)
async def wait_for_batch(
self,
batch_id: str,
poll_interval: int = 60,
timeout: int = 86400 # 24 hours
) -> List[Dict[str, Any]]:
"""Wait for batch to complete with polling.
Args:
batch_id: Batch job ID
poll_interval: Seconds between status checks
timeout: Maximum wait time in seconds
Returns:
Batch results when completed
Raises:
TimeoutError: If batch doesn't complete within timeout
"""
start_time = datetime.now()
while True:
status = self.get_batch_status(batch_id)
if status.status == "completed":
return self.get_batch_results(batch_id)
if status.status == "failed":
raise RuntimeError(f"Batch {batch_id} failed: {status.error}")
# Check timeout
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > timeout:
raise TimeoutError(
f"Batch {batch_id} did not complete within {timeout}s"
)
# Wait before next poll
await asyncio.sleep(poll_interval)Add to existing AnthropicProvider class:
class AnthropicProvider:
# ... existing code ...
def __init__(self, api_key: str | None = None, use_batch: bool = False):
"""Initialize provider.
Args:
api_key: Anthropic API key
use_batch: If True, use batch API for non-urgent requests
"""
self.client = Anthropic(api_key=api_key)
self.use_batch = use_batch
if use_batch:
self.batch_provider = AnthropicBatchProvider(api_key=api_key)
else:
self.batch_provider = NoneFile: src/empathy_os/models/tasks.py
Add new constant:
# Tasks eligible for batch processing (non-interactive, non-urgent)
BATCH_ELIGIBLE_TASKS = {
# Analytics & Reporting
"analyze_logs",
"generate_report",
"compute_metrics",
"aggregate_stats",
# Data Processing
"classify_bulk",
"extract_bulk",
"transform_bulk",
"validate_bulk",
# Code Analysis
"analyze_codebase",
"detect_patterns",
"compute_complexity",
"find_vulnerabilities",
# Content Generation (non-urgent)
"generate_docs",
"generate_tests",
"generate_comments",
"translate_bulk",
# Evaluation & Testing
"evaluate_responses",
"run_test_suite",
"validate_outputs",
}
# Tasks requiring real-time response
REALTIME_REQUIRED_TASKS = {
"chat",
"interactive_debug",
"live_coding",
"user_query",
"wizard_step",
}File: src/empathy_os/workflows/batch_processing.py (NEW)
"""Batch processing workflow using Anthropic Batch API."""
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import logging
from empathy_os.workflows.base import BaseWorkflow
from empathy_llm_toolkit.providers import AnthropicBatchProvider
from empathy_os.models import get_model
logger = logging.getLogger(__name__)
@dataclass
class BatchRequest:
"""Single request in a batch."""
task_id: str
task_type: str
input_data: Dict[str, Any]
model_tier: str = "capable" # cheap, capable, premium
@dataclass
class BatchResult:
"""Result from batch processing."""
task_id: str
success: bool
output: Dict[str, Any] | None = None
error: str | None = None
class BatchProcessingWorkflow(BaseWorkflow):
"""Process multiple tasks via Anthropic Batch API (50% cost savings)."""
def __init__(self, api_key: str | None = None):
"""Initialize batch workflow.
Args:
api_key: Anthropic API key (optional, uses env var)
"""
super().__init__()
self.batch_provider = AnthropicBatchProvider(api_key=api_key)
async def execute_batch(
self,
requests: List[BatchRequest],
poll_interval: int = 300, # 5 minutes
timeout: int = 86400 # 24 hours
) -> List[BatchResult]:
"""Execute batch of requests.
Args:
requests: List of batch requests
poll_interval: Seconds between status checks
timeout: Maximum wait time
Returns:
List of results matching input order
Example:
>>> workflow = BatchProcessingWorkflow()
>>> requests = [
... BatchRequest(
... task_id="task_1",
... task_type="analyze_logs",
... input_data={"logs": "..."}
... )
... ]
>>> results = await workflow.execute_batch(requests)
"""
# Convert to Anthropic batch format
api_requests = []
for req in requests:
model = get_model("anthropic", req.model_tier)
api_requests.append({
"custom_id": req.task_id,
"model": model.id,
"messages": self._format_messages(req),
"max_tokens": 4096,
})
# Submit batch
logger.info(f"Submitting batch of {len(requests)} requests")
batch_id = self.batch_provider.create_batch(api_requests)
logger.info(f"Batch {batch_id} created, waiting for completion...")
# Wait for completion
try:
raw_results = await self.batch_provider.wait_for_batch(
batch_id,
poll_interval=poll_interval,
timeout=timeout
)
except TimeoutError:
logger.error(f"Batch {batch_id} timed out after {timeout}s")
return [
BatchResult(
task_id=req.task_id,
success=False,
error="Batch processing timed out"
)
for req in requests
]
# Parse results
results = []
for raw in raw_results:
task_id = raw["custom_id"]
if "error" in raw:
results.append(BatchResult(
task_id=task_id,
success=False,
error=raw["error"]["message"]
))
else:
results.append(BatchResult(
task_id=task_id,
success=True,
output=raw["response"]
))
logger.info(
f"Batch {batch_id} completed: "
f"{sum(r.success for r in results)}/{len(results)} successful"
)
return results
def _format_messages(self, request: BatchRequest) -> List[Dict[str, str]]:
"""Format request into Anthropic messages format."""
# Task-specific formatting
task_prompts = {
"analyze_logs": "Analyze the following logs and identify issues:\n\n{logs}",
"generate_report": "Generate a report based on:\n\n{data}",
"classify_bulk": "Classify the following items:\n\n{items}",
# ... more task types
}
prompt = task_prompts.get(
request.task_type,
"Process the following:\n\n{input}"
)
# Format with input data
content = prompt.format(**request.input_data)
return [{"role": "user", "content": content}]File: src/empathy_os/cli.py
Add new command:
@app.command()
def batch(
task_type: str = typer.Option(..., help="Task type (analyze_logs, generate_docs, etc.)"),
input_file: str = typer.Option(..., help="JSON file with input data"),
output_file: str = typer.Option("batch_results.json", help="Output file"),
model_tier: str = typer.Option("capable", help="Model tier (cheap/capable/premium)"),
poll_interval: int = typer.Option(300, help="Status check interval (seconds)"),
):
"""Process tasks via Anthropic Batch API (50% cost savings).
Input file format:
[
{"task_id": "1", "input_data": {"logs": "..."}},
{"task_id": "2", "input_data": {"logs": "..."}}
]
"""
import json
import asyncio
from empathy_os.workflows.batch_processing import (
BatchProcessingWorkflow,
BatchRequest,
)
# Load input
with open(input_file) as f:
inputs = json.load(f)
# Create requests
requests = [
BatchRequest(
task_id=item["task_id"],
task_type=task_type,
input_data=item["input_data"],
model_tier=model_tier,
)
for item in inputs
]
console.print(f"[cyan]Submitting {len(requests)} tasks to batch API...[/cyan]")
console.print(f"[yellow]Note: Batch API processes within 24 hours (50% cost savings)[/yellow]")
# Execute
workflow = BatchProcessingWorkflow()
results = asyncio.run(workflow.execute_batch(requests, poll_interval=poll_interval))
# Save results
output_data = [
{
"task_id": r.task_id,
"success": r.success,
"output": r.output,
"error": r.error,
}
for r in results
]
with open(output_file, "w") as f:
json.dump(output_data, f, indent=2)
# Summary
successes = sum(r.success for r in results)
console.print(f"\n[green]✓ Batch complete: {successes}/{len(results)} successful[/green]")
console.print(f"[cyan]Results saved to: {output_file}[/cyan]")File: tests/unit/providers/test_batch_api.py (NEW)
"""Tests for Anthropic Batch API integration."""
import pytest
from unittest.mock import Mock, patch
from empathy_llm_toolkit.providers import AnthropicBatchProvider
class TestBatchProvider:
"""Test suite for batch API provider."""
@patch("anthropic.Anthropic")
def test_create_batch(self, mock_anthropic):
"""Test creating a batch job."""
# Mock response
mock_batch = Mock()
mock_batch.id = "batch_123"
mock_anthropic.return_value.batches.create.return_value = mock_batch
provider = AnthropicBatchProvider(api_key="test-key")
requests = [
{
"model": "claude-sonnet-4-5",
"messages": [{"role": "user", "content": "Test"}],
"max_tokens": 1024
}
]
batch_id = provider.create_batch(requests)
assert batch_id == "batch_123"
mock_anthropic.return_value.batches.create.assert_called_once()
@patch("anthropic.Anthropic")
def test_get_batch_status(self, mock_anthropic):
"""Test checking batch status."""
mock_batch = Mock()
mock_batch.status = "completed"
mock_anthropic.return_value.batches.retrieve.return_value = mock_batch
provider = AnthropicBatchProvider(api_key="test-key")
status = provider.get_batch_status("batch_123")
assert status.status == "completed"
@patch("anthropic.Anthropic")
def test_get_batch_results_completed(self, mock_anthropic):
"""Test getting results from completed batch."""
mock_batch = Mock()
mock_batch.status = "completed"
mock_anthropic.return_value.batches.retrieve.return_value = mock_batch
mock_anthropic.return_value.batches.results.return_value = [
{"custom_id": "1", "response": {"content": "Result 1"}}
]
provider = AnthropicBatchProvider(api_key="test-key")
results = provider.get_batch_results("batch_123")
assert len(results) == 1
assert results[0]["custom_id"] == "1"
@patch("anthropic.Anthropic")
def test_get_batch_results_not_completed_raises(self, mock_anthropic):
"""Test that getting results from incomplete batch raises error."""
mock_batch = Mock()
mock_batch.status = "processing"
mock_anthropic.return_value.batches.retrieve.return_value = mock_batch
provider = AnthropicBatchProvider(api_key="test-key")
with pytest.raises(ValueError, match="not completed"):
provider.get_batch_results("batch_123")Add to README.md:
### Batch Processing (50% Cost Savings)
For non-urgent tasks, use the Batch API to save 50% on API costs:
```bash
# Create input file
cat > batch_input.json <<EOF
[
{"task_id": "log_1", "input_data": {"logs": "ERROR: Connection failed..."}},
{"task_id": "log_2", "input_data": {"logs": "WARNING: High memory usage..."}}
]
EOF
# Submit batch job
empathy batch \
--task-type analyze_logs \
--input-file batch_input.json \
--output-file results.json \
--model-tier capableNote: Batch API processes within 24 hours. Not suitable for interactive workflows.
### Success Criteria
- [ ] Batch API client implemented and tested
- [ ] Batch workflow created with async polling
- [ ] CLI command added for batch processing
- [ ] Unit tests achieve 80%+ coverage
- [ ] Documentation with usage examples
- [ ] Successfully process 100+ task batch
- [ ] Verify 50% cost reduction via telemetry
### Estimated Impact
- **Cost Reduction:** 50% for batch-eligible tasks
- **Throughput:** 100x for bulk operations
- **Use Cases:** Log analysis, bulk docs generation, codebase scans
---
## 💾 Track 2: Enable Prompt Caching by Default
### Objective
Reduce API costs by 20-30% for workflows with repeated context through prompt caching.
### Background
Anthropic's Prompt Caching (2024):
- **Cache frequently used context** (system prompts, documents, code)
- **90% cost reduction** on cached tokens (read)
- **25% markup** on cache writes
- **5-minute TTL** before cache expires
- **Break-even:** ~3 requests with same context
### Implementation Steps
#### 2.1 Update Provider to Enable Caching
**File:** `empathy_llm_toolkit/providers.py`
**Modify `AnthropicProvider.__init__`:**
```python
class AnthropicProvider:
def __init__(
self,
api_key: str | None = None,
use_prompt_caching: bool = True, # CHANGED: Default to True
use_batch: bool = False,
enable_thinking: bool = False,
):
"""Initialize Anthropic provider.
Args:
api_key: Anthropic API key (defaults to ANTHROPIC_API_KEY env var)
use_prompt_caching: Enable prompt caching (default: True)
use_batch: Use batch API for non-urgent requests
enable_thinking: Enable extended thinking mode for complex tasks
"""
self.client = Anthropic(api_key=api_key)
self.use_prompt_caching = use_prompt_caching
self.use_batch = use_batch
self.enable_thinking = enable_thinking
if use_batch:
self.batch_provider = AnthropicBatchProvider(api_key=api_key)
Add caching logic to complete() method:
def complete(
self,
messages: List[Dict[str, str]],
model: str = "claude-sonnet-4-5",
system_prompt: str | None = None,
**kwargs
) -> Dict[str, Any]:
"""Complete a prompt with optional caching.
Args:
messages: Conversation messages
model: Model ID
system_prompt: System prompt (will be cached if caching enabled)
**kwargs: Additional API parameters
Returns:
API response with usage stats
"""
# Build request
request_params = {
"model": model,
"messages": messages,
"max_tokens": kwargs.get("max_tokens", 4096),
}
# Add system prompt with cache control
if system_prompt:
if self.use_prompt_caching:
# Mark system prompt for caching
request_params["system"] = [
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"}
}
]
else:
request_params["system"] = system_prompt
# Make API call
response = self.client.messages.create(**request_params)
# Track cache performance
usage = response.usage
if hasattr(usage, "cache_creation_input_tokens"):
cache_stats = {
"cache_creation_tokens": usage.cache_creation_input_tokens,
"cache_read_tokens": usage.cache_read_input_tokens,
"cache_hit": usage.cache_read_input_tokens > 0,
}
logger.info(f"Cache stats: {cache_stats}")
return response.model_dump()File: src/empathy_os/workflows/base.py
Add method to identify cacheable content:
class BaseWorkflow:
# ... existing code ...
def _build_cached_system_prompt(self, context: Dict[str, Any]) -> str:
"""Build system prompt optimized for caching.
Prompt caching works best with:
- Static content (docs, code, guidelines)
- Frequent reuse (>3 requests within 5 min)
- Large context (>1024 tokens)
Returns:
System prompt with static content first (for caching)
"""
parts = []
# 1. Static content (CACHEABLE - goes first)
if self.coding_standards:
parts.append("# Coding Standards\n" + self.coding_standards)
if self.style_guide:
parts.append("# Style Guide\n" + self.style_guide)
if context.get("documentation"):
parts.append("# Documentation\n" + context["documentation"])
if context.get("codebase_context"):
parts.append("# Codebase Context\n" + context["codebase_context"])
# 2. Dynamic content (NOT CACHEABLE - goes after cache boundary)
# This will be in the user message, not system prompt
return "\n\n".join(parts)File: src/empathy_os/telemetry/usage_tracker.py
Add cache metrics:
@dataclass
class TokenUsage:
"""Token usage with cache metrics."""
input_tokens: int
output_tokens: int
cache_creation_tokens: int = 0 # NEW
cache_read_tokens: int = 0 # NEW
@property
def total_tokens(self) -> int:
"""Total tokens including cache."""
return (
self.input_tokens +
self.output_tokens +
self.cache_creation_tokens +
self.cache_read_tokens
)
@property
def cache_hit_rate(self) -> float:
"""Percentage of input tokens served from cache."""
total_input = self.input_tokens + self.cache_read_tokens
if total_input == 0:
return 0.0
return self.cache_read_tokens / total_input
@property
def estimated_cost_savings(self) -> float:
"""Estimated cost savings from caching (USD).
Assumptions:
- Cache reads: 90% discount vs full price
- Cache writes: 25% markup vs full price
- Using Sonnet 4.5 pricing ($3/M input tokens)
"""
# Cost without caching
full_price_per_token = 3.0 / 1_000_000
cost_without_cache = (
(self.input_tokens + self.cache_read_tokens) * full_price_per_token
)
# Cost with caching
cache_read_price = full_price_per_token * 0.1 # 90% discount
cache_write_price = full_price_per_token * 1.25 # 25% markup
cost_with_cache = (
(self.input_tokens * full_price_per_token) +
(self.cache_creation_tokens * cache_write_price) +
(self.cache_read_tokens * cache_read_price)
)
return cost_without_cache - cost_with_cacheFile: src/empathy_os/telemetry/cli.py
Add command:
@app.command()
def cache_stats(
days: int = typer.Option(7, help="Days of history to analyze"),
output_format: str = typer.Option("table", help="Output format (table/json)"),
):
"""Show prompt caching performance statistics.
Displays:
- Cache hit rate over time
- Cost savings from caching
- Top workflows benefiting from cache
- Recommendations for optimization
"""
from empathy_os.telemetry import get_usage_tracker
from rich.table import Table
tracker = get_usage_tracker()
stats = tracker.get_cache_stats(days=days)
if output_format == "json":
console.print_json(data=stats)
return
# Table format
table = Table(title=f"Prompt Caching Stats (Last {days} Days)")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Cache Hit Rate", f"{stats['hit_rate']:.1%}")
table.add_row("Total Cache Reads", f"{stats['total_reads']:,} tokens")
table.add_row("Total Cache Writes", f"{stats['total_writes']:,} tokens")
table.add_row("Estimated Savings", f"${stats['savings']:.2f}")
table.add_row("Requests with Cache Hits", f"{stats['hit_count']:,}")
table.add_row("Total Requests", f"{stats['total_requests']:,}")
console.print(table)
# Recommendations
if stats['hit_rate'] < 0.3:
console.print("\n[yellow]⚠ Cache hit rate is low (<30%)[/yellow]")
console.print("Recommendations:")
console.print(" - Increase reuse of system prompts across requests")
console.print(" - Group similar requests together (5-min cache TTL)")
console.print(" - Consider using workflow batching")- Prompt caching enabled by default in provider
- Cache-aware system prompt builder
- Cache metrics tracked in telemetry
- Cache monitoring dashboard in CLI
- Documentation on optimizing for cache hits
- Achieve >50% cache hit rate in test workflows
- Verify 20-30% cost reduction for cached workflows
- Cost Reduction: 20-30% for workflows with repeated context
- Break-even: After 3 requests with same context
- Best Use Cases: Code review, repeated analysis, iterative workflows
Enable Anthropic's extended thinking mode for premium tier tasks to improve reasoning quality.
Extended Thinking (announced Dec 2024):
- Extended reasoning before generating response
- Up to 1M thinking tokens (not billed to user)
- Better performance on complex logical/analytical tasks
- Available on: Sonnet 4.5 and Opus 4.5
- Use cases: Mathematical reasoning, code architecture, complex analysis
File: src/empathy_os/models/registry.py
Update model definitions:
# Capable tier - Sonnet 4.5 with thinking support
ModelInfo(
provider="anthropic",
id="claude-sonnet-4-5-20250929",
tier="capable",
input_price=3.00,
output_price=15.00,
max_tokens=8192,
supports_tools=True,
supports_vision=True,
supports_thinking=True, # NEW
thinking_enabled_by_default=False, # NEW: Only for premium tasks
),
# Premium tier - Opus 4.5 with thinking support
ModelInfo(
provider="anthropic",
id="claude-opus-4-5-20251101",
tier="premium",
input_price=15.00,
output_price=75.00,
max_tokens=8192,
supports_tools=True,
supports_vision=True,
supports_thinking=True, # NEW
thinking_enabled_by_default=True, # NEW: Always use for premium
),File: empathy_llm_toolkit/providers.py
Add thinking support:
def complete(
self,
messages: List[Dict[str, str]],
model: str = "claude-sonnet-4-5",
system_prompt: str | None = None,
enable_thinking: bool | None = None, # NEW
**kwargs
) -> Dict[str, Any]:
"""Complete a prompt with optional extended thinking.
Args:
messages: Conversation messages
model: Model ID
system_prompt: System prompt
enable_thinking: Override thinking mode (None uses model default)
**kwargs: Additional API parameters
Returns:
API response with thinking blocks (if enabled)
"""
# Determine if thinking should be enabled
use_thinking = enable_thinking if enable_thinking is not None else self.enable_thinking
# Build request
request_params = {
"model": model,
"messages": messages,
"max_tokens": kwargs.get("max_tokens", 4096),
}
if system_prompt:
request_params["system"] = system_prompt
# Enable thinking mode
if use_thinking:
request_params["thinking"] = {
"type": "enabled",
"budget_tokens": kwargs.get("thinking_budget", 10000)
}
response = self.client.messages.create(**request_params)
# Extract thinking content if present
result = response.model_dump()
if use_thinking and response.content:
thinking_blocks = [
block for block in response.content
if block.get("type") == "thinking"
]
result["thinking"] = thinking_blocks
return resultFile: src/empathy_os/models/tasks.py
Add thinking-required tasks:
# Tasks that benefit from extended thinking
THINKING_REQUIRED_TASKS = {
# Architecture & Design
"architectural_decision",
"design_system",
"design_api",
"design_database",
# Complex Analysis
"analyze_security",
"analyze_performance",
"root_cause_analysis",
"impact_analysis",
# Strategic Planning
"coordinate",
"synthesis",
"strategic_planning",
"technical_strategy",
# Complex Problem Solving
"debug_complex",
"optimize_algorithm",
"solve_mathematical",
"prove_correctness",
}Update ModelRouter:
class ModelRouter:
def route(
self,
task_type: str,
context: Dict[str, Any] | None = None
) -> Tuple[ModelInfo, bool]:
"""Route task to appropriate model.
Returns:
Tuple of (model_info, enable_thinking)
"""
# Determine tier
if task_type in CHEAP_TASKS:
tier = "cheap"
elif task_type in PREMIUM_TASKS or task_type in THINKING_REQUIRED_TASKS:
tier = "premium"
else:
tier = "capable"
model = self.get_model("anthropic", tier)
# Enable thinking for complex tasks
enable_thinking = (
task_type in THINKING_REQUIRED_TASKS and
model.supports_thinking
)
return model, enable_thinkingFile: src/empathy_os/cli.py
Add option to workflow commands:
@app.command()
def workflow_run(
workflow_id: str,
input_data: str = typer.Option(None, help="JSON input data"),
enable_thinking: bool = typer.Option(False, help="Enable extended thinking mode"),
):
"""Run a workflow with optional thinking mode.
Example:
empathy workflow run code-review --enable-thinking --input '{"file": "foo.py"}'
"""
# ... existing code ...
# Pass thinking flag to workflow
result = workflow.execute(
input_data=data,
enable_thinking=enable_thinking
)File: src/empathy_os/workflows/base.py
Add method to display thinking:
def _display_thinking(self, response: Dict[str, Any]):
"""Display thinking process if available."""
if "thinking" not in response:
return
from rich.panel import Panel
for block in response["thinking"]:
thinking_text = block.get("thinking", "")
console.print(Panel(
thinking_text,
title="🧠 Extended Thinking",
border_style="cyan",
padding=(1, 2)
))- Thinking mode integrated in model registry
- Provider supports thinking API parameter
- Auto-enabled for THINKING_REQUIRED_TASKS
- CLI option to manually enable thinking
- Display thinking process in output
- Quality improvement measured on test set
- Documentation on when to use thinking
- Quality Improvement: 15-20% on complex reasoning tasks
- Use Cases: Architecture decisions, security analysis, debugging
- Cost: Free (thinking tokens not billed)
Replace rough token estimates with Anthropic's official token counter for accurate cost tracking.
Current implementation uses rough estimate (4 chars per token). Anthropic provides:
- Accurate token counting via SDK
- Model-specific tokenization (different models may tokenize differently)
- Billing-accurate counts matching API charges
File: empathy_llm_toolkit/utils/tokens.py (NEW)
"""Token counting utilities using Anthropic's tokenizer."""
from typing import List, Dict, Any
from anthropic import Anthropic
# Initialize client for token counting
_client = Anthropic()
def count_tokens(
text: str,
model: str = "claude-sonnet-4-5"
) -> int:
"""Count tokens using Anthropic's tokenizer.
Args:
text: Text to tokenize
model: Model ID (different models may have different tokenizers)
Returns:
Exact token count as would be billed by API
Example:
>>> count_tokens("Hello, world!")
4
"""
# Use Anthropic's count_tokens method
result = _client.count_tokens(text)
return result
def count_message_tokens(
messages: List[Dict[str, str]],
system_prompt: str | None = None,
model: str = "claude-sonnet-4-5"
) -> Dict[str, int]:
"""Count tokens in a conversation.
Args:
messages: List of message dicts
system_prompt: Optional system prompt
model: Model ID
Returns:
Dict with token counts by component
Example:
>>> messages = [{"role": "user", "content": "Hello!"}]
>>> count_message_tokens(messages, system_prompt="You are helpful")
{"system": 4, "messages": 6, "total": 10}
"""
counts = {}
# Count system prompt
if system_prompt:
counts["system"] = count_tokens(system_prompt, model)
else:
counts["system"] = 0
# Count messages
message_text = "\n".join(
f"{msg['role']}: {msg['content']}"
for msg in messages
)
counts["messages"] = count_tokens(message_text, model)
# Total
counts["total"] = counts["system"] + counts["messages"]
return counts
def estimate_cost(
input_tokens: int,
output_tokens: int,
model: str = "claude-sonnet-4-5"
) -> float:
"""Estimate cost in USD.
Args:
input_tokens: Input token count
output_tokens: Output token count
model: Model ID
Returns:
Estimated cost in USD
Example:
>>> estimate_cost(1000, 500, "claude-sonnet-4-5")
0.0105 # $3/M input + $15/M output
"""
from empathy_os.models import get_model_by_id
model_info = get_model_by_id(model)
if not model_info:
raise ValueError(f"Unknown model: {model}")
input_cost = (input_tokens / 1_000_000) * model_info.input_price
output_cost = (output_tokens / 1_000_000) * model_info.output_price
return input_cost + output_costFile: empathy_llm_toolkit/providers.py
Update AnthropicProvider:
from empathy_llm_toolkit.utils.tokens import count_message_tokens, estimate_cost
class AnthropicProvider:
# ... existing code ...
def estimate_tokens(
self,
messages: List[Dict[str, str]],
system_prompt: str | None = None,
model: str = "claude-sonnet-4-5"
) -> Dict[str, int]:
"""Estimate tokens BEFORE making API call.
Use this to:
- Validate input size before API call
- Estimate costs for budgeting
- Warn users about large requests
Returns:
Dict with token counts (system, messages, total)
"""
return count_message_tokens(messages, system_prompt, model)
def calculate_actual_cost(self, response: Dict[str, Any]) -> float:
"""Calculate actual cost from API response.
Args:
response: API response with usage stats
Returns:
Actual cost in USD
"""
usage = response.get("usage", {})
input_tokens = usage.get("input_tokens", 0)
output_tokens = usage.get("output_tokens", 0)
model = response.get("model", "claude-sonnet-4-5")
# Include cache tokens if present
cache_creation = usage.get("cache_creation_input_tokens", 0)
cache_read = usage.get("cache_read_input_tokens", 0)
# Calculate costs
base_cost = estimate_cost(input_tokens, output_tokens, model)
# Adjust for cache (if used)
if cache_creation or cache_read:
from empathy_os.models import get_model_by_id
model_info = get_model_by_id(model)
# Cache writes: 25% markup
cache_write_cost = (cache_creation / 1_000_000) * model_info.input_price * 1.25
# Cache reads: 90% discount
cache_read_cost = (cache_read / 1_000_000) * model_info.input_price * 0.1
base_cost += cache_write_cost + cache_read_cost
return base_costFile: src/empathy_os/workflows/base.py
Add validation:
class BaseWorkflow:
def __init__(self, max_input_tokens: int = 100_000):
"""Initialize workflow.
Args:
max_input_tokens: Maximum input tokens allowed (safety limit)
"""
self.max_input_tokens = max_input_tokens
def _validate_request_size(
self,
messages: List[Dict[str, str]],
system_prompt: str | None = None,
model: str = "claude-sonnet-4-5"
):
"""Validate request size before API call.
Raises:
ValueError: If request exceeds max_input_tokens
"""
from empathy_llm_toolkit.utils.tokens import count_message_tokens
counts = count_message_tokens(messages, system_prompt, model)
if counts["total"] > self.max_input_tokens:
raise ValueError(
f"Request too large: {counts['total']:,} tokens "
f"(max: {self.max_input_tokens:,})"
)
# Warn if approaching limit
if counts["total"] > self.max_input_tokens * 0.8:
logger.warning(
f"Request is large: {counts['total']:,} tokens "
f"({counts['total']/self.max_input_tokens:.0%} of max)"
)File: src/empathy_os/telemetry/usage_tracker.py
Track accurate costs:
class UsageTracker:
def track_request(
self,
workflow_id: str,
model: str,
response: Dict[str, Any]
):
"""Track request with accurate cost calculation."""
from empathy_llm_toolkit.providers import AnthropicProvider
provider = AnthropicProvider()
actual_cost = provider.calculate_actual_cost(response)
usage = response.get("usage", {})
self._records.append({
"timestamp": datetime.now(),
"workflow": workflow_id,
"model": model,
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"cache_creation_tokens": usage.get("cache_creation_input_tokens", 0),
"cache_read_tokens": usage.get("cache_read_input_tokens", 0),
"cost": actual_cost, # Accurate cost
})- Token counting utility using Anthropic SDK
- Replace all rough estimates with accurate counts
- Pre-request validation with token counts
- Accurate cost tracking in telemetry
- CLI command to estimate costs before running
- Documentation on token counting
- Cost accuracy within 1% of actual bills
- Cost Tracking: Accurate to within 1% (vs 10-20% error)
- Budget Management: Better cost predictions
- Optimization: Identify actual token usage patterns
Create workflows that leverage Claude's vision capabilities for image analysis, OCR, and visual debugging.
Claude Vision (Sonnet 4.5, Opus 4.5):
- Multi-modal understanding (text + images)
- OCR capabilities (extract text from images)
- Visual reasoning (understand charts, diagrams, UI)
- Code screenshots (debug from error screenshots)
- Supported formats: PNG, JPEG, GIF, WebP
File: empathy_llm_toolkit/providers.py
Add image handling:
import base64
from pathlib import Path
class AnthropicProvider:
# ... existing code ...
def complete_with_images(
self,
messages: List[Dict[str, Any]], # Now supports image content
model: str = "claude-sonnet-4-5",
system_prompt: str | None = None,
**kwargs
) -> Dict[str, Any]:
"""Complete a prompt with text and images.
Args:
messages: Messages with text and/or image content
model: Model ID (must support vision)
system_prompt: System prompt
Message format with images:
[
{
"role": "user",
"content": [
{"type": "text", "text": "What's in this image?"},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": "<base64_data>"
}
}
]
}
]
Returns:
API response
"""
from empathy_os.models import get_model_by_id
# Validate model supports vision
model_info = get_model_by_id(model)
if not model_info or not model_info.supports_vision:
raise ValueError(f"Model {model} does not support vision")
# Build request
request_params = {
"model": model,
"messages": messages,
"max_tokens": kwargs.get("max_tokens", 4096),
}
if system_prompt:
request_params["system"] = system_prompt
return self.client.messages.create(**request_params).model_dump()
@staticmethod
def load_image(image_path: str) -> Dict[str, Any]:
"""Load image file and encode as base64.
Args:
image_path: Path to image file
Returns:
Image content block for API
Example:
>>> image = provider.load_image("screenshot.png")
>>> messages = [{
... "role": "user",
... "content": [
... {"type": "text", "text": "What error is shown?"},
... image
... ]
... }]
"""
path = Path(image_path)
# Validate file exists
if not path.exists():
raise FileNotFoundError(f"Image not found: {image_path}")
# Determine media type
media_types = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".webp": "image/webp",
}
media_type = media_types.get(path.suffix.lower())
if not media_type:
raise ValueError(f"Unsupported image format: {path.suffix}")
# Encode as base64
image_data = base64.b64encode(path.read_bytes()).decode("utf-8")
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": image_data
}
}File: src/empathy_os/workflows/image_analysis.py (NEW)
"""Image analysis workflow using Claude Vision."""
from typing import List, Dict, Any
from pathlib import Path
import logging
from empathy_os.workflows.base import BaseWorkflow
from empathy_llm_toolkit.providers import AnthropicProvider
logger = logging.getLogger(__name__)
class ImageAnalysisWorkflow(BaseWorkflow):
"""Analyze images using Claude Vision."""
def __init__(self):
super().__init__()
self.provider = AnthropicProvider()
def analyze_image(
self,
image_path: str,
prompt: str = "Describe this image in detail.",
model: str = "claude-sonnet-4-5"
) -> Dict[str, Any]:
"""Analyze a single image.
Args:
image_path: Path to image file
prompt: Analysis prompt
model: Vision-capable model
Returns:
Analysis result with description
Example:
>>> workflow = ImageAnalysisWorkflow()
>>> result = workflow.analyze_image(
... "screenshot.png",
... "What error is shown in this screenshot?"
... )
>>> print(result["analysis"])
"""
# Load image
image = self.provider.load_image(image_path)
# Build message
messages = [{
"role": "user",
"content": [
{"type": "text", "text": prompt},
image
]
}]
# Analyze
response = self.provider.complete_with_images(
messages=messages,
model=model
)
# Extract text content
content = response.get("content", [])
text = " ".join(
block.get("text", "")
for block in content
if block.get("type") == "text"
)
return {
"success": True,
"image": image_path,
"analysis": text,
"model": model,
"usage": response.get("usage", {})
}
def extract_text_ocr(
self,
image_path: str,
model: str = "claude-sonnet-4-5"
) -> Dict[str, Any]:
"""Extract text from image (OCR).
Args:
image_path: Path to image with text
model: Vision-capable model
Returns:
Extracted text
"""
return self.analyze_image(
image_path,
prompt=(
"Extract all text from this image. "
"Preserve formatting and structure. "
"Return only the extracted text, no commentary."
),
model=model
)
def analyze_diagram(
self,
image_path: str,
model: str = "claude-sonnet-4-5"
) -> Dict[str, Any]:
"""Analyze a technical diagram or chart.
Args:
image_path: Path to diagram/chart
model: Vision-capable model
Returns:
Analysis of diagram structure and content
"""
return self.analyze_image(
image_path,
prompt=(
"Analyze this technical diagram or chart. Describe:\n"
"1. The type of diagram (flowchart, architecture, UML, etc.)\n"
"2. The main components and their relationships\n"
"3. The flow or data flow if applicable\n"
"4. Any issues or improvements you notice"
),
model=model
)
def debug_from_screenshot(
self,
screenshot_path: str,
model: str = "claude-sonnet-4-5"
) -> Dict[str, Any]:
"""Debug an error from a screenshot.
Args:
screenshot_path: Path to error screenshot
model: Vision-capable model
Returns:
Debug analysis with fix suggestions
"""
return self.analyze_image(
screenshot_path,
prompt=(
"Analyze this error screenshot. Provide:\n"
"1. What error occurred\n"
"2. The likely root cause\n"
"3. Step-by-step fix instructions\n"
"4. How to prevent this error in the future"
),
model=model
)
def compare_images(
self,
image1_path: str,
image2_path: str,
prompt: str = "What are the differences between these two images?",
model: str = "claude-sonnet-4-5"
) -> Dict[str, Any]:
"""Compare two images.
Args:
image1_path: First image
image2_path: Second image
prompt: Comparison prompt
model: Vision-capable model
Returns:
Comparison analysis
"""
# Load both images
image1 = self.provider.load_image(image1_path)
image2 = self.provider.load_image(image2_path)
# Build message with both images
messages = [{
"role": "user",
"content": [
{"type": "text", "text": prompt},
image1,
image2
]
}]
# Analyze
response = self.provider.complete_with_images(
messages=messages,
model=model
)
content = response.get("content", [])
text = " ".join(
block.get("text", "")
for block in content
if block.get("type") == "text"
)
return {
"success": True,
"images": [image1_path, image2_path],
"comparison": text,
"model": model,
"usage": response.get("usage", {})
}File: src/empathy_os/cli.py
Add image commands:
@app.command()
def image_analyze(
image_path: str = typer.Argument(..., help="Path to image file"),
prompt: str = typer.Option(
"Describe this image in detail.",
help="Analysis prompt"
),
model: str = typer.Option("claude-sonnet-4-5", help="Vision model"),
):
"""Analyze an image using Claude Vision.
Example:
empathy image-analyze screenshot.png --prompt "What error is shown?"
"""
from empathy_os.workflows.image_analysis import ImageAnalysisWorkflow
workflow = ImageAnalysisWorkflow()
console.print(f"[cyan]Analyzing {image_path}...[/cyan]")
result = workflow.analyze_image(image_path, prompt, model)
if result["success"]:
console.print("\n[green]Analysis:[/green]")
console.print(result["analysis"])
usage = result["usage"]
console.print(f"\n[dim]Tokens: {usage['input_tokens']:,} in, {usage['output_tokens']:,} out[/dim]")
else:
console.print(f"[red]Error: {result.get('error')}[/red]")
@app.command()
def image_ocr(
image_path: str = typer.Argument(..., help="Path to image with text"),
output_file: str = typer.Option(None, help="Save extracted text to file"),
):
"""Extract text from image (OCR).
Example:
empathy image-ocr document.png --output-file text.txt
"""
from empathy_os.workflows.image_analysis import ImageAnalysisWorkflow
workflow = ImageAnalysisWorkflow()
console.print(f"[cyan]Extracting text from {image_path}...[/cyan]")
result = workflow.extract_text_ocr(image_path)
if result["success"]:
extracted_text = result["analysis"]
console.print("\n[green]Extracted Text:[/green]")
console.print(extracted_text)
if output_file:
Path(output_file).write_text(extracted_text)
console.print(f"\n[cyan]Saved to: {output_file}[/cyan]")
else:
console.print(f"[red]Error: {result.get('error')}[/red]")
@app.command()
def image_debug(
screenshot_path: str = typer.Argument(..., help="Path to error screenshot"),
):
"""Debug an error from a screenshot.
Example:
empathy image-debug error_screenshot.png
"""
from empathy_os.workflows.image_analysis import ImageAnalysisWorkflow
from rich.panel import Panel
workflow = ImageAnalysisWorkflow()
console.print(f"[cyan]Analyzing error in {screenshot_path}...[/cyan]")
result = workflow.debug_from_screenshot(screenshot_path)
if result["success"]:
console.print(Panel(
result["analysis"],
title="🐛 Debug Analysis",
border_style="red"
))
else:
console.print(f"[red]Error: {result.get('error')}[/red]")File: tests/unit/workflows/test_image_analysis.py (NEW)
"""Tests for image analysis workflow."""
import pytest
from pathlib import Path
from unittest.mock import Mock, patch
from empathy_os.workflows.image_analysis import ImageAnalysisWorkflow
class TestImageAnalysisWorkflow:
"""Test suite for image analysis."""
@patch("empathy_llm_toolkit.providers.AnthropicProvider")
def test_analyze_image(self, mock_provider, tmp_path):
"""Test basic image analysis."""
# Create test image
test_image = tmp_path / "test.png"
test_image.write_bytes(b"fake image data")
# Mock response
mock_provider.return_value.complete_with_images.return_value = {
"content": [{"type": "text", "text": "A test image"}],
"usage": {"input_tokens": 100, "output_tokens": 50}
}
workflow = ImageAnalysisWorkflow()
result = workflow.analyze_image(str(test_image))
assert result["success"]
assert "test image" in result["analysis"].lower()
@patch("empathy_llm_toolkit.providers.AnthropicProvider")
def test_extract_text_ocr(self, mock_provider, tmp_path):
"""Test OCR text extraction."""
test_image = tmp_path / "document.png"
test_image.write_bytes(b"fake document")
mock_provider.return_value.complete_with_images.return_value = {
"content": [{"type": "text", "text": "Extracted text from document"}],
"usage": {"input_tokens": 200, "output_tokens": 10}
}
workflow = ImageAnalysisWorkflow()
result = workflow.extract_text_ocr(str(test_image))
assert result["success"]
assert "Extracted text" in result["analysis"]
def test_load_image_not_found(self):
"""Test that loading non-existent image raises error."""
from empathy_llm_toolkit.providers import AnthropicProvider
with pytest.raises(FileNotFoundError):
AnthropicProvider.load_image("nonexistent.png")
def test_load_image_unsupported_format(self, tmp_path):
"""Test that unsupported format raises error."""
from empathy_llm_toolkit.providers import AnthropicProvider
test_file = tmp_path / "test.txt"
test_file.write_text("not an image")
with pytest.raises(ValueError, match="Unsupported image format"):
AnthropicProvider.load_image(str(test_file))- Vision support added to provider
- Image analysis workflow implemented
- OCR, diagram analysis, debug workflows
- CLI commands for image operations
- Unit tests with mocked responses
- Documentation with examples
- Successfully analyze 10+ real images
- New Capabilities: Image analysis, OCR, visual debugging
- Use Cases: Error screenshots, diagram analysis, document processing
- User Value: Multi-modal understanding
Add real-time streaming for long-running workflows to improve perceived performance and user experience.
Anthropic Streaming:
- Server-Sent Events (SSE) for real-time chunks
- Token-by-token delivery reduces perceived latency
- Better UX for long responses (>500 tokens)
- Same pricing as non-streaming
File: empathy_llm_toolkit/providers.py
Add streaming method:
from typing import Iterator
class AnthropicProvider:
# ... existing code ...
def complete_streaming(
self,
messages: List[Dict[str, str]],
model: str = "claude-sonnet-4-5",
system_prompt: str | None = None,
**kwargs
) -> Iterator[Dict[str, Any]]:
"""Stream completion chunks in real-time.
Args:
messages: Conversation messages
model: Model ID
system_prompt: System prompt
**kwargs: Additional parameters
Yields:
Chunk dicts with incremental content
Example:
>>> for chunk in provider.complete_streaming(messages):
... print(chunk["delta"], end="", flush=True)
"""
request_params = {
"model": model,
"messages": messages,
"max_tokens": kwargs.get("max_tokens", 4096),
"stream": True # Enable streaming
}
if system_prompt:
request_params["system"] = system_prompt
# Stream response
with self.client.messages.stream(**request_params) as stream:
for event in stream:
if event.type == "content_block_delta":
yield {
"type": "content",
"delta": event.delta.text,
"done": False
}
elif event.type == "message_stop":
yield {
"type": "done",
"delta": "",
"done": True,
"usage": event.message.usage
}File: src/empathy_os/workflows/base.py
Add streaming support:
from typing import Iterator
class BaseWorkflow:
# ... existing code ...
def execute_streaming(
self,
input_data: Dict[str, Any],
**kwargs
) -> Iterator[Dict[str, Any]]:
"""Execute workflow with streaming output.
Yields:
Progress updates and incremental results
Example:
>>> for chunk in workflow.execute_streaming(data):
... if chunk["type"] == "progress":
... print(f"Progress: {chunk['message']}")
... elif chunk["type"] == "content":
... print(chunk["delta"], end="", flush=True)
"""
yield {"type": "progress", "message": "Preparing request..."}
# Build prompt
messages = self._build_messages(input_data)
system_prompt = self._build_system_prompt(input_data)
yield {"type": "progress", "message": "Streaming response..."}
# Stream completion
for chunk in self.provider.complete_streaming(
messages=messages,
system_prompt=system_prompt,
**kwargs
):
yield chunk
yield {"type": "progress", "message": "Complete!"}File: src/empathy_os/cli.py
Add streaming option:
@app.command()
def chat_stream(
message: str = typer.Argument(..., help="Your message"),
model: str = typer.Option("claude-sonnet-4-5", help="Model to use"),
):
"""Interactive chat with streaming responses.
Example:
empathy chat-stream "Explain async programming in Python"
"""
from empathy_llm_toolkit.providers import AnthropicProvider
from rich.live import Live
from rich.markdown import Markdown
provider = AnthropicProvider()
messages = [{"role": "user", "content": message}]
console.print("[cyan]Claude:[/cyan] ", end="")
response_text = ""
with Live(console=console, refresh_per_second=10) as live:
for chunk in provider.complete_streaming(messages, model=model):
if chunk["type"] == "content":
response_text += chunk["delta"]
live.update(Markdown(response_text))
elif chunk["type"] == "done":
usage = chunk.get("usage", {})
console.print(f"\n\n[dim]Tokens: {usage.get('output_tokens', 0)}[/dim]")- Streaming implemented in provider
- Streaming workflow base class
- CLI command with live updates
- Handle connection errors gracefully
- Display token usage after stream
- Documentation on when to use streaming
- UX Improvement: Perceived latency reduction
- Use Cases: Long responses, interactive chat
- User Satisfaction: Real-time feedback
Set tier-appropriate max_tokens limits to avoid unnecessary costs.
File: src/empathy_os/models/registry.py
Update max_tokens:
# Cheap tier - Lower limit for cost control
ModelInfo(
provider="anthropic",
id="claude-3-5-haiku-20241022",
tier="cheap",
max_tokens=2048, # Reduced from 8192
# ...
),
# Capable tier - Moderate limit
ModelInfo(
provider="anthropic",
id="claude-sonnet-4-5-20250929",
tier="capable",
max_tokens=4096, # Reduced from 8192
# ...
),
# Premium tier - Higher limit
ModelInfo(
provider="anthropic",
id="claude-opus-4-5-20251101",
tier="premium",
max_tokens=8192, # Keep at 8192
# ...
),Success Criteria:
- Tier-appropriate max_tokens set
- No workflow failures due to truncation
- Minor cost savings (<5%)
Days 1-3:
- Track 2: Enable prompt caching by default
- Track 4: Implement precise token counting
- Track 7: Optimize max_tokens by tier
Days 4-5:
- Testing and validation
- Update documentation
- Track cache hit rates
Days 1-3:
- Track 1: Implement Batch API client
- Track 1: Create batch workflow
- Track 1: Add CLI commands
Days 4-5:
- Track 3: Implement thinking mode
- Track 3: Auto-enable for complex tasks
- Testing and validation
Days 1-3:
- Track 5: Implement vision workflows
- Track 5: Add image analysis CLI
- Track 5: Create OCR and debug workflows
Days 4-5:
- Track 6: Implement streaming
- Track 6: Add streaming CLI
- Testing and validation
Days 1-2:
- Integration testing across all tracks
- Performance benchmarking
- Fix any issues
Days 3-4:
- Complete documentation
- Create usage examples
- Write migration guide
Day 5:
- Final review
- Release preparation
- Announcement
- 30-50% reduction for batch-eligible workflows
- 20-30% reduction for cache-heavy workflows
- Overall 25%+ cost reduction across all workflows
- 15-20% better reasoning on complex tasks (thinking mode)
- Zero accuracy loss on existing workflows
- Vision workflows operational
- OCR with >95% accuracy
- Streaming for responses >500 tokens
- Cost tracking accurate to <1% error
- Cache hit rate >50% for repeated contexts
- Token counting precise (SDK-based)
- Each track has 80%+ code coverage
- Mock Anthropic API responses
- Test error handling
- End-to-end workflow tests
- Real API calls (limited, in CI)
- Cross-track interactions
- Benchmark before/after
- Cost tracking validation
- Cache hit rate monitoring
- Test with real use cases
- Gather feedback
- Iterate based on findings
- Update README with new features
- Add vision workflow examples
- Document batch API usage
- Explain when to use thinking mode
- API reference for new methods
- Architecture diagrams
- Migration guide from v3.9 → v4.0
- Monitoring dashboards
- Cost optimization guide
- Troubleshooting common issues
Risk: Users may not understand 24-hour delay Mitigation: Clear documentation, warnings in CLI, alternative workflows
Risk: Images increase token costs significantly Mitigation: Pre-request cost estimation, warnings for large images
Risk: New features may break existing code Mitigation: Backward compatibility, deprecation warnings, version bumps
Risk: Cache hit rate may be lower than expected Mitigation: Monitoring, optimization recommendations, documentation
- Review this plan with team
- Prioritize tracks based on business needs
- Assign owners to each track
- Set up tracking (GitHub project, issues)
- Begin implementation following schedule
Questions or feedback?
- Open discussion in GitHub Discussions
- Slack: #empathy-framework-dev
- Email: engineering@empathyframework.com
Document Version: 1.0 Last Updated: January 16, 2026 Next Review: End of Week 1