Trends Tracking System

📋 Document Summary

What This Document Covers:

  • TrendOrchestrator architecture for real-time trend tracking
  • Reactive Firebase watchers for instant data synchronization
  • In-memory caching for ultra-fast API responses (~1-5ms)
  • Prominence scoring algorithm for ranking trends
  • Dual-collection system (potential vs confirmed trends)
  • API integration with Web UI (/api/trends endpoints)

Context Tags: #trends #firebase #realtime #cache #prominence-scoring #orchestrator


Overview

LaunchAgencyBot implements a real-time trend tracking system that identifies emerging and established patterns in memecoin launches using Firebase watchers and in-memory caching.

Key Features:

  • Reactive Firebase Watchers - Instant updates when trends change (no polling)
  • In-Memory Cache - Ultra-fast reads (~1-5ms) with automatic updates
  • Prominence Scoring - Activity-based ranking for identifying hot trends
  • Dual Collections - Separate potential (emerging) and confirmed (established) trends
  • Auto-Initialization - TrendOrchestrator starts automatically on server startup

Architecture

Implementation: src/orchestrators/trends/trend_orchestrator.py

TrendOrchestrator Design: Initializes Firebase watchers and in-memory cache on startup; cache contains potential, confirmed, stats, and last_updated fields.

Auto-Initialization

Pattern: src/orchestrators/registry.py initializes TrendOrchestrator in the FastAPI startup event handler (@app.on_event("startup")), so the watchers and cache are active before the first API request.


Trend Detection Workflow

Location: src/workflows/trend_detection_workflow.py

Flow: Token Launch Events → LLM Classification → Firebase Write → Watcher Triggered → Cache Updated → API Responds

Workflow Components

| Component | Class | Purpose | Key Behavior |
| --- | --- | --- | --- |
| Input Source | TokenLaunchSource | WebSocket connection to pump.fun | Real-time token launch events |
| Processor | TrendDetectionProcessor | Analyze metadata | LLM classifies into trends, creates/updates trend documents |
| Executor | FirebaseTrendExecutor | Write to Firebase | Updates potential_trends or confirmed_trends, triggers watchers |
| Orchestrator | TrendOrchestrator | Reactive cache | Receives watcher events, updates in-memory cache, recalculates stats |
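
The flow above can be sketched end to end with stand-in components. Everything in this sketch is hypothetical: the keyword rule in `classify` replaces the LLM call, a plain dict replaces Firebase, and the function names are illustrative rather than the project's classes.

```python
from typing import Callable

# Hypothetical in-memory stand-in for the Firebase collections.
store: dict[str, dict[str, dict]] = {"potential_trends": {}, "confirmed_trends": {}}
cache: dict[str, list] = {"potential": [], "confirmed": []}


def classify(event: dict) -> str:
    """Stand-in for the LLM classification step: a naive keyword rule."""
    return "gaming" if "mario" in event["name"].lower() else "other"


def write_trend(trend_id: str, event: dict, on_change: Callable[[], None]) -> None:
    """Stand-in for the executor: upsert the trend document, then notify."""
    doc = store["potential_trends"].setdefault(
        trend_id, {"trend_name": trend_id, "memecoins": [], "memecoin_count": 0}
    )
    doc["memecoins"].append(event["name"])
    doc["memecoin_count"] = len(doc["memecoins"])
    on_change()  # in the real system the watcher is triggered by Firebase itself


def refresh_cache() -> None:
    """Stand-in for the orchestrator's watcher callback."""
    cache["potential"] = [dict(d) for d in store["potential_trends"].values()]


# Token launch events -> classification -> write -> cache refresh
for launch in [{"name": "Mario Coin"}, {"name": "Super Mario 64"}, {"name": "Doge 2"}]:
    write_trend(classify(launch), launch, refresh_cache)

counts = {t["trend_name"]: t["memecoin_count"] for t in cache["potential"]}
print(counts)
```

The point of the sketch is the ordering: the API only ever sees what the cache refresh step has copied in, never the store directly.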

Potential vs Confirmed Trends

Potential Trends (potential_trends collection):

  • Definition: Emerging patterns with 3-9 memecoins
  • Purpose: Early detection of new trends
  • Promotion Threshold: 10+ memecoins → move to confirmed

Confirmed Trends (confirmed_trends collection):

  • Definition: Established patterns with 10+ memecoins
  • Purpose: Validated trends with proven traction
  • Characteristics: Higher confidence, more stable

Auto-Promotion:

if trend.memecoin_count >= 10 and trend in potential_collection:
    # Move to confirmed collection
    firebase.create_document("confirmed_trends", trend_id, trend_data)
    firebase.delete_document("potential_trends", trend_id)
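
A minimal sketch of the promotion step against a dict-backed store. `InMemoryStore` and `promote_if_ready` are hypothetical names used for illustration; only the threshold of 10 and the create-then-delete order come from the snippet above. Writing to confirmed_trends before deleting from potential_trends means a failure between the two calls leaves a duplicate rather than a lost trend.

```python
PROMOTION_THRESHOLD = 10  # from the definition above: 10+ memecoins


class InMemoryStore:
    """Hypothetical stand-in for the Firebase service, for illustration only."""

    def __init__(self) -> None:
        self.collections: dict[str, dict[str, dict]] = {
            "potential_trends": {},
            "confirmed_trends": {},
        }

    def create_document(self, collection: str, doc_id: str, data: dict) -> None:
        self.collections[collection][doc_id] = data

    def delete_document(self, collection: str, doc_id: str) -> None:
        self.collections[collection].pop(doc_id, None)


def promote_if_ready(store: InMemoryStore, trend_id: str) -> bool:
    """Move a trend to confirmed_trends once it reaches the threshold."""
    trend = store.collections["potential_trends"].get(trend_id)
    if trend is None or trend["memecoin_count"] < PROMOTION_THRESHOLD:
        return False
    # Create first, delete second: a crash in between duplicates, never loses.
    store.create_document("confirmed_trends", trend_id, trend)
    store.delete_document("potential_trends", trend_id)
    return True


store = InMemoryStore()
store.create_document("potential_trends", "t1", {"memecoin_count": 9})
store.create_document("potential_trends", "t2", {"memecoin_count": 10})
promoted = [promote_if_ready(store, tid) for tid in ("t1", "t2")]
```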

Reactive Watchers

Purpose: Real-time synchronization between Firebase and in-memory cache

Watcher Setup

def _setup_firebase_watchers(self):
    """Setup reactive listeners for trend data changes"""

    # Watch potential trends collection
    potential_ref = self.firebase.get_collection("potential_trends")
    self.firebase.watch_collection(
        collection_ref=potential_ref,
        on_snapshot=self._on_potential_trends_update,
        watcher_id="potential_trends_watcher"
    )

    # Watch confirmed trends collection
    confirmed_ref = self.firebase.get_collection("confirmed_trends")
    self.firebase.watch_collection(
        collection_ref=confirmed_ref,
        on_snapshot=self._on_confirmed_trends_update,
        watcher_id="confirmed_trends_watcher"
    )

    logger.info("👀 Firebase watchers active for trends collections")

Watcher Callbacks

def _on_potential_trends_update(self, snapshot):
    """Handle potential trends updates"""
    self._update_cache_from_snapshot(snapshot, "potential")

def _on_confirmed_trends_update(self, snapshot):
    """Handle confirmed trends updates"""
    self._update_cache_from_snapshot(snapshot, "confirmed")

def _update_cache_from_snapshot(self, snapshot, collection_type):
    """Shared logic for cache updates"""
    logger.info(f"📊 {collection_type.title()} trends updated: {len(snapshot)} trends")
    self.cache[collection_type] = [self._process_trend_document(doc) for doc in snapshot]
    self._update_statistics()
    self.cache["last_updated"] = int(time.time())

Benefits

Instant Updates:

  • Firebase change → Watcher triggered → Cache updated → API reflects immediately
  • Latency: ~100-300ms from Firebase write to API response update

No Polling:

  • Before (polling): while True: fetch_trends(); sleep(5) → data can be up to 5 seconds stale
  • After (watchers): Instant updates with zero polling overhead

Resource Efficiency:

  • Polling: one request per interval for each collection, even when nothing has changed (wasteful, expensive)
  • Watchers: Event-driven (minimal overhead, efficient)
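
To make the contrast concrete, here is a small sketch of the event-driven pattern using an in-memory collection that calls its listeners on every write. `WatchedCollection` is a hypothetical stand-in, not the Firebase client; it only illustrates that the cache is refreshed once per change rather than once per polling interval.

```python
from typing import Callable

Snapshot = list[dict]


class WatchedCollection:
    """Hypothetical collection that pushes a full snapshot to listeners on write."""

    def __init__(self) -> None:
        self._docs: dict[str, dict] = {}
        self._listeners: list[Callable[[Snapshot], None]] = []

    def watch(self, on_snapshot: Callable[[Snapshot], None]) -> None:
        self._listeners.append(on_snapshot)
        on_snapshot(list(self._docs.values()))  # deliver the initial state

    def set(self, doc_id: str, data: dict) -> None:
        self._docs[doc_id] = data
        for listener in self._listeners:
            listener(list(self._docs.values()))


cache: dict[str, Snapshot] = {"potential": []}
counters = {"refreshes": 0}


def on_potential_update(snapshot: Snapshot) -> None:
    """Replace the cached list wholesale, as the watcher callbacks above do."""
    cache["potential"] = snapshot
    counters["refreshes"] += 1


potential = WatchedCollection()
potential.watch(on_potential_update)         # initial (empty) snapshot
potential.set("t1", {"memecoin_count": 3})   # one refresh per write
potential.set("t1", {"memecoin_count": 4})
```

Between writes the callback never runs, so an idle collection costs nothing beyond the open listener.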

In-Memory Cache

Purpose: Fast API responses (~1-20ms for cache-only endpoints) with automatic updates from Firebase watchers

Cache Structure: Contains a potential array, a confirmed array, a stats object (total/potential/confirmed/hot counts), and a last_updated timestamp. Each trend includes _id, trend_name, trend_description, memecoin_count, memecoins, timestamps, _collection, and prominence_score.

Access Patterns: All API endpoints read from cache (no Firebase queries). Stats endpoint: ~1-5ms, list/filter: ~5-20ms, detail with images: ~100-500ms.
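
A sketch of how a list request could be served from the cache alone. `list_trends` is a hypothetical helper; its `collection` and `include_examples` parameters mirror the query parameters of GET /api/trends, and clamping to 0-3 is an assumption based on the documented range.

```python
from typing import Optional

# Sample cache contents, shaped like the structure described above.
cache = {
    "potential": [
        {"_id": "t1", "memecoin_count": 4,
         "memecoins": [f"coin{i}" for i in range(4)], "_collection": "potential"},
    ],
    "confirmed": [
        {"_id": "t2", "memecoin_count": 12,
         "memecoins": [f"coin{i}" for i in range(12)], "_collection": "confirmed"},
    ],
}


def list_trends(collection: Optional[str] = None, include_examples: int = 0) -> list[dict]:
    """Filter and trim trends using only the in-memory cache (no Firebase reads)."""
    if collection is None:
        names = ["potential", "confirmed"]
    elif collection in cache:
        names = [collection]
    else:
        raise ValueError("collection must be 'potential' or 'confirmed'")
    limit = max(0, min(include_examples, 3))  # assumed clamp to the documented 0-3 range
    return [
        {**trend, "memecoins": trend["memecoins"][:limit]}  # copy, so the cache is untouched
        for name in names
        for trend in cache[name]
    ]


confirmed_only = list_trends(collection="confirmed", include_examples=3)
everything = list_trends()
```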


Prominence Scoring

Purpose: Rank trends by activity and relevance to identify "hot" trends

Scoring Algorithm

import math
import time

def calculate_prominence_score(trend: dict) -> float:
    """Calculate prominence score based on activity and recency"""

    # 1. Memecoin count factor (base score)
    count_factor = min(trend["memecoin_count"] / 50, 1.0)  # Cap at 50

    # 2. Recency factor (newer trends get boost)
    now = int(time.time())
    age_days = (now - trend["created_at"]) / (24 * 3600)
    recency_factor = math.exp(-age_days / 14)  # 14-day decay constant (half-life is about 9.7 days)

    # 3. Activity span factor (time from creation to the latest update; longer-lived activity gets a boost)
    if trend["updated_at"] > trend["created_at"]:
        update_gap_days = (trend["updated_at"] - trend["created_at"]) / (24 * 3600)
        update_factor = min(update_gap_days / 7, 1.0)  # Cap at 7 days
    else:
        update_factor = 0.0

    # 4. Combined score (weighted)
    prominence_score = (
        count_factor * 0.5 +        # 50% weight on size
        recency_factor * 0.3 +       # 30% weight on recency
        update_factor * 0.2          # 20% weight on activity
    ) * 10  # Scale to 0-10

    return prominence_score

Scoring Example

Small Recent Active Trend (8 memecoins, 2 days old, updated 1 day ago):

  • count_factor = 8/50 = 0.16, recency_factor = exp(-2/14) = 0.87, update_factor = 1/7 = 0.14
  • prominence_score = (0.16 * 0.5 + 0.87 * 0.3 + 0.14 * 0.2) * 10 ≈ 3.69
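
The arithmetic for this example can be checked directly. The sketch below recomputes the three factors for the same trend with a fixed `now`, so the result does not depend on the wall clock; the variable names and the chosen timestamp are illustrative.

```python
import math

DAY = 24 * 3600
now = 1_700_000_000             # arbitrary fixed "current" time for reproducibility
created_at = now - 2 * DAY      # trend is 2 days old
updated_at = now - 1 * DAY      # last updated 1 day ago
memecoin_count = 8

# Same three factors as calculate_prominence_score above.
count_factor = min(memecoin_count / 50, 1.0)
recency_factor = math.exp(-((now - created_at) / DAY) / 14)
update_factor = min(((updated_at - created_at) / DAY) / 7, 1.0)

score = (count_factor * 0.5 + recency_factor * 0.3 + update_factor * 0.2) * 10
print(round(count_factor, 2), round(recency_factor, 2), round(update_factor, 2), round(score, 2))
```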

Hot Trends Threshold

Definition: Trends with >= 10 memecoins (confirmed trends)

hot_trends = [
    trend for trend in all_trends
    if trend["memecoin_count"] >= 10
]

# Statistic
stats["hot_trends"] = len(hot_trends)
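
A sketch of how the stats object could be derived from the two cached lists. `compute_stats` is a hypothetical helper (the source names `_update_statistics` but does not show its body); the keys follow the fields of the /api/trends/stats response.

```python
HOT_THRESHOLD = 10  # from the definition above


def compute_stats(potential: list[dict], confirmed: list[dict]) -> dict[str, int]:
    """Aggregate counts over both cached collections."""
    all_trends = potential + confirmed
    return {
        "total_trends": len(all_trends),
        "potential_trends": len(potential),
        "confirmed_trends": len(confirmed),
        "hot_trends": sum(1 for t in all_trends if t["memecoin_count"] >= HOT_THRESHOLD),
    }


stats = compute_stats(
    potential=[{"memecoin_count": 3}, {"memecoin_count": 9}],
    confirmed=[{"memecoin_count": 10}, {"memecoin_count": 25}],
)
```

Because the counts are recomputed from the lists on every watcher event, the stats endpoint never has to iterate over trends at request time.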

API Integration

Router: /api/trends

Module: src/web_ui/api/trends_routes.py

API Endpoints

GET /api/trends/stats

Purpose: Get aggregated statistics

Response:

{
  "status": 200,
  "success": true,
  "message": "Statistics retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "total_trends": 42,
  "potential_trends": 28,
  "confirmed_trends": 14,
  "hot_trends": 8,
  "last_updated": 1697510000
}

Performance: ~1-5ms (pure cache read)


GET /api/trends

Purpose: List all trends with optional filtering

Query Parameters:

  • collection: Optional[str] - Filter by 'potential' or 'confirmed'
  • include_examples: int - Number of memecoin examples to include per trend (0-3); with 0, memecoins is returned as an empty list

Response:

{
  "status": 200,
  "success": true,
  "message": "Trends retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "total": 2,
  "trends": [
    {
      "_id": "trend_abc123",
      "trend_name": "Gaming Nostalgia Tokens",
      "trend_description": "Memecoins referencing classic video games",
      "memecoin_count": 12,
      "memecoins": [],  // Or up to 3 examples if include_examples > 0
      "created_at": 1697500000,
      "updated_at": 1697510000,
      "_collection": "potential",
      "prominence_score": 8.5
    }
  ]
}

Performance: ~5-20ms (cache read with filtering)


GET /api/trends/{trend_id}

Purpose: Get specific trend by ID

Response:

{
  "status": 200,
  "success": true,
  "message": "Trend retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "trend": {
    "_id": "trend_abc123",
    "trend_name": "Gaming Nostalgia Tokens",
    "trend_description": "Memecoins referencing classic video games",
    "memecoin_count": 12,
    "memecoins": [],
    "created_at": 1697500000,
    "updated_at": 1697510000,
    "_collection": "potential",
    "prominence_score": 8.5
  }
}

Performance: ~1-5ms (cache lookup)


GET /api/trends/detail/{trend_id}

Purpose: Get ALL memecoins from specific trend with Base64 images

Response:

{
  "status": 200,
  "success": true,
  "message": "Trend memecoins retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "trend_id": "trend_abc123",
  "trend_name": "Gaming Nostalgia Tokens",
  "total": 12,
  "images_loaded": 12,
  "memecoins": [
    {
      "token_address": "7xKXtg2CW...",
      "token_name": "Mario Coin",
      "ticker": "MARIO",
      "description": "Classic plumber saves princess",
      "image_data_uri": "data:image/jpeg;base64,/9j/4AAQ...",
      "image_url": "https://pump.fun/images/abc.png",
      "creation_timestamp": 1697500000
    }
  ]
}

Performance: ~100-500ms (loads images from filesystem)

Note: For large trends (50+ memecoins), consider client-side caching


GET /api/trends/detection/status

Purpose: Get trend detection workflow status

Response:

{
  "status": 200,
  "success": true,
  "message": "Detection status retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "detection_active": true,
  "potential_trends": 28,
  "confirmed_trends": 14
}

POST /api/trends/detection/toggle

Purpose: Toggle trend detection workflow on/off

Response:

{
  "status": 200,
  "success": true,
  "message": "Trend detection enabled",
  "timestamp": "2025-10-16T14:30:00Z",
  "detection_active": true,
  "potential_trends": 28,
  "confirmed_trends": 14
}

Behavior:

  • When Enabled: Runs full trend detection workflow (WebSocket → LLM → Firebase)
  • When Disabled: Only Firebase monitoring active (read-only, no new trends)

Use Case: Control expensive LLM API usage for trend classification
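
The toggle can be pictured as a single flag that gates the classification step while the watchers keep running. `DetectionState` below is a hypothetical sketch, not the project's implementation, and the "disabled" message text is an assumption modeled on the "enabled" message shown above.

```python
from dataclasses import dataclass


@dataclass
class DetectionState:
    """Hypothetical holder for the detection flag; watchers are unaffected by it."""

    detection_active: bool = False

    def toggle(self) -> dict:
        """Flip the flag and return a response-shaped summary."""
        self.detection_active = not self.detection_active
        word = "enabled" if self.detection_active else "disabled"
        return {
            "success": True,
            "message": f"Trend detection {word}",
            "detection_active": self.detection_active,
        }

    def should_classify(self) -> bool:
        """Gate for the LLM call: skip classification entirely while disabled."""
        return self.detection_active


state = DetectionState()
first = state.toggle()    # off -> on
second = state.toggle()   # on -> off
```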


Usage Examples

Get Real-Time Statistics

import requests

response = requests.get("http://localhost:8000/api/trends/stats")
stats = response.json()
print(f"Total: {stats['total_trends']}, Hot: {stats['hot_trends']}")

Get Full Trend Details with Images

trend_id = "trend_abc123"
response = requests.get(f"http://localhost:8000/api/trends/detail/{trend_id}")
data = response.json()

print(f"Trend: {data['trend_name']}")
print(f"Total Memecoins: {data['total']}")

# Access memecoin images
for memecoin in data["memecoins"]:
    image_data_uri = memecoin["image_data_uri"]
    # Display or process...

Best Practices

  • Use cache-optimized endpoints: Use /api/trends/stats (1-5ms) for counts instead of fetching and counting all trends
  • Request only needed examples: Use ?include_examples=3 parameter instead of fetching full trend details and filtering
  • Filter server-side: Use ?collection=confirmed parameter instead of client-side filtering
  • Cache trend details locally: Cache /api/trends/detail/{trend_id} responses (100-500ms) to avoid repeated fetches
  • Monitor detection status: Check /api/trends/detection/status to verify trend detection workflow is active
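
As one way to follow the local-caching advice, here is a sketch of a small client-side cache with a time-to-live. `DetailCache` and the 60-second TTL are assumptions for illustration; `fake_fetch` stands in for the `requests.get(...).json()` call from the usage examples so the sketch runs without a server.

```python
import time
from typing import Callable


class DetailCache:
    """Hypothetical client-side TTL cache for /api/trends/detail/{trend_id}."""

    def __init__(self, fetch: Callable[[str], dict], ttl_seconds: float = 60.0) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._entries: dict[str, tuple[float, dict]] = {}

    def get(self, trend_id: str) -> dict:
        now = time.monotonic()
        entry = self._entries.get(trend_id)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # fresh enough: skip the slow request
        data = self._fetch(trend_id)
        self._entries[trend_id] = (now, data)
        return data


calls: list[str] = []


def fake_fetch(trend_id: str) -> dict:
    """Stand-in for the HTTP request; records each call."""
    calls.append(trend_id)
    return {"trend_id": trend_id, "total": 12}


details = DetailCache(fake_fetch, ttl_seconds=60.0)
first = details.get("trend_abc123")
second = details.get("trend_abc123")  # served from the local cache
```

A short TTL keeps the client reasonably close to the server's real-time data while still avoiding repeated image-heavy fetches.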

Implementation References

Key Files

| Component | File Path |
| --- | --- |
| TrendOrchestrator | src/orchestrators/trends/trend_orchestrator.py |
| Trends API Routes | src/web_ui/api/trends_routes.py |
| Trend Detection Workflow | src/workflows/trend_detection_workflow.py |
| Firebase Service | src/firebase/simple_firebase_service.py |
| Orchestrator Registry | src/orchestrators/registry.py |

Document Status: Complete

Last Updated: October 16, 2025

Implementation: Real-time trend tracking with reactive Firebase watchers and in-memory caching