Skip to content

Latest commit

 

History

History
1010 lines (762 loc) · 24 KB

File metadata and controls

1010 lines (762 loc) · 24 KB
Error in user YAML: (<unknown>): mapping values are not allowed in this context at line 1 column 55
---
description: Caching Strategy Plan - Empathy Framework: **Version:** 1.0 **Created:** January 10, 2026 **Status:** Phase 2 Planning (Quick Wins Implemented) **Owner:** Engin
---

Caching Strategy Plan - Empathy Framework

Version: 1.0 Created: January 10, 2026 Status: Phase 2 Planning (Quick Wins Implemented) Owner: Engineering Team


Executive Summary

This document provides a comprehensive caching strategy for the Empathy Framework v3.9.2+. It ranks caching opportunities by value, specifies invalidation strategies, configures memory bounds and TTLs, and establishes monitoring and risk mitigation approaches.

Quick Wins Completed:

  • ✅ AST cache monitoring infrastructure
  • ✅ File hash caching with mtime invalidation
  • ✅ Pattern match result caching module
  • ✅ Cache statistics reporting and health analysis
  • ✅ CacheMonitor enhancements for performance analysis

Expected Performance Gains:

  • AST parsing: 20% of scanner time → 5% (75% reduction)
  • Pattern matching: 12% of query time → 3% (75% reduction)
  • File hashing: 10% of scan time → 2% (80% reduction)
  • Overall scan time: 5.2s → 2.8s (46% faster)

Table of Contents

  1. Caching Opportunities by Priority
  2. Cache Configuration
  3. Invalidation Strategies
  4. Memory Management
  5. TTL Configuration
  6. Monitoring and Metrics
  7. Risk Assessment
  8. Implementation Roadmap
  9. Troubleshooting Guide
  10. Quick Reference

Caching Opportunities by Priority

🔴 PRIORITY 1: Critical Path (Implement Immediately)

1.1 AST Parsing Cache (IMPLEMENTED)

Location: src/empathy_os/project_index/scanner.py:_parse_python_cached()

Problem:

  • AST parsing accounts for 1.94s (20% of scanner time)
  • 2000-5000 files scanned per project
  • Parsing unchanged files wastes CPU

Solution:

@lru_cache(maxsize=2000)
def _parse_python_cached(file_path: str, file_hash: str) -> ast.Module | None:
    """Cache AST parsing with file hash invalidation."""
    content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
    return ast.parse(content)

Expected Impact:

  • Hit rate: 85-90% for incremental scans
  • Time saved: ~1.5s per scan (30% of total)
  • Memory cost: ~20MB for 2000 entries

Monitoring:

  • Monitor cache at cache_monitor.get_stats("ast_parse")
  • Alert if hit rate drops below 60% (indicates invalidation issues)

1.2 File Hash Cache (IMPLEMENTED)

Location: src/empathy_os/project_index/scanner.py:_hash_file()

Problem:

  • File hashing (SHA256) is I/O and CPU intensive
  • Called for ~5000 files per scan
  • Repeated for unchanged files in incremental scans

Solution:

@lru_cache(maxsize=1000)
def _hash_file(file_path: str) -> str:
    """Cache file content hashes with 1000-entry LRU."""
    return hashlib.sha256(Path(file_path).read_bytes()).hexdigest()

Expected Impact:

  • Hit rate: 80%+ for incremental scans
  • Time saved: ~0.5s per scan (10%)
  • Memory cost: ~64KB for 1000 entries

Invalidation Strategy:

  • File modification triggers automatic invalidation (done by lru_cache)
  • Recalculation on file change ensures correctness

1.3 Pattern Match Result Cache (IMPLEMENTED)

Location: src/empathy_os/pattern_cache.py (new module)

Problem:

  • Pattern matching is expensive regex operation
  • Similar contexts repeat across agents
  • _calculate_relevance() runs for every pattern

Solution:

cache = PatternMatchCache(max_size=1000)

# Usage in query_patterns():
cached = cache.get(context)
if cached:
    return cached

# Expensive operation
matches = expensive_relevance_calculation()
cache.set(context, matches)

