What This Document Covers:
- TrendOrchestrator architecture for real-time trend tracking
- Reactive Firebase watchers for instant data synchronization
- In-memory caching for ultra-fast API responses (~1-5ms)
- Prominence scoring algorithm for ranking trends
- Dual-collection system (potential vs confirmed trends)
- API integration with Web UI (/api/trends endpoints)
Sections in This Document:
- Overview
- Architecture
- Trend Detection Workflow
- Reactive Watchers
- In-Memory Cache
- Prominence Scoring
- API Integration
- Usage Examples
- Best Practices
Related Documentation:
- → FIREBASE_INTEGRATION.md - Firebase watchers and collections
- → WEB_UI.md - Trends page interface
- → ORCHESTRATORS.md - Orchestrator coordination
- → ../../.claude/ARCHITECTURE.md - System architecture
Context Tags: #trends #firebase #realtime #cache #prominence-scoring #orchestrator
LaunchAgencyBot implements a real-time trend tracking system that identifies emerging and established patterns in memecoin launches using Firebase watchers and in-memory caching.
Key Features:
- Reactive Firebase Watchers - Instant updates when trends change (no polling)
- In-Memory Cache - Ultra-fast reads (~1-5ms) with automatic updates
- Prominence Scoring - Activity-based ranking for identifying hot trends
- Dual Collections - Separate potential (emerging) and confirmed (established) trends
- Auto-Initialization - TrendOrchestrator starts automatically on server startup
Implementation: src/orchestrators/trends/trend_orchestrator.py
TrendOrchestrator Design: Initializes Firebase watchers and in-memory cache on startup; cache contains potential, confirmed, stats, and last_updated fields.
Pattern: src/orchestrators/registry.py automatically initializes TrendOrchestrator on FastAPI startup event (@app.on_event("startup")). Watchers and cache are active before first API request.
Location: src/workflows/trend_detection_workflow.py
Flow: Token Launch Events → LLM Classification → Firebase Write → Watcher Triggered → Cache Updated → API Responds
| Component | Class | Purpose | Key Behavior |
|---|---|---|---|
| Input Source | TokenLaunchSource | WebSocket connection to pump.fun | Real-time token launch events |
| Processor | TrendDetectionProcessor | Analyze metadata | LLM classifies into trends, creates/updates trend documents |
| Executor | FirebaseTrendExecutor | Write to Firebase | Updates potential_trends or confirmed_trends, triggers watchers |
| Orchestrator | TrendOrchestrator | Reactive cache | Receives watcher events, updates in-memory cache, recalculates stats |
Potential Trends (potential_trends collection):
- Definition: Emerging patterns with 3-9 memecoins
- Purpose: Early detection of new trends
- Promotion Threshold: 10+ memecoins → move to confirmed
Confirmed Trends (confirmed_trends collection):
- Definition: Established patterns with 10+ memecoins
- Purpose: Validated trends with proven traction
- Characteristics: Higher confidence, more stable
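The size thresholds above can be sketched as a small classifier. This is an illustration only, not the project's code: the helper name `collection_for_count` and the handling of groups with fewer than 3 memecoins (returned as `None`, meaning not yet tracked) are assumptions.

```python
from typing import Optional

POTENTIAL_MIN = 3    # smallest group treated as a potential trend
CONFIRMED_MIN = 10   # promotion threshold for confirmed trends


def collection_for_count(memecoin_count: int) -> Optional[str]:
    """Return the collection a trend of this size would belong to.

    Hypothetical helper: returning None below the potential threshold
    is an assumption about how very small groups are handled.
    """
    if memecoin_count >= CONFIRMED_MIN:
        return "confirmed_trends"
    if memecoin_count >= POTENTIAL_MIN:
        return "potential_trends"
    return None


for count in (2, 3, 9, 10, 25):
    print(count, collection_for_count(count))
```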
Auto-Promotion:
if trend.memecoin_count >= 10 and trend in potential_collection:
    # Move to confirmed collection
    firebase.create_document("confirmed_trends", trend_id, trend_data)
    firebase.delete_document("potential_trends", trend_id)

Purpose: Real-time synchronization between Firebase and in-memory cache
def _setup_firebase_watchers(self):
    """Set up reactive listeners for trend data changes"""
    # Watch potential trends collection
    potential_ref = self.firebase.get_collection("potential_trends")
    self.firebase.watch_collection(
        collection_ref=potential_ref,
        on_snapshot=self._on_potential_trends_update,
        watcher_id="potential_trends_watcher"
    )

    # Watch confirmed trends collection
    confirmed_ref = self.firebase.get_collection("confirmed_trends")
    self.firebase.watch_collection(
        collection_ref=confirmed_ref,
        on_snapshot=self._on_confirmed_trends_update,
        watcher_id="confirmed_trends_watcher"
    )
    logger.info("👀 Firebase watchers active for trends collections")

def _on_potential_trends_update(self, snapshot):
    """Handle potential trends updates"""
    self._update_cache_from_snapshot(snapshot, "potential")

def _on_confirmed_trends_update(self, snapshot):
    """Handle confirmed trends updates"""
    self._update_cache_from_snapshot(snapshot, "confirmed")

def _update_cache_from_snapshot(self, snapshot, collection_type):
    """Shared logic for cache updates"""
    logger.info(f"📊 {collection_type.title()} trends updated: {len(snapshot)} trends")
    self.cache[collection_type] = [self._process_trend_document(doc) for doc in snapshot]
    self._update_statistics()
    self.cache["last_updated"] = int(time.time())

Instant Updates:
- Firebase change → Watcher triggered → Cache updated → API reflects immediately
- Latency: ~100-300ms from Firebase write to API response update
No Polling:
- Before (polling): while True: fetch_trends(); sleep(5) → 5-second staleness
- After (watchers): Instant updates with zero polling overhead
Resource Efficiency:
- Polling: N requests/second (wasteful, expensive)
- Watchers: Event-driven (minimal overhead, efficient)
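The event-driven flow described above can be illustrated without Firebase. In the sketch below, `FakeCollection`, its `watch` and `set` methods, and the module-level `cache` dict are all hypothetical stand-ins; the real service uses the watcher methods shown earlier. It only demonstrates that a write pushes a snapshot to the listener, so the cache never needs to poll.

```python
import time
from typing import Callable, Dict, List


class FakeCollection:
    """In-memory stand-in for a watched collection (not the Firebase SDK)."""

    def __init__(self) -> None:
        self._docs: Dict[str, dict] = {}
        self._listeners: List[Callable[[List[dict]], None]] = []

    def watch(self, on_snapshot: Callable[[List[dict]], None]) -> None:
        # Register the callback and deliver the current state once
        self._listeners.append(on_snapshot)
        on_snapshot(list(self._docs.values()))

    def set(self, doc_id: str, data: dict) -> None:
        # Every write pushes a fresh snapshot to all listeners
        self._docs[doc_id] = {"_id": doc_id, **data}
        snapshot = list(self._docs.values())
        for listener in self._listeners:
            listener(snapshot)


cache = {"potential": [], "last_updated": 0}


def on_potential_update(snapshot: List[dict]) -> None:
    # Replace the cached list wholesale, as the snapshot is the full state
    cache["potential"] = snapshot
    cache["last_updated"] = int(time.time())


potential = FakeCollection()
potential.watch(on_potential_update)
potential.set("trend_abc123", {"trend_name": "Gaming Nostalgia Tokens", "memecoin_count": 4})
print(len(cache["potential"]))  # 1
```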
Purpose: Ultra-fast API responses (1-20ms) with automatic updates from Firebase watchers
Cache Structure: Contains potential array, confirmed array, and stats object (total/potential/confirmed/hot counts, last_updated timestamp). Each trend includes _id, trend_name, trend_description, memecoin_count, memecoins, timestamps, _collection, and prominence_score.
Access Patterns: All API endpoints read from cache (no Firebase queries). Stats endpoint: ~1-5ms, list/filter: ~5-20ms, detail with images: ~100-500ms.
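A cache-only read path for the list endpoint might look like the sketch below. The function name `list_trends`, the sample data, and the choice to sort by `prominence_score` are assumptions made for illustration; only the field names and the `collection` / `include_examples` parameters come from this document.

```python
from typing import List, Optional

# Sample cache contents; field names follow the cache structure described above
cache = {
    "potential": [
        {"_id": "t1", "trend_name": "Emerging", "memecoin_count": 4,
         "memecoins": [{"ticker": f"P{i}"} for i in range(4)],
         "_collection": "potential", "prominence_score": 2.1},
    ],
    "confirmed": [
        {"_id": "t2", "trend_name": "Established", "memecoin_count": 12,
         "memecoins": [{"ticker": f"C{i}"} for i in range(12)],
         "_collection": "confirmed", "prominence_score": 4.0},
    ],
}


def list_trends(cache: dict, collection: Optional[str] = None,
                include_examples: int = 0) -> List[dict]:
    """Hypothetical read path: filter cached trends without touching Firebase."""
    if collection not in (None, "potential", "confirmed"):
        raise ValueError("collection must be 'potential' or 'confirmed'")
    if not 0 <= include_examples <= 3:
        raise ValueError("include_examples must be between 0 and 3")
    names = [collection] if collection else ["potential", "confirmed"]
    trends = [trend for name in names for trend in cache[name]]
    # Copy each trend so trimming the examples never mutates the cache
    trimmed = [{**t, "memecoins": t["memecoins"][:include_examples]} for t in trends]
    # Assumption: hottest trends first
    return sorted(trimmed, key=lambda t: t["prominence_score"], reverse=True)


result = list_trends(cache, collection="confirmed", include_examples=2)
print(result[0]["_id"], len(result[0]["memecoins"]))  # t2 2
```

Because the function copies each trend before trimming, the cached documents keep their full `memecoins` lists for later requests.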
Purpose: Rank trends by activity and relevance to identify "hot" trends
import math
import time

def calculate_prominence_score(trend: dict) -> float:
    """Calculate prominence score based on activity and recency"""
    # 1. Memecoin count factor (base score)
    count_factor = min(trend["memecoin_count"] / 50, 1.0)  # Saturates at 50 memecoins
    # 2. Recency factor (newer trends get boost)
    now = int(time.time())
    age_days = (now - trend["created_at"]) / (24 * 3600)
    # Exponential decay with a 14-day time constant (half-life is about 9.7 days)
    recency_factor = math.exp(-age_days / 14)
    # 3. Update factor (gap between creation and latest update; active trends get boost)
    if trend["updated_at"] > trend["created_at"]:
        update_gap_days = (trend["updated_at"] - trend["created_at"]) / (24 * 3600)
        update_factor = min(update_gap_days / 7, 1.0)  # Cap at 7 days
    else:
        update_factor = 0.0
    # 4. Combined score (weighted)
    prominence_score = (
        count_factor * 0.5 +    # 50% weight on size
        recency_factor * 0.3 +  # 30% weight on recency
        update_factor * 0.2     # 20% weight on activity
    ) * 10  # Scale to 0-10
    return prominence_score

Small Recent Active Trend (8 memecoins, 2 days old, updated 1 day ago):
- count_factor = 8/50 = 0.16, recency_factor = exp(-2/14) = 0.87, update_factor = 1/7 = 0.14
- prominence_score = (0.16 * 0.5 + 0.87 * 0.3 + 0.14 * 0.2) * 10 = 3.69
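The worked example can be checked with a pure variant of the scoring formula that takes the age and update gap directly instead of reading the clock. The name `prominence_from_factors` and its parameters are hypothetical; the weights and caps are the ones stated above. Evaluating it for this example gives roughly 3.69.

```python
import math


def prominence_from_factors(memecoin_count: int, age_days: float,
                            update_gap_days: float) -> float:
    """Same weighting as the scoring formula, without a dependency on time.time()."""
    count_factor = min(memecoin_count / 50, 1.0)
    recency_factor = math.exp(-age_days / 14)
    # A non-positive gap means the trend was never updated after creation
    update_factor = min(max(update_gap_days, 0.0) / 7, 1.0)
    return (count_factor * 0.5 + recency_factor * 0.3 + update_factor * 0.2) * 10


score = prominence_from_factors(memecoin_count=8, age_days=2, update_gap_days=1)
print(round(score, 2))  # 3.69
```

Passing the time-derived inputs as arguments keeps the function deterministic, which makes examples like this one easy to verify.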
Definition: Trends with >= 10 memecoins (confirmed trends)
hot_trends = [
    trend for trend in all_trends
    if trend["memecoin_count"] >= 10
]
# Statistic
stats["hot_trends"] = len(hot_trends)

Router: /api/trends
Module: src/web_ui/api/trends_routes.py
Purpose: Get aggregated statistics (GET /api/trends/stats)
Response:
{
  "status": 200,
  "success": true,
  "message": "Statistics retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "total_trends": 42,
  "potential_trends": 28,
  "confirmed_trends": 14,
  "hot_trends": 8,
  "last_updated": 1697510000
}

Performance: ~1-5ms (pure cache read)
Purpose: List all trends with optional filtering
Query Parameters:
- collection: Optional[str] - Filter by 'potential' or 'confirmed'
- include_examples: int - Number of memecoin examples to include (0-3)
Response:
{
  "status": 200,
  "success": true,
  "message": "Trends retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "total": 2,
  "trends": [
    {
      "_id": "trend_abc123",
      "trend_name": "Gaming Nostalgia Tokens",
      "trend_description": "Memecoins referencing classic video games",
      "memecoin_count": 12,
      "memecoins": [], // Or up to 3 examples if include_examples > 0
      "created_at": 1697500000,
      "updated_at": 1697510000,
      "_collection": "potential",
      "prominence_score": 8.5
    }
  ]
}

Performance: ~5-20ms (cache read with filtering)
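A client can validate the two query parameters before sending a request. The sketch below only builds the URL. Note the assumptions: the list route is taken to be the router root `/api/trends` (the exact path is not stated here), the host and port are a local development address, and `build_list_url` is a hypothetical helper.

```python
from typing import Optional
from urllib.parse import urlencode

BASE_URL = "http://localhost:8000"  # ASSUMPTION: local development server
LIST_PATH = "/api/trends"           # ASSUMPTION: list route served at the router root


def build_list_url(collection: Optional[str] = None, include_examples: int = 0) -> str:
    """Build the list URL, rejecting values outside the documented ranges."""
    if collection not in (None, "potential", "confirmed"):
        raise ValueError("collection must be 'potential' or 'confirmed'")
    if not 0 <= include_examples <= 3:
        raise ValueError("include_examples must be between 0 and 3")
    params = {}
    if collection is not None:
        params["collection"] = collection
    if include_examples:
        params["include_examples"] = include_examples
    query = urlencode(params)
    return f"{BASE_URL}{LIST_PATH}?{query}" if query else f"{BASE_URL}{LIST_PATH}"


print(build_list_url("confirmed", 3))
# http://localhost:8000/api/trends?collection=confirmed&include_examples=3
```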
Purpose: Get specific trend by ID
Response:
{
  "status": 200,
  "success": true,
  "message": "Trend retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "trend": {
    "_id": "trend_abc123",
    "trend_name": "Gaming Nostalgia Tokens",
    "trend_description": "Memecoins referencing classic video games",
    "memecoin_count": 12,
    "memecoins": [],
    "created_at": 1697500000,
    "updated_at": 1697510000,
    "_collection": "potential",
    "prominence_score": 8.5
  }
}

Performance: ~1-5ms (cache lookup)
Purpose: Get ALL memecoins from a specific trend with Base64 images (GET /api/trends/detail/{trend_id})
Response:
{
  "status": 200,
  "success": true,
  "message": "Trend memecoins retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "trend_id": "trend_abc123",
  "trend_name": "Gaming Nostalgia Tokens",
  "total": 12,
  "images_loaded": 12,
  "memecoins": [
    {
      "token_address": "7xKXtg2CW...",
      "token_name": "Mario Coin",
      "ticker": "MARIO",
      "description": "Classic plumber saves princess",
      "image_data_uri": "data:image/jpeg;base64,/9j/4AAQ...",
      "image_url": "https://pump.fun/images/abc.png",
      "creation_timestamp": 1697500000
    }
  ]
}

Performance: ~100-500ms (loads images from filesystem)
Note: For large trends (50+ memecoins), consider client-side caching
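Each `image_data_uri` in the response above is a base64 data URI, so a client that wants the raw image bytes has to split off the header and decode the payload. The helper below is a minimal sketch using only the standard library; `decode_data_uri` is a hypothetical name, and the sample URI is built locally from four JPEG marker bytes rather than taken from a real response.

```python
import base64
from typing import Tuple


def decode_data_uri(data_uri: str) -> Tuple[str, bytes]:
    """Split a base64 data URI into (mime_type, raw_bytes)."""
    header, sep, payload = data_uri.partition(",")
    if not sep or not header.startswith("data:") or not header.endswith(";base64"):
        raise ValueError("expected a base64-encoded data URI")
    mime_type = header[len("data:"):-len(";base64")]
    return mime_type, base64.b64decode(payload)


# Build a tiny sample URI from the first bytes of a JPEG header
sample = "data:image/jpeg;base64," + base64.b64encode(b"\xff\xd8\xff\xe0").decode("ascii")
mime_type, raw = decode_data_uri(sample)
print(mime_type, len(raw))  # image/jpeg 4
```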
Purpose: Get trend detection workflow status (/api/trends/detection/status)
Response:
{
  "status": 200,
  "success": true,
  "message": "Detection status retrieved successfully",
  "timestamp": "2025-10-16T14:30:00Z",
  "detection_active": true,
  "potential_trends": 28,
  "confirmed_trends": 14
}

Purpose: Toggle trend detection workflow on/off
Response:
{
  "status": 200,
  "success": true,
  "message": "Trend detection enabled",
  "timestamp": "2025-10-16T14:30:00Z",
  "detection_active": true,
  "potential_trends": 28,
  "confirmed_trends": 14
}

Behavior:
- When Enabled: Runs full trend detection workflow (WebSocket → LLM → Firebase)
- When Disabled: Only Firebase monitoring active (read-only, no new trends)
Use Case: Control expensive LLM API usage for trend classification
import requests

response = requests.get("http://localhost:8000/api/trends/stats")
stats = response.json()
print(f"Total: {stats['total_trends']}, Hot: {stats['hot_trends']}")

trend_id = "trend_abc123"
response = requests.get(f"http://localhost:8000/api/trends/detail/{trend_id}")
data = response.json()
print(f"Trend: {data['trend_name']}")
print(f"Total Memecoins: {data['total']}")
# Access memecoin images
for memecoin in data["memecoins"]:
    image_data_uri = memecoin["image_data_uri"]
    # Display or process...

- Use cache-optimized endpoints: Use /api/trends/stats (1-5ms) for counts instead of fetching and counting all trends
- Request only needed examples: Use the ?include_examples=3 parameter instead of fetching full trend details and filtering
- Filter server-side: Use the ?collection=confirmed parameter instead of client-side filtering
- Cache trend details locally: Cache /api/trends/detail/{trend_id} responses (100-500ms) to avoid repeated fetches
- Monitor detection status: Check /api/trends/detection/status to verify trend detection workflow is active
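The advice to cache detail responses locally could be implemented with a small time-based cache on the client. This is a sketch under stated assumptions: `DetailCache`, the 60-second TTL, and the injected `fetch` callable are all hypothetical, and `fake_fetch` stands in for a real HTTP call to the detail endpoint.

```python
import time
from typing import Callable, Dict, Tuple


class DetailCache:
    """Tiny client-side TTL cache; the fetch function is injected by the caller."""

    def __init__(self, fetch: Callable[[str], dict], ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, dict]] = {}

    def get(self, trend_id: str) -> dict:
        now = self._clock()
        entry = self._entries.get(trend_id)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]           # still fresh: no network call
        data = self._fetch(trend_id)  # missing or expired: fetch and store
        self._entries[trend_id] = (now, data)
        return data


calls = []


def fake_fetch(trend_id: str) -> dict:
    # Stand-in for an HTTP GET on /api/trends/detail/{trend_id}
    calls.append(trend_id)
    return {"trend_id": trend_id, "memecoins": []}


detail_cache = DetailCache(fake_fetch, ttl_seconds=60)
detail_cache.get("trend_abc123")
detail_cache.get("trend_abc123")
print(len(calls))  # 1
```

A short TTL keeps the client reasonably close to the server-side cache while avoiding repeated 100-500ms image loads for the same trend.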
| Component | File Path |
|---|---|
| TrendOrchestrator | src/orchestrators/trends/trend_orchestrator.py |
| Trends API Routes | src/web_ui/api/trends_routes.py |
| Trend Detection Workflow | src/workflows/trend_detection_workflow.py |
| Firebase Service | src/firebase/simple_firebase_service.py |
| Orchestrator Registry | src/orchestrators/registry.py |
Document Status: Complete
Last Updated: October 16, 2025
Implementation: Real-time trend tracking with reactive Firebase watchers and in-memory caching