---
description: Sprint Series: Achieving 85%+ Test Coverage: **Version:** 1.0 **Created:** January 16, 2026 **Owner:** Engineering Team **Status:** Planning → Execution Ready -
---
Version: 1.0 Created: January 16, 2026 Owner: Engineering Team Status: Planning → Execution Ready
Current State: 54.67% coverage (6,615 tests passing) Target: 85%+ coverage (production readiness threshold) Gap: +30.33 percentage points needed Timeline: 3 sprints × 2 weeks = 6 weeks total Approach: Systematic module-by-module coverage improvement with OOP refactoring
Prerequisites Completed:
- ✅ Phase 1: Architecture validation and gap analysis
- ✅ Phase 2: Architectural test suite created (~200 tests)
- ✅ LongTermMemory class implemented (P0 gap resolved)
- ✅ Critical gaps documented and prioritized
| Module | Current % | Target % | Gap | Priority | Lines |
|---|---|---|---|---|---|
| orchestration/ | 22.53% | 90% | +67.47 | P0 | 1,234 |
| memory/ | 18-27% | 90% | +63-72 | P0 | 892 |
| models/ | 21-73% | 95% | +22-74 | P1 | 1,523 |
| workflows/ | 45-98% | 95% | 0-50 | P1 | 2,341 |
| security/ | 12% | 90% | +78 | P0 | 456 |
| telemetry/ | 67% | 85% | +18 | P2 | 678 |
| project_index/ | 34% | 85% | +51 | P2 | 789 |
| cli.py | 56% | 80% | +24 | P2 | 234 |
Total Lines to Cover: ~4,200 additional lines across 8 major modules
Goal: Complete OOP refactoring, enable architectural tests Target: 54.67% → 70% (+15.33 points)
Goal: Test orchestration, memory, and models thoroughly Target: 70% → 80% (+10 points)
Goal: Cover workflows, security, remaining modules Target: 80% → 85%+ (+5+ points)
- Complete OOP refactoring (ModelRegistry, FallbackPolicy)
- Enable all 200 architectural tests
- Achieve baseline coverage for critical modules
- Fix test API mismatches
File: src/empathy_os/models/registry.py
class ModelRegistry:
"""OOP interface for model tier management."""
def __init__(self, registry_dict: dict | None = None):
self._registry = registry_dict or MODEL_REGISTRY
self._by_tier_cache: dict = {}
self._build_tier_cache()
def get_model(self, provider: str, tier: str) -> ModelInfo | None:
"""Get model by provider and tier."""
return self._registry.get(provider, {}).get(tier)
def get_model_by_id(self, model_id: str) -> ModelInfo | None:
"""Get model by unique ID (e.g., 'gpt-4o')."""
for provider_models in self._registry.values():
for model in provider_models.values():
if model.model_id == model_id:
return model
return None
def get_models_by_tier(self, tier: str) -> list[ModelInfo]:
"""Get all models in a tier across providers."""
return self._by_tier_cache.get(tier, [])
def list_providers(self) -> list[str]:
"""List all registered providers."""
return list(self._registry.keys())
def list_tiers(self) -> list[str]:
"""List all available tiers."""
return ["cheap", "capable", "premium"]
# Backward compatibility
_default_registry = ModelRegistry()
def get_model(provider: str, tier: str) -> ModelInfo | None:
return _default_registry.get_model(provider, tier)Tests to Enable: 25 ModelRegistry tests in test_execution_and_fallback_architecture.py
Coverage Impact: +5% (models/registry.py: 21% → 80%)
File: src/empathy_os/models/fallback.py (new)
class FallbackTier(Enum):
"""Fallback tier progression."""
CHEAP = "cheap"
CAPABLE = "capable"
PREMIUM = "premium"
class FallbackPolicy:
"""Manages fallback logic for failed LLM requests."""
TIER_PROGRESSION = {
"cheap": "capable",
"capable": "premium",
"premium": None # No fallback from premium
}
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
self.fallback_count = 0
def get_next_tier(self, current_tier: str) -> str | None:
"""Get next tier in fallback chain."""
return self.TIER_PROGRESSION.get(current_tier)
def prepare_fallback_request(
self,
original_request: dict,
current_tier: str
) -> dict:
"""Prepare request for fallback tier."""
next_tier = self.get_next_tier(current_tier)
if not next_tier:
raise ValueError("No fallback available from premium tier")
return {
**original_request,
"tier": next_tier,
"is_fallback": True,
"fallback_from": current_tier,
"attempt": self.fallback_count + 1
}
def should_fallback(self, error: Exception, tier: str) -> bool:
"""Determine if fallback should be attempted."""
if self.fallback_count >= self.max_retries:
return False
if tier == "premium":
return False # Already at highest tier
# Fallback for rate limits, timeouts, temporary errors
fallback_errors = (TimeoutError, ConnectionError, ...)
return isinstance(error, fallback_errors)Tests to Enable: 30 FallbackPolicy tests
Coverage Impact: +3% (new module)
File: tests/unit/memory/test_memory_architecture.py
Fix API mismatches:
# Old (idealized API):
memory = UnifiedMemory(use_mock_redis=True)
# New (actual API):
config = MemoryConfig(redis_mock=True)
memory = UnifiedMemory(user_id="test_user", config=config)Tests to Enable: 40 memory architecture tests
Coverage Impact: +4% (memory/: 18-27% → 65%)
File: src/empathy_os/orchestration/meta_orchestrator.py
Make key methods public for testing:
class MetaOrchestrator:
# Public methods for testing
def analyze_task(self, task: str, context: dict) -> TaskRequirements:
"""Public wrapper for task analysis (testable)."""
return self._analyze_task(task, context)
def create_execution_plan(
self,
requirements: TaskRequirements,
agents: list[AgentTemplate],
strategy: CompositionPattern
) -> ExecutionPlan:
"""Create execution plan from components (extracted for testing)."""
return ExecutionPlan(
agents=agents,
strategy=strategy,
quality_gates=requirements.quality_gates,
estimated_cost=self._estimate_cost(agents),
estimated_duration=self._estimate_duration(agents, strategy),
)
# Original method now uses extracted components
def analyze_and_compose(self, task: str, context: dict) -> ExecutionPlan:
requirements = self.analyze_task(task, context)
agents = self._select_agents(requirements)
strategy = self._choose_composition_pattern(requirements, agents)
return self.create_execution_plan(requirements, agents, strategy)Tests to Enable: 50 meta-orchestrator tests (currently skipped)
Coverage Impact: +8% (orchestration/: 22.53% → 75%)
File: tests/integration/test_end_to_end_workflows.py (new)
Test complete user journeys:
def test_complete_health_check_workflow():
"""Test full health check workflow execution."""
orchestrator = MetaOrchestrator()
memory = UnifiedMemory(user_id="test", config=MemoryConfig(redis_mock=True))
# User journey: Request health check
plan = orchestrator.analyze_and_compose(
task="Run comprehensive health check on empathy-framework",
context={"project_root": "."}
)
# Verify agents selected
assert len(plan.agents) > 0
assert plan.strategy in [CompositionPattern.PARALLEL, CompositionPattern.SEQUENTIAL]
# Execute (mock LLM calls)
with patch_llm_calls():
result = execute_plan(plan)
# Store results in memory
memory.persist_pattern(
content=result.summary,
pattern_type="health_check_result"
)
# Verify persistence
patterns = memory.search_patterns(pattern_type="health_check_result")
assert len(patterns) > 0Tests to Write: 20 integration tests
Coverage Impact: +5% (integration coverage across modules)
- Run full test suite:
pytest --cov=src --cov-report=html - Verify 70%+ coverage achieved
- Document remaining gaps
- Update architectural gap analysis
Sprint 1 Deliverables:
- ✅ ModelRegistry class (backward compatible)
- ✅ FallbackPolicy class
- ✅ 200 architectural tests enabled
- ✅ Meta-orchestrator testability improved
- ✅ Coverage: 54.67% → 70%+
- Comprehensive orchestration testing (90% target)
- Memory system edge cases (90% target)
- Models & routing completeness (95% target)
- Security module baseline (60%+)
Focus: Test all 6 composition patterns thoroughly
class TestCompositionPatterns:
"""Test each composition pattern in isolation."""
def test_parallel_pattern_execution(self):
"""Test parallel pattern distributes work correctly."""
pattern = ParallelComposition()
agents = [create_mock_agent("Agent1"), create_mock_agent("Agent2")]
result = pattern.execute(task="parallel task", agents=agents)
# Both agents should execute concurrently
assert len(result.agent_results) == 2
assert result.execution_time < sum(agent_times) # Parallel speedup
def test_sequential_pattern_execution(self):
"""Test sequential pattern maintains order."""
pattern = SequentialComposition()
agents = [create_mock_agent("Step1"), create_mock_agent("Step2")]
result = pattern.execute(task="sequential task", agents=agents)
# Step2 should receive Step1's output
assert result.agent_results[1].input_context["previous_result"] == result.agent_results[0].output
# ... 4 more pattern tests (Refinement, Hierarchical, Debate, Voting)Tests to Write: 30 composition pattern tests
Coverage Impact: +6% (orchestration/execution_strategies.py: 0% → 75%)
Focus: Verify agent selection logic
class TestAgentSelection:
"""Test agent selection algorithms."""
def test_capability_based_selection(self):
"""Test agents selected by capability match."""
requirements = TaskRequirements(
complexity=TaskComplexity.COMPLEX,
domain=TaskDomain.TESTING,
capabilities_needed=["pytest", "coverage_analysis", "test_generation"]
)
orchestrator = MetaOrchestrator()
agents = orchestrator._select_agents(requirements)
# All required capabilities covered
agent_capabilities = set()
for agent in agents:
agent_capabilities.update(agent.capabilities)
assert "pytest" in agent_capabilities
assert "coverage_analysis" in agent_capabilities
assert "test_generation" in agent_capabilities
def test_complexity_affects_agent_count(self):
"""Test that complex tasks get more agents."""
simple_req = TaskRequirements(complexity=TaskComplexity.SIMPLE, ...)
complex_req = TaskRequirements(complexity=TaskComplexity.COMPLEX, ...)
simple_agents = orchestrator._select_agents(simple_req)
complex_agents = orchestrator._select_agents(complex_req)
assert len(complex_agents) >= len(simple_agents)Tests to Write: 25 agent selection tests
Coverage Impact: +5% (orchestration/meta_orchestrator.py: 75% → 90%)
Focus: TTL expiration, cross-tier operations, concurrency
class TestMemoryEdgeCases:
"""Test memory system edge cases and failure modes."""
def test_ttl_expiration_does_not_lose_data(self):
"""Test that expiring short-term memory promotes to long-term."""
memory = UnifiedMemory(...)
# Store with auto-promotion enabled
memory.stash("important_data", {"value": 42}, ttl=1)
# Wait for TTL expiration
time.sleep(1.5)
# Should still be retrievable from long-term
result = memory.retrieve("important_data")
assert result is not None
assert result["value"] == 42
def test_concurrent_writes_maintain_consistency(self):
"""Test that concurrent writes don't corrupt data."""
import threading
memory = UnifiedMemory(...)
errors = []
def writer(key, value):
try:
memory.long_term.store(key, value)
retrieved = memory.long_term.retrieve(key)
assert retrieved == value
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=writer, args=(f"key{i}", {"id": i}))
for i in range(100)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0 # No corruption
def test_memory_survives_redis_restart(self):
"""Test that data persists across Redis restarts."""
memory = UnifiedMemory(...)
# Store data
memory.persist_pattern(content="test", pattern_type="test")
# Simulate Redis restart (disconnect and reconnect)
memory._short_term = None
memory._initialize_backends()
# Long-term data should still be accessible
patterns = memory.search_patterns(pattern_type="test")
assert len(patterns) > 0Tests to Write: 35 memory edge case tests
Coverage Impact: +8% (memory/: 65% → 90%)
Focus: Task tier mapping, cost optimization, fallback activation
class TestRoutingAndCostTracking:
"""Test intelligent routing and cost tracking."""
def test_task_type_maps_to_correct_tier(self):
"""Test TASK_TIER_MAP routing logic."""
test_cases = [
(TaskType.CODE_GENERATION, "capable"), # Requires reasoning
(TaskType.SIMPLE_COMPLETION, "cheap"), # Simple task
(TaskType.COMPLEX_REASONING, "premium"), # Needs best model
]
for task_type, expected_tier in test_cases:
tier = TASK_TIER_MAP[task_type]
assert tier == expected_tier
def test_fallback_activates_on_rate_limit(self):
"""Test fallback when primary tier is rate limited."""
policy = FallbackPolicy()
executor = LLMExecutor(fallback_policy=policy)
# Mock rate limit error
with patch_rate_limit_error("cheap"):
result = executor.execute(
provider="anthropic",
tier="cheap",
messages=[...]
)
# Should have fallen back to capable tier
assert result.tier_used == "capable"
assert result.is_fallback is True
def test_cost_tracking_accurate(self):
"""Test that cost tracking matches actual token usage."""
executor = LLMExecutor()
result = executor.execute(
provider="anthropic",
tier="cheap",
messages=[{"role": "user", "content": "Hello"}]
)
# Cost should match (tokens * price_per_token)
expected_cost = result.tokens_used * MODEL_PRICES["cheap"]
assert abs(result.cost - expected_cost) < 0.001Tests to Write: 30 routing & cost tests
Coverage Impact: +10% (models/: 73% → 95%)
- Run coverage:
pytest --cov=src --cov-report=html - Write security module baseline tests (20 tests)
- Document Sprint 2 achievements
- Plan Sprint 3 priorities
Sprint 2 Deliverables:
- ✅ Orchestration at 90%+ coverage
- ✅ Memory at 90%+ coverage
- ✅ Models at 95%+ coverage
- ✅ Security baseline established (60%+)
- ✅ Coverage: 70% → 80%+
- Workflow coverage to 95%
- Security module to 90%
- Remaining modules to 85%
- Edge case and error handling coverage
Focus: Test workflow base class, execution lifecycle, error handling
Target modules:
workflows/base.py(45% → 95%)workflows/test_gen.py(68% → 95%)workflows/config.py(89% → 98%)
Tests to Write: 40 workflow tests
- Workflow state management
- Multi-tier execution
- Result aggregation
- Error recovery
- Resource cleanup
Coverage Impact: +6% (workflows/: 45-98% → 95% average)
Focus: Path validation, PII scrubbing, secrets detection, audit logging
class TestSecurityFeatures:
"""Comprehensive security testing."""
def test_path_traversal_prevention(self):
"""Test all path traversal attack vectors."""
from empathy_os.config import _validate_file_path
attack_vectors = [
"../../../etc/passwd",
"config\x00.json", # Null byte injection
"/etc/shadow",
"~/.ssh/id_rsa",
"\\\\network\\share\\file" # UNC path
]
for attack in attack_vectors:
with pytest.raises(ValueError):
_validate_file_path(attack)
def test_pii_scrubbing_comprehensive(self):
"""Test PII detection and scrubbing."""
from empathy_os.memory.security.pii_scrubber import PIIScrubber
scrubber = PIIScrubber()
test_cases = [
("SSN: 123-45-6789", "SSN: [REDACTED-SSN]"),
("Email: user@example.com", "Email: [REDACTED-EMAIL]"),
("Phone: (555) 123-4567", "Phone: [REDACTED-PHONE]"),
("CC: 4111-1111-1111-1111", "CC: [REDACTED-CREDIT_CARD]"),
]
for input_text, expected_output in test_cases:
scrubbed, detections = scrubber.scrub(input_text)
assert expected_output in scrubbed
assert len(detections) > 0
def test_audit_logging_complete(self):
"""Test that all security events are logged."""
from empathy_os.memory.security.audit_logger import AuditLogger
logger = AuditLogger()
# Trigger security event
logger.log_security_violation(
user_id="test",
violation_type="path_traversal",
severity="HIGH",
details={"attempted_path": "../../etc/passwd"}
)
# Verify logged
events = logger.get_events(event_type="security_violation")
assert len(events) == 1
assert events[0].severity == "HIGH"Tests to Write: 50 security tests
Coverage Impact: +12% (security/: 12% → 90%)
Focus: CLI, project_index, telemetry
Modules to complete:
cli.py(56% → 80%)project_index/scanner.py(34% → 85%)telemetry/(67% → 85%)
Tests to Write: 45 tests across remaining modules
Coverage Impact: +4% (final modules to target)
Focus: Test failure modes, resource limits, recovery scenarios
class TestEdgeCasesAndFailures:
"""Test system behavior under failure conditions."""
def test_disk_full_during_write(self):
"""Test graceful handling when disk is full."""
memory = LongTermMemory()
with patch_disk_full():
success = memory.store("key", {"data": "value"})
assert success is False # Should fail gracefully, not crash
def test_corrupted_memory_file(self):
"""Test handling of corrupted JSON files."""
memory = LongTermMemory()
# Create corrupted file
corrupt_path = memory.storage_path / "corrupt.json"
corrupt_path.write_text("{invalid json")
# Should return None, not crash
result = memory.retrieve("corrupt")
assert result is None
def test_max_agents_limit_enforced(self):
"""Test that resource limits prevent runaway agent spawning."""
orchestrator = MetaOrchestrator()
requirements = TaskRequirements(
complexity=TaskComplexity.COMPLEX,
capabilities_needed=["cap" + str(i) for i in range(100)]
)
agents = orchestrator._select_agents(requirements)
# Should cap at reasonable limit (e.g., 10 agents)
assert len(agents) <= 10Tests to Write: 30 edge case tests
Coverage Impact: +2% (edge cases across all modules)
- Run full test suite with coverage
- Generate coverage report:
pytest --cov=src --cov-report=html --cov-report=term - Verify 85%+ achieved
- Update documentation:
- API reference
- Architecture diagrams
- Test coverage report
- Known gaps and future work
Sprint 3 Deliverables:
- ✅ Workflows at 95%+
- ✅ Security at 90%+
- ✅ All modules at 85%+
- ✅ Edge cases covered
- ✅ Coverage: 80% → 85%+
| Module | Week 0 | Week 2 | Week 4 | Week 6 | Status |
|---|---|---|---|---|---|
| orchestration/ | 22.53% | 75% | 90% | 90% | ✅ |
| memory/ | 18-27% | 65% | 90% | 90% | ✅ |
| models/ | 21-73% | 80% | 95% | 95% | ✅ |
| workflows/ | 45-98% | 50% | 70% | 95% | ✅ |
| security/ | 12% | 20% | 60% | 90% | ✅ |
| telemetry/ | 67% | 70% | 75% | 85% | ✅ |
| project_index/ | 34% | 40% | 60% | 85% | ✅ |
| cli.py | 56% | 60% | 70% | 80% | ✅ |
| Overall | 54.67% | 70% | 80% | 85%+ | 🎯 |
- Week 0: 6,615 tests
- Week 2: ~6,900 tests (+285)
- Week 4: ~7,150 tests (+535)
- Week 6: ~7,400 tests (+785)
All sprints must maintain:
- ✅ 100% of existing tests passing
- ✅ No new linter warnings (ruff, mypy)
- ✅ No security issues (bandit scan clean)
- ✅ All architectural invariants validated
- ✅ Documentation updated with API changes
Mitigation:
- Maintain backward-compatible functional interfaces
- Use deprecation warnings, not breaking changes
- Run full test suite after each refactoring step
Rollback Plan:
- Keep old implementation behind feature flag
- Document breaking changes clearly
- Provide migration guide
Mitigation:
- Prioritize high-value coverage (critical paths)
- Accept 80%+ as success if 85% unrealistic
- Focus on architectural coverage over line coverage
Acceptance Criteria:
- 80%+ overall coverage = success
- 90%+ on critical modules (orchestration, memory, models)
- 95%+ on critical paths (user journeys)
Mitigation:
- Realistic daily goals (10-15 tests/day)
- Built-in buffer days (30 days for 6 weeks of work)
- Sprint reviews to adjust pace
Adjustment Plan:
- Extend timeline if needed (6 weeks → 8 weeks)
- Deprioritize P2 modules if time constrained
- Focus on P0/P1 first, P2 as stretch goals
Morning (3-4 hours):
- Write 10-15 new tests
- Fix any failures
- Refactor code for testability if needed
Afternoon (2-3 hours):
- Review coverage gains:
pytest --cov=src --cov-report=term-missing - Document findings
- Plan next day's tests
Weekly Review (Fridays):
- Run full test suite
- Generate coverage report
- Update progress tracker
- Adjust plan based on actual velocity
- ModelRegistry class
- FallbackPolicy class
- 200 architectural tests enabled
- Meta-orchestrator refactored for testability
- Coverage: 54.67% → 70%
- Comprehensive composition pattern tests
- Memory edge case coverage
- Routing and cost tracking complete
- Security baseline established
- Coverage: 70% → 80%
- Workflow coverage complete
- Security module comprehensive
- All modules at target coverage
- Edge cases and error handling
- Coverage: 80% → 85%+
- Performance regression testing
- Integration test suite expansion
- Load testing for concurrent workflows
- Security audit (penetration testing)
- Property-based testing (hypothesis)
- Mutation testing (mutmut)
- Benchmarking suite
- CI/CD pipeline optimization
class TestFeatureName:
"""Test [feature] functionality."""
def setup_method(self):
"""Set up test fixtures."""
self.subject = FeatureClass()
def test_happy_path(self):
"""Test normal operation."""
result = self.subject.method(valid_input)
assert result == expected_output
def test_edge_case_empty_input(self):
"""Test behavior with empty input."""
with pytest.raises(ValueError):
self.subject.method("")
def test_error_handling(self):
"""Test error handling."""
with pytest.raises(SpecificError):
self.subject.method(invalid_input)def test_end_to_end_workflow():
"""Test complete user journey."""
# Setup
orchestrator = MetaOrchestrator()
memory = UnifiedMemory(...)
# Execute
plan = orchestrator.analyze_and_compose(task="...", context={})
result = execute_plan(plan)
memory.persist_pattern(result)
# Verify
assert result.success is True
assert memory.search_patterns(...) is not None# Run full test suite with coverage
pytest --cov=src --cov-report=html --cov-report=term-missing
# Run specific module coverage
pytest tests/unit/orchestration/ --cov=src/empathy_os/orchestration
# Generate coverage badge
coverage-badge -o coverage.svg
# Find untested lines
pytest --cov=src --cov-report=term-missing | grep "0%"
# Coverage diff (before/after comparison)
diff <(coverage report) <(coverage report)Status: ✅ Plan Complete - Ready for Execution Next Step: Begin Sprint 1, Day 1 - ModelRegistry Implementation Document Version: 1.0 Last Updated: January 16, 2026