Expected Impact:

  • Hit rate: 60-70% for repeated queries
  • Time saved: ~0.5-1s per 1000 queries
  • Memory cost: ~5-10MB for 1000 cached results

Cache Key:

  • JSON serialization of context dict
  • Deterministic (sorted keys)
  • Includes agent_id, pattern_type, min_confidence

🟡 PRIORITY 2: High-Value (Implement in Phase 2)

2.1 Pattern Library Type Index

Current: O(n) scan for pattern_type filter Proposed: O(1) type index

# Already implemented in PatternLibrary
self._patterns_by_type: dict[str, list[str]] = {}
self._patterns_by_tag: dict[str, list[str]] = {}

# In query_patterns():
if pattern_type:
    pattern_ids = self._patterns_by_type.get(pattern_type, [])
    patterns_to_check = [self.patterns[pid] for pid in pattern_ids]

Expected Impact:

  • Query time: 15ms → 2ms (87% faster) for large libraries
  • Memory cost: ~1KB per pattern
  • Hit rate: 100% (deterministic structure)

Status: ✅ Already optimized in codebase


2.2 Workflow Execution Result Cache (TTL-based)

Location: src/empathy_os/workflows/base.py

Problem:

  • Workflows make expensive LLM API calls
  • Similar inputs get processed repeatedly
  • API costs scale with redundant calls

Solution:

from datetime import datetime, timedelta

class WorkflowCache:
    def __init__(self, ttl_seconds: int = 3600):
        self._cache: dict[str, tuple[Any, datetime]] = {}
        self._ttl = timedelta(seconds=ttl_seconds)

    def get(self, key: str) -> Any | None:
        entry = self._cache.get(key)
        if entry and datetime.now() < entry[1]:
            return entry[0]
        return None

    def set(self, key: str, result: Any):
        self._cache[key] = (result, datetime.now() + self._ttl)

Expected Impact:

  • Cache hit rate: 40-60% for typical usage patterns
  • API call reduction: 30-50%
  • Cost savings: ~$50-100/month (assuming $0.01-0.05 per call)

TTL Configuration:

  • Default: 1 hour (3600 seconds)
  • Configurable per workflow type
  • Manual invalidation available

🟢 PRIORITY 3: Nice-to-Have (Post-Release)

3.1 Token Estimator Cache

Location: src/empathy_os/models/token_estimator.py

Current Status: Already using @functools.lru_cache

@lru_cache(maxsize=2000)
def estimate_tokens(model: str, text: str) -> int:
    """Estimate token count for model."""
    # Cached calculation

Optimization Ideas:

  • Increase cache to 5000 for larger batch operations
  • Monitor hit rates (likely very high)
  • Consider memory impact

3.2 Telemetry Event Cache

Location: src/empathy_os/telemetry/usage_tracker.py

Problem:

  • Repeated telemetry events with same data
  • Batch processing could benefit from deduplication

Solution:

  • Simple 60-second window cache
  • Deduplicates rapid-fire events
  • Configurable retention period

3.3 Config File Parse Cache

Location: src/empathy_os/config.py and src/empathy_os/config/xml_config.py

Problem:

  • Config files parsed multiple times
  • YAML/XML parsing is CPU-intensive

Solution:

@lru_cache(maxsize=100)
def _parse_config_cached(file_path: str, file_mtime: float) -> dict:
    """Cache config parsing with mtime invalidation."""
    return yaml.safe_load(Path(file_path).read_text())

Expected Impact:

  • Hit rate: 90%+ for stable configs
  • Time saved: minimal (configs rarely change during runtime)
  • Benefit: mainly for multi-threaded scenarios

Cache Configuration

Quick Reference Table

Cache Name Max Size Type Hit Rate Target Memory Budget
ast_parse 2000 LRU 85%+ 20MB
file_hash 1000 LRU 80%+ 64KB
pattern_match 1000 LRU 60%+ 10MB
workflow_result 500 TTL 40%+ 50MB
token_estimate 2000 LRU 90%+ 8MB
config_parse 100 LRU 90%+ 1MB

Configuration Loading

From Environment:

