
🔍 Trend Detection Workflow

📋 Document Summary

What This Document Covers:

  • Real-time AI-powered trend classification using Gemini Flash 2.5
  • WebSocket-based token launch monitoring
  • Multimodal trend creation (text + image analysis)
  • Firebase trend storage with automatic promotion (potential → confirmed at 5+ memecoins)
  • TrendOrchestrator integration for Web UI


Context Tags: #workflow #trends #real-time #ai-classification #firebase #websocket


Real-time AI-powered trend classification system for token launches with multimodal analysis

📋 Workflow Overview

The Trend Detection Workflow is a production-ready, real-time system that monitors new token launches, classifies each one into an emerging trend using LLM analysis, and automatically manages the trend lifecycle through two Firebase collections. Unlike static classification against predefined categories, it uses multimodal LLM analysis to detect patterns, create new trends on the fly, and track how trends evolve over time.

🎯 Core Innovation: AI-Powered Trend Discovery

  • Traditional Approach: Predefined categories → Static classification → Manual updates
  • Trend Detection Approach: Monitor launches → Detect patterns → Auto-create trends → Track evolution

The workflow uses Gemini Flash 2.5 for both text-based classification and multimodal trend creation. The cheaper, low-temperature text call handles every launch. The slower vision call runs only when no existing trend matches, which keeps per-token latency low.

🏗️ Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                     Trend Detection Workflow                             │
│                                                                          │
│  WebSocket Launches → TrendDetectionSource → TrendDetectionProcessor    │
│                                                   ↓                      │
│                                          TrendFirebaseExecutor           │
│                                                   ↓                      │
│                                          Firebase Collections            │
│                        (potential_trends + confirmed_trends)            │
└─────────────────────────────────────────────────────────────────────────┘

Component Flow:
1. TokenLaunchSource (pump.fun WebSocket) → Real-time token launch events
2. TrendDetectionSource (Wrapper) → Enriches events with current trends from Firebase
3. TrendDetectionProcessor (4-stage pipeline) → AI classification and trend creation
4. TrendFirebaseExecutor (Firebase writes) → Dual-collection management with promotion

🔄 4-Stage Processing Pipeline

Stage 1: Trend Metadata Extraction (< 1 second)

  • Process: Extract memecoin from TokenLaunchEvent, download image from pump.fun
  • Input: TokenLaunchEvent (WebSocket data)
  • Output: SimplifiedMemecoin with downloaded image path
  • Failure: Terminates if image download fails or metadata incomplete

Stage 2: Trend Classification (3-5 seconds)

  • Process: LLM-powered classification against current trends using Gemini Flash 2.5 (TEXT model)
  • Input: SimplifiedMemecoin + current_trends list (from TrendDetectionEvent)
  • Output: TrendClassificationAction with matched trend or NO_MATCH signal
  • Classification Logic:
    • Analyzes memecoin name, ticker, and description
    • Compares against existing trend names, descriptions, and memecoins
    • Returns trend_id if strong match found (≥70% confidence)
    • Returns NO_MATCH if unique concept detected
  • Failure: Terminates if LLM service unavailable
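The ≥70% rule can be sketched as a small parser for the TEXT model's reply. This assumes the prompt asks the model for JSON shaped like `{"trend_id": "...", "confidence": 0.83}`. That reply format, the `ClassificationResult` type and `parse_classification` are hypothetical illustrations, not the actual TrendClassificationStage code.

```python
import json
from dataclasses import dataclass
from typing import Optional

NO_MATCH = "NO_MATCH"
CONFIDENCE_THRESHOLD = 0.70  # "strong match" cutoff described for Stage 2


@dataclass
class ClassificationResult:
    """Hypothetical stand-in for TrendClassificationAction."""
    matched: bool
    trend_id: Optional[str] = None
    confidence: float = 0.0


def parse_classification(raw_reply: str, known_trend_ids: set[str]) -> ClassificationResult:
    """Turn an (assumed) JSON reply into a match or a NO_MATCH result.

    Malformed JSON raises json.JSONDecodeError, which the pipeline would treat
    as a stage failure.
    """
    data = json.loads(raw_reply)
    trend_id = data.get("trend_id")
    confidence = float(data.get("confidence", 0.0))
    # Only accept IDs present in current_trends; an invented ID counts as NO_MATCH.
    if (trend_id and trend_id != NO_MATCH and trend_id in known_trend_ids
            and confidence >= CONFIDENCE_THRESHOLD):
        return ClassificationResult(matched=True, trend_id=trend_id, confidence=confidence)
    return ClassificationResult(matched=False, confidence=confidence)
```

Checking the returned ID against the trends actually sent in the prompt guards against the model naming a trend that does not exist.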

Stage 3: New Trend Creation (5-8 seconds, conditional)

  • Process: Multimodal LLM creation using Gemini Flash 2.5 (MULTIMODAL model) - ONLY runs if Stage 2 returns NO_MATCH
  • Input: SimplifiedMemecoin with image for multimodal analysis
  • Output: NewTrendAction with trend_name, description, and reasoning
  • Multimodal Analysis: Uses both text metadata AND image content for trend identification
  • Skipped: If Stage 2 matched existing trend (classification_result.matched = True)
  • Failure: Terminates if multimodal LLM fails or invalid trend format

Stage 4: Trend Action Assembly (< 1 second)

  • Process: Assemble final TrendAction for executor
  • Input: SimplifiedMemecoin + classification_result OR new_trend_result
  • Output: TrendAction with operation type (APPEND_TO_EXISTING or CREATE_NEW_TREND)
  • Routing Logic:
    • APPEND_TO_EXISTING: If classification matched existing trend → append memecoin
    • CREATE_NEW_TREND: If new trend created → create in potential_trends collection
  • Failure: Cannot fail (final assembly stage)
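The four stages and their early-exit rules can be condensed into one function. This is a minimal sketch: the LLM calls are injected as plain callables, the image download is reduced to checking for an `image_path` key, and `PipelineContext`, `TrendAction` and `run_pipeline` are hypothetical names. Only `terminate_processing()` and the two operation names come from this document.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class Operation(Enum):
    APPEND_TO_EXISTING = "APPEND_TO_EXISTING"
    CREATE_NEW_TREND = "CREATE_NEW_TREND"


@dataclass
class TrendAction:
    operation: Operation
    memecoin: dict
    trend_id: Optional[str] = None    # set for APPEND_TO_EXISTING
    new_trend: Optional[dict] = None  # set for CREATE_NEW_TREND


@dataclass
class PipelineContext:
    memecoin: dict
    current_trends: list
    terminated: bool = False
    reason: str = ""

    def terminate_processing(self, reason: str) -> None:
        self.terminated = True
        self.reason = reason


def run_pipeline(ctx: PipelineContext,
                 classify: Callable[[dict, list], Optional[str]],  # Stage 2: trend_id or None (NO_MATCH)
                 create_trend: Callable[[dict], dict]) -> Optional[TrendAction]:  # Stage 3
    # Stage 1: metadata extraction; a missing image path stands in for a failed download.
    if not ctx.memecoin.get("image_path"):
        ctx.terminate_processing("image download failed or metadata incomplete")
        return None

    # Stage 2: classify against the current trends.
    try:
        matched_id = classify(ctx.memecoin, ctx.current_trends)
    except Exception as exc:  # e.g. LLM unavailable or timed out
        ctx.terminate_processing(f"classification failed: {exc}")
        return None

    # Stage 4 (match path): Stage 3 is skipped entirely.
    if matched_id is not None:
        return TrendAction(Operation.APPEND_TO_EXISTING, ctx.memecoin, trend_id=matched_id)

    # Stage 3: runs only on NO_MATCH.
    try:
        new_trend = create_trend(ctx.memecoin)
    except Exception as exc:
        ctx.terminate_processing(f"trend creation failed: {exc}")
        return None
    if not new_trend or not new_trend.get("trend_name"):
        ctx.terminate_processing("invalid trend format")
        return None

    # Stage 4 (new-trend path)
    return TrendAction(Operation.CREATE_NEW_TREND, ctx.memecoin, new_trend=new_trend)
```

Stage 4 performs no I/O and only wraps results that earlier stages already validated. That is why it is the one stage with no failure branch.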

🗄️ Firebase Collections Architecture

potential_trends Collection

Purpose: New and emerging trends awaiting promotion

Document Structure:

{
  "_id": "crying-wojak-despair",           // Slugified trend name
  "trend_name": "Crying Wojak Despair",    // Human-readable name
  "description": "Memes featuring crying Wojak expressing financial despair...",
  "created_at": 1704067200,                // Unix timestamp
  "updated_at": 1704067200,                // Unix timestamp
  "memecoin_count": 3,                     // Number of memecoins in trend
  "memecoins": [                           // Array of simplified memecoins
    {
      "token_name": "Crying Wojak",
      "ticker": "WOJAK",
      "description": "Expressing deep market despair",
      "image_uri": "https://...",
      "mint_address": "8x7k...",
      "metadata_uri": "https://...",
      "twitter": null,
      "telegram": null,
      "website": null
    }
  ],
  "example_tickers": ["WOJAK", "COPE", "BAGS"],  // Quick reference
  "_collection": "potential"                      // Collection marker
}

Promotion Logic: Automatically promoted to confirmed_trends when memecoin_count ≥ 5
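The executor's two operations and the promotion rule can be sketched in memory. This assumes plain dicts (document ID → document) stand in for the two Firebase collections. `create_trend` and `append_memecoin` are hypothetical names, not TrendFirebaseExecutor's actual methods.

```python
import time

PROMOTION_THRESHOLD = 5  # memecoin_count at which a trend moves to confirmed_trends


def create_trend(potential: dict, trend_id: str, trend_name: str,
                 description: str, memecoin: dict) -> dict:
    """CREATE_NEW_TREND: new trends always start in potential_trends with one memecoin."""
    now = int(time.time())
    doc = {
        "_id": trend_id,
        "trend_name": trend_name,
        "description": description,
        "created_at": now,
        "updated_at": now,
        "memecoin_count": 1,
        "memecoins": [memecoin],
        "example_tickers": [memecoin["ticker"]] if memecoin.get("ticker") else [],
        "_collection": "potential",
    }
    potential[trend_id] = doc
    return doc


def append_memecoin(potential: dict, confirmed: dict, trend_id: str, memecoin: dict) -> str:
    """APPEND_TO_EXISTING: add a memecoin and promote the trend once it reaches the threshold.

    Returns the name of the collection holding the trend afterwards.
    """
    if trend_id in confirmed:
        doc, collection = confirmed[trend_id], "confirmed"
    else:
        doc, collection = potential[trend_id], "potential"  # KeyError for an unknown trend

    doc["memecoins"].append(memecoin)
    doc["memecoin_count"] += 1
    doc["updated_at"] = int(time.time())

    if collection == "potential" and doc["memecoin_count"] >= PROMOTION_THRESHOLD:
        # Move rather than copy, so each trend lives in exactly one collection.
        doc["_collection"] = "confirmed"
        confirmed[trend_id] = potential.pop(trend_id)
        collection = "confirmed"
    return collection
```

Against real Firebase, the read, append and move would presumably need to run in a transaction or batched write. Otherwise two simultaneous launches could both promote a trend or both miss the threshold.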

confirmed_trends Collection

Purpose: Established trends with ≥5 memecoins

Document Structure: Same as potential_trends with "_collection": "confirmed"

Benefits:

  • High-confidence trends for UI display
  • Used for better classification accuracy (larger pattern database)
  • API filtering support (show only confirmed trends)

🎛️ TrendOrchestrator Integration

The workflow integrates with the TrendOrchestrator for Web UI consumption:

Orchestrator Features

  • Real-time Firebase Watching: Reactive collection watchers for instant cache updates
  • In-Memory Cache: Fast API responses with automatic invalidation
  • Prominence Scoring: Trend ranking based on memecoin_count × recency_factor
  • Dashboard Statistics: Total trends, hot trends (≥10 memecoins), collection breakdowns
  • Conditional Workflow Control: Enable/disable trend detection independently from monitoring
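This document does not define `recency_factor`. The sketch below is one plausible reading of `memecoin_count × recency_factor`. It assumes an exponential decay on `updated_at` with a 24-hour half-life; both the decay shape and `HALF_LIFE_SECONDS` are hypothetical choices.

```python
import time
from typing import Optional

HALF_LIFE_SECONDS = 24 * 3600  # hypothetical: a trend's weight halves every 24h without updates


def recency_factor(updated_at: int, now: Optional[float] = None) -> float:
    """Exponential decay in (0, 1]; 1.0 for a trend updated just now."""
    now = time.time() if now is None else now
    age = max(0.0, now - updated_at)
    return 0.5 ** (age / HALF_LIFE_SECONDS)


def prominence(trend: dict, now: Optional[float] = None) -> float:
    """memecoin_count scaled by how recently the trend last grew."""
    return trend["memecoin_count"] * recency_factor(trend["updated_at"], now)


def rank_trends(trends: list[dict], now: Optional[float] = None) -> list[dict]:
    """Highest prominence first, as a sort=prominence query would return."""
    return sorted(trends, key=lambda t: prominence(t, now), reverse=True)
```

With this choice, a fresh 4-memecoin trend outranks a 6-memecoin trend that has been idle for two days (4.0 vs 6 × 0.25 = 1.5).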

API Integration

# Orchestrator provides cached trend data to Web UI
GET /api/trends/all?collection=confirmed&sort=prominence
GET /api/trends/statistics
GET /api/trends/{trend_id}

# Control trend detection workflow
POST /api/trends/detection/enable
POST /api/trends/detection/disable
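The endpoints above can be called from a minimal standard-library client. This assumes the Web UI runs at `http://127.0.0.1:8000` as in the startup instructions. The helper names are hypothetical, and because the JSON response shapes are not documented here, `get_json` returns whatever the server sends.

```python
import json
import urllib.request
from typing import Optional
from urllib.parse import quote, urlencode

BASE_URL = "http://127.0.0.1:8000"  # assumed local Web UI address


def trends_url(collection: Optional[str] = None, sort: Optional[str] = None,
               base: str = BASE_URL) -> str:
    """Build GET /api/trends/all with optional collection/sort filters."""
    params = {k: v for k, v in {"collection": collection, "sort": sort}.items() if v}
    query = f"?{urlencode(params)}" if params else ""
    return f"{base}/api/trends/all{query}"


def trend_url(trend_id: str, base: str = BASE_URL) -> str:
    """Build GET /api/trends/{trend_id}, escaping the ID for safety."""
    return f"{base}/api/trends/{quote(trend_id, safe='')}"


def get_json(url: str, timeout: float = 10.0):
    """Fetch a GET endpoint and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def set_detection(enabled: bool, base: str = BASE_URL) -> int:
    """POST to the enable/disable endpoint; returns the HTTP status code."""
    action = "enable" if enabled else "disable"
    req = urllib.request.Request(f"{base}/api/trends/detection/{action}",
                                 data=b"", method="POST")
    with urllib.request.urlopen(req, timeout=10.0) as resp:
        return resp.status
```

With the server running, `get_json(trends_url("confirmed", "prominence"))` mirrors the first GET line above, and `set_detection(False)` mirrors the disable call.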

🚀 Running the Workflow

Prerequisites

# Required environment variables
export OPENROUTER_API_KEY="your_openrouter_key"  # For Gemini Flash 2.5 access
export FIREBASE_PROJECT_ID="your_project_id"
export FIREBASE_SERVICE_ACCOUNT_PATH="path/to/service-account.json"
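Startup begins by validating these variables. A check along these lines reports every problem at once instead of failing on the first one. `validate_environment` is a hypothetical helper, and the service-account file check is an extra assumption.

```python
import os
from pathlib import Path

REQUIRED_VARS = ("OPENROUTER_API_KEY", "FIREBASE_PROJECT_ID", "FIREBASE_SERVICE_ACCOUNT_PATH")


def validate_environment(env=None) -> list[str]:
    """Return a list of problems; an empty list means the environment looks usable."""
    env = os.environ if env is None else env
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    sa_path = env.get("FIREBASE_SERVICE_ACCOUNT_PATH")
    if sa_path and not Path(sa_path).is_file():
        problems.append(f"FIREBASE_SERVICE_ACCOUNT_PATH does not point to a file: {sa_path}")
    return problems
```

A launcher could call `validate_environment()` and exit with the joined messages if the list is non-empty.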

Standalone Workflow Execution

# Start trend detection workflow (standalone mode)
PYTHONPATH=. python src/workflows/trend_detection_workflow.py

# Expected output:
# ====================================================================
# 🔍 TREND DETECTION WORKFLOW
# ====================================================================
# Architecture:
#   WebSocket → TrendDetectionSource → TrendDetectionProcessor → TrendFirebaseExecutor
# ...
# 🚀 Workflow started - listening for token launches...
# 🔄 Firebase watchers active (potential_trends + confirmed_trends)

Web UI Integration (Recommended)

# Start Web UI (includes TrendOrchestrator)
PYTHONPATH=. uvicorn src.web_ui.main:app --port 8000 --host 127.0.0.1 --reload

# Access trend tracking interface
http://127.0.0.1:8000/trends.html

# Enable trend detection via API
curl -X POST http://127.0.0.1:8000/api/trends/detection/enable

# Disable trend detection (keeps monitoring, stops workflow)
curl -X POST http://127.0.0.1:8000/api/trends/detection/disable

📊 Execution Paths

Path 1: Existing Trend Match (70% of cases)

1. WebSocket receives new token launch
2. TrendDetectionSource enriches event with current trends (50+ trends)
3. TrendMetadataExtractionStage downloads image
4. TrendClassificationStage matches to existing trend "Crying Wojak Despair"
5. NewTrendCreationStage SKIPPED (match found)
6. TrendActionStage assembles APPEND_TO_EXISTING action
7. TrendFirebaseExecutor appends memecoin to existing trend document
8. If memecoin_count reaches 5 → Auto-promote to confirmed_trends

Path 2: New Trend Creation (30% of cases)

1. WebSocket receives unique token launch
2. TrendDetectionSource enriches event with current trends
3. TrendMetadataExtractionStage downloads image
4. TrendClassificationStage returns NO_MATCH
5. NewTrendCreationStage runs multimodal LLM analysis (text + image)
6. TrendActionStage assembles CREATE_NEW_TREND action
7. TrendFirebaseExecutor creates new document in potential_trends
8. Document starts with memecoin_count = 1

🔧 Configuration

LLM Configuration (src/res/config/trend_llm.yaml)

models:
  text:
    provider: "openrouter"
    model: "google/gemini-flash-2.5"
    temperature: 0.1
    max_tokens: 500
    timeout: 15

  multimodal:
    provider: "openrouter"
    model: "google/gemini-flash-2.5"
    temperature: 0.3
    max_tokens: 800
    timeout: 20
    supports_vision: true

# TEXT model: Fast classification (3-5s, cheap)
# MULTIMODAL model: Trend creation with image analysis (5-8s, moderate cost)
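The YAML entries map onto a LiteLLM-style completion call roughly as follows. The config is mirrored as a Python dict so the example needs no YAML parser. The `"openrouter/<model>"` prefix follows LiteLLM's naming convention for OpenRouter models; how LiteLLMService actually consumes this file is an assumption.

```python
# Mirrors src/res/config/trend_llm.yaml (values copied from above).
TREND_LLM_CONFIG = {
    "models": {
        "text": {"provider": "openrouter", "model": "google/gemini-flash-2.5",
                 "temperature": 0.1, "max_tokens": 500, "timeout": 15},
        "multimodal": {"provider": "openrouter", "model": "google/gemini-flash-2.5",
                       "temperature": 0.3, "max_tokens": 800, "timeout": 20,
                       "supports_vision": True},
    }
}


def completion_kwargs(config: dict, role: str) -> dict:
    """Keyword arguments for a completion call for the 'text' or 'multimodal' model.

    supports_vision is a capability flag, not a request parameter, so it is not forwarded.
    """
    cfg = config["models"][role]
    return {
        "model": f"{cfg['provider']}/{cfg['model']}",
        "temperature": cfg["temperature"],
        "max_tokens": cfg["max_tokens"],
        "timeout": cfg["timeout"],
    }
```

These could be passed as `litellm.completion(messages=..., **completion_kwargs(TREND_LLM_CONFIG, "text"))`.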

Component Configuration

# Workflow initialization order (dependency-aware)
1. LiteLLMService (trend_llm.yaml)
2. SimpleFirebaseService (auto-initialized)
3. TokenLaunchSource (pump.fun WebSocket)
4. TrendDetectionSource (wraps WebSocket + Firebase)
5. TrendDetectionProcessor (4-stage pipeline)
6. TrendFirebaseExecutor (Firebase writes)
7. DefaultWorkflow (wires components together)
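"Dependency-aware" ordering can be derived with a topological sort rather than maintained by hand. The dependency map below is a hypothetical reading of the list above (for example, that TrendDetectionProcessor needs LiteLLMService); `graphlib` is in the standard library from Python 3.9.

```python
from graphlib import TopologicalSorter

# Hypothetical map: component -> components that must be initialized first.
DEPENDENCIES = {
    "LiteLLMService": set(),
    "SimpleFirebaseService": set(),
    "TokenLaunchSource": set(),
    "TrendDetectionSource": {"TokenLaunchSource", "SimpleFirebaseService"},
    "TrendDetectionProcessor": {"LiteLLMService"},
    "TrendFirebaseExecutor": {"SimpleFirebaseService"},
    "DefaultWorkflow": {"TrendDetectionSource", "TrendDetectionProcessor", "TrendFirebaseExecutor"},
}


def init_order(deps: dict) -> list[str]:
    """Return one valid initialization order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(deps).static_order())
```

The numbered list above is one of several orders consistent with this map. A sort guarantees that every dependency precedes its dependents and catches accidental cycles.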

🎯 Use Cases

Real-time Trend Discovery

Monitor the memecoin market for emerging patterns and cultural trends in real-time.

Automated Trend Tracking

Maintain up-to-date trend database without manual curation or category management.

Market Intelligence

Identify hot trends (≥10 memecoins), emerging trends (3-9 memecoins), and nascent concepts (1-2 memecoins).
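The three size tiers can be expressed as a bucketing function. `trend_tier` and `tier_breakdown` are hypothetical helpers. These tiers are separate from promotion: an "emerging" trend with 3-9 memecoins may sit in either collection.

```python
from collections import Counter


def trend_tier(memecoin_count: int) -> str:
    """Bucket a trend by size: hot (>=10), emerging (3-9), nascent (1-2)."""
    if memecoin_count >= 10:
        return "hot"
    if memecoin_count >= 3:
        return "emerging"
    if memecoin_count >= 1:
        return "nascent"
    raise ValueError("a trend always has at least one memecoin")


def tier_breakdown(trends: list[dict]) -> Counter:
    """Count trends per tier, e.g. for a dashboard summary."""
    return Counter(trend_tier(t["memecoin_count"]) for t in trends)
```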

Content Analysis

Use multimodal analysis to understand both textual and visual memecoin characteristics.

Trend-Based Generation

Feed confirmed trends back into the RAG generation workflow for trend-aware memecoin creation.

🔍 Monitoring & Observability

Workflow Logs

# Key log patterns to monitor
2025-01-14 15:30:42 - INFO - 🚀 Starting Trend Detection Workflow...
2025-01-14 15:30:45 - INFO - 📥 New token launch: WOJAK (Crying Wojak)
2025-01-14 15:30:48 - INFO - ✅ Matched existing trend: crying-wojak-despair
2025-01-14 15:30:49 - INFO - 📝 Appended to trend (count: 4 → 5)
2025-01-14 15:30:50 - INFO - 🎉 PROMOTED to confirmed_trends: crying-wojak-despair
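Promotion events can be extracted from these logs for alerting or dashboards. This sketch assumes lines keep the `timestamp - LEVEL - message` shape and the `PROMOTED to confirmed_trends: <id>` wording shown above. The regexes and `promotions` are hypothetical.

```python
import re
from datetime import datetime

LOG_LINE = re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) - (?P<level>[A-Z]+) - (?P<msg>.*)$")
PROMOTED = re.compile(r"PROMOTED to confirmed_trends: (?P<trend_id>[a-z0-9-]+)")


def promotions(lines):
    """Yield (timestamp, trend_id) for every promotion event in the log."""
    for line in lines:
        m = LOG_LINE.match(line.strip())
        if not m:
            continue  # skip lines that do not follow the expected layout
        p = PROMOTED.search(m["msg"])
        if p:
            yield datetime.strptime(m["ts"], "%Y-%m-%d %H:%M:%S"), p["trend_id"]
```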

Health Check Endpoints (via TrendOrchestrator)

# Orchestrator health
GET /api/health/trends

# Response:
{
  "healthy": true,
  "potential_trends_count": 23,
  "confirmed_trends_count": 12,
  "watchers_active": true,
  "trend_detection_enabled": true,
  "workflow_running": true
}
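A monitor polling this endpoint could turn the response into actionable warnings. Only the field names come from the sample response above. Which combinations count as problems is an assumption, and `health_problems` is a hypothetical helper.

```python
def health_problems(payload: dict) -> list[str]:
    """Flag fields in the /api/health/trends response that need attention."""
    problems = []
    if not payload.get("healthy", False):
        problems.append("orchestrator reports unhealthy")
    if not payload.get("watchers_active", False):
        problems.append("Firebase watchers inactive; cached trends may be stale")
    if payload.get("trend_detection_enabled") and not payload.get("workflow_running"):
        problems.append("trend detection is enabled but the workflow is not running")
    return problems
```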

🚨 Error Handling

Stage Termination

  • Early Termination: Any stage can terminate the pipeline via context.terminate_processing()
  • Graceful Failure: The failure is logged and the next token launch is processed normally
  • No Cascade: A single token's failure doesn't affect workflow availability

Firebase Connection Issues

  • Auto-Reconnect: SimpleFirebaseService handles reconnection automatically
  • Cache Fallback: TrendOrchestrator serves cached data during Firebase downtime
  • Degraded Mode: Classification continues with last-known trends list

LLM Service Failures

  • Classification Timeout: 15s timeout → terminates processing for that token
  • Multimodal Timeout: 20s timeout → terminates trend creation
  • Retry Logic: Not implemented (fail-fast approach for real-time workflow)
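The fail-fast policy amounts to a single awaited call with a deadline and no retry. This sketch uses `asyncio.wait_for`. `StageContext` and `call_with_deadline` are hypothetical; only the `terminate_processing()` method name and the 15s/20s limits come from this document.

```python
import asyncio
import logging

log = logging.getLogger("trend_detection")

CLASSIFICATION_TIMEOUT_S = 15  # TEXT model
CREATION_TIMEOUT_S = 20        # MULTIMODAL model


class StageContext:
    """Minimal stand-in for the pipeline context (hypothetical)."""

    def __init__(self) -> None:
        self.terminated = False
        self.reason = ""

    def terminate_processing(self, reason: str) -> None:
        self.terminated = True
        self.reason = reason


async def call_with_deadline(coro, timeout_s: float, ctx: StageContext, stage: str):
    """Await one LLM call; on timeout or error, terminate this token's pipeline.

    No retries: a late answer is worth less than moving on to the next launch.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        ctx.terminate_processing(f"{stage} timed out after {timeout_s}s")
    except Exception as exc:
        ctx.terminate_processing(f"{stage} failed: {exc}")
    log.warning("%s aborted for this token", stage)
    return None
```

In use, Stage 2 might wrap its call as `await call_with_deadline(llm_call(...), CLASSIFICATION_TIMEOUT_S, ctx, "classification")`, where `llm_call` is whatever coroutine performs the request.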

🔄 Workflow Lifecycle

Startup Sequence

1. Validate environment variables (OPENROUTER_API_KEY, FIREBASE_*)
2. Initialize LiteLLMService with trend_llm.yaml
3. Initialize SimpleFirebaseService
4. Create TokenLaunchSource (pump.fun WebSocket)
5. Create TrendDetectionSource (wraps WebSocket + Firebase)
6. Initialize TrendDetectionProcessor (4-stage pipeline)
7. Create TrendFirebaseExecutor
8. Create and initialize DefaultWorkflow
9. Set up signal handlers (SIGINT, SIGTERM)
10. Start workflow indefinitely

Shutdown Sequence

1. Receive shutdown signal (Ctrl+C or SIGTERM)
2. Stop TokenLaunchSource (close WebSocket)
3. Stop TrendDetectionProcessor (await pending stages)
4. Stop TrendFirebaseExecutor (flush pending writes)
5. Close Firebase connections
6. Exit gracefully with status 0
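The startup and shutdown sequences can be sketched with an `asyncio.Event` that the signal handlers set. This assumes each component exposes async `start()` and `stop()`. Those method names, `run_workflow`, and `StubComponent` are hypothetical. The shutdown list is passed separately because the documented shutdown order (source, processor, executor, Firebase) is not simply the reverse of startup.

```python
import asyncio
import signal


async def run_workflow(startup: list, shutdown: list, stop: asyncio.Event) -> int:
    """Start components in order, block until stop is set, then stop them."""
    for comp in startup:
        await comp.start()
    try:
        await stop.wait()  # "start workflow indefinitely"
    finally:
        # Documented order: close the WebSocket source first so no new work arrives,
        # let the processor drain, flush the executor, then release Firebase.
        for comp in shutdown:
            await comp.stop()
    return 0  # exit status


def install_signal_handlers(stop: asyncio.Event) -> None:
    """Set the stop event on SIGINT/SIGTERM (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)


class StubComponent:
    """Placeholder with the assumed async start()/stop() interface."""

    def __init__(self, name: str, journal: list) -> None:
        self.name, self.journal = name, journal

    async def start(self) -> None:
        self.journal.append(f"start {self.name}")

    async def stop(self) -> None:
        self.journal.append(f"stop {self.name}")
```

Inside the main coroutine, `install_signal_handlers(stop)` would be called after creating the event and before `await run_workflow(...)`, so Ctrl+C triggers the ordered shutdown instead of a `KeyboardInterrupt`.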

💡 Implementation Notes

Trend ID Generation

Trend IDs are slugified versions of trend names for readable Firebase document IDs:

  • "Crying Wojak Despair" → crying-wojak-despair
  • "Pepe Moon Mission" → pepe-moon-mission

Deduplication Logic

  • No explicit deduplication: TrendClassificationStage performs pattern matching
  • Classification as dedup: Matching to existing trend prevents duplicate trend creation
  • Variation recognition: LLM can recognize variations (e.g., "Sad Wojak" → "Crying Wojak Despair")

Multimodal Analysis

NewTrendCreationStage uses vision-enabled Gemini Flash 2.5 to analyze:

  • Visual style and artistic elements
  • Color schemes and composition
  • Cultural references visible in image
  • Meme format recognition (wojak, pepe, doge, etc.)
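For the vision call, the token metadata and image are typically sent together in one user message. This sketch uses the OpenAI-compatible content-part format (a `text` part plus an `image_url` part holding a base64 data URL), which OpenRouter and LiteLLM accept. The prompt text and `build_multimodal_messages` are hypothetical.

```python
import base64


def build_multimodal_messages(memecoin: dict, image_bytes: bytes,
                              mime_type: str, prompt: str) -> list[dict]:
    """One user message: instructions plus token metadata as text, the image as a data URL."""
    data_url = f"data:{mime_type};base64,{base64.b64encode(image_bytes).decode('ascii')}"
    metadata = (f"Name: {memecoin.get('token_name')}\n"
                f"Ticker: {memecoin.get('ticker')}\n"
                f"Description: {memecoin.get('description')}")
    return [{
        "role": "user",
        "content": [
            {"type": "text", "text": f"{prompt}\n\n{metadata}"},
            {"type": "image_url", "image_url": {"url": data_url}},
        ],
    }]
```

Inlining the image as a data URL avoids depending on the model provider being able to fetch the original image URL, since Stage 1 has already downloaded the file.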

Promotion Threshold

  • Magic number: 5 memecoins triggers promotion to confirmed_trends
  • Rationale: 5+ instances indicate established pattern, not random occurrence
  • Configurable: Can be adjusted in TrendFirebaseExecutor if needed

Status: Production-ready
Version: 1.0
Last Updated: 2025-01-14