# Set cache sizes
export EMPATHY_AST_CACHE_SIZE=3000
export EMPATHY_PATTERN_CACHE_SIZE=2000
export EMPATHY_WORKFLOW_CACHE_TTL=1800  # 30 minutes

# Disable specific caches
export EMPATHY_DISABLE_PATTERN_CACHE=1

From Config File:

# .empathy/config.yml
cache:
  ast_parse:
    enabled: true
    max_size: 2000
    alert_threshold: 0.6

  pattern_match:
    enabled: true
    max_size: 1000
    ttl_seconds: 3600  # For TTL-based caches

  workflow_result:
    enabled: true
    max_size: 500
    ttl_seconds: 3600  # 1 hour

Invalidation Strategies

Strategy 1: Content Hash (File-based)

Used for: AST parsing, file hashing Mechanism: Cache key includes content hash

# Only cache if file content unchanged
file_hash = sha256(content)
ast = cache.get_or_compute((file_path, file_hash), compute_fn)

Pros:

  • Automatic invalidation on file change
  • No stale data issues
  • Simple to implement

Cons:

  • Hash computation overhead
  • No caching benefit if files change frequently

Strategy 2: Modification Time (Stat-based)

Used for: Config file parsing Mechanism: Cache key includes file mtime

mtime = Path(file_path).stat().st_mtime
config = cache.get_or_compute((file_path, mtime), compute_fn)

Pros:

  • Lightweight (stat() is fast)
  • Effective for stable files
  • Works across processes

Cons:

  • Can miss in-memory changes
  • Edge case: mtime resolution on some filesystems

Strategy 3: Time-to-Live (TTL)

Used for: Workflow results, telemetry Mechanism: Automatic expiration after configured period

# Workflow results valid for 1 hour
cache.set(key, result, ttl_seconds=3600)

# Stale after 1 hour, recomputed on next access
if cache.is_expired(key):
    result = compute_fn()

Pros:

  • Handles external state changes gracefully
  • Configurable freshness
  • Prevents unbounded memory growth

Cons:

  • Potential for stale data
  • Overhead of time checks
  • May need multiple expiration checks

Strategy 4: Explicit Invalidation

Used for: Pattern library, manual cache control Mechanism: API to manually clear cache

# Clear specific cache
cache.clear()

# Clear on specific events
observer.on_file_changed(lambda f: ast_cache.invalidate(f))

# Batch invalidation
cache.invalidate_by_pattern("user_*/config")

Pros:

  • Maximum control
  • No false positives
  • Precise timing

Cons:

  • Requires coordination
  • Easy to miss invalidation points
  • Can cause hard-to-debug staleness

Strategy 5: Event-based (Watch)

Used for: File system watching (future) Mechanism: File system watcher triggers invalidation

from watchdog.observers import Observer

class CacheInvalidator:
    def on_modified(self, event):
        if event.src_path.endswith('.py'):
            ast_cache.invalidate(event.src_path)
            file_hash_cache.invalidate(event.src_path)

observer = Observer()
observer.schedule(CacheInvalidator(), path=".", recursive=True)
observer.start()

Pros:

  • Accurate invalidation
  • Minimal computation
  • Real-time updates

Cons:

  • Requires watchdog dependency
  • Overhead on system resources
  • Platform-specific issues

Memory Management

Per-Cache Limits

# AST cache: ~10KB per entry * 2000 = 20MB
# File hash: ~32 bytes per entry * 1000 = 32KB
# Pattern match: ~5-10KB per entry * 1000 = 10MB
# Total: ~30MB reasonable budget

monitor = CacheMonitor.get_instance()

# Alert if cache exceeds 90% utilization
def check_cache_health():
    for stats in monitor.get_all_stats().values():
        if stats.utilization > 0.9:
            logger.warning(f"{stats.name} at {stats.utilization:.1%} capacity")

Global Limits

class CacheManager:
    """Manages global cache memory budget."""

    MAX_TOTAL_MEMORY = 100 * 1024 * 1024  # 100MB total

    def get_available_memory(self) -> int:
        """Get remaining available memory."""
        used = sum(s.size for s in monitor.get_all_stats().values())
        return self.MAX_TOTAL_MEMORY - used

    def is_over_budget(self) -> bool:
        """Check if caches exceed budget."""
        return self.get_available_memory() < 0

Eviction Policies

LRU (Least Recently Used):

# When cache full, remove oldest accessed entry
if len(cache) >= max_size:
    oldest_key = access_order.pop(0)
    del cache[oldest_key]

LFU (Least Frequently Used):

# When cache full, remove least-accessed entry
if len(cache) >= max_size:
    least_used = min(cache.items(), key=lambda x: access_count[x[0]])
    del cache[least_used[0]]

FIFO (First In, First Out):

# When cache full, remove oldest entry
if len(cache) >= max_size:
    oldest_key = insertion_order.pop(0)
    del cache[oldest_key]

Recommendation: Use LRU for all caches (most common pattern)


TTL Configuration

Default TTL Values

Cache Type Default TTL Rationale
AST Parse Unbounded Changes detected via content hash
File Hash Unbounded Changes detected via content hash
Pattern Match 1 hour Patterns stable for typical workflow
Workflow Result 30 minutes Balance between freshness and caching benefit
Config Parse Unbounded Config changes rare at runtime
Telemetry 60 seconds Deduplication window for rapid events

TTL Configuration Pattern

# In config
cache_config = {
    "ast_parse": {
        "ttl": None,  # No TTL, hash-based invalidation
        "max_size": 2000,
    },
    "workflow_result": {
        "ttl": 30 * 60,  # 30 minutes
        "max_size": 500,
    },
}

# In code
cache = create_cache(
    name="workflow_result",
    ttl_seconds=cache_config["workflow_result"]["ttl"],
    max_size=cache_config["workflow_result"]["max_size"],
)

Dynamic TTL Adjustment

class AdaptiveTTLCache:
    """Cache with adaptive TTL based on hit rate."""

    def __init__(self, initial_ttl: int = 3600):
        self.ttl = initial_ttl
        self.min_ttl = 60      # 1 minute
        self.max_ttl = 86400   # 1 day

    def adjust_ttl(self, hit_rate: float):
        """Adjust TTL based on cache effectiveness."""
        if hit_rate > 0.7:
            # Increase TTL for high-performing cache
            self.ttl = min(self.ttl * 1.5, self.max_ttl)
        elif hit_rate < 0.3:
            # Decrease TTL for low-performing cache
            self.ttl = max(self.ttl * 0.5, self.min_ttl)

Monitoring and Metrics

Key Metrics to Track

  1. Hit Rate - Cache hits / (hits + misses)

    • Target: >60% for most caches
    • Alert: <30% indicates cache ineffectiveness
  2. Eviction Rate - Evictions per minute

    • Target: <1 per minute
    • Alert: >10 per minute indicates cache thrashing
  3. Size Utilization - Current size / max size

    • Target: 50-80% utilization
    • Alert: >90% (approaching limit) or <10% (oversized)
  4. Memory Footprint - Total cache memory usage

    • Target: <100MB total
    • Alert: >150MB indicates memory leak or misconfiguration

Monitoring Integration

from empathy_os.cache_monitor import CacheMonitor
from empathy_os.cache_stats import CacheReporter

# Register caches on startup
monitor = CacheMonitor.get_instance()
monitor.register_cache("ast_parse", max_size=2000)
monitor.register_cache("pattern_match", max_size=1000)

# Periodically check health
def health_check():
    reporter = CacheReporter()
    health = reporter.generate_health_report()
    logger.info(health)

# On shutdown, generate final report
def shutdown_hook():
    reporter = CacheReporter()
    report = reporter.generate_full_report()
    logger.info("Final Cache Report:\n" + report)

Metrics Export

# Export to monitoring system (Prometheus, etc.)
def export_metrics():
    monitor = CacheMonitor.get_instance()
    for cache_name, stats in monitor.get_all_stats().items():
        prometheus_client.Counter(
            f"cache_{cache_name}_hits",
            "Cache hits",
        ).inc(stats.hits)

        prometheus_client.Counter(
            f"cache_{cache_name}_misses",
            "Cache misses",
        ).inc(stats.misses)

        prometheus_client.Gauge(
            f"cache_{cache_name}_size",
            "Current cache size",
        ).set(stats.size)

Risk Assessment

Risk 1: Cache Bugs (Memory Leaks)

Severity: HIGH Likelihood: MEDIUM

Mitigation:

  • Comprehensive unit tests for cache operations
  • Memory profiling on test suite
  • Size bounds with alerts
  • Regular cache eviction

Detection:

# Monitor for unbounded growth
def detect_memory_leak():
    stats = monitor.get_stats("ast_parse")
    if stats.size >= stats.max_size:
        logger.error("Cache at maximum size - possible leak")

Risk 2: Stale Data

Severity: MEDIUM Likelihood: LOW

Mitigation:

  • Hash-based invalidation for file content
  • TTL for external APIs
  • Explicit invalidation on known changes
  • Comprehensive test coverage

Detection:

# Check for stale pattern results
def verify_cache_freshness():
    # Re-compute sample of cached results
    # Compare with cache values
    # Alert on mismatches

Risk 3: Performance Regression

Severity: MEDIUM Likelihood: LOW

Mitigation:

  • Benchmark cache impact before/after
  • Monitor hit rates (low rate = performance problem)
  • Gradual rollout with feature flags
  • Easy disable mechanism

Detection:

# Alert if hit rate drops below threshold
def monitor_hit_rate():
    for stats in monitor.get_all_stats().values():
        if stats.hit_rate < 0.3:  # Below threshold
            logger.warning(f"Low hit rate for {stats.name}: {stats.hit_rate:.1%}")

Risk 4: Cache Invalidation Failures

Severity: HIGH Likelihood: MEDIUM

Mitigation:

  • Content-hash based invalidation (most reliable)
  • File system watcher for immediate detection
  • Regular cache clearing as safety net
  • Extensive testing of invalidation logic

Detection:

# Verify cache results match live computation
def verify_cache_correctness():
    cached = cache.get(key)
    live = compute_result(key)
    if cached != live:
        logger.error("Cache invalidation failure detected")
        cache.clear()  # Safety net

Implementation Roadmap

Phase 1: Quick Wins (COMPLETED ✅)

Timeline: 1-2 days Focus: Immediate performance gains

  • Enhance CacheMonitor with health analysis
  • Add AST cache monitoring
  • Implement PatternMatchCache module
  • Create CacheStats and CacheAnalyzer
  • Write this strategy document

Metrics:

  • AST cache: ~1.5s saved per scan (30%)
  • Pattern cache: ~0.5-1s saved per 1000 queries
  • File hash cache: ~0.5s saved per scan (10%)

Phase 2: Consolidation (Planned)

Timeline: 2-3 weeks Focus: Integration and monitoring

  • Integrate PatternMatchCache into PatternLibrary
  • Add workflow result caching
  • Implement health dashboards
  • Set up performance benchmarks
  • Write troubleshooting guide

Success Criteria:

  • All caches > 60% hit rate
  • Cache health report integrated into CI/CD
  • No performance regressions
  • <30MB total cache memory

Phase 3: Optimization (Future)

Timeline: 1 month+ Focus: Advanced features

  • Implement event-based invalidation (watchdog)
  • Add adaptive TTL adjustment
  • Create cache warming strategies
  • Implement distributed cache (Redis)
  • Add cache persistence

Troubleshooting Guide

Problem: Low Hit Rate (< 30%)

Diagnosis:

  1. Check cache key generation
  2. Verify invalidation not too aggressive
  3. Confirm cache lookup is being used

Solution:

# Enable debug logging
logging.getLogger("empathy_os.cache").setLevel(logging.DEBUG)

# Check if cache is even being consulted
def cache_get_with_debug(key):
    result = cache.get(key)
    if result:
        logger.debug(f"Cache HIT for {key}")
    else:
        logger.debug(f"Cache MISS for {key}")
    return result

Problem: Memory Leak (Cache Growing)

Diagnosis:

  1. Check eviction is working
  2. Verify TTL expiration (for TTL-based)
  3. Look for unbounded keys

Solution:

# Monitor cache growth
def check_cache_growth():
    initial_size = monitor.get_stats("ast_parse").size
    time.sleep(60)
    final_size = monitor.get_stats("ast_parse").size

    if final_size > initial_size * 1.5:
        logger.warning("Cache growing too fast")
        cache.clear()  # Emergency clear

Problem: Stale Data

Diagnosis:

  1. Verify invalidation detection
  2. Check TTL configuration
  3. Test edge cases (files changed externally)

Solution:

# Add verification step
def verify_cache_validity():
    sample_keys = list(cache._cache.keys())[:10]
    for key in sample_keys:
        cached = cache._cache[key]
        live = recompute(key)
        assert cached == live, f"Stale data for {key}"

Problem: Performance Regression

Diagnosis:

  1. Check hit rate dropped
  2. Verify cache computation cost
  3. Compare with before/after benchmarks

Solution:

# Disable cache temporarily to verify it's the issue
cache.clear()
# Benchmark without cache
# If performance improves, cache has bug

# Or disable specific cache
CACHE_ENABLED = False
if CACHE_ENABLED:
    return cache.get_or_compute(key, fn)
else:
    return fn()  # Bypass cache

Quick Reference

Enable Cache Monitoring

from empathy_os.cache_monitor import CacheMonitor
from empathy_os.cache_stats import CacheReporter

monitor = CacheMonitor.get_instance()

# Get all statistics
stats = monitor.get_report(verbose=True)
print(stats)

# Get health assessment
reporter = CacheReporter()
health = reporter.generate_health_report()
print(health)

Common Operations

# Register a cache
monitor.register_cache("my_cache", max_size=1000)

# Record operations
monitor.record_hit("my_cache")
monitor.record_miss("my_cache")
monitor.update_size("my_cache", 500)

# Query performance
high_performers = monitor.get_high_performers(threshold=0.7)
underperformers = monitor.get_underperformers(threshold=0.3)

# Size report
print(monitor.get_size_report())

Cache Configuration

# .empathy/config.yml
cache:
  enabled: true

  ast_parse:
    enabled: true
    max_size: 2000
    invalidation: hash  # content-based

  pattern_match:
    enabled: true
    max_size: 1000
    ttl_seconds: 3600

  workflow_result:
    enabled: true
    max_size: 500
    ttl_seconds: 1800

Environment Variables

# Disable specific caches
EMPATHY_DISABLE_PATTERN_CACHE=1
EMPATHY_DISABLE_WORKFLOW_CACHE=1

# Adjust sizes
EMPATHY_AST_CACHE_SIZE=3000
EMPATHY_PATTERN_CACHE_SIZE=2000

# TTL configuration
EMPATHY_WORKFLOW_CACHE_TTL=3600
EMPATHY_PATTERN_CACHE_TTL=1800

Appendix: Performance Baselines

Before Caching (Baseline)

Project Scan (1000 files):
  Time: 5.2s
  - File discovery: 0.5s (10%)
  - File hashing: 0.5s (10%)
  - AST parsing: 1.94s (37%)
  - Dependency analysis: 0.8s (15%)
  - Metrics calculation: 1.48s (28%)

Pattern Query (1000 queries):
  Time: 850ms
  - Pattern matching: 510ms (60%)
    - Relevance calculation: 255ms (30%)
    - Tag matching: 127ms (15%)
    - Success rate boost: 128ms (15%)

After Caching (Projected)

Project Scan (1000 files, incremental):
  Time: 2.8s (46% faster)
  - File discovery: 0.5s (18%)
  - File hashing: 0.1s (4%) [80% hit rate]
  - AST parsing: 0.3s (11%) [85% hit rate]
  - Dependency analysis: 0.8s (29%)
  - Metrics calculation: 1.1s (39%)

Pattern Query (1000 queries, with cache):
  Time: 285ms (66% faster)
  - Cached hits: 600 queries @ 0.05ms = 30ms (10%)
  - New queries: 400 queries @ 0.64ms = 255ms (89%)
  - Overall: (600*0.05 + 400*0.64) / 1000 = 0.28ms average

References


Next Steps:

  1. Review and approve caching strategy
  2. Begin Phase 2 integration work
  3. Set up performance monitoring
  4. Plan Phase 3 advanced optimizations

Questions? See troubleshooting guide or contact engineering team.